AWS CDK v2.87.0 で IoT rule actions に Step Functions State Machine の開始を追加可能になりました

2023.07.16

こんにちは、CX事業本部 Delivery部の若槻です。

AWS IoT rule actions を使用すると、指定したトピックにデータがパブリッシュされた際に、Lambda 関数の呼び出しや Amazon Kinesis Data Streams へのデータの送信などのアクションを定義することができます。

そして最近リリースされた AWS CDK v2.87.0 で IoT rule actions として、Step Functions State Machine の開始を追加可能になりました

アップデートされたのは Alpha modules となります。

Alpha modules (2.87.0-alpha.0)

Features

このアップデートにより、トピックにデータがパブリッシュされた際に、ETL パイプラインや分散処理のオーケストレーション実行を簡単に実装することが可能となります。

試してみた

必要なモジュールのインストール

IoT rule actions の CDK 実装は現在 Alpha module で提供されているので、モジュールを別途インストールします。

npm install -D @aws-cdk/aws-iot-alpha @aws-cdk/aws-iot-actions-alpha

インストールしたモジュールは下記となります。

CDK コード

AWS CDK(TypeScript)のコードです。

@aws-cdk/aws-iot-actions-alpha クラスで StepFunctionsStateMachineAction を使用して、TopicRule の actions および errorAction に State Machine を指定可能となっています。それぞれ異なる State Machine を指定しています。

lib/cdk-sample-stack.ts

import {
  aws_stepfunctions as stepfunctions,
  Stack,
  StackProps,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as aws_iot_alpha from '@aws-cdk/aws-iot-alpha';
import * as aws_iot_actions_alpha from '@aws-cdk/aws-iot-actions-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    const myStateMachine = new stepfunctions.StateMachine(
      this,
      'MyStateMachine',
      {
        stateMachineName: 'myStateMachine',
        definitionBody: stepfunctions.DefinitionBody.fromChainable(
          new stepfunctions.Pass(this, 'MyPassState', {
            parameters: {
              message: stepfunctions.JsonPath.stringAt('$.message'),
            },
          })
        ),
      }
    );

    const myErrorStateMachine = new stepfunctions.StateMachine(
      this,
      'MyErrorStateMachine',
      {
        stateMachineName: 'myErrorStateMachine',
        definitionBody: stepfunctions.DefinitionBody.fromChainable(
          new stepfunctions.Pass(this, 'ErrorState', {
            parameters: {
              ruleName: stepfunctions.JsonPath.stringAt('$.ruleName'),
              topic: stepfunctions.JsonPath.stringAt('$.topic'),
              failedResource: stepfunctions.JsonPath.stringAt(
                '$.failures[0].failedResource'
              ),
            },
          })
        ),
      }
    );

    new aws_iot_alpha.TopicRule(this, 'MyTopicRule', {
      topicRuleName: 'myTopicRule',
      sql: aws_iot_alpha.IotSql.fromStringAsVer20160323(
        "SELECT * FROM 'topic/subtopic'"
      ),
      actions: [
        new aws_iot_actions_alpha.StepFunctionsStateMachineAction(myStateMachine),
      ],
      errorAction: new aws_iot_actions_alpha.StepFunctionsStateMachineAction(
        myErrorStateMachine
      ),
    });
  }
}

上記定義により作成されるリソースは次のようになります。TopicRule の実体のリソースと、IoT action を実行するための権限リソースが作成されています。

動作確認

アクションの実行

トピックルールの SQL で指定したトピック topic/subtopic に正常な JSON データをパブリッシュしみます。

aws iot-data publish \
  --topic 'topic/subtopic' \
  --cli-binary-format raw-in-base64-out \
  --payload '{"message":"hello!"}'

myStateMachine の直近の実行履歴を取得します。

$ executionArn=$(aws stepfunctions list-executions \
  --state-machine-arn arn:aws:states:${region}:${account_id}:stateMachine:myStateMachine \
  --max-items 1 | jq -r '.executions[0].executionArn')

