AWS Data Pipelineリリース!

AWS

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データパイプライン?

AWS Data Pipelineは、パイプラインと呼ばれるデータ駆動のワークフローを管理するWebサービスです。デベロッパーがデータの処理に集中できるように、依存関係の設定や順序実行やスケジュールの管理等を行います。AWS Data Pipeline APIは、基本的な操作を提供していますので、デベロッパーはこれを活用してアプリケーションを構築することができます。パイプラインの作成、スケジューリング、データソースの定義、依存関係、データ変換等の指示が出来ます。

早速使ってみよう

今はus-east-1のみで使えますので、リージョンを切り替えましょう

pipeline-000

新規サービスの申込み完了です。

pipeline-001

画面からパイプラインを作成する

画面に沿ってパイプラインを作成してみましょう。

pipeline-002

パイプライン名等をナビゲージョンに沿って入力していきます。

pipeline-003

そしてお絵描きソフト!?

pipeline-006

用意されているテンプレート

Data Pipelineにはテンプレートが複数用意されています。

pipeline-007

今後はこのテンプレートが流通しそうですね。続きまして、プログラミングしてみましょう〜

主なAPI

以下に主なAPIをご紹介します。

  • ActivatePipeline:パイプラインIDを指定し、作成したパイプラインの検証をしてから初期処理を開始します。規約に沿っていなければエラーとなります。
  • CreatePipeline:パイプラインを新規に作成します。ユニークIDと名前と説明を記述します。
  • DeletePipeline:パイプラインを削除します。削除されたパイプラインは検索したりリストアすることはできません。
  • DescribeObjects:パイプライン内に定義されたオブジェクトの情報を表示します。オプジェクとはキーバリュー型で値を持っています。
  • DescribePipelines:パイプライン自身の情報を表示します。
  • EvaluateExpression:オブジェクトを操作する独自SQLの評価をします。例えば、ある時刻にデータ変換を開始する等です。
  • GetPipelineDefinition:設定されたパイプラインを取得します。
  • ListPipelines:有効なパイプライン一覧を表示します。
  • PollForTask:タスクランナーによって得られる結果をポーリングして取得します。
  • PutPipelineDefinition:タスクやスケジュール等のパイプラインの振る舞いを定義します。
  • QueryObjects:問合せによってフィルターされたオブジェクト一覧です。
  • ReportTaskProgress:タスクランナーの進捗状況を返します。
  • ReportTaskRunnerHeartbeat:タスクランナー自体の状態を返します。
  • SetStatus:オブジェクトの状態を更新します。
  • SetTaskStatus:タスクの状態を更新します。通常は自動で設定されますが、手動で設定することもできます。
  • ValidatePipelineDefinition:パイプライン定義が正しいか検証します。

サンプル作成

とりあえずJavaのSDKが出ていましたので妄想で書いてみたいと思います。

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.CreatePipelineRequest;
import com.amazonaws.services.datapipeline.model.CreatePipelineResult;
import com.amazonaws.services.datapipeline.model.DescribePipelinesRequest;
import com.amazonaws.services.datapipeline.model.DescribePipelinesResult;
import com.amazonaws.services.datapipeline.model.Field;
import com.amazonaws.services.datapipeline.model.PipelineDescription;

public class DataPipeLineSample {

	static DataPipelineClient dpClient;
	
    private static void init() throws Exception {
        AWSCredentials credentials = new PropertiesCredentials(
        		DataPipeLineSample.class.getResourceAsStream("AwsCredentials.properties"));

        dpClient = new DataPipelineClient(credentials);
    }

	public static void main(String[] args) throws Exception {
        init();
        
        String uniqueId = "hogehogehoge-sample";
        String name = "classmethod-sample";
        
        CreatePipelineRequest createPipelineRequest = new CreatePipelineRequest();
        createPipelineRequest.setUniqueId(uniqueId);
        createPipelineRequest.setName(name);
        CreatePipelineResult createPipelineResult = dpClient.createPipeline(createPipelineRequest);
        String pipelineId = createPipelineResult.getPipelineId();
        System.out.println("pipeline ID : " + pipelineId);
        
        List<String> pipelineIds = new ArrayList<String>();
        pipelineIds.add(pipelineId);
        
        DescribePipelinesRequest describePipelinesRequest = new DescribePipelinesRequest();
        describePipelinesRequest.setPipelineIds(pipelineIds);
        DescribePipelinesResult describePipelinesResult = dpClient.describePipelines(describePipelinesRequest);
        List<PipelineDescription> pipelineDescriptionList = describePipelinesResult.getPipelineDescriptionList();
        for(int i=0;i<pipelineDescriptionList.size();i++){
        	PipelineDescription pipelineDescription = pipelineDescriptionList.get(i);
        	System.out.println("PipelineId : "+pipelineDescription.getPipelineId());
        	System.out.println("Name : "+pipelineDescription.getName());
        	System.out.println("Description : "+pipelineDescription.getDescription());
        	List<Field> fields = pipelineDescription.getFields();
        	for(int j=0;j<fields.size();j++){
        		Field field = fields.get(j);
        		System.out.println("  Key : "+field.getKey());
        		System.out.println("  RefValue : "+field.getRefValue());
        		System.out.println("  StringValue : "+field.getStringValue());
        	}
        }
	}

}

実行結果は以下です。

pipeline ID : df-01962283IRRTZGIGVXXX
PipelineId : df-01962283IRRTZGIGVSDDD
Name : classmethod-sample
Description : null
  Key : @pipelineState
  RefValue : null
  StringValue : PENDING
  Key : name
  RefValue : null
  StringValue : classmethod-sample
  Key : @creationTime
  RefValue : null
  StringValue : 2012-12-21T10:16:35
  Key : @id
  RefValue : null
  StringValue : df-01962283IRRTZGIGVasdf
  Key : @sphere
  RefValue : null
  StringValue : PIPELINE
  Key : @userId
  RefValue : null
  StringValue : 123412814XXX
  Key : uniqueId
  RefValue : null
  StringValue : hogehogehoge-sample
  Key : @accountId
  RefValue : null
  StringValue : 123412814XXX

まとめ

AWS Data Pipelineは要注目のサービスです。様々なオブジェクトを連携させたワークフローに対して、時刻指定でランナーを走らせて結果を得るような流れは、日々変わるビジネスプロセス系アプリケーションの開発にはピッタリですね!次回はもっと深く調べたいと思います。

参考資料

AWS Data Pipeline API Reference (API Version 2012-10-29)