BigQuery Transfer ServiceのAmazon S3転送をPythonで操作する

2022.06.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

前回Amazon S3にあるCSVファイルをソースとしてBigQueryにロードするBigQuery Data Transfer Serviceをオンデマンドで登録し、Workflowsを使って実行しましたがデータ転送設定が複数ある場合はCloud Consoleやbqコマンドで1件ごとに登録したり削除するのが大変だったのでPythonで良い感じに操作できないかを調べたのでその内容をまとめます。

環境

  • macOS 10.15.7
  • Python 3.9.10
  • google-cloud-bigquery-datatransfer 3.6.2

BigQuery Data Transfer Service API

BigQuery Data Transfer ServiceをPythonから操作するにはgoogle-cloud-bigquery-datatransferを使用します。このモジュールはpipで簡単にインストールを行えます。

はじめにこのモジュールをインストールします。

pip install google-cloud-bigquery-datatransfer

次に使い方ですが、公式ドキュメント にそれぞれメソッドの使い方等詳しく記載されていますのでそちらを参考に実装を進めます。

今回行う操作は登録・実行・削除になりますので以下のメソッドを中心に扱います。

  • create_transfer_config
  • list_transfer_configs
  • start_manual_transfer_runs
  • delete_transfer_config

それでは具体的なコードを記述していきます。

データ転送の登録

はじめにデータ転送を登録します。使うのはcreate_transfer_configになります。 公式ドキュメント にサンプルコードが載っていますのでこれを参考にコードを作成します。

実装内容としてはAmazon S3に配置されたCSVファイルをデータソースとしていて、データ転送はスケジュールではなく手動実行のみを想定しています。

deploy.py

from google.cloud import bigquery_datatransfer_v1

PROJECT_ID = "{プロジェクトID}"
ACCESS_KEY_ID = "{アクセスキー}"
SECRET_ACCESS_KEY = "{シークレットキー}"


def create_dts(dataset, table, data_path):
    # Create a client
    transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
    transfer_config = bigquery_datatransfer_v1.TransferConfig(
        destination_dataset_id=dataset,
        display_name="test_{}-dts".format(table),
        data_source_id="amazon_s3",
        schedule_options={"disable_auto_scheduling": True},
        params={
            "destination_table_name_template": table,
            "data_path": data_path,
            "access_key_id": ACCESS_KEY_ID,
            "secret_access_key": SECRET_ACCESS_KEY,
            "file_format": "CSV",
        },
    )

    transfer_config = transfer_client.create_transfer_config(
        bigquery_datatransfer_v1.CreateTransferConfigRequest(
            parent=transfer_client.common_project_path(PROJECT_ID),
            transfer_config=transfer_config,
        )
    )

    print(transfer_config.name)


if __name__ == "__main__":
      create_dts(
        "data_set_test",
        "jp_weather_2",
        "s3://{バケット名}/bigquery-transfer-service_test/{run_time+9h|\"%Y%m%d%H\"}/jp_weather_2.csv",
      )

今回の設定で特徴的な箇所を説明します。主に設定に使うパラメータはTransferConfigクラスで指定します。

  • Amazon S3転送なのでdata_source_id="amazon_s3"を指定する
  • カンマ,区切りの一般的なCSVファイルなので"file_format": "CSV"を指定する
  • 手動実行のみなのでschedule_options={"disable_auto_scheduling": True}を指定する

転送方法を変更したい場合は公式ドキュメントでこれらの設定方法をご確認ください。

これを実行すると以下のようにデータ転送が作成されます。ここで出力されるデータ転送のリソース名は後続で使います。

$ python3 deploy.py
projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0

データ転送の実行

上記で登録したデータ転送はスケジュール実行ではなく手動実行で作成したので、次に手動実行するスクリプトを作成します。 こちらも公式ドキュメント にサンプルコードが載っていますのでこれを参考にコードを作成します。

実装内容としては作成したデータ転送を手動実行するだけです。

exec.py

