GoogleCloud WorkflowsのBigQuery Data Transferコネクタを使ってみる
はじめに
データアナリティクス事業本部のkobayashiです。
GoogleCloudのWorkflowsのBigQuery Data TransferコネクタがGAされていたので、GCSにあるCSVファイルをBigQueryにロードする処理をこのコネクタを使って実装してみましたのでその内容をまとめます。
- Workflows release notes | Google Cloud
- BigQuery Data Transfer API Connector Overview | Workflows | Google Cloud
BigQuery Data Transfer API Connector
対象のcsvファイルとBigQueryテーブル
対象となるGCS上のCSVデータとBigBigQueryのデータセットとテーブルは下記のものを使います。
CSVの中身は以下の天候状況になります。
2024-08-10,1,東京,晴,27.8,79,0,1
2024-08-10,2,東京,晴,27.8,80,0,1
2024-08-10,3,東京,晴,27.5,80,0,1
2024-08-10,4,東京,晴,27.5,82,0,1
2024-08-10,5,東京,晴,27.5,80,0.0,0
2024-08-10,6,東京,晴,27.4,76,0.0,0
2024-08-10,7,東京,晴,28.3,72,0.4,0
2024-08-10,8,東京,晴,29.4,67,0.9,0
2024-08-10,9,東京,晴,31.2,62,1.0,0
2024-08-10,10,東京,晴,31.0,59,0.5,0
2024-08-10,11,東京,晴,31.9,57,1.0,0
2024-08-10,12,東京,晴,33.2,49,1.0,0
2024-08-10,13,東京,晴,34.5,51,0.9,0
2024-08-10,14,東京,晴,34.5,54,0.8,0
2024-08-10,15,東京,晴,33.8,50,0.7,0
2024-08-10,16,東京,晴,33.0,60,0.5,0
....
データの中身は日付,月,都市名,天気,気温,湿度,日照時間,雲量
となります。
BigQueryのテーブルのスキーマ情報は以下になります。
$ bq show --schema --format=prettyjson data_set_weather.jp_weather
[
{
"mode": "NULLABLE",
"name": "date",
"type": "DATE"
},
{
"mode": "NULLABLE",
"name": "month",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "city_name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "weather_condition",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "temperature",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "humidity",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "sunshine_duration",
"type": "NUMERIC"
},
{
"mode": "NULLABLE",
"name": "cloud_cover",
"type": "NUMERIC"
}
]
ワークフロー定義を記述
ワークフローをWorkflows構文に従ってyamlにて記述します。WorkflowsのBigQueryコネクタに関する公式ドキュメントがありますのでこのサンプルを参考に記述します。
main:
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- destination_dataset: "data_set_weather" # ターゲットとなるBigQueryのデータセット
- destination_table: "jp_weather" # ターゲットとなるBigQueryのテーブル
- run_config_display_name: "workflow-bigquery-datatransfer"
- run_config_data_source_id: "google_cloud_storage"
- location: "asia-northeast1"
- data_path_template: "gs://{Bucket名}/jp_weather_2024.csv" # ソースとなるファイルのGCS URI
- create_run_config:
call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.create
args:
parent: ${"projects/" + project_id + "/locations/" + location}
body:
displayName: ${run_config_display_name}
schedule:
scheduleOptions:
disableAutoScheduling: true
destinationDatasetId: ${destination_dataset}
dataSourceId: ${run_config_data_source_id}
params:
destination_table_name_template: ${destination_table}
file_format: "CSV"
data_path_template: ${data_path_template}
result: config
- get_time_in_30s:
assign:
- now_plus_30s: ${time.format(sys.now() + 30)}
- start_run:
call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
args:
parent: ${config.name}
body:
requestedRunTime: ${now_plus_30s}
result: runsResp
- remove_run_config:
call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.delete
args:
name: ${config.name}
- the_end:
return: ${runsResp}
処理の中身を解説します。
大枠は公式のサンプル(BigQuery Data Transfer API Connector Overview | Workflows | Google Cloud )をほぼ踏襲しています。
- initステップではBigQuerのテーブル情報とソースファイルの情報を設定しています。
- create_run_configステップではWorkflowsのBigQuery Data Transferコネクタを使ってBigQueryデータ転送ジョブを作成しています。
- get_time_in_30sステップではデータ転送ジョブを実行する時間を現在空30秒後にしたいため30秒後の時間を取得しています。。
- start_runステップでは再びWorkflowsのBigQuery Data Transfer コネクタを使って作成したBigQueryデータ転送ジョブを先のステップで取得した30秒後の開始時間で実行して実行結果を待機します。
- remove_run_configステップでは実行し終わった転送ジョブを削除しています。
ワークフローの作成と実行
それでは記述したワークフロー定義をWorkflowsに登録し、ワークフローを実行してみます。
その前にワークフローを実行するためにはサービスアカウントが必要になりますので事前に作成しておく必要があります。今回使うのはBigQueryコネクタを使ってGCSのcsvをロードするためBigQuery管理者とストレージ管理者の権限を付けておきます。
このサービスアカウントを使ってまずはワークフローを作成します。
gcloud workflows deploy jp-weather-2-bqdts --source=jp-weather-2.yml --service-account jp-weather-2@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1
これでワークフローの作成は完成です。
Cloudコンソールから確認すると以下のようにフローが図になって表現されています。
ではこれを実行します。
$ gcloud workflows run jp-weather-2-bqdts --location asia-northeast1
Waiting for execution [fb4e1543-6b58-dcfe-5943-35013c3ced9a] to complete...done.
...
startTime: '2024-08-30T09:12:48.893615382Z'
state: SUCCEEDED
status:
currentSteps:
- routine: main
step: the_endbq
workflowRevisionId: 000006-4a7
yamlの記述やサービスアカウントの権限に問題がなければ特に問題なくワークフローの実行が終わります。
処理が終わりBigQueryで確認するとデータが登録されていることがわかります。
$ bq query "select * from data_set_weather.jp_weather limit 10"
+------------+-------+-----------+-------------------+-------------+----------+-------------------+-------------+
| date | month | city_name | weather_condition | temperature | humidity | sunshine_duration | cloud_cover |
+------------+-------+-----------+-------------------+-------------+----------+-------------------+-------------+
2024-08-10 | 1 | 東京 | 晴 | 27.8 | 79 | 0.0 | 1 |
2024-08-10 | 2 | 東京 | 晴 | 27.8 | 80 | 0.0 | 1 |
2024-08-10 | 3 | 東京 | 晴 | 27.5 | 80 | 0.0 | 1 |
2024-08-10 | 4 | 東京 | 晴 | 27.5 | 82 | 0.0 | 1 |
2024-08-10 | 5 | 東京 | 晴 | 27.5 | 80 | 0.0 | 0 |
2024-08-10 | 6 | 東京 | 晴 | 27.4 | 76 | 0.0 | 0 |
2024-08-10 | 7 | 東京 | 晴 | 28.3 | 72 | 0.4 | 0 |
2024-08-10 | 8 | 東京 | 晴 | 29.4 | 67 | 0.9 | 0 |
2024-08-10 | 9 | 東京 | 晴 | 31.2 | 62 | 1.0 | 0 |
2024-08-10 | 10 | 東京 | 晴 | 31.0 | 59 | 0.5 | 0 |
+------------+-------+-----------+-------------------+-------------+----------+-------------------+-------------+
まとめ
GoogleCloudのWorkflowsでBigQuery Data Transferコネクタを使ってBigQueryデータ転送のジョブを作成・実行してGCS上のCSVファイルのデータをBigQueryのテーブルに読み込んでみました。BigQuery Data Transferコネクタを使うことで簡単にWorkflowsでデータ読み込みを行えるので非常に便利です。
最後まで読んで頂いてありがとうございました。