Amazon ECSタスクを擬似的にAmazon SQSトリガーで実行する

Amazon ECSはAmazon SQSをトリガーとしてタスクを起動することに対応していません。これに対する対処としてはAWS Lambdaを挟む事が一般的ですが、別解としてApplication Auto Scalingを使うことによって擬似的にSQSをトリガーにECSタスクを実行する構成を検討し検証してみました。
2021.02.14

はじめに

おはようございます、加藤です。Amazon ECSタスクをAmazon SQSトリガーで実行したい場合は、SQS→Lambda→ECSタスクやSQS→StepFunction→ECSタスク(StepFunction内)と実行する事が一般的と思います。

Application Auto ScalingによってSQSキュー内のメッセージ数に応じてECSタスク数をゼロからスケーリングさせる事で擬似的にSQS→ECSタスクを実現する構成を思いついたので実際に検証してみました。

構成図

何かしらからSQSキューに対してメッセージが投入されるとECSタスクがメッセージをPullして処理します。

ECSタスク数はは0〜1の範囲でスケーリングし、キュー内のメッセージが0の場合はタスク数が0で1以上の場合はタスク1となります。

フロー図

メトリクスにはApproximateNumberOfMessagesVisibleとApproximateNumberOfMessagesNotVisibleの合算値を使用しています。つまり可視性を無視したメッセージの数をメトリクスに使用しているということです。

使用可能な Amazon SQS の CloudWatch メトリクス - Amazon Simple Queue Service

メッセージがインキューされると合算値が1になりタスクが1つ立ち上がります。

タスクが立ち上がるとメッセージをPullするためMessagesVisibleは0になりますが代わりにMessagesNotVisibleが1になるため合算値は変わらずタスクは1つのが維持され続けます。

タスクはが終了するとメッセージが削除されるため合算値が0になりタスク数が0に変更され初期状態に戻ります。

説明

今回の構成は下記の前提のもと検討しています。

  • メインアプリケーションから長時間かかる処理をオフロードしたい
  • 1つのメッセージにかかる処理は約15分
  • 1日10個ほどの処理が発生する
  • 処理は24時間以内に完了すれば十分である

急ぎの処理ではないのでタスク数は最小が0で最大が1にしました。タスク数の調整方法にはステップスケーリングExactCapacityを指定します。今回はタスク数が0か1なので絶対値でタスク数を指定できると直感的に設定を理解しやすいです。

メトリクスをApproximateNumberOfMessagesVisible単体では無く合算値にしたのは処理中にメトリクスが0になってしまいタスク数が0になる恐れがあったからです。メトリクスの取得間隔によって対処することをおそらく可能ですがわかりにくいと感じたので除外しました。

実装

検証環境はAWS CDKを使って作成しました。

import { AdjustmentType } from '@aws-cdk/aws-applicationautoscaling'
import { MathExpression } from '@aws-cdk/aws-cloudwatch'
import { SubnetType, Vpc } from '@aws-cdk/aws-ec2'
import {
  Cluster,
  ContainerImage,
  FargatePlatformVersion,
  FargateService,
  FargateTaskDefinition,
  LogDrivers,
} from '@aws-cdk/aws-ecs'
import { Role, ServicePrincipal } from '@aws-cdk/aws-iam'
import { Queue } from '@aws-cdk/aws-sqs'
import { Construct, Duration, Stack, StackProps } from '@aws-cdk/core'
import { resolve } from 'path'

