Amazon SQSを使ってAWS Step Functionsで長時間かかる処理が終わるのを待つ

Amazon SQSのFIFOキューを使うことで正確にメッセージの処理状態を把握することができます。 このことを活用することで外部APIコールなどの長時間かかる可能性のある処理が終わるのを待ってから次のメッセージの処理に移る実装ができます。
2021.03.17

AWS Step Functionsの中に(外部APIの呼び出しなどで)長時間かかることもある処理がある場合に、その処理の結果を待ち、重複を防ぐために処理中は新しい処理のリクエストをしないという仕組みが必要になり、実装してみました。

重複の発生する実装(SQS未使用の実装)

次の手順で1分毎に実行するStep Functionsを定義します。

$ sls create --template aws-python3 --path sample-app
$ cd sample-app
$ sls plugin install -n serverless-python-requirements
$ sls plugin install -n serverless-prune-plugin
$ sls plugin install -n serverless-step-functions
$ sls plugin install -n serverless-pseudo-parameters

serverless.ymlを次のように記述します。

service: sample-app

plugins:
  - serverless-python-requirements
  - serverless-prune-plugin
  - serverless-step-functions
  - serverless-pseudo-parameters

custom:
  defaultStage: dev
  defaultRegion: ap-northeast-1
  defaultProfile: default
  enableTrigger:
    dev: true
    stg: false
    prod: false
  prune:
    automatic: true
    number: 3

provider:
  name: aws
  runtime: python3.8
  stage: ${opt:stage, self:custom.defaultStage}
  profile: ${opt:profile, self:custom.defaultProfile}
  memorySize: 128
  region: ${opt:region, self:custom.defaultRegion}

