Amazon Kinesis Firehoseに対する各操作をJavaから行う #reinvent

よく訓練されたアップル信者、都元です。re:Invent 2015が始まりましたね。日本時間の本日未明に発表されたKinesis Firehose。Firehoseというのは消火栓や消防車につなぐホースのことだと思います。

Firehoseと言われて思い出すのが、以前TwitterのFirehoseというのがありまして。世界中の全てのツイートを提供してもらえるサービス(そして超高価)だったと記憶しています。要するに、とてつもない流量を流しても大丈夫な何か、というイメージで、一般的にそういうものをFirehoseと呼んだりするのかな、なんて思った次第です。

さて、Firehoseの概要と、Firehose Agentを使った利用方法については、既に速報記事が出ておりますので、下記をご覧ください。

本稿では、AWS SDK for JavaのAPIを使ってFirehoseのDelivery Streamのライフサイクルを一通り呼び出してみました。

KinesisとKinesis Firehoseの関係

検証に先立って、まず理解しなければならないことは「KinesisのStream」と「Kinesis FirehoseのDelivery Stream」は全く別のエンティティであることです。つまり、Kinesis FirehoseのDelivery Streamを作っても、KinesisのListStreamsアクションでは存在を確認できません。

KinesisではStreamに「シャード」という概念があり、これをsplitしたりmergeしたりしてキャパシティを制御できました。Kinesis Firehoseにはこの考え方は(現時点では)無く、シャードを意識しない仕組みになっています。

SDKのクライアントクラスも下記のように、パッケージからして別物になっています。

  • com.amazonaws.services.kinesis.AmazonKinesisClient
  • com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient

正直、なんとなく似てるので同じ「Kinesis」って名前が付いているだけで、「Kinesis」と「Kinesis Firehose」は別のサービスであると認識したほうが良いのかもしれません。

JavaからAPIを一通り呼び出す

ここからは、コードを細切れで貼りつつ逐次解説をしていきますが、最後までを1つのコードとして呼び出すことができるようになっています。ご興味のある方は試してみてください。

clientの作成と、パラメータの決定

AmazonKinesisFirehose firehose = new AmazonKinesisFirehoseClient();
firehose.setRegion(Region.getRegion(Regions.US_WEST_2));

String deliveryStreamName = "firehose-example";
String bucketName = "example;
String prefix = "firehose/";
String roleArn = "arn:aws:iam::000000000000:role/firehose_delivery_role";

clientはただnewするだけです。これにより、ローカル環境では~/.aws/credentialsファイルのdefaultプロファイルのキーを使い、EC2上ではインスタンスに付与したロールを利用するようになります。

ストリームの名前はfirehose-exampleとし、流れてきたレコードはexampleバケットに対してキー接頭辞firehose/を付与してPutすることとしました。また、Firehoseに対して、S3のPutObject等の権限を与える必要があるので、ロールを用意し、そのARNを使います。

ここは各自任意に書き換えてください。

Delivery streamの作成

logger.info("Create Delivery Stream");
String deliveryStreamARN = firehose.createDeliveryStream(new CreateDeliveryStreamRequest()
    .withDeliveryStreamName(deliveryStreamName)
    .withS3DestinationConfiguration(new S3DestinationConfiguration()
        .withBucketARN("arn:aws:s3:::" + bucketName)
        .withPrefix(prefix)
        .withRoleARN(roleArn)
        .withCompressionFormat(CompressionFormat.UNCOMPRESSED)
        .withEncryptionConfiguration(new EncryptionConfiguration()
            .withNoEncryptionConfig(NoEncryptionConfig.NoEncryption))
        .withBufferingHints(new BufferingHints()
            .withSizeInMBs(5)
            .withIntervalInSeconds(60))))
    .getDeliveryStreamARN();
logger.info("Delivery Stream created: {}", deliveryStreamARN);

ここでストリームを作成します。設定としてはオーソドックスなものを選択しています。S3に置くファイルは圧縮を行わず、暗号化も行いません。

圧縮したい場合、圧縮形式は GZIP, ZIP, Snappy の何れかから選択できます。暗号化したい場合はKMSを利用します。

上記のログ出力はこんな感じ。

15:56:35 [main] INFO  Create Delivery Stream
15:56:37 [main] INFO  Delivery Stream created: arn:aws:firehose:us-west-2:000000000000:deliverystream/firehose-example

