Step FunctionsからAWS Batchの配列ジョブを実行してみる – パラメータの渡し方 #reinvent

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

re:Invent 2018において、Step Functionsから新たに8つのサービスが連携可能になったことが発表されました。

[神アップデート]Step Functionsが新たに8つのマネージドサービスと連携可能になりました! #reinvent

実際にこれを使っていくにあたっては、連携先のサービスに対してより細かい制御を行うため、Step Functionsから各サービスへパラメータを渡す必要があります。 その具体的な方法について、AWS Batchの配列ジョブを動かしながら試してみました。 詳細は公式ドキュメントの以下ページ配下に記載されています。(現在はまだ英語です)

AWS Service Integrations

パラメータの渡し方

Pass Parameters to a Service API

各サービスへ渡すパラメータは、各タスクのStateの"Parameters"で指定します。以下はSNSへ渡すパラメータを指定する例です。

   "Resource": "arn:aws:states:::sns:publish",
     "Parameters": {
       "TopicArn": "arn:aws:sns:us-east-1:123456789012:myTopic",
       "Message": "test message",
       "MessageStructure": "json",
       "MessageAttributes": {
         "my attribute no 1": {
           "DataType": "String",
           "StringValue": "value of my attribute no 1"
         },
         "my attribute no 2": {
           "DataType": "String",
           "StringValue": "value of my attribute no 2"
         }
       }
     },

前タスクの入力を渡したい場合はJsonPath形式で指定します。 その際、パラメータ名の末尾に.$をつけます。例えばMessageというパラメータに入力データの$.input.messageを渡したい場合は、以下のようになります。

"Parameters": {"Message.$": "$.input.message"},

Step Functionsから指定できるパラメータはサービスごとに決まっています。 詳細はこちらのドキュメントに記載されています。

補足・制限事項

  • 各サービスへ渡すパラメータ名の先頭は大文字です。例えばAWS BatchのArrayPropertiesのSizeを指定するとき、小文字にすると以下のように怒られます。ドキュメントに小文字で書いてあるので少しハマりました。
  • JsonPath形式で扱える入力・出力の値は文字列に限られるようです(リファレンス)。例えばArrayPropertiesのSizeは数値のため、JsonPath形式の値を指定することはできません。 *1

やってみた

実際にStep FunctionsからAWS Batchへパラメータを渡してみます。まず、以下の記事で紹介されているサンプルプロジェクトを構築します。

[新機能] AWS Step FunctionsがAWS BatchとSNSに連携可能となったのでさっそく試してみた #reinvent

このサンプルプロジェクトのBatchジョブはecho Hello worldを行うだけの簡単なものですが、これを少し変更してみます。

概要

例えば、何らかのIDのリストを取得してきて、それらに対して並列に何らかの処理を行う、といったバッチ処理を行うケース *2をイメージして、以下のような変更を加えてみます。

  • AWS Batchの前に、IDのリストを出力するLambda関数を追加します。
    • これはJSON文字列 {"statusCode": 200, "body": {"idlist": "a,b,c,d,e"}} を出力します。
  • Lambda関数が出力するIDのリストをAWS Batchのパラメータに渡し、それを配列ジョブで並列に処理します。
    • 今回はとりあえず、各ジョブは受け取ったidlistをそのまま出力するだけです。

手順

まず、IDのリストを出力するLambda関数を作成します。

次に、これを処理するAWS Batchのジョブ定義を修正し、コマンドを以下のように変更します。 Ref::idlistというのはAWS Batchでパラメータ置換を行う記法で、Parametersのキーidlistで指定された値に置換されます。

echo idlist: Ref::idlist

最後に、ステートマシンの定義を以下のように変更します。

  • Lambda関数getIdListを実行するState "Get ID List"を"Submit Batch Job"の前に追加
  • StartAtを"Get ID List"に変更
  • AWS BatchのState "Submit Batch Job"の以下を変更
    • JobDefinitionの末尾を修正後のリビジョンに変更
    • ArrayPropertiesを追加
    • Parametersを追加。"idlist"にLambdaの出力を渡す。
  • Lambda関数を追加したので、その実行権を追加したIAMロールを作成します
    • 画面下の"実行の IAM ロール"で"自分用のIAMロールを作成する"を選択

修正後のステートマシンの定義は以下のようになります。

{
  "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
  "StartAt": "Get ID List",
  "TimeoutSeconds": 3600,
  "States": {
    "Get ID List": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:xxxxxxxxxxxx:function:getIdList",
      "Next": "Submit Batch Job"
    },
    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "BatchJobNotification",
        "JobQueue": "arn:aws:batch:ap-northeast-1:xxxxxxxxxxxx:job-queue/BatchJobQueue-7bf649924d554dd",
        "JobDefinition": "arn:aws:batch:ap-northeast-1:xxxxxxxxxxxx:job-definition/BatchJobDefinition-8c71e1e40c37c60:17",
        "ArrayProperties": {
          "Size": 5
        },
        "Parameters": {
          "idlist.$": "$.body.idlist"
        }
      },
      "Next": "Notify Success",
      "Catch": [
          {
            "ErrorEquals": [ "States.ALL" ],
            "Next": "Notify Failure"
          }
      ]
    },

実行結果

AWS Batchのダッシュボードから成功したジョブの結果を確認できます。さらに個別のジョブのログをCloudWatch Logsで確認できます。

さらに個別のジョブのログをCloudWatch Logsで確認できます。

まとめ

Step FunctionsからAWS Batchの配列ジョブを動かしてみました。 今回のアップデートで様々なサービスを直接連携でき、パラメータも直接渡せるようになったので、複雑な処理も構築しやすくなったと思います。

脚注

  1. したがって、例えば前タスクで配列サイズを計算して、それをAWS Batchに渡して動的に配列サイズを変更するようなことはできなさそうです。
  2. 自分がちょうど今業務でこのようなバッチ処理を構築しようとしています