ESP32をAWSに接続してみた(4) Lambdaでデータ変換
こんにちは。CX事業本部IoT事業部のアッキーです。前回まででESP32から送信されたデータをOpenSearchで可視化することができました。今回はKinesis Data FirehoseとLambdaを使ってデータを加工し、OpenSearchで表示してみます。
前回の記事
構成図
IoT Coreのルールを追加し、Kinesis Data Firehoseを通してLambdaでデータを加工し、OpenSearchに送信します。
電圧→ほこり濃度(粉塵濃度)の変換方法について
いままで、ほこり量はセンサからADコンバータで取得した電圧をミリボルト単位でそのまま送信・表示していました。
GP2Y1010AU0Fのデータシートには、電圧を粉塵濃度に変換するグラフが記載されています。
SHARP GP2Y1010AU0Fデータシートより
このグラフをWebPlotDigitizerで数値化しました。簡単に線形補間して値を変換します。
Lambdaの追加
Lambdaのコンソールを開き、関数の作成をクリックします。
「一から作成」を選択し、関数名を入力します。
今回は処理をPython3で書くこととして、最新のランタイムであるPython 3.9を選択しました。
アーキテクチャはx86_64, arm64のどちらでもいいのですが、利用料の安い(おそらく無料で済みますが)arm64を選びました。
ソースコードは以下の通りです。コードを張り付けて、Deployをクリックしてデプロイします。
入力されたデータからparticleを読み取り、dust_densityに変換して出力します。
import base64 import json def linear_interpolation(input_mv): sample_mv = [915.9192825, 1057.174888, 1410.313901, 1818.38565, 2171.524664, 2516.816143, 2885.650224, 3207.399103, 3482.06278, 3623.318386, 3654.70852] sample_mg = [0, 41.80138568, 100, 169.2840647, 223.3256351, 278.7528868, 334.1801386, 384.0646651, 446.4203233, 525.404157, 619.630485] if input_mv <= sample_mv[0]: return sample_mg[0] if input_mv >= sample_mv[-1]: return sample_mg[-1] v0 = 0 v1 = 0 d0 = 0 d1 = 0 for i, mv in enumerate(sample_mv[0:-1]): if input_mv >= mv: v0 = sample_mv[i] v1 = sample_mv[i + 1] d0 = sample_mg[i] d1 = sample_mg[i + 1] return d0 + (d1 - d0) * (input_mv - v0) / (v1 - v0) def lambda_handler(event, context): output = [] for record in event['records']: print(record['recordId']) payload = base64.b64decode(record['data']).decode() data = json.loads(payload) particle_mv = data['particle'] dust_density = linear_interpolation(particle_mv) new_data = dict() new_data['dust_density'] = dust_density new_data['principal'] = data['principal'] new_data['server_timestamp'] = data['server_timestamp'] payload = json.dumps(new_data).encode() output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode(payload) } output.append(output_record) print('Successfully processed {} records.'.format(len(event['records']))) return {'records': output}
Kinesis Data Firehoseの追加
IoT CoreとOpenSearchの間に割り込ませるKinesis Data Firehoseを追加します。
SourceにはDirect PUT(IoT Coreから送信されます)、DestinationにはAmazon OpenSearch Serviceを選択します。
Transform recordsのData transformationをEnabledにし、Browseをクリックして先ほど作成したLambdaを指定します。
ルールの追加
IoT Coreにルールを追加します。
ルールクエリステートメントは、particleのみを送信するため、以下のように記入します。
select principal() as principal, parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp()) as server_timestamp, particle from 'node/airsensor'
ストリーム名には先ほど作成したKinesis data firehoseストリームを指定します。
OpenSearchの設定
すでにデータのインデックス化が行われている状態では、新たに登録したdust_densityがインデックス化されずグラフ化できないことがあります。
その場合は、メニューからStack Managementを開き、Index patternsからserver_timestampをクリックし、dust_densityの右側の編集アイコンから、FormatをNumberにしてください。これで前項と同じようにグラフ化できるようになります。
左側が今回追加したほこり濃度のグラフになります。通常の状態ではほとんどゼロですが、データとして送信されていることが分かります。
このデータはIoT Coreから直接送信されているデータとは異なり、5分程度遅れて処理されます。
おわりに
IoT Coreでのデータ送信、OpenSearchでの可視化、そしてLambdaによるデータの加工まで一通りの処理を、すべてAWSのマネージドサービスを使って実現することができました!
ひとまずこのシリーズは終わりますが、今後もIoT Coreから他のサービスへの連携等をご紹介していきたいと思います。