AWS IoT Analytics を CloudFormation で構築してみた
はじめに
テントの中から失礼します、CX事業本部のてんとタカハシです!
AWS IoT Analytics について、弊社ブログや他のサイトでも情報が少なかったため、記事を書こうと思います。最近だと CDK を使ってリソースを構築することが多いかと思いますが、私自身初めて触るサービスでもあり、まずは CloudFormation でチャレンジしてみました。
ソースコード全体は下記に置いてあります。記事の中では書きませんが、デプロイ用のスクリプト等も置いています。
GitHub - iam326/aws-iot-analytics-sample-cfn
AWS IoT Analytics とは
膨大な IoT データを簡単に分析するためのフルマネージドサービスです。AWS IoT Analytics を活用することで、データの収集、処理、保存、解析、可視化をするための環境を簡単に構築することができます。
AWS が公開している Black Belt Online Seminar の動画がとても参考になります。
今回作るもの
AWS IoT Core のトピックiot/topic
にデータが来た場合、AWS IoT Analytics にデータを受け渡し、加工処理を行って保存するまでの流れを CloudFormation で構築しようと思います。ついでに、AWS IoT Analytics に保存されたデータを QuickSight で表示してみます。
データを加工処理する部分は Pipeline と呼ばれ、属性の操作や計算、フィルタリングを行うことができたり、複雑な処理を行うために Lambda と連携することができます。
今回は、Pipeline で下記を試してみようと思います。
- 既存の属性を別の属性に変換する
- 既存の属性を削除する
- フィルタリング
- 既存の属性に対して計算処理を行い、別の属性を作成する
- Lambda と連携して、既存の属性を別の属性にコピーする
CloudFormation で構築する
テンプレートは下記になります。
AWSTemplateFormatVersion: "2010-09-09" Description: AWS IoT Analytics Sample Parameters: NamePrefix: Type: String IoTCertificateName: Type: String Resources: ### AWS IoT Core ### IoTPolicy: Type: AWS::IoT::Policy Properties: PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: "iot:*" Resource: "*" PolicyName: !Sub ${NamePrefix}_iot_policy IoTThing: Type: AWS::IoT::Thing Properties: ThingName: !Sub ${NamePrefix}_iot_thing IoTPolicyPrincipalAttachment: Type: AWS::IoT::PolicyPrincipalAttachment Properties: PolicyName: !Ref IoTPolicy Principal: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:cert/${IoTCertificateName} IoTThingPrincipalAttachment: Type: AWS::IoT::ThingPrincipalAttachment Properties: ThingName: !Ref IoTThing Principal: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:cert/${IoTCertificateName} ### AWS IoT Analytics ### IoTAnalyticsChannel: Type: AWS::IoTAnalytics::Channel Properties: ChannelName: !Sub ${NamePrefix}_iot_analytics_channel ChannelStorage: ServiceManagedS3: {} IoTAnalyticsDatastore: Type: AWS::IoTAnalytics::Datastore Properties: DatastoreName: !Sub ${NamePrefix}_iot_analytics_datastore DatastoreStorage: ServiceManagedS3: {} LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' LambdaFunction: Type: AWS::Lambda::Function Properties: FunctionName: !Sub ${NamePrefix}_pipeline_lambda_function Handler: index.lambda_handler Role: !GetAtt LambdaExecutionRole.Arn Runtime: python3.7 Code: ZipFile: | import logging import sys # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) def lambda_handler(event, context): logger.info("event before processing: {}".format(event)) for e in event: if 'temperature' in e: # 既存の属性を別の属性にコピーする e['temperature_copy'] = e['temperature'] logger.info("event after processing: {}".format(event)) return event LambdaPermission: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !GetAtt LambdaFunction.Arn Principal: iotanalytics.amazonaws.com IoTAnalyticsPipeline: Type: AWS::IoTAnalytics::Pipeline Properties: PipelineName: !Sub ${NamePrefix}_iot_analytics_pipeline PipelineActivities: - Channel: # データの取得元指定する Name: pipeline_channel_activity ChannelName: !Sub ${NamePrefix}_iot_analytics_channel Next: pipeline_add_attributes_activity AddAttributes: # 既存の属性 device を別の属性 id, name に変換する Name: pipeline_add_attributes_activity Attributes: device.id: id device.name: name Next: pipeline_remove_attributes_activity RemoveAttributes: # 既存の属性 device を削除する Name: pipeline_remove_attributes_activity Attributes: - device Next: pipeline_filter_activity Filter: # 温度が10~40の範囲にフィルタする Name: pipeline_filter_activity Filter: temperature >= 10 AND temperature <= 40 Next: pipeline_math_activity Math: # 摂氏を華氏に変換して新しい属性 temperature_f を作る Name: pipeline_math_activity Attribute: temperature_f Math: temperature * 1.8 + 32 Next: pipeline_lambda_activity Lambda: # Lambdaと連携して新しい属性 temperature_copy を作る Name: pipeline_lambda_activity BatchSize: 1 LambdaName: !Sub ${NamePrefix}_pipeline_lambda_function Next: pipeline_datastore_activity Datastore: # データの保存先指定する Name: pipeline_datastore_activity DatastoreName: !Sub ${NamePrefix}_iot_analytics_datastore IoTAnalyticsDataset: Type: AWS::IoTAnalytics::Dataset Properties: DatasetName: !Sub ${NamePrefix}_iot_analytics_dataset Actions: - ActionName: SqlAction QueryAction: # 当日分のデータのみ取得する SqlQuery: !Sub "SELECT * FROM ${NamePrefix}_iot_analytics_datastore WHERE __dt > current_date - interval '1' day" RetentionPeriod: # データセットの保持日数 NumberOfDays: 1 Unlimited: false Triggers: # データセットを自動更新する - Schedule: ScheduleExpression: rate(5 minute) DependsOn: IoTAnalyticsDatastore IoTBatchPutMessageRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - iot.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: !Sub ${NamePrefix}_iot_batch_put_message_role PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: "iotanalytics:BatchPutMessage" Resource: !Sub "arn:aws:iotanalytics:${AWS::Region}:${AWS::AccountId}:channel/${NamePrefix}_iot_analytics_channel" IoTTopicRule: Type: AWS::IoT::TopicRule Properties: RuleName: !Sub ${NamePrefix}_iot_topic_rule TopicRulePayload: Actions: - IotAnalytics: ChannelName: !Sub ${NamePrefix}_iot_analytics_channel RoleArn: !GetAtt IoTBatchPutMessageRole.Arn AwsIotSqlVersion: "2016-03-23" RuleDisabled: false Sql: "SELECT * FROM 'iot/topic'"
AWS IoT Core にデータを Pub する
上記のテンプレートでリソースをデプロイしたら、10秒ごとにランダムなデータを Pub してみます。
下記のデータを Pub しています。温度だけランダムな値が入ります。
{ "device": { "id": "device-1", "name": "hoge" }, "datetime": "2020-10-29 23:37:53", "temperature": 28 }
データを AWS IoT Analytics で確認する
データを5分ほど Pub し続けた後、AWS マネジメントコンソールで AWS IoT Analytics のページを開き、対象のデータセットを表示すると、結果のプレビュー欄に Pub したデータが表示されます。
こちらに表示されているデータは、Pipeline で加工処理を行った後のデータになります。そのため、下記のように、属性が変換されていたり、新しい属性が増えたりしています。__dt
については、AWS IoT Analytics が自動で付与する属性になります。
{ "id": "device-1", "name": "hoge", "datetime": "2020-10-29 23:37:53", "temperature": 28, "temperature_f": 82.4, "temperature_copy": 28, "__dt": "2020-10-29 00:00:00.000" }
QuickSight で可視化する
AWS IoT Analytics に保存されたデータを QuickSight で可視化してみます。
AWS マネジメントコンソールで QuickSight のページを開いて、左メニューの「分析」から「新しい分析」を選択します。
「新しいデータセット」を選択します。
「AWS IoT Analytics」を選択します。
対象のデータセットを選択して、「データソースを作成」します。
「視覚化する」を選択します。
可視化用のページが表示されます。
ビジュアルタイプから「折れ線グラフ」を選択して、X軸に「datetime」、値に「temperature」を指定します。
datetime を「分」で集計するように設定を変更します。
temperature を「平均」で集計するように設定を変更します。
分ごとの平均温度を示すグラフを表示することができました。他にも色々なグラフが使えますので、是非試してみてください。
おわりに
AWS IoT Analytics には他にも便利な機能や連携可能なサービスがありますので、何か試すことができたら記事にしていきたいと思います。また、可能であれば CDK を使って、今回と同じようなことを実現してみたいと思います。ラズパイとセンサー使って何か作りてえな。
今回は以上になります。最後まで読んで頂きありがとうございました!