paho-mqttをpytestでmock化してみる

2024.01.02

はじめに

データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回は、paho-mqttをpytestでmock化する実装に取り組みたいと思います。

前提

paho-mqttはMQTTブローカーに接続してmessageをpublishし、topicをsubscribeしてpublishされたmessageを受信できるようにするclient classです。

pytestでtestコードを実装しようとした際にpahoのサンプルコードやpytest-mqttというライブラリで検証しましたが、今回の実装に沿ったtest検証はできなかったため、pytest-mockを用いた実装を行いました。

実際に実装した処理がMQTT Brokerを通して動かなければ意味がないため、その検証用にmosquittoをMac OSにinstallします。オープンソースのMQTT Brokerはいくつかありますが、今回は以前使用しておりinstallが容易なmosquittoを選択しています。

以下のコマンドでmosquittoをinstallします。

brew install mosquitto

install後は、mosquittoが正常に立ち上がるかを確認します。

brew services start mosquitto
brew services list

立ち上がった場合は以下のようにstatusがstartedになります。

(base) @blog_code % brew services list
Name      Status  User           File
dbus      none                   
mosquitto started kasama~/Library/LaunchAgents/homebrew.mxcl.mosquitto.plist
mysql@5.7 none                   
unbound   none                   
(base) kasama@blog_code %

停止したい場合はstopコマンドを実行します。

brew services stop mosquitto

実装

それでは実装になります。実装の構成は以下になります。pythonでMQTT送受信記事を参考にmain.pyを実装し、main.pyをpytestでtestします。

(31-paho-mqtt-pytest-mocking-py3.12) @31_paho_mqtt_pytest_mocking % tree
.
├── README.md
├── main.py
├── pyproject.toml
└── tests
    ├── conftest.py
    └── test_main.py

4 directories, 9 files
(31-paho-mqtt-pytest-mocking-py3.12) @31_paho_mqtt_pytest_mocking %

まずは必要なライブラリをインストールします。

pip install paho-mqtt pytest pytest-mock pytest-cov

main.py

import json
import paho.mqtt.client as mqtt  # MQTTのライブラリをインポート
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
    logger.info("Connected with return code " + str(rc))  # 接続できた旨表示
    logger.info("subscribe drone/#")
    client.subscribe("drone/#")  # subするトピックを設定


# ブローカーが切断したときの処理
def on_disconnect(client, userdata, rc):
    if rc != 0:
        logger.warning("Unexpected disconnection.")


# publishが完了したときの処理
def on_publish(client, userdata, mid):
    logger.info(f"published: client:{client}, userdata: {userdata}, mid: {mid}")


def publish_msg(client, topic, msg):
    logger.info(f"publish client:{client},topic:{topic},msg:{msg}")
    client.publish(topic, msg)  # トピック名とメッセージを決めて送信


# メッセージが届いたときの処理
def on_message(client, userdata, msg):
    # msg.topicにトピック名が,msg.payloadに届いたデータ本体が入っている
    payload = json.loads(msg.payload.decode())
    topic = str(msg.topic)
    logger.info(
        f"Received payload:{str(payload)}, topic: {topic},  with QoS: {str(msg.qos)}, userdata: {str(userdata)}"
    )
    if "drone/" in topic:
        dron_num = topic.split("/")[1]
        if payload["status"] == "running":
            userdata.add(topic)
            publish_msg(client, f"feed/drone/{dron_num}", "start-runnning")
        elif payload["status"] == "stopped":
            userdata.discard(topic)
            publish_msg(client, f"feed/drone/{dron_num}", "stopped")


def main():
    # MQTTの接続設定
    active_drones = set()
    client = mqtt.Client(userdata=active_drones)  # クラスのインスタンス(実体)の作成
    client.on_connect = on_connect  # 接続時のコールバック関数を登録
    client.on_disconnect = on_disconnect  # 切断時のコールバックを登録
    client.on_message = on_message  # メッセージ到着時のコールバック
    client.on_publish = on_publish  # メッセージ送信時のコールバック

    client.connect("localhost", 1883, 60)  # 接続先は自分自身

    client.loop_forever()  # 永久ループして待ち続ける


