MQTTに到着するズンドコをAWS Iot Eventsでキヨシ判定してみた

2021.08.01

1 はじめに

IoT事業部の平内(SIN)です。

ここ、DevelopersIOに、Kinesis Data Analyticsを使い方を超興味深く、紹介している記事があります。

今回、Kinesis Data Analyticsについて、ちょっと調べ物をしている際に、上記の記事にヒットして、改めて読ませて頂いていると・・・もしかすると、ズンドコは、AWS IoT Eventsでも、相性よく捌けるのでは?と思ってしまい、本題を忘れて、このブログを書いてます。

最初に結論ですが、やはり、ストリームを高速に捌くという要件では、Kinesis Data Analyticsが、俄然おすすめです。

本記事は、もし、「ズンドコ要件をIoT Eventsで実装するなら」ぐらいの軽いノリです。どうかお許しください。

2 構成

構成は、以下のようにしました。

入力側は、MQTTを送信するデバイスをイメージしたLambdaで、ランダムに、「ズン」若しくは、「ドコ」を送ります。

到着したメッセージは、そのままAWS IoT Eventsに送られます。

IoT Eventsでは、「ズン」「ズン」「ズン」「ズン」「ドコ」のパターンを検出すると、「キヨシ」というLambdaを実行します。

最終的に、入力側のLambdaと出力側のキヨシLambdaのログを付き合わせて、動作確認する流れです。

詳しい「ズンドコ要件」については、先の記事をご参照ください。

3 入力側Lambda

入力側のLambdaのコードです。内容は、ほぼ、先の記事のままで、Kinesis Data Analyticsに送るところを、MQTTPublishに置き換えているだけです。

import json
import boto3
import random
import datetime
import time

iot = boto3.client('iot-data', region_name='ap-northeast-1')
topic = 'zundoko'

def lambda_handler(event, context):
    zundoko_array = [""] * 5

    for i in range(300):
        zundoko = random.choice(['ズン', 'ドコ'])

        del zundoko_array[0]
        zundoko_array.append(zundoko)
        is_zun_zun_zun_zun_doko = zundoko_array == ['ズン', 'ズン', 'ズン', 'ズン', 'ドコ']
        if is_zun_zun_zun_zun_doko:
            kiyoshi_time = datetime.datetime.utcnow()
            print(kiyoshi_time)

        payload = {
            "zundoko": zundoko
        }   
        try:
            iot.publish(
                topic=topic,
                qos=0,
                payload=json.dumps(payload, ensure_ascii=False)
            )
        except Exception as e:
            print(e)
            break

        time.sleep(0.1)

実行すると、AWS IoTのコンソールで、到着しているメッセージを確認できます。

4 出力側Lambda

IoT Eventsからinvokeされる、キヨシLambdaです。呼ばれた時間をLogに記録しているだけです。

import json
import datetime

def lambda_handler(event, context):
    print(json.dumps(event))
    print(datetime.datetime.utcnow())
    return

5 IoT Events

(1) 入力

入力は、到着していたメッセージをそのまま使って、Zundokoという名前で作成しました。

(2) 探知機モデル

探知機モデルの状態は、2つです。

  • Idle
  • StandBy

2つの状態は、下記の条件で移行します。

toStandBy "ズン"を受信した際に、StandBy状態に移行する

$input.Zundoko.zundoko == "ズン"

toIdle "ドコ"を受信した際に、Idle状態に移行する

$input.Zundoko.zundoko == "ドコ"

StandBy状態では、以下の3つのイベントが定義されています。

  • OnEnter
  • OnInput
  • OnExit

OnEnterでは、変数counterを0に初期化します。

OnInputでは、「ズン」が到着した時に、counterをインクリメントしています。

OnExitでは、counterが3以上の場合に、キヨシLambdaをInvokeしています。 「StandBy」状態に居ると言うことは、「ズン」の受信が続いていることになりますが、「ドコ」が到着して「Idle」に遷移する際に、counterを確認してみて、4回「ズン」が続いている場合は、「パターンが成立している」ので、「キヨシ」の実行となる流れです。

6 結果

結果です。入力側のLambdaのログと、出力側のLambdaの時間を確認してみると、概ね1秒程度で「キヨシ」が、実行されているようです。

入力側 出力側
2021-08-01 11:03:15.783485 2021-08-01 11:03:16.512271
2021-08-01 11:03:21.147276 2021-08-01 11:03:22.045987
2021-08-01 11:03:24.510608 2021-08-01 11:03:25.232286
2021-08-01 11:03:27.616386 2021-08-01 11:03:28.728645
2021-08-01 11:03:30.414850 2021-08-01 11:03:31.233361
2021-08-01 11:03:32.989325 2021-08-01 11:03:33.754709
2021-08-01 11:03:35.209572 2021-08-01 11:03:36.036711

7 最後に

今回、IoT Eventsで、「ズンドコ」パターンの検出を行ってみました。「過去の状態に基づいて、条件判断する」という要件は、変数や状態を保持することができる、IoT Eventsで処理することが可能です。

ただ、最初に記載しました通り、ストリームを高速に捌くという意味では、Kinesisに軍配が上がると思います。