BigQueryからCloud Functions(第2世代)をリモート関数で呼び出す際に同時実行数の上限制限ができるか確認してみた

トータルでのリクエストの同時実行数に関するパラメータがいくつかあって難しかったので、実際にパターンを試して確認しました。
2024.02.01

データアナリティクス事業本部 機械学習チームの鈴木です。

BigQueryからCloud Functions(第2世代)をリモート関数で呼び出す際に正味の同時実行数の上限制限ができるか確認してみました。

この記事について

BigQueryのリモート関数を使うと、SQLで対象のレコードをCloud Functionsに送信し、処理した結果を取得するということが可能です。

例えば外部のAPI(Google Cloudサービス含む)へCloud Functionsからデータ処理のリクエストを送り、結果をBigQueryにまた戻すということも可能です。

以下のような例があります。

Cloud Functionsのスケールの上限を大きくしておけばその分早く結果を返すことができますが、Cloud Functionsの先にあるAPIがその処理量に対応できるとは限らないので、どこかでCloud Functionsの同時実行数の上限を制限する必要があります。

今回はCloud Functions(第2世代)をリモート関数で呼び出す際を想定して、このような設定が可能なのか調べました。

Cloud Functions(第2世代)の実行数制限に使えそうな設定

設定項目について

以下の設定が使えそうなので、設定することとします。

  • Cloud Functions(第2世代)の最大インスタンス数
  • Cloud Functions(第2世代)の同時実行数
  • リモート関数のmax_batching_rowsオプション

Cloud Functions(第2世代)は起動するCloud Runインスタンスの最大数を指定できます。また、インスタンス上で実行する関数の同時実行数も設定できます。BigQueryから処理を依頼する場合、これらの値が小さいほど1度に依頼できる数が制限されると想像できます。

また、リモート関数をCREATE FUNCTION文で作成する際に、オプションとしてmax_batching_rowsを指定できます。このオプションによりCloud Functionsの関数に対する1つのリクエストに含めるレコード数を制限できます。関数の実装に依存しますが、受け取ったレコードをまとめてその先のAPIにリクエストするような場合だと、APIの処理量の制限を超過するリスクがあります。

リモート関数の作成

検証のためにリモート関数を作成しておきます。今回行った手順を記載します。

手順はリモート関数を作成すると同じです。

1. Cloud Functionsの関数の作成

Cloud Functionsのコンソールよりファンクションを作成ボタンから作成しました。

環境は第2世代、リージョンは東京リージョンとし、トリガーはHTTPSとしました。ランタイムはPython 3.10としました。インスタンス最大数などは後から変えるのでデフォルトとしておきました。CPUは1としました。

ソースコードはガイドのCloud Functions の関数のコード例に、検証結果が分かりやすいようsleepを追加したものを設定しました。

ハイライト箇所が変更した部分になります。ほかは記事執筆時点でガイドより引用したものです。

main.py

import functions_framework
import time

from flask import jsonify

# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992

@functions_framework.http
def batch_add(request):
  try:
    return_value = []
    request_json = request.get_json()
    calls = request_json['calls']
    time.sleep(5)
    for call in calls:
      print("処理を行います。")
      return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
    replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
    return_json = jsonify( { "replies":  replies } )
    return return_json
  except Exception as e:
    return jsonify( { "errorMessage": str(e) } ), 400

処理対象のレコードごとに5秒待ってから加算を行います。待機はforループの前にあるので、リクエストに含まれるレコード数が増えても待機時間は増加しません。

2. 接続の作成

東京リージョンにCLOUD_RESOURCEというIDの接続を作成しました。

接続の作成

作成した接続のサービスアカウントをメモしておき、IAMにてCloud Run起動元のロールを設定しました。

サービスアカウントの設定

3. リモート関数の作成

適当なデータセットを東京リージョンに作成します。今回はremote_function_sampleデータセットを作成しました。

続いて、以下のSQLでremote_addリモート関数を作成しました。

CREATE FUNCTION remote_function_sample.remote_add(x INT64, y INT64) RETURNS INT64
REMOTE WITH CONNECTION `プロジェクト名.asia-northeast1.CLOUD_RESOURCE`
OPTIONS (
  endpoint = 'Cloud Functionsの関数のURL',
  max_batching_rows = 1
)

max_batching_rowsはとりあえず1にしておきましたが、後の検証で変更します。Cloud Functionsの関数のURLは作成した関数の画面から取得しました。

制限についての検証

以下に試した検証内容を記載します。

結論としては、インスタンス最大数 x 同時実行数を見積もった上で2つのパラメータを設定することで、処理の同時実行数を制限することができることが分かりました。max_batching_rowsは関数コード内のループで1レコードあたりの処理にオーバーヘッドがあるならあまり気にしなくて良い(Cloud Functions関数自体のタイムアウトだけ気をつけておけば良い)、ないなら適切な値を見積もって設定するのが良いと考えます。

1. 時間計測用のSQL

以下のSQL文をBigQuery Studioから実行しました。

SELECT
  val,
  remote_function_sample.remote_add(val, 2)
FROM
  UNNEST([2,3,5,8]) AS val;

2. 各種パラメータと結果

以下のようにパラメータを動かして検証しました。

実測値を見ると、予想値とほぼ同じ値になっていることが分かります。

# インスタンス最大数 同時実行数 max_batching_rows 予想値/秒 実測値/秒
1 1 1 1 20 20
2 2 1 1 10 12
3 1 2 1 10 10
4 2 2 1 5 7
5 1 1 2 10 11

今回は同時実行数制限ができそうか知りたいだけなので小さい値の範囲かつ計測も1度ですがご容赦ください。時間は2回実行して2回目の値を取りました。

予想値は、「5(レコードあたりに設定したsleepの時間) * 4(レコード数) / (インスタンス最大数 x 同時実行数 x max_batching_rows)」で算出しました。

3. 実行ログの例

特にNo.1のケースのSQL実行で、Cloud Functionsの関数のログを表示すると以下のようになっていました。

実行ログの確認

トータルとしては20秒でしたが、確かに5秒ごとにログが出ていることが確認できました。

なお、SQL実行のジョブ情報自体は以下です。上記ログはこのジョブと同じ時間に実行されたものであることが分かります。

ジョブ情報

最後に

この記事では、Cloud Functions(第2世代)をリモート関数で呼び出す際に同時実行数の上限制限ができるか確認してみました。

小さい設定値でいくつか変えてみただけですが、想定通りの実行時間になりました。上限がもっと大きいときも、同じように考えて性能テストなどすると良さそうです。

リモート関数から外部にAPIリクエストを投げているような例で、同じように上限の調整をしたい方の参考になりましたら幸いです。

補足

SQL自体のタイムアウトについて

Cloud Functionsの処理実行数上限を設定すると、BigQuery側で実行したSQLの実行時間が大きくなってしまう可能性があります。

こちらの上限については検証はしていませんが、割り当てと上限ガイドによるとクエリ / 複数ステートメント クエリの実行時間の上限が記事執筆時点では6時間だったため、この時間に収まるように上限を調整すると良さそうでした。

とはいえ6時間も実行するとCloud Functionsの先にあるAPIへのリクエストもそれなりの量になるので、設計を検討した方が良いかもしれません。