こんにちは、CX事業本部 Delivery部の若槻です。
AWS IoT rule actions を使用すると、指定したトピックにデータがパブリッシュされた際に、Lambda 関数の呼び出しや Amazon Kinesis Data Streams へのデータの送信などのアクションを定義することができます。
そして最近リリースされた AWS CDK v2.87.0 で IoT rule actions として、Step Functions State Machine の開始を追加可能になりました
アップデートされたのは Alpha modules となります。
Alpha modules (2.87.0-alpha.0)
Features
このアップデートにより、トピックにデータがパブリッシュされた際に、ETL パイプラインや分散処理のオーケストレーション実行を簡単に実装することが可能となります。
試してみた
必要なモジュールのインストール
IoT rule actions の CDK 実装は現在 Alpha module で提供されているので、モジュールを別途インストールします。
npm install -D @aws-cdk/aws-iot-alpha @aws-cdk/aws-iot-actions-alpha
インストールしたモジュールは下記となります。
CDK コード
AWS CDK(TypeScript)のコードです。
@aws-cdk/aws-iot-actions-alpha
クラスで StepFunctionsStateMachineAction
を使用して、TopicRule の actions および errorAction に State Machine を指定可能となっています。それぞれ異なる State Machine を指定しています。
lib/cdk-sample-stack.ts
import {
aws_stepfunctions as stepfunctions,
Stack,
StackProps,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as aws_iot_alpha from '@aws-cdk/aws-iot-alpha';
import * as aws_iot_actions_alpha from '@aws-cdk/aws-iot-actions-alpha';
export class CdkSampleStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
const myStateMachine = new stepfunctions.StateMachine(
this,
'MyStateMachine',
{
stateMachineName: 'myStateMachine',
definitionBody: stepfunctions.DefinitionBody.fromChainable(
new stepfunctions.Pass(this, 'MyPassState', {
parameters: {
message: stepfunctions.JsonPath.stringAt('$.message'),
},
})
),
}
);
const myErrorStateMachine = new stepfunctions.StateMachine(
this,
'MyErrorStateMachine',
{
stateMachineName: 'myErrorStateMachine',
definitionBody: stepfunctions.DefinitionBody.fromChainable(
new stepfunctions.Pass(this, 'ErrorState', {
parameters: {
ruleName: stepfunctions.JsonPath.stringAt('$.ruleName'),
topic: stepfunctions.JsonPath.stringAt('$.topic'),
failedResource: stepfunctions.JsonPath.stringAt(
'$.failures[0].failedResource'
),
},
})
),
}
);
new aws_iot_alpha.TopicRule(this, 'MyTopicRule', {
topicRuleName: 'myTopicRule',
sql: aws_iot_alpha.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'topic/subtopic'"
),
actions: [
new aws_iot_actions_alpha.StepFunctionsStateMachineAction(myStateMachine),
],
errorAction: new aws_iot_actions_alpha.StepFunctionsStateMachineAction(
myErrorStateMachine
),
});
}
}
上記定義により作成されるリソースは次のようになります。TopicRule の実体のリソースと、IoT action を実行するための権限リソースが作成されています。
動作確認
アクションの実行
トピックルールの SQL で指定したトピック topic/subtopic
に正常な JSON データをパブリッシュしみます。
aws iot-data publish \
--topic 'topic/subtopic' \
--cli-binary-format raw-in-base64-out \
--payload '{"message":"hello!"}'
myStateMachine の直近の実行履歴を取得します。
$ executionArn=$(aws stepfunctions list-executions \
--state-machine-arn arn:aws:states:${region}:${account_id}:stateMachine:myStateMachine \
--max-items 1 | jq -r '.executions[0].executionArn')
直近の実行内容を確認すると、トピックルールから myStateMachine にデータが渡されて実行が行われたことが分かります。
$ aws stepfunctions describe-execution \
--execution-arn ${executionArn}
{
"executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:execution:myStateMachine:268cc458-5ae6-490d-bc88-8b63d0ff3c3b",
"stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:stateMachine:myStateMachine",
"name": "268cc458-5ae6-490d-bc88-8b63d0ff3c3b",
"status": "SUCCEEDED",
"startDate": "2023-07-16T21:31:04.089000+09:00",
"stopDate": "2023-07-16T21:31:04.154000+09:00",
"input": "{\"message\":\"hello!\"}",
"inputDetails": {
"included": true
},
"output": "{\"message\":\"hello!\"}",
"outputDetails": {
"included": true
}
}
エラーアクションの実行
同じくトピック topic/subtopic
に、今度は不正な JSON データをパブリッシュしみます。
aws iot-data publish \
--topic 'topic/subtopic' \
--cli-binary-format raw-in-base64-out \
--payload 'invalid json'
myErrorStateMachine の直近の実行履歴を取得します。
$ executionArn=$(aws stepfunctions list-executions \
--state-machine-arn arn:aws:states:${region}:${account_id}:stateMachine:myErrorStateMachine \
--max-items 1 | jq -r '.executions[0].executionArn')
すると、トピックルールから myErrorStateMachine にトピックルールでのエラーの情報がデータとして渡されて実行が行われたことが分かります。
$ aws stepfunctions describe-execution \
--execution-arn ${executionArn}
{
"executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:execution:myErrorStateMachine:47974352-9eb8-4a20-a368-423a8967b558",
"stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:stateMachine:myErrorStateMachine",
"name": "47974352-9eb8-4a20-a368-423a8967b558",
"status": "SUCCEEDED",
"startDate": "2023-07-16T21:31:31.440000+09:00",
"stopDate": "2023-07-16T21:31:31.515000+09:00",
"input": "{\"ruleName\":\"myTopicRule\",\"topic\":\"topic/subtopic\",\"cloudwatchTraceId\":\"49fc139d-31dc-d44d-9dc4-c738a11c1b41\",\"clientId\":\"N/A\",\"base64OriginalPayload\":\"aW52YWxpZCBqc29u\",\"failures\":[{\"failedAction\":\"StepFunctionsAction\",\"failedResource\":\"myStateMachine\",\"errorMessage\":\"Failed to start Step Functions execution. The error received was 'Input must be a valid JSON string' . Message arrived on: topic/subtopic, Action: stepFunctions, StateMachineName: myStateMachine, ExecutionName: adf42997-4f10-47aa-9fdf-9018294f6737\"}]}",
"inputDetails": {
"included": true
},
"output": "{\"ruleName\":\"myTopicRule\",\"topic\":\"topic/subtopic\",\"failedResource\":\"myStateMachine\"}",
"outputDetails": {
"included": true
}
}
今回のアップデートについて
今回、「Step Functions State Machine の開始」が追加されましたが、これにより@aws-cdk/aws-iot-actions-alpha
では現在合わせて14種類の IoT rule actions が利用可能になっています。
packages/@aws-cdk/aws-iot-actions-alpha/lib/index.ts
export * from './cloudwatch-logs-action';
export * from './cloudwatch-put-metric-action';
export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './dynamodbv2-put-item-action';
export * from './firehose-put-record-action';
export * from './iotevents-put-message-action';
export * from './iot-republish-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
export * from './s3-put-object-action';
export * from './sqs-queue-action';
export * from './sns-topic-action';
+ export * from './step-functions-state-machine-action';
選択肢が多いのは嬉しいですね。
おわりに
AWS CDK v2.87.0 で IoT rule actions に Step Functions State Machine の開始を追加可能になりました。
AWS CDK での IoT 周りの実装はまだまだ手薄なので、今後もアップデートが続くことを期待しています。
参考
以上