ちょっと話題の記事

[新機能] Kinesis StreamsのデータをKinesis Firehoseへ直接配信できます

2017.08.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、菊池です。

2017/8/25追記:Kinesis Firehoseの東京リージョンでの提供が開始されました。

本日(日本時間:2017/8/19)の機能アップデートにより、Kinesis Streamsに配信されたデータを直接Kinesis Firehoseで読み取ることができるようになりました。

この機能統合により、Kinesis Streamsへ送信したデータを、S3やRedshiftへ簡単に保存しておくことが可能になります。

kinesis-streams-to-firehose-001

試してみた

Kinesis StreamsにPUTしたデータを、Firehoseを経由してS3に保存してみます。

Kinesis Streamの作成

まずはストリームの作成です。今回はオレゴンリージョンで試しました。

ストリーム名とシャード数を指定するのみです。テストなので1シャードで作成します。

kinesis-streams-to-firehose-002

ストリームが作成されました。ステータスが[Active]になれば利用可能です。

kinesis-streams-to-firehose-003

Firehose delivery streamの作成

続いで、デリバリーストリームの作成です。

作成画面で、[Choose source]に[Kinesis Stream]が選択可能になっています

kinesis-streams-to-firehose-004

SourceにKinesis streamをチェックすると、作成済みのストリームが選択できます。ここで指定可能なのは、同一リージョンに作成したストリームのみです。

kinesis-streams-to-firehose-005

Lambdaによる変換はなしで進みます。

kinesis-streams-to-firehose-006

データの送信先を選択します。今回はS3のバケットを選びました。

kinesis-streams-to-firehose-007

バッファーや圧縮の有無を指定します。デフォルトで進みました。

kinesis-streams-to-firehose-008

最後に、設定内容を確認して作成実行です。

kinesis-streams-to-firehose-009

作成されました。

kinesis-streams-to-firehose-010

データの送信

それでは、作成したストリームにAWS CLIを使ってデータをPUTしてみます。操作方法はチュートリアルが参考になります。

put-recordでデータを送信します。

$ aws --region us-west-2 kinesis put-record --stream-name test-stream --partition-key 123 --data testdata
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49576182398068203153242681558001998866810406977929216002"
}

PUTできました。問題なければ、そのままFirehoseに送信され、S3に保存されてるはずです。

kinesis-streams-to-firehose-011

ちゃんとFirehose経由でS3に保存されています。

では、Kinesis Streamのデータはどうなっているでしょうか。まずはget-shard-iteratorでシャードイテレータを取得します。

$ aws --region us-west-2 kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test-stream
{
    "ShardIterator": "AAAAAAAAAAFM2q2P1MsnBKFhYXNTbQxzn7Twbrv4yUGCOHOGIAwJMFqo9pnn6hI3EOkhYvhrC1HLQX7i1i9nQKEAsaYEOH0SlGXKibVrPgKXfbNaFZo60jd8l6tlEiJIvwW2WmoWaOaFhlRIGtwVhM2kURlLq6Lcs4FnHC1xyYNOdDynv9z3oDu6/OKKVf79N56ZT4BiQR5hrs5S43kzgEIsUVYIwslN"
}

次に、シャードイテレータを指定して、get-recordsでデータを取得します。

$ aws --region us-west-2 kinesis get-records --shard-iterator AAAAAAAAAAFM2q2P1MsnBKFhYXNTbQxzn7Twbrv4yUGCOHOGIAwJMFqo9pnn6hI3EOkhYvhrC1HLQX7i1i9nQKEAsaYEOH0SlGXKibVrPgKXfbNaFZo60jd8l6tlEiJIvwW2WmoWaOaFhlRIGtwVhM2kURlLq6Lcs4FnHC1xyYNOdDynv9z3oDu6/OKKVf79N56ZT4BiQR5hrs5S43kzgEIsUVYIwslN
{
    "Records": [
        {
            "Data": "dGVzdGRhdGE=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1503104502.847,
            "SequenceNumber": "49576182398068203153242681558001998866810406977929216002"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAE1g98qSPTn+RErlm9mqhmvvBMDUFy3kPr+gmI0ZlKRjk8l38RjBvb17GCA0D9msV3msIfNVSlORUDXINoA8miuMWJR7ngp+XtU1HhJJv9Be9h8ieDv6DtG27YkQgwCCWBs1JmAlB/CzauzRRhgvHiHExswsfnJ1JSREiON5AakOCiy+/0o9JvRj/ce9WDoqsjMjgrUh3uJsyx1MhVHRC8G",
    "MillisBehindLatest": 0
}

データが取得できています。(DateはBase64エンコードされています)

Kinesis FirehoseでS3に送信すると同時に、並行してそのデータをKinesisアプリケーション(コンシューマ)でデータを処理することができます。

最後に

以上です。

Kinesis StreamsとKinesis Firehoseの連携が、簡単な設定のみで利用できるようになりました。Kinesis Streamsで処理しているデータのオリジナルを、FIrehoseでS3に保存しておく、といったことがサックっと実現できます。

ただ、連携できるのはFIrehoseと同じリージョンにあるStreamとなります。東京リージョンでのFirehose提供が待ち遠しいですね。(2017/8/25追記)東京リージョンでもFirehoseの提供が開始されましたので、既存の東京リージョンのStreamと連携可能になりました!