GoogleCloud WorkflowsのBigQuery Data Transferコネクタを使ってみる

GoogleCloud WorkflowsのBigQuery Data Transferコネクタを使ってみる

Clock Icon2024.08.30

はじめに

データアナリティクス事業本部のkobayashiです。

GoogleCloudのWorkflowsのBigQuery Data TransferコネクタがGAされていたので、GCSにあるCSVファイルをBigQueryにロードする処理をこのコネクタを使って実装してみましたのでその内容をまとめます。

BigQuery Data Transfer API Connector

対象のcsvファイルとBigQueryテーブル

対象となるGCS上のCSVデータとBigBigQueryのデータセットとテーブルは下記のものを使います。

CSVの中身は以下の天候状況になります。

jp_weather_2024.csv
2024-08-10,1,東京,,27.8,79,0,1
2024-08-10,2,東京,,27.8,80,0,1
2024-08-10,3,東京,,27.5,80,0,1
2024-08-10,4,東京,,27.5,82,0,1
2024-08-10,5,東京,,27.5,80,0.0,0
2024-08-10,6,東京,,27.4,76,0.0,0
2024-08-10,7,東京,,28.3,72,0.4,0
2024-08-10,8,東京,,29.4,67,0.9,0
2024-08-10,9,東京,,31.2,62,1.0,0
2024-08-10,10,東京,,31.0,59,0.5,0
2024-08-10,11,東京,,31.9,57,1.0,0
2024-08-10,12,東京,,33.2,49,1.0,0
2024-08-10,13,東京,,34.5,51,0.9,0
2024-08-10,14,東京,,34.5,54,0.8,0
2024-08-10,15,東京,,33.8,50,0.7,0
2024-08-10,16,東京,,33.0,60,0.5,0
....

データの中身は日付,月,都市名,天気,気温,湿度,日照時間,雲量となります。

BigQueryのテーブルのスキーマ情報は以下になります。

$ bq show --schema --format=prettyjson data_set_weather.jp_weather
[
  {
    "mode": "NULLABLE",
    "name": "date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "month",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "city_name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "weather_condition",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "temperature",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "humidity",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "sunshine_duration",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "cloud_cover",
    "type": "NUMERIC"
  }
]

ワークフロー定義を記述

ワークフローをWorkflows構文に従ってyamlにて記述します。WorkflowsのBigQueryコネクタに関する公式ドキュメントがありますのでこのサンプルを参考に記述します。

jp-weather-bqdts.yml
main:
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - destination_dataset: "data_set_weather" # ターゲットとなるBigQueryのデータセット
          - destination_table: "jp_weather" # ターゲットとなるBigQueryのテーブル
          - run_config_display_name: "workflow-bigquery-datatransfer"
          - run_config_data_source_id: "google_cloud_storage"
          - location: "asia-northeast1"
          - data_path_template: "gs://{Bucket名}/jp_weather_2024.csv" # ソースとなるファイルのGCS URI
    - create_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
        args:
          parent: ${"projects/" + project_id + "/locations/" + location}
          body:
            displayName: ${run_config_display_name}
            schedule:
            scheduleOptions:
              disableAutoScheduling: true
            destinationDatasetId: ${destination_dataset}
            dataSourceId: ${run_config_data_source_id}
            params:
              destination_table_name_template: ${destination_table}
              file_format: "CSV"
              data_path_template: ${data_path_template}
        result: config
    - get_time_in_30s:
        assign:
          - now_plus_30s: ${time.format(sys.now() + 30)}
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${config.name}
          body:
            requestedRunTime: ${now_plus_30s}
        result: runsResp
    - remove_run_config:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
        args:
          name: ${config.name}
    - the_end:
        return: ${runsResp}

処理の中身を解説します。

大枠は公式のサンプル(BigQuery Data Transfer API Connector Overview  |  Workflows  |  Google Cloud )をほぼ踏襲しています。

  • initステップではBigQuerのテーブル情報とソースファイルの情報を設定しています。
  • create_run_configステップではWorkflowsのBigQuery Data Transferコネクタを使ってBigQueryデータ転送ジョブを作成しています。
  • get_time_in_30sステップではデータ転送ジョブを実行する時間を現在空30秒後にしたいため30秒後の時間を取得しています。。
  • start_runステップでは再びWorkflowsのBigQuery Data Transfer コネクタを使って作成したBigQueryデータ転送ジョブを先のステップで取得した30秒後の開始時間で実行して実行結果を待機します。
  • remove_run_configステップでは実行し終わった転送ジョブを削除しています。

ワークフローの作成と実行

それでは記述したワークフロー定義をWorkflowsに登録し、ワークフローを実行してみます。
その前にワークフローを実行するためにはサービスアカウントが必要になりますので事前に作成しておく必要があります。今回使うのはBigQueryコネクタを使ってGCSのcsvをロードするためBigQuery管理者とストレージ管理者の権限を付けておきます。
iam

このサービスアカウントを使ってまずはワークフローを作成します。

gcloud workflows deploy jp-weather-2-bqdts --source=jp-weather-2.yml --service-account jp-weather-2@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1

これでワークフローの作成は完成です。
Cloudコンソールから確認すると以下のようにフローが図になって表現されています。
スクリーンショット_2024-08-30_9_49_51

ではこれを実行します。

$ gcloud workflows run jp-weather-2-bqdts --location asia-northeast1
Waiting for execution [fb4e1543-6b58-dcfe-5943-35013c3ced9a] to complete...done.  
...
startTime: '2024-08-30T09:12:48.893615382Z'
state: SUCCEEDED
status:
  currentSteps:
  - routine: main
    step: the_endbq
workflowRevisionId: 000006-4a7

yamlの記述やサービスアカウントの権限に問題がなければ特に問題なくワークフローの実行が終わります。

処理が終わりBigQueryで確認するとデータが登録されていることがわかります。

$ bq query "select * from data_set_weather.jp_weather limit 10" 
+------------+-------+-----------+-------------------+-------------+----------+-------------------+-------------+
|    date    | month | city_name | weather_condition | temperature | humidity | sunshine_duration | cloud_cover |
+------------+-------+-----------+-------------------+-------------+----------+-------------------+-------------+
2024-08-10   |     1 | 東京       ||        27.8 |       79 |               0.0 |           1 |
2024-08-10   |     2 | 東京       ||        27.8 |       80 |               0.0 |           1 |
2024-08-10   |     3 | 東京       ||        27.5 |       80 |               0.0 |           1 |
2024-08-10   |     4 | 東京       ||        27.5 |       82 |               0.0 |           1 |
2024-08-10   |     5 | 東京       ||        27.5 |       80 |               0.0 |           0 |
2024-08-10   |     6 | 東京       ||        27.4 |       76 |               0.0 |           0 |
2024-08-10   |     7 | 東京       ||        28.3 |       72 |               0.4 |           0 |
2024-08-10   |     8 | 東京       ||        29.4 |       67 |               0.9 |           0 |
2024-08-10   |     9 | 東京       ||        31.2 |       62 |               1.0 |           0 |
2024-08-10   |    10 | 東京       ||        31.0 |       59 |               0.5 |           0 |
+------------+-------+-----------+-------------------+-------------+----------+-------------------+-------------+

まとめ

GoogleCloudのWorkflowsでBigQuery Data Transferコネクタを使ってBigQueryデータ転送のジョブを作成・実行してGCS上のCSVファイルのデータをBigQueryのテーブルに読み込んでみました。BigQuery Data Transferコネクタを使うことで簡単にWorkflowsでデータ読み込みを行えるので非常に便利です。

最後まで読んで頂いてありがとうございました。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.