Spring Batchでバッチ処理

2016.06.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Javaでのバッチ処理について調べていると、どうやらSpring Batchが便利らしいので使ってみました。

環境

Mac OSX 10.10.5 Yosemite
Spring Boot 1.3.5
PostgreSQL 9.5

JobとStep

Spring Batchでの最小単位はステップで、そこに処理内容を記述します。
そのステップをまとめた処理を行うのがジョブです。
また、ステップには2種類、「tasklet」と「chunk」が有りますが、今回はchunk(チャンク)のみ使用しています。

準備

schema-all.sql

「src/main/resources」に置いてプラットフォームに合わせたファイル名にすると実行時に読み込まれます。
「all」としているのでどのプラットフォームでも実行されるそうです。
PostgreSQLでは実行されました。

DROP TABLE IF EXISTS fruit;

CREATE TABLE fruit (
 id SERIAL NOT NULL PRIMARY KEY
 , name VARCHAR(10)
 , price INT);
postgres=# SELECT * FROM fruit;
 int | name | price 
-----+------+-------
(0 rows)

fruit_price.csv

フルーツ名と値段のデータです。
これをテーブルに登録します。
ファイルは「src/main/resources」に置いてください。

apple,300
orange,200
banana,100

gradle

dependenciesに下記を追加してgradleをrefreshしてください。

dependencies {
    compile("org.springframework.boot:spring-boot-starter-batch")
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    runtime('org.postgresql:postgresql')
    compile('org.springframework.boot:spring-boot-starter-test') 
}

application.yml

データベースPostgreSQLの接続情報です。

spring:
  datasource:
    url: jdbc:postgresql://localhost/postgres
    username: XXXXX
    password: YYYYY
    driverClassName: org.postgresql.Driver

コード

起動クラス:SpringBatchApplication

package com.test.batch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class SpringBatchApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBatchApplication.class, args);
	}

}

カラム定義:Fruit

データベースのカラム情報と一致させます。 「id」はSERIALに設定しているので、ここでは定義していません。

package com.test.batch;

public class Fruit {
	
	private String name;
	private int price;
	
	public Fruit() {
	}
	
	public Fruit(String name, int price) {
		this.name = name;
		this.price = price;
	}

	// getter-setter
	public String getName() {return name;}
	public void setName(String name) {this.name = name;}
	public int getPrice() {return price;}
	public void setPrice(int price) {this.price = price;}

}

プロセッサー

取得したアイテム(今回はCSV)を加工しています。
フルーツ名をtoUpperCase()で大文字に変換しています。

package com.test.batch;

import org.springframework.batch.item.ItemProcessor;

public class FruitItemProcessor implements ItemProcessor<Fruit, Fruit> {

	@Override
	public Fruit process(final Fruit fruit) throws Exception {
		final String title = fruit.getName().toUpperCase();
		final int price = fruit.getPrice();
		
		final Fruit transformColumns = new Fruit(title, price);
		
		return transformColumns;
	}

}

リスナー

処理の開始と終了の合図に使っています。

package com.test.batch;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

public class JobStartEndLIstener extends JobExecutionListenerSupport {

	private final JdbcTemplate jdbcTemplate;
	
	@Autowired
	public JobStartEndLIstener(JdbcTemplate jdbcTemplate) {
		this.jdbcTemplate = jdbcTemplate;
	}
	
	// ステップの開始前に実行
	@Override
	public void beforeJob(JobExecution jobExecution) {
		super.beforeJob(jobExecution);
		System.out.println("開始");
	}	

	// ステップの終了後に実行
	@Override
	public void afterJob(JobExecution jobExecution) {
		super.afterJob(jobExecution);
		System.out.println("終了");
	}

}

バッチ処理:Batch

実行すると下記の流れで動作します。
1、Jobを実行
2、Listenerを実行
3、Step1を実行
3−1、Readerでアイテムを読み込む
3−2、Processorで読み込んだ物を加工
3−3、Writerで加工したデータを書き込む
4、Step2を実行(内容は3と同様)
5、Listenerを実行

下記ではコードを分割して解説していきます。

package com.test.batch;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;

