Workflowsの実行レートをCloud Tasksで制御してみた #cm_google_cloud_adcal_2024

Workflowsの実行レートをCloud Tasksで制御してみた #cm_google_cloud_adcal_2024

Clock Icon2024.12.17

はじめに

データ事業本部のkobayashiです。
クラスメソッドの Google Cloud Advent Calendar 2024 の 17 日目のブログです。

https://qiita.com/advent-calendar/2024/cm-google-cloud

Cloud Workflowsで1つのワークフローを実行する際にCloud Taskキューを使用して実行レートの制限をしてみたのでその内容をまとめます。

Cloud Tasksを使用したCloud Workflowsワークフロー実行の制御

大規模なシステムでは、同時に実行されるワークフローの数を適切に制御する必要があります。システムには同時実行できるワークフローの数に制限があり、この制限を超えると「429 Too many requests」エラーが発生し、システムの安定性が損なわれる可能性があります。そのような場合にGoogle Cloud Tasksを使用してワークフローの実行を制御することで実行レートの制限を行うことが出来ます。
Cloud Tasksは実行キューを管理し、設定されたレートに従ってタスクを実行することで、システムの負荷を適切にコントロールします。

仕組み

  1. 親ワークフローがタスクを作成し、Cloud Tasksキューに送信します
  2. Cloud Tasksは設定されたレートに従って、順次タスクを実行します
  3. 各タスクは子ワークフローとして処理されます
  • メリット
    • システムの安定性が向上
    • リソースの効率的な利用が可能
    • エラーの発生を抑制
    • 全体的なパフォーマンスの改善
  • 注意点
    • Cloud Tasksは「少なくとも1回」の実行を保証する設計となっています。つまり、まれにタスクが重複して実行される可能性があります。このため、重複実行に対する適切なハンドリングを実装することが推奨されます。

具体的なユースケースとしては大量の画像処理タスク、バッチ処理の実行制御、多数のユーザーデータの一括更新などで使えるかと思います。

Cloud TasksでCloud Workflowsを制御してみる

では実際にCloud TasksでCloud Workflowsを制御してみたいと思います。

仕組みとしてはWorkflowsで作成されたワークフローがありそれをCloud Tasksのキューを介して実行するものです。
手順としては以下になります。

  1. Workflows用のサービスアカウントを作成する
  2. 実行されるワークフローを作成する
  3. Cloud Tasksのキューを作成する
  4. ワークフローを実行するタスクをキューに登録する

Workflows用のサービスアカウントを作成する

はじめにワークフローに割り当てるサービスアカウントを作成します。

$ gcloud iam service-accounts create wf-sa-for-queue --display-name="Workflow Service Account for Cloud Tasks Queue"
$ gcloud projects add-iam-policy-binding {プロジェクトID} \
    --member="serviceAccount:wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com" \
    --role="roles/workflows.invoker" \
    --condition=None
$ gcloud projects add-iam-policy-binding {プロジェクトID} \
    --member="serviceAccount:wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com" \
    --role="roles/logging.logWriter" \
    --condition=None

roles/logging.logWriterはワークフロー中にcall: sys.logでCloud Loggingに出力するためです。

実行されるワークフローを作成する

次の単純なワークフローを作成します。

wf-task.yml
main:
  params: [args]
  steps:
    - init:
        assign:
          - iteration : ${args.iteration}
    - wait:
        call: sys.sleep
        args:
            seconds: 10
    - logging:
        call: sys.log
        args:
          text: ${"task-"+iteration}
          severity: INFO
    - return_message:
        return: ${"task-"+iteration}
  1. argで変数iterationを受け取る
  2. 10秒間待機する
  3. task-{iteration}でCloud Loggingに出力する
  4. task-{iteration}をreturnする

です。 ではこのワークフローを登録します。

$ gcloud workflows deploy wf-task \
	--source=wf-task.yml \
	--location=asia-northeast1 \
	--service-account=wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com

試しに{"iteration":1}を指定してワークフローを実行してみます。

