この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
今回、AWSのAmazon Kinesisを触る機会があり、AWS SDK for Androidを使用して接続してみたので、その際の手順をまとめてみました。
事前準備
Kinesisの起動
まずは、Kinesisでストリームを作成します。
ストリームの作成方法はこちらが参考になります。
今回は、Stream Name : exampleKinesis , Number of Shards : 2 で作成してみました。
Access Key, SecretKeyの取得
Android SDKでAWSにアクセスするには、AWSのAccess Key, Secret Keyが必要になるので、取得しておきます。
取得方法はこちらを参照してください。
AWS SDK for Androidの取得
SDKをダウンロードしておきます。
今回は、「lib」-「releas」内の「aws-android-sdk-2.0.5-core.jar」「aws-android-sdk-2.0.5-kinesis.jar」を使用するので、プロジェクトにインポートしておいてください。
Android実装
AndroidからKinesisにデータを送ってみる
以下のような画面を用意し、ボタンを押したらkinesisに、商品名と価格を送信するようなアプリを作成してみます。
レイアウトファイルは以下のようにしました。
fragment_main.xml
<LinearLayout
xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:paddingBottom="@dimen/activity_vertical_margin"
android:paddingLeft="@dimen/activity_horizontal_margin"
android:paddingRight="@dimen/activity_horizontal_margin"
android:paddingTop="@dimen/activity_vertical_margin"
android:orientation="vertical"
>
<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="Store"
android:textSize="24sp"/>
<Button
android:id="@+id/button1"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_marginTop="32dp"
android:text="Apple : 100 yen"/>
<Button
android:id="@+id/button2"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_marginTop="32dp"
android:text="Banana : 130 yen" />
<Button
android:id="@+id/button3"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_marginTop="32dp"
android:text="Orange : 180 yen" />
</LinearLayout>
ひとまず、ボタンのclickListenerとJsonデータの作成コードを実装します。 (Jsonデータの作成には、jsonic-1-3-5.jarを使用しました。)
MainActivity.java
import java.nio.ByteBuffer;
import java.util.Locale;
import net.arnx.jsonic.JSON;
import org.joda.time.DateTime;
import android.os.Bundle;
import android.support.v4.app.Fragment;
import android.support.v7.app.ActionBarActivity;
import android.text.TextUtils;
import android.util.Log;
import android.view.LayoutInflater;
import android.view.View;
import android.view.View.OnClickListener;
import android.view.ViewGroup;
import android.widget.Button;
import android.widget.Toast;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
public class MainActivity extends ActionBarActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
if (savedInstanceState == null) {
getSupportFragmentManager().beginTransaction()
.add(R.id.container, new PlaceholderFragment()).commit();
}
}
public static class PlaceholderFragment extends Fragment {
@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container,
Bundle savedInstanceState) {
View rootView = inflater.inflate(R.layout.fragment_main, container,
false);
Button button1 = (Button) rootView.findViewById(R.id.button1);
button1.setOnClickListener(clickListener);
Button button2 = (Button) rootView.findViewById(R.id.button2);
button2.setOnClickListener(clickListener);
Button button3 = (Button) rootView.findViewById(R.id.button3);
button3.setOnClickListener(clickListener);
return rootView;
}
private OnClickListener clickListener = new OnClickListener() {
@Override
public void onClick(View v) {
String json = null;
switch (v.getId()) {
case R.id.button1:
json = createJson("Apple", 100);
Toast.makeText(getActivity(), "Appleが売れました。", 0).show();
break;
case R.id.button2:
json = createJson("Banana", 130);
Toast.makeText(getActivity(), "Bananaが売れました。", 0).show();
break;
case R.id.button3:
json = createJson("Orange", 180);
Toast.makeText(getActivity(), "Orangeが売れました。", 0).show();
break;
default:
break;
}
if (!TextUtils.isEmpty(json)) {
final AmazonKinesisClient client = kinesisInit();
kinesisAccess_Put(client, json);
}
}
};
private String createJson(String name, int value) {
DateTime dt = new DateTime();
Hoge hoge = new Hoge();
hoge.datetime = dt.toString("yyyy-MM-dd HH:mm:ss", Locale.JAPAN);
hoge.name = name;
hoge.value = "" + value;
String json = JSON.encode(hoge);
return json;
}
public class Hoge {
public String datetime;
public String name;
public String value;
}
つづいて、AWSに接続する際の初期化処理と、実際の送信処理を記述します。
private AmazonKinesisClient kinesisInit() {
String accessKey = "先ほど取得したアクセスキー";
String secretKey = "先ほど取得したシークレットキー";
AWSCredentials credentials = new BasicAWSCredentials(accessKey,secretKey);
// 今回はリージョンを US West (Oregon) としたので、endpointに US West (Oregon)を指定しています。
String endpoint = "https://kinesis.us-west-2.amazonaws.com";
AmazonKinesisClient client = new AmazonKinesisClient(credentials);
client.setEndpoint(endpoint);
return client;
}
endpointに関しては、各々で作成したリージョン毎に適宜変更してください。endpoinの一覧表はこちらにあります。
つづいて、実際の送信処理を記述します。(実際に使う時には、非同期処理はAsyncTaskなどの方が良いと思います。) 送信処理の場合には、kinesisのストリーム名(作成時に決めた名前)と、パーティションキー(shardが複数ある場合に分散時に判断するためのテキスト(任意で決める))を設定する必要があります。
private String stremName = "exampleKinesis";
private void kinesisAccess_Put(final AmazonKinesisClient client,final String json) {
(new Thread(new Runnable() {
@Override
public void run() {
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(stremName);
DescribeStreamResult describeStreamResponse = client
.describeStream(describeStreamRequest);
String streamStatus = describeStreamResponse
.getStreamDescription().getStreamStatus();
// kinesisのストリームの状態がACTIVEであれば、送信処理を実施する。
if (streamStatus.equals("ACTIVE")) {
putRecode(client, json);
}
}
})).start();
}
private void putRecode(AmazonKinesisClient client, String json) {
String key = "store_akiba";
Log.d("jsonEncode", "encode:" + json);
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(stremName);
putRecordRequest.setData(ByteBuffer.wrap(json.getBytes()));
putRecordRequest.setPartitionKey(key);
PutRecordResult putRecordResult = client
.putRecord(putRecordRequest);
Log.d("putRecode", "result:" + putRecordResult.toString());
}
上記のkinesisAccess_Put()では、まずkinesisのストリーム状態(CREATE, ACTIVE, DELETEなど)を確認し、ACTIVEの状態であれば送信処理を行っています。 putRecode()が実際の送信処理になります。PutRecordRequestで、どのストリームに送信するか(setStreamName)、送信するデータ(setData)、パーティションキー(setPartitionKey)を設定し、client.putRecord()で送信処理を行います。
ここで一旦実行してみました。
ログに、shardIdとSequenceNumberが表示されていれば、送信成功です。
Kinesis内のデータを取得してみる
実際に、保存されているかどうかなどは後述の受信処理を行うか、awscliにて確認することができます。
まずは、awscliでの確認方法を記述します。
コンソールでawsのkinesisコマンドを実行するのですが、内部のデータを確認するには、まずshard-iteratorの文字列を取得し、その文字列を引数にして内部のデータを参照します。
- shard-iteratorの取得
例) $ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --stream-name exampleKinesis --shard-iterator-type TRIM_HORIZON
- 内部データの取得
例) $ aws kinesis get-records --shard-iterator "AAAAAA ~
shard-iteratorの取得時に必要なshardIdは、先ほどのログに記録されていたものを使用します。
- 結果
{
"Records": [
{
"PartitionKey": "store_akiba",
"Data": "eyJkYXRldGltZSI6IjI ~",
"SequenceNumber": "49541594946334033975392935981650703103931505262058274818"
},
{
"PartitionKey": "store_akiba",
"Data": "eyJkYXRldGltZSI6Ij ~",
"SequenceNumber": "49541594946334033975392935981651912029751120922025132034"
},
{
"PartitionKey": "store_akiba",
"Data": "eyJkYXRldGltZSI6 ~",
"SequenceNumber": "49541594946334033975392935981653120955570735757358268418"
}
],
"NextShardIterator": "AAAAAAAA ~
}
先ほど、3回データを送信したので、3つ分のデータが格納されていました。
それでは、次はAndroidで取得してみます。
先ほどとは別の、取得用のプロジェクトを作成します。(同様に、aws-android-sdk-2.0.4-core.jar, aws-android-sdk-2.0.4-kinesis.jarをインポートします。)
kinesis内のデータを参照し、店舗の売り上げを表示するようなアプリを作成します。
レイアウトファイルは以下のようにします。
fragment_main.xml
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:paddingBottom="@dimen/activity_vertical_margin"
android:paddingLeft="@dimen/activity_horizontal_margin"
android:paddingRight="@dimen/activity_horizontal_margin"
android:paddingTop="@dimen/activity_vertical_margin"
android:orientation="vertical"
>
<TextView
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="akiba売り上げ"
android:textSize="22sp"
/>
<TextView
android:id="@+id/akiba"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text=""
android:textSize="22sp"
/>
<TextView
android:layout_marginTop="32dp"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="shinjuku売り上げ"
android:textSize="22sp"
/>
<TextView
android:id="@+id/shinjuku"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text=""
android:textSize="22sp"
/>
</LinearLayout>
続いて実装です。まずは、表示部分の実装です。今回は1秒毎に表示を更新する、という仕様にしました。
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import android.os.Bundle;
import android.os.Handler;
import android.support.v4.app.Fragment;
import android.support.v7.app.ActionBarActivity;
import android.util.Log;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.util.json.JSONException;
import com.amazonaws.util.json.JSONObject;
public class MainActivity extends ActionBarActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
if (savedInstanceState == null) {
getSupportFragmentManager().beginTransaction()
.add(R.id.container, new PlaceholderFragment()).commit();
}
}
public static class PlaceholderFragment extends Fragment {
private TextView akiba;
private TextView shinjuku;
private Timer mTimer;
private Handler mHandler;
private int akibaValue;
private int shinjukuValue;
@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container,
Bundle savedInstanceState) {
View rootView = inflater.inflate(R.layout.fragment_main, container,
false);
akiba = (TextView) rootView.findViewById(R.id.akiba);
shinjuku = (TextView) rootView.findViewById(R.id.shinjuku);
return rootView;
}
@Override
public void onStart() {
super.onStart();
mTimer = new Timer();
mHandler = new Handler();
// 1秒毎に表示テキストを更新する。
int refleshTime = 1000;
mTimer.schedule(new TimerTask() {
@Override
public void run() {
AmazonKinesisClient client = kinesisInit();
kinesisAccess_Get(client);
if (mHandler != null) {
mHandler.post(new Runnable() {
@Override
public void run() {
// テキストを更新
setText();
}
});
}
}
}, 0, refleshTime);
}
@Override
public void onPause() {
super.onPause();
mTimer.cancel();
mTimer = null;
mHandler = null;
}
private void setText() {
akiba.setText("" + akibaValue + " yen");
shinjuku.setText("" + shinjukuValue + " yen");
}
次に、AWSに接続する際の初期化処理です。こちらは先ほどと同様です。
private AmazonKinesisClient kinesisInit() {
String accessKey = "先ほど取得したアクセスキー";
String secretKey = "先ほど取得したシークレットキー";
AWSCredentials credentials = new BasicAWSCredentials(accessKey,secretKey);
// 今回はリージョンを US West (Oregon) としたので、endpointに US West (Oregon)を指定しています。
String endpoint = "https://kinesis.us-west-2.amazonaws.com";
AmazonKinesisClient client = new AmazonKinesisClient(credentials);
client.setEndpoint(endpoint);
return client;
}
最後に、kinesisからのデータ取得処理です。
private void kinesisAccess_Get(AmazonKinesisClient client) {
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(stremName);
DescribeStreamResult describeStreamResponse = client
.describeStream(describeStreamRequest);
String streamStatus = describeStreamResponse.getStreamDescription()
.getStreamStatus();
List<Shard> shards = describeStreamResponse.getStreamDescription()
.getShards();
// kinesisのストリームの状態がACTIVEであれば、送信処理を実施する。
if (streamStatus.equals("ACTIVE")) {
Log.d("getRecode", "active");
getRecord(client, shards);
}
}
private void getRecord(AmazonKinesisClient client, List<Shard> shards) {
akibaValue = 0;
shinjukuValue = 0;
for (int i = 0; i < shards.size(); i++) {
Log.d("getRecode", "shardId" + i + ":"
+ shards.get(i).getShardId());
// ShardIteratorを取得する処理
GetShardIteratorRequest getShardRequest = new GetShardIteratorRequest();
getShardRequest.setStreamName(stremName);
getShardRequest.setShardId(shards.get(i).getShardId());
getShardRequest.setShardIteratorType(ShardIteratorType.TRIM_HORIZON);
GetShardIteratorResult getShardResult = client
.getShardIterator(getShardRequest);
String shardIterator = getShardResult.getShardIterator();
Log.d("getRecode", "result_shard:" + shardIterator);
// 取得したShardIteratorからデータを取得する処理
GetRecordsRequest getRecordRequest = new GetRecordsRequest();
getRecordRequest.setShardIterator(shardIterator);
getRecordRequest.setLimit(100);
GetRecordsResult getRecordResult = client
.getRecords(getRecordRequest);
List<Record> records = getRecordResult.getRecords();
// 取得したデータリストの中にjsonデータがあるので一つずつ取り出す
for (int j = 0; j < records.size(); j++) {
byte[] b = records.get(j).getData().array();
String s = new String(b);
Log.d("getRecode", "result:" + s);
try {
JSONObject object = new JSONObject(s);
int v = object.getInt("value");
switch (i) {
case 0:
akibaValue += v;
break;
case 1:
shinjukuValue += v;
break;
default:
break;
}
} catch (JSONException e) {
e.printStackTrace();
}
}
}
}
kinesisAccess_Get()は先ほどのkinesisAccess_Put()とほぼ同様ですが、コンソールでの取得と同じように、kinesisからデータ取得する際はShardIdが必要となるので、 describeStreamResponse.getStreamDescription().getShards() でストリーム内にあるShardのリストを取得しています。
getRecord()では、先ほどのコンソールと同様に、ShardIteratorの取得 -> 内部データの取得を行い、結果がjsonで返却されるので、forループ内でパースしています。
これで、実行すると前述した図のように表示されます。ちなみに今回は、apple,banana,orangeを1回ずつタップしたので、合計410yenが表示されました。
ひとまずこれで、1つのshardでの送受信が確認できました。今回は2つのshardを作成したのでそちらにも振り分けてみたいと思います。
shardの振り分けは簡単で、送信時に設定したパーティションキーを変更することで、振り分けが可能です。(正確にはパーティションキーのハッシュ値で自動で振り分けられるので、確実に振り分けるにはパーティションキーを計算する必要があります。)
それでは、送信用アプリのパーティションキーを、store_akiba から store_shinjuku に変更してみます。
ログを確認すると、shardIdが shardId-000000000001 に変わっています。
先ほどのデータ取得アプリを確認するとshinjuku売り上げが増えていることが確認できます。
これで、kinesisのデータ送受信ができました!
まとめ
多少クセがあるかと思いますが、いままで多大なコストを払って取得していたビックデータなどを、低コストで手軽に扱えるようになるのは非常に魅力的だと思います。 特にスマートフォンはセンサーなどの情報が多いので、上手く使えれば大きなビジネスにつながりそうですね!
それでは、今日はこの辺で。