Audioデータをクラウドに送ってみました。 証明書を使用してAmazon Kinesis Data Streamsへ(Authorizing Direct Calls)

2021.04.25

1 はじめに

CX事業本部の平内(SIN)です。

前回、Audioデータをクラウドへ送信する要領の一つとして、エッジデバイスからAmazon Kinesis Data Streams へ送る要領を確認してみました。

上記では、権限付与のためCognitoのIdentity Idを使用しましたが、今回は、AWS IoT CoreのAuthorizing Direct Callsを使用して、証明書を使用して接続する要領を確認してみました。

2 構成

Audioデータ送信の内容は変化していませんが、AWSリソースへアクセスする権限をデバイスに保持した証明書で行っている点が変わっています。

3 AWS側の設定

既に証明書及び、秘密鍵がデバイスに配置されているとした場合、Authorizing Direct Callsを使用するためのAWS側の手順は、以下の3つです。

  • ロール作成
  • ロールエリアス作成
  • ポリシーへの権限付与

(1) ロール作成

最初に、付与したい権限のロールを作成します。 audio_transmission_roleという名前で、Kinesis Data Streams(audio_transmission_data_stream)へのPutRecord権限だけを付与したロールを作成しました。

また、このロールは、AWS IoTの(資格情報)サービスからAssumeRoleされるので、信頼元をcredentials.iot.amazonaws.comとします。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "credentials.iot.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

(2) ロールエリアス作成

上記のロールを選択して、audio_transmission_role_aliasという名前でロールエリアスを作成します。

(3) ポリシーへの権限付与

証明書のポリシーには、iot:AssumeRoleWithCertificateを追加します。もし、この証明書で、MQTT接続しないのであれば、下記のように、iot:connectとか無くても大丈夫です

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:AssumeRoleWithCertificate",
      "Resource": "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:rolealias/audio_transmission_role_alias"
    }
  ]
}

4 エッジ側のコード

500ms毎にAmazon Kinesis Data StreamsにAudioデータを送信しているコードです。

Kinesisのクライアント生成前にSTSでトークンを取得しています。

なお、ここでのendpointは、IoT Coreの通常のエンドポイントでは無く、下記のように取得したものとなっています。

% aws iot describe-endpoint --endpoint-type iot:CredentialProvider --p profileName
{
    "endpointAddress": "xxxxxxxxx.credentials.iot.ap-northeast-1.amazonaws.com"
}

すいません、トークンの有効期限が切れた場合、再取得が必要ですが、下記のコードは、そこが顧慮されていないことをご了承ください。

index.py

import pyaudio
from producer import Producer
import numpy as np

DEVICE_INDEX = 0
CHANNELS = 2 
SAMPLE_RATE = 32000 # サンプルレート
FORMAT = pyaudio.paInt16
CHUNK = int(SAMPLE_RATE/2) # 500msごとに取得する

# open stream
p = pyaudio.PyAudio()
stream = p.open(format = FORMAT,
                channels = CHANNELS,
                rate = SAMPLE_RATE,
                input =  True,
                input_device_index = DEVICE_INDEX,
                frames_per_buffer = CHUNK)

producer = Producer()

try:
    print("start ...")
    while True: 

        # 500ms分のデータ読み込み
        data = stream.read(CHUNK)

        # numpy配列に変換    
        data = np.frombuffer(data, dtype="int16")
        # チャンネル 2ch -> 1ch
        data = data[0::2] 
        # サンプルレート  32000Hz -> 8000Hz
        data = data[0::4] 
        # byteに戻す 
        data = data.tobytes()

        producer.send(data)
except:
    stream.stop_stream()
    stream.close()
    p.terminate()

producer.py

from mqtt import Mqtt
import json
from datetime import datetime
import struct
import requests
import boto3
import random

class Producer():
    def __init__(self):

        self.__cert = "./certs/xxxxxxxx-certificate.pem.crt"
        self.__key = "./certs/xxxxxxxx-private.pem.key"
        self.__endpoint = "https://xxxxxxxxxxxx.credentials.iot.ap-northeast-1.amazonaws.com"
        self.__role_alias = 'audio_transmission_role_alias'

        self.__stream_name = 'audio_transmission_data_stream'
        self.__region = 'ap-northeast-1'
        self.__kinesis = self.__get_kinesis()

    def __get_kinesis(self):
        result = requests.get(
            '{}/role-aliases/{}/credentials'.format(self.__endpoint, self.__role_alias),
            cert=(self.__cert, self.__key)
        )
        print(result)
        if(result.status_code != 200):
            exit()

        body = json.loads(result.text)
        access_key = body["credentials"]["accessKeyId"]
        secret_key = body["credentials"]["secretAccessKey"]
        token = body["credentials"]["sessionToken"]

        session = boto3.Session(aws_access_key_id=access_key,
                    aws_secret_access_key=secret_key,
                    aws_session_token=token,
                    region_name = self.__region)

        return session.client('kinesis')

    def send(self, data):
        now = datetime.now()
        # 時刻とデータの結合
        ts = now.timestamp()
        ts = struct.pack('<d', ts)
        transfer_data = ts + data
        try:
            response = self.__kinesis.put_record(
                StreamName = self.__stream_name, 
                PartitionKey = str(random.randrange(0,100)), 
                Data = transfer_data
            )
        except Exception as e:
            print("Exception: {}", e.args)
        print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))

5 最後に

今回は、Authorizing Direct Callsによって、証明書だけで、Amazon Kinesis Data Streamsにデータ送信する要領を確認してみました。

AWSのリソースへ直接アクセスする場合でも、エッジ側が、Cognitoの認証に馴染まないような場合は、このような方法も検討できるのではないでしょうか。

全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_6