Google Workflows で、BigQuery に SQL を実行するサーバレスなバッチ処理を実装してみた。
こんにちは、みかみです。
やりたいこと
- BigQuery に SQL を実行するバッチ処理を、サーバレスで実装したい
- Google Workflows を使ってバッチ実装してみたい
- BigQuery に投げる SQL は、GCS に格納しておきたい
- Workflows をスケジュール実行するにはどうすればいいか知りたい
概要
図にしてみるとこんな感じ。
シンプルで良さそうです。
サーバレスなバッチを実装するには Cloud Functions を使うこともできますが、 最大9分でタイムアウトしてしまうので長い SQL を同期実行できなかったり、 複数の SQL の依存関係を制御するのが大変だったり。
依存関係制御には、Cloud Composer の利用も考えられると思いますが、 サーバー立てるとコストがかかるので、サーバレスで実装したい。
そんな時には Workflows の出番では?!
BigQuery テーブルと SQL ファイルを準備
BigQuery に、以下のテーブルを作成しました。
また、上記のテーブルから、CTAS で新しいテーブルを作成する SQL と、作成した新しいテーブルのデータ件数をカウントする SQL を、GCS に格納しました。
CREATE TABLE dataset_1.temp_avocado AS ( SELECT * FROM dataset_1.avocado )
SELECT count(*) from dataset_1.temp_avocado
Workflows 実行用のサービスアカウントを作成
以下のコマンドで、Workflows を実行するサービスアカウントを作成しました。
export SERVICE_ACCOUNT=workflows-sa gcloud iam service-accounts create ${SERVICE_ACCOUNT}
作成したサービスアカウントには、GCS から SQL を取得し、BigQuery に SQL 実行するためのロールを付与しておきます。
gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \ --member "serviceAccount:${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \ --role "roles/bigquery.dataOwner" gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \ --member "serviceAccount:${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \ --role "roles/bigquery.user" gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \ --member "serviceAccount:${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \ --role "rroles/storage.admin"
管理コンソールからも、必要なロールを持つサービスアカウントができたことがが確認できました。
workflow を作成
Google Workflows では、YAML または JSON で、実行するワークフローを定義します。 GCS や BigQuery などの、各種 Google Cloud Service のコネクタもあり、サンプルコードも用意されています。
- Syntax overview | Workflows ドキュメント
- Workflows のチュートリアル | Workflows ドキュメント
- BigQuery 用コネクタ| Workflows ドキュメント
- BigQuery API Connector Overview | Workflows ドキュメント
- Cloud Storage JSON API Connector Overview | Workflows ドキュメント
- GoogleCloudPlatform/workflows-samples | GitHub
以下のワークフロー定義を、bq_batch.yaml というファイル名で保存しました。
main: steps: - job_1: call: exec_query args: blob_path: "sql%2F00%2Ftest.sql" result: query_result - job_2: call: exec_query args: blob_path: "sql%2F02%2Ftest.sql" result: query_result - return_value: return: ${query_result} exec_query: params: [blob_path] steps: - init: assign: - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} - bucket_name: "test-mikami" - get_sql: call: http.get args: url: ${"https://storage.googleapis.com/download/storage/v1/b/"+bucket_name+"/o/"+blob_path} headers: Content-Type: "text/plain" auth: type: OAuth2 query: alt: media result: obj - exec_query: call: googleapis.bigquery.v2.jobs.query args: projectId: ${project_id} body: query: ${text.replace_all(text.decode(obj["body"]), "\n", " ")} useLegacySql: false result: query_result
main の job_1
step と job_2
step から、それぞれ別の SQL のファイルパスを渡し、GCS の SQL ファイルデータを取得して BigQuery にクエリ実行するサブワークフローを実行します。
以下のコマンドでデプロイします。
gcloud workflows deploy bq-batch --source=bq_batch.yaml --service-account=${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com
無事デプロイできたようです。
mikami_yuki@cloudshell:~/workflow (cm-da-mikami-yuki-258308)$ gcloud workflows deploy bq-batch --source=bq_batch.yaml --service-account=${SERVICE_ACCOUNT}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com WARNING: The default location(us-central1) was used since the location flag was not specified. Waiting for operation [operation-1630676452995-5cb1772b96fc4-00a85905-6532bf8f] to complete...done. createTime: '2021-09-03T13:40:53.143115338Z' name: projects/cm-da-mikami-yuki-258308/locations/us-central1/workflows/bq-batch revisionCreateTime: '2021-09-03T13:40:53.176795660Z' revisionId: 000001-e73 serviceAccount: projects/cm-da-mikami-yuki-258308/serviceAccounts/workflows-sa@cm-da-mikami-yuki-258308.iam.gserviceaccount.com sourceContents: |+ (省略) state: ACTIVE updateTime: '2021-09-03T13:40:53.853564241Z'
以下のコマンドで実行してみます。
gcloud workflows run bq-batch
mikami_yuki@cloudshell:~/workflow (cm-da-mikami-yuki-258308)$ gcloud workflows run bq-batch WARNING: The default location(us-central1) was used since the location flag was not specified. Waiting for execution [2dacaa51-84ba-4abd-81a1-58af1143f0a8] to complete...done. argument: 'null' endTime: '2021-09-03T13:42:31.240818573Z' name: projects/797147019523/locations/us-central1/workflows/bq-batch/executions/2dacaa51-84ba-4abd-81a1-58af1143f0a8 result: 'null' startTime: '2021-09-03T13:42:25.736969016Z' state: SUCCEEDED workflowRevisionId: 000001-e73
state: SUCCEEDED
とのことなので、正常に実行できたようです。
管理コンソールから BigQuery を確認してみます。 期待通り、テーブルが作成され、件数カウントの SQL も実行されたことが確認できました。
Workflows をスケジュール実行する Cloud Scheduler のジョブを作成
以下の公式ドキュメントを参考に、Workflows を実行する Cloud Scheduler のジョブを作成してみます。
まずは以下のコマンドで、Cloud Scheduler 用のサービスアカウントを作成しました。
export SERVICE_ACCOUNT_CS=cloudscheduler-sa gcloud iam service-accounts create ${SERVICE_ACCOUNT_CS} gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \ --member "serviceAccount:${SERVICE_ACCOUNT_CS}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com" \ --role "roles/workflows.invoker"
続いて以下のコマンドで、Workflows を実行する Cloud Scheduler のジョブを作成します。
gcloud scheduler jobs create http exec_workflow \ --schedule="00 23 * * *" \ --uri="https://workflowexecutions.googleapis.com/v1/projects/${GOOGLE_CLOUD_PROJECT}/locations/us-central1/workflows/bq-batch/executions" \ --message-body="{\"argument\": \"{\\\"foo\\\": \\\"bar\\\"}\"}" \ --time-zone="Asia/Tokyo" \ --oauth-service-account-email="${SERVICE_ACCOUNT_CS}@${GOOGLE_CLOUD_PROJECT}.iam.gserviceaccount.com"
スケジュール実行用のジョブが作成できました。
スケジュール実行に備えて、先ほど作成済みの temp_avocado
テーブルは削除しておきます。
指定した時間になったので、ワークフローが実行されたか確認してみます。
期待通り、正常にスケジュール実行されたことが確認できました。
Workflows の料金
Google Workflows では、実行した step 数ごとに課金が発生します。 実行 step は、BigQuery や GCS などの Google Cloud サービスを呼び出す内部 step と、一般公開されている REST API などを呼び出す外部 step に分けられます。 1ヶ月あたり、5,000 の内部 step、2,000 の外部 step は無料で利用できる上に、無料枠を使い切ったとしても、内部 step は 1,000 ステップあたり $0.01、外部 step は 1,000 ステップあたり $0.025 で、お安く利用できます!
仮に、デイリーバッチで BigQuery に 1000 回 SQL を実行する(=内部 Step:1,000/日) のワークフローを 1ヶ月実行したとしても、約 $0.25(≒¥28)/月しかかかりません!(BigQueryのクエリ料金は別途必要になりますが) 24/365でバッチサーバーを立ち上げている場合と比べると、遥かにコスト削減できちゃいますv
また、管理コンソールの Workflows のダッシュボードからも、利用状況や料金が確認できるので安心です。
まとめ(所感)
AWS CloudFormation や Terraform を少し触ったくらいでしか YAML を書いたことがなかったので、定義ファイルの作成に初めは少し手こずりましたが、慣れれば処理ステップごとにシンプルに定義できる定義ファイルです。
また、定義ファイルの処理内容は管理コンソールからフローチャートでも確認できるので、依存関係のある処理が意図通りに実行できているか確認する場合などに便利です!
「Workflows」というサービス名的にググラビリティがいまいちなのと(自分の検索スキルの問題もありますが。。)、2021/09現在、標準では並列実行のシンタックスがないことが少し残念ではありますが、こんなにお安く YAML(または JSON)で簡単にサーバレスなバッチを実装できる Workflows、非常にありがたいサービスだと思いましたv
参考
- Workflows | Workflows ドキュメント
- 概要 | Workflows ドキュメント
- Syntax overview | Workflows ドキュメント
- BigQuery API Connector Overview | Workflows ドキュメント
- Cloud Storage JSON API Connector Overview | Workflows ドキュメント
- Workflows のチュートリアル | Workflows ドキュメント
- Cloud Scheduler の使用によるワークフローのスケジュール設定 | Workflows ドキュメント
- gcloud workflows deploy | Workflows ドキュメント
- Using Cloud Workflows to load Cloud Storage files into BigQuery
- Automate the execution of BigQuery queries with Cloud Workflows
- Parallel executions with Google Workflows
- GCPのCloud Workflowsを試す | Hatena Blog
- GCPのWorkflowsでBigQueryコネクタを使ってみる | Hatena Blog
- GoogleCloudPlatform/workflows-samples | GitHub
- guillaumeblaquiere/parallel-workflow | GitHub