Spring Batch Classifier: Composite ItemWriter Example

Learn to use Spring Batch decorators to classify data and write it to multiple destinations. This is a common need in enterprise systems, where a single batch job often has to share data with several downstream systems.

1. What are the Decorators in Spring Batch

1.1. What is a Decorator, and when to use it?

In Spring Batch, a decorator is a specialized implementation of ItemReader or ItemWriter that wraps an existing one for a specific use case. It is based on the Decorator pattern. For example, if specialized behavior needs to be added to a pre-existing ItemReader, we can wrap that reader in a decorator instead of changing it.

Spring Batch offers some out-of-the-box decorators that can add additional behavior to your ItemReader and ItemWriter implementations.
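
The wrapping idea can be sketched in plain Java, without any Spring classes. In this minimal sketch, SimpleReader, ListReader, and SynchronizedReader are hypothetical stand-ins invented for illustration (they are not Spring Batch types): the decorator implements the same interface as the reader it wraps, delegates every call, and adds one behavior, here synchronization.

```java
import java.util.Iterator;
import java.util.List;

public class DecoratorSketch {

    /** Hypothetical stand-in for a reader: returns the next item, or null when exhausted. */
    interface SimpleReader<T> {
        T read();
    }

    /** A plain reader over a list; not thread-safe, because the iterator is not. */
    static class ListReader<T> implements SimpleReader<T> {
        private final Iterator<T> iterator;

        ListReader(List<T> items) {
            this.iterator = items.iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    /** Decorator: same interface, delegates each call, adds synchronization around it. */
    static class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() {
            return delegate.read();
        }
    }

    public static void main(String[] args) {
        SimpleReader<String> reader =
            new SynchronizedReader<String>(new ListReader<String>(List.of("a", "b")));
        System.out.println(reader.read()); // a
        System.out.println(reader.read()); // b
        System.out.println(reader.read()); // null (source exhausted)
    }
}
```

The calling code only sees SimpleReader, so the wrapped reader can be swapped for the decorated one without any other change.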

1.2. Spring Decorators

Spring Batch includes the following decorators:

  • SynchronizedItemStreamReader – makes an ItemReader that is not thread-safe safe to use from multiple threads. Spring Batch provides a SynchronizedItemStreamReaderBuilder to construct an instance of the SynchronizedItemStreamReader.
  • SingleItemPeekableItemReader – adds a peek() method to an ItemReader. The peek() method lets the user look one item ahead. Repeated calls to peek() return the same item, which is also the next item returned by read(). Spring Batch provides a SingleItemPeekableItemReaderBuilder to construct an instance of the SingleItemPeekableItemReader.
  • MultiResourceItemWriter – wraps a ResourceAwareItemWriterItemStream and creates a new output resource when the count of items written in the current resource exceeds the limit. Spring Batch provides a MultiResourceItemWriterBuilder to construct an instance of the MultiResourceItemWriter.
  • ClassifierCompositeItemWriter – calls one of a collection of ItemWriter implementations for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe. Spring Batch provides a ClassifierCompositeItemWriterBuilder to construct an instance of the ClassifierCompositeItemWriter.
  • ClassifierCompositeItemProcessor – is an ItemProcessor that calls one of a collection of ItemProcessor implementations, based on a router pattern implemented through the provided Classifier. Spring Batch provides a ClassifierCompositeItemProcessorBuilder to construct an instance of the ClassifierCompositeItemProcessor.
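
The peek() semantics described in the list above can be illustrated with a small plain-Java sketch. SimpleReader and PeekableReader are hypothetical names used only for this illustration; this is not the source of SingleItemPeekableItemReader, just one way to get the same observable behavior by buffering a single item.

```java
import java.util.Iterator;
import java.util.List;

public class PeekableReaderSketch {

    /** Hypothetical stand-in for a reader: returns the next item, or null when exhausted. */
    interface SimpleReader<T> {
        T read();
    }

    /** Decorator that buffers at most one item so callers can look one item ahead. */
    static class PeekableReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;
        private T buffered; // item that was peeked but not yet read

        PeekableReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        /** Returns the next item without consuming it; repeated calls return the same item. */
        T peek() {
            if (buffered == null) {
                buffered = delegate.read();
            }
            return buffered;
        }

        @Override
        public T read() {
            if (buffered != null) {
                T item = buffered;
                buffered = null;
                return item;
            }
            return delegate.read();
        }
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2).iterator();
        PeekableReader<Integer> reader =
            new PeekableReader<Integer>(() -> source.hasNext() ? source.next() : null);

        System.out.println(reader.peek()); // 1
        System.out.println(reader.peek()); // 1 (still not consumed)
        System.out.println(reader.read()); // 1
        System.out.println(reader.read()); // 2
        System.out.println(reader.read()); // null (source exhausted)
    }
}
```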

2. Spring Batch Decorator Example

In this example, we'll read customer data from a MySQL database, classify each record with a custom classifier, and write it to one of two files: one in XML format and one in JSON format.

2.1. Maven

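We used the latest version of Spring Boot available at the time of writing. Adding the spring-boot-starter-batch dependency pulls in matching Spring Batch versions automatically. The example also needs Spring OXM with XStream for the XML writer (XStreamMarshaller), Lombok for the model class, and a MySQL JDBC driver.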

2.2. Custom Classifier

CustomerClassifier is the classifier class. It routes each customer to one of two writers, depending on whether the customer id is even or odd.

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {

  private static final long serialVersionUID = 1L;

  private final ItemWriter<Customer> evenItemWriter;
  private final ItemWriter<Customer> oddItemWriter;

  public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
    this.evenItemWriter = evenItemWriter;
    this.oddItemWriter = oddItemWriter;
  }

  // Even ids go to the first writer, odd ids go to the second
  @Override
  public ItemWriter<? super Customer> classify(Customer customer) {
    return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
  }
}
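
To see what the routing does to one chunk of items, here is a simplified plain-Java sketch. It does not use Spring Batch: a Function stands in for the Classifier and a Consumer of a list stands in for each ItemWriter, and the write method name is chosen only for this illustration. The sketch groups the items of a chunk by the writer the classifier selects and then hands each group to its writer in one call, which is the general idea behind a classifier-based composite writer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class ClassifierRoutingSketch {

    /**
     * Groups the items of one chunk by the writer the classifier picks,
     * then passes each group to its writer in a single call.
     */
    static <T> void write(List<T> chunk, Function<T, Consumer<List<T>>> classifier) {
        Map<Consumer<List<T>>, List<T>> groups = new LinkedHashMap<>();
        for (T item : chunk) {
            groups.computeIfAbsent(classifier.apply(item), w -> new ArrayList<>()).add(item);
        }
        groups.forEach((writer, items) -> writer.accept(items));
    }

    public static void main(String[] args) {
        List<Long> evenSink = new ArrayList<>();
        List<Long> oddSink = new ArrayList<>();
        Consumer<List<Long>> evenWriter = evenSink::addAll;
        Consumer<List<Long>> oddWriter = oddSink::addAll;

        // Same rule as CustomerClassifier: even ids to one writer, odd ids to the other
        write(List.of(1L, 2L, 3L, 4L, 5L), id -> id % 2 == 0 ? evenWriter : oddWriter);

        System.out.println(evenSink); // [2, 4]
        System.out.println(oddSink);  // [1, 3, 5]
    }
}
```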

2.3. Job Configuration

This is the job configuration class, where we create the beans needed to run the job.

  • JdbcPagingItemReader – Reads database records over JDBC, one page at a time.
  • FlatFileItemWriter – Writes the data in JSON format to an output file.
  • StaxEventItemWriter – Writes the data in XML format to an output file.
  • ClassifierCompositeItemWriter – Calls one of a collection of ItemWriters for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe.
  • Step – The step configured in the batch job. It reads the data and writes each record in XML or JSON format. Because ClassifierCompositeItemWriter does not implement ItemStream, the two delegate writers are also registered on the step with stream() so that they are opened and closed.
  • Job – The batch domain object representing the job.

import com.howtodoinjava.batch.decorator.aggregator.CustomLineAggregator;
import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.io.File;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class JobConfiguration {

  @Autowired
  private JobRepository jobRepository;

  @Autowired
  private PlatformTransactionManager transactionManager;

  @Autowired
  private DataSource dataSource;

  @Bean
  public JdbcPagingItemReader<Customer> customerPagingItemReader() {

    // reading database records using JDBC in a paging fashion

    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setRowMapper(new CustomerRowMapper());

    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);

    // MySQL implementation of a PagingQueryProvider using database specific features.

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
  }

  @Bean
  public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {

    String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
    System.out.println(">> Output Path = " + customerOutputPath);
    FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
    writer.setLineAggregator(new CustomLineAggregator());
    writer.setResource(new FileSystemResource(customerOutputPath));
    return writer;
  }

  @Bean
  public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {

    String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
    System.out.println(">> Output Path = " + customerOutputPath);
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("customer", Customer.class);
    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(aliases);

    // StAX and Marshaller for serializing object to XML.
    StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
    writer.setMarshaller(marshaller);
    writer.setResource(new FileSystemResource(customerOutputPath));
    return writer;
  }

  @Bean
  public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception {
    ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
    // Even ids are routed to the XML writer, odd ids to the JSON writer
    compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
    return compositeItemWriter;
  }

  @Bean
  public Step step1() throws Exception {
    return new StepBuilder("step1", jobRepository)
        .<Customer, Customer>chunk(10, transactionManager)
        .reader(customerPagingItemReader())
        .writer(classifierCustomerCompositeItemWriter())
        .stream(xmlItemWriter())
        .stream(jsonItemWriter())
        .build();
  }

  @Bean
  public Job job() throws Exception {
    return new JobBuilder("job", jobRepository).start(step1()).build();
  }
}
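
The JSON writer in this configuration depends on CustomLineAggregator, which is imported but not listed in this article. A line aggregator has one job: turn a single item into a single line of text for the flat-file writer. The following is only a plain-Java sketch of that idea, under stated assumptions: CustomerRecord is a hypothetical stand-in for the Customer model, the JSON is built by hand, and only backslashes and double quotes are escaped. The project's actual class would implement Spring Batch's LineAggregator interface and may well use a JSON library instead.

```java
public class LineAggregatorSketch {

    /** Hypothetical stand-in for the Customer model used in this article. */
    static final class CustomerRecord {
        final long id;
        final String firstName;
        final String lastName;
        final String birthdate;

        CustomerRecord(long id, String firstName, String lastName, String birthdate) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.birthdate = birthdate;
        }
    }

    /** Escapes backslashes and double quotes only; control characters are not handled. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Turns one customer into one JSON line, the role a line aggregator plays. */
    static String aggregate(CustomerRecord c) {
        return String.format(
            "{\"id\":%d,\"firstName\":\"%s\",\"lastName\":\"%s\",\"birthdate\":\"%s\"}",
            c.id, escape(c.firstName), escape(c.lastName), escape(c.birthdate));
    }

    public static void main(String[] args) {
        CustomerRecord john = new CustomerRecord(1L, "John", "Doe", "10-10-1952 10:10:10");
        System.out.println(aggregate(john));
        // {"id":1,"firstName":"John","lastName":"Doe","birthdate":"10-10-1952 10:10:10"}
    }
}
```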


2.4. Configure Persistence

Customer is the business model class. Lombok generates its getters, setters, constructors, and builder.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {

	private Long id;
	private String firstName;
	private String lastName;
	private String birthdate;
}

The CustomerRowMapper class maps each row of the ResultSet to a Customer domain object.

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {
  @Override
  public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
    return Customer.builder().id(rs.getLong("id"))
        .firstName(rs.getString("firstName"))
        .lastName(rs.getString("lastName"))
        .birthdate(rs.getString("birthdate"))
        .build();
  }
}

2.5. DataSource Configuration

Configure the DataSource so that the application can connect to the MySQL database.


2.6. SQL Schema and Data Files

These are the SQL schema and data files.

CREATE TABLE `test`.`customer` (
  -- the id column definition is assumed; the original line is not shown
  `id` BIGINT NOT NULL,
  `firstName` VARCHAR(255) NULL,
  `lastName` VARCHAR(255) NULL,
  `birthdate` VARCHAR(255) NULL,
  PRIMARY KEY (`id`)
);

INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

3. Demo

Run the application as a Spring Boot application.

import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {

  @Autowired
  private JobLauncher jobLauncher;

  @Autowired
  private Job job;

  public static void main(String[] args) {
    SpringApplication.run(BatchApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("JobId", String.valueOf(System.currentTimeMillis()))
        .addDate("date", new Date())
        .toJobParameters();
    JobExecution execution = jobLauncher.run(job, jobParameters);
    System.out.println("STATUS :: " + execution.getStatus());
  }
}

The application reads the data from the database and, based on the customer id, writes each record to the respective file: even ids go to the XML file and odd ids go to the JSON file.

The console prints the two output paths. The files at those paths show that the data has been written to multiple destinations, split according to the classifier we wrote.

4. Conclusion

In this Spring Batch tutorial, we learned to use a Classifier with ClassifierCompositeItemWriter to classify the data and write it to multiple destinations.

Happy Learning !!

Source code on GitHub


