AWS IoT Analytics を CloudFormation で構築してみた

はじめに

テントの中から失礼します、CX事業本部のてんとタカハシです!

AWS IoT Analytics について、弊社ブログや他のサイトでも情報が少なかったため、記事を書こうと思います。最近だと CDK を使ってリソースを構築することが多いかと思いますが、私自身初めて触るサービスでもあり、まずは CloudFormation でチャレンジしてみました。

ソースコード全体は下記に置いてあります。記事の中では書きませんが、デプロイ用のスクリプト等も置いています。

GitHub - iam326/aws-iot-analytics-sample-cfn

AWS IoT Analytics とは

膨大な IoT データを簡単に分析するためのフルマネージドサービスです。AWS IoT Analytics を活用することで、データの収集、処理、保存、解析、可視化をするための環境を簡単に構築することができます。

AWS が公開している Black Belt Online Seminar の動画がとても参考になります。

今回作るもの

AWS IoT Core のトピックiot/topic にデータが来た場合、AWS IoT Analytics にデータを受け渡し、加工処理を行って保存するまでの流れを CloudFormation で構築しようと思います。ついでに、AWS IoT Analytics に保存されたデータを QuickSight で表示してみます。

データを加工処理する部分は Pipeline と呼ばれ、属性の操作や計算、フィルタリングを行うことができたり、複雑な処理を行うために Lambda と連携することができます。

今回は、Pipeline で下記を試してみようと思います。

  • 既存の属性を別の属性に変換する
  • 既存の属性を削除する
  • フィルタリング
  • 既存の属性に対して計算処理を行い、別の属性を作成する
  • Lambda と連携して、既存の属性を別の属性にコピーする

CloudFormation で構築する

テンプレートは下記になります。

template.yaml

AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics Sample

Parameters:
  NamePrefix:
    Type: String
  IoTCertificateName:
    Type: String

Resources:

  ### AWS IoT Core ###

  IoTPolicy:
    Type: AWS::IoT::Policy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: "iot:*"
            Resource: "*"
      PolicyName: !Sub ${NamePrefix}_iot_policy

  IoTThing:
    Type: AWS::IoT::Thing
    Properties:
      ThingName: !Sub ${NamePrefix}_iot_thing

  IoTPolicyPrincipalAttachment:
    Type: AWS::IoT::PolicyPrincipalAttachment
    Properties:
      PolicyName: !Ref IoTPolicy
      Principal: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:cert/${IoTCertificateName}

  IoTThingPrincipalAttachment:
    Type: AWS::IoT::ThingPrincipalAttachment
    Properties:
      ThingName: !Ref IoTThing
      Principal: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:cert/${IoTCertificateName}

  ### AWS IoT Analytics ###

  IoTAnalyticsChannel:
    Type: AWS::IoTAnalytics::Channel
    Properties:
      ChannelName: !Sub ${NamePrefix}_iot_analytics_channel
      ChannelStorage:
        ServiceManagedS3: {}

  IoTAnalyticsDatastore:
    Type: AWS::IoTAnalytics::Datastore
    Properties:
      DatastoreName: !Sub ${NamePrefix}_iot_analytics_datastore
      DatastoreStorage:
        ServiceManagedS3: {}

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'

  LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${NamePrefix}_pipeline_lambda_function
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.7
      Code:
        ZipFile: |
          import logging
          import sys

          # Configure logging
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          streamHandler = logging.StreamHandler(stream=sys.stdout)
          formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
          streamHandler.setFormatter(formatter)
          logger.addHandler(streamHandler)

          def lambda_handler(event, context):
            logger.info("event before processing: {}".format(event))
            for e in event:
              if 'temperature' in e:
                # 既存の属性を別の属性にコピーする
                e['temperature_copy'] = e['temperature']
            logger.info("event after processing: {}".format(event))
            return event

  LambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt LambdaFunction.Arn
      Principal: iotanalytics.amazonaws.com

  IoTAnalyticsPipeline:
    Type: AWS::IoTAnalytics::Pipeline
    Properties:
      PipelineName: !Sub ${NamePrefix}_iot_analytics_pipeline
      PipelineActivities:
        - Channel:
            # データの取得元指定する
            Name: pipeline_channel_activity
            ChannelName: !Sub ${NamePrefix}_iot_analytics_channel
            Next: pipeline_add_attributes_activity
          AddAttributes:
            # 既存の属性 device を別の属性 id, name に変換する
            Name: pipeline_add_attributes_activity
            Attributes:
              device.id: id
              device.name: name
            Next: pipeline_remove_attributes_activity
          RemoveAttributes:
            # 既存の属性 device を削除する
            Name: pipeline_remove_attributes_activity
            Attributes:
              - device
            Next: pipeline_filter_activity
          Filter:
            # 温度が10~40の範囲にフィルタする
            Name: pipeline_filter_activity
            Filter: temperature >= 10 AND temperature <= 40
            Next: pipeline_math_activity
          Math:
            # 摂氏を華氏に変換して新しい属性 temperature_f を作る
            Name: pipeline_math_activity
            Attribute: temperature_f
            Math: temperature * 1.8 + 32
            Next: pipeline_lambda_activity
          Lambda:
            # Lambdaと連携して新しい属性 temperature_copy を作る
            Name: pipeline_lambda_activity 
            BatchSize: 1 
            LambdaName: !Sub ${NamePrefix}_pipeline_lambda_function
            Next: pipeline_datastore_activity
          Datastore:
            # データの保存先指定する
            Name: pipeline_datastore_activity
            DatastoreName: !Sub ${NamePrefix}_iot_analytics_datastore

  IoTAnalyticsDataset:
    Type: AWS::IoTAnalytics::Dataset
    Properties:
      DatasetName: !Sub ${NamePrefix}_iot_analytics_dataset
      Actions:
        - ActionName: SqlAction
          QueryAction:
            # 当日分のデータのみ取得する
            SqlQuery: !Sub "SELECT * FROM ${NamePrefix}_iot_analytics_datastore WHERE __dt > current_date - interval '1' day"
      RetentionPeriod:
        # データセットの保持日数
        NumberOfDays: 1
        Unlimited: false
      Triggers:
        # データセットを自動更新する
        - Schedule:
            ScheduleExpression: rate(5 minute)
    DependsOn: IoTAnalyticsDatastore

  IoTBatchPutMessageRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - iot.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: !Sub ${NamePrefix}_iot_batch_put_message_role
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "iotanalytics:BatchPutMessage"
                Resource: !Sub "arn:aws:iotanalytics:${AWS::Region}:${AWS::AccountId}:channel/${NamePrefix}_iot_analytics_channel"

  IoTTopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: !Sub ${NamePrefix}_iot_topic_rule
      TopicRulePayload:
        Actions:
          - IotAnalytics:
              ChannelName: !Sub ${NamePrefix}_iot_analytics_channel
              RoleArn: !GetAtt IoTBatchPutMessageRole.Arn
        AwsIotSqlVersion: "2016-03-23"
        RuleDisabled: false
        Sql: "SELECT * FROM 'iot/topic'"

