Spring Batch CSV to Database Example

Learn to use Spring Batch to read records from CSV files and insert them into the database using JdbcBatchItemWriter in a Spring Boot application. We are using the embedded database H2, and you can replace it with any other database of your choice.

1. Learning Objectives

In this demo application, we will perform the following tasks:

  • Configure H2 database, initialize the batch schema and create a PERSON table in the database.
  • Read records from a CSV file with FlatFileItemReader. Consider using MultiResourceItemReader if there are multiple CSV files to process them in parallel.
  • Write person records to the PERSON table with JdbcBatchItemWriter.
  • Log items inserted into the database using ItemProcessor.
  • Verify inserted records using the H2 console.

2. CSV File and Model

For demo purposes, let us create a batch Job that reads the person’s information from a CSV file and saves it into a database. The CSV file looks like this:

Lokesh,Gupta,41,true
Brian,Schultz,42,false
John,Cena,43,true
Albert,Pinto,44,false

To read a person’s record in Java and process it, we need to create a simple POJO.

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {

    String firstName;
    String lastName;
    Integer age;
    Boolean active;
}

3. Maven

We need to include the following dependencies for the Spring batch to work in a Spring Boot application. Also, include the H2 database for creating the batch schema, and saving the Job execution data and employee records.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

4. Schema Initialization

Spring batch distribution comes with schema files for all popular databases. We only need to refer it when initializing the DataSource using the DataSourceInitializer bean.

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

  DataSource dataSource;

  public BatchConfig(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  @Bean
  public DataSourceTransactionManager transactionManager() {

    return new DataSourceTransactionManager(dataSource);
  }

  @Bean
  public DataSourceInitializer databasePopulator() {

    ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
    populator.addScript(new ClassPathResource("org/springframework/batch/core/schema-h2.sql"));
    populator.addScript(new ClassPathResource("sql/batch-schema.sql"));
    populator.setContinueOnError(false);
    populator.setIgnoreFailedDrops(false);
    DataSourceInitializer dataSourceInitializer = new DataSourceInitializer();
    dataSourceInitializer.setDataSource(dataSource);
    dataSourceInitializer.setDatabasePopulator(populator);
    return dataSourceInitializer;
  }
}

In the above configuration, we have created our custom table PERSON using the batch-schema.sql that will be executed when the application starts.

DROP TABLE IF EXISTS person;

CREATE TABLE person  (
    person_id BIGINT AUTO_INCREMENT NOT NULL PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    age INTEGER,
    is_active BOOLEAN
);

Finally, add the DataSource connection details in the application.properties file to create the DataSource bean. We are connecting to the in-memory H2 database.

spring.datasource.url=jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1
spring.datasource.username=sa
spring.datasource.password=

5. Enabling Batch Processing

Use @EnableBatchProcessing annotation on a @Configuration class to enable the batch processing and execute its autoconfiguration with Spring Boot. This will bootstrap Spring Batch with some sensible defaults.

The default configuration will configure a JobRepositoryJobRegistry, and JobLauncher beans.

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    //...
}

6. CSV Reader and Database Writer Configuration

The following BatchConfig class contains all the necessary bean definitions to make the program work. Let’s understand each one:

  • The input CSV file is specified using the @Value annotation on the Resource type.
  • The Job and Step beans have been defined. The job consists of a single step (step1). The step includes a chunk-oriented processing mechanism with a specified reader, writer, and transaction manager.
  • For reading the flat file, we are using FlatFileItemReader with the standard configuration involving DefaultLineMapper,  DelimitedLineTokenizer, and BeanWrapperFieldSetMapper classes.
  • For writing the records in the database, we are using the JdbcBatchItemWriter which is the standard writer for executing batch queries in a database for Spring batch jobs.
import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import com.howtodoinjava.demo.batch.jobs.csvToDb.processor.PersonItemProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class CsvToDatabaseJob {

  public static final Logger logger = LoggerFactory.getLogger(CsvToDatabaseJob.class);

  private static final String INSERT_QUERY = """
      insert into person (first_name, last_name, age, is_active)
      values (:firstName,:lastName,:age,:active)""";

  private final JobRepository jobRepository;

  public CsvToDatabaseJob(JobRepository jobRepository) {
    this.jobRepository = jobRepository;
  }

  @Value("classpath:csv/inputData.csv")
  private Resource inputFeed;

  @Bean(name = "insertIntoDbFromCsvJob")
  public Job insertIntoDbFromCsvJob(Step step1, Step step2) {

    var name = "Persons Import Job";
    var builder = new JobBuilder(name, jobRepository);

    return builder.start(step1)
        .build();
  }

  @Bean
  public Step step1(ItemReader<Person> reader,
                    ItemWriter<Person> writer,
                    ItemProcessor<Person, Person> processor,
                    PlatformTransactionManager txManager) {

    var name = "INSERT CSV RECORDS To DB Step";
    var builder = new StepBuilder(name, jobRepository);
    return builder
        .<Person, Person>chunk(5, txManager)
        .reader(reader)
        .processor(processor)
        .writer(writer)
        .build();
  }

  @Bean
  public FlatFileItemReader<Person> reader(
      LineMapper<Person> lineMapper) {
    var itemReader = new FlatFileItemReader<Person>();
    itemReader.setLineMapper(lineMapper);
    itemReader.setResource(inputFeed);
    return itemReader;
  }

  @Bean
  public DefaultLineMapper<Person> lineMapper(LineTokenizer tokenizer,
                                              FieldSetMapper<Person> fieldSetMapper) {
    var lineMapper = new DefaultLineMapper<Person>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return lineMapper;
  }

  @Bean
  public BeanWrapperFieldSetMapper<Person> fieldSetMapper() {
    var fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
    fieldSetMapper.setTargetType(Person.class);
    return fieldSetMapper;
  }

  @Bean
  public DelimitedLineTokenizer tokenizer() {
    var tokenizer = new DelimitedLineTokenizer();
    tokenizer.setDelimiter(",");
    tokenizer.setNames("firstName", "lastName", "age", "active");
    return tokenizer;
  }

  @Bean
  public JdbcBatchItemWriter<Person> jdbcItemWriter(DataSource dataSource) {
    var provider = new BeanPropertyItemSqlParameterSourceProvider<Person>();
    var itemWriter = new JdbcBatchItemWriter<Person>();
    itemWriter.setDataSource(dataSource);
    itemWriter.setSql(INSERT_QUERY);
    itemWriter.setItemSqlParameterSourceProvider(provider);
    return itemWriter;
  }

  @Bean
  public PersonItemProcessor personItemProcessor() {
    return new PersonItemProcessor();
  }
}

