Spring Batch – Writing to Multiple Destinations with Classifier

Learn to use Spring Batch decorators to classify data and write it to multiple destinations. This is a common requirement in enterprise architectures, where data must be passed or shared with multiple downstream systems.

2. What are decorators in Spring Batch

2.1. What is a decorator and when to use it

A decorator is a design pattern: a specialized implementation of ItemReader or ItemWriter that wraps a delegate to add behavior for specific use cases.

In some cases, a user needs specialized behavior to be appended to a pre-existing ItemReader or ItemWriter. Spring Batch offers several out-of-the-box decorators that add such behavior to your ItemReader and ItemWriter implementations.

The destination could be a flat file, another database, or a CSV or XML file. It depends on where your use case requires the classified data to be written.

2.2. Available decorators

Spring Batch includes the following decorators:

  • SynchronizedItemStreamReader – wraps an ItemReader that is not thread-safe and makes it thread-safe. Spring Batch provides a SynchronizedItemStreamReaderBuilder to construct an instance of the SynchronizedItemStreamReader.
  • SingleItemPeekableItemReader – adds a peek() method to an ItemReader. This peek() method lets the user look one item ahead. Repeated calls to peek() return the same item, which is the next item returned from the read() method (see the sketch after this list). Spring Batch provides a SingleItemPeekableItemReaderBuilder to construct an instance of the SingleItemPeekableItemReader.
  • MultiResourceItemWriter – wraps a ResourceAwareItemWriterItemStream and creates a new output resource when the count of items written in the current resource exceeds the limit. Spring Batch provides a MultiResourceItemWriterBuilder to construct an instance of the MultiResourceItemWriter.
  • ClassifierCompositeItemWriter – calls one of a collection of ItemWriter implementations for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe. Spring Batch provides a ClassifierCompositeItemWriterBuilder to construct an instance of the ClassifierCompositeItemWriter.
  • ClassifierCompositeItemProcessor – is an ItemProcessor that calls one of a collection of ItemProcessor implementations, based on a router pattern implemented through the provided Classifier. Spring Batch provides a ClassifierCompositeItemProcessorBuilder to construct an instance of the ClassifierCompositeItemProcessor.
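
As a quick illustration of the decorator pattern in practice, below is a minimal sketch that wraps a delegate reader with SingleItemPeekableItemReader using its builder. The delegate reader, the customers.csv file name, and the pass-through line mapper are illustrative assumptions, not part of the original example.

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;
import org.springframework.batch.item.support.builder.SingleItemPeekableItemReaderBuilder;
import org.springframework.core.io.FileSystemResource;

public class PeekableReaderSketch {

	public SingleItemPeekableItemReader<String> peekableReader() {

		// A plain delegate reader; any ItemReader implementation works here
		FlatFileItemReader<String> delegate = new FlatFileItemReaderBuilder<String>()
				.name("customerFileReader")
				.resource(new FileSystemResource("customers.csv"))
				.lineMapper((line, lineNumber) -> line)
				.build();

		// Decorate the delegate: peek() inspects the next item without consuming it
		return new SingleItemPeekableItemReaderBuilder<String>()
				.delegate(delegate)
				.build();
	}
}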

3. Spring Batch decorator example

In this example, we'll read the customer data from a MySQL database, classify each record with a classifier, and write it to two files.

3.1. Maven dependencies

We used the latest version of Spring Boot available at the time of writing. Adding the Spring Batch starter lets Spring Boot's dependency management pull in compatible versions automatically.

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-batch</artifactId>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-jdbc</artifactId>
	</dependency>

	<!-- Spring OXM -->

	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-oxm</artifactId>
	</dependency>

	<dependency>
		<groupId>com.h2database</groupId>
		<artifactId>h2</artifactId>
		<scope>runtime</scope>
	</dependency>

	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<scope>runtime</scope>
	</dependency>

	<dependency>
		<groupId>com.thoughtworks.xstream</groupId>
		<artifactId>xstream</artifactId>
		<version>1.4.7</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<optional>true</optional>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>

	<dependency>
		<groupId>org.springframework.batch</groupId>
		<artifactId>spring-batch-test</artifactId>
		<scope>test</scope>
	</dependency>

</dependencies>

3.2. Classifiers

CustomerClassifier is the classifier class. It routes each customer record to one of two writers based on whether the customer id value is even or odd.

package com.howtodoinjava.batch.decorator.classifier;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {

	private static final long serialVersionUID = 1L;
	
	private ItemWriter<Customer> evenItemWriter;
	private ItemWriter<Customer> oddItemWriter;

	public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
		this.evenItemWriter = evenItemWriter;
		this.oddItemWriter = oddItemWriter;
	}

	@Override
	public ItemWriter<? super Customer> classify(Customer customer) {
		return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
	}
}
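
To see the routing in isolation, here is a quick, illustrative sanity check. The no-op lambda writers stand in for the real XML and JSON writers and are assumptions for this snippet only:

import org.springframework.batch.item.ItemWriter;

import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerClassifierCheck {

	public static void main(String[] args) {

		// No-op stand-ins for the real writers
		ItemWriter<Customer> evenWriter = items -> { };
		ItemWriter<Customer> oddWriter = items -> { };

		CustomerClassifier classifier = new CustomerClassifier(evenWriter, oddWriter);

		// id 2 routes to the even writer, id 3 routes to the odd writer
		System.out.println(classifier.classify(Customer.builder().id(2L).build()) == evenWriter);  // true
		System.out.println(classifier.classify(Customer.builder().id(3L).build()) == oddWriter);   // true
	}
}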

3.3. Job configuration

This is the job configuration class where we create the beans required to run the job.

  • JdbcPagingItemReader – this bean reads database records over JDBC in a paging fashion
  • FlatFileItemWriter – this bean writes the data in JSON format to an output file
  • StaxEventItemWriter – this bean writes the data in XML format to an output file
  • ClassifierCompositeItemWriter – calls one of a collection of ItemWriters for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe.
  • Step – the step configured in the batch job. It reads the data and writes it in both XML and JSON formats. Note that the delegate writers are registered with .stream() so the step opens and closes them, because ClassifierCompositeItemWriter itself does not implement ItemStream.
  • Job – batch domain object representing a job
package com.howtodoinjava.batch.decorator.config;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import com.howtodoinjava.batch.decorator.aggregator.CustomLineAggregator;
import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

@Configuration
public class JobConfiguration {

	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private DataSource dataSource;

	@Bean
	public JdbcPagingItemReader<Customer> customerPagingItemReader() {

		// reading database records using JDBC in a paging fashion

		JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
		reader.setDataSource(this.dataSource);
		reader.setFetchSize(1000);
		reader.setRowMapper(new CustomerRowMapper());

		// Sort Keys
		Map<String, Order> sortKeys = new HashMap<>();
		sortKeys.put("id", Order.ASCENDING);

		// MySQL implementation of a PagingQueryProvider using database-specific features

		MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
		queryProvider.setSelectClause("id, firstName, lastName, birthdate");
		queryProvider.setFromClause("from customer");
		queryProvider.setSortKeys(sortKeys);
		reader.setQueryProvider(queryProvider);
		return reader;
	}

	@Bean
	public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {

		String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
		System.out.println(">> Output Path = " + customerOutputPath);
		FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
		writer.setLineAggregator(new CustomLineAggregator());
		writer.setResource(new FileSystemResource(customerOutputPath));
		writer.afterPropertiesSet();
		return writer;
	}

	@Bean
	public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {

		String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
		System.out.println(">> Output Path = " + customerOutputPath);
		Map<String, Class<?>> aliases = new HashMap<>();
		aliases.put("customer", Customer.class);
		XStreamMarshaller marshaller = new XStreamMarshaller();
		marshaller.setAliases(aliases);

		// StAX and Marshaller for serializing object to XML.
		StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
		writer.setRootTagName("customers");
		writer.setMarshaller(marshaller);
		writer.setResource(new FileSystemResource(customerOutputPath));
		writer.afterPropertiesSet();
		return writer;
	}

	@Bean
	public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception {
		ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
		compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
		return compositeItemWriter;
	}

	@Bean
	public Step step1() throws Exception {
		return stepBuilderFactory.get("step1")
				.<Customer, Customer>chunk(10)
				.reader(customerPagingItemReader())
				.writer(classifierCustomerCompositeItemWriter())
				.stream(xmlItemWriter())
				.stream(jsonItemWriter())
				.build();
	}

	@Bean
	public Job job() throws Exception {
		return jobBuilderFactory.get("job")
				.start(step1())
				.build();
	}

}
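
The job configuration above references CustomLineAggregator, which is not shown in the original listing. Below is a minimal sketch of one possible implementation, assuming Jackson's ObjectMapper is available on the classpath; it serializes each Customer as a single JSON line:

package com.howtodoinjava.batch.decorator.aggregator;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomLineAggregator implements LineAggregator<Customer> {

	private final ObjectMapper objectMapper = new ObjectMapper();

	@Override
	public String aggregate(Customer item) {
		try {
			// Write each Customer as one JSON document per line
			return objectMapper.writeValueAsString(item);
		} catch (JsonProcessingException e) {
			throw new RuntimeException("Unable to serialize Customer", e);
		}
	}
}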

3.4. Entity and Mapper classes

Customer is the business model class.

package com.howtodoinjava.batch.decorator.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {

	private Long id;
	private String firstName;
	private String lastName;
	private String birthdate;
}

The CustomerRowMapper class maps each ResultSet row to a Customer domain object.

package com.howtodoinjava.batch.decorator.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {

	@Override
	public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
		return Customer.builder().id(rs.getLong("id"))
				.firstName(rs.getString("firstName"))
				.lastName(rs.getString("lastName"))
				.birthdate(rs.getString("birthdate")).build();
	}
}

3.5. application.properties

Configuration to create the database connection to MySQL.

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always
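
Since the pom.xml also declares the H2 driver at runtime scope, the example can alternatively be pointed at an in-memory H2 database by swapping the datasource properties. This variant is an untested assumption; note that the reader is configured with MySqlPagingQueryProvider, whose LIMIT-based paging queries H2 generally accepts as well.

spring.datasource.driver-class-name=org.h2.Driver
spring.datasource.url=jdbc:h2:mem:test;DB_CLOSE_DELAY=-1
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always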

3.6. JDBC config and schema files

These are schema and SQL data files.

CREATE TABLE `test`.`customer` (
	`id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
	`firstName` VARCHAR(255) NULL,
	`lastName` VARCHAR(255) NULL,
	`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
) AUTO_INCREMENT=1;
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

3.7. Demo

Run the application as a Spring Boot application.

package com.howtodoinjava.batch.decorator;

import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication implements CommandLineRunner {
	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job job;
	
	public static void main(String[] args) {
		SpringApplication.run(BatchApplication.class, args);
	}
	
	@Override
	public void run(String... args) throws Exception {
		JobParameters jobParameters = new JobParametersBuilder()
				.addString("JobId", String.valueOf(System.currentTimeMillis()))
				.addDate("date", new Date())
				.addLong("time", System.currentTimeMillis())
				.toJobParameters();
		
		JobExecution execution = jobLauncher.run(job, jobParameters);
		System.out.println("STATUS :: "+execution.getStatus());
	}
}

The application reads the data from the database and, based on the value of the customer id (even or odd), writes each record into the respective file.

Here is the output of the JSON file, which holds the customers with odd ids. (The XML file, not reproduced here, holds the even-id customers under a <customers> root element.)

{"id":1,"firstName":"John","lastName":"Doe","birthdate":"10-10-1952 10:10:10"}
{"id":3,"firstName":"Laverne","lastName":"Mann","birthdate":"11-12-1988 10:10:10"}
{"id":5,"firstName":"Pauline","lastName":"Rios","birthdate":"29-08-1977 10:10:10"}
{"id":7,"firstName":"Todd","lastName":"Kinsey","birthdate":"14-12-1998 10:10:10"}
{"id":9,"firstName":"Rico","lastName":"Hale","birthdate":"10-10-2000 10:10:10"}
{"id":11,"firstName":"Robert","lastName":"Coster","birthdate":"10-10-1972 10:10:10"}
{"id":13,"firstName":"Justin","lastName":"Kramer","birthdate":"19-11-1951 10:10:10"}
{"id":15,"firstName":"Laura","lastName":"Porter","birthdate":"12-12-2010 10:10:10"}
{"id":17,"firstName":"Andrew","lastName":"Thomas","birthdate":"04-05-1967 10:10:10"}
{"id":19,"firstName":"Valerie","lastName":"Hilbert","birthdate":"13-06-1966 10:10:10"}

Conclusion

Look at the console and at the generated output files: the data has been written to multiple destinations, and each record's destination was chosen by the classifier we wrote.

In short, the ClassifierCompositeItemWriter classifies the data and writes it to multiple destinations.

Happy Learning !!
