特定の時間帯のみ AWS Step Functions のタスクを実行するための Python コードを書いてみた
はじめに
こんにちは、筧( @TakaakiKakei )です。
AWS Step Functions を使いながら、特定の時間帯のみでタスクを実行したいことはありませんか? 今回は AWS Lambda と組み合わせながら上記を実現する方法をご紹介します。
前提
紹介するコードでは、AWS Lambda で実行タイミングの算出をし、 ステートマシンの Wait でその時間まで待機した後、 目的の処理を実行します。 言語は Python です。 実行タイミングの算出ルールは以下です。
- 現在時刻が、X 時 00 分~X 時 09 分であれば、X 時 10 分に目的の処理を実行する
- 現在時刻が、X 時 31 分~X 時 59 分であれば、X+1 時 10 分に目的の処理を実行する
- 現在時刻が、X 時 10 分~X 時 30 分であれば、即座にに目的の処理を実行する
ステートマシンの Wait タスクの TimestampPath が過去の時刻場合は即実行されることを利用します。 こちらは公式ドキュメントに記載は見受けられませんしたが、個人環境では検証済みです。
コード
下記は AWS Step Functions のステートマシンを定義したコードです。
- CheckTime:時間を算出するステップ。後述の AWS Lambda のコードに紐づきます。
- WaitUntil:算出された時間まで待機するステップ。
- Work:目的の処理を実行するステップ。内容は任意です。
- SuccessState:正常終了時のステップ
- FailState:例外発生時のステップ
name: Test-${self:provider.stage} definition: StartAt: CheckTime States: CheckTime: Type: Task Parameters: input.$: $ execution.$: $$ Resource: Fn::GetAtt: [check_time, Arn] Catch: - ErrorEquals: - States.ALL ResultPath: $.error_info Next: FailState Next: WaitUntil WaitUntil: Type: Wait TimestampPath: $.execute_time Next: Work Work: Type: Task Parameters: input.$: $ execution.$: $$ Resource: Fn::GetAtt: [work, Arn] Catch: - ErrorEquals: - States.ALL ResultPath: $.error_info Next: FailState Next: SuccessState SuccessState: Type: Succeed FailState: Type: Fail
下記は実行時刻を算出する AWS Lambda のコードです。 handler 関数がエントリポイントです。 aws_lambda_powertools は必須ではないので割愛いただいて結構です。
import copy from datetime import datetime, timedelta from typing import Dict from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.typing import LambdaContext logger = Logger() tracer = Tracer() def calc_execute_datetime(now: datetime) -> datetime: """ 現在時刻が、X時00分~X時09分であれば、X時10分を返す 現在時刻が、X時31分~X時59分であれば、X+1時10分を返す 現在時刻が、X時10分~X時30分であれば、現在時刻を返す """ if now.minute < 10: # X:00〜X:09 の とき return now.replace(minute=10, second=0) elif 30 < now.minute: # X:31〜X:59 の とき return (now + timedelta(hours=1)).replace(minute=10, second=0) # X:10〜X:30 のとき return now @tracer.capture_lambda_handler @logger.inject_lambda_context(log_event=True) def handler(event: Dict, context: LambdaContext) -> Dict: """ check_time のエントリーポイント - X時31分~X+1時09分は、X+1時10分まで後続のタスク実行をストップすること - ステートマシンの Wait タスクの TimestampPath は過去の時刻場合は即実行される(検証済) - execute_time のタイムゾーンは UTC で、ISO 8601 の RFC3339 プロファイルに従う - refs: https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-wait-state.html event: { "input": { ..snip.. }, "execution": { "Execution":{ ..snip.. }, "StateMachine":{ ..snip.. }, "State":{ ..snip.. } } } return: { ..snip.. "execute_time": "2023-1-1T11:10:00Z", } """ _input: Dict = copy.deepcopy(event["input"]) now = datetime.utcnow() _input["execute_time"] = calc_execute_datetime(now).strftime("%Y-%m-%dT%H:%M:%SZ") return _input
下記は実行時刻を算出する AWS Lambda のテストコードです。
import freezegun from src.handlers.check_time import datetime, handler FIXTURE_EVENT = { "input": {} } BASE_PACKAGE_FOR_PATCH = "src.handlers.check_time" class TestCheckTimeHandler: @freezegun.freeze_time(datetime(2023, 1, 1, 10, 5, 40, 0)) def test_handler_from_0_to_9(self, lambda_powertools_context): """ 現在時刻が、X時00分~X時09分であれば、X時10分を返す分岐のテスト """ # setup expected_return = { **FIXTURE_EVENT["input"], "execute_time": "2023-01-01T10:10:00Z", } # exercise actual = handler(FIXTURE_EVENT, lambda_powertools_context) # verify assert actual == expected_return @freezegun.freeze_time(datetime(2023, 1, 1, 10, 15, 40, 0)) def test_handler_from_10_to_30(self, lambda_powertools_context): """ 現在時刻が、X時10分~X時30分であれば、現在時刻を返す分岐のテスト """ # setup expected_return = { **FIXTURE_EVENT["input"], "execute_time": "2023-01-01T10:15:40Z", } # exercise actual = handler(FIXTURE_EVENT, lambda_powertools_context) # verify assert actual == expected_return @freezegun.freeze_time(datetime(2023, 1, 1, 10, 40, 40, 0)) def test_handler_from_31_to_59(self, lambda_powertools_context): """ 現在時刻が、X時31分~X時59分であれば、X+1時10分を返す分岐のテスト """ # setup expected_return = { **FIXTURE_EVENT["input"], "execute_time": "2023-01-01T11:10:00Z", } # exercise actual = handler(FIXTURE_EVENT, lambda_powertools_context) # verify assert actual == expected_return
おわりに
最後まで読んでいただきありがとうございます。
Wait タスクの TimestampPath が過去の時刻場合は即実行されるのが、今回の記事の味噌でした。 この記事が皆さんの役に立てば幸いです。
それではまた!