Audioデータをクラウドに送ってみました。 MQTT + Amazon Kinesis Data Firehose + S3 (JSON)

2021.04.17

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

CX事業本部の平内(SIN)です。

今回は、エッジ側のAudioデータをクラウドへ送信する要領を確認してみました。

送信の方法や、保存先は色々考えられますが、手始めにMQTTで送信し、Amazon Kinesis Data Firehose経由で、S3に保存する形を試してみました。

また、保存されたAudioデータから時間を指定して、wavファイルを生成するLambda関数も書いてみました。

確認が容易なように、とりあえず、Audioデータをターゲットとしていますが、連続するストリームを扱うという意味では、Audioに限らず応用ができるのではと考えています。

2 Audio入力

エッジ側は、RaspberryPiとしたのですが、デフォルトでAudio入力の機能が無いので、Webカメラをつないで、Audio入力に使用しました。

Webカメラ(C920)を接続して、arecordコマンドでAudioデバイスを一覧すると、device 0で認識されていることが分かります。

$ arecord -l
**** List of CAPTURE Hardware Devices ****
card 1: C920 [HD Pro Webcam C920], device 0: USB Audio [USB Audio]
  Subdevices: 1/1
  Subdevice #0: subdevice #0

このデバイスIDを使用して、下記のコードで、得られる情報から、C920のAudio入力は、サンプルレートが、32KHz 、チャンネル数が2ということが分かります。

device_info.py

import pyaudio

p = pyaudio.PyAudio()
info = p.get_device_info_by_index(0)
print(info)
$ python3 device_info.py
・・・略)
{'index': 0, 'structVersion': 2, 'name': 'HD Pro Webcam C920: USB Audio (hw:1,0)', \
 'hostApi': 0, 'maxInputChannels': 2, 'maxOutputChannels': 0, \
 'defaultLowInputLatency': 0.01196875, 'defaultLowOutputLatency': -1.0, \
 'defaultHighInputLatency': 0.048, 'defaultHighOutputLatency': -1.0, \
 'defaultSampleRate': 32000.0}

なお、pyaudioのインストールは、以下で行いました。

brew install portaudio
pip3 install pyaudio

3 録音

下記のコードは、Audio入力から10秒間録音し、sample.wavを生成するものです。

ストリームのバッファサイズをサンプルレートと同じにしているので、ループは、1秒毎に処理されます。ストリームから取得したデータは、チャンネルを1つ削減し、また、1/4に間引くことで、雑にサンプルレートを8Khzに変換しています。

チャンネル及びサンプルレートを削減したのは、事後、クラウドへ送信するデータ量を少し節約しようという意図です。

取得したデータは、waveライブラリを使用して、ヘッダを追加してwavファイルに保存しています。

record.py

import pyaudio
import wave
import numpy as np

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
CHUNK = SAMPLE_RATE # 1秒ごとに取得する
FORMAT = pyaudio.paInt16
RECORD_SECONDS = 10 # 10秒間録音する

# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)
# recording
print("recording ...")
frames = []
for _ in range(0, int(SAMPLE_RATE / CHUNK * RECORD_SECONDS)):
    # 1秒分のデータ読み込み
    data = stream.read(CHUNK)
    # numpy配列に変換    
    data = np.frombuffer(data, dtype="int16")
    # チャンネル 2ch -> 1ch
    data = data[0::2] 
    # サンプルレート  32000Hz -> 8000Hz
    data = data[0::4] 
    # byteに戻す 
    data = data.tobytes()
    frames.append(data)
    print ("data size:{}".format(len(data)))
data = b''.join(frames)
print("done.")

# close strema
stream.stop_stream()
stream.close()
p.terminate()

