LocalStackを使ってCIでKinesis Client Libraryを使ったコードのテストを書いてみる

アイキャッチ

こんにちは。最近体の衰えをひどく感じるようになってきました。齋藤です。 最近、温度変化が激しくて体調が悪くなる人が多いですが お元気でしょうか。体調管理に気をつけたいところです。

初めに

今回、業務の中でKinesisを触る機会があるので 少し調査をしようと思い、CIでテストできる環境を作りました。

構成

  • Circle CI 2.0
  • LocalStack
  • AWS SDK for Java

まずはざっとCicleCIの設定を

Circle CIの設定はこちらです。 設定を間違えると動かない部分があるのでご注意ください。 LocalStackのimageはatlassianlabsからlocalstackに移っております。

version: 2
jobs:
  build:
    docker:
      - image: circleci/openjdk:8-jdk
      - image: localstack/localstack # atlassianlabs/localstackと間違えると動きません。
    working_directory: ~/repo
    environment:
      JVM_OPTS: -Xmx3200m
      TERM: dumb

    steps:
      - checkout

      - restore_cache:
          keys:
          - v1-dependencies-{{ checksum "build.gradle" }}-{{ checksum "gradle.properties" }}
          - v1-dependencies-

      - run: ./gradlew dependencies

      - save_cache:
          paths:
            - ~/.m2
          key: v1-dependencies-{{ checksum "build.gradle" }}-{{ checksum "gradle.properties" }}

      - run: ./gradlew test
      - store_test_results:
          path: build/test-results/

まずはKinesisで使う基本的なメソッドから

AmazonKinesisクラスのセットアップについては後ほど説明します。 この辺は普通にKinesisのAPIです

Streamの作成

createStreamメソッドでStreamの作成をします。

AmazonKinesis kinesis = ...; 
kinesis.createStream("testStream", /* シャード数 */ 5);

Streamの情報取得

Streamの情報を取得します。 今回このメソッドはStreamが出来たかどうかの確認で使っています。

AmazonKinesis kinesis = ...; 
kinesis.describeStream("testStream");

Streamの削除

Streamの削除をします。

AmazonKinesis kinesis = ...; 
kinesis.deleteStream("testStream");

Streamにデータを登録

AmazonKinesis kinesis = ...; 

PutRecordRequest recordRequest = new PutRecordRequest();
recordRequest.setStreamName("testStream");
recordRequest.setData(ByteBuffer.wrap("some_data".getBytes(StandardCharsets.UTF_8)));
recordRequest.setPartitionKey("some_partition_key");

kinesis.putRecord(recordRequest);

基本的なテストを書いてみる

一連の流れをテストコードにして見ます。 LocalStackではあらかじめ決められたポートでそれぞれのサービスが立ち上がります。 本ブログの以下の記事やgithubのページなどを見てご確認ください。

LocalStackをつかってローカルでLambdaを実行してみた

  // guavaのThreadFactoryBuilderを使っています
  ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
    .build();

  ExecutorService executorService = Executors.newFixedThreadPool(10, factory);

  @Test
  public void test() throws Exception {
    // LocalStackはバイナリのフォーマットをサポートしていないのでdisableしておきます。
    System.setProperty("com.amazonaws.sdk.disableCbor", "1");

    // LocalStack向けのSetupです
    AWSStaticCredentialsProvider dummyProvider =
      new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));

    // LocalStack向けのSetupです
    KinesisClientLibConfiguration config =
      new KinesisClientLibConfiguration("testApp", "testStream", dummyProvider, "testWorker")
        .withInitialPositionInStream(InitialPositionInStream.LATEST)
        .withKinesisEndpoint("http://localhost:4568/")
        .withDynamoDBEndpoint("http://localhost:4569/");

    // LocalStack向けのSetupです
    AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4568", null))
      .build();

    kinesis.createStream("testStream", 5);

    // streamが作成されるまで待つ
    while (true) {
      try {
        kinesis.describeStream("testStream");
        break;
      } catch (ResourceNotFoundException e) {
      }
    }

    String key = RandomStringUtils.randomAlphanumeric(10);
    String data = "test-data-" + key;

    PutRecordRequest recordRequest =
      new PutRecordRequest();
    recordRequest.setStreamName("testStream");
    recordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
    recordRequest.setPartitionKey(key);
    kinesis.putRecord(recordRequest);

    kinesis.deleteStream("testStream");
  }

ちなみにこのテスト、たまに落ちるはずです。 これについては後ほど直していきます。

KCLでStreamからレコードを取り出して処理する

KCL(Kinesis Client Library)でStreamからレコードを順次取り出して処理していくコード書いて見ます

IRecordProcessorFactoryというインターフェースの実装とFactoryが返却するIRecordProcessorの実装を行います。

