Spring Batch ItemProcessor Example

Learn to use the Spring Batch ItemProcessor to apply business logic to each item after it has been read and before it is passed to the writer for output to a file or database. The processor may return a different type than the one it receives as input, but it is not required to do so.

Returning null from an ItemProcessor indicates that the item should be filtered out: it is not processed any further in the batch and is not passed to the writer.
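
As a minimal sketch of both points, the following plain-Java example returns a different type than it receives and uses null to filter an item out. To stay runnable without Spring Batch on the classpath, it declares a stand-in interface with the same shape as org.springframework.batch.item.ItemProcessor. The names ProcessorContractSketch, IdParsingProcessor and runAll are hypothetical, and the loop only imitates what the framework does with a null result.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorContractSketch {

  // Stand-in with the same shape as Spring Batch's ItemProcessor<I, O>,
  // declared here only so the sketch compiles without the framework.
  interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
  }

  // Input type is String, output type is Integer: the two types may differ.
  static class IdParsingProcessor implements ItemProcessor<String, Integer> {
    @Override
    public Integer process(String rawId) {
      try {
        return Integer.valueOf(rawId.trim());
      } catch (NumberFormatException e) {
        return null; // null tells the caller to drop this item
      }
    }
  }

  // Imitates the framework: items whose result is null are not passed on.
  static <I, O> List<O> runAll(ItemProcessor<I, O> processor, List<I> items) {
    List<O> passed = new ArrayList<>();
    for (I item : items) {
      O result;
      try {
        result = processor.process(item);
      } catch (Exception e) {
        throw new IllegalStateException("Processing failed for " + item, e);
      }
      if (result != null) {
        passed.add(result);
      }
    }
    return passed;
  }

  public static void main(String[] args) {
    List<Integer> ids = runAll(new IdParsingProcessor(), List.of("1", "abc", " 3 "));
    System.out.println(ids); // prints [1, 3]
  }
}
```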

1. Writing an ItemProcessor

A custom ItemProcessor is created by implementing the ItemProcessor interface and its process() method.

For example, the given ItemProcessor implementation processes the Employee records. During processing, it does the following tasks:

  • Validate that the 'id' field is set.
  • Validate that the 'id' field can be parsed as an integer.
  • Validate that the 'id' field is greater than zero.
  • If validation fails, return null, which tells Spring Batch not to process the record any further.
  • If validation succeeds, return the Employee object as it is.
import org.springframework.batch.item.ItemProcessor;
import com.howtodoinjava.demo.model.Employee;

public class ValidationProcessor implements ItemProcessor<Employee, Employee> {

  @Override
  public Employee process(Employee employee) throws Exception {

    // Rule 1: the id must be present.
    if (employee.getId() == null) {
      System.out.println("Missing employee id");
      return null;
    }

    try {
      // Rules 2 and 3: the id must parse as an integer and be greater than zero.
      if (Integer.valueOf(employee.getId()) <= 0) {
        System.out.println("Invalid employee id : " + employee.getId());
        return null;
      }
    } catch (NumberFormatException e) {
      System.out.println("Invalid employee id : " + employee.getId());
      return null;
    }

    // The record passed all checks, so it is handed on unchanged.
    return employee;
  }
}
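
Because the three id checks are pure logic, they can be exercised without starting a job. The sketch below restates them as a small helper so that each rule can be checked in isolation. IdRules and isValidId are hypothetical names that do not appear in the original project, and the id is assumed to be a String, as the parsing step in the processor suggests.

```java
class IdRules {

  // Returns true only when the id is present, numeric, and greater than zero.
  static boolean isValidId(String id) {
    if (id == null) {
      return false; // rule 1: the id must be set
    }
    try {
      return Integer.parseInt(id) > 0; // rules 2 and 3: parsable and positive
    } catch (NumberFormatException e) {
      return false; // rule 2 failed: not an integer
    }
  }

  public static void main(String[] args) {
    String[] samples = { "1", "abc", "0", "-7", null };
    for (String id : samples) {
      System.out.println(id + " -> " + isValidId(id));
    }
  }
}
```

Only the first sample satisfies all three rules; the others each fail one of them.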

2. How to use ItemProcessor?

We use SimpleStepBuilder.processor() to register the processor instance while building the Step. In the following example, the processor is registered in step1().

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
 
import com.howtodoinjava.demo.model.Employee;
 
@Configuration
@EnableBatchProcessing
public class BatchConfig {

  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Value("/input/inputData.csv")
  private Resource inputResource;
 
  @Bean
  public Job readCSVFilesJob() {
    return jobBuilderFactory
        .get("readCSVFilesJob")
        .incrementer(new RunIdIncrementer())
        .start(step1())
        .build();
  }
 
  @Bean
  public Step step1() {
    return stepBuilderFactory
        .get("step1")
        .<Employee, Employee>chunk(1)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
  }
 
  @Bean
  public ItemProcessor<Employee, Employee> processor() {
    return new ValidationProcessor();
  }
 
  @Bean
  public FlatFileItemReader<Employee> reader() {
    FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
    itemReader.setLineMapper(lineMapper());
    itemReader.setLinesToSkip(1);
    itemReader.setResource(inputResource);
    return itemReader;
  }
 
  @Bean
  public LineMapper<Employee> lineMapper() {
    DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
    lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
    BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
    fieldSetMapper.setTargetType(Employee.class);
    lineMapper.setLineTokenizer(lineTokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return lineMapper;
  }
 
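  // ConsoleItemWriter is not a Spring Batch class; it is a custom ItemWriter
  // (not shown in this section) that is assumed to print each item to the console.
  @Bean
  public ConsoleItemWriter<Employee> writer() {
    return new ConsoleItemWriter<Employee>();
  }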
}
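
To see what registering a processor in a chunk-oriented step means at run time, here is a simplified model of the read, process, write cycle in plain Java. It only illustrates the idea and is not Spring Batch's actual implementation. ChunkLoopSketch, runStep and the use of Function for the processor are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ChunkLoopSketch {

  // Reads up to chunkSize items, processes each one, and hands the survivors
  // of that chunk to the writer in a single call. Returns the filtered count.
  static <I, O> int runStep(Iterator<I> reader, Function<I, O> processor,
                            Consumer<List<O>> writer, int chunkSize) {
    int filtered = 0;
    while (reader.hasNext()) {
      List<O> outputs = new ArrayList<>();
      for (int i = 0; i < chunkSize && reader.hasNext(); i++) {
        O result = processor.apply(reader.next());
        if (result == null) {
          filtered++; // a null result drops the item
        } else {
          outputs.add(result);
        }
      }
      // Skipping empty chunks is a simplification chosen for this sketch.
      if (!outputs.isEmpty()) {
        writer.accept(outputs);
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    List<String> ids = List.of("1", "2", "3", "abc", "4");
    Function<String, String> onlyDigits = id -> id.matches("\\d+") ? id : null;
    int filtered = runStep(ids.iterator(), onlyDigits,
        chunk -> System.out.println("write " + chunk), 2);
    System.out.println("filtered = " + filtered);
  }
}
```

With a chunk size of 1, as configured in step1(), every item forms its own chunk, so in this model the writer is called once per surviving item.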

3. Demo

In the demo, we batch-process a CSV file with the following content. Notice line number 5, which has an invalid 'id' field. The ItemProcessor should detect this error and return null, so the record is not passed on to the ItemWriter.

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
abc,David,Miller
4,David,Walsh
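
The effect on this file can be previewed without Spring Batch. The following sketch skips the header the way setLinesToSkip(1) does, splits each line on commas, and reports the file line numbers whose id is not a positive integer. CsvDemoSketch, rejectedLines and isPositiveInteger are hypothetical names, and the plain split(",") is a simplification of what DelimitedLineTokenizer does.

```java
import java.util.ArrayList;
import java.util.List;

class CsvDemoSketch {

  // Returns the 1-based file line numbers whose id column is not a positive integer.
  static List<Integer> rejectedLines(List<String> lines) {
    List<Integer> rejected = new ArrayList<>();
    // Start at index 1 to skip the header line.
    for (int i = 1; i < lines.size(); i++) {
      String[] fields = lines.get(i).split(","); // columns: id, firstName, lastName
      if (!isPositiveInteger(fields[0])) {
        rejected.add(i + 1);
      }
    }
    return rejected;
  }

  static boolean isPositiveInteger(String value) {
    try {
      return Integer.parseInt(value) > 0;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> csv = List.of(
        "id,firstName,lastName",
        "1,Lokesh,Gupta",
        "2,Amit,Mishra",
        "3,Pankaj,Kumar",
        "abc,David,Miller",
        "4,David,Walsh");
    System.out.println(rejectedLines(csv)); // prints [5]
  }
}
```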

Start the job and watch the console.

2018-07-11 14:59:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531301340005}]

2018-07-11 14:59:00 INFO  - Executing step: [step1]
Employee [id=1, firstName=Lokesh, lastName=Gupta]
Employee [id=2, firstName=Amit, lastName=Mishra]
Employee [id=3, firstName=Pankaj, lastName=Kumar]

Invalid employee id : abc

Employee [id=4, firstName=David, lastName=Walsh]

2018-07-11 14:59:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531301340005}] and the following status: [COMPLETED]

Drop me your questions in the comments section.

Happy Learning !!

Ref: ItemProcessor Java Doc

Source Code on Github

Leave a Comment

  1. My ItemReader returns a list of objects. The processor takes the list and converts each instance of the object to String and returns a list of String values.
    This is a pretty generic implementation in the sense that all the Objects extend out of a Base DTO and all of them have overridden toString method to ensure that the String representation is in the correct format.
    My question is that if I want to send some additional parameters from the Reader to the Processor, how can I do the same? For example a true or false value, depending on the Reader Implementation.
    Can I access the Job Parameters in the Processor? If yes, then I can set the boolean value as one of the Job Parameters and access it from the Processor.

  2. I am able to write the invalid records to a file, and I want to add, for each record, the reason it failed. How can I achieve that?

  3. In a custom item processor, can we also implement JobExecutionListener and StepExecutionListener?
    Is this a good or a bad practice?

  4. Thanks Lokesh. This helped a lot in understanding Spring Batch.

    How can we write functional/integration tests for this batch job?

