AWS IoT Eventsから項目数の多いペイロードをLambdaに渡す場合にDynamoDBを経由してみる

2021.03.31

はじめに

CX事業本部の佐藤智樹です。

今回は、IoT EventsからLambdaへペイロードを渡す場合にDynamoDBを経由してデータを渡す処理を試しに実装してみたので紹介します。

IoT EventsでLambdaにペイロードを渡す際は以下の設定が必要になります。

  • IoT Ruleからもらったペイロードの項目名を全てIoT Eventsのインプットに定義する
  • ペイロードの項目名を全てカスタムペイロードの設定か変数に記載する

上記2点の問題はペイロードの項目数が少ない場合は問題ないですが、項目数が10~100個以上になった場合記述が大変になります。そこでIoT Ruleにきたデータを並行してDynamoDBに渡してから取得することを検討しました。

本記事は、IoT Eventsを使っていてペイロードの項目数が多くなってきた場合に参考になるかと思います。

参考元

基本的に実装は以前作成した以下のサンプルコードを元にしています。今回は主にスタックの内容やLambdaのコードだけ変更しています。

実装

上記のソースからCDKのスタックで変更している部分から紹介します。

lib/aws-cdk-iot-event-sample-stack.ts

import * as cdk from "@aws-cdk/core";
import * as iot from "@aws-cdk/aws-iot";
import * as iotEvents from "@aws-cdk/aws-iotevents";
import * as iam from "@aws-cdk/aws-iam";
import * as dynamodb from "@aws-cdk/aws-dynamodb";
import * as lambda from "@aws-cdk/aws-lambda";
import * as awsLambdaNodejs from "@aws-cdk/aws-lambda-nodejs";

export class AwsCdkIotEventSampleStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // テーブル作成
    const iotEventsItemTable = new dynamodb.Table(this, "IotEventsItemTable", {
      partitionKey: {
        name: "payloadHash",
        type: dynamodb.AttributeType.STRING,
      },
      tableName: "IOT_EVENTS_ITEM",
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    const lambdaRole = new iam.Role(this, "LambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      managedPolicies: [
        {
          managedPolicyArn: "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
        },
        {
          managedPolicyArn: "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess",
        },
      ],
    });

    // Lambda作成
    const iotEventsLambda = new awsLambdaNodejs.NodejsFunction(
      this,
      "iot-events-lambda",
      {
        entry: "./src/lambda/handler.ts",
        handler: "handler",
        runtime: lambda.Runtime.NODEJS_12_X,
        timeout: cdk.Duration.seconds(30),
        functionName: "iot-events-lambda",
        memorySize: 128,
        role: lambdaRole,
      }
    );

    // 変数かカスタムペイロードを使う場合は以下の`attributes`に1つずつ項目設定が必要
    const lineInput = new iotEvents.CfnInput(this, "LineInput", {
      inputName: "lineInput",
      inputDefinition: {
        attributes: [
          { jsonPath: "deviceId" },
          { jsonPath: "lineStartTime" },
          { jsonPath: "lineEndTime" },
          { jsonPath: "payloadHash" },
        ],
      },
    });

    const lineIotTopicRole = new iam.Role(this, "LineIotTopicRole", {
      assumedBy: new iam.ServicePrincipal("iot.amazonaws.com"),
      managedPolicies: [
        { managedPolicyArn: "arn:aws:iam::aws:policy/AWSIoTEventsFullAccess" },
        {
          managedPolicyArn: "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
        },
      ],
    });

    new iot.CfnTopicRule(this, "LineIotTopic", {
      topicRulePayload: {
        sql: `SELECT newuuid() as payloadHash, * FROM 'line/'`,
        actions: [
          {
            iotEvents: {
              inputName: lineInput.inputName!,
              roleArn: lineIotTopicRole.roleArn,
            },
          },
          {
            dynamoDBv2: {
              putItem: {
                tableName: iotEventsItemTable.tableName,
              },
              roleArn: lineIotTopicRole.roleArn,
            },
          },
        ],
        ruleDisabled: false,
        awsIotSqlVersion: "2016-03-23",
      },
      ruleName: `LineIotTopicRule`,
    });

    const lineModelRole = new iam.Role(this, "LineModelRole", {
      assumedBy: new iam.ServicePrincipal("iotevents.amazonaws.com"),
      managedPolicies: [
        {
          managedPolicyArn: "arn:aws:iam::aws:policy/AWSLambda_FullAccess",
        },
      ],
    });

    const lineDetectorModelDefinition: iotEvents.CfnDetectorModel.DetectorModelDefinitionProperty = {
      initialStateName: "line-start",
      states: [
        {
          stateName: "line-start",
          onEnter: {
            events: [
              {
                eventName: "variable-initialize",
                condition: `true`,
                actions: [
                  {
                    setVariable: {
                      variableName: "previosLineStartTime",
                      value: `$input.${lineInput.inputName}.lineStartTime`,
                    },
                  },
                  {
                    setVariable: {
                      variableName: "previosLineEndTime",
                      value: `$input.${lineInput.inputName}.lineEndTime`,
                    },
                  },
                ],
              },
            ],
          },
          onInput: {
            transitionEvents: [
              {
                eventName: "line-end-check",
                nextState: "line-end",
                condition: `$input.${lineInput.inputName}.lineEndTime !=  $variable.previosLineEndTime`,
                actions: [
                  {
                    setVariable: {
                      variableName: "payloadHash",
                      value: `$input.${lineInput.inputName}.payloadHash`,
                    },
                  },
                ],
              },
            ],
          },
          onExit: {
            events: [
              {
                eventName: "line-start-event",
                actions: [
                  {
                    lambda: {
                      functionArn: iotEventsLambda.functionArn,
                      payload: {
                        // カスタムペイロードで渡す場合は payloadHash のように1項目ずつ定義が必要
                        contentExpression: `\'{
                                              \"payloadHash\": \"\${$input.${lineInput.inputName}.payloadHash}\"
                                            }\'`,
                        type: "JSON",
                      },
                    },
                  },
                  {
                    setVariable: {
                      variableName: "previosLineEndTime",
                      value: `$input.${lineInput.inputName}.lineEndTime`,
                    },
                  },
                ],
              },
            ],
          },
        },
        {
          stateName: "line-end",
          onInput: {
            transitionEvents: [
              {
                eventName: "line-start-check",
                nextState: "line-start",
                condition: `$input.${lineInput.inputName}.lineStartTime !=  $variable.previosLineStartTime`,
              },
            ],
          },
          onExit: {
            events: [
              {
                eventName: "line-start-event",
                actions: [
                  {
                    setVariable: {
                      variableName: "previosLineStartTime",
                      value: `$input.${lineInput.inputName}.lineStartTime`,
                    },
                  },
                ],
              },
            ],
          },
        },
      ],
    };

    new iotEvents.CfnDetectorModel(this, "lineModel", {
      detectorModelDefinition: lineDetectorModelDefinition,
      detectorModelName: "line-model",
      key: "deviceId",
      evaluationMethod: "BATCH",
      roleArn: lineModelRole.roleArn,
    });
  }
}

