Workflowsの実行レートをCloud Tasksで制御してみた #cm_google_cloud_adcal_2024
はじめに
データ事業本部のkobayashiです。
クラスメソッドの Google Cloud Advent Calendar 2024 の 17 日目のブログです。
Cloud Workflowsで1つのワークフローを実行する際にCloud Taskキューを使用して実行レートの制限をしてみたのでその内容をまとめます。
Cloud Tasksを使用したCloud Workflowsワークフロー実行の制御
大規模なシステムでは、同時に実行されるワークフローの数を適切に制御する必要があります。システムには同時実行できるワークフローの数に制限があり、この制限を超えると「429 Too many requests」エラーが発生し、システムの安定性が損なわれる可能性があります。そのような場合にGoogle Cloud Tasksを使用してワークフローの実行を制御することで実行レートの制限を行うことが出来ます。
Cloud Tasksは実行キューを管理し、設定されたレートに従ってタスクを実行することで、システムの負荷を適切にコントロールします。
仕組み
- 親ワークフローがタスクを作成し、Cloud Tasksキューに送信します
- Cloud Tasksは設定されたレートに従って、順次タスクを実行します
- 各タスクは子ワークフローとして処理されます
- メリット
- システムの安定性が向上
- リソースの効率的な利用が可能
- エラーの発生を抑制
- 全体的なパフォーマンスの改善
- 注意点
- Cloud Tasksは「少なくとも1回」の実行を保証する設計となっています。つまり、まれにタスクが重複して実行される可能性があります。このため、重複実行に対する適切なハンドリングを実装することが推奨されます。
具体的なユースケースとしては大量の画像処理タスク、バッチ処理の実行制御、多数のユーザーデータの一括更新などで使えるかと思います。
Cloud TasksでCloud Workflowsを制御してみる
では実際にCloud TasksでCloud Workflowsを制御してみたいと思います。
仕組みとしてはWorkflowsで作成されたワークフローがありそれをCloud Tasksのキューを介して実行するものです。
手順としては以下になります。
- Workflows用のサービスアカウントを作成する
- 実行されるワークフローを作成する
- Cloud Tasksのキューを作成する
- ワークフローを実行するタスクをキューに登録する
Workflows用のサービスアカウントを作成する
はじめにワークフローに割り当てるサービスアカウントを作成します。
$ gcloud iam service-accounts create wf-sa-for-queue --display-name="Workflow Service Account for Cloud Tasks Queue"
$ gcloud projects add-iam-policy-binding {プロジェクトID} \
--member="serviceAccount:wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com" \
--role="roles/workflows.invoker" \
--condition=None
$ gcloud projects add-iam-policy-binding {プロジェクトID} \
--member="serviceAccount:wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com" \
--role="roles/logging.logWriter" \
--condition=None
roles/logging.logWriter
はワークフロー中にcall: sys.log
でCloud Loggingに出力するためです。
実行されるワークフローを作成する
次の単純なワークフローを作成します。
main:
params: [args]
steps:
- init:
assign:
- iteration : ${args.iteration}
- wait:
call: sys.sleep
args:
seconds: 10
- logging:
call: sys.log
args:
text: ${"task-"+iteration}
severity: INFO
- return_message:
return: ${"task-"+iteration}
- argで変数
iteration
を受け取る - 10秒間待機する
task-{iteration}
でCloud Loggingに出力するtask-{iteration}
をreturnする
です。 ではこのワークフローを登録します。
$ gcloud workflows deploy wf-task \
--source=wf-task.yml \
--location=asia-northeast1 \
--service-account=wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com
試しに{"iteration":1}
を指定してワークフローを実行してみます。
$ gcloud workflows run wf-task --location=asia-northeast1 --data='{"iteration":1}'
argument: '{"iteration":1}'
createTime: '2024-12-13T18:56:04.225813Z'
duration: 10.322506618s
endTime: '2024-12-13T18:56:14.548319618Z'
name: projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/8c26ea24-3c5e-4cb9-9f3c-64680fd6b49e
result: '"task-1"'
startTime: '2024-12-13T18:56:04.225813Z'
state: SUCCEEDED
status:
currentSteps:
- routine: main
step: return_message
workflowRevisionId: 000002-d3f
durationが10秒程度でresultも"task-1"
となっていることがわかり想定通りのワークフローが作成出来ていることがわかります。
Cloud Tasksのキューを作成する
次にワークフローの実行レートを制御するCloud Tasksのキューを作成します。
$ gcloud tasks queues create test-wf-queue1 --location=asia-northeast1
Created queue [asia-northeast1/test-wf-queue1].
問題なく作成出来たので設定を確認してみます。
gcloud tasks queues describe test-wf-queue1 --location=asia-northeast1
name: projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1
rateLimits:
maxBurstSize: 100
maxConcurrentDispatches: 1000
maxDispatchesPerSecond: 500.0
retryConfig:
maxAttempts: 100
maxBackoff: 3600s
maxDoublings: 16
minBackoff: 0.100s
state: RUNNING
maxDispatchesPerSecond: 500.0
)
ワークフローを実行するタスクをキューに登録する(では準備が整ったのでCloud Tasksキューを介してワークフローを呼び出してみます。
キューにワークフローを呼び出すタスクを登録する必要がありますが今回は以下のshellを使って登録します。
#!/bin/bash
# 変数定義
QUEUE_NAME="test-wf-queue1"
LOCATION="asia-northeast1"
PROJECT_ID="{プロジェクトID}"
WORKFLOW_NAME="wf-task"
SERVICE_ACCOUNT="wf-sa-for-queue@{プロジェクトID}.iam.gserviceaccount.com"
ITERATIONS=100
for i in $(seq 1 $ITERATIONS); do
# UUIDの生成
TASK_UUID=$(uuidgen)
echo "Creating task $i of $ITERATIONS (UUID: $TASK_UUID)"
gcloud tasks create-http-task "workflow-task-${i}-${TASK_UUID}" \
--queue=$QUEUE_NAME \
--location=$LOCATION \
--url="https://workflowexecutions.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION/workflows/$WORKFLOW_NAME/executions" \
--method=POST \
--oauth-token-scope="https://www.googleapis.com/auth/cloud-platform" \
--oauth-service-account-email=$SERVICE_ACCOUNT \
--body-content="{\"argument\": \"{\\\"iteration\\\": $i}\"}"
done
echo "Task creation completed"
ではこのshellスクリプトを実行してキューにワークフローを呼び出すタスクを登録してみます。
$ ./create_tasks.sh
Creating task 1 of 20 (UUID: D4CF1E6A-134A-4405-8563-096F1506ED1A)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task-1-D4CF1E6A-134A-4405-8563-096F1506ED1A].
Creating task 2 of 20 (UUID: 10C794F6-BA90-4068-BEA4-6A5254AD37E7)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task-2-10C794F6-BA90-4068-BEA4-6A5254AD37E7].
...
Creating task 99 of 100 (UUID: 63EFF333-2FFF-4FC8-9BEA-E35836D7CB51)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task-99-63EFF333-2FFF-4FC8-9BEA-E35836D7CB51].
Creating task 100 of 100 (UUID: 46B75691-9A18-44D5-9F67-2855FBB0F8B9)
Created task [projects/{プロジェクトID}/locations/asia-northeast1/queues/test-wf-queue1/tasks/workflow-task--46B75691-9A18-44D5-9F67-2855FBB0F8B9].
Task creation completed
実行できたのでワークフローの実行結果を見てみます。
$ gcloud workflows executions list wf-task --location=asia-northeast1 --limit=5
NAME STATE START_TIME END_TIME
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/f21ed7f0-3e46-42df-9a16-df56ccaac5d6 SUCCEEDED 2024-12-13T19:16:31.593272044Z 2024-12-13T19:16:41.923285741Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/85244b74-66dc-4756-afe6-debba7895c92 SUCCEEDED 2024-12-13T19:16:30.765198338Z 2024-12-13T19:16:41.079595940Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/ae9d4d14-47df-4ee8-945c-d2b754a3a5df SUCCEEDED 2024-12-13T19:16:29.893702518Z 2024-12-13T19:16:40.188414705Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/f81479af-8bca-4b40-a6d9-14fba36f3e8a SUCCEEDED 2024-12-13T19:16:28.951901629Z 2024-12-13T19:16:39.257394378Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/4f858e27-5024-4886-b488-e34539a3ea62 SUCCEEDED 2024-12-13T19:16:28.142351934Z 2024-12-13T19:16:38.428702578Z
キューの設定がmaxDispatchesPerSecond: 500.0
なので1秒間に500件ワークフローを実行する設定となっており、shellでキューにタスクを登録した直後に次々ワークフローが実行されていることがわかります。
maxDispatchesPerSecond: 0.5
)
ワークフローを実行するタスクをキューに登録する(次にキューの設定をmaxDispatchesPerSecond: 0.5
すなわち2秒でに1件ワークフローを実行する設定にしてみます。
$ gcloud tasks queues update test-wf-queue1 --location=asia-northeast1 \
--max-dispatches-per-second=0.5 --max-concurrent-dispatches=1000
maxDispatchesPerSecond
の設定を変更したのでワークフローの実行結果を見てみます。
$ gcloud workflows executions list wf-task --location=asia-northeast1 --limit=10
NAME STATE START_TIME END_TIME
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/3c80de96-0cd2-4ea0-a678-748699f0b934 SUCCEEDED 2024-12-13T19:28:09.205293897Z 2024-12-13T19:28:19.476531179Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/84840954-ff18-4400-a423-1cfebc69c8b1 SUCCEEDED 2024-12-13T19:28:07.196907733Z 2024-12-13T19:28:17.495681330Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/e124ed21-e22a-4a36-bad6-18fa1106457f SUCCEEDED 2024-12-13T19:28:05.205954094Z 2024-12-13T19:28:15.559974750Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/23bee7f1-b3ad-467f-9076-622a89b33e81 SUCCEEDED 2024-12-13T19:28:03.203982207Z 2024-12-13T19:28:13.506164123Z
projects/{プロジェクト番号}/locations/asia-northeast1/workflows/wf-task/executions/853359cf-4eff-430f-be34-56670a517cf7 SUCCEEDED 2024-12-13T19:28:01.373220934Z 2024-12-13T19:28:11.677475440Z
START_TIMEを見てみると2秒毎に実行されていることがわかります。
これで目的通りCloud Workflowsで1つのワークフローを実行する際にCloud Taskキューを使用して実行レートの制限をすることが出来ました。
なおCloud Tasksのキューで最大同時ディスバッチ数maxConcurrentDispatches
を設定することで最大並列数を制限できるため以下の設定でワークフロータスクを登録してみました。
$ gcloud tasks queues update test-wf-queue1 --location=asia-northeast1 \
--max-concurrent-dispatches=1
同一ワークフローの同時実行を制御したかったのですが結果は失敗でした。
理由はキューのタスクへの登録内容を考えればわかることで、Cloud TasksがHTTPリクエストを送信してWorkflowsを実行する際にそのリクエストはすぐに完了してしまいCloud Tasksの視点からは、タスクはすぐに完了したと認識されるためmaxConcurrentDispatches
は効かないためです。
まとめ
Cloud Workflowsで1つのワークフローを実行する際にCloud Taskキューを使用して実行レートの制限をしてみました。Cloud Tasks媒介することでワークフローの実行レートが制限できることがわかったので大量の画像処理タスク、バッチ処理の実行制御の場面で使ってみたいと思います。
またCloud Tasksを初めて触ってみたのですがなかなかおもしろいサービスなので検証して別のブログを書いてみたいと思います。
明日 12/18 は hanzawa.yuya さんです。お楽しみに。