Cloud WorkflowsでBigQuery Transfer Serviceを実行する

2022.06.08

はじめに

データアナリティクス事業本部のkobayashiです。

前回GoogleCloudのWorkflowsでGCSにあるCSVファイルをBigQueryにロードする処理を行ってみました。今回は前回同様にBigQueryにロードする処理を試してみたいと思いますが、ソースはAmazon S3にあるCSVファイルとしBigQuery Data Transfer Serviceをオンデマンドで登録してWorkflowsで実行してみたいと思います。

BigQuery Data Transfer ServiceとWorkflows

BigQuery Data Transfer Serviceは様々なデータソースからBigQueryへデータを登録するサービスです。詳しくは公式ドキュメント か弊社ブログエントリに詳しく書かれているのでそちらを是非ご一読ください。

BigQuery Data Transfer Serviceで作成した転送ジョブはWorkflowsのgoogleapis.bigquerydatatransferメソッドで呼び出すことができます。今回はこれを使ってWorkflowsから転送ジョブを実行してみます。

Amazon S3からBigQueryへのロードを実装

対象となるS3上のCSVデータは前回と同じものを使います。またBigBigQueryのデータセットとテーブルも前回同様下記のものを使います。

CSVの中身は以下の天候状況になります。

2021-11-13,11,名古屋,晴,10.8,0,9.7,3
2021-11-13,11,仙台,晴,11.3,0,8.1,1.8
2021-11-13,11,大阪,晴,12.1,0,8.8,5
2021-11-13,11,札幌,晴,7.8,0,1.7,7.5
2021-11-13,11,長野,晴,7.3,0,9.3,
2021-11-13,11,京都,晴,10.5,0,7.6,
2021-11-13,11,東京,晴,13.1,0,9.4,0.5
2021-11-13,11,横浜,晴,13.6,0,10.1,
2021-03-03,3,京都,晴,5.9,0,6.6,
2021-03-03,3,那覇,晴,18.4,0,8.5,4.3
2021-03-03,3,大阪,晴,6.8,0,8.2,5
2021-03-03,3,名古屋,晴,7.3,0,10.7,1
2021-03-03,3,横浜,晴,7.7,0,11,
2021-03-03,3,仙台,晴,3.1,0,9.7,4.3
....

データの中身は日付,月,都市名,天気,気温,湿度,日照時間,雲量となります。

BigQueryのテーブルは以下になります。

$ bq show --schema --format=prettyjson data_set_test.jp_weather_2
[
  {
    "mode": "NULLABLE",
    "name": "date",
    "type": "DATE"
  },
  {
    "mode": "NULLABLE",
    "name": "month",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "city",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "w_type",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "temperature",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "precipitation",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "sunlight",
    "type": "NUMERIC"
  },
  {
    "mode": "NULLABLE",
    "name": "cloudage",
    "type": "NUMERIC"
  }
]

BigQuery Data Transfer Serviceの転送ジョブの作成

転送ジョブの作成は弊社ブログエントリの過去記事を参考にして作成します。記事内容通りに行えばそれほど難しい作業はなく作成できます。

今回はS3には以下のように日付のパス配下に日々のCSVファイルが配置されいている形式を想定しています。

bigquery-transfer-service_test
├── 20220509
│   └── jp_weather_2.csv
├── 20220510
│   └── jp_weather_2.csv
└── 20220511
    └── jp_weather_2.csv

このようなパス構造であってもBigQuery Data Transfer Serviceではランタイムパラメータ(転送でのランタイム パラメータの使用  |  BigQuery Data Transfer Service  |  Google Cloud )を使用できるので日付部分のパスは{run_time+9h|\"%Y%m%d%H\"}で記述できますので以下のようなコマンドを実行することで転送ジョブを作成できます。

bq mk --transfer_config --data_source=amazon_s3 \
--no_auto_scheduling \
--display_name=jp-weather-2-dts \
--target_dataset=data_set_test \
--params='{
"destination_table_name_template":"jp_weather_2",
"data_path":"s3://{バケット名}/bigquery-transfer-service_test/{run_time+9h|\"%Y%m%d%H\"}/jp_weather_2.csv",
"access_key_id":"{アクセスキー}",
"secret_access_key":"{シークレットキー}",
"file_format":"CSV",
}'

