特定の時間帯のみ AWS Step Functions のタスクを実行するための Python コードを書いてみた

特定時間帯(毎時10分~30分)のみ、AWS Step Functions のタスクを実行する方法をご紹介します。
2023.01.17

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

こんにちは、筧( @TakaakiKakei )です。

AWS Step Functions を使いながら、特定の時間帯のみでタスクを実行したいことはありませんか? 今回は AWS Lambda と組み合わせながら上記を実現する方法をご紹介します。

前提

紹介するコードでは、AWS Lambda で実行タイミングの算出をし、 ステートマシンの Wait でその時間まで待機した後、 目的の処理を実行します。 言語は Python です。 実行タイミングの算出ルールは以下です。

  • 現在時刻が、X 時 00 分~X 時 09 分であれば、X 時 10 分に目的の処理を実行する
  • 現在時刻が、X 時 31 分~X 時 59 分であれば、X+1 時 10 分に目的の処理を実行する
  • 現在時刻が、X 時 10 分~X 時 30 分であれば、即座にに目的の処理を実行する

ステートマシンの Wait タスクの TimestampPath が過去の時刻場合は即実行されることを利用します。 こちらは公式ドキュメントに記載は見受けられませんしたが、個人環境では検証済みです。

コード

下記は AWS Step Functions のステートマシンを定義したコードです。

  • CheckTime:時間を算出するステップ。後述の AWS Lambda のコードに紐づきます。
  • WaitUntil:算出された時間まで待機するステップ。
  • Work:目的の処理を実行するステップ。内容は任意です。
  • SuccessState:正常終了時のステップ
  • FailState:例外発生時のステップ

statemachine.yml

name: Test-${self:provider.stage}
definition:
  StartAt: CheckTime
  States:
    CheckTime:
      Type: Task
      Parameters:
        input.$: $
        execution.$: $$
      Resource:
        Fn::GetAtt: [check_time, Arn]
      Catch:
        - ErrorEquals:
            - States.ALL
          ResultPath: $.error_info
          Next: FailState
      Next: WaitUntil
    WaitUntil:
      Type: Wait
      TimestampPath: $.execute_time
      Next: Work
    Work:
      Type: Task
      Parameters:
        input.$: $
        execution.$: $$
      Resource:
        Fn::GetAtt: [work, Arn]
      Catch:
        - ErrorEquals:
            - States.ALL
          ResultPath: $.error_info
          Next: FailState
      Next: SuccessState
    SuccessState:
      Type: Succeed
    FailState:
      Type: Fail

下記は実行時刻を算出する AWS Lambda のコードです。 handler 関数がエントリポイントです。 aws_lambda_powertools は必須ではないので割愛いただいて結構です。

src/handlers/check_time.py

import copy
from datetime import datetime, timedelta
from typing import Dict

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()
tracer = Tracer()


def calc_execute_datetime(now: datetime) -> datetime:
    """
    現在時刻が、X時00分~X時09分であれば、X時10分を返す
    現在時刻が、X時31分~X時59分であれば、X+1時10分を返す
    現在時刻が、X時10分~X時30分であれば、現在時刻を返す
    """
    if now.minute < 10:
        # X:00〜X:09 の とき
        return now.replace(minute=10, second=0)
    elif 30 < now.minute:
        # X:31〜X:59 の とき
        return (now + timedelta(hours=1)).replace(minute=10, second=0)

    # X:10〜X:30 のとき
    return now


@tracer.capture_lambda_handler
@logger.inject_lambda_context(log_event=True)
def handler(event: Dict, context: LambdaContext) -> Dict:
    """
    check_time のエントリーポイント
    - X時31分~X+1時09分は、X+1時10分まで後続のタスク実行をストップすること
    - ステートマシンの Wait タスクの TimestampPath は過去の時刻場合は即実行される(検証済)
    - execute_time のタイムゾーンは UTC で、ISO 8601 の RFC3339 プロファイルに従う
        - refs: https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/amazon-states-language-wait-state.html

    event:
    {
        "input": {
            ..snip..
        },
        "execution": {
            "Execution":{
                ..snip..
            },
            "StateMachine":{
                ..snip..
            },
            "State":{
                ..snip..
            }
        }
    }

    return:
    {
        ..snip..
        "execute_time": "2023-1-1T11:10:00Z",
    }
    """
    _input: Dict = copy.deepcopy(event["input"])
    now = datetime.utcnow()
    _input["execute_time"] = calc_execute_datetime(now).strftime("%Y-%m-%dT%H:%M:%SZ")
    return _input

下記は実行時刻を算出する AWS Lambda のテストコードです。

tests/unit/handlers/test_check_time.py

import freezegun

from src.handlers.check_time import datetime, handler

FIXTURE_EVENT = {
    "input": {}
}

BASE_PACKAGE_FOR_PATCH = "src.handlers.check_time"


class TestCheckTimeHandler:
    @freezegun.freeze_time(datetime(2023, 1, 1, 10, 5, 40, 0))
    def test_handler_from_0_to_9(self, lambda_powertools_context):
        """
        現在時刻が、X時00分~X時09分であれば、X時10分を返す分岐のテスト
        """
        # setup
        expected_return = {
            **FIXTURE_EVENT["input"],
            "execute_time": "2023-01-01T10:10:00Z",
        }

        # exercise
        actual = handler(FIXTURE_EVENT, lambda_powertools_context)

        # verify
        assert actual == expected_return

    @freezegun.freeze_time(datetime(2023, 1, 1, 10, 15, 40, 0))
    def test_handler_from_10_to_30(self, lambda_powertools_context):
        """
        現在時刻が、X時10分~X時30分であれば、現在時刻を返す分岐のテスト
        """
        # setup
        expected_return = {
            **FIXTURE_EVENT["input"],
            "execute_time": "2023-01-01T10:15:40Z",
        }

        # exercise
        actual = handler(FIXTURE_EVENT, lambda_powertools_context)

        # verify
        assert actual == expected_return

    @freezegun.freeze_time(datetime(2023, 1, 1, 10, 40, 40, 0))
    def test_handler_from_31_to_59(self, lambda_powertools_context):
        """
        現在時刻が、X時31分~X時59分であれば、X+1時10分を返す分岐のテスト
        """
        # setup
        expected_return = {
            **FIXTURE_EVENT["input"],
            "execute_time": "2023-01-01T11:10:00Z",
        }

        # exercise
        actual = handler(FIXTURE_EVENT, lambda_powertools_context)

        # verify
        assert actual == expected_return

おわりに

最後まで読んでいただきありがとうございます。

Wait タスクの TimestampPath が過去の時刻場合は即実行されるのが、今回の記事の味噌でした。 この記事が皆さんの役に立てば幸いです。

それではまた!

参考