AWS SDK for JavaでKinesisにアクセスする
Amazon Kinesisとは
Kinesisとは、いろいろな場所から送られてくる大規模なデータを取得し、格納するためのサービスで、 以下のような特徴をもってます。
- データの格納場所(Amazon Kinesis ストリーム)を簡単に作成でき、すぐに使い始められる
- 入力データ量に応じて、ストリームのスループットを動的に調整可能
- データの信頼性が高く、低コスト
※Kinesisについては、このあたりに詳しく書いてあるので参考にしてください
今回はJavaアプリでKinesisからデータを取得する方法について紹介します。
動作環境
今回使用した動作環境は以下のとおりです。 * OS : MacOS X 10.9.4 * Java : 1.8.0_11 * Python : 2.7.5
KinesisにJavaでアクセスする
Kinesis ストリームを作成する
AWSコンソールのAWS Kinesisにアクセスし、create streamボタンを押してストリームを作成します。 ※ここではsample-streamと仮定
aws cliをインストール
今回はテストデータをaws cliでKinesisに突っ込むので、aws cliをインストールしておきましょう。 参考:MacユーザがAWS CLIを最速で試す方法 なお、ここでaws cliを設定して~/.awsにキー情報を登録しておけば、プログラムにアクセスキーを記述しなくても大丈夫になります。
aws cliでKinesisにデータ登録
aws cliを使えば、Kinesisに対して簡単にアクセスすることができます。 今回はjsonファイルを用意し、そのデータを登録してみましょう。 まずは次のようなフォーマットのjsonファイルをsample.jsonという名前で用意します。
{ "Records": [ { "Data": "this is sample data!", "PartitionKey": "sample-key" } ], "StreamName": "sample-stream" }
そしてaws cliを使ってKinesisに登録しましょう。
% aws kinesis put-records --cli-input-json file://sample.json
これでKinesisにデータが登録されたので、次はこのデータをJavaプログラムから取得してみます。
JavaからKinesisにアクセス
ここからAWS SDK for Javaをダウンロードし、 aws-java-sdk-1.x.jarと依存するライブラリをクラスパスに含めます。
必要なクラスをimportしたら、AmazonKinesisクライアントをインスタンス化し、 シャード情報を取得します。 ※シャードとはKinesisストリームのスループット単位
import com.amazonaws.services.kinesis.*; import com.amazonaws.services.kinesis.model.*; import java.nio.charset.*; ・ ・ //クライアントをインスタンス化 //プログラムにキーを記述する場合、 //new AmazonKinesisClient(new BasicAWSCredentials(accessKey, secretKey) //とする AmazonKinesis kinesis = new AmazonKinesisClient(); //デコーダー CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); //シャード情報を取得 DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(prop.getString("kinesis.name")); DescribeStreamResult describeStreamResult = kinesis.describeStream(describeStreamRequest); List shards = describeStreamResult.getStreamDescription().getShards();
次に、対象のシャードから「シャードイテレータ」と呼ばれるイテレータを取得します。 このイテレータをつかってレコード情報を取得します。 ※実際には取得したシャードのリストを順次精査していきますが、今回はシャードは1つと仮定
Shard shard = shards.get(0); GetShardIteratorRequest iterReq = new GetShardIteratorRequest(); iterReq.setStreamName(<Kinesisストリーム名>); iterReq.setShardId(shard.getShardId()); // シャードの最初からデータを取得 iterReq.setShardIteratorType("TRIM_HORIZON"); //シャードイテレータ取得 GetShardIteratorResult iteratorRes = kinesis.getShardIterator(iterReq); String shardIterator = iteratorRes.getShardIterator();
シャードイテレータタイプを「TRIM_HORIZON」としてますが、 これはレコード取得時、最も古く保存されたレコードから取得を開始します。 それ以外にも次のようなイテレータタイプを指定することが可能です。
- AT_SEQUENCE_NUMBER:与えられたシーケンス番号から取得する
- AFTER_SEQUENCE_NUMBER:与えられたシーケンス番号の後から取得する
- LATEST:到着した新しいレコードから取得する
実際のアプリケーションでは、取得したレコードのシーケンス番号をデータベース等に保存しておき、 エラーが発生したりして処理が中断したときでも、保存しておいたシーケンス番号を指定して続きから処理を再開する、 というような方法を使います。(その場合、イテレータタイプはAFTER_SEQUENCE_NUMBERを使用)
では、取得したシャードイテレータを使ってレコードを取得しましょう。 ※ここでは取得したレコード内容を表示してるだけ
while (true) { //シャードイテレータを使ってレコード取得 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); GetRecordsResult result = kinesis.getRecords(getRecordsRequest); //レコードのリストを取得 List records = result.getRecords(); for (Record r : records) { //取得したデータをデコードして表示 System.out.println(decoder.decode(r.getData()).toString()); } //レコードを取得間隔をあけるためsleep try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } //次のシャードイテレータ取得(データ終端でも取得できるので注意) shardIterator = result.getNextShardIterator(); }
レコード取得間隔は1秒以上空けなければいけないようなので途中でsleepさせてます。 そして、レコード取得後に再度シャードイテレータを取得し、次のデータを探します。 上のサンプルだと、while(true)で無限ループになっているので、実際には 終了条件を決めてループから抜けるようにする必要があるでしょう。
まとめ
今回はAmazon KinesisにJavaからアクセスして、データを取得する方法について紹介しました。 Kinesisは弊社でも最近よく使用されるようになってきているサービスです。 みなさんもぜひ使ってみてください。
参考サイトなど
- Amazon Kinesis!:https://dev.classmethod.jp/cloud/aws-advent-calendar-2013-amazon-kinesis/
- 事例からAmazon Kinesisとは何なのかを学ぶ:https://dev.classmethod.jp/cloud/aws/what-is-kinesis/