if __name__ == "__main__":
    main()
  • on_connect関数でclient.subscribe("drone/#")と記載することで、drone/ではじまる全てのTOPICに対してSubscribeします。
  • on_message関数でTOPICがdrone/である際にfeed/drone/TOPICにpublishするような実装とします。
  • userdataでは、statusがrunning状態のdroneを保持する処理を実装しています。

conftest.py

import pytest
import paho.mqtt.client as mqtt


def pytest_runtest_setup(item):
    print(f"\n--------Start----{item.name}---")


def pytest_runtest_teardown(item):
    print(f"\n--------End----{item.name}-----")


# MQTTクライアントのモックを提供するフィクスチャ
@pytest.fixture
def mock_client(mocker):
    return mocker.Mock(spec=mqtt.Client)


# active_dronesをフィクスチャとして定義
@pytest.fixture(scope="module")
def active_drones():
    return set()

conftest.pyはpytestがテストを検出して実行する際に参照されるファイルです。pytestはテストを実行する前に自動的にconftest.pyを読み込んで、その中に定義されているfixtureやhook関数を利用できるようにします。conftest.pyの中で定義された内容は、特定のイベントが発生したときに実行されます。

  • fixture: テスト関数にfixture名が引数として渡されたときに呼び出されます。fixtureのscope(function, class, module, session)によって、そのライフサイクル(再利用される期間)が決定されます。
  • hook関数(pytest_runtest_setup, pytest_runtest_teardownなど): pytestの特定の実行フェーズ(テスト前準備やテスト後始末など)で自動的に呼び出されます。 conftest.py自体はテスト実行プロセスの初期化中に読み込まれるが、その中のfixtureやhook関数は上記のような特定の状況やテストごとの実行フェーズに従って実行されます。 テスト関数が実行される直前にはpytest_runtest_setupが呼ばれ、テスト関数の実行が完了した直後にはpytest_runtest_teardownが呼ばれます。
  • mock_clientではmqtt.Clientをmock化し、active_dronesはmoduleごとに共通のsetを使用したいために定義しています。

  • Python(pytest)でテスト書くならfixture,conftest,parametrizeを理解すると世界が一気に変わる

test_main.py

import json

import sys
from pathlib import Path

# ルートディレクトリへのパスを追加
current_dir = Path(__file__).parent
root_dir = current_dir.parent
sys.path.append(str(root_dir))
import pytest


import main  # sub.pyの内容をインポート


def test_on_connect(mock_client):
    main.on_connect(mock_client, None, None, 0)
    mock_client.subscribe.assert_called_with("drone/#")


def test_on_disconnect(mocker, mock_client):
    # logger.warningのモックを作成
    mock_warning = mocker.patch("main.logger.warning")

    # 正常な切断シナリオ(rc=0)
    main.on_disconnect(mock_client, None, 0)
    mock_warning.assert_not_called()  # 通常の切断ではwarningが記録されないことを確認
    mock_warning.reset_mock()
    # 異常な切断シナリオ(rc!=0)
    main.on_disconnect(mock_client, None, 1)
    # 異常切断ではwarningが記録されることを確認
    mock_warning.assert_called_once_with("Unexpected disconnection.")


@pytest.mark.parametrize(
    "topic, status, expected_call",
    [
        ("drone/001", "running", "start-runnning"),
        ("drone/002", "stopped", "stopped"),
        ("drone/003", "running", "start-runnning"),
        ("drone/004", "stopped", "stopped"),
    ],
)
def test_on_message(mock_client, mocker, active_drones, topic, status, expected_call):
    mock_msg = mocker.Mock()
    payload_json = json.dumps({"status": status})
    mock_msg.topic = topic
    mock_msg.payload = payload_json.encode()
    mock_msg.qos = 1
    main.on_message(mock_client, active_drones, mock_msg)
    mock_client.publish.assert_called_once_with(f"feed/{topic}", expected_call)
    mock_client.publish.reset_mock()


