AWS Step FunctionsステートマシンでLambdaを使わずに配列の作成や要素追加を行うアイデア(AWS CDK v2)

2022.06.17

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

AWS Step Functionsステートマシンでは、いくつかの組み込みの文字列処理や演算子が用意されていますが、まだまだ充実しているとは言い難いため、少しでも複雑なデータ処理をしようとすると現状ではLambda関数に頼らざるを得ません。

そんな折、以前のエントリではステートマシンで数値の足し算/引き算を行うアイデアをご紹介しました。

そして今回は、同じくAWS Step FunctionsステートマシンでLambdaを使わずに配列の操作(作成や要素追加)を行うアイデアを紹介します。

やってみる

必要なリソースの作成はAWS CDK v2(TypeScript)で行います。次のようなCDKスタックを作成します。

lib/process-stack.ts

import { Construct } from 'constructs';
import {
  aws_stepfunctions,
  aws_stepfunctions_tasks,
  aws_dynamodb,
  Stack,
  StackProps,
  RemovalPolicy,
} from 'aws-cdk-lib';

export class ProcessStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // 適当なDynamoDBテーブル
    const someTable = new aws_dynamodb.Table(this, 'someTable', {
      tableName: 'someTable',
      partitionKey: {
        name: 'id',
        type: aws_dynamodb.AttributeType.STRING,
      },
      billingMode: aws_dynamodb.BillingMode.PAY_PER_REQUEST,
      removalPolicy: RemovalPolicy.DESTROY,
    });

    // 配列の作成
    const firstAppendTask = new aws_stepfunctions_tasks.DynamoUpdateItem(
      this,
      'firstAppendTask',
      {
        table: someTable,
        key: {
          id: aws_stepfunctions_tasks.DynamoAttributeValue.fromString('aaaaaa'), //適当なキー値
        },
        expressionAttributeNames: {
          '#array': 'array',
        },
        expressionAttributeValues: {
          ':vals': aws_stepfunctions_tasks.DynamoAttributeValue.fromList([
            aws_stepfunctions_tasks.DynamoAttributeValue.fromString(
              aws_stepfunctions.JsonPath.stringAt('$.val1'),
            ),
            aws_stepfunctions_tasks.DynamoAttributeValue.fromString(
              aws_stepfunctions.JsonPath.stringAt('$.val2'),
            ),
          ]),
          ':emptyArray': aws_stepfunctions_tasks.DynamoAttributeValue.fromList(
            [],
          ),
        },
        updateExpression: 'SET #array = list_append(:emptyArray, :vals)',
        returnValues: aws_stepfunctions_tasks.DynamoReturnValues.UPDATED_NEW,
        resultSelector: {
          items: aws_stepfunctions.JsonPath.stringAt('$.Attributes.array.L'),
        },
        resultPath: '$.firstAppendTaskOutPut',
      },
    );

    // 配列への要素追加
    const secondAppendTask = new aws_stepfunctions_tasks.DynamoUpdateItem(
      this,
      'secondAppendTask',
      {
        table: someTable,
        key: {
          id: aws_stepfunctions_tasks.DynamoAttributeValue.fromString('aaaaaa'), //適当なキー値
        },
        expressionAttributeNames: {
          '#array': 'array',
        },
        expressionAttributeValues: {
          ':vals': aws_stepfunctions_tasks.DynamoAttributeValue.fromList([
            aws_stepfunctions_tasks.DynamoAttributeValue.fromString(
              aws_stepfunctions.JsonPath.stringAt('$.val3'),
            ),
          ]),
        },
        updateExpression: 'SET #array = list_append(#array, :vals)',
        returnValues: aws_stepfunctions_tasks.DynamoReturnValues.UPDATED_NEW,
        resultSelector: {
          items: aws_stepfunctions.JsonPath.stringAt('$.Attributes.array.L'),
        },
        resultPath: '$.secondAppendTask',
      },
    );

    // Map処理
    const map = new aws_stepfunctions.Map(this, 'map', {
      itemsPath: aws_stepfunctions.JsonPath.stringAt(
        '$.secondAppendTask.items',
      ),
      parameters: {
        item: aws_stepfunctions.JsonPath.stringAt('$$.Map.Item.Value.S'),
      },
    });

    // ステートマシン
    new aws_stepfunctions.StateMachine(this, 'stateMachine', {
      stateMachineName: 'stateMachine',
      definition: firstAppendTask
        .next(secondAppendTask)
        .next(map.iterator(new aws_stepfunctions.Pass(this, 'pass'))),
    });
  }
}
  • 配列への要素の追加に利用したのはDynamoDBのUpdateItemです。
  • UpdateItemではUpdateExpressionlist_appendFunctionを使用して配列への要素の追加を行うことができます。
  • firstAppendTaskでは、2つの値を要素とした配列を作成しています。
    • 初回なので空の配列に対して要素を追加し配列作成を行なっています。
  • secondAppendTaskでは、firstAppendTaskで作成した配列に要素を追加しています。
  • 適当なDynamoDBテーブルに対してUpdateItem処理を実施し、Update後のItemを戻り値として取得することにより配列の操作を実現しています。
  • 作成した配列の利用例としてMapで処理するようにしています。