AWS IoT Core にデータを Pub する

上記のテンプレートでリソースをデプロイしたら、10秒ごとにランダムなデータを Pub してみます。

publish_message.py

#!/usr/bin/env python3

import datetime
import json
import os
import random
from time import sleep

from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

ENDPOINT = os.environ['AWS_IOT_ENDPOINT']
CLIENT_ID = os.environ['AWS_IOT_CLIENT_ID']
PATH_TO_CERT = 'certificates/certificate.pem.crt'
PATH_TO_KEY = 'certificates/private.pem.key'
PATH_TO_ROOT = 'certificates/AmazonRootCA1.pem'
TOPIC = 'iot/topic'
WAIT_TIME = 10


def main():
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=ENDPOINT,
        cert_filepath=PATH_TO_CERT,
        pri_key_filepath=PATH_TO_KEY,
        client_bootstrap=client_bootstrap,
        ca_filepath=PATH_TO_ROOT,
        client_id=CLIENT_ID,
        clean_session=False,
        keep_alive_secs=6
    )

    print('Connecting to {} with client ID {}...'.format(
        ENDPOINT, CLIENT_ID))
    connect_future = mqtt_connection.connect()
    connect_future.result()
    print('Connected!')

    try:
        while True:
            now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            rand = random.randint(0, 50)
            message = {
                'device': {
                    'id': 'device-1',
                    'name': 'hoge'
                },
                'datetime': now,
                'temperature': rand
            }
            mqtt_connection.publish(topic=TOPIC, payload=json.dumps(
                message), qos=mqtt.QoS.AT_LEAST_ONCE)
            print('Published: {} to the topic: {}'.format(
                json.dumps(message), TOPIC))
            sleep(WAIT_TIME)
    except KeyboardInterrupt:
        pass

    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()
    print('Disconnected!')


if __name__ == '__main__':
    main()

下記のデータを Pub しています。温度だけランダムな値が入ります。

{
  "device": {
    "id": "device-1",
    "name": "hoge"
  },
  "datetime": "2020-10-29 23:37:53",
  "temperature": 28
}

データを AWS IoT Analytics で確認する

データを5分ほど Pub し続けた後、AWS マネジメントコンソールで AWS IoT Analytics のページを開き、対象のデータセットを表示すると、結果のプレビュー欄に Pub したデータが表示されます。

こちらに表示されているデータは、Pipeline で加工処理を行った後のデータになります。そのため、下記のように、属性が変換されていたり、新しい属性が増えたりしています。__dt については、AWS IoT Analytics が自動で付与する属性になります。

{
  "id": "device-1",
  "name": "hoge",
  "datetime": "2020-10-29 23:37:53",
  "temperature": 28,
  "temperature_f": 82.4,
  "temperature_copy": 28,
  "__dt": "2020-10-29 00:00:00.000"
}

QuickSight で可視化する

AWS IoT Analytics に保存されたデータを QuickSight で可視化してみます。

AWS マネジメントコンソールで QuickSight のページを開いて、左メニューの「分析」から「新しい分析」を選択します。

「新しいデータセット」を選択します。

「AWS IoT Analytics」を選択します。

対象のデータセットを選択して、「データソースを作成」します。

「視覚化する」を選択します。

可視化用のページが表示されます。

ビジュアルタイプから「折れ線グラフ」を選択して、X軸に「datetime」、値に「temperature」を指定します。

datetime を「分」で集計するように設定を変更します。

temperature を「平均」で集計するように設定を変更します。

分ごとの平均温度を示すグラフを表示することができました。他にも色々なグラフが使えますので、是非試してみてください。

おわりに

AWS IoT Analytics には他にも便利な機能や連携可能なサービスがありますので、何か試すことができたら記事にしていきたいと思います。また、可能であれば CDK を使って、今回と同じようなことを実現してみたいと思います。ラズパイとセンサー使って何か作りてえな。

今回は以上になります。最後まで読んで頂きありがとうございました!