[AWS Step Functions] タスクの開始時刻を制御してみた

Lambda関数とAWS Step Functions の Wait を使ってタスクの開始時刻の制御をしてみました。
2021.12.22

各タスクの開始時刻を制御したい時もある

こんにちは、のんピ(@non____97)です。

皆さんはジョブ管理システムから抜け出したいと思ったことはありますか? 私は常に思っています。

ジョブネットの途中で指定時刻まで開始を待ってほしいジョブってありますよね? 絶対あります。ステートマシン(ジョブ管理システムで言うところのジョブネット)の開始時刻の制御は以下記事で紹介した通り、EventBridgeとCloudWatchaアラームを活用して実装しました。

一方でタスク(ジョブ管理システムで言うところのジョブ)の開始時刻の制御はまだ実装したことがなかったので、今回チャレンジしてみます。

いきなりまとめ

  • ステートマシンからLambda関数を呼び出し、指定開始時刻までの待ち時間を求めて、Waitで待機する

検証の構成

AWS Step Functions単体の機能でも、Waitで相対時間もしくは絶対時間で待機することができます。

しかし、Waitで指定できる値は秒数もしくはタイムスタンプです。そのため、「毎日20:00まで処理を待つ」といった条件を指定することはできません。

指定時刻と現時刻との差(秒数)を求めるLambda関数を用意し、得られた秒数の間待つことで対応します。

処理の流れ

今回は上述のワークフローをAWS Step Functionsを使って表現し、指定時刻まで後続処理の実行を待機することを確認します。

指定時刻までの秒数を計算するLambda関数の説明

指定時刻までの秒数を計算するLambda関数では、引数で渡された時刻までの秒数を計算します。

たまたま時間処理系のLambda関数を以前の記事で作成していたので、そちらのLambda関数をベースに作成しました。

実際のコードは以下の通りです。

./src/lambda/functions/calculate-waiting-time-for-target-time.ts

interface Event {
  targetLocalTime: string;
}

export const handler = async (event: Event): Promise<number | null> => {
  if (
    !event.targetLocalTime ||
    !event.targetLocalTime.match(/^([01][0-9]|2[0-3]):[0-5][0-9]$/)
  ) {
    console.error(`
      The argument for the target time (targetLocalTime) has not been entered correctly.
      e.g. 23:30`);
    return null;
  }

  if (
    !process.env["UTC_OFFSET"] ||
    isNaN(Number(process.env["UTC_OFFSET"])) ||
    Number(process.env["UTC_OFFSET"]) > 14 ||
    Number(process.env["UTC_OFFSET"]) < -12
  ) {
    console.error(`
      The environment variable for UTC offset (UTC_OFFSET) has not been entered correctly.
      e.g. For Asia/Tokyo, "9". For America/Los_Angeles, "-8".`);
    return null;
  }

  if (
    !process.env["BASE_LOCAL_TIME"] ||
    !process.env["BASE_LOCAL_TIME"].match(/^([01][0-9]|2[0-3]):[0-5][0-9]$/)
  ) {
    console.error(`
      The environment variable for base time (BASE_LOCAL_TIME) has not been entered correctly.
      e.g. 07:30`);
    return null;
  }

  const targetLocalTime: string[] = event.targetLocalTime.split(":");
  const utcOffset: number = Number(process.env["UTC_OFFSET"]);
  const baseLocalTime: string[] = process.env["BASE_LOCAL_TIME"].split(":");

  // Time difference from UTC (millisecond)
  const utcOffsetMillisecond =
    (new Date().getTimezoneOffset() + utcOffset * 60) * 60 * 1000;

  // Set the current local date.
  const currentLocalDate = new Date(Date.now() + utcOffsetMillisecond);

  console.log(`currentLocalDate : ${currentLocalDate}`);

  // Convert the base date to a local date.
  const tempBaseLocalDateMillisecond = new Date(
    currentLocalDate.getFullYear(),
    currentLocalDate.getMonth(),
    currentLocalDate.getDate(),
    Number(baseLocalTime[0]),
    Number(baseLocalTime[1])
  ).getTime();

  // Calculating the base date for a date change
  const baseLocalDate =
    currentLocalDate.getTime() < tempBaseLocalDateMillisecond
      ? new Date(tempBaseLocalDateMillisecond - 24 * 60 * 60 * 1000)
      : new Date(tempBaseLocalDateMillisecond);

  console.log(`baseLocalDate : ${baseLocalDate}`);

  // Convert the target date to a local date.
  const tempTargetLocalDateMillisecond = new Date(
    baseLocalDate.getFullYear(),
    baseLocalDate.getMonth(),
    baseLocalDate.getDate(),
    Number(targetLocalTime[0]),
    Number(targetLocalTime[1])
  ).getTime();

  // Calculating the target date
  const targetLocalDate =
    baseLocalDate.getTime() > tempTargetLocalDateMillisecond
      ? new Date(tempTargetLocalDateMillisecond + 24 * 60 * 60 * 1000)
      : new Date(tempTargetLocalDateMillisecond);

  console.log(`targetLocalDate : ${targetLocalDate}`);

  // Return the difference between the current date and the target date.
  const secounds = Math.round(
    (targetLocalDate.getTime() - currentLocalDate.getTime()) / 1000
  );

  console.log(`secounds : ${secounds}`);

  return secounds;
};