$ gcloud workflows run wf-task --location=asia-northeast1 --data='{"iteration":1}'
argument: '{"iteration":1}'
createTime: '2024-12-13T18:56:04.225813Z'
duration: 10.322506618s
endTime: '2024-12-13T18:56:14.548319618Z'
name: projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/8c26ea24-3c5e-4cb9-9f3c-64680fd6b49e
result: '"task-1"'
startTime: '2024-12-13T18:56:04.225813Z'
state: SUCCEEDED
status:
  currentSteps:
  - routine: main
    step: return_message
workflowRevisionId: 000002-d3f

durationが10秒程度でresultも"task-1"となっていることがわかり想定通りのワークフローが作成出来ていることがわかります。

Cloud Tasksのキューを作成する

次にワークフローの実行レートを制御するCloud Tasksのキューを作成します。

$ gcloud tasks queues create test-wf-queue1 --location=asia-northeast1
Created queue [asia-northeast1/test-wf-queue1].

問題なく作成出来たので設定を確認してみます。

gcloud tasks queues describe test-wf-queue1 --location=asia-northeast1
name: projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1
rateLimits:
  maxBurstSize: 100
  maxConcurrentDispatches: 1000
  maxDispatchesPerSecond: 500.0
retryConfig:
  maxAttempts: 100
  maxBackoff: 3600s
  maxDoublings: 16
  minBackoff: 0.100s
state: RUNNING

ワークフローを実行するタスクをキューに登録する(maxDispatchesPerSecond: 500.0)

では準備が整ったのでCloud Tasksキューを介してワークフローを呼び出してみます。
キューにワークフローを呼び出すタスクを登録する必要がありますが今回は以下のshellを使って登録します。

#!/bin/bash

# 変数定義
QUEUE_NAME="test-wf-queue1"
LOCATION="asia-northeast1"
PROJECT_ID="{プロジェクトID}"
WORKFLOW_NAME="wf-task"
SERVICE_ACCOUNT="wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com"
ITERATIONS=100

for i in $(seq 1 $ITERATIONS); do
    # UUIDの生成
    TASK_UUID=$(uuidgen)
    echo "Creating task $i of $ITERATIONS (UUID: $TASK_UUID)"

    gcloud tasks create-http-task "workflow-task-${i}-${TASK_UUID}" \
        --queue=$QUEUE_NAME \
        --location=$LOCATION \
        --url="https://workflowexecutions.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION/workflows/$WORKFLOW_NAME/executions" \
        --method=POST \
        --oauth-token-scope="https://www.googleapis.com/auth/cloud-platform" \
        --oauth-service-account-email=$SERVICE_ACCOUNT \
        --body-content="{\"argument\": \"{\\\"iteration\\\": $i}\"}"

done

echo "Task creation completed"

ではこのshellスクリプトを実行してキューにワークフローを呼び出すタスクを登録してみます。

$ ./create_tasks.sh
Creating task 1 of 20 (UUID: D4CF1E6A-134A-4405-8563-096F1506ED1A)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task-1-D4CF1E6A-134A-4405-8563-096F1506ED1A].
Creating task 2 of 20 (UUID: 10C794F6-BA90-4068-BEA4-6A5254AD37E7)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task-2-10C794F6-BA90-4068-BEA4-6A5254AD37E7].
...
Creating task 99 of 100 (UUID: 63EFF333-2FFF-4FC8-9BEA-E35836D7CB51)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task-99-63EFF333-2FFF-4FC8-9BEA-E35836D7CB51].
Creating task 100 of 100 (UUID: 46B75691-9A18-44D5-9F67-2855FBB0F8B9)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task--46B75691-9A18-44D5-9F67-2855FBB0F8B9].
Task creation completed

実行できたのでワークフローの実行結果を見てみます。

