BigQuery のスケジューリングクエリって何がうれしいの? ~ 毎時 GCS データを集計するユースケースを想定して動かしてみた~

2020.12.13

こんにちは、データアナリティクス事業本部のみかみです。

本エントリは、クラスメソッド BigQuery Advent Calendar 2020 の 13 日目のエントリです。 25日のアドベントカレンダー終了まで、弊社クラスメソッド データアナリティクス事業本部のメンバーで、Google BigQuery に関する記事を紡いでいこうと思います。

BigQuery のスケジューリングクエリって何がうれしいの?

BigQuery では管理コンソールの UI から、SQL のスケジュール実行を登録することができます。

つまり BigQuery では、マートデータ作成などの SQL 実行で完結する処理(+α)ならば、バッチジョブを実装する必要なく、スケジューリングクエリ機能で実現することができるのです! SQL 限定なので、もちろん全てのバッチ処理をスケジューリングクエリで置き換えることはできませんが、ジョブのコーディングやサードパーティ製ツールを導入する必要なく SQL がスケジュール実行できるって、もしかしてすごく便利なのでは?!

やりたいこと

  • BigQuery のスケジューリングクエリを試してみたい
  • 毎時 GCS データを集計して結果を BigQuery のテーブルに格納したい

どこか他のシステムから、GCS に毎時 CSV のデータファイルが連携されるものとします。 連携された CSV データを集計して、集計結果を BigQuery のテーブルに格納するケースを想定しました。

通常であれば、データ集計と BigQuery テーブルへの結果格納処理を実行するジョブをコーディングして、Cloud Functions などのサーバレス環境やバッチサーバから実行する方法、またはサードパーティ製のジョブ作成&実行ツールの導入を検討するところです。

BigQuery のスケジューリングクエリを使えば、BigQuery 管理画面の GUI 操作だけで SQL をスケジュール実行する設定ができるので、バッチコードを書く必要なく、GCS データの集計結果を毎時 BigQuery テーブルに格納することができてしまいます!

GCS の連携データを準備

CSV データを 1 時間に 1 回 GCS に連携する、どこか別のシステムの処理を、Cloud Scheduler、Cloud Pub/Sub、Cloud Functions を使って準備します。

5 行~20 行のクリスマスメニューの売上げデータをランダムで作成して CSV ファイルとして GCS に Put する、以下の Cloud Functions の Python コードと、必要なパッケージ情報を記載した requirements.txt を準備しました。

import pandas as pd
import random
from datetime import datetime
from pytz import timezone
from google.cloud import storage

def put_csv_sample(data, context):
    # create data
    FOODS = [
        {'name':'クリスマスケーキ', 'price':3000},
        {'name':'ブッシュドノエル', 'price':2000},
        {'name':'フライドチキン', 'price':1000},
        {'name':'ローストチキン', 'price':2000},
        {'name':'ローストビーフ', 'price':2000},
        {'name':'ピザ', 'price':1000},
        {'name':'ポットパイ', 'price':500},
        {'name':'寿司', 'price':5000},
        {'name':'オードブル', 'price':3000},
        {'name':'シャンパン', 'price':2000},
        {'name':'スパークリングワイン', 'price':2000},
        {'name':'シャンメリー', 'price':500},
    ]

    row_cnt = random.randint(5, 20)
    items = [random.choice(FOODS) for _ in range(row_cnt)]
    df = pd.DataFrame({
        'name':[d.get('name') for d in items], 
        'price':[d.get('price') for d in items], 
        'count':[random.randint(1, 5) for _ in range(row_cnt)], 
        'put_datetime':[datetime.now(timezone('Asia/Tokyo')).strftime('%Y-%m-%d %H:%M:%S') for _ in range(row_cnt)]
    })
    df = df[['name', "price", 'count', 'put_datetime']]
    print(df)

    # put data to GCS
    now = datetime.now(timezone('Asia/Tokyo'))
    bucket_name = 'test-mikami-schedule'
    blob_name = 'advent_calendar/foods_christmas_{}.csv'.format(now.strftime('%Y%m%d%H%M%S'))

    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.upload_from_string(data=df.to_csv(index=False), content_type='text/csv')
    print("Blob {} created.".format(blob_name))
google-cloud-storage==1.28.1
pandas==1.0.0

Cloud Shell から gcloud コマンドを使って、以下を作成します。

  • put_csv_sample という名前の Cloud Functions の関数と、トリガーとなる 同名の Cloud Pub/Sub トピック
  • sub_put_csv_sample という名前の put_csv_sample トピックのサブスクリプション
  • 毎時 0 分に put_csv_sample トピックにメッセージを送信する同名の Cloud Scheduler のジョブ