スタックをCDK Deployしてリソースを作成します。作成されたステートマシン定義は次のようになります。

{
  "StartAt": "firstAppendTask",
  "States": {
    "firstAppendTask": {
      "Next": "secondAppendTask",
      "Type": "Task",
      "ResultPath": "$.firstAppendTaskOutPut",
      "ResultSelector": {
        "items.$": "$.Attributes.array.L"
      },
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "Key": {
          "id": {
            "S": "aaaaaa"
          }
        },
        "TableName": "someTable",
        "ExpressionAttributeNames": {
          "#array": "array"
        },
        "ExpressionAttributeValues": {
          ":vals": {
            "L": [
              {
                "S.$": "$.val1"
              },
              {
                "S.$": "$.val2"
              }
            ]
          },
          ":emptyArray": {
            "L": []
          }
        },
        "ReturnValues": "UPDATED_NEW",
        "UpdateExpression": "SET #array = list_append(:emptyArray, :vals)"
      }
    },
    "secondAppendTask": {
      "Next": "map",
      "Type": "Task",
      "ResultPath": "$.secondAppendTask",
      "ResultSelector": {
        "items.$": "$.Attributes.array.L"
      },
      "Resource": "arn:aws:states:::dynamodb:updateItem",
      "Parameters": {
        "Key": {
          "id": {
            "S": "aaaaaa"
          }
        },
        "TableName": "someTable",
        "ExpressionAttributeNames": {
          "#array": "array"
        },
        "ExpressionAttributeValues": {
          ":vals": {
            "L": [
              {
                "S.$": "$.val3"
              }
            ]
          }
        },
        "ReturnValues": "UPDATED_NEW",
        "UpdateExpression": "SET #array = list_append(#array, :vals)"
      }
    },
    "map": {
      "Type": "Map",
      "End": true,
      "Parameters": {
        "item.$": "$$.Map.Item.Value.S"
      },
      "Iterator": {
        "StartAt": "pass",
        "States": {
          "pass": {
            "Type": "Pass",
            "End": true
          }
        }
      },
      "ItemsPath": "$.secondAppendTask.items"
    }
  }
}

動作確認

作成されたステートマシンを次のような入力を指定して実行してみます。

{
  "val1": "candy",
  "val2": "choco",
  "val3": "cacao"
}

すると実行が成功しました。

secondAppendTaskの出力を見ると、配列の作成および要素追加ができています!

DynamoDBテーブルを見るとこちらにもアイテム内に配列が作成できていますね。

また作成した配列をちゃんとMap処理で使用できています。

おわりに

同じくAWS Step FunctionsステートマシンでLambdaを使わずに配列の操作(作成や要素追加)を行うアイデアを紹介しました。今までステートマシン内で配列を操作したい場面はいくつかあったため、今後選択肢になりそうです。

ちなみに、あまり需要は無さそうですが、同じくUpdateItemを使用して配列からの要素の削除も行えます。可能性がありますね。

以上