この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
Javaでのバッチ処理について調べていると、どうやらSpring Batchが便利らしいので使ってみました。
環境
Mac OSX 10.10.5 Yosemite Spring Boot 1.3.5 PostgreSQL 9.5
JobとStep
Spring Batchでの最小単位はステップで、そこに処理内容を記述します。 そのステップをまとめた処理を行うのがジョブです。 また、ステップには2種類、「tasklet」と「chunk」が有りますが、今回はchunk(チャンク)のみ使用しています。
準備
schema-all.sql
「src/main/resources」に置いてプラットフォームに合わせたファイル名にすると実行時に読み込まれます。 「all」としているのでどのプラットフォームでも実行されるそうです。 PostgreSQLでは実行されました。
DROP TABLE IF EXISTS fruit;
CREATE TABLE fruit (
id SERIAL NOT NULL PRIMARY KEY
, name VARCHAR(10)
, price INT);
postgres=# SELECT * FROM fruit;
int | name | price
-----+------+-------
(0 rows)
fruit_price.csv
フルーツ名と値段のデータです。 これをテーブルに登録します。 ファイルは「src/main/resources」に置いてください。
apple,300
orange,200
banana,100
gradle
dependenciesに下記を追加してgradleをrefreshしてください。
dependencies {
compile("org.springframework.boot:spring-boot-starter-batch")
compile('org.springframework.boot:spring-boot-starter-jdbc')
runtime('org.postgresql:postgresql')
compile('org.springframework.boot:spring-boot-starter-test')
}
application.yml
データベースPostgreSQLの接続情報です。
spring:
datasource:
url: jdbc:postgresql://localhost/postgres
username: XXXXX
password: YYYYY
driverClassName: org.postgresql.Driver
コード
起動クラス:SpringBatchApplication
package com.test.batch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchApplication.class, args);
}
}
カラム定義:Fruit
データベースのカラム情報と一致させます。 「id」はSERIALに設定しているので、ここでは定義していません。
package com.test.batch;
public class Fruit {
private String name;
private int price;
public Fruit() {
}
public Fruit(String name, int price) {
this.name = name;
this.price = price;
}
// getter-setter
public String getName() {return name;}
public void setName(String name) {this.name = name;}
public int getPrice() {return price;}
public void setPrice(int price) {this.price = price;}
}
プロセッサー
取得したアイテム(今回はCSV)を加工しています。 フルーツ名をtoUpperCase()で大文字に変換しています。
package com.test.batch;
import org.springframework.batch.item.ItemProcessor;
public class FruitItemProcessor implements ItemProcessor<Fruit, Fruit> {
@Override
public Fruit process(final Fruit fruit) throws Exception {
final String title = fruit.getName().toUpperCase();
final int price = fruit.getPrice();
final Fruit transformColumns = new Fruit(title, price);
return transformColumns;
}
}
リスナー
処理の開始と終了の合図に使っています。
package com.test.batch;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
public class JobStartEndLIstener extends JobExecutionListenerSupport {
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobStartEndLIstener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
// ステップの開始前に実行
@Override
public void beforeJob(JobExecution jobExecution) {
super.beforeJob(jobExecution);
System.out.println("開始");
}
// ステップの終了後に実行
@Override
public void afterJob(JobExecution jobExecution) {
super.afterJob(jobExecution);
System.out.println("終了");
}
}
バッチ処理:Batch
実行すると下記の流れで動作します。 1、Jobを実行 2、Listenerを実行 3、Step1を実行 3−1、Readerでアイテムを読み込む 3−2、Processorで読み込んだ物を加工 3−3、Writerで加工したデータを書き込む 4、Step2を実行(内容は3と同様) 5、Listenerを実行
下記ではコードを分割して解説していきます。
package com.test.batch;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
@Configuration
@EnableBatchProcessing
public class Batch {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
必要な要素をAutowiredで接続します。
Readerでアイテムを読み込む
// Reader
@Bean
public FlatFileItemReader<Fruit> reader() {
FlatFileItemReader<Fruit> reader = new FlatFileItemReader<Fruit>();
reader.setResource(new ClassPathResource("fruit_price.csv"));
reader.setLineMapper(new DefaultLineMapper<Fruit>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames(new String[] { "name" , "price" });
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Fruit>() {{
setTargetType(Fruit.class);
}});
}});
return reader;
}
46行目:ReaderにItem"fruit_price.csv"を読み込む 47〜54行目:後述するSQLに読み込ませる配列を設定し、Fruitクラスと紐付ける 56行目:Readerを返す。
Processorで読み込んだ物を加工
// Processor
@Bean
public FruitItemProcesser processor() {
return new FruitItemProcessor();
}
Writerで加工したデータを書き込む
// Writer
@Bean
public JdbcBatchItemWriter<Fruit> writer() {
JdbcBatchItemWriter<Fruit> writer = new JdbcBatchItemWriter<Fruit>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Fruit>());
writer.setSql("INSERT INTO fruit (name, price) VALUES (:name, :price)");
writer.setDataSource(dataSource);
return writer;
}
Readerの49行目で設定した「name」「price」を、70行目のSQLで読み込ませるので「:name」「:price」と合わせています。
Listenerを設定
Jobの開始と終了を案内するJobStartEndListenerを返します。
@Bean
public JobExecutionListener listener() {
return new JobStartEndLIstener(new JdbcTemplate(dataSource));
}
StepにReader,Processor,Writerをまとめる
step1()、step2()と作っていますが実行内容は同じです。
// ステップ1
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Fruit,Fruit> chunk(10)
.reader(reader())
.processor(processer())
.writer(writer())
.build();
}
// ステップ2
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<Fruit,Fruit> chunk(10)
.reader(reader())
.processor(processer())
.writer(writer())
.build();
}
JobにStepをまとめる
ここでListenerとStepを読み込ませます。
// ジョブ
@Bean
public Job testJob() {
return jobBuilderFactory.get("testJob")
.incrementer(new RunIdIncrementer())
.listener(listener())
.flow(step1())
.next(step2())
.end()
.build();
}
}
実行
同じ処理のstep1()、step2()が実行されたので、下記のようになりました。 ちゃんと小文字が大文字になっています。
postgres=# select * from fruit;
id | name | price
----+--------+-------
1 | APPLE | 300
2 | ORANGE | 200
3 | BANANA | 100
4 | APPLE | 300
5 | ORANGE | 200
6 | BANANA | 100
(6 rows)
引っかかったところ
テストでgetterとsetterのメソッド名を変更したところ、実行エラーが発生しました。 変更点は、「getName」から「getNaMe」です。
public String getName() {return name;}
↓
public String getNaMe() {return name;}
org.springframework.dao.InvalidDataAccessApiUsageException:
No value supplied for the SQL parameter 'name':
Invalid property 'name' of bean class [com.test.batch.Fruit]:
Bean property 'name' is not readable or has an invalid getter method:
Does the return type of the getter match the parameter type of the setter?
どうやらフレームワークごとに命名規則が有り、今回はそこに引っかかった様でした。 getter/setterで有れば、通常は自動で作成するかlombokを使うので上記エラーは発生しませんが、 こういう事が有るという例として上げました。
さいごに
Eclipseを使用していますが、Spring Framework専用のエラーは表示されない場合が有るので、そこに気をつける必要が有りますね。