先ほどの Python コードを main.py という名前で保存し、requirements.txt と一緒に Cloud Shell にアップロードしました。

gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ ls -l
total 8
-rw-r--r-- 1 gcp_da_user gcp_da_user 1690 Dec  8 09:03 main.py
-rw-r--r-- 1 gcp_da_user gcp_da_user   45 Dec  8 09:03 requirements.txt

以下の 3 つの gcloud コマンドを実行します。

gcloud functions deploy put_csv_sample --runtime python37 --trigger-topic put_csv_sample
gcloud pubsub subscriptions create sub_put_csv_sample --topic put_csv_sample
gcloud scheduler jobs create pubsub put_csv_sample --schedule "00 */1 * * *" --time-zone "Asia/Tokyo" --topic put_csv_sample --message-body "test"
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ gcloud functions deploy put_csv_sample --runtime python37 --trigger-topic put_csv_sample
Deploying function (may take a while - up to 2 minutes)...⠹
For Cloud Build Stackdriver Logs, visit: https://console.cloud.google.com/logs/viewer?project=cm-da-mikami-yuki-258308&advancedFilter=resource.type%3Dbuild%0Aresource.labels.build_id
%3De37ed193-3450-4956-856b-40c8d151aeb1%0AlogName%3Dprojects%2Fcm-da-mikami-yuki-258308%2Flogs%2Fcloudbuild
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 256
buildId: e37ed193-3450-4956-856b-40c8d151aeb1
entryPoint: put_csv_sample
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  resource: projects/cm-da-mikami-yuki-258308/topics/put_csv_sample
  service: pubsub.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/cm-da-mikami-yuki-258308/locations/us-central1/functions/put_csv_sample
runtime: python37
serviceAccountEmail: cm-da-mikami-yuki-258308@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-2365985c-f7e0-4882-9f01-971d82702e1f/25471d36-d53c-4fe1-bca6-e617efec64be.zip?GoogleAccessId=service-7971470195
23@gcf-admin-robot.iam.gserviceaccount.com&Expires=1607421201&Signature=o%2F0GwhiKJksDIKwtE6zzS7vDMg92066PnPOeU8zwbZkxPPkIQXvgoq38ZrpnBDyvWD9U%2FQvJ9AthWJCVLJN2%2Fgta3VL6KB%2B2ZGWQh0
SLbuAiivJKRL%2BUMOsPWb7b5TWTSEKDcDWB529GTHqAagyZ%2F7LJbI17U04Rk6nmPqvvHGd2QZEBdgqejyh9S%2Bc5l5BajZxA2FPI1WKi53VUcB47XojAAG6BV15Lb5LemnnpteHqcnAh1SERIQy7Fc6XteCVrrwjJO%2FTKOBDXmW2MS00
LS7pyfxkXmEtGbNvJSFuhpm8D%2F8gc0UwO6Wl1CbJOIMIaAXDZTT7dVESrv%2By9nJQ%2Bg%3D%3D
status: ACTIVE
timeout: 60s
updateTime: '2020-12-08T09:25:01.221Z'
versionId: '1'
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ gcloud pubsub subscriptions create sub_put_csv_sample --topic put_csv_sample
Created subscription [projects/cm-da-mikami-yuki-258308/subscriptions/sub_put_csv_sample].
gcp_da_user@cloudshell:~/test_schedule (cm-da-mikami-yuki-258308)$ gcloud scheduler jobs create pubsub put_csv_sample --schedule "00 */1 * * *" --time-zone "Asia/Tokyo" --topic put_c
sv_sample --message-body "test"
name: projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/put_csv_sample
pubsubTarget:
  data: dGVzdA==
  topicName: projects/cm-da-mikami-yuki-258308/topics/put_csv_sample
retryConfig:
  maxBackoffDuration: 3600s
  maxDoublings: 16
  maxRetryDuration: 0s
  minBackoffDuration: 5s
schedule: 00 */1 * * *
state: ENABLED
timeZone: Asia/Tokyo
userUpdateTime: '2020-12-08T09:27:01Z'

管理コンソールからも、Cloud Functions、Cloud Pub/Sub、Cloud Scheduler それぞれ作成されたことが確認できました。

Cloud Scheduler を手動実行して、動作確認してみます。

結果に「成功」と表示されたのを確認して、Pub/Sub、Cloud Functions も実行されたかどうか見てみます。

