Amazon ConnectとKinesis Data Streamsを使用し、エージェントの介在がないIVRの場合でも電話中のユーザーの発話を録音する方法
はじめに
この記事では、Amazon ConnectとKinesis Data Streams(以下、KDS)を用いて、エージェントが介在しなくても、ユーザーの発話を録音する方法を紹介します。
Connectでは、下記のコンタクフローのブロックで録音できますが、録音条件は、顧客とエージェントが繋がってからのみ録音されます。
例えば「留守番電話」やAmazon Lexと組み合わせた「AIチャットボット」のように、エージェントが介在しない場合、通常の録音機能は利用できません。
解決策として、コンタクフロー内で「メディアストリーミングの開始」というブロックを利用し、Kinesis Video Streams(以降、KVS)にメディアデータを保存できます。ただし、注意点として、保存されたメディアデータはMatroska(MKV)形式となるため、一般的であるWAVなどの形式に変換する作業が必要となります。
Matroska(以降、MKV)は、動画や音声、字幕などのマルチメディアデータを一つのファイルに格納できる、コンテナフォーマットです。
また、Connectでは、通話終了後に、通話の詳細情報を問い合わせ追跡記録(Contact Trace Record、以降、問い合わせ記録)として、リアルタイムでKDSにエクスポートすることが可能です。
今回は、通話を終了後に、LambdaがKDSからの問い合わせ記録を受けとり、KVSからMKV形式のメディアデータを取得し、WAV形式に変換し、S3バケットに保存する方法を説明します。
構成
システムの流れは次の通りです。
- コンタクトフロー内で「メディアストリーミングの開始」ブロックを使って、KVSへ音声をストリーミングします。
- 「顧客の入力を保存する」ブロックにより、顧客が特定の番号を押すと、ストリーミングが終了します。
- 通話が終了すると、問い合わせ記録がKDSにエクスポートされ、KDSによってLambdaをトリガーし、KVSからデータを取得します。取得したデータをWAV形式に変換し、S3バケットに音声ファイルを保存します。
- Lambdaでは、MKV形式のメディアデータをKVSから取得するために、KDSから
KVSのARN
と開始フラグメント番号
を取得します。
- Lambdaでは、MKV形式のメディアデータをKVSから取得するために、KDSから
これによって、コンタクフローを調整し「留守番電話」として利用することができます。
また、「顧客の入力を保存する」ではなく、Lexを利用することで、AIチャットボットでの会話中は全て録音することが可能です。
注意点
今回のシステム構成では、「メディアストリーミングの開始」ブロックまで処理が進むと、音声データがS3バケットに保存されます。
「顧客の入力を保存する」ブロックで、何も押さずに電話を途中で切った場合でも録音可能です。
もし特定の条件の顧客だけを録音する(例えば、フローを適切に処理している顧客のみ)必要がある場合は、フロー内で「AWS Lambda関数を呼び出す」ブロックを使う方法があります。
下記の記事に方法をまとめていますので、ご参考ください。
上記の記事の構成は、下記の通りです。コンタクフロー内でLambdaが呼ばれた場合に限り、S3バケットに録音ファイルを保存できます。
Kinesis Data Steam作成
Kinesis Data Steamを作成します。名前のみ記入し、他はデフォルトのままで作成します。
Connectの設定
ライブメディアストリーミングを有効にします。
また、先程作成したKDSを選択し、データストリーミングを有効にします。
Connectのコンタクトフロー作成
コンタクフローは下記の通りです。
- 「メディアストリーミングの開始」ブロックでKVSへの音声ストリーミングを開始します。
- 「顧客の入力を保存する」ブロックで、顧客が特定の番号のボタンを押すと、ストリーミングを終了します。
- 通話終了後、問い合わせ記録がKDSにエクスポートされます
「メディアストリーミングの開始」は、顧客から
(顧客のみ)にします。
「顧客の入力を保存する」は、メッセージをアナウンスするようにし、顧客が番号のボタンを押すとストリーミングを終了します。
終了するキープレスを指定
は、指定してもよいです。
Lambda作成
ランタイムPython3.11
を使用して作成します。
タイムアウトは1分に変更します。
IAMポリシーは、以下を追加しました。
- AmazonKinesisReadOnlyAccess
- AmazonKinesisVideoStreamsReadOnlyAccess
- AmazonS3FullAccess
また、KDSをトリガーにLambdaを起動するため、トリガー設定を行います。
コードは下記のとおりです。コード内のバケット名(bucket_name)は、各自変更ください。
KVSでメディアデータを取得する際、GetMedia APIを利用したコードを紹介しましたが、正しくはGetMediaForFragmentList APIを利用するべきでした。
GetMediaForFragmentList APIを利用すべき理由は、下記の記事をご参照ください
import boto3, io, struct, base64, json from ebmlite import loadSchema from enum import Enum from datetime import datetime, timedelta, timezone from botocore.config import Config from decimal import Decimal JST_OFFSET = timedelta(hours=9) def decimal_to_int(obj): if isinstance(obj, Decimal): return int(obj) class Mkv(Enum): SEGMENT = 0x18538067 CLUSTER = 0x1F43B675 SIMPLEBLOCK = 0xA3 class Ebml(Enum): EBML = 0x1A45DFA3 class KVSParser: def __init__(self, media_content): self.__stream = media_content["Payload"] self.__schema = loadSchema("matroska.xml") self.__buffer = bytearray() @property def fragments(self): return [fragment for chunk in self.__stream if (fragment := self.__parse(chunk))] def __parse(self, chunk): self.__buffer.extend(chunk) header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value] if header_elements: fragment_dom = self.__schema.loads(self.__buffer[:header_elements[0].offset]) self.__buffer = self.__buffer[header_elements[0].offset:] return fragment_dom def get_simple_blocks(media_content): parser = KVSParser(media_content) return [b.value for document in parser.fragments for b in next(filter(lambda c: c.id == Mkv.CLUSTER.value, next(filter(lambda s: s.id == Mkv.SEGMENT.value, document)))) if b.id == Mkv.SIMPLEBLOCK.value] def create_audio_sample(simple_blocks, margin=4): position = 0 total_length = sum(len(block) - margin for block in simple_blocks) combined_samples = bytearray(total_length) for block in simple_blocks: temp = block[margin:] combined_samples[position:position+len(temp)] = temp position += len(temp) return combined_samples def convert_bytearray_to_wav(samples): length = len(samples) channel = 1 bit_par_sample = 16 format_code = 1 sample_rate = 8000 header_size = 44 wav = bytearray(header_size + length) wav[0:4] = b"RIFF" wav[4:8] = struct.pack("<I", 36 + length) wav[8:12] = b"WAVE" wav[12:16] = b"fmt " wav[16:20] = struct.pack("<I", 16) wav[20:22] = struct.pack("<H", format_code) wav[22:24] = struct.pack("<H", channel) wav[24:28] = struct.pack("<I", sample_rate) wav[28:32] = struct.pack("<I", sample_rate * channel * bit_par_sample // 8) wav[32:34] = struct.pack("<H", channel * bit_par_sample // 8) wav[34:36] = struct.pack("<H", bit_par_sample) wav[36:40] = b"data" wav[40:44] = struct.pack("<I", length) wav[44:] = samples return wav def create_kvs_client(): region_name = "ap-northeast-1" return boto3.client("kinesisvideo", region_name=region_name) def create_archive_media_client(ep): region_name = "ap-northeast-1" return boto3.client("kinesis-video-archived-media", endpoint_url=ep, config=Config(region_name=region_name)) def upload_audio_to_s3(bucket_name, audio_data, filename): region_name = "ap-northeast-1" s3_client = boto3.client("s3", region_name=region_name) s3_client.upload_fileobj(io.BytesIO(audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"}) def get_media_data(arn, start_timestamp, end_timestamp): kvs_client = create_kvs_client() list_frags_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="LIST_FRAGMENTS")["DataEndpoint"] list_frags_client = create_archive_media_client(list_frags_ep) fragment_list = list_frags_client.list_fragments( StreamARN = arn, FragmentSelector = { "FragmentSelectorType": "PRODUCER_TIMESTAMP", "TimestampRange": {"StartTimestamp": datetime.fromtimestamp(start_timestamp), "EndTimestamp": datetime.fromtimestamp(end_timestamp)} } ) sorted_fragments = sorted(fragment_list["Fragments"], key = lambda fragment: fragment["ProducerTimestamp"]) fragment_number_array = [fragment["FragmentNumber"] for fragment in sorted_fragments] get_media_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="GET_MEDIA_FOR_FRAGMENT_LIST")["DataEndpoint"] get_media_client = create_archive_media_client(get_media_ep) media = get_media_client.get_media_for_fragment_list(StreamARN = arn, Fragments = fragment_number_array) return media def save_audio_to_tmp(wav_audio, filename="output.wav"): with open(f"/tmp/{filename}", "wb") as out_file: out_file.write(wav_audio) def extract_kvs_details(event): kinesis_record = event['Records'][0]['kinesis'] data_string = kinesis_record['data'] decoded_data = base64.b64decode(data_string) decoded_string = decoded_data.decode('utf-8') print('Decoded data:', decoded_string) data_json = json.loads(decoded_string) return { 'VideoStreamARN': data_json['Recordings'][0]['Location'], 'StartTimestamp': data_json['Recordings'][0]['StartTimestamp'], 'StopTimestamp': data_json['Recordings'][0]['StopTimestamp'] } def convert_to_unix(timestamp_str): timestamp_dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%SZ") return int(timestamp_dt.replace(tzinfo=timezone.utc).timestamp()) def lambda_handler(event, context): print('Received event:' + json.dumps(event,default=decimal_to_int, ensure_ascii=False)) kvs_details = extract_kvs_details(event) kvs_arn = kvs_details['VideoStreamARN'] start_timestamp = convert_to_unix(kvs_details["StartTimestamp"]) end_timestamp = convert_to_unix(kvs_details["StopTimestamp"]) combined_samples = create_audio_sample( get_simple_blocks(get_media_data(kvs_arn, start_timestamp, end_timestamp))) wav_audio = convert_bytearray_to_wav(combined_samples) bucket_name = "バケット名" jst_time = datetime.utcnow() + JST_OFFSET filename = "output_{date}.wav".format(date=jst_time.strftime("%Y%m%d_%H%M%S")) upload_audio_to_s3(bucket_name, wav_audio, filename) return { "statusCode": 200, "body": "Audio uploaded successfully!" }
GetMedia APIを利用していたコード (クリックすると展開します)
import base64 import boto3 import io import json import struct from datetime import datetime, timedelta from decimal import Decimal from ebmlite import loadSchema from enum import Enum JST_OFFSET = timedelta(hours=9) class Mkv(Enum): SEGMENT = 0x18538067 TAGS = 0x1254C367 TAG = 0x7373 SIMPLETAG = 0x67C8 TAGNAME = 0x45A3 TAGSTRING = 0x4487 TAGBINARY = 0x4485 CLUSTER = 0x1F43B675 SIMPLEBLOCK = 0xA3 class Ebml(Enum): EBML = 0x1A45DFA3 class KVSParser: def __init__(self, media_content): self.__stream = media_content['Payload'] self.__schema = loadSchema('matroska.xml') self.__buffer = bytearray() @property def fragments(self): fragment_list = list() for chunk in self.__stream: fragment = self.__parse(chunk) if fragment: fragment_list.append(fragment) return fragment_list def __parse(self, chunk): self.__buffer.extend(chunk) header_elements = [e for e in self.__schema.loads( self.__buffer) if e.id == Ebml.EBML.value] if header_elements: offset = header_elements[0].offset fragment_bytes = self.__buffer[:offset] fragment_dom = self.__schema.loads(fragment_bytes) self.__buffer = self.__buffer[offset:] return fragment_dom def decimal_to_int(obj): if isinstance(obj, Decimal): return int(obj) def get_media_data(stream_arn, start_fragment_num): kinesis_client = boto3.client('kinesisvideo') endpoint_response = kinesis_client.get_data_endpoint( StreamARN=stream_arn, APIName='GET_MEDIA') data_endpoint = endpoint_response['DataEndpoint'] kinesis_video_client = boto3.client( 'kinesis-video-media', endpoint_url=data_endpoint) return kinesis_video_client.get_media(StreamARN=stream_arn, StartSelector={'StartSelectorType': 'FRAGMENT_NUMBER', 'AfterFragmentNumber': start_fragment_num}) def get_simple_blocks(media_content): parser = KVSParser(media_content) documents = parser.fragments simple_blocks = [] for document in documents: segment = next(filter(lambda s: s.id == Mkv.SEGMENT.value, document)) cluster = next(filter(lambda c: c.id == Mkv.CLUSTER.value, segment)) simple_blocks.extend( [b.value for b in cluster if b.id == Mkv.SIMPLEBLOCK.value]) return simple_blocks def remove_margin_and_combine_samples(simple_blocks, margin=4): total_length = sum(len(block) - margin for block in simple_blocks) combined_samples = bytearray(total_length) position = 0 for block in simple_blocks: temp = block[margin:] combined_samples[position:position+len(temp)] = temp position += len(temp) return combined_samples def convert_bytearray_to_wav(samples): length = len(samples) channel = 1 bit_par_sample = 16 format_code = 1 sample_rate = 8000 header_size = 44 wav = bytearray(header_size + length) wav[0:4] = b'RIFF' wav[4:8] = struct.pack('<I', 36 + length) wav[8:12] = b'WAVE' wav[12:16] = b'fmt ' wav[16:20] = struct.pack('<I', 16) wav[20:22] = struct.pack('<H', format_code) wav[22:24] = struct.pack('<H', channel) wav[24:28] = struct.pack('<I', sample_rate) wav[28:32] = struct.pack('<I', sample_rate * channel * bit_par_sample // 8) wav[32:34] = struct.pack('<H', channel * bit_par_sample // 8) wav[34:36] = struct.pack('<H', bit_par_sample) wav[36:40] = b'data' wav[40:44] = struct.pack('<I', length) wav[44:] = samples return wav def upload_audio_to_s3(bucket_name, audio_data, filename, s3_client): s3_client.upload_fileobj(io.BytesIO( audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"}) def extract_kvs_details(event): kinesis_record = event['Records'][0]['kinesis'] data_string = kinesis_record['data'] decoded_data = base64.b64decode(data_string) decoded_string = decoded_data.decode('utf-8') print('Decoded data:', decoded_string) data_json = json.loads(decoded_string) return { 'VideoStreamARN': data_json['Recordings'][0]['Location'], 'StartFragmentNumber': data_json['Recordings'][0]['FragmentStartNumber'] } def lambda_handler(event, context): print('Received event:' + json.dumps(event,default=decimal_to_int, ensure_ascii=False)) kvs_details = extract_kvs_details(event) kvs_arn = kvs_details['VideoStreamARN'] start_fragment_number = kvs_details['StartFragmentNumber'] media_data = get_media_data(kvs_arn, start_fragment_number) simple_blocks = get_simple_blocks(media_data) combined_samples = remove_margin_and_combine_samples(simple_blocks) wav_audio = convert_bytearray_to_wav(combined_samples) s3_clinet = boto3.client('s3') bucket_name = "cm-hirai-kinesis-video-stream" jst_time = datetime.utcnow() + JST_OFFSET filename = "output_{date}.wav".format(date=jst_time.strftime("%Y%m%d_%H%M%S")) upload_audio_to_s3(bucket_name, wav_audio, filename, s3_clinet) return { 'statusCode': 200, 'body': 'Audio uploaded successfully!' }
コードの解説はこちらを参考ください
参考記事に記載している通り、Matroskaファイルの解析にはebmliteライブラリを使用します。このライブラリを使用する場合は、先にZIP化してLambda レイヤーにアップロードしましょう。CloudShellからダウンロード可能です。
$ python3 -m pip install -t ./python ebmlite
Collecting ebmlite
Downloading ebmlite-3.3.1-py3-none-any.whl (92 kB)
|████████████████████████████████| 92 kB 5.9 MB/s
Installing collected packages: ebmlite
Successfully installed ebmlite-3.3.1
$ zip -r ebmlite-3.3.1.zip ./python
実際に試してみた
コンタクフローに電話番号を紐づけて、電話をかけると、音声が録音されます。
録音後、S3バケットに保存されたことが確認できました。
ちなみに、通話を終了してから、KDSをトリガーにLambdaが起動するまでには、1分ほどかかりました。
Lambdaのイベントデータ
KDSからトリガーされたLambdaは、問い合わせ記録をイベントデータとして取得されます。
イベントデータの例は下記のとおりです。
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "82cd81b0-c46b-4f44-849a-a294af488370", "sequenceNumber": "49645657297583566760061509862006490884521875685018435586", "data": "xxxxxxxxxxxxxxx", "approximateArrivalTimestamp": 1697891290.568 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49645657297583566760061509862006490884521875685018435586", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::1111111111111:role/service-role/cm-hirai-kinesis-data-stream-role-r9cfserw", "awsRegion": "ap-northeast-1", "eventSourceARN": "arn:aws:kinesis:ap-northeast-1:1111111111111:stream/cm-hirai" } ] }
event['Records'][0]['kinesis']['data']
はBase64でエンコードされていますので、まずBase64でデコードし、さらにUTF-8文字列にデコードします。すると、以下のJSON形式で取得できます。
Lambdaでは、下記から、KVSのARNと開始フラグメント番号FragmentStartNumber
を取得します。
{ "AWSAccountId": "111111111111", "AWSContactTraceRecordFormatVersion": "2017-03-10", "Agent": null, "AgentConnectionAttempts": 0, "AnsweringMachineDetectionStatus": null, "Attributes": {}, "Campaign": { "CampaignId": null }, "Channel": "VOICE", "ConnectedToSystemTimestamp": "2023-10-21T12:26:45Z", "ContactDetails": {}, "ContactId": "82cd81b0-c46b-4f44-849a-a294af488370", "CustomerEndpoint": { "Address": "+81xxxxxxxxx", "Type": "TELEPHONE_NUMBER" }, "CustomerVoiceActivity": null, "DisconnectReason": "CONTACT_FLOW_DISCONNECT", "DisconnectTimestamp": "2023-10-21T12:27:00Z", "InitialContactId": null, "InitiationMethod": "INBOUND", "InitiationTimestamp": "2023-10-21T12:26:45Z", "InstanceARN": "arn:aws:connect:ap-northeast-1:111111111111:instance/3ff2093d-af96-43fd-b038-3c07cdd7609c", "LastUpdateTimestamp": "2023-10-21T12:28:09Z", "MediaStreams": [ { "Type": "AUDIO" } ], "NextContactId": null, "PreviousContactId": null, "Queue": null, "Recording": null, "Recordings": [ { "DeletionReason": null, "FragmentStartNumber": "91343852333181437344442219799204510878849660669", "FragmentStopNumber": "91343852333181437374152780742053638827886413933", "Location": "arn:aws:kinesisvideo:ap-northeast-1:111111111111:stream/cm-hirai-connect-demo02-classmethod-contact-2e090f38-c73c-4f68-92e6-1d905e0d15f1/1697888777951", "MediaStreamType": "VIDEO", "ParticipantType": "CUSTOMER", "StartTimestamp": "2023-10-21T12:26:50Z", "Status": null, "StopTimestamp": "2023-10-21T12:26:57Z", "StorageType": "KINESIS_VIDEO_STREAM" } ], "References": [], "ScheduledTimestamp": null, "SystemEndpoint": { "Address": "+81xxxxxxx", "Type": "TELEPHONE_NUMBER" }, "TransferCompletedTimestamp": null, "TransferredToEndpoint": null, "VoiceIdResult": null }