Learn to use ItemStream
and ChunkListener
to count the number of records processed by a Spring Batch job and log the record count to a log file or the console.
Record count using ItemStream
The following ItemStream
implementation periodically prints the number of records processed so far.
import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStream; import org.springframework.batch.item.ItemStreamException; public class ItemCountItemStream implements ItemStream { public void open(ExecutionContext executionContext) throws ItemStreamException { } public void update(ExecutionContext executionContext) throws ItemStreamException { System.out.println("ItemCount: "+executionContext.get("FlatFileItemReader.read.count")); } public void close() throws ItemStreamException { } }
How to use ItemCountItemStream
Register the above-created ItemCountItemStream
using SimpleStepBuilder.stream()
method in Tasklet
.
/** Factory used to assemble the job definition. */
@Autowired
private JobBuilderFactory jobBuilderFactory;

/** Factory used to assemble the step definition. */
@Autowired
private StepBuilderFactory stepBuilderFactory;

/**
 * Defines the {@code readCSVFilesJob} job: a single step ({@link #step1()})
 * with a {@link RunIdIncrementer}.
 */
@Bean
public Job readCSVFilesJob() {
    Job csvJob = jobBuilderFactory.get("readCSVFilesJob")
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .build();
    return csvJob;
}

/**
 * Defines {@code step1}: a chunk-oriented step with a chunk size of 1 that
 * wires the reader, the writer and the {@link ItemCountItemStream}.
 */
@Bean
public Step step1() {
    Step countingStep = stepBuilderFactory.get("step1")
            .<Employee, Employee>chunk(1)
            .reader(reader())
            .writer(writer())
            .stream(stream()) // registers the record-count stream on the step
            .build();
    return countingStep;
}

/** Exposes the record-count stream as a bean. */
@Bean
public ItemCountItemStream stream() {
    ItemCountItemStream countStream = new ItemCountItemStream();
    return countStream;
}
Record count using ChunkListener
The following ChunkListener
implementation periodically prints the number of records processed so far.
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

/**
 * {@link ChunkListener} that prints the step's read count to the console
 * after every chunk.
 */
public class ItemCountListener implements ChunkListener {

    /** No action is taken before a chunk starts. */
    @Override
    public void beforeChunk(ChunkContext context) {
        // intentionally empty
    }

    /**
     * Prints {@code "ItemCount: <n>"}, where {@code n} is the read count reported
     * by the step execution reachable from the given chunk context.
     *
     * @param context context of the chunk that has just finished
     */
    @Override
    public void afterChunk(ChunkContext context) {
        System.out.println(
                "ItemCount: " + context.getStepContext().getStepExecution().getReadCount());
    }

    /** No action is taken when a chunk fails. */
    @Override
    public void afterChunkError(ChunkContext context) {
        // intentionally empty
    }
}
How to use ItemCountListener
Register the above-created ItemCountListener
using SimpleStepBuilder.listener()
method in Tasklet
.
/** Factory used to assemble the job definition. */
@Autowired
private JobBuilderFactory jobBuilderFactory;

/** Factory used to assemble the step definition. */
@Autowired
private StepBuilderFactory stepBuilderFactory;

/**
 * Defines the {@code readCSVFilesJob} job: a single step ({@link #step1()})
 * with a {@link RunIdIncrementer}.
 */
@Bean
public Job readCSVFilesJob() {
    Job csvJob = jobBuilderFactory.get("readCSVFilesJob")
            .incrementer(new RunIdIncrementer())
            .start(step1())
            .build();
    return csvJob;
}

/**
 * Defines {@code step1}: a chunk-oriented step with a chunk size of 1 that
 * wires the reader, the writer and the {@link ItemCountListener}.
 */
@Bean
public Step step1() {
    Step countingStep = stepBuilderFactory.get("step1")
            .<Employee, Employee>chunk(1)
            .reader(reader())
            .writer(writer())
            .listener(listener()) // registers the record-count listener on the step
            .build();
    return countingStep;
}

/** Exposes the record-count listener as a bean. */
@Bean
public ItemCountListener listener() {
    ItemCountListener countListener = new ItemCountListener();
    return countListener;
}
Record Count Demo
I am processing this CSV with ItemCountListener
configuration given above.
id,firstName,lastName 1,Lokesh,Gupta 2,Amit,Mishra 3,Pankaj,Kumar 4,David,Miller 5,David,Walsh
The complete batch configuration looks like this:
package com.howtodoinjava.demo.config; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.Resource; import com.howtodoinjava.demo.model.Employee; @Configuration @EnableBatchProcessing public class BatchConfig { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Value("/input/inputData.csv") private Resource inputResource; @Bean public Job readCSVFilesJob() { return jobBuilderFactory .get("readCSVFilesJob") .incrementer(new RunIdIncrementer()) .start(step1()) .build(); } @Bean public Step step1() { return stepBuilderFactory .get("step1") .<Employee, Employee>chunk(1) .reader(reader()) .writer(writer()) .listener(listner()) .build(); } @Bean public ItemCountListener listner() { return new ItemCountListener(); } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>(); itemReader.setLineMapper(lineMapper()); itemReader.setLinesToSkip(1); itemReader.setResource(inputResource); 
return itemReader; } @Bean public LineMapper<Employee> lineMapper() { DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>(); DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" }); lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 }); BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>(); fieldSetMapper.setTargetType(Employee.class); lineMapper.setLineTokenizer(lineTokenizer); lineMapper.setFieldSetMapper(fieldSetMapper); return lineMapper; } @Bean public ConsoleItemWriter<Employee> writer() { return new ConsoleItemWriter<Employee>(); } }
Start the application as a Spring Boot application. The Spring task scheduler will start the job.
import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; @SpringBootApplication @EnableScheduling public class App { @Autowired JobLauncher jobLauncher; @Autowired Job job; public static void main(String[] args) { SpringApplication.run(App.class, args); } @Scheduled(cron = "0 */1 * * * ?") public void perform() throws Exception { JobParameters params = new JobParametersBuilder() .addString("JobID", String.valueOf(System.currentTimeMillis())) .toJobParameters(); jobLauncher.run(job, params); } }
Now watch the console.
2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}] 2018-07-11 16:38:00 INFO - Executing step: [step1] Employee [id=1, firstName=Lokesh, lastName=Gupta] ItemCount: 1 Employee [id=2, firstName=Amit, lastName=Mishra] ItemCount: 2 Employee [id=3, firstName=Pankaj, lastName=Kumar] ItemCount: 3 Employee [id=4, firstName=David, lastName=Miller] ItemCount: 4 Employee [id=5, firstName=David, lastName=Walsh] ItemCount: 5 ItemCount: 5 2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
Drop me your questions in the comments section.
Happy Learning !!
Comments