この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
ローカルでKinesisを動かそう
ここのblogでも多数紹介されているとおり、ストリーミングデータをごにょごにょするのにAmazon Kinesisは便利です。
そのためプログラムからもKinesisを使用することがありますが、開発中にテストする際にもAWS環境を使う必要があるのが面倒です。
本稿では、そんなときに役立つ、ローカルで動作するKinesis、「kinesalite」を紹介します。
環境
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.12.4
- Node : v9.8.0
- Java : 1.8.0_121
- Gradle : 4.6
kinesaliteとは
kinesaliteはNode.jsで動作するAWS Kinesisのローカル実装で、LevelDBを使用して動きます。 Node.jsが動けばどこでも動くので簡単に導入できます。
kinesaliteを使ってみる
まずはkinesaliteのインストール(Nodeとnpmはインストール済み)
% npm i npm
% npm install -g kinesalite
インストールできたら起動しましょう。デフォルトでは4567番ポートで起動します。
% kinesalite
Listening at http://:::4567
とりあえずjavascriptでアクセスできるのを確認。
//main.js
var AWS = require('aws-sdk')
AWS.config.update({region:'us-east-1'});
var kinesis = new AWS.Kinesis({endpoint: 'http://localhost:4567'})
kinesis.listStreams(console.log.bind(console))
下記のように結果がかえってくればアクセスOKです。
% node main.js
null { StreamNames: [], HasMoreStreams: false }
さて次にJavaからアクセスしてみます。必要ライブラリは次のとおり。
// build.gradle
dependencies {
compile group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.301'
}
これでJavaプログラムからローカルKinesisにアクセスできるようになるのですが、その前にもう1つ設定が必要です。
下記のように、AWSライブラリでCBORバイナリオブジェクトの無効化をするための環境変数を設定しましょう。
% export AWS_CBOR_DISABLE=1
これでローカルのKinesisにアクセスできるようになりました。
stream作成・stream取得・データPUT・データGETを実行してみます。
AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"http://localhost:4567/", "us-east-1")).build();
//kinesis-stream作成
CreateStreamRequest req = new CreateStreamRequest();
req.setStreamName("my-stream");
req.setShardCount(1);
kinesisClient.createStream(req);
//kinesis-streamの取得
ListStreamsResult res = kinesisClient.listStreams();
List<String> l = res.getStreamNames();
for (String s : l) {
System.out.println(s);
}
//data put
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName( "my-stream" );
putRecordRequest.setData(ByteBuffer.wrap(("testData").getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey"));
PutRecordResult putRecordResult = kinesisClient.putRecord( putRecordRequest );
String shartdId = putRecordResult.getShardId();
//data get
String shardIterator;
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName("my-stream");
getShardIteratorRequest.setShardId(shartdId);
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
shardIterator = getShardIteratorResult.getShardIterator();
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(10);
GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
List<Record> records = getRecordsResult.getRecords();
for(Record r : records) {
System.out.println(new String(r.getData().array()));
}
これでAWS環境を用意できなくても、ローカルのKinesisである程度の動作確認はできますね。