Amazon ConnectからKinesis Video Streamに送信する音声データは、GetMediaForFragmentList APIで取得すべき理由

2023.11.06

はじめに

以前、Amazon Connectでエージェントが介在しない「留守番電話」や「AIチャットボット」で録音したい場合、Kinesis Video Streams(以降、KVS)経由でAWS Lambdaを使い音声データの録音と保存する方法をまとめました。

ただし、ConnectからKVS経由で音声ファイルS3バケットに保存すると、録音した音声と異なる音声が時折含まれていました。

調査した結果、以下の記事でも同じ現象に言及しておりました。その記事では、KVSからメディアデータを取得する際にGetMediaAPIではなくGetMediaForFragmentListAPIを利用することで、この問題が解消されたと書かれていました。

今回の記事では、上記のGetMediaForFragmentListを利用することで問題が解消された理由を解説し、先述の記事Node.jsで書かれていたため、Pythonのコードでの実装内容を明記します。

KVSに出てくる用語

下記の用語は、KVSで必須ですのでまとめました。

  • メディアデータ
    • 音声、動画、画像といった形式により人間が判断できる情報のことです。ビデオ、オーディオ、および関連メタデータから構成されます。
  • チャンク
    • チャンクは大きなメディアデータを取り扱いやすいサイズに分割した部分データです。
    • 各チャンクはメディアデータ全体の一部を表し、独立してストリームに送信、処理されます。
    • ストリーム内でのデータの格納形式であり、以下を含みます。
      • メディアメタデータ
      • フラグメント
      • KVSのメタデータ(フラグメント番号、サーバー側タイムスタンプ、プロデューサー側タイムスタンプ)
  • フラグメント
    • 短時間のフレームをまとめたもので、固有の番号が割り当てられます。
  • フレーム
    • ビデオフレームは1つの静止画像のことで、連続して再生することで動画となります。音声フレームは、ある瞬間の音声データのことです。
    • フレームはさまざまなメディアデータを組み合わせてメディアストリームを形成する基本要素です。
  • プロデューサー
    • KVSに送信する側(今回の場合、Connectに該当)
  • コンシューマー
    • KVSから受信する側(今回の場合、Lambdaに該当)
    • KVSからチャンク単位で取得し、チャンクからフラグメントを取り出します。

下記にある通り、プロデューサー(Connectなど)からKVSにフラグメントを送信し、KVSからコンシューマー(Lambdaなど)はチャンクを取得します。

引用元

KVSからコンシューマーがチャンクを取得する場合、GetMediaもしくはGetMediaForFragmentListのAPIを利用する必要があります。APIの違いは次の章で解説します。

引用元

録音した音声とは別の音声原因

GetMediaを利用しメディアデータ取得する場合、KVSのARNと取得する開始チャンクのフラグメント番号を指定し、開始チャンクから全てのフラグメントを取得します。

GetMediaでは、終了位置を指定することができないため、プロデューサーから送信された別の音声も取得してしまい、録音した音声とは別の音声が入る現象が発生したと考えられます。

ではどのように取得するとよいかと言いますと、ListFragmentsGetMediaForFragmentListのAPIを利用します。

ListFragmentsは、開始と終了時のタイムスタンプ(フラグメント番号も可)で指定し、指定範囲内のフラグメントのリストを取得するAPIです。

GetMediaForFragmentListは、KVSに保存されたアーカイブデータから指定したフラグメントのメディアデータを取得するAPIです。フラグメント番号のリストを指定してリクエストします。

つまり、今回のケースでは、Connectから送信された必要なメディアデータのみを取得するには、ListFragmentsで範囲内のフラグメントのリストを取得し、GetMediaForFragmentListで範囲内のメディアデータのみを取得することで実現できます。

AWS Black Beltでは、GetMediaAPIはリアルタイム処理で利用し、バッチ処理では、ListFragmentsGetMediaForFragmentListAPIを利用するように記載がありました。

引用元

ConnectからKVSへの音声のストリーミングをする場合、バッチ処理でKVSから音声ファイルを取得します。

