Spring Batch Count of Processed Records Example

Lokesh Gupta

Learn to use ItemStream and ChunkListener to count the number of records processed by a Spring Batch job and log the record count to a log file or the console.

Record count using ItemStream

The ItemStream implementation given below periodically reports the number of records processed.

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

/**
 * {@link ItemStream} that prints an item count to standard output every time
 * {@link #update(ExecutionContext)} is invoked.
 *
 * <p>The count is not tracked here; it is looked up in the supplied
 * {@link ExecutionContext} under {@link #READ_COUNT_KEY}.
 */
public class ItemCountItemStream implements ItemStream {

	/**
	 * Execution-context key whose value is printed on each update.
	 * NOTE(review): presumably the key under which FlatFileItemReader stores its
	 * read count - confirm against the reader configured for the step.
	 */
	private static final String READ_COUNT_KEY = "FlatFileItemReader.read.count";

	/** Nothing to initialise when the stream is opened. */
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		// intentionally empty
	}

	/**
	 * Prints {@code "ItemCount: "} followed by the value currently stored under
	 * {@link #READ_COUNT_KEY}; a missing entry is printed as {@code null}.
	 *
	 * @param executionContext context to read the count from
	 */
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		Object readCount = executionContext.get(READ_COUNT_KEY);
		System.out.println("ItemCount: " + readCount);
	}

	/** Nothing to release when the stream is closed. */
	public void close() throws ItemStreamException {
		// intentionally empty
	}
}

How to use ItemCountItemStream

Register the ItemCountItemStream created above on the tasklet step using the SimpleStepBuilder.stream() method.

// Injected by Spring; used by readCSVFilesJob() to obtain the job builder.
@Autowired
private JobBuilderFactory jobBuilderFactory;

// Injected by Spring; used by step1() to obtain the step builder.
@Autowired
private StepBuilderFactory stepBuilderFactory;

/**
 * Defines the job named {@code "readCSVFilesJob"}.
 * The job is given a {@code RunIdIncrementer} and starts with {@code step1()}.
 *
 * @return the built job
 */
@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

/**
 * Defines the step named {@code "step1"}: a chunk-oriented step over
 * {@code Employee} items with a chunk size of 1, wired to {@code reader()}
 * and {@code writer()}.
 * The {@code ItemCountItemStream} returned by {@code stream()} is registered
 * through the builder's {@code stream(...)} method.
 *
 * @return the built step
 */
@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.stream(stream())
			.build();
}

/**
 * Exposes a new {@code ItemCountItemStream} as a bean; {@code step1()} registers it on the step.
 *
 * @return the item-count stream
 */
@Bean
public ItemCountItemStream stream() {
	return new ItemCountItemStream();
}

Record count using ChunkListener

The ChunkListener implementation given below periodically reports the number of records processed.

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

/**
 * {@link ChunkListener} that prints the step's read count to standard output
 * after each chunk.
 */
public class ItemCountListener implements ChunkListener {

	/** Nothing to do before a chunk starts. */
	@Override
	public void beforeChunk(ChunkContext context) {
		// intentionally empty
	}

	/**
	 * Prints {@code "ItemCount: "} followed by the read count taken from the
	 * step execution reachable through the given chunk context.
	 *
	 * @param context context of the chunk that has just finished
	 */
	@Override
	public void afterChunk(ChunkContext context) {
		System.out.println("ItemCount: " + context.getStepContext().getStepExecution().getReadCount());
	}

	/** Nothing to do when a chunk fails. */
	@Override
	public void afterChunkError(ChunkContext context) {
		// intentionally empty
	}
}

How to use ItemCountListener

Register the ItemCountListener created above on the tasklet step using the SimpleStepBuilder.listener() method.

// Injected by Spring; used by readCSVFilesJob() to obtain the job builder.
@Autowired
private JobBuilderFactory jobBuilderFactory;

// Injected by Spring; used by step1() to obtain the step builder.
@Autowired
private StepBuilderFactory stepBuilderFactory;

/**
 * Defines the job named {@code "readCSVFilesJob"}.
 * The job is given a {@code RunIdIncrementer} and starts with {@code step1()}.
 *
 * @return the built job
 */
@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

/**
 * Defines the step named {@code "step1"}: a chunk-oriented step over
 * {@code Employee} items with a chunk size of 1, wired to {@code reader()}
 * and {@code writer()}.
 * The {@code ItemCountListener} returned by {@code listener()} is registered
 * through the builder's {@code listener(...)} method.
 *
 * @return the built step
 */
@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.listener(listener())
			.build();
}

