Cloud Tasksで複数のCloud Functionsを非同期に呼び出してみた

Cloud Tasksで複数のCloud Functionsを非同期に呼び出してみた

Clock Icon2024.06.22

Google Cloudデータエンジニアのはんざわです。
前回の記事では、Cloud Tasksの簡単な概要を紹介した後に単一のCloud FunctionsをCloud Tasksで呼び出す方法を紹介しました。

今回のブログでは、Cloud Tasksで複数のCloud Functionsを非同期に呼び出してみたいと思います。
前回と同様にCloud Tasksでタスクを作成し、Cloud Functionsを呼び出しますが、複数のCloud Functionsを呼び出す場合、その呼び出す数だけタスクを作成する必要があります。
ひとつずつタスクを作成するのは現実的ではないため、Pythonのクライアントライブラリで複数のタスクをまとめて作成したいと思います。

事前準備

今回の検証では、以下のリソースを準備します。

  • Cloud Tasksのキュー
  • Cloud Tasksから呼び出されるCloud Functions
  • Cloud Tasksのタスクを作成するCloud Functions

さっそく作成していきましょう。

Cloud Tasksのキュー

まずは、Cloud Tasksのキューを作成します。
次のキャプチャのような構成でキューを作成しました。

  • 名前: exec-cloud-functions
  • 最大ディスバッチ数: 10 / 秒
  • 最大同時ディスバッチ数: 2

最大同時ディスバッチ数で最大並列数を2に制限しているため、同時に実行されるCloud Functionsは2つのみになる想定です。
これらのオプションの詳細は前回のブログで紹介しているので、興味がある方は是非確認してみてください。

Cloud Tasksから呼び出されるCloud Functions

次にCloud Tasksから呼び出されるCloud Functionsをデプロイします。

以下のコードをexec-by-tasksという名前でデプロイしました。
見ての通り、処理はシンプルで、Cloud Functionsが受け取ったリクエストボディから変数を受け取り、出力するだけです。
また、後続で呼び出されるCloud Functionsの挙動も確認したいため、5秒間のスリープを仕込んでいます。

import time
import functions_framework

@functions_framework.http
def main(request):
    print("Function Started")

    request_json = request.get_json()
    text = request_json['text']

    # 受け取った変数を出力
    print(text)

    time.sleep(5)

    print("Function Finished")

    return 'OK'
functions-framework==3.*

Cloud Tasksのタスクを作成するCloud Functions

最後にCloud Tasksのタスクを作成するCloud Functionsをデプロイします。

上記のドキュメントを参考にHTTPタスクをPythonで作成します。

処理の流れとしては、1から10までの数字をリクエストボディのパラメータに加えたタスクを順次作成し、それをキューに送信しています。

import json
import os

from typing import Optional
import functions_framework

from google.cloud import tasks_v2


@functions_framework.cloud_event
def create_http_task_with_token(cloud_event):
    print("Functions Started")

    client = tasks_v2.CloudTasksClient()

    # 環境変数を取得
    URL = os.getenv("URL")
    SA = os.getenv("SA")
    GCP_PROJECT = os.getenv("GCP_PROJECT")

    # 1から10までのタスクを作成する
    for i in range(1, 11):
        json_payload = {"text": f"これは{i}番です"}

        # タスクを構成する
        task = tasks_v2.Task(
            http_request = tasks_v2.HttpRequest(
                http_method = tasks_v2.HttpMethod.POST,
                url = URL,
                headers = {"Content-type": "application/json"},
                oidc_token = tasks_v2.OidcToken(
                    service_account_email = SA
                ),
                body = json.dumps(json_payload).encode()
            ),
        )

        # タスクをキューに送信する
        client.create_task(
            tasks_v2.CreateTaskRequest(
                parent = client.queue_path(
                    project = GCP_PROJECT,
                    location = 'asia-northeast1',
                    queue = 'exec-cloud-functions'
                ),
                task = task,
            )
        )

        print(f"{i}番目のタスクを送信しました")

    print("Functions Finished")

    return "OK"
functions-framework==3.*
google-cloud-tasks==2.13.1

さっそく動かしてみる

まずはcreate-tasksを起動してタスクをキューに送信します。

Cloud Loggingを確認すると次のキャプチャのようにログが出力されていました。
タスクがキューに送信されるとそのタスクの終了を待たず、非同期に次のタスクが送信されていることがわかると思います。

次にexec-by-tasksのログも確認します。

1番目と2番目は、ほぼ同時に呼び出されていますが、3番目と4番目は1番目と2番目が終了してから呼び出されていることがわかると思います。
これは事前にキューの設定で最大同時ディスバッチ数を2にしたため、同時に2つまでしか実行されないようにキューが制御しています。

制限事項

Cloud Tasksにもいくつか制限事項があります。
利用する際は、事前に上記のドキュメントを確認してみてください。

まとめ

今回は、Cloud Tasksで複数のCloud Functionsを非同期に呼び出してみました。
シンプルに1から10までの数字をリクエストボディに含めてみました。他の例として、バケットからファイルの一覧を取得して、それらをリクエストボディに含めて何らかの処理をさせたりすることも可能だと思います。

正確に並列数を制御したい場合、Cloud Tasksは有効なサービスだと思いますので、是非使ってみてください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.