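Trying out high-volume message processing from SQS with Step Functions Express mode! [CFn]
This article is day 16 of the AWS Lambda and Serverless #1 Advent Calendar 2019.
Introduction
Express mode for Step Functions was released at re:Invent 2019.
[Update] Step Functions adds high-performance, low-cost Express Workflows! #reinvent
The official page has a table comparing Express mode with standard mode, so take a look there for the differences.
When I opened the console, sample projects for Express mode had already been added!
So in this article, I'll walk through Process High Volume Messages from Amazon SQS, one of the sample projects officially provided for Step Functions.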
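Trying it out
Running from the console
Select "Run a sample project", then choose "Process high-volume messages from Amazon SQS".
Once selected, the workflow definition is filled in as follows.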
```json
{
  "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
  "StartAt": "Decode base64 string",
  "States": {
    "Decode base64 string": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-ExpressSQS6-Base64DecodeLambda-xxxx",
        "Payload.$": "$"
      },
      "Next": "Generate statistics"
    },
    "Generate statistics": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-ExpressSQS-GenerateStatsLambda-xxxx",
        "Payload.$": "$"
      },
      "Next": "Remove special characters"
    },
    "Remove special characters": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-ExpressSQS-StringCleanerLambda-xxxx",
        "Payload.$": "$"
      },
      "Next": "Tokenize and count"
    },
    "Tokenize and count": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "FunctionName": "StepFunctionsSample-Express-TokenizerCounterLambda-xxxx",
        "Payload.$": "$"
      },
      "End": true
    }
  }
}
```
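To see how data flows through the four states, here is a minimal local simulation in Python. It assumes the Lambda functions behave like the ones embedded in the CFn template deployed in this sample (copied here in simplified form), and it models `"OutputPath": "$.Payload"` as simply handing each function's return value to the next state. Nothing here calls AWS; `run_pipeline` and the short function names are hypothetical.

```python
import base64
import re


def decode(event):
    # "Decode base64 string": keep the original input, put the decoded text in 'output'
    return {
        "statusCode": 200,
        "input": event["input"],
        "output": base64.b64decode(event["input"]).decode("utf-8"),
    }


def generate_stats(event):
    # "Generate statistics": reads 'output' and passes the text on as 'input'
    text = event["output"]
    words = text.split()
    stats = {
        "text_length": len(text),
        "avg_word_length": sum(len(w) for w in words) / len(words),
        "num_digits": len([w for w in words if w.isdigit()]),
        "num_special_chars": len([w for w in words if not w.isalnum()]),
    }
    return {"statusCode": 200, "stats": stats, "input": text}


def clean(event):
    # "Remove special characters": replace each run of non-word characters with one space
    text = event["input"]
    return {"statusCode": 200, "input": text, "output": re.sub(r"\W+", " ", text)}


def tokenize_and_count(event):
    # "Tokenize and count": lowercase, split on whitespace, count each word
    text = event["output"]
    counts = {}
    for word in text.lower().split():
        counts[word] = counts.get(word, 0) + 1
    return {"statusCode": 200, "input": text, "word_counts": counts}


# Same order as StartAt -> Next -> ... -> End in the definition
STATES = [decode, generate_stats, clean, tokenize_and_count]


def run_pipeline(event):
    # With OutputPath "$.Payload", each state's output is the function's return value
    for state in STATES:
        event = state(event)
    return event
```

Note that "Generate statistics" computes `stats`, but the next state only reads `input`, so the statistics do not reach the final output.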
A dialog then appears with a note that the workflow is started by messages sent to SQS, along with the link above. Click OK to proceed to Step 2.
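The SQS trigger works through a Lambda function (TriggerOnSQSQueueLambda in the deployed CFn template) that receives the SQS event and calls StartExecution with the message body as the workflow input. Below is a minimal sketch of that idea. Unlike the template version, which reads only `event['Records'][0]` (which is enough there because the event source mapping uses BatchSize 1), it loops over every record. The `STATE_MACHINE_ARN` environment variable and the injectable `client` parameter are assumptions added so the handler can be tested without AWS.

```python
import os


def lambda_handler(event, context, client=None):
    # Create the real boto3 client only when none is injected (e.g. in tests)
    if client is None:
        import boto3
        client = boto3.client("stepfunctions")
    # Hypothetical env var; the template embeds the ARN with !Sub instead
    state_machine_arn = os.environ["STATE_MACHINE_ARN"]
    execution_arns = []
    for record in event["Records"]:
        # Each SQS message body becomes the input of one Express execution
        response = client.start_execution(
            stateMachineArn=state_machine_arn,
            input=record["body"],
        )
        execution_arns.append(response["executionArn"])
    return execution_arns
```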
The Step 2 screen lists the resources that will be created.
Clicking "Deploy resources" runs CFn (CloudFormation), which creates the resources.
The actual CFn template
```yaml
AWSTemplateFormatVersion: "2010-09-09"
Description: An example of using Express workflows to run text processing for each message sent from an SQS queue.
Resources:
  ExpressLogGroup:
    Type: AWS::Logs::LogGroup
  ExpressStateMachineForTextProcessing:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      StateMachineType: "EXPRESS"
      LoggingConfiguration:
        Level: ALL
        IncludeExecutionData: True
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt
                - ExpressLogGroup
                - Arn
      DefinitionString: !Sub
        - |-
          {
            "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
            "StartAt": "Decode base64 string",
            "States": {
              "Decode base64 string": {
                "Type": "Task",
                "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "FunctionName": "${Base64DecodeLambda}",
                  "Payload.$": "$"
                },
                "Next": "Generate statistics"
              },
              "Generate statistics": {
                "Type": "Task",
                "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "FunctionName": "${GenerateStatsLambda}",
                  "Payload.$": "$"
                },
                "Next": "Remove special characters"
              },
              "Remove special characters": {
                "Type": "Task",
                "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "FunctionName": "${StringCleanerLambda}",
                  "Payload.$": "$"
                },
                "Next": "Tokenize and count"
              },
              "Tokenize and count": {
                "Type": "Task",
                "Resource": "arn:${AWS::Partition}:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "FunctionName": "${TokenizerCounterLambda}",
                  "Payload.$": "$"
                },
                "End": true
              }
            }
          }
        - {Base64DecodeLambda: !Ref Base64DecodeLambda, StringCleanerLambda: !Ref StringCleanerLambda, TokenizerCounterLambda: !Ref TokenizerCounterLambda, GenerateStatsLambda: !Ref GenerateStatsLambda}
      RoleArn: !GetAtt [ StatesExecutionRole, Arn ]
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: states.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource:
                  - !GetAtt Base64DecodeLambda.Arn
                  - !GetAtt StringCleanerLambda.Arn
                  - !GetAtt TokenizerCounterLambda.Arn
                  - !GetAtt GenerateStatsLambda.Arn
        - PolicyName: CloudWatchLogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogDelivery'
                  - 'logs:GetLogDelivery'
                  - 'logs:UpdateLogDelivery'
                  - 'logs:DeleteLogDelivery'
                  - 'logs:ListLogDeliveries'
                  - 'logs:PutResourcePolicy'
                  - 'logs:DescribeResourcePolicies'
                  - 'logs:DescribeLogGroups'
                Resource:
                  - '*'
  TriggerOnSQSQueueLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ SQSLambdaExecutionRole, Arn ]
      Code:
        ZipFile: !Sub
          - |-
            import boto3

            def lambda_handler(event, context):
                message_body = event['Records'][0]['body']
                client = boto3.client('stepfunctions')
                response = client.start_execution(
                    stateMachineArn='${ExpressStateMachineArn}',
                    input=message_body
                )
          - {ExpressStateMachineArn: !Ref ExpressStateMachineForTextProcessing}
      Runtime: "python3.7"
      Timeout: "30"
  EventSourceMappingForSQSQueueLambda:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt [ SQSQueue, Arn ]
      FunctionName: !Ref TriggerOnSQSQueueLambda
  Base64DecodeLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          import base64

          def lambda_handler(event, context):
              return {
                  'statusCode': 200,
                  'input': event['input'],
                  'output': base64.b64decode(event['input']).decode('utf-8')
              }
      Runtime: "python3.7"
      Timeout: "20"
  StringCleanerLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          import re

          def lambda_handler(event, context):
              input = event['input']
              cleaned_string = re.sub('\W+',' ', input)
              return {
                  'statusCode': 200,
                  'input': input,
                  'output': cleaned_string
              }
      Runtime: "python3.7"
      Timeout: "20"
  TokenizerCounterLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              input = event['output']
              lowered_string = " ".join(x.lower() for x in input.split())
              lowered_string = lowered_string.replace('[^\w\s]','')
              split_string = lowered_string.split()
              word_count_map = {}
              for word in split_string:
                  if word in word_count_map:
                      word_count_map[word] += 1
                  else:
                      word_count_map[word] = 1
              return {
                  'statusCode': 200,
                  'input': input,
                  'word_counts': word_count_map
              }
      Runtime: "python3.7"
      Timeout: "20"
  GenerateStatsLambda:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt [ BasicLambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          def avg_word_length(sentence):
              words = sentence.split()
              return (sum(len(word) for word in words)/len(words))

          def lambda_handler(event, context):
              input = event['output']
              stats = {}
              stats['text_length'] = len(input)
              stats['avg_word_length'] = avg_word_length(input)
              stats['num_digits'] = len([x for x in input.split() if x.isdigit()])
              stats['num_special_chars'] = len([x for x in input.split() if not x.isalnum()])
              return {
                  'statusCode': 200,
                  'stats': stats,
                  'input': input
              }
      Runtime: "python3.7"
      Timeout: "20"
  BasicLambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
  SQSLambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: LambdaSQSQueuePermissions
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "sqs:ReceiveMessage"
                  - "sqs:DeleteMessage"
                  - "sqs:GetQueueAttributes"
                Resource: !GetAtt SQSQueue.Arn
        - PolicyName: LambdaStateMachinePermissions
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "states:StartExecution"
                Resource: !Ref ExpressStateMachineForTextProcessing
  SQSQueue:
    Type: AWS::SQS::Queue
Outputs:
  SQSResource:
    Value:
      Ref: SQSQueue
  SQSSampleMessage:
    Description: Sample input to SQS Queue
    Value: "{ \"input\": \"QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5oZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcmFjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3VuZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91YmxlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwgQW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu\" }"
  StateMachineArn:
    Value:
      Ref: ExpressStateMachineForTextProcessing
  ExecutionInput:
    Description: Sample input to StartExecution.
    Value: "{ \"input\": \"QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5oZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcmFjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3VuZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91YmxlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwgQW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu\" }"
```
Note that StateMachineType is set to EXPRESS.
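For reference, the same Express configuration expressed through the Step Functions API (boto3 `create_state_machine`) would set `type="EXPRESS"` together with a `loggingConfiguration` that mirrors the template's LoggingConfiguration. The sketch below only builds the keyword arguments, so it runs without AWS credentials; the helper name `express_state_machine_kwargs` and every name or ARN you pass in are placeholders, not values from the sample.

```python
import json


def express_state_machine_kwargs(name, definition, role_arn, log_group_arn):
    # Mirrors StateMachineType: EXPRESS and LoggingConfiguration in the CFn template
    return {
        "name": name,
        "definition": json.dumps(definition),
        "roleArn": role_arn,
        "type": "EXPRESS",
        "loggingConfiguration": {
            "level": "ALL",
            "includeExecutionData": True,
            "destinations": [
                {"cloudWatchLogsLogGroup": {"logGroupArn": log_group_arn}}
            ],
        },
    }
```

With credentials configured, the result could be passed as `boto3.client("stepfunctions").create_state_machine(**kwargs)`.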
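Resources
CFn creates the following resources. The stack Outputs also provide a sample SQS message (SQSSampleMessage) that you can use as input.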
StateMachine
- ExpressStateMachineForTextProcessing
Lambda
- Base64DecodeLambda
- StringCleanerLambda
- GenerateStatsLambda
- TokenizerCounterLambda
- TriggerOnSQSQueueLambda
Lambda event source mapping
- EventSourceMappingForSQSQueueLambda
SQS queue
- SQSQueue
CloudWatch log group
- ExpressLogGroup
IAM roles
- BasicLambdaExecutionRole
- SQSLambdaExecutionRole
- StatesExecutionRole
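One detail in the TokenizerCounterLambda code: `lowered_string.replace('[^\w\s]','')` does not strip punctuation, because Python's `str.replace` treats its first argument as a literal string, not a regular expression. In this workflow the result is unaffected, since the earlier Remove special characters step has already replaced every non-word character with a space. If you reuse the counting logic on its own, a regex-based variant could look like this sketch; `count_words` is a hypothetical helper, and `collections.Counter` is swapped in for the template's manual dictionary counting.

```python
import re
from collections import Counter


def count_words(text):
    lowered = text.lower()
    # str.replace would only remove the literal text "[^\w\s]";
    # re.sub removes every character that is neither a word character nor whitespace
    stripped = re.sub(r"[^\w\s]", "", lowered)
    return dict(Counter(stripped.split()))
```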
Execution
Once the state machine has been created, you are taken to the execution screen.
A sample input is already filled in, so just click "Start execution" as is.
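The sample input is a JSON object whose `input` field holds base64-encoded text (the same value as ExecutionInput in the template Outputs). A minimal sketch for building your own input of the same shape; `make_execution_input` is a hypothetical helper, and the resulting string could be pasted into the console, passed as `input` to `start_execution`, or sent as an SQS message body.

```python
import base64
import json


def make_execution_input(text):
    # Encode as UTF-8 first so non-ASCII characters survive the round trip
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return json.dumps({"input": encoded})
```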
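Checking the logs
In standard mode, the execution event history can be viewed in the Step Functions console, but Express mode does not show an execution event history there.
Instead, we check the execution in CloudWatch Logs (the ExpressLogGroup configured in LoggingConfiguration).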
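If you pull the log events programmatically (for example from the `message` field of each event returned by the CloudWatch Logs `filter_log_events` API), a small parser can pick out the final output. This is a sketch under an assumed schema: each message is taken to be a JSON object with a `type` field, and for `ExecutionSucceeded` a `details.output` field holding the output as a JSON string (present here because IncludeExecutionData is True). Check the actual log entries before relying on these field names; `succeeded_outputs` is a hypothetical helper.

```python
import json


def succeeded_outputs(log_messages):
    """Return the parsed output of every ExecutionSucceeded log event.

    Assumed schema: {"type": ..., "details": {"output": "<JSON string>"}, ...}
    """
    outputs = []
    for message in log_messages:
        event = json.loads(message)
        if event.get("type") == "ExecutionSucceeded":
            # The output is itself JSON-encoded inside the log event
            outputs.append(json.loads(event["details"]["output"]))
    return outputs
```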
This is how an Express mode state machine appears in the console.
Monitoring tab
Logging tab
Looking at the log for that execution in CloudWatch, the following response was returned with status code 200.
And like the baseless fabric of this vision The cloud capped towers the gorgeous palaces The solemn temples the great globe itself Yea all which it inherit shall dissolve And like this insubstantial pageant faded Leave not a rack behind We are such stuff As dreams are made on and our little life Is rounded with a sleep Sir I am vexed Bear with my weakness My old brain is troubled Be not disturbed with my infirmity If you be pleased retire into my cell And there repose A turn or two I ll walk To still my beating mind
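The text above is the cleaned string returned in the `input` field of the last state's output. The Remove special characters step replaces every run of non-word characters with a single space via `re.sub('\W+', ' ', ...)`. On Python 3 strings, `\W` is Unicode-aware, so it also matches the em dash (U+2014) and the curly apostrophe (U+2019) in the decoded message, which is why "I'll" comes out as "I ll". A small sketch to check this locally; `clean_message` is a hypothetical helper that combines the decode and clean steps.

```python
import base64
import re


def clean_message(b64_input):
    # "Decode base64 string": bytes -> UTF-8 text
    text = base64.b64decode(b64_input).decode("utf-8")
    # "Remove special characters": Unicode-aware \W also matches U+2014 and U+2019
    return re.sub(r"\W+", " ", text)
```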
I'm not very familiar with it, but the message turned out to be from Shakespeare!