やってみた

各種リソースのデプロイ

Lambda関数やステートマシンなど環境一式をAWS CDKでデプロイします。なお、AWS CDK関連のコードは長くなったので、以下に折りたたみます。

AWS CDK関連の情報

AWS CDKのディレクトリ構成

> tree
.
├── .gitignore
├── .npmignore
├── README.md
├── bin
│   └── app.ts
├── cdk.json
├── jest.config.js
├── lib
│   ├── lambda-stack.ts
│   └── state-machine-stack.ts
├── package-lock.json
├── package.json
├── src
│   └── lambda
│       └── functions
│           └── calculate-waiting-time-for-target-time.ts
├── test
│   └── app.test.ts
└── tsconfig.json

6 directories, 13 files

package.json

{
  "name": "app",
  "version": "0.1.0",
  "bin": {
    "app": "bin/app.js"
  },
  "scripts": {
    "build": "tsc",
    "watch": "tsc -w",
    "test": "jest",
    "cdk": "cdk"
  },
  "devDependencies": {
    "@types/jest": "^26.0.10",
    "@types/node": "10.17.27",
    "jest": "^26.4.2",
    "ts-jest": "^26.2.0",
    "aws-cdk": "2.2.0",
    "ts-node": "^9.0.0",
    "typescript": "~3.9.7"
  },
  "dependencies": {
    "aws-cdk-lib": "2.2.0",
    "constructs": "^10.0.0",
    "source-map-support": "^0.5.16"
  }
}

./bin/app.ts

#!/usr/bin/env node
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import { LambdaStack } from "../lib/lambda-stack";
import { StateMachineStack } from "../lib/state-machine-stack";

const app = new cdk.App();
const lambdaStack = new LambdaStack(app, "LambdaStack");
new StateMachineStack(app, "StateMachineStack", {
  calculateWaitingTimeForTargetTimeFunction:
    lambdaStack.calculateWaitingTimeForTargetTimeFunction,
});

./lib/lambda-stack.ts

import { Stack, StackProps } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs";

export class LambdaStack extends Stack {
  public readonly calculateWaitingTimeForTargetTimeFunction: nodejs.NodejsFunction;

  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Create an IAM role for Lambda functions.
    const lambdaIamRole = new iam.Role(this, "LambdaIamRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName(
          "service-role/AWSLambdaBasicExecutionRole"
        ),
      ],
    });

    // Lambda function
    this.calculateWaitingTimeForTargetTimeFunction = new nodejs.NodejsFunction(
      this,
      "CalculateWaitingTimeForTargetTimeFunction",
      {
        entry: "src/lambda/functions/calculate-waiting-time-for-target-time.ts",
        runtime: lambda.Runtime.NODEJS_14_X,
        bundling: {
          minify: true,
        },
        environment: {
          UTC_OFFSET: "9",
          BASE_LOCAL_TIME: "07:30",
        },
        role: lambdaIamRole,
      }
    );
  }
}

./lib/state-machine-stack.ts

import { Stack, StackProps, ScopedAws, Fn } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as logs from "aws-cdk-lib/aws-logs";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs";

interface StateMachineStackProps extends StackProps {
  calculateWaitingTimeForTargetTimeFunction: nodejs.NodejsFunction;
}

