Learn to use the Spring Batch ItemProcessor to add business logic after the input is read and before it is passed to the writer, which writes it to a file or database. An ItemProcessor may return a different data type than the one it receives as input, but it does not have to.
Returning null from an ItemProcessor filters the item out. The item is not processed any further in the step, and it is never passed to the ItemWriter.
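To make this contract concrete, here is a minimal plain-Java model of how a chunk-oriented step treats the processor's return value. It is a sketch under stated assumptions, not Spring Batch code. `Processor` is a locally declared stand-in with the same shape as `ItemProcessor.process()`, so the example compiles without Spring on the classpath, and `ChunkModel.run()` is a hypothetical helper. The sample processor also turns `String` input into `Integer` output, to show that the two types may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in with the same shape as Spring Batch's ItemProcessor<I, O>.process().
// Declared here only so the sketch compiles without Spring on the classpath.
interface Processor<I, O> {
    O process(I item) throws Exception;
}

class ChunkModel {

    // Hypothetical helper: runs each item through the processor and returns what
    // would be handed to the writer. A null result means "filter this item out".
    static <I, O> List<O> run(List<I> items, Processor<I, O> processor) throws Exception {
        List<O> toWrite = new ArrayList<>();
        for (I item : items) {
            O result = processor.process(item);
            if (result != null) {      // filtered items never reach the writer
                toWrite.add(result);
            }
        }
        return toWrite;
    }

    public static void main(String[] args) throws Exception {
        // Input type String, output type Integer: the types are allowed to differ.
        Processor<String, Integer> lengthOfNonBlank =
                s -> s.trim().isEmpty() ? null : s.length();

        List<Integer> written = run(Arrays.asList("alpha", " ", "be"), lengthOfNonBlank);
        System.out.println(written); // prints [5, 2]
    }
}
```

In a real job, Spring Batch runs this loop for you and records the number of filtered items in the step execution (`StepExecution.getFilterCount()`), separately from skipped items.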
1. Writing an ItemProcessor
A custom ItemProcessor is created by implementing the ItemProcessor<I, O> interface and its process() method.
For example, the following ItemProcessor implementation processes Employee records. For each record, it does the following:
- Validates that the 'id' field is set.
- Validates that the 'id' field is parsable to an integer.
- Validates that the 'id' field is a positive integer greater than zero.
- If validation fails, returns null, which indicates that the record should not be processed further.
- If validation succeeds, returns the Employee object as it is.
import org.springframework.batch.item.ItemProcessor;
import com.howtodoinjava.demo.model.Employee;

public class ValidationProcessor implements ItemProcessor<Employee, Employee> {

    @Override
    public Employee process(Employee employee) throws Exception {
        // Rule 1: the id must be present
        if (employee.getId() == null) {
            System.out.println("Missing employee id : " + employee);
            return null;   // null filters the record out; it never reaches the writer
        }
        try {
            // Rule 3: the id must be a positive integer
            if (Integer.parseInt(employee.getId()) <= 0) {
                System.out.println("Invalid employee id : " + employee.getId());
                return null;
            }
        } catch (NumberFormatException e) {
            // Rule 2: the id must be parsable as an integer
            System.out.println("Invalid employee id : " + employee.getId());
            return null;
        }
        // All checks passed: pass the record through unchanged
        return employee;
    }
}
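The three checks can also be exercised on their own, without starting a job. The sketch below mirrors the processor's rules in a plain static method. `EmployeeIdRules.isValid()` is a hypothetical helper, not part of Spring Batch or the project above. It uses `Integer.parseInt`, which applies the same parsing rules as `Integer.valueOf(String)`. Those rules do not trim whitespace and reject values outside the `int` range, so ids such as `" 5"`, `""` and `"2147483648"` are also treated as invalid.

```java
// Hypothetical plain-Java mirror of ValidationProcessor's id checks,
// so they can be tried without Spring on the classpath.
class EmployeeIdRules {

    // Returns true only when the id is present, parses as an int, and is > 0.
    static boolean isValid(String id) {
        if (id == null) {
            return false;                      // rule 1: id must be set
        }
        try {
            return Integer.parseInt(id) > 0;   // rule 3: positive integer
        } catch (NumberFormatException e) {
            return false;                      // rule 2: must parse as an int
        }
    }

    public static void main(String[] args) {
        // Includes edge cases: leading space, empty string, int overflow, null
        String[] samples = {"4", "abc", "0", "-7", " 5", "", "2147483648", null};
        for (String s : samples) {
            System.out.println("[" + s + "] -> " + isValid(s));
        }
    }
}
```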
2. How to use ItemProcessor?
Use SimpleStepBuilder.processor() to register the processor instance while building a chunk-oriented Step. In the following example, the processor is registered in step1().
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import com.howtodoinjava.demo.model.Employee;

// Uses the Spring Batch 4.x builder factories
// (JobBuilderFactory and StepBuilderFactory were deprecated in Spring Batch 5.0).
// ConsoleItemWriter is a custom ItemWriter of this demo project, not part of Spring Batch.
// It is not imported, so it is assumed to live in the same package as BatchConfig;
// it prints each item it receives to the console.
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Input CSV file, loaded from the classpath
    @Value("classpath:input/inputData.csv")
    private Resource inputResource;

    @Bean
    public Job readCSVFilesJob() {
        return jobBuilderFactory
                .get("readCSVFilesJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Employee, Employee>chunk(1)   // commit after every item
                .reader(reader())
                .processor(processor())         // validation runs between read and write
                .writer(writer())
                .build();
    }

    @Bean
    public ItemProcessor<Employee, Employee> processor() {
        return new ValidationProcessor();
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
        itemReader.setLineMapper(lineMapper());
        itemReader.setLinesToSkip(1);           // skip the CSV header line
        itemReader.setResource(inputResource);
        return itemReader;
    }

    @Bean
    public LineMapper<Employee> lineMapper() {
        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();

        // Split each line on commas and name the columns after Employee's properties
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
        lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });

        // Bind the named columns to a new Employee instance via its setters
        BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
        fieldSetMapper.setTargetType(Employee.class);

        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean
    public ConsoleItemWriter<Employee> writer() {
        return new ConsoleItemWriter<Employee>();
    }
}
3. Demo
In the demo, we batch-process a CSV file with the following content. Notice line 5, which has an invalid 'id' field (abc). The ItemProcessor should detect this error and return null, so this record is never passed to the ItemWriter.
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
abc,David,Miller
4,David,Walsh
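The expected filtering can be traced with a plain-Java sketch that mimics the job's flow over the same rows. It skips the header (like `setLinesToSkip(1)`), splits each line on commas, drops rows whose `id` fails the checks, and prints each surviving row right away, as `chunk(1)` writes every item as soon as it is processed. This is not Spring Batch code: `DemoTrace` and its methods are hypothetical, the simple `split(",")` assumes no quoted fields, and the `Employee [...]` line format is assumed from the console output of the real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java trace of the demo job; not Spring Batch code.
class DemoTrace {

    // Same checks as ValidationProcessor: id present, parsable as an int, and > 0.
    static boolean validId(String id) {
        if (id == null) {
            return false;
        }
        try {
            return Integer.parseInt(id) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Simulates chunk(1): each row is processed and, if it survives, written at once.
    // Returns the rows that reached the "writer".
    static List<String[]> run(List<String> lines) {
        List<String[]> written = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {   // like setLinesToSkip(1)
            String[] f = line.split(",");                       // id, firstName, lastName
            if (!validId(f[0])) {
                System.out.println("Invalid employee id : " + f[0]);  // filtered out
                continue;
            }
            // Stands in for the console writer
            System.out.println("Employee [id=" + f[0] + ", firstName=" + f[1]
                    + ", lastName=" + f[2] + "]");
            written.add(f);
        }
        return written;
    }

    public static void main(String[] args) {
        run(Arrays.asList(
                "id,firstName,lastName",
                "1,Lokesh,Gupta",
                "2,Amit,Mishra",
                "3,Pankaj,Kumar",
                "abc,David,Miller",
                "4,David,Walsh"));
    }
}
```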
Start the job and watch the console.
2018-07-11 14:59:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531301340005}]
2018-07-11 14:59:00 INFO - Executing step: [step1]
Employee [id=1, firstName=Lokesh, lastName=Gupta]
Employee [id=2, firstName=Amit, lastName=Mishra]
Employee [id=3, firstName=Pankaj, lastName=Kumar]
Invalid employee id : abc
Employee [id=4, firstName=David, lastName=Walsh]
2018-07-11 14:59:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531301340005}] and the following status: [COMPLETED]
Drop me your questions in the comments section.
Happy Learning !!