Amazon ConnectとKinesis Data Streamsを使用し、エージェントの介在がないIVRの場合でも電話中のユーザーの発話を録音する方法

2023.10.25

はじめに

この記事では、Amazon ConnectとKinesis Data Streams(以下、KDS)を用いて、エージェントが介在しなくても、ユーザーの発話を録音する方法を紹介します。

Connectでは、下記のコンタクフローのブロックで録音できますが、録音条件は、顧客とエージェントが繋がってからのみ録音されます。

例えば「留守番電話」やAmazon Lexと組み合わせた「AIチャットボット」のように、エージェントが介在しない場合、通常の録音機能は利用できません。

解決策として、コンタクフロー内で「メディアストリーミングの開始」というブロックを利用し、Kinesis Video Streams(以降、KVS)にメディアデータを保存できます。ただし、注意点として、保存されたメディアデータはMatroska(MKV)形式となるため、一般的であるWAVなどの形式に変換する作業が必要となります。

Matroska(以降、MKV)は、動画や音声、字幕などのマルチメディアデータを一つのファイルに格納できる、コンテナフォーマットです。

また、Connectでは、通話終了後に、通話の詳細情報を問い合わせ追跡記録(Contact Trace Record、以降、問い合わせ記録)として、リアルタイムでKDSにエクスポートすることが可能です。

今回は、通話を終了後に、LambdaがKDSからの問い合わせ記録を受けとり、KVSからMKV形式のメディアデータを取得し、WAV形式に変換し、S3バケットに保存する方法を説明します。

構成

システムの流れは次の通りです。

  1. コンタクトフロー内で「メディアストリーミングの開始」ブロックを使って、KVSへ音声をストリーミングします。
  2. 「顧客の入力を保存する」ブロックにより、顧客が特定の番号を押すと、ストリーミングが終了します。
  3. 通話が終了すると、問い合わせ記録がKDSにエクスポートされ、KDSによってLambdaをトリガーし、KVSからデータを取得します。取得したデータをWAV形式に変換し、S3バケットに音声ファイルを保存します。
    • Lambdaでは、MKV形式のメディアデータをKVSから取得するために、KDSからKVSのARN開始フラグメント番号を取得します。

これによって、コンタクフローを調整し「留守番電話」として利用することができます。

また、「顧客の入力を保存する」ではなく、Lexを利用することで、AIチャットボットでの会話中は全て録音することが可能です。

注意点

今回のシステム構成では、「メディアストリーミングの開始」ブロックまで処理が進むと、音声データがS3バケットに保存されます。

「顧客の入力を保存する」ブロックで、何も押さずに電話を途中で切った場合でも録音可能です。

もし特定の条件の顧客だけを録音する(例えば、フローを適切に処理している顧客のみ)必要がある場合は、フロー内で「AWS Lambda関数を呼び出す」ブロックを使う方法があります。

下記の記事に方法をまとめていますので、ご参考ください。

上記の記事の構成は、下記の通りです。コンタクフロー内でLambdaが呼ばれた場合に限り、S3バケットに録音ファイルを保存できます。

Kinesis Data Steam作成

Kinesis Data Steamを作成します。名前のみ記入し、他はデフォルトのままで作成します。

Connectの設定

ライブメディアストリーミングを有効にします。

また、先程作成したKDSを選択し、データストリーミングを有効にします。

Connectのコンタクトフロー作成

コンタクフローは下記の通りです。

  1. 「メディアストリーミングの開始」ブロックでKVSへの音声ストリーミングを開始します。
  2. 「顧客の入力を保存する」ブロックで、顧客が特定の番号のボタンを押すと、ストリーミングを終了します。
  3. 通話終了後、問い合わせ記録がKDSにエクスポートされます

「メディアストリーミングの開始」は、顧客から(顧客のみ)にします。

「顧客の入力を保存する」は、メッセージをアナウンスするようにし、顧客が番号のボタンを押すとストリーミングを終了します。

終了するキープレスを指定は、指定してもよいです。

Lambda作成

ランタイムPython3.11を使用して作成します。

タイムアウトは1分に変更します。

IAMポリシーは、以下を追加しました。

  • AmazonKinesisReadOnlyAccess
  • AmazonKinesisVideoStreamsReadOnlyAccess
  • AmazonS3FullAccess