無事実行されたようです。

GCS にも CSV ファイルが出力されたことが確認できました。

データの中身はこんな感じです。期待通りのサンプルデータファイルが無事出力されました。

name,price,count,put_datetime
ブッシュドノエル,2000,5,2020-12-08 19:17:08
ローストビーフ,2000,2,2020-12-08 19:17:08
ローストビーフ,2000,4,2020-12-08 19:17:08
シャンメリー,500,2,2020-12-08 19:17:08
スパークリングワイン,2000,2,2020-12-08 19:17:08
ブッシュドノエル,2000,2,2020-12-08 19:17:08
寿司,5000,3,2020-12-08 19:17:08
シャンメリー,500,2,2020-12-08 19:17:08
クリスマスケーキ,3000,1,2020-12-08 19:17:08
ブッシュドノエル,2000,4,2020-12-08 19:17:08
オードブル,3000,5,2020-12-08 19:17:08
スパークリングワイン,2000,4,2020-12-08 19:17:08
ブッシュドノエル,2000,3,2020-12-08 19:17:08
ローストチキン,2000,3,2020-12-08 19:17:08
ローストビーフ,2000,2,2020-12-08 19:17:08
シャンパン,2000,4,2020-12-08 19:17:08
スパークリングワイン,2000,4,2020-12-08 19:17:08
シャンメリー,500,5,2020-12-08 19:17:08
スパークリングワイン,2000,1,2020-12-08 19:17:08
寿司,5000,5,2020-12-08 19:17:08

これで、クリスマスメニューの売上げデータを毎時 GCS に CSV ファイルで連携する別システムの処理の準備ができました。

BigQuery で SQL をスケジュール登録

さて、ここからが本題です。 GCS に連携された CSV ファイルのデータを集計して結果を BigQuery のテーブルに格納する処理を SQL で記載し、BigQuery 管理コンソールからスケジュール実行登録します。

登録するのは以下の 3 つの SQL です。

初めの SQL では、GCS ファイルデータを外部データソースとする、外部テーブルを作成します。 IF NOT EXISTS でテーブルが存在しない場合のみ作成するので、実際にテーブルが作成されるのは初回の 1 回のみです。

CREATE EXTERNAL TABLE IF NOT EXISTS dataset_advent_calendar.foods_christmas (
  	name STRING, 
  	price INT64, 
  	count INT64,
    put_datetime DATETIME
)
OPTIONS (
uris=["gs://test-mikami-schedule/advent_calendar/foods_christmas_*.csv"], 
     skip_leading_rows=1,
 	 format=CSV
);

BigQuery では今年 2020 年 10 月から、EXTERNAL TABLE も SQL で CREATE / DROP できるようになったので、GCS データを BigQuery から参照することが SQL の世界だけで完結できます。

GCS ファイルを外部データソースとする外部テーブルを作成してしまえば、GCS のデータも通常の BigQuery テーブル同様 SQL でアクセスできるので、GCS ファイルデータを SQL で集計できるようになります。

次の SQL では、集計結果を格納するテーブルを作成します。 こちらも初回のテーブルが存在しない場合に限って、集計結果格納用のテーブルを作成するための SQL です。

CREATE TABLE IF NOT EXISTS dataset_advent_calendar.mart_sales (
  	name STRING, 
  	sales INT64, 
  	sales_cnt INT64,
    create_timestamp DATETIME
)
PARTITION BY DATETIME_TRUNC (
    create_timestamp, DAY
);

最後の SQL で、SQL の実行と同じ時間帯に Put された CSV データを対象に、商品名ごとに合計の金額と個数を集計し、集計日時を付与して BigQuery の mart_sales テーブルに格納します。

INSERT INTO
    dataset_advent_calendar.mart_sales
SELECT
    name,
    SUM(count*price) as sales,
    SUM(count) AS sales_cnt,
    CURRENT_DATETIME('Asia/Tokyo') AS create_timestamp
FROM
    dataset_advent_calendar.foods_christmas
WHERE
    FORMAT_DATETIME('%Y%m%d%H', put_datetime) = FORMAT_DATETIME('%Y%m%d%H', CURRENT_DATETIME('Asia/Tokyo'))
GROUP BY
    name
;

この 3 つの SQL を BigQuery 管理コンソールのクエリエディタに続けて入力し、「クエリのスケジュール」プルダウンから「スケジュールされたクエリを新規作成」をクリックします。

