AWS IoTで取得した情報をKinesis Data FirehoseでまとめてS3に保存してみた

2021.01.04

こんにちは、コンサル部@大阪オフィスのTodaです。

前回M5Stick CPlusから情報を発信、AWS IoT Coreで受信するまでを試してみました。
今回は受信した情報をKinesis Data Firehoseに送り、一定時間情報をまとめてからS3に内容を保存するようにしてみます。

やりたいこと

  • AWS IoT Coreで受信した情報をKinesis Data Firehoseで貯める。
  • Kinesis Data Firehoseで貯めた情報をS3に保存する。

やりたいこと

前提条件

  • 情報を保存するS3バケットは作成済みの状態から始める。
  • IoT機器の認証ならびにモノ登録ができている状態から始める。

AWS IoT CoreとM5Stick CPlusの接続方法については過去記事をご覧ください。

Kinesis Data Firehoseがどのようなサービスかを確認する場合は、下記記事をご覧ください。

やってみる

Kinesis Data Firehoseを作成

Kinesis Data Firehoseの設定からおこないます。
サービスメニューから[Kinesis]を選択します。
左メニューから[Data Firehose(配信ストリーム)]を選択して画面内に表示される[Create Delivery Stream]をクリックします。

Kinesis Data Firehoseを作成

配信ストリームの名称、ソース設定

配信ストリームの名称とソースの受取、情報を暗号化するかの設定をおこないます。
今回は暗号化はせず、平文にて保存するように設定します。
入力できれば画面下の[Next]をクリックします。

配信ストリームの名称、ソース設定

レコードの変換設定

Kinesis Data Firehoseに送られてきた情報をS3等に配信する前に変換処理をする場合、設定をおこないます。 Lambdaを利用した変換か、JSONからApache ParquetまたはApache ORCに変換することができます。
今回は変換せずS3に送るため選択肢は変更せず、画面下の[Next]をクリックします。

配信先設定

Kinesis Data Firehoseに送られてきた情報を配信する先の指定を行います。
配信先として S3, Redshift, Elasticsearch, HTTP Endpointなどがあります。
今回は S3 を選択して作成済みのバケットを選択します。
選択後は画面下の[Next]をクリックします。

配信先設定

バッファ、圧縮、ロギング、権限の設定

Kinesis Data Firehoseで蓄える情報を配信する条件をバッファ設定にておこないます。
バッファの条件は蓄えた容量(MiB)または期間(秒)で指定します。
今回は1分間情報を蓄えてS3に保存するようにしたいため Buffer intervalの値を60に設定します。
後の設定は変更せず、画面下の[Next]をクリックします。

バッファ、圧縮、ロギング、権限の設定

確認画面

Kinesis Data Firehoseの設定内容を確認する画面が表示されます。
設定内容を確認の上、画面下の[Create delivery stream]をクリックします。

作成完了

作成が終わりますと、レコードが1行追加されます。
完了時点では、Statusが準備中の状態になっていますのでActiveに表示が変わるまで待ちます。

Kinesis Data Firehose作成完了

上記にてKinesis Data Firehoseの準備は完了です。

IoT CoreのACTルールを作成

IoT CoreにKinesis Data Firehoseへの配信設定を作成します。
サービスメニューから[IoT Core]を選択します。
左メニューから[ACT] > [ルール]を選択して画面内に表示される[ルールの作成]をクリックします。

IoT CoreのACTルールを作成

ルールの設定

モノから受信した情報をどのように処理するかのルールを設定します。
ルール名とルールクエリステートメントを設定します。

ルールの設定

ルールクエリステートメントはSQL文に似た構文を利用してIoT機器から受信した情報を抽出、変換する用途に利用します。
今回IoT機器からはJSON形式でenv配列の中に要素が入っているため、env内の要素を指定するようにします。

・IoT機器から送信される内容

{
  "env": {
    "clientid": "M5STICK-01",
    "temperture": 20.42039,
    "humidity": 38.05753,
    "pressure": 1028.938,
    "vbat": 4.158
  }
}

・ルールクエリステートメントの指定

SELECT env.* FROM '/iot/M5STICK'
or
SELECT env.clientid, env.temperture FROM '/iot/M5STICK'

「1 つ以上のアクションを設定する」の[アクションの追加]をクリックします。

アクションの追加