StreamのACTIVE待機

さて、作成したストリームはすぐ使えるわけではありません。概ね1〜2分の間CREATING状態となり、その後ACTIVEとなります。

logger.info("Wait status ACTIVE");
String deliveryStreamStatus;
do {
    DescribeDeliveryStreamResult result = firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest()
        .withDeliveryStreamName(deliveryStreamName));
    deliveryStreamStatus = result.getDeliveryStreamDescription().getDeliveryStreamStatus();
    logger.info(deliveryStreamStatus);
    Thread.sleep(10000);
} while (deliveryStreamStatus.equals("ACTIVE") == false);

そのため、適当な間隔でストリームをdescribeし、ステータスをポーリングします。DynamoDBのテーブル作成もこんな感じだったりしますね。

15:56:37 [main] INFO  Wait status ACTIVE
15:56:37 [main] INFO  CREATING
15:56:48 [main] INFO  CREATING
15:56:59 [main] INFO  CREATING
15:57:10 [main] INFO  CREATING
15:57:20 [main] INFO  CREATING
15:57:31 [main] INFO  CREATING
15:57:42 [main] INFO  CREATING
15:57:53 [main] INFO  CREATING
15:58:03 [main] INFO  ACTIVE

Stream一覧を取ってみる

logger.info("List Delivery Streams");
firehose.listDeliveryStreams(new ListDeliveryStreamsRequest())
    .getDeliveryStreamNames()
    .forEach(logger::info);

まぁ大した話ではないです。普通に一覧が取れるだけです。私が別途管理コンソールから作成してみたfirehose-firstも見えていますね。

15:58:13 [main] INFO  List Delivery Streams
15:58:14 [main] INFO  firehose-first
15:58:14 [main] INFO  firehose-example

StreamにデータをPutする(単発篇)

for (int i = 0; i < 10; i++) {
    Record record = new Record()
        .withData(ByteBuffer.wrap((String.format("Test Data %d\n", i)).getBytes()));
    String recordId = firehose.putRecord(new PutRecordRequest()
        .withDeliveryStreamName(deliveryStreamName)
        .withRecord(record))
        .getRecordId();
    logger.info("Record ID for {} = {}", i, recordId);
}

Thread.sleep(60000);

Recordを作り、それをputRecordでfirehoseに投げているだけです。投げる度にレコードIDという文字列が得られます。意外と長いIDが帰ってきますね。まだ何に使うかは不明ですw

最後に60秒スリープしている理由は後ほど。

