iOSからストリームデータをKinesisに流し込む #アドカレ2015

2015.12.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

本記事はAWSモバイルアドベントカレンダーの22日目の記事です。

iOS からとれるデータをストリームデータとしてAWS Kinesisに流し込むことを目標に、実装までの手順を書き連ねていきます。

iOS側の準備

iOSアプリをSingle View Application -> Swiftで新規作成した後、次の手順でKinesisへのインターフェイスを用意します。

まず、cocoapodsを用いてAWS Core, AWS Kinesis, AWS Cognitoのフレームワークをインストールします。

Podfile

target 'KinesisStudy' do
  pod 'AWSCore'
  pod 'AWSKinesis'
  pod 'AWSCognito'
end
$ pod install

新しいプロジェクトにObjectiveCの適当なファイルを作成してBridgingHeaderを作成するかどうか聞かれるので、Yesを押して作成します。

作成したBridgingHeaderには次のようにAWSCore, AWSCognito, AWSKinesisのヘッダを記述します。

XXX-Bridging-Header.h

#import <AWSCore/AWSCore.h>
#import <AWSCognito/AWSCognito.h>
#import <AWSKinesis/AWSKinesis.h>

iOS側からAWSのフレームワークを利用する準備が整いました。

次にAWSインフラ側の設定に移ります。まずCognitoの設定をしてみましょう。

Cognito Identity Poolの用意

[Amazon Cognito] Cognito の導入方法がより簡単になりました! を参考にしてCognito identity poolを作成します。今回の例ではログインを必要としない匿名ユーザとしてデータをアップロードするため、Unauthenticated IdentitiesをEnabledにして作成しています。Identity Poolの名前はKinesisStudyとしました。

Cognitoのセットアップと同時にIAMロールの作成後に表示される次のようなソースコードの例を参考に

次のように所定のファイルにコードを追加し、Cognito Sync Storeの機能を用いることでCognitoが正しく動いているかどうか確認して見ましょう。

AppDelegate.swift

@UIApplicationMain
class AppDelegate: UIResponder, UIApplicationDelegate {

    var window: UIWindow?

    func application(application: UIApplication, didFinishLaunchingWithOptions launchOptions: [NSObject: AnyObject]?) -> Bool {
        // Override point for customization after application launch.

        let credentialsProvider = AWSCognitoCredentialsProvider(regionType:.APNortheast1,
            identityPoolId:"ap-northeast-1:XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXX") // Cognito identity pool id

        let configuration = AWSServiceConfiguration(region:.APNortheast1, credentialsProvider:credentialsProvider)

        AWSServiceManager.defaultServiceManager().defaultServiceConfiguration = configuration

        return true
    }
}

ViewController.swift

class ViewController: UIViewController {

    override func viewDidLoad() {
        super.viewDidLoad()
        // Do any additional setup after loading the view, typically from a nib.

        // Initialize the Cognito Sync client
        let syncClient = AWSCognito.defaultCognito()

        // Create a record in a dataset and synchronize with the server
        let dataset = syncClient.openOrCreateDataset("myDataset")
        dataset.setString("myValue", forKey:"myKey")
        dataset.synchronize().continueWithBlock {(task: AWSTask!) -> AnyObject! in
            // Your handler code here
            return nil

        }
    }
}

iPhoneシミュレーター向けにビルドします。

ビルドしてシミュレータ上のアプリが起動した後に、作成したCognito Identity Poolの画面を開き、次のようにダッシュボード上のグラフにモバイルからのアクセスがあれば疎通が正しく行えています。

Cognitoの疎通を確認できたので続いてはKinesisのストリームを作成します。

Kinesisストリームの用意

Edisonを使ってセンサデータをAmazon Kinesisにあげてみる【後編】- Qiitaの作成手順をもとに、Kinesisのストリームを作成します。

次に作成したKinesisストリームへのデータPut権限を先ほど作成したCognito Identity PoolのUnauth IAM Roleに付与します。

IAM -> ロールで以下のような一覧が表示されるのでそのなかから先ほどのCognito Identity Poolの作成の際に生成したIAMロール(Unauth)を選びます。

表示される画面からロールポリシーの作成を選択

Policy Generatorを選択

アクセス許可の編集で許可、Amazon Kinesisを選択し、アクションの中からPutRecord、PutRecordsを選択して、アマゾンリソース名に以下のような形式のAmazon リソースネーム(ARN)を入力します。

