S3にアップロードしたCSVファイルをLambda経由でAmazon RedshiftにCOPYする – AWS Lambda Advent Calendar 2014:2日目

2014.12.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日米国ラスベガスで開催された『re:Invent 2014』にて発表されたAWS Lambdaに関するエントリを12/01から12/25まで毎日1本ずつ書いていくアドベントカレンダー『AWS Lambda Advent Calendar 2014』。このエントリは2日目の内容となります。

先日1日目のエントリはharuyoshさんの『Lambdaからhttpリクエスト』でした。

本日2日目のエントリは『S3にアップロードしたCSVファイルをLambda経由でAmazon RedshiftにCOPYする』です。Redshiftへのデータ投入についてはS3にファイルをアップロードした状態でシェル等を使ってCOPYコマンドを発行し、Redshiftにデータをロードするのが一般的ですが、AWS Lambdaでは『S3イベント通知』を使った処理も行う事が可能になっていますので、最終的にはそのトリガー(S3にCSVファイルがアップロードされたら)を使って適宜Redshiftにデータを投入してしまおう、という試みです。

目次

1.環境準備

Node.js開発・実行環境を別個にEC2上に用意してみる

Node.js上でライブラリ依存無い要素のものを作るのであれば管理コンソール上で行けるけれども、ライブラリ依存だとzipパッケージングしなければならなさそう。なので、個別の環境を用意してそこでプログラムの動作確認及びパッケージングを行う。

EC2起動&ログイン後、Node.js及びnpmの環境をインストール。

$ ssh -i xxxxxx.key ec2-user@XXX.XXX.XXX.XXX
$ sudo yum -y install nodejs npm --enablerepo=epel
$ node -v
v0.10.32

Redshiftにアクセスする為、ライブラリ周りの環境を整えるべくPostgreSQLをインストール。

$ sudo yum -y install postgresql postgresql-devel
$ psql --version
psql (PostgreSQL) 9.2.9

npmコマンドでpgをインストール。

$ npm install pg
npm http GET https://registry.npmjs.org/pg
npm http 304 https://registry.npmjs.org/pg
npm http GET https://registry.npmjs.org/bindings/1.2.1
npm http GET https://registry.npmjs.org/buffer-writer/1.0.0
npm http GET https://registry.npmjs.org/generic-pool/2.1.1
npm http GET https://registry.npmjs.org/nan/1.3.0
npm http GET https://registry.npmjs.org/packet-reader/0.2.0
npm http GET https://registry.npmjs.org/pg-connection-string/0.1.3
npm http GET https://registry.npmjs.org/pg-types/1.6.0
npm http GET https://registry.npmjs.org/pgpass/0.0.3
npm http 304 https://registry.npmjs.org/buffer-writer/1.0.0
npm http 304 https://registry.npmjs.org/bindings/1.2.1
npm http 304 https://registry.npmjs.org/generic-pool/2.1.1
npm http 304 https://registry.npmjs.org/packet-reader/0.2.0
npm http 304 https://registry.npmjs.org/pg-connection-string/0.1.3
npm http 304 https://registry.npmjs.org/pgpass/0.0.3
npm http 304 https://registry.npmjs.org/pg-types/1.6.0
npm http 304 https://registry.npmjs.org/nan/1.3.0
npm http GET https://registry.npmjs.org/split
npm http 304 https://registry.npmjs.org/split
npm http GET https://registry.npmjs.org/through
npm http 304 https://registry.npmjs.org/through

> pg@3.6.3 install /home/ec2-user/node_modules/pg
> node-gyp rebuild || (exit 0)

make: ディレクトリ `/home/ec2-user/node_modules/pg/build' に入ります
  CXX(target) Release/obj.target/binding/src/binding.o
  SOLINK_MODULE(target) Release/obj.target/binding.node
  SOLINK_MODULE(target) Release/obj.target/binding.node: Finished
  COPY Release/binding.node
make: ディレクトリ `/home/ec2-user/node_modules/pg/build' から出ます
pg@3.6.3 node_modules/pg
├── bindings@1.2.1
├── packet-reader@0.2.0
├── pg-connection-string@0.1.3
├── buffer-writer@1.0.0
├── generic-pool@2.1.1
├── pg-types@1.6.0
├── nan@1.3.0
└── pgpass@0.0.3 (split@0.3.1)
$

