kinesaliteを使ってローカルでKinesisを動かす
ローカルでKinesisを動かそう
ここのblogでも多数紹介されているとおり、ストリーミングデータをごにょごにょするのにAmazon Kinesisは便利です。
そのためプログラムからもKinesisを使用することがありますが、開発中にテストする際にもAWS環境を使う必要があるのが面倒です。
本稿では、そんなときに役立つ、ローカルで動作するKinesis、「kinesalite」を紹介します。
環境
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.12.4
- Node : v9.8.0
- Java : 1.8.0_121
- Gradle : 4.6
kinesaliteとは
kinesaliteはNode.jsで動作するAWS Kinesisのローカル実装で、LevelDBを使用して動きます。 Node.jsが動けばどこでも動くので簡単に導入できます。
kinesaliteを使ってみる
まずはkinesaliteのインストール(Nodeとnpmはインストール済み)
% npm i npm % npm install -g kinesalite
インストールできたら起動しましょう。デフォルトでは4567番ポートで起動します。
% kinesalite Listening at http://:::4567
とりあえずjavascriptでアクセスできるのを確認。
//main.js var AWS = require('aws-sdk') AWS.config.update({region:'us-east-1'}); var kinesis = new AWS.Kinesis({endpoint: 'http://localhost:4567'}) kinesis.listStreams(console.log.bind(console))
下記のように結果がかえってくればアクセスOKです。
% node main.js null { StreamNames: [], HasMoreStreams: false }
さて次にJavaからアクセスしてみます。必要ライブラリは次のとおり。
// build.gradle dependencies { compile group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.301' }
これでJavaプログラムからローカルKinesisにアクセスできるようになるのですが、その前にもう1つ設定が必要です。
下記のように、AWSライブラリでCBORバイナリオブジェクトの無効化をするための環境変数を設定しましょう。
% export AWS_CBOR_DISABLE=1
これでローカルのKinesisにアクセスできるようになりました。
stream作成・stream取得・データPUT・データGETを実行してみます。
AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard() .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration( "http://localhost:4567/", "us-east-1")).build(); //kinesis-stream作成 CreateStreamRequest req = new CreateStreamRequest(); req.setStreamName("my-stream"); req.setShardCount(1); kinesisClient.createStream(req); //kinesis-streamの取得 ListStreamsResult res = kinesisClient.listStreams(); List<String> l = res.getStreamNames(); for (String s : l) { System.out.println(s); } //data put PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( "my-stream" ); putRecordRequest.setData(ByteBuffer.wrap(("testData").getBytes())); putRecordRequest.setPartitionKey(String.format("partitionKey")); PutRecordResult putRecordResult = kinesisClient.putRecord( putRecordRequest ); String shartdId = putRecordResult.getShardId(); //data get String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName("my-stream"); getShardIteratorRequest.setShardId(shartdId); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(10); GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords(); for(Record r : records) { System.out.println(new String(r.getData().array())); }
これでAWS環境を用意できなくても、ローカルのKinesisである程度の動作確認はできますね。