【Spring Batch】Chunkモデルを使用したバッチ処理のサンプルコード

【Spring Batch】 Chunkモデルを使用した バッチ処理のサンプルコード バックエンド
スポンサーリンク

はじめに

ぴんくうさぎ
ぴんくうさぎ

最近、Spring Batchを勉強し始めたぞ!
概念的な部分は理解できたけど、いざコード書くとなると

ハードル高いよね。。

みどりがめ
みどりがめ

そうだね〜。

今回はサンプルコードを書いて、Spring Batchでのバッチ処理

について理解を深めていくよ!

本記事ではSpring Batchを使用したバッチ処理のサンプルコードを掲載し、Spring Batch特有のクラスの使い方等を解説しています。

この記事で分かること
  • Spring Batchを使用するための設定方法
  • Chunkモデルを使用したバッチ処理の記述方法


Spring Batchの基礎概念を理解したい方は以下の記事も併せて参照ください。

作成するバッチ処理について

今回は下記のcsvファイルのデータsample_userテーブルにインポートする処理を記載していきます。単純な処理ですが、このコードを一通り理解することで、Spring Batchの基本的な記述方法は学ぶことができると思います。

テーブルのDDLについては以降で掲載しています。

ID,名前,年齢
1,ユーザー1,30
2,ユーザー2,30
3,ユーザー3,30
4,ユーザー4,30
5,ユーザー5,30
6,ユーザー6,30
7,ユーザー7,30
8,ユーザー8,30
idnameage
1ユーザー130
2ユーザー230
3ユーザー330
4ユーザー430
5ユーザー530
6ユーザー630
sample_userテーブル

作成手順概要

大まかな作成手順は以下の通りです。以降、下記の項目に従って記載していきます。

  • 環境の確認、設定ファイルの作成
  • テーブルの作成、csvファイルの配置
  • Modelの作成
  • Processorの作成
  • Listenerの作成
  • Reader、Writerの作成(Configクラス)
  • 動作確認

環境の確認、設定ファイルの作成

今回作成するプロジェクトのクラス構成は以下の通りです。
以降の解説ではこのクラス構成を前提に解説しますので、必要に応じて参照してください。

フォルダ構成

私のpom.xml(依存関係)は以下の通りです。
・Spring Boot DevTools
・Lombok
・Spring Batch
・Validation
・postgresql
・JDBC API
・Spring Data JDBC

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.3</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>SpringBatchSample</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringBatchSample</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>2.2.2</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-validation</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

また、application.ymlは以下の通りです。
今回、データベースはpostgresqlを使用しています。

spring:
  batch:
    job:
      names: CsvImportJdbcJob
    jdbc:
      initialize-schema: always
      schema: classpath:org/springframework/batch/core/schema-postgresql.sql
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://localhost:5432/sample_database
    username: postgres
    password: password

logging:
  level:
    '[com.example.SpringBatchSample]': debug

csvのパスを記述するsample.propertiesは以下の通りです。
アプリケーション起動時に下記のパスのcsvファイルを読み込むようにします。
csvファイルは上述したuser.csvです。

csv.path=sql/user.csv

csvのパスを保持するJavaクラスを作成します。(sampleProperty.java)
赤文字部でプロパティファイル、csvファイルのパスを示しています。

package com.example.SpringBatchSample.config;

import lombok.Getter;
import lombok.ToString;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

@Component
@PropertySource("classpath:property/sample.properties")
@Getter
@ToString
public class SampleProperty {
    @Value("${csv.path}")
    private String csvPath;
}

テーブルの作成

テーブル作成のDDLは以下の通りです。必要に応じてご使用ください。

create table if not exists sample_user(
id INT primary key,
name VARCHAR(50),
age INT
);

Modelの作成

sample_userテーブルに対応するモデルを作成します。
本クラス中のメソッドであるjudgeAge()はcsvのage(年齢)が20歳未満の場合、登録できないようにするためのバリデーションメソッドです。Processorで実際にチェックを行い、20歳以上のデータしか登録できないようにします。

package com.example.SpringBatchSample.model;

import lombok.Data;

@Data
public class User {

    private Integer id;
    private String name;
    private Integer age;