Also, we are creating PersonItemProcessor which will log the employee records before writing to the database. It’s optional and we are using it for tracking purposes.

import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemProcessListener;

public class PersonItemProcessListener implements ItemProcessListener<Person, Person> {

  public static final Logger logger = LoggerFactory.getLogger(PersonItemProcessListener.class);

  @Override
  public void beforeProcess(Person input) {
    logger.info("Person record has been read: " + input);
  }

  @Override
  public void afterProcess(Person input, Person result) {
    logger.info("Person record has been processed to : " + result);
  }

  @Override
  public void onProcessError(Person input, Exception e) {
    logger.error("Error in reading the person record : " + input);
    logger.error("Error in reading the person record : " + e);
  }
}

7. Demo

Now our demo job is configured and ready to be executed. Start the Spring batch application and launch the batch Job using JobLauncher.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class BatchProcessingApplication implements CommandLineRunner {

  private final JobLauncher jobLauncher;
  private final ApplicationContext applicationContext;

  public BatchProcessingApplication(JobLauncher jobLauncher, ApplicationContext applicationContext) {
    this.jobLauncher = jobLauncher;
    this.applicationContext = applicationContext;
  }

  public static void main(String[] args) {
    SpringApplication.run(BatchProcessingApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {

    Job job = (Job) applicationContext.getBean("insertIntoDbFromCsvJob");

    JobParameters jobParameters = new JobParametersBuilder()
        .addString("JobID", String.valueOf(System.currentTimeMillis()))
        .toJobParameters();

    var jobExecution = jobLauncher.run(job, jobParameters);

    var batchStatus = jobExecution.getStatus();
    while (batchStatus.isRunning()) {
      System.out.println("Still running...");
      Thread.sleep(5000L);
    }
  }
}

Notice the console logs.

2023-11-29T16:12:12.520+05:30  INFO 17712 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Persons Import Job]] launched with the following parameters: [{'JobID':'{value=1701254532458, type=class java.lang.String, identifying=true}'}]
2023-11-29T16:12:12.573+05:30  INFO 17712 --- [main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [INSERT CSV RECORDS To DB Step]
2023-11-29T16:12:12.646+05:30  INFO 17712 --- [main] c.h.d.b.j.c.p.PersonItemProcessor        : Processed record: Person(firstName=Lokesh, lastName=Gupta, age=41, active=true)
2023-11-29T16:12:12.646+05:30  INFO 17712 --- [main] c.h.d.b.j.c.p.PersonItemProcessor        : Processed record: Person(firstName=Brian, lastName=Schultz, age=42, active=false)
2023-11-29T16:12:12.647+05:30  INFO 17712 --- [main] c.h.d.b.j.c.p.PersonItemProcessor        : Processed record: Person(firstName=John, lastName=Cena, age=43, active=true)
2023-11-29T16:12:12.647+05:30  INFO 17712 --- [main] c.h.d.b.j.c.p.PersonItemProcessor        : Processed record: Person(firstName=Albert, lastName=Pinto, age=44, active=false)
2023-11-29T16:12:12.661+05:30  INFO 17712 --- [main] o.s.batch.core.step.AbstractStep         : Step: [INSERT CSV RECORDS To DB Step] executed in 87ms
2023-11-29T16:12:12.669+05:30  INFO 17712 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Persons Import Job]] completed with the following parameters: [{'JobID':'{value=1701254532458, type=class java.lang.String, identifying=true}'}] and the following status: [COMPLETED] in 122ms

Let us also check the H2 console. Notice that the database has the schema created for batch jobs. Also, we have the data written to PERSON table.

Drop me your questions in the comments section.

Happy Learning !!

Source Code on Github

Comments

Subscribe
Notify of
guest
2 Comments
Most Voted
Newest Oldest
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.

Our Blogs

REST API Tutorial

Dark Mode

Dark Mode