Amazon Monitron の異常検知アラートを Monitron アプリケーション以外(Slack)に送ってみる

Amazon Monitron の活用例として任意の宛先に通知する仕組みを考えてみました。
2023.10.13

Amazon Monitron の異常通知とは

Amazon Monitron では、専用のモバイルアプリケーションや Web アプリケーションが提供されており、振動や温度の異常を検知した場合に各アプリケーションへ通知を送ることができます。

異常判定の種類

Amazon Monitron による異常判定は次の2種類があります。

  • 機械学習による判定
  • ISO の基準ベースによる判定

「機械学習による判定」の場合、2週間程度データを実機から収集・蓄積すると機械学習により異常かどうかを判定し、異常と判断されると通知が行われます。また、機械学習による判定が行われる前(十分なデータが蓄積される前)であれば、ISO の基準にもとづいた「しきい値ベース」で判定が行われます。

異常通知の宛先

Amazon Monitron をデフォルトで利用する場合、専用アプリケーション以外へ通知することができません。 そのため、異常時に「メールで受け取りたい」「電話をかけてほしい」といったカスタム要件に対応できません。

また、Amazon Monitron では「サイトあたりの最大ユーザー数が 20」 という制約もあるので、20 人以上のユーザーがいると利用できない人が出てしまいます。

カスタム要件に対応する方法

このようにデフォルトの機能では満たせない要件がある場合、収集データを Amazon Kinesis Data Streams にエクスポートすることで機能拡張することができます。

ただし、カスタム要件に利用できるのはエクスポートしたデータだけなので、「データ取得タイミングを1時間より短くしたい」といった、エクスポートデータの利用以外のユースケースには対応できない点に注意が必要です。

Slack に異常を通知する

今回は、センサーで異常を検知した際に Slack の専用チャンネルに警告を投稿してみようと思います。
なお、機械学習ベースのデータはまだ蓄積できていないので、今回は「振動」データに対して ISO 基準で通知するようにします。

03-custom-notify

Kinesis v2 によるデータのエクスポート

データのエクスポートはとても簡単です。エクスポート手順のドキュメントは下記になります。

Amazon Monitron のコンソール画面で、エクスポートしたいプロジェクトを開きます。そのプロジェクトのページに「Live data export」という欄があります。このセクションを展開して「Start live data export」をクリックします。

04-live-data-export

すると次のような画面が開きます。ここではエクスポートしたデータの使い方や簡単なアーキテクチャ例が表示されているので、必要に応じて各種リンクのドキュメントなどを確認してください。

画面下の「Settings」の箇所でエクスポート先の Kinesis data stream を選択します。今回は新規に作成したので「Create a new data stream」をクリックします。

05-create-new-stream

下記のような手順を記載した画面が出ますが、そのまま「Create a new Amazon Kinesis data stream」を押して先に進みます。

06-acknowledged-creation

Amazon Kinesis Data Streams のコンソール画面が開くので、適当なデータストリーム名をセットしてください。「容量モード」と「シャード」は次のようにしました。

  • 容量モード:プロビジョンド
  • プロビジョニングされたシャード:1

Monitron によるデータ収集と送信は、基本的に「1時間に1回」なのでよほどセンサー数が多くなければシャードは1つでよいかと思います。
(手動で任意のタイミングでデータ収集と送信も可能ですが、モバイルアプリがインストールされたスマホをセンサーに近づけて実行する必要があります。)

07-set-stream-settings 08-create-stream

データストリームが作成できたら、Amazon Monitron のコンソールに戻ります。
ストリームのプルダウンから作成したストリームを選択して「Start live data export」 をクリックして完了です。

09-select-stream

ステータスが「Configured」に変わりエクスポートが開始されました。

10-setting-live-data-export

データエクスポートを停止する場合

Kinesis へのデータエクスポートを停止する場合は、先程の画面で「Actions」から「Stop live data export」をクリックするだけです。

11-stop-live-data-export

確認画面が出るので、そのまま「Stop」をクリックします。Kinesis 側のストリームを削除していなければいつでもエクスポートを再開することができます。

12-stop-export-cancel-stop

エクスポートを止めるとコンソール画面の表示が設定前の状態に変わります。

13-stopped-data-export

コンソールでデータを確認してみる