/**
 * Exposes a new {@code ItemCountListener} as a bean; {@code step1()} registers it on the step.
 *
 * @return the item-count chunk listener
 */
@Bean
public ItemCountListener listener() {
	return new ItemCountListener();
}

Record Count Demo

I am processing this CSV with the ItemCountListener configuration given above.

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh

The complete batch configuration looks like this:

package com.howtodoinjava.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import com.howtodoinjava.demo.model.Employee;

/**
 * Spring Batch configuration for the record-count demo.
 *
 * <p>Declares a single job, {@code readCSVFilesJob}, made of one chunk-oriented
 * step that reads {@link Employee} rows from a CSV resource, hands them to the
 * configured writer and reports the read count through an
 * {@code ItemCountListener}.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig
{
	// Injected by Spring; used by readCSVFilesJob() to obtain the job builder.
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	// Injected by Spring; used by step1() to obtain the step builder.
	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	// CSV file handed to the reader; the location is resolved by Spring from the @Value string.
	@Value("/input/inputData.csv")
	private Resource inputResource;

	/**
	 * Defines the job named {@code "readCSVFilesJob"}.
	 * The job is given a {@link RunIdIncrementer} and starts with {@link #step1()}.
	 *
	 * @return the built job
	 */
	@Bean
	public Job readCSVFilesJob() {
		return jobBuilderFactory
				.get("readCSVFilesJob")
				.incrementer(new RunIdIncrementer())
				.start(step1())
				.build();
	}

	/**
	 * Defines the step named {@code "step1"}: a chunk-oriented step over
	 * {@link Employee} items with a chunk size of 1, wired to {@link #reader()}
	 * and {@link #writer()}, with the listener from {@link #listener()} registered on it.
	 *
	 * @return the built step
	 */
	@Bean
	public Step step1() {
		return stepBuilderFactory
				.get("step1")
				.<Employee, Employee>chunk(1)
				.reader(reader())
				.writer(writer())
				.listener(listener())
				.build();
	}

	/**
	 * Exposes the chunk listener that prints the read count after each chunk.
	 *
	 * @return a new {@code ItemCountListener}
	 */
	@Bean
	public ItemCountListener listener() {
		return new ItemCountListener();
	}

	/**
	 * Builds the flat-file reader for the input CSV.
	 *
	 * @return a reader that maps each line with {@link #lineMapper()} and reads
	 *         from {@code inputResource}
	 */
	@Bean
	public FlatFileItemReader<Employee> reader() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
		itemReader.setLineMapper(lineMapper());
		// The first line of the file is skipped rather than mapped to an Employee.
		itemReader.setLinesToSkip(1);
		itemReader.setResource(inputResource);
		return itemReader;
	}

	/**
	 * Builds the line mapper that turns one delimited line into an {@link Employee}.
	 *
	 * <p>The tokenizer names the first three fields {@code id}, {@code firstName}
	 * and {@code lastName}; the field-set mapper then populates an
	 * {@link Employee} from those named values.
	 *
	 * @return the configured line mapper
	 */
	@Bean
	public LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
		lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}

	/**
	 * Exposes the item writer used by {@link #step1()}.
	 *
	 * @return a new {@code ConsoleItemWriter} for {@link Employee} items
	 */
	@Bean
	public ConsoleItemWriter<Employee> writer() {
		return new ConsoleItemWriter<Employee>();
	}
}

Start the application as a Spring Boot application. The Spring task scheduler will launch the job once every minute.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Spring Boot entry point that also launches the batch job on a schedule.
 */
@SpringBootApplication
@EnableScheduling
public class App
{
    // Launcher used to start the job; injected by Spring.
    @Autowired
    JobLauncher jobLauncher;

    // The job to launch; injected by Spring.
    @Autowired
    Job job;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
    }

    /**
     * Launches the job each time the cron expression fires.
     * The current time in milliseconds is passed as the {@code "JobID"}
     * parameter, so the parameters differ from one launch to the next.
     *
     * @throws Exception if the launcher fails to run the job
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception
    {
        String launchStamp = Long.toString(System.currentTimeMillis());

        JobParametersBuilder parametersBuilder = new JobParametersBuilder();
        parametersBuilder.addString("JobID", launchStamp);
        JobParameters launchParameters = parametersBuilder.toJobParameters();

        jobLauncher.run(job, launchParameters);
    }
}

Now watch the console.

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]

2018-07-11 16:38:00 INFO  - Executing step: [step1]

Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]

Drop me your questions in the comments section.

Happy Learning !!

Ref:
ItemStream Java Doc
ChunkListener Java Doc

Comments

Subscribe
Notify of
guest

2 Comments
Most Voted
Newest Oldest
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.