BigQuery Transfer ServiceのAmazon S3転送をPythonで操作する
はじめに
データアナリティクス事業本部のkobayashiです。
前回Amazon S3にあるCSVファイルをソースとしてBigQueryにロードするBigQuery Data Transfer Serviceをオンデマンドで登録し、Workflowsを使って実行しましたがデータ転送設定が複数ある場合はCloud Consoleやbqコマンドで1件ごとに登録したり削除するのが大変だったのでPythonで良い感じに操作できないかを調べたのでその内容をまとめます。
環境
- macOS 10.15.7
- Python 3.9.10
- google-cloud-bigquery-datatransfer 3.6.2
BigQuery Data Transfer Service API
BigQuery Data Transfer ServiceをPythonから操作するにはgoogle-cloud-bigquery-datatransfer
を使用します。このモジュールはpip
で簡単にインストールを行えます。
はじめにこのモジュールをインストールします。
pip install google-cloud-bigquery-datatransfer
次に使い方ですが、公式ドキュメント にそれぞれメソッドの使い方等詳しく記載されていますのでそちらを参考に実装を進めます。
今回行う操作は登録・実行・削除になりますので以下のメソッドを中心に扱います。
- create_transfer_config
- list_transfer_configs
- start_manual_transfer_runs
- delete_transfer_config
それでは具体的なコードを記述していきます。
データ転送の登録
はじめにデータ転送を登録します。使うのはcreate_transfer_config
になります。
公式ドキュメント にサンプルコードが載っていますのでこれを参考にコードを作成します。
実装内容としてはAmazon S3に配置されたCSVファイルをデータソースとしていて、データ転送はスケジュールではなく手動実行のみを想定しています。
deploy.py
from google.cloud import bigquery_datatransfer_v1 PROJECT_ID = "{プロジェクトID}" ACCESS_KEY_ID = "{アクセスキー}" SECRET_ACCESS_KEY = "{シークレットキー}" def create_dts(dataset, table, data_path): # Create a client transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient() transfer_config = bigquery_datatransfer_v1.TransferConfig( destination_dataset_id=dataset, display_name="test_{}-dts".format(table), data_source_id="amazon_s3", schedule_options={"disable_auto_scheduling": True}, params={ "destination_table_name_template": table, "data_path": data_path, "access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY, "file_format": "CSV", }, ) transfer_config = transfer_client.create_transfer_config( bigquery_datatransfer_v1.CreateTransferConfigRequest( parent=transfer_client.common_project_path(PROJECT_ID), transfer_config=transfer_config, ) ) print(transfer_config.name) if __name__ == "__main__": create_dts( "data_set_test", "jp_weather_2", "s3://{バケット名}/bigquery-transfer-service_test/{run_time+9h|\"%Y%m%d%H\"}/jp_weather_2.csv", )
今回の設定で特徴的な箇所を説明します。主に設定に使うパラメータはTransferConfig
クラスで指定します。
- Amazon S3転送なので
data_source_id="amazon_s3"
を指定する - カンマ
,
区切りの一般的なCSVファイルなので"file_format": "CSV"
を指定する - 手動実行のみなので
schedule_options={"disable_auto_scheduling": True}
を指定する
転送方法を変更したい場合は公式ドキュメントでこれらの設定方法をご確認ください。
これを実行すると以下のようにデータ転送が作成されます。ここで出力されるデータ転送のリソース名は後続で使います。
$ python3 deploy.py projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0
データ転送の実行
上記で登録したデータ転送はスケジュール実行ではなく手動実行で作成したので、次に手動実行するスクリプトを作成します。 こちらも公式ドキュメント にサンプルコードが載っていますのでこれを参考にコードを作成します。
実装内容としては作成したデータ転送を手動実行するだけです。
exec.py
import datetime from google.cloud import bigquery_datatransfer_v1 PARENT = "projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0" def run_dts(): transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient() # Initialize request argument(s) request = bigquery_datatransfer_v1.StartManualTransferRunsRequest( parent=PARENT, requested_run_time=datetime.datetime.now() ) # Make the request response = transfer_client.start_manual_transfer_runs(request=request) # Handle the response print(response) if __name__ == "__main__": run_dts()
こちらも今回の設定で特徴的な箇所を説明します。主に設定に使うパラメータはStartManualTransferRunsRequest
クラスで指定します。
parent
でデータ転送で作成時に設定されたconfig_id
を含んだデータ転送名を指定する- データ転送を行いたい時間を
requested_run_time
に指定する
parent
に指定するのはconfig_id
を含んだデータ転送のリソース名になるので注意してください。このリソース名はデータ転送を作成する際に自動で割り振られます。(自分はparent
でデータ転送時に作成した転送名display_name
を使ったところエラーになり少しハマりかけました)
これを実行すると以下のようにデータ転送が実行されます。
$ python3 exec.py runs { name: "projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0" destination_dataset_id: "data_set_test" schedule_time { seconds: 1656299915 nanos: 976251529 } update_time { seconds: 1656299915 nanos: 977673000 } data_source_id: "amazon_s3" state: PENDING params { fields { key: "access_key_id" value { string_value: "{アクセスキー}" } } fields { key: "data_path" value { string_value: "s3://{バケット名}/bigquery-transfer-service_test/{run_time+9h|\"%Y%m%d%H\"}/jp_weather_2.csv" } } fields { key: "destination_table_name_template" value { string_value: "jp_weather_2" } } fields { key: "file_format" value { string_value: "CSV" } } fields { key: "secret_access_key" value { string_value: "{シークレットキー}" } } } run_time { seconds: 1656299915 nanos: 184983000 } user_id: -99999999999999999999 email_preferences { } }
データ転送の削除
上記で登録したデータ転送を削除するスクリプトを作成します。 こちらも公式ドキュメント にサンプルコードが載っていますのでこれを参考にコードを作成します。
実装内容としては作成したデータ転送を削除するだけです。
delete.py
from google.cloud import bigquery_datatransfer_v1 name = "projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0" def delete_dts(): transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient() # Initialize request argument(s) transfer_request = bigquery_datatransfer_v1.DeleteTransferConfigRequest( name=name, ) # Make the request transfer_client.delete_transfer_config(request=transfer_request) if __name__ == "__main__": delete_dts()
こちらも今回の設定で特徴的な箇所を説明します。主に設定に使うパラメータはDeleteTransferConfigRequest
クラスで指定します。
name
でデータ転送で作成時に設定されたconfig_id
を含んだデータ転送名を指定する
こちらもデータ転送の実行と同様にname
にデータ転送のリソース名を指定します。display_name
ではないので注意してください。
これを実行すると以下のようにデータ転送が実行されます。
$ python3 delete.py
まとめ
BigQuery Data Transfer ServiceでS3にあるCSVファイルをBigQueryにロードする転送ジョブの作成・実行・削除をPythonモジュールを使って操作してみました。今回は説明を簡単にするため一つのデータ転送を扱ってみましたが、Pythonなので繰り返し処理で複数のデータ転送ジョブを作成・実行・削除も行えます。その際にはデータ転送リストを取得するメソッドlist_transfer_configs
を使うことで実現可能です。
最後まで読んで頂いてありがとうございました。