とりあえずざっとラムダ式と匿名クラスで書きました。 ログに出力するだけのProcessorです。ProcessorがレコードをConsumeします。また、ProcessorはStreamの processBarrierbarrierについては後ほど説明します。

    IRecordProcessorFactory factory = () -> {
      return new IRecordProcessor() {

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
          log.info("test: shutdown");
        }

        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
          log.info("test: processRecords");
          records.stream()
            .forEach(r -> log.info(r.toString()));
          try {
            processBarrier.await();
          } catch (Exception e) {
            throw new AssertionError(e);
          }
        }

        @Override
        public void initialize(String shardId) {
          log.info("test: initialize {}", shardId);
          try {
            barrier.await();
          } catch (Exception e) {
            throw new AssertionError(e);
          }
        }
      };
    };

シナリオ的なテストにしてみる

先ほどのテストにProcessorのコードを追加して動作を確認して見ます。 また、CyclicBarrierを使ってタイミングによってテストが失敗しないようにして見ます。 CyclicBarrierのコードの参考は以下の記事です。 Spring AOP+CyclicBarrierを活用してSpring Bootアプリ上での楽観ロックのテスト条件を確実に整える

以下のコードが全ての処理をざっとまとめたコードです。

  ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true)
    .build();

  ExecutorService executorService = Executors.newFixedThreadPool(10, factory);


  @Test
  public void test() throws Exception {
    // LocalStackはバイナリのフォーマットをサポートしていないのでdisableしておきます。
    System.setProperty("com.amazonaws.sdk.disableCbor", "1");

    CyclicBarrier barrier = new CyclicBarrier(6);
    CyclicBarrier processBarrier = new CyclicBarrier(2);

    IRecordProcessorFactory factory = () -> {
      return new IRecordProcessor() {

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
          log.info("test: shutdown");
        }

        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
          log.info("test: processRecords");
          records.stream()
            .forEach(r -> log.info(r.toString()));
          try {
            processBarrier.await();
          } catch (Exception e) {
            throw new AssertionError(e);
          }
        }

        @Override
        public void initialize(String shardId) {
          log.info("test: initialize {}", shardId);
          try {
            barrier.await();
          } catch (Exception e) {
            throw new AssertionError(e);
          }
        }
      };
    };

    // LocalStack向けのSetupです
    AWSStaticCredentialsProvider dummyProvider =
      new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));

    // LocalStack向けのSetupです
    KinesisClientLibConfiguration config =
      new KinesisClientLibConfiguration("testApp", "testStream", dummyProvider, "testWorker")
        .withInitialPositionInStream(InitialPositionInStream.LATEST)
        .withKinesisEndpoint("http://localhost:4568/")
        .withDynamoDBEndpoint("http://localhost:4569/");

    // LocalStack向けのSetupです
    AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4568", null))
      .build();

    kinesis.createStream("testStream", 5);

    // streamが作成されるまで待つ
    while (true) {
      try {
        kinesis.describeStream("testStream");
        break;
      } catch (ResourceNotFoundException e) {
      }
    }
    String key = RandomStringUtils.randomAlphanumeric(10);
    String data = "test-data-" + key;

    // Processorの準備
    Worker worker = new Worker.Builder()
      .config(config)
      .recordProcessorFactory(factory)
      .metricsFactory(new NullMetricsFactory())
      .execService(executorService)
      .build();  
    executorService.submit(worker);

    // Processor全てが初期化されるのを待ちます。Processorはシャードの数だけ作成されます。
    barrier.await();

    PutRecordRequest recordRequest =
      new PutRecordRequest();
    recordRequest.setStreamName("testStream");
    recordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
    recordRequest.setPartitionKey(key);
    kinesis.putRecord(recordRequest);

    // ProcessorがRecordを処理するのを待ちます。
    processBarrier.await();

    kinesis.deleteStream("testStream");

    // Processorが処理をしているスレッド群を終了
    executorService.awaitTermination(10, TimeUnit.SECONDS);
  }

Streamの作成や削除などのsetup, teardownで本来書くような内容も同じ場所に書きましたが この辺は@Rule@Beforeなどのjunitで拡張したりすれば もう少し綺麗になるかと思われます。

まとめ

いかがだったでしょうか? 今回初めてKCLを使ってKinesisからデータを取得するような処理を書いて見ましたが LocalStackを使うことで非常に簡単にテストをすることができました。 テストコードは少し汚いですが。

このコードは業務で使われないかもしれない。

この記事が日の目を見ることがあると良いですね。

今回このテストを書いたリポジトリは以下のリポジトリです。 kinesis-sandbox

今回はLocalStackを使ったのですが 元々はkinesaliteを使おうかなと思っていました。 今度はkinesaliteを使ってみようかなと思います。