PythonでBigQueryにクエリをスケジューリングしてみた

2022.04.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

PythonからBigQueryにクエリをスケジューリングする機会があったので、調べたことをまとめてみました。

Pythonからの設定は、以下のPython Client for BigQuery Data Transfer APIを使って行いました。

準備

前提

今回、PythonスクリプトはCloudShellから実行しました。

CloudShellにはクライアントがインストールされていなかったので、以下のコマンドでインストールしました。

python3 -m pip install google-cloud-bigquery-datatransfer

使用したツールのバージョンは以下になります。

  • python: 3.9.2
  • google.cloud.bigquery: 3.0.1
  • google-cloud-bigquery-datatransfer: 3.6.1

サービスアカウントの作成

スケジュールされたクエリ用にサービスアカウントを作成しました。

今回はロールは事前定義ロールの中から選ぶことにしました。bigquery.transfers.updateが必要になるので、少し広めですがBigQuery 管理者を付与してみました。

クエリのスケジュールを設定するために必要な権限は以下のドキュメントに記載されています。

BigQueryの事前定義ロールは以下のドキュメントに記載されています。

サービス アカウントの作成と管理を参考に、gcloudで作成しておきました。

# 日本語の箇所は適宜修正してください。

# サービスアカウントの作成
gcloud iam service-accounts create サービスアカウントのID \
    --description="Sample user for bigquery schedluled query" \
    --display-name="bigquery-schedluled-query-user-dev"

# サービスアカウントへの「BigQuery 管理者」の付与
gcloud projects add-iam-policy-binding プロジェクトID \
        --member serviceAccount:サービスアカウント名 \
        --role roles/bigquery.admin

サービスアカウント名は、サービスアカウントのID@プロジェクトID.iam.gserviceaccount.comのフォーマットのものです。コンソールからだと、サービスアカウント画面から確認することができます。

テーブルの作成

今回は、2つのテーブルをJOINして、別のテーブルに格納するような簡単なクエリをスケジュールしてみます。データセットはasia-northeast1sample_dataset_scheduling_queryというデータセットを作成しておきました。

ソースとなるテーブルは、以下の2つを作成しておきました。

logテーブルと、

logテーブル

typeテーブルです。

typeテーブル

格納先は、resultテーブルを作成しておきました。

ddl_result.sql

CREATE TABLE sample_dataset_scheduling_query.result
(
    ip STRING,
    port INTEGER,
    type STRING
)

最初は空です。

resultテーブル

最終的に、sample_dataset_scheduling_queryデータセットは以下のようになります。

データセット

やってみる

スクリプトの作成

試してみたスクリプトは以下になります。クエリのスケジューリングのガイドで紹介されているPythonのスクリプトを参考にしています。

schedule_query.py

from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

project_id = "プロジェクトのID/locations/asia-northeast1"
dataset_id = "sample_dataset_scheduling_query"

service_account_name = "サービスアカウント名"

# Use standard SQL syntax for the query.
with open("./sample_query.sql", 'r') as sql_f:
    query_string = sql_f.read()

parent = transfer_client.common_project_path(project_id)

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id=dataset_id,
    display_name="sample_scheduled_query",
    data_source_id="scheduled_query",
    params={
        "query": query_string,
        "destination_table_name_template": "result",
        "write_disposition": "WRITE_TRUNCATE",
        "partitioning_field": "",
    },
    schedule="every day 3:30",
)

transfer_config = transfer_client.create_transfer_config(
    bigquery_datatransfer.CreateTransferConfigRequest(
        parent=parent,
        transfer_config=transfer_config,
        service_account_name=service_account_name,
    )
)

print("Created scheduled query '{}'".format(transfer_config.name))

ポイントは以下です。

  • プロジェクトのIDはプロジェクト ダッシュボードのプロジェクト情報カードに記載のものです。プロジェクトの識別を参考にしてください。
  • project_idでプロジェクトIDだけではなく/locations/asia-northeast1をつけることで、クエリをスケジュールするリージョンを指定しました。
  • サービスアカウント名は準備で作成したサービスアカウント名です。
  • destination_table_name_templateでクエリの結果の書き込み先テーブルを指定しました。
  • write_dispositionで書き込みの方法を指定しました。今回は洗い替えとしました。 書き込み設定については、ガイドの書き込み設定に記載がありました。
  • scheduleでスケジュール実行時間を指定しました。この時間はUTCになるので、JSTで何時になるかは読み替えが必要でした。

APIについて分からないところは、一番最初にご紹介したPython Client for BigQuery Data Transfer APIガイドのAPI Referenceに挙げられている資料を参考にしつつ設定しました。

また、sample_query.sqlは以下を用意しました。

sample_query.sql

SELECT
  lt.ip as ip,
  lt.port as port,
  tt.type as type
FROM sample_dataset_scheduling_query.log lt
LEFT JOIN sample_dataset_scheduling_query.type tt
ON lt.port = tt.port

スクリプトのアップロード

作成したスクリプトは、CloudShellを起動し、アップロードから、ファイルをアップロードすることが可能です。

画面右上のボタンから起動して、

クラウドシェルの起動

アップロード機能でアップロードします。便利です。

アップロード機能

カレントディクトリで以下のようになっているようにします。

$ ls
# sample_query.sql  schedule_query.py

スクリプトの実行

スクリプトをアップロードできたので、実行します。

$ python3 schedule_query.py

BigQueryのスケジュールされたクエリ画面からスケジュールされたクエリの詳細を確認すると、期待通りに設定できていることが確認できました。

スケジュールされたクエリの詳細

また、設定した際に1度クエリが実行されるので、結果を格納するためのresultテーブルを確認すると、確かに結果が入っていることが確認できました。

クエリの実行結果

最後に

今回は、PythonからBigQueryにクエリをスケジューリングする方法をご紹介しました。CloudShellから実行ができるため、環境の整備も簡単でとても便利でした。

参考