Audioデータをクラウドに送ってみました。 Amazon Kinesis Data Firehose + S3 (JSON)

2021.04.17

1 はじめに

CX事業本部の平内(SIN)です。

前回、エッジ側のAudioデータをクラウドへ送信する要領を確認してみました。

上記は、MQTT -> Amazon Kinesis Data Firehose -> S3という形でしたが、今回は、単純にS3に保存する事だけが要件という想定で、IoT Coreを経由せず、直接 Amazon Kinesis Data Firehoseに送る要領を試してみました。

S3上の保存形式が変わらないようにしましたので、保存されたデータから時間を指定して、wavファイルとして取り出すLambdaは、前回のものが、そのまま利用可能です。

また、Audio入力も、前回と同じ、Webカメラ(C920)です。

2 エッジ側のコード

1秒に一回のタイミングで、AudioデータをAmazon Kinesis Data Firehoseに送信しているコードです。

サンプリングレートは、擬似的に32KHzから8Hzとし、チャンネルも1chに削除してますが、RAWデータは、テキスト化することで、16K/sec から 22K/sec程度になっています。

kinesisへのput_recordの際に、データの最後に、改行を入れることで、1秒のデータが1行になるようにしています。

権限は、サンプルということで、CognitoのIdentity Idの未認証のロールを使用しています。

index.py

import pyaudio
from producer import Producer
import numpy as np

producer = Producer()

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = SAMPLE_RATE # 1秒ごとに取得する

# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)

try:
    print("start ...")
    while True: 

        # 1秒分のデータ読み込み
        data = stream.read(CHUNK)

        # numpy配列に変換    
        data = np.frombuffer(data, dtype="int16")
        # チャンネル 2ch -> 1ch
        data = data[0::2] 
        # サンプルレート  32000Hz -> 8000Hz
        data = data[0::4] 
        # byteに戻す 
        data = data.tobytes()

        # Amazon Kinesis Data Firehoseへの送信
        producer.send(data)
except:
    stream.stop_stream()
    stream.close()
    p.terminate()

producer.py

import json
from datetime import datetime
import base64
import boto3
from boto3.session import Session
import time

class Producer():
    def __init__(self):
        self.__identity_id = "ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
        self.__region = "ap-northeast-1"
        self.__delivery_stream_name = 'audio_transmission_firehose_stream'
        self.__firehose = self.__get_firehose()
    
    def __get_firehose(self):
        client = boto3.client('cognito-identity', self.__region)
        resp =  client.get_id(IdentityPoolId = self.__identity_id)
        resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId'])
        secretKey = resp['Credentials']['SecretKey']
        accessKey = resp['Credentials']['AccessKeyId']
        token = resp['Credentials']['SessionToken']
        session = Session(aws_access_key_id = accessKey,
                  aws_secret_access_key = secretKey,
                  aws_session_token = token,
                  region_name = self.__region)
        return session.client('firehose')

    def send(self, data):
        now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        
        b64_data = base64.b64encode(data)
        raw_data = b64_data.decode()

        data = {
            "timestamp": now,
            "raw_data": raw_data
        }

        try:
            response = self.__firehose.put_record(
                DeliveryStreamName=self.__delivery_stream_name,
                Record={
                    'Data': "{}\n".format(json.dumps(data))
                }
            )

        except Exception as e:
            print("Exception: {}", e.args)
        print('put_record RecordId:{}'.format(response['RecordId']))

送信している様子です。

3 Amazon Kinesis Data Firehose

※ このセクションは、前回と同じ内容です。

Amazon Kinesis Data Firehoseは、audio_transmission_firehose_streamという名前で作成し、特に、コンバートや圧縮などは設定せず、destinationをS3バケットとしています。

なお、Buffer sizeは、10MBuffer interval60secとしました。この設定により、1秒ごとに送られくるデータを、1分毎に1ファイルとしてS3に保存されることになります。

1秒分のデータは、約22Kなので、約1.4M(22k*60)以上のバッファにしておけば、Buffer Intervalが先にトリガーされるため、1分毎に出力されるという想定です。

もし、要件的に、あまりリアルタイム性が必要ない場合は、1ファイルに格納するデータ量を大きく取ることで、S3への書き込み回数は節約できることになります。

4 S3

※ このセクションは、前回と同じ内容です。

S3に出力されたデータです。年月日のプレフィックスと秒数まで入ったKey名で保存されていることが分かります。

60秒分のデータが格納された1つのファイルです。ここから対象秒のRAWデータが取り出せます。

1つのファイルを見ると、1秒ごとのデータが60行(60秒分)格納されているのが確認できます。ここから対象秒のRAWデータが取り出せます。

ここまでで、Audioデータの送信は完了となります。

5 Lambda

前回作成したLambdaが、そのまま利用可能です。

6 最後に

今回は、Amazon Kinesis Data Firehoseを使用してAudioデータをクラウドに送信する要領を試してみました。

最初に書いた通り、データに付随するアトリビュートなどが無く、単純にS3に保存するだけが要件ということであれば、これで十分かも知れません。

全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_2