Workflowsでエクスポネンシャルバックオフを実装してみた

Workflowsでエクスポネンシャルバックオフを実装して検証してみました。
2024.04.23

データアナリティクス事業本部の根本です。Workflowsでエクスポネンシャルバックオフを実装して検証してみました。WorkflowsでAPIを叩く予定のある方はよかったら読んでみてください。

この記事の対象者

  • WorkflowsでAPIを叩く際にエクスポネンシャルバックオフを実装したいひと

はじめに:エクスポネンシャルバックオフとは

  • エクスポネンシャル(exponential):指数関数的
  • バックオフ(backoff):後退. 簡潔に表すと、指数関数的にリトライ間隔を後退(遅らせる)させるアルゴリズムになります。
    以下の例は、リトライごとに2の累乗分の時間待機するものです。
呼び出し回数 待機時間
1 0秒 (初回呼び出し)
2 1秒 2の0乗
3 2秒 2の1乗
4 4秒 2の2乗
5 8秒 2の3乗
6 16秒 2の4乗
7 32秒 2の5乗
8 64秒 2の6乗
9 128秒 2の7乗

エクスポネンシャルバックオフは、APIを呼び出した時に、即時リトライをするのではなく待機時間を指数関数的に増加させて呼び出しタイミングを待機させます。
そうすることでサーバ負荷の軽減やリクエスト数の低減を図ることができますし、サーバ側の一時的な負荷でのAPI呼び出し失敗であれば再試行で成功する可能性も上がります。
Workflowsにはエクスポネンシャルバックオフを実装するための機能再試行ステップ (retry step)があります。
それでは実際にWorkflowsで再試行ステップ(retry step)を用いてエクスポネンシャルバックオフを実装してみます。

やってみる

事前準備

事前準備として、2つのCloud Functionsを実装してデプロイしました。以前記事にしたものです。
デプロイしているCloud Functionsは以下です。
1. ファイル一覧取得関数(list_files STEP)
2. 並列処理検証関数(processFile STEP)←エクスポネンシャルバックオフでの再試行対象関数

ファイル一覧取得関数はCloud Storageからファイルを取得するので、バケットを作成して以下のファイルを保存しています。

test1.txt
test2.txt

ファイル一覧取得関数

httpトリガー起動の関数として作成しています。指定バケットに保存されているファイル一覧を取得して返却します。

list_filest

import functions_framework
from google.cloud import storage
from flask import jsonify

def list_blobs(bucket_name):
  storage_client = storage.Client()
  blobs = storage_client.list_blobs(bucket_name)
  blob_names = [blob.name for blob in blobs]
  return blob_names

@functions_framework.http
def get_file_names(request):
  BUCKET = "作成したバケット名"
  list_blobs_result = list_blobs(BUCKET)
  return jsonify({"file_names": list_blobs_result})

並列処理検証関数

httpトリガーで実装をします。
Workflowsから呼び出す際に、リクエストボディにファイル名を格納しています。エクスポネンシャルバックオフを検証するため、ファイル名がtest1.txtだった場合は意図的にリトライ処理を発生させるためHTTPステータスコード503を返却します。
(test1.txtは呼び出しに失敗するため複数回再試行される,test2.txtは複数回呼び出されることはないという仮説です)

並列処理検証関数

import functions_framework
from flask import jsonify

@functions_framework.http
def parallel_process(request):
  data = request.get_json()
  file_name = data.get('file_name')
  print(f"[{file_name}]:関数が起動されました。対象ファイルは[{file_name}]です。")
  if(file_name == 'test1.txt'):
    response = jsonify({'message': 'Error test'})
    response.status_code = 503
    return response
  print(f"[{file_name}]:処理終了")
  return file_name + ':処理完了'

Workflowsの実装

以下のYAMLでは1.ファイル一覧取得関数で取得したファイルのリストを、parallelステップで並列処理して2.並列処理検証関数で並列処理する流れとなっています。

workflows.yaml

main:
  steps:
  - list_files:
      call: http.get
      args:
        url: "ファイル一覧取得関数のURL"
      result: getFilesResult
  - processFiles:
      parallel:
        for:
          value: name
          in: ${getFilesResult.body.file_names}
          steps:
            - processFile:
                try:
                  steps:
                    - parallelFunction:
                        call: http.post
                        args:
                          url: "並列処理検証関数のURL"
                          body:
                            file_name: ${name}
                        result: processResult
                    - logRecode:
                        call: sys.log
                        args:
                          text: ${processResult.body}
                          severity: "INFO"
                retry:
                  predicate: ${retryCheck}
                  max_retries: 6
                  backoff:
                    initial_delay: 1
                    max_delay: 32
                    multiplier: 2