新規立ち上げたEC2上にpgを導入した際の構造は以下の様になっています。

$ ll
合計 336
drwxrwxr-x 3 ec2-user ec2-user   4096 11月 17 08:29 node_modules
drwxrwxr-x 2 ec2-user ec2-user   4096 11月 17 08:21 tmp
$

2.データ準備

今回は、以下サイトから『世界の地震に関する記録』をCSVとしてダウンロードし、一定期間分の情報として揃えました。全て投入すると凡そ20万件となるボリュームです。

中身はこんな感じです。

query20110101-0630-utf8.csv

time,latitude,longitude,depth,mag,magType,nst,gap,dmin,rms,net,id,updated,place,type
2011-06-30T23:41:56.790Z,51.911,-170.538,28.6,3.4,ml,12,224.6,,,pde,pde20110630234156790_28,2013-03-16T00:03:24.149Z,"Fox Islands, Aleutian Islands, Alaska",earthquake
2011-06-30T22:35:01.680Z,-10.248,166.034,35,4.5,mb,11,110.4,,1.08,pde,pde20110630223501680_35,2013-03-16T00:03:24.146Z,"Santa Cruz Islands",earthquake
2011-06-30T22:03:24.000Z,36.459,-120.369,8,3.2,ml,7,159.3,,,pde,pde20110630220324000_8,2014-04-09T10:16:50.244Z,"Central California",earthquake
2011-06-30T22:02:49.440Z,-56.055,-26.397,35,4.4,mb,16,70.7,,0.76,pde,pde20110630220249440_35,2013-03-16T00:03:24.142Z,"South Sandwich Islands region",earthquake
2011-06-30T21:47:59.260Z,40.378,143.401,30,4.8,mb,80,136,,0.92,pde,pde20110630214759260_30,2013-03-16T00:03:24.133Z,"off the east coast of Honshu, Japan",earthquake
:
:

所定のバケットに2011年01月から2014年08月までのデータを以下の様に用意しておきました。

$ aws s3 ls 's3://xxxxxxxxxxxxxx/earthquake/'
2014-11-27 23:54:35          0 
2014-11-27 23:54:41    2013038 query20110101-0630-utf8.csv
2014-11-27 23:54:43    1501325 query20110701-1231-utf8.csv
2014-11-27 23:54:41    1446194 query20120101-0630-utf8.csv
2014-11-27 23:54:41    1572000 query20120701-1231-utf8.csv
2014-11-27 23:54:41    2454740 query20130101-0331-utf8.csv
2014-11-27 23:54:43    2613954 query20130401-0531-utf8.csv
2014-11-27 23:54:41    1186770 query20130601-0630-utf8.csv
2014-11-03 23:54:41    2510417 query20130701-0831-utf8.csv
2014-11-27 23:54:45    2567750 query20130901-1031-utf8.csv
2014-11-27 23:54:47    2744555 query20131101-1231-utf8.csv
2014-11-27 23:54:46    2861908 query20140101-0228-utf8.csv
2014-11-27 23:54:47    3076560 query20140301-0430-utf8.csv
2014-11-27 23:54:46    3064543 query20140501-0630-utf8.csv
2014-11-27 23:54:48    1137304 query20140701-0831-utf8.csv

投入用のテーブルも事前に用意しておく事にしました。定義は以下の様にしています。

# SELECT * FROM pg_table_def WHERE schemaname='public' AND tablename='earthquake';
 schemaname | tablename  |  column   |            type             | encoding | distkey | sortkey | notnull 
