この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Step Functionsにアップデートがあり、動的並列処理がサポートされました!
これにより、前段のステート等で動的に生成した配列に対して、繰り返し処理、おまけに並列処理を行う事が可能になりました。
早速試してみたいと思います。
やってみるための準備
ステートマシンから呼び出す、シンプルなLambda関数を作成します。
ここでは、CloudWatch Logsに存在するロググループを取得して、 取得したロググループそれぞれに対して、繰り返し処理をしてみたいと思います。
Lambda関数作成
DescribeLogGroups関数
CloudWatch Logsから情報を取得するだけのシンプルな関数です。 取得結果をすべて呼び出し元(ここではステートマシン)に返しています。
import boto3
logs_client = boto3.client('logs')
def lambda_handler(event, context):
#CloudWatch Logs情報取得
response = logs_client.describe_log_groups()
return response
PrintLogGroups関数
入力で受け取った値を書き出すだけのシンプルな関数です。
import boto3
logs_client = boto3.client('logs')
def lambda_handler(event, context):
log_group_name = event['logGroupName']
print(log_group_name)
return log_group_name
ステートマシン
ステートマシンのワークフローは以下となります。
繰り返し並列処理するぞ感が可視化されていい感じです。
ステートマシンの定義は以下となります。
{
"StartAt": "DescribeLogGroupsState",
"States": {
"DescribeLogGroupsState": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:DescribeLogGroups",
"ResultPath": "$.describe_log_groups",
"Next": "PrintLogGroupState"
},
"PrintLogGroupState": {
"Type": "Map",
"InputPath": "$.describe_log_groups",
"ItemsPath": "$.logGroups",
"MaxConcurrency": 1,
"Iterator": {
"StartAt": "IteratorState",
"States": {
"IteratorState": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:PrintLogGroups",
"End": true
}
}
},
"End": true
}
}
}
定義について説明します。
DescribeLogGroupsStateステート
このステートが実行されると、Lambda関数(DescribeLogGroups)が実行されます。Lambda関数で戻した値がResultPath
で指定した"describe_log_groups"に格納されます。ここでは、CloudWatch Logsのdescribe結果を戻しているので、以下のような値になります。
{
"describe_log_groups": {
"logGroups": [
{
"logGroupName": "/aws/lambda/DescribeExportTask",
"creationTime": 1568873974706,
"metricFilterCount": 0,
"arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/DescribeExportTask:*",
"storedBytes": 0
},
(省略)
{
"logGroupName": "/aws/sagemaker/TrainingJobs",
"creationTime": 1567947081632,
"metricFilterCount": 0,
"arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/sagemaker/TrainingJobs:*",
"storedBytes": 11247
}
],
"ResponseMetadata": {
"RequestId": "2c574e1f-d270-4587-965e-8b7b78ccdfeb",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "2c574e1f-d270-4587-965e-8b7b78ccdfeb",
"content-type": "application/x-amz-json-1.1",
"content-length": "1445",
"date": "Thu, 19 Sep 2019 08:24:35 GMT"
},
"RetryAttempts": 0
}
}
}
PrintLogGroupStateステート
新たに追加されたステートTypeMap
を利用しています。
InputPath
に"describe_log_groups"を指定していますので、渡された入力のうち、"describe_log_groups"のみ、このステートで使用します。
ItemsPath
には、入力の配列を指定します。ここでは、"$.logGroups"を指定していますので、以下のような値が実行時の入力に渡されます。
{
"logGroupName": "/aws/lambda/DescribeExportTask",
"creationTime": 1568873974706,
"metricFilterCount": 0,
"arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/DescribeExportTask:*",
"storedBytes": 0
},
(省略)
{
"logGroupName": "/aws/sagemaker/TrainingJobs",
"creationTime": 1567947081632,
"metricFilterCount": 0,
"arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/sagemaker/TrainingJobs:*",
"storedBytes": 11247
}
渡された配列の要素数分、Iterator
で指定したステートが実行されますので、ここでは"IteratorState"ステートが繰り返し実行されます。
MaxConcurrency
は、並列処理数の制限です。デフォルト値は"0"となります。これは、並列処理に制限を設けず、可能な限り同時にステートが呼び出しされます。ここでは、"1"を指定しているので、前の処理が完了するまで新しい繰り返し処理は開始されません。
Mapの詳細については、以下をご確認ください。
やってみた
上記定義を利用した実行結果になります。
インデックスを選択でき、処理を行った際の入出力を確認することができました。すごくわかりやすいです。
PrintLogGroups関数のログをみてみたいと思います。入力で渡された配列数分繰り返しLambda関数が起動されていることが確認できました。
ステートマシン定義内のMaxConcurrency
を"0"にすると、並列実行されていることも確認できました。
さいごに
これまで、Step Functionsで動的に繰り返し処理をするとなると、インデックス等をどこかで実装する必要がありました。先日、以下のブログを書いた際に、もっと楽に繰り返し処理ができれば..と感じていたところです。
今回のアップデートにより、ステートマシンの定義や、Lambda関数がこれまで以上にシンプルにでき、ワークフローの実装が捗りそうですね!!