この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
KinesisやRedshiftを使ったビッグデータ基盤の運用負荷を減らせないかと考えていたところ、Lambdaが正式公開されましたのでAWSのフルマネージドサービスだけで構築できないかやってみました。今回、構築したのはスマホアプリやIntel Edisonなどの端末からセンサー情報をKinesisに送信後、S3を経由してRedshiftにデータをロードするところまでになります。処理としては以下の図のような流れになります。Kinesisにデータを送信する箇所とデータ分析の部分は含まれていません。今回まだLambdaが東京リージョンでは使えないので、N.Virginiaリージョンで試しました。
Kinesisから取得したデータをCSVにしてS3へアップロードする
デバイスからKinesisに送信されたデータを取得してCSVファイルを作成、S3にアップロードしています。
準備する
- Kinesisのストリームを作成
- まず最初にKinesisのストリームを作成します。私はmystreamという名前にしました。
- CSVをアップロードするバケットを作成
- RedshiftにデータをロードするためにKinesisから取得したデータをCSVにしてS3にアップロードする必要があります。CSVを置いておくバケットを作成しましょう。私はmy-lambda-sampleという名前にしました。
- IAMロールの作成
- KinesisとS3が利用できるIAMロールを作成します。以下がPolicyになります。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
"kinesis:ListStreams"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::*"
]
}
]
}
Lambdaファンクションの実装
Kinesisからデータを取得してCSVに変換し、S3にアップロードするファンクションです。今回はCSVはソースコードを短く分かりやすくしたかったのでやっていませんが、CSVは圧縮した方がいいと思います。Management Consoleで実行するとS3のmy-lambda-sampleにtable1.csvというデータがアップロードされます。data checkというコメントの部分でデータのチェックやCSVへの変換処理を行う想定です。
var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
exports.handler = function(event, context) {
var data = '';
event.Records.forEach(function(record) {
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
// data check
data += payload + '\n';
});
s3.putObject({
Bucket:'my-lambda-sample',
Key:'table1.csv',
Body: data},
function (err) {
if(err){
context.succeed('error:' + err);
}else{
context.succeed('success');
}
}
);
};
Lambdaは以下のようにManagement Console上で編集することができます。ソースコードを張り付けて名前とRoleを設定後、保存したら完了です。
イベントの登録
どのような場合にLambdaファンクションが実行されるのか定義します。今回はKinesisに新しいデータが5件追加されたらイベントが発生することにします。Kinesis streamのプルダウンにはLambdaと同じリージョンのストリームしか表示されないので注意しましょう。Lambdaはまだ東京リージョンにないからLambdaだけ別のリージョンにする、というのはできません。
S3にアップロードしたCSVファイルをRedshiftにロードする
準備する
- Redshiftのクラスターとテーブルの作成
- 最後にRedshiftの準備をします。クラスターを作成しテーブルを作成します。
CREATE TABLE public.table1 (
column1 int,
column2 VARCHAR(30)
);
- IAMロールの作成
- S3とRedshiftが利用できるIAMロールを作成します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:*"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::*"
]
},
{
"Effect": "Allow",
"Action": [
"redshift:*"
],
"Resource": "*"
}
]
}
Lambdaファンクションの実装
以下がS3にアップロードしたCSVファイルをRedshiftにロードするLambdaファンクションになります。 ソースコードに直接APIキーを埋め込んでいるのが気になる方は回避する方法があるので以下の記事をご覧ください。 AWS LambdaからIAM RoleのCredential情報を取得し、RedshiftのCOPY処理に利用する
var pg = require('pg');
exports.handler = function(event, context) {
var bucket = event.Records[0].s3.bucket.name;
var key = event.Records[0].s3.object.key;
console.log(bucket);
console.log(key);
var target = 'tcp://<接続ユーザー名>:<接続パスワード>@<接続時のエンドポイント>:<接続ポート番号>/<接続DB名>';
var client = new pg.Client(target);
client.connect();
var queryString = "COPY public.table1 FROM 's3://" + bucket + "/" + key + "' ";
queryString += "CREDENTIALS 'aws_access_key_id=<アクセスキー>;aws_secret_access_key=<シークレットアクセスキー>' ";
queryString += 'CSV;';
queryString += 'commit;';
client.query(queryString,
function (err) {
if(err){
context.succeed('error:' + err);
}else{
context.succeed('success');
}
}
);
};
先ほどはManagement Consoleからやりましたが今度はpgというモジュールを使用しているので、ローカルで実装したものをzipにしてアップロードしています。npmコマンドを使ってモジュールをインストールする方法やモジュールをzipファイルに含めてアップロードする方法は以下の記事を参考にしました。デフォルトだと3秒でタイムアウトするので60秒に設定しています。
イベントの登録
どのような場合にLambdaファンクションが実行されるのか定義します。今回はS3のmy-lambda-sampleというバケットにファイルがアップロードされたらRedshiftにロードするCSVファイルと見なしてロードを開始しています。BucketのプルダウンにはLambdaと同じリージョンのBucketしか選択できないので注意しましょう。
動作を確認する
実装できたので確認してみましょう。確認にはAWS CLIを使います。以下のコマンドを実行するとKinesisにレコードが5件追加されますが、これによりRedshiftのtable1にデータが5件追加されていれば成功です。
実行したコマンドは以下の記事を参考にしました。 Kinesisの複数PUT対応APIを試してみた
$ echo '{
"Records": [
{
"Data": "1,aaa",
"PartitionKey": "sample1"
},
{
"Data": "2,bbb",
"PartitionKey": "sample2"
},
{
"Data": "3,ccc",
"PartitionKey": "sample3"
},
{
"Data": "4,ddd",
"PartitionKey": "sample4"
},
{
"Data": "5,eee",
"PartitionKey": "sample5"
}
],
"StreamName": "mystream"
}' > sample.json
$ aws kinesis put-records --cli-input-json file://sample.json
まとめ
私は今回Lambdaに初めて触ったので時間がかかりましたが、慣れればすばやく構築できるようになるのではないかと思います。Javascriptのイベントオブジェクトから追加されたファイルやレコードの情報が取得できるのがとても便利でした。EC2を立てずにフルマネージドサービスだけで運用できれば負荷を減らすことができそうですね。早く東京リージョンで使えるようになって欲しいです。今後、実際にこのような構成で運用する場合はどの程度の負荷まで耐えられるのかは分からないので、実際に運用する際には負荷試験をしっかりやってからにしたいと思います!