AWS SDK for JavaでKinesisにアクセスする

2015.06.18

Amazon Kinesisとは

Kinesisとは、いろいろな場所から送られてくる大規模なデータを取得し、格納するためのサービスで、 以下のような特徴をもってます。

  • データの格納場所(Amazon Kinesis ストリーム)を簡単に作成でき、すぐに使い始められる
  • 入力データ量に応じて、ストリームのスループットを動的に調整可能
  • データの信頼性が高く、低コスト

※Kinesisについては、このあたりに詳しく書いてあるので参考にしてください

今回はJavaアプリでKinesisからデータを取得する方法について紹介します。

動作環境

今回使用した動作環境は以下のとおりです。 * OS : MacOS X 10.9.4 * Java : 1.8.0_11 * Python : 2.7.5

KinesisにJavaでアクセスする

Kinesis ストリームを作成する

AWSコンソールのAWS Kinesisにアクセスし、create streamボタンを押してストリームを作成します。 ※ここではsample-streamと仮定

aws cliをインストール

今回はテストデータをaws cliでKinesisに突っ込むので、aws cliをインストールしておきましょう。 参考:MacユーザがAWS CLIを最速で試す方法 なお、ここでaws cliを設定して~/.awsにキー情報を登録しておけば、プログラムにアクセスキーを記述しなくても大丈夫になります。

aws cliでKinesisにデータ登録

aws cliを使えば、Kinesisに対して簡単にアクセスすることができます。 今回はjsonファイルを用意し、そのデータを登録してみましょう。 まずは次のようなフォーマットのjsonファイルをsample.jsonという名前で用意します。

{
"Records": [
{
"Data": "this is sample data!",
"PartitionKey": "sample-key"
}
],
"StreamName": "sample-stream"
}

そしてaws cliを使ってKinesisに登録しましょう。

% aws kinesis put-records --cli-input-json file://sample.json

これでKinesisにデータが登録されたので、次はこのデータをJavaプログラムから取得してみます。

JavaからKinesisにアクセス

ここからAWS SDK for Javaをダウンロードし、 aws-java-sdk-1.x.jarと依存するライブラリをクラスパスに含めます。

必要なクラスをimportしたら、AmazonKinesisクライアントをインスタンス化し、 シャード情報を取得します。 ※シャードとはKinesisストリームのスループット単位

import com.amazonaws.services.kinesis.*;
import com.amazonaws.services.kinesis.model.*;
import java.nio.charset.*;
・
・
//クライアントをインスタンス化
//プログラムにキーを記述する場合、
//new AmazonKinesisClient(new BasicAWSCredentials(accessKey, secretKey)
//とする
AmazonKinesis kinesis = new AmazonKinesisClient();
//デコーダー
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
//シャード情報を取得
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(prop.getString("kinesis.name"));
DescribeStreamResult describeStreamResult = kinesis.describeStream(describeStreamRequest);
List shards = describeStreamResult.getStreamDescription().getShards();

次に、対象のシャードから「シャードイテレータ」と呼ばれるイテレータを取得します。 このイテレータをつかってレコード情報を取得します。 ※実際には取得したシャードのリストを順次精査していきますが、今回はシャードは1つと仮定

Shard shard = shards.get(0);
GetShardIteratorRequest iterReq = new GetShardIteratorRequest();
iterReq.setStreamName(<Kinesisストリーム名>);
iterReq.setShardId(shard.getShardId());
// シャードの最初からデータを取得
iterReq.setShardIteratorType("TRIM_HORIZON");
//シャードイテレータ取得
GetShardIteratorResult iteratorRes = kinesis.getShardIterator(iterReq);
String shardIterator = iteratorRes.getShardIterator();

シャードイテレータタイプを「TRIM_HORIZON」としてますが、 これはレコード取得時、最も古く保存されたレコードから取得を開始します。 それ以外にも次のようなイテレータタイプを指定することが可能です。

  • AT_SEQUENCE_NUMBER:与えられたシーケンス番号から取得する
  • AFTER_SEQUENCE_NUMBER:与えられたシーケンス番号の後から取得する
  • LATEST:到着した新しいレコードから取得する

実際のアプリケーションでは、取得したレコードのシーケンス番号をデータベース等に保存しておき、 エラーが発生したりして処理が中断したときでも、保存しておいたシーケンス番号を指定して続きから処理を再開する、 というような方法を使います。(その場合、イテレータタイプはAFTER_SEQUENCE_NUMBERを使用)

では、取得したシャードイテレータを使ってレコードを取得しましょう。 ※ここでは取得したレコード内容を表示してるだけ

while (true) {
//シャードイテレータを使ってレコード取得
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
GetRecordsResult result = kinesis.getRecords(getRecordsRequest);
//レコードのリストを取得
List records = result.getRecords();
for (Record r : records) {
//取得したデータをデコードして表示
System.out.println(decoder.decode(r.getData()).toString());
}
//レコードを取得間隔をあけるためsleep
try {
Thread.sleep(1000);
}
catch (InterruptedException exception) {
throw new RuntimeException(exception);
}

//次のシャードイテレータ取得(データ終端でも取得できるので注意)
shardIterator = result.getNextShardIterator();
}

レコード取得間隔は1秒以上空けなければいけないようなので途中でsleepさせてます。 そして、レコード取得後に再度シャードイテレータを取得し、次のデータを探します。 上のサンプルだと、while(true)で無限ループになっているので、実際には 終了条件を決めてループから抜けるようにする必要があるでしょう。

まとめ

今回はAmazon KinesisにJavaからアクセスして、データを取得する方法について紹介しました。 Kinesisは弊社でも最近よく使用されるようになってきているサービスです。 みなさんもぜひ使ってみてください。

参考サイトなど