HowToDoInJava


Spring Batch Count of Processed Records Example

Learn to use ItemStream and ChunkListener to count the number of records processed by a Spring Batch job and to log the record count to a log file or the console.

Record count using ItemStream

The ItemStream implementation below prints the number of records read so far each time Spring Batch calls its update() method, which happens as the step commits its chunks.

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

public class ItemCountItemStream implements ItemStream {

	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		// Nothing to initialize.
	}

	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		// FlatFileItemReader saves its running read count in the execution context under this key.
		System.out.println("ItemCount: " + executionContext.get("FlatFileItemReader.read.count"));
	}

	@Override
	public void close() throws ItemStreamException {
		// Nothing to release.
	}
}
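
The key "FlatFileItemReader.read.count" used above is the reader's name followed by ".read.count"; the reader's name defaults to its short class name, so the key changes if the reader is given another name with setName(). The following is a minimal sketch of that naming rule and of a null-safe lookup. It assumes a plain Map as a stand-in for ExecutionContext so that it runs without Spring Batch, and the class and method names (ReadCountKeys, readCountKey, readCount) are hypothetical helpers, not part of the framework.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not a Spring Batch class. */
final class ReadCountKeys {

    /** Builds the key as "<readerName>.read.count". */
    static String readCountKey(String readerName) {
        return readerName + ".read.count";
    }

    /**
     * Looks up the read count, returning 0 when the key is absent.
     * The Map is an assumed stand-in for ExecutionContext.
     */
    static int readCount(Map<String, Object> context, String readerName) {
        Object value = context.get(readCountKey(readerName));
        return value instanceof Integer ? (Integer) value : 0;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        context.put("FlatFileItemReader.read.count", 3);

        System.out.println(readCount(context, "FlatFileItemReader")); // prints 3
        System.out.println(readCount(context, "renamedReader"));      // prints 0
    }
}
```

Falling back to 0 avoids printing "ItemCount: null" when the reader has not saved its state yet or has been renamed.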

How to use ItemCountItemStream

Register the ItemCountItemStream created above with the step by calling the SimpleStepBuilder.stream() method.

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.stream(stream())
			.build();
}

@Bean
public ItemCountItemStream stream() {
	return new ItemCountItemStream();
}

Record count using ChunkListener

The ChunkListener implementation below prints the number of records read so far after each chunk completes.

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class ItemCountListener implements ChunkListener {
	
	@Override
	public void beforeChunk(ChunkContext context) {
	}

	@Override
	public void afterChunk(ChunkContext context) {
		
		int count = context.getStepContext().getStepExecution().getReadCount();
		System.out.println("ItemCount: " + count);
	}
	
	@Override
	public void afterChunkError(ChunkContext context) {
	}
}

How to use ItemCountListener

Register the ItemCountListener created above with the step by calling the SimpleStepBuilder.listener() method.

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.listener(listener())
			.build();
}

@Bean
public ItemCountListener listener() {
	return new ItemCountListener();
}

Record Count Demo

I am processing the following CSV file with the ItemCountListener configuration given above.

id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh

The complete batch configuration looks like this:

package com.howtodoinjava.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import com.howtodoinjava.demo.model.Employee;

@Configuration
@EnableBatchProcessing
public class BatchConfig 
{
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Value("/input/inputData.csv")
	private Resource inputResource;

	@Bean
	public Job readCSVFilesJob() {
		return jobBuilderFactory
				.get("readCSVFilesJob")
				.incrementer(new RunIdIncrementer())
				.start(step1())
				.build();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory
				.get("step1")
				.<Employee, Employee>chunk(1)
				.reader(reader())
				.writer(writer())
				.listener(listener())
				.build();
	}
	
	@Bean
	public ItemCountListener listener() {
		return new ItemCountListener();
	}

	@Bean
	public FlatFileItemReader<Employee> reader() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
		itemReader.setLineMapper(lineMapper());
		itemReader.setLinesToSkip(1);
		itemReader.setResource(inputResource);
		return itemReader;
	}

	@Bean
	public LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
		lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}

	@Bean
	public ConsoleItemWriter<Employee> writer() {
		return new ConsoleItemWriter<Employee>();
	}
}
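
The configuration refers to an Employee model that is not listed in this post. As a rough illustration of what the line mapper produces for each CSV line, here is a plain-Java stand-in that splits a line by hand. The class name EmployeeSketch, the String field types and the fromCsvLine helper are assumptions; only the toString format is taken from the console output of the demo. The real model used with BeanWrapperFieldSetMapper needs a default constructor and setters instead of the constructor shown here.

```java
/** Hypothetical stand-in for the Employee model; the field types are assumed. */
final class EmployeeSketch {

    private final String id;
    private final String firstName;
    private final String lastName;

    EmployeeSketch(String id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    /** Does by hand what the tokenizer and field-set mapper do for one CSV line. */
    static EmployeeSketch fromCsvLine(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + fields.length + ": " + line);
        }
        return new EmployeeSketch(fields[0].trim(), fields[1].trim(), fields[2].trim());
    }

    @Override
    public String toString() {
        return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
    }

    public static void main(String[] args) {
        System.out.println(fromCsvLine("1,Lokesh,Gupta"));
        // prints: Employee [id=1, firstName=Lokesh, lastName=Gupta]
    }
}
```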

Start the application as a Spring Boot application. The Spring task scheduler will launch the job once every minute.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class App
{
    @Autowired
    JobLauncher jobLauncher;
     
    @Autowired
    Job job;
     
    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
    }
     
    @Scheduled(cron = "0 */1 * * * ?")
    public void perform() throws Exception
    {
        JobParameters params = new JobParametersBuilder()
                .addString("JobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobLauncher.run(job, params);
    }
}
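
The cron expression "0 */1 * * * ?" uses Spring's six-field format (second, minute, hour, day of month, month, day of week), so it fires at second 0 of every minute. The unique JobID parameter makes every launch a new job instance, which is why the same job can run again each minute. The sketch below only illustrates when the next run falls for this particular expression; it is not how Spring evaluates cron expressions, and the names EveryMinuteSchedule and nextRun are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper that models the schedule "0 */1 * * * ?" only. */
final class EveryMinuteSchedule {

    /** Returns the first instant strictly after 'now' that falls on second 0 of a minute. */
    static LocalDateTime nextRun(LocalDateTime now) {
        return now.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
    }

    public static void main(String[] args) {
        LocalDateTime started = LocalDateTime.of(2018, 7, 11, 16, 37, 42);
        System.out.println(nextRun(started)); // prints 2018-07-11T16:38
    }
}
```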

Now watch the console. The read count is printed after every chunk.

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]

2018-07-11 16:38:00 INFO  - Executing step: [step1]

Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5

2018-07-11 16:38:00 INFO  - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
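
The last count in the output above appears twice. With a chunk size of 1, the step only discovers the end of the file when it starts one more chunk and the reader returns nothing, and afterChunk() still runs for that empty chunk. The following is a simplified plain-Java model of that loop, written under the assumption that this is the relevant behavior; it is not Spring Batch's actual implementation, and the names ChunkLoopSketch and countsAfterEachChunk are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Simplified model of a chunk-oriented step; not Spring Batch code. */
final class ChunkLoopSketch {

    /** Returns the read count that a listener would see after each chunk. */
    static List<Integer> countsAfterEachChunk(List<String> records, int chunkSize) {
        Iterator<String> reader = records.iterator();
        List<Integer> reported = new ArrayList<>();
        int readCount = 0;
        boolean exhausted = false;

        while (!exhausted) {
            // Read up to chunkSize items; running out of input plays the
            // role of the reader returning null.
            for (int i = 0; i < chunkSize; i++) {
                if (!reader.hasNext()) {
                    exhausted = true;
                    break;
                }
                reader.next();
                readCount++;
            }
            // The callback runs after every chunk, including an empty last one.
            reported.add(readCount);
        }
        return reported;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("Lokesh", "Amit", "Pankaj", "David", "David");
        System.out.println(countsAfterEachChunk(rows, 1)); // prints [1, 2, 3, 4, 5, 5]
        System.out.println(countsAfterEachChunk(rows, 2)); // prints [2, 4, 5]
    }
}
```

In this model the repeated value only shows up when the number of records is an exact multiple of the chunk size, which is always the case for a chunk size of 1.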

Drop me your questions in the comments section.

Happy Learning !!

Ref:
ItemStream Java Doc
ChunkListener Java Doc


About Lokesh Gupta

A family guy with fun loving nature. Love computers, programming and solving everyday problems. Find me on Facebook and Twitter.

Feedback, Discussion and Comments

  1. Eddie

    March 11, 2020

    Hi, I have a problem close to this.
    I have a CSV with 50,000 records. And I don’t want to log every line.
    I just want to log only the line number of read or write record that have failed.
    Do you have any solution ?

    Many Thanks !

  2. SureshAmeda

    November 2, 2018

    how to read files if one file has corrupted to read remaining files using MultiResourceItemReader in spring boot?

Copyright © 2020 · HowToDoInjava.com · All Rights Reserved.
