この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
今回は、エッジ側のAudioデータをクラウドへ送信する要領を確認してみました。
送信の方法や、保存先は色々考えられますが、手始めにMQTTで送信し、Amazon Kinesis Data Firehose経由で、S3に保存する形を試してみました。
また、保存されたAudioデータから時間を指定して、wavファイルを生成するLambda関数も書いてみました。
確認が容易なように、とりあえず、Audioデータをターゲットとしていますが、連続するストリームを扱うという意味では、Audioに限らず応用ができるのではと考えています。
2 Audio入力
エッジ側は、RaspberryPiとしたのですが、デフォルトでAudio入力の機能が無いので、Webカメラをつないで、Audio入力に使用しました。
Webカメラ(C920)を接続して、arecordコマンドでAudioデバイスを一覧すると、device 0で認識されていることが分かります。
$ arecord -l
**** List of CAPTURE Hardware Devices ****
card 1: C920 [HD Pro Webcam C920], device 0: USB Audio [USB Audio]
Subdevices: 1/1
Subdevice #0: subdevice #0
このデバイスIDを使用して、下記のコードで、得られる情報から、C920のAudio入力は、サンプルレートが、32KHz 、チャンネル数が2ということが分かります。
device_info.py
import pyaudio
p = pyaudio.PyAudio()
info = p.get_device_info_by_index(0)
print(info)
$ python3 device_info.py
・・・略)
{'index': 0, 'structVersion': 2, 'name': 'HD Pro Webcam C920: USB Audio (hw:1,0)', \
'hostApi': 0, 'maxInputChannels': 2, 'maxOutputChannels': 0, \
'defaultLowInputLatency': 0.01196875, 'defaultLowOutputLatency': -1.0, \
'defaultHighInputLatency': 0.048, 'defaultHighOutputLatency': -1.0, \
'defaultSampleRate': 32000.0}
なお、pyaudioのインストールは、以下で行いました。
brew install portaudio
pip3 install pyaudio
3 録音
下記のコードは、Audio入力から10秒間録音し、sample.wavを生成するものです。
ストリームのバッファサイズをサンプルレートと同じにしているので、ループは、1秒毎に処理されます。ストリームから取得したデータは、チャンネルを1つ削減し、また、1/4に間引くことで、雑にサンプルレートを8Khzに変換しています。
チャンネル及びサンプルレートを削減したのは、事後、クラウドへ送信するデータ量を少し節約しようという意図です。
取得したデータは、waveライブラリを使用して、ヘッダを追加してwavファイルに保存しています。
record.py
import pyaudio
import wave
import numpy as np
DEVICE_INDEX = 0
CHANNELS = 2
SAMPLE_RATE = 32000 # サンプルレート
CHUNK = SAMPLE_RATE # 1秒ごとに取得する
FORMAT = pyaudio.paInt16
RECORD_SECONDS = 10 # 10秒間録音する
# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
channels = CHANNELS,
rate = SAMPLE_RATE,
input = True,
input_device_index = DEVICE_INDEX,
frames_per_buffer = CHUNK)
# recording
print("recording ...")
frames = []
for _ in range(0, int(SAMPLE_RATE / CHUNK * RECORD_SECONDS)):
# 1秒分のデータ読み込み
data = stream.read(CHUNK)
# numpy配列に変換
data = np.frombuffer(data, dtype="int16")
# チャンネル 2ch -> 1ch
data = data[0::2]
# サンプルレート 32000Hz -> 8000Hz
data = data[0::4]
# byteに戻す
data = data.tobytes()
frames.append(data)
print ("data size:{}".format(len(data)))
data = b''.join(frames)
print("done.")
# close strema
stream.stop_stream()
stream.close()
p.terminate()
# save
CHANNELS = 1 # 1ch
SAMPLE_RATE = 8000 # 8kHz
file_name = "./sample.wav"
wf = wave.open(file_name, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(SAMPLE_RATE)
wf.writeframes(data)
wf.close()
4 エッジ側のコード
1秒に一回のタイミングで、AudioデータをMQTTで送信しているコードです。
AudioのRAWデータは、16K/secでしたが、テキスト化することで、サイズは、22K/sec程度になっています。
index.py
import pyaudio
from producer import Producer
import numpy as np
DEVICE_INDEX = 0
CHANNELS = 2
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = SAMPLE_RATE # 1秒ごとに取得する
# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
channels = CHANNELS,
rate = SAMPLE_RATE,
input = True,
input_device_index = DEVICE_INDEX,
frames_per_buffer = CHUNK)
producer = Producer()
try:
print("start ...")
while True:
# 1秒分のデータ読み込み
data = stream.read(CHUNK)
# numpy配列に変換
data = np.frombuffer(data, dtype="int16")
# チャンネル 2ch -> 1ch
data = data[0::2]
# サンプルレート 32000Hz -> 8000Hz
data = data[0::4]
# byteに戻す
data = data.tobytes()
producer.send(data)
except:
stream.stop_stream()
stream.close()
p.terminate()
producer.py
from mqtt import Mqtt
import json
from datetime import datetime
import base64
class Producer():
def __init__(self):
self.__topic = "topic/audio_transmission"
root_ca = "./certs/RootCA.pem"
key = "./certs/xxxxxxxx-private.pem.key"
cert = "./certs/xxxxxxxx-certificate.pem.crt"
endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
self.__mqtt = Mqtt(root_ca, key, cert, endpoint)
def send(self, data):
now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
b64_data = base64.b64encode(data)
raw_data = b64_data.decode()
payload = {
"timestamp": now,
"raw_data": raw_data
}
self.__mqtt.publish(self.__topic, json.dumps(payload))
IoT Coreのコンソールで、publishされたデータを確認している様子です。
5 Amazon Kinesis Data Firehose
Amazon Kinesis Data Firehoseは、audio_transmission_firehose_streamという名前で作成し、特に、コンバートや圧縮などは設定せず、destinationをS3バケットとしています。
なお、Buffer sizeは、10M、Buffer intervalは60secとしました。この設定により、1秒ごとに送られくるデータを、1分毎に1ファイルとしてS3に保存されることになります。
1秒分のデータは、約22Kなので、約1.4M(22k*60)以上のバッファにしておけば、Buffer Intervalが先にトリガーされるため、1分毎に出力されるという想定です。
もし、要件的に、あまりリアルタイム性が必要ない場合は、1ファイルに格納するデータ量を大きく取ることで、S3への書き込み回数は節約できることになります。
6 Rule
IoT Coreでは、ルールエンジンを設定して、メッセージブローカーに到着したデータをAmazon Kinesis Data Firehoseに送っています。
セパレータに「改行」を指定して、秒単位のデータを1行ごと分離できるようにしました。
7 S3
S3に出力されたデータです。年月日のプレフィックスと秒数まで入ったKey名で保存されていることが分かります。
1つのファイルを見ると、1秒ごとのデータが60行(60秒分)格納されているのが確認できます。ここから対象秒のRAWデータが取り出せます。
ここまでで、Audioデータの送信は完了となります。
8 Lambda
ここからは、S3に蓄積されたデータから、データを取り出す一例として、Lambdaを書いてみました。
この関数は、取得開始時間、期間(秒)及び、出力ファイル名を与えて実行します。
指定された時間から、S3上のオブジェクト名を検索し、期間に該当するものをダウンロードして、RAWデータをデコードしています。また、RAWデータは、ヘッダを追加してwavファイルとして、S3にアップロードされます。
lambda_function.py
import json
import datetime
import os
import boto3
import base64
import wave
def lambda_handler(event, context):
BUCKET_NAME = os.environ['BUCKET_NAME']
output_filename = event["output_filename"] # 出力ファイル名
start_time_str = event["start_time"] # 開始時間(YYYY-mm-dd HH:MM:SS)
period_sec = int(event["period_sec"]) # 期間(秒)
print("start_time:{} period_sec:{}".format(start_time_str, period_sec))
# 開始時間と終了時間(JST)
start_time_jst = datetime.datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S')
end_time_jst = start_time_jst + datetime.timedelta(seconds=period_sec)
print("JST {} - {} ".format(start_time_jst, end_time_jst))
# S3上のオブジェクト名を検索するため
# 開始時間と終了時間(UTC) ファイル検索用なので、開始時間は、1分前とする
start_time_utc = start_time_jst + datetime.timedelta(hours=-9)
end_time_utc = start_time_utc + datetime.timedelta(seconds=period_sec)
start_time_utc = start_time_utc + datetime.timedelta(minutes=-1)
print("UTC {} - {}".format(start_time_utc, end_time_utc))
s3client = boto3.client('s3')
# 当日分のオブジェクト名を列挙する
prefix = "{:4d}/{:02d}/{:02d}".format(
start_time_utc.year,
start_time_utc.month,
start_time_utc.day
)
response = s3client.list_objects_v2(
Bucket = BUCKET_NAME,
Prefix = prefix
)
# オブジェクト名から、対象期間ヒットするオブジェクトを列挙する
target_keys = []
if("Contents" in response):
for content in response["Contents"]:
# オブジェクト名からtimestampを取得する
key = content["Key"]
tmp = key.split('/')[4].split('-')
year = int(tmp[2])
month = int(tmp[3])
day = int(tmp[4])
hour = int(tmp[5])
minute = int(tmp[6])
second = int(tmp[7])
dt = datetime.datetime(year, month, day, hour, minute, second, microsecond=0)
if(start_time_utc <= dt and dt <= end_time_utc):
target_keys.append(key)
target_keys.sort()
# オブジェクトをダウンロードして、timestampが取得期間ヒットするRAWデータを取得する
frames = []
for key in target_keys:
body = s3client.get_object(Bucket=BUCKET_NAME, Key=key)['Body'].read()
# 1行が、1秒分のデータとなっている
lines = body.decode().split('\n')
for line in lines:
if(line==''):
continue
s = json.loads(line)
timestamp = datetime.datetime.strptime(s["timestamp"], '%Y-%m-%dT%H:%M:%S')
# 取得期間のデータは、framesに追加する
if(start_time_jst <= timestamp and timestamp <= end_time_jst):
# テキストデータをバイナリに戻す
b64_data = s["raw_data"].encode()
frames.append(base64.b64decode(b64_data))
data = b''.join(frames)
# RAWデータをwavファイルとして保存する
CHANNELS = 1 # 1ch
SAMPLE_RATE = 8000 # 8kHz
SAMPLE_WIDTH = 2
tmp_file_name = "/tmp/tmp.wav"
wf = wave.open(tmp_file_name, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(data)
wf.close()
s3client.upload_file(tmp_file_name, BUCKET_NAME, output_filename)
return {}
AWSコンソールから、取得開始時間、期間及び、出力ファイル名を指定して関数を実行している様子です。
S3上に指定したファイル名でwavファイルがアップロードされます。
9 最後に
今回は、MQTTを使用してAudioデータをクラウドに送信する要領を試してみました。
Audioデータをテキスト化するなど、ちょっとオーバーヘッドな感じですが、一応、エッジ側のタイムスタンプが入っています。MQTTにJSON形式で送っているため、付随するアトリビュートがあれば、追加してルールで捌くなどの処理も可能だと思います。
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_1