【Kinesis Video Streams】Pythonで動画ファイルを送信する

2021.04.27

カフェチームの山本です。

現在、カフェのシステムでは、機械学習を用いて、商品の前にいる人物の骨格や手を検出することで、どの商品を取り出したかを判定しています。そのために、骨格検出用のモデルを用いてエッジデバイスで推論処理(撮影した画像から映っている人物の骨格の座標を検出する処理)を実行しています。ただ、エッジデバイスが高性能なものを使用する必要がありコストが高い、今後機能を追加しようとすると性能の限界がある、という問題があります。

今後、エッジデバイスのコストを下げたり、さらに機能を追加するために処理を増やしたりするためには、エッジ側では動画送信のみを行い、クラウド側で推論処理をできるようにする方が良さそうです。また、常に商品の前にユーザが居るわけではないので、コストを抑えるために、必要に応じて動画送信のON/OFFを切り替えられるようにすると良さそうです。

今回は、柔軟に送信をON/OFFするための方法として、Pythonで動画を送信する処理を実装したので、その内容を共有します。

既存手法の課題

今回、動画の送信先はKinesis Video Streams(以下、KVS)にしました。

少し調べてみると、KVSに動画を送信する方法として、kvssink(gstreamer)を使った方法が検索でよく引っかかります。

https://docs.aws.amazon.com/ja_jp/kinesisvideostreams/latest/dg/examples-gstreamer-plugin-parameters.html

(詳しい使い方・コマンドなどは、SINさんが記事にまとめていますので、ご参照ください。)

https://dev.classmethod.jp/articles/kinesis-video-streams-jetson-nano/

https://dev.classmethod.jp/articles/kinesis-video-streams-gstreamer-raspberrypi/

https://dev.classmethod.jp/articles/kinesis-video-streams-jetson-nano-realsence/

kvssinkによって、Kinesis Video Streamsに動画を送信することができます。ただ、kvssinkは継続的に送信を行うように設計されているため、必要に応じてON/OFFすることが難しいです。例えば、プロセスの起動・停止を繰り返してみると、プロセスを起動してから7秒以上かかってから動画送信が開始される場合がありました。(そうすると、撮影範囲にユーザが来たことを検知してからプロセスを開始した場合、最初の7秒間は動画を送信できないという問題が起こります。そのため、その間骨格検出を使った判定ができない可能性が出てしまいます)。

解決方法(Pythonで実装したコード)

そこで、今回はPython(boto3)を使って実装しました。

おおよその部分は、以下のページに掲載されているコードを参考にしました。

https://stackoverflow.com/questions/66023168/how-to-send-live-video-captured-from-opencv-in-django-to-aws-kinesis-video-strea

https://coderoad.ru/59481174/Amazon-AWS-Kinesis-Video-Boto-GetMedia-PutMedia

実装したコードは、以下の通りです。おおよそは上記のコードを整理したものですが、以下の点を修正しました。

  • create_authorized_headers関数内、headersのx-amzn-fragment-timecode-typeを、"ABSOLUTE"から"RELATIVE"に変更しました。RELATIVEを指定することで、送信した動画のプロデューサータイムスタンプが、x-amzn-producer-start-timestampで指定した値(start_tmstp)から始まって、動画の長さに応じた値になります。
import boto3
import hashlib
import hmac
import requests

import datetime

def get_host_from_endpoint(endpoint):
    # u'https://s-123abc78.kinesisvideo.us-east-2.amazonaws.com'
    if not endpoint.startswith('https://'):
        return None
    retv = endpoint[len('https://'):]
    return str(retv)

def get_region_from_endpoint(endpoint):
    # u'https://s-123abc78.kinesisvideo.us-east-2.amazonaws.com'
    if not endpoint.startswith('https://'):
        return None
    retv = endpoint[len('https://'):].split('.')[2]
    return str(retv)

def get_endpoint(secret_key, access_key, region, stream_name):
    client = boto3.client(
        'kinesisvideo',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region
    )
    response = client.get_data_endpoint(
        StreamName=stream_name,
        APIName='PUT_MEDIA'
    )
    endpoint = response.get('DataEndpoint', None)
    if endpoint is None:
        raise Exception("endpoint none")

    host = get_host_from_endpoint(endpoint)
    region = get_region_from_endpoint(endpoint)

    return endpoint, host, region

def sign(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

def get_signature_key(key, date_stamp, regionName, serviceName):
    kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, 'aws4_request')
    return kSigning