次にLambdaの実装を記載します。Lambdaはシンプルに受け取ったpayloadHashでDynamoDBを検索して値を取得しています。

src/lambda/handler.ts

import * as DynamoDB from "aws-sdk/clients/dynamodb";

export const dynamodbDocumentClient = new DynamoDB.DocumentClient();

interface Event {
  payloadHash: string;
}

export const handler = async (event: Event) => {
  console.log(JSON.stringify(event));

  // DynamoDBから取得
  const params: DynamoDB.DocumentClient.GetItemInput = {
    TableName: "IOT_EVENTS_ITEM",
    Key: {
      payloadHash: event.payloadHash,
    },
  };

  const response = await dynamodbDocumentClient.get(params).promise();
  console.log(JSON.stringify(response));
};

動作確認

実際にIoT Ruleへデータを送って、IoT Eventsで定義していないデータが取得できているか確認します。以下のコマンドでIoT Ruleへデータを送信します。

送信内容(sampleがIoT Eventsで定義していない項目):

tests/sample-event/iot-rule-test-payload.json

{
  "topic": "line/",
  "qos": 0,
  "payload": "{\"deviceId\": \"device001\", \"lineStartTime\":\"10:00:00\",  \"lineEndTime\":\"19:00:00\",  \"sample\":\"test\" }"
}
# ATSエンドポイント確認
$ aws iot describe-endpoint --endpoint-type iot:Data-ATS
{
    "endpointAddress": "xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
}
# IoT Ruleへ送信
$ aws iot-data publish --cli-input-json file://tests/sample-event/iot-rule-test-payload.json --endpoint-url "https://xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
# stateの`line-start`終了時(onExit)にLambdaの起動を設定したので、`lineEndTime`を`20:00:00`に修正して再実行
$ aws iot-data publish --cli-input-json file://tests/sample-event/iot-rule-test-payload.json --endpoint-url "https://xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"

Lambda の CloudWatch Logs を確認するとデータが取得できていることが確認できます。

所感

IoT Eventsのインプットの設定、カスタムペイロードへの設定追加 or 変数定義追加を地道にやるパターンもありますがデータが多くなると記述量や管理が大変になるので今回の方法を試してみました。

大規模なシステムのIoTサービスへIoT Eventsを適応する場合に参考になれば幸いです。