設備稼働データを AWS Lambda で BigQuery にストリーミング挿入してみた
背景
以前の記事では、工場などにある設備の稼働データを Amazon Timestream に保存していましたが、今回は データストアの比較検討のため、BigQuery に保存してみることにしました。
BigQuery へのデータ転送は、以前に下記の記事で試していましたが、今回はリアルタイムにデータ転送したかったので、ストリーミング API を使用して届いたデータを随時保存するようにしてみました。
構成
BigQuery へデータ挿入することと直接関係はありませんが、前回のような構成で、AWS IoT Core へ届いたデータを Lambda アクションで BigQuery に送る想定です。
BigQuery のデータセットとテーブルの作成
事前に データを格納する BigQuery のデータセットとテーブルを作成しておきます。
BigQuery のコンソールを開いて、対象プロジェクトを選択して「データセットを作成」をクリックすると、データセットの作成画面が開くので、必要な情報を入れて作ります。
同様にテーブルも作成しておきましょう。(手順は割愛します)
テーブルの作成時にスキーマを設定する画面では、書き込みたいデータに合わせて設定しておきます。
以前のデータ送信環境を想定しているので、今回は下記のように設定しました。環境に合わせて設定して下さい。
BIgQuery クライアントライブラリの利用開始
次に BigQuery にデータを書き込むためのサービスアカウント関連の作業を行います。
作業は下記の公式ドキュメントに沿って行います。
BigQuery API の有効化
最初に「プロジェクトセレクタに移動」をクリックします。
対象のプロジェクトを選択します。
選択したプロジェクトのダッシュボードが開きますが、後で使うのでブラウザでオープンにしたままにしておきます。
先程のページに戻って「API を有効にする」をクリックします。
開いたままにしておいた「プロジェクトのダッシュボード」の画面が次のように変わりますので、「次へ」をクリックします。
「有効にする」をクリックして、BigQuery API を有効にします。
有効にできたら、リンクがグレーアウトします。(クリックできなくなります)
サービスアカウントの作成
次に「サービスアカウント」を作成します。最初のページで「サービスアカウントの作成に移動」をクリックします。
クリックすると先ほどと同様にプロジェクトの選択画面が表示されるので、対象のプロジェクトを選択します。
次の画面でサービスアカウントの詳細を設定します。
- サービスアカウント名:適当なものを設定して下さい。
- 今回は
insert-data-from-aws
としました。
- 今回は
- サービスアカウントID:サービスアカウント名と同じものが自動入力されます。
- サービスアカウントの説明:適当な説明文を入力して下さい。
- 入力できたら「作成して続行」をクリックします。
プロジェクトへのアクセス権限を付与するには、サービスアカウントにロールを付与します。
「ロールを選択」のプルダウンをクリックして下さい。
ロールには、インサート API に必要な権限を持つ事前定義ロールとして「BigQuery データ編集者」を選択します。
選択して「続行」をクリックします。
最後に「完了」をクリックします。
サービスアカウントキーの作成
次に、Lambda による書き込みに必要な認証情報を「サービスアカウントキー」として取得します。
先程の作業で「完了」をクリック後に表示されるページで、作成済みのサービスアカウントが表示されているはずなので、クリックして開きます。
画面上部の「キー」タブを開いて、「鍵を追加」>「新しい鍵を作成」を順にクリックします。
「キーのタイプ」は「JSON」を選択して「作成」ボタンをクリックします。
ダウンロードして保存するウィンドウが開くので、PC の適当なディレクトリに保存して「閉じる」をクリックして閉じます。
以上で、Google Cloud 側の作業は完了です。
Lambda 関数の作成
次に Lambda 関数を作成します。今回は AWS SAM で作成します。
sam init \ --runtime python3.9 \ --name bigquery-insert-lambda \ --app-template hello-world \ --package-type Zip
認証ファイルの配置
上記の init 処理で作成した AWS SAM のプロジェクト用ディレクトリに、ダウンロード済みの 「GCP サービスアカウントキー」を配置して下さい。このキーファイルを Lambda 関数で環境変数として参照します。
配置する際は関数本体のコードと同じディレクトリに配置します。(下記の9行目)
. ├── README.md ├── __init__.py ├── events │ └── event.json ├── hello_world │ ├── __init__.py │ ├── app.py │ ├── lambda-bigquery-insert-key.json │ └── requirements.txt ├── template.yaml └── tests ├── __init__.py ├── integration │ ├── __init__.py │ └── test_api_gateway.py ├── requirements.txt └── unit ├── __init__.py └── test_handler.py
クライアントライブラリのインストール
BigQuery にアクセスするためのライブラリが必要なので、requirements.txt
に記載しておきます。
google-cloud-bigquery
想定する設備データ
今回も「以前の記事」と同じデータを想定しているので、そのデータ仕様に合わせた Lambda 関数を作成しています。
{ "timestamp":"2022-12-22T11:30:01.106+09:00", "accid": "123456", "seq_num":21, "device":"LMG-300", "version":"1.00", "name":"MACHINE01", "place":"Tokyo-001", "command":"report", "data_num":1, "data000":[ { "datetime":"2022-12-22T11:29:00.006+09:00", "value":{ "st":"000001", "D00":-1234, "D01":5678, "D02":1234, "D03":5648, "L00":1, "L01":0, "L02":1 } }, { "datetime":"2022-12-22T11:30:00.006+09:00", "value":{ "st":"000001", "D00":-1212, "D01":5656, "D02":2323, "D03":7878, "L00":1, "L01":1, "L02":0 } } ] }
Lambda のコード
下記は Google Cloud に掲載されている Python のサンプルコードです。
from google.cloud import bigquery # Construct a BigQuery client object. client = bigquery.Client() # TODO(developer): Set table_id to the ID of table to append to. # table_id = "your-project.your_dataset.your_table" rows_to_insert = [ {"full_name": "Phred Phlyntstone", "age": 32}, {"full_name": "Wylma Phlyntstone", "age": 29}, ] errors = client.insert_rows_json(table_id, rows_to_insert) # Make an API request. if errors == []: print("New rows have been added.") else: print("Encountered errors while inserting rows: {}".format(errors))
このサンプルコードを元に、今回のデータ要件(前回の記事参照)に合わせて作成したコードが下記になります。
from google.cloud import bigquery import json import logging import sys import time from datetime import datetime import os # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) streamHandler = logging.StreamHandler(stream=sys.stdout) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) # Google Application credential google_application_credentials = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] # BigQuery データセットとテーブル google_cloud_project = os.environ['GOOGLE_CLOUD_PROJECT'] bigquery_dataset = os.environ['BIGQUERY_DATASET'] bigqyery_table = os.environ['BIGQUERY_TABLE'] table_id = google_cloud_project + "." + bigquery_dataset + "." + bigqyery_table # Measure Name measure_name = 'plc_metrics' # 1レコードあたりに入れるアイテムをデバイスデータから取得 def gen_record(event): device_name = event['name'] device_location = event['place'] # 'data000'のvalueだけ抽出 only_data000 = event['data000'] # 'data000'にある要素数を抽出(LMG300がデータ取得した回数をデータから読み取り) # 1つのJSONに含まれる LMG300が取得したデータの数。BigQueryに書き込みたいレコードの数 length_data000 = len(only_data000) #print("length data000: " + str(length_data000)) num = 0 json_rows = [] # BigQueryにinsertするレコード郡のリスト while num < length_data000: raw_datatime = event['data000'][num]['datetime'] # 2022-09-14T16:41:00.006+09:00 datetime_type_time = datetime.fromisoformat(raw_datatime) # datetime型に変換 : 2022-09-14 16:41:00.006000+09:00 unixtime_data = int(datetime_type_time.timestamp() * 1000) # ミリ秒のunixtimeに変換 ex.1663141260006 # valueの各要素の key:value を取得 json_obj = {'DeviceName': device_name, 'Location': device_location, 'measure_name': measure_name, 'time': raw_datatime} for item_name, item_value in event['data000'][num]['value'].items(): print(f'key:{item_name}, value:{item_value}') json_obj[item_name] = item_value # {'st': '0001', 'D00': '1234', ・・・} print(json_obj) num += 1 # BigQueryにインサートする全レコードを生成 # [{'st': '0001', 'D00': '1234'}, {'st': '0002', 'D00': '5678'}, ・・・] json_rows.append(json_obj) return(json_rows) def lambda_handler(event, context): client = bigquery.Client() insert_json = gen_record(event) print(insert_json) errors = client.insert_rows_json(table_id, insert_json) # Make an API request. if errors == []: print("New rows have been added.") else: print("Encountered errors while inserting rows: {}".format(errors)) return
AWS SAM で関数作成
準備ができたので Lambda 関数をデプロイします。
AWS SAM でデプロイされるスタックを扱いやすくするため、いくつかパラメーターを環境変数に指定してデプロイします。
下記は Mac の zsh で環境変数をセットする内容になります。ご利用の環境に応じて適宜変更してください。
TOPIC=bigquery/test PROJECT_PREFIX=sample_project GOOGLE_CLOUD_PROJECT=sample_google_project BIGQUERY_DATASET=sample_dataset BIGQUERY_TABLE=sample_table
各パラメーターの意味は次のとおりです。
項目名 | 設定内容 |
---|---|
TOPIC |
AWS IoT Core でサブスクライブする対象のトピック名 |
PROJECT_PREFIX |
作成される各種 AWS リソースを識別するプレフィックス |
GOOGLE_CLOUD_PROJECT |
Google Cloud で利用している対象のプロジェクト名 |
BIGQUERY_DATASET |
対象の BigQuery データセット |
BIGQUERY_TABLE |
対象の BigQuery テーブル |
環境変数が設定できたらデプロイします。--parameter-overrides
でパラメーターに環境変数を渡しています。
sam package \ --output-template-file packaged.yaml \ --s3-bucket [アーティファクト用の S3 バケット名] sam deploy \ --template-file packaged.yaml \ --stack-name bigquery-insert-lambda-stak \ --s3-bucket [アーティファクトがある S3 バケット名] \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset \ --parameter-overrides \ Topic=${TOPIC} \ ProjectPrefix=${PROJECT_PREFIX} \ GoogleCloudProject=${GOOGLE_CLOUD_PROJECT} \ BigQueryDataset=${BIGQUERY_DATASET} \ BiqQuieryTable=${BIGQUERY_TABLE}
動作確認
ダミーデータを使って、BigQuery に書き込みできるか確認してみましょう。次のようなデータを Lambda のコンソールからセットしてテストします。
{ "timestamp": "2023-01-10T11:31:01.106+09:00", "accid": "123456", "seq_num": 21, "device": "LMG-300", "version": "1.00", "name": "MACHINE03", "place": "Tokyo-001", "command": "report", "data_num": 1, "data000": [ { "datetime": "2023-01-10T11:30:00.006+09:00", "value": { "st": "000001", "D00": -1234, "D01": 5678, "D02": 1234, "D03": 5648, "L00": 1, "L01": 0, "L02": 1 } }, { "datetime": "2023-01-10T11:31:00.006+09:00", "value": { "st": "000001", "D00": -1212, "D01": 5656, "D02": 2323, "D03": 7878, "L00": 1, "L01": 1, "L02": 0 } } ] }
BigQuery のコンソールから確認してみます。送信した 2 つ分のレコードが格納されていることが確認できました。
イベント駆動の Storage Transfer Service という選択肢
今回は Lambda 関数を使っていますが、個人的には Lambda を含めコンピュート環境の運用はできるだけ行いたくありません。
年末に見た下記の機能がGA になって要件を満たすようであれば、将来的に切り替えたいと思います。
最後に
Lambda 関数の部分は、IoT ゲートウェイである「LMG-300」の仕様に合わせたものなので、汎用性はあまりありませんが、ストリーミング API の使い方は把握できました。
また、今回は BigQuery への認証情報を環境変数として渡しています。
今回はこの設定で困ることがなかったのですが、必要に応じて AWS Systems Manager のパラメーターストアや AWS Secrets Manager の利用を検討してみようと思います。
以上です。