JavaよりEMRを起動してHiveスクリプトを実行する

2017.01.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

アプリケーションからEMRを起動し、Hiveスクリプトを実行して結果を出力した場合があるかと思います。今回はこのケースに対応するサンプルをJavaにて実装してみました。Spring Boot + AWS SDK for Javaを用いて実装しましたが、その他の言語でも考え方は応用できるかと思います。

実装について

処理の流れ

まずは今回のサンプルの処理の流れについてです。以下のようになります。

  1. EMRクラスターを起動する
  2. クラスターにステップを追加してHiveスクリプトを実行する
  3. 実行完了後にクラスターを終了する

抽出対象データ

今回のサンプルではCSVファイルをEMR(Hadoop)に取込み、任意の年月のデータを抽出しました。元データとなるCSVは以下の様になります。

2010,01,01,user_01,remarks 01
2010,01,02,user_02,remarks 02
2010,01,03,user_03,remarks 03
2010,01,04,user_01,remarks 04
2010,01,05,user_02,remarks 05
2010,01,06,user_03,remarks 06
2010,01,07,user_01,remarks 07
2010,01,08,user_02,remarks 08
2010,01,09,user_03,remarks 09
2010,01,10,user_01,remarks 10
2010,02,01,user_02,remarks 11
2010,02,02,user_03,remarks 12
2010,02,03,user_01,remarks 13
2010,02,04,user_02,remarks 14
2010,02,05,user_03,remarks 15
2010,02,06,user_01,remarks 16
2010,02,07,user_02,remarks 17
2010,02,08,user_03,remarks 18
2010,02,09,user_01,remarks 19
2010,02,10,user_02,remarks 20
2010,03,01,user_03,remarks 21
2010,03,02,user_01,remarks 22
2010,03,03,user_02,remarks 23
2010,03,04,user_03,remarks 24
2010,03,05,user_01,remarks 25
2010,03,06,user_02,remarks 26
2010,03,07,user_03,remarks 27
2010,03,08,user_01,remarks 28
2010,03,09,user_02,remarks 29
2010,03,10,user_03,remarks 30

このCSVファイルは、予めS3の任意のバケット内に保存しておく必要があります(後ほどHiveスクリプトで入力データのパスを指定する)。

build.gradle

では、Javaの実装についてです。Spring Boot + AWS SDK for Javaを使用するため、build.gradleは以下のようになりました。

