この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部、池田です。東京まだ暑いです。
少し前にBigQuery subscriptionsという機能がリリースされていたので使ってみました。
【 BigQuery release notes > July 28, 2022 】
( Pub/Sub からBigQueryへのデータ投入は、 これまでは何か間に噛ませて行う必要がありました。 例えば、 Dataflow を噛ませる場合は、 この辺のガイド に従って実装が必要そうです。)
サンプルデータ
今回はサンプルとして、 一般に公開されているPub/Subのトピックの NYC Taxi Tycoon を使ってみます。 タクシーの乗降データのようです。
トピックに1度にパブリッシュされるメッセージは↓の感じのJSONです。(整形したものを記載。)
{
"ride_id": "cae9e683-aa43-49e2-98dc-9426731dfc52",
"point_idx": 290,
"latitude": 40.749930000000006,
"longitude": -73.98029000000001,
"timestamp": "2022-09-07T02:13:13.91666-04:00",
"meter_reading": 6.1052628,
"meter_increment": 0.02105263,
"ride_status": "enroute",
"passenger_count": 1
}
サブスクリプションの作成
Pub/Subトピックは前章のもの( projects/pubsub-public-data/topics/taxirides-realtime
)を使うので、今回は投入先のテーブルと
公式のガイド を参考に
サブスクリプションを作成します。
今回は公開されているトピックを使用するため
トピック スキーマ は使いません。
メタデータ (Pub/Subの情報)は書き込むパターンと書き込まないパターンを試してみます。
テーブル作成
先にBigQueryのデータセットは↓こんな感じで作成。
CREATE SCHEMA subscriptions
OPTIONS(
location="US"
);
メタデータを書き込むパターンのテーブルは ガイドのパラメータ に従ってカラムを定義します。
CREATE TABLE subscriptions.sample_with_meta (
subscription_name STRING,
message_id STRING,
publish_time TIMESTAMP,
data STRING,
attributes STRING
)
PARTITION BY DATE(publish_time);
(一応なんとなく publish_time
でパーティションしておきました。)
メタデータを書き込まないパターンは data
だけあれば良いので、テーブルは↓こんな感じ。
CREATE TABLE subscriptions.sample_no_meta (
data STRING
);
どちらもPub/Subのメッセージが data
カラムに格納される想定です。
権限付与
サブスクリプション作成のガイド に従ってPub/Subのサービスアカウントにロールを紐づけます。
To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table and to read the table metadata.
Grant the BigQuery Data Editor (roles/bigquery.dataEditor) role and the BigQuery Metadata Viewer (roles/bigquery.metadataViewer) role to the Pub/Sub service account.
BigQuery データ編集者
と BigQuery メタデータ閲覧者
が必要とのこと。
サブスクリプション作成
gcloudコマンド で作成します。
↓まずはメタデータを書き込むパターンのサブスクリプションをコマンドで作成。
gcloud pubsub subscriptions create sample-with-meta-sub \
--topic=projects/pubsub-public-data/topics/taxirides-realtime \
--bigquery-table=my-project.subscriptions.sample_with_meta \
--write-metadata
ちなみに、上記のコマンドを発行する時点で、前節の権限がサービスアカウントに無いと、 以下のようなエラーになりました。
ERROR: Failed to create subscription [projects/my-project/subscriptions/sample-with-meta-sub]: The caller does not have permission.
ERROR: (gcloud.pubsub.subscriptions.create) Failed to create the following: [sample-with-meta-sub].
作成されたものをコンソールから確認すると、↓こんな感じでした。
画面からはBigQuery関連の設定値は確認できない?みたいです(執筆時点)。
作成するとすぐにBigQueryの作成しておいたテーブルへデータが投入され始めました。
data
カラムにSTRING型でJSONが格納されています。
↓次にメタデータを書き込まないパターンのコマンド。
gcloud pubsub subscriptions create sample-no-meta-sub \
--topic=projects/pubsub-public-data/topics/taxirides-realtime \
--bigquery-table=my-project.subscriptions.sample_no_meta
↓サブスクリプション作成後のテーブル。
文字列のままだと見づらいので、VIEWでJSON部分を各カラムに分解してみます。 (メタデータを書き込むパターンのテーブルで。)
CREATE VIEW subscriptions.sample_view AS
SELECT
JSON_VALUE(data, '$.ride_id') AS ride_id,
SAFE_CAST(JSON_VALUE(data, '$.point_idx') AS INT64) AS point_idx,
SAFE_CAST(JSON_VALUE(data, '$.latitude') AS BIGNUMERIC) AS latitude,
SAFE_CAST(JSON_VALUE(data, '$.longitude') AS BIGNUMERIC) AS longitude,
SAFE_CAST(JSON_VALUE(data, '$.timestamp') AS TIMESTAMP) AS timestamp,
SAFE_CAST(JSON_VALUE(data, '$.meter_reading') AS NUMERIC) AS meter_reading,
SAFE_CAST(JSON_VALUE(data, '$.meter_increment') AS NUMERIC) AS meter_increment,
JSON_VALUE(data, '$.ride_status') AS ride_status,
SAFE_CAST(JSON_VALUE(data, '$.passenger_count') AS INT64) AS passenger_count
FROM
subscriptions.sample_with_meta
WHERE
DATE(publish_time) >= CURRENT_DATE()
;
今回のブログのようにトピックスキーマを使わない場合、 実際に利用する際は、テーブルの更新頻度が高そうなので、何でJSONを分解するか検討が必要そう… max_staleness とかの使いどころなのかな…?
おわりに
簡単に設定できましたし、BigQueryへの連携が楽になる良い機能だと思いました。
(でも私の周りでストリーミング的なデータあんまり無いんだよな…)