export class MyStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props)

    const vpc = new Vpc(this, 'vpc', {
      natGateways: 0,
      maxAzs: 1,
    })
    const cluster = new Cluster(this, 'cluster', { vpc })
    const queue = new Queue(this, 'queue', {
      visibilityTimeout: Duration.minutes(30),
    })
    const taskRole = new Role(this, 'EcsServiceTaskRole', {
      roleName: 'ecs-service-task-role',
      assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
    })
    queue.grantConsumeMessages(taskRole)
    queue.grantPurge(taskRole)

    const taskDefinition = new FargateTaskDefinition(this, 'ecs-task', {
      taskRole,
    })
    taskDefinition.addContainer('get-queue', {
      image: ContainerImage.fromAsset(
        resolve(__dirname, '..', 'src', 'docker')
      ),
      environment: {
        AWS_REGION: 'ap-northeast-1',
        QUEUE_URL: queue.queueUrl,
      },
      logging: LogDrivers.awsLogs({ streamPrefix: 'sqs-to-ecs' }),
    })
    const fargateService = new FargateService(this, 'ecs-service', {
      cluster,
      taskDefinition,
      assignPublicIp: true,
      vpcSubnets: { subnetType: SubnetType.PUBLIC },
      platformVersion: FargatePlatformVersion.VERSION1_4,
    })
    const scalableTaskCount = fargateService.autoScaleTaskCount({
      minCapacity: 0,
      maxCapacity: 1,
    })

    const allMessage = new MathExpression({
      expression: 'visible + notVisible',
      usingMetrics: {
        visible: queue.metricApproximateNumberOfMessagesVisible(),
        notVisible: queue.metricApproximateNumberOfMessagesNotVisible(),
      },
    })
    scalableTaskCount.scaleOnMetric('task-scaling', {
      metric: allMessage,
      scalingSteps: [
        { change: 1, lower: 1 },
        { change: 0, upper: 0 },
      ],
      adjustmentType: AdjustmentType.EXACT_CAPACITY,
    })
  }

ECSで動くアプリケーションはNode.jsを使用しています。言語はTypeScriptを使用しました。テストしていない適当なコードなので、もしこのコードを参考にする際は内容を自身でチェックしてください。

import {
  SQSClient,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} from '@aws-sdk/client-sqs'

const REGION = process.env.AWS_REGION ?? process.env.REGION
const QUEUE_URL = process.env.QUEUE_URL

if (REGION == null || REGION === '') {
  throw new Error('AWS_REGION or REGION is required')
}

if (QUEUE_URL == null || QUEUE_URL === '') {
  throw new Error('QUEUE_URL is required')
}

const sqs = new SQSClient({ region: REGION })

const processQueue = async () => {
  const data = await sqs
    .send(
      new ReceiveMessageCommand({
        MaxNumberOfMessages: 10,
        QueueUrl: QUEUE_URL,
        WaitTimeSeconds: 0,
      })
    )
    .catch((err: Error) => err)
  if (data instanceof Error) {
    console.error('receive error', data)
    return
  }

  if (data.Messages == null) {
    console.log('no messages to delete')
    return
  }

  await Promise.all(
    data.Messages.map(async ({ MessageId, Body, ReceiptHandle }) => {
      console.log(`got message ID: ${MessageId}, body: ${Body}`)
      return await sqs
        .send(
          new DeleteMessageCommand({
            QueueUrl: QUEUE_URL,
            ReceiptHandle,
          })
        )
        .then(() => {
          console.log(`message ID: ${MessageId} deleted`)
        })
    })
  )
}

const run = async () => {
  const sleep = (msec: number) =>
    new Promise((resolve) => setTimeout(resolve, msec))

  while (true) {
    await processQueue()
    await sleep(5000)
  }
}

run()

動作確認

デプロイ後に設定と動作確認を行います。まずApplication Auto Scalingが意図されたとおりに設定されているかCloudWatch Alarmから確認します。意図したとおり、合算値に応じてECSタスク数の絶対値を変更するように設定が行われていました。

SQSのマネジメントコンソールからキューにメッセージをインキューします。2回インキューを行いました。

ECSタスクのログを確認するとメッセージ内容が受け取れていることが確認できました。

ECSサービスのイベントを確認してみるとタスク数がApplication Auto Scalingによって変更されたことが確認できます。

まとめ

SQS→Lambda→ECSタスクという構成に比べてこの構成の良さはコードがLambdaに分散せずコンテナに集約されることにあります。代償としてスケーリングポリシーの考慮と学習が必要になります。

実際に構築してみた感想としてはLambdaからECSタスクを実行した方が構成が簡潔で良いなと思いました。またAWS利用費がかかりますが思い切ってECSタスクのゼロスケーリングを諦めて常にタスクを1つ起動し代わりにFargate Spotを使うとAWS利用費の増加を抑えつつ構成はより簡潔になります。(ただし処理が中断されても問題ないようにアプリケーションを修正する必要があります。)

プロジェクトメンバーのスキルセットや予算に応じて検討するのが良さそうですね。ただし前述の通りファーストチョイスとしてはSQS→Lambda→ECSタスクの構成をおすすめ致します。