Trying Out High-Volume SQS Message Processing with Step Functions Express Mode [CFn]

This article is the Day 16 entry in the AWS Lambda and Serverless #1 Advent Calendar 2019.

Introduction

Express mode for Step Functions was released at re:Invent 2019.

[Update] High-performance, low-cost Express Workflows have been added to Step Functions! #reinvent

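A comparison table against Standard mode is available on the official page, so please refer to it.

When I opened the console, a sample project that runs in Express mode had already been added!

So in this article, I walk through Process High Volume Messages from Amazon SQS, one of the sample projects that AWS officially provides for Step Functions.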

Trying it out

Running from the console

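Select Run a sample project, then choose Process High Volume Messages from Amazon SQS.

Once it is selected, the workflow definition is filled in with the following content.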

{
  "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
  "StartAt": "Decode base64 string",
  "States": {
    "Decode base64 string": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-ExpressSQS6-Base64DecodeLambda-xxxx",
        "Payload.$": "$"
      },
      "Next": "Generate statistics"
    },
    "Generate statistics": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-ExpressSQS-GenerateStatsLambda-xxxx",
        "Payload.$": "$"
      },
      "Next": "Remove special characters"
    },
    "Remove special characters": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-ExpressSQS-StringCleanerLambda-xxxx",
        "Payload.$": "$"
      },
      "Next": "Tokenize and count"
    },
    "Tokenize and count": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-Express-TokenizerCounterLambda-xxxx",
        "Payload.$": "$"
      },
      "End": true
    }
  }
}
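To make the data flow in this definition concrete, here is a minimal local sketch of how one Task state hands its result to the next. It assumes, as the definition suggests, that `"Payload.$": "$"` passes the whole state input to the Lambda function and that `"OutputPath": "$.Payload"` keeps only the function's return value from the invoke result. The `run_task` helper and the two toy handlers are hypothetical stand-ins and are not part of the sample project.

```python
def run_task(handler, state_input):
    """Simulate one lambda:invoke Task state with OutputPath "$.Payload"."""
    # "Payload.$": "$" -> the entire state input becomes the Lambda event.
    invoke_result = {
        "StatusCode": 200,  # metadata that wraps the function's return value
        "Payload": handler(state_input, None),
    }
    # "OutputPath": "$.Payload" -> only the return value reaches the next state.
    return invoke_result["Payload"]


# Hypothetical toy handlers, used only to show the chaining.
def upper_handler(event, context):
    return {"input": event["input"], "output": event["input"].upper()}


def length_handler(event, context):
    return {"input": event["output"], "length": len(event["output"])}


def run_workflow(execution_input):
    """Run the states in order, feeding each output into the next state."""
    state = execution_input
    for handler in (upper_handler, length_handler):
        state = run_task(handler, state)
    return state


result = run_workflow({"input": "hello"})
print(result)
```

Because every state uses the same `Parameters` and `OutputPath`, each Lambda function in the sample only needs to agree with its neighbor on the keys of the dictionary it returns.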

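A dialog appears with a note that the workflow is triggered by SQS messages, along with the link above. Click OK to move on to Step 2.

The Step 2 screen lists the resources that will be created.

Click Deploy resources, and CloudFormation (CFn) runs and creates the resources.

The actual CFn template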

AWSTemplateFormatVersion: "2010-09-09"
Description: An example of using Express workflows to run text processing for each message sent from an SQS queue.
Resources:
  ExpressLogGroup:
    Type: AWS::Logs::LogGroup
  ExpressStateMachineForTextProcessing:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      StateMachineType: "EXPRESS"
      LoggingConfiguration:
        Level: ALL
        IncludeExecutionData: True
        Destinations:
          - CloudWatchLogsLogGroup:
                LogGroupArn: !GetAtt
                  - ExpressLogGroup
                  - Arn
      DefinitionString:
        !Sub
          - |-
            {
              "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
              "StartAt": "Decode base64 string",
              "States": {
                "Decode base64 string": {
                  "Type": "Task",
                  "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                  "OutputPath": "$.Payload",
                  "Parameters": {
                    "FunctionName": "${Base64DecodeLambda}",
                    "Payload.$": "$"
                  },
                  "Next": "Generate statistics"
                },
                "Generate statistics": {
                  "Type": "Task",
                  "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                  "OutputPath": "$.Payload",
                  "Parameters": {
                    "FunctionName": "${GenerateStatsLambda}",
                    "Payload.$": "$"
                  },
                  "Next": "Remove special characters"
                },
                "Remove special characters": {
                  "Type": "Task",
                  "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                  "OutputPath": "$.Payload",
                  "Parameters": {
                    "FunctionName": "${StringCleanerLambda}",
                    "Payload.$": "$"
                  },
                  "Next": "Tokenize and count"
                },
                "Tokenize and count": {
                  "Type": "Task",
                  "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                  "OutputPath": "$.Payload",
                  "Parameters": {
                    "FunctionName": "${TokenizerCounterLambda}",
                    "Payload.$": "$"
                  },
                  "End": true
                }
              }
            }
          - {Base64DecodeLambda: !Ref Base64DecodeLambda, StringCleanerLambda: !Ref StringCleanerLambda, TokenizerCounterLambda: !Ref TokenizerCounterLambda, GenerateStatsLambda: !Ref GenerateStatsLambda}
      RoleArn: !GetAtt [ StatesExecutionRole, Arn ]
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: states.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource:
                  - !GetAtt Base64DecodeLambda.Arn
                  - !GetAtt StringCleanerLambda.Arn
                  - !GetAtt TokenizerCounterLambda.Arn
                  - !GetAtt GenerateStatsLambda.Arn
        - PolicyName: CloudWatchLogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogDelivery'
                  - 'logs:GetLogDelivery'
                  - 'logs:UpdateLogDelivery'
                  - 'logs:DeleteLogDelivery'
                  - 'logs:ListLogDeliveries'
                  - 'logs:PutResourcePolicy'
                  - 'logs:DescribeResourcePolicies'
                  - 'logs:DescribeLogGroups'
                Resource:
                  - '*'
  TriggerOnSQSQueueLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ SQSLambdaExecutionRole, Arn ]
      Code:
        ZipFile:
          !Sub
            - |-
              import boto3

              def lambda_handler(event, context):
                  message_body = event['Records'][0]['body']
                  client = boto3.client('stepfunctions')
                  response = client.start_execution(
                      stateMachineArn='${ExpressStateMachineArn}',
                      input=message_body
                  )
            - {ExpressStateMachineArn: !Ref ExpressStateMachineForTextProcessing}
      Runtime: "python3.7"
      Timeout: "30"
  EventSourceMappingForSQSQueueLambda:
    Type: AWS::Lambda::EventSourceMapping
    Properties: 
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt [ SQSQueue, Arn ]
      FunctionName: !Ref TriggerOnSQSQueueLambda
  Base64DecodeLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          import base64

          def lambda_handler(event, context):
              return {
                  'statusCode': 200,
                  'input': event['input'],
                  'output': base64.b64decode(event['input']).decode('utf-8')
              }
      Runtime: "python3.7"
      Timeout: "20"
  StringCleanerLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          import re

          def lambda_handler(event, context):
              input = event['input']
              cleaned_string = re.sub('\W+',' ', input)
              return {
                  'statusCode': 200,
                  'input': input,
                  'output': cleaned_string
              }
      Runtime: "python3.7"
      Timeout: "20"
  TokenizerCounterLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |

          def lambda_handler(event, context):
              input = event['output']
              lowered_string = " ".join(x.lower() for x in input.split())
              lowered_string = lowered_string.replace('[^\w\s]','')
              split_string = lowered_string.split()
              word_count_map = {}
              for word in split_string:
                  if word in word_count_map:
                      word_count_map[word] += 1
                  else:
                      word_count_map[word] = 1
              return {
                  'statusCode': 200,
                  'input': input,
                  'word_counts': word_count_map
              }
      Runtime: "python3.7"
      Timeout: "20"
  GenerateStatsLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |

          def avg_word_length(sentence):
            words = sentence.split()
            return (sum(len(word) for word in words)/len(words))

          def lambda_handler(event, context):
              input = event['output']
              stats = {}
              stats['text_length'] = len(input)
              stats['avg_word_length'] = avg_word_length(input)
              stats['num_digits'] = len([x for x in input.split() if x.isdigit()])
              stats['num_special_chars'] = len([x for x in input.split() if not x.isalnum()])
              return {
                  'statusCode': 200,
                  'stats': stats,
                  'input': input
              }
      Runtime: "python3.7"
      Timeout: "20"
  BasicLambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
  SQSLambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: LambdaSQSQueuePermissions
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "sqs:ReceiveMessage"
                  - "sqs:DeleteMessage"
                  - "sqs:GetQueueAttributes"
                Resource: !GetAtt SQSQueue.Arn
        - PolicyName: LambdaStateMachinePermissions
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "states:StartExecution"
                Resource: !Ref ExpressStateMachineForTextProcessing
  SQSQueue:
    Type: AWS::SQS::Queue
Outputs:
  SQSResource:
    Value:
      Ref: SQSQueue
  SQSSampleMessage:
    Description: Sample input to SQS Queue
    Value: "{
      \"input\": \"QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5oZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcmFjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3VuZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91YmxlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwgQW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu\"
    }"
  StateMachineArn:
    Value:
      Ref: ExpressStateMachineForTextProcessing
  ExecutionInput:
    Description: Sample input to StartExecution.
    Value: "{
      \"input\": \"QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5oZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcmFjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3VuZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91YmxlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwgQW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu\"
    }"

StateMachineType is set to EXPRESS.

Resources

CFn creates the following resources, and even provides a sample SQS message as a stack output.

StateMachine

  • ExpressStateMachineForTextProcessing

Lambda

  • Base64DecodeLambda
  • StringCleanerLambda
  • GenerateStatsLambda
  • TokenizerCounterLambda
  • TriggerOnSQSQueueLambda

Lambda event source mapping

  • EventSourceMappingForSQSQueueLambda

SQS queue

  • SQSQueue

CloudWatch log group

  • ExpressLogGroup

IAM roles

  • BasicLambdaExecutionRole
  • SQSLambdaExecutionRole
  • StatesExecutionRole
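The sample message and the queue are exposed through the `Outputs` section of the template, so they can also be read programmatically. The following is a minimal sketch using the CloudFormation `describe_stacks` call from boto3. The stack name in the usage comment is a hypothetical placeholder, since the console generates the real one.

```python
def get_stack_outputs(cfn_client, stack_name):
    """Return the stack's outputs as a {OutputKey: OutputValue} dict."""
    response = cfn_client.describe_stacks(StackName=stack_name)
    outputs = response["Stacks"][0].get("Outputs", [])
    return {item["OutputKey"]: item["OutputValue"] for item in outputs}


# Usage (needs AWS credentials and the deployed stack):
#   import boto3
#   outputs = get_stack_outputs(boto3.client("cloudformation"),
#                               "my-express-sqs-sample")  # hypothetical name
#   outputs["SQSResource"]       # queue URL (Ref on an AWS::SQS::Queue)
#   outputs["SQSSampleMessage"]  # sample message body
#   outputs["StateMachineArn"]   # ARN of the Express state machine
```

Passing the client in as an argument keeps the helper easy to exercise without touching a real AWS account.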

Execution

Once the state machine has been created, you are taken to the execution screen.

The sample input is already filled in, so just click Start execution as is.
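The same workflow can also be started by sending a message to the queue, because TriggerOnSQSQueueLambda in the template passes the message body straight to `start_execution`. Here is a minimal sketch that builds a body in the same `{"input": "<base64>"}` shape as the sample input and sends it with the SQS `send_message` call from boto3. The helper names and the queue URL are assumptions for illustration.

```python
import base64
import json


def build_message_body(text):
    """Wrap text in the {"input": "<base64>"} shape the sample workflow expects."""
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return json.dumps({"input": encoded})


def send_text(sqs_client, queue_url, text):
    """Send one message to the queue and return the SQS MessageId."""
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=build_message_body(text),
    )
    return response["MessageId"]


body = build_message_body("We are such stuff As dreams are made on")
print(body)

# Usage (needs AWS credentials; the queue URL is the SQSResource stack output):
#   import boto3
#   send_text(boto3.client("sqs"), "<queue URL>", "some text to process")
```

Since the event source mapping uses a BatchSize of 1, each message sent this way should result in one Express execution.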

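Checking the logs

In Standard mode, the execution event history can be viewed in the Step Functions console, but in Express mode the execution event history is not shown, so we check CloudWatch Logs instead.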
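The log events can also be pulled without opening the console. Below is a minimal sketch built on the CloudWatch Logs `filter_log_events` call from boto3. It assumes that each log message is a JSON document and falls back to the raw text when it is not. The log group name is a placeholder, because the template lets CloudFormation generate the name for ExpressLogGroup.

```python
import json


def fetch_execution_events(logs_client, log_group_name, limit=50):
    """Return recent events from the log group, parsed as JSON where possible."""
    response = logs_client.filter_log_events(
        logGroupName=log_group_name,
        limit=limit,
    )
    events = []
    for event in response.get("events", []):
        try:
            events.append(json.loads(event["message"]))
        except json.JSONDecodeError:
            # Assumption did not hold for this message, so keep the raw text.
            events.append({"raw": event["message"]})
    return events


# Usage (needs AWS credentials; replace the placeholder log group name):
#   import boto3
#   for item in fetch_execution_events(boto3.client("logs"), "<log group name>"):
#       print(item)
```

This reads only the first page of results, which is enough for a quick look at a single test execution.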

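In the console, an Express mode state machine looks like this.

Monitoring tab

Logging tab

Looking at the log for the execution above in CloudWatch, the following response came back with a status code of 200.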

And like the baseless fabric of this vision The cloud capped towers the gorgeous palaces The solemn temples the great globe itself Yea all which it inherit shall dissolve And like this insubstantial pageant faded Leave not a rack behind We are such stuff As dreams are made on and our little life Is rounded with a sleep Sir I am vexed Bear with my weakness My old brain is troubled Be not disturbed with my infirmity If you be pleased retire into my cell And there repose A turn or two I ll walk To still my beating mind

I am not very familiar with it, but the message appears to have been a passage from Shakespeare!
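The shape of the response above can be reproduced locally by chaining the four handlers in the order the state machine runs them. The sketch below condenses the handler logic from the template into plain functions. It drops the `str.replace` call in the tokenizer, which has no effect on text that has already been cleaned, and it uses a short sample sentence instead of the full sample input.

```python
import base64
import re


def base64_decode(event, context=None):
    return {
        "statusCode": 200,
        "input": event["input"],
        "output": base64.b64decode(event["input"]).decode("utf-8"),
    }


def generate_stats(event, context=None):
    text = event["output"]
    words = text.split()
    stats = {
        "text_length": len(text),
        "avg_word_length": sum(len(w) for w in words) / len(words),
        "num_digits": len([w for w in words if w.isdigit()]),
        "num_special_chars": len([w for w in words if not w.isalnum()]),
    }
    return {"statusCode": 200, "stats": stats, "input": text}


def string_cleaner(event, context=None):
    text = event["input"]
    # A raw string avoids the invalid escape sequence warning in newer Pythons.
    return {"statusCode": 200, "input": text, "output": re.sub(r"\W+", " ", text)}


def tokenizer_counter(event, context=None):
    text = event["output"]
    word_counts = {}
    for word in text.lower().split():
        word_counts[word] = word_counts.get(word, 0) + 1
    return {"statusCode": 200, "input": text, "word_counts": word_counts}


def run_pipeline(execution_input):
    """Apply the handlers in the same order as the state machine."""
    state = execution_input
    for step in (base64_decode, generate_stats, string_cleaner, tokenizer_counter):
        state = step(state)
    return state


sample = "Sir, I am vexed. Bear with my weakness."
encoded = base64.b64encode(sample.encode("utf-8")).decode("ascii")
result = run_pipeline({"input": encoded})
print(result["input"])
print(result["word_counts"])
```

The final `input` field holds the cleaned text with punctuation replaced by spaces, which matches the form of the response seen in CloudWatch Logs.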