Spring Batchでバッチ処理
はじめに
Javaでのバッチ処理について調べていると、どうやらSpring Batchが便利らしいので使ってみました。
環境
Mac OSX 10.10.5 Yosemite Spring Boot 1.3.5 PostgreSQL 9.5
JobとStep
Spring Batchでの最小単位はステップで、そこに処理内容を記述します。 そのステップをまとめた処理を行うのがジョブです。 また、ステップには2種類、「tasklet」と「chunk」が有りますが、今回はchunk(チャンク)のみ使用しています。
準備
schema-all.sql
「src/main/resources」に置いてプラットフォームに合わせたファイル名にすると実行時に読み込まれます。 「all」としているのでどのプラットフォームでも実行されるそうです。 PostgreSQLでは実行されました。
DROP TABLE IF EXISTS fruit; CREATE TABLE fruit ( id SERIAL NOT NULL PRIMARY KEY , name VARCHAR(10) , price INT);
postgres=# SELECT * FROM fruit; int | name | price -----+------+------- (0 rows)
fruit_price.csv
フルーツ名と値段のデータです。 これをテーブルに登録します。 ファイルは「src/main/resources」に置いてください。
apple,300 orange,200 banana,100
gradle
dependenciesに下記を追加してgradleをrefreshしてください。
dependencies { compile("org.springframework.boot:spring-boot-starter-batch") compile('org.springframework.boot:spring-boot-starter-jdbc') runtime('org.postgresql:postgresql') compile('org.springframework.boot:spring-boot-starter-test') }
application.yml
データベースPostgreSQLの接続情報です。
spring: datasource: url: jdbc:postgresql://localhost/postgres username: XXXXX password: YYYYY driverClassName: org.postgresql.Driver
コード
起動クラス:SpringBatchApplication
package com.test.batch; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class SpringBatchApplication { public static void main(String[] args) { SpringApplication.run(SpringBatchApplication.class, args); } }
カラム定義:Fruit
データベースのカラム情報と一致させます。 「id」はSERIALに設定しているので、ここでは定義していません。
package com.test.batch; public class Fruit { private String name; private int price; public Fruit() { } public Fruit(String name, int price) { this.name = name; this.price = price; } // getter-setter public String getName() {return name;} public void setName(String name) {this.name = name;} public int getPrice() {return price;} public void setPrice(int price) {this.price = price;} }
プロセッサー
取得したアイテム(今回はCSV)を加工しています。 フルーツ名をtoUpperCase()で大文字に変換しています。
package com.test.batch; import org.springframework.batch.item.ItemProcessor; public class FruitItemProcessor implements ItemProcessor<Fruit, Fruit> { @Override public Fruit process(final Fruit fruit) throws Exception { final String title = fruit.getName().toUpperCase(); final int price = fruit.getPrice(); final Fruit transformColumns = new Fruit(title, price); return transformColumns; } }
リスナー
処理の開始と終了の合図に使っています。
package com.test.batch; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; public class JobStartEndLIstener extends JobExecutionListenerSupport { private final JdbcTemplate jdbcTemplate; @Autowired public JobStartEndLIstener(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } // ステップの開始前に実行 @Override public void beforeJob(JobExecution jobExecution) { super.beforeJob(jobExecution); System.out.println("開始"); } // ステップの終了後に実行 @Override public void afterJob(JobExecution jobExecution) { super.afterJob(jobExecution); System.out.println("終了"); } }
バッチ処理:Batch
実行すると下記の流れで動作します。 1、Jobを実行 2、Listenerを実行 3、Step1を実行 3−1、Readerでアイテムを読み込む 3−2、Processorで読み込んだ物を加工 3−3、Writerで加工したデータを書き込む 4、Step2を実行(内容は3と同様) 5、Listenerを実行
下記ではコードを分割して解説していきます。
package com.test.batch; import javax.sql.DataSource; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.jdbc.core.JdbcTemplate; @Configuration @EnableBatchProcessing public class Batch { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired public DataSource dataSource;
必要な要素をAutowiredで接続します。
Readerでアイテムを読み込む
// Reader @Bean public FlatFileItemReader<Fruit> reader() { FlatFileItemReader<Fruit> reader = new FlatFileItemReader<Fruit>(); reader.setResource(new ClassPathResource("fruit_price.csv")); reader.setLineMapper(new DefaultLineMapper<Fruit>() {{ setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[] { "name" , "price" }); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Fruit>() {{ setTargetType(Fruit.class); }}); }}); return reader; }
46行目:ReaderにItem"fruit_price.csv"を読み込む 47〜54行目:後述するSQLに読み込ませる配列を設定し、Fruitクラスと紐付ける 56行目:Readerを返す。
Processorで読み込んだ物を加工
// Processor @Bean public FruitItemProcesser processor() { return new FruitItemProcessor(); }
Writerで加工したデータを書き込む
// Writer @Bean public JdbcBatchItemWriter<Fruit> writer() { JdbcBatchItemWriter<Fruit> writer = new JdbcBatchItemWriter<Fruit>(); writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Fruit>()); writer.setSql("INSERT INTO fruit (name, price) VALUES (:name, :price)"); writer.setDataSource(dataSource); return writer; }
Readerの49行目で設定した「name」「price」を、70行目のSQLで読み込ませるので「:name」「:price」と合わせています。
Listenerを設定
Jobの開始と終了を案内するJobStartEndListenerを返します。
@Bean public JobExecutionListener listener() { return new JobStartEndLIstener(new JdbcTemplate(dataSource)); }
StepにReader,Processor,Writerをまとめる
step1()、step2()と作っていますが実行内容は同じです。
// ステップ1 @Bean public Step step1() { return stepBuilderFactory.get("step1") .<Fruit,Fruit> chunk(10) .reader(reader()) .processor(processer()) .writer(writer()) .build(); } // ステップ2 @Bean public Step step2() { return stepBuilderFactory.get("step2") .<Fruit,Fruit> chunk(10) .reader(reader()) .processor(processer()) .writer(writer()) .build(); }
JobにStepをまとめる
ここでListenerとStepを読み込ませます。
// ジョブ @Bean public Job testJob() { return jobBuilderFactory.get("testJob") .incrementer(new RunIdIncrementer()) .listener(listener()) .flow(step1()) .next(step2()) .end() .build(); } }
実行
同じ処理のstep1()、step2()が実行されたので、下記のようになりました。 ちゃんと小文字が大文字になっています。
postgres=# select * from fruit; id | name | price ----+--------+------- 1 | APPLE | 300 2 | ORANGE | 200 3 | BANANA | 100 4 | APPLE | 300 5 | ORANGE | 200 6 | BANANA | 100 (6 rows)
引っかかったところ
テストでgetterとsetterのメソッド名を変更したところ、実行エラーが発生しました。 変更点は、「getName」から「getNaMe」です。
public String getName() {return name;} ↓ public String getNaMe() {return name;}
org.springframework.dao.InvalidDataAccessApiUsageException: No value supplied for the SQL parameter 'name': Invalid property 'name' of bean class [com.test.batch.Fruit]: Bean property 'name' is not readable or has an invalid getter method: Does the return type of the getter match the parameter type of the setter?
どうやらフレームワークごとに命名規則が有り、今回はそこに引っかかった様でした。 getter/setterで有れば、通常は自動で作成するかlombokを使うので上記エラーは発生しませんが、 こういう事が有るという例として上げました。
さいごに
Eclipseを使用していますが、Spring Framework専用のエラーは表示されない場合が有るので、そこに気をつける必要が有りますね。