[AWS Lambda] JavaでRedshiftにデータを登録する
はじめに
JavaでAWS Lambda Functionを作成し、JSON形式で受け取ったデータをRedshiftに登録するサンプルを作成してみました。他のブログ記事等で既出かもしれませんが、多少苦戦したので、備忘録としてソースを載せておきたいと思います。
事前準備
先に書いたように今回作成するサンプルは、JSON形式でデータを受け取ります。そのJSONは以下の様な形式としました。
{ "timestamp": "2016-01-15 19:01:59", "column1": "100", "column2": "200" }
このデータを格納するテーブルを、予めRedshiftに作成します。以下の様なcreate文で作成しました。
drop table if exists public.input; create table public.input( id INTEGER IDENTITY(1,1) not null, timestamp timestamp not null, column1 varchar not null, column2 varchar not null, customize varchar not null );
テーブル名は「input」で、idカラムはauto incrementでidを採番します。timestamp・column1・column2カラムはJSONより受け取った値をそのまま登録しますが、customizeカラムにはcolumn1・column2の値を加工した値を登録します。
またJavaよりRedshiftに接続するのにJDBC 4.1 互換ドライバを使用します。これについては、予めローカルにダウンロードしておいてください。
Lambda Function
ではLambda Functionのソースです。今回はgradleを使用してプロジェクトを作成しました。
gradle
「build.gradle」です。以下のようになりました。
apply plugin: 'java' def defaultEncoding = 'UTF-8' [compileJava, compileTestJava]*.options*.encoding = defaultEncoding group = 'com.classmethod.sample' version = '0.0.1-SNAPSHOT' description = "LambdaJsonToRedshift" sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { jcenter() } dependencies { compile 'com.amazonaws:aws-lambda-java-core:1.0.0' compile 'com.amazonaws:aws-lambda-java-events:1.0.0' compile 'com.fasterxml.jackson.core:jackson-databind:2.3.4' compile 'org.projectlombok:lombok:1.16.6' compile files('libs/RedshiftJDBC41-1.1.10.1010.jar') testCompile 'junit:junit:4.11' } jar { from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
gradleについての詳細はここでは書きませんが、23行目でローカルにダウンロードしたJDBC 4.1 互換ドライバのパスを指定しています。
Redshiftに登録するハンドラー
受け取ったJSONをRedshiftに登録するハンドラーです。
package com.classmethod.sample; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.core.JsonParser; import lombok.val; import lombok.SneakyThrows; import java.sql.*; import java.util.Properties; public class MyLambdaFunctionHandler implements RequestHandler<Object, Object> { @Override @SneakyThrows public Object handleRequest(Object input, Context context) { val mapper = new ObjectMapper(); mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); val inputData = mapper.readValue(formatInput(input), InputData.class); val result = register(inputData); return result; } private String formatInput(Object input){ return input.toString(). replace(":", ""). replace("-", ""). replace(" ", ""). replace("=", ":"); } @SneakyThrows private int register(InputData inputData){ final String jdbcURL = "RedshiftのJDBC URL"; final String user = "ユーザ名"; final String password = "パスワード"; Class.forName("com.amazon.redshift.jdbc41.Driver"); val prop = new Properties(); prop.setProperty("user", user); prop.setProperty("password", password); final String sql = "insert into public.input (timestamp, column1, column2, customize) values (?, ?, ?, ?)"; try (val con = DriverManager.getConnection(jdbcURL, prop); val pstmt = con.prepareStatement(sql);) { pstmt.setTimestamp(1, inputData.timestamp()); pstmt.setString(2, inputData.column1); pstmt.setString(3, inputData.column2); pstmt.setString(4, inputData.customize()); return pstmt.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); return -1; } } }
lombokを使用している事以外は、JDBCを使用してinsert文を実行するJavaのソースです。「register」メソッドで登録処理を行っていますが、成功時には登録した件数を(つまり今回は常に「1」)、失敗時には「-1」を返却しています。また受け取ったJSONを格納し、登録する形式にフォーマットする「InputData」クラスは以下のようになります。
package com.classmethod.sample; import java.sql.Timestamp; import java.text.SimpleDateFormat; import lombok.SneakyThrows; public class InputData { public String timestamp = ""; public String column1 = ""; public String column2 = ""; public String customize(){ return column1 + ":" + column2; } @SneakyThrows public Timestamp timestamp(){ return new Timestamp(new SimpleDateFormat("yyyy/MM/dd hh:mm:ss").parse(formatTimestamp()).getTime()); } private String formatTimestamp(){ return timestamp.substring(0, 4) + "/" + timestamp.substring(4, 6) + "/" + timestamp.substring(6, 8) + " " + timestamp.substring(8, 10) + ":" + timestamp.substring(10, 12) + ":" + timestamp.substring(12, 14); } }
LambdaのグローバルIPを取得するハンドラー
LambdaからRedshiftへアクセスを許可するため、今回はセキュリティグループにLambdaのグローバルIPを設定することにしました。そのグローバルIPを調べる為のハンドラーを作成しました。
package com.classmethod.sample; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URL; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import lombok.SneakyThrows; public class MyGlobalIpHandler implements RequestHandler<Object, Object>{ public Object handleRequest(Object input, Context context) { String myGlobalIp = getIp(); System.out.println(myGlobalIp); return myGlobalIp; } @SneakyThrows private String getIp() { URL url = new URL("http://checkip.amazonaws.com"); BufferedReader in = null; try { in = new BufferedReader(new InputStreamReader( url.openStream())); String ip = in.readLine(); return ip; } finally { if (in != null) { in.close(); } } } }
getIp()メソッド内でグローバルIPを取得し、その値を返却しています。
Lambdaの実行
上記で作成したJavaのソースをビルドし、Lambda Functionとして登録します。Lambdaの実行ロールのポリシーは以下のようにしました。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:*" ], "Resource": "arn:aws:logs:*:*:*" }, { "Action": [ "redshift:*" ], "Effect": "Allow", "Resource": "*" } ] }
Lambda Functionを作成したら、まずはLambdaのグローバルIPを取得するため「MyGlobalIpHandler」をLambdaのハンドラーとして登録してLambdaを実行します。
グローバルIPがLambdaの実行結果として表示されるので、そのIPをRedshiftのセキュリティグループに追加します。
次にRedshiftへの登録を行う「MyLambdaFunctionHandler」をLambdaのハンドラーとして登録します。
「Actions」 - 「Configure Test Event」を押下し、入力値として先に書いたJSONを登録してLambda Functionを実行します。
{ "timestamp": "2016-01-15 19:01:59", "column1": "100", "column2": "200" }
以下の様なデータがRedshiftのinputテーブルに登録される筈です。
id timestamp column1 column2 customize 26 2016-01-15 19:01:59 100 200 100:200
参考サイト
以下のサイトを参考にさせて頂きました。ありがとうございました。
【AWS】JavaプログラムからRedshiftのクラスターに接続する方法
JDBC 接続を設定する
Processingで自分のグローバルIPアドレスを取得する