Cloud Pub/Sub から Cloud Tasks のタスクを BufferTask メソッドで直接作成する
はじめに
データ事業本部のはんざわです。
Cloud Pub/Sub と Cloud Tasks は、非同期処理において非常に有用なサービスとしてよく比較されます。細かい違いについては、以下の公式ドキュメントを参照してください。
両者には優れた点がある一方で、単体では少し物足りない面もあります。
例として、Cloud Pub/Sub の Push 配信では、最大でも 10 分以内に HTTP レスポンスを返す必要があるため、長時間の処理には適していません。また、メッセージは即時に配送されるため、適切な配信レートを管理するためには工夫が必要です。
一方、Cloud Tasks はこれらの課題に対応できます。しかし、単体では大量のイベントをリアルタイムに処理したり、イベント駆動アーキテクチャを構成したりすることはできません。
そのため、ユースケースによっては、それぞれの苦手な部分を補完する形で、Cloud Pub/Sub と Cloud Tasks の両方を組み合わせて利用することも有効です。
その場合、通常は以下の構成図に示すように Cloud Pub/Sub で Cloud Run をトリガーし、Cloud Run からクライアントライブラリを用いて Cloud Tasks のタスクを作成するような流れになります。しかし、この方法は、タスクを作成する処理を別途実装する必要があるため、手間がかかります。
(筆者も最近まで知りませんでしたが)Cloud Tasks には、キューレベルのルーティング構成と BufferTask API という機能があります。これらを組み合わせることで、そのような手間をかけずに Cloud Pub/Sub から Cloud Tasks に直接タスクを登録できます。
今回のブログでは、これらの機能を紹介しつつ、実際に試してみたいと思います。
※ 今回のブログでは、Cloud Tasks の説明は割愛します。概要については、以下のブログを参考にしてください。
それぞれの機能について
今回のポイントとなる キューレベルのルーティング構成 と BufferTask API について、簡単に紹介します。
以下のブログが非常に参考になるため、あわせて参考にしてください。
キューレベルのルーティング構成とは
キューレベルのルーティング構成とは、キュー単位でタスクのルーティング先(HTTP エンドポイントなど)をあらかじめ設定できる機能です。
この設定を行うと、キューに追加されたすべてのタスクは、個別に HTTP ターゲットを指定しなくても、あらかじめ定義されたルーティング先に自動的に送信されます。つまり、タスクごとにターゲット URL を指定する必要がなくなり、タスク作成処理をよりシンプルに保つことができます。
以下は、Google Cloud の公式ブログで紹介されている構成図の例です。
この例では、事前に http-queue
に対して、hello2
の Cloud Run をルーティング先として設定してします。
この状態で hello1
の Cloud Run をターゲットとする HTTP タスクを作成してもリクエストは hello2
に送信されます。
参考:Cloud Tasks で HTTP リクエストをバッファリングする
BufferTask API とは
BufferTask API は、Cloud Tasks のキューに HTTP タスクを登録するための新しい API です。
従来の方法では、タスクごとに HTTP ターゲットの URL やヘッダーなどを明示的に指定する必要がありましたが、BufferTask API を使えば、それらの情報を指定せずにタスクを作成できます。
この API を利用する場合、HTTP ターゲットをタスク単位ではなく、キュー単位であらかじめ設定しておく必要があります。そのため、前述の「キューレベルのルーティング構成」と組み合わせて使用することが前提となります。
BufferTask API を活用することで、呼び出し元のサービスは、タスクの送信先や詳細な構成情報を意識する必要がなくなり、よりシンプルにサービス間の統合を実現できます。
動かしてみる
ここからは、Cloud Pub/Sub から Cloud Tasks を経由して Cloud Run を呼び出す構成を実際に動かして検証していきます。
全体構成は以下の図の通りです。
1. Cloud Run サービスをデプロイする
まずは、Cloud Tasks から呼び出される Cloud Run サービスをデプロイします。
今回の検証では、IAM 認証を有効にした設定にしています。
REGION="asia-northeast1"
SERVICE="sample-cloud-run"
# Cloud Run をデプロイ
gcloud run deploy $SERVICE \
--image=gcr.io/cloudrun/hello \
--no-allow-unauthenticated \
--region=$REGION
2. Eventarc を作成する
次に、Eventarc トリガーとそれに使用するサービスアカウントを作成します。
最終的には Eventarc から Cloud Tasks のキューにタスクを送信しますが、この時点ではまだ作成していないため、一時的に宛先を Cloud Run に設定しています。
REGION="asia-northeast1"
PROJECT_ID=""
TRIGGER="sample-eventarc"
SERVICE="sample-cloud-run"
QUEUE_SA="call-cloud-run"
TRIGGER_SA="create-tasks"
# Eventarc に割り当てるサービスアカウントを作成
gcloud iam service-accounts create $TRIGGER_SA
# Cloud Tasks キューへのタスク作成権限を付与
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${TRIGGER_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role="roles/cloudtasks.enqueuer" \
--condition=None
# Eventarc トリガーの作成(宛先は一時的にCloud Run)
gcloud eventarc triggers create $TRIGGER \
--destination-run-service=$SERVICE \
--destination-run-region=$REGION \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--service-account="${TRIGGER_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \
--location=$REGION
3. キューレベルのルーティングを設定した Cloud Tasks キューを作成
続いて、キューレベルのルーティングを有効にした Cloud Tasks キューを作成します。
この設定により、タスクごとに HTTP URL を指定せずともキューに設定されたルーティング先へ自動的に送信されます。
REGION="asia-northeast1"
PROJECT_ID=""
QUEUE="sample-queue"
SERVICE="sample-cloud-run"
QUEUE_SA="call-cloud-run"
# サービスアカウントの作成
gcloud iam service-accounts create $QUEUE_SA
# Cloud Run の実行権限を付与
gcloud run services add-iam-policy-binding $SERVICE \
--region=$REGION \
--member="serviceAccount:${QUEUE_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role="roles/run.invoker" \
--condition=None
# Cloud Run のエンドポイントとクエリパラメータを設定
SERVICE_HOST="sample-cloud-run-ROJECT_NNUMBER.asia-northeast1.run.app"
QUERY="__GCP_CloudEventsMode=CUSTOM_PUBSUB_projects%2F${PROJECT_ID}%2Ftopics%2Feventarc-asia-northeast1-sample-eventarc-412"
# Cloud Tasks キューの作成(キューレベルのルーティングを設定)
gcloud tasks queues create $QUEUE \
--http-uri-override="host:${SERVICE_HOST},query:${QUERY}" \
--max-attempts=1 \
--location=$REGION \
--http-oidc-service-account-email-override="${QUEUE_SA}@${PROJECT_ID}.iam.gserviceaccount.com"
4. サービスアカウントユーザーのロールを付与
Cloud Tasks が Cloud Run を呼び出すためには、OIDC トークンを発行できるようにする必要があります。
そのため、タスク作成側のサービスアカウントに対して、Cloud Run 実行用サービスアカウントにサービスアカウントユーザーの権限を付与します。
PROJECT_ID=""
QUEUE_SA="call-cloud-run"
TRIGGER_SA="create-tasks"
gcloud iam service-accounts add-iam-policy-binding ${QUEUE_SA}@${PROJECT_ID}.iam.gserviceaccount.com \
--member="serviceAccount:${TRIGGER_SA}@${PROJECT_ID}.iam.gserviceaccount.com" \
--role="roles/iam.serviceAccountUser" \
--condition=None
5. Pub/Subサブスクリプションの宛先をBufferTask APIに変更
最後に、Eventarc が使用している Pub/Sub サブスクリプションの Push 先を BufferTask API に変更します。
これにより、Pub/Sub メッセージが直接 Cloud Tasks に配信されるようになります。
REGION="asia-northeast1"
PROJECT_ID=""
QUEUE="sample-queue"
TRIGGER_SA="create-tasks"
# Pub/Sub サブスクリプションの宛先を BufferTask API に変更
gcloud pubsub subscriptions update eventarc-asia-northeast1-sample-eventarc-sub-551 \
--push-endpoint="https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${REGION}/queues/${QUEUE}/tasks:buffer" \
--push-auth-service-account="${TRIGGER_SA}@${PROJECT_ID}.iam.gserviceaccount.com"
動作確認
準備が整ったら、Pub/Sub トピックにメッセージを送信して、Cloud Run が起動するか確認します。
gcloud pubsub topics publish eventarc-asia-northeast1-sample-eventarc-412 \
--message="test"
- 実行結果
期待通り、Cloud Run が起動してリクエストを受け取っていることが確認できました。
おまけ
ここまで紹介してきた仕組みを応用することで Cloud Storage トリガーの Cloud Run を 1 時間実行させることも可能になります。
Cloud Pub/Sub のサブスクリプションでは、デフォルトで 10 秒以内、最大でも 10 分以内にレスポンスを返さないとメッセージが未処理と判断され再送されてしまいます。
この制限のため、Cloud Run で長時間の処理を直接実行するのは難しいケースがあります。
そこで、Cloud Tasks を中継として挟むことで、タスクを登録した時点で Pub/Sub 側には即座にレスポンスを返すことができます。
これにより、長時間の処理を行うことが可能になります。
- 実行結果
さらに、Cloud Tasks を経由することで、同時実行数の制限やデッドレターキューが不要なリトライポリシーの設定など処理を柔軟に制御できるようになります。
ぜひ試してみてください