この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
具体的なユースケース
Step FunctionsのMapステートを利用している方で、以下2つのケースを満たしたい方向けです。
- Mapステートを利用していて、エラー時に並列処理を中断させたくない
- Mapステート内で1つでもエラーがある場合、ステートマシンを失敗させたい
前置き
Mapステート内でエラーをキャッチした上で、ステートマシンを失敗させるなんてなんでややこしいことするの?と思うかもしれませんが、まずはMapステートの動作を理解する必要があります。
Mapステートのデフォルト動作
まずエラー時にMapステートのデフォルト動作を確認してみます。サンプルとして全リージョンにMapステートを並列実行するものを利用しています。
Map内の並列処理で1つでも失敗した場合、Mapステートが失敗となりステートマシン自体も失敗となりました。
このとき他の並列処理はキャンセル扱いとなってしまうため、最後まで処理されずにキャンセルされます。今回は1つのリージョンで処理が失敗したため、処理途中の13リージョンが途中でキャンセルされています。
このようなケースでは、失敗した1リージョンに原因があるのかわかりません、今回キャンセルされたリージョンが次の実行時にエラーとなる可能性もあるため、全リージョン実行して結果を確認したくなります。
並列処理を停止させない実装
そこで、並列処理を停止させずに完了させる方法を調べると、Map内でエラーをキャッチすることでMapステート全体が失敗することを防ぐことができるようです。
上記を参考にエラーをキャッチするステップをMap内に追加してみました。CDKのコードは以下ブログを参考にしてください。
これで実行してみるとエラーキャッチのステップ自体は成功となるため、想定通りMapステートが失敗することなく並列処理が実行されます。
これでMapステートが途中で止まることなく、全リージョンの処理を成功させることができました。
しかし、この場合はエラーがMap内でキャッチされてしまうためステートマシン自体が成功扱いとなります。(Mapステートが成功となるため)
Map処理を中断させないことは達成できましたが、このままではエラーに気づけない恐れが…。
そこで解決策の1つとして、Mapステート外でエラーを判定してステートマシンを失敗させる方法を考えました。
並列処理を失敗させずにステートマシンを失敗させる
前置きが長くなりましたが、ここからが本エントリの本題です。
考えた結果、以下のような形で実装してみました。Mapステートの後にMapステートの出力編集(result)とエラー判定(choice)し、成功(success)と失敗(failed)のステップへ分岐するようにしています。
ステートマシン
CDKサンプル
実装はCDK(Typescript)で行なっています。
./lib/aws-cdk-multi-region-sfn-stack.ts
import { Construct } from 'constructs';
import {
Stack,
StackProps,
Duration,
aws_iam as iam,
aws_lambda as lambda,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as tasks,
} from 'aws-cdk-lib';
export class AwsCdkMultiRegionSfnStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
//IAMロール
const lambdaRole = new iam.Role(this, "lambdaRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
});
lambdaRole.addManagedPolicy(
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole")
);
lambdaRole.addManagedPolicy(
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2ReadOnlyAccess")
);
//リージョン一覧を取得するLambda
const createGetRegionLambdaFunction = new lambda.Function(this, 'createGetRegionLambdaFunction', {
code: new lambda.AssetCode("lambda"),
runtime: lambda.Runtime.PYTHON_3_9,
functionName: "GetRegionLambdaFunction",
handler: 'get_regions.handler',
role: lambdaRole,
timeout: Duration.seconds(180),
});
//リージョンを出力するLambda
const createPrintRegionLambdaFunction = new lambda.Function(this, 'createPrintRegionLambdaFunction', {
code: new lambda.AssetCode("lambda"),
runtime: lambda.Runtime.PYTHON_3_9,
functionName: "PrintRegionLambdaFunction",
handler: 'print_region.handler',
role: lambdaRole,
timeout: Duration.seconds(180),
});
const getRegionsTask = new tasks.LambdaInvoke(this, 'getRegionsTask', {
lambdaFunction: createGetRegionLambdaFunction,
resultPath: '$.getRegions',
});
const printRegionTask = new tasks.LambdaInvoke(this, 'printRegionTask', {
lambdaFunction: createPrintRegionLambdaFunction,
resultSelector: {
"Region.$": "$.Payload",
},
});
//Mapステート
const printRegionMap = new sfn.Map(this, 'printRegionMap', {
inputPath: "$.getRegions.Payload.Regions",
resultPath: "$.printRegionMap"
});
//MAPのイテレーターにRegion出力タスクを指定
printRegionMap.iterator(printRegionTask)
//MAP内エラー取得
const printRegionTaskError = new sfn.Pass(this, 'printRegionTaskError')
printRegionTask.addCatch(printRegionTaskError, { resultPath: '$.Error' })
// result(結果の文字列を結合)
const result = new sfn.Pass(this, 'result', {
parameters: {
"jsonToString.$": "States.JsonToString($.printRegionMap)"
},
resultPath: "$.result"
});
// 失敗
const failed = new sfn.Fail(this, 'failed');
// 成功
const success = new sfn.Succeed(this, 'success');
// エラー判定
const check = new sfn.Choice(this, 'check');
check.when(
sfn.Condition.stringMatches(sfn.JsonPath.stringAt("$.result.jsonToString"), "*Error*"),
failed
)
check.otherwise(success)
//ステートマシン
const definition = getRegionsTask.next(printRegionMap).next(result).next(check);
new sfn.StateMachine(this, 'RegionMapStateMachine', {
stateMachineName: "RegionMapStateMachine",
definition: definition,
});
}
}
Lambda(Python)
Lambdaはリージョン取得と、出力用で2つを用意しました。
リージョン取得用(getRegionsTask)
リージョンを取得したあと、リストの形式だとステートマシン内で扱いにくいため、オブジェクト(辞書型)の形式にしてreturnしています。
./lambda/get_regions.py
import boto3
def handler(event, context):
ec2 = boto3.client('ec2')
regions = list(map(lambda x: x['RegionName'], ec2.describe_regions()['Regions']))
ret = []
for region in regions:
ret.append(
{
"Region": region,
}
)
return {"Regions": ret}
リージョン出力用(printRegionTask)
こちらはインプットとしてRegionを取得してprintだけしています。今回はMap内で1つだけエラーとしたいため、東京リージョンのみエラーになるようにしています。
./lambda/print_region.py
def handler(event, context):
region = event.get("Region")
print(region)
if region == "ap-northeast-1":
raise
return region
これらをCDKでデプロイしています。
やってみる
CDKでデプロイできたら、実際にエラーがうまく判定できるか試してみます。インプットは不要なので、そのまま実行してみるとするとステートマシンは想定通り失敗となりました。
Mapステートは成功していますが、後続のchoiceで failed に分岐しています。これは、Mapステートで並列実行された中でエラーとなったことを判定されたためです。複数リージョン失敗した場合でも、同じようにfailedに遷移します。
ここではどのようにエラー判定が行われているか分かりにくいので、実際の入出力を追ってみます。(全て記載すると、不要な情報も多いため解説に不要な部分は省略しています。)
入出力の確認
Mapステートの出力は各リージョンごとの結果が含まれるため、配列の形で出力されます。以下の場合、東京リージョンのみがエラーとなっているケースです。
Mapステート出力
"printRegionMap": [
{
"Region": "eu-north-1"
},
{
"Region": "ap-northeast-1",
"Error": {
"Error": "RuntimeError",
"Cause": "{\"errorMessage\":\"No active exception to reraise\",\"errorType\":\"RuntimeError\",\"requestId\":\"369e40bb-deb8-429d-af0d-058307aed690\",\"stackTrace\":[\" File \\\"/var/task/print_region.py\\\", line 6, in handler\\n raise\\n\"]}"
}
},
{
"Region": "sa-east-1"
}
~~略~~
]
このままではエラーの判定ができないため、Mapステートの入力を次の result ステップで文字列を連結します。
result出力
"result": {
"jsonToString": "[{\"Region\":\"eu-north-1\"},{\"Region\":\"ap-northeast-1\",\"Error\":{\"Error\":\"RuntimeError\",\"Cause\":\"{\\\"errorMessage\\\":\\\"No active exception to reraise\\\",\\\"errorType\\\":\\\"RuntimeError\\\",\\\"requestId\\\":\\\"369e40bb-deb8-429d-af0d-058307aed690\\\",\\\"stackTrace\\\":[\\\" File \\\\\\\"/var/task/print_region.py\\\\\\\", line 6, in handler\\\\n raise\\\\n\\\"]}\"}},{\"Region\":\"sa-east-1\"}]"
}
CDK実装では以下の部分です。Mapステートの出力($.printRegionMap)を結合した結果をパラメータjsonToStringとして設定。出力先を$.resultとしています。
// result(結果の文字列を結合)
const result = new sfn.Pass(this, 'result', {
parameters: {
"jsonToString.$": "States.JsonToString($.printRegionMap)"
},
resultPath: "$.result"
});
これでMapの出力が文字列として扱えるようになりました。
次に Choice のステップではresultの結果にError
の文字列が含まれているかを判定。含まれている場合はfailedへ遷移するようにしています。
CDKでは以下の部分です。
// エラー判定
const check = new sfn.Choice(this, 'check');
check.when(
sfn.Condition.stringMatches(sfn.JsonPath.stringAt("$.result.jsonToString"), "*Error*"),
failed
)
check.otherwise(success)
一応成功するパターンも試しましたが、問題なくsuccessに遷移しました。
おわりに
これで無事にMapステートを失敗させずに、ステートマシンを失敗させることができました。これに合わせてステートマシンの失敗を通知する仕組みがあればエラーに気づくことができそうですね。
一応Mapステート内でエラー時に通知するステップを追加する方法も考えたのですが、全ての並列実行がエラーとなった通知量がすごいことになりそうなのでやめました。
ステートマシンの実行完了を通知する仕組みは、以下を参考にすると幸せになれるかもしれません。
AWS Step Functionsの実行完了をAWS Chatbotを使っていい感じにSlackに通知する | DevelopersIO
Step FunctionsでMapを実装している方の助けになれば幸いです。