Audioデータをクラウドに送ってみました。 Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (バイナリ)

2021.04.17

1 はじめに

CX事業本部の平内(SIN)です。

ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか確認してみました。

上記は、すべて、JSON形式のデータとして送信しましたが、Amazon Kinesis Data StreamsAmazon Kinesis Data Firehoseもboto3のパラメータは、バイナリ(内部でシリアル化される時、Base64で変換されている)となっています。


https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_record


https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/firehose.html#Firehose.Client.put_record

そこで、今回は、Audioデータをテキスト化するオーバーヘッドを無くすために、バイナリ形式のまま送信する要領を確認してみました。

2 構成

構成は、前回と同じです。

デバイスからは、500msec単位で、Audioデータ(バイナリ形式)とタイムスタンプ(バイナリ形式)を結合して送信します。

このため、Amazon Kinesis Data StreamsAmazon Kinesis Data Firehose及び、S3バケットでは、バイナリ形式のまま保存されます。

データは、必要に応じて、Lambdaで処理(デコード)されます。

3 バイナリ形式のデータ

500msec分のRAWデータは、8,000byteですが、これにタイムスタンプ(8byte)を足して、1つのデータ単位は8,008byteとしました。

下記は、最終的にS3に保存された状況です。

やや、バラバラなサイズに見えますが、ダウンロードしてサイズを確認すると、8,008の倍数になっていることが分かります。

-rw-r--r--@ 1 976976  4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-41-45-4f667d04-0ac6-457c-b274-54a9ac67375b
-rw-r--r--@ 1 840840  4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-42-46-85bcf123-dddd-4d11-98d8-5af1c4b65db9
-rw-r--r--@ 1 968968  4 17 20:53 audio_transmission_firehose-1-2021-04-17-11-43-48-fd291ba6-f423-4651-8c3c-024f26852d3d
-rw-r--r--@ 1 976976  4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-44-48-510ca90f-b905-4b99-96ba-a41192285941
976976/8008 = 122
840840/8008 = 105
968968/8008 = 121
976976/8008 = 122

下記のコードは、AudioのRAWデータ(擬似的に10byteのデータ)と、タイムスタンプを結合して、再び、分解しているサンプルです。

pack_unpack.py

import datetime
import struct

def disp(dt, data):
    print("{} {} type:{} len:{}".format(dt, data, type(data), len(data)))


# 転送前のデータ
print("- before -")
raw_data = bytes(range(0, 10)) 
now = datetime.datetime.now()
disp(now, raw_data)
# 時刻とデータの結合
transfer_data = struct.pack('<d', now.timestamp()) + raw_data


print("- transfer -")
print("{} type:{} len:{}".format(transfer_data, type(transfer_data), len(transfer_data)))

# 転送後のデータ
print("- after -")
# 時刻とデータの分離
ts = transfer_data[:8]
raw_data_2 = transfer_data[8:]
now_2 = datetime.datetime.fromtimestamp(struct.unpack('<d', ts)[0])
disp(now_2, raw_data_2)
$ python3 pack_unpack.py
- before -
2021-04-17 21:42:35.371176 b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:10
- transfer -
b'Y\xc1\xd7\xee\xb5\x1e\xd8A\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:18
- after -
2021-04-17 21:42:35.371176 b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:10

このコードを使用して、既存のコードをバイナリ対応に変更します。

4 エッジ側のコード

エッジ側で500ms毎に、Amazon Kinesis Data Streamsに送信しているコードです。

JSONの時は、改行でデータ単位を分割しましたが、今回は、1データのサイズが固定なので、セパレータはありません。

index.py

import pyaudio
from producer import Producer
import numpy as np

producer = Producer()

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = int(SAMPLE_RATE/2) # 500msごとに取得する

p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)

try:
    print("start ...")
    while True: 
        # 500ms秒分のデータ読み込み
        data = stream.read(CHUNK)

        # numpy配列に変換    
        data = np.frombuffer(data, dtype="int16")
        # チャンネル 2ch -> 1ch
        data = data[0::2] 
        # サンプルレート  32000Hz -> 8000Hz
        data = data[0::4] 
        # byteに戻す 
        data = data.tobytes()

        # Amazon Kinesis Data Streamsへの送信
        producer.send(data)

except:
    stream.stop_stream()
    stream.close()
    p.terminate()

producer.py

import json
from datetime import datetime
import base64
import boto3
from boto3.session import Session
import time
import random
import struct

class Producer():
    def __init__(self):
        self.__identity_id = "ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
        self.__region = "ap-northeast-1"
        self.__stream_name = 'audio_transmission_data_stream'
        self.__kinesis = self.__get_kinesis()

    def __get_kinesis(self):
        client = boto3.client('cognito-identity', self.__region)
        resp =  client.get_id(IdentityPoolId = self.__identity_id)
        resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId'])
        secretKey = resp['Credentials']['SecretKey']
        accessKey = resp['Credentials']['AccessKeyId']
        token = resp['Credentials']['SessionToken']
        session = Session(aws_access_key_id = accessKey,
                  aws_secret_access_key = secretKey,
                  aws_session_token = token,
                  region_name = self.__region)
        return session.client('kinesis')

    def send(self, data):
        now = datetime.now()
        # 時刻とデータの結合
        ts = now.timestamp()
        ts = struct.pack('<d', ts)
        transfer_data = ts + data
        try:
            response = self.__kinesis.put_record(
                StreamName = self.__stream_name, 
                PartitionKey = str(random.randrange(0,100)), 
                Data = transfer_data
            )
        except Exception as e:
            print("Exception: {}", e.args)
        print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))

