Google Workflows で、BigQuery に SQL を実行するサーバレスなバッチ処理を実装してみた。

2021.09.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

やりたいこと

  • BigQuery に SQL を実行するバッチ処理を、サーバレスで実装したい
  • Google Workflows を使ってバッチ実装してみたい
  • BigQuery に投げる SQL は、GCS に格納しておきたい
  • Workflows をスケジュール実行するにはどうすればいいか知りたい

概要

図にしてみるとこんな感じ。

シンプルで良さそうです。

サーバレスなバッチを実装するには Cloud Functions を使うこともできますが、 最大9分でタイムアウトしてしまうので長い SQL を同期実行できなかったり、 複数の SQL の依存関係を制御するのが大変だったり。

依存関係制御には、Cloud Composer の利用も考えられると思いますが、 サーバー立てるとコストがかかるので、サーバレスで実装したい。

そんな時には Workflows の出番では?!

BigQuery テーブルと SQL ファイルを準備

BigQuery に、以下のテーブルを作成しました。

また、上記のテーブルから、CTAS で新しいテーブルを作成する SQL と、作成した新しいテーブルのデータ件数をカウントする SQL を、GCS に格納しました。

CREATE TABLE dataset_1.temp_avocado AS (
  SELECT * FROM dataset_1.avocado
)
SELECT count(*) from dataset_1.temp_avocado

Workflows 実行用のサービスアカウントを作成

以下のコマンドで、Workflows を実行するサービスアカウントを作成しました。

export SERVICE_ACCOUNT=workflows-sa
gcloud iam service-accounts create ${SERVICE_ACCOUNT}

作成したサービスアカウントには、GCS から SQL を取得し、BigQuery に SQL 実行するためのロールを付与しておきます。

gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
    --member "serviceAccount:${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \
    --role "roles/bigquery.dataOwner"
gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
    --member "serviceAccount:${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \
    --role "roles/bigquery.user"
gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
    --member "serviceAccount:${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \
    --role "rroles/storage.admin"

管理コンソールからも、必要なロールを持つサービスアカウントができたことがが確認できました。

workflow を作成

Google Workflows では、YAML または JSON で、実行するワークフローを定義します。 GCS や BigQuery などの、各種 Google Cloud Service のコネクタもあり、サンプルコードも用意されています。

以下のワークフロー定義を、bq_batch.yaml というファイル名で保存しました。

main:
  steps:
  - job_1:
      call: exec_query
      args:
        blob_path: "sql%2F00%2Ftest.sql"
      result: query_result
  - job_2:
      call: exec_query
      args:
        blob_path: "sql%2F02%2Ftest.sql"
      result: query_result
  - return_value:
      return: ${query_result}

exec_query:
  params: [blob_path]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - bucket_name: "test-mikami"
    - get_sql:
        call: http.get
        args:
          url: ${"https://storage.googleapis.com/download/storage/v1/b/"+bucket_name+"/o/"+blob_path}
          headers:
            Content-Type: "text/plain"
          auth:
            type: OAuth2
          query:
            alt: media
        result: obj
    - exec_query:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            query: ${text.replace_all(text.decode(obj["body"]), "\n", " ")}
            useLegacySql: false
        result: query_result

main の job_1 step と job_2 step から、それぞれ別の SQL のファイルパスを渡し、GCS の SQL ファイルデータを取得して BigQuery にクエリ実行するサブワークフローを実行します。

以下のコマンドでデプロイします。

