AWS再入門ブログリレー Amazon SQS編

2020.08.07

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、もこ@札幌オフィスです。

当エントリは弊社コンサルティング部による『AWS 再入門ブログリレー 2020』の 5日目のエントリです。

このブログリレーの企画は、普段 AWS サービスについて最新のネタ・深い/細かいテーマを主に書き連ねてきたメンバーの手によって、 今一度初心に返って、基本的な部分を見つめ直してみよう、解説してみようというコンセプトが含まれています。

AWS をこれから学ぼう!という方にとっては文字通りの入門記事として、またすでにAWSを活用されている方にとっても AWSサービスの再発見や2020 年のサービスアップデートのキャッチアップの場となればと考えておりますので、ぜひ最後までお付合い頂ければ幸いです。

では、さっそくいってみましょう。5日目のテーマはAmazon SQSです。

Amazon SQSとは?

Amazon SQSとは「Amazon Simple Queue Service」の略で、その名の通りメッセージのキューを提供するマネージド型のサービスで、アプリケーション間で処理を分離する時に利用する場合、処理の橋渡し役となる安全かつスケーラビリティの高いキューです。

バッチ処理などでワーカープロセスを柔軟にスケールするような構成でSQSを利用すると、様々なユースケースで利用することが出来ます。

SQSはメッセージを取得後も(一部の例外を除き)明示的にキューからメッセージを削除しない限りキューに残り続けるため、処理が失敗してもメッセージを失うこと無くリトライする事が出来ます。

具体的なユースケース

実際にSQSをサービスに組み込む時に使われる、よくあるSQSのアーキテクチャ一例をご紹介します。

SQSで時間が掛かる処理をオフロードするパターン

バッチ処理や複雑で重い処理をHTTPで受け付ける場合などでSQSを利用する場合、下記のような構成を取ることが出来ます。

  1. 処理リクエストを投げる
  2. SQSに処理内容が書かれたメッセージを送信
  3. 202 Acceptedレスポンスを返却しておく
  4. SQSをポーリングする形でメッセージを取得して、処理をする

このような構成の場合、フロントとバックエンドの処理を別けることができ、処理を待たずにキューイングするような実装を簡単に作る事ができます。

SQSではキューに貯まっているメッセージ数からAutoScalingさせるように構成する事も出来るので、処理するワーカーノードを柔軟にスケーリングさせることも出来ます。

アプリケーション間の連携に使うパターン

マイクロサービスなどのアプリケーション間連携にSQSを使うケースもよくあります。

  1. 処理内容をSQSに送信
  2. SQSをポーリングする形でアプリケーション間の連携を行う

アプリケーション間の連携でSQSを使うことで、万が一処理先のマイクロサービスで障害が起きた場合でもメッセージはSQSに貯まり続けて、障害復帰後に処理を再開出来ます。

SQS標準キューは"ほぼ無制限の"APIコールをサポートしているので、スパイク的な大量のリクエストも捌いてくれます。

標準キューとFIFOの違い

SQSは「標準キュー」と「FIFOキュー」の2種類があります。

標準キューでは"ほぼ無制限"のAPIコールをサポートしている一方で、メッセージを重複して取得してしまう可能性があったり、メッセージの取得順が保証されていません。

「メッセージが登録された順に1度だけ」のような順序保証と重複排除が必要な場合はFIFOキューを使いましょう。

FIFOキューは順番を保証する代わりに1秒あたりの最大トランザクションは3000となっており、料金も標準キューが100万リクエストにつき0.4ドルに対して、FIFOは0.5ドルと、若干高くなっています。(2020年8月7日時点)

FIFOキューについては「【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応)」のエントリーがとてもわかりやすく詳細に解説されているので、是非こちらも合わせてご覧下さい。

SQSをアプリケーションに組み込んでみる

ここまでSQSの基本的な仕組みとユースケースについてご紹介してきましたが、実際にアプリケーションに組み込んで挙動を確認してみましょう。

SQSにメッセージを登録するプロデューサー側

SQSではメッセージを登録するアプリケーションの事を「プロデューサー」と呼びます。

下記コードはSQSに対して「こんにちは」というメッセージを登録するサンプルコードです。

※実際には処理内容を記載して登録する形になります

const AWS = require("aws-sdk")
const SQS = new AWS.SQS({ region: "ap-northeast-1" })

const QueueUrl = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx/example"

async function sendMessage(message) {
  try {
    const MessageBody = JSON.stringify({ message })
    const result = await SQS.sendMessage({ MessageBody, QueueUrl }).promise()
    console.log(result)
  } catch (e) {
    console.error(e)
  }
}

