[後編] Raspberry Piで取得したセンサーのデータをAWS IoTからKinesis Data Firehoseに流してS3に保存する

2023.05.30

こちらの記事では、Raspberry PiからAWS IoTに送信したセンサーのデータをメッセージのルーティングを使ってKinesis Data Firehoseにデータを流し、その後にS3に保存する方法を紹介したいと思います。
今回は後編となります。
前編はこちらです。⇣

前編では主にRaspberry Pi側のセンサーデータ取得とMQTT送信用のコードの解説を行いました。

後編では、CDKによるAWSインフラ(AWS IoT や Kinesis data firehoseなど)の構築方法と、動作確認の結果について紹介したいと思います。

システム構成

今回構築したシステム構成の概要図は以下のようになります。


使用するセンサーはBME280というBOSH社製の非常にコンパクトなものです。このセンサーは温度、湿度、気圧を取得できます。

実行環境

項目名 バージョン
mac OS Ventura 13.2
AWS CDK 2.81.0
AWS IoT Device SDK for Python 1.4.9

Raspberry Piのモデル

$ cat /proc/cpuinfo | grep Model
Model  : Raspberry Pi 3 Model B Plus Rev 1.3

クライアント証明書作成のための準備

今回、クライアント側の秘密鍵を利用してAWS IoTでクライアント証明書を作成します。 AWSで秘密鍵の作成まで行うこともできますが、その場合はAWSから秘密鍵をデバイスにダウンロードしてくる必要があります。 要件として秘密鍵がインターネットを通る事がNGである場合は、このブログで紹介する方法で秘密鍵をクライアント側で作成する方法を採用できます。

デバイス上で秘密鍵の作成

Raspberry Pi上でopensslを使用して秘密鍵を作成します。 作成した秘密鍵はRaspberry Pi上の適当なディレクトリ内に保存しておきます。

openssl genrsa -out privatekey.pem 2048

CSR(Certificate Signing Request)の作成

AWS IoTのCAでクライアント証明書の作成をリクエストするためにCSRを作成します。 以下のコマンドで作成できます。

openssl req -new -subj "/C=JP/ST=Hokkaido/L=Sapporo/O=MyCompany/CN=AWS IoT Certificate" -key privatekey.pem -out deviceCert.csr

作成したCSRはAWSのデプロイの時に使用するのでCDKでAWSリソースをデプロイするローカルマシンに保存しておきます。

秘密鍵や証明書の作成方法、独自CAによる証明書の作成方法について

AWS IoTでは色々な方法でクライアント証明書の作成ができます。 それらをまとめて説明してくれている動画をAWSが公開していますので参考にしてください。 それぞれの方法メリット・デメリットや、CSRを使用しない場合のTLS相互認証についても詳しく説明されてますので、とても参考になると思います。

CDKのコードの紹介

CDKによるAWSインフラの構築コードを紹介します。
AWS IoT関連とKinesis Data FirehoseやS3のスタックを分けておりまして、

AWS IoT周りのスタック

こちらはAWS IoT関連のリソースのスタックを記述したコードになります。

lib/directput-data-firehose-from-iotcore-stack.ts

import {Stack,StackProps,aws_iot,aws_logs,RemovalPolicy}from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as fs from "fs";
import * as path from "path";
import * as iot from '@aws-cdk/aws-iot-alpha';
import * as actions from '@aws-cdk/aws-iot-actions-alpha';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';

interface DirectputDataFirehoseFromIotcoreStackProps extends StackProps {
  projectName: string;
  deliveryStream: firehose_alpha.DeliveryStream;
}

