Learn to use the Spring Batch ItemProcessor to apply business logic to each item after it is read and before it is passed to the writer for writing to a file or database. The processor may return a different data type than the one it receives as input, but it does not have to.
Returning null from an ItemProcessor indicates that the item should be filtered out: it is not processed any further and is not passed to the writer.
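As a minimal sketch of both points, the processor below takes an Employee and returns a String (a different output type), and returns null to filter an item out. To keep the sketch runnable without Spring on the classpath, it declares a local stand-in interface with the same shape as Spring Batch's ItemProcessor. The Employee class, the EmployeeToLineProcessor name, and the blank-first-name rule are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessorTypesSketch {

    // Local stand-in mirroring the shape of Spring Batch's ItemProcessor<I, O>,
    // declared here only so the sketch compiles without Spring on the classpath.
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical model class; the article's real Employee class is not shown.
    static class Employee {
        final String id;
        final String firstName;
        final String lastName;

        Employee(String id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Input type Employee, output type String: the two type parameters may differ.
    static class EmployeeToLineProcessor implements ItemProcessor<Employee, String> {
        @Override
        public String process(Employee employee) {
            if (employee.firstName == null || employee.firstName.trim().isEmpty()) {
                return null; // null means: filter this item out
            }
            return employee.id + ":" + employee.firstName + " " + employee.lastName;
        }
    }

    // Applies the processor to each item and keeps only the non-null results.
    static List<String> processAll(List<Employee> items) throws Exception {
        ItemProcessor<Employee, String> processor = new EmployeeToLineProcessor();
        List<String> output = new ArrayList<>();
        for (Employee item : items) {
            String result = processor.process(item);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) throws Exception {
        List<Employee> items = new ArrayList<>();
        items.add(new Employee("1", "Lokesh", "Gupta"));
        items.add(new Employee("2", "", "Mishra"));
        System.out.println(processAll(items)); // prints [1:Lokesh Gupta]
    }
}
```

In a real job the same class would implement org.springframework.batch.item.ItemProcessor<Employee, String>, and the step would be declared with matching input and output types.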
1. Writing an ItemProcessor
A custom ItemProcessor is created by implementing the ItemProcessor interface and its process() method.
For example, the given ItemProcessor implementation processes the Employee records. During processing, it does the following tasks:
- Validate that the 'id' field is set.
- Validate that the 'id' field is parsable to an integer.
- Validate that the 'id' field is a positive integer greater than zero.
- If validation fails, return null, which indicates that the record should not be processed further.
- If validation succeeds, return the Employee object as is.
import org.springframework.batch.item.ItemProcessor;
import com.howtodoinjava.demo.model.Employee;
public class ValidationProcessor implements ItemProcessor<Employee, Employee> {

    @Override
    public Employee process(Employee employee) throws Exception {
        // Reject records that have no id at all.
        if (employee.getId() == null) {
            System.out.println("Missing employee id : " + employee.getId());
            return null;
        }
        try {
            // Reject ids that are zero or negative.
            if (Integer.valueOf(employee.getId()) <= 0) {
                System.out.println("Invalid employee id : " + employee.getId());
                return null;
            }
        } catch (NumberFormatException e) {
            // Reject ids that are not parsable to an integer.
            System.out.println("Invalid employee id : " + employee.getId());
            return null;
        }
        // The record is valid; pass it on unchanged.
        return employee;
    }
}
2. How to use ItemProcessor?
Use SimpleStepBuilder.processor() to register the processor instance while configuring the chunk-oriented Step. In the following example, the processor is registered in step1().
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import com.howtodoinjava.demo.model.Employee;
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Value("/input/inputData.csv")
    private Resource inputResource;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Employee, Employee>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new ValidationProcessor();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setLinesToSkip(1);
        itemReader.setResource(inputResource);
        return itemReader;
    }

    @Bean
    public LineMapper<Employee> lineMapper() {
        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
        lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
        BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
        fieldSetMapper.setTargetType(Employee.class);
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean
    public ConsoleItemWriter<Employee> writer() {
        return new ConsoleItemWriter<Employee>();
    }
}
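The configuration above references a ConsoleItemWriter whose source is not shown. A minimal sketch of such a writer, assuming the Spring Batch 4.x ItemWriter signature write(List<? extends T>), could simply print each item. The ItemWriter interface below is a local stand-in with that assumed shape so the sketch runs without Spring on the classpath; in a real project the class would implement org.springframework.batch.item.ItemWriter instead. This is one possible implementation, not necessarily the one used in the original demo.

```java
import java.util.Arrays;
import java.util.List;

public class ConsoleItemWriterSketch {

    // Local stand-in with the assumed shape of Spring Batch 4.x ItemWriter<T>.
    interface ItemWriter<T> {
        void write(List<? extends T> items) throws Exception;
    }

    // Prints each item of the chunk on its own line using its toString().
    static class ConsoleItemWriter<T> implements ItemWriter<T> {
        @Override
        public void write(List<? extends T> items) {
            for (T item : items) {
                System.out.println(item);
            }
        }
    }

    public static void main(String[] args) {
        ConsoleItemWriter<String> writer = new ConsoleItemWriter<>();
        writer.write(Arrays.asList("first item", "second item"));
    }
}
```

Because the writer relies on toString(), the Employee class would need a readable toString() implementation for the console output to be useful.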
3. Demo
In the demo, we are batch-processing a CSV file with the following content. Notice line number 5, which has an invalid 'id' field. The ItemProcessor should detect this error and return null, so this record is not passed on to the ItemWriter.
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
abc,David,Miller
4,David,Walsh
Start the job and watch the console.
2018-07-11 14:59:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531301340005}]
2018-07-11 14:59:00 INFO - Executing step: [step1]
Employee [id=1, firstName=Lokesh, lastName=Gupta]
Employee [id=2, firstName=Amit, lastName=Mishra]
Employee [id=3, firstName=Pankaj, lastName=Kumar]
Invalid employee id : abc
Employee [id=4, firstName=David, lastName=Walsh]
2018-07-11 14:59:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531301340005}] and the following status: [COMPLETED]
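The output shows that the record with id 'abc' never reaches the writer. As a rough, Spring-free illustration of why, the sketch below applies the same validation rules to the sample rows and collects only the rows that pass, which is what a writer would receive. It is a conceptual model of the read-process-write flow under simplified assumptions, not Spring Batch's actual step implementation; the FilterFlowSketch class and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterFlowSketch {

    // Same rules as ValidationProcessor: returns the id if valid, otherwise null.
    static String validateId(String id) {
        if (id == null) {
            return null;
        }
        try {
            return Integer.parseInt(id) > 0 ? id : null;
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Checks each row (id,firstName,lastName) and collects what a writer would receive.
    static List<String> run(List<String> rows) {
        List<String> written = new ArrayList<>();
        for (String row : rows) {
            String id = row.split(",")[0];
            if (validateId(id) == null) {
                System.out.println("Invalid employee id : " + id);
                continue; // filtered: the writer never sees this row
            }
            written.add(row);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1,Lokesh,Gupta", "2,Amit,Mishra", "3,Pankaj,Kumar",
                "abc,David,Miller", "4,David,Walsh");
        run(rows).forEach(System.out::println);
    }
}
```

Filtering an item this way is not treated as a failure, which is consistent with the job above finishing with the status COMPLETED.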
Drop me your questions in the comments section.
Happy Learning !!
My ItemReader returns a list of objects. The processor takes the list and converts each instance of the object to String and returns a list of String values.
This is a pretty generic implementation in the sense that all the Objects extend out of a Base DTO and all of them have overridden toString method to ensure that the String representation is in the correct format.
My question is that if I want to send some additional parameters from the Reader to the Processor, how can I do the same? For example a true or false value, depending on the Reader Implementation.
Can I access the Job Parameters in the Processor? If yes, then I can set the boolean value as one of the Job Parameters and access it from the Processor.
Please add this dependency to pom.xml to resolve the ConsoleItemWriter problem:
<dependency>
    <groupId>com.javaetmoi.core</groupId>
    <artifactId>spring-batch-toolkit</artifactId>
    <version>4.0.0</version>
</dependency>
How can I write a list of data into an Oracle database using a JDBC item writer?
I am able to write the invalid records to a file, and I want to add, for each record, the reason it failed. How can I achieve that?
In a custom item processor, can we also implement JobExecutionListener and StepExecutionListener?
Is this a good or a bad practice?
Can I validate the column names but not the data in the CSV file?
thank you sir….!
The ConsoleItemWriter implementation is missing. Also, I request you to provide a run method to launch this code.
Thanks Lokesh. This helped a lot in understanding Spring Batch.
How can we write functional/integration tests for this batch job?