この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
JavaでAWS Lambda Functionを作成し、JSON形式で受け取ったデータをRedshiftに登録するサンプルを作成してみました。他のブログ記事等で既出かもしれませんが、多少苦戦したので、備忘録としてソースを載せておきたいと思います。
事前準備
先に書いたように今回作成するサンプルは、JSON形式でデータを受け取ります。そのJSONは以下の様な形式としました。
{
"timestamp": "2016-01-15 19:01:59",
"column1": "100",
"column2": "200"
}
このデータを格納するテーブルを、予めRedshiftに作成します。以下の様なcreate文で作成しました。
drop table if exists public.input;
create table public.input(
id INTEGER IDENTITY(1,1) not null,
timestamp timestamp not null,
column1 varchar not null,
column2 varchar not null,
customize varchar not null
);
テーブル名は「input」で、idカラムはauto incrementでidを採番します。timestamp・column1・column2カラムはJSONより受け取った値をそのまま登録しますが、customizeカラムにはcolumn1・column2の値を加工した値を登録します。
またJavaよりRedshiftに接続するのにJDBC 4.1 互換ドライバを使用します。これについては、予めローカルにダウンロードしておいてください。
Lambda Function
ではLambda Functionのソースです。今回はgradleを使用してプロジェクトを作成しました。
gradle
「build.gradle」です。以下のようになりました。
apply plugin: 'java'
def defaultEncoding = 'UTF-8'
[compileJava, compileTestJava]*.options*.encoding = defaultEncoding
group = 'com.classmethod.sample'
version = '0.0.1-SNAPSHOT'
description = "LambdaJsonToRedshift"
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
jcenter()
}
dependencies {
compile 'com.amazonaws:aws-lambda-java-core:1.0.0'
compile 'com.amazonaws:aws-lambda-java-events:1.0.0'
compile 'com.fasterxml.jackson.core:jackson-databind:2.3.4'
compile 'org.projectlombok:lombok:1.16.6'
compile files('libs/RedshiftJDBC41-1.1.10.1010.jar')
testCompile 'junit:junit:4.11'
}
jar {
from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
gradleについての詳細はここでは書きませんが、23行目でローカルにダウンロードしたJDBC 4.1 互換ドライバのパスを指定しています。
Redshiftに登録するハンドラー
受け取ったJSONをRedshiftに登録するハンドラーです。
package com.classmethod.sample;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParser;
import lombok.val;
import lombok.SneakyThrows;
import java.sql.*;
import java.util.Properties;
public class MyLambdaFunctionHandler implements RequestHandler<Object, Object> {
@Override
@SneakyThrows
public Object handleRequest(Object input, Context context) {
val mapper = new ObjectMapper();
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
val inputData = mapper.readValue(formatInput(input), InputData.class);
val result = register(inputData);
return result;
}
private String formatInput(Object input){
return input.toString().
replace(":", "").
replace("-", "").
replace(" ", "").
replace("=", ":");
}
@SneakyThrows
private int register(InputData inputData){
final String jdbcURL = "RedshiftのJDBC URL";
final String user = "ユーザ名";
final String password = "パスワード";
Class.forName("com.amazon.redshift.jdbc41.Driver");
val prop = new Properties();
prop.setProperty("user", user);
prop.setProperty("password", password);
final String sql = "insert into public.input (timestamp, column1, column2, customize) values (?, ?, ?, ?)";
try (val con = DriverManager.getConnection(jdbcURL, prop);
val pstmt = con.prepareStatement(sql);) {
pstmt.setTimestamp(1, inputData.timestamp());
pstmt.setString(2, inputData.column1);
pstmt.setString(3, inputData.column2);
pstmt.setString(4, inputData.customize());
return pstmt.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
return -1;
}
}
}
lombokを使用している事以外は、JDBCを使用してinsert文を実行するJavaのソースです。「register」メソッドで登録処理を行っていますが、成功時には登録した件数を(つまり今回は常に「1」)、失敗時には「-1」を返却しています。また受け取ったJSONを格納し、登録する形式にフォーマットする「InputData」クラスは以下のようになります。
package com.classmethod.sample;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import lombok.SneakyThrows;
public class InputData {
public String timestamp = "";
public String column1 = "";
public String column2 = "";
public String customize(){
return column1 + ":" + column2;
}
@SneakyThrows
public Timestamp timestamp(){
return new Timestamp(new SimpleDateFormat("yyyy/MM/dd hh:mm:ss").parse(formatTimestamp()).getTime());
}
private String formatTimestamp(){
return timestamp.substring(0, 4) + "/" +
timestamp.substring(4, 6) + "/" +
timestamp.substring(6, 8) + " " +
timestamp.substring(8, 10) + ":" +
timestamp.substring(10, 12) + ":" +
timestamp.substring(12, 14);
}
}
LambdaのグローバルIPを取得するハンドラー
LambdaからRedshiftへアクセスを許可するため、今回はセキュリティグループにLambdaのグローバルIPを設定することにしました。そのグローバルIPを調べる為のハンドラーを作成しました。
package com.classmethod.sample;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import lombok.SneakyThrows;
public class MyGlobalIpHandler implements RequestHandler<Object, Object>{
public Object handleRequest(Object input, Context context) {
String myGlobalIp = getIp();
System.out.println(myGlobalIp);
return myGlobalIp;
}
@SneakyThrows
private String getIp() {
URL url = new URL("http://checkip.amazonaws.com");
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(
url.openStream()));
String ip = in.readLine();
return ip;
}
finally {
if (in != null) {
in.close();
}
}
}
}
getIp()メソッド内でグローバルIPを取得し、その値を返却しています。
Lambdaの実行
上記で作成したJavaのソースをビルドし、Lambda Functionとして登録します。Lambdaの実行ロールのポリシーは以下のようにしました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Action": [
"redshift:*"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
Lambda Functionを作成したら、まずはLambdaのグローバルIPを取得するため「MyGlobalIpHandler」をLambdaのハンドラーとして登録してLambdaを実行します。
グローバルIPがLambdaの実行結果として表示されるので、そのIPをRedshiftのセキュリティグループに追加します。
次にRedshiftへの登録を行う「MyLambdaFunctionHandler」をLambdaのハンドラーとして登録します。
「Actions」 - 「Configure Test Event」を押下し、入力値として先に書いたJSONを登録してLambda Functionを実行します。
{
"timestamp": "2016-01-15 19:01:59",
"column1": "100",
"column2": "200"
}
以下の様なデータがRedshiftのinputテーブルに登録される筈です。
id timestamp column1 column2 customize
26 2016-01-15 19:01:59 100 200 100:200
参考サイト
以下のサイトを参考にさせて頂きました。ありがとうございました。
【AWS】JavaプログラムからRedshiftのクラスターに接続する方法
JDBC 接続を設定する
Processingで自分のグローバルIPアドレスを取得する