------------+------------+-----------+-----------------------------+----------+---------+---------+---------
 public     | earthquake | time      | timestamp without time zone | delta    | f       |       1 | t
 public     | earthquake | latitude  | double precision            | none     | f       |       0 | t
 public     | earthquake | longitude | double precision            | none     | f       |       0 | t
 public     | earthquake | depth     | double precision            | none     | f       |       0 | f
 public     | earthquake | mag       | double precision            | none     | t       |       0 | f
 public     | earthquake | mag_type  | character(5)                | none     | f       |       0 | t
 public     | earthquake | nst       | smallint                    | none     | f       |       0 | f
 public     | earthquake | gap       | double precision            | none     | f       |       0 | f
 public     | earthquake | dmin      | double precision            | none     | f       |       0 | f
 public     | earthquake | rms       | double precision            | none     | f       |       0 | f
 public     | earthquake | net       | character(5)                | lzo      | f       |       0 | t
 public     | earthquake | id        | character(30)               | lzo      | f       |       0 | t
 public     | earthquake | updated   | timestamp without time zone | delta    | f       |       0 | t
 public     | earthquake | place     | character(100)              | lzo      | f       |       0 | t
 public     | earthquake | type      | character(30)               | lzo      | f       |       0 | t
(15 rows)

定義文はこんな感じで。

CREATE TABLE public.earthquake (
    time TIMESTAMP encode delta NOT NULL,
    latitude DOUBLE PRECISION NOT NULL,
    longitude DOUBLE PRECISION NOT NULL,
    depth DOUBLE PRECISION,
    mag  DOUBLE PRECISION,
    mag_type CHAR(5) NOT NULL,
    nst SMALLINT,
    gap DOUBLE PRECISION,
    dmin DOUBLE PRECISION,
    rms DOUBLE PRECISION,
    net CHAR(5) encode lzo NOT NULL,
    id CHAR(30) encode lzo NOT NULL,
    updated TIMESTAMP encode delta NOT NULL,
    place CHAR(100) encode lzo NOT NULL,
    type CHAR(30) encode lzo NOT NULL
)
distkey(mag)
sortkey(time);

3.プログラム作成〜動作確認

3-1.ユーザーアプリケーション経由で実行する場合

3-1-A.Node.jsプログラム作成