def create_canonical_request(host, amz_date, stream_name, producer_timestamp_start):
    start_tmstp = repr(producer_timestamp_start)
    # ************* TASK 1: CREATE A CANONICAL REQUEST *************
    # http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html

    # Step 1 is to define the verb (GET, POST, etc.)--already done.

    # Step 2: Create canonical URI--the part of the URI from domain to query
    # string (use '/' if no path)
    ##canonical_uri = '/'
    canonical_uri = '/putMedia'  # endpoint[len('https://'):]

    ## Step 3: Create the canonical query string. In this example, request
    # parameters are passed in the body of the request and the query string
    # is blank.
    canonical_querystring = ''

    # Step 4: Create the canonical headers. Header names must be trimmed
    # and lowercase, and sorted in code point order from low to high.
    # Note that there is a trailing \n.
    # 'host:' + host + '\n' +
    canonical_headers = ''
    # canonical_headers += 'Accept: */*\r\n'
    canonical_headers += 'connection:keep-alive\n'
    canonical_headers += 'content-type:application/json\n'
    canonical_headers += 'host:' + host + '\n'
    canonical_headers += 'transfer-encoding:chunked\n'
    # canonical_headers += 'x-amz-content-sha256: ' + 'UNSIGNED-PAYLOAD' + '\r\n'
    canonical_headers += 'user-agent:AWS-SDK-KVS/2.0.2 GCC/7.4.0 Linux/4.15.0-46-generic x86_64\n'
    canonical_headers += 'x-amz-date:' + amz_date + '\n'
    canonical_headers += 'x-amzn-fragment-acknowledgment-required:1\n'
    canonical_headers += 'x-amzn-fragment-timecode-type:' + 'RELATIVE' + '\n'
    canonical_headers += 'x-amzn-producer-start-timestamp:' + start_tmstp + '\n'
    canonical_headers += 'x-amzn-stream-name:' + stream_name + '\n'

    # Step 5: Create the list of signed headers. This lists the headers
    # in the canonical_headers list, delimited with ";" and in alpha order.
    # Note: The request can include any headers; canonical_headers and
    # signed_headers include those that you want to be included in the
    # hash of the request. "Host" and "x-amz-date" are always required.
    # For DynamoDB, content-type and x-amz-target are also required.
    #
    # in original sample  after x-amz-date :  + 'x-amz-target;'
    signed_headers = 'connection;content-type;host;transfer-encoding;user-agent;'
    signed_headers += 'x-amz-date;x-amzn-fragment-acknowledgment-required;'
    signed_headers += 'x-amzn-fragment-timecode-type;x-amzn-producer-start-timestamp;x-amzn-stream-name'

    # Step 6: Create payload hash. In this example, the payload (body of
    # the request) contains the request parameters.

    # Step 7: Combine elements to create canonical request
    method = 'POST'
    canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers
    canonical_request += '\n'
    canonical_request += hashlib.sha256(''.encode('utf-8')).hexdigest()

    return canonical_request, signed_headers

def create_string_to_sign(date_stamp, region, service, amz_date, canonical_request):
    # ************* TASK 2: CREATE THE STRING TO SIGN*************
    # Match the algorithm to the hashing algorithm you use, either SHA-1 or
    # SHA-256 (recommended)
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = date_stamp + '/' + region + '/' + service + '/' + 'aws4_request'
    string_to_sign = algorithm + '\n' + amz_date + '\n' + credential_scope + '\n' + hashlib.sha256(
        canonical_request.encode('utf-8')).hexdigest()

    return string_to_sign, algorithm, credential_scope

def create_authorized_headers(
    producer_timestamp_start, datetime_now,
    access_key, secret_key, region, stream_name, host
):
    service = 'kinesisvideo'

    start_tmstp = repr(producer_timestamp_start)
    amz_date = datetime_now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = datetime_now.strftime('%Y%m%d')  # Date w/o time, used in credential scope

    # Create a date for headers and the credential string
    canonical_request, signed_headers = create_canonical_request(host, amz_date, stream_name, producer_timestamp_start)
    string_to_sign, algorithm, credential_scope = create_string_to_sign(date_stamp, region, service, amz_date, canonical_request)

    # ************* TASK 3: CALCULATE THE SIGNATURE *************
    # Create the signing key using the function defined above.
    signing_key = get_signature_key(secret_key, date_stamp, region, service)

    # Sign the string_to_sign using the signing_key
    signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

    # ************* TASK 4: ADD SIGNING INFORMATION TO THE REQUEST *************
    # Put the signature information in a header named Authorization.
    authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', '
    authorization_header += 'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature

    # # Python note: The 'host' header is added automatically by the Python 'requests' library.
    headers = {
        'Accept': '*/*',
        'Authorization': authorization_header,
        'connection': 'keep-alive',
        'content-type': 'application/json', # For DynamoDB, the content is JSON.
        # 'host': host,
        'transfer-encoding': 'chunked',
        # 'x-amz-content-sha256': 'UNSIGNED-PAYLOAD',
        'user-agent': 'AWS-SDK-KVS/2.0.2 GCC/7.4.0 Linux/4.15.0-46-generic x86_64',
        'x-amz-date': amz_date,
        'x-amzn-fragment-acknowledgment-required': '1',
        'x-amzn-fragment-timecode-type': 'RELATIVE',
        'x-amzn-producer-start-timestamp': start_tmstp,
        'x-amzn-stream-name': stream_name,
        'Expect': '100-continue'
    }
    return headers

