AWS IoT CoreのルールでDynamoDBのデータを取得してみる

まいど、大阪の市田です。
昨年2019年の10月に「AWS IoT CoreのルールでDynamoDBからデータを取得できる」機能が追加リリースされました。まだ試せていなかったので、今回はこちらを紹介したいと思います。

AWS IoT Core にルールの SQL を使用して DynamoDB からデータを取得する機能を追加

概要

従来からAWS IoT Coreのルールでは、「DynamoDBにデータを登録」することは可能でしたが、テーブルのデータを参照することはできませんでした。
そのため、「DynamoDBのデータと比較して条件に応じて何かアクションしたい」 場合は、簡単な処理でもLambdaを使う必要がありました。

今回紹介する機能により、簡単な処理であればLambdaを使う事無くマネージドサービスとして簡単にDynamoDBのデータを利用することができるようになりました。

やってみた

いくつかのケースやパターンを想定して使い方を確認してみました。

モーターの異常を検知するケース

まずは、リリースのお知らせに書かれているユースケースを想定した内容で、検証してみたいと思います。

モーターのアンペア数が監視するしきい値を超過し、かつ、モーターのベルト速度が一定値よりも小さい場合、担当者に通知する

詳細

  • モーターのベルト速度が登録されているDynamoDBのテーブルが既に存在する(belt-speed-tbl)
  • モーターのアンペア数がセンサーからAWS IoT CoreにPublishされる
  • 条件にマッチした時、Amazon SNSで担当者にメール通知する

イメージ

構成のイメージとしては下記のような形です。
(左側のアイコンはモーターとベルトをイメージした自作アイコンです。)

17-diagram

準備

DynamoDBの作成

まずはDynamoDBを作成しましょう。各モーターのベルト速度が格納されるテーブルbelt-speed-tblを作成します。
今回は下記のようにdeviceIdをパーティションキー、locationをソートキーとしたテーブルを作成しました。

10-belt-speed-tbl

IAM Roleの作成

AWS IoT RulesがDynamoDBにアクセスするためのIAM Roleを作成しておきます。下記のように対象テーブルを指定して最小限な権限のみとします。
後でARNを指定するので控えておきましょう。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            “Effect”: “Allow”,
            “Action”: “dynamodb:GetItem”,
            “Resource”: “arn:aws:dynamodb:<aws-region>:<account-id>:table/<table-name>”
        }
    ]
}}

AWS IoTと信頼関係も設定しておきます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "iot.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

AWS IoT Rulesの作成

今回はコンソールからルールを作成します。

11-make-rules

ルール名は適当なものを指定します。肝心のルールクエリステートメントは下記のようにしました。

SELECT * FROM 'some/topic' 
WHERE 
amperage > 20
AND 
get_dynamodb("belt-speed-tbl", "deviceId", id, "location", locale, "arn:aws:iam::xxxxxxxxxxxx:role/xxxxxxxxxx").speed < 30

書式としては下記の通りです。roleArnには、先程作成したIAM RoleのARNを指定します。

get_dynamodb (tableName、partitionKeyName、partitionKeyValue、sortKeyName、sortKeyValue, roleArn)

ここでのポイントは、DynamoDBのデータを取得する関数get_dynamodb()を使っている点です。
この関数が今回紹介したい全てなので、この関数の使い方だけ分かっていただければこの記事の役目は終わりです。

AWS IoT SQLリファレンス - get_dynamodb()

さて、先程のルールクエリの意味としては次のような内容になります。

  • トピックsome/topicにアンペア数amperage、デバイスIDid、設置場所localeが記載されたデータがデバイスからパブリッシュされる
  • 次の条件を満たす場合、指定したアクションを実行する(SNSで通知、S3にデータ格納)
    • アンペア数が「20」より大きい
    • 該当デバイスIDのベルト速度speedが「30」より小さい

また、ここで利用しているget_dynamodb()のパラメータで、idlocaleは、パブリッシュされてくるjsonデータのキー名に合わせる形になります。そのため、このkeyValueに相当する部分は""でくくらないようにします。

get_dynamodb("belt-speed-tbl", "deviceId", id, "location", locale, "<ARN>").speed < 30

実際に送られてくるデータは下記のようなものとします。

