Firehoseを使ったコードのテストをLocalStackを使って動かしてみる!

Firehoseを使ったコードのテストをLocalStackを使って動かしてみる!

LocalStackでKinesis Firehose検証用のコード書いています。齋藤です。

それでは今回はLocalStackでKinesis Firehoseを動かしてみたいと思います。 Firehoseの連携先はS3です。 ちなみに最後の方にも書いたのですが、LocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。

今回書いたコードはgithub上に置いています

LocalStackでKinesis Firehoseの連携先S3バケットを作成する

S3のバケットを作成するためにはAmazonS3オブジェクトが必要になってきます。 dd以下のコードでLocalStackに向けたS3クライアントをセットアップします。

    AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
    AmazonS3 s3 = AmazonS3ClientBuilder.standard()
        .withCredentials(dummyProvider)
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null))
        .withPathStyleAccessEnabled(true) // これがないとエラーになります。
        .build();

バケットの作成を以下のコードで行います。

s3.createBucket("test");

これでひとまず、LocalStackで使うS3のバケットのセットアップは終了です。

LocalStackでKinesis FirehoseのDelivery Streamを作成します。

LocalStackに向けたKinesis Firehoseのクライアントの作成は以下のコードで行います。

    AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
    AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null))
      .build();;

先ほど作成したFirehoseのクライアントを使って 以下のコードでDelivery Streamを作成します。

    firehose.createDeliveryStream(
      new CreateDeliveryStreamRequest()
        .withDeliveryStreamName("testStream")
        .withS3DestinationConfiguration(
          new S3DestinationConfiguration()
            .withBucketARN("arn:aws:s3:::test") // 先ほど作ったS3のバケット名でS3のARNを設定します。
            .withPrefix("firehose/")
            .withRoleARN("arn:aws:iam::dummy:role/dummy"))) // LocalStackにはIAM関連のAPIはないみたいなのでdummy roleです。

RecordをPutしてS3に連携されていることをザッと確認してみます。

RecordをPutします。 このコードは都元ダイスケ氏の記事から持ってきました。

    firehose.putRecord(
      new PutRecordRequest()
        .withDeliveryStreamName("testStream")
        .withRecord(
          new Record()
            .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));

S3からlistObjectを使ってKinesisによって連係されていることを確認します。 アサーションはassertjを使っています。

    assertThat(s3.listObjects("test")
      .getObjectSummaries()).anySatisfy(summary -> {
        assertThat(summary.getKey()).startsWith("firehose/");
      });

S3のお片付け

S3ってobjectがあると削除できないんですね・・・・。初めて知りました。 先にObjectを削除してからBucketを削除します。

  @After
  public void tearDown() {
    firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream"));

    s3.listObjects("test")
      .getObjectSummaries()
      .forEach(s -> {
        s3.deleteObject(s.getBucketName(), s.getKey());
      });

    s3.deleteBucket("test");
  }

なお、CLIだとforce deleteできるようです。 ログを見た感じだと中で似たようなことをやっていそうですね。

aws --endpoint http://localhost:4572 s3 rb s3://test --force

delete: s3://test/firehose/28fe77a2-d0ef-4d39-9d6a-1a576709b922

delete: s3://test/firehose/2e4038af-7997-4735-8e2f-2e5f69b7fb88

delete: s3://test/firehose/3038ba8b-df66-46f9-9398-4d7e3ac7182f

delete: s3://test/firehose/8c1720be-e3dd-4faa-b068-d254434c00fe

delete: s3://test/xxxxx

remove_bucket: test

今回書いたコード

今回書いたコードはgithub上に置いています。 こちらにも貼り付けておきます。

public class FirehoseTest {
  private AmazonKinesisFirehose firehose;

  private AWSCredentialsProvider dummyProvider;

  private AmazonS3 s3;


  @Before
  public void setup() {
    System.setProperty("com.amazonaws.sdk.disableCbor", "1"); // LocalStackが対応していないプロトコルを使わないようにする
    dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));;
    firehose = AmazonKinesisFirehoseClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null))
      .build();;

    s3 = AmazonS3ClientBuilder.standard()
      .withCredentials(dummyProvider)
      .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null))
      .withPathStyleAccessEnabled(true)
      .build();;
  }

  @Test
  public void test() {
    s3.createBucket("test");

    firehose.createDeliveryStream(
      new CreateDeliveryStreamRequest()
        .withDeliveryStreamName("testStream")
        .withS3DestinationConfiguration(
          new S3DestinationConfiguration()
            .withBucketARN("arn:aws:s3:::test")
            .withPrefix("firehose/")
            .withRoleARN("arn:aws:iam::dummy:role/dummy")));

    while (true) {
      try {
        firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest().withDeliveryStreamName("testStream"));
        break;
      } catch (ResourceNotFoundException e) {
      }
    }

    firehose.putRecord(
      new PutRecordRequest()
        .withDeliveryStreamName("testStream")
        .withRecord(
          new Record()
            .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));


    assertThat(s3.listObjects("test")
      .getObjectSummaries()).anySatisfy(summary -> {
        assertThat(summary.getKey()).startsWith("firehose/");
      });
  }

  @After
  public void tearDown() {
    firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream"));

    s3.listObjects("test")
      .getObjectSummaries()
      .forEach(s -> {
        s3.deleteObject(s.getBucketName(), s.getKey());
      });

    s3.deleteBucket("test");
  }
}

まとめ

いかがだったでしょうか。

今回はKinesis FirehoseをLocalStackを使って動かしてみました。 最近LocalStackを使って検証しているのですが、LocalStack非常に便利ですね!

~今度はElasticsearchをDestinationにした記事を書くつもりです。~

と思ったらLocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。 どうしようテスト。 Firehoseに投げるテストをするだけならS3に連携しておけばいいかなって気もしますね。

辛い 辛くない

Issue報告したら直りました。(というかAPIがなかったみたい)

AWS CLIでLocalStack上のdelivery-streamを削除しようとしたらエラーが返ってきます。 今回自分はDockerでLocalStackを使っているので --rmオプションを使ってdocker runしたりやdocker rmしたりして 環境を真っさらにしています。。。