[後編] Raspberry Piで取得したセンサーのデータをAWS IoTからKinesis Data Firehoseに流してS3に保存する
こちらの記事では、Raspberry PiからAWS IoTに送信したセンサーのデータをメッセージのルーティングを使ってKinesis Data Firehoseにデータを流し、その後にS3に保存する方法を紹介したいと思います。
今回は後編となります。
前編はこちらです。⇣
前編では主にRaspberry Pi側のセンサーデータ取得とMQTT送信用のコードの解説を行いました。
後編では、CDKによるAWSインフラ(AWS IoT や Kinesis data firehoseなど)の構築方法と、動作確認の結果について紹介したいと思います。
システム構成
今回構築したシステム構成の概要図は以下のようになります。
使用するセンサーはBME280というBOSH社製の非常にコンパクトなものです。このセンサーは温度、湿度、気圧を取得できます。
実行環境
項目名 | バージョン |
---|---|
mac OS | Ventura 13.2 |
AWS CDK | 2.81.0 |
AWS IoT Device SDK for Python | 1.4.9 |
Raspberry Piのモデル
$ cat /proc/cpuinfo | grep Model Model : Raspberry Pi 3 Model B Plus Rev 1.3
クライアント証明書作成のための準備
今回、クライアント側の秘密鍵を利用してAWS IoTでクライアント証明書を作成します。 AWSで秘密鍵の作成まで行うこともできますが、その場合はAWSから秘密鍵をデバイスにダウンロードしてくる必要があります。 要件として秘密鍵がインターネットを通る事がNGである場合は、このブログで紹介する方法で秘密鍵をクライアント側で作成する方法を採用できます。
デバイス上で秘密鍵の作成
Raspberry Pi上でopensslを使用して秘密鍵を作成します。 作成した秘密鍵はRaspberry Pi上の適当なディレクトリ内に保存しておきます。
openssl genrsa -out privatekey.pem 2048
CSR(Certificate Signing Request)の作成
AWS IoTのCAでクライアント証明書の作成をリクエストするためにCSRを作成します。 以下のコマンドで作成できます。
openssl req -new -subj "/C=JP/ST=Hokkaido/L=Sapporo/O=MyCompany/CN=AWS IoT Certificate" -key privatekey.pem -out deviceCert.csr
作成したCSRはAWSのデプロイの時に使用するのでCDKでAWSリソースをデプロイするローカルマシンに保存しておきます。
秘密鍵や証明書の作成方法、独自CAによる証明書の作成方法について
AWS IoTでは色々な方法でクライアント証明書の作成ができます。 それらをまとめて説明してくれている動画をAWSが公開していますので参考にしてください。 それぞれの方法メリット・デメリットや、CSRを使用しない場合のTLS相互認証についても詳しく説明されてますので、とても参考になると思います。
CDKのコードの紹介
CDKによるAWSインフラの構築コードを紹介します。
AWS IoT関連とKinesis Data FirehoseやS3のスタックを分けておりまして、
AWS IoT周りのスタック
こちらはAWS IoT関連のリソースのスタックを記述したコードになります。
import {Stack,StackProps,aws_iot,aws_logs,RemovalPolicy}from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as fs from "fs"; import * as path from "path"; import * as iot from '@aws-cdk/aws-iot-alpha'; import * as actions from '@aws-cdk/aws-iot-actions-alpha'; import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha'; interface DirectputDataFirehoseFromIotcoreStackProps extends StackProps { projectName: string; deliveryStream: firehose_alpha.DeliveryStream; } export class DirectputDataFirehoseFromIotcoreStack extends Stack { constructor(scope: Construct, id: string, props: DirectputDataFirehoseFromIotcoreStackProps) { super(scope, id, props); // モノの作成 const iotThingName = `${props.projectName}-thing`; const iotThing = new aws_iot.CfnThing(this, iotThingName, { thingName: iotThingName, }); // AWS IoT のポリシーの作成 const topicEnvironmentDataSendToCW = 'send/data/firehose/'; const iotPolicy = new aws_iot.CfnPolicy(this, 'Policy', { policyDocument: { Version: '2012-10-17', Statement: [ { Effect: 'Allow', Action: 'iot:Connect', //client/より後ろに記載する部分がclientIdとなる。これがraspiからのMQTT Publishに使われる Resource: `arn:aws:iot:${this.region}:${this.account}:client/${props.projectName}`, }, { Effect: 'Allow', Action: ['iot:Publish', 'iot:Receive'], Resource: [ `arn:aws:iot:${this.region}:${this.account}:topic/${topicEnvironmentDataSendToCW}`, ], }, { Effect: 'Allow', Action: 'iot:Subscribe', Resource: [ `arn:aws:iot:${this.region}:${this.account}:topic/${topicEnvironmentDataSendToCW}`, ], }, ], }, policyName: 'raspi3-environment-data-publish', }); // クライアント証明書の作成 const cfnCertificate = new aws_iot.CfnCertificate( this, `${props.projectName}-device-cert`, { status: 'ACTIVE', certificateSigningRequest: fs.readFileSync( path.resolve('certs/deviceCert.csr'), 'utf-8', ), }, ); // IoTポリシーをクライアント証明書へ紐づけ new aws_iot.CfnPolicyPrincipalAttachment( this, 'PolicyPrincipalAttachment', { policyName: iotPolicy.ref, principal: cfnCertificate.attrArn, // クライアント証明書のARN }, ); // クライアント証明書をモノへ紐づけ new aws_iot.CfnThingPrincipalAttachment(this, 'ThingPrincipalAttachment', { thingName: iotThing.ref, principal: cfnCertificate.attrArn, // クライアント証明書のARN }); // エラーログ用のロググループの作成 const errorLogGroup = new aws_logs.LogGroup( this, 'error-log-group', { logGroupName: `${props.projectName}-error-log-group`, removalPolicy: RemovalPolicy.DESTROY, retention: 30, }, ); // メッセージルーティングのルールの作成 const ruleNameSendEnvironmentData = 'environment_data_to_Cloud_Watch'; new iot.TopicRule(this, ruleNameSendEnvironmentData, { topicRuleName: ruleNameSendEnvironmentData, sql: iot.IotSql.fromStringAsVer20160323( `SELECT * FROM '${topicEnvironmentDataSendToCW}'`, ), actions: [ new actions.FirehosePutRecordAction(props.deliveryStream) ], errorAction: new actions.CloudWatchLogsAction(errorLogGroup), }); } }
コード解説
いくつか抜粋して説明します。
AWS IoT のポリシーの作成
AWS IoTのポリシーは以下の記述で作成可能です。MQTT通信でデータを受信するために必要な最小限のポリシーを付与してます。
const topicEnvironmentDataSendToCW = 'send/data/firehose/'; const iotPolicy = new aws_iot.CfnPolicy(this, 'Policy', { policyDocument: { Version: '2012-10-17', Statement: [ { Effect: 'Allow', Action: 'iot:Connect', Resource: `arn:aws:iot:${this.region}:${this.account}:client/${props.projectName}`, }, { Effect: 'Allow', Action: ['iot:Publish', 'iot:Receive', 'iot:Subscribe'], Resource: [ `arn:aws:iot:${this.region}:${this.account}:topic/${topicEnvironmentDataSendToCW}`, ], }, ], }, policyName: 'raspi3-environment-data-publish', });
iot:ConnectのResourceについて
iot:ConnectのResourceには、arn:aws:iot:${this.region}:${this.account}:client/${props.projectName}
を指定しています。
client/の後に記述した値がclientId
となります。
今回の場合、projectNameというcontextを使用しています。
値はdirectput-data-firehose-from-iotcore
で、これが今回のclientIdの値となります。
contextの指定は、cdk.jsonに以下のように記述しています。
"context": { "projectName": "directput-data-firehose-from-iotcore",
前編で紹介したセンサデータをAWS IoTに送付するコードでも、MQTTでデータをPublishする際のパラメーターとして使用します。
iot:Publish, iot:Receive, iot:SubscribeのResourceについて
題目のポリシーについてはリソースにMQTTトピックを指定します。今回はsend/data/firehose/
としてます。
トピックはMQTTで送信したメッセージの宛先を指定するために使用します。
トピック宛に送信されてきたメッセージをメッセージブローカーがメッセージのルーティングによって指定されたサービスに渡します。
こちらも前編で紹介したコードのMQTTPublishの際のパラメーターとして使用します。
クライアント証明書の作成
以下はCDKデプロイのタイミングでAWS IoTでクライアント証明書作るための記述となります。
// クライアント証明書の作成 const cfnCertificate = new aws_iot.CfnCertificate( this, `${props.projectName}-device-cert`, { status: 'ACTIVE', certificateSigningRequest: fs.readFileSync( path.resolve('certs/deviceCert.csr'), 'utf-8', ), }, );
status
プロパティでACTIVEを指定することで、有効な証明書として作成されます。
certificateSigningRequest
プロパティには、先程作成したCSRファイルのパスを指定して読み込ませています。
エラーログ用のロググループの作成
メッセージがルールにマッチせずにルーティングできなかった時にエラーとしてログを出力するためのロググループを作成します。
const errorLogGroup = new logs.LogGroup(this, 'errorLogGroup', { logGroupName: `${props.projectName}-error-log`, retention: 30, });
メッセージルーティングのルールの作成
以下の記述でトピック``にさ送信されるメッセージのルーティングルールを作成します。
const ruleNameSendEnvironmentData = 'environment_data_to_Cloud_Watch'; new iot.TopicRule(this, ruleNameSendEnvironmentData, { topicRuleName: ruleNameSendEnvironmentData, sql: iot.IotSql.fromStringAsVer20160323( `SELECT * FROM '${topicEnvironmentDataSendToCW}'`, ), actions: [ new actions.FirehosePutRecordAction(props.deliveryStream) ], errorAction: new actions.CloudWatchLogsAction(errorLogGroup), });
- sql
sql
プロパティでsqlライクな記述でトピックに送信されるメッセージの内どれをルールにマッチさせるかを指定します。今回はすべてのメッセージをルーティング対象にしたかったので、SELECT句の指定は*
としています。
- actions
- actionsプロパティには、ルーティングしたメッセージをどのようなアクションをとるかを指定します。今回はKinesis Data Firehoseにメッセージを流し込むために、
FirehosePutRecordAction
を指定しています。
- actionsプロパティには、ルーティングしたメッセージをどのようなアクションをとるかを指定します。今回はKinesis Data Firehoseにメッセージを流し込むために、
- errorAction
- errorActionプロパティには、ルールにマッチしなかったメッセージをどのようなアクションをとるかを指定します。今回はエラーログ用のロググループにメッセージを出力するために、
CloudWatchLogsAction
を指定しています。先程作成したエラーログ用のロググループを指定しています。
- errorActionプロパティには、ルールにマッチしなかったメッセージをどのようなアクションをとるかを指定します。今回はエラーログ用のロググループにメッセージを出力するために、
Kinesis Data FirehoseとS3のスタック
Kinesis Data FirehoseとS3のスタックは以下のコードとなります
import { Stack, StackProps, RemovalPolicy, aws_s3, aws_kinesisfirehose, Duration, Size } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha'; import * as firehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'; interface DirectputDataFirehoseFromIotcoreKinesisS3StackProps extends StackProps { projectName: string; } export class DirectputDataFirehoseFromIotcoreKinesisS3Stack extends Stack { public readonly deliveryStream: firehose_alpha.DeliveryStream; constructor( scope: Construct, id: string, props: DirectputDataFirehoseFromIotcoreKinesisS3StackProps, ) { super(scope, id, props); // センサーデータ格納用のS3バケットの作成 const iotDataBucket = new aws_s3.Bucket( this, 'Dynamic partitioning data bucket', { bucketName: `${props.projectName}-${Stack.of(this).account}`, removalPolicy: RemovalPolicy.DESTROY, encryption: aws_s3.BucketEncryption.S3_MANAGED, //保存データの暗号化 blockPublicAccess: aws_s3.BlockPublicAccess.BLOCK_ALL, //これを有効化しておかないとSecurityHubのチェックに引っかかる(重要度:CRITICAL) accessControl: aws_s3.BucketAccessControl.PRIVATE, autoDeleteObjects: true, }, ); //配信ストリームの作成 const deliverySteam = new firehose_alpha.DeliveryStream(this, 'Delivery Stream', { deliveryStreamName: `${props.projectName}-delivery-stream`, destinations: [ new firehose_destination_alpha.S3Bucket(iotDataBucket, { dataOutputPrefix: 'environment_data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/', errorOutputPrefix: 'error/', bufferingInterval: Duration.seconds(60), //Min:60 ~ Max:900 bufferingSize: Size.mebibytes(64), //動的パーティショニングを有効にする場合は64MB以上が必要 }), ], }); //L1コンストラクト(CfnDeliveryStream)のプロパティをL2のコンストラクト(DeliveryStream)に上書き const cfnDeliverySteam = deliverySteam.node.defaultChild as aws_kinesisfirehose.CfnDeliveryStream; cfnDeliverySteam.addPropertyOverride( 'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration', { Enabled: true, } ); cfnDeliverySteam.addPropertyOverride( 'ExtendedS3DestinationConfiguration.ProcessingConfiguration', { Enabled: true, Processors: [ { Type: 'MetadataExtraction', Parameters: [ { ParameterName: 'MetadataExtractionQuery', ParameterValue: '{year: .timestamp[:4],month: .timestamp[4:6],day: .timestamp[6:8],hour: .timestamp[8:10]}', }, { ParameterName: 'JsonParsingEngine', ParameterValue: 'JQ-1.6', }, ], }, { Type: 'AppendDelimiterToRecord', Parameters: [ { ParameterName: 'Delimiter', ParameterValue: '\\n', }, ], }, ], }, ); this.deliveryStream= deliverySteam } }
コードの解説
Kinesis Data Firehoseの配信ストリームの作成
センサーのデータをストリームデータとしてKinesis Data Firehoseに流し込むために、配信ストリームを作成します。
配信ストリームの作成には、L2コンストラクタのDeliveryStream
を使用します。
L2コンストラクタはまだalphaモジュールであるため、aws_kinesisfirehose
とは別に@aws-cdk/aws-kinesisfirehose-alpha
をインポートして使用します。
const deliverySteam = new firehose_alpha.DeliveryStream(this, 'Delivery Stream', { deliveryStreamName: `${props.projectName}-delivery-stream`, destinations: [ new firehose_destination_alpha.S3Bucket(iotDataBucket, { dataOutputPrefix: 'environment_data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/', errorOutputPrefix: 'error/', bufferingInterval: Duration.seconds(60), //Min:60 ~ Max:900 bufferingSize: Size.mebibytes(64), //動的パーティショニングを有効にする場合は64MB以上が必要 }), ], });
- dataOutputPrefix
- オブジェクトキーを指定するプロパティとなります。キープレフィックスは年月日と時間を組み合わせて
environment_data/YYYY/MM/DD/HH
としました。指定には!{partitionKeyFromQuery:year}
といった記法で記述しています。こちらについては以下のドキュメントを参考にしています。
- オブジェクトキーを指定するプロパティとなります。キープレフィックスは年月日と時間を組み合わせて
- 動的パーティショニングの Amazon S3 バケットプレフィックス
-
errorOutputPrefix
- エラーログのデータに付けるプレフィックスを指定します。こちらは
error/
としています。
- エラーログのデータに付けるプレフィックスを指定します。こちらは
- bufferingInterval
- 何秒間分のデータをまとめて送信するか、秒数を指定します。60秒から900秒までの間で指定可能です。
- bufferingSize
- データをバッファする際のサイズを指定します。動的パーティショニングを有効にする場合は64MB以上が必要です。
動的パーティショニングの有効化
センサーデータと一緒にRaspberry Piから送信するtimestampの値を使用してパーティションを作成します。
L2コンストラクタのDeliveryStream
は、動的パーティショニングを有効にするためのプロパティを持っていません。
そのため、CfnDeliveryStream
のプロパティをaddPropertyOverride()
で上書きします。
書き方は以下の公式ドキュメント参考にしています。
const cfnDeliverySteam = deliverySteam.node.defaultChild as aws_kinesisfirehose.CfnDeliveryStream; cfnDeliverySteam.addPropertyOverride( 'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration', { Enabled: true, } ); cfnDeliverySteam.addPropertyOverride( 'ExtendedS3DestinationConfiguration.ProcessingConfiguration', { Enabled: true, Processors: [ { Type: 'MetadataExtraction', Parameters: [ { ParameterName: 'MetadataExtractionQuery', ParameterValue: '{year: .timestamp[:4],month: .timestamp[4:6],day: .timestamp[6:8],hour: .timestamp[8:10]}', }, { ParameterName: 'JsonParsingEngine', ParameterValue: 'JQ-1.6', }, ], }, { Type: 'AppendDelimiterToRecord', Parameters: [ { ParameterName: 'Delimiter', ParameterValue: '\\n', }, ], }, ], }, ); this.deliveryStream= deliverySteam
- 上書きするプロパティはExtendedS3DestinationConfigurationが持つ以下の2つです。※プロパティ名の指定際に頭文字を大文字にするのを忘れないようにお願いします。
- DynamicPartitioningConfiguration
- ProcessingConfiguration
- DynamicPartitioningConfiguration
- 動的パーティショニングを有効にするためのプロパティです。
Enabled: true
としています。
- 動的パーティショニングを有効にするためのプロパティです。
- ProcessingConfiguration
- パーティショニングに使うデータの加工を行うためのプロパティです。
Enabled: true
としています。 - こちらの書き方に付いては以前動的パーティショニングについて書いた以下のブログで解説しております。パーティションの設定も全く同じになりますので、こちらでは割愛します。
- パーティショニングに使うデータの加工を行うためのプロパティです。
- Kinesis Data FirehoseからS3に送信するオブジェクトに付くプレフィックスをJSTにするため動的パーティショニングを使ってみた:パーティション用のデータの抽出
動作確認
データの送信
Raspberry Piからデータを送信します。
送信するデータは以下のようなJSON形式のデータとなります。
{ "temperature": 26.26, "pressure": 1008.44, "humidity": 60.97, "timestamp": 20230530103003, }
AWS IoTで受信したデータの確認
S3バケット内のデータを確認します。
以下の通り、年月日と時間のフォルダが作成され、その中にデータが格納されていることが確認できます。
データをダウンロードしてみます。
ダウンロードしたデータは、bufferingIntervalプロパティで指定した60秒間に取得した複数のJSONデータが、行で分かれて一つファイルとなっています。
これは想定通りのデータ構造です。
{"temperature": 26.13, "pressure": 1006.24, "humidity": 60.58, "timestamp": "20230530180002"} {"temperature": 26.11, "pressure": 1006.18, "humidity": 60.14, "timestamp": "20230530180012"} {"temperature": 26.09, "pressure": 1006.15, "humidity": 60.13, "timestamp": "20230530180022"} {"temperature": 26.09, "pressure": 1006.26, "humidity": 60.11, "timestamp": "20230530180032"} {"temperature": 26.1, "pressure": 1006.22, "humidity": 59.97, "timestamp": "20230530180042"} {"temperature": 26.1, "pressure": 1006.22, "humidity": 59.75, "timestamp": "20230530180052"} {"temperature": 26.09, "pressure": 1006.2, "humidity": 59.66, "timestamp": "20230530180102"}
以上、想定どおり動作し問題がないことが確認できました。