PythonでBigQueryにクエリをスケジューリングしてみた
データアナリティクス事業本部の鈴木です。
PythonからBigQueryにクエリをスケジューリングする機会があったので、調べたことをまとめてみました。
Pythonからの設定は、以下のPython Client for BigQuery Data Transfer APIを使って行いました。
準備
前提
今回、PythonスクリプトはCloudShellから実行しました。
CloudShellにはクライアントがインストールされていなかったので、以下のコマンドでインストールしました。
python3 -m pip install google-cloud-bigquery-datatransfer
使用したツールのバージョンは以下になります。
- python:
3.9.2
- google.cloud.bigquery:
3.0.1
- google-cloud-bigquery-datatransfer:
3.6.1
サービスアカウントの作成
スケジュールされたクエリ用にサービスアカウントを作成しました。
今回はロールは事前定義ロールの中から選ぶことにしました。bigquery.transfers.update
が必要になるので、少し広めですがBigQuery 管理者
を付与してみました。
クエリのスケジュールを設定するために必要な権限は以下のドキュメントに記載されています。
BigQueryの事前定義ロールは以下のドキュメントに記載されています。
サービス アカウントの作成と管理を参考に、gcloud
で作成しておきました。
# 日本語の箇所は適宜修正してください。 # サービスアカウントの作成 gcloud iam service-accounts create サービスアカウントのID \ --description="Sample user for bigquery schedluled query" \ --display-name="bigquery-schedluled-query-user-dev" # サービスアカウントへの「BigQuery 管理者」の付与 gcloud projects add-iam-policy-binding プロジェクトID \ --member serviceAccount:サービスアカウント名 \ --role roles/bigquery.admin
サービスアカウント名
は、サービスアカウントのID@プロジェクトID.iam.gserviceaccount.com
のフォーマットのものです。コンソールからだと、サービスアカウント画面から確認することができます。
テーブルの作成
今回は、2つのテーブルをJOINして、別のテーブルに格納するような簡単なクエリをスケジュールしてみます。データセットはasia-northeast1
にsample_dataset_scheduling_query
というデータセットを作成しておきました。
ソースとなるテーブルは、以下の2つを作成しておきました。
log
テーブルと、
type
テーブルです。
格納先は、result
テーブルを作成しておきました。
CREATE TABLE sample_dataset_scheduling_query.result ( ip STRING, port INTEGER, type STRING )
最初は空です。
最終的に、sample_dataset_scheduling_query
データセットは以下のようになります。
やってみる
スクリプトの作成
試してみたスクリプトは以下になります。クエリのスケジューリングのガイドで紹介されているPythonのスクリプトを参考にしています。
from google.cloud import bigquery_datatransfer transfer_client = bigquery_datatransfer.DataTransferServiceClient() project_id = "プロジェクトのID/locations/asia-northeast1" dataset_id = "sample_dataset_scheduling_query" service_account_name = "サービスアカウント名" # Use standard SQL syntax for the query. with open("./sample_query.sql", 'r') as sql_f: query_string = sql_f.read() parent = transfer_client.common_project_path(project_id) transfer_config = bigquery_datatransfer.TransferConfig( destination_dataset_id=dataset_id, display_name="sample_scheduled_query", data_source_id="scheduled_query", params={ "query": query_string, "destination_table_name_template": "result", "write_disposition": "WRITE_TRUNCATE", "partitioning_field": "", }, schedule="every day 3:30", ) transfer_config = transfer_client.create_transfer_config( bigquery_datatransfer.CreateTransferConfigRequest( parent=parent, transfer_config=transfer_config, service_account_name=service_account_name, ) ) print("Created scheduled query '{}'".format(transfer_config.name))
ポイントは以下です。
プロジェクトのID
はプロジェクト ダッシュボードのプロジェクト情報
カードに記載のものです。プロジェクトの識別を参考にしてください。project_id
でプロジェクトIDだけではなく/locations/asia-northeast1
をつけることで、クエリをスケジュールするリージョンを指定しました。サービスアカウント名
は準備で作成したサービスアカウント名です。destination_table_name_template
でクエリの結果の書き込み先テーブルを指定しました。write_disposition
で書き込みの方法を指定しました。今回は洗い替えとしました。 書き込み設定については、ガイドの書き込み設定に記載がありました。schedule
でスケジュール実行時間を指定しました。この時間はUTCになるので、JSTで何時になるかは読み替えが必要でした。
APIについて分からないところは、一番最初にご紹介したPython Client for BigQuery Data Transfer API
ガイドのAPI Reference
に挙げられている資料を参考にしつつ設定しました。
また、sample_query.sql
は以下を用意しました。
SELECT lt.ip as ip, lt.port as port, tt.type as type FROM sample_dataset_scheduling_query.log lt LEFT JOIN sample_dataset_scheduling_query.type tt ON lt.port = tt.port
スクリプトのアップロード
作成したスクリプトは、CloudShellを起動し、アップロード
から、ファイルをアップロードすることが可能です。
画面右上のボタンから起動して、
アップロード機能でアップロードします。便利です。
カレントディクトリで以下のようになっているようにします。
$ ls # sample_query.sql schedule_query.py
スクリプトの実行
スクリプトをアップロードできたので、実行します。
$ python3 schedule_query.py
BigQueryのスケジュールされたクエリ
画面からスケジュールされたクエリの詳細
を確認すると、期待通りに設定できていることが確認できました。
また、設定した際に1度クエリが実行されるので、結果を格納するためのresult
テーブルを確認すると、確かに結果が入っていることが確認できました。
最後に
今回は、PythonからBigQueryにクエリをスケジューリングする方法をご紹介しました。CloudShellから実行ができるため、環境の整備も簡単でとても便利でした。