この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
テントの中から失礼します、CX事業本部のてんとタカハシです!
AWS IoT Analytics について、弊社ブログや他のサイトでも情報が少なかったため、記事を書こうと思います。最近だと CDK を使ってリソースを構築することが多いかと思いますが、私自身初めて触るサービスでもあり、まずは CloudFormation でチャレンジしてみました。
ソースコード全体は下記に置いてあります。記事の中では書きませんが、デプロイ用のスクリプト等も置いています。
GitHub - iam326/aws-iot-analytics-sample-cfn
AWS IoT Analytics とは
膨大な IoT データを簡単に分析するためのフルマネージドサービスです。AWS IoT Analytics を活用することで、データの収集、処理、保存、解析、可視化をするための環境を簡単に構築することができます。
AWS が公開している Black Belt Online Seminar の動画がとても参考になります。
今回作るもの
AWS IoT Core のトピックiot/topic
にデータが来た場合、AWS IoT Analytics にデータを受け渡し、加工処理を行って保存するまでの流れを CloudFormation で構築しようと思います。ついでに、AWS IoT Analytics に保存されたデータを QuickSight で表示してみます。
データを加工処理する部分は Pipeline と呼ばれ、属性の操作や計算、フィルタリングを行うことができたり、複雑な処理を行うために Lambda と連携することができます。
今回は、Pipeline で下記を試してみようと思います。
- 既存の属性を別の属性に変換する
- 既存の属性を削除する
- フィルタリング
- 既存の属性に対して計算処理を行い、別の属性を作成する
- Lambda と連携して、既存の属性を別の属性にコピーする
CloudFormation で構築する
テンプレートは下記になります。
template.yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics Sample
Parameters:
NamePrefix:
Type: String
IoTCertificateName:
Type: String
Resources:
### AWS IoT Core ###
IoTPolicy:
Type: AWS::IoT::Policy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: "iot:*"
Resource: "*"
PolicyName: !Sub ${NamePrefix}_iot_policy
IoTThing:
Type: AWS::IoT::Thing
Properties:
ThingName: !Sub ${NamePrefix}_iot_thing
IoTPolicyPrincipalAttachment:
Type: AWS::IoT::PolicyPrincipalAttachment
Properties:
PolicyName: !Ref IoTPolicy
Principal: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:cert/${IoTCertificateName}
IoTThingPrincipalAttachment:
Type: AWS::IoT::ThingPrincipalAttachment
Properties:
ThingName: !Ref IoTThing
Principal: !Sub arn:aws:iot:${AWS::Region}:${AWS::AccountId}:cert/${IoTCertificateName}
### AWS IoT Analytics ###
IoTAnalyticsChannel:
Type: AWS::IoTAnalytics::Channel
Properties:
ChannelName: !Sub ${NamePrefix}_iot_analytics_channel
ChannelStorage:
ServiceManagedS3: {}
IoTAnalyticsDatastore:
Type: AWS::IoTAnalytics::Datastore
Properties:
DatastoreName: !Sub ${NamePrefix}_iot_analytics_datastore
DatastoreStorage:
ServiceManagedS3: {}
LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
ManagedPolicyArns:
- !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
LambdaFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub ${NamePrefix}_pipeline_lambda_function
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
Runtime: python3.7
Code:
ZipFile: |
import logging
import sys
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
def lambda_handler(event, context):
logger.info("event before processing: {}".format(event))
for e in event:
if 'temperature' in e:
# 既存の属性を別の属性にコピーする
e['temperature_copy'] = e['temperature']
logger.info("event after processing: {}".format(event))
return event
LambdaPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt LambdaFunction.Arn
Principal: iotanalytics.amazonaws.com
IoTAnalyticsPipeline:
Type: AWS::IoTAnalytics::Pipeline
Properties:
PipelineName: !Sub ${NamePrefix}_iot_analytics_pipeline
PipelineActivities:
- Channel:
# データの取得元指定する
Name: pipeline_channel_activity
ChannelName: !Sub ${NamePrefix}_iot_analytics_channel
Next: pipeline_add_attributes_activity
AddAttributes:
# 既存の属性 device を別の属性 id, name に変換する
Name: pipeline_add_attributes_activity
Attributes:
device.id: id
device.name: name
Next: pipeline_remove_attributes_activity
RemoveAttributes:
# 既存の属性 device を削除する
Name: pipeline_remove_attributes_activity
Attributes:
- device
Next: pipeline_filter_activity
Filter:
# 温度が10~40の範囲にフィルタする
Name: pipeline_filter_activity
Filter: temperature >= 10 AND temperature <= 40
Next: pipeline_math_activity
Math:
# 摂氏を華氏に変換して新しい属性 temperature_f を作る
Name: pipeline_math_activity
Attribute: temperature_f
Math: temperature * 1.8 + 32
Next: pipeline_lambda_activity
Lambda:
# Lambdaと連携して新しい属性 temperature_copy を作る
Name: pipeline_lambda_activity
BatchSize: 1
LambdaName: !Sub ${NamePrefix}_pipeline_lambda_function
Next: pipeline_datastore_activity
Datastore:
# データの保存先指定する
Name: pipeline_datastore_activity
DatastoreName: !Sub ${NamePrefix}_iot_analytics_datastore
IoTAnalyticsDataset:
Type: AWS::IoTAnalytics::Dataset
Properties:
DatasetName: !Sub ${NamePrefix}_iot_analytics_dataset
Actions:
- ActionName: SqlAction
QueryAction:
# 当日分のデータのみ取得する
SqlQuery: !Sub "SELECT * FROM ${NamePrefix}_iot_analytics_datastore WHERE __dt > current_date - interval '1' day"
RetentionPeriod:
# データセットの保持日数
NumberOfDays: 1
Unlimited: false
Triggers:
# データセットを自動更新する
- Schedule:
ScheduleExpression: rate(5 minute)
DependsOn: IoTAnalyticsDatastore
IoTBatchPutMessageRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- iot.amazonaws.com
Action:
- sts:AssumeRole
Path: "/"
Policies:
- PolicyName: !Sub ${NamePrefix}_iot_batch_put_message_role
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: "iotanalytics:BatchPutMessage"
Resource: !Sub "arn:aws:iotanalytics:${AWS::Region}:${AWS::AccountId}:channel/${NamePrefix}_iot_analytics_channel"
IoTTopicRule:
Type: AWS::IoT::TopicRule
Properties:
RuleName: !Sub ${NamePrefix}_iot_topic_rule
TopicRulePayload:
Actions:
- IotAnalytics:
ChannelName: !Sub ${NamePrefix}_iot_analytics_channel
RoleArn: !GetAtt IoTBatchPutMessageRole.Arn
AwsIotSqlVersion: "2016-03-23"
RuleDisabled: false
Sql: "SELECT * FROM 'iot/topic'"
AWS IoT Core にデータを Pub する
上記のテンプレートでリソースをデプロイしたら、10秒ごとにランダムなデータを Pub してみます。
publish_message.py
#!/usr/bin/env python3
import datetime
import json
import os
import random
from time import sleep
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder
ENDPOINT = os.environ['AWS_IOT_ENDPOINT']
CLIENT_ID = os.environ['AWS_IOT_CLIENT_ID']
PATH_TO_CERT = 'certificates/certificate.pem.crt'
PATH_TO_KEY = 'certificates/private.pem.key'
PATH_TO_ROOT = 'certificates/AmazonRootCA1.pem'
TOPIC = 'iot/topic'
WAIT_TIME = 10
def main():
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
cert_filepath=PATH_TO_CERT,
pri_key_filepath=PATH_TO_KEY,
client_bootstrap=client_bootstrap,
ca_filepath=PATH_TO_ROOT,
client_id=CLIENT_ID,
clean_session=False,
keep_alive_secs=6
)
print('Connecting to {} with client ID {}...'.format(
ENDPOINT, CLIENT_ID))
connect_future = mqtt_connection.connect()
connect_future.result()
print('Connected!')
try:
while True:
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
rand = random.randint(0, 50)
message = {
'device': {
'id': 'device-1',
'name': 'hoge'
},
'datetime': now,
'temperature': rand
}
mqtt_connection.publish(topic=TOPIC, payload=json.dumps(
message), qos=mqtt.QoS.AT_LEAST_ONCE)
print('Published: {} to the topic: {}'.format(
json.dumps(message), TOPIC))
sleep(WAIT_TIME)
except KeyboardInterrupt:
pass
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print('Disconnected!')
if __name__ == '__main__':
main()
下記のデータを Pub しています。温度だけランダムな値が入ります。
{
"device": {
"id": "device-1",
"name": "hoge"
},
"datetime": "2020-10-29 23:37:53",
"temperature": 28
}
データを AWS IoT Analytics で確認する
データを5分ほど Pub し続けた後、AWS マネジメントコンソールで AWS IoT Analytics のページを開き、対象のデータセットを表示すると、結果のプレビュー欄に Pub したデータが表示されます。
こちらに表示されているデータは、Pipeline で加工処理を行った後のデータになります。そのため、下記のように、属性が変換されていたり、新しい属性が増えたりしています。__dt
については、AWS IoT Analytics が自動で付与する属性になります。
{
"id": "device-1",
"name": "hoge",
"datetime": "2020-10-29 23:37:53",
"temperature": 28,
"temperature_f": 82.4,
"temperature_copy": 28,
"__dt": "2020-10-29 00:00:00.000"
}
QuickSight で可視化する
AWS IoT Analytics に保存されたデータを QuickSight で可視化してみます。
AWS マネジメントコンソールで QuickSight のページを開いて、左メニューの「分析」から「新しい分析」を選択します。
「新しいデータセット」を選択します。
「AWS IoT Analytics」を選択します。
対象のデータセットを選択して、「データソースを作成」します。
「視覚化する」を選択します。
可視化用のページが表示されます。
ビジュアルタイプから「折れ線グラフ」を選択して、X軸に「datetime」、値に「temperature」を指定します。
datetime を「分」で集計するように設定を変更します。
temperature を「平均」で集計するように設定を変更します。
分ごとの平均温度を示すグラフを表示することができました。他にも色々なグラフが使えますので、是非試してみてください。
おわりに
AWS IoT Analytics には他にも便利な機能や連携可能なサービスがありますので、何か試すことができたら記事にしていきたいと思います。また、可能であれば CDK を使って、今回と同じようなことを実現してみたいと思います。ラズパイとセンサー使って何か作りてえな。
今回は以上になります。最後まで読んで頂きありがとうございました!