[新機能] Kinesis StreamsのデータをKinesis Firehoseへ直接配信できます
こんにちは、菊池です。
2017/8/25追記:Kinesis Firehoseの東京リージョンでの提供が開始されました。
本日(日本時間:2017/8/19)の機能アップデートにより、Kinesis Streamsに配信されたデータを直接Kinesis Firehoseで読み取ることができるようになりました。
この機能統合により、Kinesis Streamsへ送信したデータを、S3やRedshiftへ簡単に保存しておくことが可能になります。
試してみた
Kinesis StreamsにPUTしたデータを、Firehoseを経由してS3に保存してみます。
Kinesis Streamの作成
まずはストリームの作成です。今回はオレゴンリージョンで試しました。
ストリーム名とシャード数を指定するのみです。テストなので1シャードで作成します。
ストリームが作成されました。ステータスが[Active]になれば利用可能です。
Firehose delivery streamの作成
続いで、デリバリーストリームの作成です。
作成画面で、[Choose source]に[Kinesis Stream]が選択可能になっています。
SourceにKinesis streamをチェックすると、作成済みのストリームが選択できます。ここで指定可能なのは、同一リージョンに作成したストリームのみです。
Lambdaによる変換はなしで進みます。
データの送信先を選択します。今回はS3のバケットを選びました。
バッファーや圧縮の有無を指定します。デフォルトで進みました。
最後に、設定内容を確認して作成実行です。
作成されました。
データの送信
それでは、作成したストリームにAWS CLIを使ってデータをPUTしてみます。操作方法はチュートリアルが参考になります。
put-record
でデータを送信します。
$ aws --region us-west-2 kinesis put-record --stream-name test-stream --partition-key 123 --data testdata { "ShardId": "shardId-000000000000", "SequenceNumber": "49576182398068203153242681558001998866810406977929216002" }
PUTできました。問題なければ、そのままFirehoseに送信され、S3に保存されてるはずです。
ちゃんとFirehose経由でS3に保存されています。
では、Kinesis Streamのデータはどうなっているでしょうか。まずはget-shard-iterator
でシャードイテレータを取得します。
$ aws --region us-west-2 kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test-stream { "ShardIterator": "AAAAAAAAAAFM2q2P1MsnBKFhYXNTbQxzn7Twbrv4yUGCOHOGIAwJMFqo9pnn6hI3EOkhYvhrC1HLQX7i1i9nQKEAsaYEOH0SlGXKibVrPgKXfbNaFZo60jd8l6tlEiJIvwW2WmoWaOaFhlRIGtwVhM2kURlLq6Lcs4FnHC1xyYNOdDynv9z3oDu6/OKKVf79N56ZT4BiQR5hrs5S43kzgEIsUVYIwslN" }
次に、シャードイテレータを指定して、get-records
でデータを取得します。
$ aws --region us-west-2 kinesis get-records --shard-iterator AAAAAAAAAAFM2q2P1MsnBKFhYXNTbQxzn7Twbrv4yUGCOHOGIAwJMFqo9pnn6hI3EOkhYvhrC1HLQX7i1i9nQKEAsaYEOH0SlGXKibVrPgKXfbNaFZo60jd8l6tlEiJIvwW2WmoWaOaFhlRIGtwVhM2kURlLq6Lcs4FnHC1xyYNOdDynv9z3oDu6/OKKVf79N56ZT4BiQR5hrs5S43kzgEIsUVYIwslN { "Records": [ { "Data": "dGVzdGRhdGE=", "PartitionKey": "123", "ApproximateArrivalTimestamp": 1503104502.847, "SequenceNumber": "49576182398068203153242681558001998866810406977929216002" } ], "NextShardIterator": "AAAAAAAAAAE1g98qSPTn+RErlm9mqhmvvBMDUFy3kPr+gmI0ZlKRjk8l38RjBvb17GCA0D9msV3msIfNVSlORUDXINoA8miuMWJR7ngp+XtU1HhJJv9Be9h8ieDv6DtG27YkQgwCCWBs1JmAlB/CzauzRRhgvHiHExswsfnJ1JSREiON5AakOCiy+/0o9JvRj/ce9WDoqsjMjgrUh3uJsyx1MhVHRC8G", "MillisBehindLatest": 0 }
データが取得できています。(Date
はBase64エンコードされています)
Kinesis FirehoseでS3に送信すると同時に、並行してそのデータをKinesisアプリケーション(コンシューマ)でデータを処理することができます。
最後に
以上です。
Kinesis StreamsとKinesis Firehoseの連携が、簡単な設定のみで利用できるようになりました。Kinesis Streamsで処理しているデータのオリジナルを、FIrehoseでS3に保存しておく、といったことがサックっと実現できます。
ただ、連携できるのはFIrehoseと同じリージョンにあるStreamとなります。東京リージョンでのFirehose提供が待ち遠しいですね。(2017/8/25追記)東京リージョンでもFirehoseの提供が開始されましたので、既存の東京リージョンのStreamと連携可能になりました!