この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
CX事業本部の平内(SIN)です。
前回、エッジ側のAudioデータをクラウドへ送信する要領を確認してみました。
上記は、MQTT -> Amazon Kinesis Data Firehose -> S3という形でしたが、今回は、単純にS3に保存する事だけが要件という想定で、IoT Coreを経由せず、直接 Amazon Kinesis Data Firehoseに送る要領を試してみました。
S3上の保存形式が変わらないようにしましたので、保存されたデータから時間を指定して、wavファイルとして取り出すLambdaは、前回のものが、そのまま利用可能です。
また、Audio入力も、前回と同じ、Webカメラ(C920)です。
2 エッジ側のコード
1秒に一回のタイミングで、AudioデータをAmazon Kinesis Data Firehoseに送信しているコードです。
サンプリングレートは、擬似的に32KHzから8Hzとし、チャンネルも1chに削除してますが、RAWデータは、テキスト化することで、16K/sec から 22K/sec程度になっています。
kinesisへのput_recordの際に、データの最後に、改行を入れることで、1秒のデータが1行になるようにしています。
権限は、サンプルということで、CognitoのIdentity Idの未認証のロールを使用しています。
index.py
import pyaudio
from producer import Producer
import numpy as np
producer = Producer()
DEVICE_INDEX = 0
CHANNELS = 2
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = SAMPLE_RATE # 1秒ごとに取得する
# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
channels = CHANNELS,
rate = SAMPLE_RATE,
input = True,
input_device_index = DEVICE_INDEX,
frames_per_buffer = CHUNK)
try:
print("start ...")
while True:
# 1秒分のデータ読み込み
data = stream.read(CHUNK)
# numpy配列に変換
data = np.frombuffer(data, dtype="int16")
# チャンネル 2ch -> 1ch
data = data[0::2]
# サンプルレート 32000Hz -> 8000Hz
data = data[0::4]
# byteに戻す
data = data.tobytes()
# Amazon Kinesis Data Firehoseへの送信
producer.send(data)
except:
stream.stop_stream()
stream.close()
p.terminate()
producer.py
import json
from datetime import datetime
import base64
import boto3
from boto3.session import Session
import time
class Producer():
def __init__(self):
self.__identity_id = "ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
self.__region = "ap-northeast-1"
self.__delivery_stream_name = 'audio_transmission_firehose_stream'
self.__firehose = self.__get_firehose()
def __get_firehose(self):
client = boto3.client('cognito-identity', self.__region)
resp = client.get_id(IdentityPoolId = self.__identity_id)
resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId'])
secretKey = resp['Credentials']['SecretKey']
accessKey = resp['Credentials']['AccessKeyId']
token = resp['Credentials']['SessionToken']
session = Session(aws_access_key_id = accessKey,
aws_secret_access_key = secretKey,
aws_session_token = token,
region_name = self.__region)
return session.client('firehose')
def send(self, data):
now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
b64_data = base64.b64encode(data)
raw_data = b64_data.decode()
data = {
"timestamp": now,
"raw_data": raw_data
}
try:
response = self.__firehose.put_record(
DeliveryStreamName=self.__delivery_stream_name,
Record={
'Data': "{}\n".format(json.dumps(data))
}
)
except Exception as e:
print("Exception: {}", e.args)
print('put_record RecordId:{}'.format(response['RecordId']))
送信している様子です。
3 Amazon Kinesis Data Firehose
※ このセクションは、前回と同じ内容です。
Amazon Kinesis Data Firehoseは、audio_transmission_firehose_streamという名前で作成し、特に、コンバートや圧縮などは設定せず、destinationをS3バケットとしています。
なお、Buffer sizeは、10M、Buffer intervalは60secとしました。この設定により、1秒ごとに送られくるデータを、1分毎に1ファイルとしてS3に保存されることになります。
1秒分のデータは、約22Kなので、約1.4M(22k*60)以上のバッファにしておけば、Buffer Intervalが先にトリガーされるため、1分毎に出力されるという想定です。
もし、要件的に、あまりリアルタイム性が必要ない場合は、1ファイルに格納するデータ量を大きく取ることで、S3への書き込み回数は節約できることになります。
4 S3
※ このセクションは、前回と同じ内容です。
S3に出力されたデータです。年月日のプレフィックスと秒数まで入ったKey名で保存されていることが分かります。
60秒分のデータが格納された1つのファイルです。ここから対象秒のRAWデータが取り出せます。
1つのファイルを見ると、1秒ごとのデータが60行(60秒分)格納されているのが確認できます。ここから対象秒のRAWデータが取り出せます。
ここまでで、Audioデータの送信は完了となります。
5 Lambda
前回作成したLambdaが、そのまま利用可能です。
6 最後に
今回は、Amazon Kinesis Data Firehoseを使用してAudioデータをクラウドに送信する要領を試してみました。
最初に書いた通り、データに付随するアトリビュートなどが無く、単純にS3に保存するだけが要件ということであれば、これで十分かも知れません。
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_2