15:58:14 [main] INFO  Record ID for 0 = uDYG+FuleIyQcdSATiVMXoY+V0nS2b8+GdHxgdP6D38oz+TzOIfNrdQpVFFUqQOSgA7hPhp6y+FVIOo/CTeRIltHCTxm3bvOyKaijmr/IFRv5hykBgW6yO9VyvdkSYohvF2o/Cy+RnTrIr1FnQUmDUtDd6OgTYAA27uGVLBZw4hyeZyjEjEXfefGfgf77QorRXs9mwLtwl9p1i3E8qXpjgcEIbk9VxNu
15:58:14 [main] INFO  Record ID for 1 = 7tSoNoGHwFg+4AQ/X06hEih0Xg8b3yANh7ya5pPFQ9rDTP7tHSiIVgNl1viRUYP7o5b+O0DAQHKL6zz2bubCkK0WzgS/GgqitK01F3lGNfYQlz0DL4ZXN6jMwnXB4aA6+CFIJsbE7PpB7wT3sKr1MKRxoLIkNnED/2EfBfTq0ugo87cQhmbwh2blCUKhQNjpZTssHCdLQefgKN6EGnLZn6zbaeyAJlWY
15:58:15 [main] INFO  Record ID for 2 = Vw616Zh31ShO6xQqWNCqBi+ucNvmIS2JT60XpCH5jJmH60F1zwr15SUysYHw+4Vo+XAWJ/TaNfERHiQJtRyDL6nOs7/GdBddhGUAePjzQzOEifcH3LeJ5C7Tbp5uIry8FKnzObHLymK0FLWHtjJLLrIlyJzCLBO4Q5hajvKz9QD3KzKk1z6K24nG9WumWzmNlpBP1w0X6Jv3hb/mJSD/2RxsATv35gfB
15:58:15 [main] INFO  Record ID for 3 = c951XuF7sEJ1PxdI5i6KjOHtBvqkz9Td/nVvZi+y5WYI5GfpvzICccZcB2S0S9kQwfhO65fmAfxWGuUhVGB0pXlDKgsUCH6MYFkCOlkavM5wdATotwKueJVe/vTzCifdFmLdyII8yQ/BjEVxyPOmktupUdGTF9k+u5tQcqD7ksNMCxhSaPGNFlgmpE5q1qKLBhdLs3Z3w4MmTeeUM7MzVl4uNbIRoWMM
15:58:15 [main] INFO  Record ID for 4 = iW9q/CYrCnK/yGxwVZRQUDaMlRdje7Q0n6eXaQo+I2pQXaLpnXZSvI3wv1bsSZ0YHLD2rlVfgc1sObgJnK31IHvS0pVJcGL8TRQQJaUQ3eKSDac2jaqe8MVSBUA1DRcKWs7OZYT6QWn61DzznfXvUd7VCtlz7og4jlkRf7PzgfpAx2Q/DpKlzAZ7Q3IcBbPp/9AUMmUuo30ORSeqLAEkFtfQxbbtRvjI
15:58:16 [main] INFO  Record ID for 5 = 77HqTBatw1o+JukB41lKEQNjYfPNAa0OHW6Nkj/dMh1cWvcR7VgbX2oWaNe2+Gi0XzPeQqsfv4RVzQx83RPJBIIDXP3jxn3ciKT5GPx7/rudLDl4JxxfYQPnCD0vgPCA86Nsj2PJMaOkpMIaV6ZKBkybOEzFbr72hRSeNoaxsLs+EcY0YePJdMBOykt4cmzBSyfcMMiN8q+JcrkMw1yGFmyy5VgMzFdZ
15:58:16 [main] INFO  Record ID for 6 = 5iRD6W/RTJd0/4dmBdH+GcCa9WgNfgn5tgZZ/UCTyUHKp4BM+2TibBWoBUtdDtZC2h8QlSRmBqG43wvVyzt9Ne68MekwPDRfFuN1ADabl3utgKinzceP6Q1Rm+1ueHBLs2Tn50mcJLjfPsVWZd30Ur9cWRiRKKXdBSBefCtczE+W4izQCVpsbFkFk7Rsn1HOEKoz8oF8an3ZUJB9xFYm19XdOrNAyySi
15:58:16 [main] INFO  Record ID for 7 = wocD9wC20+6EyLuQUQee9Y8nMtJ00poKl9LOBBChQHox7MfYBB3F/uEW6ynEisiIvmqJmoy8hhvo/y+t+5ZazJyhJy2/TFc3UzfBSXi8fw2vWUKm5jwx1HULPcdtq6NG5/7tfijfA60k74ECvFyzmOGJMh2dy7WsOagIaa6bra6pEo1/h2gzErWINfED6I1O8Uz+3ghkv4mbW9byNRffclcZuhuLfxBF
15:58:16 [main] INFO  Record ID for 8 = NJrrc4EQ65rrcnwRrLIFfmtbu3W5t1dJ1dRZ83wZ74jLEq+SlsBznJPMP9znkYaz1fGSLSuwDn6YLkbTcZmO8DE6wqFLyv0Jz8YntcRHJoVYhihguwvAnJa0LWy8RpxXD8tW0aXpSZmGuNz6Oioza+ga8DW0Q80Cmnib3IAAZBN/5u0vgoXxRweAWFCOutFBkrruNCM1QmTYdpomxXadeqqEXSm1IskU
15:58:16 [main] INFO  Record ID for 9 = sRbAPoNZsKmOGPDIP6wmsvES5DTdaH+GhGLCfcP5IrxvRBFpPbP/I18+EXKmL8L4uEyO2pGGK6KDe8xSWzMNkokTdpfzPXl0q1ryZnZJReFyYg2SFLU+voi4vPRvVYcH0QqMVbAJhR77HVdfpksB/0K7mqsHUEhgHtgSzjaFeFdNFfpH2/vUfLJ9Vjs7Kgad5akWN5Si5k0BlqtGC2qEwuBkG4yAjeaB

StreamにデータをPutする(バッチ篇)

ストリームに1レコードずつputするのではなく、1リクエストで一気に最大500件のレコードをputすることもできます。