そのため、リアルタイム処理用途で利用されるGetMediaAPIを採用することがそもそもの間違えのようですね。

検証環境の構築

下記の記事では、ConnectからKVSに送信した音声データをLambdaで取得し、S3バケットに保存する手順を記載しております。

Lambdaのコード以外は下記の記事の手順で環境を構築します。

Lambdaのコード

import boto3, io, struct,json
from ebmlite import loadSchema
from enum import Enum
from datetime import datetime, timedelta
from botocore.config import Config

JST_OFFSET = timedelta(hours=9)

class Mkv(Enum):
    SEGMENT = 0x18538067
    CLUSTER = 0x1F43B675
    SIMPLEBLOCK = 0xA3

class Ebml(Enum):
    EBML = 0x1A45DFA3

class KVSParser:
    def __init__(self, media_content):
        self.__stream = media_content["Payload"]
        self.__schema = loadSchema("matroska.xml")
        self.__buffer = bytearray()

    @property
    def fragments(self):
        return [fragment for chunk in self.__stream if (fragment := self.__parse(chunk))]

    def __parse(self, chunk):
        self.__buffer.extend(chunk)
        header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value]
        if header_elements:
            fragment_dom = self.__schema.loads(self.__buffer[:header_elements[0].offset])
            self.__buffer = self.__buffer[header_elements[0].offset:]
            return fragment_dom

def get_simple_blocks(media_content):
    parser = KVSParser(media_content)
    return [
        b.value for document in parser.fragments
        for b in next(filter(lambda c: c.id == Mkv.CLUSTER.value, next(filter(lambda s: s.id == Mkv.SEGMENT.value, document))))
        if b.id == Mkv.SIMPLEBLOCK.value
    ]

def create_audio_sample(simple_blocks, margin=4):
    total_length = sum(len(block) - margin for block in simple_blocks)
    combined_samples = bytearray(total_length)
    position = 0
    for block in simple_blocks:
        temp = block[margin:]
        combined_samples[position:position+len(temp)] = temp
        position += len(temp)
    return combined_samples

