AWS Step Functionsでジョブ・ステータス・ポーリングを実装する

2018.03.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS Step Functionsを使ってAWS Batchのジョブをキューイング→ジョブステータスをポーリング→ジョブ終了までを管理するジョブ・ステータス・ポーラーを10分でプロビジョンするチュートリアルが追加されていたため、実際にやってみました。

ステートマシン

Lambda で AWS Batch ジョブをキューイングし、ジョブのステータスが完了系になるまでポーリングします。 ポーリング間隔は Wait ステートを利用します。

 

やってみた

以下の手順でサンプルプロジェクトを実行します。

  1. タスクタイマーをプロビジョン
  2. 後片付け

1. タスクタイマーをプロビジョン

Step Function のダッシュボードから ”Create a state machine”を選択します

Sample Projects -> Job Status Poller を選択し、"Create Sample Project" ボタンをクリックします。

CloudFormation でリソースが作成される旨のメッセージが表示されます。 "Create Resources" ボタンをクリックします。

リソースの作成が始まります。

リソースの作成が完了すると、ステートマシーンを実行する "New execution" のダイアログが表示されます。

Input の JSON において

  • jobName は AWS Batch のジョブ名
  • jobDefinition は AWS Batch のコンテナ定義
  • jobQueue は AWS Batch のジョブキュー
  • wait_time は待機する秒数

を表します。

sample input

{
  "jobName": "my-job",
  "jobDefinition": "arn:aws:batch:eu-central-1:123456789012:job-definition/SampleJobDefinition-XXX:1",
  "jobQueue": "arn:aws:batch:eu-central-1:123456789012:job-queue/SampleJobQueue-YYY",
  "wait_time": 60
}

"Start execution" ボタンをクリックして、ステートマシーンを実行します。

ステートマシーンが実行されます。

"Submit Job" Task では Lambda 関数 SubmitJobFunction を呼び出し、ジョブをキューイングします。 ジョブはコマンド「echo 'Hello world'」を実行します。

"Wait X Seconds" では Wait ステートを使って、wait_time で指定した秒数だけ待機します。

"Get Job Status" Task では Lambda 関数 CheckJobFunction を呼び出し、ジョブのステータスを取得します。

"Job Complete?" Choice では、ジョブステータスが "SUCCEEDED" または "FAILED" の場合次にステートに移動し、これら以外の場合は "Wait X Seconds" ステートに戻ってポーリングを繰り返します。

"Job Failed" では Fail ステートを使ってステートマシンの実行を停止し、失敗としてマークします。

"Get Final Job Status" Task では Lambda 関数 CheckJobFunction を呼び出し、ジョブの最後のステータスを取得し、End フィールドを使ってステートマシンの実行を停止します。 "SUCCEEDED" ステートで実行を停止していない理由ははっきりしません。 このサンプルプロジェクトでは "Get Job Status" ステップと同じ Lambda 関数を利用していますが、 実プロジェクトでは 要件に合わせた Lambda 関数(SNS 通知とか)などにかえることを想定しているものと思われます。

2. 後片付け

このステートマシーンに関するリソースは全て CloudFormation で作成されています。

CloudFormation のスタック一覧に"StepFunctionsSample-JobStatusPoller-XXX" のスタック名が存在するため、削除しましょう。

プロビジョン内容を確認

プロビジョニングには CloudFormation を利用しています。 作成されたリソースを確認してみましょう。

作成されたリソース

CloudFormation により以下のリソースが作成されます。

非 AWS Batch(ECS) 関連

  • AWS::StepFunctions::StateMachine : JobStatusPollerStateMachine
  • AWS::Lambda::Function : SubmitJobFunction
  • AWS::Lambda::Function : CheckJobFunction
  • AWS::IAM::Role : StatesExecutionRole
  • AWS::IAM::Role : LambdaExecutionRole

AWS Batch(ECS) 関連

  • VPC(SampleVPC) などネットワークリソース
  • AWS::Batch::JobQueue : SampleJobQueue
  • AWS::Batch::JobDefinition : SampleJobDefinition
  • AWS::Batch::ComputeEnvironment : SampleComputeEnvironment
  • AWS::IAM::Role : SampleEcsInstanceRole
  • AWS::IAM::InstanceProfile : SampleIamInstanceProfile

※ 論理ID

Step Functions : JobStatusPollerStateMachine はメインとなるステートマシーンです。

作成されるステートマシーンの Amazon States Language は以下です。

