Dataflow SQL で Pub/Sub のストリームデータと BigQuery のテーブルデータを結合してみた

2020.10.23

こんにちは、みかみです。

Dataflow SQL を使うと、BigQuery 管理コンソールのクエリエディタに入力した SQL を、簡単にジョブ実行できるそうです。

BigQuery テーブルに対する SQL のジョブ実行はもちろん、データソースには Pub/Sub のストリームデータや GCS のファイルデータも指定できるそうなので、試してみました。

やりたいこと

  • Dataflow SQL をさわってみたい
  • Dataflow SQL でジョブ実行するにはどうすればよいのか知りたい
  • Dataflow SQL で Pub/Sub のストリーミングデータと BigQuery テーブルデータを結合してみたい

前提

公式ドキュメントの以下の処理を実際に動かしてみます。

動作確認には Cloud Shell を使用するため、Google Cloud SDK のインストールやサービスアカウントの設定は省略します。

また、Dataflow SQL ジョブ実行に必要な各種 API は有効化済みです。

Pub/Sub トピックとデータ送信スクリプトを準備

動作確認に使用するストリームデータを送信するスクリプトと Pub/Sub トピックを作成します。

gcloud pubsub コマンドで transactions トピックを作成しました。

gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ gcloud pubsub topics create transactions
Created topic [projects/cm-da-mikami-yuki-258308/topics/transactions].

また、ドキュメントに記載の 1 ~ 5 秒間隔で先ほど作成した Pub/Sub トピックにメッセージをパブリッシュする以下の Python コードを transactions_injector.py というファイル名で保存しました。

import datetime, json, os, random, time

# Set the `project` variable to a Google Cloud project ID.
project = 'cm-da-mikami-yuki-258308'

FIRST_NAMES = ['Monet', 'Julia', 'Angelique', 'Stephane', 'Allan', 'Ulrike', 'Vella', 'Melia',
    'Noel', 'Terrence', 'Leigh', 'Rubin', 'Tanja', 'Shirlene', 'Deidre', 'Dorthy', 'Leighann',
    'Mamie', 'Gabriella', 'Tanika', 'Kennith', 'Merilyn', 'Tonda', 'Adolfo', 'Von', 'Agnus',
    'Kieth', 'Lisette', 'Hui', 'Lilliana',]
CITIES = ['Washington', 'Springfield', 'Franklin', 'Greenville', 'Bristol', 'Fairview', 'Salem',
    'Madison', 'Georgetown', 'Arlington', 'Ashland',]
STATES = ['MO','SC','IN','CA','IA','DE','ID','AK','NE','VA','PR','IL','ND','OK','VT','DC','CO','MS',
    'CT','ME','MN','NV','HI','MT','PA','SD','WA','NJ','NC','WV','AL','AR','FL','NM','KY','GA','MA',
    'KS','VI','MI','UT','AZ','WI','RI','NY','TN','OH','TX','AS','MD','OR','MP','LA','WY','GU','NH']
PRODUCTS = ['Product 2', 'Product 2 XL', 'Product 3', 'Product 3 XL', 'Product 4', 'Product 4 XL', 'Product 5',
    'Product 5 XL',]

while True:
  first_name, last_name = random.sample(FIRST_NAMES, 2)
  data = {
    'tr_time_str': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    'first_name': first_name,
    'last_name': last_name,
    'city': random.choice(CITIES),
    'state':random.choice(STATES),
    'product': random.choice(PRODUCTS),
    'amount': float(random.randrange(50000, 70000)) / 100,
  }

  # For a more complete example on how to publish messages in Pub/Sub.
  #   https://cloud.google.com/pubsub/docs/publisher
  message = json.dumps(data)
  command = "gcloud --project={} pubsub topics publish transactions --message='{}'".format(project, message)
  print(command)
  os.system(command)
  time.sleep(random.randrange(1, 5))

BigQuery のデータセットとテーブルを作成してテストデータをロード

Pub/Sub のストリームデータと結合する BigQuery のテーブルデータを準備します。

ドキュメントに記載のある以下のサンプルデータを CSV ファイルに保存して、GCS にアップロードしました。

state_id,state_code,state_name,sales_region
1,MO,Missouri,Region_1
2,SC,South Carolina,Region_1
3,IN,Indiana,Region_1
6,DE,Delaware,Region_2
15,VT,Vermont,Region_2
(省略)
45,KY,Kentucky,Region_8
53,WI,Wisconsin,Region_8
57,OH,Ohio,Region_8
49,VI,United States Virgin Islands,Region_9
62,MP,Commonwealth of the Northern Mariana Islands,Region_9

以下の bq コマンドで、データセットを作成した後、スキーマ自動検出でテーブルを新規作成してサンプルデータをロードします。