「スケジュールされたクエリの名前」に任意の名前を入力し、「繰り返しの頻度」のプルダウンで「毎時」を選択。「開始時と実行時間」で現在時の 5 分を JST で指定し、「処理を行うロケーション」に「東京(asia-northeast1)」を選択しました。

これで、毎時 5 分に、登録した SQL をスケジュール実行する設定ができました。

BigQuery 管理コンソールの「スケジュールされたクエリ」をクリックすると、登録済みのスケジューリングクエリが確認できます。

設定した時間が過ぎたので、「スケジュールされたクエリ」一覧画面から SQL の実行状況を確認します。

「実行の詳細」から、登録した SQL が正常に実行されたことが確認できました。

BigQuery のテーブルも確認してみます。

1 つ目の SQL で GCS ファイルをデータソースとする外部テーブルが作成され、

2 つ目の SQL で集計結果格納用の BigQuery テーブルが作成されました。

集計結果データも期待通り格納されています。

この時間に GCS に出力されていた CSV ファイルは以下でした。

name,price,count,put_datetime
ピザ,1000,1,2020-12-09 00:00:13
スパークリングワイン,2000,1,2020-12-09 00:00:13
ピザ,1000,5,2020-12-09 00:00:13
スパークリングワイン,2000,3,2020-12-09 00:00:13
シャンパン,2000,2,2020-12-09 00:00:13
ローストビーフ,2000,4,2020-12-09 00:00:13

集計処理内容も問題なさそうです。

さらに、そのまま半日ほど放置してから、集計結果テーブルの中身を確認しました。

集計結果の件数を確認してみます。

集計結果レコードが増えてます。

SQL がスケジュール実行された各時間に集計結果が追加で格納され、それぞれの時間ごとの売上の合計も確認できました。

また、スケジューリングクエリの詳細画面からも、期待通り、毎時 SQL クエリがスケジュール実行されていたことが確認できました。

スケジューリングクエリの結果を通知

SQL のスケジュール実行では、実行エラーが発生した場合にメールや Cloud Pub/Sub に通知することができます。

実行エラーになる SQL をスケジュール登録して、メールが通知されるか確認してみます。 メール通知の設定は、スケジューリングクエリの作成(編集)画面で「メール通知を送信する」チェックボックスをチェックするだけです。 またその下の「Cloud Pub/Sub トピック」にトピック ID を入力すれば Pub/Sub に通知することも可能なので、Pub/Sub トリガーで起動する Cloud Functions と連携すれば 、Slack などの他の通知先に通知したり、任意のリカバリ処理を実装することもできそうです。

メール通知を設定しておくと、SQL クエリのオーナー(管理コンソールにログイン中のアカウント)のメールアドレス宛に、通知メールが送信されます。 通知先に別のメールアドレスを指定することはできませんが、Gmail の場合は転送設定が可能なので、運用時にはメーリングリスト宛てに転送設定しておくと良さそうです。

スケジュール実行がエラーになったことを確認して、通知メールが来ているかどうか確認します。

こんなメールが届いてました。

登録した SQL は BigQuery Data Transfer Service を利用してスケジュール実行されているため、通知メールの送信元も BigQuery Data Transfer Service になってます。 Display name に、スケジューリングクエリの表示名、Source として scheduled_query の記載があるので、スケジューリングクエリのエラー通知だということが分かります。 Run summary の内容からエラーの詳細が確認できますし、View run history リンクをクリックすると管理コンソール上でエラーが発生したスケジュールの詳細を確認することもできます。

まとめ(所感)

あまりフォーカスされていない(ように感じる)BigQuery の「スケジュールされたクエリ」の機能ですが、定期的な SQL 実行を画面 UI 操作だけで簡単に登録できて、エラー通知機能もついてます。 今回は GCS データを集計して BigQuery テーブルに格納するケースを想定してみましたが、他にも、例えば定期的に BigQuery テーブルデータを GCS にエクスポートしたり、INFORMATION_SCHEMA を参照して BigQuery のスロット使用率を確認したりする処理を、BigQuery 管理コンソールのクエリエディタと UI 操作だけで簡単に実現することができます。

今まで BigQuery 管理コンソールを使いながら「『スケジュールされたクエリ』って何?」って思ってましたが、これってうまく活用すればバッチジョブの開発工数が削減できるってことで、要件によってはすごく便利な機能なんじゃない?!と思いました!v


明日 14 日目の BigQuery Advent Calendar 2020kobayashi.mからお送り予定です。 また、今後のアドベントカレンダーの予定はこちらにも掲載しておりますので、引き続きお楽しみいただけますと幸いです。

参考