export class StateMachineStack extends Stack {
  public readonly controlStateMachineToRunOnlyOnceADayFunction: nodejs.NodejsFunction;

  constructor(scope: Construct, id: string, props: StateMachineStackProps) {
    super(scope, id, props);

    // Get the string after the stack name in the stack id to append to the end of the Log Group name to make it unique.
    const stackId = new ScopedAws(this).stackId;
    const stackIdAfterStackName = Fn.select(2, Fn.split("/", stackId));

    // Create CloudWatch Logs for Step Functions
    const testStateMachineLogGroup = new logs.LogGroup(
      this,
      "TestStateMachineLogGroup",
      {
        logGroupName: `/aws/vendedlogs/states/testStateMachineLogGroup-${stackIdAfterStackName}`,
        retention: logs.RetentionDays.ONE_WEEK,
      }
    );

    const succeed = new sfn.Succeed(this, "SucceedState");

    new sfn.StateMachine(this, "TestStateMachine", {
      definition: new tasks.LambdaInvoke(
        this,
        "CalculateWaitingTimeForTargetTimeState",
        {
          resultPath: "$.Payload",
          lambdaFunction: props.calculateWaitingTimeForTargetTimeFunction,
          payload: sfn.TaskInput.fromObject({
            "targetLocalTime.$": "$$.Execution.Input.targetLocalTime",
          }),
        }
      ).next(
        new sfn.Choice(this, "ChoiceState")
          .otherwise(new sfn.Fail(this, "FailState"))
          .when(
            sfn.Condition.numberGreaterThanEquals("$.Payload.Payload", 0),
            new sfn.Wait(this, "Wait For Trigger Time", {
              time: sfn.WaitTime.secondsPath("$.Payload.Payload"),
            }).next(succeed)
          )
          .when(sfn.Condition.numberLessThan("$.Payload.Payload", 0), succeed)
      ),
      logs: {
        destination: testStateMachineLogGroup,
        level: sfn.LogLevel.ALL,
      },
    });
  }
}

npx cdk deploy --allで、AWS CDKで定義したリソースをデプロイすると、Lambda関数やステートマシンなど各種リソースが作成されていることが確認できます。

Lambda関数 指定時刻までの秒数を計算するLambda関数

ステートマシン ステートマシン

ステートマシンのワークフロー ステートマシンのワークフロー

現時刻 < 指定時刻の場合

まず、「現時刻 < 指定時刻」の場合を試してみます。

現在の時刻は14:08です。

ステートマシンを選択して、実行の開始をクリックします。実行の開始をクリックすると、名前やJSONの入力を求められます。「現時刻 < 指定時刻」の場合を検証したいため{"targetLocalTime": "14:10"}と入力します。名前は変更せず実行の開始をクリックします。

実行の開始

すると、意図した通り、後続処理であるSucceedSateには遷移せずWait For Trigger Timeで待機しているようです。なお、今回の待ち時間は$.Payload.Payloadにある通り114秒です。

後続処理の待機

2分ほど待ち、指定時刻である14:10になると、後続処理であるSucceedSateに遷移することを確認できました。経過時刻(ms)の値からもWait For Trigger Timeで114秒待機していたことが分かります。

待機後の後続処理の実行

現時刻 > 指定時刻の場合

次に、「現時刻 > 指定時刻」の場合を試してみます。

現在の14:19です。

ステートマシンを選択して、実行の開始をクリックします。実行の開始をクリックすると、「現時刻 > 指定時刻」の場合を検証したいため{"targetLocalTime": "14:10"}と入力します。名前は変更せず実行の開始をクリックします。

すると今回はWait For Trigger Timeで待機せずに、すぐに後続処理であるSucceedSateに遷移しました。「現時刻 > 指定時刻」であるため、$.Payload.Payload-551となっています。

即座に後続処理を実行

今回もジョブ管理システムから抜け出すためのTipsを作りました

Lambda関数とAWS Step FunctionsのWaitを使ってタスクの開始時刻の制御をしてみました。

固定の日時まで待機するように動作させるのであれば、AWS Step Functions単体の機能でも対応できますが、「毎日20:00に実行」や時差を考慮した計算はLambda関数を使う必要があります。

この記事が誰かの助けになれば幸いです。

以上、AWS事業本部 コンサルティング部の のんピ(@non____97)でした!