Spring Batch Count of Processed Records Example

Learn to use ItemStream and ChunkListener to count the number of records processed by a Spring Batch job and write the count to a log file or the console.

Record count using ItemStream

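The following ItemStream implementation prints the number of records read so far each time the step's execution context is updated. It reads the count that FlatFileItemReader stores in the ExecutionContext under the key FlatFileItemReader.read.count.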

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

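public class ItemCountItemStream implements ItemStream {

	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		// Nothing to initialize.
	}

	// Called whenever the execution context is about to be saved, typically once per chunk.
	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		// FlatFileItemReader keeps its running read count under this key by default.
		System.out.println("ItemCount: " + executionContext.get("FlatFileItemReader.read.count"));
	}

	@Override
	public void close() throws ItemStreamException {
		// Nothing to release.
	}
}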

How to use ItemCountItemStream

Register the ItemCountItemStream created above with the SimpleStepBuilder.stream() method when defining the step.

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.stream(stream())
			.build();
}

@Bean
public ItemCountItemStream stream() {
	return new ItemCountItemStream();
}
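
With chunk(1), the stream's update() method runs after every record, which is noisy for large files. One option is to log only when the count has advanced by a fixed interval. The helper below is a minimal plain-Java sketch of that idea; the class CountLogThrottle and its method shouldLog() are hypothetical names introduced for illustration and are not part of Spring Batch.

```java
// Hypothetical helper, not part of Spring Batch: decides whether a running
// count has advanced far enough since the last log line to be logged again.
class CountLogThrottle {

    private final int interval;
    private int lastLogged = 0;

    CountLogThrottle(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // Returns true when at least `interval` new records have been counted
    // since the last time this method returned true.
    boolean shouldLog(int currentCount) {
        if (currentCount - lastLogged >= interval) {
            lastLogged = currentCount;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountLogThrottle throttle = new CountLogThrottle(1000);
        // Simulate the count seen at each commit when the chunk size is 250.
        for (int count = 250; count <= 3000; count += 250) {
            if (throttle.shouldLog(count)) {
                System.out.println("ItemCount: " + count);
            }
        }
    }
}
```

Inside ItemCountItemStream.update(), such a helper could be fed the value from executionContext.getInt("FlatFileItemReader.read.count", 0) and the println guarded by its result. Raising the chunk size has a similar effect, because the count is then reported once per chunk rather than once per record.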

Record count using ChunkListener

The following ChunkListener implementation prints the number of records read so far after each chunk completes.

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class ItemCountListener implements ChunkListener {
	
	@Override
	public void beforeChunk(ChunkContext context) {
	}

	@Override
	public void afterChunk(ChunkContext context) {
		
		int count = context.getStepContext().getStepExecution().getReadCount();
		System.out.println("ItemCount: " + count);
	}
	
	@Override
	public void afterChunkError(ChunkContext context) {
	}
}

How to use ItemCountListener

Register the ItemCountListener created above with the SimpleStepBuilder.listener() method when defining the step.

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.listener(listener())
			.build();
}

@Bean
public ItemCountListener listener() {
	return new ItemCountListener();
}

Record Count Demo

The demo processes the following CSV file using the ItemCountListener configuration given above.

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh

The complete batch configuration looks like this:

package com.howtodoinjava.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import com.howtodoinjava.demo.model.Employee;

@Configuration
@EnableBatchProcessing
public class BatchConfig 
{
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Value("/input/inputData.csv")
	private Resource inputResource;

	@Bean
	public Job readCSVFilesJob() {
		return jobBuilderFactory
				.get("readCSVFilesJob")
				.incrementer(new RunIdIncrementer())
				.start(step1())
				.build();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory
				.get("step1")
				.<Employee, Employee>chunk(1)
				.reader(reader())
				.writer(writer())
				.listener(listener())
				.build();
	}
	
	@Bean
	public ItemCountListener listener() {
		return new ItemCountListener();
	}

	@Bean
	public FlatFileItemReader<Employee> reader() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
		itemReader.setLineMapper(lineMapper());
		itemReader.setLinesToSkip(1);
		itemReader.setResource(inputResource);
		return itemReader;
	}

	@Bean
	public LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
		lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}

	@Bean
	public ConsoleItemWriter<Employee> writer() {
		return new ConsoleItemWriter<Employee>();
	}
}

Start the application as a Spring Boot application. The Spring task scheduler launches the job at the start of every minute, as defined by the cron expression below.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class App
{
    @Autowired
    JobLauncher jobLauncher;
     
    @Autowired
    Job job;
     
    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
    }
     
    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception
    {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}

Now watch the console. Because the chunk size is 1, the listener prints the running count after every record.

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]

2018-07-11 16:38:00 INFO  - Executing step: [step1]

Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
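
The last count in the log above, ItemCount: 5, is printed twice. This is consistent with the step running one more chunk that only discovers the input is exhausted, after which afterChunk() fires again with an unchanged read count. The sketch below is a simplified plain-Java model of that loop, written under this assumption; it is not Spring Batch's actual implementation, and the class ChunkLoopModel and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical model of a chunk-oriented step; not Spring Batch code.
class ChunkLoopModel {

    // Returns the cumulative read count that would be reported after each chunk.
    static List<Integer> countsAfterEachChunk(List<String> items, int chunkSize) {
        List<Integer> reported = new ArrayList<>();
        Iterator<String> reader = items.iterator();
        int readCount = 0;
        boolean exhausted = false;

        while (!exhausted) {
            // Read up to chunkSize items. Running out of items plays the role
            // of the reader returning null.
            for (int i = 0; i < chunkSize; i++) {
                if (!reader.hasNext()) {
                    exhausted = true;
                    break;
                }
                reader.next();
                readCount++;
            }
            // The "afterChunk" callback fires for every chunk, including the
            // final one that only found the end of the input.
            reported.add(readCount);
        }
        return reported;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("Lokesh", "Amit", "Pankaj", "David", "David");
        for (int count : countsAfterEachChunk(rows, 1)) {
            System.out.println("ItemCount: " + count);
        }
    }
}
```

In this model, the extra line appears whenever the number of records is an exact multiple of the chunk size, because the end of the input is only noticed on the following chunk.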

Drop me your questions in the comments section.

Happy Learning !!

Ref:
ItemStream Java Doc
ChunkListener Java Doc


2 thoughts on “Spring Batch Count of Processed Records Example”

  1. Hi, I have a problem close to this.
    I have a CSV with 50,000 records. And I don’t want to log every line.
    I just want to log only the line number of read or write record that have failed.
    Do you have any solution ?

    Many Thanks !