@Configuration
@EnableBatchProcessing
public class Batch {

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Autowired
	public DataSource dataSource;

必要な要素をAutowiredで接続します。

Readerでアイテムを読み込む

	// Reader
	@Bean
	public FlatFileItemReader<Fruit> reader() {
		
		FlatFileItemReader<Fruit> reader = new FlatFileItemReader<Fruit>();
		reader.setResource(new ClassPathResource("fruit_price.csv"));
		reader.setLineMapper(new DefaultLineMapper<Fruit>() {{
			setLineTokenizer(new DelimitedLineTokenizer() {{
				setNames(new String[] { "name" , "price" });
			}});
			setFieldSetMapper(new BeanWrapperFieldSetMapper<Fruit>() {{
				setTargetType(Fruit.class);
			}});
		}});

		return reader;
	}

46行目:ReaderにItem"fruit_price.csv"を読み込む
47〜54行目:後述するSQLに読み込ませる配列を設定し、Fruitクラスと紐付ける
56行目:Readerを返す。

Processorで読み込んだ物を加工

	// Processor
	@Bean
	public FruitItemProcesser processor() {
		return new FruitItemProcessor();
	}

Writerで加工したデータを書き込む

	// Writer
	@Bean
	public JdbcBatchItemWriter<Fruit> writer() {
		JdbcBatchItemWriter<Fruit> writer = new JdbcBatchItemWriter<Fruit>();
		writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Fruit>());
		writer.setSql("INSERT INTO fruit (name, price) VALUES (:name, :price)");
		writer.setDataSource(dataSource);
		return writer;
	}

Readerの49行目で設定した「name」「price」を、70行目のSQLで読み込ませるので「:name」「:price」と合わせています。

Listenerを設定

Jobの開始と終了を案内するJobStartEndListenerを返します。

	@Bean
	public JobExecutionListener listener() {
		return new JobStartEndLIstener(new JdbcTemplate(dataSource));
	}

StepにReader,Processor,Writerをまとめる

step1()、step2()と作っていますが実行内容は同じです。

	// ステップ1
	@Bean
	public Step step1() {
		return stepBuilderFactory.get("step1")
				.<Fruit,Fruit> chunk(10)
				.reader(reader())
				.processor(processer())
				.writer(writer())
				.build();
	}

	// ステップ2
	@Bean
	public Step step2() {
		return stepBuilderFactory.get("step2")
				.<Fruit,Fruit> chunk(10)
				.reader(reader())
				.processor(processer())
				.writer(writer())
				.build();
	}

JobにStepをまとめる

ここでListenerとStepを読み込ませます。

	// ジョブ
	@Bean
	public Job testJob() {
		return jobBuilderFactory.get("testJob")
				.incrementer(new RunIdIncrementer())
				.listener(listener())
				.flow(step1())
				.next(step2())
				.end()
				.build();
	}

}

実行

同じ処理のstep1()、step2()が実行されたので、下記のようになりました。
ちゃんと小文字が大文字になっています。

postgres=# select * from fruit;
 id |  name  | price 
----+--------+-------
  1 | APPLE  |   300
  2 | ORANGE |   200
  3 | BANANA |   100
  4 | APPLE  |   300
  5 | ORANGE |   200
  6 | BANANA |   100
(6 rows)

引っかかったところ

テストでgetterとsetterのメソッド名を変更したところ、実行エラーが発生しました。
変更点は、「getName」から「getNaMe」です。

public String getName() {return name;}
↓
public String getNaMe() {return name;}
org.springframework.dao.InvalidDataAccessApiUsageException: 
No value supplied for the SQL parameter 'name': 
Invalid property 'name' of bean class [com.test.batch.Fruit]: 
Bean property 'name' is not readable or has an invalid getter method: 
Does the return type of the getter match the parameter type of the setter?

どうやらフレームワークごとに命名規則が有り、今回はそこに引っかかった様でした。
getter/setterで有れば、通常は自動で作成するかlombokを使うので上記エラーは発生しませんが、
こういう事が有るという例として上げました。

さいごに

Eclipseを使用していますが、Spring Framework専用のエラーは表示されない場合が有るので、そこに気をつける必要が有りますね。