AWS Step Functionsにサーキットブレーカーパターンを組み込んでみた

2022.11.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

AWS Step Functionsステートマシン実行中、再試行で回避できない予期しない障害が発生したときは、再試行せず、失敗系フローに遷移する必要があります。

ステートマシンをまたいで障害を管理できるよう、Amazon DynamoDBで障害を管理し、ステートマシンの安定性と回復性を向上する方法を紹介します。

サーキットブレーカーパターンをStep Functionsのステートマシンに組み込んだソリューションとも言えます。

基本的な考え方は次のAWSブログを参考にし、C#の実装をPythonに変更し、DynamoDBのTTLも使わずに実装をシンプルにしています。

Using the circuit breaker pattern with AWS Step Functions and Amazon DynamoDB | AWS Compute Blog

想定するユースケース

  • ステートマシンに依存関係にあり、依存するステートマシンの実行が失敗したら、後続のステートマシンを処理したくない
  • 運用のために、ステートマシンが実行されないよう、Step Functions外からコントロールしたい

といったユースケースを想定しています。

AWS Step Functionsへの組み込み

障害はDynamoDBで管理し、処理名をキーに障害発生時刻を登録し、現在時刻と比較して発生時刻が閾値内の場合、障害と判定します。

依存されるステートマシンでは、処理が失敗した時に、DynamoDBに障害情報を登録する(サーキットを中断・オープン状態にする)タスクを用意し、依存するステートマシンでは、処理の前に、障害が起きていないこと(サーキットがクローズド状態であること)を確認するタスクを用意します。

※ 図は 公式ブログから引用

Step Functionsのステートマシンは下図の通りです。

{
  "StartAt": "Get Circuit Status",
  "States": {
    "Get Circuit Status": {
      "Next": "Is Circuit Closed?",
      "Type": "Task",
      "Comment": "Get Circuit Status",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:GetCircuitStatus:$LATEST"
    },
    "Is Circuit Closed?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.CircuitStatus",
          "StringEquals": "OPEN",
          "Next": "Circuit Open"
        },
        {
          "Variable": "$.CircuitStatus",
          "StringEquals": "CLOSED",
          "Next": "Execute Lambda"
        },
        {
          "Variable": "$.CircuitStatus",
          "StringEquals": "",
          "Next": "Execute Lambda"
        }
      ]
    },
    "Circuit Open": {
      "Type": "Fail"
    },
    "Update Circuit Status": {
      "Next": "Circuit Open",
      "Type": "Task",
      "Comment": "Update Circuit Status",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:UpdateCircuitStatus:$LATEST"
    },
    "Execute Lambda": {
      "Next": "Circuit Closed",
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:JobExe:$LATEST",
      "Comment": "Task to execute lambda. This will set circuit status to OPEN if the execution fails for three times or the task times out",
      "TimeoutSeconds": 12,
      "Retry": [
        {
          "BackoffRate": 1.5,
          "MaxAttempts": 3,
          "IntervalSeconds": 2,
          "ErrorEquals": [
            "States.TaskFailed",
            "States.Timeout"
          ]
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.TaskFailed",
            "States.Timeout"
          ],
          "Next": "Update Circuit Status",
          "ResultPath": "$.taskresult"
        }
      ]
    },
    "Circuit Closed": {
      "Type": "Succeed"
    }
  }
}

以下の3つの処理があります

  • 障害を確認するタスク(Get Circuit Status)
  • 障害情報を登録するタスク(Update Circuit Status)
  • メイン処理(Execute Lambda)

サンプルコードでは全てLambda関数で実装しています。

メイン処理は要件に合わせて変更してください。

サーキットブレーカーに関わる処理について軽く触れます。

障害情報を登録するタスク(Update Circuit Status)

メイン処理中に予期しない障害が起きた場合、DynamoDBにサービス(処理)をキーにタイムスタンプを登録します。

import boto3
import time

dynamodb = boto3.resource('dynamodb')

