boto3からSQSへのアクセスをmotoでモックしてみる

2022.07.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

SQSとboto3を使って連携するPythonスクリプトを作る際に、motoでモックしたくなったので、簡単な例ですがブログにしてみました。

前提

使用したツールのパージョンは以下になります。

  • Python:3.9.0
  • boto3:1.17.112
  • moto:3.1.16
  • pytest:7.1.2

motoはAWSの各種サービスをモックするためのPythonのライブラリです。 GitHubのスター数は6,000以上で多くのユーザーに利用されています。

motoについては、ほかにも以下の記事でS3とDynamoDBのモックについて取り上げられています。

motoはテスト用の関数にデコレータをつけるだけで簡単にモックできるため、使い心地は特定のサービスによらずS3やDynamoDBの例とほぼ変わらないと思いますが、今回はSQSとのやりとりを例にしてみました。

やってみる

テスト用のプロジェクト準備

以下のようなディレクトリ構造で環境を準備しました。

project
├── sqs_sample
│   ├── __init__.py
│   └── main.py
└── test
    ├── __init__.py
    ├── messages
    │   ├── message1.json
    │   ├── message2.json
    │   ├── message3.json
    │   ├── message4.json
    │   └── message5.json
    └── test_main.py

テストしたい関数として、以下のようにSQSに入っているメッセージを取得する関数を用意しました。wait_time_seconds5として5秒間ポーリングし、最大で10個のメッセージを取得する実装です。

main.py

import boto3


def get_messages(queue_url):
    # SQSのキューからメッセージを取得する。
    sqs = boto3.client('sqs')
    wait_time_seconds = 5
    messages = []

    r = sqs.receive_message(QueueUrl=queue_url,
                            WaitTimeSeconds=wait_time_seconds,
                            MaxNumberOfMessages=10)

    received_messages = r.get("Messages")
    if received_messages:
        messages += received_messages
        
    return messages

motoを使ったモックを実装するのは、testディレクトリ配下のtest_main.pyです。テスト用の関数test_send_message@mock_sqsデコレーターを付けておきました。

test_send_message関数は以下のような実装です。

test_main.py

import glob
import json

import boto3
from moto import mock_sqs

from sqs_sample.main import *


@mock_sqs
def test_send_message():
    sqs = boto3.client('sqs')
    r = sqs.create_queue(QueueName="unittest-queue")
    queue_url = r["QueueUrl"]

    entries = []
    json_paths = glob.glob("./test/messages/*.json")
    for i, json_path in enumerate(json_paths):
        with open(json_path, "r") as json_f:
            json_data = json.load(json_f)
            json_payload = json.dumps(json_data)
            # boto3のsend_message_batchの仕様に合わせてIdは文字列にする
            entry_id = str(i)
            entry = {'Id': entry_id, 'MessageBody': json_payload}
            entries.append(entry)
    _ = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)

    # テスト
    messages = get_messages(queue_url)

    assert len(messages) == 5

messages以下には、テスト用にSQSに投入するメッセージを格納しています。例えば、message1.jsonは、以下のようにしておきました。

message1.json

{
    "messsage": "test message 1"
}

__init__.pyは空のファイルです。

pytestでテストを実行してみる

プロジェクトのルート(今回だとtestの上のprojectディレクトリ)でpytestを実行します。

project % pytest    
==================================================================== test session starts ====================================================================
platform darwin -- Python 3.9.0, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/ユーザー名/Desktop/sqs-moto/project
collected 1 item                                                                                                                                            

test/test_main.py .                                                                                                                                   [100%]

===================================================================== 1 passed in 0.73s =====================================================================

SQSのモックを使って、テストコードが上手く動いたことが確認できました。

可視性タイムアウトの挙動も調べる

これだけでも便利ですが、試していて可視性タイムアウトの動きも再現してくれるのが分かり、すごいなと思ったのでご紹介します。

あくまで説明用なので、テストコードとしては変なコードですが、test_main.pyを以下のように修正します。2回get_messages関数を呼び出し、その間に30秒の休止を設けています。取得できたメッセージの件数を期待値と比較するようにしました。今回キューはデフォルト値で作っているので、可視性タイムアウトは30秒になります。