{"id":"001","amperage":"26","locale":"chuo-osaka-jp"}

作成できたら下のようになります。

13-setted-rule

テストデータで確認

それでは、下記のテストデータをPublishして動作確認をしてみましょう。

{"id":"001","amperage":"26","locale":"chuo-osaka-jp"}

テストデータのPublishはコンソールから行いました。

14-test-publish

DynamoDBのbelt-speed-tblには、deviceIdlocationが該当するデータがあります。このspeedの値が30より小さいので指定アクションが発動します。

{"deviceId": "001","location": "chuo-osaka-jp","speed": "25"}

実際に、Publishしたものと同じデータがS3に保存されました。

{"id":"001","amperage":"26","locale":"chuo-osaka-jp"}

またメールでも通知が届いていました。

15-sns-message

次のようなデータだとdeviceIdが「003」で、locationが「chuo-osaka-jp」であるspeedの値は25で、条件にマッチしないのでアクションは実行されません。

{"id":"003","amperage":"22","locale":"chuo-osaka-jp"}

同様に、次のようなデータの場合、deviceIdが「005」でlocationが「shinjuku-tokyo-jp」であるデータはDBに存在しないので、これもアクションは実行されません。

{"id":"005","amperage":"22","locale":"shinjuku-tokyo-jp"}

このようにget_dynamodb()は、Publishされてきたjsonデータのキー名をDynamoDBのキーに置き換えてデータ取得することができる関数になっています。

Publishされるjsonデータのキー DynamoDBのキー
id deviceId(パーティションキー)
locale location(ソートキー)

DynamoDBのデータを返すパターン

DynamoDBのデータを取得して、そのデータを返すこともできます。
例えば、指定のアンペア数を超えた時に、DynamoDBに格納されているベルト速度も一緒に返す(通知に含める)ような事ができます。

ルールとしては下記のようなものになります。

SELECT *, get_dynamodb("belt-speed-tbl", "deviceId", id, "location", locale, "arn:aws:iam::xxxxxxxxxxxx:role/xxxxxxxxxx").speed
AS belt_speed
FROM
'some/topic' 
WHERE 
amperage > 20

ここで先程と同じデータをPublishしてみると、今度はDynamoDBのspeedのデータがbelt_speedとして追加された形で通知を受け取ることができました。

{"id":"001","amperage":"26","locale":"chuo-osaka-jp","belt_speed":"25"}

なお、先程の条件判定と組み合わせて、2回get_dynamodb()を使えば次のような事ができそうです。

  • 指定アンペアを超過、かつ指定速度よりベルト速度が小さい場合
  • ベルト速度のデータを追加して通知する

しかしget_dynamodb()「1つのSQL文につき1回まで」という利用制限があります。2回以上使うとアクションが実施されないので注意が必要です。
その他の注意点はリファレンスを参照してください。

入れ子になっているデータを返すパターン

また、DynamoDBで入れ子になっている項目でも.構文で取得したいデータを細かく指定することができます。

16-ddb-ireko-item

{
  "deviceId": "001",
  "testitem": {
    "color": {
      "blue": 0,
      "green": 0,
      "red": 255
    },
    "location": "osaka",
    "temperature": 51
  },
  "location": "osaka"
}

上記のようなデータの内、redの値を取得したい場合は下記のようになります。この場合はred_valueという名前で取得されます。

get_dynamodb("testtbl", "deviceId", id, "location", locale, "arn:aws:iam::xxxxxxxxxxxx:role/xxxxxxxxxx").testitem.color.red AS red_value

試してみましょう。ルールクエリは次のようなものを指定します。

SELECT id, get_dynamodb("testtbl", "deviceId", id, "location", locale, "arn:aws:iam::xxxxxxxxxxxx:role/xxxxxxxxxx").testitem.color.red AS red_value
FROM 'some/topic'

トピックsome/topicに次のデータをPublishしてみます。

{"id":"001","amperage":"26","locale":"osaka"}

無事にred_valueとしてredのデータを取得することができました。(SNSで通知を受け取れました)

{"id":"001","red_value":255}

最後に

一つのSQLに1回しか利用できないという制約はあるものの、簡単な処理であれば、Lambdaからマネージドなこの機能に変更して運用コストを減らすことができると思います。

ぜひ活用していただければと思います。

以上です。