Talend×LambdaでサーバーレスのETL処理をする

2016.09.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

自分が所属しているデータインテグレーション部では、TalendというETLツールをよく使っています。
Talendの使い方としては、S3からデータを取得してデータ加工処理を行い、再度S3へアップロードしてRedshiftにデータをロードするということをよく行っています。Talendで作成したジョブはEC2上に配置して定期的に実行し、S3にデータがあれば処理をするということをしております。
Talendのジョブを実行する場合、jarファイルにして出力しそれを実行します。なのでTalendで作成したjarファイルをLambdaから呼び出せればジョブを実行できるなと考え、今回やってみました。

開発環境

  • Windows7
  • Java 1.8.0_25
  • Eclipse4.4
  • Talend Open Studio for Data Integration 6.2.1
  • Talendでジョブを作成する

    Talendでジョブを作成します。今回はS3にファイルがアップロードされたのをトリガーにS3からCSVファイルを取得して、データ変換処理を行いRedshiftに登録するまでをやっていたいと思います。
    今回Talendで使用するコンポーネントは以下の5つになります。

  • tS3Get・・・S3からオブジェクトを取得するコンポーネント
  • tFileInputDelimited・・・区切り文字形式のファイルを読み込むコンポーネント
  • tMap・・・データを変換するコンポーネント
  • tFileOutputDelimited・・・区切り文字形式にファイルを出力するコンポーネント
  • tS3Put・・・S3にファイルをアップロードするコンポーネント
  • tRedshiftBulkExec・・・S3にあるファイルをRedshiftにロードするコンポーネント
  • ジョブ全体としてはこのようになります。
    01_Talendジョブ構成
    簡単にTalendのジョブの説明をしていきます。
    ジョブを実行すると上記の上から順番(同列にあるものは左から右に)に処理がされていきます。
    ①tS3GetでS3からCSVファイルを取得します。設定する項目としてはアクセスキー、シークレットキー、Region、バケット、Key、出力先のファイルパスになります。Lambdaは一時作業ディレクトリとして /tmp フォルダが誰でも読み書きできる領域となっているためS3からダウンロードしたファイルはそこに保存するようにしています。
    02_S3Get

    ②tFileInputDelimitedでS3からダウンロードしたファイルを読み込みます。ファイル名/ストリームに読み込むファイルを指定します。フィールド区切り記号ですが今回はカンマ区切りのデータにしているのでカンマを指定します。またヘッダ行があるため読み込みをスキップするのにヘッダーに1を指定して1行スキップしています。
    画像03_tFileInputDelimited

    ③読み込んだCSVデータをtMapで変換します。ここでは簡単に読み込んだデータの最後に作成日という形でnew Date()を使って日付を追加しています。
    画像7_tMap

    ④tFileOutputDelimitedで変換したデータをファイル出力します。tS3Getと同様に/tmp フォルダに出力するようにしています。
    04_tFileOutputDelimited

    ⑤tS3PutでS3にファイルをアップロードします。設定項目としては①のtS3Getと同じになります。

    ⑥tRedshiftBulkExecでS3にアップロードしたファイルをRedshiftにロードします。ここではロード対象のS3の設定、登録するRedshiftを指定します。
    05_tRedshiftBulkExec


    S3から取得するcsvファイルはこんな感じです。

    id,name
    1,hoge
    2,fuga
    3,piyo

    ジョブを出力する

    作成したTalendのジョブを出力します。作成したジョブを選択して右クリックから「ジョブをエクスポート」を選択し、出力します。出力したZIPファイルを解凍するとジョブ名と同じフォルダが作成されています。そのフォルダ内にあるjarファイルが今回作成したコンポーネントをビルドしたものになります。またlibフォルダにはそのビルドしたjarに必要なライブラリが格納されています。今回作成したジョブは「etl_sample」というジョブ名なのでetl_sample_0.1.jarが作成されています。

    EclipseでLambdaプロジェクトを作成する

    EclipseにAWS の開発用ツールをインストールします。メニュー->ヘルプ->新規ソフトウェアのインストールから作業対象に「http://aws.amazon.com/eclipse」を入力し、AWS Development ToolsからLambda Pluginをインストールします。
    画像1_lambda_plugin
    クレデンシャル情報はメニューのウィンドウ->設定->AWSツールキットから設定して下さい。 次に新規プロジェクトからAWS Lambda Java Projectを選択し、プロジェクト名、パッケージ名をを入力し「入力タイプ」が「S3 Event」になっていることを確認して完了ボタンをクリックします。
    画像3_プロジェクト作成
    プロジェクトが作成できたので、先ほど出力したTalendのjarを追加します。プロジェクト直下にlibsフォルダを作成し、以下のjarファイルを配置しクラスパスを通します。

  • etl_sample_0_1.jar(今回作成したコンポーネントをビルドしたもの)
  • routines.jar
  • talend_file_enhanced_20070724.jar
  • talendcsv.jar
  • 次にTalendで作成したetl_sample_0_1.jarを呼び出す実装をします。まずetl_sample_0_1.jarの中身を見てみるとmainメソッドは以下のようになっています。

    public static void main(String[] args) {
      final etl_sample etl_sampleClass = new etl_sample();
    
      int exitCode = etl_sampleClass.runJobInTOS(args);
    
      System.exit(exitCode);
    }

    見て分かるとおりmainメソッドはとてもシンプルです。LambdaプロジェクトのhandleRequestメソッドからも同じように呼び出しをします。

    @Override
    public Object handleRequest(S3Event input, Context context) {
      context.getLogger().log("Start ETL");
      final etl_sample etl_sampleClass = new etl_sample();
      String[] args = {};
      int exitCode = etl_sampleClass.runJobInTOS(args);
      context.getLogger().log("exitCode = " + exitCode);
      context.getLogger().log("End ETL");
      return null;
    }

    Lambdaに登録する

    実装が終わったのでLambdaに登録します。プロジェクトを選択して右クリックから「Amazon Webサービス->Upload Function to AWS Lambda」を選択します。Regionを選択して、「Create a new Lambda Function」を選択し、適当なfunction名を入力します。「次へ」ボタンをクリックし、IAM Role、S3のバケットを選択して「完了」ボタンをクリックします。これでLambdaへの登録が完了しました。最後にManagement ConsoleからTriggerの設定をします。TriggerはETL処理対象のファイルがS3にアップロードされたらLambdaが動作するようにしています。
    画像9_lambda設定

    Redshiftにテーブルを作成する

    CSVファイルをロードするテーブルを作成します。

    create table sample1 (
      id integer not null
      , name character varying(50) not null
      , created_at timestamp not null
    ) ;

    実行してみる

    では動かしてみます。S3のバケットにsample.csvファイルをアップロードします。Lambdaが実行され、S3バケットの同階層にTalendで変換されたファイルがアップロードされます。Lambdaのログを確認してみます。ログが出力され動作したことが分かります。
    画像11_result_cloudwatch
    次にファイルをアップロードしたS3のフォルダを見てみます。変換処理されたファイルが存在していることが分かります。
    画像12_result_S3
    最後にRedshiftのテーブルの中身を見てみます。CSVファイルのデータが登録されていることも確認できました。
    画像10_result

    まとめ

    いかがだったでしょうか。無事LambdaからTalendで作成したジョブが呼び出せました。Lambdaでの実行なので制約はありますがTalendを使ってサーバーレスでETL処理ができるのはいいなと感じました。今回は以上です。