AWS再入門ブログリレー2022 SQS編

弊社コンサルティング部による『AWS 再入門ブログリレー 2022』の9日目のエントリでテーマはSQSです。 AWSでも歴史のあるサービスで、Simple Queue Serviceとはいいつつも様々な便利な機能があります。1つでも知らない機能があり、学びにつながれば幸いです。
2022.02.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、アシです。

当エントリは弊社コンサルティング部による『AWS 再入門ブログリレー 2022』の 9日目のエントリです。

このブログリレーの企画は、普段 AWS サービスについて最新のネタ・深い/細かいテーマを主に書き連ねてきたメンバーの手によって、 今一度初心に返って、基本的な部分を見つめ直してみよう、解説してみようというコンセプトが含まれています。

AWSをこれから学ぼう!という方にとっては文字通りの入門記事として、またすでにAWSを活用されている方にとってもAWSサービスの再発見や2022年のサービスアップデートのキャッチアップの場となればと考えておりますので、ぜひ最後までお付合い頂ければ幸いです。

では、さっそくいってみましょう。9日目のテーマはSQSです。

メッセージキューイングについての整理

メッセージキューイングとはプロセス間、スレッド間の通信に使用される方式の一つで、メッセージと呼ばれるデータをキューというデーター構造に格納することで通信を行います。 キューはデータのリストで先に格納したデータが先に排出されるような構造のものです。

メッセージキューイングを行うことで以下のようなメリットが考えられます。

コンポーネントが疎結合になる

キューを使用することで分離したコンポーネントをつなぎ合わせてシステムを構成することが可能になります。 こうすることで、保守性、拡張性、再利用性が高くなり、また異なった言語を組み合わせることもできます。 コンポーネントが分離していることによって、特定のコンポーネントのみのスケールを変更するといったことも容易になります。

非同期的に処理ができる

キューに処理したいものを一時的に保存することで、時間のかかる処理などを非同期的に処理することが可能になります。 キューに送信する側は送信した時点で処理を完了でき、その処理を別のコンポーネントが後から処理することができるようになります。

例えば、動画配信サービスなどでアップロードされた動画を変換する場合に、ユーザーエンドポイントのサーバーはアップロードを受け付けたことだけを返信し、キューにメッセージを送信します。

SQSについて

SQSはフルマネージド型のメッセージキューイングサービスです。 フルマネージド型ということで自分でサーバーやミドルウェアの管理をする必要がありません。 また、AWSの様々なサービスと連携することができる点も特徴です。

Amazon SQSを利用するメリット

セキュリティ

SQSではIAMを利用してアクセスを制御することができます。 また、KMSを用いてサーバーサイドでの暗号化も可能です。

耐久性

SQSでは複数のサーバーにメッセージを保存することでメッセージが喪失する可能性を低めています。 詳しくは後で書きますが、スタンダードキューでは少なくとも1回、FIFOキューでは厳密に1回のメッセージが処理されます。

可用性

SQSのインフラは冗長化されています。 また、メッセージへのアクセスは高い同時実効性を持ち、メッセージの配信と消費についても高い可用性を持っています。

スケーラビリティ

SQSは自動でスケールイン/アウトを行ってくれます。 プロビジョニングの操作などは不要で、負荷の増減に自動で対応してくれます。

信頼性

SQSでは処理中にメッセージをロックします。 これによって、複数のプロデューサー/コンシューマーが同時に操作を行っても整合性が保たれます。

カスタマイズ

SQSではキューに対して様々なオプションを設定することができます。 例えば、メッセージの配信に遅延を入れたり、大きなサイズのメッセージをS3やDynamoDBに保存したりすることが可能です。

スタンダードとFIFOキュー

SQSには2種類のキューが存在します。 それは、スタンダードキューとFIFOキューです。

比較すると以下のような感じです。

スタンダード FIFO
メッセージの順序 可能な限り順番は保持する 完全に先入先出し
配信回数 少なくとも1回 厳密に1回

また、このほかにもメッセージのスループットなどクォータが異なるので使用する際は注意してください。

