AWS IoT EventsをAWS CDKで作ってみた

AWS IoT Eventsも AWS CDKで構築!
2021.03.31

はじめに

CX事業本部の佐藤智樹です。

今回はAWS IoT EventsをAWS CDKで構築する場合の方法とサンプルコードを紹介します。

AWS IoT Eventsのディテクターはエクスポートして保存することがBlack Beltで推奨されています。手動で作成したディテクターはエクスポートするとJSON形式になるので、AWS CDKなら簡単にIaCの一部として組み込めるかと思い実験してみました。

本記事はこれからAWS IoT Eventsを使う方にとって簡単にサンプルコードでお試しすることができます。またAWS IoT Eventsを使っている方でも、継続的に開発してバージョン管理をしたい場合に参考になるかと思います。

参考情報

以下の記事を参考に作成しているので、よければそちらもご覧ください。

今回作成するサービスの全体概要

以下に今回作成する内容の概要図を記載します。

IoT Ruleがデバイス側からのエンドポイントとなってIoT Eventsにデータを送り、データに応じてステートが変化するときにSNSでメール通知するといった内容です。ステートはline-start状態の場合、終了時間(lineEndTime)が変化するとline-endに移行し、line-end状態から開始時間(lineStartTime)が変化するとline-startに遷移します。

サンプルソース

サンプルソースは以下に置いてあります。本記事では以下のソースを元に解説/動作テストを行っていきます。

動作環境

項目 バージョン
Node v14.4.0
yarn 1.21.1
AWS CDK 1.92.0

AWS CDKでIoT Eventsの作成

コードで以下のリソースを定義しています。

  • メール通知用のSNSトピック
  • IoT Rule
  • IoT RuleからIoT EventsをキックするためのIAMロール
  • IoT Eventsの入力情報
  • IoT Eventsの探知器モデル
  • IoT Eventsの探知器モデルでSNSをキックするためのIAMロール

lib/aws-cdk-iot-event-sample-stack.ts

import * as cdk from "@aws-cdk/core";
import * as iot from "@aws-cdk/aws-iot";
import * as iotEvents from "@aws-cdk/aws-iotevents";
import * as sns from "@aws-cdk/aws-sns";
import * as iam from "@aws-cdk/aws-iam";

const SUBSCRIPTION_EMAIL = process.env.SUBSCRIPTION_EMAIL!;

export class AwsCdkIotEventSampleStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    if (!SUBSCRIPTION_EMAIL) {
      console.log("Set your email to an environment variable");
      throw new Error();
    }

    const lineNotificationTopic = new sns.Topic(this, "LineNotificationTopic");
    new sns.Subscription(this, "LineNotificationSubscribe", {
      endpoint: SUBSCRIPTION_EMAIL,
      protocol: sns.SubscriptionProtocol.EMAIL,
      topic: lineNotificationTopic,
    });

    // IoT Eventsで受け取る値の設定、設定していない値は受け取れない。
    const lineInput = new iotEvents.CfnInput(this, "LineInput", {
      inputName: "lineInput",
      inputDefinition: {
        attributes: [
          { jsonPath: "deviceId" },
          { jsonPath: "lineStartTime" },
          { jsonPath: "lineEndTime" },
        ],
      },
    });

    const lineIotTopicRole = new iam.Role(this, "LineIotTopicRole", {
      assumedBy: new iam.ServicePrincipal("iot.amazonaws.com"),
      managedPolicies: [
        { managedPolicyArn: "arn:aws:iam::aws:policy/AWSIoTEventsFullAccess" },
      ],
    });

    const lineIotTopic = new iot.CfnTopicRule(this, "LineIotTopic", {
      topicRulePayload: {
        sql: `SELECT deviceId, lineStartTime, lineEndTime  FROM 'line/'`,
        actions: [
          {
            iotEvents: {
              inputName: lineInput.inputName!,
              roleArn: lineIotTopicRole.roleArn,
            },
          },
        ],
        ruleDisabled: false,
        awsIotSqlVersion: "2016-03-23",
      },
      ruleName: `LineIotTopicRule`,
    });

    const lineModelRole = new iam.Role(this, "LineModelRole", {
      assumedBy: new iam.ServicePrincipal("iotevents.amazonaws.com"),
      managedPolicies: [
        {
          managedPolicyArn: "arn:aws:iam::aws:policy/AmazonSNSFullAccess",
        },
      ],
    });

    // 検出器モデルの作成。この部分の実装はGUIと併用することで効率的にモデルの作成/取り込みができる
    const lineDetectorModelDefinition = {
      initialStateName: "line-start",
      states: [
        {
          stateName: "line-start",
          onEnter: {
            events: [
              {
                eventName: "variable-initialize",
                condition: `true`,
                actions: [
                  {
                    setVariable: {
                      variableName: "previosLineStartTime",
                      value: `$input.${lineInput.inputName}.lineStartTime`,
                    },
                  },
                  {
                    setVariable: {
                      variableName: "previosLineEndTime",
                      value: `$input.${lineInput.inputName}.lineEndTime`,
                    },
                  },
                ],
              },
            ],
          },
          onInput: {
            transitionEvents: [
              {
                eventName: "line-end-check",
                nextState: "line-end",
                condition: `$input.${lineInput.inputName}.lineEndTime !=  $variable.previosLineEndTime`,
              },
            ],
          },
          onExit: {
            events: [
              {
                eventName: "line-start-event",
                actions: [
                  {
                    sns: {
                      payload: {
                        type: "STRING",
                        contentExpression: "'Line ended'",
                      },
                      targetArn: lineNotificationTopic.topicArn,
                    },
                  },
                  {
                    setVariable: {
                      variableName: "previosLineEndTime",
                      value: `$input.${lineInput.inputName}.lineEndTime`,
                    },
                  },
                ],
              },
            ],
          },
        },
        {
          stateName: "line-end",
          onInput: {
            transitionEvents: [
              {
                eventName: "line-start-check",
                nextState: "line-start",
                condition: `$input.${lineInput.inputName}.lineStartTime !=  $variable.previosLineStartTime`,
              },
            ],
          },
          onExit: {
            events: [
              {
                eventName: "line-start-event",
                actions: [
                  {
                    sns: {
                      payload: {
                        type: "STRING",
                        contentExpression: "'Line sterted'",
                      },
                      targetArn: lineNotificationTopic.topicArn,
                    },
                  },
                  {
                    setVariable: {
                      variableName: "previosLineStartTime",
                      value: `$input.${lineInput.inputName}.lineStartTime`,
                    },
                  },
                ],
              },
            ],
          },
        },
      ],
    };

    // TODO: Add Model Check Process
    // aws iotevents start-detector-model-analysis --detector-model-definition line-model

    const lineModel = new iotEvents.CfnDetectorModel(this, "lineModel", {
      detectorModelDefinition: lineDetectorModelDefinition,
      detectorModelName: "line-model",
      key: "deviceId",
      evaluationMethod: "BATCH",
      roleArn: lineModelRole.roleArn,
    });
  }
}

今回の実装には入っていませんが、実際にGUIでモデルを作らないでCDK側から実装を始める場合は、モデル作成後にstart-detector-model-analysisのコマンドで形式に誤りがないかチェックを入れる方がデプロイ前に間違いに気付けるので良さそうです。

上記試す場合はサンプルコードをクローンしてください。クローンできたら以下のコマンドで初期設定を行ってください。

$ yarn

AWSの認証情報などを設定後、環境変数に自分のメールアドレスを設定してください。

$ export SUBSCRIPTION_EMAIL=hogehoge@fugafuga

その後以下のコマンドでデプロイしてください。特に問題なければそのまま動作テストに移ります。

$ yarn cdk deploy

動作確認

本節からは実際にIoT EventsやIoT RuleのトピックをAWS SDKから実行して問題なく動作することを確認します。

余談ですが、IoT Eventsのデバッグで動作を確認したい場合はコンソールからログを有効化することをおすすめします。

IoT Eventsから実行

問題が起きた時に特定しやすくするため、先に以下のIoT Events->SNSの範囲だけ先に動作確認します。

以下のペイロードを送信して、探知器モデルを初期状態に移行させます。

tests/sample-event/iot-rule-test-payload.json

{
  "messages": [
    {
      "messageId": "001",
      "inputName": "lineInput",
      "payload": "{\"deviceId\": \"device001\", \"lineStartTime\":\"10:00:00\",  \"lineEndTime\":\"20:00:00\" }"
    }
  ]
}