ここまでできたら、Kinesis のデータストリームにデータが届いているか確認してみます。
データは1時間ごとに取得・送信されるので、次のタイミングまで待機するか手動でデータ取得して、データストリームにデータを入れるようにします。

次のデータ送信タイミングは Monitron のアプリケーションで確認できます。

14-next-measurement

今回は、Kinesis データストリームに入っているデータを「時間」で探すので、センサーでデータ測定された直近の時間を確認しておきます。(下記の「Last measured」の時間です)
当然ですが、この「データ測定の時間」はデータストリームを作成した後である必要があります。

16-last-measured

Kinesis データストリームのコンソールから「データビューワー」タブを開きます。

15-kinesis-data-viewer

次のようにセットして「レコードを取得」をクリックします。 

  • シャード:shardId-0000000... を選択
    • シャードは一つしか作成していないので、プルダウンに出てくるものを選択
  • 開始位置:「タイムスタンプで」
  • 開始日:2023/08/09
    • 先ほど Monitron アプリで確認した Last measured の日付をセット
    • 右側にあるカレンダーアイコンからセットできます
  • 開始時刻:03:33:00
    • 先ほど Monitron アプリで確認した Last measured の時刻をセット

17-get-data-settings

データがストリームに入っていれば次のようにレコードが表示されます。
「データ」をクリックするとデータの内容を確認できます。

18-getted-data-2

データの内容を JSON 形式で表示・コピーすることができます。

19-json-data

一部マスクしていますが、定期的に計測されるデータ(1時間に1回計測)は次のような内容になります。(定期計測のデータは "eventType": "measurement" となっています。)

{
    "timestamp": "2023-10-11 09:33:44.511",
    "eventId": "1543",
    "version": "2.0",
    "accountId": "xxxxxxxxxxxx",
    "projectName": "MonitronTest",
    "eventType": "measurement",
    "eventPayload": {
        "siteName": "Site 2",
        "assetName": "Table fan",
        "positionName": "Fan Motor",
        "assetPositionURL": "https://app.monitron.aws/#/xxx/@assets/xxx/@details",
        "sensor": {
            "physicalId": "xxxxxxxxxxxx",
            "rssi": -45
        },
        "gateway": {
            "physicalId": "xxxxxxxxxxxx"
        },
        "sequenceNo": 1543,
        "features": {
            "acceleration": {
                "band0To6000Hz": {
                    "xAxis": {
                        "rms": 0.091
                    },
                    "yAxis": {
                        "rms": 0.3688
                    },
                    "zAxis": {
                        "rms": 11.2063
                    }
                },
                "band10To1000Hz": {
                    "totalVibration": {
                        "absMax": 0.1581,
                        "absMin": 0,
                        "crestFactor": 3.5079,
                        "rms": 0.0451
                    },
                    "xAxis": {
                        "rms": 0.0263
                    },
                    "yAxis": {
                        "rms": 0.0192
                    },
                    "zAxis": {
                        "rms": 0.0311
                    }
                }
            },
            "velocity": {
                "band10To1000Hz": {
                    "totalVibration": {
                        "absMax": 0.3742,
                        "absMin": 0,
                        "crestFactor": 3333.3333,
                        "rms": 0.1123
                    },
                    "xAxis": {
                        "rms": 0.0748
                    },
                    "yAxis": {
                        "rms": 0.0374
                    },
                    "zAxis": {
                        "rms": 0.0748
                    }
                }
            },
            "temperature": 24.268
        },
        "models": {
            "temperatureML": {
                "previousPersistentClassificationOutput": "HEALTHY",
                "persistentClassificationOutput": "HEALTHY",
                "pointwiseClassificationOutput": "INITIALIZING"
            },
            "vibrationISO": {
                "isoClass": "CLASS1",
                "mutedThreshold": null,
                "previousPersistentClassificationOutput": "HEALTHY",
                "persistentClassificationOutput": "HEALTHY",
                "pointwiseClassificationOutput": "HEALTHY"
            },
            "vibrationML": {
                "previousPersistentClassificationOutput": "HEALTHY",
                "persistentClassificationOutput": "HEALTHY",
                "pointwiseClassificationOutput": "INITIALIZING"
            }
        },
        "measurementTrigger": "periodic"
    }
}

