Spring Batch Partitioning into Multiple Steps

Learn to use Spring batch partitioning to use multiple threads to process a range of data sets in a spring boot application.

1. Parallel Processing and Step Partitioning

1.1. Parallel Processing

Mostly batch processing problems can be solved using a single-threaded, but a few complex scenarios like single-threaded processing taking a long time to perform tasks, where parallel processing is needed, can be achieved using multi-threaded models.

At a very high level, there are two modes of parallel processing:

  • Single-process, multi-threaded
  • Multi-process

These break down into categories as well, as follows:

  • Multi-threaded Step (single process)
  • Parallel Steps (single process)
  • Remote Chunking of Step (multi-process)
  • Partitioning a Step (single or multi-process)

1.2. Partitioning in Spring batch

Spring Batch is single-threaded by default. In order to make the parallel processing, we need to partition the steps of the batch job.

In the above image, Manager is a Step that has been partitioned into multiple Worker steps that are also instances of Step. Workers can be some remote services, locally executing threads or any other independent tasks.

Spring batch allows passing the input data from Manager to Worker steps so each worker knows exactly what to do. The JobRepository ensures that every worker is executed only once in a single execution of the Job.

Partitioning uses multiple threads to process a range of data sets. The range of data sets can be defined programmatically. It’s on the usecase how many threads we want to create to be used in a partition(s). The number of threads is purely based on the need/requirement.

Partitioning is useful when we have millions of records to read from source systems, and we can’t just rely on a single thread to process all records, which can be time-consuming. We want to use multiple threads to read and process data to use system resources effectively.

2. Spring Batch Partitioning Example

In this tutorial, we will read some data from a database table and write it into another table. We can create millions of records into DB in order to experience how long the process would be if using Single thread batch processing. I have created a few programs to understand how the program/concept works here.

2.1. Maven

We used the latest version of Spring Boot, and adding spring-boot-starter-batch dependency will transitively pull the latest versions of the required dependencies.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
  </dependency>
 
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
  </dependency>
 
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.2</version>
    <optional>true</optional>
  </dependency>
</dependencies>

2.2. Step Partitioner

Partitioner is the central strategy interface for creating input parameters for a partitioned step in the form of ExecutionContext instances. The usual aim is to create a set of distinct input values, e.g. a set of non-overlapping primary key ranges or unique filenames.

In this example, we are querying the table to get the MAX and MIN id values (assuming they are sequential), and based on that we’re creating partitions between all records.

For the partitioner, we have used gridSize = number of threads. Use your own custom value based on your requirements.

import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
 
public class ColumnRangePartitioner implements Partitioner 
{
  private JdbcOperations jdbcTemplate;
  private String table;
  private String column;
 
  public void setTable(String table) {
    this.table = table;
  }
 
  public void setColumn(String column) {
    this.column = column;
  }
 
  public void setDataSource(DataSource dataSource) {
    jdbcTemplate = new JdbcTemplate(dataSource);
  }
 
  @Override
  public Map<String, ExecutionContext> partition(int gridSize) 
  {
    int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
    int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);
 
    int targetSize = (max - min) / gridSize + 1;
 
    Map<String, ExecutionContext> result = new HashMap<>();
 
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
     
    while (start <= max) 
    {
      ExecutionContext value = new ExecutionContext();
      result.put("partition" + number, value);
       
      if(end >= max) {
        end = max;
      }
       
      value.putInt("minValue", start);
      value.putInt("maxValue", end);
 
      start += targetSize;
      end += targetSize;
 
      number++;
    }
    return result;
  }
}

2.3. Job Configuration

This is the job configuration class where we are creating the necessary beans to perform the job. In this example, we used SimpleAsyncTaskExecutor which is the simplest multi-threaded implementation of the TaskExecutor interface.

We’ve used partitioner in the Step to create a partition step builder for a remote (or local) step. Using these Step for each chunk of data ‘reading, processing and writing’, happens altogether in different threads. Hence, the processed records may not be in the same sequential order as fed into it.

Below are the things to look for:

  • A throttle limit is imposed on the task executor when it is backed by some thread pool. This limit defaults to 4 but can be configured differently.
  • Concurrency limits might come on the resource used in the Step, say the DataSource used.
  • ColumnRangePartitioner – Central strategy interface for creating input parameters for a partitioned step in the form of ExecutionContext instances.
  • JdbcPagingItemReader – This bean reads data using Pagination and accepts minValue and maxValue to be accepted based on range to get those data only. Here we setFetchSize to 1000 however you can use any value and make it configurable from the properties file.
  • JdbcBatchItemWriter – This bean will write the data into another table.
  • Step – This is the step configured in the batch job. This is reading data and writing it into XML and JSON format.
  • Job – Batch domain object representing a job.
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import com.example.domain.Customer;
import com.example.mapper.CustomerRowMapper;
 
@Configuration
public class JobConfiguration {
  @Autowired
  private JobBuilderFactory jobBuilderFactory;
 
  @Autowired
  private StepBuilderFactory stepBuilderFactory;
 
  @Autowired
  private DataSource dataSource;
 