$ gcloud workflows executions list wf-task --location=asia-northeast1 --limit=5
NAME                                                                                                               STATE      START_TIME                      END_TIME
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/f21ed7f0-3e46-42df-9a16-df56ccaac5d6  SUCCEEDED  2024-12-13T19:16:31.593272044Z  2024-12-13T19:16:41.923285741Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/85244b74-66dc-4756-afe6-debba7895c92  SUCCEEDED  2024-12-13T19:16:30.765198338Z  2024-12-13T19:16:41.079595940Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/ae9d4d14-47df-4ee8-945c-d2b754a3a5df  SUCCEEDED  2024-12-13T19:16:29.893702518Z  2024-12-13T19:16:40.188414705Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/f81479af-8bca-4b40-a6d9-14fba36f3e8a  SUCCEEDED  2024-12-13T19:16:28.951901629Z  2024-12-13T19:16:39.257394378Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/4f858e27-5024-4886-b488-e34539a3ea62  SUCCEEDED  2024-12-13T19:16:28.142351934Z  2024-12-13T19:16:38.428702578Z

キューの設定がmaxDispatchesPerSecond: 500.0なので1秒間に500件ワークフローを実行する設定となっており、shellでキューにタスクを登録した直後に次々ワークフローが実行されていることがわかります。

ワークフローを実行するタスクをキューに登録する(maxDispatchesPerSecond: 0.5)

次にキューの設定をmaxDispatchesPerSecond: 0.5すなわち2秒でに1件ワークフローを実行する設定にしてみます。

$ gcloud tasks queues update test-wf-queue1 --location=asia-northeast1 \
    --max-dispatches-per-second=0.5 --max-concurrent-dispatches=1000

maxDispatchesPerSecondの設定を変更したのでワークフローの実行結果を見てみます。

$ gcloud workflows executions list wf-task --location=asia-northeast1 --limit=10
NAME                                                                                                               STATE      START_TIME                      END_TIME
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/3c80de96-0cd2-4ea0-a678-748699f0b934  SUCCEEDED  2024-12-13T19:28:09.205293897Z  2024-12-13T19:28:19.476531179Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/84840954-ff18-4400-a423-1cfebc69c8b1  SUCCEEDED  2024-12-13T19:28:07.196907733Z  2024-12-13T19:28:17.495681330Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/e124ed21-e22a-4a36-bad6-18fa1106457f  SUCCEEDED  2024-12-13T19:28:05.205954094Z  2024-12-13T19:28:15.559974750Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/23bee7f1-b3ad-467f-9076-622a89b33e81  SUCCEEDED  2024-12-13T19:28:03.203982207Z  2024-12-13T19:28:13.506164123Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/853359cf-4eff-430f-be34-56670a517cf7  SUCCEEDED  2024-12-13T19:28:01.373220934Z  2024-12-13T19:28:11.677475440Z

START_TIMEを見てみると2秒毎に実行されていることがわかります。
これで目的通りCloud Workflowsで1つのワークフローを実行する際にCloud Taskキューを使用して実行レートの制限をすることが出来ました。

なおCloud Tasksのキューで最大同時ディスバッチ数maxConcurrentDispatchesを設定することで最大並列数を制限できるため以下の設定でワークフロータスクを登録してみました。

$ gcloud tasks queues update test-wf-queue1 --location=asia-northeast1 \
    --max-concurrent-dispatches=1

同一ワークフローの同時実行を制御したかったのですが結果は失敗でした。
理由はキューのタスクへの登録内容を考えればわかることで、Cloud TasksがHTTPリクエストを送信してWorkflowsを実行する際にそのリクエストはすぐに完了してしまいCloud Tasksの視点からは、タスクはすぐに完了したと認識されるためmaxConcurrentDispatchesは効かないためです。

まとめ

Cloud Workflowsで1つのワークフローを実行する際にCloud Taskキューを使用して実行レートの制限をしてみました。Cloud Tasks媒介することでワークフローの実行レートが制限できることがわかったので大量の画像処理タスク、バッチ処理の実行制御の場面で使ってみたいと思います。
またCloud Tasksを初めて触ってみたのですがなかなかおもしろいサービスなので検証して別のブログを書いてみたいと思います。

明日 12/18 は hanzawa.yuya さんです。お楽しみに。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.