このデータを見ると公式の Monitron アプリでは見れないデータも含まれてます。
また、previousPersistentClassificationOutput という項目で「前回のタイミングで計測した時のステータス」も含まれていることが分かります。この項目を使えば、機器の状態遷移を別途保持する仕組みを用意しなくていいので、異常発生時にのみアラート通知する仕組みが簡単に実装できそうです。

(もし「前回の計測時のステータス」が分からなければ、アラート状態のデータが来るたびに通知されてしまうので、AWS IoT Events などを使ってステータスを管理する必要が出てしまいます)

これで「定期計測」のデータ構造が分かったので、Lambda で Slack に通知する方向性も確認できました。次は「通知処理」の部分を作成していきます。

Slack の準備

Slack 側の準備で必要な主な作業は以下のとおりです。

  • 通知用の Slack チャンネルの用意
  • Slack App の作成と Incoming Webhook の設定
  • (必要に応じて)Slack App で利用するアイコンの準備
    • 投稿される Slackアプリケーションのアイコンに利用されます

Slack チャンネルの作成

今回は既存のものは使わずに新規に専用のチャンネル(monitron-alarm)を作成しました。チャンネルはプライベートでも問題ありません。

Slack App の作成と Incoming Webhook の設定

Slack への通知には Incoming Webhook を使います。
Incoming Webhookについて検索すると古い方法である「カスタムインテグレーション」がヒットすることがありますが、現在は下記の「Slack App」の利用が推奨されています。

Slack App の作成方法については、上記のドキュメントの通りです。ここでは実際の作り方を画面ショットと共に記載していきます。
先程のドキュメントページを見ると Create your Slack app というボタンがあるのでクリックします。

20-1-create-slack-app

ドキュメントページからではなくとも、下記の「Slack API」のトップページにある Your apps からもたどることができます。

20-2-your-slack-apps

ドキュメントページのリンクからアクセスすると、いきなり「Create an app」 と書かれた画面が出てくるので、From scratch をクリックします。
From an app manifest は Slack アプリの設定を JSON もしくは YAML で記載したものをロードすることでアプリを作成することができるようです)

21-1-direct-create-app

22-from-scratch

次の画面では、この Slack アプリの名前と連携したい Slack ワークスペースを選択して、「Create App」をクリックします。これで Slack アプリのベースが作成できました。

23-name-choose-workspace

アプリが作成できると、作成したアプリの「Basic Infomation」 の画面になっています。その画面を下にスクロールしていくと「Display Infomation」という箇所が出てくるので、投稿するメッセージの基本的な情報の設定を行います。

24-basic-info-display

今回は次のようにアイコンと背景色の設定だけ行いました。必要に応じて各種項目を設定してみてください。
なお、以前の「カスタムインテグレーション」では、アプリのアイコンなどは投稿時の JSON で指定することができましたが、「Slack アプリ」では Slack 上から設定する形に変わっており、よりセキュアになっています。
ちなみに、アイコンは「512px 〜 2000px 四方の正方形」のものである必要があります。

設定が終わったら「Save Changes」をクリックして保存します。

25-set-display-info

次に「Incoming Webhooks」の設定を行います。「Basic Information」の画面からたどるか、画面左側にあるメニュー欄からアクセスしてください。

26-add-configure-incoming-webhook-2

この時点では、まだ「Incoming Webhook」が無効になっているのでトグルボタンをクリックして有効にします。

31-turn-on-enable

「Incoming Webhook」が有効になると、Webhook URL の発行ボタンが出てくるので「Add New Webhook to Workspace」をクリックします。

32-add-new-webhook

「Add New Webhook to Workspace」をクリックすると、次のような画面に変わります。ここでは Slack アプリが指定のワークスペースに連携することを許可します。
メッセージの投稿先となるチャンネルを選択して「許可する」をクリックします。投稿するチャンネルは先に作成しておいた monitron-alarm を選択しました。

33-allow-post

これで「Webhook URL」が発行されました。後で利用するので控えておきましょう。

34-copy-webhook-url

Lambda 関数の作成

Slack 側の準備ができたので、次は Lambda 関数を作成します。
Lambda 関数の作成には今回は AWS SAM を利用しました。好みの方法で作成してもらえればと思います。

下記に AWS SAM による手順を記載しておきます。

  • 初期化
    • プロジェクト名は「monitron-slack-notify」としました。
