Spring Batch provides a FlatFileItemReader that we can use to read data from flat files, including CSV files. Here’s an example of how to configure and use FlatFileItemReader to read data from a CSV file in a Spring Batch job.
1. CSV File and Model
For demo purposes, we will be using the following CSV file:
Lokesh,Gupta,41,true
Brian,Schultz,42,false
John,Cena,43,true
Albert,Pinto,44,false
Then we need to create a domain object to represent the data.
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {
String firstName;
String lastName;
Integer age;
Boolean active;
}
2. Read CSV with FlatFileItemReader
In the following configuration, the FlatFileItemReader is configured to read a CSV file. The DelimitedLineTokenizer specifies the delimiter and column names, and the BeanWrapperFieldSetMapper maps each tokenized line to a Person object.
We can plug in custom ItemProcessor and ItemWriter beans according to the business logic and the data destination. This configuration writes the records to a database table.
Finally, create a Job that includes the Steps.
import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class CsvToDatabaseJob {
public static final Logger logger = LoggerFactory.getLogger(CsvToDatabaseJob.class);
private static final String INSERT_QUERY = """
insert into person (first_name, last_name, age, is_active)
values (:firstName,:lastName,:age,:active)""";
private final JobRepository jobRepository;
public CsvToDatabaseJob(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
@Value("classpath:csv/inputData.csv")
private Resource inputFeed;
@Bean(name = "insertIntoDbFromCsvJob")
public Job insertIntoDbFromCsvJob(Step step1) {
  var name = "Persons Import Job";
  var builder = new JobBuilder(name, jobRepository);
  return builder.start(step1).build();
}
@Bean
public Step step1(ItemReader<Person> reader,
    ItemWriter<Person> writer,
    PlatformTransactionManager txManager) {
  var name = "INSERT CSV RECORDS To DB Step";
  var builder = new StepBuilder(name, jobRepository);
  return builder
      .<Person, Person>chunk(5, txManager)  // commit records in chunks of 5
      .reader(reader)
      .writer(writer)
      .build();
}
@Bean
public FlatFileItemReader<Person> reader(
LineMapper<Person> lineMapper) {
var itemReader = new FlatFileItemReader<Person>();
itemReader.setLineMapper(lineMapper);
itemReader.setResource(inputFeed);
return itemReader;
}
@Bean
public DefaultLineMapper<Person> lineMapper(LineTokenizer tokenizer,
FieldSetMapper<Person> fieldSetMapper) {
var lineMapper = new DefaultLineMapper<Person>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return lineMapper;
}
@Bean
public BeanWrapperFieldSetMapper<Person> fieldSetMapper() {
var fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
fieldSetMapper.setTargetType(Person.class);
return fieldSetMapper;
}
@Bean
public DelimitedLineTokenizer tokenizer() {
var tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter(",");
tokenizer.setNames("firstName", "lastName", "age", "active");
return tokenizer;
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
var provider = new BeanPropertyItemSqlParameterSourceProvider<Person>();
var itemWriter = new JdbcBatchItemWriter<Person>();
itemWriter.setDataSource(dataSource);
itemWriter.setSql(INSERT_QUERY);
itemWriter.setItemSqlParameterSourceProvider(provider);
return itemWriter;
}
}
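The JdbcBatchItemWriter above assumes a person table already exists. As a sketch, a matching H2 schema (column names inferred from the INSERT_QUERY; the surrogate id column and column sizes are assumptions) could go in schema.sql:

```sql
-- Table targeted by the JdbcBatchItemWriter's INSERT_QUERY
CREATE TABLE person (
  id          INT PRIMARY KEY AUTO_INCREMENT,  -- surrogate key (assumption)
  first_name  VARCHAR(100),
  last_name   VARCHAR(100),
  age         INT,
  is_active   BOOLEAN
);
```

Spring Boot runs a schema.sql placed on the classpath against the embedded datasource at startup, so no extra wiring is needed for the demo.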
If the above configuration seems like a lot, you can merge the DefaultLineMapper, DelimitedLineTokenizer, and BeanWrapperFieldSetMapper into the FlatFileItemReader bean itself.
@Bean
public FlatFileItemReader<Person> reader() {
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(inputFeed);
reader.setLineMapper(new DefaultLineMapper<Person>() {{
setLineTokenizer(new DelimitedLineTokenizer() {{
setNames("firstName", "lastName", "age", "active");
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}});
}});
return reader;
}
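Under the hood, the tokenizer splits each line on the delimiter and the field set mapper binds the resulting tokens to Person properties by name. Conceptually, the two steps boil down to the following plain-Java sketch (no Spring required; the class and record here are hypothetical stand-ins for illustration):

```java
// Conceptual sketch of what DelimitedLineTokenizer + BeanWrapperFieldSetMapper do:
// split the line, then bind tokens to typed fields in column order.
public class CsvLineMappingDemo {

  // Stand-in for the Person domain object
  record Person(String firstName, String lastName, int age, boolean active) {}

  static Person mapLine(String line) {
    String[] tokens = line.split(",");                 // tokenize on the delimiter
    return new Person(
        tokens[0],                                     // firstName
        tokens[1],                                     // lastName
        Integer.parseInt(tokens[2]),                   // age -> int
        Boolean.parseBoolean(tokens[3]));              // active -> boolean
  }

  public static void main(String[] args) {
    Person p = mapLine("Lokesh,Gupta,41,true");
    System.out.println(p.firstName() + " " + p.age()); // prints: Lokesh 41
  }
}
```

The real BeanWrapperFieldSetMapper adds type conversion and property-name matching on top of this idea, which is why the tokenizer's setNames() values must match the Person field names.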
3. Demo
3.1. Maven
Make sure you have the following dependencies in the project:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
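With the H2 dependency on the classpath, Spring Boot auto-configures an embedded datasource. As a sketch, the application.properties entries one might use (the keys are standard Spring Boot properties; the values are assumptions for this demo):

```properties
# Embedded H2 datasource (values are demo assumptions)
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=

# Create the Spring Batch metadata tables on startup
spring.batch.jdbc.initialize-schema=always

# Do not auto-run jobs at startup; we launch the job from the CommandLineRunner
spring.batch.job.enabled=false
```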
3.2. Run the Application
Now run the application, and watch out for the console logs.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class BatchProcessingApplication implements CommandLineRunner {

  private final JobLauncher jobLauncher;
  private final ApplicationContext applicationContext;

  public BatchProcessingApplication(JobLauncher jobLauncher, ApplicationContext applicationContext) {
    this.jobLauncher = jobLauncher;
    this.applicationContext = applicationContext;
  }

  public static void main(String[] args) {
    SpringApplication.run(BatchProcessingApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    Job job = (Job) applicationContext.getBean("insertIntoDbFromCsvJob");
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("JobID", String.valueOf(System.currentTimeMillis()))
        .toJobParameters();
    var jobExecution = jobLauncher.run(job, jobParameters);
    var batchStatus = jobExecution.getStatus();
    while (batchStatus.isRunning()) {
      System.out.println("Still running...");
      Thread.sleep(5000L);
    }
  }
}
The program output:
2023-11-29T14:32:54.612+05:30  INFO 24044 --- [main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=Persons Import Job]] launched with the following parameters: [{'JobID':'{value=1701248574579, type=class java.lang.String, identifying=true}'}]
2023-11-29T14:32:54.631+05:30  INFO 24044 --- [main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [INSERT CSV RECORDS To DB Step]
2023-11-29T14:32:54.647+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : Reading a new Person Record
2023-11-29T14:32:54.662+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : New Person record read : Person(firstName=Lokesh, lastName=Gupta, age=41, active=true)
2023-11-29T14:32:54.664+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : Reading a new Person Record
2023-11-29T14:32:54.665+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : New Person record read : Person(firstName=Brian, lastName=Schultz, age=42, active=false)
2023-11-29T14:32:54.665+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : Reading a new Person Record
2023-11-29T14:32:54.665+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : New Person record read : Person(firstName=John, lastName=Cena, age=43, active=true)
2023-11-29T14:32:54.666+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : Reading a new Person Record
2023-11-29T14:32:54.666+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : New Person record read : Person(firstName=Albert, lastName=Pinto, age=44, active=false)
2023-11-29T14:32:54.666+05:30  INFO 24044 --- [main] c.h.d.b.j.c.l.PersonItemReadListener     : Reading a new Person Record
2023-11-29T14:32:54.676+05:30  INFO 24044 --- [main] o.s.batch.core.step.AbstractStep         : Step: [INSERT CSV RECORDS To DB Step] executed in 44ms
2023-11-29T14:32:54.679+05:30  INFO 24044 --- [main] .j.c.l.JobCompletionNotificationListener : JOB FINISHED !!
Drop me your questions in the comments section.
Happy Learning !!