IoT機器から情報受信時のアクションを指定します。
今回は[Kinesis Data Firehoseストリームにメッセージを送信する]を選択します。
選択後、画面下の[アクションの設定]をクリックします。

アクションの設定

アクションの設定

送信するAmazon Kinesis Firehoseの指定と、レコード間の区切り文字を指定します。
区切り文字は改行(\n)を選択します。
IAMロールはIoTの情報アクセス用のロールを利用します。
選択後、画面下の[アクションの追加]をクリックします。

アクションの追加確認

アクションの追加が正常にできると項目が追加されます。
確認後、画面下の[ルールの作成]をクリックします。

アクションの追加確認

上記にてIoT CoreとKinesis Data Firehoseの設定は完了です。

確認してみる

Kinesis Data Firehoseのモニタリングを確認

Kinesis Data Firehoseの一覧から作成した配信ストリームを選択して、タブで[Monitoring]をクリックします。

確認してみる1

表示されるグラフを確認するとIoTから情報を受け取っている事が確認できます。

確認してみる2

S3の内容を確認

S3には年/月/日/時(UTC)単位でフォルダが作成され、その中にKinesis Data Firehoseからの情報が保存されます。

S3の内容を確認

保存されているファイル内容を確認するとIoT機器から送信されている内容が保存されている事を確認しました。

{"clientid":"M5STICK-01","temperture":23.60342,"humidity":31.38476,"pressure":1028.655,"vbat":4.1558}
{"clientid":"M5STICK-01","temperture":23.53399,"humidity":31.30388,"pressure":1028.67,"vbat":4.1558}
{"clientid":"M5STICK-01","temperture":23.49126,"humidity":31.2993,"pressure":1028.721,"vbat":4.1569}
{"clientid":"M5STICK-01","temperture":23.43519,"humidity":31.35424,"pressure":1028.741,"vbat":4.1558}
{"clientid":"M5STICK-01","temperture":23.39246,"humidity":31.39849,"pressure":1028.698,"vbat":4.1558}
{"clientid":"M5STICK-01","temperture":23.36576,"humidity":31.45495,"pressure":1028.72,"vbat":4.1558}

時間を追加する

上記にてS3に保存するまでの対応ができたのですが、見ると保存されている情報に日時の情報がないことに気がつきました。
IoT Coreのルールクエリステートメントを調整して時間の要素を追加してみます。

IoT Core ACTルールの変更

作成したルールの詳細からルールクエリステートメントの編集をクリックします。
SQL文に日時を出力するように変更します。

SELECT parse_time("yyyy-MM-dd HH:mm:ss", timestamp(), "Asia/Tokyo") as timestamp, env.* FROM '/iot/M5STICK'
or
SELECT parse_time("yyyy-MM-dd HH:mm:ss", timestamp(), "Asia/Tokyo") as timestamp, env.clientid, env.temperture FROM '/iot/M5STICK'

S3の出力結果

ルールクエリステートメントを変更する事で日本時間がレコードに追加されるようになりました。

{"timestamp":"2021-01-04 21:52:01","clientid":"M5STICK-01","temperture":22.1828,"humidity":34.15885,"pressure":1027.703,"vbat":4.147}
{"timestamp":"2021-01-04 21:52:12","clientid":"M5STICK-01","temperture":22.30831,"humidity":34.00473,"pressure":1027.713,"vbat":4.1514}
{"timestamp":"2021-01-04 21:52:22","clientid":"M5STICK-01","temperture":22.50858,"humidity":33.76516,"pressure":1027.665,"vbat":4.147}
{"timestamp":"2021-01-04 21:52:33","clientid":"M5STICK-01","temperture":22.62074,"humidity":33.51949,"pressure":1027.651,"vbat":4.147}
{"timestamp":"2021-01-04 21:52:44","clientid":"M5STICK-01","temperture":22.77295,"humidity":33.38674,"pressure":1027.626,"vbat":4.1492}
{"timestamp":"2021-01-04 21:52:55","clientid":"M5STICK-01","temperture":22.91447,"humidity":33.09834,"pressure":1027.604,"vbat":4.1481}

さいごに

今回はAWS IoT Coreで受信した情報をKinesis Data Firehose、S3に保存してみました。
Kinesis Data Firehoseを利用する事で多くのIoT機器からアクセスされる状況にてS3ファイルが乱立することを防ぐことができます。
他のAWSサービス連携も試してみたいと思います。