import datetime
from google.cloud import bigquery_datatransfer_v1

PARENT = "projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0"


def run_dts():
    transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()

    # Initialize request argument(s)
    request = bigquery_datatransfer_v1.StartManualTransferRunsRequest(
        parent=PARENT,
        requested_run_time=datetime.datetime.now()
    )

    # Make the request
    response = transfer_client.start_manual_transfer_runs(request=request)

    # Handle the response
    print(response)


if __name__ == "__main__":
    run_dts()

こちらも今回の設定で特徴的な箇所を説明します。主に設定に使うパラメータはStartManualTransferRunsRequestクラスで指定します。

  • parentでデータ転送で作成時に設定されたconfig_idを含んだデータ転送名を指定する
  • データ転送を行いたい時間をrequested_run_timeに指定する

parentに指定するのはconfig_idを含んだデータ転送のリソース名になるので注意してください。このリソース名はデータ転送を作成する際に自動で割り振られます。(自分はparentでデータ転送時に作成した転送名display_nameを使ったところエラーになり少しハマりかけました)

これを実行すると以下のようにデータ転送が実行されます。

$ python3 exec.py
runs {
  name: "projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0"
  destination_dataset_id: "data_set_test"
  schedule_time {
    seconds: 1656299915
    nanos: 976251529
  }
  update_time {
    seconds: 1656299915
    nanos: 977673000
  }
  data_source_id: "amazon_s3"
  state: PENDING
  params {
    fields {
      key: "access_key_id"
      value {
        string_value: "{アクセスキー}"
      }
    }
    fields {
      key: "data_path"
      value {
        string_value: "s3://{バケット名}/bigquery-transfer-service_test/{run_time+9h|\"%Y%m%d%H\"}/jp_weather_2.csv"
      }
    }
    fields {
      key: "destination_table_name_template"
      value {
        string_value: "jp_weather_2"
      }
    }
    fields {
      key: "file_format"
      value {
        string_value: "CSV"
      }
    }
    fields {
      key: "secret_access_key"
      value {
        string_value: "{シークレットキー}"
      }
    }
  }
  run_time {
    seconds: 1656299915
    nanos: 184983000
  }
  user_id: -99999999999999999999
  email_preferences {
  }
}

データ転送の削除

上記で登録したデータ転送を削除するスクリプトを作成します。 こちらも公式ドキュメント にサンプルコードが載っていますのでこれを参考にコードを作成します。

実装内容としては作成したデータ転送を削除するだけです。

delete.py

from google.cloud import bigquery_datatransfer_v1

name = "projects/xxxxxxxxxxx/locations/asia-northeast1/transferConfigs/62c98fb9-0000-2314-bddc-30fd381760e0"


def delete_dts():
    transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()

    # Initialize request argument(s)
    transfer_request = bigquery_datatransfer_v1.DeleteTransferConfigRequest(
        name=name,
    )

    # Make the request
    transfer_client.delete_transfer_config(request=transfer_request)


if __name__ == "__main__":
    delete_dts()

こちらも今回の設定で特徴的な箇所を説明します。主に設定に使うパラメータはDeleteTransferConfigRequestクラスで指定します。

  • nameでデータ転送で作成時に設定されたconfig_idを含んだデータ転送名を指定する

こちらもデータ転送の実行と同様にnameにデータ転送のリソース名を指定します。display_nameではないので注意してください。

これを実行すると以下のようにデータ転送が実行されます。

$ python3 delete.py

まとめ

BigQuery Data Transfer ServiceでS3にあるCSVファイルをBigQueryにロードする転送ジョブの作成・実行・削除をPythonモジュールを使って操作してみました。今回は説明を簡単にするため一つのデータ転送を扱ってみましたが、Pythonなので繰り返し処理で複数のデータ転送ジョブを作成・実行・削除も行えます。その際にはデータ転送リストを取得するメソッドlist_transfer_configsを使うことで実現可能です。

最後まで読んで頂いてありがとうございました。

参考ドキュメント