Google Cloudのコンソールで作成した転送ジョブを確認します。

リソース名は後段のWorkflowsで使用するのでメモしておきます。

ワークフロー定義を記述

ワークフローをWorkflows構文に従ってyamlにて記述します。今回はBigQuery Data Transfer ServiceをWorkflowsから呼び出すので以下のメソッドを使います。

このメソッドのドキュメントに従いWorkflowsの定義ファイルを作成すると以下のようになります。

jp-weather-2-dts.yml

main:
  steps:
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: "projects/233151396088/locations/asia-northeast1/transferConfigs/6296f7fa-0000-2242-b344-94eb2c09d0a4"
          body:
            requestedRunTime: ${time.format(sys.now())}
        result: runsResp
    - return_value:
        return: ${runsResp}

記述は非常に簡単でステップの中でcallにてgoogleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRunsメソッドを指定し、 argsparentにBigQuery Data Transfer Serviceのリソース名を指定するだけです。

この時bodyrequestedRunTimeに転送ジョブを実行する時間を指定します。この値は転送ジョブを実行する際のランタイムパラメータとして使われます。したがって上記の設定ですとWorkflowsを実行した時間がランタイムパラメータとして使われますが、

requestedRunTime: "2022-05-17T16:28:06.126485+09:00"

のように指定すると特定の日をランタイムパラメータとして使うことができます。

ワークフローの作成と実行

それでは記述したワークフロー定義をWorkflowsに登録し、ワークフローを実行してみます。 その前にワークフローを実行するためにはサービスアカウントが必要になりますので事前に作成しておく必要があります。今回使うのはBigQuery Data Transfer ServiceなのでBigQuery管理者の権限を付けておきます。

このサービスアカウントを使ってまずはワークフローを作成します。

gcloud workflows deploy jp-weather-2-dts --source=jp-weather-2-dts.yml --service-account jp-weather-2-dts@{プロジェクト名}.iam.gserviceaccount.com --location asia-northeast1

これでワークフローの作成は完成です。 Cloudコンソールから確認すると以下のようにフローが図になって表現されています。

ではこれを実行します。

$ gcloud workflows run jp-weather-2-dts --location asia-northeast1
Waiting for execution [5798eeec-5473-450e-95bb-0b84c42baf59]] to complete...done.  
...
startTime: '2022-05-20T10:44:12.9876543Z'
state: SUCCEEDED
workflowRevisionId: 000001-202

yamlの記述やサービスアカウントの権限に問題がなければ特に問題なくワークフローの実行が終わります。

処理が終わりBigQueryで確認するとデータが登録されていることがわかります。

$ bq query "select * from data_set_test.jp_weather_2 limit 5" 
+------------+-------+------+--------+-------------+---------------+----------+----------+
|    date    | month | city | w_type | temperature | precipitation | sunlight | cloudage |
+------------+-------+------+--------+-------------+---------------+----------+----------+
| 2021-10-13 |    10 | 東京 | 雨     |        12.1 |             0 |      8.1 |     NULL |
| 2021-10-03 |    10 | 東京 | 雨     |         13.2 |             0 |      3.7 |     NULL |
| 2021-10-14 |    10 | 東京 | 雨     |        12.5 |             0 |      5.3 |     NULL |
| 2021-10-04 |     10 | 那覇 | 雨     |          11 |             0 |      3.6 |     NULL |
| 2021-12-15 |    12 | 京都 | 雨     |        17.4 |             0 |      5.3 |     NULL |
+------------+-------+------+--------+-------------+---------------+----------+----------+

まとめ

BigQuery Data Transfer ServiceでS3にあるCSVファイルをBigQueryにロードする転送ジョブを作成し、そのジョブをWorkflowsから実行してみました。BigQuery Data Transfer Service単体でもスケジュール実行は行なえますが、例えば特定のレコードを削除してからロードを行うといった他の処理を含めたワークフローを作ろうと思った場合は有効に使えるのではないでしょうか。

最後まで読んで頂いてありがとうございました。