また、KDSをトリガーにLambdaを起動するため、トリガー設定を行います。

コードは下記のとおりです。

2023年11月6日追記

KVSでメディアデータを取得する際、GetMedia APIを利用したコードを紹介しましたが、正しくはGetMediaForFragmentList APIを利用するべきでした。

GetMediaForFragmentList APIを利用すべき理由は、下記の記事をご参照ください

import boto3, io, struct, base64, json
from ebmlite import loadSchema
from enum import Enum
from datetime import datetime, timedelta, timezone
from botocore.config import Config
from decimal import Decimal

JST_OFFSET = timedelta(hours=9)

def decimal_to_int(obj):
    if isinstance(obj, Decimal):
        return int(obj)

class Mkv(Enum):
    SEGMENT = 0x18538067
    CLUSTER = 0x1F43B675
    SIMPLEBLOCK = 0xA3

class Ebml(Enum):
    EBML = 0x1A45DFA3

class KVSParser:
    def __init__(self, media_content):
        self.__stream = media_content["Payload"]
        self.__schema = loadSchema("matroska.xml")
        self.__buffer = bytearray()

    @property
    def fragments(self):
        return [fragment for chunk in self.__stream if (fragment := self.__parse(chunk))]

    def __parse(self, chunk):
        self.__buffer.extend(chunk)
        header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value]
        if header_elements:
            fragment_dom = self.__schema.loads(self.__buffer[:header_elements[0].offset])
            self.__buffer = self.__buffer[header_elements[0].offset:]
            return fragment_dom

def get_simple_blocks(media_content):
    parser = KVSParser(media_content)
    return [b.value for document in parser.fragments for b in 
            next(filter(lambda c: c.id == Mkv.CLUSTER.value, 
                        next(filter(lambda s: s.id == Mkv.SEGMENT.value, document)))) 
            if b.id == Mkv.SIMPLEBLOCK.value]

def create_audio_sample(simple_blocks, margin=4):
    position = 0
    total_length = sum(len(block) - margin for block in simple_blocks)
    combined_samples = bytearray(total_length)
    for block in simple_blocks:
        temp = block[margin:]
        combined_samples[position:position+len(temp)] = temp
        position += len(temp)
    return combined_samples