5 Lambda(リアルタイムデータ処理)

Amazon Kinesis Data StreamsをトリガーするLambdaのコードです。

レコードからタイムスタンプとRAWデータを分割して、最大値(ボリューム)を取得して、IoT Coreのトピックに送っています。

import json
import datetime
import os
import boto3
import base64
import numpy as np
import struct

def lambda_handler(event, context):

    topic = 'topic/level_meter'
    iot = boto3.client('iot-data')

    records = event["Records"]
    for record in records:
        data = base64.b64decode(record["kinesis"]["data"])
        print("{} {}".format(type(data), len(data)))

        # データを分離
        ts = data[:8]
        raw_data = data[8:]

        ts = struct.unpack('<d', ts)[0]
        now = datetime.datetime.fromtimestamp(ts) + datetime.timedelta(hours=9)

        # 1データを2byteとして扱う
        raw_data = np.frombuffer(raw_data, dtype="int16")
        # 最大値を取得する
        max = int(raw_data.max())

        payload = {
            "timestamp": now.strftime('%Y-%m-%dT%H:%M:%S.%f'),
            "level": max
        }
        print("payload:{}".format(payload))
        iot.publish(
            topic=topic,
            qos=0,
            payload=json.dumps(payload)
        )

    return {}

6 Lambda(wavデータ生成)

S3に保存される形式が変わったので、wavを作成するLambdaも変更されています。

import json
import datetime
import os
import boto3
import base64
import wave
import struct

def lambda_handler(event, context):

    BUCKET_NAME = os.environ['BUCKET_NAME']
    output_filename = event["output_filename"]

    start_time_str = event["start_time"]
    period_sec = int(event["period_sec"])
    print("start_time:{} period_sec:{}".format(start_time_str, period_sec))

    # 開始時間と終了時間(JST)
    start_time_jst = datetime.datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S')
    end_time_jst = start_time_jst + datetime.timedelta(seconds=period_sec)
    print("JST {} - {} ".format(start_time_jst, end_time_jst))

    # S3上のオブジェクト名を検索するため
    # 開始時間と終了時間(UTC) ファイル検索用なので、開始時間は、1分前とする
    start_time_utc = start_time_jst + datetime.timedelta(hours=-9)
    end_time_utc = start_time_utc + datetime.timedelta(seconds=period_sec)
    start_time_utc = start_time_utc + datetime.timedelta(minutes=-1)
    print("UTC {} - {}".format(start_time_utc, end_time_utc))

    s3client = boto3.client('s3')
    if("IsLocal" in os.environ and os.environ["IsLocal"] == "Yes"):
        session = boto3.Session(profile_name="developer")
        s3client = session.client('s3')

    # 当日分のオブジェクト名を列挙する
    prefix = "{:4d}/{:02d}/{:02d}".format(
        start_time_utc.year,
        start_time_utc.month,
        start_time_utc.day
    )
    response = s3client.list_objects_v2(
                Bucket = BUCKET_NAME,
                Prefix = prefix
            )

    # オブジェクト名から、対象期間ヒットするオブジェクトを列挙する
    target_keys = []
    if("Contents" in response):
        for content in response["Contents"]:
            # オブジェクト名からtimestampを取得する
            key = content["Key"]
            tmp = key.split('/')[4].split('-')
            year = int(tmp[2])
            month = int(tmp[3])
            day = int(tmp[4])
            hour = int(tmp[5])
            minute = int(tmp[6])
            second = int(tmp[7])
            dt = datetime.datetime(year, month, day, hour, minute, second, microsecond=0)
            if(start_time_utc <= dt and dt <= end_time_utc):
                target_keys.append(key)
    target_keys.sort()

    # オブジェクトをダウンロードして、timestampが取得期間ヒットするRAWデータを取得する
    frames = []
    for key in target_keys:
        body = s3client.get_object(Bucket=BUCKET_NAME, Key=key)['Body'].read()

        # 8008byte単位で処理する
        data_size = 8008
        start = 0
        for _ in range(int(len(body)/data_size)):
            data = body[start:start+data_size]

            raw_data = data[8:]
            ts = data[:8]

            ts = struct.unpack('<d', ts)[0]
            now = datetime.datetime.fromtimestamp(ts)
            time_str = now.strftime('%Y-%m-%dT%H:%M:%S.%f')
            timestamp = datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f')

            # 取得期間のデータは、framesに追加する
            if(start_time_jst <= timestamp and timestamp <= end_time_jst):
                frames.append(raw_data)
            start += data_size
    data = b''.join(frames)

    # RAWデータをwavファイルとして保存する
    CHANNELS = 1 # 1ch
    SAMPLE_RATE = 8000 # 8kHz
    SAMPLE_WIDTH = 2 
    tmp_file_name = "/tmp/tmp.wav"
    wf = wave.open(tmp_file_name, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(SAMPLE_WIDTH)
    wf.setframerate(SAMPLE_RATE)
    wf.writeframes(data)
    wf.close()

    s3client.upload_file(tmp_file_name, BUCKET_NAME, output_filename)

    return {}

7 最後に

今回は、データをバイナリで扱う要領を確認してみました。

オーバーヘッドとデータサイズは、確実に減っていると思いますが、途中でデータをトリガーするには、やはり、少し面倒になります。

要件によって検討ということでしょうか・・・

全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_4