DigdagのBigQuery Operatorを使ってみた

2022.04.24

DigdagからGoogle BigQuery(以下、BigQuery)を操作する場合は、下記のようにいくつかの方法があると思います。

  • BigQueryAPIクライアントライブラリを用いて、Python Scripts Operator(py>)で実行する
  • bqコマンドをShell Scripts Operator(sh>)で実行する..など

本記事では、DigdagでBigQuery Operator(bq>)を使ってBigQueryを操作してみた内容を記載しています。

事前準備

Google Cloud Shell (Digdag実行環境)

Digdagを実行するには、Javaがインストールされている必要があります。

今回は簡単な挙動確認するだけなので、既にJavaがインストールされているGoogle Cloud Shell上で確認いたしました。

$ java --version
openjdk 11.0.14 2022-01-18
OpenJDK Runtime Environment (build 11.0.14+9-post-Debian-1deb11u1)
OpenJDK 64-Bit Server VM (build 11.0.14+9-post-Debian-1deb11u1, mixed mode, sharing)
$

Digdag

Google Cloud Shell上で、下記の手順を実行し、Digdagをインストールしました。

( ※Digdagの公式ドキュメントを参照 )

今回インストールしたDigdagのバージョンは0.10.4でした。

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

Bigquery

DigdagのWorkflowからBigqueryにアクセスするためには、Secrets情報としてgcp.credentialを設定する必要があ理ます。

Digdagで使用するサービスアカウントを作成して、必要な権限を付与しました。

// サービスアカウント作成
$ gcloud iam service-accounts create XXXXXX

// Biqguery、Cloud Storage(エクスポート、インポート用)の管理者権限を付与 ※余分な権限かもしれませんが、一旦管理者権限で確認しました。
$ gcloud projects add-iam-policy-binding ohama-nagamasa --member="serviceAccount:XXXXXX@ohama-nagamasa.iam.gserviceaccount.com" --role=roles/bigquery.admin
$ gcloud projects add-iam-policy-binding ohama-nagamasa --member="serviceAccount:XXXXXX@ohama-nagamasa.iam.gserviceaccount.com" --role=roles/storage.admin

// サービスアカウントのキーを作成
$ gcloud iam service-accounts keys create XXXXXX.json --iam-account=XXXXXX@ohama-nagamasa.iam.gserviceaccount.com

サービスアカウントの認証情報をDigdag上のgcp.credentialに設定しました。

$ cat XXXXXX.json | digdag secrets --local --set gcp.credential=@XXXXXX.json

// gcp.credential が設定されたことを確認
$ digdag secrets --local

BigQuery関連のOperator

下記のOperatorを確認してみました。

  • bq>: Running Google BigQuery queries
  • bq_ddl>: Managing Google BigQuery Datasets and Tables
  • bq_extract>: Exporting Data from Google BigQuery
  • bq_load>: Importing Data into Google BigQuery

また本記事では確認できてませんが、有志のコミュニティによりBigQueryのテーブル/パーティションの待機処理も実装されているようです。

試してみた

ディレクトリ構成は、下記になります。

└── bq_sample
    ├── bq_operate.dig
    ├── bq_operate_ddl_dataset.dig
    ├── bq_operate_ddl_table.dig
    ├── bq_operate_extract.dig
    ├── bq_operate_load.dig
    └── queries
        └── aggregate.sql

bq>

まずはbq> を使って、`bigquery-public-data.new_york_subway.stations` から、line毎の件数を集計し、mart1テーブルに挿入します。

  • bq_operate.dig
_export:
  bq:
    dataset: ohama-nagamasa:digdag_test

+step1:
  bq>: queries/aggregate.sql
  destination_table: mart1

+output:
  echo>: 'job id : ${bq.last_job_id}' # 実行したBigQueryジョブのid
  • queries/aggregate.sql
SELECT
  line, count(*) as cnt
FROM
  `bigquery-public-data.new_york_subway.stations`
GROUP BY
  1
ORDER BY
  2 DESC
LIMIT 10
  • 実行ログ
$ digdag run bq_operate.dig
2022-04-24 10:17:42 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate session_time=2022-04-22T00:00:00+09:00
2022-04-24 10:17:43 +0000 [INFO] (0019@[0:default]+bq_operate+step1): bq>: queries/aggregate.sql
2022-04-24 10:17:43 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:44 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:50 +0000 [INFO] (0019@[0:default]+bq_operate+step1): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:51 +0000 [INFO] (0019@[0:default]+bq_operate+step1): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
2022-04-24 10:17:52 +0000 [INFO] (0019@[0:default]+bq_operate+output): echo>: job id : ohama-nagamasa:US.digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
job id : ohama-nagamasa:US.digdag_s0_p_default_w_bq_operate_t_2_a_1_eca971c3-1897-4900-a36c-5c0cace203e1
$
  • 実行結果

