ちょっと話題の記事

[Kinesis Video Streams] ストリーム上のフラグメントデータから、任意の時間帯を指定して動画を取得してみました。

2019.05.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

1 はじめに

AIソリューション部の平内(SIN)です。

Amazon Kinesis Video Streams(以下、Kinesis Video Streams) を使用すると、各種デバイスからAWSへ動画を簡単にストリーミングできます。

アップロードされた動画は、一定の期間、ストリーム上でアーカイブされますが、今回は、このデータから特定の時間帯のものを切り取って取得する要領を確認してみました。

2 アーカイブ

動画は、そもそも連続した画像(フレーム)が集まったものですが、プロデューサー(送信側デバイス)からは、一定時間ごとのフレームをまとめたフラグメントという形でKinesis Video Streamsへ送信されます。

個々のフラグメントは、他のフラグメントとの依存関係はなく、自己完結型でストリーム上に溜まって行きます。

溜まっているデータは、データ保持期間で指定された時間、アーカイブとして保持され、指定時間が過ぎたフラグメントから順次破棄されます。

コンシューマー(受信側)は、ストリーム上に溜まっているフラグメントを列挙することができ、また、そのフラグメントの番号を指定することで、データを取得する事ができます。


Kinesis ビデオストリーム API およびプロデューサーライブラリのサポート より

3 ListFragments

Kinesis Video Streamsのストリームに保持( アーカイブ)されているフラグメントを列挙するには、ListFragmentsを使用します。
参考:ListFragments

列挙された一覧には、個々のフラグメントのタイムスタンプや、識別子(フラグメント番号)、そのフラグメントが保持する動画の時間などが含まれています。

下記は、アーカイブ上のフラグメントを列挙しているコードです。

listFragments()をメソッドに持つAWS.KinesisVideoArchivedMediaは、最初に、getDataEndpointで取得したエンドポイントで初期化する必要があります。

const AWS = require('aws-sdk');
const kinesisvideo = new AWS.KinesisVideo();

async function listFragments(streamName, fragmentSelectorType, startTimestamp, endTimestamp) {
    // EndPointで初期化して、KinesisVideoArchivedMediaを生成する
    var params = {
        APIName: "LIST_FRAGMENTS",
        StreamName: streamName
    };
    const e = await kinesisvideo.getDataEndpoint(params).promise();
    const kinesisvideoarchivedmedia = new AWS.KinesisVideoArchivedMedia({endpoint: e.DataEndpoint});

    const maxResults = 100; // 指定可能な数字は1000まで、これを超えた場合は、NextTokenで取得する
    let fragments = [];
    while(true){
        var nextToken;

        // listFragmentsのパラメータの生成(2回目以降は変換する)
        var params = { StreamName: streamName }
        if (nextToken) {
            params.NextToken = nextToken
        } else {
            params.FragmentSelector =  {
                FragmentSelectorType: fragmentSelectorType,
                TimestampRange: { 
                    EndTimestamp: endTimestamp, 
                    StartTimestamp: startTimestamp
                }
            }
            params.MaxResults =  maxResults;
        }

        // listFragmentsでフラグメント情報を取得する
        const data = await kinesisvideoarchivedmedia.listFragments(params).promise();
        data.Fragments.forEach( fragment => fragments.push(fragment) )

        // 次のデータがない場合は、列挙を終了する
        if(data.NextToken == null) {
            break;
        }
        nextToken = data.NextToken;
    }

    // 配列は、古い順にソートする
    return  fragments.sort((a,b)=> {
        if( a.ProducerTimestamp < b.ProducerTimestamp ) return -1;
        if( a.ProducerTimestamp > b.ProducerTimestamp ) return 1;
        return 0;
    })
}

async function job() {
    const streamName = 'TestStream';
    const fragmentSelectorType = "PRODUCER_TIMESTAMP";
    const startTimestamp = 0; // データの最初から取得する
    const endTimestamp = new Date(); // 現在の時間まで

    // フラグメントの列挙
    let fragments = await listFragments(streamName, fragmentSelectorType, startTimestamp, endTimestamp);

    //動画の全時間
    var total = fragments.map( f => f.FragmentLengthInMilliseconds).reduce( (p, c) => p + c);

    console.log("フラグメント数: " , fragments.length);
    console.log("開始時間: " , fragments[0].ProducerTimestamp.toLocaleString());
    console.log("終了時間: " , fragments[fragments.length - 1].ProducerTimestamp.toLocaleString());
    console.log("動画の時間(msec): " , total);
}

job();

上記のコードでデータを列挙した例は、以下のとおりです。

フラグメント数:  223
開始時間:  2019-5-26 09:37:52
終了時間:  2019-5-26 09:41:33
動画の時間(msec):  206379

listFragments()では、列挙の対象を、開始時間と終了時間で指定することになりますが、その指定例は、以下のような感じになります。

