Spring Batch Classifier with Composite Item Writer

Learn to use Spring Batch decorators to classify data and write it to multiple destinations. This is a common requirement in enterprise architectures, where data must be passed or shared with multiple systems.

1. What are Decorators in Spring Batch

1.1. What is a Decorator, and when to use it?

In Spring Batch, a decorator is a specialized implementation of ItemReader or ItemWriter for a specific use case. It is based on the Decorator design pattern. For example, if specialized behavior needs to be appended to a pre-existing ItemReader, we can use a decorator.

Spring Batch offers some out-of-the-box decorators that can add additional behavior to your ItemReader and ItemWriter implementations.
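To see the pattern outside of Spring Batch, here is a minimal plain-Java sketch. The Reader interface and both classes below are stand-ins invented for illustration, not Spring Batch types: a synchronizing wrapper adds thread safety to a delegate reader without modifying it, which mirrors what SynchronizedItemStreamReader does for an ItemStreamReader.

```java
import java.util.Iterator;
import java.util.List;

// Minimal stand-in for a reader contract; returns null when exhausted.
interface Reader<T> {
    T read();
}

// A non-thread-safe reader backed by an iterator.
class ListReader<T> implements Reader<T> {
    private final Iterator<T> it;
    ListReader(List<T> items) { this.it = items.iterator(); }
    public T read() { return it.hasNext() ? it.next() : null; }
}

// Decorator: adds synchronization without touching the delegate,
// the same idea SynchronizedItemStreamReader applies to ItemStreamReader.
class SynchronizedReader<T> implements Reader<T> {
    private final Reader<T> delegate;
    SynchronizedReader(Reader<T> delegate) { this.delegate = delegate; }
    public synchronized T read() { return delegate.read(); }
}

public class DecoratorSketch {
    public static void main(String[] args) {
        Reader<String> reader = new SynchronizedReader<>(new ListReader<>(List.of("a", "b")));
        System.out.println(reader.read()); // a
        System.out.println(reader.read()); // b
        System.out.println(reader.read()); // null
    }
}
```

The decorated reader exposes the same interface as the delegate, so calling code does not change when the wrapper is added.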

1.2. Spring Batch Decorators

Spring Batch includes the following decorators:

  • SynchronizedItemStreamReader – wraps an ItemReader that is not thread-safe and makes it thread-safe. Spring Batch provides a SynchronizedItemStreamReaderBuilder to construct an instance of the SynchronizedItemStreamReader.
  • SingleItemPeekableItemReader – adds a peek() method to an ItemReader. This peek() method lets the user peek one item ahead. Repeated calls to peek() return the same item, which is also the next item returned from the read() method. Spring Batch provides a SingleItemPeekableItemReaderBuilder to construct an instance of the SingleItemPeekableItemReader.
  • MultiResourceItemWriter – wraps a ResourceAwareItemWriterItemStream and creates a new output resource when the count of items written in the current resource exceeds the limit. Spring Batch provides a MultiResourceItemWriterBuilder to construct an instance of the MultiResourceItemWriter.
  • ClassifierCompositeItemWriter – calls one of a collection of ItemWriter implementations for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe. Spring Batch provides a ClassifierCompositeItemWriterBuilder to construct an instance of the ClassifierCompositeItemWriter.
  • ClassifierCompositeItemProcessor – is an ItemProcessor that calls one of a collection of ItemProcessor implementations, based on a router pattern implemented through the provided Classifier. Spring Batch provides a ClassifierCompositeItemProcessorBuilder to construct an instance of the ClassifierCompositeItemProcessor.
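The look-ahead buffering that SingleItemPeekableItemReader performs can be sketched in plain Java. The PeekableReader class below is a hypothetical stand-in for illustration, not the Spring Batch type: it buffers one item so repeated peeks return the same item until read() consumes it.

```java
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of the idea behind SingleItemPeekableItemReader:
// buffer one item so callers can look ahead without consuming it.
public class PeekableReader<T> {
    private final Iterator<T> delegate;
    private T peeked; // the buffered look-ahead item, if any

    public PeekableReader(Iterator<T> delegate) { this.delegate = delegate; }

    // Repeated calls return the same item; it is the next item read() will return.
    public T peek() {
        if (peeked == null && delegate.hasNext()) peeked = delegate.next();
        return peeked;
    }

    public T read() {
        if (peeked != null) {
            T next = peeked;
            peeked = null;
            return next;
        }
        return delegate.hasNext() ? delegate.next() : null;
    }

    public static void main(String[] args) {
        PeekableReader<String> reader = new PeekableReader<>(List.of("a", "b").iterator());
        System.out.println(reader.peek()); // a
        System.out.println(reader.peek()); // a (same item)
        System.out.println(reader.read()); // a
        System.out.println(reader.read()); // b
    }
}
```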

2. Spring Batch Decorator Example

In this example, we'll read the customer data from a MySQL database, classify each record using a classifier, and write it to two files.

2.1. Maven

We use a recent version of Spring Boot, and the spring-boot-starter-batch dependency automatically pulls in compatible versions of the Spring Batch libraries.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
 
  <!-- Spring OXM -->
 
  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
  </dependency>
 
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
  </dependency>
 
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
  </dependency>
 
  <dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.7</version>
  </dependency>
 
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
 
  <dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
  </dependency>
 
</dependencies>

2.2. Custom Classifier

CustomerClassifier is the classifier class. It routes the customer data based on whether the customer id is even or odd.

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
import com.howtodoinjava.batch.decorator.model.Customer;
 
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {
 
  private static final long serialVersionUID = 1L;
   
  private ItemWriter<Customer> evenItemWriter;
  private ItemWriter<Customer> oddItemWriter;
 
  public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
    this.evenItemWriter = evenItemWriter;
    this.oddItemWriter = oddItemWriter;
  }
 
  @Override
  public ItemWriter<? super Customer> classify(Customer customer) {
    return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
  }
}
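To verify the routing rule in isolation, here is a self-contained sketch that mirrors CustomerClassifier in plain Java; all names below are invented for illustration, and two lists stand in for the even/odd ItemWriter delegates so the routing is directly observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ClassifierDemo {
    // A trimmed-down Customer with just the field the classifier inspects.
    record Customer(long id, String firstName) {}

    // Same routing rule as CustomerClassifier: even ids go to one
    // writer, odd ids to the other. Consumers stand in for ItemWriters.
    static Function<Customer, Consumer<Customer>> classifier(
            Consumer<Customer> evenWriter, Consumer<Customer> oddWriter) {
        return customer -> customer.id() % 2 == 0 ? evenWriter : oddWriter;
    }

    public static void main(String[] args) {
        List<Customer> even = new ArrayList<>();
        List<Customer> odd = new ArrayList<>();
        var classify = classifier(even::add, odd::add);

        for (Customer c : List.of(new Customer(1, "John"), new Customer(2, "Amy"),
                new Customer(3, "Laverne"), new Customer(4, "Janice"))) {
            classify.apply(c).accept(c);
        }
        System.out.println("even ids -> " + even.size() + ", odd ids -> " + odd.size());
        // prints: even ids -> 2, odd ids -> 2
    }
}
```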

2.3. Job Configuration

This is the job configuration class where we create the necessary beans to run the job.

  • JdbcPagingItemReader – reads database records using JDBC, in pages
  • FlatFileItemWriter – writes the data in JSON format to the output file
  • StaxEventItemWriter – writes the data in XML format to the output file
  • ClassifierCompositeItemWriter – calls one of the ItemWriter delegates for each item, based on the router pattern implemented through the provided Classifier; the implementation is thread-safe if all delegates are thread-safe
  • Step – the step configured in the batch job; it reads the data and writes the records in XML and JSON formats
  • Job – the batch domain object representing the whole job

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import com.howtodoinjava.batch.decorator.aggregator.CustomLineAggregator;
import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;
 
@Configuration
public class JobConfiguration {
 
  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Autowired
  private DataSource dataSource;
 
  @Bean
  public JdbcPagingItemReader<Customer> customerPagingItemReader() {
 
    // reading database records using JDBC in a paging fashion
 
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new CustomerRowMapper());
 
    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
 
    //  MySQL implementation of a PagingQueryProvider using database specific features.
 
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
  }
 
  @Bean
  public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {
 
    String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
    System.out.println(">> Output Path = " + customerOutputPath);
    FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
    writer.setLineAggregator(new CustomLineAggregator());
    writer.setResource(new FileSystemResource(customerOutputPath));
    writer.afterPropertiesSet();
    return writer;
  }
 
  @Bean
  public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {
 
    String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
    System.out.println(">> Output Path = " + customerOutputPath);
    Map<String, Class> aliases = new HashMap<>();
    aliases.put("customer", Customer.class);
    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(aliases);
 
    // StAX and Marshaller for serializing object to XML.
    StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
    writer.setRootTagName("customers");
    writer.setMarshaller(marshaller);
    writer.setResource(new FileSystemResource(customerOutputPath));
    writer.afterPropertiesSet();
    return writer;
  }
 
  @Bean
  public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception {
    ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
    compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
    return compositeItemWriter;
  }
 
  @Bean
  public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
        .<Customer, Customer>chunk(10)
        .reader(customerPagingItemReader())
        .writer(classifierCustomerCompositeItemWriter())
        .stream(xmlItemWriter())
        .stream(jsonItemWriter())
        .build();
  }
 
  @Bean
  public Job job() throws Exception {
    return jobBuilderFactory.get("job")
        .start(step1())
        .build();
  }
 
}

2.4. Configure Persistence

Customer is the business model class.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {

	private Long id;
	private String firstName;
	private String lastName;
	private String birthdate;
}

The CustomerRowMapper class maps ResultSet rows into Customer domain objects.

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;
 
public class CustomerRowMapper implements RowMapper<Customer> {
 
  @Override
  public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
    return Customer.builder().id(rs.getLong("id"))
        .firstName(rs.getString("firstName"))
        .lastName(rs.getString("lastName"))
        .birthdate(rs.getString("birthdate")).build();
  }
}

2.5. DataSource Configuration

Configuration to create a database connection to the MySQL database (application.properties).

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always
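Since the pom above also pulls in H2, a quick local run without MySQL could point the datasource at an in-memory H2 database instead; a hypothetical alternative sketch:

```properties
# Hypothetical alternative: in-memory H2 instead of MySQL (H2 is already on the classpath)
spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.url=jdbc:h2:mem:test;DB_CLOSE_DELAY=-1
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
```

Note that the reader bean above uses the MySQL-specific MySqlPagingQueryProvider, which would also need to be swapped (e.g. for H2PagingQueryProvider) for this to work.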

2.6. SQL Schema and Data Files

These are the SQL schema and data files.

CREATE TABLE `test`.`customer` (
  `id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
  `firstName` VARCHAR(255) NULL,
  `lastName` VARCHAR(255) NULL,
  `birthdate` VARCHAR(255) NULL,
  PRIMARY KEY (`id`)
) AUTO_INCREMENT=1;
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

3. Demo

Run the application as a Spring Boot application.

import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication implements CommandLineRunner {
  @Autowired
  private JobLauncher jobLauncher;
 
  @Autowired
  private Job job;
   
  public static void main(String[] args) {
    SpringApplication.run(BatchApplication.class, args);
  }
   
  @Override
  public void run(String... args) throws Exception {
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("JobId", String.valueOf(System.currentTimeMillis()))
        .addDate("date", new Date())
        .addLong("time", System.currentTimeMillis())
        .toJobParameters();

    JobExecution execution = jobLauncher.run(job, jobParameters);
    System.out.println("STATUS :: " + execution.getStatus());
  }
}

The application will read the data from the database, and based on the value of the customer id (even or odd), it will write into the respective file.

Looking at the console output, we can see that the data has been written to multiple destinations, and the destination data is classified based on the classifier we wrote.

4. Conclusion

In this Spring Batch tutorial, we learned to use a Classifier with a composite ItemWriter to classify the data and write it to multiple destinations.

Happy Learning !!

Sourcecode on Github