def convert_bytearray_to_wav(samples):
    length = len(samples)
    channel = 1
    bit_par_sample = 16
    format_code = 1
    sample_rate = 8000
    header_size = 44
    wav = bytearray(header_size + length)
    
    wav[0:4] = b"RIFF"
    wav[4:8] = struct.pack("<I", 36 + length)
    wav[8:12] = b"WAVE"
    wav[12:16] = b"fmt "
    wav[16:20] = struct.pack("<I", 16)
    wav[20:22] = struct.pack("<H", format_code)
    wav[22:24] = struct.pack("<H", channel)
    wav[24:28] = struct.pack("<I", sample_rate)
    wav[28:32] = struct.pack("<I", sample_rate * channel * bit_par_sample // 8)
    wav[32:34] = struct.pack("<H", channel * bit_par_sample // 8)
    wav[34:36] = struct.pack("<H", bit_par_sample)
    wav[36:40] = b"data"
    wav[40:44] = struct.pack("<I", length)
    
    wav[44:] = samples
    return wav

def create_kvs_client():
    region_name = "ap-northeast-1"
    return boto3.client("kinesisvideo", region_name=region_name)

def create_archive_media_client(ep):
    region_name = "ap-northeast-1"
    return boto3.client("kinesis-video-archived-media", endpoint_url=ep, config=Config(region_name=region_name))

def upload_audio_to_s3(bucket_name, audio_data, filename):
    region_name = "ap-northeast-1"
    s3_client = boto3.client("s3", region_name=region_name)
    s3_client.upload_fileobj(io.BytesIO(audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"})

def get_media_data(arn, start_timestamp, end_timestamp):
    kvs_client = create_kvs_client()

    list_frags_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="LIST_FRAGMENTS")["DataEndpoint"]
    list_frags_client = create_archive_media_client(list_frags_ep)

    fragment_list = list_frags_client.list_fragments(
        StreamARN = arn, 
        FragmentSelector = {
            "FragmentSelectorType": "PRODUCER_TIMESTAMP",
            "TimestampRange": {"StartTimestamp": datetime.fromtimestamp(start_timestamp), "EndTimestamp": datetime.fromtimestamp(end_timestamp)}
        }
    )

    sorted_fragments = sorted(fragment_list["Fragments"], key = lambda fragment: fragment["ProducerTimestamp"])
    fragment_number_array = [fragment["FragmentNumber"] for fragment in sorted_fragments]

    get_media_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="GET_MEDIA_FOR_FRAGMENT_LIST")["DataEndpoint"]
    get_media_client = create_archive_media_client(get_media_ep)

    media = get_media_client.get_media_for_fragment_list(StreamARN = arn, Fragments = fragment_number_array)
    return media

def save_audio_to_tmp(wav_audio, filename="output.wav"):
    with open(f"/tmp/{filename}", "wb") as out_file:
        out_file.write(wav_audio)

def extract_kvs_details(event):
    kinesis_record = event['Records'][0]['kinesis']
    data_string = kinesis_record['data']
    decoded_data = base64.b64decode(data_string)
    decoded_string = decoded_data.decode('utf-8') 
    print('Decoded data:', decoded_string)
    data_json = json.loads(decoded_string)
    return {
        'VideoStreamARN': data_json['Recordings'][0]['Location'],
        'StartTimestamp': data_json['Recordings'][0]['StartTimestamp'],
        'StopTimestamp': data_json['Recordings'][0]['StopTimestamp']
        }

def convert_to_unix(timestamp_str):
    timestamp_dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%SZ")
    return int(timestamp_dt.replace(tzinfo=timezone.utc).timestamp())

def lambda_handler(event, context):
    print('Received event:' + json.dumps(event,default=decimal_to_int, ensure_ascii=False))
    kvs_details = extract_kvs_details(event)
    kvs_arn = kvs_details['VideoStreamARN']

    start_timestamp = convert_to_unix(kvs_details["StartTimestamp"])
    end_timestamp = convert_to_unix(kvs_details["StopTimestamp"])

    combined_samples = create_audio_sample(
        get_simple_blocks(get_media_data(kvs_arn, start_timestamp, end_timestamp)))

    wav_audio = convert_bytearray_to_wav(combined_samples)
    bucket_name = "バケット名"
    jst_time = datetime.utcnow() + JST_OFFSET
    filename = "output_{date}.wav".format(date=jst_time.strftime("%Y%m%d_%H%M%S"))

    upload_audio_to_s3(bucket_name, wav_audio, filename)

    return {
        "statusCode": 200,
        "body": "Audio uploaded successfully!"
    }
GetMedia APIを利用していたコード (クリックすると展開します)
import base64
import boto3
import io
import json
import struct
from datetime import datetime, timedelta
from decimal import Decimal
from ebmlite import loadSchema
from enum import Enum

JST_OFFSET = timedelta(hours=9)

class Mkv(Enum):
    SEGMENT = 0x18538067
    TAGS = 0x1254C367
    TAG = 0x7373
    SIMPLETAG = 0x67C8
    TAGNAME = 0x45A3
    TAGSTRING = 0x4487
    TAGBINARY = 0x4485
    CLUSTER = 0x1F43B675
    SIMPLEBLOCK = 0xA3

class Ebml(Enum):
    EBML = 0x1A45DFA3

class KVSParser:
    def __init__(self, media_content):
        self.__stream = media_content['Payload']
        self.__schema = loadSchema('matroska.xml')
        self.__buffer = bytearray()

    @property
    def fragments(self):
        fragment_list = list()
        for chunk in self.__stream:
            fragment = self.__parse(chunk)
            if fragment:
                fragment_list.append(fragment)
        return fragment_list

    def __parse(self, chunk):
        self.__buffer.extend(chunk)
        header_elements = [e for e in self.__schema.loads(
            self.__buffer) if e.id == Ebml.EBML.value]
        if header_elements:
            offset = header_elements[0].offset
            fragment_bytes = self.__buffer[:offset]
            fragment_dom = self.__schema.loads(fragment_bytes)
            self.__buffer = self.__buffer[offset:]
            return fragment_dom

def decimal_to_int(obj):
    if isinstance(obj, Decimal):
        return int(obj)

def get_media_data(stream_arn, start_fragment_num):
    kinesis_client = boto3.client('kinesisvideo')
    endpoint_response = kinesis_client.get_data_endpoint(
        StreamARN=stream_arn, APIName='GET_MEDIA')
    data_endpoint = endpoint_response['DataEndpoint']
    kinesis_video_client = boto3.client(
        'kinesis-video-media', endpoint_url=data_endpoint)
    return kinesis_video_client.get_media(StreamARN=stream_arn, StartSelector={'StartSelectorType': 'FRAGMENT_NUMBER', 'AfterFragmentNumber': start_fragment_num})

def get_simple_blocks(media_content):
    parser = KVSParser(media_content)
    documents = parser.fragments
    simple_blocks = []
    for document in documents:
        segment = next(filter(lambda s: s.id == Mkv.SEGMENT.value, document))
        cluster = next(filter(lambda c: c.id == Mkv.CLUSTER.value, segment))
        simple_blocks.extend(
            [b.value for b in cluster if b.id == Mkv.SIMPLEBLOCK.value])
    return simple_blocks

def remove_margin_and_combine_samples(simple_blocks, margin=4):
    total_length = sum(len(block) - margin for block in simple_blocks)
    combined_samples = bytearray(total_length)
    position = 0
    for block in simple_blocks:
        temp = block[margin:]
        combined_samples[position:position+len(temp)] = temp
        position += len(temp)
    return combined_samples

def convert_bytearray_to_wav(samples):
    length = len(samples)
    channel = 1
    bit_par_sample = 16
    format_code = 1
    sample_rate = 8000
    header_size = 44
    wav = bytearray(header_size + length)
    wav[0:4] = b'RIFF'
    wav[4:8] = struct.pack('<I', 36 + length)
    wav[8:12] = b'WAVE'
    wav[12:16] = b'fmt '
    wav[16:20] = struct.pack('<I', 16)
    wav[20:22] = struct.pack('<H', format_code)
    wav[22:24] = struct.pack('<H', channel)
    wav[24:28] = struct.pack('<I', sample_rate)
    wav[28:32] = struct.pack('<I', sample_rate * channel * bit_par_sample // 8)
    wav[32:34] = struct.pack('<H', channel * bit_par_sample // 8)
    wav[34:36] = struct.pack('<H', bit_par_sample)
    wav[36:40] = b'data'
    wav[40:44] = struct.pack('<I', length)
    wav[44:] = samples
    return wav

def upload_audio_to_s3(bucket_name, audio_data, filename, s3_client):
    s3_client.upload_fileobj(io.BytesIO(
        audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"})
    
def extract_kvs_details(event):
    kinesis_record = event['Records'][0]['kinesis']
    data_string = kinesis_record['data']
    decoded_data = base64.b64decode(data_string)
    decoded_string = decoded_data.decode('utf-8') 
    print('Decoded data:', decoded_string)
    data_json = json.loads(decoded_string)
    return {
        'VideoStreamARN': data_json['Recordings'][0]['Location'],
        'StartFragmentNumber': data_json['Recordings'][0]['FragmentStartNumber']
        }

def lambda_handler(event, context):
    print('Received event:' + json.dumps(event,default=decimal_to_int, ensure_ascii=False))
    kvs_details = extract_kvs_details(event)
    kvs_arn = kvs_details['VideoStreamARN']
    start_fragment_number = kvs_details['StartFragmentNumber']

    media_data = get_media_data(kvs_arn, start_fragment_number)
    simple_blocks = get_simple_blocks(media_data)
    combined_samples = remove_margin_and_combine_samples(simple_blocks)
    wav_audio = convert_bytearray_to_wav(combined_samples)

    s3_clinet = boto3.client('s3')
    bucket_name = "cm-hirai-kinesis-video-stream"
    jst_time = datetime.utcnow() + JST_OFFSET
    filename = "output_{date}.wav".format(date=jst_time.strftime("%Y%m%d_%H%M%S"))

    upload_audio_to_s3(bucket_name, wav_audio, filename, s3_clinet)

    return {
        'statusCode': 200,
        'body': 'Audio uploaded successfully!'
    }

コードの解説はこちらを参考ください

参考記事に記載している通り、Matroskaファイルの解析にはebmliteライブラリを使用します。このライブラリを使用する場合は、先にZIP化してLambda レイヤーにアップロードしましょう。

$ python3 -m pip install -t ./python ebmlite
$ zip -r ebmlite-3.3.1.zip ./python

実際に試してみた

コンタクフローに電話番号を紐づけて、電話をかけると、音声が録音されます。

録音後、S3バケットに保存されたことが確認できました。

ちなみに、通話を終了してから、KDSをトリガーにLambdaが起動するまでには、1分ほどかかりました。

Lambdaのイベントデータ

KDSからトリガーされたLambdaは、問い合わせ記録をイベントデータとして取得されます。

イベントデータの例は下記のとおりです。

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "82cd81b0-c46b-4f44-849a-a294af488370",
                "sequenceNumber": "49645657297583566760061509862006490884521875685018435586",
                "data": "xxxxxxxxxxxxxxx",
                "approximateArrivalTimestamp": 1697891290.568
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49645657297583566760061509862006490884521875685018435586",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::1111111111111:role/service-role/cm-hirai-kinesis-data-stream-role-r9cfserw",
            "awsRegion": "ap-northeast-1",
            "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:1111111111111:stream/cm-hirai"
        }
    ]
}

event['Records'][0]['kinesis']['data']はBase64でエンコードされていますので、まずBase64でデコードし、さらにUTF-8文字列にデコードします。すると、以下のJSON形式で取得できます。

Lambdaでは、下記から、KVSのARNと開始フラグメント番号FragmentStartNumberを取得します。

{
    "AWSAccountId": "111111111111",
    "AWSContactTraceRecordFormatVersion": "2017-03-10",
    "Agent": null,
    "AgentConnectionAttempts": 0,
    "AnsweringMachineDetectionStatus": null,
    "Attributes": {},
    "Campaign": {
        "CampaignId": null
    },
    "Channel": "VOICE",
    "ConnectedToSystemTimestamp": "2023-10-21T12:26:45Z",
    "ContactDetails": {},
    "ContactId": "82cd81b0-c46b-4f44-849a-a294af488370",
    "CustomerEndpoint": {
        "Address": "+81xxxxxxxxx",
        "Type": "TELEPHONE_NUMBER"
    },
    "CustomerVoiceActivity": null,
    "DisconnectReason": "CONTACT_FLOW_DISCONNECT",
    "DisconnectTimestamp": "2023-10-21T12:27:00Z",
    "InitialContactId": null,
    "InitiationMethod": "INBOUND",
    "InitiationTimestamp": "2023-10-21T12:26:45Z",
    "InstanceARN": "arn:aws:connect:ap-northeast-1:111111111111:instance/3ff2093d-af96-43fd-b038-3c07cdd7609c",
    "LastUpdateTimestamp": "2023-10-21T12:28:09Z",
    "MediaStreams": [
        {
            "Type": "AUDIO"
        }
    ],
    "NextContactId": null,
    "PreviousContactId": null,
    "Queue": null,
    "Recording": null,
    "Recordings": [
        {
            "DeletionReason": null,
            "FragmentStartNumber": "91343852333181437344442219799204510878849660669",
            "FragmentStopNumber": "91343852333181437374152780742053638827886413933",
            "Location": "arn:aws:kinesisvideo:ap-northeast-1:111111111111:stream/cm-hirai-connect-demo02-classmethod-contact-2e090f38-c73c-4f68-92e6-1d905e0d15f1/1697888777951",
            "MediaStreamType": "VIDEO",
            "ParticipantType": "CUSTOMER",
            "StartTimestamp": "2023-10-21T12:26:50Z",
            "Status": null,
            "StopTimestamp": "2023-10-21T12:26:57Z",
            "StorageType": "KINESIS_VIDEO_STREAM"
        }
    ],
    "References": [],
    "ScheduledTimestamp": null,
    "SystemEndpoint": {
        "Address": "+81xxxxxxx",
        "Type": "TELEPHONE_NUMBER"
    },
    "TransferCompletedTimestamp": null,
    "TransferredToEndpoint": null,
    "VoiceIdResult": null
}

参考