データも想定通りでした。

bq_ddl>

データセットやテーブルを、bq_ddlで作成・削除することができます。

  • bq_operate_ddl_dataset.dig (データセット作成)
+prepare:
  bq_ddl>:
  create_datasets:
    - id: digdag_test2
      location: asia-northeast1
  • 実行結果

  • bq_operate_ddl_table.dig (テーブル作成)

カラム定義なども指定できます。

_export:
  bq:
    dataset: ohama-nagamasa:digdag_test2

+prepare:
  bq_ddl>:
  create_tables:
    - id: user_info
      schema:
        fields:
          - {name: id, type: INTEGER}
          - {name: username, type: STRING}
  • 実行結果

bq_extract>

bq>検証時に作成したmart1テーブルのデータを、Cloud Storage上にエクスポートしたいと思います。

  • bq_operate_extract.dig
_export:
  bq:
    dataset: digdag_test

+export:
  bq_extract>: mart1
  destination: gs://digdag_test_bucket2/mart.csv
  • 実行ログ
2022-04-24 12:12:11 +0000 [INFO] (main): Using session /home/ohama_nagamasa/bq_sample/.digdag/status/20220424T000000+0000.
2022-04-24 12:12:11 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate_extract session_time=2022-04-24T00:00:00+00:00
2022-04-24 12:12:12 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): bq_extract>: mart1
2022-04-24 12:12:12 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
2022-04-24 12:12:13 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
2022-04-24 12:12:20 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002
2022-04-24 12:12:20 +0000 [INFO] (0019@[0:default]+bq_operate_extract+export): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_extract_t_2_a_1_52fc09f7-4829-4867-8304-b98b1c26d002

問題なくCloud Storageにエクスポートできてました。

bq_load>

下記のjsonl データをCloud Storageにアップロードして、Bigquery上のテーブルにロードするよう実行してみます。

  • prefect.jsonl
{"code":"06","prefect":"山形県"}
{"code":"34","prefect":"広島県"}
{"code":"17","prefect":"石川県"}
{"code":"17","prefect":"石川県"}
{"code":"08","prefect":"茨城県"}
{"code":"03","prefect":"岩手県"}
{"code":"24","prefect":"三重県"}
{"code":"27","prefect":"大阪府"}
{"code":"33","prefect":"岡山県"}
{"code":"33","prefect":"岡山県"}
  • bq_operate_load.dig
_export:
  bq:
    dataset: digdag_test

+load:
  bq_load>: gs://digdag_test_bucket2/prefect.jsonl
  destination_table: prefect
  source_format: NEWLINE_DELIMITED_JSON
  schema:
    fields:
      - name: "code"
        type: "INTEGER"
      - name: "prefect"
        type: "STRING"
  • 実行ログ
2022-04-24 12:35:48 +0000 [INFO] (main): Using session /home/ohama_nagamasa/bq_sample/.digdag/status/20220424T000000+0000.
2022-04-24 12:35:48 +0000 [INFO] (main): Starting a new session project id=1 workflow name=bq_operate_load session_time=2022-04-24T00:00:00+00:00
2022-04-24 12:35:50 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): bq_load>: gs://digdag_test_bucket2/prefect.jsonl
2022-04-24 12:35:50 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Submitting BigQuery job: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
2022-04-24 12:35:51 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
2022-04-24 12:35:58 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): Checking BigQuery job status: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4
2022-04-24 12:35:59 +0000 [INFO] (0019@[0:default]+bq_operate_load+load): BigQuery job successfully done: ohama-nagamasa:digdag_s0_p_default_w_bq_operate_load_t_2_a_1_fb170fdd-e87a-4dc4-bf47-4b18980cc9e4

データも問題なく取り込めてました。

おわりに

DigdagのBigquery Operatorを一通り試してみました。

Cloud Storageとの連携処理(インポート、エクスポート)、データマート作成などのワークフローは簡単に実装できそうでした。

今回はOperator自体の動作のみを確認したかったため、全体的にOptionは最低限のものだけ指定しましたが、必要に応じて多用することができるので、もう少し細かく確認してみたいと思います。

以上、DA(データアナリティクス)事業本部のナガマサでした!

参考