[AWS Step Functions] タスクの開始時刻を制御してみた
各タスクの開始時刻を制御したい時もある
こんにちは、のんピ(@non____97)です。
皆さんはジョブ管理システムから抜け出したいと思ったことはありますか? 私は常に思っています。
ジョブネットの途中で指定時刻まで開始を待ってほしいジョブってありますよね? 絶対あります。ステートマシン(ジョブ管理システムで言うところのジョブネット)の開始時刻の制御は以下記事で紹介した通り、EventBridgeとCloudWatchaアラームを活用して実装しました。
一方でタスク(ジョブ管理システムで言うところのジョブ)の開始時刻の制御はまだ実装したことがなかったので、今回チャレンジしてみます。
いきなりまとめ
- ステートマシンからLambda関数を呼び出し、指定開始時刻までの待ち時間を求めて、Waitで待機する
検証の構成
AWS Step Functions単体の機能でも、Wait
で相対時間もしくは絶対時間で待機することができます。
しかし、Wait
で指定できる値は秒数もしくはタイムスタンプです。そのため、「毎日20:00まで処理を待つ」といった条件を指定することはできません。
指定時刻と現時刻との差(秒数)を求めるLambda関数を用意し、得られた秒数の間待つことで対応します。
今回は上述のワークフローをAWS Step Functionsを使って表現し、指定時刻まで後続処理の実行を待機することを確認します。
指定時刻までの秒数を計算するLambda関数の説明
指定時刻までの秒数を計算するLambda関数では、引数で渡された時刻までの秒数を計算します。
たまたま時間処理系のLambda関数を以前の記事で作成していたので、そちらのLambda関数をベースに作成しました。
実際のコードは以下の通りです。
interface Event { targetLocalTime: string; } export const handler = async (event: Event): Promise<number | null> => { if ( !event.targetLocalTime || !event.targetLocalTime.match(/^([01][0-9]|2[0-3]):[0-5][0-9]$/) ) { console.error(` The argument for the target time (targetLocalTime) has not been entered correctly. e.g. 23:30`); return null; } if ( !process.env["UTC_OFFSET"] || isNaN(Number(process.env["UTC_OFFSET"])) || Number(process.env["UTC_OFFSET"]) > 14 || Number(process.env["UTC_OFFSET"]) < -12 ) { console.error(` The environment variable for UTC offset (UTC_OFFSET) has not been entered correctly. e.g. For Asia/Tokyo, "9". For America/Los_Angeles, "-8".`); return null; } if ( !process.env["BASE_LOCAL_TIME"] || !process.env["BASE_LOCAL_TIME"].match(/^([01][0-9]|2[0-3]):[0-5][0-9]$/) ) { console.error(` The environment variable for base time (BASE_LOCAL_TIME) has not been entered correctly. e.g. 07:30`); return null; } const targetLocalTime: string[] = event.targetLocalTime.split(":"); const utcOffset: number = Number(process.env["UTC_OFFSET"]); const baseLocalTime: string[] = process.env["BASE_LOCAL_TIME"].split(":"); // Time difference from UTC (millisecond) const utcOffsetMillisecond = (new Date().getTimezoneOffset() + utcOffset * 60) * 60 * 1000; // Set the current local date. const currentLocalDate = new Date(Date.now() + utcOffsetMillisecond); console.log(`currentLocalDate : ${currentLocalDate}`); // Convert the base date to a local date. const tempBaseLocalDateMillisecond = new Date( currentLocalDate.getFullYear(), currentLocalDate.getMonth(), currentLocalDate.getDate(), Number(baseLocalTime[0]), Number(baseLocalTime[1]) ).getTime(); // Calculating the base date for a date change const baseLocalDate = currentLocalDate.getTime() < tempBaseLocalDateMillisecond ? new Date(tempBaseLocalDateMillisecond - 24 * 60 * 60 * 1000) : new Date(tempBaseLocalDateMillisecond); console.log(`baseLocalDate : ${baseLocalDate}`); // Convert the target date to a local date. const tempTargetLocalDateMillisecond = new Date( baseLocalDate.getFullYear(), baseLocalDate.getMonth(), baseLocalDate.getDate(), Number(targetLocalTime[0]), Number(targetLocalTime[1]) ).getTime(); // Calculating the target date const targetLocalDate = baseLocalDate.getTime() > tempTargetLocalDateMillisecond ? new Date(tempTargetLocalDateMillisecond + 24 * 60 * 60 * 1000) : new Date(tempTargetLocalDateMillisecond); console.log(`targetLocalDate : ${targetLocalDate}`); // Return the difference between the current date and the target date. const secounds = Math.round( (targetLocalDate.getTime() - currentLocalDate.getTime()) / 1000 ); console.log(`secounds : ${secounds}`); return secounds; };
やってみた
各種リソースのデプロイ
Lambda関数やステートマシンなど環境一式をAWS CDKでデプロイします。なお、AWS CDK関連のコードは長くなったので、以下に折りたたみます。
AWS CDK関連の情報
> tree . ├── .gitignore ├── .npmignore ├── README.md ├── bin │ └── app.ts ├── cdk.json ├── jest.config.js ├── lib │ ├── lambda-stack.ts │ └── state-machine-stack.ts ├── package-lock.json ├── package.json ├── src │ └── lambda │ └── functions │ └── calculate-waiting-time-for-target-time.ts ├── test │ └── app.test.ts └── tsconfig.json 6 directories, 13 files
{ "name": "app", "version": "0.1.0", "bin": { "app": "bin/app.js" }, "scripts": { "build": "tsc", "watch": "tsc -w", "test": "jest", "cdk": "cdk" }, "devDependencies": { "@types/jest": "^26.0.10", "@types/node": "10.17.27", "jest": "^26.4.2", "ts-jest": "^26.2.0", "aws-cdk": "2.2.0", "ts-node": "^9.0.0", "typescript": "~3.9.7" }, "dependencies": { "aws-cdk-lib": "2.2.0", "constructs": "^10.0.0", "source-map-support": "^0.5.16" } }
#!/usr/bin/env node import "source-map-support/register"; import * as cdk from "aws-cdk-lib"; import { LambdaStack } from "../lib/lambda-stack"; import { StateMachineStack } from "../lib/state-machine-stack"; const app = new cdk.App(); const lambdaStack = new LambdaStack(app, "LambdaStack"); new StateMachineStack(app, "StateMachineStack", { calculateWaitingTimeForTargetTimeFunction: lambdaStack.calculateWaitingTimeForTargetTimeFunction, });
import { Stack, StackProps } from "aws-cdk-lib"; import { Construct } from "constructs"; import * as iam from "aws-cdk-lib/aws-iam"; import * as lambda from "aws-cdk-lib/aws-lambda"; import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs"; export class LambdaStack extends Stack { public readonly calculateWaitingTimeForTargetTimeFunction: nodejs.NodejsFunction; constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); // Create an IAM role for Lambda functions. const lambdaIamRole = new iam.Role(this, "LambdaIamRole", { assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"), managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName( "service-role/AWSLambdaBasicExecutionRole" ), ], }); // Lambda function this.calculateWaitingTimeForTargetTimeFunction = new nodejs.NodejsFunction( this, "CalculateWaitingTimeForTargetTimeFunction", { entry: "src/lambda/functions/calculate-waiting-time-for-target-time.ts", runtime: lambda.Runtime.NODEJS_14_X, bundling: { minify: true, }, environment: { UTC_OFFSET: "9", BASE_LOCAL_TIME: "07:30", }, role: lambdaIamRole, } ); } }
import { Stack, StackProps, ScopedAws, Fn } from "aws-cdk-lib"; import { Construct } from "constructs"; import * as logs from "aws-cdk-lib/aws-logs"; import * as sfn from "aws-cdk-lib/aws-stepfunctions"; import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks"; import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs"; interface StateMachineStackProps extends StackProps { calculateWaitingTimeForTargetTimeFunction: nodejs.NodejsFunction; } export class StateMachineStack extends Stack { public readonly controlStateMachineToRunOnlyOnceADayFunction: nodejs.NodejsFunction; constructor(scope: Construct, id: string, props: StateMachineStackProps) { super(scope, id, props); // Get the string after the stack name in the stack id to append to the end of the Log Group name to make it unique. const stackId = new ScopedAws(this).stackId; const stackIdAfterStackName = Fn.select(2, Fn.split("/", stackId)); // Create CloudWatch Logs for Step Functions const testStateMachineLogGroup = new logs.LogGroup( this, "TestStateMachineLogGroup", { logGroupName: `/aws/vendedlogs/states/testStateMachineLogGroup-${stackIdAfterStackName}`, retention: logs.RetentionDays.ONE_WEEK, } ); const succeed = new sfn.Succeed(this, "SucceedState"); new sfn.StateMachine(this, "TestStateMachine", { definition: new tasks.LambdaInvoke( this, "CalculateWaitingTimeForTargetTimeState", { resultPath: "$.Payload", lambdaFunction: props.calculateWaitingTimeForTargetTimeFunction, payload: sfn.TaskInput.fromObject({ "targetLocalTime.$": "$$.Execution.Input.targetLocalTime", }), } ).next( new sfn.Choice(this, "ChoiceState") .otherwise(new sfn.Fail(this, "FailState")) .when( sfn.Condition.numberGreaterThanEquals("$.Payload.Payload", 0), new sfn.Wait(this, "Wait For Trigger Time", { time: sfn.WaitTime.secondsPath("$.Payload.Payload"), }).next(succeed) ) .when(sfn.Condition.numberLessThan("$.Payload.Payload", 0), succeed) ), logs: { destination: testStateMachineLogGroup, level: sfn.LogLevel.ALL, }, }); } }
npx cdk deploy --all
で、AWS CDKで定義したリソースをデプロイすると、Lambda関数やステートマシンなど各種リソースが作成されていることが確認できます。
Lambda関数
ステートマシン
ステートマシンのワークフロー
現時刻 < 指定時刻の場合
まず、「現時刻 < 指定時刻」の場合を試してみます。
現在の時刻は14:08
です。
ステートマシンを選択して、実行の開始
をクリックします。実行の開始
をクリックすると、名前やJSONの入力を求められます。「現時刻 < 指定時刻」の場合を検証したいため{"targetLocalTime": "14:10"}
と入力します。名前は変更せず実行の開始
をクリックします。
すると、意図した通り、後続処理であるSucceedSate
には遷移せずWait For Trigger Time
で待機しているようです。なお、今回の待ち時間は$.Payload.Payload
にある通り114
秒です。
2分ほど待ち、指定時刻である14:10
になると、後続処理であるSucceedSate
に遷移することを確認できました。経過時刻(ms)
の値からもWait For Trigger Time
で114秒待機していたことが分かります。
現時刻 > 指定時刻の場合
次に、「現時刻 > 指定時刻」の場合を試してみます。
現在の14:19
です。
ステートマシンを選択して、実行の開始
をクリックします。実行の開始
をクリックすると、「現時刻 > 指定時刻」の場合を検証したいため{"targetLocalTime": "14:10"}
と入力します。名前は変更せず実行の開始
をクリックします。
すると今回はWait For Trigger Time
で待機せずに、すぐに後続処理であるSucceedSate
に遷移しました。「現時刻 > 指定時刻」であるため、$.Payload.Payload
も-551
となっています。
今回もジョブ管理システムから抜け出すためのTipsを作りました
Lambda関数とAWS Step FunctionsのWait
を使ってタスクの開始時刻の制御をしてみました。
固定の日時まで待機するように動作させるのであれば、AWS Step Functions単体の機能でも対応できますが、「毎日20:00に実行」や時差を考慮した計算はLambda関数を使う必要があります。
この記事が誰かの助けになれば幸いです。
以上、AWS事業本部 コンサルティング部の のんピ(@non____97)でした!