直近の実行内容を確認すると、トピックルールから myStateMachine にデータが渡されて実行が行われたことが分かります。

$ aws stepfunctions describe-execution \
  --execution-arn ${executionArn}
{
    "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:execution:myStateMachine:268cc458-5ae6-490d-bc88-8b63d0ff3c3b",
    "stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:stateMachine:myStateMachine",
    "name": "268cc458-5ae6-490d-bc88-8b63d0ff3c3b",
    "status": "SUCCEEDED",
    "startDate": "2023-07-16T21:31:04.089000+09:00",
    "stopDate": "2023-07-16T21:31:04.154000+09:00",
    "input": "{\"message\":\"hello!\"}",
    "inputDetails": {
        "included": true
    },
    "output": "{\"message\":\"hello!\"}",
    "outputDetails": {
        "included": true
    }
}

エラーアクションの実行

同じくトピック topic/subtopic に、今度は不正な JSON データをパブリッシュしみます。

aws iot-data publish \
  --topic 'topic/subtopic' \
  --cli-binary-format raw-in-base64-out \
  --payload 'invalid json'

myErrorStateMachine の直近の実行履歴を取得します。

$ executionArn=$(aws stepfunctions list-executions \
  --state-machine-arn arn:aws:states:${region}:${account_id}:stateMachine:myErrorStateMachine \
  --max-items 1 | jq -r '.executions[0].executionArn')

すると、トピックルールから myErrorStateMachine にトピックルールでのエラーの情報がデータとして渡されて実行が行われたことが分かります。

$ aws stepfunctions describe-execution \
  --execution-arn ${executionArn}
{
    "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:execution:myErrorStateMachine:47974352-9eb8-4a20-a368-423a8967b558",
    "stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:stateMachine:myErrorStateMachine",
    "name": "47974352-9eb8-4a20-a368-423a8967b558",
    "status": "SUCCEEDED",
    "startDate": "2023-07-16T21:31:31.440000+09:00",
    "stopDate": "2023-07-16T21:31:31.515000+09:00",
    "input": "{\"ruleName\":\"myTopicRule\",\"topic\":\"topic/subtopic\",\"cloudwatchTraceId\":\"49fc139d-31dc-d44d-9dc4-c738a11c1b41\",\"clientId\":\"N/A\",\"base64OriginalPayload\":\"aW52YWxpZCBqc29u\",\"failures\":[{\"failedAction\":\"StepFunctionsAction\",\"failedResource\":\"myStateMachine\",\"errorMessage\":\"Failed to start Step Functions execution. The error received was 'Input must be a valid JSON string' . Message arrived on: topic/subtopic, Action: stepFunctions, StateMachineName: myStateMachine, ExecutionName: adf42997-4f10-47aa-9fdf-9018294f6737\"}]}",
    "inputDetails": {
        "included": true
    },
    "output": "{\"ruleName\":\"myTopicRule\",\"topic\":\"topic/subtopic\",\"failedResource\":\"myStateMachine\"}",
    "outputDetails": {
        "included": true
    }
}

今回のアップデートについて

今回、「Step Functions State Machine の開始」が追加されましたが、これにより@aws-cdk/aws-iot-actions-alphaでは現在合わせて14種類の IoT rule actions が利用可能になっています。

packages/@aws-cdk/aws-iot-actions-alpha/lib/index.ts

export * from './cloudwatch-logs-action';
export * from './cloudwatch-put-metric-action';
export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './dynamodbv2-put-item-action';
export * from './firehose-put-record-action';
export * from './iotevents-put-message-action';
export * from './iot-republish-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
export * from './s3-put-object-action';
export * from './sqs-queue-action';
export * from './sns-topic-action';
+ export * from './step-functions-state-machine-action';

選択肢が多いのは嬉しいですね。

おわりに

AWS CDK v2.87.0 で IoT rule actions に Step Functions State Machine の開始を追加可能になりました。

AWS CDK での IoT 周りの実装はまだまだ手薄なので、今後もアップデートが続くことを期待しています。

参考

以上