この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか確認してみました。
上記は、すべて、JSON形式のデータとして送信しましたが、Amazon Kinesis Data StreamsもAmazon Kinesis Data Firehoseもboto3のパラメータは、バイナリ(内部でシリアル化される時、Base64で変換されている)となっています。
そこで、今回は、Audioデータをテキスト化するオーバーヘッドを無くすために、バイナリ形式のまま送信する要領を確認してみました。
2 構成
構成は、前回と同じです。
デバイスからは、500msec単位で、Audioデータ(バイナリ形式)とタイムスタンプ(バイナリ形式)を結合して送信します。
このため、Amazon Kinesis Data Streams、Amazon Kinesis Data Firehose及び、S3バケットでは、バイナリ形式のまま保存されます。
データは、必要に応じて、Lambdaで処理(デコード)されます。
3 バイナリ形式のデータ
500msec分のRAWデータは、8,000byteですが、これにタイムスタンプ(8byte)を足して、1つのデータ単位は8,008byteとしました。
下記は、最終的にS3に保存された状況です。
やや、バラバラなサイズに見えますが、ダウンロードしてサイズを確認すると、8,008の倍数になっていることが分かります。
-rw-r--r--@ 1 976976 4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-41-45-4f667d04-0ac6-457c-b274-54a9ac67375b
-rw-r--r--@ 1 840840 4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-42-46-85bcf123-dddd-4d11-98d8-5af1c4b65db9
-rw-r--r--@ 1 968968 4 17 20:53 audio_transmission_firehose-1-2021-04-17-11-43-48-fd291ba6-f423-4651-8c3c-024f26852d3d
-rw-r--r--@ 1 976976 4 17 20:55 audio_transmission_firehose-1-2021-04-17-11-44-48-510ca90f-b905-4b99-96ba-a41192285941
976976/8008 = 122
840840/8008 = 105
968968/8008 = 121
976976/8008 = 122
下記のコードは、AudioのRAWデータ(擬似的に10byteのデータ)と、タイムスタンプを結合して、再び、分解しているサンプルです。
pack_unpack.py
import datetime
import struct
def disp(dt, data):
print("{} {} type:{} len:{}".format(dt, data, type(data), len(data)))
# 転送前のデータ
print("- before -")
raw_data = bytes(range(0, 10))
now = datetime.datetime.now()
disp(now, raw_data)
# 時刻とデータの結合
transfer_data = struct.pack('<d', now.timestamp()) + raw_data
print("- transfer -")
print("{} type:{} len:{}".format(transfer_data, type(transfer_data), len(transfer_data)))
# 転送後のデータ
print("- after -")
# 時刻とデータの分離
ts = transfer_data[:8]
raw_data_2 = transfer_data[8:]
now_2 = datetime.datetime.fromtimestamp(struct.unpack('<d', ts)[0])
disp(now_2, raw_data_2)
$ python3 pack_unpack.py
- before -
2021-04-17 21:42:35.371176 b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:10
- transfer -
b'Y\xc1\xd7\xee\xb5\x1e\xd8A\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:18
- after -
2021-04-17 21:42:35.371176 b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t' type:<class 'bytes'> len:10
このコードを使用して、既存のコードをバイナリ対応に変更します。
4 エッジ側のコード
エッジ側で500ms毎に、Amazon Kinesis Data Streamsに送信しているコードです。
JSONの時は、改行でデータ単位を分割しましたが、今回は、1データのサイズが固定なので、セパレータはありません。
index.py
import pyaudio
from producer import Producer
import numpy as np
producer = Producer()
DEVICE_INDEX = 0
CHANNELS = 2
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = int(SAMPLE_RATE/2) # 500msごとに取得する
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
channels = CHANNELS,
rate = SAMPLE_RATE,
input = True,
input_device_index = DEVICE_INDEX,
frames_per_buffer = CHUNK)
try:
print("start ...")
while True:
# 500ms秒分のデータ読み込み
data = stream.read(CHUNK)
# numpy配列に変換
data = np.frombuffer(data, dtype="int16")
# チャンネル 2ch -> 1ch
data = data[0::2]
# サンプルレート 32000Hz -> 8000Hz
data = data[0::4]
# byteに戻す
data = data.tobytes()
# Amazon Kinesis Data Streamsへの送信
producer.send(data)
except:
stream.stop_stream()
stream.close()
p.terminate()
producer.py
import json
from datetime import datetime
import base64
import boto3
from boto3.session import Session
import time
import random
import struct
class Producer():
def __init__(self):
self.__identity_id = "ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
self.__region = "ap-northeast-1"
self.__stream_name = 'audio_transmission_data_stream'
self.__kinesis = self.__get_kinesis()
def __get_kinesis(self):
client = boto3.client('cognito-identity', self.__region)
resp = client.get_id(IdentityPoolId = self.__identity_id)
resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId'])
secretKey = resp['Credentials']['SecretKey']
accessKey = resp['Credentials']['AccessKeyId']
token = resp['Credentials']['SessionToken']
session = Session(aws_access_key_id = accessKey,
aws_secret_access_key = secretKey,
aws_session_token = token,
region_name = self.__region)
return session.client('kinesis')
def send(self, data):
now = datetime.now()
# 時刻とデータの結合
ts = now.timestamp()
ts = struct.pack('<d', ts)
transfer_data = ts + data
try:
response = self.__kinesis.put_record(
StreamName = self.__stream_name,
PartitionKey = str(random.randrange(0,100)),
Data = transfer_data
)
except Exception as e:
print("Exception: {}", e.args)
print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))
5 Lambda(リアルタイムデータ処理)
Amazon Kinesis Data StreamsをトリガーするLambdaのコードです。
レコードからタイムスタンプとRAWデータを分割して、最大値(ボリューム)を取得して、IoT Coreのトピックに送っています。
import json
import datetime
import os
import boto3
import base64
import numpy as np
import struct
def lambda_handler(event, context):
topic = 'topic/level_meter'
iot = boto3.client('iot-data')
records = event["Records"]
for record in records:
data = base64.b64decode(record["kinesis"]["data"])
print("{} {}".format(type(data), len(data)))
# データを分離
ts = data[:8]
raw_data = data[8:]
ts = struct.unpack('<d', ts)[0]
now = datetime.datetime.fromtimestamp(ts) + datetime.timedelta(hours=9)
# 1データを2byteとして扱う
raw_data = np.frombuffer(raw_data, dtype="int16")
# 最大値を取得する
max = int(raw_data.max())
payload = {
"timestamp": now.strftime('%Y-%m-%dT%H:%M:%S.%f'),
"level": max
}
print("payload:{}".format(payload))
iot.publish(
topic=topic,
qos=0,
payload=json.dumps(payload)
)
return {}
6 Lambda(wavデータ生成)
S3に保存される形式が変わったので、wavを作成するLambdaも変更されています。
import json
import datetime
import os
import boto3
import base64
import wave
import struct
def lambda_handler(event, context):
BUCKET_NAME = os.environ['BUCKET_NAME']
output_filename = event["output_filename"]
start_time_str = event["start_time"]
period_sec = int(event["period_sec"])
print("start_time:{} period_sec:{}".format(start_time_str, period_sec))
# 開始時間と終了時間(JST)
start_time_jst = datetime.datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S')
end_time_jst = start_time_jst + datetime.timedelta(seconds=period_sec)
print("JST {} - {} ".format(start_time_jst, end_time_jst))
# S3上のオブジェクト名を検索するため
# 開始時間と終了時間(UTC) ファイル検索用なので、開始時間は、1分前とする
start_time_utc = start_time_jst + datetime.timedelta(hours=-9)
end_time_utc = start_time_utc + datetime.timedelta(seconds=period_sec)
start_time_utc = start_time_utc + datetime.timedelta(minutes=-1)
print("UTC {} - {}".format(start_time_utc, end_time_utc))
s3client = boto3.client('s3')
if("IsLocal" in os.environ and os.environ["IsLocal"] == "Yes"):
session = boto3.Session(profile_name="developer")
s3client = session.client('s3')
# 当日分のオブジェクト名を列挙する
prefix = "{:4d}/{:02d}/{:02d}".format(
start_time_utc.year,
start_time_utc.month,
start_time_utc.day
)
response = s3client.list_objects_v2(
Bucket = BUCKET_NAME,
Prefix = prefix
)
# オブジェクト名から、対象期間ヒットするオブジェクトを列挙する
target_keys = []
if("Contents" in response):
for content in response["Contents"]:
# オブジェクト名からtimestampを取得する
key = content["Key"]
tmp = key.split('/')[4].split('-')
year = int(tmp[2])
month = int(tmp[3])
day = int(tmp[4])
hour = int(tmp[5])
minute = int(tmp[6])
second = int(tmp[7])
dt = datetime.datetime(year, month, day, hour, minute, second, microsecond=0)
if(start_time_utc <= dt and dt <= end_time_utc):
target_keys.append(key)
target_keys.sort()
# オブジェクトをダウンロードして、timestampが取得期間ヒットするRAWデータを取得する
frames = []
for key in target_keys:
body = s3client.get_object(Bucket=BUCKET_NAME, Key=key)['Body'].read()
# 8008byte単位で処理する
data_size = 8008
start = 0
for _ in range(int(len(body)/data_size)):
data = body[start:start+data_size]
raw_data = data[8:]
ts = data[:8]
ts = struct.unpack('<d', ts)[0]
now = datetime.datetime.fromtimestamp(ts)
time_str = now.strftime('%Y-%m-%dT%H:%M:%S.%f')
timestamp = datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f')
# 取得期間のデータは、framesに追加する
if(start_time_jst <= timestamp and timestamp <= end_time_jst):
frames.append(raw_data)
start += data_size
data = b''.join(frames)
# RAWデータをwavファイルとして保存する
CHANNELS = 1 # 1ch
SAMPLE_RATE = 8000 # 8kHz
SAMPLE_WIDTH = 2
tmp_file_name = "/tmp/tmp.wav"
wf = wave.open(tmp_file_name, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(data)
wf.close()
s3client.upload_file(tmp_file_name, BUCKET_NAME, output_filename)
return {}
7 最後に
今回は、データをバイナリで扱う要領を確認してみました。
オーバーヘッドとデータサイズは、確実に減っていると思いますが、途中でデータをトリガーするには、やはり、少し面倒になります。
要件によって検討ということでしょうか・・・
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_4