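Creating an AWS IoT Analytics environment with AWS CloudFormation to analyze AWS IoT SiteWise gateway data in Amazon QuickSight
In an earlier article, I showed how to take equipment operation data collected by a SiteWise gateway, transform it in IoT Analytics, and use it in QuickSight.
I have had to build this configuration many times, so I made it deployable with CloudFormation.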
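What changed from the previous setup: user-managed S3 buckets
In the previous article, I created the IoT Analytics resources with the single-step setup. With that approach, however, the S3 buckets are AWS-managed, so users cannot look at the bucket contents directly.
For that reason, this template creates every S3 bucket as a user-managed bucket.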
Environment created
The template introduced here creates the resources in the red box below.
The IoT Analytics channel, data store, and dataset each get their own S3 bucket.
CloudFormation template
The template is as follows.
AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics and Lambda to convert TQV data (SiteWise Gateway data).

###############################################################
# Parameters
###############################################################
Parameters:
  NamePrefix:
    Description: "Set resource name prefix. The following expressions are permitted: ^[a-z0-9]*$"
    Type: String
    AllowedPattern: ^[a-z0-9]*$
  UseLambda:
    Description: "Choose whether to use the Lambda function"
    Default: UseLambda
    Type: String
    AllowedValues:
      - UseLambda
      - NoUseLambda

###############################################################
# Conditions
###############################################################
Conditions:
  CreateLambdaResources: !Equals [!Ref UseLambda, "UseLambda"]
  SkipLambdaResources: !Equals [!Ref UseLambda, "NoUseLambda"]

###############################################################
# Resources
###############################################################
Resources:
  # ------------------------------------------------------------#
  # S3 Bucket - for IoT Analytics Channel
  # ------------------------------------------------------------#
  BucketForIoTAnalyticsChannel:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-channel-bucket

  # Bucket Policy for IoT Analytics Channel Bucket
  BucketPolicyForIoTAnalyticsChannnelBucket:
    Type: AWS::S3::BucketPolicy
    DeletionPolicy: Retain
    Properties:
      Bucket: !Ref BucketForIoTAnalyticsChannel
      PolicyDocument:
        Version: "2012-10-17"
        Id: MyPolicyID
        Statement:
          - Sid: MyStatementSid
            Effect: Allow
            Principal:
              Service: iotanalytics.amazonaws.com
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - s3:DeleteObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*

  # ------------------------------------------------------------#
  # S3 Bucket - for IoT Analytics Data store
  # ------------------------------------------------------------#
  BucketForIoTAnalyticsDatastore:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket

  # Bucket Policy for IoT Analytics DataStore Bucket
  BucketPolicyForIoTAnalyticsDatastorelBucket:
    Type: AWS::S3::BucketPolicy
    DeletionPolicy: Retain
    Properties:
      Bucket: !Ref BucketForIoTAnalyticsDatastore
      PolicyDocument:
        Version: "2012-10-17"
        Id: MyPolicyID
        Statement:
          - Sid: MyStatementSid
            Effect: Allow
            Principal:
              Service: iotanalytics.amazonaws.com
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - s3:DeleteObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*

  # ------------------------------------------------------------#
  # S3 Bucket - for IoT Analytics Data set
  # ------------------------------------------------------------#
  BucketForIoTAnalyticsDataset:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket
    DependsOn: IoTAnalyticsDatastore

  # Bucket Policy for IoT Analytics Dataset Bucket
  BucketPolicyForIoTAnalyticsDatasetBucket:
    Type: AWS::S3::BucketPolicy
    DeletionPolicy: Retain
    Properties:
      Bucket: !Ref BucketForIoTAnalyticsDataset
      PolicyDocument:
        Version: "2012-10-17"
        Id: MyPolicyID
        Statement:
          - Sid: MyStatementSid
            Effect: Allow
            Principal:
              Service: iotanalytics.amazonaws.com
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - s3:DeleteObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*

  # ------------------------------------------------------------#
  # IAM Policy for access S3 Buckets from IoT Analytics
  # ------------------------------------------------------------#
  # IAM Policy (IoT Analytics Channel -> IoT Analytics Bucket)
  PolicyForIoTAnalyticsChannelToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:ListBucket
              - s3:GetBucketLocation
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-channel-policy

  # IAM Policy (IoT Analytics DataStore -> IoT Analytics Bucket)
  PolicyForIoTAnalyticsDatastoreToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:DeleteObject
              - s3:GetBucketLocation
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-datastore-policy

  # IAM Policy (IoT Analytics DataSet -> IoT Analytics Bucket)
  PolicyForIoTAnalyticsDatasetToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-dataset-policy

  # ------------------------------------------------------------#
  # IAM Role for access S3 Buckets from IoT Analytics
  # ------------------------------------------------------------#
  # IAM Role (IoT Analytics Channel -> IoT Analytics Bucket)
  RoleForIoTAnalyticsChannelToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-channel-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsChannelToBucket

  # IAM Role (IoT Analytics Datastore -> IoT Analytics Bucket)
  RoleForIoTAnalyticsDatastoreToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatastoreToBucket

  # IAM Role (IoT Analytics Dataset -> IoT Analytics Bucket)
  RoleForIoTAnalyticsDatasetToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatasetToBucket

  # ------------------------------------------------------------#
  # AWS IoT Analytics
  # ------------------------------------------------------------#
  IoTAnalyticsChannel:
    Type: AWS::IoTAnalytics::Channel
    Properties:
      ChannelName: !Sub ${NamePrefix}_channel
      ChannelStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsChannel
          #KeyPrefix: String
          RoleArn: !GetAtt RoleForIoTAnalyticsChannelToBucket.Arn
    DependsOn: BucketForIoTAnalyticsChannel

  IoTAnalyticsDatastore:
    Type: AWS::IoTAnalytics::Datastore
    Properties:
      DatastoreName: !Sub ${NamePrefix}_datastore
      DatastoreStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsDatastore
          #KeyPrefix: String
          RoleArn: !GetAtt RoleForIoTAnalyticsDatastoreToBucket.Arn
    DependsOn: BucketForIoTAnalyticsDatastore

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Condition: CreateLambdaResources
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  LambdaFunction:
    Type: AWS::Lambda::Function
    Condition: CreateLambdaResources
    Properties:
      FunctionName: !Sub ${NamePrefix}-pipeline-lambda-function
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          import logging
          import sys
          from datetime import datetime, timezone, timedelta

          # Configure logging
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          streamHandler = logging.StreamHandler(stream=sys.stdout)
          formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
          streamHandler.setFormatter(formatter)
          logger.addHandler(streamHandler)

          # Time zone used for the output timestamp
          JST = timezone(timedelta(hours=+9), 'JST')

          VALUE_TYPES = ('doubleValue', 'integerValue', 'booleanValue', 'stringValue')

          def lambda_handler(event, context):
              logger.info("event: {}".format(event))
              transformed = []
              for e in event:
                  # e holds all the data for a single asset property (a dict)
                  logger.info("e: {}".format(json.dumps(e, indent=2)))
                  property_alias = e['propertyAlias']
                  property_value = e['propertyValues'][0]

                  # Pick whichever typed value is present in the payload
                  value = ""
                  for value_type in VALUE_TYPES:
                      if value_type in property_value['value']:
                          value = property_value['value'][value_type]
                          logger.info("value_type: {}".format(value_type))

                  quality = property_value.get('quality', "")

                  # Convert the TQV timestamp to an ISO 8601 string in JST
                  timestamp = ""
                  ts = property_value.get('timestamp', {})
                  if 'timeInSeconds' in ts:
                      # e.g. 1655307221 + 157000000 / 1e9 = 1655307221.157
                      unixtime = ts['timeInSeconds'] + ts.get('offsetInNanos', 0) / 1000000000
                      # Passing tz keeps the result independent of the local time zone
                      dt = datetime.fromtimestamp(unixtime, tz=timezone.utc).astimezone(JST)
                      timestamp = dt.isoformat(timespec='milliseconds')  # 2022-06-16T00:33:41.157+09:00

                  row = {}
                  row['propertyAlias'] = property_alias
                  row['value'] = value
                  row['timestamp'] = timestamp
                  row['quality'] = quality
                  logger.debug("row: {}".format(row))
                  transformed.append(row)

              logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
              return transformed  # the return value must be a list

  LambdaPermission:
    Type: AWS::Lambda::Permission
    Condition: CreateLambdaResources
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt LambdaFunction.Arn
      Principal: iotanalytics.amazonaws.com

  IoTAnalyticsPipelineLambdaTrue:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: CreateLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
        - Channel: # Specifies where the data comes from
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-lambda-activity
          Lambda: # Converts TQV data into JSON with all keys flattened to the same level
            Name: pipeline-lambda-activity
            BatchSize: 1
            LambdaName: !Sub ${NamePrefix}-pipeline-lambda-function
            Next: pipeline-datastore-activity
          Datastore: # Specifies where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore

  IoTAnalyticsPipelineLambdaFalse:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: SkipLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
        - Channel: # Specifies where the data comes from
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-datastore-activity
          Datastore: # Specifies where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore

  IoTAnalyticsDataset:
    Type: AWS::IoTAnalytics::Dataset
    Properties:
      DatasetName: !Sub ${NamePrefix}_dataset
      Actions:
        - ActionName: SqlAction
          QueryAction: # Query that fetches the data
            SqlQuery: !If
              - CreateLambdaResources
              - !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day"
              - !Sub "SELECT * FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day"
      RetentionPeriod: # Number of days the dataset content is kept
        NumberOfDays: 1
        Unlimited: false
      Triggers: # Refresh the dataset automatically
        - Schedule:
            ScheduleExpression: rate(5 minute)
      # Deliver the dataset content to the user-managed S3 bucket
      ContentDeliveryRules:
        - Destination:
            S3DestinationConfiguration:
              Bucket: !Ref BucketForIoTAnalyticsDataset
              Key: '!{iotanalytics:scheduleTime}/!{iotanalytics:versionId}.csv'
              RoleArn: !GetAtt RoleForIoTAnalyticsDatasetToBucket.Arn
    DependsOn: IoTAnalyticsDatastore
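Choosing whether to create the Lambda function
As mentioned at the beginning, this template builds an environment that converts TQV (timestamp, quality, value) data from a SiteWise gateway into a shape that is easy to analyze in QuickSight.
The Lambda function it deploys therefore converts the TQV data into a suitable form.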
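To make the conversion concrete, here is a minimal sketch of the same flattening idea outside of Lambda. The function name flatten_tqv and the sample payload (including the property alias) are made up for illustration; the field names follow the ones the template's Lambda function reads.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9), "JST")
VALUE_KEYS = ("doubleValue", "integerValue", "booleanValue", "stringValue")


def flatten_tqv(message: dict) -> dict:
    """Flatten one TQV message into a single-level row (illustrative helper)."""
    point = message["propertyValues"][0]
    # Take whichever typed value is present; fall back to an empty string.
    value = next((point["value"][k] for k in VALUE_KEYS if k in point["value"]), "")
    timestamp = ""
    ts = point.get("timestamp", {})
    if "timeInSeconds" in ts:
        # Build an aware UTC datetime, then add the sub-second part exactly.
        dt = datetime.fromtimestamp(ts["timeInSeconds"], tz=timezone.utc)
        dt += timedelta(microseconds=ts.get("offsetInNanos", 0) // 1000)
        timestamp = dt.astimezone(JST).isoformat(timespec="milliseconds")
    return {
        "propertyAlias": message["propertyAlias"],
        "value": value,
        "timestamp": timestamp,
        "quality": point.get("quality", ""),
    }


# Hypothetical sample message, shaped like the input the Lambda function expects.
sample = {
    "propertyAlias": "/factory/line1/temperature",
    "propertyValues": [
        {
            "value": {"doubleValue": 23.5},
            "timestamp": {"timeInSeconds": 1655307221, "offsetInNanos": 157000000},
            "quality": "GOOD",
        }
    ],
}

row = flatten_tqv(sample)
print(row)
```

The nested value and timestamp objects become four flat columns, which is the shape a SQL dataset and QuickSight can work with directly.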
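In some cases, though, you may want only the IoT Analytics environment, either because you do not need a Lambda function or because you want to write one tailored to your own processing. The template therefore takes a parameter that controls whether the Lambda function is created.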
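If you choose not to create the Lambda function, the SQL dataset in IoT Analytics is given the SQL statement below, so change it as needed after the stack is created. (${NamePrefix} is replaced with the value you specified when creating the CloudFormation stack.)
SELECT * FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day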
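The way the first template picks the dataset query can be mirrored in a few lines of Python. This is only an illustration of the !If condition in that template; dataset_query is a hypothetical helper, not something CloudFormation or IoT Analytics provides.

```python
def dataset_query(name_prefix: str, use_lambda: str) -> str:
    """Return the SQL the first template would set for the given parameters."""
    # With the Lambda activity the rows are already flattened into four columns;
    # without it, the raw messages are selected as they are.
    if use_lambda == "UseLambda":
        columns = "propertyAlias, value, timestamp, quality"
    else:
        columns = "*"
    return (
        f"SELECT {columns} FROM {name_prefix}_datastore "
        "WHERE __dt > current_date - interval '1' day"
    )


print(dataset_query("factory1", "NoUseLambda"))
```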
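Creating the stack
When you create the stack, specify a prefix that distinguishes the resources and whether to create the Lambda function.
The UseLambda parameter accepts the following values:
- UseLambda: create the Lambda function
- NoUseLambda: do not create the Lambda function
(Admittedly, I ended up giving one of the values the same name as the parameter itself, UseLambda...)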
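If you create the stack from a script rather than the console, the two parameters could be assembled as in the sketch below. build_stack_parameters is a hypothetical helper; the resulting list uses the ParameterKey/ParameterValue shape that the CloudFormation CreateStack API expects.

```python
def build_stack_parameters(name_prefix, create_lambda=True):
    """Build the Parameters list for the two template parameters."""
    return [
        {"ParameterKey": "NamePrefix", "ParameterValue": name_prefix},
        {
            "ParameterKey": "UseLambda",
            "ParameterValue": "UseLambda" if create_lambda else "NoUseLambda",
        },
    ]


params = build_stack_parameters("factory1", create_lambda=False)
print(params)

# Usage with boto3 (not executed here; stack name and template path are assumptions):
#   import boto3
#   boto3.client("cloudformation").create_stack(
#       StackName="iot-analytics-env",
#       TemplateBody=open("template.yaml").read(),
#       Parameters=params,
#       Capabilities=["CAPABILITY_NAMED_IAM"],
#   )
```

CAPABILITY_NAMED_IAM is needed because the template creates IAM roles and managed policies with explicit names.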
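NamePrefix accepts only lowercase letters and digits.
This comes from the naming rules for S3 buckets and for the various IoT Analytics resources: IoT Analytics resource names can contain underscores, whereas S3 bucket names cannot.
I could have made this more flexible, but that would have made the template even longer, and I decided it was unnecessary for personal use.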
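As a rough pre-flight check, the names the template derives from NamePrefix can be computed before deploying. The sketch below is an assumption-laden helper (resource_names is not part of the template). It is slightly stricter than the template's AllowedPattern, which also accepts an empty string, and it checks the 63-character limit on S3 bucket names.

```python
import re

BUCKET_KINDS = ("channel", "datastore", "dataset")
S3_MAX_BUCKET_NAME_LENGTH = 63


def resource_names(name_prefix: str) -> dict:
    """Derive the resource names the template builds from NamePrefix."""
    # An empty prefix passes the template's pattern but would produce a bucket
    # name starting with a hyphen, so at least one character is required here.
    if not re.fullmatch(r"[a-z0-9]+", name_prefix):
        raise ValueError("NamePrefix must consist of lowercase letters and digits")
    names = {
        f"{kind}_bucket": f"{name_prefix}-iot-analytics-{kind}-bucket"
        for kind in BUCKET_KINDS
    }
    for bucket in names.values():
        if len(bucket) > S3_MAX_BUCKET_NAME_LENGTH:
            raise ValueError(f"bucket name too long for S3: {bucket}")
    # IoT Analytics resources use an underscore separator instead of a hyphen.
    for kind in ("channel", "datastore", "pipeline", "dataset"):
        names[kind] = f"{name_prefix}_{kind}"
    return names


print(resource_names("factory1"))
```

With the longest suffix, -iot-analytics-datastore-bucket (31 characters), the prefix can be at most 32 characters long.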
Deleting the stack
Because you may want to reuse the collected data for other purposes, the S3 buckets are kept even when the stack is deleted.
Using AWS-managed S3 buckets
If you want to use AWS-managed S3 buckets instead of user-managed ones, please use the template below as a reference.
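A template that also deletes the S3 buckets when the stack is deleted (added July 8, 2022)
I also reworked the template above into one that deletes the S3 buckets when the stack is deleted.
Use it when you want to remove everything, including the collected data.
To delete the objects in the buckets I created a custom resource, and the timeout of its Lambda function is set to the maximum of 900 seconds. With a very large amount of data the function may still time out, so in that case remove the objects by other means, such as an S3 lifecycle policy.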
---
AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics and Lambda to convert TQV data (SiteWise Gateway data). And When deleting the stack, the S3 buckets are also deleted.

###############################################################
# Parameters
###############################################################
Parameters:
  NamePrefix:
    Description: "Set resource name prefix. The following expressions are permitted: ^[a-z0-9]*$"
    Type: String
    AllowedPattern: ^[a-z0-9]*$
  UseLambda:
    Description: "Choose whether to use the Lambda function"
    Default: UseLambda
    Type: String
    AllowedValues:
      - UseLambda
      - NoUseLambda

###############################################################
# Conditions
###############################################################
Conditions:
  CreateLambdaResources: !Equals [!Ref UseLambda, "UseLambda"]
  SkipLambdaResources: !Equals [!Ref UseLambda, "NoUseLambda"]

###############################################################
# Resources
###############################################################
Resources:
  # ------------------------------------------------------------#
  # S3 Bucket - for IoT Analytics Channel
  # ------------------------------------------------------------#
  BucketForIoTAnalyticsChannel:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-channel-bucket

  # Bucket Policy for IoT Analytics Channel Bucket
  BucketPolicyForIoTAnalyticsChannnelBucket:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref BucketForIoTAnalyticsChannel
      PolicyDocument:
        Version: "2012-10-17"
        Id: MyPolicyID
        Statement:
          - Sid: MyStatementSid
            Effect: Allow
            Principal:
              Service: iotanalytics.amazonaws.com
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - s3:DeleteObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*

  # ------------------------------------------------------------#
  # S3 Bucket - for IoT Analytics Data store
  # ------------------------------------------------------------#
  BucketForIoTAnalyticsDatastore:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket

  # Bucket Policy for IoT Analytics DataStore Bucket
  BucketPolicyForIoTAnalyticsDatastorelBucket:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref BucketForIoTAnalyticsDatastore
      PolicyDocument:
        Version: "2012-10-17"
        Id: MyPolicyID
        Statement:
          - Sid: MyStatementSid
            Effect: Allow
            Principal:
              Service: iotanalytics.amazonaws.com
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - s3:DeleteObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*

  # ------------------------------------------------------------#
  # S3 Bucket - for IoT Analytics Data set
  # ------------------------------------------------------------#
  BucketForIoTAnalyticsDataset:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket
    DependsOn: IoTAnalyticsDatastore

  # Bucket Policy for IoT Analytics Dataset Bucket
  BucketPolicyForIoTAnalyticsDatasetBucket:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref BucketForIoTAnalyticsDataset
      PolicyDocument:
        Version: "2012-10-17"
        Id: MyPolicyID
        Statement:
          - Sid: MyStatementSid
            Effect: Allow
            Principal:
              Service: iotanalytics.amazonaws.com
            Action:
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
              - s3:DeleteObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*

  # ------------------------------------------------------------#
  # IAM Policy for access S3 Buckets from IoT Analytics
  # ------------------------------------------------------------#
  # IAM Policy (IoT Analytics Channel -> IoT Analytics Bucket)
  PolicyForIoTAnalyticsChannelToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:GetObject
              - s3:ListBucket
              - s3:GetBucketLocation
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-channel-policy

  # IAM Policy (IoT Analytics DataStore -> IoT Analytics Bucket)
  PolicyForIoTAnalyticsDatastoreToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
              - s3:DeleteObject
              - s3:GetBucketLocation
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-datastore-policy

  # IAM Policy (IoT Analytics DataSet -> IoT Analytics Bucket)
  PolicyForIoTAnalyticsDatasetToBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:PutObject
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
      ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-dataset-policy

  # ------------------------------------------------------------#
  # IAM Role for access S3 Buckets from IoT Analytics
  # ------------------------------------------------------------#
  # IAM Role (IoT Analytics Channel -> IoT Analytics Bucket)
  RoleForIoTAnalyticsChannelToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-channel-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsChannelToBucket

  # IAM Role (IoT Analytics Datastore -> IoT Analytics Bucket)
  RoleForIoTAnalyticsDatastoreToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatastoreToBucket

  # IAM Role (IoT Analytics Dataset -> IoT Analytics Bucket)
  RoleForIoTAnalyticsDatasetToBucket:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "iotanalytics.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      RoleName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket-role
      ManagedPolicyArns:
        - !Ref PolicyForIoTAnalyticsDatasetToBucket

  # ------------------------------------------------------------#
  # Lambda
  # ------------------------------------------------------------#
  # Lambda for IoT Analytics activity
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Condition: CreateLambdaResources
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  LambdaFunctionIoTAnalyticsActivity:
    Type: AWS::Lambda::Function
    Condition: CreateLambdaResources
    Properties:
      FunctionName: !Sub ${NamePrefix}-pipeline-lambda-function
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          import logging
          import sys
          from datetime import datetime, timezone, timedelta

          # Configure logging
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          streamHandler = logging.StreamHandler(stream=sys.stdout)
          formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
          streamHandler.setFormatter(formatter)
          logger.addHandler(streamHandler)

          # Time zone used for the output timestamp
          JST = timezone(timedelta(hours=+9), 'JST')

          VALUE_TYPES = ('doubleValue', 'integerValue', 'booleanValue', 'stringValue')

          def lambda_handler(event, context):
              logger.info("event: {}".format(event))
              transformed = []
              for e in event:
                  # e holds all the data for a single asset property (a dict)
                  logger.info("e: {}".format(json.dumps(e, indent=2)))
                  property_alias = e['propertyAlias']
                  property_value = e['propertyValues'][0]

                  # Pick whichever typed value is present in the payload
                  value = ""
                  for value_type in VALUE_TYPES:
                      if value_type in property_value['value']:
                          value = property_value['value'][value_type]
                          logger.info("value_type: {}".format(value_type))

                  quality = property_value.get('quality', "")

                  # Convert the TQV timestamp to an ISO 8601 string in JST
                  timestamp = ""
                  ts = property_value.get('timestamp', {})
                  if 'timeInSeconds' in ts:
                      # e.g. 1655307221 + 157000000 / 1e9 = 1655307221.157
                      unixtime = ts['timeInSeconds'] + ts.get('offsetInNanos', 0) / 1000000000
                      # Passing tz keeps the result independent of the local time zone
                      dt = datetime.fromtimestamp(unixtime, tz=timezone.utc).astimezone(JST)
                      timestamp = dt.isoformat(timespec='milliseconds')  # 2022-06-16T00:33:41.157+09:00

                  row = {}
                  row['propertyAlias'] = property_alias
                  row['value'] = value
                  row['timestamp'] = timestamp
                  row['quality'] = quality
                  logger.debug("row: {}".format(row))
                  transformed.append(row)

              logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
              return transformed  # the return value must be a list

  LambdaPermission:
    Type: AWS::Lambda::Permission
    Condition: CreateLambdaResources
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt LambdaFunctionIoTAnalyticsActivity.Arn
      Principal: iotanalytics.amazonaws.com

  ###############################################################
  # Custom Resources
  ###############################################################
  # ------------------------------------------------------------#
  # IAM Policy for CloudFormation custom resource Lambda
  # ------------------------------------------------------------#
  PolicyForCustomeResourceDeleteS3Object:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - s3:DeleteObject
              - s3:List*
            Resource:
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}
              - !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*

  # ------------------------------------------------------------#
  # Custom Resources (added for bucket clean-up)
  # ------------------------------------------------------------#
  # Delete Bucket for IoT Analytics Channel
  LambdaUsedToDeleteBucketForIoTAnalyticsChannel:
    Type: Custom::cleanupbucket
    Properties:
      ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
      BucketName: !Ref BucketForIoTAnalyticsChannel

  # Delete Bucket for IoT Analytics Datastore
  LambdaUsedToDeleteBucketForIoTAnalyticsDatastore:
    Type: Custom::cleanupbucket
    Properties:
      ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
      BucketName: !Ref BucketForIoTAnalyticsDatastore

  # Delete Bucket for IoT Analytics Dataset
  LambdaUsedToDeleteBucketForIoTAnalyticsDataset:
    Type: Custom::cleanupbucket
    Properties:
      ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
      BucketName: !Ref BucketForIoTAnalyticsDataset

  # ------------------------------------------------------------#
  # Custom resource Lambda
  # ------------------------------------------------------------#
  # Lambda for S3 bucket delete custom resource
  # Lambda Role
  LambdaExecutionRoleCustomResource:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - !Ref PolicyForCustomeResourceDeleteS3Object

  # Lambda Function
  LambdaFunctionS3BucketDelete:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub ${NamePrefix}-delete-s3
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRoleCustomResource.Arn
      Runtime: python3.9
      Timeout: 900
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse

          def lambda_handler(event, context):
              try:
                  bucket = event['ResourceProperties']['BucketName']
                  # Empty the bucket only when the stack (custom resource) is deleted
                  if event['RequestType'] == 'Delete':
                      s3 = boto3.resource('s3')
                      bucket = s3.Bucket(bucket)
                      for obj in bucket.objects.filter():
                          s3.Object(bucket.name, obj.key).delete()
                      #bucket.object_versions.delete() # enable if needed
                  # Always report back so CloudFormation does not wait for a timeout
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Response': 'Success'})
              except Exception as e:
                  print(e)
                  cfnresponse.send(event, context, cfnresponse.FAILED, {'Response': 'Failed'})

  # ------------------------------------------------------------#
  # AWS IoT Analytics
  # ------------------------------------------------------------#
  # IoT Analytics Channel
  IoTAnalyticsChannel:
    Type: AWS::IoTAnalytics::Channel
    Properties:
      ChannelName: !Sub ${NamePrefix}_channel
      ChannelStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsChannel
          RoleArn: !GetAtt RoleForIoTAnalyticsChannelToBucket.Arn
    DependsOn: BucketForIoTAnalyticsChannel

  # IoT Analytics Datastore
  IoTAnalyticsDatastore:
    Type: AWS::IoTAnalytics::Datastore
    Properties:
      DatastoreName: !Sub ${NamePrefix}_datastore
      DatastoreStorage:
        CustomerManagedS3:
          Bucket: !Ref BucketForIoTAnalyticsDatastore
          RoleArn: !GetAtt RoleForIoTAnalyticsDatastoreToBucket.Arn
    DependsOn: BucketForIoTAnalyticsDatastore

  # IoT Analytics Pipeline (No Lambda Activity)
  IoTAnalyticsPipelineLambdaFalse:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: SkipLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
        - Channel: # Specifies where the data comes from
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-datastore-activity
          Datastore: # Specifies where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore

  # IoT Analytics Pipeline (with Lambda Activity)
  IoTAnalyticsPipelineLambdaTrue:
    Type: AWS::IoTAnalytics::Pipeline
    Condition: CreateLambdaResources
    Properties:
      PipelineName: !Sub ${NamePrefix}_pipeline
      PipelineActivities:
        - Channel: # Specifies where the data comes from
            Name: pipeline-channel-activity
            ChannelName: !Sub ${NamePrefix}_channel
            Next: pipeline-lambda-activity
          Lambda: # Converts TQV data into JSON with all keys flattened to the same level
            Name: pipeline-lambda-activity
            BatchSize: 1
            LambdaName: !Sub ${NamePrefix}-pipeline-lambda-function
            Next: pipeline-datastore-activity
          Datastore: # Specifies where the data is stored
            Name: pipeline-datastore-activity
            DatastoreName: !Sub ${NamePrefix}_datastore

  IoTAnalyticsDataset:
    Type: AWS::IoTAnalytics::Dataset
    Properties:
      DatasetName: !Sub ${NamePrefix}_dataset
      Actions:
        - ActionName: SqlAction
          QueryAction: # Query that fetches the data (no date filter in this variant)
            SqlQuery: !If
              - CreateLambdaResources
              - !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore"
              - !Sub "SELECT * FROM ${NamePrefix}_datastore"
      RetentionPeriod: # Number of days the dataset content is kept
        NumberOfDays: 1
        Unlimited: false
      Triggers: # Refresh the dataset automatically
        - Schedule:
            ScheduleExpression: rate(5 minute)
      # Deliver the dataset content to the user-managed S3 bucket
      ContentDeliveryRules:
        - Destination:
            S3DestinationConfiguration:
              Bucket: !Ref BucketForIoTAnalyticsDataset
              Key: '!{iotanalytics:scheduleTime}/!{iotanalytics:versionId}.csv'
              RoleArn: !GetAtt RoleForIoTAnalyticsDatasetToBucket.Arn
          #EntryName: String
    DependsOn: IoTAnalyticsDatastore
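The clean-up function in the template above deletes objects one request at a time, which is part of why large buckets can hit the timeout. One possible variation, shown here only as a sketch, is to delete a whole listing page per request. empty_bucket is a hypothetical helper that takes a boto3 S3 client as an argument; it relies on list_objects_v2 returning at most 1,000 keys per page, which matches the per-request limit of delete_objects.

```python
def empty_bucket(s3_client, bucket_name):
    """Delete all current objects in a bucket, one listing page per request.

    s3_client is expected to be a boto3 S3 client (boto3.client("s3")).
    Returns the number of keys submitted for deletion.
    """
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name):
        # "Contents" is absent when the page (or the bucket) is empty.
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3_client.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": keys, "Quiet": True},
            )
            deleted += len(keys)
    return deleted
```

This does not handle versioned buckets, and for very large buckets a lifecycle rule is still the safer route.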
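Conclusion
The single-step setup makes it easy to create an IoT Analytics environment, but it does not let you look at the data directly. With this template, I can now quickly prepare the environment I want whenever I need it.
I hope this is useful when you work with IoT Analytics.
That's all.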