functions:
  CallSomeLongApi:
    handler: handler.lambda_handler
    name: "sample-app-${self:provider.stage}-call-some-long-api"
    description: 'レスポンスが長時間得られないこともあるAPIの呼び出し'
    environment:
      STAGE: ${self:provider.stage}
    package:
      include:
        - /**
    timeout: 120

stepFunctions:
  stateMachines:
    CallSomeLongApiOrchestrator:
      name: "sample-app-${self:provider.stage}-call-some-long-api-orchestrator"
      events:
        - schedule:
           name: "some-app-${self:provider.stage}-call-some-long-api-orchestrator"
           description: '1分ごとに処理を実行する'
           rate: rate(1 minute)
           enabled: ${self:custom.enableTrigger.${self:provider.stage}}
      definition:
        StartAt: Task Call Some Long Api
        States:
          Task Call Some Long Api:
            Type: Task
            Resource:
              Fn::GetAtt: [CallSomeLongApi, Arn]
            InputPath: "$"
            ResultPath: "$.body"
            OutputPath: "$"
            End: true

ステートマシンは次の構造になります。

ステートマシンの構造

「Task Call Some Long Api」で実行する処理をhandler.pyに記述します。 handler.pyを次のように記述します。

import json
import time
import random
import logging

logging.basicConfig(
    format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    # 長時間かかる場合もあるAPIを呼び出す処理に見立てる
    process_time = random.randint(40, 120)
    logger.info("%d秒かかるAPIコールが発生しています。", process_time)
    time.sleep(process_time)

    return response

time.sleepで40秒から120秒かかるAPI処理に見立てたhandler.lambda_handlerを1分単位に実行することで、次画像のように複数の処理が同時に走る状況を作りました。

ステートマシン実行状況

SQSを使用して重複を発生させない実装

serverless.ymlを次のように記述します。

service: sample-app

plugins:
  - serverless-python-requirements
  - serverless-prune-plugin
  - serverless-step-functions
  - serverless-pseudo-parameters

custom:
  defaultStage: dev
  defaultRegion: ap-northeast-1
  defaultProfile: default
  enableTrigger:
    dev: true
    stg: false
    prod: false
  prune:
    automatic: true
    number: 3

provider:
  name: aws
  runtime: python3.8
  stage: ${opt:stage, self:custom.defaultStage}
  profile: ${opt:profile, self:custom.defaultProfile}
  memorySize: 128
  region: ${opt:region, self:custom.defaultRegion}
  iamRoleStatements:
    - Effect: Allow
      Action:
        - sqs:*
      Resource:
        - "*"

functions:
  EnQueue:
    handler: functions/enqueue/handler.lambda_handler
    name: "sample-app-${self:provider.stage}-enqueue"
    description: 'SQSへのメッセージの入力。まだ稼働中のメッセージがあったら削除する'
    environment:
      STAGE: ${self:provider.stage}
    package:
      include:
        - /functions/enqueue/**
  CallSomeLongApi:
    handler: functions/call-some-long-api/handler.lambda_handler
    name: "sample-app-${self:provider.stage}-call-some-long-api"
    description: 'レスポンスが長時間得られないこともあるAPIの呼び出し'
    environment:
      STAGE: ${self:provider.stage}
    package:
      include:
        - /functions/call-some-long-api/**
    timeout: 120
    events:
      - sqs:
          arn:
            Fn::GetAtt: [SampleAppQueue, Arn]
          batchSize: 1

stepFunctions:
  stateMachines:
    CallSomeLongApiOrchestrator:
      name: "sample-app-${self:provider.stage}-call-some-long-api-orchestrator"
      events:
        - schedule:
           name: "some-app-${self:provider.stage}-call-some-long-api-orchestrator"
           description: '1分ごとに処理を実行する'
           rate: rate(1 minute)
           enabled: ${self:custom.enableTrigger.${self:provider.stage}}
      definition:
        StartAt: EnQueue
        States:
          EnQueue:
            Type: Task
            Resource:
              Fn::GetAtt: [EnQueueLambdaFunction, Arn]
            InputPath: "$"
            ResultPath: "$.body"
            OutputPath: "$"
            End: true

resources:
  Resources:
    SampleAppQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "SampleAppQueue.fifo"
        FifoQueue: true
        VisibilityTimeout: 120

SQSのリソースとしてFIFOキューを定義して、このFIFOキューに1分単位にメッセージをエンキューするEnQueue関数を定義しました。 なお、キューのVisibilityTimeoutの値はLambdaファンクションのタイムアウト値より大きい必要があるので注意してください。ここでは共に120秒に設定しています。

resources:
  Resources:
    SampleAppQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "SampleAppQueue.fifo"
        FifoQueue: true
        VisibilityTimeout: 120

また、CallSomeLongApi関数はFIFOキューへのエンキューをトリガーに動作するように定義しています。

  CallSomeLongApi:
    :
    events:
      - sqs:
          arn:
            Fn::GetAtt: [SampleAppQueue, Arn]
          batchSize: 1

ステートマシンは次の構造になります。

重複回避したステートマシンの構造

Producerの実装

EnQueue関数(Producer)の実装は次の通りです。

import logging
import boto3
import datetime
import uuid

logging.basicConfig(
    format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs = boto3.client('sqs')
sqsr = boto3.resource('sqs')

queue_name = 'SampleAppQueue.fifo'

def get_queue_name():
    return queue_name

def get_queue_resource():
    return sqsr.get_queue_by_name(QueueName=get_queue_name())

def get_queue_url(sqs):
    response = sqs.get_queue_url(QueueName=get_queue_name())
    return response['QueueUrl']

def lambda_handler(event, context):
    qr = get_queue_resource()
    count_of_message_processed = qr.attributes.get('ApproximateNumberOfMessagesNotVisible')
    logger.info('count of message: %s' % count_of_message_processed)

    date_now = datetime.datetime.now()

    if int(count_of_message_processed) > 0:
        # 処理中のメッセージがあったら何もしない
        logger.info('処理中のメッセージがあります。処理を抜けます。 date: %s' % date_now)
        return
    else:
        # 処理中のメッセージがなかったらエンキューする
        logger.info('処理中のメッセージがありません。処理を続けます。 date: %s' % date_now)
        response = sqs.send_message(
            QueueUrl=get_queue_url(sqs),
            MessageBody="message date : %s" % datetime.datetime.now(),
            MessageGroupId="sampleApp",
            MessageDeduplicationId=str(uuid.uuid4())
        )
        return response

SQSのGetQueueAttributesアクションで処理中のメッセージ数を取得しています。

    qr = get_queue_resource()
    count_of_message_processed = qr.attributes.get('ApproximateNumberOfMessagesNotVisible')

ApproximateNumberOfMessagesNotVisibleには処理中1のメッセージの数が入っています。 注意が必要なのは、FIFOキューでなければ、ApproximateNumberOfMessagesNotVisibleは「おおよそ」の数が入るということです。 処理中のメッセージ数を正確に調べたかったのでFIFOキューを採用しています。

Amazon SQS メッセージの処理に必要なリソース - Amazon Simple Queue Service

処理中のメッセージがあればFIFOキューに新たなメッセージをエンキューせず、処理中のメッセージがなければFIFOキューに新たなメッセージをエンキューしています。

Consumerの実装

CallSomeLongApi関数(Consumer)の実装は次の通りです。

import json
import logging
import boto3
import time
import random

logging.basicConfig(
    format="%(asctime)s %(module)s:%(funcName)s(%(lineno)d) - %(levelname)s - %(message)s"
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs = boto3.client('sqs')

queue_name = 'SampleAppQueue.fifo'

def get_queue_name():
    return queue_name

def get_queue_url(sqs):
    response = sqs.get_queue_url(QueueName=get_queue_name())
    return response['QueueUrl']

def lambda_handler(event, context):
    for record in event['Records']:
        # 長時間かかる場合もあるAPIを呼び出す処理に見立てる
        process_time = random.randint(40, 120)
        logger.info("%d秒かかるAPIコールが発生しています。", process_time)
        time.sleep(process_time)
        # メッセージを削除
        sqs.delete_message(QueueUrl=get_queue_url(sqs), ReceiptHandle=record["receiptHandle"])

    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response

SQSへのエンキューをトリガーに設定した場合、送信されたメッセージはevent['Records']からとれます。 処理(長いAPIコール)を行った後、メッセージを削除しています。

結果

次図に示す通り、処理中のメッセージがある場合は新たにメッセージをエンキューしない動きが確認できます。

まとめ

Amazon SQSのFIFOキューを使うことで正確にメッセージの処理状態を把握することができました。 このことを活用することで処理が終わるのを待ってから次のメッセージの処理に移る実装ができます。 Step Functionsで外部APIの呼び出しなどで長時間の処理が含まれる場合にご参考になればと思います。


  1. メッセージがクライアントに送信されたが、まだ削除されていない場合、または表示期限に達していない場合、メッセージは処理中とみなされます。