BigQueryのリモート関数実行中にCloud Functionsでジョブをキャンセルしてみた

BigQueryのリモート関数実行中にCloud Functionsでジョブをキャンセルしてみた

Clock Icon2024.06.10

Google Cloudデータエンジニアのはんざわです。
先日、BigQueryのリモート関数で呼び出されたCloud Functionsから直接リモート関数を実行しているジョブをキャンセルさせたいケースがありました。

本ブログの検証項目は以下の通りです。

  • リモート関数で呼び出されたCloud Functionsから直接リモート関数を実行しているジョブをキャンセルする

そもそもリモート関数とは?

簡単にBigQueryのリモート関数を紹介すると、BigQueryでCloud FunctionsやCloud Runのような外部データソースの関数を呼び出すことができる機能です。

以下はリモート関数に関連するブログ記事になります。

検証の前提条件

  1. リモート関数からCloud Functionsを呼び出します
  2. 今回の検証で使用するCloud Functionsの設定は以下の通りです
    • 世代: 第2世代
    • ランタイム: Python 3.12
    • リージョン: asia-northeast1
    • メモリ: 256 MiB
    • CPU: 0.167 vCPU
    • タイムアウト: 30分

検証

リモート関数で呼び出されたCloud Functionsから直接リモート関数を実行しているジョブをキャンセルできるか検証してみます。

リソースの準備

リモート関数から呼び出されるCloud FunctionsのPythonのソースコードとパッケージは以下の通りです。

import json
import time
from logging import getLogger, INFO
from typing import Dict, List, Union

import functions_framework
from google.cloud import bigquery
import google.cloud.logging

logging_client = google.cloud.logging.Client()
logging_client.setup_logging()
logger = getLogger(__name__)
logger.setLevel(INFO)


def cancel_job(
    job_id: str,
    location: str = "asia-northeast1"
) -> None:
    client = bigquery.Client()
    job = client.cancel_job(job_id, location=location)
    logger.info(f"{job.location}:{job.job_id} cancelled")


@functions_framework.http
def hello_http(
    request: Dict[str, Union[str, List[List[str]]]]
) -> Dict[str, List[List[str]]]:
    """
    Args:
        request:
            {
                "requestId": "一意なID",
                "caller": "BigQueryのJob ID",
                "sessionUser": "ジョブを発行したGoogle Cloudのユーザー",
                "calls": [
                    [リモート関数が受け取るデータ],
                    ...
                ]
            }

    Reference:
        https://cloud.google.com/bigquery/docs/remote-functions?hl=ja#input_format

    Example:
        "caller": "//bigquery.googleapis.com/projects/myproject/jobs/bquxjob_5b4c112c_17961fafeaf"
    """
    request_json = request.get_json()

    caller = request_json["caller"]
    calls = request_json["calls"]

    # ジョブIDを抽出する
    jobid = caller.split('/')[-1]

    # ジョブをキャンセルする
    cancel_job(jobid)

    time.sleep(100)

    # 適当な結果を返す
    return json.dumps({"replies": ["Hello World" for _ in range(len(calls))]})
functions-framework==3.7.0
google-cloud-logging==3.10.0
google-cloud-bigquery==3.24.0

処理の概要として、入力データからBigQueryのジョブIDを取得し、BigQueryのAPIクライアントから対象のジョブをキャンセルさせる流れになっています。

本ブログでは省略しますがリモート関数のデプロイ方法は、以下のブログのCLOUD_RESOURCE 接続を作成リモート関数を作成の項目を確認してください。

動かしてみる

さっそくですが、以下のクエリでリモート関数を呼び出してみます。

SELECT remote_functions.test1("test")

少し待ちましたが終わりませんね...
ログを確認してみましょう

Cloud Loggingからログを確認すると以下のログが繰り返し流れていました。

ログの詳細は以下の通りです。

(省略)
...

google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/<PROJECT_ID>/jobs/bquxjob_7ec10878_18ffff4a225/cancel?projection=full&location=asia-northeast1&prettyPrint=false: Access Denied: Job <PROJECT_ID>:asia-northeast1.bquxjob_7ec10878_18ffff4a225: Permission bigquery.jobs.update denied on job <PROJECT_ID>:asia-northeast1.bquxjob_7ec10878_18ffff4a225 (or it may not exist).

ログの詳細を確認するとbigquery.jobs.updateの権限が不足しているため、対象のジョブをキャンセルできないと403エラーが発生していることがわかりました。

再度動かしてみる

不足している権限を付与してから再度動かしてみます。

bigquery.jobs.updateの権限を保有している事前定義済みロールは、roles/bigquery.adminroles/bigquery.studioAdminの2つのみでした。
今回の検証では、roles/bigquery.adminの権限を付与しますが、非常に強い権限になりますので必要に応じて、カスタムロールの作成を検討してください。

権限を付与した後に再度同じクエリでリモート関数を呼び出してみます。

期待していた通りにジョブをキャンセルすることができました!

まとめ

今回は、リモート関数で呼び出されたCloud Functionsから直接リモート関数を実行しているジョブをキャンセルする検証を実施しました。
開始当初に期待していた通りの結果になったので良かったです。

Cloud Functionsがリモート関数から受け取ったデータの中に不都合があった際などに利用していただけると良いと思います。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.