この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
LocalStackでKinesis Firehose検証用のコード書いています。齋藤です。
それでは今回はLocalStackでKinesis Firehoseを動かしてみたいと思います。 Firehoseの連携先はS3です。 ちなみに最後の方にも書いたのですが、LocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。
今回書いたコードはgithub上に置いています。
LocalStackでKinesis Firehoseの連携先S3バケットを作成する
S3のバケットを作成するためにはAmazonS3オブジェクトが必要になってきます。 dd以下のコードでLocalStackに向けたS3クライアントをセットアップします。
AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
.withCredentials(dummyProvider)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null))
.withPathStyleAccessEnabled(true) // これがないとエラーになります。
.build();
バケットの作成を以下のコードで行います。
s3.createBucket("test");
これでひとまず、LocalStackで使うS3のバケットのセットアップは終了です。
LocalStackでKinesis FirehoseのDelivery Streamを作成します。
LocalStackに向けたKinesis Firehoseのクライアントの作成は以下のコードで行います。
AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));
AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.standard()
.withCredentials(dummyProvider)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null))
.build();;
先ほど作成したFirehoseのクライアントを使って 以下のコードでDelivery Streamを作成します。
firehose.createDeliveryStream(
new CreateDeliveryStreamRequest()
.withDeliveryStreamName("testStream")
.withS3DestinationConfiguration(
new S3DestinationConfiguration()
.withBucketARN("arn:aws:s3:::test") // 先ほど作ったS3のバケット名でS3のARNを設定します。
.withPrefix("firehose/")
.withRoleARN("arn:aws:iam::dummy:role/dummy"))) // LocalStackにはIAM関連のAPIはないみたいなのでdummy roleです。
RecordをPutしてS3に連携されていることをザッと確認してみます。
RecordをPutします。 このコードは都元ダイスケ氏の記事から持ってきました。
firehose.putRecord(
new PutRecordRequest()
.withDeliveryStreamName("testStream")
.withRecord(
new Record()
.withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));
S3からlistObjectを使ってKinesisによって連係されていることを確認します。 アサーションはassertjを使っています。
assertThat(s3.listObjects("test")
.getObjectSummaries()).anySatisfy(summary -> {
assertThat(summary.getKey()).startsWith("firehose/");
});
S3のお片付け
S3ってobjectがあると削除できないんですね・・・・。初めて知りました。 先にObjectを削除してからBucketを削除します。
@After
public void tearDown() {
firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream"));
s3.listObjects("test")
.getObjectSummaries()
.forEach(s -> {
s3.deleteObject(s.getBucketName(), s.getKey());
});
s3.deleteBucket("test");
}
なお、CLIだとforce deleteできるようです。 ログを見た感じだと中で似たようなことをやっていそうですね。
aws --endpoint http://localhost:4572 s3 rb s3://test --force
delete: s3://test/firehose/28fe77a2-d0ef-4d39-9d6a-1a576709b922
delete: s3://test/firehose/2e4038af-7997-4735-8e2f-2e5f69b7fb88
delete: s3://test/firehose/3038ba8b-df66-46f9-9398-4d7e3ac7182f
delete: s3://test/firehose/8c1720be-e3dd-4faa-b068-d254434c00fe
delete: s3://test/xxxxx
remove_bucket: test
今回書いたコード
今回書いたコードはgithub上に置いています。 こちらにも貼り付けておきます。
public class FirehoseTest {
private AmazonKinesisFirehose firehose;
private AWSCredentialsProvider dummyProvider;
private AmazonS3 s3;
@Before
public void setup() {
System.setProperty("com.amazonaws.sdk.disableCbor", "1"); // LocalStackが対応していないプロトコルを使わないようにする
dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));;
firehose = AmazonKinesisFirehoseClientBuilder.standard()
.withCredentials(dummyProvider)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null))
.build();;
s3 = AmazonS3ClientBuilder.standard()
.withCredentials(dummyProvider)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null))
.withPathStyleAccessEnabled(true)
.build();;
}
@Test
public void test() {
s3.createBucket("test");
firehose.createDeliveryStream(
new CreateDeliveryStreamRequest()
.withDeliveryStreamName("testStream")
.withS3DestinationConfiguration(
new S3DestinationConfiguration()
.withBucketARN("arn:aws:s3:::test")
.withPrefix("firehose/")
.withRoleARN("arn:aws:iam::dummy:role/dummy")));
while (true) {
try {
firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest().withDeliveryStreamName("testStream"));
break;
} catch (ResourceNotFoundException e) {
}
}
firehose.putRecord(
new PutRecordRequest()
.withDeliveryStreamName("testStream")
.withRecord(
new Record()
.withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));
assertThat(s3.listObjects("test")
.getObjectSummaries()).anySatisfy(summary -> {
assertThat(summary.getKey()).startsWith("firehose/");
});
}
@After
public void tearDown() {
firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream"));
s3.listObjects("test")
.getObjectSummaries()
.forEach(s -> {
s3.deleteObject(s.getBucketName(), s.getKey());
});
s3.deleteBucket("test");
}
}
まとめ
いかがだったでしょうか。
今回はKinesis FirehoseをLocalStackを使って動かしてみました。 最近LocalStackを使って検証しているのですが、LocalStack非常に便利ですね!
~今度はElasticsearchをDestinationにした記事を書くつもりです。~
と思ったらLocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。 どうしようテスト。 Firehoseに投げるテストをするだけならS3に連携しておけばいいかなって気もしますね。
辛い 辛くない
Issue報告したら直りました。(というかAPIがなかったみたい)
AWS CLIでLocalStack上のdelivery-streamを削除しようとしたらエラーが返ってきます。
今回自分はDockerでLocalStackを使っているので
--rm
オプションを使ってdocker run
したりやdocker rm
したりして環境を真っさらにしています。。。