[AWS Lambda] JavaでRedshiftにデータを登録する

2016.01.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

JavaでAWS Lambda Functionを作成し、JSON形式で受け取ったデータをRedshiftに登録するサンプルを作成してみました。他のブログ記事等で既出かもしれませんが、多少苦戦したので、備忘録としてソースを載せておきたいと思います。

事前準備

先に書いたように今回作成するサンプルは、JSON形式でデータを受け取ります。そのJSONは以下の様な形式としました。

{
  "timestamp": "2016-01-15 19:01:59",
  "column1": "100",
  "column2": "200"
}

このデータを格納するテーブルを、予めRedshiftに作成します。以下の様なcreate文で作成しました。

drop table if exists public.input;

create table public.input(
    id INTEGER IDENTITY(1,1) not null,
    timestamp timestamp not null,
    column1 varchar not null,
    column2 varchar not null,
    customize varchar not null
);

テーブル名は「input」で、idカラムはauto incrementでidを採番します。timestamp・column1・column2カラムはJSONより受け取った値をそのまま登録しますが、customizeカラムにはcolumn1・column2の値を加工した値を登録します。

またJavaよりRedshiftに接続するのにJDBC 4.1 互換ドライバを使用します。これについては、予めローカルにダウンロードしておいてください。

Lambda Function

ではLambda Functionのソースです。今回はgradleを使用してプロジェクトを作成しました。

gradle

「build.gradle」です。以下のようになりました。

apply plugin: 'java'

def defaultEncoding = 'UTF-8'
[compileJava, compileTestJava]*.options*.encoding = defaultEncoding

group = 'com.classmethod.sample'
version = '0.0.1-SNAPSHOT'

description = "LambdaJsonToRedshift"

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    jcenter()
}

dependencies {
    compile 'com.amazonaws:aws-lambda-java-core:1.0.0'
    compile 'com.amazonaws:aws-lambda-java-events:1.0.0'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.3.4'
    compile 'org.projectlombok:lombok:1.16.6'
    compile files('libs/RedshiftJDBC41-1.1.10.1010.jar')

    testCompile 'junit:junit:4.11'
}

jar {
    from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}

gradleについての詳細はここでは書きませんが、23行目でローカルにダウンロードしたJDBC 4.1 互換ドライバのパスを指定しています。

Redshiftに登録するハンドラー

受け取ったJSONをRedshiftに登録するハンドラーです。

package com.classmethod.sample;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParser;
import lombok.val;
import lombok.SneakyThrows;
import java.sql.*;
import java.util.Properties;

public class MyLambdaFunctionHandler implements RequestHandler<Object, Object> {

    @Override
    @SneakyThrows
    public Object handleRequest(Object input, Context context) {
        val mapper = new ObjectMapper();
        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
        val inputData = mapper.readValue(formatInput(input), InputData.class);
        val result = register(inputData);
        return result;
    }
    
    private String formatInput(Object input){
        return input.toString().
                replace(":", "").
                replace("-", "").
                replace(" ", "").
                replace("=", ":");
    }
    
    @SneakyThrows
    private int register(InputData inputData){
        final String jdbcURL = "RedshiftのJDBC URL";
        final String user = "ユーザ名";
        final String password = "パスワード";
        
        Class.forName("com.amazon.redshift.jdbc41.Driver");
        
        val prop = new Properties();
        prop.setProperty("user", user);
        prop.setProperty("password", password);
        
        final String sql = "insert into public.input (timestamp, column1, column2, customize) values (?, ?, ?, ?)";
        
        try (val con = DriverManager.getConnection(jdbcURL, prop);
                val pstmt = con.prepareStatement(sql);) {
            pstmt.setTimestamp(1, inputData.timestamp());
            pstmt.setString(2, inputData.column1);
            pstmt.setString(3, inputData.column2);
            pstmt.setString(4, inputData.customize());
            
            return pstmt.executeUpdate();
           } catch (SQLException e) {
               e.printStackTrace();
               return -1;
           }
    }

}

lombokを使用している事以外は、JDBCを使用してinsert文を実行するJavaのソースです。「register」メソッドで登録処理を行っていますが、成功時には登録した件数を(つまり今回は常に「1」)、失敗時には「-1」を返却しています。また受け取ったJSONを格納し、登録する形式にフォーマットする「InputData」クラスは以下のようになります。

package com.classmethod.sample;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;

import lombok.SneakyThrows;

public class InputData {
    public String timestamp = "";
    public String column1 = "";
    public String column2 = "";
    
    public String customize(){
        return column1 + ":" + column2;
    }
    
    @SneakyThrows
    public Timestamp timestamp(){
        return new Timestamp(new SimpleDateFormat("yyyy/MM/dd hh:mm:ss").parse(formatTimestamp()).getTime());
    }
    
    private String formatTimestamp(){
        return timestamp.substring(0, 4) + "/" +
                timestamp.substring(4, 6) + "/" +
                timestamp.substring(6, 8) + " " +
                timestamp.substring(8, 10) + ":" +
                timestamp.substring(10, 12) + ":" +
                timestamp.substring(12, 14);
    }
}

LambdaのグローバルIPを取得するハンドラー

LambdaからRedshiftへアクセスを許可するため、今回はセキュリティグループにLambdaのグローバルIPを設定することにしました。そのグローバルIPを調べる為のハンドラーを作成しました。

package com.classmethod.sample;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import lombok.SneakyThrows;

public class MyGlobalIpHandler implements RequestHandler<Object, Object>{
    public Object handleRequest(Object input, Context context) {
        String myGlobalIp = getIp();
        System.out.println(myGlobalIp);
        return myGlobalIp;
    }
    
    @SneakyThrows
    private String getIp() {
        URL url = new URL("http://checkip.amazonaws.com");
        BufferedReader in = null;
        try {
            in = new BufferedReader(new InputStreamReader(
                    url.openStream()));
            String ip = in.readLine();
            return ip;
        } 
        finally {
            if (in != null) {
                in.close();
            }
        }
    }
}

getIp()メソッド内でグローバルIPを取得し、その値を返却しています。

Lambdaの実行

上記で作成したJavaのソースをビルドし、Lambda Functionとして登録します。Lambdaの実行ロールのポリシーは以下のようにしました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:*"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Action": [
                "redshift:*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

Lambda Functionを作成したら、まずはLambdaのグローバルIPを取得するため「MyGlobalIpHandler」をLambdaのハンドラーとして登録してLambdaを実行します。

lambda-function-1

グローバルIPがLambdaの実行結果として表示されるので、そのIPをRedshiftのセキュリティグループに追加します。

次にRedshiftへの登録を行う「MyLambdaFunctionHandler」をLambdaのハンドラーとして登録します。

lambda-function-2

「Actions」 - 「Configure Test Event」を押下し、入力値として先に書いたJSONを登録してLambda Functionを実行します。

{
  "timestamp": "2016-01-15 19:01:59",
  "column1": "100",
  "column2": "200"
}

以下の様なデータがRedshiftのinputテーブルに登録される筈です。

id	timestamp	column1	column2	customize
26	2016-01-15 19:01:59	100	200	100:200

参考サイト

以下のサイトを参考にさせて頂きました。ありがとうございました。

【AWS】JavaプログラムからRedshiftのクラスターに接続する方法
JDBC 接続を設定する
Processingで自分のグローバルIPアドレスを取得する