def test_main(mocker, mock_client):
    mocker.patch("main.mqtt.Client", return_value=mock_client)
    main.main()
    mock_client.connect.assert_called_with("localhost", 1883, 60)
    mock_client.loop_forever.assert_called()

test_main.pyで実際のテスト内容を実装しています。

  • 最初にルートディレクトリのpathを追加しているのは、main.pyをimportするためです。test_main.pyが存在するカレントディレクトリ(tests)配下では、import errorとなってしまうため、main.pyが存在するルートディレクトリへのpathを追加しています。
  • test_on_connect関数では、mock化したmqtt.Clientを用いてsubscribe関数がdrone/#"を引数として実行されたか試験します。
  • test_on_disconnect関数では、mock化したmqtt.Clientを用いて切断処理の正常/異常を試験しています。
  • test_on_message関数では、mock化したmqtt.Clientmsg、データを用いて、publish関数が想定の引数で呼ばれているか、active_dronesの値は想定通りかを試験しています。parametrizeは1つのテスト関数で色々なパターンのデータでテストをしたい場合に使用します。
  • test_main関数では、mock化したmain.mqtt.Clientを用いてconnectloop_foreverが実行されたかを試験します。

実行

pytest実行前にmosquittoを用いて実装が動作するかをテストします。mosquittoのstatusはstartedであることを確認し、main.pyを実行します。

(31-paho-mqtt-pytest-mocking-py3.12) @31_paho_mqtt_pytest_mocking % python main.py
INFO:__main__:Connected with return code 0
INFO:__main__:subscribe drone/#

この状態で以下のコマンドを実行し、brokerにpublishします。

mosquitto_pub -h localhost -t "drone/001" -m '{"status": "running"}'
mosquitto_pub -h localhost -t "drone/002" -m '{"status": "running"}'
mosquitto_pub -h localhost -t "drone/001" -m '{"status": "stopped"}'

正常にsubscribe,publishできていること、userdataがadddiscard処理ができていることを確認できました。

次にpytestを実行します。私の場合poetryでプロジェクト管理をしており、tomlファイル上でpytest実行時のlogger出力を設定していますが、その設定が無い場合でも標準出力やlogger出力を表示させるために以下のoptionを付与したコマンドを実行します。

pytest -o log_cli=true -o log_cli_level=DEBUG -s
  • -o log_cliオプションは、trueに設定することによりpytestのログ出力をCLIを通じて実行されているConsole(Terminal)に直接表示させる設定です。
  • -o log_cli_levelオプションは、CLIを通じて実行されているConsole(Terminal)へのログ出力レベルを設定するものです。
  • -sオプションは、テスト中の標準出力、標準エラー出力をコンソール上に表示されることにする設定です。デフォルトでは、pytestはこれらの出力をキャプチャして、テストが失敗したときにのみ表示する設定になっています。

実行結果になります。想定通りtestは成功しました。test_on_messageも4回実行されています。

coverageも取得してみるために、以下コマンドを実行します。

pytest --cov

実行できなかった処理が2つあったため、処理全体のcoverageは98%となりました。

続いて網羅されていない箇所を確認したいため、cov-reportオプションを付与して実行しreportをhtmlとして出力します。

pytest --cov --cov-report=html

index.htmlファイルがhtmlconvフォルダの中に生成されるためブラウザ上からhtmlを表示します。

網羅されていない2箇所を確認することができました。

最後に

paho-mqttはまだまだ細かい設定があり、ドキュメントも豊富なので引き続き学習していきたいと思いました。