この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
CX事業本部の夏目です。
IoTのバックエンドをサーバーレスで開発しているのですが、デバイスからMQTTで上がってくるデータをIoT Rule -> Kinesis Data Strem -> Kinesis Firehose -> S3 Bucket
といった形で吐き出させています。
しかしこのデータ、JSONの形式とは一致しないため、パースするには工夫が必要です。
今回はPythonでパースする方法を共有します。
データ
{"payload":{"id":"4d497f23-41c8-4488-8a45-099875a83106"},"timestamp":1581080246231}{"payload":{"id":"546c0712-2c93-424c-8350-f7ab3aaa3117"},"timestamp":1581080246785}{"payload":{"id":"9c385ea9-e507-4ae1-9e16-820df50c9786"},"timestamp":1581080247343}{"payload":{"id":"41edb0ab-24e4-445f-a84b-912e4bd38a61"},"timestamp":1581080247896}{"payload":{"id":"1c56d473-75e0-4518-bb2c-eb7d08c15c0c"},"timestamp":1581080248499}{"payload":{"id":"a51bf3e6-cb26-465c-b608-d2c0688289b3"},"timestamp":1581080249171}{"payload":{"id":"3021c568-8695-49c0-bae2-493145427ace"},"timestamp":1581080249751}{"payload":{"id":"073e05a2-d803-480a-b0ff-ce8dd25c1500"},"timestamp":1581080250308}{"payload":{"id":"71ac5a8c-a7b8-41a9-a756-764023c0e2e3"},"timestamp":1581080250911}{"payload":{"id":"16fab82e-66d1-4591-93cd-edfcf9477347"},"timestamp":1581080251466}{"payload":{"id":"cf865487-7847-428a-b593-51f193e99259"},"timestamp":1581080252017}{"payload":{"id":"f8d7dfdf-7a1a-4101-96d4-5dc3ead1bd76"},"timestamp":1581080252586}{"payload":{"id":"47928637-6c26-4cef-8c95-10ec1fe0d57f"},"timestamp":1581080253160}
複数のJSONドキュメントをただ連結しただけのものです。
MQTTでJSONデータを受け取ってFirehoseでS3に保存するとこうなります。
Pythonで読み込む
from json.decoder import WHITESPACE, JSONDecoder
from typing import Generator
def load_iter(text: str) -> Generator:
size = len(text)
decoder = JSONDecoder()
index = 0
while index < size:
obj, offset = decoder.raw_decode(text, index)
yield obj
search = WHITESPACE.search(text, offset)
if search is None:
break
index = search.end()
if __name__ == '__main__':
text = open('path').read()
for obj in load_iter(text):
print('===')
print(obj)
load_iter
という関数でデータのパースを行っています。
PythonのGeneratorを使用しているので、そのままループで処理させることができます。
ちなみに実行結果はこうなります。
===
{'payload': {'id': '4d497f23-41c8-4488-8a45-099875a83106'}, 'timestamp': 1581080246231}
===
{'payload': {'id': '546c0712-2c93-424c-8350-f7ab3aaa3117'}, 'timestamp': 1581080246785}
===
{'payload': {'id': '9c385ea9-e507-4ae1-9e16-820df50c9786'}, 'timestamp': 1581080247343}
===
{'payload': {'id': '41edb0ab-24e4-445f-a84b-912e4bd38a61'}, 'timestamp': 1581080247896}
===
{'payload': {'id': '1c56d473-75e0-4518-bb2c-eb7d08c15c0c'}, 'timestamp': 1581080248499}
===
{'payload': {'id': 'a51bf3e6-cb26-465c-b608-d2c0688289b3'}, 'timestamp': 1581080249171}
===
{'payload': {'id': '3021c568-8695-49c0-bae2-493145427ace'}, 'timestamp': 1581080249751}
===
{'payload': {'id': '073e05a2-d803-480a-b0ff-ce8dd25c1500'}, 'timestamp': 1581080250308}
===
{'payload': {'id': '71ac5a8c-a7b8-41a9-a756-764023c0e2e3'}, 'timestamp': 1581080250911}
===
{'payload': {'id': '16fab82e-66d1-4591-93cd-edfcf9477347'}, 'timestamp': 1581080251466}
===
{'payload': {'id': 'cf865487-7847-428a-b593-51f193e99259'}, 'timestamp': 1581080252017}
===
{'payload': {'id': 'f8d7dfdf-7a1a-4101-96d4-5dc3ead1bd76'}, 'timestamp': 1581080252586}
===
{'payload': {'id': '47928637-6c26-4cef-8c95-10ec1fe0d57f'}, 'timestamp': 1581080253160}
まとめ
いかがでしょうか。 Firehoseで吐いたデータを簡単にパースできました。
機会があったら使ってみてください。