AWS Step FunctionsステートマシンでDynamoDBテーブルからアイテムを取得してMapステートで処理する(AWS CDK)

2021.11.03

こんにちは、CX事業本部 IoT事業部の若槻です。

前回のエントリでは、Step Functions Workflow Studioを使用して、DynamoDBテーブルをQueryして取得したアイテムをMapステートで処理(SMSメッセージを送信)するステートマシンを作成しました。

今回は、同じくAWS Step FunctionsのステートマシンからDynamoDBテーブルをQueryして取得したアイテムをMapステートで処理(SMSメッセージを送信)する構成を、AWS CDKでで作ってみました。

作るもの

下記のようなグラフの定義のステートマシンを作成します。queryCrewsTableからDynamoDBテーブルをQueryし、Mapステート内のsendingSMSTasksからAmazon SNSでSMSメッセージを送信します。

やってみた

対象のDynamoDBテーブル

下記のDynamoDBテーブルをクエリ対象とします。

  • テーブル名:crews
  • PK:crewId(String)
  • GSI名:areaId-index
  • GSI-PK:areaId(String)

格納されているデータは以下となります。

CDKコード

lib/aws-cdk-app-stack.ts

import * as cdk from '@aws-cdk/core';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';

export class AwsCdkAppStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const accountId = cdk.Stack.of(this).account;
    const region = cdk.Stack.of(this).region;

    const crewsTableName = 'crews';
    const crewsTableAreaIdIndex = 'areaId-index';

    //Crewsテーブルデータ取得タスク
    const queryCrewsTable = new tasks.CallAwsService(this, 'queryCrewsTable', {
      service: 'dynamodb',
      action: 'query',
      parameters: {
        TableName: crewsTableName,
        IndexName: crewsTableAreaIdIndex,
        ExpressionAttributeNames: {
          '#areaId': 'areaId',
        },
        ExpressionAttributeValues: {
          ':areaId': {
            'S.$': '$.areaId',
          },
        },
        KeyConditionExpression: '#areaId = :areaId',
      },
      iamResources: [
        `arn:aws:dynamodb:${region}:${accountId}:table/${crewsTableName}/index/${crewsTableAreaIdIndex}`,
      ],
      iamAction: 'dynamodb:Query',
      resultSelector: {
        'Items.$': '$.Items',
      },
      resultPath: sfn.JsonPath.stringAt('$.crews'),
    });

    //Mapステート
    const sendingMap = new sfn.Map(this, 'sendingMap', {
      itemsPath: sfn.JsonPath.stringAt('$.crews.Items'),
      parameters: {
        'phoneNumber.$': '$$.Map.Item.Value.phoneNumber.S',
        'message.$': '$.message',
        'senderId.$': '$.senderId',
      },
    });

    //SMS送信タスク
    const sendingSMSTask = new tasks.CallAwsService(this, 'sendingSMSTask', {
      service: 'sns',
      action: 'publish',
      parameters: {
        'PhoneNumber.$': '$.phoneNumber',
        'Message.$': '$.message',
        MessageAttributes: {
          'AWS.SNS.SMS.SenderID': {
            DataType: tasks.MessageAttributeDataType.STRING,
            'StringValue.$': '$.senderId',
          },
        },
      },
      iamResources: ['*'],
      iamAction: 'sns:Publish',
    });

    //MapのイテレーターにSMS送信タスクを指定
    sendingMap.iterator(sendingSMSTask);

    //ステートマシン
    new sfn.StateMachine(this, 'stateMachine', {
      definition: queryCrewsTable.next(sendingMap),
    });
  }
}

cdk deployしてデプロイします。

マネジメントコンソールから確認すると、ステートマシンが作成できています。

ステートマシン定義は以下のようになります。

{
  "StartAt": "queryCrewsTable",
  "States": {
    "queryCrewsTable": {
      "Next": "sendingMap",
      "Type": "Task",
      "ResultPath": "$.crews",
      "ResultSelector": {
        "Items.$": "$.Items"
      },
      "Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
      "Parameters": {
        "TableName": "crews",
        "IndexName": "areaId-index",
        "ExpressionAttributeNames": {
          "#areaId": "areaId"
        },
        "ExpressionAttributeValues": {
          ":areaId": {
            "S.$": "$.areaId"
          }
        },
        "KeyConditionExpression": "#areaId = :areaId"
      }
    },
    "sendingMap": {
      "Type": "Map",
      "End": true,
      "Parameters": {
        "phoneNumber.$": "$$.Map.Item.Value.phoneNumber.S",
        "message.$": "$.message",
        "senderId.$": "$.senderId"
      },
      "Iterator": {
        "StartAt": "sendingSMSTask",
        "States": {
          "sendingSMSTask": {
            "End": true,
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:sns:publish",
            "Parameters": {
              "PhoneNumber.$": "$.phoneNumber",
              "Message.$": "$.message",
              "MessageAttributes": {
                "AWS.SNS.SMS.SenderID": {
                  "DataType": "String",
                  "StringValue.$": "$.senderId"
                }
              }
            }
          }
        }
      },
      "ItemsPath": "$.crews.Items"
    }
  }
}

またステートマシンの実行ロールも合わせて作成されますが、そのインラインポリシーは下記のようになります。CDK定義上での各ステートマシンTaskごとのiamResourcesおよびiamActionの指定がそのままポリシーとなっています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "dynamodb:Query",
            "Resource": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXX:table/crews/index/areaId-index",
            "Effect": "Allow"
        },
        {
            "Action": "sns:Publish",
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

動作

下記のJSONを指定して、ステートマシンを実行します。a001のエリアにいるクルーにTaito-Kuに移動するように通知する入力です。

{
  "areaId": "a001",
  "message": "Please move to Taito-Ku.",
  "senderId": "AllocateApp"
}

実行に成功しました。

それぞれのクルーの電話番号の端末でもSMSのメッセージが受信できています。

参考

以上