Collection<Record> records = IntStream
    .range(10, 20)
    .mapToObj(i -> String.format("Test Data %d\n", i))
    .map(s -> s.getBytes())
    .map(ByteBuffer::wrap)
    .map(b -> new Record().withData(b))
    .collect(Collectors.toList());
firehose.putRecordBatch(new PutRecordBatchRequest()
    .withDeliveryStreamName(deliveryStreamName)
    .withRecords(records));

Thread.sleep(60000);

Java 8らしく、レコードをいっぱい作ってみました。

S3に置かれたファイルを読んでみる

logger.info("Read S3 Objects");
S3Objects.withPrefix(new AmazonS3Client(), bucketName, prefix).forEach(os -> {
    S3Object object = s3.getObject(os.getBucketName(), os.getKey());
    try (InputStreamReader reader = new InputStreamReader(object.getObjectContent())) {
        String content = CharStreams.toString(reader);
        logger.info("Key: {}", os.getKey());
        logger.info("{}", content);
    } catch (Exception e) {
        logger.error("error", e);
    }
});

まぁ、普通ですね。その結果得られたログはこんな感じです。

16:00:17 [main] INFO  Read S3 Objects
16:00:18 [main] INFO  Key: firehose/2015/10/08/06/firehose-miyamoto-1-2015-10-08-06-59-18-3cf72bc4-619c-4890-93e2-872d01959561
16:00:18 [main] INFO  Test Data 0
Test Data 1
Test Data 2
Test Data 3
Test Data 4
Test Data 5
Test Data 6
Test Data 7
Test Data 8
Test Data 9

16:00:18 [main] INFO  Key: firehose/2015/10/08/07/firehose-miyamoto-1-2015-10-08-07-00-20-b35618f2-0b19-423d-af62-141271f2c60b
16:00:18 [main] INFO  Test Data 10
Test Data 11
Test Data 12
Test Data 13
Test Data 14
Test Data 15
Test Data 16
Test Data 17
Test Data 18
Test Data 19

お気づきでしょうか。1レコード1ファイルというわけではありません。適宜複数のレコードが集約されてファイルに吐かれています。だから60秒待ったりしていたんですね。

キーには、指定したprefixに続き、yyyy/MM/dd/HH という形式(UTC)でタイムスタンプがついています。その後にもストリーム名等が続きますが、このあたりは仕様として明示されているわけではないので、後に変更があるかもしれません。注意しましょう。

ファイル内に書かれた複数のレコードは、特に区切り文字があるわけではない(ログに現れた改行は、コード中で明示的にデータに含めた改行です)ので、PutRecordのレベルで区切り文字を意識しておく必要がありそうです。個人的には、各レコードはJSONであることが多い気がしているので、無改行のJSONを本体として末尾に改行をつけたもの、をレコードのデータとして送信すると使いやすいのではないかと思いました。

Delivery streamの削除

さて、使い終わったストリームは削除シておきましょう。これも即座に消えるわけではなく、1分弱の間DELETINGというステータスになるようです。

logger.info("Delete Delivery Stream");
firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest()
    .withDeliveryStreamName(deliveryStreamName));

logger.info("Wait deleted");
try {
    do {
        DescribeDeliveryStreamResult result =
                firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest()
                    .withDeliveryStreamName(deliveryStreamName));
        deliveryStreamStatus = result.getDeliveryStreamDescription().getDeliveryStreamStatus();
        logger.info(deliveryStreamStatus);
        Thread.sleep(10000);
    } while (true);
} catch (ResourceNotFoundException e) {
    logger.info("END");
    // finish
}

完全に消えるとステータスがDELETEDになるわけではなく、ResourceNotFoundExceptionが飛ぶようです。

16:00:18 [main] INFO  Delete Delivery Stream
16:00:19 [main] INFO  Wait deleted
16:00:19 [main] INFO  DELETING
16:00:29 [main] INFO  DELETING
16:00:40 [main] INFO  DELETING
16:00:51 [main] INFO  DELETING
16:01:02 [main] INFO  END

まとめ

正直、Kinesisは「流す側」は気楽で良いけど、「受け取る側」がかなり頑張らなきゃいけない印象でした。

これに対してKinesis Firehoseは、とにかくS3やRedshiftにレコードを突っ込んでくれるところまでをマネージドで実現してくれるため、適用範囲がぐっと広がった感があります。今後が楽しみです。