gcloud workflows deploy bq-batch --source=bq_batch.yaml --service-account=${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com

無事デプロイできたようです。

mikami_yuki@cloudshell:~/workflow (cm-da-mikami-yuki-258308)$ gcloud workflows deploy bq-batch --source=bq_batch.yaml --service-account=${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com
WARNING: The default location(us-central1) was used since the location flag was not specified.
Waiting for operation [operation-1630676452995-5cb1772b96fc4-00a85905-6532bf8f] to complete...done.
createTime: '2021-09-03T13:40:53.143115338Z'
name: projects/cm-da-mikami-yuki-258308/locations/us-central1/workflows/bq-batch
revisionCreateTime: '2021-09-03T13:40:53.176795660Z'
revisionId: 000001-e73
serviceAccount: projects/cm-da-mikami-yuki-258308/serviceAccounts/workflows-sa@cm-da-mikami-yuki-258308.iam.gserviceaccount.com
sourceContents: |+
(省略)
state: ACTIVE
updateTime: '2021-09-03T13:40:53.853564241Z'

以下のコマンドで実行してみます。

gcloud workflows run bq-batch
mikami_yuki@cloudshell:~/workflow (cm-da-mikami-yuki-258308)$ gcloud workflows run bq-batch
WARNING: The default location(us-central1) was used since the location flag was not specified.
Waiting for execution [2dacaa51-84ba-4abd-81a1-58af1143f0a8] to complete...done.
argument: 'null'
endTime: '2021-09-03T13:42:31.240818573Z'
name: projects/797147019523/locations/us-central1/workflows/bq-batch/executions/2dacaa51-84ba-4abd-81a1-58af1143f0a8
result: 'null'
startTime: '2021-09-03T13:42:25.736969016Z'
state: SUCCEEDED
workflowRevisionId: 000001-e73

state: SUCCEEDED とのことなので、正常に実行できたようです。

管理コンソールから BigQuery を確認してみます。 期待通り、テーブルが作成され、件数カウントの SQL も実行されたことが確認できました。

Workflows をスケジュール実行する Cloud Scheduler のジョブを作成

以下の公式ドキュメントを参考に、Workflows を実行する Cloud Scheduler のジョブを作成してみます。

まずは以下のコマンドで、Cloud Scheduler 用のサービスアカウントを作成しました。

export SERVICE_ACCOUNT_CS=cloudscheduler-sa
gcloud iam service-accounts create ${SERVICE_ACCOUNT_CS}
gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
    --member "serviceAccount:${SERVICE_ACCOUNT_CS}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \
    --role "roles/workflows.invoker"

続いて以下のコマンドで、Workflows を実行する Cloud Scheduler のジョブを作成します。

gcloud scheduler jobs create http exec_workflow \
  --schedule="00 23 * * *" \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/${GOOGLE_CLOUD_PROJECT}/locations/us-central1/workflows/bq-batch/executions" \
  --message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}" \
  --time-zone="Asia/Tokyo" \
  --oauth-service-account-email="${SERVICE_ACCOUNT_CS}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com"

スケジュール実行用のジョブが作成できました。

スケジュール実行に備えて、先ほど作成済みの temp_avocado テーブルは削除しておきます。

指定した時間になったので、ワークフローが実行されたか確認してみます。

期待通り、正常にスケジュール実行されたことが確認できました。

Workflows の料金

Google Workflows では、実行した step 数ごとに課金が発生します。 実行 step は、BigQuery や GCS などの Google Cloud サービスを呼び出す内部 step と、一般公開されている REST API などを呼び出す外部 step に分けられます。 1ヶ月あたり、5,000 の内部 step、2,000 の外部 step は無料で利用できる上に、無料枠を使い切ったとしても、内部 step は 1,000 ステップあたり $0.01、外部 step は 1,000 ステップあたり $0.025 で、お安く利用できます!

仮に、デイリーバッチで BigQuery に 1000 回 SQL を実行する(=内部 Step:1,000/日) のワークフローを 1ヶ月実行したとしても、約 $0.25(≒¥28)/月しかかかりません!(BigQueryのクエリ料金は別途必要になりますが) 24/365でバッチサーバーを立ち上げている場合と比べると、遥かにコスト削減できちゃいますv

また、管理コンソールの Workflows のダッシュボードからも、利用状況や料金が確認できるので安心です。

まとめ(所感)

AWS CloudFormation や Terraform を少し触ったくらいでしか YAML を書いたことがなかったので、定義ファイルの作成に初めは少し手こずりましたが、慣れれば処理ステップごとにシンプルに定義できる定義ファイルです。

また、定義ファイルの処理内容は管理コンソールからフローチャートでも確認できるので、依存関係のある処理が意図通りに実行できているか確認する場合などに便利です!

「Workflows」というサービス名的にググラビリティがいまいちなのと(自分の検索スキルの問題もありますが。。)、2021/09現在、標準では並列実行のシンタックスがないことが少し残念ではありますが、こんなにお安く YAML(または JSON)で簡単にサーバレスなバッチを実装できる Workflows、非常にありがたいサービスだと思いましたv

参考