def convert_bytearray_to_wav(samples):
    length = len(samples)
    channel = 1
    bit_par_sample = 16
    format_code = 1
    sample_rate = 8000
    header_size = 44
    wav = bytearray(header_size + length)

    wav[0:4] = b"RIFF"
    wav[4:8] = struct.pack("<I", 36 + length)
    wav[8:12] = b"WAVE"
    wav[12:16] = b"fmt "
    wav[16:20] = struct.pack("<I", 16)
    wav[20:22] = struct.pack("<H", format_code)
    wav[22:24] = struct.pack("<H", channel)
    wav[24:28] = struct.pack("<I", sample_rate)
    wav[28:32] = struct.pack("<I", sample_rate * channel * bit_par_sample // 8)
    wav[32:34] = struct.pack("<H", channel * bit_par_sample // 8)
    wav[34:36] = struct.pack("<H", bit_par_sample)
    wav[36:40] = b"data"
    wav[40:44] = struct.pack("<I", length)
    wav[44:] = samples
    return wav

def create_archive_media_client(ep):
    region_name = "ap-northeast-1"
    return boto3.client("kinesis-video-archived-media", endpoint_url=ep, config=Config(region_name=region_name))

def upload_audio_to_s3(bucket_name, audio_data, filename):
    s3_client = boto3.client("s3")
    s3_client.upload_fileobj(io.BytesIO(audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"})

def get_media_data(arn, start_timestamp, end_timestamp):
    kvs_client = boto3.client("kinesisvideo")
    list_frags_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="LIST_FRAGMENTS")["DataEndpoint"]
    list_frags_client = create_archive_media_client(list_frags_ep)

    fragment_list = list_frags_client.list_fragments(
        StreamARN=arn,
        FragmentSelector={
            "FragmentSelectorType": "PRODUCER_TIMESTAMP",
            "TimestampRange": {"StartTimestamp": start_timestamp, "EndTimestamp": end_timestamp}
        }
    )

    sorted_fragments = sorted(fragment_list["Fragments"], key=lambda fragment: fragment["ProducerTimestamp"])
    fragment_number_array = [fragment["FragmentNumber"] for fragment in sorted_fragments]

    get_media_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="GET_MEDIA_FOR_FRAGMENT_LIST")["DataEndpoint"]
    get_media_client = create_archive_media_client(get_media_ep)

    media = get_media_client.get_media_for_fragment_list(StreamARN=arn, Fragments=fragment_number_array)
    return media

def transcribe_audio(bucket_name, filename, job_name, vocabulary_name):
    transcribe_client = boto3.client("transcribe")
    transcribe_client.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': f"s3://{bucket_name}/{filename}"},
        MediaFormat='wav',
        LanguageCode='ja-JP',
        Settings={'VocabularyName': vocabulary_name}
    )
    return job_name

def convert_ms_to_datetime(timestamp_ms_str, add_seconds=1):
    # 音声の尻切れのため、add_seconds
    timestamp_seconds = float(timestamp_ms_str) / 1000 + add_seconds
    return datetime.utcfromtimestamp(timestamp_seconds)

def lambda_handler(event, context):
    print('Received event:' + json.dumps(event, ensure_ascii=False))
    media_streams = event["Details"]["ContactData"]["MediaStreams"]["Customer"]["Audio"]
    stream_arn = media_streams["StreamARN"]

    start_timestamp = convert_ms_to_datetime(media_streams["StartTimestamp"])
    end_timestamp = convert_ms_to_datetime(media_streams["StopTimestamp"])

    combined_samples = create_audio_sample(
        get_simple_blocks(get_media_data(stream_arn, start_timestamp, end_timestamp)))

    wav_audio = convert_bytearray_to_wav(combined_samples)

    bucket_name = "バケット名"
    jst_time = datetime.utcnow() + JST_OFFSET
    filename = f"output_{jst_time.strftime('%Y%m%d_%H%M%S')}.wav"

    upload_audio_to_s3(bucket_name, wav_audio, filename)

    return {
        "statusCode": 200,
        "body": "Audio uploaded successfully!"
    }

S3のバケット名は、各自変更下さい。

関数get_media_dataの処理の流れを解説します。

  1. GetDataEndpointを利用
    • KVSのARNから、ListFragments APIの呼び出しに必要なエンドポイントを取得します
  2. ListFragmentsを利用
    • KVSのARNと開始と終了のタイムスタンプから、指定したタイムスタンプ範囲に該当するフラグメントのリストを取得します
  3. sortedを利用
    • ListFragmentsで取得されるフラグメントは順不同なのでProducerTimestampで昇順に並び替えます
      • ProducerTimestamp は、KVSのフラグメントに関するメタデータの一部で、プロデューサーによってフラグメントが生成された時点のタイムスタンプを指します。 引用元
  4. GetDataEndpointを利用
    • KVSのARNから、GetMediaForFragmentList APIの呼び出しに必要なエンドポイントを取得します
  5. GetMediaForFragmentListを利用
    • KVSのARNとフラグメントリストから、メディアデータを取得します

メディアデータから音声データを抽出し、WAV形式に変換するコードについては、先程の記事に解説を載せていますので、ご参考ください。

Lambdaの実行時間

ちなみに、Connectで10秒程度を録音した際、KVSからLambdaで音声データを取得し、WAV形式でS3にファイルをアップロードする処理時間は、GetMediaAPIでは7秒、GetMediaForFragmentListAPIでは4秒程度だったため、処理時間も短くなりました。

短くなることで、何がうれしいかというと、ConnectフローからLambdaを呼び出す際、最大8秒でタイムアウトになってしまいます。

GetMediaForFragmentListを利用すると、4秒程度だったため、タイムアウトする可能性が低くなり、Lambdaからのレスポンス内容をフローで利用することができます。

最後に

今回の記事では、ConnectからKVSに送信したメディアデータから音声データを取得する場合、GetMediaForFragmentListを利用すべき理由とPythonのコードについて解説しました。

GetMediaAPIはリアルタイム処理に適しており、一方、バッチ処理のケースではGetMediaForFragmentListAPIを利用することが適切だと分かりました。

この情報が参考になれば幸いです。

参考