この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは。最近体の衰えをひどく感じるようになってきました。齋藤です。 最近、温度変化が激しくて体調が悪くなる人が多いですが お元気でしょうか。体調管理に気をつけたいところです。
初めに
今回、業務の中でKinesisを触る機会があるので 少し調査をしようと思い、CIでテストできる環境を作りました。
構成
- Circle CI 2.0
- LocalStack
- AWS SDK for Java
まずはざっとCicleCIの設定を
Circle CIの設定はこちらです。 設定を間違えると動かない部分があるのでご注意ください。 LocalStackのimageはatlassianlabsからlocalstackに移っております。
version: 2
jobs:
build:
docker:
- image: circleci/openjdk:8-jdk
- image: localstack/localstack # atlassianlabs/localstackと間違えると動きません。
working_directory: ~/repo
environment:
JVM_OPTS: -Xmx3200m
TERM: dumb
steps:
- checkout
- restore_cache:
keys:
- v1-dependencies-{{ checksum "build.gradle" }}-{{ checksum "gradle.properties" }}
- v1-dependencies-
- run: ./gradlew dependencies
- save_cache:
paths:
- ~/.m2
key: v1-dependencies-{{ checksum "build.gradle" }}-{{ checksum "gradle.properties" }}
- run: ./gradlew test
- store_test_results:
path: build/test-results/
まずはKinesisで使う基本的なメソッドから
AmazonKinesisクラスのセットアップについては後ほど説明します。 この辺は普通にKinesisのAPIです
Streamの作成
createStreamメソッドでStreamの作成をします。
AmazonKinesis kinesis = ...;
kinesis.createStream("testStream", /* シャード数 */ 5);
Streamの情報取得
Streamの情報を取得します。 今回このメソッドはStreamが出来たかどうかの確認で使っています。
AmazonKinesis kinesis = ...;
kinesis.describeStream("testStream");
Streamの削除
Streamの削除をします。
AmazonKinesis kinesis = ...;
kinesis.deleteStream("testStream");
Streamにデータを登録
AmazonKinesis kinesis = ...;
PutRecordRequest recordRequest = new PutRecordRequest();
recordRequest.setStreamName("testStream");
recordRequest.setData(ByteBuffer.wrap("some_data".getBytes(StandardCharsets.UTF_8)));
recordRequest.setPartitionKey("some_partition_key");
kinesis.putRecord(recordRequest);
基本的なテストを書いてみる
一連の流れをテストコードにして見ます。 LocalStackではあらかじめ決められたポートでそれぞれのサービスが立ち上がります。 本ブログの以下の記事やgithubのページなどを見てご確認ください。
// guavaのThreadFactoryBuilderを使っています
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
.build();
ExecutorService executorService = Executors.newFixedThreadPool(10, factory);
@Test
public void test() throws Exception {
// LocalStackはバイナリのフォーマットをサポートしていないのでdisableしておきます。
System.setProperty("com.amazonaws.sdk.disableCbor", "1");
// LocalStack向けのSetupです
AWSStaticCredentialsProvider dummyProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
// LocalStack向けのSetupです
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration("testApp", "testStream", dummyProvider, "testWorker")
.withInitialPositionInStream(InitialPositionInStream.LATEST)
.withKinesisEndpoint("http://localhost:4568/")
.withDynamoDBEndpoint("http://localhost:4569/");
// LocalStack向けのSetupです
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
.withCredentials(dummyProvider)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4568", null))
.build();
kinesis.createStream("testStream", 5);
// streamが作成されるまで待つ
while (true) {
try {
kinesis.describeStream("testStream");
break;
} catch (ResourceNotFoundException e) {
}
}
String key = RandomStringUtils.randomAlphanumeric(10);
String data = "test-data-" + key;
PutRecordRequest recordRequest =
new PutRecordRequest();
recordRequest.setStreamName("testStream");
recordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
recordRequest.setPartitionKey(key);
kinesis.putRecord(recordRequest);
kinesis.deleteStream("testStream");
}
ちなみにこのテスト、たまに落ちるはずです。 これについては後ほど直していきます。
KCLでStreamからレコードを取り出して処理する
KCL(Kinesis Client Library)でStreamからレコードを順次取り出して処理していくコード書いて見ます
IRecordProcessorFactoryというインターフェースの実装とFactoryが返却するIRecordProcessorの実装を行います。
とりあえずざっとラムダ式と匿名クラスで書きました。
ログに出力するだけのProcessorです。ProcessorがレコードをConsumeします。また、ProcessorはStreamの
processBarrier
とbarrier
については後ほど説明します。
IRecordProcessorFactory factory = () -> {
return new IRecordProcessor() {
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
log.info("test: shutdown");
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
log.info("test: processRecords");
records.stream()
.forEach(r -> log.info(r.toString()));
try {
processBarrier.await();
} catch (Exception e) {
throw new AssertionError(e);
}
}
@Override
public void initialize(String shardId) {
log.info("test: initialize {}", shardId);
try {
barrier.await();
} catch (Exception e) {
throw new AssertionError(e);
}
}
};
};
シナリオ的なテストにしてみる
先ほどのテストにProcessorのコードを追加して動作を確認して見ます。 また、CyclicBarrierを使ってタイミングによってテストが失敗しないようにして見ます。 CyclicBarrierのコードの参考は以下の記事です。 Spring AOP+CyclicBarrierを活用してSpring Bootアプリ上での楽観ロックのテスト条件を確実に整える
以下のコードが全ての処理をざっとまとめたコードです。
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
.build();
ExecutorService executorService = Executors.newFixedThreadPool(10, factory);
@Test
public void test() throws Exception {
// LocalStackはバイナリのフォーマットをサポートしていないのでdisableしておきます。
System.setProperty("com.amazonaws.sdk.disableCbor", "1");
CyclicBarrier barrier = new CyclicBarrier(6);
CyclicBarrier processBarrier = new CyclicBarrier(2);
IRecordProcessorFactory factory = () -> {
return new IRecordProcessor() {
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
log.info("test: shutdown");
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
log.info("test: processRecords");
records.stream()
.forEach(r -> log.info(r.toString()));
try {
processBarrier.await();
} catch (Exception e) {
throw new AssertionError(e);
}
}
@Override
public void initialize(String shardId) {
log.info("test: initialize {}", shardId);
try {
barrier.await();
} catch (Exception e) {
throw new AssertionError(e);
}
}
};
};
// LocalStack向けのSetupです
AWSStaticCredentialsProvider dummyProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
// LocalStack向けのSetupです
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration("testApp", "testStream", dummyProvider, "testWorker")
.withInitialPositionInStream(InitialPositionInStream.LATEST)
.withKinesisEndpoint("http://localhost:4568/")
.withDynamoDBEndpoint("http://localhost:4569/");
// LocalStack向けのSetupです
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
.withCredentials(dummyProvider)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4568", null))
.build();
kinesis.createStream("testStream", 5);
// streamが作成されるまで待つ
while (true) {
try {
kinesis.describeStream("testStream");
break;
} catch (ResourceNotFoundException e) {
}
}
String key = RandomStringUtils.randomAlphanumeric(10);
String data = "test-data-" + key;
// Processorの準備
Worker worker = new Worker.Builder()
.config(config)
.recordProcessorFactory(factory)
.metricsFactory(new NullMetricsFactory())
.execService(executorService)
.build();
executorService.submit(worker);
// Processor全てが初期化されるのを待ちます。Processorはシャードの数だけ作成されます。
barrier.await();
PutRecordRequest recordRequest =
new PutRecordRequest();
recordRequest.setStreamName("testStream");
recordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
recordRequest.setPartitionKey(key);
kinesis.putRecord(recordRequest);
// ProcessorがRecordを処理するのを待ちます。
processBarrier.await();
kinesis.deleteStream("testStream");
// Processorが処理をしているスレッド群を終了
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
Streamの作成や削除などのsetup, teardownで本来書くような内容も同じ場所に書きましたが
この辺は@Rule
や@Before
などのjunitで拡張したりすれば
もう少し綺麗になるかと思われます。
まとめ
いかがだったでしょうか? 今回初めてKCLを使ってKinesisからデータを取得するような処理を書いて見ましたが LocalStackを使うことで非常に簡単にテストをすることができました。 テストコードは少し汚いですが。
このコードは業務で使われないかもしれない。
この記事が日の目を見ることがあると良いですね。
今回このテストを書いたリポジトリは以下のリポジトリです。 kinesis-sandbox
今回はLocalStackを使ったのですが 元々はkinesaliteを使おうかなと思っていました。 今度はkinesaliteを使ってみようかなと思います。