BigQuery subscriptionsを使ってPub/Subと直接連携する

BigQuery subscriptionsを利用し、Pub/Subから直接BigQueryへデータを投入する。 サンプルデータとしてPub/Subの公開トピックのNYC Taxi Tycoonを利用。
2022.09.07

データアナリティクス事業本部、池田です。東京まだ暑いです。

少し前にBigQuery subscriptionsという機能がリリースされていたので使ってみました。
BigQuery release notes > July 28, 2022

Pub/Sub からBigQueryへのデータ投入は、 これまでは何か間に噛ませて行う必要がありました。 例えば、 Dataflow を噛ませる場合は、 この辺のガイド に従って実装が必要そうです。)

サンプルデータ

今回はサンプルとして、 一般に公開されているPub/SubのトピックNYC Taxi Tycoon を使ってみます。 タクシーの乗降データのようです。

トピックに1度にパブリッシュされるメッセージは↓の感じのJSONです。(整形したものを記載。)

{
    "ride_id": "cae9e683-aa43-49e2-98dc-9426731dfc52",
    "point_idx": 290,
    "latitude": 40.749930000000006,
    "longitude": -73.98029000000001,
    "timestamp": "2022-09-07T02:13:13.91666-04:00",
    "meter_reading": 6.1052628,
    "meter_increment": 0.02105263,
    "ride_status": "enroute",
    "passenger_count": 1
}

サブスクリプションの作成

Pub/Subトピックは前章のもの( projects/pubsub-public-data/topics/taxirides-realtime )を使うので、今回は投入先のテーブルと 公式のガイド を参考に サブスクリプションを作成します。

今回は公開されているトピックを使用するため トピック スキーマ は使いません。
メタデータ (Pub/Subの情報)は書き込むパターンと書き込まないパターンを試してみます。

テーブル作成

先にBigQueryのデータセットは↓こんな感じで作成。

CREATE SCHEMA subscriptions
OPTIONS(
    location="US"
);

メタデータを書き込むパターンのテーブルガイドのパラメータ に従ってカラムを定義します。

CREATE TABLE subscriptions.sample_with_meta (
    subscription_name STRING,
    message_id STRING,
    publish_time TIMESTAMP,
    data STRING,
    attributes STRING
)
PARTITION BY DATE(publish_time);

(一応なんとなく publish_time でパーティションしておきました。)


メタデータを書き込まないパターンは data だけあれば良いので、テーブルは↓こんな感じ。

CREATE TABLE subscriptions.sample_no_meta (
    data STRING
);

どちらもPub/Subのメッセージが data カラムに格納される想定です。

権限付与

サブスクリプション作成のガイド に従ってPub/Subのサービスアカウントにロールを紐づけます。

To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table and to read the table metadata.

Grant the BigQuery Data Editor (roles/bigquery.dataEditor) role and the BigQuery Metadata Viewer (roles/bigquery.metadataViewer) role to the Pub/Sub service account.

BigQuery データ編集者BigQuery メタデータ閲覧者 が必要とのこと。

サブスクリプション作成

gcloudコマンド で作成します。

↓まずはメタデータを書き込むパターンのサブスクリプションをコマンドで作成。

gcloud pubsub subscriptions create sample-with-meta-sub \
--topic=projects/pubsub-public-data/topics/taxirides-realtime \
--bigquery-table=my-project.subscriptions.sample_with_meta \
--write-metadata

ちなみに、上記のコマンドを発行する時点で、前節の権限がサービスアカウントに無いと、 以下のようなエラーになりました。

ERROR: Failed to create subscription [projects/my-project/subscriptions/sample-with-meta-sub]: The caller does not have permission.
ERROR: (gcloud.pubsub.subscriptions.create) Failed to create the following: [sample-with-meta-sub].

作成されたものをコンソールから確認すると、↓こんな感じでした。

画面からはBigQuery関連の設定値は確認できない?みたいです(執筆時点)。

作成するとすぐにBigQueryの作成しておいたテーブルへデータが投入され始めました。

data カラムにSTRING型でJSONが格納されています。


↓次にメタデータを書き込まないパターンのコマンド。

gcloud pubsub subscriptions create sample-no-meta-sub \
--topic=projects/pubsub-public-data/topics/taxirides-realtime \
--bigquery-table=my-project.subscriptions.sample_no_meta

↓サブスクリプション作成後のテーブル。


文字列のままだと見づらいので、VIEWでJSON部分を各カラムに分解してみます。 (メタデータを書き込むパターンのテーブルで。)

CREATE VIEW subscriptions.sample_view AS
    SELECT
        JSON_VALUE(data, '$.ride_id') AS ride_id,
        SAFE_CAST(JSON_VALUE(data, '$.point_idx') AS INT64) AS point_idx,
        SAFE_CAST(JSON_VALUE(data, '$.latitude') AS BIGNUMERIC) AS latitude,
        SAFE_CAST(JSON_VALUE(data, '$.longitude') AS BIGNUMERIC) AS longitude,
        SAFE_CAST(JSON_VALUE(data, '$.timestamp') AS TIMESTAMP) AS timestamp,
        SAFE_CAST(JSON_VALUE(data, '$.meter_reading') AS NUMERIC) AS meter_reading,
        SAFE_CAST(JSON_VALUE(data, '$.meter_increment') AS NUMERIC) AS meter_increment,
        JSON_VALUE(data, '$.ride_status') AS ride_status,
        SAFE_CAST(JSON_VALUE(data, '$.passenger_count') AS INT64) AS passenger_count
    FROM
        subscriptions.sample_with_meta
    WHERE
        DATE(publish_time) >= CURRENT_DATE()
;


今回のブログのようにトピックスキーマを使わない場合、 実際に利用する際は、テーブルの更新頻度が高そうなので、何でJSONを分解するか検討が必要そう… max_staleness とかの使いどころなのかな…?

おわりに

簡単に設定できましたし、BigQueryへの連携が楽になる良い機能だと思いました。
(でも私の周りでストリーミング的なデータあんまり無いんだよな…)

関連情報/参考にさせていただいたページ