Audioデータをクラウドに送ってみました。 Amazon Kinesis Data Streams + Amazon Kinesis Data Firehose + S3 (JSON)

2021.04.17

1 はじめに

CX事業本部の平内(SIN)です。

ここまで、エッジ側のAudioデータをクラウドへ送信する要領をいくつか確認してみました。

上記は、Amazon Kinesis Data Firehose -> S3という形で、主にAudioデータをクラウド上に保存することが目的になってましたが、今回は、リアルタイム分析などが要件に含まれた場合を想定してしみました。

2 構成

リアルタイムで処理したいと言うことで、Amazon Kinesis Data Firehoseの前に、Amazon Kinesis Data Streamsを入れて、Lambdaで処理してみました。

Amazon Kinesis Data StreamsをトリガーとするLambdaでは、RAWデータからボリューム値を取得し、確認できるようにTopic経由で、ブラウザに表示してみました。

右下のLambdaは、S3に溜まったデータから、wavファイルを生成するもので、前回までのものと同じです。

動作している様子です。

iTerm2のコンソールは、RasPi上でAudio入力をAmazon Kinesis Data Streamsに送信しているプログラムです。このプログラムが動作すると、トリガーされたLambdaがMQTTをpublishし、その値によって、ブラウザのメーターが変化していることが確認できます。

なお、Audio入力に使用しているのは、前回と同じ、Webカメラ(C920)です。

3 エッジ側のコード

前回まで、1秒に一回のタイミングで、Audioデータを送信していましたが、やや、リアルタイム性を上げるという意味で、250ms毎に、Amazon Kinesis Data Streamsに送信することにしました。

サンプルレートは、擬似的に32KHzから8Hzとし、チャンネルも1chに削除されています。(実際の送信データは、テキスト化することで、約1.3倍に増えています)

また、put_recordの際に、データの最後に、改行を入れることで、250msのデータが1行になるようにしています。

index.py

import pyaudio
from producer import Producer
import numpy as np

producer = Producer()

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = int(SAMPLE_RATE/4) # 250msごとに取得する

p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)

try:
    print("start ...")
    while True: 
        # 250ms秒分のデータ読み込み
        data = stream.read(CHUNK)

        # numpy配列に変換    
        data = np.frombuffer(data, dtype="int16")
        # チャンネル 2ch -> 1ch
        data = data[0::2] 
        # サンプルレート  32000Hz -> 8000Hz
        data = data[0::4] 
        # byteに戻す 
        data = data.tobytes()

        # Amazon Kinesis Data Streamsへの送信
        producer.send(data)

except:
    stream.stop_stream()
    stream.close()
    p.terminate()

producer.py

import json
from datetime import datetime
import base64
import boto3
from boto3.session import Session
import time
import random

class Producer():
    def __init__(self):
        self.__identity_id = "ap-northeast-1:xxxxxxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
        self.__region = "ap-northeast-1"
        self.__stream_name = 'audio_transmission_data_stream'
        self.__kinesis = self.__get_kinesis()
    
    def __get_kinesis(self):
        client = boto3.client('cognito-identity', self.__region)
        resp =  client.get_id(IdentityPoolId = self.__identity_id)
        resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId'])
        secretKey = resp['Credentials']['SecretKey']
        accessKey = resp['Credentials']['AccessKeyId']
        token = resp['Credentials']['SessionToken']
        session = Session(aws_access_key_id = accessKey,
                  aws_secret_access_key = secretKey,
                  aws_session_token = token,
                  region_name = self.__region)
        return session.client('kinesis')

    def send(self, data):
        now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f')
        
        b64_data = base64.b64encode(data)
        raw_data = b64_data.decode()

        data = {
            "timestamp": now,
            "raw_data": raw_data
        }

        try:
            response = self.__kinesis.put_record(
                StreamName = self.__stream_name, 
                PartitionKey = str(random.randrange(0,100)), 
                Data="{}\n".format(json.dumps(data))
            )

        except Exception as e:
            print("Exception: {}", e.args)
        print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))

4 Amazon Kinesis Data Streams

audio_transmission_data_streamという名前で作成しました。

データは、1秒に約22Kなので、シャードは、1となっています。

5 Lambda(リアルタイムデータ処理)

Amazon Kinesis Data StreamsをトリガーするLambdaのコードです。

