この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
西田@大阪です
今回はSQS FIFO キューで処理に失敗した場合に実行順序がどうなるのかを調べてみました
SQS の FIFO とは
SQS の FIFOキューについては以下の記事を参照ください
標準キューと違って以下の点が特徴です
- キューの重複がおこりません
- キューの順序が保証されます
疑問
SQS FIFO のドキュメントから抜粋です
複数回の再試行
■ コンシューマーが失敗した SendMessage アクションを検出した場合、同じメッセージ重複排除 ID を使用して必要に応じて何回でも送信を再試行できます。重複排除の期間が終了する前にプロデューサーが少なくとも 1 つの確認を受信した場合、複数の再試行を行ってもメッセージの順序に影響したり、重複が発生したりすることはありません。
■ コンシューマーが失敗した ReceiveMessage アクションを検出した場合、同じ受信リクエスト試行 ID を使用して必要に応じて何回でも再試行できます。可視性タイムアウトが終了する前にコンシューマーが少なくとも 1 つの確認を受信した場合、複数の再試行を行ってもメッセージの順序に影響はありません。
■ メッセージグループ ID を含むメッセージを受信した場合、メッセージを削除するか、メッセージが表示されるまで、同じメッセージグループ ID のメッセージはそれ以上返されません。
ドキュメントからは以下のことは保証されていそうです
- 再試行されてもューの順序は守られる
- 再試行されてもキューの重複はおこらない
ただ、以下のケースについてはうまく読み取れませんでした
- 複数キューが失敗した場合は再試行される順序は保証されるのか?
- 複数サブスクライバーがいた場合、失敗したキューが Dead Letter Queueにいくまですべてのコンシューマーはブロックされるのか?
先に答えから
先に答えから書きます
- 失敗しても順番は 保証されます
- 失敗したキューが Dead Letter Queue に行くまで すべてのサブスクライバーはブロックされます
※ VisibilityTimeout
を超えてキューが処理が続行してしまう場合は順番は保証されず、複数回実行される点をご注意ください
試してみた
以下の設定で Dead Letter Queue に行くSQS FIFOキューを作成します
- VisibilityTimeout が 3秒
- 2回失敗
AWSTemplateFormatVersion: "2010-09-09"
Description: "SQS FIFO Template"
Resources:
CreateSetuppackDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: sqs-sample-dl.fifo
FifoQueue: true
CreateSetuppackQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: sqs-sample.fifo
FifoQueue: true
ContentBasedDeduplication: false
RedrivePolicy:
deadLetterTargetArn:
Fn::GetAtt:
- CreateSetuppackDeadLetterQueue
- Arn
maxReceiveCount: 2
VisibilityTimeout: 3
パブリッシャー側のコードです
def enqueue(n=1):
for i in range(n):
sqs.send_message(
QueueUrl=SQS_URL,
MessageBody=f"{i}",
MessageGroupId="test",
MessageDeduplicationId=str(uuid.uuid4())
)
渡された引数の数値の数だけ "1", "2" ... と数字(str)をエンキューしていきます
サブスクライバー側のコードです。
def dequeue():
print(threading.get_ident())
while True:
response = sqs.receive_message(
QueueUrl=SQS_URL,
MaxNumberOfMessages=1
)
if 'Messages' not in response:
time.sleep(0.1)
continue
for message in response['Messages']:
if message['Body'] == '15':
print(f"{threading.get_ident()} {datetime.datetime.now()} {message['Body']} failed")
time.sleep(5)
else:
print(f"{threading.get_ident()} {datetime.datetime.now()} {message['Body']}")
handle = message['ReceiptHandle']
try:
sqs.delete_message(
QueueUrl=SQS_URL,
ReceiptHandle=handle
)
except ClientError as e:
print(e)
enqueue(20)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(dequeue)
executor.submit(dequeue)
executor.submit(dequeue)
メッセージを受信しメッセージの内容が'15'
であれば VisibilityTimeout
より長めにスリープしています
実行結果です
スレッドID, 実行時間, 処理したメッセージの内容 の順に表示されています
123145397649408
123145414438912
123145431228416
123145397649408 2020-01-31 08:20:25.727753 0
123145397649408 2020-01-31 08:20:25.794476 1
123145397649408 2020-01-31 08:20:25.865409 2
123145397649408 2020-01-31 08:20:25.930104 3
123145431228416 2020-01-31 08:20:25.993035 4
123145431228416 2020-01-31 08:20:26.057589 5
123145431228416 2020-01-31 08:20:26.133130 6
123145397649408 2020-01-31 08:20:26.173065 7
123145397649408 2020-01-31 08:20:26.237981 8
123145397649408 2020-01-31 08:20:26.303023 9
123145414438912 2020-01-31 08:20:26.342946 10
123145414438912 2020-01-31 08:20:26.397174 11
123145414438912 2020-01-31 08:20:26.462444 12
123145414438912 2020-01-31 08:20:26.527355 13
123145414438912 2020-01-31 08:20:26.588985 14
123145414438912 2020-01-31 08:20:26.653587 15 failed
123145397649408 2020-01-31 08:20:29.719290 15 failed
123145414438912 2020-01-31 08:20:32.746732 16
123145414438912 2020-01-31 08:20:32.801750 17
123145414438912 2020-01-31 08:20:32.867375 18
123145414438912 2020-01-31 08:20:32.934917 19
0から順に処理され 15
で2回失敗する間すべてのサブスクライバーが止まってることが確認できると思います
最後に
この記事が誰かの参考になれば幸いです