[アップデート]Step Functionsで動的並列処理がサポートされました!

Step Functionsにアップデートがあり、動的並列処理がサポートされました!これにより、前段のステート等で動的に生成した配列に対して、繰り返し処理、おまけに並列処理を行う事が可能になりました。早速試してみたいと思います。
2019.09.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Step Functionsにアップデートがあり、動的並列処理がサポートされました!

これにより、前段のステート等で動的に生成した配列に対して、繰り返し処理、おまけに並列処理を行う事が可能になりました。

早速試してみたいと思います。

やってみるための準備

ステートマシンから呼び出す、シンプルなLambda関数を作成します。

ここでは、CloudWatch Logsに存在するロググループを取得して、 取得したロググループそれぞれに対して、繰り返し処理をしてみたいと思います。

Lambda関数作成

DescribeLogGroups関数

CloudWatch Logsから情報を取得するだけのシンプルな関数です。 取得結果をすべて呼び出し元(ここではステートマシン)に返しています。

import boto3
logs_client = boto3.client('logs')

def lambda_handler(event, context):
    #CloudWatch Logs情報取得
    response = logs_client.describe_log_groups()
    return response

PrintLogGroups関数

入力で受け取った値を書き出すだけのシンプルな関数です。

import boto3

logs_client = boto3.client('logs')

def lambda_handler(event, context):
    log_group_name = event['logGroupName']
    print(log_group_name)

    return log_group_name

ステートマシン

ステートマシンのワークフローは以下となります。

繰り返し並列処理するぞ感が可視化されていい感じです。

ステートマシンの定義は以下となります。

{
  "StartAt": "DescribeLogGroupsState",
  "States": {
    "DescribeLogGroupsState": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:DescribeLogGroups",
      "ResultPath": "$.describe_log_groups",
      "Next": "PrintLogGroupState"
    },
    "PrintLogGroupState": {
      "Type": "Map",
      "InputPath": "$.describe_log_groups",
      "ItemsPath": "$.logGroups",
      "MaxConcurrency": 1,
      "Iterator": {
        "StartAt": "IteratorState",
        "States": {
          "IteratorState": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:PrintLogGroups",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}

定義について説明します。

DescribeLogGroupsStateステート

このステートが実行されると、Lambda関数(DescribeLogGroups)が実行されます。Lambda関数で戻した値がResultPathで指定した"describe_log_groups"に格納されます。ここでは、CloudWatch Logsのdescribe結果を戻しているので、以下のような値になります。

{
  "describe_log_groups": {
    "logGroups": [
      {
        "logGroupName": "/aws/lambda/DescribeExportTask",
        "creationTime": 1568873974706,
        "metricFilterCount": 0,
        "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/DescribeExportTask:*",
        "storedBytes": 0
      },

      (省略)

      {
        "logGroupName": "/aws/sagemaker/TrainingJobs",
        "creationTime": 1567947081632,
        "metricFilterCount": 0,
        "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/sagemaker/TrainingJobs:*",
        "storedBytes": 11247
      }
    ],
    "ResponseMetadata": {
      "RequestId": "2c574e1f-d270-4587-965e-8b7b78ccdfeb",
      "HTTPStatusCode": 200,
      "HTTPHeaders": {
        "x-amzn-requestid": "2c574e1f-d270-4587-965e-8b7b78ccdfeb",
        "content-type": "application/x-amz-json-1.1",
        "content-length": "1445",
        "date": "Thu, 19 Sep 2019 08:24:35 GMT"
      },
      "RetryAttempts": 0
    }
  }
}

PrintLogGroupStateステート

新たに追加されたステートTypeMapを利用しています。

InputPathに"describe_log_groups"を指定していますので、渡された入力のうち、"describe_log_groups"のみ、このステートで使用します。

ItemsPathには、入力の配列を指定します。ここでは、"$.logGroups"を指定していますので、以下のような値が実行時の入力に渡されます。

{
  "logGroupName": "/aws/lambda/DescribeExportTask",
  "creationTime": 1568873974706,
  "metricFilterCount": 0,
  "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/lambda/DescribeExportTask:*",
  "storedBytes": 0
},

(省略)

{
  "logGroupName": "/aws/sagemaker/TrainingJobs",
  "creationTime": 1567947081632,
  "metricFilterCount": 0,
  "arn": "arn:aws:logs:ap-northeast-1:XXXXXXXXXXXX:log-group:/aws/sagemaker/TrainingJobs:*",
  "storedBytes": 11247
}

渡された配列の要素数分、Iteratorで指定したステートが実行されますので、ここでは"IteratorState"ステートが繰り返し実行されます。

MaxConcurrencyは、並列処理数の制限です。デフォルト値は"0"となります。これは、並列処理に制限を設けず、可能な限り同時にステートが呼び出しされます。ここでは、"1"を指定しているので、前の処理が完了するまで新しい繰り返し処理は開始されません。

Mapの詳細については、以下をご確認ください。

やってみた

上記定義を利用した実行結果になります。

インデックスを選択でき、処理を行った際の入出力を確認することができました。すごくわかりやすいです。

PrintLogGroups関数のログをみてみたいと思います。入力で渡された配列数分繰り返しLambda関数が起動されていることが確認できました。

ステートマシン定義内のMaxConcurrencyを"0"にすると、並列実行されていることも確認できました。

さいごに

これまで、Step Functionsで動的に繰り返し処理をするとなると、インデックス等をどこかで実装する必要がありました。先日、以下のブログを書いた際に、もっと楽に繰り返し処理ができれば..と感じていたところです。

今回のアップデートにより、ステートマシンの定義や、Lambda関数がこれまで以上にシンプルにでき、ワークフローの実装が捗りそうですね!!

参考