sendMessage("こんにちは")

実行するとこんな感じのレスポンスが帰ってきます。

{
  "ResponseMetadata": {
    "RequestId": "2245f4a0-f8b7-50a7-9e4b-a3060bda0671"
  },
  "MD5OfMessageBody": "04366f504cabfb9b64545bd8f0a6012b",
  "MessageId": "72dc98e1-84f8-4e8a-bdb9-7c0f9cb62e57"
}

マネージドコンソールを見るとキューにメッセージが貯まっていることを確認出来ます。

EC2などでメッセージをSQSから取得して処理するコンシューマー側

SQSではメッセージを取得して処理を行うアプリケーションを「コンシューマー」と呼びます。

プロデューサー側で登録したメッセージを取得して、処理をして、メッセージを削除する一連の流れのコードはこんな感じです。

const AWS = require("aws-sdk")
const SQS = new AWS.SQS({ region: "ap-northeast-1" })

const QueueUrl = "https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxxxx/example"

async function processingMessage() {
  try {
    const params = {
      QueueUrl,
      MaxNumberOfMessages: 10, // 最大取得メッセージ
      WaitTimeSeconds: 20 // ロングポーリング
    }
    const messages = await SQS.receiveMessage(params).promise() // メッセージを取得
    for (message of messages.Messages) {
      const body = JSON.parse(message.Body)
      console.log(message)
      console.log(body.message) //メッセージの実態
      //重い処理
      await new Promise((resolve, reject) => {
        setTimeout(resolve, 1000)
      })
      const params =  {
        QueueUrl,
        ReceiptHandle: message.ReceiptHandle
      }
      const deleteResult = await SQS.deleteMessage(params).promise()
      console.log(deleteResult)
    }

  } catch (e) {
    console.error(e)
  }
}

processingMessage()

SQSでは一部の例外(Dead Letter Queueと保存期間切れ)を除き明示的にメッセージを削除しないとキューにメッセージが残り続けるので、 DeleteMessage でメッセージを削除してあげる必要があります。

ReceiveMessage で取得出来るデータはこんな感じで、 ReceiptHandleDeleteMessage に渡してあげるとキューからメッセージを削除出来ます。

{
  "ResponseMetadata": {
    "RequestId": "8bc818cd-89b5-5746-8429-4f6b7eb8ecd4"
  },
  "Messages": [
    {
      "MessageId": "0428c6c9-9193-45f6-a7fa-10ca0b6efaf7",
      "ReceiptHandle": "AQEBnNV3auHIuRy8Oh8Pdlr8GAEmzu1Gt9iZ1zEr2AIxOvSpeqCthwI9JtaH7jZyC4CJUUrdQR60b0G6MtkNBLR4sJ1HFAhYg04kDGTfCySB49NKFTXtIkK5SF4jcf/W71j8+jDy7HwbsRqO8PrmdJ8wZKzKIHsYnTNpT9NYOltOgIMP22JA+v+/fn64W1jfTUOKmHaISkZwEnSF++aULZH/FQCXZbN7P9FbeeqmlWOOaY4w0FLNnftEjxEplu+NNx9thQn062TsAINCTFNI7yx26KlBG+fAqYTK0g1Uzyk+ZGRdd562ylrqB/hRrNgnPRRYfDOCD0mJ5qXhllXIUdRV8011P+UWf2s10WjheBq9WjNEB06xCKm9/1ZuIZaMpW1CS+6IJy3qGWIXOMK0cCM6mg==",
      "MD5OfBody": "04366f504cabfb9b64545bd8f0a6012b",
      "Body": "{\"message\":\"こんにちは\"}"
    },
    {
      "MessageId": "94124ef1-659f-48e4-a594-ea146e7e53cd",
      "ReceiptHandle": "AQEB3qoCIBtUecBjVLAtG4K/4lXVyoHInqj/8svoXZ5Oc3tcuxk6vc/GlAwb5gt/YtbtNAOHB2q4l9xSNFNgnTgtsKr9dxR+izHaJMXNLaXwPVCbQGkQAo01Jgj24qvHDXb3hz92EdFFiAryQHYMrIhV5ooo5un7pRygIvg4+Nn55KQkJMyrdoUdA2DthXARE0T5t6M2A8pbaYy0jG9uv0pOl34AQagdh/OqJ6qpdfEzsXx7r3gzjQ1Pg8GE1C2S+whMSdyau83O0irAfLbvPypPdUY4nGoqXyA4No/7wmvVt3n6b6XY05OIHCJRqeOqyIh2yMC8aTSLWQp+ZjF+YJFGh5igXt63+yPEPPkU0nQnLq3nKn94RTxFTMAxp21OKIytmBx6RYcwZeWe9TnU2s2kdw==",
      "MD5OfBody": "04366f504cabfb9b64545bd8f0a6012b",
      "Body": "{\"message\":\"こんにちは\"}"
    }
  ]
}

