Spring Batch Event Listeners

Because Spring Batch jobs can run for a long time, progress information is often critical: which Job is in progress, which Jobs have failed, and which have completed. All of this information can be gathered using the batch event listeners.

In this tutorial, we will learn about the seven available event listeners and how to create and configure them in a Spring Batch application. The following listener types intercept batch processing at specific events.

| Listener Interface | Annotations | Usage |
|---|---|---|
| JobExecutionListener | @BeforeJob | Before the Job has been started |
| | @AfterJob | After the Job has been completed |
| StepExecutionListener | @BeforeStep | Before the Step has been started |
| | @AfterStep | After the Step has been completed |
| ChunkListener | @BeforeChunk | Before the Chunk has been started |
| | @AfterChunk | After the Chunk has been completed |
| | @AfterChunkError | If any error occurred while processing the Chunk |
| ItemReadListener | @BeforeRead | Before a new Item is read |
| | @AfterRead | After the Item has been read |
| | @OnReadError | If any error occurred while reading the Item |
| ItemProcessListener | @BeforeProcess | Before an Item is processed |
| | @AfterProcess | After the Item has been processed |
| | @OnProcessError | If any error occurred while processing the Item |
| ItemWriteListener | @BeforeWrite | Before a new Item is written |
| | @AfterWrite | After the Item has been written |
| | @OnWriteError | If any error occurred while writing the Item |
| SkipListener | @OnSkipInRead | When an Item is skipped due to an error in ItemReader |
| | @OnSkipInWrite | When an Item is skipped due to an error in ItemWriter |
| | @OnSkipInProcess | When an Item is skipped due to an error in ItemProcessor |

1. JobExecutionListener

The JobExecutionListener provides callbacks before the start and after the completion of a Job.

The annotations corresponding to this interface are:

  • @BeforeJob
  • @AfterJob

1.1. Implementing JobExecutionListener

Note that the afterJob() method is called regardless of whether the Job succeeds or fails, so inspect jobExecution.getStatus() when the cleanup logic depends on the outcome.

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class LoggingJobExecutionListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Job started at: " + jobExecution.getStartTime());
        // Add any setup or logic before the job starts
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("Job finished at: " + jobExecution.getEndTime());
        // Add any cleanup or logic after the job completes
    }
}

We can rewrite this class using the annotations approach as follows:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.BeforeJob;

public class LoggingJobExecutionListener {

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Before job logic...");
        // Perform any setup or logic before the job starts
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        System.out.println("After job logic...");
        // Perform any cleanup or logic after the job completes
    }
}

The annotations for the other listener types work in the same way, so the following sections show only the interface-based approach.

1.2. Configuring JobExecutionListener

@Bean
public Job demoJob(){

    return jobs.get("demoJob")
            .incrementer(new RunIdIncrementer())
            .listener(new LoggingJobExecutionListener())
            .start(stepOne())
            .next(stepTwo())
            .build();
}
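
To picture why afterJob() still runs when the Job fails, the following plain-Java sketch wraps a job body between the two callbacks using try/finally. It is only a simplified model under stated assumptions: JobLifecycleSketch, Status, and run() are hypothetical names invented for this illustration and are not Spring Batch classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the job callbacks; none of these names are Spring Batch types.
class JobLifecycleSketch {

    // Hypothetical outcome flag, loosely mirroring a job's final status.
    enum Status { COMPLETED, FAILED }

    // Runs the job body between the two callbacks and returns the order of events.
    static List<String> run(Runnable jobBody) {
        List<String> trace = new ArrayList<>();
        trace.add("beforeJob");
        Status status = Status.COMPLETED;
        try {
            jobBody.run();
        } catch (RuntimeException e) {
            status = Status.FAILED; // the failure is recorded instead of propagated
        } finally {
            // Reached on both paths, which is why afterJob() sees failed jobs too.
            trace.add("afterJob status=" + status);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> { }));
        System.out.println(run(() -> { throw new IllegalStateException("step failed"); }));
    }
}
```

In both calls the trace ends with an afterJob entry; only the recorded status differs.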

2. StepExecutionListener

The StepExecutionListener allows for notification before a Step is started and after it ends, whether it ended normally or failed.

The annotations corresponding to this interface are:

  • @BeforeStep
  • @AfterStep

2.1. Implementing StepExecutionListener

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class LoggingStepExecutionListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("Step started at: " + stepExecution.getStartTime());
        // Add any setup or logic before the step starts
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("Step finished at: " + stepExecution.getEndTime());
        // Add any cleanup or logic after the step completes
        return null; // Return null to use the default ExitStatus
    }
}

The afterStep() method returns an ExitStatus so that a listener has the chance to modify the exit code that is returned when the Step completes.

2.2. Configuring StepExecutionListener

@Bean
public Step stepOne(){

    return steps.get("stepOne")
            .tasklet(new MyTaskOne())
            .listener(new StepResultListener())
            .build();
}

@Bean
public Step stepTwo(){
    return steps.get("stepTwo")
            .tasklet(new MyTaskTwo())
            .listener(new LoggingStepExecutionListener())
            .build();
}

3. ChunkListener

A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a ‘chunk’.

The ChunkListener can be used to perform logic before a chunk begins processing, after a chunk has been completed successfully, or after a chunk has failed with an error.

The annotations corresponding to this interface are:

  • @BeforeChunk
  • @AfterChunk
  • @AfterChunkError

3.1. Implementing ChunkListener

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class LoggingChunkListener implements ChunkListener {

    @Override
    public void beforeChunk(ChunkContext context) {
        System.out.println("Before chunk processing...");
        // Add any pre-processing logic here
    }

    @Override
    public void afterChunk(ChunkContext context) {
        System.out.println("After chunk processing...");
        // Add any post-processing logic here
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        System.out.println("Error during chunk processing...");
        // Add error handling or cleanup logic here
    }
}
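
  • The beforeChunk method is called after the transaction is started but before read is called on the ItemReader.
  • The afterChunk method is called after the chunk has been committed; it is not called if there is a rollback.
  • The afterChunkError method is called if an error occurs while the chunk is being processed.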
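
The callback order described above can be sketched in plain Java. This is a simplified model and not how Spring Batch is implemented: ChunkLifecycleSketch and run() are hypothetical names, the "commit" and "rollback" entries only stand in for a real transaction, and an item equal to "bad" is an invented way to simulate a failure.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the chunk callbacks; none of these names are Spring Batch types.
class ChunkLifecycleSketch {

    // Processes items in chunks of chunkSize and returns the order of events.
    // An item equal to "bad" simulates an error that rolls the chunk back.
    static List<String> run(List<String> items, int chunkSize) {
        List<String> trace = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            List<String> chunk = items.subList(start, end);
            trace.add("beforeChunk"); // transaction is open, nothing has been read yet
            try {
                for (String item : chunk) {
                    if (item.equals("bad")) {
                        throw new IllegalStateException("cannot handle " + item);
                    }
                }
                trace.add("commit " + chunk);
                trace.add("afterChunk"); // only reached after a successful commit
            } catch (IllegalStateException e) {
                trace.add("rollback " + chunk);
                trace.add("afterChunkError");
                return trace; // assumption: no skip or retry, so the step stops here
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"), 2));
        System.out.println(run(List.of("a", "bad", "c"), 2));
    }
}
```

With a chunk size of 2, the first call produces two committed chunks, while the second call stops after the first chunk is rolled back and never reaches afterChunk.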

3.2. Configuring ChunkListener

@Bean
public Step stepOne(){

    return steps.get("stepOne")
            .tasklet(new MyTaskOne())
            .listener(new LoggingChunkListener())
            .build();
}

4. ItemReadListener

The ItemReadListener provides methods invoked around the reading of an item. Read listeners are useful for logging records that could not be read, so that those records can be dealt with later.

The annotations corresponding to this interface are:

  • @BeforeRead
  • @AfterRead
  • @OnReadError

4.1. Implementing ItemReadListener

import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemReadListener;

public class PersonItemReadListener implements ItemReadListener<Person> {

  public static final Logger logger = LoggerFactory.getLogger(PersonItemReadListener.class);

  @Override
  public void beforeRead() {
    logger.info("Reading a new Person Record");
  }

  @Override
  public void afterRead(Person input) {
    logger.info("New Person record read : {}", input);
  }

  @Override
  public void onReadError(Exception e) {
    logger.error("Error in reading the person record", e);
  }
}

  • The beforeRead method is called before each call to read on the ItemReader.
  • The afterRead method is called after each successful call to read and is passed the item that was read.
  • If there was an error while reading, the onReadError method is called.
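
The three rules above can be modelled with a small plain-Java loop. This is only a sketch under stated assumptions: ReadCallbackSketch and trace() are hypothetical names and not part of Spring Batch, a Supplier stands in for the ItemReader, a null return is treated as the end of the input, and the first read error ends the run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Simplified model of the read callbacks; none of these names are Spring Batch types.
class ReadCallbackSketch {

    // Calls reader until it returns null and returns the order of events.
    // Assumption of this sketch: null marks the end of the input.
    static List<String> trace(Supplier<String> reader) {
        List<String> events = new ArrayList<>();
        while (true) {
            events.add("beforeRead"); // fired before every read attempt
            String item;
            try {
                item = reader.get();
            } catch (RuntimeException e) {
                events.add("onReadError " + e.getMessage());
                return events; // assumption: the first read error ends the run
            }
            if (item == null) {
                return events; // nothing was read, so there is no afterRead
            }
            events.add("afterRead " + item); // fired only for a successful read
        }
    }

    public static void main(String[] args) {
        Queue<String> input = new ArrayDeque<>(List.of("a", "b"));
        System.out.println(trace(input::poll));
        System.out.println(trace(() -> { throw new IllegalStateException("bad line"); }));
    }
}
```

In this model every afterRead or onReadError entry is preceded by a beforeRead entry, and the final read attempt that finds no more input produces a beforeRead entry only.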

4.2. Configuring ItemReadListener

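@Bean
public Step stepOne(){

    // Item-level listeners apply only to chunk-oriented steps, not to tasklet steps.
    // personReader() and personWriter() are assumed beans that are not shown in this tutorial;
    // the chunk size of 10 is an arbitrary illustrative value.
    return steps.get("stepOne")
            .<Person, Person>chunk(10)
            .reader(personReader())
            .writer(personWriter())
            .listener(new PersonItemReadListener())
            .build();
}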

5. ItemProcessListener

The ItemProcessListener provides methods invoked around the processing of an item. Implementations of this interface will be notified before and after an item is passed to the ItemProcessor and in the event of any exceptions thrown by the processor.

The annotations corresponding to this interface are:

  • @BeforeProcess
  • @AfterProcess
  • @OnProcessError

5.1. Implementing ItemProcessListener

import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemProcessListener;

public class PersonItemProcessListener implements ItemProcessListener<Person, Person> {

  public static final Logger logger = LoggerFactory.getLogger(PersonItemProcessListener.class);

  @Override
  public void beforeProcess(Person input) {
    logger.info("Processing person record : {}", input);
  }

  @Override
  public void afterProcess(Person input, Person result) {
    logger.info("Person record has been processed to : {}", result);
  }

  @Override
  public void onProcessError(Person input, Exception e) {
    logger.error("Error in processing the person record : {}", input, e);
  }
}

5.2. Configuring ItemProcessListener

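@Bean
public Step stepOne(){
    // Item-level listeners apply only to chunk-oriented steps, not to tasklet steps.
    // personReader(), personProcessor() and personWriter() are assumed beans not shown here;
    // the chunk size of 10 is an arbitrary illustrative value.
    return steps.get("stepOne")
            .<Person, Person>chunk(10)
            .reader(personReader())
            .processor(personProcessor())
            .writer(personWriter())
            .listener(new PersonItemProcessListener())
            .build();
}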

6. ItemWriteListener

The ItemWriteListener is notified before and after a list of items is written, and whenever an exception is thrown while writing them.

The annotations corresponding to this interface are:

  • @BeforeWrite
  • @AfterWrite
  • @OnWriteError

6.1. Implementing ItemWriteListener

import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.item.Chunk;

public class PersonItemWriteListener implements ItemWriteListener<Person> {

  public static final Logger logger = LoggerFactory.getLogger(PersonItemWriteListener.class);

  @Override
  public void beforeWrite(Chunk<? extends Person> items) {
    logger.info("Writing started persons list : {}", items);
  }

  @Override
  public void afterWrite(Chunk<? extends Person> items) {
    logger.info("Writing completed persons list : {}", items);
  }

  @Override
  public void onWriteError(Exception e, Chunk<? extends Person> items) {
    logger.error("Error in writing the person records : {}", items, e);
  }
}

6.2. Configuring ItemWriteListener

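@Bean
public Step stepOne(){
    // Item-level listeners apply only to chunk-oriented steps, not to tasklet steps.
    // personReader() and personWriter() are assumed beans that are not shown in this tutorial;
    // the chunk size of 10 is an arbitrary illustrative value.
    return steps.get("stepOne")
            .<Person, Person>chunk(10)
            .reader(personReader())
            .writer(personWriter())
            .listener(new PersonItemWriteListener())
            .build();
}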

7. SkipListener

Skipped items are items that encounter an error during processing but are configured to be skipped rather than causing the entire step to fail.

The SkipListener listens to skipped items. Its methods will be called by Step implementations at the appropriate time in the step lifecycle.
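
The idea of skipping can be illustrated with a plain-Java sketch. It is a simplified model under stated assumptions and not Spring Batch code: SkipSketch, ProcessSkipCallback, and processAll() are hypothetical names, the skip limit check is an invented approximation, and the callback is invoked immediately, whereas the real framework decides when in the step lifecycle its listeners are called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of skipping; none of these names are Spring Batch types.
class SkipSketch {

    // Hypothetical callback, loosely modelled on the onSkipInProcess method.
    interface ProcessSkipCallback<T> {
        void onSkipInProcess(T item, Throwable t);
    }

    // Applies processor to every item. A failing item is skipped and reported
    // to the callback until more than skipLimit items have failed.
    static <T, R> List<R> processAll(List<T> items, Function<T, R> processor,
                                     int skipLimit, ProcessSkipCallback<T> callback) {
        List<R> results = new ArrayList<>();
        int skipped = 0;
        for (T item : items) {
            try {
                results.add(processor.apply(item));
            } catch (RuntimeException e) {
                if (skipped >= skipLimit) {
                    // Too many bad items: fail the whole run instead of skipping.
                    throw new IllegalStateException("Skip limit of " + skipLimit + " exceeded", e);
                }
                skipped++;
                callback.onSkipInProcess(item, e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> skippedItems = new ArrayList<>();
        List<Integer> parsed = processAll(List.of("1", "x", "3"), Integer::parseInt, 1,
                (item, t) -> skippedItems.add(item));
        System.out.println(parsed + " skipped=" + skippedItems);
    }
}
```

Here the unparsable item "x" is reported to the callback while the other two items are still processed; with a skip limit of 0 the same input would fail the run instead.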

The annotations corresponding to this interface are:

  • @OnSkipInRead
  • @OnSkipInWrite
  • @OnSkipInProcess

7.1. Implementing SkipListener

import com.howtodoinjava.demo.batch.jobs.csvToDb.model.Person;
import org.springframework.batch.core.SkipListener;

public class PersonSkipListener implements SkipListener<Person, Person> {

    private int skipCount = 0;

    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("Skipped during reading: " + t.getMessage());
        // Add any specific logic for skipped items during reading
        skipCount++;
    }

    @Override
    public void onSkipInWrite(Person item, Throwable t) {
        System.out.println("Skipped during writing: " + item + ", error: " + t.getMessage());
        // Add any specific logic for skipped items during writing
        skipCount++;
    }

    @Override
    public void onSkipInProcess(Person item, Throwable t) {
        System.out.println("Skipped during processing: " + item + ", error: " + t.getMessage());
        // Add any specific logic for skipped items during processing
        skipCount++;
    }

    public int getSkipCount() {
        return skipCount;
    }
}

7.2. Configuring SkipListener

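@Bean
public Step stepOne(){

    // Skip listeners require a fault-tolerant, chunk-oriented step.
    // personReader() and personWriter() are assumed beans not shown in this tutorial;
    // the chunk size, skippable exception and skip limit are illustrative values.
    return steps.get("stepOne")
            .<Person, Person>chunk(10)
            .reader(personReader())
            .writer(personWriter())
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(5)
            .listener(new PersonSkipListener())
            .build();
}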

As we have seen, implementing and configuring event listeners in a Spring Batch application is straightforward.

Happy Learning !!

References: Spring Batch Docs

Source Code on GitHub