    public void judgeAge(){
        if(20 > age){
            String errorMessage = String.format(
                    "20才未満は登録できません::id={},name={},age={}"
                    ,getId(%d),getName(%s),getAge(%d));
            throw new RuntimeException(errorMessage);
        }
    }
}

Processorの作成

Processorを作成します。Processorは一般的にデータの加工の役割を担わせます。

本記事の内部の処理では①ユーザーの年齢チェック②DBの存在チェックを行なっています。
②(青文字)は後述するRepositoryクラスを作成後に記述していください!

Processorを作成する際にはItemProcessorインターフェースを実装します。ジェネリクスは1項目がProcessorに入っていく引数、2項目がWriterに渡す引数です。今回はどちらもUserとなります。

package com.example.SpringBatchSample.processor;

import com.example.SpringBatchSample.model.User;
import com.example.SpringBatchSample.repository.UserJdbcRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component("ValidProcessor")
@StepScope
@Slf4j
public class ValidProcessor implements ItemProcessor<User,User> {

    @Autowired
    UserJdbcRepository repository;

    @Override
    public User process(User user) throws Exception {
        if(repository.exists(user.getId())){
            log.warn("既に存在しているIDのため登録できません::id={}",user.getId());
            return null;
        }
        try {
            user.judgeAge();
        }catch (RuntimeException e){
            log.warn(e.getMessage(),e);
            return null;
        }
        return user;
    }
}

Listenerの作成

次にListenerを作成していきます。LIstenerは何かをトリガーに起動する処理のことです。
一般的にログ出力の役割を担います。本コードでは、Reader、Processor、Writerのそれぞれの処理前後のログ出力を担います。

まずはReaderのListenerです。 ItemReadListenerを実装します。
今回は読み込んだユーザーの情報のログを出力しています。

package com.example.SpringBatchSample.listener;

import com.example.SpringBatchSample.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReadListener implements ItemReadListener<User> {

    @Override
    public void beforeRead() {
        // Do nothing
    }

    @Override
    public void afterRead(User user) {
        log.debug("AfterRead:{}",user);
    }

    @Override
    public void onReadError(Exception e) {
        log.error("ReadError::errorMessage={}",e.getMessage(),e);
    }
}

次にProcessorのListenerです。ItemProcessListenerを実装します。

package com.example.SpringBatchSample.listener;

import com.example.SpringBatchSample.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProcessListener implements ItemProcessListener<User,User> {

    @Override
    public void beforeProcess(User user) {
        // Do nothing
    }

    @Override
    public void afterProcess(User user, User user2) {
        // Do nothing
    }

    @Override
    public void onProcessError(User user, Exception e) {
        log.error("ProcessError::user{},errorMessage={}",user,e.getMessage(),e);
    }
}

最後にWriterのListenerです。ItemWriteListenerを実装します。
DBに書き込む数をログ出力しています。

package com.example.SpringBatchSample.listener;

import com.example.SpringBatchSample.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class WriteListener implements ItemWriteListener<User> {

    @Override
    public void beforeWrite(List<? extends User> list) {
        // Do nothing
    }

    @Override
    public void afterWrite(List<? extends User> list) {
        log.debug("AfterWrite::count={}",list.size());
    }

    @Override
    public void onWriteError(Exception e, List<? extends User> list) {
        log.error("WriteError::errorMessage={}",e.getMessage(),e);
    }
}

Repositoryの作成

ここでは、前述したバリデーションチェック(DBの存在チェック)のための処理を記述します。
csvのidがDBに存在する場合はtrue、存在しない場合はfalseを返します。

package com.example.SpringBatchSample.repository;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

@Repository
public class UserJdbcRepository {

    @Autowired
    private JdbcTemplate template;

    private static final String EXISTS_CHECK_SQL = "" +
            "SELECT exists (SELECT * FROM sample_user where id =?)";

    public boolean exists(Integer id){
        return template.queryForObject(EXISTS_CHECK_SQL,Boolean.class,id);
    }
}

Reader、Writerの作成(Configクラス)

いよいよバッチ設定クラスを作成します。今回は①BatchConfigクラス②JdbcImportBatchConfigクラスの2つのクラスに分けます。①にはどのJobでも共通で使用するReader、Processor、Listenerの処理を記述します。②は①を継承したクラスでWriterを作成します。

