Cloud WorkflowsでBigQueryコネクタを使ってGCS→BQのワークフローを作成してみる

2022.05.13

はじめに

データアナリティクス事業本部のkobayashiです。

前回GoogleCloudのETLサービスのDataflowを使ってGoogle Cloud Storage(以降GCS)にあるCSVファイルをBigQueryにロードする処理をPythonで実行しました。今回はGoogleCloudのWorkflowsで前回同様にGCSにあるCSVファイルをBigQueryにロードする処理を実装してみました。

Cloud Storage Text to BigQuery

WorkflowsはオーケストレーションサービスでGoolgeCloudのサービスや任意のHTTPベースのAPIをyaml,jsonで定義した順序で実行することができます。サーバーレスのサービスですのでスケールアップ等のインフラ管理をする必要はなく非常に便利に使えます。

WorkflowsでGCSからBigQueryにデータをロードする方法はいつくか見かけましたが何れもcall.postでbigquery.googleapisを叩いてGCSからBigQueryにデータをロードする方法でした。WorkflowsにはBigQueryコネクタ(BigQuery API Connector Overview  |  Workflows  |  Google Cloud )があるのでこのコネクタを使って実装してみたいと思います。

GCSからBigQueryへのロードを実装

対象となるGCS上のCSVデータは前回と同じものを使います。またBigBigQueryのデータセットとテーブルは下記のものを使います。

CSVの中身は以下の天候状況になります。

2021-11-13,11,名古屋,晴,10.8,0,9.7,3
2021-11-13,11,仙台,晴,11.3,0,8.1,1.8
2021-11-13,11,大阪,晴,12.1,0,8.8,5
2021-11-13,11,札幌,晴,7.8,0,1.7,7.5
2021-11-13,11,長野,晴,7.3,0,9.3,
2021-11-13,11,京都,晴,10.5,0,7.6,
2021-11-13,11,東京,晴,13.1,0,9.4,0.5
2021-11-13,11,横浜,晴,13.6,0,10.1,
2021-03-03,3,京都,晴,5.9,0,6.6,
2021-03-03,3,那覇,晴,18.4,0,8.5,4.3
2021-03-03,3,大阪,晴,6.8,0,8.2,5
2021-03-03,3,名古屋,晴,7.3,0,10.7,1
2021-03-03,3,横浜,晴,7.7,0,11,
2021-03-03,3,仙台,晴,3.1,0,9.7,4.3
....

データの中身は日付,月,都市名,天気,気温,湿度,日照時間,雲量となります。

BigQueryのテーブルは以下になります。

$ bq show --schema --format=prettyjson data_set_test.jp_weather_2
[
  {
    "mode": "NULLABLE",
    "name": "date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "month",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "city",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "w_type",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "temperature",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "precipitation",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "sunlight",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "cloudage",
    "type": "NUMERIC"
  }
]

ワークフローの作成・実行

ワークフロー定義を記述

ワークフローをWorkflows構文に従ってyamlにて記述します。WorkflowsのBigQueryコネクタに関する公式ドキュメントがありますのでこのサンプルを参考に記述します。

jp-weather-2.yml

main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - file_path: "gs://example_bucket_name/jp_weather_2021.csv" # ソースとなるファイルのGCS URI
          - dataset: "data_set_test" # ターゲットとなるBigQueryのデータセット
          - table: "jp_weather_2" # ターゲットとなるBigQueryのテーブル
    - runLoadJob:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              jobType: LOAD
              load:
                sourceUris: ${file_path}
                writeDisposition: "WRITE_TRUNCATE"
                destinationTable:
                  projectId: ${project_id}
                  datasetId: ${dataset}
                  tableId: ${table}
        result: query_result
    - the_end:
        return: ${query_result}

処理の中身を解説します。

大枠は公式のサンプル(Method: googleapis.bigquery.v2.jobs.insert  |  Workflows  |  Google Cloud )を踏襲すれば問題ありませんが肝心のGCSからBigQueryにデータをロードする部分に関してはgoogleapis.bigquery.v2.jobs.insertメソッドのbody > configurationで設定を行います。 この記述に関してはAPIドキュメントを参照しながら必須フィールドに値を設定します。

GCSからBigQueryにデータをロードするにはジョブタイプjobTypeLOADにし、load内にソースとなるGCSのURIsourceUris、ロード方法writeDisposition、ターゲットとなるBigQueryの情報destinationTableを記述しています。

ワークフローの作成と実行