def put_media(data, headers, endpoint):
    # print('\nBEGIN REQUEST++++++++++++++++++++++++++++++++++++')
    # print('Request URL = ' + endpoint)
    r = requests.post(endpoint + '/putMedia', data=data, headers=headers)
    # print('\nRESPONSE++++++++++++++++++++++++++++++++++++')
    # print('Response code: %d\n' % r.status_code)

    return r

class gen_request_parameters:
    def __init__(self, filepath):
        # filepath = 'big-buck-bunny_trailer.webm' # error fragment duration over limit
        with open(filepath, 'rb') as image:
            self._data = image.read()
        self._pointer = 0
        self._size = len(self._data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._pointer >= self._size:
            raise StopIteration  # signals "the end"
        left = self._size - self._pointer
        chunksz = 16000
        if left < chunksz:
            chunksz = left
        pointer_start = self._pointer
        self._pointer += chunksz
        # print("Data: chunk size %d" % chunksz)
        return self._data[pointer_start:self._pointer]

class KVSSender():
    def __init__(self, access_key, secret_key, region, stream_name):
        print(stream_name)
        self._access_key = access_key
        self._secret_key = secret_key
        self._region = region
        self._stream_name = stream_name

        endpoint, endpoint_host, endpoint_region = get_endpoint(secret_key, access_key, region, stream_name)
        self._endpoint = endpoint
        self._endpoint_host = endpoint_host
        self._endpoint_region = endpoint_region

    def put_media(self, data, producer_timestamp_start, show_log=True):
        import time
        # if show_log:
        #     print(producer_timestamp_start)
        time_start = time.time()
        datetime_now = datetime.datetime.utcnow()
        headers = create_authorized_headers(
            producer_timestamp_start, datetime_now,
            self._access_key, self._secret_key, self._endpoint_region,
            self._stream_name, self._endpoint_host
        )
        time_end = time.time()
        proc_time = time_end - time_start
        # if show_log:
        #     print(f"create_ah {producer_timestamp_start} {proc_time:0.1f}")

        time_start = time.time()
        if show_log:
            print(f"put_media {time_start:.1f} {producer_timestamp_start} start")
        r = put_media(data, headers, self._endpoint)
        time_end = time.time()
        proc_time = time_end - time_start
        if show_log:
            print(f"put_media {time_end:.1f} {producer_timestamp_start} end   {proc_time:0.1f}", r, "****************" if proc_time > 5.0 else "")

        return r

############################### (以下、実行処理) ####################################

ACCESS_KEY = 'xxxxxxxxxxxxxxxxxxxx'
SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
REGION = 'ap-northeast-1'
STREAM_NAME = 'python_test'
VIDEO_FILEPATH = 'sample_video.mkv' 
T_INTERVAL = 4  

def send_one_time():
    import time
    proc_time_start = time.time()

    kvs_sender = KVSSender(ACCESS_KEY, SECRET_KEY, REGION, STREAM_NAME)
    data = gen_request_parameters(VIDEO_FILEPATH)
    producer_timestamp_start = int(time.time()) - 10
    r = kvs_sender.put_media(data, producer_timestamp_start)
    print(r)

    proc_time_end = time.time()
    proc_time = proc_time_end - proc_time_start
    print(f"proc_time {proc_time:0.1f}")

    print(r)
    print(r.text)
    print(datetime.datetime.fromtimestamp(producer_timestamp_start))

def send_multi_times_with_threading(do_threading=True):
    import threading
    import time
    import sys
    STREAM_NAME = sys.argv[2] if len(sys.argv) >= 3 else STREAM_NAME

    T_SEND = 180  # [sec]
    N_REPEAT = int(T_SEND / T_INTERVAL)  # [times]
    producer_timestamp_start = int(time.time())
    kvs_sender = KVSSender(ACCESS_KEY, SECRET_KEY, REGION, STREAM_NAME)

    i_repeat = 0
    procducer_timestamp_start_ = producer_timestamp_start
    while True:
        proc_time_start = time.time()

        data = gen_request_parameters(VIDEO_FILEPATH)

        if do_threading:
            thread = threading.Thread(target=kvs_sender.put_media, args=[data, procducer_timestamp_start_])
            thread.start()
        else:
            kvs_sender.put_media(data, procducer_timestamp_start_)

        proc_time_end = time.time()
        proc_time = proc_time_end - proc_time_start

        diff = T_INTERVAL - proc_time
        if diff >= 0:
            time.sleep(diff)
        else:
            print(f"WARNING: {proc_time:0.1f} > {T_INTERVAL:0.1f}")

        i_repeat += 1
        if i_repeat >= N_REPEAT:
            break

        procducer_timestamp_start_ += T_INTERVAL

        print("----------------------------------------------------------------------------------------")

# Kinesis Video StreamsのAPIの仕様上、送信対象の動画ファイルは、以下のようになっている必要ある
# ・エンコーディング   : 動画:H264・音声:AAC
# ・ファイル種類(拡張子): .mkv
if __name__ == "__main__":
    # send_one_time()
    send_multi_times_with_threading()

使い方

これを以下のように実行します。send_one_timeは、1回だけ動画ファイルを送信します。send_multi_times_with_threadingは、同じファイルを繰り返し3分間送信します。Kinesis Video Streams上では同じシーンが繰り返されてつながった動画になります。(送信処理(kvs_sender.put_media)にはおおよそ3秒程度かかるので、threadingで処理しています)

上記を利用する際は、以下のようにしてください。

手順1:Kinesis Video Streamsのストリームを作成する

お持ちのAWSアカウントで、Kinesis Video Streamsのstreamを作成します。

マネジメントコンソールから操作する場合は以下の手順で作成できます。

  • Kinesis Video Streamsのページにアクセスします

    以下のような画面が表示された場合は、右の「ビデオストリーム」を選択して「作成」をクリックします

    以下のような画面が表示された場合は、中央の「ビデオストリームの作成」をクリックします

  • ストリームを作成します

    「ビデオストリーム名」に付けたいストリーム名を入力し、「ビデオストリームを作成」をクリックします。

スクリプト内の、REGIONを作業したリージョン名に、STREAM_NAMEを入力したストリーム名に、書き換えてください。

手順2:IAMユーザを作成する

手順1で作成したstreamへのアクセス権限のあるIAMユーザを作成します。

マネジメントコンソールから操作する場合は、以下の手順で作成できます。

  • IAMのページにアクセスします。左の「ユーザー」をクリックします。

  • 「ユーザを追加する」をクリックします。

  • 「ユーザー名」に付けたいユーザ名を入力し、「プログラムによるアクセス」をチェックします。

  • 「既存のポリシー…」を選択し、「ポリシーのフィルタ」にkinesisvideoと入力し、表示される「AmazonKinesisVideoStreamsFullAccess」をチェックします。(権限を小さくしたい場合は、「ポリシーを作成」作成をクリックし、別途作成して付与してください。)

  • 以降は、特に操作せずに進んでいき「ユーザーの作成」で作成します。作成されたユーザを選択します。

  • 「認証情報」を選択し、「アクセスキーの作成」をクリックします。

  • 表示されたアクセスキーIDとシークレットアクセスキーでACCESS_KEYとSECRET_KEYを書き換えてください。(認証情報を記載したスクリプトファイルを公開しないようにご注意ください)

手順3:動画を用意する

Kinesis Video Streamsのput_media APIの仕様に合わせるため、ビデオをH264形式でエンコードした(音声を含む場合は、音声をAACでエンコードした)mkv形式の動画ファイルを用意します。

例えば、FFmpegを使用する場合、以下のようなコマンドで動画を用意できます。このコマンドでは、original.mp4というファイルを入力とし、動画の0秒から開始して4秒間を切り出し、ビデオをH264形式で、音声をaac形式でそれぞれエンコードするという処理が実行されます(-vb 800kはビデオのビットレートを800kbpsでエンコードすることを指定します。不要であれば削除したり、必要に応じて値を変更したりしてください)

ffmpeg -ss 0 -t 4 -i original.mp4 -c:v libx264 -vb 800k -acodec aac sample_video.mkv

スクリプト内の、VIDEO_FILEPATHをそのファイルへのパスに、T_INTERVALをその動画ファイルの長さ(秒数)に、それぞれ書き換えます。

結果

上記のスクリプトを実行し、動画ファイルが送信されている様子を、Kinesis Video Streamsのコンソール画面で確認できました。

また、kvs_senderのput_media実行する部分を、適宜ON/OFFすることによって、動画の送信をON/OFFできました。これによって、送信のON/OFFを即時に行うことができるようになりました。

注意点

上記のコードを実行した際、put_media関数内のrequests.postが、通常3秒程度であるのが、8秒程度かかる場合がありました。Kinesis Video Streamsのコンソールで確認しても、それが起きたときにサーバータイムスタンプが前後しており、到着時刻が遅れているようでした。

発生する条件は以下の通りでした。

  • デバイス:Raspberry Pi 3 Model B+・Raspberry Pi 4 Model B 8GB
  • OS:Raspbian GNU/Linux 10 (buster)
  • Python:3.7.3
  • 発生頻度:
    • threading使用時:3回に1回程度
    • threading不使用時:40回に1回程度

他に、以下の環境で試しましたが、上記のような問題はおきませんでした。

  • デバイス:Surface Laptop 4、OS:Windows10
  • デバイス:Surface Laptop 4、OS:Ubuntu18.04(WSL2)
  • デバイス:Jetson Nano 4GB、OS:Ubuntu18.04

RasPiで動作させる場合は、動画の送信が遅れても問題ないか、要件を確認する必要がありそうです。

料金

Kinesis Video Streamsの利用料を見ると、以下の4種類が費用としてかかります。

(2021/06/01 東京リージョン)

  • ストリームに入力したデータ量  :0.01097[USD/GB]
  • ストリームから取り出したデータ量:0.01097[USD/GB]
  • ストリームに保存したデータ量  :0.02500[USD/GB・mon]
  • HLS を使用してストリームから取り出したしたデータ量:0.01536[USD/GB]

例えば、常時今回のように動画をアップロードし続け、それを別のコンシューマプログラムで1回取り出して処理するという場合を考えます。その際、ストリームで動画を1時間保持するという設定です。

  • 動画の送信時間  :60[sec/min] * 60[min/hour] * 24[hour/day] = 86400[sec/day]
  • 動画のビットレート:800[kb/s] = 100[kB/s] = 0.0001 [GB/s]
  • ストリーム保存時間:1[hour] = 1[hour] * 24[hour/day] / 30[day/month] = 0.00139[mon]
  • (動画のデータ量) :0.0001[GB/s] * 86400[sec/day] = 8.64[GB/day]

このときの料金は、以下のように計算できます。

  • ストリームに入力したデータ量  :0.01097[USD/GB] * 8.64[GB/day] = 0.0948[USD/day]
  • ストリームから取り出したデータ量:0.01097[USD/GB] * 8.64[GB/day] = 0.0948[USD/day]
  • ストリームに保存したデータ量  :0.02500[USD/GB・mon] * 8.64[GB/day] * 0.00139[mon] = 0.00029808[USD/day]
  • (合計):0.0948 + 0.0948 + 0.000298 = 0.1899[USD/day]

なので、常時送信しっぱなしで、1台1日あたり20円程度かかる、と計算できました。

(※ 料金はおおよその値です。正確な料金は異なる可能性があります。)

よって、例えばコンビニのような大きさの店舗で20台カメラを設置した場合、1月あたり12000円程度かかる計算になります。

少し料金としては高いので、節約する方法が必要そうです。 例えば、画面に人がいることを検知するといったことができれば、人がいないときは動画を送信せず、その分の利用料を節約できそうです。人がいる時間の割合が10%程度であれば、データ量が10%になるので、1200円程度に下げられます。

まとめ

Kinesis Video Streamesに動画ファイルを送信するために、Pythonでput_media APIを実行するコードを実装しました。これによって、エッジデバイスから動画を送信でき、送信のON/OFFも即時に行うことができるようになりました。

参考にさせていただいたページ・サイト

https://stackoverflow.com/questions/66023168/how-to-send-live-video-captured-from-opencv-in-django-to-aws-kinesis-video-strea

https://coderoad.ru/59481174/Amazon-AWS-Kinesis-Video-Boto-GetMedia-PutMedia