Amazon ConnectとKinesis Video Streamsを利用した音声データの録音と保存(「留守番電話」や「AIチャットボット」で利用)

2023.10.24

はじめに

Amazon Connectでエージェントが介在しない「留守番電話」や「AIチャットボット」で録音したい場合、Kinesis Video Streams(以降、KVS)経由でAWS Lambdaを使い音声データの録音と保存する方法をまとめました。

Amazon Connectでは、下記のコンタクフローのブロックで録音できますが、録音条件は、顧客とエージェントが繋がってからのみ録音されます。

エージェントが介在しない、「留守番電話」やAmazon Lexと組み合わせた「AIチャットボット」の場合、録音機能は利用できません。

解決策として、コンタクフロー内で「メディアストリーミングの開始」というブロックを利用し、KVSにメディアデータを保存できます。ここで注意が必要なのは、保存されたメディアデータがMatroska(MKV)形式となるため、一般的な形式であるWAVなどの形式に変換する必要があります。

Matroska(以降、MKV)は、動画や音声、字幕などのマルチメディアデータを一つのファイルに格納できる、コンテナフォーマットです。

今回は、KVSに保存された際、AWS Lambdaを使い、MKVからWAV形式に変換し、S3バケットに保存方法を説明します。

構成

システムの流れは次の通りです。

  1. コンタクトフロー内で「メディアストリーミングの開始」ブロックを使って、KVSへの音声のストリーミングを開始します。
  2. 「顧客の入力を保存する」ブロックで、顧客が特定の番号を押すと、ストリーミングを終了します。
  3. 「AWS Lambda関数を呼び出す」ブロックを使い、LambdaでKVSからデータを取得します。取得したデータをWAV形式に変換し、S3バケットに音声ファイルを保存します。

これによって、コンタクフローを調整し「留守番電話」として利用することができます。

また、「顧客の入力を保存する」ではなく、Lexを利用することで、AIチャットボットでの会話中は全て録音することが可能です。

注意点

今回の構成では、「AWS Lambda関数を呼び出す」ブロックまで処理が進む場合のみ、音声データをS3バケットに保存できます。

もし特定の条件の顧客だけを録音する(例えば、フローを適切に処理している顧客のみ)必要がある場合は、今回の構成が適しています。

逆に言うと、その前に電話が終了してしまうと、保存されません。

例えば、「顧客の入力を保存する」ブロックで、何も押さずに電話を途中で切った場合などが挙げられます。

その場合でも顧客からの電話を録音する必要がある場合は、別途Kinesis Data Streamを利用した構成に変更する必要があります。

下記の記事に方法をまとめていますので、ご参考ください。

Kinesis Data Streamを利用した場合の構成としては、下記の通りです。

通話が終了すると、問い合わせ記録がKinesis Data Streamにエクスポートされ、Kinesis Data StreamによってLambdaをトリガーし、KVSからデータを取得し、ファイル変換後S3バケットに録音ファイルを保存します。

Lambda作成

まず、ランタイムPython3.11で作成します。

IAMポリシーは、以下を追加しました。

  • AmazonKinesisVideoStreamsReadOnlyAccess
  • AmazonS3FullAccess

コードは下記の通りです。

bucket_nameのみ、音声ファイルを保存するバケット名に変更してください。

2023年11月6日追記

KVSでメディアデータを取得する際、GetMedia APIを利用したコードを紹介しましたが、GetMediaForFragmentList APIを利用するべきです。

GetMediaForFragmentList APIを利用すべき理由は、下記の記事をご参照ください

import boto3, io, struct,json
from ebmlite import loadSchema
from enum import Enum
from datetime import datetime, timedelta
from botocore.config import Config
from decimal import Decimal

JST_OFFSET = timedelta(hours=9)

def decimal_to_int(obj):
    if isinstance(obj, Decimal):
        return int(obj)

class Mkv(Enum):
    SEGMENT = 0x18538067
    CLUSTER = 0x1F43B675
    SIMPLEBLOCK = 0xA3

class Ebml(Enum):
    EBML = 0x1A45DFA3