export class DirectputDataFirehoseFromIotcoreStack extends Stack {
  constructor(scope: Construct, id: string, props: DirectputDataFirehoseFromIotcoreStackProps) {
    super(scope, id, props);

    // モノの作成
    const iotThingName = `${props.projectName}-thing`;
    const iotThing = new aws_iot.CfnThing(this, iotThingName, {
      thingName: iotThingName,
    });

    // AWS IoT のポリシーの作成
    const topicEnvironmentDataSendToCW = 'send/data/firehose/';
    const iotPolicy = new aws_iot.CfnPolicy(this, 'Policy', {
      policyDocument: {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Action: 'iot:Connect',
            //client/より後ろに記載する部分がclientIdとなる。これがraspiからのMQTT Publishに使われる
            Resource: `arn:aws:iot:${this.region}:${this.account}:client/${props.projectName}`,
          },
          {
            Effect: 'Allow',
            Action: ['iot:Publish', 'iot:Receive'],
            Resource: [
              `arn:aws:iot:${this.region}:${this.account}:topic/${topicEnvironmentDataSendToCW}`,
            ],
          },
          {
            Effect: 'Allow',
            Action: 'iot:Subscribe',
            Resource: [
              `arn:aws:iot:${this.region}:${this.account}:topic/${topicEnvironmentDataSendToCW}`,
            ],
          },
        ],
      },
      policyName: 'raspi3-environment-data-publish',
    });

    // クライアント証明書の作成
    const cfnCertificate = new aws_iot.CfnCertificate(
      this,
      `${props.projectName}-device-cert`,
      {
        status: 'ACTIVE',
        certificateSigningRequest: fs.readFileSync(
          path.resolve('certs/deviceCert.csr'),
          'utf-8',
        ),
      },
    );

    // IoTポリシーをクライアント証明書へ紐づけ
    new aws_iot.CfnPolicyPrincipalAttachment(
      this,
      'PolicyPrincipalAttachment',
      {
        policyName: iotPolicy.ref,
        principal: cfnCertificate.attrArn, // クライアント証明書のARN
      },
    );

    // クライアント証明書をモノへ紐づけ
    new aws_iot.CfnThingPrincipalAttachment(this, 'ThingPrincipalAttachment', {
      thingName: iotThing.ref,
      principal: cfnCertificate.attrArn, // クライアント証明書のARN
    });

    // エラーログ用のロググループの作成
    const errorLogGroup = new aws_logs.LogGroup(
      this,
      'error-log-group',
      {
        logGroupName: `${props.projectName}-error-log-group`,
        removalPolicy: RemovalPolicy.DESTROY,
        retention: 30,
      },
    );

    // メッセージルーティングのルールの作成
    const ruleNameSendEnvironmentData = 'environment_data_to_Cloud_Watch';
    new iot.TopicRule(this, ruleNameSendEnvironmentData, {
      topicRuleName: ruleNameSendEnvironmentData,
      sql: iot.IotSql.fromStringAsVer20160323(
        `SELECT * FROM '${topicEnvironmentDataSendToCW}'`,
      ),
      actions: [
        new actions.FirehosePutRecordAction(props.deliveryStream)
      ],
      errorAction: new actions.CloudWatchLogsAction(errorLogGroup),
    });
  }
}

コード解説

いくつか抜粋して説明します。

AWS IoT のポリシーの作成

AWS IoTのポリシーは以下の記述で作成可能です。MQTT通信でデータを受信するために必要な最小限のポリシーを付与してます。

    const topicEnvironmentDataSendToCW = 'send/data/firehose/';
    const iotPolicy = new aws_iot.CfnPolicy(this, 'Policy', {
      policyDocument: {
        Version: '2012-10-17',
        Statement: [
          {
            Effect: 'Allow',
            Action: 'iot:Connect',
            Resource: `arn:aws:iot:${this.region}:${this.account}:client/${props.projectName}`,
          },
          {
            Effect: 'Allow',
            Action: ['iot:Publish', 'iot:Receive', 'iot:Subscribe'],
            Resource: [
              `arn:aws:iot:${this.region}:${this.account}:topic/${topicEnvironmentDataSendToCW}`,
            ],
          },
        ],
      },
      policyName: 'raspi3-environment-data-publish',
    });

