BigQueryでオンデマンドのスケジュールされたクエリをPythonで呼び出してみる

2024.05.07

Google Cloudデータエンジニアのはんざわです。
この記事では、BigQueryでオンデマンドのスケジュールされたクエリをPythonでコールしてみたいと思います。

検証内容

BigQueryのスケジュールされたクエリは、BigQuery Data Transfer APIをコールし、実行させることが可能です。
APIをコールするPythonのコードを作成し、Cloud Functionsで実行してみたいと思います。

今回の検証では、以下のドキュメントのManually trigger a transfer or backfillの項目を参考にしています。

また、APIのリファレンスは以下のドキュメントを参考にしています。

準備

まずは、スケジュールされたクエリAPIをコールするCloud Functionsの2つを用意したいと思います。

スケジュールされたクエリを作成

最初にスケジュールされたクエリを登録します。
(めちゃくちゃ適当ですが、)登録するクエリは以下の通りです。

CREATE OR REPLACE TABLE `test.sample_table` AS
SELECT
  @run_time AS run_time,
  @run_date AS run_date

せっかくなのでスケジュールされたクエリの特有のパラメータである@run_time@run_dateを使用してみました。
パラメータの詳細を知りたい方は以下のリンクを参照してみてください。

スケジュールされたクエリに利用可能なパラメータ

上記のクエリを以下の構成で登録します。
登録したクエリはAPI経由でのみ実行させたいので実行頻度はオンデマンドにします。
(実際に使用する場合は、デフォルトのサービスアカウントの使用は避けてください。)

名前: ondemand_query
実行頻度: オンデマンド
リージョン: asia-northeast1 (東京)

登録できました。

Cloud Functionsを作成

以下のソースコードでCloud Functionsをデプロイします。

main.py

import os
import datetime

from google.cloud.bigquery_datatransfer_v1 import (
    DataTransferServiceClient,
    StartManualTransferRunsRequest,
)


def generate_request(transfer_config_name, requested_run_time):
    request = StartManualTransferRunsRequest(
      parent=transfer_config_name,
      requested_run_time=requested_run_time
    )

    return request


def exec_query(request):
    try:
        PROJECT_ID = os.getenv('PROJECT_ID')
        TRANSFER_ID = os.getenv('TRANSFER_ID')

        client = DataTransferServiceClient()

        transfer_config_name = f'projects/{PROJECT_ID}/locations/asia-northeast1/transferConfigs/{TRANSFER_ID}'
        current_time = datetime.datetime.now(datetime.timezone.utc)

        request = generate_request(transfer_config_name, current_time)

        # Make the request
        response = client.start_manual_transfer_runs(request=request)

        return 'OK'

    except Exception as e:
        print(f'Error: {e}')

        return 'ERROR'

requirements.txt

functions-framework==3.*
google-cloud-bigquery-datatransfer

main.pyの29行目にあるtransfer_config_nameには、事前に作成したスケジュールされたクエリのリソース名を指定します。
以下の画像のようにprojects/{projectId}/locations/{locationId}/transferConfigs/{configId}のフォーマットになっている想定です。

上記のソースコードを以下の構成でデプロイします。
(実際に使用する場合は、デフォルトサービスアカウントの使用を避けたり、トリガーの認証の設定を適切に行ってください。)

関数名: exec_scheduled_query
リージョン: asia-northeast1 (東京)
トリガー: 未認証の呼び出しを許可されたHTTPSトリガー

正常にデプロイできました。

実行してみる

デプロイしたCloud FunctionsのHTTPSエンドポイントを叩き、起動させます。
Cloud Functionsの起動に成功し、スケジュールされたクエリも正常に実行されていました。

パラメータの値も取れていました。

躓いたポイント

前述した通り、StartManualTransferRunsRequestのAPI経由で実行しています。

しかし、オンデマンドモードのスケジュールされたクエリを実行させる場合は、requested_run_timeを指定する必要があります。
指定しないと以下のようなエラーを吐きました。

400 Starting transfer run with on-demand schedule requires 'requested_run_time' to be specified.

総評

PythonでAPIを叩いて、スケジュールされたクエリを実行させてみました。
スケジュールされたクエリに登録することでクエリ単位での実行履歴を容易に確認することが可能になると思われます。
さらに実行履歴からクエリの実行時間やジョブIDなども一元で確認することが可能な点は非常に優れていると思いました。
次はスケジュールされたクエリをAPI経由で実行させ、クエリが完了するまで待つような実装を試してみたいと思います。

是非試してみてください!