// データの最初が取得する場合
endTimestamp = new Date();
startTimestamp = 0;

// 今から15分前以降のデータを取得する場合
endTimestamp = new Date();
startTimestamp = new Date(endTimestamp.getTime() - 60*15*1000);

// 特定の日時 2019/4/1 9:40:00
startTime = new Date(2019, 4 - 1, 1, 9, 40, 0);

4 GetMediaForFragmentList

Kinesis Video Streamsのストリームに保持( アーカイブ)されているフラグメントを取得するには、GetMediaForFragmentListを使用します。
参考:GetMediaForFragmentList

GetMediaForFragmentListのパラメータには、フラグメント番号の配列が必要ですが、これには、先のListFragmentsで取得したものが利用可能です。

下記は、アーカイブ上のデータを取得しているコードです。

getMediaForFragmentList()をメソッドに持つAWS.KinesisVideoArchivedMediaは、最初に、getDataEndpointで取得したエンドポイントで初期化する必要があります。

const AWS = require('aws-sdk');
const kinesisvideo = new AWS.KinesisVideo();

async function listFragments(streamName, fragmentSelectorType, startTimestamp, endTimestamp) {
   // 上記と同じであるため、省略します
}

async function getMediaForFragmentList(streamName, fragmentNumberList) {
    // EndPointで初期化して、KinesisVideoArchivedMediaを生成する
    var params = {
        APIName: "GET_MEDIA_FOR_FRAGMENT_LIST",
        StreamName: streamName
    };
    const e = await kinesisvideo.getDataEndpoint(params).promise();
    const kinesisvideoarchivedmedia = new AWS.KinesisVideoArchivedMedia({endpoint: e.DataEndpoint});

    var params = { 
        StreamName: streamName,
        Fragments: fragmentNumberList
    }

    return await kinesisvideoarchivedmedia.getMediaForFragmentList(params).promise();
}

async function job() {
    const streamName = 'TestStream';
    const fragmentSelectorType = "PRODUCER_TIMESTAMP";
    const startTimestamp = 0; // データの最初から取得する
    const endTimestamp = new Date(); // 現在の時間まで

    // フラグメントの列挙
    let fragments = await listFragments(streamName, fragmentSelectorType, startTimestamp, endTimestamp);

    //動画の全時間
    const total = fragments.map( f => f.FragmentLengthInMilliseconds).reduce( (p, c) => p + c);

    console.log("フラグメント数: " , fragments.length);
    console.log("開始時間: " , fragments[0].ProducerTimestamp.toLocaleString());
    console.log("終了時間: " , fragments[fragments.length - 1].ProducerTimestamp.toLocaleString());
    console.log("動画の時間(msec): " , total);

    const fragmentList = fragments.map( f => f.FragmentNumber);
    const data = await getMediaForFragmentList(streamName, fragmentList);

    const filePath = "./out.webm"
    var fs = require('fs');
    console.log(data);
    fs.writeFileSync(filePath, data.Payload);
    var x = 0;

}

job();

上記のコードでデータを取得している例は、以下のとおりです。

フラグメント数:  223
開始時間:  2019-5-26 09:37:52
終了時間:  2019-5-26 09:41:33
動画の時間(msec):  206379
{ ContentType: 'video/webm',
  Payload: <Buffer 1a 45 df a3 a3 42 86 81 01 42 f7 81 01 42 f2 81 04 42 f3 81 08 42 82 88 6d 61 74 72 6f 73 6b 61 42 87 81 02 42 85 81 02 18 53 80 67 01 ff ff ff ff ff ... > }

取得したデータは、Matroska (MKV) 仕様となっています。chromeは、この形式のファイルの再生が可能ですので、保存したファイルをドロップするだけで表示が可能です。 (.mp4 .movなどに変換したい場合は、ffmpegが便利だと思います。)

5 参考:Dockerのパーサーで動画を配信する

今回、テストデータは、パーサーのビルドが完了している、Dockerイメージを使用しました。
[Amazon Kinesis Video Streams] DockerイメージのプロデューサーライブラリでRTSPサーバの動画を配信してみました。

Amazonから提供されているパーサーライブラリは、コンパイルの必要があるので、Macから手軽に動画を送るには、この方法が一番手っ取り早いと思います。

$ export AWS_DEFAULT_REGION=ap-northeast-1
$ AWS_ACCESS_KEY_ID=xxx AWS_SECRET_ACCESS_KEY=xxx ./kinesis_video_gstreamer_sample_app TestStream rtsp://192.168.0.105/

6 最後に

今回は、Kinesis Video Streamsのアーカイブから任意のデータを取得する方法について確認してみました。

アーカイブ上のフラグメントデータの状況と、取得できるチャンク形式を把握することで、色々応用できそうです。

7 参考リンク


20180328 AWS Black Belt Online Seminar Amazon Kinesis Video Streams