iot:ConnectのResourceについて

iot:ConnectのResourceには、arn:aws:iot:${this.region}:${this.account}:client/${props.projectName}を指定しています。
client/の後に記述した値がclientIdとなります。
今回の場合、projectNameというcontextを使用しています。 値はdirectput-data-firehose-from-iotcoreで、これが今回のclientIdの値となります。 contextの指定は、cdk.jsonに以下のように記述しています。

cdk.json

  "context": {
    "projectName": "directput-data-firehose-from-iotcore",

前編で紹介したセンサデータをAWS IoTに送付するコードでも、MQTTでデータをPublishする際のパラメーターとして使用します。

iot:Publish, iot:Receive, iot:SubscribeのResourceについて

題目のポリシーについてはリソースにMQTTトピックを指定します。今回はsend/data/firehose/としてます。

トピックはMQTTで送信したメッセージの宛先を指定するために使用します。
トピック宛に送信されてきたメッセージをメッセージブローカーがメッセージのルーティングによって指定されたサービスに渡します。
こちらも前編で紹介したコードのMQTTPublishの際のパラメーターとして使用します。

クライアント証明書の作成

以下はCDKデプロイのタイミングでAWS IoTでクライアント証明書作るための記述となります。

    // クライアント証明書の作成
    const cfnCertificate = new aws_iot.CfnCertificate(
      this,
      `${props.projectName}-device-cert`,
      {
        status: 'ACTIVE',
        certificateSigningRequest: fs.readFileSync(
          path.resolve('certs/deviceCert.csr'),
          'utf-8',
        ),
      },
    );

statusプロパティでACTIVEを指定することで、有効な証明書として作成されます。
certificateSigningRequestプロパティには、先程作成したCSRファイルのパスを指定して読み込ませています。

エラーログ用のロググループの作成

メッセージがルールにマッチせずにルーティングできなかった時にエラーとしてログを出力するためのロググループを作成します。

    const errorLogGroup = new logs.LogGroup(this, 'errorLogGroup', {
      logGroupName: `${props.projectName}-error-log`,
      retention: 30,
    });

メッセージルーティングのルールの作成

以下の記述でトピック``にさ送信されるメッセージのルーティングルールを作成します。

    const ruleNameSendEnvironmentData = 'environment_data_to_Cloud_Watch';
    new iot.TopicRule(this, ruleNameSendEnvironmentData, {
      topicRuleName: ruleNameSendEnvironmentData,
      sql: iot.IotSql.fromStringAsVer20160323(
        `SELECT * FROM '${topicEnvironmentDataSendToCW}'`,
      ),
      actions: [
        new actions.FirehosePutRecordAction(props.deliveryStream)
      ],
      errorAction: new actions.CloudWatchLogsAction(errorLogGroup),
    });
  • sql
    • sqlプロパティでsqlライクな記述でトピックに送信されるメッセージの内どれをルールにマッチさせるかを指定します。今回はすべてのメッセージをルーティング対象にしたかったので、SELECT句の指定は*としています。
  • actions
    • actionsプロパティには、ルーティングしたメッセージをどのようなアクションをとるかを指定します。今回はKinesis Data Firehoseにメッセージを流し込むために、FirehosePutRecordActionを指定しています。
  • errorAction
    • errorActionプロパティには、ルールにマッチしなかったメッセージをどのようなアクションをとるかを指定します。今回はエラーログ用のロググループにメッセージを出力するために、CloudWatchLogsActionを指定しています。先程作成したエラーログ用のロググループを指定しています。

Kinesis Data FirehoseとS3のスタック

Kinesis Data FirehoseとS3のスタックは以下のコードとなります

lib/directput-data-firehose-from-iotcore-stack.ts

import {
  Stack,
  StackProps,
  RemovalPolicy,
  aws_s3,
  aws_kinesisfirehose,
  Duration,
  Size
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha';
import * as firehose_destination_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha';

interface DirectputDataFirehoseFromIotcoreKinesisS3StackProps extends StackProps {
  projectName: string;
}

export class DirectputDataFirehoseFromIotcoreKinesisS3Stack extends Stack {
  public readonly deliveryStream: firehose_alpha.DeliveryStream;
  constructor(
    scope: Construct,
    id: string,
    props: DirectputDataFirehoseFromIotcoreKinesisS3StackProps,
    )
    {
      super(scope, id, props);

      // センサーデータ格納用のS3バケットの作成
      const iotDataBucket = new aws_s3.Bucket(
        this,
        'Dynamic partitioning data bucket',
        {
          bucketName: `${props.projectName}-${Stack.of(this).account}`,
          removalPolicy: RemovalPolicy.DESTROY,
          encryption: aws_s3.BucketEncryption.S3_MANAGED, //保存データの暗号化
          blockPublicAccess: aws_s3.BlockPublicAccess.BLOCK_ALL, //これを有効化しておかないとSecurityHubのチェックに引っかかる(重要度:CRITICAL)
          accessControl: aws_s3.BucketAccessControl.PRIVATE,
          autoDeleteObjects: true,
        },
      );

      //配信ストリームの作成
      const deliverySteam = new firehose_alpha.DeliveryStream(this, 'Delivery Stream', {
        deliveryStreamName: `${props.projectName}-delivery-stream`,
        destinations: [
          new firehose_destination_alpha.S3Bucket(iotDataBucket, {
            dataOutputPrefix: 'environment_data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/',
            errorOutputPrefix: 'error/',
            bufferingInterval: Duration.seconds(60), //Min:60 ~ Max:900
            bufferingSize: Size.mebibytes(64), //動的パーティショニングを有効にする場合は64MB以上が必要
          }),
        ],
      });

      //L1コンストラクト(CfnDeliveryStream)のプロパティをL2のコンストラクト(DeliveryStream)に上書き
      const cfnDeliverySteam = deliverySteam.node.defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
      cfnDeliverySteam.addPropertyOverride(
        'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
        {
          Enabled: true,
        }
      );
      cfnDeliverySteam.addPropertyOverride(
        'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
        {
          Enabled: true,
          Processors: [
            {
              Type: 'MetadataExtraction',
              Parameters: [
                {
                  ParameterName: 'MetadataExtractionQuery',
                  ParameterValue: '{year: .timestamp[:4],month: .timestamp[4:6],day: .timestamp[6:8],hour: .timestamp[8:10]}',
                },
                {
                  ParameterName: 'JsonParsingEngine',
                  ParameterValue: 'JQ-1.6',
                },
              ],
            },
            {
              Type: 'AppendDelimiterToRecord',
              Parameters: [
                {
                  ParameterName: 'Delimiter',
                  ParameterValue: '\\n',
                },
              ],
            },
          ],
        },
      );
      this.deliveryStream= deliverySteam
    }
}

コードの解説

Kinesis Data Firehoseの配信ストリームの作成

センサーのデータをストリームデータとしてKinesis Data Firehoseに流し込むために、配信ストリームを作成します。
配信ストリームの作成には、L2コンストラクタのDeliveryStreamを使用します。
L2コンストラクタはまだalphaモジュールであるため、aws_kinesisfirehoseとは別に@aws-cdk/aws-kinesisfirehose-alphaをインポートして使用します。

const deliverySteam = new firehose_alpha.DeliveryStream(this, 'Delivery Stream', {
  deliveryStreamName: `${props.projectName}-delivery-stream`,
  destinations: [
    new firehose_destination_alpha.S3Bucket(iotDataBucket, {
      dataOutputPrefix: 'environment_data/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:hour}/',
      errorOutputPrefix: 'error/',
      bufferingInterval: Duration.seconds(60), //Min:60 ~ Max:900
      bufferingSize: Size.mebibytes(64), //動的パーティショニングを有効にする場合は64MB以上が必要
    }),
  ],
});
  • dataOutputPrefix
    • オブジェクトキーを指定するプロパティとなります。キープレフィックスは年月日と時間を組み合わせてenvironment_data/YYYY/MM/DD/HHとしました。指定には!{partitionKeyFromQuery:year}といった記法で記述しています。こちらについては以下のドキュメントを参考にしています。
  • 動的パーティショニングの Amazon S3 バケットプレフィックス

  • errorOutputPrefix

    • エラーログのデータに付けるプレフィックスを指定します。こちらはerror/としています。
  • bufferingInterval
    • 何秒間分のデータをまとめて送信するか、秒数を指定します。60秒から900秒までの間で指定可能です。
  • bufferingSize
    • データをバッファする際のサイズを指定します。動的パーティショニングを有効にする場合は64MB以上が必要です。

動的パーティショニングの有効化

センサーデータと一緒にRaspberry Piから送信するtimestampの値を使用してパーティションを作成します。
L2コンストラクタのDeliveryStreamは、動的パーティショニングを有効にするためのプロパティを持っていません。
そのため、CfnDeliveryStreamのプロパティをaddPropertyOverride()で上書きします。
書き方は以下の公式ドキュメント参考にしています。

const cfnDeliverySteam = deliverySteam.node.defaultChild as aws_kinesisfirehose.CfnDeliveryStream;
cfnDeliverySteam.addPropertyOverride(
  'ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration',
  {
    Enabled: true,
  }
);
cfnDeliverySteam.addPropertyOverride(
  'ExtendedS3DestinationConfiguration.ProcessingConfiguration',
  {
    Enabled: true,
    Processors: [
      {
        Type: 'MetadataExtraction',
        Parameters: [
          {
            ParameterName: 'MetadataExtractionQuery',
            ParameterValue: '{year: .timestamp[:4],month: .timestamp[4:6],day: .timestamp[6:8],hour: .timestamp[8:10]}',
          },
          {
            ParameterName: 'JsonParsingEngine',
            ParameterValue: 'JQ-1.6',
          },
        ],
      },
      {
        Type: 'AppendDelimiterToRecord',
        Parameters: [
          {
            ParameterName: 'Delimiter',
            ParameterValue: '\\n',
          },
        ],
      },
    ],
  },
);
this.deliveryStream= deliverySteam

動作確認

データの送信

Raspberry Piからデータを送信します。
送信するデータは以下のようなJSON形式のデータとなります。

{
  "temperature": 26.26,
  "pressure": 1008.44,
  "humidity": 60.97,
  "timestamp": 20230530103003,
}

AWS IoTで受信したデータの確認

S3バケット内のデータを確認します。
以下の通り、年月日と時間のフォルダが作成され、その中にデータが格納されていることが確認できます。
データをダウンロードしてみます。 ダウンロードしたデータは、bufferingIntervalプロパティで指定した60秒間に取得した複数のJSONデータが、行で分かれて一つファイルとなっています。 これは想定通りのデータ構造です。

{"temperature": 26.13, "pressure": 1006.24, "humidity": 60.58, "timestamp": "20230530180002"}
{"temperature": 26.11, "pressure": 1006.18, "humidity": 60.14, "timestamp": "20230530180012"}
{"temperature": 26.09, "pressure": 1006.15, "humidity": 60.13, "timestamp": "20230530180022"}
{"temperature": 26.09, "pressure": 1006.26, "humidity": 60.11, "timestamp": "20230530180032"}
{"temperature": 26.1, "pressure": 1006.22, "humidity": 59.97, "timestamp": "20230530180042"}
{"temperature": 26.1, "pressure": 1006.22, "humidity": 59.75, "timestamp": "20230530180052"}
{"temperature": 26.09, "pressure": 1006.2, "humidity": 59.66, "timestamp": "20230530180102"}

以上、想定どおり動作し問題がないことが確認できました。