Learn to use ItemStream and ChunkListener to count the number of records processed by a Spring Batch job and log the record count to the log file or console.
Record count using ItemStream
The ItemStream implementation below prints the number of records processed periodically (its update() method is called before every chunk commit).
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

public class ItemCountItemStream implements ItemStream {

	public void open(ExecutionContext executionContext) throws ItemStreamException {
	}

	// Called before every chunk commit; the reader stores its running read count
	// in the ExecutionContext under the key "<readerName>.read.count"
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		System.out.println("ItemCount: "
				+ executionContext.get("FlatFileItemReader.read.count"));
	}

	public void close() throws ItemStreamException {
	}
}
How to use ItemCountItemStream
Register the ItemCountItemStream created above in the step configuration, using the SimpleStepBuilder.stream() method.
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.stream(stream())
			.build();
}

@Bean
public ItemCountItemStream stream() {
	return new ItemCountItemStream();
}
Record count using ChunkListener
The ChunkListener implementation below prints the number of records processed after every chunk.
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class ItemCountListener implements ChunkListener {

	@Override
	public void beforeChunk(ChunkContext context) {
	}

	// Called after every chunk commit; the StepExecution tracks the running read count
	@Override
	public void afterChunk(ChunkContext context) {
		int count = context.getStepContext().getStepExecution().getReadCount();
		System.out.println("ItemCount: " + count);
	}

	@Override
	public void afterChunkError(ChunkContext context) {
	}
}
How to use ItemCountListener
Register the ItemCountListener created above in the step configuration, using the SimpleStepBuilder.listener() method.
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Job readCSVFilesJob() {
	return jobBuilderFactory
			.get("readCSVFilesJob")
			.incrementer(new RunIdIncrementer())
			.start(step1())
			.build();
}

@Bean
public Step step1() {
	return stepBuilderFactory
			.get("step1")
			.<Employee, Employee>chunk(1)
			.reader(reader())
			.writer(writer())
			.listener(listener())
			.build();
}

@Bean
public ItemCountListener listener() {
	return new ItemCountListener();
}
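If you only need the total once the step finishes, a StepExecutionListener is another option. The sketch below is my own addition (the class name StepItemCountListener is illustrative, not part of the original configuration):

```java
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class StepItemCountListener implements StepExecutionListener {

	@Override
	public void beforeStep(StepExecution stepExecution) {
	}

	// Invoked exactly once, after the step finishes; StepExecution holds the totals
	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		System.out.println("Total read: " + stepExecution.getReadCount()
				+ ", written: " + stepExecution.getWriteCount());
		return stepExecution.getExitStatus();
	}
}
```

Register it with the same .listener() call shown above; since afterStep() runs only once, the count is logged a single time instead of per chunk.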
Record Count Demo
I am processing the following CSV with the ItemCountListener configuration given above.
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
5,David,Walsh
The complete batch configuration looks like this:
package com.howtodoinjava.demo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import com.howtodoinjava.demo.model.Employee;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Value("/input/inputData.csv")
	private Resource inputResource;

	@Bean
	public Job readCSVFilesJob() {
		return jobBuilderFactory
				.get("readCSVFilesJob")
				.incrementer(new RunIdIncrementer())
				.start(step1())
				.build();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory
				.get("step1")
				.<Employee, Employee>chunk(1)
				.reader(reader())
				.writer(writer())
				.listener(listener())
				.build();
	}

	@Bean
	public ItemCountListener listener() {
		return new ItemCountListener();
	}

	@Bean
	public FlatFileItemReader<Employee> reader() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
		itemReader.setLineMapper(lineMapper());
		itemReader.setLinesToSkip(1);
		itemReader.setResource(inputResource);
		return itemReader;
	}

	@Bean
	public LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setNames(new String[] { "id", "firstName", "lastName" });
		lineTokenizer.setIncludedFields(new int[] { 0, 1, 2 });
		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}

	@Bean
	public ConsoleItemWriter<Employee> writer() {
		return new ConsoleItemWriter<Employee>();
	}
}
Start the application as a Spring Boot application. The Spring task scheduler will launch the job every minute, per the cron expression.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class App {

	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	Job job;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Scheduled(cron = "0 */1 * * * ?")
	public void perform() throws Exception {
		JobParameters params = new JobParametersBuilder()
				.addString("JobID", String.valueOf(System.currentTimeMillis()))
				.toJobParameters();
		jobLauncher.run(job, params);
	}
}
Now watch the console. Note that the final count appears twice because afterChunk() also fires for the last, empty chunk that signals the end of input.
2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] launched with the following parameters: [{JobID=1531307280004}]
2018-07-11 16:38:00 INFO - Executing step: [step1]
Employee [id=1, firstName=Lokesh, lastName=Gupta]
ItemCount: 1
Employee [id=2, firstName=Amit, lastName=Mishra]
ItemCount: 2
Employee [id=3, firstName=Pankaj, lastName=Kumar]
ItemCount: 3
Employee [id=4, firstName=David, lastName=Miller]
ItemCount: 4
Employee [id=5, firstName=David, lastName=Walsh]
ItemCount: 5
ItemCount: 5
2018-07-11 16:38:00 INFO - Job: [SimpleJob: [name=readCSVFilesJob]] completed with the following parameters: [{JobID=1531307280004}] and the following status: [COMPLETED]
Drop your questions in the comments section.
Happy Learning !!
Ref:
ItemStream Java Doc
ChunkListener Java Doc
Eddie:
Hi, I have a problem close to this. I have a CSV with 50,000 records, and I don't want to log every line. I just want to log the line numbers of the records that failed to read or write. Do you have any solution? Many thanks!
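One possible answer, sketched here as an untested outline: make the step fault-tolerant and attach a SkipListener, which is invoked only for records that fail. For read failures, FlatFileParseException carries the offending line number. (FailedRecordListener is a hypothetical name; Employee is the model class used in this article.)

```java
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.item.file.FlatFileParseException;

public class FailedRecordListener implements SkipListener<Employee, Employee> {

	// Called only when a read fails and the item is skipped
	@Override
	public void onSkipInRead(Throwable t) {
		if (t instanceof FlatFileParseException) {
			FlatFileParseException e = (FlatFileParseException) t;
			System.out.println("Read failed at line " + e.getLineNumber()
					+ ": " + e.getInput());
		}
	}

	// Called only when writing an item fails and it is skipped
	@Override
	public void onSkipInWrite(Employee item, Throwable t) {
		System.out.println("Write failed for item: " + item);
	}

	@Override
	public void onSkipInProcess(Employee item, Throwable t) {
	}
}
```

Register it on a fault-tolerant step, e.g. .faultTolerant().skip(FlatFileParseException.class).skipLimit(100).listener(new FailedRecordListener()) in the step builder; only the failed records are logged, not all 50,000.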
SureshAmeda:
If one file is corrupted, how can I skip it and continue reading the remaining files with MultiResourceItemReader in Spring Boot?
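MultiResourceItemReader does not skip a bad file out of the box, so treat the following as a hedged sketch: setStrict(false) keeps the step from failing when resources are missing, and a fault-tolerant step keeps parse failures inside one file from aborting the remaining files. The multiReader() name and the wildcard path are illustrative only; reader() stands for a FlatFileItemReader bean like the one shown earlier (it should not set its own resource, since MultiResourceItemReader assigns resources to the delegate).

```java
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;

@Bean
public MultiResourceItemReader<Employee> multiReader(
		@Value("classpath*:input/*.csv") Resource[] inputFiles) {
	MultiResourceItemReader<Employee> multiReader = new MultiResourceItemReader<>();
	multiReader.setResources(inputFiles); // all matching CSV files
	multiReader.setDelegate(reader());    // single-file reader bean, as shown earlier
	multiReader.setStrict(false);         // don't fail if no resources are found
	return multiReader;
}
```

Combine this with .faultTolerant().skip(FlatFileParseException.class) on the step so unreadable lines in a corrupted file are skipped rather than stopping the job.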