[Tips] Firehoseで吐いたJSONが繋がったファイルをPythonでパースしてみる

2020.02.08

CX事業本部の夏目です。

IoTのバックエンドをサーバーレスで開発しているのですが、デバイスからMQTTで上がってくるデータをIoT Rule -> Kinesis Data Strem -> Kinesis Firehose -> S3 Bucketといった形で吐き出させています。

しかしこのデータ、JSONの形式とは一致しないため、パースするには工夫が必要です。

今回はPythonでパースする方法を共有します。

データ

{"payload":{"id":"4d497f23-41c8-4488-8a45-099875a83106"},"timestamp":1581080246231}{"payload":{"id":"546c0712-2c93-424c-8350-f7ab3aaa3117"},"timestamp":1581080246785}{"payload":{"id":"9c385ea9-e507-4ae1-9e16-820df50c9786"},"timestamp":1581080247343}{"payload":{"id":"41edb0ab-24e4-445f-a84b-912e4bd38a61"},"timestamp":1581080247896}{"payload":{"id":"1c56d473-75e0-4518-bb2c-eb7d08c15c0c"},"timestamp":1581080248499}{"payload":{"id":"a51bf3e6-cb26-465c-b608-d2c0688289b3"},"timestamp":1581080249171}{"payload":{"id":"3021c568-8695-49c0-bae2-493145427ace"},"timestamp":1581080249751}{"payload":{"id":"073e05a2-d803-480a-b0ff-ce8dd25c1500"},"timestamp":1581080250308}{"payload":{"id":"71ac5a8c-a7b8-41a9-a756-764023c0e2e3"},"timestamp":1581080250911}{"payload":{"id":"16fab82e-66d1-4591-93cd-edfcf9477347"},"timestamp":1581080251466}{"payload":{"id":"cf865487-7847-428a-b593-51f193e99259"},"timestamp":1581080252017}{"payload":{"id":"f8d7dfdf-7a1a-4101-96d4-5dc3ead1bd76"},"timestamp":1581080252586}{"payload":{"id":"47928637-6c26-4cef-8c95-10ec1fe0d57f"},"timestamp":1581080253160}

複数のJSONドキュメントをただ連結しただけのものです。

MQTTでJSONデータを受け取ってFirehoseでS3に保存するとこうなります。

Pythonで読み込む

from json.decoder import WHITESPACE, JSONDecoder
from typing import Generator


def load_iter(text: str) -> Generator:
    size = len(text)
    decoder = JSONDecoder()
    index = 0
    while index < size:
        obj, offset = decoder.raw_decode(text, index)
        yield obj
        search = WHITESPACE.search(text, offset)
        if search is None:
            break
        index = search.end()
        
        
if __name__ == '__main__':
    text = open('path').read()
    for obj in load_iter(text):
        print('===')
        print(obj)

load_iterという関数でデータのパースを行っています。
PythonのGeneratorを使用しているので、そのままループで処理させることができます。

ちなみに実行結果はこうなります。

===
{'payload': {'id': '4d497f23-41c8-4488-8a45-099875a83106'}, 'timestamp': 1581080246231}
===
{'payload': {'id': '546c0712-2c93-424c-8350-f7ab3aaa3117'}, 'timestamp': 1581080246785}
===
{'payload': {'id': '9c385ea9-e507-4ae1-9e16-820df50c9786'}, 'timestamp': 1581080247343}
===
{'payload': {'id': '41edb0ab-24e4-445f-a84b-912e4bd38a61'}, 'timestamp': 1581080247896}
===
{'payload': {'id': '1c56d473-75e0-4518-bb2c-eb7d08c15c0c'}, 'timestamp': 1581080248499}
===
{'payload': {'id': 'a51bf3e6-cb26-465c-b608-d2c0688289b3'}, 'timestamp': 1581080249171}
===
{'payload': {'id': '3021c568-8695-49c0-bae2-493145427ace'}, 'timestamp': 1581080249751}
===
{'payload': {'id': '073e05a2-d803-480a-b0ff-ce8dd25c1500'}, 'timestamp': 1581080250308}
===
{'payload': {'id': '71ac5a8c-a7b8-41a9-a756-764023c0e2e3'}, 'timestamp': 1581080250911}
===
{'payload': {'id': '16fab82e-66d1-4591-93cd-edfcf9477347'}, 'timestamp': 1581080251466}
===
{'payload': {'id': 'cf865487-7847-428a-b593-51f193e99259'}, 'timestamp': 1581080252017}
===
{'payload': {'id': 'f8d7dfdf-7a1a-4101-96d4-5dc3ead1bd76'}, 'timestamp': 1581080252586}
===
{'payload': {'id': '47928637-6c26-4cef-8c95-10ec1fe0d57f'}, 'timestamp': 1581080253160}

まとめ

いかがでしょうか。 Firehoseで吐いたデータを簡単にパースできました。

機会があったら使ってみてください。