ESP32をAWSに接続してみた(4) Lambdaでデータ変換

2022.04.12

こんにちは。CX事業本部IoT事業部のアッキーです。前回まででESP32から送信されたデータをOpenSearchで可視化することができました。今回はKinesis Data FirehoseとLambdaを使ってデータを加工し、OpenSearchで表示してみます。

前回の記事

ESP32をAWSに接続してみた(3) OpenSearchの設定

構成図

IoT Coreのルールを追加し、Kinesis Data Firehoseを通してLambdaでデータを加工し、OpenSearchに送信します。

電圧→ほこり濃度(粉塵濃度)の変換方法について

いままで、ほこり量はセンサからADコンバータで取得した電圧をミリボルト単位でそのまま送信・表示していました。

GP2Y1010AU0Fのデータシートには、電圧を粉塵濃度に変換するグラフが記載されています。

SHARP GP2Y1010AU0Fデータシートより

このグラフをWebPlotDigitizerで数値化しました。簡単に線形補間して値を変換します。

Lambdaの追加

Lambdaのコンソールを開き、関数の作成をクリックします。

「一から作成」を選択し、関数名を入力します。

今回は処理をPython3で書くこととして、最新のランタイムであるPython 3.9を選択しました。

アーキテクチャはx86_64, arm64のどちらでもいいのですが、利用料の安い(おそらく無料で済みますが)arm64を選びました。

ソースコードは以下の通りです。コードを張り付けて、Deployをクリックしてデプロイします。

入力されたデータからparticleを読み取り、dust_densityに変換して出力します。

index.py

import base64
import json

def linear_interpolation(input_mv):
    sample_mv = [915.9192825, 1057.174888, 1410.313901, 1818.38565, 2171.524664, 2516.816143, 2885.650224, 3207.399103, 3482.06278, 3623.318386, 3654.70852]
    sample_mg = [0, 41.80138568, 100, 169.2840647, 223.3256351, 278.7528868, 334.1801386, 384.0646651, 446.4203233, 525.404157, 619.630485]
    if input_mv <= sample_mv[0]:
        return sample_mg[0]
    if input_mv >= sample_mv[-1]:
        return sample_mg[-1]
    v0 = 0
    v1 = 0
    d0 = 0
    d1 = 0
    for i, mv in enumerate(sample_mv[0:-1]):
        if input_mv >= mv:
            v0 = sample_mv[i]
            v1 = sample_mv[i + 1]
            d0 = sample_mg[i]
            d1 = sample_mg[i + 1]
    return d0 + (d1 - d0) * (input_mv - v0) / (v1 - v0)
    
def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode()
        
        data = json.loads(payload)
        particle_mv = data['particle']
        dust_density = linear_interpolation(particle_mv)
        
        new_data = dict()
        new_data['dust_density'] = dust_density
        new_data['principal'] = data['principal']
        new_data['server_timestamp'] = data['server_timestamp']
        payload = json.dumps(new_data).encode()

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

Kinesis Data Firehoseの追加

IoT CoreとOpenSearchの間に割り込ませるKinesis Data Firehoseを追加します。

SourceにはDirect PUT(IoT Coreから送信されます)、DestinationにはAmazon OpenSearch Serviceを選択します。

Transform recordsのData transformationをEnabledにし、Browseをクリックして先ほど作成したLambdaを指定します。

Destination settingsでは、BrowseをクリックしてOpenSearchのドメインを指定します。Indexにはserver_timestampを入力します。
失敗時のデータを入れるS3バケットを作る必要がありますので、Createして指定してください。
Create delivery streamをクリックして完了します。

ルールの追加

IoT Coreにルールを追加します。

ルールクエリステートメントは、particleのみを送信するため、以下のように記入します。

select principal() as principal, parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp()) as server_timestamp, particle from 'node/airsensor'

ストリーム名には先ほど作成したKinesis data firehoseストリームを指定します。

OpenSearchの設定

すでにデータのインデックス化が行われている状態では、新たに登録したdust_densityがインデックス化されずグラフ化できないことがあります。

その場合は、メニューからStack Managementを開き、Index patternsからserver_timestampをクリックし、dust_densityの右側の編集アイコンから、FormatをNumberにしてください。これで前項と同じようにグラフ化できるようになります。

左側が今回追加したほこり濃度のグラフになります。通常の状態ではほとんどゼロですが、データとして送信されていることが分かります。

このデータはIoT Coreから直接送信されているデータとは異なり、5分程度遅れて処理されます。

おわりに

IoT Coreでのデータ送信、OpenSearchでの可視化、そしてLambdaによるデータの加工まで一通りの処理を、すべてAWSのマネージドサービスを使って実現することができました!

ひとまずこのシリーズは終わりますが、今後もIoT Coreから他のサービスへの連携等をご紹介していきたいと思います。

参考文献

秋月電子 SHARP GP2Y1010AU0Fデータシート