TABLE_NAME = 'CircuitStatus'
KEY = 'Test'

table = dynamodb.Table(TABLE_NAME)

def lambda_handler(event, context):
    table.put_item(
      Item={
        'ServiceName' : KEY,
        'Timestamp' : int(time.time())
      }
    )

次の様なアイテムが登録されます。

ServiceName Timestamp
Test 1669749625

Step Functions外からサーキットを強制的にOPENにしたい場合、このLambda関数(Update Circuit Status)を直接呼び出します。

障害を確認するタスク(Get Circuit Status)

DynamoDBのテーブルを確認し、障害が起きていないか確認します。

メイン処理を実行する前に、障害が起きていないかDynamoDBに確認します。

このサンプルコードでは、キーが存在し、かつタイムスタンプが障害発生から15分以内(TTL_THRESHOLD = 15 * 60 # in seconds)であれば障害発生中と判定しています。

サーキットの状態に応じて

  • CLOSED : 障害が起きていない
  • OPEN : 障害が起きている

のどちらかを返します。

import boto3
import time

dynamodb = boto3.resource('dynamodb')

TABLE_NAME = 'CircuitStatus'
KEY = 'Test'

table = dynamodb.Table(TABLE_NAME)

def lambda_handler(event, context):
    TTL_THRESHOLD = 15 * 60 # in seconds
    TTL_THRESHOLD += time.time()

    res = table.get_item(
      Key={
          'ServiceName' : KEY
        }
    )
    
    item = res.get('Item')
    
    if item and item['Timestamp'] < TTL_THRESHOLD:
        status = 'OPEN'
    else:
        status = 'CLOSED'

    return {'CircuitStatus' : status}

動作確認

以降では、サーキットの状態に応じたフローを確認します。

正常系(サーキットがCLOSED状態)

まずは、障害が起きておらず、メイン処理も正常に実行できた場合です。

すべてのタスクは緑色です。

この段階では、DynamoDBテーブル内は空です。

ServiceName Timestamp 現在時刻
- - 1669749525

障害発生(サーキットをCLOSEDからOPENに変更)

メイン処理で障害が発生した場合、Update Circuit Status タスクにより、DynamoDBに障害アイテムを登録し、サーキットをオープン状態にします。

ServiceName Timestamp 現在時刻
Test 1669749625 1669749625

※ Timestamp = 現在時刻

障害発生中(サーキットがOPEN状態)

障害発生中にステートマシンが呼ばれると、Get Circuit Status タスクはCLOSEDを返し、メイン処理は呼ばれません。

ServiceName Timestamp 現在時刻
Test 1669749625 1669749925

※ Timestamp < 現在時刻 < Timestamp + TTL

障害から復帰(サーキットがOPENからCLOSEDに推移)

TTLが過ぎたあとで再度ステートマシンが呼ばれると、Get Circuit Status タスクはOPENを返し、メイン処理が呼ばれます。

ServiceName Timestamp 現在時刻
Test 1669749625 1669750825
  • TTL : 15分
  • 現在時刻 - Timestamp = 1200 = 20分

※ Timestamp + TTL < 現在時刻

再び障害発生(サーキットをCLOSEDからOPENに変更)

再び、メイン処理で障害が発生した場合、Update Circuit Status タスクにより、DynamoDBの既存アイテムのタイムスタンプを更新し、サーキットをオープン状態にします。

ServiceName Timestamp 現在時刻
Test 1669751125 1669751125

※ Timestamp = 現在時刻

最後に

Step FunctionsにDynamoDBとLambda関数でサーキットブレーカーを組み込む方法を紹介しました。

タスクやステートマシンで依存関係があるような処理で、例外処理を実行したいときに利用できます。

本記事の元になった記事では、同様のことがC#とCDKで実装されています。 サンプルコードがGitHubで公開されているため、一度覗いてみて下さい。

GitHub - aws-samples/circuit-breaker-netcore-blog

参考