sam init \
    --runtime python3.11 \
    --name monitron-slack-notify \
    --app-template hello-world \
    --package-type Zip
  • template.yaml の作成
    • 初期化の後、プロジェクト名のディレクトリが作成されるので移動します。
$ cd monitron-slack-notify/
  • template.yaml の内容
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  monitron-slack-notify

  Sample SAM Template for monitron-slack-notify

Globals:
  Function:
    Timeout: 3

Resources:
  MonitronKinesisExportFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: monitron-slack-notify
      CodeUri: hello_world/
      Handler: app.lambda_handler
      Runtime: python3.11
      AutoPublishAlias: dev
      Architectures:
        - arm64
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: arn:aws:kinesis:us-east-1:xxxxxxxxxxxx:stream/monitronStream
            BatchSize: 100
            MaximumRecordAgeInSeconds: 60
            MaximumRetryAttempts: 2
            StartingPosition: LATEST

  # CloudWatch Logs for Lambda
  LambdaFuncLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/lambda/${MonitronKinesisExportFunction}
      RetentionInDays: 30

Outputs:
  MonitronKinesisExportFunctionARN:
    Description: "Monitron Kinesis Export Function ARN"
    Value: !GetAtt MonitronKinesisExportFunction.Arn
  • hello_world/requirements.txt
request
  • hello_world/app.py
    • 今回は振動データに対して ISO 基準ベースで異常時に通知させるので ['eventPayload']['models']['vibrationISO'] 以下に入っているステータスデータを使って異常判定しています。
import json
import base64
import requests
import datetime

web_hook_url = "https://hooks.slack.com/services/xxxxxx"

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record["kinesis"]["data"])
        print("Decoded payload: " + str(payload))
        jsn_dict = json.loads(payload.decode())

        # メッセージ中の eventType で条件分岐
        # measurement なら振動値RMSを取得して判定処理
        # measurement 以外なら何もせず終了
        if jsn_dict['eventType'] ==  "measurement":
            # 必要なデータの取得
            timestamp = jsn_dict['timestamp'] # "2023-08-29 04:52:35.721"
            print(f"timestamp: {str(timestamp)}")
            project_name = jsn_dict['projectName']
            site_name = jsn_dict['eventPayload']['siteName']
            asset_name = jsn_dict['eventPayload']['assetName']
            position_name = jsn_dict['eventPayload']['positionName']
            asset_positon_url = jsn_dict['eventPayload']['assetPositionURL']
            previous_persistent_classification = jsn_dict['eventPayload']['models']['vibrationISO']['previousPersistentClassificationOutput']
            persistent_classification = jsn_dict['eventPayload']['models']['vibrationISO']['persistentClassificationOutput']
            velocity_band10To1000Hz_x = jsn_dict['eventPayload']['features']['velocity']['band10To1000Hz']['xAxis']['rms']
            velocity_band10To1000Hz_y = jsn_dict['eventPayload']['features']['velocity']['band10To1000Hz']['yAxis']['rms']
            velocity_band10To1000Hz_z = jsn_dict['eventPayload']['features']['velocity']['band10To1000Hz']['zAxis']['rms']

            # Slack 上で JST 表示するために時刻を JST に変換
            datetime_utc = datetime.datetime.strptime(timestamp + "+0000", "%Y-%m-%d %H:%M:%S.%f%z")
            datetime_jst = datetime_utc.astimezone(datetime.timezone(datetime.timedelta(hours=+9)))
            timestamp_jst = datetime.datetime.strftime(datetime_jst, '%Y-%m-%d %H:%M:%S%z')

            #print(f"timestamp_jst: {str(timestamp_jst)}")

            # ステータスが HEALTHY → ALARM になれば通知
            # "前回ステータス:HEALTY", "今回ステータス:ALARM" の条件で通知
            if previous_persistent_classification == "HEALTHY" and persistent_classification == "ALARM":
                # メッセージボディとヘッダーの指定
                payload = {
                    "text": f":bangbang:*異常振動を検知 (ISOベース)* \
                    \n*センサー情報* \
                    \n- プロジェクト名: {str(project_name)} \
                    \n- サイト名: {str(site_name)} \
                    \n- アセット名: {str(asset_name)} \
                    \n- ポジション名: {str(position_name)} \
                    \n- アセットポジションURL: {str(asset_positon_url)} *センサーデータ* \
                    \n- 検知時刻: {str(timestamp_jst)} \
                    \n- X軸方向: {str(velocity_band10To1000Hz_x)} rms \
                    \n- Y軸方向: {str(velocity_band10To1000Hz_y)} rms \
                    \n- Z軸方向: {str(velocity_band10To1000Hz_z)} rms"
                }
                headers = {'content-type': 'application/json'}
                requests.post(web_hook_url, data = json.dumps(payload), headers = headers)

    return {
        "success": "true"
    }