使い分けとしては、メッセージの順番や配信回数に制約がある場合にはFIFOキューを使うのが良いでしょう。

例えば、アップロードされた動画を非同期的に処理するといったシステムの場合は実行順序や重複実行してもあまり問題はないのでスタンダードキューでも問題ないでしょう。

逆に、銀行の取引システムなどでは重複実行や実行順序などが重要になってくるためFIFOキューを使うのが良いでしょう。

便利な機能

可視性タイムアウト

SQSではキューからメッセージを取得しても、メッセージは削除するまでキューに残っています。 そうすると、そうすると何度も同じメッセージを取得することになってしまいますが、そこで出てくるのが「可視性タイムアウト」です。

これを設定することでメッセージの取得後、そのメッセージは再度アクセスしても一定時間取得できないようになります。 タイムアウトはデフォルトが30秒、最小は0秒、最大は12時間で設定できます。

ただし、注意が必要でスタンダードキューの場合は可視性タイムアウトを設定していても2回以上取得されてしまう場合があります。 これを厳密に1回にしたい場合はFIFOキューを使います。

デッドレターキュー

SQSでは削除されるまでキューにメッセージが残り続けるため、処理に失敗した場合などは、取得→タイムアウトによる復帰→取得を何度も繰り返すことになり、処理できないメッセージを何度も処理することになります。

こういったときに便利なのがデッドレターキューです。デッドレターキューを設定すると取得回数が一定以上を超えた場合に別のキューに移してくれます。失敗したメッセージの保管庫のようなイメージです。

これは、特別なキューではなく、単なるSQSのキューです。なので、設定する場合は先にキューを作成して設定する必要があります。

遅延キュー

遅延キューを使用するとメセージが配信されてから取得できるようになるまでに遅延を設けることができます。

例えば、別のシステムと協調して動くような処理で、メッセージが配信されてから数秒間待ってから処理をしたいといった場合に便利です。コンシューマー側で遅延の処理を入れる必要がなくなります。

近い仕組みで、メッセージタイマーというものがSQSにはあります。こちらはキュー単位ではなくメッセージ単位で遅延を設定できます。メッセージ毎に遅延したい時間が異なる場合に便利です。

使ってみる

SQSを実際に使ってみます。

準備

今回はCDKを利用してキューを用意します。

sqs.py

from aws_cdk import (
    Stack,
    CfnOutput,
    aws_sqs as sqs
)
from constructs import Construct


class QueueStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        queue = sqs.Queue(self, "ExampleQueue")

        CfnOutput(self, "QueueURL", value=queue.queue_url)

CDKにはSQSのキューのハイレベルなインターフェイスが用意されているので便利です。 権限の管理やデッドレターキューの登録なども簡単に行うことができます。

デプロイ後にキューのURLがアウトプットとして出力されます。これはキューにアクセスする際に使用します。

アクセスしてみる

今回は簡単にメッセージを配信ー>取得ー>削除という流れをやってみようと思います。

app.py

import boto3

sqs = boto3.resource('sqs')

URL = 'https://sqs.ap-northeast-1.amazonaws.com/XXXXXX/XXXXXX'
queue = sqs.Queue(URL)


for i in range(5):
    # メッセージを送信
    # send_messagesを使えば複数個同時に送信できる
    queue.send_message(
        MessageBody=f'No. {i}'
    )

for i in range(5):
    # メッセージの受信
    # MaxNumberOfMessagesを指定すれば複数個同時に取得できる。デフォルトは1
    # WaitTimeSecondsを指定すればキューがからの場合に待ってくれる
    message = queue.receive_messages()
    message = message[0]
    print(message.body)

    # 処理が終わった後は削除
    message.delete()

実行結果は以下のようになります。

実行してみる

$ python3 sqs.py 
No. 1
No. 4
No. 2
No. 3
No. 0

投稿順とは一致していませんが全てのデータが取得できました。

最後に

以上、『AWS 再入門ブログリレー 2022』の9日目のエントリ『SQS』編でした。 次回2/15(火)はshiraishiさんの「ElastiCache編」の予定です。