checkRetry:
  params: [error_array]
  steps:
    - checkError:
        switch:
        - condition: ${("code" in error_array) and (error_array.code == 503)}
          return: True
    - nothingToDo:
        return: False

それぞれのステップを見ていきます。

- list_files:
    call: http.get
    args:
      url: "ファイル一覧取得関数のURL"
    result: getFilesResult

ファイル一覧を取得してgetFilesResultに格納をします。

parallel:
  for:
    value: name
    in: ${getFilesResult.body.file_names}
          steps:
            - processFile:
                try:
                  steps:
                    - parallelFunction:
                        call: http.post
                        args:
                          url: "並列処理検証関数のURL"
                          body:
                            file_name: ${name}
                        result: processResult
                    - logRecode:
                        call: sys.log
                        args:
                          text: ${processResult.body}
                          severity: "INFO"

parallelステップは並列処理のステップとなります。for文でgetFilesResultに格納されたファイル名を
取り出し、リクエストボディ格納してにそれぞれ並列処理検証関数を呼び出します。
tryステップ内で起きたエラーはretryステップにてキャッチされます。

retry:
  predicate: ${checkRetry}
  max_retries: 6
  backoff:
    initial_delay: 1
    max_delay: 32
    multiplier: 2

Workflowsにおけるエクスポネンシャルバックオフの肝の部分です。
max_retries:最大再試行回数(初回実行はカウント対象外)
initial_delay:初回遅延秒数
max_delay:最大遅延秒数
multiplier:乗数(直前の遅延秒数に掛け算される値)

initial_delayに1が設定されていて、multiplierが2、max_retriesが6の場合だと、待機時間は以下となります。
1回目の呼び出し:待機時間1秒
2回目の呼び出し:待機時間2秒(multiplier * 1回目の呼び出し待機時間 = 2 * 1)
3回目の呼び出し:待機時間4秒(multiplier * 2回目の呼び出し待機時間 = 2 * 2)
4回目の呼び出し:待機時間8秒(multiplier * 3回目の呼び出し待機時間 = 2 * 4)
5回目の呼び出し:待機時間16秒(multiplier * 4回目の呼び出し待機時間 = 2 * 8)
6回目の呼び出し:待機時間32秒(multiplier * 5回目の呼び出し待機時間 = 2 * 16)

checkRetry:
  params: [error_array]
  steps:
    - checkError:
        switch:
        - condition: ${("code" in error_array) and (error_array.code == 503)}
          return: True
    - nothingToDo:
        return: False

上記はリトライ判定のステップです。並列処理検証関数の実行結果のエラーコードが503の場合Trueを返却してリトライ処理が行われます。

実行して検証してみる

Workflowsを実行したところ、1分ほど経過してエラーとなりました。
呼び出している並列処理検証関数は[test1.txt]の場合、503しか返さないためです。

並列処理検証関数のログを見てみます。
test2.txtを引数にした場合は、503を返却せず正常終了しているため1回のみ呼び出されているのが確認できます(青枠です)。
test1.txtを引数にした呼び出しに関しては503を発生させているので全てエラーになっており、複数回呼び出されているのが確認できました。

なかなかみづらいのですが、伝わりましたでしょうか?

呼び出し時間 待機時間 呼び出し回数
2024-04-23 17:46:33.467 JST 0 1回目
2024-04-23 17:46:34.895 JST 1 2回目
2024-04-23 17:46:37.209 JST 2 3回目
2024-04-23 17:46:41.529 JST 4 4回目
2024-04-23 17:46:49.838 JST 8 5回目
2024-04-23 17:47:06.170 JST 16 6回目
2024-04-23 17:47:38.504 JST 32 7回目

max_retriesに6を指定しているので、合計7回関数の呼び出しが行われています。 max_retriesに設定した数値は初回実行はカウント対象外なので、6を指定すると1(初回実行) + 6(max_retries) = 7となるからです。
WorkflowsのYAMLに再試行ステップを用いることで、簡潔にエクスポネンシャルバックオフを実装することができました。

おわりに

再試行ステップを用いることで、判定回数の分岐や判定処理を実装することなく確実にエクスポネンシャルバックオフを実装することができとても嬉しくなりました。
再試行回数や秒数の設定は個々のワークロードによると思いますが、適切な値を設定しておけばAPIの一時的なエラーにも対応できて良いと考えます。個人的には今後Workflowsを本番環境で運用する際には実装したい機能だなと思いました。
この記事がどなたかのお役に立てば嬉しいです。それでは。

参考

再試行ステップ