test_main.py

import glob
import json
import time

import boto3
from moto import mock_sqs

from sqs_sample.main import *


@mock_sqs
def test_send_message():
    sqs = boto3.client('sqs')
    r = sqs.create_queue(QueueName="unittest-queue")
    queue_url = r["QueueUrl"]

    entries = []
    json_paths = glob.glob("./test/messages/*.json")
    for i, json_path in enumerate(json_paths):
        with open(json_path, "r") as json_f:
            json_data = json.load(json_f)
            json_payload = json.dumps(json_data)
            # boto3のsend_message_batchの仕様に合わせてIdは文字列にする
            entry_id = str(i)
            entry = {'Id': entry_id, 'MessageBody': json_payload}
            entries.append(entry)
    _ = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)

    # テスト
    messages = []
    messages += get_messages(queue_url)

    # 30秒待って、メッセージが見えるようになるのを待つ
    time.sleep(30)
    messages += get_messages(queue_url)

    assert len(messages) == 5

pytestを実行してみると、キューで1度不可視になったメッセージが再度見えるようになるので、2重に取れてしまい、取得したメッセージが10件になることが再現されていました。今回は無理矢理な例ではありますが、この挙動が再現されることを考慮してユニットテストを書いておく必要があります。

project % pytest   
==================================================================== test session starts ====================================================================
platform darwin -- Python 3.9.0, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/ユーザー名/Desktop/sqs-moto/project
collected 1 item                                                                                                                                            

test/test_main.py F                                                                                                                                   [100%]

========================================================================= FAILURES ==========================================================================
_____________________________________________________________________ test_send_message _____________________________________________________________________

    @mock_sqs
    def test_send_message():
        sqs = boto3.client('sqs')
        r = sqs.create_queue(QueueName="unittest-queue")
        queue_url = r["QueueUrl"]
    
        entries = []
        json_paths = glob.glob("./test/messages/*.json")
        for i, json_path in enumerate(json_paths):
            with open(json_path, "r") as json_f:
                json_data = json.load(json_f)
                json_payload = json.dumps(json_data)
                # boto3のsend_message_batchの仕様に合わせてIdは文字列にする
                entry_id = str(i)
                entry = {'Id': entry_id, 'MessageBody': json_payload}
                entries.append(entry)
        _ = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
    
        # テスト
        messages = []
        messages += get_messages(queue_url)
    
        # 30秒待って、メッセージが見えるようになるのを待つ
        time.sleep(30)
        messages += get_messages(queue_url)
    
>       assert len(messages) == 5
E       assert 10 == 5
E        +  where 10 = len([{'Body': '{"messsage": "test message 1"}', 'MD5OfBody': 'f03da24ecd189336e67d66296adc0085', 'MessageId': 'cd3bbfe7-b8...qjzndkydyydcsrcnwjzqaquiyaewwgpwnsgploesbgxapkpjbhmacicevvpsvwnabpzqoxvmhjlpdhcinmmaunxqtehjlgbiiznemevwmofpcdc'}, ...])

test/test_main.py:38: AssertionError
================================================================== short test summary info ==================================================================
FAILED test/test_main.py::test_send_message - assert 10 == 5
==================================================================== 1 failed in 30.81s =====================================================================

休止の部分をtime.sleep(20)にして再度pytestを実行すると、今度は不可視の状態で2回目のメッセージ取得がされるので、5件のままとなり、成功しました。

project % pytest
==================================================================== test session starts ====================================================================
platform darwin -- Python 3.9.0, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/ユーザー名/Desktop/sqs-moto/project
collected 1 item                                                                                                                                            

test/test_main.py .                                                                                                                                   [100%]

==================================================================== 1 passed in 25.62s =====================================================================

最後に

boto3からSQSへのアクセスをmotoでモックする例をご紹介しました。可視性タイムアウトのようなSQSの挙動も再現でき、とても便利でした。参考になりましたら幸いです。

参考