それでは記述したワークフロー定義をWorkflowsに登録し、ワークフローを実行してみます。 その前にワークフローを実行するためにはサービスアカウントが必要になりますので事前に作成しておく必要があります。今回使うのはBigQueryコネクタを使ってGCSのcsvをロードするためBigQuery管理者とストレージ管理者の権限を付けておきます。

このサービスアカウントを使ってまずはワークフローを作成します。

gcloud workflows deploy jp-weather-2 --source=jp-weather-2.yml --service-account jp-weather-2@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1

これでワークフローの作成は完成です。 Cloudコンソールから確認すると以下のようにフローが図になって表現されています。

ではこれを実行します。

$ gcloud workflows run jp-weather-2 --location asia-northeast1
Waiting for execution [e6192e60-0256-4161-b13e-6021fc286c93] to complete...done.  
...
startTime: '2022-05-12T06:34:24.145982353Z'
state: SUCCEEDED
workflowRevisionId: 000003-505

yamlの記述やサービスアカウントの権限に問題がなければ特に問題なくワークフローの実行が終わります。

処理が終わりBigQueryで確認するとデータが登録されていることがわかります。

$ bq query "select * from data_set_test.jp_weather_2 limit 5" 
+------------+-------+------+--------+-------------+---------------+----------+----------+
|    date    | month | city | w_type | temperature | precipitation | sunlight | cloudage |
+------------+-------+------+--------+-------------+---------------+----------+----------+
| 2021-11-13 |    11 | 京都 | 晴     |        10.5 |             0 |      7.6 |     NULL |
| 2021-03-03 |     3 | 京都 | 晴     |         5.9 |             0 |      6.6 |     NULL |
| 2021-11-14 |    11 | 京都 | 晴     |        12.1 |             0 |      3.6 |     NULL |
| 2021-03-04 |     3 | 京都 | 晴     |          10 |             0 |      6.1 |     NULL |
| 2021-11-15 |    11 | 京都 | 晴     |        12.8 |             0 |      7.9 |     NULL |
+------------+-------+------+--------+-------------+---------------+----------+----------+

引数をワークフロー実行時に指定する

このままの記述でも特に問題ないのですがワークフローを実行する際にソースファイルやターゲットのデータセット・テーブルを指定したほうが汎用性が高いのでyamlの記述を修正してみます。

main:
  params: [ args ] # ワークフロー実行時の引数を取得
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
#          - file_path: "gs://example_bucket_name/jp_weather_2021.csv"
#          - dataset: "data_set_test"
#          - table: "jp_weather_2"
    - runLoadJob:
        call: googleapis.bigquery.v2.jobs.insert
        args:
          projectId: ${project_id}
          body:
            configuration:
              jobType: LOAD
              load:
#                sourceUris: ${file_path}
                sourceUris: ${args.file_path}
                writeDisposition: "WRITE_TRUNCATE"
                destinationTable:
                  projectId: ${project_id}
#                  datasetId: ${dataset}
#                  tableId: ${table}
                  datasetId: ${args.dataset}
                  tableId: ${args.table}
        result: query_result
    - the_end:
        return: ${query_result}

変更点としてはparamフィールドでワークフロー実行時の引数を受け取り、sourceUris,datasetId,tableIdの書くフィールドに受け取った値を指定しています。これにより汎用的にこのワークフローを使うことができるようになりました。

このワークフローを実行するにはワークフローを修正する必要があるため、作成時に使用したコマンドを再度実行します。

gcloud workflows deploy jp-weather-2 --source=jp-weather-2.yml --service-account jp-weather-2@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1

その上でワークフローを以下のコマンドで実行します。 コマンドには--dataオプションに文字列でjson形式で値を記述します。

gcloud workflows run jp-weather-2 --location asia-northeast1 --data='{"file_path":"gs://example_bucket_name/jp_weather_2021.csv", "dataset":"data_set_test", "table":"jp_weather_2"}'

これで先程と全く同じワークフローを実行したことになります。別のデータソース、ターゲットテーブルを指定したい場合はfile_path,dataset,talbeを変更してワークフローを実行すれば良く、汎用的にGCSからBigQueryにデータを登録できるようになりました。

まとめ

GoogleCloudのWorkflowsで前回同様にGCSにあるCSVファイルをBigQueryにロードする処理を実装してみました。WorkflowsのドキュメントだけでなくBigQueryのAPIドキュメントも読み込む必要があるため読むドキュメントが多く大変でしたが、使い方のポイントさえ理解してしまえば簡単にワークフローを組めてしまいます。Workflowsはまだ色々試してみたいことがあるのでまた引き続き記事にしたいと思います。

最後まで読んで頂いてありがとうございました。