BigQuery subscriptionsを使ってPub/Subと直接連携する
データアナリティクス事業本部、池田です。東京まだ暑いです。
少し前にBigQuery subscriptionsという機能がリリースされていたので使ってみました。
【 BigQuery release notes > July 28, 2022 】
( Pub/Sub からBigQueryへのデータ投入は、 これまでは何か間に噛ませて行う必要がありました。 例えば、 Dataflow を噛ませる場合は、 この辺のガイド に従って実装が必要そうです。)
サンプルデータ
今回はサンプルとして、 一般に公開されているPub/Subのトピックの NYC Taxi Tycoon を使ってみます。 タクシーの乗降データのようです。
トピックに1度にパブリッシュされるメッセージは↓の感じのJSONです。(整形したものを記載。)
{ "ride_id": "cae9e683-aa43-49e2-98dc-9426731dfc52", "point_idx": 290, "latitude": 40.749930000000006, "longitude": -73.98029000000001, "timestamp": "2022-09-07T02:13:13.91666-04:00", "meter_reading": 6.1052628, "meter_increment": 0.02105263, "ride_status": "enroute", "passenger_count": 1 }
サブスクリプションの作成
Pub/Subトピックは前章のもの( projects/pubsub-public-data/topics/taxirides-realtime
)を使うので、今回は投入先のテーブルと
公式のガイド を参考に
サブスクリプションを作成します。
今回は公開されているトピックを使用するため
トピック スキーマ は使いません。
メタデータ (Pub/Subの情報)は書き込むパターンと書き込まないパターンを試してみます。
テーブル作成
先にBigQueryのデータセットは↓こんな感じで作成。
CREATE SCHEMA subscriptions OPTIONS( location="US" );
メタデータを書き込むパターンのテーブルは ガイドのパラメータ に従ってカラムを定義します。
CREATE TABLE subscriptions.sample_with_meta ( subscription_name STRING, message_id STRING, publish_time TIMESTAMP, data STRING, attributes STRING ) PARTITION BY DATE(publish_time);
(一応なんとなく publish_time
でパーティションしておきました。)
メタデータを書き込まないパターンは data
だけあれば良いので、テーブルは↓こんな感じ。
CREATE TABLE subscriptions.sample_no_meta ( data STRING );
どちらもPub/Subのメッセージが data
カラムに格納される想定です。
権限付与
サブスクリプション作成のガイド に従ってPub/Subのサービスアカウントにロールを紐づけます。
To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table and to read the table metadata.
Grant the BigQuery Data Editor (roles/bigquery.dataEditor) role and the BigQuery Metadata Viewer (roles/bigquery.metadataViewer) role to the Pub/Sub service account.
BigQuery データ編集者
と BigQuery メタデータ閲覧者
が必要とのこと。
サブスクリプション作成
gcloudコマンド で作成します。
↓まずはメタデータを書き込むパターンのサブスクリプションをコマンドで作成。
gcloud pubsub subscriptions create sample-with-meta-sub \ --topic=projects/pubsub-public-data/topics/taxirides-realtime \ --bigquery-table=my-project.subscriptions.sample_with_meta \ --write-metadata
ちなみに、上記のコマンドを発行する時点で、前節の権限がサービスアカウントに無いと、 以下のようなエラーになりました。
ERROR: Failed to create subscription [projects/my-project/subscriptions/sample-with-meta-sub]: The caller does not have permission. ERROR: (gcloud.pubsub.subscriptions.create) Failed to create the following: [sample-with-meta-sub].
作成されたものをコンソールから確認すると、↓こんな感じでした。
画面からはBigQuery関連の設定値は確認できない?みたいです(執筆時点)。
作成するとすぐにBigQueryの作成しておいたテーブルへデータが投入され始めました。
data
カラムにSTRING型でJSONが格納されています。
↓次にメタデータを書き込まないパターンのコマンド。
gcloud pubsub subscriptions create sample-no-meta-sub \ --topic=projects/pubsub-public-data/topics/taxirides-realtime \ --bigquery-table=my-project.subscriptions.sample_no_meta
↓サブスクリプション作成後のテーブル。
文字列のままだと見づらいので、VIEWでJSON部分を各カラムに分解してみます。 (メタデータを書き込むパターンのテーブルで。)
CREATE VIEW subscriptions.sample_view AS SELECT JSON_VALUE(data, '$.ride_id') AS ride_id, SAFE_CAST(JSON_VALUE(data, '$.point_idx') AS INT64) AS point_idx, SAFE_CAST(JSON_VALUE(data, '$.latitude') AS BIGNUMERIC) AS latitude, SAFE_CAST(JSON_VALUE(data, '$.longitude') AS BIGNUMERIC) AS longitude, SAFE_CAST(JSON_VALUE(data, '$.timestamp') AS TIMESTAMP) AS timestamp, SAFE_CAST(JSON_VALUE(data, '$.meter_reading') AS NUMERIC) AS meter_reading, SAFE_CAST(JSON_VALUE(data, '$.meter_increment') AS NUMERIC) AS meter_increment, JSON_VALUE(data, '$.ride_status') AS ride_status, SAFE_CAST(JSON_VALUE(data, '$.passenger_count') AS INT64) AS passenger_count FROM subscriptions.sample_with_meta WHERE DATE(publish_time) >= CURRENT_DATE() ;
今回のブログのようにトピックスキーマを使わない場合、 実際に利用する際は、テーブルの更新頻度が高そうなので、何でJSONを分解するか検討が必要そう… max_staleness とかの使いどころなのかな…?
おわりに
簡単に設定できましたし、BigQueryへの連携が楽になる良い機能だと思いました。
(でも私の周りでストリーミング的なデータあんまり無いんだよな…)