DigdagのBigQuery Operatorを使ってみた
DigdagからGoogle BigQuery(以下、BigQuery)を操作する場合は、下記のようにいくつかの方法があると思います。
- BigQueryAPIクライアントライブラリを用いて、Python Scripts Operator(
py>
)で実行する - bqコマンドをShell Scripts Operator(
sh>
)で実行する..など
本記事では、DigdagでBigQuery Operator(bq>
)を使ってBigQueryを操作してみた内容を記載しています。
事前準備
Google Cloud Shell (Digdag実行環境)
Digdagを実行するには、Javaがインストールされている必要があります。
今回は簡単な挙動確認するだけなので、既にJavaがインストールされているGoogle Cloud Shell上で確認いたしました。
$ java --version openjdk 11.0.14 2022-01-18 OpenJDK Runtime Environment (build 11.0.14+9-post-Debian-1deb11u1) OpenJDK 64-Bit Server VM (build 11.0.14+9-post-Debian-1deb11u1, mixed mode, sharing) $
Digdag
Google Cloud Shell上で、下記の手順を実行し、Digdagをインストールしました。
( ※Digdagの公式ドキュメントを参照 )
今回インストールしたDigdagのバージョンは0.10.4
でした。
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" $ chmod +x ~/bin/digdag $ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc $ source ~/.bashrc
Bigquery
DigdagのWorkflowからBigqueryにアクセスするためには、Secrets情報としてgcp.credential
を設定する必要があ理ます。
Digdagで使用するサービスアカウントを作成して、必要な権限を付与しました。
// サービスアカウント作成 $ gcloud iam service-accounts create XXXXXX // Biqguery、Cloud Storage(エクスポート、インポート用)の管理者権限を付与 ※余分な権限かもしれませんが、一旦管理者権限で確認しました。 $ gcloud projects add-iam-policy-binding ohama-nagamasa --member="serviceAccount:XXXXXX@ohama-nagamasa.iam.gserviceaccount.com" --role=roles/bigquery.admin $ gcloud projects add-iam-policy-binding ohama-nagamasa --member="serviceAccount:XXXXXX@ohama-nagamasa.iam.gserviceaccount.com" --role=roles/storage.admin // サービスアカウントのキーを作成 $ gcloud iam service-accounts keys create XXXXXX.json --iam-account=XXXXXX@ohama-nagamasa.iam.gserviceaccount.com
サービスアカウントの認証情報をDigdag上のgcp.credential
に設定しました。
$ cat XXXXXX.json | digdag secrets --local --set gcp.credential=@XXXXXX.json // gcp.credential が設定されたことを確認 $ digdag secrets --local
BigQuery関連のOperator
下記のOperatorを確認してみました。
- bq>: Running Google BigQuery queries
- bq_ddl>: Managing Google BigQuery Datasets and Tables
- bq_extract>: Exporting Data from Google BigQuery
- bq_load>: Importing Data into Google BigQuery
また本記事では確認できてませんが、有志のコミュニティによりBigQueryのテーブル/パーティションの待機処理も実装されているようです。
試してみた
ディレクトリ構成は、下記になります。
└── bq_sample ├── bq_operate.dig ├── bq_operate_ddl_dataset.dig ├── bq_operate_ddl_table.dig ├── bq_operate_extract.dig ├── bq_operate_load.dig └── queries └── aggregate.sql
bq>
まずはbq>
を使って、`bigquery-public-data.new_york_subway.stations`
から、line毎の件数を集計し、mart1テーブルに挿入します。
- bq_operate.dig
_export: bq: dataset: ohama-nagamasa:digdag_test +step1: bq>: queries/aggregate.sql destination_table: mart1 +output: echo>: 'job id : ${bq.last_job_id}' # 実行したBigQueryジョブのid
- queries/aggregate.sql
SELECT line, count(*) as cnt FROM `bigquery-public-data.new_york_subway.stations` GROUP BY 1 ORDER BY 2 DESC LIMIT 10
- 実行ログ
$ digdag run bq_operate.dig 2022-04-24 10:17:42 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate session_time=2022-04-22T00:00:00+09:00 2022-04-24 10:17:43 +0000 [INFO] (0019@[0:default]+bq_operate+step1): bq>: queries/aggregate.sql 2022-04-24 10:17:43 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1 2022-04-24 10:17:44 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1 2022-04-24 10:17:50 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1 2022-04-24 10:17:51 +0000 [INFO] (0019@[0:default]+bq_operate+step1): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1 2022-04-24 10:17:52 +0000 [INFO] (0019@[0:default]+bq_operate+output): echo>: job id : ohama-nagamasa:US.digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1 job id : ohama-nagamasa:US.digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1 $
- 実行結果
データも想定通りでした。
bq_ddl>
データセットやテーブルを、bq_ddl
で作成・削除することができます。
- bq_operate_ddl_dataset.dig (データセット作成)
+prepare: bq_ddl>: create_datasets: - id: digdag_test2 location: asia-northeast1
- 実行結果
- bq_operate_ddl_table.dig (テーブル作成)
カラム定義なども指定できます。
_export: bq: dataset: ohama-nagamasa:digdag_test2 +prepare: bq_ddl>: create_tables: - id: user_info schema: fields: - {name: id, type: INTEGER} - {name: username, type: STRING}
- 実行結果
bq_extract>
bq>
検証時に作成したmart1テーブルのデータを、Cloud Storage上にエクスポートしたいと思います。
- bq_operate_extract.dig
_export: bq: dataset: digdag_test +export: bq_extract>: mart1 destination: gs://digdag_test_bucket2/mart.csv
- 実行ログ
2022-04-24 12:12:11 +0000 [INFO] (main): Using session /home/ohama_nagamasa/bq_sample/.digdag/status/20220424T000000+0000. 2022-04-24 12:12:11 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate_extract session_time=2022-04-24T00:00:00+00:00 2022-04-24 12:12:12 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): bq_extract>: mart1 2022-04-24 12:12:12 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002 2022-04-24 12:12:13 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002 2022-04-24 12:12:20 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002 2022-04-24 12:12:20 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
問題なくCloud Storageにエクスポートできてました。
bq_load>
下記のjsonl データをCloud Storageにアップロードして、Bigquery上のテーブルにロードするよう実行してみます。
- prefect.jsonl
{"code":"06","prefect":"山形県"} {"code":"34","prefect":"広島県"} {"code":"17","prefect":"石川県"} {"code":"17","prefect":"石川県"} {"code":"08","prefect":"茨城県"} {"code":"03","prefect":"岩手県"} {"code":"24","prefect":"三重県"} {"code":"27","prefect":"大阪府"} {"code":"33","prefect":"岡山県"} {"code":"33","prefect":"岡山県"}
- bq_operate_load.dig
_export: bq: dataset: digdag_test +load: bq_load>: gs://digdag_test_bucket2/prefect.jsonl destination_table: prefect source_format: NEWLINE_DELIMITED_JSON schema: fields: - name: "code" type: "INTEGER" - name: "prefect" type: "STRING"
- 実行ログ
2022-04-24 12:35:48 +0000 [INFO] (main): Using session /home/ohama_nagamasa/bq_sample/.digdag/status/20220424T000000+0000. 2022-04-24 12:35:48 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate_load session_time=2022-04-24T00:00:00+00:00 2022-04-24 12:35:50 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): bq_load>: gs://digdag_test_bucket2/prefect.jsonl 2022-04-24 12:35:50 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4 2022-04-24 12:35:51 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4 2022-04-24 12:35:58 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4 2022-04-24 12:35:59 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
データも問題なく取り込めてました。
おわりに
DigdagのBigquery Operatorを一通り試してみました。
Cloud Storageとの連携処理(インポート、エクスポート)、データマート作成などのワークフローは簡単に実装できそうでした。
今回はOperator自体の動作のみを確認したかったため、全体的にOptionは最低限のものだけ指定しましたが、必要に応じて多用することができるので、もう少し細かく確認してみたいと思います。
以上、DA(データアナリティクス)事業本部のナガマサでした!