# save
CHANNELS = 1 # 1ch
SAMPLE_RATE = 8000 # 8kHz
file_name = "./sample.wav"
wf = wave.open(file_name, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(SAMPLE_RATE)
wf.writeframes(data)
wf.close()

4 エッジ側のコード

1秒に一回のタイミングで、AudioデータをMQTTで送信しているコードです。

AudioのRAWデータは、16K/secでしたが、テキスト化することで、サイズは、22K/sec程度になっています。

index.py

import pyaudio
from producer import Producer
import numpy as np

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = SAMPLE_RATE # 1秒ごとに取得する

# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)

producer = Producer()

try:
    print("start ...")
    while True: 
        # 1秒分のデータ読み込み
        data = stream.read(CHUNK)

        # numpy配列に変換    
        data = np.frombuffer(data, dtype="int16")
        # チャンネル 2ch -> 1ch
        data = data[0::2] 
        # サンプルレート  32000Hz -> 8000Hz
        data = data[0::4] 
        # byteに戻す 
        data = data.tobytes()

        producer.send(data)
except:
    stream.stop_stream()
    stream.close()
    p.terminate()

producer.py

from mqtt import Mqtt
import json
from datetime import datetime
import base64

class Producer():
    def __init__(self):
        self.__topic = "topic/audio_transmission"

        root_ca = "./certs/RootCA.pem"
        key = "./certs/xxxxxxxx-private.pem.key"
        cert = "./certs/xxxxxxxx-certificate.pem.crt"
        endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
        self.__mqtt = Mqtt(root_ca, key, cert, endpoint)
        
    def send(self, data):
        now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        
        b64_data = base64.b64encode(data)
        raw_data = b64_data.decode()

        payload = {
            "timestamp": now,
            "raw_data": raw_data
        }
        self.__mqtt.publish(self.__topic, json.dumps(payload))

IoT Coreのコンソールで、publishされたデータを確認している様子です。

5 Amazon Kinesis Data Firehose

Amazon Kinesis Data Firehoseは、audio_transmission_firehose_streamという名前で作成し、特に、コンバートや圧縮などは設定せず、destinationをS3バケットとしています。

なお、Buffer sizeは、10MBuffer interval60secとしました。この設定により、1秒ごとに送られくるデータを、1分毎に1ファイルとしてS3に保存されることになります。

1秒分のデータは、約22Kなので、約1.4M(22k*60)以上のバッファにしておけば、Buffer Intervalが先にトリガーされるため、1分毎に出力されるという想定です。

もし、要件的に、あまりリアルタイム性が必要ない場合は、1ファイルに格納するデータ量を大きく取ることで、S3への書き込み回数は節約できることになります。

6 Rule

IoT Coreでは、ルールエンジンを設定して、メッセージブローカーに到着したデータをAmazon Kinesis Data Firehoseに送っています。

セパレータに「改行」を指定して、秒単位のデータを1行ごと分離できるようにしました。

7 S3

S3に出力されたデータです。年月日のプレフィックスと秒数まで入ったKey名で保存されていることが分かります。

1つのファイルを見ると、1秒ごとのデータが60行(60秒分)格納されているのが確認できます。ここから対象秒のRAWデータが取り出せます。

ここまでで、Audioデータの送信は完了となります。

8 Lambda

ここからは、S3に蓄積されたデータから、データを取り出す一例として、Lambdaを書いてみました。

この関数は、取得開始時間、期間(秒)及び、出力ファイル名を与えて実行します。

指定された時間から、S3上のオブジェクト名を検索し、期間に該当するものをダウンロードして、RAWデータをデコードしています。また、RAWデータは、ヘッダを追加してwavファイルとして、S3にアップロードされます。

lambda_function.py

import json
import datetime
import os
import boto3
import base64
import wave

def lambda_handler(event, context):

    BUCKET_NAME = os.environ['BUCKET_NAME']
    
    output_filename = event["output_filename"] # 出力ファイル名
    start_time_str = event["start_time"] # 開始時間(YYYY-mm-dd HH:MM:SS)
    period_sec = int(event["period_sec"]) # 期間(秒)
    print("start_time:{} period_sec:{}".format(start_time_str, period_sec))
    
    # 開始時間と終了時間(JST)
    start_time_jst = datetime.datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S')
    end_time_jst = start_time_jst + datetime.timedelta(seconds=period_sec)
    print("JST {} - {} ".format(start_time_jst, end_time_jst))
    
    # S3上のオブジェクト名を検索するため
    # 開始時間と終了時間(UTC) ファイル検索用なので、開始時間は、1分前とする
    start_time_utc = start_time_jst + datetime.timedelta(hours=-9)
    end_time_utc = start_time_utc + datetime.timedelta(seconds=period_sec)
    start_time_utc = start_time_utc + datetime.timedelta(minutes=-1)
    print("UTC {} - {}".format(start_time_utc, end_time_utc))

    s3client = boto3.client('s3')

    # 当日分のオブジェクト名を列挙する
    prefix = "{:4d}/{:02d}/{:02d}".format(
        start_time_utc.year,
        start_time_utc.month,
        start_time_utc.day
    )
    response = s3client.list_objects_v2(
                Bucket = BUCKET_NAME,
                Prefix = prefix
            )
        
    # オブジェクト名から、対象期間ヒットするオブジェクトを列挙する
    target_keys = []
    if("Contents" in response):
        for content in response["Contents"]:
            # オブジェクト名からtimestampを取得する
            key = content["Key"]
            tmp = key.split('/')[4].split('-')
            year = int(tmp[2])
            month = int(tmp[3])
            day = int(tmp[4])
            hour = int(tmp[5])
            minute = int(tmp[6])
            second = int(tmp[7])
            dt = datetime.datetime(year, month, day, hour, minute, second, microsecond=0)
            if(start_time_utc <= dt and dt <= end_time_utc):
                target_keys.append(key)
    target_keys.sort()

    # オブジェクトをダウンロードして、timestampが取得期間ヒットするRAWデータを取得する
    frames = []
    for key in target_keys:
        body = s3client.get_object(Bucket=BUCKET_NAME, Key=key)['Body'].read()
        # 1行が、1秒分のデータとなっている
        lines = body.decode().split('\n') 
        for line in lines:
            if(line==''):
                continue
            s = json.loads(line)
            timestamp = datetime.datetime.strptime(s["timestamp"], '%Y-%m-%dT%H:%M:%S')
            # 取得期間のデータは、framesに追加する
            if(start_time_jst <= timestamp and timestamp <= end_time_jst):
                # テキストデータをバイナリに戻す
                b64_data = s["raw_data"].encode()
                frames.append(base64.b64decode(b64_data))
    data = b''.join(frames)

    # RAWデータをwavファイルとして保存する
    CHANNELS = 1 # 1ch
    SAMPLE_RATE = 8000 # 8kHz
    SAMPLE_WIDTH = 2 
    tmp_file_name = "/tmp/tmp.wav"
    wf = wave.open(tmp_file_name, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(SAMPLE_WIDTH)
    wf.setframerate(SAMPLE_RATE)
    wf.writeframes(data)
    wf.close()

    s3client.upload_file(tmp_file_name, BUCKET_NAME, output_filename)

    return {}

AWSコンソールから、取得開始時間、期間及び、出力ファイル名を指定して関数を実行している様子です。

S3上に指定したファイル名でwavファイルがアップロードされます。

9 最後に

今回は、MQTTを使用してAudioデータをクラウドに送信する要領を試してみました。

Audioデータをテキスト化するなど、ちょっとオーバーヘッドな感じですが、一応、エッジ側のタイムスタンプが入っています。MQTTにJSON形式で送っているため、付随するアトリビュートがあれば、追加してルールで捌くなどの処理も可能だと思います。

全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_1