複数のレコードを処理するようには、書かれていますが、後で示す通り、このLambdaは、1レコード到着ごとに呼ばれます。

レコードからRAWデータを取り出して、最大値(ボリューム)を取得して、IoT Coreのトピックに送っています。

import json
import datetime
import os
import boto3
import base64
import numpy as np

def lambda_handler(event, context):

    topic = 'topic/level_meter'
    iot = boto3.client('iot-data')

    records = event["Records"]
    for record in records:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]).decode())
        timestamp = payload["timestamp"]
        print("timestamp: {}".format(timestamp))

        # テキストデータをバイナリに戻す
        b64_data = payload["raw_data"].encode()
        data = base64.b64decode(b64_data)
        print("len:{}".format(len(data)))
        # 1データを2byteとして扱う
        data = np.frombuffer(data, dtype="int16")
        # 最大値を取得する
        max = data.max()

        payload = {
            "timestamp": payload["timestamp"],
            "level": int(max)
        }
        print("payload:{}".format(payload))
        iot.publish(
            topic=topic,
            qos=0,
            payload=json.dumps(payload)
        )

    return {}

トリガーの設定は、バッチサイズを1とし、1レコード到着するごとに、Lambdaが起動されるようになっています。

6 ブラウザ

ブラウザでは、Epochという、チャート表示用のライブラリを使用させて頂いてます。

aws-iot-sdk-browser-bundle.jsで、トピックをSubscribeして表示を更新しています。

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>TITLE</title>
<script type="text/javascript" src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
<script src="http://d3js.org/d3.v3.min.js" charset="utf-8"></script>
<script src="https://sdk.amazonaws.com/js/aws-sdk-2.283.1.min.js"></script>
<script src="js/aws-iot-sdk-browser-bundle.js"></script>
<script src="js/epoch.js"></script>
<link rel="stylesheet" type="text/css" href="css/epoch.css">
</head>
<body>
	<div id="gaugeChart" class="epoch gauge-large"></div>
	<script>
		var chart = $('#gaugeChart').epoch({
					type: 'time.gauge',
					value: 0.0,
					format: function(v) { return (v*100).toFixed(1); }
				}),
				playing = false,
				interval = null;

		var awsIot = require('aws-iot-device-sdk');

		const PoolId = 'ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx';
		const region = 'ap-northeast-1';

		async function getCredentials() {
		  AWS.config.region = region;
		  var cognitoidentity = new AWS.CognitoIdentity();
		  var params = {
			IdentityPoolId: PoolId
		  };
		  const identityId = await cognitoidentity.getId(params).promise();
		  const data = await cognitoidentity.getCredentialsForIdentity(identityId).promise();
			  var credentials = {
				accessKey: data.Credentials.AccessKeyId,
				secretKey: data.Credentials.SecretKey,
				sessionToken: data.Credentials.SessionToken
			  };
			  return credentials;
		}

		const topic = "topic/level_meter";

		async function job() {
			const endpoint = 'xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com';
			const clientId = 'sample_id'

			const credentials = await getCredentials(); // Credentialsの取得

			const deviceIot = awsIot.device({
				region: region,
				clientId: clientId,
				accessKeyId: credentials.accessKey,
				secretKey: credentials.secretKey,
				sessionToken : credentials.sessionToken,
				protocol: 'wss',
				port: 443,
				host: endpoint
			});

			deviceIot.on('message', function(_topic, payload) {
				level = JSON.parse(payload)["level"]
				level = level/30000
				console.log('level: ' + level);
				chart.update(level)
			});

			deviceIot.subscribe(topic, undefined, function (err, granted){
				if( err ){
					console.log('subscribe error: ' + err);
				} else {
					console.log('subscribe success');
				}
			});
		  }

		job();

	</script>

</body>
</html>

7 その他

Amazon Kinesis Data Firehoseの入力をAmazon Kinesis Data Streamsに変えた以外、S3への保存要領や、保存されたデータからwavファイルを生成するLambdaなど、前々回前回と同じです。

8 最後に

Amazon Kinesis Data Streamsを挟んで、リアルタイムな処理について確認してみました。

次回は、データをテキスト化するオーバーヘッドを外して、バイナリで送信する要領を確認してみたいと思います。もう少し速度が出せるかも知れません。

全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_3