arn:aws:kinesis:リージョン名:アカウントID:stream/ストリーム名

ここではリージョン名がap-northeast-1, ストリーム名がkinesis_ios_study, アカウントIDがabcd-efgh-ijkn(各アルファベットは英数字)の形式のため以下の様な ARNになります。

arn:aws:kinesis:ap-northeast-1:abcdefghijkn:stream/kinesis_ios_study

入力が完了したらステートメントを追加ボタンを押します。次のステップボタンが有効になるので押し、ポリシーの確認画面に入ります。

ポリシーの適用を押すとIAM Roleに新たなポリシーが追加されたことがわかります。

これでKinesisにデータを送る準備ができました。iOSからの疎通確認を行います。

iOSとKinesisの疎通確認

ViewController.swiftに下記のように記述してみます。

ViewController.swift

class ViewController: UIViewController {

    override func viewDidLoad() {
        super.viewDidLoad()
        // Do any additional setup after loading the view, typically from a nib.

        let recorder = AWSKinesisRecorder.defaultKinesisRecorder()

        // Create an data array to track the saveRecord calls.
        let datum: [NSData] = (0..<100).map { i in "TestString-\(i)".dataUsingEncoding(NSUTF8StringEncoding)! }
        // Call saveRecord for each record
        let tasks: [AWSTask] = datum.map { data in recorder.saveRecord(data, streamName: "kinesis_ios_study") }

        // When all of the saveRecord calls have completed, call submitAllRecords to write all records
        // to the Kinesis stream
        AWSTask(forCompletionOfAllTasks: tasks).continueWithSuccessBlock { task -> AnyObject? in
            recorder.submitAllRecords()
        }.continueWithBlock { task -> AnyObject? in
            if let err = task.error {
              NSLog("Error: [%@]", err)
            }
            return nil
        }
    }
}

IAMの設定がうまくいっていない場合等はエラーのログが吐かれます。ログが吐かれなかったら送信できているのですが、実際にPutされたレコードの確認にはAWS CLIを用います。基本的なストリームオペレーションの実行を参考にレコードをコマンドライン上に出力してみます。

まず以下のコマンドでKinesisストリームの情報を出力します。

入力

$ aws kinesis describe-stream --stream-name kinesis_ios_study --profile XXXX

出力  

{
    "StreamDescription": {
        "RetentionPeriodHours": 24,
        "StreamStatus": "ACTIVE",
        "StreamName": "kinesis_ios_study",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/kinesis_ios_study",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "00000000000000000000000000",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "000000000000000000000000"
                }
            }
        ]
    }
}

次にこの時得られたShardIdを用いてShard-Iteratorを取得します。Shard-Iteratorはget-recordコマンドの読み取るストリームと次のシャードの位置を表します。

入力

$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name kinesis_ios_study --profile XXXX

出力

{
    "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yA..."
}

この時得られるランダムな長い文字列がシャードイテレータで、get-recordコマンドでこのイテレータを付与すれば先ほどiOSからPutされたレコードが取得できます。

入力

$ aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yA... --profile XXXX

出力

{
    "Records": [
        {
            "Data": "VGVzdFN0cmluZy0w",
            "PartitionKey": "XXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXXXXX",
            "ApproximateArrivalTimestamp": 1450666876.557,
            "SequenceNumber": "49557451555804241389077152914070250810773025594045104130"
        },
        {
            "Data": "VGVzdFN0cmluZy0x",
            "PartitionKey": "XXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXXXXXX",
            "ApproximateArrivalTimestamp": 1450666876.557,
            "SequenceNumber": "49557451555804241389077152914071459736592640223219810306"
        },
        ... 
    ],
    "NextShardIterator": "AAAAAAAAAAHr+d1mwlTxRIMRoThzKBihrr....",
    "MillisBehindLatest": 704000
}

データの形式はBase64でエンコードされていますが、これをデコードすると先ほどのコードで記述した文字列が入っていることが分かります。

入力

$ echo "VGVzdFN0cmluZy0w" | base64 --decode

出力

TestString-0%

補遺

AWSモバイルシリーズのアドカレということで今回はiOSからKinesisにデータをPutするところまでやりましたが、記事を書き始めた当初はKinesisにPutされたデータをSpark Streamを用いて分析するところまで含むものだったので、後日iOSに特有のUIから取れる情報をSparkを用いて分析する記事をかければと思っています。

明日は

yuukigoodmanさんの Cognito についての記事です。お楽しみに!

参考