Learn to use Spring Batch decorators to classify data and write it to multiple destinations. This is frequently needed in enterprise architectures, where data must be passed or shared across multiple systems.
2. What are decorators in Spring Batch
2.1. What is a decorator and when to use it
A decorator is a design pattern: a specialized implementation of `ItemReader` or `ItemWriter` for specific use cases.
In some cases, a user needs specialized behavior appended to a pre-existing `ItemReader`. Spring Batch offers several out-of-the-box decorators that add behavior to your `ItemReader` and `ItemWriter` implementations.
The destination could be a flat file, another database, a CSV file, or an XML file; it depends on where your use case needs the classified data to go.
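The decorator idea itself is plain Java: wrap an existing component behind the same interface and add behavior around the delegate call. Here is a minimal sketch of that idea; the `Reader` interface and all names are illustrative, not Spring Batch API:

```java
import java.util.Iterator;
import java.util.List;

// Minimal decorator sketch: CountingReader wraps any Reader<T> behind the
// same interface and adds a read counter around the delegate call.
interface Reader<T> {
    T read();
}

class CountingReader<T> implements Reader<T> {
    private final Reader<T> delegate;
    private int count;

    CountingReader(Reader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T read() {
        T item = delegate.read();
        if (item != null) {
            count++;   // the added behavior
        }
        return item;
    }

    int getCount() {
        return count;
    }
}

public class DecoratorDemo {
    public static void main(String[] args) {
        Iterator<String> it = List.of("x", "y").iterator();
        CountingReader<String> reader =
                new CountingReader<>(() -> it.hasNext() ? it.next() : null);
        while (reader.read() != null) { }
        System.out.println(reader.getCount());  // 2
    }
}
```

The caller still sees a plain `Reader`; the counting behavior is transparent. Spring Batch's decorators apply the same wrapping idea to `ItemReader` and `ItemWriter`.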
2.2. Available decorators
Spring Batch includes the following decorators:
- `SynchronizedItemStreamReader` – wraps an `ItemReader` that is not thread-safe and makes it thread-safe. Spring Batch provides a `SynchronizedItemStreamReaderBuilder` to construct an instance of the `SynchronizedItemStreamReader`.
- `SingleItemPeekableItemReader` – adds a `peek()` method to an `ItemReader`. The `peek()` method lets the user look one item ahead: repeated calls to `peek()` return the same item, and that item is the next one returned from the `read()` method. Spring Batch provides a `SingleItemPeekableItemReaderBuilder` to construct an instance of the `SingleItemPeekableItemReader`.
- `MultiResourceItemWriter` – wraps a `ResourceAwareItemWriterItemStream` and creates a new output resource when the count of items written to the current resource exceeds a limit. Spring Batch provides a `MultiResourceItemWriterBuilder` to construct an instance of the `MultiResourceItemWriter`.
- `ClassifierCompositeItemWriter` – calls one of a collection of `ItemWriter` implementations for each item, based on a router pattern implemented through the provided `Classifier`. The implementation is thread-safe if all delegates are thread-safe. Spring Batch provides a `ClassifierCompositeItemWriterBuilder` to construct an instance of the `ClassifierCompositeItemWriter`.
- `ClassifierCompositeItemProcessor` – an `ItemProcessor` that calls one of a collection of `ItemProcessor` implementations, based on a router pattern implemented through the provided `Classifier`. Spring Batch provides a `ClassifierCompositeItemProcessorBuilder` to construct an instance of the `ClassifierCompositeItemProcessor`.
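The peek-ahead behavior of `SingleItemPeekableItemReader` can be sketched without any Spring types. In this plain-Java illustration (`PeekableReader` and its delegate `Iterator` are stand-ins, not Spring Batch API), repeated `peek()` calls return the same buffered item, which is then handed back by the next `read()`:

```java
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of the peek-ahead idea behind SingleItemPeekableItemReader.
class PeekableReader<T> {
    private final Iterator<T> delegate;
    private T peeked;   // item buffered by peek()

    PeekableReader(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    T peek() {
        if (peeked == null && delegate.hasNext()) {
            peeked = delegate.next();   // buffer one item ahead
        }
        return peeked;
    }

    T read() {
        if (peeked != null) {           // hand back the buffered item first
            T item = peeked;
            peeked = null;
            return item;
        }
        return delegate.hasNext() ? delegate.next() : null;
    }
}

public class PeekDemo {
    public static void main(String[] args) {
        PeekableReader<String> reader =
                new PeekableReader<>(List.of("a", "b").iterator());
        System.out.println(reader.peek());  // a
        System.out.println(reader.peek());  // a  (same item again)
        System.out.println(reader.read());  // a
        System.out.println(reader.read());  // b
        System.out.println(reader.read());  // null (exhausted)
    }
}
```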
3. Spring batch decorator example
In this example, we read customer data from a MySQL database, classify it with a classifier, and write it to two files.
3.1. Maven dependencies
We use the latest version of Spring Boot available today; adding the Spring Batch starter automatically pulls in matching versions of its dependencies.
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <!-- Spring OXM -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>com.thoughtworks.xstream</groupId>
        <artifactId>xstream</artifactId>
        <version>1.4.7</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
3.2. Classifiers
`CustomerClassifier` is the classifier class. It routes each customer record based on whether the customer's `id` value is even or odd.
```java
package com.howtodoinjava.batch.decorator.classifier;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerClassifier
        implements Classifier<Customer, ItemWriter<? super Customer>> {

    private static final long serialVersionUID = 1L;

    private final ItemWriter<Customer> evenItemWriter;
    private final ItemWriter<Customer> oddItemWriter;

    public CustomerClassifier(ItemWriter<Customer> evenItemWriter,
            ItemWriter<Customer> oddItemWriter) {
        this.evenItemWriter = evenItemWriter;
        this.oddItemWriter = oddItemWriter;
    }

    @Override
    public ItemWriter<? super Customer> classify(Customer customer) {
        // even ids go to the first writer, odd ids to the second
        return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
    }
}
```
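The routing behind `ClassifierCompositeItemWriter` can also be sketched without any Spring types. This hypothetical `RouterDemo` mirrors the even/odd `classify()` logic above, using plain lists as stand-in destinations:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of ClassifierCompositeItemWriter's routing: a classifier
// function picks one destination per item. Router names and the bucket lists
// are illustrative, not Spring Batch API.
public class RouterDemo {
    public static void main(String[] args) {
        List<Long> evenBucket = new ArrayList<>();
        List<Long> oddBucket = new ArrayList<>();

        // mirrors CustomerClassifier.classify(): even ids -> one destination,
        // odd ids -> the other
        Function<Long, List<Long>> classify =
                id -> (id % 2 == 0) ? evenBucket : oddBucket;

        for (long id = 1; id <= 6; id++) {
            classify.apply(id).add(id);   // "write" via the chosen destination
        }
        System.out.println("even = " + evenBucket);  // [2, 4, 6]
        System.out.println("odd  = " + oddBucket);   // [1, 3, 5]
    }
}
```

In the real job, the two destinations are the XML and JSON item writers; the classifier only decides which delegate receives each item.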
3.3. Job configuration
This is the job configuration class, where we create the beans needed to run the job.
- `JdbcPagingItemReader` – reads database records over JDBC in a paging fashion
- `FlatFileItemWriter` – writes the data in JSON format to an output file
- `StaxEventItemWriter` – writes the data in XML format to an output file
- `ClassifierCompositeItemWriter` – calls one of a collection of `ItemWriter`s for each item, based on a router pattern implemented through the provided `Classifier`; thread-safe if all delegates are thread-safe
- `Step` – the step configured in the batch job; it reads the data and writes it in XML and JSON format. Note that both delegate writers are also registered with `.stream()` so the step opens and closes them, since `ClassifierCompositeItemWriter` does not manage the lifecycle of its delegates.
- `Job` – the batch domain object representing the whole job
```java
package com.howtodoinjava.batch.decorator.config;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;

import com.howtodoinjava.batch.decorator.aggregator.CustomLineAggregator;
import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<Customer> customerPagingItemReader() {

        // reading database records using JDBC in a paging fashion
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());

        // sort keys
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        // MySQL implementation of a PagingQueryProvider using database-specific features
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }

    @Bean
    public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {

        String customerOutputPath = File.createTempFile("customerOutput", ".out")
                .getAbsolutePath();
        System.out.println(">> Output Path = " + customerOutputPath);

        FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
        writer.setLineAggregator(new CustomLineAggregator());
        writer.setResource(new FileSystemResource(customerOutputPath));
        writer.afterPropertiesSet();

        return writer;
    }

    @Bean
    public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {

        String customerOutputPath = File.createTempFile("customerOutput", ".out")
                .getAbsolutePath();
        System.out.println(">> Output Path = " + customerOutputPath);

        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);

        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        // StAX and Marshaller for serializing objects to XML
        StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("customers");
        writer.setMarshaller(marshaller);
        writer.setResource(new FileSystemResource(customerOutputPath));
        writer.afterPropertiesSet();

        return writer;
    }

    @Bean
    public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter()
            throws Exception {
        ClassifierCompositeItemWriter<Customer> compositeItemWriter =
                new ClassifierCompositeItemWriter<>();
        compositeItemWriter.setClassifier(
                new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
        return compositeItemWriter;
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerPagingItemReader())
                .writer(classifierCustomerCompositeItemWriter())
                .stream(xmlItemWriter())
                .stream(jsonItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
```
3.5. Entity and Mapper class
This is a business model class.
```java
package com.howtodoinjava.batch.decorator.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {

    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}
```
The `CustomerRowMapper` class maps each `ResultSet` row to a `Customer` domain object.
```java
package com.howtodoinjava.batch.decorator.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build();
    }
}
```
3.6 application.properties
Configuration to create a database connection to MySQL.
```properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always
```
3.7. JDBC config and schema files
These are schema and SQL data files.
```sql
CREATE TABLE `test`.`customer` (
  `id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
  `firstName` VARCHAR(255) NULL,
  `lastName` VARCHAR(255) NULL,
  `birthdate` VARCHAR(255) NULL,
  PRIMARY KEY (`id`)
) AUTO_INCREMENT=1;
```

```sql
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
```
3.8. Demo
Run the application as a Spring Boot application.
```java
package com.howtodoinjava.batch.decorator;

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // unique parameters so the job instance is new on every run
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
```
The application will read the data from the database and, based on the value of the customer id (even or odd), write each record into the respective file.
Here is the output of the JSON file, which receives the customers with odd ids (the XML file receives the even ids):
```json
{"id":1,"firstName":"John","lastName":"Doe","birthdate":"10-10-1952 10:10:10"}
{"id":3,"firstName":"Laverne","lastName":"Mann","birthdate":"11-12-1988 10:10:10"}
{"id":5,"firstName":"Pauline","lastName":"Rios","birthdate":"29-08-1977 10:10:10"}
{"id":7,"firstName":"Todd","lastName":"Kinsey","birthdate":"14-12-1998 10:10:10"}
{"id":9,"firstName":"Rico","lastName":"Hale","birthdate":"10-10-2000 10:10:10"}
{"id":11,"firstName":"Robert","lastName":"Coster","birthdate":"10-10-1972 10:10:10"}
{"id":13,"firstName":"Justin","lastName":"Kramer","birthdate":"19-11-1951 10:10:10"}
{"id":15,"firstName":"Laura","lastName":"Porter","birthdate":"12-12-2010 10:10:10"}
{"id":17,"firstName":"Andrew","lastName":"Thomas","birthdate":"04-05-1967 10:10:10"}
{"id":19,"firstName":"Valerie","lastName":"Hilbert","birthdate":"13-06-1966 10:10:10"}
```
Conclusion
Check the console and the output files: the data has been written to multiple destinations, and it is classified according to the classifier we wrote. This confirms that the `ClassifierCompositeItemWriter` routes each item to the correct writer.
Happy Learning !!