build.gradle
buildscript {
	ext {
		springBootVersion = '1.4.3.RELEASE'
	}
	repositories {
		mavenCentral()
	}
	dependencies {
		classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
	}
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

jar {
	baseName = 'springBootEmrHiveSample'
	version = '0.0.1-SNAPSHOT'
}

sourceCompatibility = 1.8

repositories {
	mavenCentral()
}


dependencies {
	compile('org.springframework.boot:spring-boot-starter')
	compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.75'
	testCompile('org.springframework.boot:spring-boot-starter-test')
}

Application.java

次にmain()を持つApplicationクラスです。今回のサンプルではrun()メソッド内で処理順序を制御します。AWS SDK for Javaを用いてのEMRの操作は後述するEMRクラス内で実装し、ここではEMRクラスのメソッドを呼び出すのみです。

Application.java
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		try (ConfigurableApplicationContext ctx = SpringApplication
				.run(Application.class, args)) {
			Application app = ctx.getBean(Application.class);
			app.run(args);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void run(String... args) throws Exception {
		System.out.println("処理開始");

		Emr emr = new Emr();

		//EMRクライアント初期化
		emr.init();
		
		//JobFlowの起動
		String jobFlowId = emr.runJobFlow();
		
		//クラスタの起動待機
		while (true) {
			if (emr.getClusterStatus(jobFlowId).equals("WAITING"))
				break;
			System.out.println("Cluster creating.");
			Thread.sleep(10000);
		}
		
		//ステップの追加
		String stepId = emr.addStep(jobFlowId);
		
		//ステップの処理完了待機
		while (true) {
			if (emr.getStepStatus(jobFlowId, stepId).equals("COMPLETED"))
				break;
			System.out.println("Step running.");
			Thread.sleep(10000);
		}
		
		//JobFlowの終了
		emr.terminateJobFlow(jobFlowId);

		System.out.println("処理終了");
	}
}

「JobFlowの起動」でEMRクラスターを立ち上げ、起動するまで待機します。次にステップを追加してHiveスクリプトを実行し、処理が完了するまで待機します。最後に「JobFlowの終了」でEMRクラスターを終了します。

Emr.java

EMRを実際に操作するクラスです。先ほどのApplicationクラス内のメソッドの呼び出し元と対比して見ると分かり易いかと思います。

Emr.java
package com.example;

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.PlacementType;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

public class Emr {
	private final String AWS_PROFILE = "your_profile"; //AWSクレデンシャルのプロファイル名
	private final String EC2_KEY_NAME = "your_ec2_keypair_name"; //EC2のキーペア名
	private final String LOG_URI = "s3://xxxxxx/logs"; //ログの出力先となるS3フォルダパス
	private final String QUERY_PATH = "s3://xxxxxx/query.sql"; //実行するHiveスクリプトのS3上のフルパス
	private final String INPUT_PATH = "s3://xxxxxx/input/"; //EMRに取り込む元データを格納したS3フォルダパス
	private final String OUTPUT_PATH = "s3://xxxxxx/output/"; //Hiveスクリプトの実行結果を出力するS3フォルダパス
	
	private final String CLUSTER_NAME = "clusterName";
	private final String EMR_VERSION = "emr-5.2.1";
	private final String SERVICE_ROLE = "EMR_DefaultRole";
	private final String JOB_FLOW_NAME = "EMR_EC2_DefaultRole";
	private final String MASTER_INSTANCE_TYPE = "m3.xlarge";
	private final String SLAVE_INSTANCE_TYPE = "m3.xlarge";
	private final String STEP_NAME = "Hive Program";
	
	private AmazonElasticMapReduceClient emrClient;

	public void init() {
		AWSCredentialsProvider credentialsProvider = new ProfileCredentialsProvider(
				AWS_PROFILE);
		emrClient = new AmazonElasticMapReduceClient(credentialsProvider);
		emrClient.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1));
	}

	public String runJobFlow() {
		Application hive = new Application();
		hive.withName("Hive");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName(CLUSTER_NAME)
				.withApplications(hive).withLogUri(LOG_URI)
				.withReleaseLabel(EMR_VERSION)
				.withServiceRole(SERVICE_ROLE)
				.withJobFlowRole(JOB_FLOW_NAME)
				.withVisibleToAllUsers(true)
				.withInstances(new JobFlowInstancesConfig().withInstanceCount(2)
						.withKeepJobFlowAliveWhenNoSteps(true)
						.withPlacement(new PlacementType()
								.withAvailabilityZone("ap-northeast-1a"))
						.withEc2KeyName(EC2_KEY_NAME)
						.withMasterInstanceType(MASTER_INSTANCE_TYPE)
						.withSlaveInstanceType(SLAVE_INSTANCE_TYPE));

		RunJobFlowResult result = emrClient.runJobFlow(request);
		return result.getJobFlowId();
	}

	public String getClusterStatus(String jobFlowId) {
		DescribeClusterResult result = emrClient.describeCluster(
				new DescribeClusterRequest().withClusterId(jobFlowId));
		return result.getCluster().getStatus().getState();
	}

	public String addStep(String jobFlowId) {
		AddJobFlowStepsResult result = emrClient
				.addJobFlowSteps(new AddJobFlowStepsRequest()
						.withJobFlowId(jobFlowId).withSteps(buildStepConfig()));
		return result.getStepIds().get(0);
	}

	public String getStepStatus(String jobFlowId, String stepId) {
		DescribeStepResult result = emrClient
				.describeStep(new DescribeStepRequest().withClusterId(jobFlowId)
						.withStepId(stepId));
		return result.getStep().getStatus().getState();
	}

	public void terminateJobFlow(String jobFlowId) {
		emrClient.terminateJobFlows(
				new TerminateJobFlowsRequest().withJobFlowIds(jobFlowId));
	}

	private List<StepConfig> buildStepConfig() {
		List<StepConfig> result = new ArrayList<>();

		String[] args = { 
				"hive-script", 
				"--run-hive-script", 
				"--args", 
				"-f", QUERY_PATH, 
				"-d", "INPUT=" + INPUT_PATH, 
				"-d", "OUTPUT=" + OUTPUT_PATH, 
				"-d", "YYYY=2010", 
				"-d", "MM=01" 
				};

		StepConfig step = new StepConfig().withName(STEP_NAME)
				.withActionOnFailure(ActionOnFailure.CONTINUE)
				.withHadoopJarStep(new HadoopJarStepConfig()
						.withJar("command-runner.jar").withArgs(args));

		result.add(step);
		return result;
	}
}

ソースの上部にある定数には適切な値を設定してください。100行目から始まるbuildStepConfig()でステップに渡す引数を設定しています。設定方法は「-d 引数名=値」となるよう指定します。

なお、このメソッド内で指定した値がマネージメントコンソール上のステップに以下のように表示されます。 java-emr-run-hive-script-1
なので開発時には、先にマネージメントコンソール上でステップを実行しつつHiveスクリプトや引数を決め、ここに表示されている値を元にプログラムから引数を指定することも可能です。

Hiveスクリプト

今回実行するHiveスクリプトとなります。

DROP TABLE IF EXISTS csv_input;

CREATE EXTERNAL TABLE csv_input
(
    yyyy string,
    mm string,
    dd string,
    user_name string,
    remarks string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION '${INPUT}';

DROP TABLE IF EXISTS csv_output;

CREATE EXTERNAL TABLE csv_output
(
    yyyy string,
    mm string,
    dd string,
    remarks string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '${OUTPUT}';

FROM csv_input input
INSERT OVERWRITE TABLE csv_output
SELECT
input.yyyy,
input.mm,
input.dd,
input.remarks
WHERE input.yyyy = ${YYYY}
AND input.mm = ${MM};

Hiveスクリプトでは引数は「${引数名}」の書式で指定します。ここの引数名の具体的な値として、先のJavaで指定した値が設定されます。

まとめ

以上でJavaからEMRクラスターを起動してHiveスクリプトを実行することができました。私個人としては、マネージメントコンソール上の表示とJavaのソースの関係が分かったことで、より理解が深まりました。プログラムからEMRクラスターを起動する際などの参考になれば幸いです。

参考サイト

以下のサイトを参考にさせて頂きました。ありがとうございました。