Audioデータをクラウドに送ってみました。 Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (JSON)
1 はじめに
CX事業本部の平内(SIN)です。
ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか確認してみました。
上記は、Amazon Kinesis Data Firehose -> S3という形で、主にAudioデータをクラウド上に保存することが目的になってましたが、今回は、リアルタイム分析などが要件に含まれた場合を想定してしみました。
2 構成
リアルタイムで処理したいと言うことで、Amazon Kinesis Data Firehoseの前に、Amazon Kinesis Data Streamsを入れて、Lambdaで処理してみました。
Amazon Kinesis Data StreamsをトリガーとするLambdaでは、RAWデータからボリューム値を取得し、確認できるようにTopic経由で、ブラウザに表示してみました。
右下のLambdaは、S3に溜まったデータから、wavファイルを生成するもので、前回までのものと同じです。
動作している様子です。
iTerm2のコンソールは、RasPi上でAudio入力をAmazon Kinesis Data Streamsに送信しているプログラムです。このプログラムが動作すると、トリガーされたLambdaがMQTTをpublishし、その値によって、ブラウザのメーターが変化していることが確認できます。
なお、Audio入力に使用しているのは、前回と同じ、Webカメラ(C920)です。
3 エッジ側のコード
前回まで、1秒に一回のタイミングで、Audioデータを送信していましたが、やや、リアルタイム性を上げるという意味で、250ms毎に、Amazon Kinesis Data Streamsに送信することにしました。
サンプルレートは、擬似的に32KHzから8Hzとし、チャンネルも1chに削除されています。(実際の送信データは、テキスト化することで、約1.3倍に増えています)
また、put_recordの際に、データの最後に、改行を入れることで、250msのデータが1行になるようにしています。
index.py
import pyaudio from producer import Producer import numpy as np producer = Producer() DEVICE_INDEX = 0 CHANNELS = 2 SAMPLE_RATE = 32000 # サンプルレート FORMAT = pyaudio.paInt16 CHUNK = int(SAMPLE_RATE/4) # 250msごとに取得する p = pyaudio.PyAudio() stream = p.open(format = FORMAT, channels = CHANNELS, rate = SAMPLE_RATE, input = True, input_device_index = DEVICE_INDEX, frames_per_buffer = CHUNK) try: print("start ...") while True: # 250ms秒分のデータ読み込み data = stream.read(CHUNK) # numpy配列に変換 data = np.frombuffer(data, dtype="int16") # チャンネル 2ch -> 1ch data = data[0::2] # サンプルレート 32000Hz -> 8000Hz data = data[0::4] # byteに戻す data = data.tobytes() # Amazon Kinesis Data Streamsへの送信 producer.send(data) except: stream.stop_stream() stream.close() p.terminate()
producer.py
import json from datetime import datetime import base64 import boto3 from boto3.session import Session import time import random class Producer(): def __init__(self): self.__identity_id = "ap-northeast-1:xxxxxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" self.__region = "ap-northeast-1" self.__stream_name = 'audio_transmission_data_stream' self.__kinesis = self.__get_kinesis() def __get_kinesis(self): client = boto3.client('cognito-identity', self.__region) resp = client.get_id(IdentityPoolId = self.__identity_id) resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId']) secretKey = resp['Credentials']['SecretKey'] accessKey = resp['Credentials']['AccessKeyId'] token = resp['Credentials']['SessionToken'] session = Session(aws_access_key_id = accessKey, aws_secret_access_key = secretKey, aws_session_token = token, region_name = self.__region) return session.client('kinesis') def send(self, data): now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f') b64_data = base64.b64encode(data) raw_data = b64_data.decode() data = { "timestamp": now, "raw_data": raw_data } try: response = self.__kinesis.put_record( StreamName = self.__stream_name, PartitionKey = str(random.randrange(0,100)), Data="{}\n".format(json.dumps(data)) ) except Exception as e: print("Exception: {}", e.args) print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))
4 Amazon Kinesis Data Streams
audio_transmission_data_streamという名前で作成しました。
データは、1秒に約22Kなので、シャードは、1となっています。
5 Lambda(リアルタイムデータ処理)
Amazon Kinesis Data StreamsをトリガーするLambdaのコードです。
複数のレコードを処理するようには、書かれていますが、後で示す通り、このLambdaは、1レコード到着ごとに呼ばれます。
レコードからRAWデータを取り出して、最大値(ボリューム)を取得して、IoT Coreのトピックに送っています。
import json import datetime import os import boto3 import base64 import numpy as np def lambda_handler(event, context): topic = 'topic/level_meter' iot = boto3.client('iot-data') records = event["Records"] for record in records: payload = json.loads(base64.b64decode(record["kinesis"]["data"]).decode()) timestamp = payload["timestamp"] print("timestamp: {}".format(timestamp)) # テキストデータをバイナリに戻す b64_data = payload["raw_data"].encode() data = base64.b64decode(b64_data) print("len:{}".format(len(data))) # 1データを2byteとして扱う data = np.frombuffer(data, dtype="int16") # 最大値を取得する max = data.max() payload = { "timestamp": payload["timestamp"], "level": int(max) } print("payload:{}".format(payload)) iot.publish( topic=topic, qos=0, payload=json.dumps(payload) ) return {}
トリガーの設定は、バッチサイズを1とし、1レコード到着するごとに、Lambdaが起動されるようになっています。
6 ブラウザ
ブラウザでは、Epochという、チャート表示用のライブラリを使用させて頂いてます。
aws-iot-sdk-browser-bundle.jsで、トピックをSubscribeして表示を更新しています。
<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>TITLE</title> <script type="text/javascript" src="https://code.jquery.com/jquery-3.4.1.min.js"></script> <script src="http://d3js.org/d3.v3.min.js" charset="utf-8"></script> <script src="https://sdk.amazonaws.com/js/aws-sdk-2.283.1.min.js"></script> <script src="js/aws-iot-sdk-browser-bundle.js"></script> <script src="js/epoch.js"></script> <link rel="stylesheet" type="text/css" href="css/epoch.css"> </head> <body> <div id="gaugeChart" class="epoch gauge-large"></div> <script> var chart = $('#gaugeChart').epoch({ type: 'time.gauge', value: 0.0, format: function(v) { return (v*100).toFixed(1); } }), playing = false, interval = null; var awsIot = require('aws-iot-device-sdk'); const PoolId = 'ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'; const region = 'ap-northeast-1'; async function getCredentials() { AWS.config.region = region; var cognitoidentity = new AWS.CognitoIdentity(); var params = { IdentityPoolId: PoolId }; const identityId = await cognitoidentity.getId(params).promise(); const data = await cognitoidentity.getCredentialsForIdentity(identityId).promise(); var credentials = { accessKey: data.Credentials.AccessKeyId, secretKey: data.Credentials.SecretKey, sessionToken: data.Credentials.SessionToken }; return credentials; } const topic = "topic/level_meter"; async function job() { const endpoint = 'xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'; const clientId = 'sample_id' const credentials = await getCredentials(); // Credentialsの取得 const deviceIot = awsIot.device({ region: region, clientId: clientId, accessKeyId: credentials.accessKey, secretKey: credentials.secretKey, sessionToken : credentials.sessionToken, protocol: 'wss', port: 443, host: endpoint }); deviceIot.on('message', function(_topic, payload) { level = JSON.parse(payload)["level"] level = level/30000 console.log('level: ' + level); chart.update(level) }); deviceIot.subscribe(topic, undefined, function (err, granted){ if( err ){ console.log('subscribe error: ' + err); } else { console.log('subscribe success'); } }); } job(); </script> </body> </html>
7 その他
Amazon Kinesis Data Firehoseの入力をAmazon Kinesis Data Streamsに変えた以外、S3への保存要領や、保存されたデータからwavファイルを生成するLambdaなど、前々回、前回と同じです。
8 最後に
Amazon Kinesis Data Streamsを挟んで、リアルタイムな処理について確認してみました。
次回は、データをテキスト化するオーバーヘッドを外して、バイナリで送信する要領を確認してみたいと思います。もう少し速度が出せるかも知れません。
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_3