kinesaliteを使ってローカルでKinesisを動かす

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

ローカルでKinesisを動かそう

ここのblogでも多数紹介されているとおり、ストリーミングデータをごにょごにょするのにAmazon Kinesisは便利です。
そのためプログラムからもKinesisを使用することがありますが、開発中にテストする際にもAWS環境を使う必要があるのが面倒です。
本稿では、そんなときに役立つ、ローカルで動作するKinesis、「kinesalite」を紹介します。  

環境

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.12.4
  • Node : v9.8.0
  • Java : 1.8.0_121
  • Gradle : 4.6

kinesaliteとは

kinesaliteはNode.jsで動作するAWS Kinesisのローカル実装で、LevelDBを使用して動きます。   Node.jsが動けばどこでも動くので簡単に導入できます。

kinesaliteを使ってみる

まずはkinesaliteのインストール(Nodeとnpmはインストール済み)  

% npm i npm
% npm install -g kinesalite

インストールできたら起動しましょう。デフォルトでは4567番ポートで起動します。   

% kinesalite   
Listening at http://:::4567

とりあえずjavascriptでアクセスできるのを確認。

//main.js
var AWS = require('aws-sdk')
AWS.config.update({region:'us-east-1'});

var kinesis = new AWS.Kinesis({endpoint: 'http://localhost:4567'})
kinesis.listStreams(console.log.bind(console))

下記のように結果がかえってくればアクセスOKです。  

% node main.js                                  
null { StreamNames: [], HasMoreStreams: false }

さて次にJavaからアクセスしてみます。必要ライブラリは次のとおり。

// build.gradle
dependencies {
    compile group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.301'
}

これでJavaプログラムからローカルKinesisにアクセスできるようになるのですが、その前にもう1つ設定が必要です。
下記のように、AWSライブラリでCBORバイナリオブジェクトの無効化をするための環境変数を設定しましょう。

% export AWS_CBOR_DISABLE=1

これでローカルのKinesisにアクセスできるようになりました。
stream作成・stream取得・データPUT・データGETを実行してみます。

AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
        .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration(
                        "http://localhost:4567/", "us-east-1")).build();

//kinesis-stream作成
CreateStreamRequest req = new CreateStreamRequest();
req.setStreamName("my-stream");
req.setShardCount(1);
kinesisClient.createStream(req);

//kinesis-streamの取得
ListStreamsResult res = kinesisClient.listStreams();
List<String> l = res.getStreamNames();
for (String s : l) {
    System.out.println(s);
}

//data put 
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName( "my-stream" );
putRecordRequest.setData(ByteBuffer.wrap(("testData").getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey"));
PutRecordResult putRecordResult = kinesisClient.putRecord( putRecordRequest );
String shartdId = putRecordResult.getShardId();

//data get
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName("my-stream");
getShardIteratorRequest.setShardId(shartdId);
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();

GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(10);

GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
for(Record r : records) {
    System.out.println(new String(r.getData().array()));
}

これでAWS環境を用意できなくても、ローカルのKinesisである程度の動作確認はできますね。

参考サイトなど