AWS Step Functions と Amazon SQS を使用して、 キューベースのステートマシンを作成する

2021.06.17

こんにちは、yagiです。

本日は、AWS Step Functions と Amazon SQS を使用して、 キューベースのステートマシンを作成したので紹介します。

ちょうどAWS Step Functions と Amazon SQS を使用したステートマシンで、 SQS からの戻り値をどうするか( Step Functions → SQS は可能だが、 SQS は Step Functions の直接入力としては使えない)について触れていたので、参考になりました。「コールバックパターン」と呼ばれるパターンとなります。

以下AWS公式チュートリアルを参考に実施しました。 Orchestrate Queue-based Microservices with AWS Step Functions and Amazon SQS

概要

このチュートリアルでは、AWS Step Functions と Amazon SQS を使用して、メッセージキューベースのマイクロサービスを調整するサーバーレスワークフローを設計および実行する方法を学びます。Step Functions はサーバーレスオーケストレーションサービスで、複数の AWS のサービスを連携させ、デバッグや変更を簡単に行える柔軟なワークフローを作成できます。Amazon SQS は、アプリケーションコンポーネントのクラウド内での通信を可能にする AWS のサービスです。

このチュートリアルでは、注文処理ワークフローの一部として、e コマースアプリケーションで受注してからの在庫確認リクエストをシミュレートします。Step Functions は、在庫確認リクエストを SQS のキューに送信します。AWS Lambda 関数は、キューを使用してリクエストをバッファリングするインベントリマイクロサービスとして機能します。リクエストを取得すると、在庫をチェックし、結果を Step Functions に返します。Step Functions のタスクがこのように設定されている場合、「コールバックパターン」と呼ばれます。コールバックパターンでは、このチュートリアルの在庫検証マイクロサービスなどの非同期タスクをワークフローに統合できます。

(上記チュートリアルより抜粋)

やってみた

チュートリアルに沿って、SQSを標準キューで作成します。

Step Functions ステートマシンでワークフローを作成します。

定義で 「コードスペニットで生成」 を選択し、指定のコードを入力します。 その際、 「QueueUrl」 に SQS の URL を指定します。これでメッセージを SQS キューに入れることができます。

Resource に .waitForTaskToken が追加されており、Step Functions はTaskTokenを JSON ペイロードに追加し、コールバックを待ちます。この後の手順でLambdaで実装するマイクロサービスによって、Step Functions API を呼び出すことにより、Step Functions に結果を返すことができる仕組みになっています。

      ```
      
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "https://sqs.us-east-1.amazonaws.com/XXXXXXXXXXXX/Orders",
        "MessageBody": {
          "MessageTitle": "Callback Task started by Step Functions",
          "TaskToken.$": "$$.Task.Token"
        }
      },
      
      ```

以下の通りステートマシンを作成します。

次に、Step Functions からこの後の手順で Lambda で実装するマイクロサービスを介して、 SQS へのアクセスを許可する IAM ロールを作成します。 信頼されたエンティティに Lambda を指定し、管理ポリシーの「AmazonSQSFullAccess」、「AWSStepFunctionsFullAccess」をアタッチします。

次に、SQS からメッセージを取得し、リクエストの結果を Step Functions に返す Lambda を実装します。 チュートリアルに沿って Lambda を作成し、以下のコードで置換します。

console.log('Loading function');
          const aws = require('aws-sdk');

          exports.handler = (event, context, callback) => {
              const stepfunctions = new aws.StepFunctions();

              for (const record of event.Records) {
                  const messageBody = JSON.parse(record.body);
                  const taskToken = messageBody.TaskToken;

                  const params = {
                      output: "\"Callback task completed successfully.\"",
                      taskToken: taskToken
                  };

                  console.log(`Calling Step Functions to complete callback task with params ${JSON.stringify(params)}`);

                  stepfunctions.sendTaskSuccess(params, (err, data) => {
                      if (err) {
                          console.error(err.message);
                          callback(err.message);
                          return;
                      }
                      console.log(data);
                      callback(null);
                  });
              }
          };

トリガーを追加から SQS を設定します。

SQSのトリガーが有効となっていることを確認します。

ワークフローを実行

ステートマシンを実行します。

実行イベント履歴より、Step Functions が SQS を呼び出し、ステップ間でメッセージのペイロードを渡した箇所について確認することができます。

今回、実行結果は成功で終わっていますが、エラーの場合も該当エラーの箇所を選択して、詳細から確認することができます。

結論

AWS Step Functions と Amazon SQS を使用して、 キューベースのステートマシンを作成したので紹介しました。 実際に手を動かしてみることで、内部でどのようなやりとりが行われているかの理解が深まります。

参考(追記)

以下のブログで StepFunctions のパラメータについて詳しく解説されています。とっても勉強になりましたので追記させていただきます。

[StepFunctions]ParametersやらResultPathやら…。ステート間のパラメータ受け渡しって結局どうなってるの?を1つの図にしてみた。