まずはともかく、EC2上に用意したNode.jsの環境内からRedshiftへデータを投入出来るか、試してみる事から始めてみました。Node.js自体は今回Lambdaの件があって新たに触り始めたようなもんなのでコード的にはお見苦しい点があるかと思いますが、多目に見てやってください(´・ω・`) コードレビュー的な御指摘がありましたら頂けますと幸いです!

以下コード内では(1).COPY処理 → (2).COMMIT → (3).VACUUM処理 → (4).COMMIT、と処理を行っています。トランザクションを使った形で実行してみたかったのでこういう形となりました。

copy-s3csv-to-redshift.js

var pg = require('pg');

// 接続文字列
var rsConnectionString = "tcp://<接続ユーザー名>:<接続パスワード>@<接続時のエンドポイント>:<接続ポート番号>/<接続DB名>";

// 
var client = new pg.Client(rsConnectionString);
client.connect();

var rollback = function(client) {
  console.log("ERROR!");
  client.query('ROLLBACK', function() {
    client.end();
  });
};

// トランザクション開始
client.query('BEGIN', function(err, result) {
  if(err) return rollback(client);
  console.log("transaction start.");

  // クエリ実行(1).COPY
  var queryString = "";
  queryString += "COPY public.earthquake ";
  queryString += "FROM \'s3://xxxxxxxxxxxxxx/earthquake/' ";
  queryString += "CREDENTIALS \'aws_access_key_id=XXXXXXXXXX;aws_secret_access_key=YYYYYYYYYY\' "; 
  queryString += "CSV ";
  queryString += "IGNOREHEADER 1 ";
  queryString += "TIMEFORMAT \'auto\' ";
  queryString += "DELIMITER \',\'; ";

  client.query(queryString, function(err, result) {
    if(err) { console.log(err); return rollback(client); }
    console.log("COPY Operation Done.");

    // クエリ実行(2).COMMIT
    client.query("COMMIT;", client.end.bind(client));
    console.log("transaction commit.");

    // クエリ実行(3).VACUUM
    queryString = "VACUUM public.earthquake;";
    client.query(queryString, function(err, result) {
      if(err) { console.log(err); return rollback(client); }
      console.log("VACUUM Operation Done.");

      // クエリ実行(4).COMMIT
      client.query("COMMIT;", client.end.bind(client));
      console.log("transaction commit.");
      client.end();
      
      // トランザクション終了
      console.log("transaction end.");
    });
  });
});

ファイル実行。現在の環境では一連の処理に約10秒掛かっています。

$ time node copy-s3csv-to-redshift.js
transaction start.
COPY Operation Done.
transaction commit.
VACUUM Operation Done.
transaction commit.
transaction end.

real	0m10.730s
user	0m0.096s
sys	0m0.004s
$

処理実行後の件数を確認してみます。ちゃんと投入されているようです!

# SELECT COUNT(*) FROM public.earthquake;
 count  
--------
 195276
(1 row)

3-1-B.パッケージング

ひとまずコードとしては動きそうな感じですね。では次はこれをパッケージングしてLambdaで動くようにしてみましょう。仕様に従って上記のコードをfunctionで囲みます。

exports.handler = function(event, context) {
  var pg = require('pg');

  // 接続文字列
  var rsConnectionString = "tcp://<接続ユーザー名>:<接続パスワード>@<接続時のエンドポイント>:<接続ポート番号>/<接続DB名>";
        :
      (中略)
        :
        // トランザクション終了
        console.log("transaction end.");
      });
    });
  });
}

パッケージング実施。こちらは以下の佐々木さん(a.k.a.すもけ)のエントリを参考にさせて頂きました。実行後はzipファイルが生成されています。作成したファイル何らかの手段でローカルに保存しておいてください。(自分は一度EC2からs3の任意のバケットにアップロードした後、そこからダウンロードしました)

$ zip -r copy-s3csv-to-redshift.zip copy-s3csv-to-redshift.js node_modules
$ ll
合計 344
-rw-rw-r-- 1 ec2-user ec2-user   1635 11月 27 16:20 copy-s3csv-to-redshift.js
-rw-rw-r-- 1 ec2-user ec2-user 338170 11月 27 16:25 copy-s3csv-to-redshift.zip
drwxrwxr-x 4 ec2-user ec2-user   4096 11月 27 12:16 node_modules
drwxrwxr-x 2 ec2-user ec2-user   4096 11月 27 12:16 tmp

3-1-C.デプロイ to Lambda Function

管理コンソール、Lambdaのメニューから[Create a Lambda Function]を押下。

lambda-s3-upload-01

Function作成に必要な要素を設定。上記ファイル実行では凡そ10秒処理時間が掛かっていたので、タイムアウト値初期設定の3秒ではまず引っ掛かりそうな気がしてますが、ここは敢えてどういう風にエラーが出るのか試す意味でもそのまま一旦作ってみる事にします。

lambda-s3-upload-02

作成完了。[Edit/Test]を押下。

lambda-s3-upload-03

3-1-D.動作確認

そして[Invoke]押下で処理実行。やはり何か出て来たようです...

lambda-s3-upload-04

ログ出力部分は以下の様なメッセージが表示されていました。確かにきっちり3秒でタイムアウトしています。

Logs
----
START RequestId: 6774849f-7653-11e4-bdf5-5322a5ef9018
2014-11-27T16:35:22.892Z	6774849f-7653-11e4-bdf5-5322a5ef9018	transaction start.
Task timed out after 3.00 seconds

END RequestId: 6774849f-7653-11e4-bdf5-5322a5ef9018
REPORT RequestId: 6774849f-7653-11e4-bdf5-5322a5ef9018	Duration: 3000.61 ms	Billed Duration: 3100 ms 	Memory Size: 128 MB	Max Memory Used: 11 MB	

Message
-------
Task timed out after 3.00 seconds::

問題となった箇所の『タイムアウト値』、上記プログラムであれば15秒もあれば多分大丈夫だとは思いますが、ここは思い切って最大値の60秒に設定した上で、改めて処理実行をしてみます。今度は上手く動きました!

lambda-s3-upload-05

ログ出力内容です。

Logs
----
START RequestId: aec6818a-7655-11e4-bb25-6dea16c5701f
2014-11-27T16:51:35.852Z	aec6818a-7655-11e4-bb25-6dea16c5701f	transaction start.
2014-11-27T16:51:43.329Z	aec6818a-7655-11e4-bb25-6dea16c5701f	COPY Operation Done.
2014-11-27T16:51:43.330Z	aec6818a-7655-11e4-bb25-6dea16c5701f	transaction commit.
2014-11-27T16:51:46.243Z	aec6818a-7655-11e4-bb25-6dea16c5701f	VACUUM Operation Done.
2014-11-27T16:51:46.243Z	aec6818a-7655-11e4-bb25-6dea16c5701f	transaction commit.
2014-11-27T16:51:46.244Z	aec6818a-7655-11e4-bb25-6dea16c5701f	transaction end.
Process exited before completing request

END RequestId: aec6818a-7655-11e4-bb25-6dea16c5701f
REPORT RequestId: aec6818a-7655-11e4-bb25-6dea16c5701f	Duration: 11380.91 ms	Billed Duration: 11400 ms 	Memory Size: 128 MB	Max Memory Used: 11 MB	

Message
-------
Process exited before completing request::

改めて件数確認をしてみます。上記実施手順でNode.js直打ちした際のデータを消さずにそのまま残してしまったので、計2回分(Node.js経由の投入/Lambda経由の投入)の件数となってしまいましたが、データがちゃんと追加されている事は確認出来ました。

# SELECT COUNT(*) FROM public.earthquake;
 count  
--------
 390552
(1 row)

3-2.S3イベント通知経由で実行する場合

次いでS3イベント通知経由で実行する場合のケースに取り掛かってみます。下記エントリでも使った既存のs3イベント通知サンプルを使って、s3にアップロードしたオブジェクトの要素情報がどのように取得出来るのか、環境を整えて試しにアップしてみます。

3-2-A.S3動作環境の設計と情報収集

まずは、S3バケットを作成した上でそのバケット配下を以下の様に整理してみました。テーブル名/YYYY/MM/DD/という構成で日付フォルダの中にその日アップロードされるべきCSVファイルが格納される、というイメージとなります。

$ aws s3 ls --recursive 's3://cm-lambda-test/'
2014-11-29 01:00:20          0 table_aaa/
2014-11-29 01:00:20          0 table_bbb/
2014-11-29 01:00:35          0 table_bbb/2014/
2014-11-29 01:00:40          0 table_bbb/2014/11/
2014-11-30 07:13:33          0 table_bbb/2014/11/29/
2014-11-30 07:12:43          0 table_bbb/2014/11/30/
2014-11-29 01:00:43          0 table_bbb/2014/12/
2014-11-30 07:12:54          0 table_bbb/2014/12/01/
2014-11-30 07:12:58          0 table_bbb/2014/12/02/
2014-11-30 07:13:03          0 table_bbb/2014/12/03/
2014-11-30 07:13:08          0 table_bbb/2015/
2014-11-29 01:00:20          0 table_ccc/
$

アップロードしたファイルの内容をどのように情報取得出来るか、以下の様にデバッグ文を埋め込んでみます。(※ちなみに、この部分のコードや環境は先述エントリ『Amazon S3イベントを扱う』で使ったものを流用しています。)

console.log('Loading event');
var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});

exports.handler = function(event, context) {
   console.log('Received event:');
   console.log(JSON.stringify(event, null, '  '));
   // Get the object from the event and show its content type
   var bucket = event.Records[0].s3.bucket.name;
   console.log(bucket);
   var key = event.Records[0].s3.object.key;
   console.log(key);
   console.log(event.Records[0].s3.object);
   s3.getObject({Bucket:bucket, Key:key},
      function(err,data) {
        if (err) {
           console.log('error getting object ' + key + ' from bucket ' + bucket + 
               '. Make sure they exist and your bucket is in the same region as this function.');
           context.done('error','error getting file'+err);
        }
        else {
           console.log('CONTENT TYPE:',data.ContentType);
           context.done(null,'');
        }
      }
   );
};

作成したバケット配下のフォルダにCSVファイルをアップロードした時の実行ログです。CloudWatch Logsの情報として生成されているので、AWS CLIでも以下の様に内容を確認しています。(※若干改行等の編集を加えています)

$ aws logs get-log-events \
  --log-group-name /aws/lambda/cmS3EventProcessor \
  --log-stream-name edddea39a9ee401292d987ab954aeb60 | \
  jq -r '.events[].message'
2014-11-29T22:24:59.614Z	52fs5ds29opm94fs	Loading event
START RequestId: 9e1cf901-7816-11e4-9064-7fb2fdee54cb
2014-11-29T22:25:00.034Z	9e1cf901-7816-11e4-9064-7fb2fdee54cb	Received event:
2014-11-29T22:25:00.035Z	9e1cf901-7816-11e4-9064-7fb2fdee54cb	{
  "Records": [
    {
      "eventVersion": "2.0",
      "eventSource": "aws:s3",
      "awsRegion": "us-east-1",
      "eventTime": "2014-11-29T22:24:56.949Z",
      "eventName": "ObjectCreated:Put",
      :
      :
        "object": {
          "key": "table_bbb/2014/11/30/query-earthquake-20141130_072417.csv",
          "size": 2013038,
          "eTag": "c39330549acbfc72c61cbb55a5b812c4"
        }
      }
    }
  ]
}
2014-11-29T22:25:00.035Z	9e1cf901-7816-11e4-9064-7fb2fdee54cb	cm-lambda-test
2014-11-29T22:25:00.035Z	9e1cf901-7816-11e4-9064-7fb2fdee54cb	table_bbb/2014/11/30/query-earthquake-20141130_072417.csv
2014-11-29T22:25:00.035Z	9e1cf901-7816-11e4-9064-7fb2fdee54cb	{ key: 'table_bbb/2014/11/30/query-earthquake-20141130_072417.csv',
  size: 2013038,
  eTag: 'c39330549acbfc72c61cbb55a5b812c4' }
2014-11-29T22:25:01.635Z	9e1cf901-7816-11e4-9064-7fb2fdee54cb	CONTENT TYPE: application/octet-stream
END RequestId: 9e1cf901-7816-11e4-9064-7fb2fdee54cb
REPORT RequestId: 9e1cf901-7816-11e4-9064-7fb2fdee54cb	Duration: 1620.13 ms	Billed Duration: 1700 ms 	Memory Size: 512 MB	Max Memory Used: 25 MB	
$

3-2-B.Node.jsプログラム作成

上記での取得情報、また『3-1』での情報を踏まえつつ、管理コンソール上で文法エラーを確認しながら以下の様なコードを作成しました。Node.jsは最近のJavascriptの文法・流儀に則っていないような気がしてますがそこは目を瞑って頂きたく候。

console.log('Loading event');
var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
var pg = require('pg');

// 接続文字列
var rsConnectionString = "tcp://<接続ユーザー名>:<接続パスワード>@<接続時のエンドポイント>:<接続ポート番号>/<接続DB名>";

var rollback = function(client) {
  console.log("ERROR!");
  client.query('ROLLBACK', function() {
    client.end();
  });
};

exports.handler = function(event, context) {
   console.log('Received event:');
   console.log(JSON.stringify(event, null, '  '));
   // Get the object from the event and show its content type
   var bucket = event.Records[0].s3.bucket.name;
   console.log(bucket);
   var key = event.Records[0].s3.object.key;
   console.log(key);
   console.log(event.Records[0].s3.object);
   s3.getObject({Bucket:bucket, Key:key},
      function(err,data) {
        if (err) {
           console.log('error getting object ' + key + ' from bucket ' + bucket + 
               '. Make sure they exist and your bucket is in the same region as this function.');
           context.done('error','error getting file'+err);
        }
        else {
            console.log('CONTENT TYPE:',data.ContentType);
            context.done(null,'');
            console.log("+++++++++++++++++++++++++++++++++++++++")
            console.log("Now COPY CSV TO Amazon Redshift!");
            var paths = key.split("/");
            console.log("-------------------------");
            console.log("tablename:" + paths[0]);
            console.log("year     :" + paths[1]);
            console.log("month    :" + paths[2]);
            console.log("day      :" + paths[3]);
            console.log("filename :" + paths[4]);
            console.log("-------------------------");
            var tablename = paths[0];
            var queryString = "";
            if (tablename == "table_aaa") {
                queryString = getCopySQLforTableAAA(bucket, tablename, key);
            } else if (tablename == "table_bbb") {
                queryString = getCopySQLforTableBBB(bucket, tablename, key);
            } else if (tablename == "table_ccc") {
                queryString = getCopySQLforTableCCC(bucket, tablename, key);
            }
            console.log(queryString);
            // 接続開始
            var client = new pg.Client(rsConnectionString);
            client.connect();
            
            // トランザクション開始
            client.query('BEGIN', function(err, result) {
                if(err) return rollback(client);
                console.log("transaction start.");

                // クエリ実行(1).COPY
                client.query(queryString, function(err, result) {
                    if(err) { console.log(err); return rollback(client); }
                    console.log("COPY Operation Done.");
                    
                    // クエリ実行(2).COMMIT
                    client.query("COMMIT;", client.end.bind(client));
                    console.log("transaction commit.");
                    
                    // クエリ実行(3).VACUUM
                    queryString = "VACUUM public.earthquake;";
                    client.query(queryString, function(err, result) {
                        
                        if(err) { console.log(err); return rollback(client); }
                        console.log("VACUUM Operation Done.");
                        
                        // クエリ実行(4).COMMIT
                        client.query("COMMIT;", client.end.bind(client));
                        console.log("transaction commit.");
                        client.end();
                        
                        // トランザクション終了
                        console.log("transaction end.");

                    });
                });
            });
        }
      }
   );
};

var getCopySQLforTableAAA = function(bucketname, tablename, key) {
    console.log("CREATE SQL Statement for table_aaa:");
    var copySql = "";
    :
    (中略)
    :
    return copySql;
}
var getCopySQLforTableBBB = function(bucketname, tablename, key) {
    console.log("CREATE SQL Statement for table_bbb:");
    var copySql = "";
    copySql += "COPY public." + tablename + " ";
    copySql += "FROM \'s3://" + bucketname + "/" + key + "' ";
    copySql += "CREDENTIALS \'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY\' "; 
    copySql += "CSV ";
    copySql += "IGNOREHEADER 1 ";
    copySql += "TIMEFORMAT \'auto\' ";
    copySql += "DELIMITER \',\'; ";
    return copySql;
}
var getCopySQLforTableCCC = function(bucketname, tablename, key) {
    console.log("CREATE SQL Statement for table_ccc:");
    var copySql = "";
    :
    (中略)
    :
    return copySql;
}

3-2-C.パッケージング

上記コードはパッケージ『pg』を用いているので、コード単体では稼働しません。ですので3-1同様にパッケージング作業を行います。Node.js環境上だとコメント部分の貼り付けがこちらの意図した様になってくれないので、ひとまずtxtで作成しつつリネームでjsにしています。

$ vi copy-s3csv-to-redshift-by-s3event.js
// 『:set paste』実行後、管理コンソール上で編集したコード全文をコピペ
$ mv copy-s3csv-to-redshift-by-s3event.txt copy-s3csv-to-redshift-by-s3event.js
$ zip -r copy-s3csv-to-redshift-by-s3event.zip copy-s3csv-to-redshift-by-s3event.js node_modules

3-2-D.デプロイ to Lambda Function

上記パッケージングしたzipファイルを管理コンソールにてデプロイします。COPY処理に時間が掛かる事を踏まえ、タイムアウト数値は多目に設定。諸々設定が完了したら[Save]で内容を保存しておきます。

lambda-s3csv-to-redshift-11

3-2-E.動作確認

さぁ、いよいよ動作確認です。まずは投入対象のテーブルの用意から。上記コードでは『table_aaa』『table_bbb』『table_ccc』と3つ記載してましたが、内容は『public.earthquake』と同じイメージです。テーブル名によって異なるテーブルにデータがCOPYされる事を確認したいので、こんな感じでCREATE文を都合3回流しておきます。

// table_bbb, table_cccも同じ構成でテーブル名を変えて作成
# CREATE TABLE public.table_aaa (
	time TIMESTAMP encode delta NOT NULL,
	latitude DOUBLE PRECISION NOT NULL,
 	longitude DOUBLE PRECISION NOT NULL,
 	depth DOUBLE PRECISION,
 	mag  DOUBLE PRECISION,
 	mag_type CHAR(5) NOT NULL,
 	nst SMALLINT,
 	gap DOUBLE PRECISION,
 	dmin DOUBLE PRECISION,
 	rms DOUBLE PRECISION,
 	net CHAR(5) encode lzo NOT NULL,
 	id CHAR(30) encode lzo NOT NULL,
 	updated TIMESTAMP encode delta NOT NULL,
 	place CHAR(100) encode lzo NOT NULL,
 	type CHAR(30) encode lzo NOT NULL
)
distkey(mag)
sortkey(time);

件数確認。当然ながらこの時点では全て0件です。

# SELECT COUNT(*) FROM public.table_aaa;
 count 
-------
     0
(1 row)

# SELECT COUNT(*) FROM public.table_bbb;
 count 
-------
     0
(1 row)

# SELECT COUNT(*) FROM public.table_ccc;
 count 
-------
     0

『public.table_bbb』のフォルダ配下に、以下の様にファイルをアップロードしてみました。

lambda-s3csv-to-redshift-12

上記操作後、CloudWatch Logsの該当ストリームのログ内容を確認してみます。ちゃんと動いてそうですね!

lambda-s3csv-to-redshift-13

SQLで件数も確認してみます。上手く対象のテーブルにデータが入っている事を確認出来ました!

# SELECT COUNT(*) FROM public.table_aaa;
 count 
-------
     0
(1 row)

# SELECT COUNT(*) FROM public.table_bbb;
 count 
-------
 38674
(1 row)

# SELECT COUNT(*) FROM public.table_ccc;
 count 
-------
     0
(1 row)

まとめ

以上、LambdaによるRedshiftへのデータCOPY処理実践でした。以下その他気になる点・雑感等を。

  • サンプルコードはアクセスキー等を直に記載しているので、実運用の際は直書きしなくても良いような手段に変える必要がある。何か上手いことIAM Roleとかから取れないかな?
  • 『S3イベント通知』での連携について
    • 仕組み上は行ける事が確認出来たけど、実運用する場合は色々と考慮しないと行けない部分が多そう。
    • 数が多くなると処理を待ち受けるRedshiftクラスタのスペックが気になってくる。同時クエリ実行数とかあの辺。一気にワッと数がアップロードされるようなケースだと、すぐにその辺枯渇しそう。
    • あと、タイムアウトの上限について。これも状況に拠っては1分以上掛かるケースも十分有り得るので、そこからすると『短い』という気がしなくもない。もしLambda経由でS3データをRedshiftにロードするだったりSQL処理を噛ます場合であれば、現状『60秒』の壁は超えない様にしないとならない。
    • データ投入を単発で、ファイルアップロード駆動で実行するのであれば良さそうだけど、SQLによるETLとか他処理との連携ってなるとあまり向いてないのかしら?この辺りは別サービスと連携する事で対応可能だったりするのかな?素直にシェルCron起動やAWS Data Pipelineに任せた方が良い気もする。
  • Node.js及びJavascript力については、今後も継続して高めて行かんとアカンですね(´・ω・`)

という訳で、こちらからは以上です。

明日3日目はryo0301さんによる"サムネイル作成"に関するエントリの予定です。お楽しみに!