class KVSParser:
    def __init__(self, media_content):
        self.__stream = media_content["Payload"]
        self.__schema = loadSchema("matroska.xml")
        self.__buffer = bytearray()

    @property
    def fragments(self):
        return [fragment for chunk in self.__stream if (fragment := self.__parse(chunk))]

    def __parse(self, chunk):
        self.__buffer.extend(chunk)
        header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value]
        if header_elements:
            fragment_dom = self.__schema.loads(self.__buffer[:header_elements[0].offset])
            self.__buffer = self.__buffer[header_elements[0].offset:]
            return fragment_dom

def get_simple_blocks(media_content):
    parser = KVSParser(media_content)
    return [b.value for document in parser.fragments for b in 
            next(filter(lambda c: c.id == Mkv.CLUSTER.value, 
                        next(filter(lambda s: s.id == Mkv.SEGMENT.value, document)))) 
            if b.id == Mkv.SIMPLEBLOCK.value]

def create_audio_sample(simple_blocks, margin=4):
    position = 0
    total_length = sum(len(block) - margin for block in simple_blocks)
    combined_samples = bytearray(total_length)
    for block in simple_blocks:
        temp = block[margin:]
        combined_samples[position:position+len(temp)] = temp
        position += len(temp)
    return combined_samples

def convert_bytearray_to_wav(samples):
    length = len(samples)
    channel = 1
    bit_par_sample = 16
    format_code = 1
    sample_rate = 8000
    header_size = 44
    wav = bytearray(header_size + length)
    
    wav[0:4] = b"RIFF"
    wav[4:8] = struct.pack("<I", 36 + length)
    wav[8:12] = b"WAVE"
    wav[12:16] = b"fmt "
    wav[16:20] = struct.pack("<I", 16)
    wav[20:22] = struct.pack("<H", format_code)
    wav[22:24] = struct.pack("<H", channel)
    wav[24:28] = struct.pack("<I", sample_rate)
    wav[28:32] = struct.pack("<I", sample_rate * channel * bit_par_sample // 8)
    wav[32:34] = struct.pack("<H", channel * bit_par_sample // 8)
    wav[34:36] = struct.pack("<H", bit_par_sample)
    wav[36:40] = b"data"
    wav[40:44] = struct.pack("<I", length)
    
    wav[44:] = samples
    return wav

def create_kvs_client():
    region_name = "ap-northeast-1"
    return boto3.client("kinesisvideo", region_name=region_name)

def create_archive_media_client(ep):
    region_name = "ap-northeast-1"
    return boto3.client("kinesis-video-archived-media", endpoint_url=ep, config=Config(region_name=region_name))

def upload_audio_to_s3(bucket_name, audio_data, filename):
    region_name = "ap-northeast-1"
    s3_client = boto3.client("s3", region_name=region_name)
    s3_client.upload_fileobj(io.BytesIO(audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"})

def get_media_data(arn, start_timestamp, end_timestamp):
    kvs_client = create_kvs_client()

    list_frags_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="LIST_FRAGMENTS")["DataEndpoint"]
    list_frags_client = create_archive_media_client(list_frags_ep)

    fragment_list = list_frags_client.list_fragments(
        StreamARN = arn, 
        FragmentSelector = {
            "FragmentSelectorType": "PRODUCER_TIMESTAMP",
            "TimestampRange": {"StartTimestamp": datetime.fromtimestamp(start_timestamp), "EndTimestamp": datetime.fromtimestamp(end_timestamp)}
        }
    )

    sorted_fragments = sorted(fragment_list["Fragments"], key = lambda fragment: fragment["ProducerTimestamp"])
    fragment_number_array = [fragment["FragmentNumber"] for fragment in sorted_fragments]

    get_media_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="GET_MEDIA_FOR_FRAGMENT_LIST")["DataEndpoint"]
    get_media_client = create_archive_media_client(get_media_ep)

    media = get_media_client.get_media_for_fragment_list(StreamARN = arn, Fragments = fragment_number_array)
    return media

def save_audio_to_tmp(wav_audio, filename="output.wav"):
    with open(f"/tmp/{filename}", "wb") as out_file:
        out_file.write(wav_audio)

def lambda_handler(event, context):
    print('Received event:' + json.dumps(event,default=decimal_to_int, ensure_ascii=False))
    media_streams = event["Details"]["ContactData"]["MediaStreams"]["Customer"]["Audio"]
    stream_arn = media_streams["StreamARN"]
    start_time = float(media_streams["StartTimestamp"]) / 1000
    end_time = float(media_streams["StopTimestamp"]) / 1000
    combined_samples = create_audio_sample(
        get_simple_blocks(get_media_data(stream_arn, start_time, end_time)))

    wav_audio = convert_bytearray_to_wav(combined_samples)
    bucket_name = "バケット名"
    jst_time = datetime.utcnow() + JST_OFFSET
    filename = "output_{date}.wav".format(date=jst_time.strftime("%Y%m%d_%H%M%S"))

    upload_audio_to_s3(bucket_name, wav_audio, filename)

    return {
        "statusCode": 200,
        "body": "Audio uploaded successfully!"
    }
GetMedia APIを利用していたコード (クリックすると展開します)
import boto3
import io
import json
import struct
from datetime import datetime, timedelta
from decimal import Decimal
from ebmlite import loadSchema
from enum import Enum

JST_OFFSET = timedelta(hours=9)

class Mkv(Enum):
    SEGMENT = 0x18538067
    TAGS = 0x1254C367
    TAG = 0x7373
    SIMPLETAG = 0x67C8
    TAGNAME = 0x45A3
    TAGSTRING = 0x4487
    TAGBINARY = 0x4485
    CLUSTER = 0x1F43B675
    SIMPLEBLOCK = 0xA3

class Ebml(Enum):
    EBML = 0x1A45DFA3

class KVSParser:
    def __init__(self, media_content):
        self.__stream = media_content['Payload']
        self.__schema = loadSchema('matroska.xml')
        self.__buffer = bytearray()

    @property
    def fragments(self):
        fragment_list = list()
        for chunk in self.__stream:
            fragment = self.__parse(chunk)
            if fragment:
                fragment_list.append(fragment)
        return fragment_list

    def __parse(self, chunk):
        self.__buffer.extend(chunk)
        header_elements = [e for e in self.__schema.loads(
            self.__buffer) if e.id == Ebml.EBML.value]
        if header_elements:
            offset = header_elements[0].offset
            fragment_bytes = self.__buffer[:offset]
            fragment_dom = self.__schema.loads(fragment_bytes)
            self.__buffer = self.__buffer[offset:]
            return fragment_dom

def decimal_to_int(obj):
    if isinstance(obj, Decimal):
        return int(obj)

def get_media_data(stream_arn, start_fragment_num):
    kinesis_client = boto3.client('kinesisvideo')
    endpoint_response = kinesis_client.get_data_endpoint(
        StreamARN=stream_arn, APIName='GET_MEDIA')
    data_endpoint = endpoint_response['DataEndpoint']
    kinesis_video_client = boto3.client(
        'kinesis-video-media', endpoint_url=data_endpoint)
    return kinesis_video_client.get_media(StreamARN=stream_arn, StartSelector={'StartSelectorType': 'FRAGMENT_NUMBER', 'AfterFragmentNumber': start_fragment_num})

def get_simple_blocks(media_content):
    parser = KVSParser(media_content)
    documents = parser.fragments
    simple_blocks = []
    for document in documents:
        segment = next(filter(lambda s: s.id == Mkv.SEGMENT.value, document))
        cluster = next(filter(lambda c: c.id == Mkv.CLUSTER.value, segment))
        simple_blocks.extend(
            [b.value for b in cluster if b.id == Mkv.SIMPLEBLOCK.value])
    return simple_blocks

def remove_margin_and_combine_samples(simple_blocks, margin=4):
    total_length = sum(len(block) - margin for block in simple_blocks)
    combined_samples = bytearray(total_length)
    position = 0
    for block in simple_blocks:
        temp = block[margin:]
        combined_samples[position:position+len(temp)] = temp
        position += len(temp)
    return combined_samples

def convert_bytearray_to_wav(samples):
    length = len(samples)
    channel = 1
    bit_par_sample = 16
    format_code = 1
    sample_rate = 8000
    header_size = 44
    wav = bytearray(header_size + length)
    wav[0:4] = b'RIFF'
    wav[4:8] = struct.pack('<I', 36 + length)
    wav[8:12] = b'WAVE'
    wav[12:16] = b'fmt '
    wav[16:20] = struct.pack('<I', 16)
    wav[20:22] = struct.pack('<H', format_code)
    wav[22:24] = struct.pack('<H', channel)
    wav[24:28] = struct.pack('<I', sample_rate)
    wav[28:32] = struct.pack('<I', sample_rate * channel * bit_par_sample // 8)
    wav[32:34] = struct.pack('<H', channel * bit_par_sample // 8)
    wav[34:36] = struct.pack('<H', bit_par_sample)
    wav[36:40] = b'data'
    wav[40:44] = struct.pack('<I', length)
    wav[44:] = samples
    return wav

def upload_audio_to_s3(bucket_name, audio_data, filename, s3_client):
    s3_client.upload_fileobj(io.BytesIO(
        audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"})

def lambda_handler(event, context):
    print('Received event:' + json.dumps(event,default=decimal_to_int, ensure_ascii=False))
    MediaStreams = event['Details']['ContactData']['MediaStreams']['Customer']['Audio']
    video_stream_arn = MediaStreams['StreamARN']
    start_fragment_number = MediaStreams['StartFragmentNumber']

    media_data = get_media_data(video_stream_arn, start_fragment_number)
    simple_blocks = get_simple_blocks(media_data)
    combined_samples = remove_margin_and_combine_samples(simple_blocks)
    wav_audio = convert_bytearray_to_wav(combined_samples)

    s3_clinet = boto3.client('s3')
    bucket_name = "バケット名"
    jst_time = datetime.utcnow() + JST_OFFSET
    filename = "output_{date}.wav".format(date=jst_time.strftime("%Y%m%d_%H%M%S"))

    upload_audio_to_s3(bucket_name, wav_audio, filename, s3_clinet)

    return {
        'statusCode': 200,
        'body': 'Audio uploaded successfully!'
    }

Lambdaの処理は以下の流れです。

  1. コンタクトフローがLambdaを呼び出すと、イベントデータからKVSのストリームARNフラグメント番号を取得します
  2. 取得した情報を使ってKVSからメディアデータを取得します
  3. 取得したメディアデータからSimpleBlock(フラグメントの実データ部分)を抽出します
  4. SimpleBlockから不要箇所を取り除き、これらを結合します
  5. 結合したデータをWAV形式に変換します
  6. 最後に、変換した音声データをS3バケットにアップロードします

2番目の処理

取得したKVSのARNとフラグメント番号を使用して、KVSからメディアデータを取得します。取得したデータは、MKVフォーマットで提供されます

3番目の処理

音声データ等を取り出すには、まず MKVファイルをebmliteライブラリを用いて解析し、その中にあるSimpleBlockを抽出する必要があります。

4番目の処理

SimpleBlockには、実際の音声データ以外の情報も含んでいますので、各SimpleBlockから先頭4バイトを削除します。

純粋な音声データだけが残った各SimpleBlockを結合して、一つのバイト列(8ビットのデータ列を1つの単位として扱うデータ形式)を作成します。

5番目の処理

WAVファイルは、RIFF (Resource Interchange File Format) の一種で、音声ファイルを保存するのに利用される形式です。WAVファイルの構造は以下の3つの主要な部分から成り立っています

  • RIFFチャンク:マルチメディア用のファイルフォーマット
  • fmtチャンク:音声データの形式を定義する情報
  • dataチャンク:実際の音声データ

このようなWAVファイルのフォーマットに従い、音声データをバイト列としてWAVファイルに変換します。

WAVファイルの変換処理は、下記でWAVファイルのフォーマットの各項目を記載しています。

def convert_bytearray_to_wav(samples):
    length = len(samples)
    channel = 1
    bit_par_sample = 16
    format_code = 1
    rate = 8000
    header_size = 44
    wav = bytearray(header_size + length)

    # RIFFチャンク
    # 'RIFF'という識別子(ID)
    wav[0:4] = b'RIFF'
    # チャンクサイズ:全体のファイルサイズを示すが、この部分を除く
    wav[4:8] = struct.pack('<I', 36 + length) 
    # フォーマット:WAVE
    wav[8:12] = b'WAVE'

    # fmtチャンク:音声データの形式を定義する情報
    # 'fmt '識別子
    wav[12:16] = b'fmt '
    # fmtチャンクのバイト数:通常16バイト
    wav[16:20] = struct.pack('<I', 16)
    # 音声フォーマット:1
    wav[20:22] = struct.pack('<H', format_code)
    # チャンネル数:モノラルは1
    wav[22:24] = struct.pack('<H', channel)
    # サンプリングレート:秒間のサンプル数(Connectは8000Hz)
    wav[24:28] = struct.pack('<I', rate)
    # 1 秒あたりバイト数の平均
    wav[28:32] = struct.pack('<I', rate * channel * bit_par_sample // 8)
    # ブロックサイズ
    wav[32:34] = struct.pack('<H', channel * bit_par_sample // 8)
    # サンプルあたりのビット数
    wav[34:36] = struct.pack('<H', bit_par_sample)

    # dataチャンク:実際の音声データ
    # 'data'識別子
    wav[36:40] = b'data'
    # 波形データのバイト数:音声データのサイズ
    wav[40:44] = struct.pack('<I', length)
    # 波形データ:実際の音声データ
    wav[44:] = samples
    return wav

MKVファイルの解析にはebmliteライブラリを使用します。このライブラリを使用する場合は、先にZIP化してLambda レイヤーにアップロードしておきます。

$ python3 -m pip install -t ./python ebmlite
$ zip -r ebmlite-3.3.1.zip ./python

Connectの設定

ライブメディアストリーミングは有効にします。

Connectのコンタクトフロー作成

コンタクフローは下記の通りです。

  1. 「メディアストリーミングの開始」ブロックでKVSへの音声ストリーミングを開始します。
  2. 「顧客の入力を保存する」ブロックで、顧客が特定の番号のボタンを押すと、ストリーミングを終了します。
  3. 「AWS Lambda 関数を呼び出す」ブロックで、LambdaがKVSからデータを取得します。取得したデータをWAVファイルに変換し、S3バケットに保存します。

「メディアストリーミングの開始」は、顧客から(顧客のみ)にします。

「顧客の入力を保存する」は、メッセージをアナウンスするようにし、顧客が番号のボタンを押すとストリーミングを終了します。

終了するキープレスを指定は、指定してもよいです。

制限時間は5分です。

「AWS Lambda 関数を呼び出す」では、作成したLambdaを指定します。タイムアウト1秒にしています。タイムアウトしてもLambdaは、処理を続けますので問題ありません。

コンタクフロー作成後、コンタクフローに電話番号を紐づけて、電話をかけると、音声が録音されます。

録音後、S3バケットに保存されたことが確認できました。

Lambdaのイベントデータ

コンタクフローでLambdaが呼ばれた際のイベントデータでは、KVSのARN(StreamARN)、開始フラグメント番号(StartFragmentNumber)、開始時間(StartTimestamp)が設定不要で取得できるので、ラクですね。

{
    "Details": {
        "ContactData": {
            "Attributes": {},
            "AwsRegion": "ap-northeast-1",
            "Channel": "VOICE",
            "ContactId": "4c34cc6b-9126-47f1-b21c-1b255c9a1e8e",
            "CustomerEndpoint": {
                "Address": "+81xxxxxxxx",
                "Type": "TELEPHONE_NUMBER"
            },
            "CustomerId": null,
            "Description": null,
            "InitialContactId": "4c34cc6b-9126-47f1-b21c-1b255c9a1e8e",
            "InitiationMethod": "INBOUND",
            "InstanceARN": "arn:aws:connect:ap-northeast-1:111111111111:instance/3ff2093d-af96-43fd-b038-3c07cdd7609c",
            "LanguageCode": "ja-JP",
            "MediaStreams": {
                "Customer": {
                    "Audio": {
                        "StartFragmentNumber": "91343852333181437344442219799177485496994629466",
                        "StartTimestamp": "1697790533391",
                        "StreamARN": "arn:aws:kinesisvideo:ap-northeast-1:111111111111:stream/xxxxx-contact-7fe907b5-60ee-49b4-9b68-996609b8eb54/1697789374343",
                        "StopFragmentNumber": "91343852333181437393959821370592698935066067380",
                        "StopTimestamp": "1697790543806"
                    }
                }
            },
            "Name": null,
            "PreviousContactId": "4c34cc6b-9126-47f1-b21c-1b255c9a1e8e",
            "Queue": null,
            "References": {},
            "RelatedContactId": null,
            "SystemEndpoint": {
                "Address": "+81xxxxxxxx",
                "Type": "TELEPHONE_NUMBER"
            }
        },
        "Parameters": {}
    },
    "Name": "ContactFlowEvent"
}

参考