まずは①BatchConfigです。バッチ設定用クラスには、@EnableBatchProcessingを付与します。
また、このクラスでのメイン処理となるReaderを実装する際にはFlatFileItemReaderを使います。
メソッドチェーンで読み込むcsvのパスや文字コード、ヘッダのスキップ指定等を行なっています。

fieldSetMapperでは、csvファイルのデータとUserクラスのマッピングを行なっています。csvのカラムにつけた名前とJavaクラスのフィールド名が一致していると値がマッピングされます。

package com.example.SpringBatchSample.config;

import com.example.SpringBatchSample.model.User;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;

import java.nio.charset.StandardCharsets;

@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    protected JobBuilderFactory jobBuilderFactory;

    @Autowired
    protected StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("ValidProcessor")
    protected ItemProcessor<User,User> judgeAgeProcessor;

    @Autowired
    protected ItemReadListener<User> readListener;

    @Autowired
    protected ItemProcessListener<User,User> processListener;

    @Autowired
    protected ItemWriteListener<User> writeListener;

    @Autowired
    protected SampleProperty sampleProperty;

    @Bean
    @StepScope
    public FlatFileItemReader<User> csvReader(){
        String [] columnNameArray = new String[] {"id","name","age"};

        return new FlatFileItemReaderBuilder<User>()
                .name("userCsvReader")
                .resource(new ClassPathResource(sampleProperty.getCsvPath()))
                .linesToSkip(1)
                .encoding(StandardCharsets.UTF_8.name())
                .delimited()
                .names(columnNameArray)
                .fieldSetMapper(new BeanWrapperFieldSetMapper<User>(){
                    {
                        setTargetType(User.class);
                    }
                }).build();

    }
}

次に②JdbcImportBatchConfigを作成していきます。
JdbcBatchItemWriterでDBへの書き込みができます。

csvImportJdbcStep()ではStepを生成しています。Chunk(5)はコミット間隔です。ReaderとProcessorが5回実行されてWriterが1回実行されます。引数の数字を変えて実行するとログの表示が変わり、理解が深まると思います。
rederやlistner、writerには作成したクラスを指定します。

csvImportJdbcJob()ではJobを生成します。Beanに渡す名前はapplication.ymlのジョブ名と一致させてください。start()には作成したstep()を入れます。

package com.example.SpringBatchSample.config;

import com.example.SpringBatchSample.model.User;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class JdbcImportBatchConfig extends BatchConfig {

    @Autowired
    private DataSource dataSource;

    private static final String INSERT_SQL =
    "INSERT INTO sample_user(id,name,age) VALUES(:id,:name,:age)";

    @Bean
    @StepScope
    public JdbcBatchItemWriter<User> jdbcWriter(){
        BeanPropertyItemSqlParameterSourceProvider<User> provider =
                new BeanPropertyItemSqlParameterSourceProvider<>();
        return new JdbcBatchItemWriterBuilder<User>       ().itemSqlParameterSourceProvider(provider)
                .sql(INSERT_SQL).dataSource(this.dataSource).build();
    }

    @Bean
    public Step csvImportJdbcStep(){
        return this.stepBuilderFactory.get("CsvImportJdbcStep")
                .<User,User>chunk(5)
                .reader(csvReader()).listener(this.readListener)
                .processor(judgeAgeProcessor).listener(this.processListener)
                .writer(jdbcWriter()).listener(this.writeListener).build();

    }

    @Bean("CsvImportJdbcJob")
    public Job csvImportJdbcJob(){
        return this.jobBuilderFactory.get("CsvImportJdbcJob")
                .incrementer(new RunIdIncrementer())
                .start(csvImportJdbcStep()).build();
    }
}

動作確認

アプリケーションを起動すると以下のログが出力されます。
5件(コミット間隔)ずつcsvの情報を読み込んで、まとめて書き込み処理をしていることがわかります。

起動時ログ

DBにも登録されていますね。

DBのユーザー情報を確認

もう一度、アプリケーションを起動すると、バリデーションチェックで弾かれます。

起動時ログ(バリデーションチェック)

終わりに

本記事はここまでとなります。ご覧いただきありがとうございました。

ご指摘等がございましたら頂けますと嬉しいです。
引き続き、プログラミングについて定期的に発信していきますのでよろしくお願いします!
また、もしよろしければtwitterもフォローしていただけると嬉しいです!🐢

コメント