Learn to use Spring Batch decorators to classify data and write it to multiple destinations. This is a common need in enterprise applications that must share the same data with several downstream systems.
1. What are Decorators in Spring Batch
1.1. What is a Decorator, and when to use it?
In Spring Batch, a decorator is a specialized implementation of ItemReader or ItemWriter that wraps an existing reader or writer to add behavior for a specific use case. It is based on the Decorator pattern. For example, if we need to add extra behavior to a pre-existing ItemReader, we can wrap it in a decorator instead of modifying it.
Spring Batch offers some out-of-the-box decorators that add behavior to your ItemReader and ItemWriter implementations.
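To make the pattern concrete, here is a minimal plain-Java sketch that does not depend on Spring Batch. The names SimpleReader, ListReader and SynchronizedReader are hypothetical and exist only for this illustration. The sketch assumes a reader contract that returns null when the input is exhausted, and it is not the framework's own code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a reader contract: read() returns null when input is exhausted.
interface SimpleReader<T> {
    T read();
}

// A plain reader over a list; it is not thread-safe on its own.
class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> iterator;

    ListReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

// Decorator: implements the same interface, wraps a delegate,
// and adds synchronization around each read() call.
class SynchronizedReader<T> implements SimpleReader<T> {
    private final SimpleReader<T> delegate;

    SynchronizedReader(SimpleReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() {
        return delegate.read();
    }
}

class DecoratorSketch {
    public static void main(String[] args) {
        SimpleReader<String> reader =
                new SynchronizedReader<>(new ListReader<>(Arrays.asList("a", "b")));
        System.out.println(reader.read()); // a
        System.out.println(reader.read()); // b
        System.out.println(reader.read()); // null
    }
}
```

The caller only sees the SimpleReader interface, so the decorated and undecorated readers are interchangeable. That is the property that lets us stack extra behavior on an existing reader without touching it.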
1.2. Spring Decorators
Spring Batch includes the following decorators:
- SynchronizedItemStreamReader – When using an ItemReader that is not thread-safe, it can be used to make the ItemReader thread-safe. Spring Batch provides a SynchronizedItemStreamReaderBuilder to construct an instance of the SynchronizedItemStreamReader.
- SingleItemPeekableItemReader – adds a peek() method to an ItemReader. This peek() method lets the user peek one item ahead. Repeated calls to peek() return the same item, and this is the next item returned from the read() method. Spring Batch provides a SingleItemPeekableItemReaderBuilder to construct an instance of the SingleItemPeekableItemReader.
- MultiResourceItemWriter – wraps a ResourceAwareItemWriterItemStream and creates a new output resource when the count of items written in the current resource exceeds the limit. Spring Batch provides a MultiResourceItemWriterBuilder to construct an instance of the MultiResourceItemWriter.
- ClassifierCompositeItemWriter – calls one of a collection of ItemWriter implementations for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe. Spring Batch provides a ClassifierCompositeItemWriterBuilder to construct an instance of the ClassifierCompositeItemWriter.
- ClassifierCompositeItemProcessor – is an ItemProcessor that calls one of a collection of ItemProcessor implementations, based on a router pattern implemented through the provided Classifier. Spring Batch provides a ClassifierCompositeItemProcessorBuilder to construct an instance of the ClassifierCompositeItemProcessor.
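The peek() contract described in the list above can be illustrated without the framework. The following is a minimal sketch under stated assumptions: PeekableReader is a hypothetical class that wraps a java.util.Iterator, and it only mirrors the described semantics (repeated peeks return the same item, which is also the next item returned by read). It is not the SingleItemPeekableItemReader implementation.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical peekable wrapper; it illustrates the contract only.
class PeekableReader<T> {
    private final Iterator<T> delegate;
    private T buffered;          // item fetched by peek() but not yet returned by read()
    private boolean hasBuffered; // true while a peeked item is waiting to be read

    PeekableReader(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    // Returns the next item without consuming it; null when the input is exhausted.
    T peek() {
        if (!hasBuffered) {
            buffered = delegate.hasNext() ? delegate.next() : null;
            hasBuffered = true;
        }
        return buffered;
    }

    // Returns the peeked item if there is one, otherwise the next item from the delegate.
    T read() {
        if (hasBuffered) {
            T item = buffered;
            buffered = null;
            hasBuffered = false;
            return item;
        }
        return delegate.hasNext() ? delegate.next() : null;
    }
}

class PeekSketch {
    public static void main(String[] args) {
        PeekableReader<String> reader =
                new PeekableReader<>(Arrays.asList("a", "b").iterator());
        System.out.println(reader.peek()); // a
        System.out.println(reader.peek()); // a (still not consumed)
        System.out.println(reader.read()); // a
        System.out.println(reader.read()); // b
    }
}
```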
2. Spring Batch Decorator Example
In this example, we'll read customer data from a MySQL database, classify each record with a custom classifier, and write it to one of two files.
2.1. Maven
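Adding the spring-boot-starter-batch dependency lets Spring Boot's dependency management pull in a compatible Spring Batch version. Note that the job configuration in this example uses JobBuilderFactory and StepBuilderFactory, which are deprecated since Spring Batch 5.0, so the code as written targets the Spring Boot 2.x line.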
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <!-- Spring OXM -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>com.thoughtworks.xstream</groupId>
        <artifactId>xstream</artifactId>
        <version>1.4.7</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2.2. Custom Classifier
CustomerClassifier is the classifier class. It routes each customer to one of two writers, depending on whether the customer id is even or odd.
import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {

    private static final long serialVersionUID = 1L;

    private ItemWriter<Customer> evenItemWriter;
    private ItemWriter<Customer> oddItemWriter;

    public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
        this.evenItemWriter = evenItemWriter;
        this.oddItemWriter = oddItemWriter;
    }

    @Override
    public ItemWriter<? super Customer> classify(Customer customer) {
        return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
    }
}
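To see what this even/odd rule does to a chunk of items, here is a plain-Java sketch of classifier-based routing. It is an illustration under assumptions, not Spring Batch code: ChunkRoutingSketch and its route method are hypothetical, destinations are represented by simple strings instead of ItemWriter instances, and customers are represented by their ids only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class ChunkRoutingSketch {

    // Groups the items of one chunk by the destination the classifier picks for each item.
    static <T, D> Map<D, List<T>> route(List<T> chunk, Function<T, D> classifier) {
        Map<D, List<T>> byDestination = new LinkedHashMap<>();
        for (T item : chunk) {
            byDestination
                    .computeIfAbsent(classifier.apply(item), d -> new ArrayList<>())
                    .add(item);
        }
        return byDestination;
    }

    public static void main(String[] args) {
        List<Long> customerIds = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        // Same rule as CustomerClassifier: even ids go to one destination, odd ids to the other.
        Map<String, List<Long>> routed =
                route(customerIds, id -> id % 2 == 0 ? "even" : "odd");
        System.out.println(routed); // {odd=[1, 3, 5], even=[2, 4, 6]}
    }
}
```

Each destination ends up with its own sub-list of the chunk, which is the routing idea that a classifier-driven composite writer builds on.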
2.3. Job Configuration
This is the job configuration class, where we create the beans needed to run the job.
- JdbcPagingItemReader – This bean reads database records over JDBC, one page at a time
- FlatFileItemWriter – This bean writes the data to the output file in JSON format
- StaxEventItemWriter – This bean writes the data to the output file in XML format
- ClassifierCompositeItemWriter – Calls one of a collection of ItemWriters for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe.
- Step – This is the step configured in the batch job. It reads the data and writes each record in either XML or JSON format
- Job – Batch domain object representing a job
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import com.howtodoinjava.batch.decorator.aggregator.CustomLineAggregator;
import com.howtodoinjava.batch.decorator.classifier.CustomerClassifier;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcPagingItemReader<Customer> customerPagingItemReader() {
        // reading database records using JDBC in a paging fashion
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());

        // Sort Keys
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    @Bean
    public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {
        String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
        System.out.println(">> Output Path = " + customerOutputPath);

        FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
        writer.setLineAggregator(new CustomLineAggregator());
        writer.setResource(new FileSystemResource(customerOutputPath));
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {
        String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
        System.out.println(">> Output Path = " + customerOutputPath);

        // Map the XML element name "customer" to the Customer class
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);
        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        // StAX and Marshaller for serializing object to XML.
        StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("customers");
        writer.setMarshaller(marshaller);
        writer.setResource(new FileSystemResource(customerOutputPath));
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception {
        ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
        // First argument handles even ids (XML), second handles odd ids (JSON)
        compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
        return compositeItemWriter;
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerPagingItemReader())
                .writer(classifierCustomerCompositeItemWriter())
                // Register the delegates as streams so the step opens and closes them;
                // the composite writer does not do this for its delegates
                .stream(xmlItemWriter())
                .stream(jsonItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
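The configuration above references a CustomLineAggregator that is not listed in this article. As a rough idea of what such an aggregator has to produce, the following plain-Java sketch turns one customer into a single JSON line. Everything in it is an assumption made for illustration: JsonLineSketch, toJsonLine and escape are hypothetical names, the field values are assumed to be non-null, and the real class would implement Spring Batch's LineAggregator interface and would more likely delegate to a JSON library.

```java
// Hypothetical sketch: formats one customer record as a single JSON line.
class JsonLineSketch {

    // Escapes backslashes and double quotes so the value can sit inside a JSON string.
    // Control characters are not handled here; a JSON library would cover those.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the JSON line from the four customer fields used in this tutorial.
    static String toJsonLine(long id, String firstName, String lastName, String birthdate) {
        return String.format(
                "{\"id\":%d,\"firstName\":\"%s\",\"lastName\":\"%s\",\"birthdate\":\"%s\"}",
                id, escape(firstName), escape(lastName), escape(birthdate));
    }

    public static void main(String[] args) {
        System.out.println(toJsonLine(1L, "John", "Doe", "10-10-1952 10:10:10"));
        // {"id":1,"firstName":"John","lastName":"Doe","birthdate":"10-10-1952 10:10:10"}
    }
}
```

Writing one JSON object per line keeps the flat file easy to append to, which fits the way a flat-file writer emits one aggregated line per item.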
2.4. Configure Persistence
Customer is the business model class.
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}
The CustomerRowMapper class maps each row of the ResultSet to a Customer domain object.
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build();
    }
}
2.5. DataSource Configuration
These properties configure the connection to the MySQL database.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always
2.6. SQL Schema and Data Files
These are the SQL scripts that create the customer table and load the sample data.
CREATE TABLE `test`.`customer` (
    `id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
    `firstName` VARCHAR(255) NULL,
    `lastName` VARCHAR(255) NULL,
    `birthdate` VARCHAR(255) NULL,
    PRIMARY KEY (`id`)
) AUTO_INCREMENT=1;

INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
3. Demo
Run the application as a Spring Boot application.
import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
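The application reads the data from the database and, based on whether the customer id is even or odd, writes each record to the respective file. With the classifier wired as shown in the job configuration, records with even ids go to the XML file and records with odd ids go to the JSON file.
The console prints the two output paths and the final job status. Opening the two files confirms that the data has been written to multiple destinations and split according to the classifier we wrote.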
4. Conclusion
In this Spring Batch tutorial, we learned to use a custom Classifier with ClassifierCompositeItemWriter to classify the data and write it to multiple destinations.
Happy Learning !!