以下のコマンドでIoT Eventsにデータを送信します。

$ aws iotevents-data batch-put-message --cli-input-json file://tests/sample-event/iot-events-test-payload.json

送信後AWSのコンソールを確認すると、探知器が追加されています。

また以下のコマンドで探知器を指定して確認もできます。

$ aws iotevents-data describe-detector --detector-model-name line-model --key-value device001
{
    "detector": {
        "detectorModelName": "line-model",
        "keyValue": "device001",
        "detectorModelVersion": "1",
        "state": {
            "stateName": "line-start",
            "variables": [
                {
                    "name": "previosLineEndTime",
                    "value": "\"20:00:00\""
                },
                {
                    "name": "previosLineStartTime",
                    "value": "\"10:00:00\""
                }
            ],
            "timers": []
        },
        "creationTime": 1616829105.227,
        "lastUpdateTime": 1616907366.627
    }
}

次に条件に応じて状態が変化するか確認します。lineEndTimeを以下のように変更して再度コマンドを実行します。

tests/sample-event/iot-rule-test-payload.json

{
  "messages": [
    {
      "messageId": "001",
      "inputName": "lineInput",
      "payload": "{\"deviceId\": \"device001\", \"lineStartTime\":\"10:00:00\",  \"lineEndTime\":\"21:00:00\" }"
    }
  ]
}

AWSコンソールを確認すると状態がline-startからline-endに変化していることが確認できます。

コマンドでも変更が確認できます。

$ aws iotevents-data describe-detector --detector-model-name line-model --key-value device001
{
    "detector": {
        "detectorModelName": "line-model",
        "keyValue": "device001",
        "detectorModelVersion": "1",
        "state": {
            "stateName": "line-end",
            "variables": [
                {
                    "name": "previosLineEndTime",
                    "value": "\"21:00:00\""
                },
                {
                    "name": "previosLineStartTime",
                    "value": "\"10:00:00\""
                }
            ],
            "timers": []
        },
        "creationTime": 1616829105.227,
        "lastUpdateTime": 1616908481.411
    }
}

SNSの送信先のメールアドレスも確認すると、開始と終了でメールを送信できていることが確認できました。

若干冗長になるので省きますが、line-end`からline-start`への変更も確認できました。

IoT Ruleから実行

IoT Events->SNSの動作が確認できたので次は、IoT Ruleからの動作確認も行います。

まず以下のコマンドでIoT CoreのATSエンドポイントを確認します。

$ aws iot describe-endpoint --endpoint-type iot:Data-ATS
{
    "endpointAddress": "xxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
}

次にIoT Ruleに送信するペイロードを設定します。今はline-end状態になっているのでline-start状態になるようlineStartTimeを変更します。

tests/sample-event/iot-rule-test-payload.json

{
  "topic": "line/",
  "qos": 0,
  "payload": "{\"deviceId\": \"device001\", \"lineStartTime\":\"09:00:00\",  \"lineEndTime\":\"21:00:00\" }"
}

ATSエンドポイントを設定して、コマンドを実行します。

$ aws iot-data publish --cli-input-json file://tests/sample-event/iot-rule-test-payload.json --endpoint-url "<Your IoT Core ATS Endpoint>"

コマンドで状態を確認するとline-start状態に遷移できていることが確認できます。

aws-cdk-iot-event-sample % aws iotevents-data describe-detector --detector-model-name line-model --key-value device001                 
{
    "detector": {
        "detectorModelName": "line-model",
        "keyValue": "device001",
        "detectorModelVersion": "1",
        "state": {
            "stateName": "line-start",
            "variables": [
                {
                    "name": "previosLineEndTime",
                    "value": "\"21:00:00\""
                },
                {
                    "name": "previosLineStartTime",
                    "value": "\"09:00:00\""
                }
            ],
            "timers": []
        },
        "creationTime": 1616829105.227,
        "lastUpdateTime": 1616910695.37
    }
}

メールでも状態が開始に遷移していることが確認できます。

所感

IoT Eventsの探知器モデル自体がJSONでエクスポートできるので、AWS CDKとも親和性があるかと思います。TerraformだとまだIoT Events自体に対応していない注意してください。これからIoT Eventsを始める際に参考になれば幸いです。