はじめに
最近、Spring Batchを勉強し始めたぞ!
概念的な部分は理解できたけど、いざコード書くとなると
ハードル高いよね。。
そうだね〜。
今回はサンプルコードを書いて、Spring Batchでのバッチ処理
について理解を深めていくよ!
本記事では、Spring Batchを使用したバッチ処理のサンプルコードを掲載し、Spring Batch特有のクラスの使い方等を解説しています。
Spring Batchの基礎概念を理解したい方は以下の記事も併せて参照ください。
作成するバッチ処理について
今回は下記のcsvファイルのデータをsample_userテーブルにインポートする処理を記載していきます。単純な処理ですが、このコードを一通り理解することで、Spring Batchの基本的な記述方法は学ぶことができると思います。
テーブルのDDLについては以降で掲載しています。
ID,名前,年齢
1,ユーザー1,30
2,ユーザー2,30
3,ユーザー3,30
4,ユーザー4,30
5,ユーザー5,30
6,ユーザー6,30
7,ユーザー7,30
8,ユーザー8,30
id | name | age |
1 | ユーザー1 | 30 |
2 | ユーザー2 | 30 |
3 | ユーザー3 | 30 |
4 | ユーザー4 | 30 |
5 | ユーザー5 | 30 |
6 | ユーザー6 | 30 |
作成手順概要
大まかな作成手順は以下の通りです。以降、下記の項目に従って記載していきます。
- 環境の確認、設定ファイルの作成
- テーブルの作成、csvファイルの配置
- Modelの作成
- Processorの作成
- Listenerの作成
- Reader、Writerの作成(Configクラス)
- 動作確認
環境の確認、設定ファイルの作成
今回作成するプロジェクトのクラス構成は以下の通りです。
以降の解説ではこのクラス構成を前提に解説しますので、必要に応じて参照してください。
私のpom.xml(依存関係)は以下の通りです。
・Spring Boot DevTools
・Lombok
・Spring Batch
・Validation
・postgresql
・JDBC API
・Spring Data JDBC
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>SpringBatchSample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringBatchSample</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
また、application.ymlは以下の通りです。
今回、データベースはpostgresqlを使用しています。
spring:
batch:
job:
names: CsvImportJdbcJob
jdbc:
initialize-schema: always
schema: classpath:org/springframework/batch/core/schema-postgresql.sql
datasource:
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://localhost:5432/sample_database
username: postgres
password: password
logging:
level:
'[com.example.SpringBatchSample]': debug
csvのパスを記述するsample.propertiesは以下の通りです。
アプリケーション起動時に下記のパスのcsvファイルを読み込むようにします。
csvファイルは上述したuser.csvです。
csv.path=sql/user.csv
csvのパスを保持するJavaクラスを作成します。(sampleProperty.java)
赤文字部でプロパティファイル、csvファイルのパスを示しています。
package com.example.SpringBatchSample.config;
import lombok.Getter;
import lombok.ToString;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
@Component
@PropertySource("classpath:property/sample.properties")
@Getter
@ToString
public class SampleProperty {
@Value("${csv.path}")
private String csvPath;
}
テーブルの作成
テーブル作成のDDLは以下の通りです。必要に応じてご使用ください。
create table if not exists sample_user(
id INT primary key,
name VARCHAR(50),
age INT
);
Modelの作成
sample_userテーブルに対応するモデルを作成します。
本クラス中のメソッドであるjudgeAge()はcsvのage(年齢)が20歳未満の場合、登録できないようにするためのバリデーションメソッドです。Processorで実際にチェックを行い、20歳以上のデータしか登録できないようにします。
package com.example.SpringBatchSample.model;
import lombok.Data;
@Data
public class User {
private Integer id;
private String name;
private Integer age;
public void judgeAge(){
if(20 > age){
String errorMessage = String.format(
"20才未満は登録できません::id={},name={},age={}"
,getId(%d),getName(%s),getAge(%d));
throw new RuntimeException(errorMessage);
}
}
}
Processorの作成
Processorを作成します。Processorは一般的にデータの加工の役割を担わせます。
本記事の内部の処理では①ユーザーの年齢チェック②DBの存在チェックを行なっています。
②(青文字)は後述するRepositoryクラスを作成後に記述していください!
Processorを作成する際にはItemProcessorインターフェースを実装します。ジェネリクスは1項目がProcessorに入っていく引数、2項目がWriterに渡す引数です。今回はどちらもUserとなります。
package com.example.SpringBatchSample.processor;
import com.example.SpringBatchSample.model.User;
import com.example.SpringBatchSample.repository.UserJdbcRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("ValidProcessor")
@StepScope
@Slf4j
public class ValidProcessor implements ItemProcessor<User,User> {
@Autowired
UserJdbcRepository repository;
@Override
public User process(User user) throws Exception {
if(repository.exists(user.getId())){
log.warn("既に存在しているIDのため登録できません::id={}",user.getId());
return null;
}
try {
user.judgeAge();
}catch (RuntimeException e){
log.warn(e.getMessage(),e);
return null;
}
return user;
}
}
Listenerの作成
次にListenerを作成していきます。LIstenerは何かをトリガーに起動する処理のことです。
一般的にログ出力の役割を担います。本コードでは、Reader、Processor、Writerのそれぞれの処理前後のログ出力を担います。
まずはReaderのListenerです。 ItemReadListenerを実装します。
今回は読み込んだユーザーの情報のログを出力しています。
package com.example.SpringBatchSample.listener;
import com.example.SpringBatchSample.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ReadListener implements ItemReadListener<User> {
@Override
public void beforeRead() {
// Do nothing
}
@Override
public void afterRead(User user) {
log.debug("AfterRead:{}",user);
}
@Override
public void onReadError(Exception e) {
log.error("ReadError::errorMessage={}",e.getMessage(),e);
}
}
次にProcessorのListenerです。ItemProcessListenerを実装します。
package com.example.SpringBatchSample.listener;
import com.example.SpringBatchSample.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ProcessListener implements ItemProcessListener<User,User> {
@Override
public void beforeProcess(User user) {
// Do nothing
}
@Override
public void afterProcess(User user, User user2) {
// Do nothing
}
@Override
public void onProcessError(User user, Exception e) {
log.error("ProcessError::user{},errorMessage={}",user,e.getMessage(),e);
}
}
最後にWriterのListenerです。ItemWriteListenerを実装します。
DBに書き込む数をログ出力しています。
package com.example.SpringBatchSample.listener;
import com.example.SpringBatchSample.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class WriteListener implements ItemWriteListener<User> {
@Override
public void beforeWrite(List<? extends User> list) {
// Do nothing
}
@Override
public void afterWrite(List<? extends User> list) {
log.debug("AfterWrite::count={}",list.size());
}
@Override
public void onWriteError(Exception e, List<? extends User> list) {
log.error("WriteError::errorMessage={}",e.getMessage(),e);
}
}
Repositoryの作成
ここでは、前述したバリデーションチェック(DBの存在チェック)のための処理を記述します。
csvのidがDBに存在する場合はtrue、存在しない場合はfalseを返します。
package com.example.SpringBatchSample.repository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class UserJdbcRepository {
@Autowired
private JdbcTemplate template;
private static final String EXISTS_CHECK_SQL = "" +
"SELECT exists (SELECT * FROM sample_user where id =?)";
public boolean exists(Integer id){
return template.queryForObject(EXISTS_CHECK_SQL,Boolean.class,id);
}
}
Reader、Writerの作成(Configクラス)
いよいよバッチ設定クラスを作成します。今回は①BatchConfigクラス②JdbcImportBatchConfigクラスの2つのクラスに分けます。①にはどのJobでも共通で使用するReader、Processor、Listenerの処理を記述します。②は①を継承したクラスでWriterを作成します。
まずは①BatchConfigです。バッチ設定用クラスには、@EnableBatchProcessingを付与します。
また、このクラスでのメイン処理となるReaderを実装する際にはFlatFileItemReaderを使います。
メソッドチェーンで読み込むcsvのパスや文字コード、ヘッダのスキップ指定等を行なっています。
fieldSetMapperでは、csvファイルのデータとUserクラスのマッピングを行なっています。csvのカラムにつけた名前とJavaクラスのフィールド名が一致していると値がマッピングされます。
package com.example.SpringBatchSample.config;
import com.example.SpringBatchSample.model.User;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import java.nio.charset.StandardCharsets;
@EnableBatchProcessing
public class BatchConfig {
@Autowired
protected JobBuilderFactory jobBuilderFactory;
@Autowired
protected StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("ValidProcessor")
protected ItemProcessor<User,User> judgeAgeProcessor;
@Autowired
protected ItemReadListener<User> readListener;
@Autowired
protected ItemProcessListener<User,User> processListener;
@Autowired
protected ItemWriteListener<User> writeListener;
@Autowired
protected SampleProperty sampleProperty;
@Bean
@StepScope
public FlatFileItemReader<User> csvReader(){
String [] columnNameArray = new String[] {"id","name","age"};
return new FlatFileItemReaderBuilder<User>()
.name("userCsvReader")
.resource(new ClassPathResource(sampleProperty.getCsvPath()))
.linesToSkip(1)
.encoding(StandardCharsets.UTF_8.name())
.delimited()
.names(columnNameArray)
.fieldSetMapper(new BeanWrapperFieldSetMapper<User>(){
{
setTargetType(User.class);
}
}).build();
}
}
次に②JdbcImportBatchConfigを作成していきます。
JdbcBatchItemWriterでDBへの書き込みができます。
csvImportJdbcStep()ではStepを生成しています。Chunk(5)はコミット間隔です。ReaderとProcessorが5回実行されてWriterが1回実行されます。引数の数字を変えて実行するとログの表示が変わり、理解が深まると思います。
rederやlistner、writerには作成したクラスを指定します。
csvImportJdbcJob()ではJobを生成します。Beanに渡す名前はapplication.ymlのジョブ名と一致させてください。start()には作成したstep()を入れます。
package com.example.SpringBatchSample.config;
import com.example.SpringBatchSample.model.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class JdbcImportBatchConfig extends BatchConfig {
@Autowired
private DataSource dataSource;
private static final String INSERT_SQL =
"INSERT INTO sample_user(id,name,age) VALUES(:id,:name,:age)";
@Bean
@StepScope
public JdbcBatchItemWriter<User> jdbcWriter(){
BeanPropertyItemSqlParameterSourceProvider<User> provider =
new BeanPropertyItemSqlParameterSourceProvider<>();
return new JdbcBatchItemWriterBuilder<User> ().itemSqlParameterSourceProvider(provider)
.sql(INSERT_SQL).dataSource(this.dataSource).build();
}
@Bean
public Step csvImportJdbcStep(){
return this.stepBuilderFactory.get("CsvImportJdbcStep")
.<User,User>chunk(5)
.reader(csvReader()).listener(this.readListener)
.processor(judgeAgeProcessor).listener(this.processListener)
.writer(jdbcWriter()).listener(this.writeListener).build();
}
@Bean("CsvImportJdbcJob")
public Job csvImportJdbcJob(){
return this.jobBuilderFactory.get("CsvImportJdbcJob")
.incrementer(new RunIdIncrementer())
.start(csvImportJdbcStep()).build();
}
}
動作確認
アプリケーションを起動すると以下のログが出力されます。
5件(コミット間隔)ずつcsvの情報を読み込んで、まとめて書き込み処理をしていることがわかります。
DBにも登録されていますね。
もう一度、アプリケーションを起動すると、バリデーションチェックで弾かれます。
終わりに
本記事はここまでとなります。ご覧いただきありがとうございました。
ご指摘等がございましたら頂けますと嬉しいです。
引き続き、プログラミングについて定期的に発信していきますのでよろしくお願いします!
また、もしよろしければtwitterもフォローしていただけると嬉しいです!🐢
コメント