gcp_da_user@cloudshell:~/dataflow (cm-da-mikami-yuki-258308)$ bq --location=asia-northeast1 mk -d \
>     dataflow_sql_dataset
Dataset 'cm-da-mikami-yuki-258308:dataflow_sql_dataset' successfully created.
gcp_da_user@cloudshell:~/dataflow (cm-da-mikami-yuki-258308)$ bq load \
>     --autodetect \
>     --source_format=CSV \
>     dataflow_sql_dataset.us_state_salesregions \
>     gs://test-mikami-dataflow/us_state_salesregions.csv
Waiting on bqjob_r66199b966b89e403_00000175495375ef_1 ... (1s) Current status: DONE

データセットの作成と新規テーブルにサンプルデータがロードできました。

BigQuery リソースに Pub/Sub トピックを追加してスキーマを定義

Dataflow 管理コンソール上部の「SQL からジョブを実行」をクリックし、Dataflow エンジンに変換済みの BigQuery 管理画面に遷移します。

BigQuery 管理コンソール「その他」のプルダウンから、「クエリの設定」で「クエリエンジン」を「Cloud Dataflow エンジン」に変更でも大丈夫です。

ナビゲーションパネル「リソース」の右横の「データを追加」リンクをクリックし、「Cloud Dataflow のソース」を選択します。

「Cloud Pub/Sub トピック」のラジオボタンが ON になっていることを確認して、transactions トピックにチェックして「追加」をクリックします。

ナビゲーションメニュー「リソース」に「Cloud Pub/Sub トピック」の「transactions」が追加されたことを確認して、「スキーマを編集」ボタンをクリック。 スキーマ編集画面で「テキストとして編集」チェックを ON に変更後、テキストボックスにドキュメントに記載のスキーマをペーストして「送信」します。

transactions トピックのスキーマが定義できました。

Dataflow SQL ジョブを作成して実行

ドキュメント記載の SQL を BigQuery 管理コンソールクエリエディタにペーストしてクエリを検証したら、「Cloud Dataflow ジョブを作成」ボタンをクリックします。

「リージョンエンドポイント」を europe-west4 に変更し、「Destination」の「出力タイプ」で「BigQuery」を選択。 「データセットID」と「テーブル名」を入力して「作成」します。

データ送信用のスクリプトを実行してしばらく待ちます。

gcp_da_user@cloudshell:~/dataflow (cm-da-mikami-yuki-258308)$ python3 transactions_injector.py
gcloud --project=cm-da-mikami-yuki-258308 pubsub topics publish transactions --message='{"tr_time_str": "2020-10-23 12:06:52", "first_name": "Kennith", "last_name": "Leighann", "city
": "Ashland", "state": "LA", "product": "Product 2 XL", "amount": 522.98}'
messageIds:
- '1668558289758654'
gcloud --project=cm-da-mikami-yuki-258308 pubsub topics publish transactions --message='{"tr_time_str": "2020-10-23 12:06:54", "first_name": "Hui", "last_name": "Tonda", "city": "Gre
enville", "state": "MN", "product": "Product 5", "amount": 536.55}'
messageIds:
- '1668558415047767'
gcloud --project=cm-da-mikami-yuki-258308 pubsub topics publish transactions --message='{"tr_time_str": "2020-10-23 12:06:57", "first_name": "Ulrike", "last_name": "Monet", "city": "
Salem", "state": "TX", "product": "Product 3 XL", "amount": 641.26}'
messageIds:
- '1668573944654240'

ジョブ実行状況は、Dataflow 管理画面からも確認できます。

5分ほど経ってから、BigQuery のテーブルを確認してみました。

期待通り、Dataflow SQL のジョブで実行した SQL の結果が BigQuery のテーブルに格納されていることが確認できました。

つまずいたところ

Dataflow SQL ジョブ実行時、NullPointerException のエラー終了が発生しました。。

Pub/Sub トピックのスキーマを変更したり、location を変更したり、切り分けのためにソースデータを GCS にして同様の処理を実行したり、一般公開の Pub/Sub に変更してみたりしましたが、原因分からず。。

結局、下記投稿の Answer で解決しました!(どうもありがとうございますmm

2020/10/23 現在、asia-northeast1 および us-central1 のリージョンでは、Pub/Sub ストリームデータをソースとした Dataflow SQL ジョブの正常終了は確認できませんでしたが、 同じ手順で europe-west4 で実行すると正常に実行できることが確認できました。

他リージョンでも近々対応してもらえるのではないかと思います。

まとめ(所感)

2020/10 現在、Dataflow SQL のデータソースとして利用できるのは以下の 3 つです。

  • Pub/Sub トピック
  • Cloud Storage ファイルセット
  • BigQuery テーブル

また、Dataflow SQL で実行したクエリの出力先は、BigQuery または Pub/Sub トピックのどちらかです。

Dataflow SQL を使えば、Pub/Sub のストリームデータや GCS 上のファイルデータをロードすることなく、BigQuery のテーブルデータと結合できるので便利だと思いました。

また、Dataflow SQL の実行手順も、ソースデータの指定方法などの若干の考慮は必要ですが、通常通り BigQuery 管理コンソールでクエリエディタに入力する感覚で気軽にジョブ実行できるので簡単でした。

今後もデータソースや出力先の追加など、より便利になるアップデートを心待ちにしております!

参考