この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
DigdagからGoogle BigQuery(以下、BigQuery)を操作する場合は、下記のようにいくつかの方法があると思います。
- BigQueryAPIクライアントライブラリを用いて、Python Scripts Operator(
py>
)で実行する - bqコマンドをShell Scripts Operator(
sh>
)で実行する..など
本記事では、DigdagでBigQuery Operator(bq>
)を使ってBigQueryを操作してみた内容を記載しています。
事前準備
Google Cloud Shell (Digdag実行環境)
Digdagを実行するには、Javaがインストールされている必要があります。
今回は簡単な挙動確認するだけなので、既にJavaがインストールされているGoogle Cloud Shell上で確認いたしました。
$ java --version
openjdk 11.0.14 2022-01-18
OpenJDK Runtime Environment (build 11.0.14+9-post-Debian-1deb11u1)
OpenJDK 64-Bit Server VM (build 11.0.14+9-post-Debian-1deb11u1, mixed mode, sharing)
$
Digdag
Google Cloud Shell上で、下記の手順を実行し、Digdagをインストールしました。
( ※Digdagの公式ドキュメントを参照 )
今回インストールしたDigdagのバージョンは0.10.4
でした。
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
Bigquery
DigdagのWorkflowからBigqueryにアクセスするためには、Secrets情報としてgcp.credential
を設定する必要があ理ます。
Digdagで使用するサービスアカウントを作成して、必要な権限を付与しました。
// サービスアカウント作成
$ gcloud iam service-accounts create XXXXXX
// Biqguery、Cloud Storage(エクスポート、インポート用)の管理者権限を付与 ※余分な権限かもしれませんが、一旦管理者権限で確認しました。
$ gcloud projects add-iam-policy-binding ohama-nagamasa --member="serviceAccount:XXXXXX@ohama-nagamasa.iam.gserviceaccount.com" --role=roles/bigquery.admin
$ gcloud projects add-iam-policy-binding ohama-nagamasa --member="serviceAccount:XXXXXX@ohama-nagamasa.iam.gserviceaccount.com" --role=roles/storage.admin
// サービスアカウントのキーを作成
$ gcloud iam service-accounts keys create XXXXXX.json --iam-account=XXXXXX@ohama-nagamasa.iam.gserviceaccount.com
サービスアカウントの認証情報をDigdag上のgcp.credential
に設定しました。
$ cat XXXXXX.json | digdag secrets --local --set gcp.credential=@XXXXXX.json
// gcp.credential が設定されたことを確認
$ digdag secrets --local
BigQuery関連のOperator
下記のOperatorを確認してみました。
- bq>: Running Google BigQuery queries
- bq_ddl>: Managing Google BigQuery Datasets and Tables
- bq_extract>: Exporting Data from Google BigQuery
- bq_load>: Importing Data into Google BigQuery
また本記事では確認できてませんが、有志のコミュニティによりBigQueryのテーブル/パーティションの待機処理も実装されているようです。
試してみた
ディレクトリ構成は、下記になります。
└── bq_sample
├── bq_operate.dig
├── bq_operate_ddl_dataset.dig
├── bq_operate_ddl_table.dig
├── bq_operate_extract.dig
├── bq_operate_load.dig
└── queries
└── aggregate.sql
bq>
まずはbq>
を使って、`bigquery-public-data.new_york_subway.stations`
から、line毎の件数を集計し、mart1テーブルに挿入します。
- bq_operate.dig
_export:
bq:
dataset: ohama-nagamasa:digdag_test
+step1:
bq>: queries/aggregate.sql
destination_table: mart1
+output:
echo>: 'job id : ${bq.last_job_id}' # 実行したBigQueryジョブのid
- queries/aggregate.sql
SELECT
line, count(*) as cnt
FROM
`bigquery-public-data.new_york_subway.stations`
GROUP BY
1
ORDER BY
2 DESC
LIMIT 10
- 実行ログ
$ digdag run bq_operate.dig
2022-04-24 10:17:42 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate session_time=2022-04-22T00:00:00+09:00
2022-04-24 10:17:43 +0000 [INFO] (0019@[0:default]+bq_operate+step1): bq>: queries/aggregate.sql
2022-04-24 10:17:43 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:44 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:50 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:51 +0000 [INFO] (0019@[0:default]+bq_operate+step1): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:52 +0000 [INFO] (0019@[0:default]+bq_operate+output): echo>: job id : ohama-nagamasa:US.digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
job id : ohama-nagamasa:US.digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
$
- 実行結果
データも想定通りでした。
bq_ddl>
データセットやテーブルを、bq_ddl
で作成・削除することができます。
- bq_operate_ddl_dataset.dig (データセット作成)
+prepare:
bq_ddl>:
create_datasets:
- id: digdag_test2
location: asia-northeast1
- 実行結果
- bq_operate_ddl_table.dig (テーブル作成)
カラム定義なども指定できます。
_export:
bq:
dataset: ohama-nagamasa:digdag_test2
+prepare:
bq_ddl>:
create_tables:
- id: user_info
schema:
fields:
- {name: id, type: INTEGER}
- {name: username, type: STRING}
- 実行結果
bq_extract>
bq>
検証時に作成したmart1テーブルのデータを、Cloud Storage上にエクスポートしたいと思います。
- bq_operate_extract.dig
_export:
bq:
dataset: digdag_test
+export:
bq_extract>: mart1
destination: gs://digdag_test_bucket2/mart.csv
- 実行ログ
2022-04-24 12:12:11 +0000 [INFO] (main): Using session /home/ohama_nagamasa/bq_sample/.digdag/status/20220424T000000+0000.
2022-04-24 12:12:11 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate_extract session_time=2022-04-24T00:00:00+00:00
2022-04-24 12:12:12 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): bq_extract>: mart1
2022-04-24 12:12:12 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
2022-04-24 12:12:13 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
2022-04-24 12:12:20 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
2022-04-24 12:12:20 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
問題なくCloud Storageにエクスポートできてました。
bq_load>
下記のjsonl データをCloud Storageにアップロードして、Bigquery上のテーブルにロードするよう実行してみます。
- prefect.jsonl
{"code":"06","prefect":"山形県"}
{"code":"34","prefect":"広島県"}
{"code":"17","prefect":"石川県"}
{"code":"17","prefect":"石川県"}
{"code":"08","prefect":"茨城県"}
{"code":"03","prefect":"岩手県"}
{"code":"24","prefect":"三重県"}
{"code":"27","prefect":"大阪府"}
{"code":"33","prefect":"岡山県"}
{"code":"33","prefect":"岡山県"}
- bq_operate_load.dig
_export:
bq:
dataset: digdag_test
+load:
bq_load>: gs://digdag_test_bucket2/prefect.jsonl
destination_table: prefect
source_format: NEWLINE_DELIMITED_JSON
schema:
fields:
- name: "code"
type: "INTEGER"
- name: "prefect"
type: "STRING"
- 実行ログ
2022-04-24 12:35:48 +0000 [INFO] (main): Using session /home/ohama_nagamasa/bq_sample/.digdag/status/20220424T000000+0000.
2022-04-24 12:35:48 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate_load session_time=2022-04-24T00:00:00+00:00
2022-04-24 12:35:50 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): bq_load>: gs://digdag_test_bucket2/prefect.jsonl
2022-04-24 12:35:50 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
2022-04-24 12:35:51 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
2022-04-24 12:35:58 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
2022-04-24 12:35:59 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
データも問題なく取り込めてました。
おわりに
DigdagのBigquery Operatorを一通り試してみました。
Cloud Storageとの連携処理(インポート、エクスポート)、データマート作成などのワークフローは簡単に実装できそうでした。
今回はOperator自体の動作のみを確認したかったため、全体的にOptionは最低限のものだけ指定しましたが、必要に応じて多用することができるので、もう少し細かく確認してみたいと思います。
以上、DA(データアナリティクス)事業本部のナガマサでした!