この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
以前、SiteWise ゲートウェイで収集した 設備機器の稼働データを IoT Analytics でデータ変換して QuickSight で利用する方法を紹介しました。
個人的にこの構成を頻繁に作る機会があったので、CloudFormation で作成できるようにしてみました。
前回のリソースとの変更点 〜 S3 バケットはユーザー管理〜
前回の記事では、IoT Analytics のリソースはシングルステップセットアップで作成していましたが、シングルステップセットアップでは、S3 バケットが AWS 管理のものになるためバケットの中身をユーザー直接見ることができません。
そのため今回のテンプレートでは、S3 バケットは全て「ユーザー管理のバケット」として作成することにしました。
作成される環境
紹介するテンプレートでは、下記の赤枠のリソースが作成されます。
IoT Analytics のチャンネル、データストア、パイプラインのそれぞれが別の S3 バケットとなります。
CloudFormation テンプレート
テンプレートは下記のとおりです。
AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics and Lambda to convert TQV data (SiteWise Gateway data).
###############################################################
# Parameters
###############################################################
Parameters:
NamePrefix:
Description: "Set resource name prefix. The following expressions are permitted: ^[a-z0-9]*$"
Type: String
AllowedPattern: ^[a-z0-9]*$
UseLambda:
Description: "Choose whether to use the Lambda function"
Default: UseLambda
Type: String
AllowedValues:
- UseLambda
- NoUseLambda
###############################################################
# Conditions
###############################################################
Conditions:
CreateLambdaResources: !Equals [!Ref UseLambda, "UseLambda"]
SkipLambdaResources: !Equals [!Ref UseLambda, "NoUseLambda"]
###############################################################
# Resources
###############################################################
Resources:
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Channel
# ------------------------------------------------------------#
BucketForIoTAnalyticsChannel:
Type: AWS::S3::Bucket
DeletionPolicy: Retain
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: !Sub ${NamePrefix}-iot-analytics-channel-bucket
# Bucket Policy for IoT Analytics Channel Bucket
BucketPolicyForIoTAnalyticsChannnelBucket:
Type: AWS::S3::BucketPolicy
DeletionPolicy: Retain
Properties:
Bucket: !Ref BucketForIoTAnalyticsChannel
PolicyDocument: { "Version": "2012-10-17",
"Id": "MyPolicyID",
"Statement": [
{
"Sid": "MyStatementSid",
"Effect": "Allow",
"Principal": {
"Service": "iotanalytics.amazonaws.com"
},
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}",
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*"
]
}
]
}
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data store
# ------------------------------------------------------------#
BucketForIoTAnalyticsDatastore:
Type: AWS::S3::Bucket
DeletionPolicy: Retain
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket
# Bucket Policy for IoT Analytics DataStore Bucket
BucketPolicyForIoTAnalyticsDatastorelBucket:
Type: AWS::S3::BucketPolicy
DeletionPolicy: Retain
Properties:
Bucket: !Ref BucketForIoTAnalyticsDatastore
PolicyDocument: { "Version": "2012-10-17",
"Id": "MyPolicyID",
"Statement": [
{
"Sid": "MyStatementSid",
"Effect": "Allow",
"Principal": {
"Service": "iotanalytics.amazonaws.com"
},
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}",
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*"
]
}
]
}
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data set
# ------------------------------------------------------------#
BucketForIoTAnalyticsDataset:
Type: AWS::S3::Bucket
DeletionPolicy: Retain
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket
DependsOn: IoTAnalyticsDatastore
# Bucket Policy for IoT Analytics Dataset Bucket
BucketPolicyForIoTAnalyticsDatasetBucket:
Type: AWS::S3::BucketPolicy
DeletionPolicy: Retain
Properties:
Bucket: !Ref BucketForIoTAnalyticsDataset
PolicyDocument: { "Version": "2012-10-17",
"Id": "MyPolicyID",
"Statement": [
{
"Sid": "MyStatementSid",
"Effect": "Allow",
"Principal": {
"Service": "iotanalytics.amazonaws.com"
},
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}",
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*"
]
}
]
}
# ------------------------------------------------------------#
# IAM Policy for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------#
# IAM Policy (IoT Analytics Channel -> IoT Analytics Bucket)
PolicyForIoTAnalyticsChannelToBucket:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:PutObject
- s3:GetObject
- s3:ListBucket
- s3:GetBucketLocation
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-channel-policy
# IAM Policy (IoT Analytics DataStore -> IoT Analytics Bucket)
PolicyForIoTAnalyticsDatastoreToBucket:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:PutObject
- s3:DeleteObject
- s3:GetBucketLocation
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-datastore-policy
# IAM Policy (IoT Analytics DataSet -> IoT Analytics Bucket)
PolicyForIoTAnalyticsDatasetToBucket:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-dataset-policy
# ------------------------------------------------------------#
# IAM Role for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------#
# IAM Role (IoT Analytics Channel -> IoT Analytics Bucket)
RoleForIoTAnalyticsChannelToBucket:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "iotanalytics.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
RoleName: !Sub ${NamePrefix}-iot-analytics-channel-bucket-role
ManagedPolicyArns:
- !Ref PolicyForIoTAnalyticsChannelToBucket
# IAM Role (IoT Analytics Datastore -> IoT Analytics Bucket)
RoleForIoTAnalyticsDatastoreToBucket:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "iotanalytics.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
RoleName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket-role
ManagedPolicyArns:
- !Ref PolicyForIoTAnalyticsDatastoreToBucket
# IAM Role (IoT Analytics Dataset -> IoT Analytics Bucket)
RoleForIoTAnalyticsDatasetToBucket:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "iotanalytics.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
RoleName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket-role
ManagedPolicyArns:
- !Ref PolicyForIoTAnalyticsDatasetToBucket
# ------------------------------------------------------------#
# AWS IoT Analytics
# ------------------------------------------------------------#
IoTAnalyticsChannel:
Type: AWS::IoTAnalytics::Channel
Properties:
ChannelName: !Sub ${NamePrefix}_channel
ChannelStorage:
CustomerManagedS3:
Bucket: !Ref BucketForIoTAnalyticsChannel
#KeyPrefix: String
RoleArn: !GetAtt RoleForIoTAnalyticsChannelToBucket.Arn
DependsOn: BucketForIoTAnalyticsChannel
IoTAnalyticsDatastore:
Type: AWS::IoTAnalytics::Datastore
Properties:
DatastoreName: !Sub ${NamePrefix}_datastore
DatastoreStorage:
CustomerManagedS3:
Bucket: !Ref BucketForIoTAnalyticsDatastore
#KeyPrefix: String
RoleArn: !GetAtt RoleForIoTAnalyticsDatastoreToBucket.Arn
DependsOn: BucketForIoTAnalyticsDatastore
LambdaExecutionRole:
Type: AWS::IAM::Role
Condition: CreateLambdaResources
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
ManagedPolicyArns:
- !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
LambdaFunction:
Type: AWS::Lambda::Function
Condition: CreateLambdaResources
Properties:
FunctionName: !Sub ${NamePrefix}-pipeline-lambda-function
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
Runtime: python3.9
Code:
ZipFile: |
import json
import logging
import sys
import time
from datetime import datetime, timezone, timedelta
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# timezone
JST = timezone(timedelta(hours=+9), 'JST')
def lambda_handler(event, context):
logger.info("event: {}".format(event))
transformed = []
for e in event:
# e に単一アセットのデータが全部入る
logger.info("e: {}".format(json.dumps(e, indent=2)))
print(type(e)) #<class 'dict'>
property_alias = e['propertyAlias']
propertyValuesList = e['propertyValues']
property_value = propertyValuesList[0]
value = ""
if 'doubleValue' in property_value['value']:
value = property_value['value']['doubleValue']
logger.info("value_type: doubleValue")
if 'integerValue' in property_value['value']:
value = property_value['value']['integerValue']
logger.info("value_type: integerValue")
if 'booleanValue' in property_value['value']:
value = property_value['value']['booleanValue']
logger.info("value_type: booleanValue")
if 'stringValue' in property_value['value']:
value = property_value['value']['stringValue']
logger.info("value_type: stringValue")
quality = ""
if 'quality' in property_value:
quality = property_value['quality']
logger.debug("quality in payload")
timestamp = ""
unixtime = ""
nanoseconds = ""
devicetime = ""
if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and 'offsetInNanos' in property_value['timestamp']:
inttime = property_value['timestamp']['timeInSeconds'] # 1655307221
milliseconds = property_value['timestamp']['offsetInNanos'] / 1000000000 # 0.157
print("milliseconds: " + str(milliseconds))
unixtime = inttime + milliseconds # 1655307221.157
print("unixtime: " + str(unixtime))
dt = datetime.fromtimestamp(unixtime).replace(tzinfo=timezone.utc).astimezone(tz=JST)
timestamp = dt.isoformat(timespec='milliseconds')
print("timestamp: " + str(timestamp))
if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and not 'offsetInNanos' in property_value['timestamp']:
unixtime = property_value['timestamp']['timeInSeconds'] # 1655307221
devicetime = datetime.fromtimestamp(unixtime) # '2022-06-15 15:33:41' (UTC)
timestamp = str(devicetime) + str(".") + str(000) # '2022-06-16 15:33:41.000' (UTC)
unixtime = property_value['timestamp']['timeInSeconds'] # 1655307221
devicetime = datetime.fromtimestamp(unixtime) # '2022-06-15 15:33:41' (UTC)
dt = datetime.fromtimestamp(unixtime).replace(tzinfo=timezone.utc).astimezone(tz=JST)
timestamp = dt.isoformat(timespec='milliseconds') # 2022-06-16T09:33:41.000+09:00
row = {}
row['propertyAlias'] = property_alias
row['value'] = value
row['timestamp'] = timestamp
row['quality'] = quality
logger.debug("row: {}".format(row))
transformed.append(row)
logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
return transformed # return は[]リストで返す
LambdaPermission:
Type: AWS::Lambda::Permission
Condition: CreateLambdaResources
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt LambdaFunction.Arn
Principal: iotanalytics.amazonaws.com
IoTAnalyticsPipelineLambdaTrue:
Type: AWS::IoTAnalytics::Pipeline
Condition: CreateLambdaResources
Properties:
PipelineName: !Sub ${NamePrefix}_pipeline
PipelineActivities:
- Channel:
# データの取得元を指定する
Name: pipeline-channel-activity
ChannelName: !Sub ${NamePrefix}_channel
Next: pipeline-lambda-activity
Lambda:
# LambdaでTQVデータの全てのキーを同じレベルでフラット化したJSONに変換
Name: pipeline-lambda-activity
BatchSize: 1
LambdaName: !Sub ${NamePrefix}-pipeline-lambda-function
Next: pipeline-datastore-activity
Datastore:
# データの保存先指定する
Name: pipeline-datastore-activity
DatastoreName: !Sub ${NamePrefix}_datastore
IoTAnalyticsPipelineLambdaFalse:
Type: AWS::IoTAnalytics::Pipeline
Condition: SkipLambdaResources
Properties:
PipelineName: !Sub ${NamePrefix}_pipeline
PipelineActivities:
- Channel:
# データの取得元を指定する
Name: pipeline-channel-activity
ChannelName: !Sub ${NamePrefix}_channel
Next: pipeline-datastore-activity
Datastore:
# データの保存先指定する
Name: pipeline-datastore-activity
DatastoreName: !Sub ${NamePrefix}_datastore
IoTAnalyticsDataset:
Type: AWS::IoTAnalytics::Dataset
Properties:
DatasetName: !Sub ${NamePrefix}_dataset
Actions:
- ActionName: SqlAction
QueryAction:
# データの取得
SqlQuery: !If [ CreateLambdaResources, !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day", !Sub "SELECT * FROM ${NamePrefix}_datastore WHERE __dt > current_date - interval '1' day" ]
RetentionPeriod:
# データセットの保持日数
NumberOfDays: 1
Unlimited: false
Triggers:
# データセットを自動更新する
- Schedule:
ScheduleExpression: rate(5 minute)
# データセットをユーザー管理のS3に保存
ContentDeliveryRules:
- Destination:
S3DestinationConfiguration:
Bucket: !Ref BucketForIoTAnalyticsDataset
Key: '!{iotanalytics:scheduleTime}/!{iotanalytics:versionId}.csv'
RoleArn: !GetAtt RoleForIoTAnalyticsDatasetToBucket.Arn
DependsOn: IoTAnalyticsDatastore
Lambda 関数の作成有無を指定
冒頭でお伝えしたとおり、このテンプレートは SiteWise ゲートウェイからの TQV 形式のデータを QuickSight で分析しやすい形にする環境を構築するものです。
そのためデプロイされる Lambda 関数では TQV データを適切な形に変換する処理を行います。
しかし、場合によっては IoT Analytics の環境だけ欲しい( Lambda 関数は不要、もしくは自分のやりたい処理に合わせた関数を作りたい)といったケースもあると思います。 そのため本テンプレートでは、パラメーターで Lambda 関数の作成有無を入力することで、Lambda 関数の作成を選択できるようにしています。
もし Lambda 関数を作成しない場合、IoT Analytics のSQL データセットに指定される SQL 文は下記のようになるので、スタック作成後に必要に応じて適宜変更してください。
(${NamePrefix}
には CloudFormation のスタック作成時に指定したものが入ります。)
SELECT * FROM ${NamePrefix}_datastore
スタックの作成
スタックの作成時は「リソースを区別するためのプレフィックス」と「Lambda 関数の作成有無」を指定します。
UseLambda
のパラメーターの選択の違いは下記のとおりです。
UseLambda
: Lambda 関数を作成するNoUseLambda
: Lambda 関数を作成しない
(値にパラメーターと同じ UseLambda
を使ってしまっていますが…)
また、NamePrefix
では英小文字と数字だけ利用できるようにしています。
これは、S3 のバケット名と IoT Analytics の各種リソース名の命名規則によるものです。IoT Analytics のリソース名ではアンダースコアを利用できる一方、S3 バケット名ではアンダースコアが使えません。
もう少し柔軟に設定できるようにしても良かったのですが、テンプレートが更に長大化するので個人利用する分には不要と判断しました。
スタックの削除
スタックを削除する際ですが、収集したデータを別の用途に使いたいこともあるかもしれないので、スタックを削除しても S3 バケットは削除しないようにしています。
AWS マネージドな S3 バケットを使う場合
ユーザーマネージドではなく AWS マネージドの S3 バケットを使う場合は下記のテンプレートを参考にしていただければと思います。
スタック削除時に S3 バケットも削除するテンプレート(2022年7月8日追記)
上記のものを改修して、スタック削除時に S3 バケットも削除するテンプレートも作成してみました。
収集したデータも合わせて全て削除したい場合にご利用ください。
バケット内のオブジェクトを削除するためにカスタムリソースを作っていますが、この Lambda 関数のタイムアウトを最大の 900 秒にしています。膨大な量のデータがある場合はタイムアウトする可能性があるので、S3 のライフサイクルポリシーなどを活用して削除するようにしてください。
---
AWSTemplateFormatVersion: "2010-09-09"
Description: AWS IoT Analytics and Lambda to convert TQV data (SiteWise Gateway data). And When deleting the stack, the S3 buckets are also deleted.
###############################################################
# Parameters
###############################################################
Parameters:
NamePrefix:
Description: "Set resource name prefix. The following expressions are permitted: ^[a-z0-9]*$"
Type: String
AllowedPattern: ^[a-z0-9]*$
UseLambda:
Description: "Choose whether to use the Lambda function"
Default: UseLambda
Type: String
AllowedValues:
- UseLambda
- NoUseLambda
###############################################################
# Conditions
###############################################################
Conditions:
CreateLambdaResources: !Equals [!Ref UseLambda, "UseLambda"]
SkipLambdaResources: !Equals [!Ref UseLambda, "NoUseLambda"]
###############################################################
# Resources
###############################################################
Resources:
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Channel
# ------------------------------------------------------------#
BucketForIoTAnalyticsChannel:
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: !Sub ${NamePrefix}-iot-analytics-channel-bucket
# Bucket Policy for IoT Analytics Channel Bucket
BucketPolicyForIoTAnalyticsChannnelBucket:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref BucketForIoTAnalyticsChannel
PolicyDocument: { "Version": "2012-10-17",
"Id": "MyPolicyID",
"Statement": [
{
"Sid": "MyStatementSid",
"Effect": "Allow",
"Principal": {
"Service": "iotanalytics.amazonaws.com"
},
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}",
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*"
]
}
]
}
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data store
# ------------------------------------------------------------#
BucketForIoTAnalyticsDatastore:
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket
# Bucket Policy for IoT Analytics DataStore Bucket
BucketPolicyForIoTAnalyticsDatastorelBucket:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref BucketForIoTAnalyticsDatastore
PolicyDocument: { "Version": "2012-10-17",
"Id": "MyPolicyID",
"Statement": [
{
"Sid": "MyStatementSid",
"Effect": "Allow",
"Principal": {
"Service": "iotanalytics.amazonaws.com"
},
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}",
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*"
]
}
]
}
# ------------------------------------------------------------#
# S3 Bucket - for IoT Analytics Data set
# ------------------------------------------------------------#
BucketForIoTAnalyticsDataset:
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket
DependsOn: IoTAnalyticsDatastore
# Bucket Policy for IoT Analytics Dataset Bucket
BucketPolicyForIoTAnalyticsDatasetBucket:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref BucketForIoTAnalyticsDataset
PolicyDocument: { "Version": "2012-10-17",
"Id": "MyPolicyID",
"Statement": [
{
"Sid": "MyStatementSid",
"Effect": "Allow",
"Principal": {
"Service": "iotanalytics.amazonaws.com"
},
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}",
"Fn::Sub": "arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*"
]
}
]
}
# ------------------------------------------------------------#
# IAM Policy for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------#
# IAM Policy (IoT Analytics Channel -> IoT Analytics Bucket)
PolicyForIoTAnalyticsChannelToBucket:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:PutObject
- s3:GetObject
- s3:ListBucket
- s3:GetBucketLocation
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-channel-policy
# IAM Policy (IoT Analytics DataStore -> IoT Analytics Bucket)
PolicyForIoTAnalyticsDatastoreToBucket:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:PutObject
- s3:DeleteObject
- s3:GetBucketLocation
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-datastore-policy
# IAM Policy (IoT Analytics DataSet -> IoT Analytics Bucket)
PolicyForIoTAnalyticsDatasetToBucket:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
ManagedPolicyName: !Sub ${NamePrefix}-iot-analytics-dataset-policy
# ------------------------------------------------------------#
# IAM Role for access S3 Buckets from IoT Analytics
# ------------------------------------------------------------#
# IAM Role (IoT Analytics Channel -> IoT Analytics Bucket)
RoleForIoTAnalyticsChannelToBucket:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "iotanalytics.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
RoleName: !Sub ${NamePrefix}-iot-analytics-channel-bucket-role
ManagedPolicyArns:
- !Ref PolicyForIoTAnalyticsChannelToBucket
# IAM Role (IoT Analytics Datastore -> IoT Analytics Bucket)
RoleForIoTAnalyticsDatastoreToBucket:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "iotanalytics.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
RoleName: !Sub ${NamePrefix}-iot-analytics-datastore-bucket-role
ManagedPolicyArns:
- !Ref PolicyForIoTAnalyticsDatastoreToBucket
# IAM Role (IoT Analytics Dataset -> IoT Analytics Bucket)
RoleForIoTAnalyticsDatasetToBucket:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "iotanalytics.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
RoleName: !Sub ${NamePrefix}-iot-analytics-dataset-bucket-role
ManagedPolicyArns:
- !Ref PolicyForIoTAnalyticsDatasetToBucket
# ------------------------------------------------------------#
# Lambda
# ------------------------------------------------------------#
# Lambda for IoT Analytics activity
LambdaExecutionRole:
Type: AWS::IAM::Role
Condition: CreateLambdaResources
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
ManagedPolicyArns:
- !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
LambdaFunctionIoTAnalyticsActivity:
Type: AWS::Lambda::Function
Condition: CreateLambdaResources
Properties:
FunctionName: !Sub ${NamePrefix}-pipeline-lambda-function
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRole.Arn
Runtime: python3.9
Code:
ZipFile: |
import json
import logging
import sys
import time
from datetime import datetime, timezone, timedelta
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# timezone
JST = timezone(timedelta(hours=+9), 'JST')
def lambda_handler(event, context):
logger.info("event: {}".format(event))
transformed = []
for e in event:
#for e, value in event.items():
# e に単一アセットのデータが全部入る
logger.info("e: {}".format(json.dumps(e, indent=2)))
print(type(e)) #<class 'dict'>
property_alias = e['propertyAlias']
propertyValuesList = e['propertyValues']
property_value = propertyValuesList[0]
value = ""
if 'doubleValue' in property_value['value']:
value = property_value['value']['doubleValue']
logger.info("value_type: doubleValue")
if 'integerValue' in property_value['value']:
value = property_value['value']['integerValue']
logger.info("value_type: integerValue")
if 'booleanValue' in property_value['value']:
value = property_value['value']['booleanValue']
logger.info("value_type: booleanValue")
if 'stringValue' in property_value['value']:
value = property_value['value']['stringValue']
logger.info("value_type: stringValue")
quality = ""
if 'quality' in property_value:
quality = property_value['quality']
logger.debug("quality in payload")
timestamp = ""
unixtime = ""
nanoseconds = ""
devicetime = ""
if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and 'offsetInNanos' in property_value['timestamp']:
inttime = property_value['timestamp']['timeInSeconds'] # 1655307221
milliseconds = property_value['timestamp']['offsetInNanos'] / 1000000000 # 0.157
print("milliseconds: " + str(milliseconds))
unixtime = inttime + milliseconds # 1655307221.157
print("unixtime: " + str(unixtime))
dt = datetime.fromtimestamp(unixtime).replace(tzinfo=timezone.utc).astimezone(tz=JST)
timestamp = dt.isoformat(timespec='milliseconds')
print("timestamp: " + str(timestamp))
#print(timestamp) # 2022-06-16T09:33:41.157+09:00
if 'timestamp' in property_value and 'timeInSeconds' in property_value['timestamp'] and not 'offsetInNanos' in property_value['timestamp']:
unixtime = property_value['timestamp']['timeInSeconds'] # 1655307221
devicetime = datetime.fromtimestamp(unixtime) # '2022-06-15 15:33:41' (UTC)
timestamp = str(devicetime) + str(".") + str(000) # '2022-06-16 15:33:41.000' (UTC)
unixtime = property_value['timestamp']['timeInSeconds'] # 1655307221
devicetime = datetime.fromtimestamp(unixtime) # '2022-06-15 15:33:41' (UTC)
dt = datetime.fromtimestamp(unixtime).replace(tzinfo=timezone.utc).astimezone(tz=JST)
timestamp = dt.isoformat(timespec='milliseconds') # 2022-06-16T09:33:41.000+09:00
row = {}
row['propertyAlias'] = property_alias
row['value'] = value
row['timestamp'] = timestamp
row['quality'] = quality
logger.debug("row: {}".format(row))
transformed.append(row)
logger.info("transformed: {}\n".format(json.dumps(transformed, indent=2)))
return transformed # return は[]リストで返す
LambdaPermission:
Type: AWS::Lambda::Permission
Condition: CreateLambdaResources
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt LambdaFunctionIoTAnalyticsActivity.Arn
Principal: iotanalytics.amazonaws.com
###############################################################
# Custom Resources
###############################################################
# ------------------------------------------------------------#
# IAM Policy for CloudFormation custom resource Lambda
# ------------------------------------------------------------#
PolicyForCustomeResourceDeleteS3Object:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action:
- s3:DeleteObject
- s3:List*
Resource:
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsChannel}/*
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDatastore}/*
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}
- !Sub arn:aws:s3:::${BucketForIoTAnalyticsDataset}/*
# ------------------------------------------------------------#
# Custom Resources
# ------------------------------------------------------------#
# カスタムリソース追加
# Delete Bucket for IoT Analytics Channel
LambdaUsedToDeleteBucketForIoTAnalyticsChannel:
Type: Custom::cleanupbucket
Properties:
ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
BucketName: !Ref BucketForIoTAnalyticsChannel
# Delete Bucket for IoT Analytics Datastore
LambdaUsedToDeleteBucketForIoTAnalyticsDatastore:
Type: Custom::cleanupbucket
Properties:
ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
BucketName: !Ref BucketForIoTAnalyticsDatastore
# Delete Bucket for IoT Analytics Dataset
LambdaUsedToDeleteBucketForIoTAnalyticsDataset:
Type: Custom::cleanupbucket
Properties:
ServiceToken: !GetAtt LambdaFunctionS3BucketDelete.Arn
BucketName: !Ref BucketForIoTAnalyticsDataset
# ------------------------------------------------------------#
# Custom resource Lambda
# ------------------------------------------------------------#
# Lambda for S3 bucket delete custom resource
# Lambda Role
LambdaExecutionRoleCustomResource:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Action:
- "sts:AssumeRole"
Path: "/"
ManagedPolicyArns:
- !Sub 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
- !Ref PolicyForCustomeResourceDeleteS3Object
# Lambda Function
LambdaFunctionS3BucketDelete:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub ${NamePrefix}-delete-s3
Handler: index.lambda_handler
Role: !GetAtt LambdaExecutionRoleCustomResource.Arn
Runtime: python3.9
Timeout: 900
Code:
ZipFile: |
import json
import boto3
import cfnresponse
def lambda_handler(event, context):
try:
bucket = event['ResourceProperties']['BucketName']
if event['RequestType'] == 'Delete':
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket)
for obj in bucket.objects.filter():
s3.Object(bucket.name, obj.key).delete()
#bucket.object_versions.delete() # 必要に応じて
cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Response': 'Success'})
except Exception as e:
print(e)
cfnresponse.send(event, context, cfnresponse.FAILED, {'Response': 'Failed'})
# ------------------------------------------------------------#
# AWS IoT Analytics
# ------------------------------------------------------------#
# IoT Analytics Channel
IoTAnalyticsChannel:
Type: AWS::IoTAnalytics::Channel
Properties:
ChannelName: !Sub ${NamePrefix}_channel
ChannelStorage:
CustomerManagedS3:
Bucket: !Ref BucketForIoTAnalyticsChannel
RoleArn: !GetAtt RoleForIoTAnalyticsChannelToBucket.Arn
DependsOn: BucketForIoTAnalyticsChannel
# IoT Analytics Datastore
IoTAnalyticsDatastore:
Type: AWS::IoTAnalytics::Datastore
Properties:
DatastoreName: !Sub ${NamePrefix}_datastore
DatastoreStorage:
CustomerManagedS3:
Bucket: !Ref BucketForIoTAnalyticsDatastore
RoleArn: !GetAtt RoleForIoTAnalyticsDatastoreToBucket.Arn
DependsOn: BucketForIoTAnalyticsDatastore
# IoT Analytics Pipeline (No Lambda Activity)
IoTAnalyticsPipelineLambdaFalse:
Type: AWS::IoTAnalytics::Pipeline
Condition: SkipLambdaResources
Properties:
PipelineName: !Sub ${NamePrefix}_pipeline
PipelineActivities:
- Channel:
# データの取得元を指定する
Name: pipeline-channel-activity
ChannelName: !Sub ${NamePrefix}_channel
Next: pipeline-datastore-activity
Datastore:
# データの保存先指定する
Name: pipeline-datastore-activity
DatastoreName: !Sub ${NamePrefix}_datastore
# IoT Analytics Pipeline (with Lambda Activity)
IoTAnalyticsPipelineLambdaTrue:
Type: AWS::IoTAnalytics::Pipeline
Condition: CreateLambdaResources
Properties:
PipelineName: !Sub ${NamePrefix}_pipeline
PipelineActivities:
- Channel:
# データの取得元を指定する
Name: pipeline-channel-activity
ChannelName: !Sub ${NamePrefix}_channel
Next: pipeline-lambda-activity
Lambda:
# LambdaでTQVデータの全てのキーを同じレベルでフラット化したJSONに変換
Name: pipeline-lambda-activity
BatchSize: 1
LambdaName: !Sub ${NamePrefix}-pipeline-lambda-function
Next: pipeline-datastore-activity
Datastore:
# データの保存先指定する
Name: pipeline-datastore-activity
DatastoreName: !Sub ${NamePrefix}_datastore
IoTAnalyticsDataset:
Type: AWS::IoTAnalytics::Dataset
Properties:
DatasetName: !Sub ${NamePrefix}_dataset
Actions:
- ActionName: SqlAction
QueryAction:
# 当日分のデータのみ取得する
#SqlQuery: !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore"
SqlQuery: !If [ CreateLambdaResources, !Sub "SELECT propertyAlias, value, timestamp, quality FROM ${NamePrefix}_datastore", !Sub "SELECT * FROM ${NamePrefix}_datastore" ]
RetentionPeriod:
# データセットの保持日数
NumberOfDays: 1
Unlimited: false
Triggers:
# データセットを自動更新する
- Schedule:
ScheduleExpression: rate(5 minute)
# データセットをユーザー管理のS3に保存
ContentDeliveryRules:
- Destination:
S3DestinationConfiguration:
Bucket: !Ref BucketForIoTAnalyticsDataset
Key: '!{iotanalytics:scheduleTime}/!{iotanalytics:versionId}.csv'
RoleArn: !GetAtt RoleForIoTAnalyticsDatasetToBucket.Arn
#EntryName: String
DependsOn: IoTAnalyticsDatastore
最後に
シングルステップセットアップで IoT Analytics 環境を簡単に作成することはできますが「データの中身を直接見る」ということができなかったので、これでいつでも欲しい環境がサクッと準備できるようになりました。
IoT Analytics を利用される際に参考にしていただければ幸いです。
以上です。