{
  "Comment": "A state machine that submits a Job to AWS Batch and monitors the Job until it completes.",
  "StartAt": "Submit Job",
  "States": {
    "Submit Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-central-1:123456789012:function:StepFunctionsSample-JobStatusPol-SubmitJobFunction-1CBAEF68A1X8A",
      "ResultPath": "$.guid",
      "Next": "Wait X Seconds"
    },
    "Wait X Seconds": {
      "Type": "Wait",
      "SecondsPath": "$.wait_time",
      "Next": "Get Job Status"
    },
    "Get Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-central-1:123456789012:function:StepFunctionsSample-JobStatusPoll-CheckJobFunction-PXJ28RR8GQ4M",
      "Next": "Job Complete?",
      "InputPath": "$.guid",
      "ResultPath": "$.status"
    },
    "Job Complete?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "FAILED",
          "Next": "Job Failed"
        },
        {
          "Variable": "$.status",
          "StringEquals": "SUCCEEDED",
          "Next": "Get Final Job Status"
        }
      ],
      "Default": "Wait X Seconds"
    },
    "Job Failed": {
      "Type": "Fail",
      "Cause": "AWS Batch Job Failed",
      "Error": "DescribeJob returned FAILED"
    },
    "Get Final Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-central-1:123456789012:function:StepFunctionsSample-JobStatusPoll-CheckJobFunction-PXJ28RR8GQ4M",
      "InputPath": "$.guid",
      "End": true
    }
  }
}

Task ステートで Lambda を呼び出せるように、 StatesExecutionRole ロールをアタッチしています。

StatesExecutionPolicy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

Lambda 関数 SubmitJobFunction/CheckJobFunction は AWS Batch へのジョブ Submit とステータスチェックだけをするシンプルなものです。

SubmitJobFunction では submit_job API でジョブを Sumit しています。

SubmitJobFunction

import json
import boto3

print('Loading function')
batch = boto3.client('batch')
def lambda_handler(event, context):
    # Log the received event
    print("Received event: " + json.dumps(event, indent=2))
    # Get parameters for the SubmitJob call
    # http://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html
    jobName = event['jobName']
    jobQueue = event['jobQueue']
    jobDefinition = event['jobDefinition']
    # containerOverrides and parameters are optional
    if event.get('containerOverrides'):
        containerOverrides = event['containerOverrides']
    else:
        containerOverrides = {}
    if event.get('parameters'):
        parameters = event['parameters']
    else:
        parameters = {}

    try:
        # Submit a Batch Job
        response = batch.submit_job(jobQueue=jobQueue, jobName=jobName, jobDefinition=jobDefinition,
                                    containerOverrides=containerOverrides, parameters=parameters)
        # Log response from AWS Batch
        print("Response: " + json.dumps(response, indent=2))
        # Return the jobId
        jobId = response['jobId']
        return {
            'jobId': jobId
        }
    except Exception as e:
        print(e)
        message = 'Error submitting Batch Job'
        print(message)
        raise Exception(message)

CheckJobFunction では describe_jobs API でジョブキューのステータスを取得しています。

CheckJobFunction

from __future__ import print_function

import json
import boto3

print('Loading function')

batch = boto3.client('batch')

def lambda_handler(event, context):
    # Log the received event
    print("Received event: " + json.dumps(event, indent=2))
    # Get jobId from the event
    jobId = event['jobId']

    try:
        # Call DescribeJobs
        response = batch.describe_jobs(jobs=[jobId])
        # Log response from AWS Batch
        print("Response: " + json.dumps(response, indent=2))
        # Return the jobtatus
        jobStatus = response['jobs'][0]['status']
        return jobStatus
    except Exception as e:
        print(e)
        message = 'Error getting Batch Job status'
        print(message)
        raise Exception(message)

これら Lambda 関数では AWS Batch を操作できるように、 LambdaExecutionRole ロールをアタッチしています。

LambdaExecutionRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "batch:SubmitJob",
                "batch:DescribeJobs"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

最後に

Document History によると、このサンプルプロジェクトは既存のチュートリアルをより試しやすいように CloudFormation 化して 2018/01/18 に追加されたようです。

Step Functions はステートを定義するだけではほぼ何もやれず、タスクなどと連携して初めてまともに動作します。

Step Functions ではポーリングしながら状態遷移するケースが非常に多いかと思います。 今回のプロジェクトを参考に、AWS Batch 部分を AWS Lambda など類似のサービスに置き換えるとだけで、ミニマムなおれおれジョブ・ステータス・ポーラーの完成です。

一度お試しください。

参照