この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
AWS IoTでは受信したメッセージの送信先としてKinesis Firehoseを選択することができます。Kinesis Firehoseでは受信したメッセージをS3やRedshiftに書き込むことができます。今回はAWS IoTで受信したメッセージを、Kinesis Firehose → S3 → Lambdaの順に連携するサンプルを作成しました。図にすると以下のようになります。
AWS IoT〜S3までは先に書いたように各サービスによる連携を、S3からLambdaの連携についてもS3へのオブジェクト配置時にLambdaが起動するように設定しました。
このような構成にした理由としては、以下の様な要件を想定したためです。
- IoTクライアントから送信されるデータをKinesis Firehoseで受信する
- 受信したオリジナルデータは加工せずにS3に貯めておきたい
- Lambdaに連携し、データを加工しRedshiftやDyamoDBに登録したい
サンプルの作成手順(Kinesis Firehose以降)
ではサンプルの作成手順についてです。今回はKinesis Firehose〜Lambdaまでの作成手順について記述します。AWS IoTの作成については次回までお待ち下さい。
0.環境要件
AWS環境の構築にはAWS CLI、LambdaのソースはJava8を使用し、リージョンはオレゴンを選択しました。
1.Lambdaの作成
最初にメッセージを受信するLambdaについてです。S3にファイルを配置時に呼び出されて、そのファイルのパスと内容をログに出力するソースです。
LambdaFunctionHandler.java
package com.classmethod.sample;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import lombok.val;
public class LambdaFunctionHandler implements RequestHandler<S3Event, Object> {
InputData inputData = null;
@Override
public Object handleRequest(S3Event input, Context context) {
val logger = context.getLogger();
val record = input.getRecords().get(0);
logger.log("********** Log Start **********" + "\n");
logger.log(record.getEventName() + "\n");
logger.log(record.getS3().getBucket().getName() + "\n");
logger.log(record.getS3().getObject().getKey() + "\n");
if(inputData == null) inputData = new InputData();
val lines = inputData.read(record.getS3().getBucket().getName(),
record.getS3().getObject().getKey());
logger.log("input value = " + "\n");
logger.log(lines);
logger.log("\n");
logger.log("********** Log End **********" + "\n");
return lines;
}
}
InputData.java
package com.classmethod.sample;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import lombok.val;
import lombok.SneakyThrows;
public class InputData {
public String read(String bucketName, String key){
val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
val s3Object = s3Client.getObject(new GetObjectRequest(bucketName, key));
return readS3File(s3Object.getObjectContent());
}
@SneakyThrows
private String readS3File(InputStream input){
try(val reader = new BufferedReader(new InputStreamReader(input))){
val lines = new StringBuilder();
while (true) {
String line = reader.readLine();
if (line == null) break;
lines.append(line + "\n");
}
return lines.toString();
}
}
}
build.gradle
apply plugin: 'java'
def defaultEncoding = 'UTF-8'
[compileJava, compileTestJava]*.options*.encoding = defaultEncoding
group = 'com.classmethod.sample'
version = '0.0.1-SNAPSHOT'
description = "LambdaS3Read"
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
jcenter()
}
dependencies {
compile 'com.amazonaws:aws-lambda-java-core:1.0.0'
compile 'com.amazonaws:aws-lambda-java-events:1.0.0'
compile 'org.projectlombok:lombok:1.16.6'
testCompile 'junit:junit:4.11'
}
jar {
from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
このJavaソースをビルドし、作成したjarをLambda Functionとして登録します。Function名は「LambdaS3Read」、ロールのロールポリシーは以下のようにしました(が、Redshiftへの権限は必要ないですね・・・)。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Action": [
"redshift:*"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ReadObject"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}
2.S3のバケット作成
次にS3にバケットを作成し、上記のLambdaを呼び出すようにします。先ずは以下のコマンドでバケットを作成します。
$ aws s3 mb s3://t-honda-kinesis-firehose
make_bucket: s3://t-honda-kinesis-firehose/
Lambdaに権限を追加します。
$ aws lambda add-permission --function-name LambdaS3Read --statement-id s3-put-event --action lambda:InvokeFunction --principal s3.amazonaws.com --source-arn arn:aws:s3:::t-honda-kinesis-firehose
{
"Statement": "{\"Condition\":{\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:s3:::t-honda-kinesis-firehose\"}},\"Action\":[\"lambda:InvokeFunction\"],\"Resource\":\"arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:LambdaS3Read\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"},\"Sid\":\"s3-put-event\"}"
}
バケットにオブジェクトを配置時、Lambdaを呼び出すようにします。読み出すLambdaのArn等の定義はnotification_configuration.jsonに記述しています。
$ aws s3api put-bucket-notification-configuration --bucket t-honda-kinesis-firehose --notification-configuration file://notification_configuration.json
notification_configuration.json
{"LambdaFunctionConfigurations": [{"LambdaFunctionArn": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:LambdaS3Read", "Events": ["s3:ObjectCreated:Put"]}]}
3.Kinesisの実行ロール作成
Kinesisを実行するロールを作成します。権限の詳細はassume_role_policy.jsonに定義しています。
$ aws iam create-role --role-name t-honda-kinesis-firehose_role --assume-role-policy-document file://assume_role_policy.json
{
"Role": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
},
"RoleId": "AROAIDIXI655PEBY5VUYA",
"CreateDate": "2016-02-09T22:42:58.017Z",
"RoleName": "t-honda-kinesis-firehose_role",
"Path": "/",
"Arn": "arn:aws:iam::XXXXXXXXXXXX:role/t-honda-kinesis-firehose_role"
}
}
assume_role_policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
作成したロールにロールポリシーを付与します。ポリシーの詳細はrole_policy.jsonに定義しています。
aws iam put-role-policy --role-name t-honda-kinesis-firehose_role --policy-name KinesisFirehosePolicy --policy-document file://role_policy.json
role_policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::t-honda-kinesis-firehose",
"arn:aws:s3:::t-honda-kinesis-firehose/*"
]
}
]
}
4.Kinesis Firehoseの作成
Kinesis Firehoseを作成します。kinesis_input.jsonのRoleARNに先ほど作成したロールのARNと、BucketARNに送信先のバケットを指定していることに注意してください。
$ aws firehose create-delivery-stream --delivery-stream-name t-honda-kinesis-to-s3 --cli-input-json file://kinesis_input.json
{
"DeliveryStreamARN": "arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/t-honda-kinesis-to-s3"
}
kinesis_input.json
{
"DeliveryStreamName": "",
"S3DestinationConfiguration": {
"RoleARN": "arn:aws:iam::XXXXXXXXXXXX:role/t-honda-kinesis-firehose_role",
"BucketARN": "arn:aws:s3:::t-honda-kinesis-firehose",
"Prefix": "",
"BufferingHints": {
"SizeInMBs": 1,
"IntervalInSeconds": 60
},
"CompressionFormat": "UNCOMPRESSED",
"EncryptionConfiguration": {
"NoEncryptionConfig": "NoEncryption"
}
}
}
まとめ
ここまででKinesis Firehoseで受信したメッセージをS3に配置し、Lambdaを起動するまでを構築することができました。後はクライアントからメッセージを受信するAWS IoTの設定を行うだけです。これについては、次回に記述します。