[Java編] DynamoDB StreamsでLambdaを使用する

2016.03.06

AWS Lambda と Amazon DynamoDB Streams

Amazon DynamoDB Streams

DynamoDBのテーブルに対して更新が発生した場合、その発生時に任意の処理をしたい要望があるとおもいます。
DynamoDB Streamsを使用すると、テーブルに対する変更をキャプチャしてログに保存します。
アプリは、このログにアクセスし、データの変更前/変更後の内容を(ほぼ)リアルタイムで参照できます。
※Dynamo Streamsの詳細についてはこちら

AWS Lambda

AWS Lambdaは、サーバを用意しなくてもサーバサイドの実行環境をAWSで用意してくれるAWSのサービスです。
サーバのプロビジョニングなしでサーバサイドのコードを実行でき、料金は使用したぶんだけ発生します。
API Gatewayと連携してサーバレスアプリを構築したり、他のAWSサービスからトリガー設定して起動するようにすることが可能です。
現在対応している言語は、JavaScript/Java/Pythonです。

なお、類似サービスにGoogle Cloud Functionsなどがあります。
AWS Lambdaについてはこちらなどを参照してください。

DynamoDB StreamsでLambdaを使用する

DynamoDB Streamsを有効にしたテーブルに対して、AWS Lambda関数をトリガーとして使用することが可能です。
このトリガーは、テーブルに対して実行された更新(登録/更新/削除)のタイミングでLambda関数を起動します。
トリガーを作成するにはDynamoテーブルに対してDynamoDB Streamを有効にしたあと、紐付けるLambda関数を指定します。

DynamoDB Streams + AWS Lambda(Java)を動かしてみる

では、AWSコンソールを使ってDynamo Streams + Lambdaのサンプルプログラムを動かしてみましょう。
やってることはこことだいたい同じです。

1.テーブル作成&DynamoDB Streams有効化

MySteramTableという名前で、プライマリパーティションキー(Hashキー)をid(String)で作成します。
テーブルを作成したら概要タブのストリーム管理ボタンを押し、Dynamo Streamsを有効化してください。
stream-2

2.Lambda関数を作成&ビルド

次に、このあたりを参考にしてJavaでLambdaコードを作成し、アップロードします。
今回はGradleを使ってビルドを行いました。なお、Gradleについてはここを参考にしてください。

必要な設定を記述したbuild.gradleは下記のとおり。

group '<Group Name>'
version '<Your Version>'

apply plugin: 'java'
apply plugin: 'idea'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile fileTree(dir: 'jars', include: '*.jar')
    compile 'com.amazonaws:aws-lambda-java-core:1.1.0'
    compile 'com.amazonaws:aws-lambda-java-events:1.1.0'
    compile 'com.amazonaws:aws-java-sdk:1.10.44'
}

task buildZip(type: Zip) {
    from compileJava
    from processResources
    into('lib') {
        from configurations.runtime
    }
}

build.dependsOn buildZip

そして、Lambdaのコードを次のように記述します。

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;

public class Hello implements RequestHandler<DynamodbEvent, String> {

   @Override
   public String handleRequest(DynamodbEvent ddbEvent, Context context) {
        for (DynamodbStreamRecord record : ddbEvent.getRecords()) {
            System.out.println(record.getEventID());
            System.out.println(record.getEventName());
            System.out.println(record.getDynamodb().toString());

        }
        return "Successfully processed " + ddbEvent.getRecords().size() + " records.";
    }
}

RequestHandler(もしくはRequestStreamHandler)をimplementsし、handleRequestをオーバーライドすることで、
Lambda関数を実装可能です。
ここではStreamデータを受け取り、ログ出力しているだけです。

コード記述ができたら、Gradleでビルドします。

% cd path/your/lambda-module
% ./gradlew build

問題なければ/build/distributionsディレクトリにzipファイルが出力されます。

3.zipのアップロード

作成したLambdaモジュールをアップロードし、Lambda関数を定義します。
トリガー>トリガーの作成>新規関数を指定し、Event Source typeやDynamoテーブル名を指定します。
stream-3

Lmabda関数の設定を記述します。RuntimeをJava8に指定し、先ほど作成したzipファイルを指定します。
stream-4
HandlerにはエントリーポイントとなるJavaクラス(パッケージ名含む)を記述します。
Roleb部分には、DynamoStreams・lambdaの実行を許可したIAMロールを指定するので、用意しておきましょう。

4.動作確認

Lambda関数をアップロードしたら、MySteramTableに対してAWSコンソールから登録/更新/削除を実行してみてください。
更新タイミングでLambda関数が起動します。
Lambdaの動作は、CloudWatchのログメニューから確認することができます。  

まとめ

今回はJavaでDynamoDB Streams + Lambdaのサンプルを作成してみました。
ちなみに、DynamoDB StreamsはLambdaを使用しなくてもkinesisと同じく、シャードを取得して自分で
データを取得することもできます。

参考サイトなど