LambdaでSQSのメッセージを取得して処理する方法

SQSをイベントソースにしてLambdaで処理を行うことが可能です。

通常は上記のようにAWS SDKを利用して ReceiveMessage でメッセージ取得して、処理が完了した後に DeleteMessage でキューからメッセージを削除する必要がありますが、SQSをイベントソースにしたLambdaの場合、Lambdaが正常終了するとメッセージが削除される挙動となります。

import { Context, SQSEvent, Callback } from 'aws-lambda';

export async function handler(event: SQSEvent, context: Context, callback: Callback) {
  try {
    for (const message of event.Records) {
      // キューの処理
      console.log(message);
      await new Promise((resolve, reject) => {
        setTimeout(resolve, 1000); //重い処理
      });
    }
    // 正常終了でcallback
    callback(null, 'success');
  } catch (e) {
    callback(e);
  }
}

SQSをイベントソースにしたLambdaの詳細については、下記ブログにて解説されていますので是非合わせてご覧下さい。

ライフサイクルとSQSの設定

さて、ここまでユースケースやアプリケーションへの実装方法をご紹介してきましたが、SQSではその他にも「かゆいところに手が届くメッセージライフサイクル」機能が複数存在します。

SQSのライフサイクルついて再入門していきましょう。

メッセージのライフサイクル

まずはじめにメッセージのライフサイクルについておさらいです。

SQSのメッセージライフサイクルは大きく分けて、「遅延キュー、可視性タイムアウト、DLQ(Dead Letter Queue)、保存期間切れ」の4段階に別けることが出来ます。

・SQSにメッセージを送信する SendMessage を利用した後、設定された「遅延キュー」の間はメッセージを取得出来ない状態になります。

・SQSは ReceiveMessage を使ってメッセージを取得した時、「可視性タイムアウト」が発動して、同じメッセージが設定期間見えなくなります。

・設定した回数以上ReceiveMessage されたが DeleteMessage されずにまだキューにある状態の場合、DLQ(Dead Letter Queue)にメッセージが移動されます。

・キューの設定した保存期間を過ぎるとメッセージは削除されます。 保存期間を過ぎた場合DLQ(Dead Letter Queue)に入らないので、注意が必要です。

遅延キュー

遅延キューを使うことで、メッセージがSQSに登録されてから指定した時間メッセージが非表示になります。

遅延キューは「キューに入ってくる新しいメッセージ全てに適用される」ため、キューに登録されてからn分後にコンシューマーで処理させたい場合などに利用することが出来るかと思います。

可視性タイムアウト

可視性タイムアウトは ReceiveMessage でメッセージを取得してから指定した時間の間、同じメッセージを取得出来なくなる、という物です。

SQSは一部の例外(Dead Letter Queueと保存期間切れ)を除き、明示的に DeleteMessage でメッセージを削除しない限りキューから削除されないので、処理中に別のコンシューマーが ReceiveMessageをして同じメッセージを受け取らないように可視性タイムアウトを設定する事で、多重実行の最小化が出来ます。(最小化出来るだけで、前述の通り標準キューでは複数回同じメッセージを受け取る可能性もあります。)

DLQ(Dead Letter Queue)

DLQ(Dead Letter Queue)は、メッセージを何らかの理由で処理出来なかった場合にメインのキューからDLQにメッセージを移動させる機能になります。

具体的には、指定した回数以上 ReceiveMessage された場合にDLQに移動する形になります。

DLQを利用するメリットとして、例えば何らかの理由でコンシューマーが処理出来ないメッセージがキューに入ってしまった場合、DLQを利用しないとメッセージの保存期間が過ぎるまでキューに貯まり、 ReceiveMessage で処理出来ないメッセージを引くたびに無駄が生じてしまいます。

DLQを利用することでこのようなメッセージを排除して、DLQに入ったらアラートを飛ばすようにしたり、処理出来なかった時用のコンシューマーを用意するなど、ケースに応じて柔軟に対応することが出来ます。

まとめ

以上、『AWS 再入門ブログリレー 2020』の 5日目のエントリ『Amazon SQS』編でした。 来週火曜日 (8/11) はたぬきの「Amazon S3」の予定です。お楽しみに!!

参考

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html