  @Bean
  public ColumnRangePartitioner partitioner() 
  {
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
    columnRangePartitioner.setColumn("id");
    columnRangePartitioner.setDataSource(dataSource);
    columnRangePartitioner.setTable("customer");
    return columnRangePartitioner;
  }
 
  @Bean
  @StepScope
  public JdbcPagingItemReader<Customer> pagingItemReader(
      @Value("#{stepExecutionContext['minValue']}") Long minValue,
      @Value("#{stepExecutionContext['maxValue']}") Long maxValue) 
  {
    System.out.println("reading " + minValue + " to " + maxValue);
 
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
     
    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);
    queryProvider.setSortKeys(sortKeys);
     
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new CustomerRowMapper());
    reader.setQueryProvider(queryProvider);
     
    return reader;
  }
   
   
  @Bean
  @StepScope
  public JdbcBatchItemWriter<Customer> customerItemWriter()
  {
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setDataSource(dataSource);
    itemWriter.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
 
    itemWriter.setItemSqlParameterSourceProvider
      (new BeanPropertyItemSqlParameterSourceProvider<>());
    itemWriter.afterPropertiesSet();
     
    return itemWriter;
  }
   
  // Master
  @Bean
  public Step step1() 
  {
    return stepBuilderFactory.get("step1")
        .partitioner(workerStep().w(), paorkerrtitioner())
        .step(workerStep())
        .gridSize(4)
        .taskExecutor(new SimpleAsyncTaskExecutor())
        .build();
  }
   
  // slave step
  @Bean
  public Step workerStep() 
  {
    return stepBuilderFactory.get("workerStep")
        .<Customer, Customer>chunk(1000)
        .reader(pagingItemReader(null, null))
        .writer(customerItemWriter())
        .build();
  }
   
  @Bean
  public Job job() 
  {
    return jobBuilderFactory.get("job")
        .start(step1())
        .build();
  }
}

2.4 Entity and Mapper

Customer is a business model class.

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer
{
	private Long id;
	private String firstName;
	private String lastName;
	private String birthdate;
}

CustomerRowMapper class is used to map the resultset into the Customer domain object.

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.howtodoinjava.batch.decorator.model.Customer;

public class CustomerRowMapper implements RowMapper<Customer> {

	@Override
	public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
		return Customer.builder()
					.id(rs.getLong("id"))
					.firstName(rs.getString("firstName"))
					.lastName(rs.getString("lastName"))
					.birthdate(rs.getString("birthdate"))
					.build();
	}
}

2.5. application.properties

Configuration to create DB connection with MySQL DB.

spring.datasource.url=jdbc:h2:mem:test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

#Prevents running the job during application context creation
spring.batch.job.enabled=false

2.6. JDBC Config and Schema Files

These are schema and SQL data files.

CREATE TABLE customer (
	id INT PRIMARY KEY,
	firstName VARCHAR(255) NULL,
	lastName VARCHAR(255) NULL,
	birthdate VARCHAR(255) NULL
);

CREATE TABLE new_customer (
	id INT PRIMARY KEY,
	firstName VARCHAR(255) NULL,
	lastName VARCHAR(255) NULL,
	birthdate VARCHAR(255) NULL
);
INSERT INTO customer VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO customer VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO customer VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO customer VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO customer VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO customer VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO customer VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO customer VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO customer VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO customer VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO customer VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO customer VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO customer VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO customer VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO customer VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO customer VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO customer VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO customer VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO customer VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO customer VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

3. Demo

Run the application as a Spring boot application.

import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner
{
	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job job;

	public static void main(String[] args) {
		SpringApplication.run(LocalPartitioningApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
				.addDate("date", new Date())
                .addLong("time",System.currentTimeMillis()).toJobParameters();

		JobExecution execution = jobLauncher.run(job, jobParameters);

		System.out.println("STATUS :: "+execution.getStatus());
	}
}

The application will read the data from one database using partitions that we created and write it into another table.

2019-12-13 15:03:42.408   ---  c.example.LocalPartitioningApplication   : Started LocalPartitioningApplication
in 3.504 seconds (JVM running for 4.877)

2019-12-13 15:03:42.523   ---  o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]]
launched with the following parameters: [{JobId=1576229622410, date=1576229622410, time=1576229622410}]

2019-12-13 15:03:42.603   ---  o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]

reading 1 to 5
reading 11 to 15
reading 16 to 20
reading 6 to 10

2019-12-13 15:03:42.890   --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition0] executed in 173ms
2019-12-13 15:03:42.895   --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition3] executed in 178ms
2019-12-13 15:03:42.895   --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition1] executed in 177ms
2019-12-13 15:03:42.901   --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition2] executed in 182ms

2019-12-13 15:03:42.917   ---  o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 314ms

2019-12-13 15:03:42.942   ---  o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed
with the following parameters: [{JobId=1576229622410, date=1576229622410, time=1576229622410}]
and the following status: [COMPLETED] in 374ms

STATUS :: COMPLETED

4. Conclusion

In this spring batch step partitioner example with database, we learned to use partitioning to process bulk data using multiple threads. It enables the application to utilize the full potential of machine hardware and operating system capabilities.

Happy Learning !!

Sourcecode Download

Leave a Reply

0 Comments
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.

Our Blogs

REST API Tutorial