Spring Batch Count of Processed Records Example

Learn to use ItemStream and ChunkListener to count the number of records processed by a Spring Batch job and log the record count to the log file or console.

1. Record Count using ItemStream

In Spring Batch, ItemStream is an interface that provides a way to maintain the state between restarts of a batch job. It’s part of the Spring Batch infrastructure for handling the reading, processing, and writing of data in a batch job.
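For reference, the contract declares three callbacks, shown below as in Spring Batch 4.x (later versions declare them as default methods). open() runs when the step starts and receives any previously saved state, update() runs before every chunk commit so the current state can be persisted, and close() runs when the step ends.

// The ItemStream contract from org.springframework.batch.item (Spring Batch 4.x)
public interface ItemStream {

  // called once when the step starts; previously saved state is available here
  void open(ExecutionContext executionContext) throws ItemStreamException;

  // called before every chunk commit; store any state that must survive a restart
  void update(ExecutionContext executionContext) throws ItemStreamException;

  // called once when the step ends; release any held resources
  void close() throws ItemStreamException;
}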

1.1. Defining ItemStream Implementation

The ItemStream implementation given below counts the number of records processed, logging the count periodically.

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
 
public class ItemCountItemStream implements ItemStream {
 
  public void open(ExecutionContext executionContext) throws ItemStreamException {
  }
 
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    // log the record count periodically; FlatFileItemReader stores its read count
    // in the ExecutionContext under the key '<reader name>.read.count'
    System.out.println("ItemCount: " + executionContext.get("FlatFileItemReader.read.count"));
  }
 
  public void close() throws ItemStreamException {
  }
}

1.2. How to use ItemCountItemStream?

We need to register the above-created ItemCountItemStream using the SimpleStepBuilder.stream() method in the step definition. See an example registration in the step1() method below.

@Autowired
private JobBuilderFactory jobBuilderFactory;
 
@Autowired
private StepBuilderFactory stepBuilderFactory;
 
@Bean
public Job readCSVFilesJob() {
  return jobBuilderFactory
      .get("readCSVFilesJob")
      .incrementer(new RunIdIncrementer())
      .start(step1())
      .build();
}
 
@Bean
public Step step1() {
  return stepBuilderFactory
      .get("step1")
      .<Employee, Employee>chunk(1)
      .reader(reader())
      .writer(writer())
      .stream(stream())
      .build();
}
 
@Bean
public ItemCountItemStream stream() {
  return new ItemCountItemStream();
}

When you run this job, the writer prints the processed items and ItemCountItemStream prints the running record count every time update() is invoked, which happens before each chunk is committed. Because the count is stored in the step's ExecutionContext, it is saved and restored during a restart, demonstrating the use of ItemStream to maintain state across job executions.
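
If you also need to maintain a counter of your own (not just the reader's built-in read count), the same open() and update() callbacks can store it in the ExecutionContext so that it survives a restart. The following is a minimal sketch only; the class name RestartAwareCountingStream, the key customItemCount, and the increment() hook are illustrative assumptions, not part of the configuration above.

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

public class RestartAwareCountingStream implements ItemStream {

  // assumed key name, used only in this sketch
  private static final String COUNT_KEY = "customItemCount";

  private long count;

  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    // on a restart, resume from the previously committed count
    count = executionContext.containsKey(COUNT_KEY) ? executionContext.getLong(COUNT_KEY) : 0L;
  }

  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    // called before each chunk commit; persist the current count
    executionContext.putLong(COUNT_KEY, count);
    System.out.println("Custom item count: " + count);
  }

  @Override
  public void close() throws ItemStreamException {
  }

  // call this from a processor or writer collaborator to advance the counter
  public void increment() {
    count++;
  }
}

Register it with stream() exactly like ItemCountItemStream; since update() runs just before each commit, the value saved in the ExecutionContext always reflects the last committed chunk.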

2. Record count using ChunkListener

In Spring Batch, the ChunkListener interface provides methods that get called before and after a chunk is processed, as well as in the case of an error during processing. Generally used for logging and cleanup activities, it can also be used for counting the processed records.
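For reference, the contract looks like this in Spring Batch 4.x (a rollback-exception constant is omitted; later versions declare the methods as default methods):

// The ChunkListener contract from org.springframework.batch.core (Spring Batch 4.x)
import org.springframework.batch.core.scope.context.ChunkContext;

public interface ChunkListener extends StepListener {

  // called before the chunk is processed, inside the chunk's transaction
  void beforeChunk(ChunkContext context);

  // called after the chunk completes successfully, outside the transaction
  void afterChunk(ChunkContext context);

  // called after an error that caused the chunk to be rolled back
  void afterChunkError(ChunkContext context);
}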

2.1. Defining ChunkListener Implementation

The ChunkListener implementation given below logs the number of records processed after each chunk completes. See an example registration in the step1() method in the next section.

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
 
public class ItemCountListener implements ChunkListener {
   
  @Override
  public void beforeChunk(ChunkContext context) {
  }
 
  @Override
  public void afterChunk(ChunkContext context) {
     
    int count = context.getStepContext().getStepExecution().getReadCount();
    System.out.println("ItemCount: " + count);
  }
   
  @Override
  public void afterChunkError(ChunkContext context) {
  }
}
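
The StepExecution available from the ChunkContext exposes more than the read count. If you also want to log how many items were written, filtered, or skipped per chunk, the same afterChunk() callback can report those values as well. A minimal sketch follows; the class name VerboseCountListener is an assumption for illustration.

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;

public class VerboseCountListener implements ChunkListener {

  @Override
  public void beforeChunk(ChunkContext context) {
  }

  @Override
  public void afterChunk(ChunkContext context) {
    StepExecution stepExecution = context.getStepContext().getStepExecution();
    // cumulative counts maintained by Spring Batch for the current step execution
    System.out.println("read=" + stepExecution.getReadCount()
        + ", written=" + stepExecution.getWriteCount()
        + ", filtered=" + stepExecution.getFilterCount()
        + ", skipped=" + stepExecution.getSkipCount());
  }

  @Override
  public void afterChunkError(ChunkContext context) {
  }
}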

2.2. How to use ItemCountListener?

We need to register the above-created ItemCountListener using the SimpleStepBuilder.listener() method in the step bean definition.

@Autowired
private JobBuilderFactory jobBuilderFactory;
 
@Autowired
private StepBuilderFactory stepBuilderFactory;
 
@Bean
public Job readCSVFilesJob() {
  return jobBuilderFactory
      .get("readCSVFilesJob")
      .incrementer(new RunIdIncrementer())
      .start(step1())
      .build();
}
 
@Bean
public Step step1() {
  return stepBuilderFactory
      .get("step1")
      .<Employee, Employee>chunk(1)
      .reader(reader())
      .writer(writer())
      .listener(listener())
      .build();
}
 
@Bean
public ItemCountListener listener() {
  return new ItemCountListener();
}
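
As an alternative to implementing the ChunkListener interface, Spring Batch also supports annotation-based listeners. A plain class with a method annotated with @AfterChunk can be passed to the same listener() call (the generic listener(Object) overload of the step builder scans for these annotations in Spring Batch 4.1 and later). The class below is a minimal sketch with an assumed name.

import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.scope.context.ChunkContext;

public class AnnotatedItemCountListener {

  // invoked by Spring Batch after each chunk is committed
  @AfterChunk
  public void logCount(ChunkContext context) {
    int count = context.getStepContext().getStepExecution().getReadCount();
    System.out.println("ItemCount: " + count);
  }
}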

3. Record Count Demo

I am processing the following CSV file with the ItemCountListener configuration.

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh
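
For reference, the Employee model bound by BeanWrapperFieldSetMapper could look like the minimal sketch below; the field names follow the CSV header, while the field types (plain Strings) and the toString() format are assumptions based on the console output shown later.

package com.howtodoinjava.demo.model;

public class Employee {

  private String id;
  private String firstName;
  private String lastName;

  public String getId() { return id; }
  public void setId(String id) { this.id = id; }

  public String getFirstName() { return firstName; }
  public void setFirstName(String firstName) { this.firstName = firstName; }

  public String getLastName() { return lastName; }
  public void setLastName(String lastName) { this.lastName = lastName; }

  @Override
  public String toString() {
    return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
  }
}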

The complete batch configuration looks like this:

package com.howtodoinjava.demo.config;
 
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
 
import com.howtodoinjava.demo.model.Employee;
 
@Configuration
@EnableBatchProcessing
public class BatchConfig 
{
  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Value("/input/inputData.csv")
  private Resource inputResource;
 
  @Bean
  public Job readCSVFilesJob() {
    return jobBuilderFactory
        .get("readCSVFilesJob")
        .incrementer(new RunIdIncrementer())
        .start(step1())
        .build();
  }
 
  @Bean
  public Step step1() {
    return stepBuilderFactory
        .get("step1")
        .<Employee, Employee>chunk(1)
        .reader(reader())
        .writer(writer())
        .listener(listener())
        .build();
  }
   
  @Bean
  public ItemCountListener listener() {
    return new ItemCountListener();
  }
 
  @Bean
  public FlatFileItemReader<Employee> reader() {
    FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
    itemReader.setLineMapper(lineMapper());
    itemReader.setLinesToSkip(1);
    itemReader.setResource(inputResource);
    return itemReader;
  }
 
  @Bean
  public LineMapper<Employee> lineMapper() {
    DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
    lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
    BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
    fieldSetMapper.setTargetType(Employee.class);
    lineMapper.setLineTokenizer(lineTokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return lineMapper;
  }
 
  @Bean
  public ConsoleItemWriter<Employee> writer() {
    return new ConsoleItemWriter<Employee>();
  }
}
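
Note that ConsoleItemWriter used in writer() is not a Spring Batch class; it is a small custom ItemWriter that simply prints each item instead of persisting it. A minimal sketch, assuming the List-based ItemWriter signature of Spring Batch 4.x used throughout this configuration, is given below.

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class ConsoleItemWriter<T> implements ItemWriter<T> {

  // prints every item in the chunk to the console
  @Override
  public void write(List<? extends T> items) throws Exception {
    for (T item : items) {
      System.out.println(item);
    }
  }
}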

Start the application as a Spring Boot application. The Spring task scheduler will launch the job every minute, as configured by the cron expression below.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
 
@SpringBootApplication
@EnableScheduling
public class App
{
    @Autowired
    JobLauncher jobLauncher;
      
    @Autowired
    Job job;
      
    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
    }
      
    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception
    {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}

Now watch the console. It prints each processed item followed by the running record count.

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]
 
2018-07-11 16:38:00 INFO  - Executing step: [step1]
 
Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5
 
2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]

Drop me your questions in the comments section.

Happy Learning !!

Ref:
ItemStream Java Doc
ChunkListener Java Doc

Source Code on Github

