Learn to use Spring Batch partitioning to process ranges of data with multiple threads in a Spring Boot application.
1. Parallel processing and Partitioning
1.1. Parallel processing
Most batch processing problems can be solved with a single thread. However, in some complex scenarios single-threaded processing takes too long, and a multi-threaded model is needed to process the data in parallel.
At a very high level, there are two modes of parallel processing:
- Single process, multi-threaded
- Multi-process
These break down into categories as well, as follows:
- Multi-threaded Step (single process)
- Parallel Steps (single process)
- Remote Chunking of Step (multi process)
- Partitioning a Step (single or multi process)
1.2. Partitioning in Spring batch
Partitioning uses multiple threads, each of which processes its own range of the data. The ranges are defined programmatically, and the number of threads (partitions) to create depends purely on the use case and its requirements.
Spring Batch is single-threaded by default. To process data in parallel, we can partition a step of the batch job.
Partitioning is useful when we have millions of records to read from a source system and processing them all in a single thread would take too long. Using multiple threads to read and process the data makes better use of the system resources.
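The following minimal plain-Java sketch shows the idea without any Spring Batch machinery. It assumes the records are identified by a contiguous range of integer ids. The sketch splits the ids into `gridSize` ranges and gives each range its own thread from a `java.util.concurrent` pool. The class and method names (`RangePartitionSketch`, `processInParallel`) are hypothetical, and the loop body only counts ids as a stand-in for real read/process/write work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RangePartitionSketch {

    // Splits ids [min, max] into gridSize contiguous ranges, processes each range
    // on its own pool thread, and returns how many ids each range contained.
    public static List<Long> processInParallel(int min, int max, int gridSize) {
        ExecutorService pool = Executors.newFixedThreadPool(gridSize);
        try {
            int targetSize = (max - min) / gridSize + 1;
            List<Future<Long>> futures = new ArrayList<>();
            for (int start = min; start <= max; start += targetSize) {
                final int lo = start;
                final int hi = Math.min(start + targetSize - 1, max);
                // Each task works only on its own range, so the tasks share no state
                Callable<Long> task = () -> {
                    long count = 0;
                    for (int id = lo; id <= hi; id++) {
                        count++; // stand-in for reading, processing and writing record `id`
                    }
                    System.out.println(Thread.currentThread().getName() + " handled " + lo + " to " + hi);
                    return count;
                };
                futures.add(pool.submit(task));
            }
            List<Long> counts = new ArrayList<>();
            for (Future<Long> future : futures) {
                counts.add(future.get()); // blocks until that range is done
            }
            return counts;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for partitions", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a partition failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processInParallel(1, 20, 4)); // [5, 5, 5, 5]
    }
}
```

The "handled" lines can appear in any order because the threads run concurrently. The returned list is still in range order, because the futures are read back in submission order.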
2. Spring batch partitioning example
In this tutorial, we will read data from one table and write it into another table. You can insert millions of records into the database to see how long the job takes with single-threaded processing. Here, I have created only a few records, which is enough to understand how the concept works.
2.1. Maven dependencies
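We used the latest version of Spring Boot available at the time of writing. Adding the `spring-boot-starter-batch` dependency pulls in the Spring Batch version that Spring Boot's dependency management selects, so no explicit version is needed.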
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <optional>true</optional>
    </dependency>
</dependencies>
```
2.2. Partitioner
Partitioner is the central strategy interface for creating input parameters for a partitioned step in the form of ExecutionContext instances. The usual aim is to create a set of distinct input values, e.g. a set of non-overlapping primary key ranges, or a set of unique filenames.
Here we query the table for the MIN and MAX id values (assuming the ids are sequential) and, based on them, divide all records into partitions.
For the partitioner, we used gridSize = number of threads. Use your own value based on your requirements.
```java
package com.howtodoinjava.batch.decorator.classifier;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    public void setTable(String table) {
        this.table = table;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Lowest and highest key values in the table
        int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);

        // Size of each range, so that at most gridSize ranges cover [min, max]
        int targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);

            // The last range may be shorter than targetSize
            if (end >= max) {
                end = max;
            }
            // Both bounds are inclusive: this partition covers ids in [minValue, maxValue]
            value.putInt("minValue", start);
            value.putInt("maxValue", end);

            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
```
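To check which ranges `partition()` produces, the following sketch copies its arithmetic into plain Java. It takes `min` and `max` as arguments instead of querying the database and stores each range as an `int[] {minValue, maxValue}` instead of an `ExecutionContext`. The class name `PartitionMathSketch` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionMathSketch {

    // Same loop as ColumnRangePartitioner.partition(), without Spring or a database.
    // Each value is {minValue, maxValue}; both bounds are inclusive.
    public static Map<String, int[]> partition(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1;
        Map<String, int[]> result = new LinkedHashMap<>(); // keeps partition order for printing
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max; // clamp the last range
            }
            result.put("partition" + number, new int[] {start, end});
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }

    public static void main(String[] args) {
        // ids 1..20, gridSize 4: targetSize = 19 / 4 + 1 = 5
        // prints partition0 -> 1..5, partition1 -> 6..10, partition2 -> 11..15, partition3 -> 16..20
        partition(1, 20, 4).forEach((k, v) -> System.out.println(k + " -> " + v[0] + ".." + v[1]));

        // ids 1..10, gridSize 4: targetSize = 9 / 4 + 1 = 3
        // prints partition0 -> 1..3, partition1 -> 4..6, partition2 -> 7..9, partition3 -> 10..10
        partition(1, 10, 4).forEach((k, v) -> System.out.println(k + " -> " + v[0] + ".." + v[1]));
    }
}
```

Because `targetSize` is rounded up, the ranges are not always equal, and the partitioner can create fewer partitions than `gridSize`. With ids 1 to 6 and a `gridSize` of 4, `targetSize` is 2, and only three partitions (1..2, 3..4, 5..6) are created.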
2.3. Job configuration
This is the job configuration class, where we create the beans needed to run the job. In this example, we used SimpleAsyncTaskExecutor, the simplest multi-threaded implementation of the TaskExecutor interface. It starts a new thread for each task and does not reuse threads.
We used the partitioner in the Step to create a partition step builder for a remote (or local) step.
With this partitioned Step, reading, processing and writing the chunks of each partition happen in different threads. Hence, the processed records may not be written in the same order in which they were read.
Keep the following in mind:
- The task executor may limit concurrency, for example when it is backed by a thread pool. (Spring Batch's throttle limit, which defaults to 4 and can be configured, applies to multi-threaded steps.)
- Resources used in the Step, such as the DataSource, may impose their own concurrency limits.

The beans in this configuration are:
- ColumnRangePartitioner – the Partitioner implementation that creates the input parameters (the minValue and maxValue of each range) for the partitioned step as ExecutionContext instances.
- JdbcPagingItemReader – reads data page by page and accepts the minValue and maxValue of its partition, so that it reads only the rows in that range. Here we set the fetch size (a JDBC hint) to 1000, but you can use any value and make it configurable from the properties file.
- JdbcBatchItemWriter – writes the data into another table.
- Step – the steps configured in the batch job. The master step (step1) partitions the data, and the slave step (slaveStep) reads each range and writes it into the NEW_CUSTOMER table.
- Job – the batch domain object representing a job.
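By default, SimpleAsyncTaskExecutor does not limit how many threads run at once, so every partition gets its own thread. Each thread may also hold a database connection. If you need to cap concurrency, for example to stay within the DataSource's connection pool, a bounded pool is the usual choice. In Spring, that could be a `ThreadPoolTaskExecutor` or `SimpleAsyncTaskExecutor.setConcurrencyLimit(...)`.

The following sketch shows the effect with plain `java.util.concurrent` rather than Spring. Eight "partitions" run on a fixed pool of two threads, and the code records the highest number of tasks that ran at the same time. The class name `BoundedPartitionsSketch` and the 20 ms sleep are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPartitionsSketch {

    // Runs `partitions` tasks on a fixed pool of `poolSize` threads and returns the
    // highest number of tasks that were observed running at the same time.
    public static int maxConcurrency(int partitions, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < partitions; i++) {
                futures.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // keep the highest count seen
                    try {
                        TimeUnit.MILLISECONDS.sleep(20); // stand-in for reading and writing one partition
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every partition
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("a partition failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // Eight partitions, but never more than two running at once
        System.out.println(maxConcurrency(8, 2));
    }
}
```

The remaining partitions wait in the pool's queue instead of opening more threads (and connections). The trade-off is a longer total run time whenever more partitions exist than pool threads.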
```java
package com.howtodoinjava.batch.decorator.config;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.H2PagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import com.howtodoinjava.batch.decorator.classifier.ColumnRangePartitioner;
import com.howtodoinjava.batch.decorator.mapper.CustomerRowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // Splits the "id" column of the "customer" table into ranges
    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    // @StepScope: a separate reader per partition, bound to that partition's range
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        // The datasource in this example is H2, so use the H2 query provider
        H2PagingQueryProvider queryProvider = new H2PagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // maxValue is inclusive (see ColumnRangePartitioner), so use <= rather than <
        queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);
        queryProvider.setSortKeys(sortKeys);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    // Writes each Customer into NEW_CUSTOMER, binding :id, :firstName, ... from bean properties
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

    // Master step: creates the partitions and runs slaveStep once per partition, in parallel
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // Slave step: reads and writes one partition in chunks of 1000 items
    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(1000)
                .reader(pagingItemReader(null, null)) // real values are injected at step scope
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
```
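Within a partition, JdbcPagingItemReader does not load the whole range at once. It orders rows by the sort key and fetches one page at a time (the page size is set with `setPageSize`, 10 by default), starting each later page after the last key it has already read. The following sketch imitates that pattern in plain Java against an in-memory list standing in for the customer table. The names (`KeysetPagingSketch`, `fetchPage`, `readPartition`) are hypothetical. The real SQL is generated by the configured PagingQueryProvider, not by code like this. The sketch treats the partition range as inclusive on both ends, matching the `minValue`/`maxValue` bounds that ColumnRangePartitioner writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class KeysetPagingSketch {

    // In-memory stand-in for the customer table: ids 1..20, already sorted by the sort key
    static final List<Integer> TABLE =
            IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

    // Imitates one page query: rows in [minValue, maxValue], after lastId (if any),
    // in ascending id order, at most pageSize rows
    public static List<Integer> fetchPage(int minValue, int maxValue, Integer lastId, int pageSize) {
        return TABLE.stream()
                .filter(id -> id >= minValue && id <= maxValue)
                .filter(id -> lastId == null || id > lastId)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Reads one partition page by page; each page starts after the last id of the previous one
    public static List<List<Integer>> readPartition(int minValue, int maxValue, int pageSize) {
        List<List<Integer>> pages = new ArrayList<>();
        Integer lastId = null;
        while (true) {
            List<Integer> page = fetchPage(minValue, maxValue, lastId, pageSize);
            if (page.isEmpty()) {
                break; // no rows left in this partition
            }
            pages.add(page);
            lastId = page.get(page.size() - 1);
        }
        return pages;
    }

    public static void main(String[] args) {
        // The partition 6..10 from the demo, read with a page size of 2
        System.out.println(readPartition(6, 10, 2)); // [[6, 7], [8, 9], [10]]
    }
}
```

With an exclusive upper bound (`id < maxValue`) in `fetchPage`, the same call would return only `[[6, 7], [8, 9]]`, and id 10 would never be copied. That is why the where clause must be inclusive, to match the partitioner.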
2.4. Entity and Mapper class
This is a business model class.
```java
package com.howtodoinjava.batch.decorator.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {

    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}
```
The CustomerRowMapper class maps each row of the result set to a Customer domain object.
```java
package com.howtodoinjava.batch.decorator.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build();
    }
}
```
2.5. application.properties
Configuration to create the connection to an in-memory H2 database.
```properties
spring.datasource.url=jdbc:h2:mem:test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

#Prevents running the job during application context creation
spring.batch.job.enabled=false
```
2.6. JDBC config and schema files
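These are the schema and data SQL files. For an embedded database such as H2, Spring Boot automatically runs `schema.sql` and `data.sql` from the classpath. The files below are assumed to be saved under those names in `src/main/resources`.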
```sql
CREATE TABLE customer (
    id INT PRIMARY KEY,
    firstName VARCHAR(255) NULL,
    lastName VARCHAR(255) NULL,
    birthdate VARCHAR(255) NULL
);

CREATE TABLE new_customer (
    id INT PRIMARY KEY,
    firstName VARCHAR(255) NULL,
    lastName VARCHAR(255) NULL,
    birthdate VARCHAR(255) NULL
);
```
```sql
INSERT INTO customer VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO customer VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO customer VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO customer VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO customer VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO customer VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO customer VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO customer VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO customer VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO customer VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO customer VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO customer VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO customer VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO customer VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO customer VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO customer VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO customer VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO customer VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO customer VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO customer VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
```
2.7. Demo
Run the application as a Spring Boot application.
```java
// Assumed package: a parent of the config, mapper and model packages, so that
// @SpringBootApplication's component scan finds JobConfiguration
package com.howtodoinjava.batch.decorator;

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(LocalPartitioningApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Parameters differ on every run, so each run starts a new job instance
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();

        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
```
The application reads the data from one table, one partition at a time per thread, and writes it into another table.
```
2019-12-13 15:03:42.408 --- c.example.LocalPartitioningApplication : Started LocalPartitioningApplication in 3.504 seconds (JVM running for 4.877)
2019-12-13 15:03:42.523 --- o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{JobId=1576229622410, date=1576229622410, time=1576229622410}]
2019-12-13 15:03:42.603 --- o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
reading 1 to 5
reading 11 to 15
reading 16 to 20
reading 6 to 10
2019-12-13 15:03:42.890 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition0] executed in 173ms
2019-12-13 15:03:42.895 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition3] executed in 178ms
2019-12-13 15:03:42.895 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition1] executed in 177ms
2019-12-13 15:03:42.901 --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition2] executed in 182ms
2019-12-13 15:03:42.917 --- o.s.batch.core.step.AbstractStep : Step: [step1] executed in 314ms
2019-12-13 15:03:42.942 --- o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{JobId=1576229622410, date=1576229622410, time=1576229622410}] and the following status: [COMPLETED] in 374ms
STATUS :: COMPLETED
```
3. Conclusion
In this Spring Batch step partitioning tutorial, we learned how to use partitioning to process bulk data with multiple threads. Partitioning lets the application make better use of the machine's hardware and operating system capabilities.
Happy Learning !!