Spring BootでSpring BatchのChunkモデルを試す
Spring Batchの挙動が微塵も理解できなくて本気で焦っています。
というよりかは、トランザクションの挙動や、中間コミットされた状態でバッチが再開しようとした際に、どのような挙動になるのかがつかめず、その辺りの知識がない状態で案件で使用したくないというだけなのかもしれないですが……
もう少し、手を動かして頭の中で整理できるようにしてみたいと思います。
読者想定
- Spring BatchのChunkモデルとTaskletモデルの違いがわかっている方
- Spring BatchのChunkモデルでのほぼ最小限の実装例を知りたい方
- JavaConfigを用いたSpring Batchの実装を知りたい方
※ほぼ最小限というのは、Spring Bootを用いないSpring Batchの実装だったり、DataResourceを利用しないInMemoryなバッチ起動等があるので、ほぼという言い回しにしています。
公式のサンプル
Getting Started | Creating a Batch Service
今回は上記リンクの公式サンプル(チャンクモデル)実装を元に、CSVのデータを読み込んで、そのデータをDBにインサートする処理を作っていきます。
JPAなどを用いていますが、それ以外は必要最低限の実装しかしていないので、もう少し実践的な実装方法については別途記事を作成しようと思います。
Spring initializerでひな形作成
Dependenciesには以下を追加
- Lombok
- H2 Database
- Spring Batch
- Spring Data JPA
Spring Bootの各種設定
DBに格納する処理があるので、DBの指定などを行います。
resource/application.properties
を以下のように追記します。
spring.datasource.url=jdbc:h2:./.data/h2/db;MODE=MySQL
spring.jpa.hibernate.ddl-auto=update
spring.h2.console.enabled=true
# 2021/02/12 追記
spring.batch.initialize-schema=always
resource/application.yml
にするのであれば以下のように。
spring:
datasource:
url: jdbc:h2:./.data/h2/db;MODE=MySQL
jpa:
hibernate:
ddl-auto: update
h2:
console:
enabled: true
# 2021/02/12 追記
batch:
initialize-schema: always
データの格納手段をh2dbにし、データの格納先を .data
ディレクトリを指定。更に標準だとSpring Bootを再起動するたびにdbの値が消えてしまうので、永続化の設定も追加した。
また、実際に格納されている値を確認するためにh2dbに付属しているconsosle機能も有効にしている。
これを有効にすることで localhost:8080/h2-console
にアクセスすると実際にdbに格納された値を確認することができる。
2021/02/12 追記 spring.batch.initialize-schema プロパティ
少なくとも、初回起動時は spring.batch.initialize-schema=always
を記載しないと正常に Spring Batch が起動しないようです。
上記プロパティを加えた状態で一度 Spring Batch を起動した状態にすれば以降はこの記述が無くても Spring Batch は起動するようになります。
なお、上記のプロパティが Spring Batch 初回起動時に指定がないと以下のようなエラーが出て、バッチが始動しないのを確認しました。
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-02-12 20:51:04.645 ERROR 10932 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalStateException: Failed to execute ApplicationRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) ~[spring-boot-2.4.2.jar:2.4.2]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:785) ~[spring-boot-2.4.2.jar:2.4.2]
~中略~
at com.example.demo.DemoApplication.main(DemoApplication.java:10) ~[main/:na]
Caused by: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?]; nested exception is org.h2.jdbc.JdbcSQLSyntaxErrorException: テーブル "BATCH_JOB_INSTANCE" が見つかりません
Table "BATCH_JOB_INSTANCE" not found; SQL statement:
SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ? [42102-200]
at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:239) ~[spring-jdbc-5.3.3.jar:5.3.3]
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70) ~[spring-jdbc-5.3.3.jar:5.3.3]
~以下略~
データ読込元のCSVファイルを作成する
プロジェクトフォルダに sample.csv
という名前でとりあえずcsvファイルを以下のように作成しておきます。
ジョン, スミス
田中, 太郎
ジョセフ, ジョースター
Erik, Satie
※Érik Alfred Leslie Satieと、アクセント記号が本来付きます。
Model, Repositoryクラスを作成
公式のサンプルだと resources/schema-all.sql
を用意してテーブル作成、JDBCで値の書き込みをしていますが、今回はJPAを用いてデータの格納をしてみます。
Modelクラス作成
公式サンプルの resources/schema-all.sql
で作成するテーブルと近いように作ります。
package com.example.demo.domain.user;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@Data
@Entity(name = "user")
@NoArgsConstructor
public class User {
public User(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
public Integer id;
public String firstName;
public String lastName;
}
Repositoryクラス
DBとデータのやり取りを行うReposirotyクラスを作成します。
JpaRepository
を継承してデータのやり取りをするモデルクラスと主キーの型を指定しています。
package com.example.demo.domain.user;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface UserRepository extends JpaRepository<User, Integer> {
}
Reader, Writer, Processor, Job, Stepを実装するConfigurationクラスの実装
Config用のクラスを作成(今回は BatchConfiguration
クラス)する。
Readerはファイルを読み取る FlatFileItemReader
を用いて実装します。
ProcessorはReaderで読み取ったデータを変換する処理を実装します。特に変換せずに保存するだけだったら省略可能です。
WriterはDBに値を書き込む RepositoryItemWriter
を用いて実装をします。
今回は全てを BatchConfiguration
に実装していくが、それぞれクラスを分けて実装することももちろん可能です。
package com.example.demo.batchprocessing;
import com.example.demo.domain.user.User;
import com.example.demo.domain.user.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.data.builder.RepositoryItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import java.text.Normalizer;
@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public UserRepository userRepository;
/**
* sample.csv ファイルを読み込む
* @return バッチ処理用 Reader item
*/
@Bean
public FlatFileItemReader<User> reader() {
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new FileSystemResource("sample.csv"))
.delimited()
.names("firstName", "lastName")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(User.class);
}})
.build();
}
/**
* Reader から渡されたオブジェクトを保存する形に変換する
* 今回はアルファベットの大文字変換と、半角カナを全角カナに変換している
* @return 変換後に保存するアイテム
*/
@Bean
public ItemProcessor<User, User> processor() {
return user -> {
final var firstName = Normalizer.normalize(user.getFirstName().toUpperCase(), Normalizer.Form.NFKC);
final var lastName = Normalizer.normalize(user.getLastName().toUpperCase(), Normalizer.Form.NFKC);
final var transformedUser = new User(firstName, lastName);
log.info("before converted : " + user);
log.info("transformed user : " + transformedUser);
return transformedUser;
};
}
/**
* Processor から渡されたオブジェクトを書き込む(保存処理をかける)
* @return 保存処理用のリポジトリ
*/
@Bean
public RepositoryItemWriter<User> writer() {
return new RepositoryItemWriterBuilder<User>()
.repository(userRepository)
.methodName("save")
.build();
}
/**
* 1つのバッチ処理の処理順やエラーハンドリング等の定義を行う
* @return Jobを返却
*/
@Bean
public Job importUserJob() {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.flow(step1())
.end()
.build();
}
/**
* Jobの中に含まれるステップを定義する
* @return ステップを返却する
*/
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<User, User>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
バッチの実行
ここまで作成し、Spring Bootを起動したタイミングでバッチが走るようになる。
Gradleプロジェクトであれば、 bootRun
タスクでSpring Bootが立ち上がる。
11:11:28: Executing task 'DemoApplication.main()'...
> Task :compileJava
> Task :processResources
> Task :classes
> Task :DemoApplication.main()
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.5.RELEASE)
2020-11-20 11:11:32.295 INFO 10590 --- [ main] com.example.demo.DemoApplication : Starting DemoApplication on YukinoMacBook.local with PID 10590 (/Users/yuki/demo/build/classes/java/main started by yuki in /Users/yuki/demo)
2020-11-20 11:11:32.298 INFO 10590 --- [ main] com.example.demo.DemoApplication : No active profile set, falling back to default profiles: default
2020-11-20 11:11:33.172 INFO 10590 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data JPA repositories in DEFERRED mode.
2020-11-20 11:11:33.275 INFO 10590 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 88ms. Found 1 JPA repository interfaces.
2020-11-20 11:11:33.889 INFO 10590 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-11-20 11:11:33.975 INFO 10590 --- [ task-1] o.hibernate.jpa.internal.util.LogHelper : HHH000204: Processing PersistenceUnitInfo [name: default]
2020-11-20 11:11:34.095 INFO 10590 --- [ task-1] org.hibernate.Version : HHH000412: Hibernate ORM core version 5.4.22.Final
2020-11-20 11:11:34.249 INFO 10590 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2020-11-20 11:11:34.719 INFO 10590 --- [ task-1] o.hibernate.annotations.common.Version : HCANN000001: Hibernate Commons Annotations {5.1.0.Final}
2020-11-20 11:11:34.754 INFO 10590 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2020-11-20 11:11:34.887 WARN 10590 --- [ main] o.s.b.a.batch.JpaBatchConfigurer : JPA does not support custom isolation levels, so locks may not be taken when launching Jobs
2020-11-20 11:11:34.892 INFO 10590 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: H2
2020-11-20 11:11:35.061 INFO 10590 --- [ task-1] org.hibernate.dialect.Dialect : HHH000400: Using dialect: org.hibernate.dialect.H2Dialect
2020-11-20 11:11:35.332 INFO 10590 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2020-11-20 11:11:35.520 INFO 10590 --- [ main] DeferredRepositoryInitializationListener : Triggering deferred initialization of Spring Data repositories…
2020-11-20 11:11:36.355 INFO 10590 --- [ task-1] o.h.e.t.j.p.i.JtaPlatformInitiator : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2020-11-20 11:11:36.367 INFO 10590 --- [ task-1] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2020-11-20 11:11:36.690 INFO 10590 --- [ main] DeferredRepositoryInitializationListener : Spring Data repositories initialized!
2020-11-20 11:11:36.702 INFO 10590 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 5.163 seconds (JVM running for 5.823)
2020-11-20 11:11:36.704 INFO 10590 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2020-11-20 11:11:36.785 INFO 10590 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importUserJob]] launched with the following parameters: [{run.id=14}]
2020-11-20 11:11:36.849 INFO 10590 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2020-11-20 11:11:36.952 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : before converted : User(id=null, firstName=ジョン, lastName=スミス)
2020-11-20 11:11:36.953 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : transformed user : User(id=null, firstName=ジョン, lastName=スミス)
2020-11-20 11:11:36.954 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : before converted : User(id=null, firstName=田中, lastName=太郎)
2020-11-20 11:11:36.955 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : transformed user : User(id=null, firstName=田中, lastName=太郎)
2020-11-20 11:11:36.955 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : before converted : User(id=null, firstName=ジョセフ, lastName=ジョースター)
2020-11-20 11:11:36.955 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : transformed user : User(id=null, firstName=ジョセフ, lastName=ジョースター)
2020-11-20 11:11:36.956 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : before converted : User(id=null, firstName=Erik, lastName=Satie)
2020-11-20 11:11:36.956 INFO 10590 --- [ main] c.e.d.b.BatchConfiguration : transformed user : User(id=null, firstName=ERIK, lastName=SATIE)
2020-11-20 11:11:37.026 INFO 10590 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 176ms
2020-11-20 11:11:37.039 INFO 10590 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importUserJob]] completed with the following parameters: [{run.id=14}] and the following status: [COMPLETED] in 204ms
Spring Bootの各種設定 でDBの値を見れるコンソールを有効にしているので、実際に格納された値を localhost:8080/h2-console
で確認をしてみる。
Batch終了後にSpringApplicationも終了する
2021/02/12 追記
以前のバージョンでは Batch 終了後にビルドタスクは初期状態だと終了しなかったのですが、Spring Batch のバージョンを新しくして確認したところ、Batch 処理の終了後にビルドタスクも終了することを確認しました。
恐らく Spring Batch 4.3.x がリリースされたタイミングで挙動が変わったようですが、リリースノートにそれらしき記述は見つからず……
以下の記述をしても引き続き正常に動作するので、心配であれば以下記述を試してくださればと思います。
また、それに伴い h2db のコンソールが確認できなくなってしまいました。
H2DB のコンソールを確認する方法として、 spring-boot-starter-web
を build.gradle
の dependencies
に追記していただいて、再度 bootRun
を実行すれば勝手にビルドタスクが終了することはなくなります。
spring-boot-starter-web
を追加後は勝手にビルドタスクは終了しないので、以下の記述を追記してもらうことで、引き続きバッチの終了のタイミングで SpringApplication も終了させることができます。
以下、追記前の内容となります。
これで実装は完成したが、Batch処理が完了してもビルドタスクが止まらないことに気づくと思います。
そこで、main関数でSpringApplicationを呼んでいる箇所に以下関数でくくってあげることで、Batch処理が終了した際に自動的にSpringApplicationも終了してくれるようになります。
※バッチの処理が短い場合、h2DBのコンソールが見れなくなってしまうので、実際に案件などでBatch処理を作る際にはこちらを思い出していただければと思います。また、本番環境ではh2DBのコンソールは無効にするのをお忘れなく。
package com.example.batchDemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
- SpringApplication.run(DemoApplication.class, args);
+ System.exit(SpringApplication.exit(SpringApplication.run(DemoApplication.class, args)));
}
}
終わりに
ここまでできたら実際にJARにまとめるなどして、実際にバッチとして使い始めることができます。
基本的なバッチであればChunkモデルで実装できると思うのですが、処理が予めReader, Processor, Writerに分かれているので、設計時点でそれを考慮した上で実装することが必要になります。
上記で実装したものを以下リポジトリにて公開しています。また、Gradle, java, docker コマンドを利用した Batch の実行方法も参考程度に載せているので、そちらも確認いただければと思います。
https://git.pu10g.com/ykato/spring-batch-chunk-example
次回のSpring Batchの記事ではTaskletモデルを用いたバッチ実装を紹介しようと思います。
それでわ。