上記の通り、エラー処理やロギングの無い動作検証用のスクリプトなので、Webhook URL もそのまま「ベタ書き」しています。必要に応じて「AWS Parameters and Secrets Lambda Extension」などを使うようにしてください。

「AWS Parameters and Secrets Lambda Extension」の詳細は下記記事を参考にしてもらえればと思います。

  • ビルド & デプロイ
$ sam build
$ sam deploy --guided

動作確認

Lambda がデプロイできたら動作確認を行います。

テストデータの作成

動作確認には、次に Monitron がデータ収集するタイミングを待つ必要がありますが「1時間に1回」のチェックになるので場合によっては時間がかかります。
また、アラート状態を作り出しても、ISO ベースのアラートはすぐには異常と判断せず1時間以上待機する場合があったので、ここではダミーのテストデータで動作確認しようと思います。

冒頭で Kinesis のコンソールからデータを確認しました。そのデータを少し編集したものをテストデータとして使います。
Kinesis から取得したデータの 82〜84 行目を次のように修正します。
(正常 → 異常に変化した時のデータに編集します)

"vibrationISO": {
    "isoClass": "CLASS1",
    "mutedThreshold": null,
    "previousPersistentClassificationOutput": "HEALTHY",
    "persistentClassificationOutput": "ALARM",
    "pointwiseClassificationOutput": "ALARM"
},

編集できたらデータ全体を base64 でエンコードします。今回は動作検証なので横着して Web ツールを利用しました。

編集した JSON のテストデータを貼り付けてエンコードします。

35-base64encode

エンコードできたら、そのデータを後で使うので控えておきます。

36-encoded

次に Lambda のコンソールで該当の関数から「テスト」タブを開きます。
ここで「新しいイベントを作成」を選択してテストを作成します。

イベント名は適当なものを指定します。
イベント JSON はテンプレートから作成します。プルダウンをクリックすると一覧が出るので、「Kinesis Data Stream」 を選択すると自動的に「kinesis-get-records」という表示になります。

37-lambda-test

自動で生成された「Kinesis Data Stream」 の JSON にある data の値に、先程エンコードした内容を貼り付けます。

38-edit-data

ここまでできたら「保存」をクリックしてテストを保存します。

39-save-test

異常通知の動作確認

すでに Lambda 関数はデプロイ済みなので、このまま「テスト」をクリックして Lambda 関数を動作させてみます。

40-lambda-test

無事に Slack へ異常検知のメッセージが送られてきました。

41-post-alarm-to-slack

後ほど実機のセンサーを付けている扇風機の強度を「弱→強」に変えてみると、同様のメッセージを受信できたことを確認できました。
下記では Monitron の Web アプリで「10月13日 14:40」にアラームになったことが分かります。

42-2-real-alarm-monitron-app

実際には、ISO のアラーム基準を超過したのは 09:40 〜10:40 の間ですが、Monitron がアラームと判断したのは 14:40 と 4 時間ほどラグがありました。ISO ベースの場合でも一過性のものかどうか判定しているのかもしれません。

同じ時刻で Slack にもアラームが投稿されていました。
テストデータとは異なり、有効な「アセットポジションURL」が発行されていますね。また「X 軸方向」の振動値が突出しており、これがアラームの原因になったことも分かります。

43-real-alarm-to-slack

最後に

今回は 通知先のカスタム例として Slack に通知してみました。
Lambda を使えば基本的に何でもできてしまいますが、Slack の他にもメール通知や Amazon Connect を使って電話通知などもできるようになります。

Kinesis エクスポートを使うことで Monitron データの活用用途が広がるだけでなく、公式のMonitron アプリでは実現できないことができるようになるので、引続き試していきたいと思います。