
How to use BlockingQueue and ThreadPoolExecutor in java

Life has become much easier for Java programmers working on multi-threaded applications since the release of JDK 5. Before it, hand-written thread management was something of a nightmare for application developers, and even worse for those who later had to debug that code; sometimes it resulted in deadlocks as well. JDK 5 brought many features that take over this work.

In this post, I suggest using one of those features, ThreadPoolExecutor, in combination with BlockingQueue, and I will go through good practices for using these classes in your application.

Sections in this post:

  • Introducing DemoTask
  • Adding CustomThreadPoolExecutor
  • Explaining BlockingQueue
  • Explaining RejectedExecutionHandler
  • Testing our code

Introducing DemoTask

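I will not spend much time here, as our DemoTask is just a simple test task written to support our logic and code. Note that the class is named DemoThread in the code below, although it is a Runnable rather than a Thread.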

package corejava.thread;

public class DemoThread implements Runnable {

	private String name = null;

	public DemoThread(String name) {
		this.name = name;
	}

	public String getName() {
		return this.name;
	}

	@Override
	public void run() {
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("Executing : " + name);
	}
}

Adding CustomThreadPoolExecutor

This is important. Our CustomThreadPoolExecutor is an extension of ThreadPoolExecutor. Simply creating a ThreadPoolExecutor instance and using it, without extending it, will also work correctly, but we would miss some extremely useful hooks for controlling execution.

ThreadPoolExecutor provides two excellent methods which I highly recommend overriding: beforeExecute() and afterExecute(). They give a good handle on the execution life cycle of the runnables being executed. Let's see these methods inside our CustomThreadPoolExecutor.

package corejava.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

	public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		System.out.println("Perform beforeExecute() logic");
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		if (t != null) {
			System.out.println("Perform exception handler logic");
		}
		System.out.println("Perform afterExecute() logic");
	}

}
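
As a further illustration of the afterExecute() hook, here is a minimal sketch that counts how many tasks finished normally and how many threw. The class name CountingExecutorSketch and the demo() helper are hypothetical and not part of the original code. One thing worth keeping in mind: the Throwable argument is only non-null for tasks passed to execute(); for tasks passed to submit(), the exception is captured in the returned Future instead.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: counts task outcomes in afterExecute().
public class CountingExecutorSketch extends ThreadPoolExecutor {

    private final AtomicInteger succeeded = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();

    public CountingExecutorSketch(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // t is non-null only when a task given to execute() threw
        if (t != null) {
            failed.incrementAndGet();
        } else {
            succeeded.incrementAndGet();
        }
    }

    // Runs five tasks, one of which throws; returns {succeeded, failed}.
    static int[] demo() {
        CountingExecutorSketch pool = new CountingExecutorSketch(2);
        for (int i = 0; i < 5; i++) {
            final int n = i;
            pool.execute(() -> {
                if (n == 0) {
                    // The stack trace of this exception is printed to stderr
                    throw new IllegalStateException("boom");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.succeeded.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("succeeded=" + counts[0] + " failed=" + counts[1]);
    }
}
```

Counters like these are one possible use of the hook; logging, timing or resetting thread-local state would follow the same pattern.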

Explaining BlockingQueue

If you remember solving the producer-consumer problem before JDK 5, the consumer had to wait, using hand-written wait() and notify() calls, until the producer put something in the resource queue. This problem can be solved easily using the new BlockingQueue.

BlockingQueue is like other Queue implementations, with additional capabilities. An attempt to retrieve something from it with take() is safe in the sense that it will not return empty-handed: the consumer thread automatically waits until the BlockingQueue is populated with some data, and then consumes it.

ThreadPoolExecutor uses its BlockingQueue according to the following rules:
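
To make the waiting behaviour concrete, here is a minimal producer-consumer sketch. The class name BlockingQueueSketch and the transfer() helper are hypothetical; put() and take() are the blocking methods of BlockingQueue. The queue is deliberately tiny, so the producer has to wait whenever it is full and the consumer has to wait whenever it is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueSketch {

    // Hypothetical helper: a producer thread hands `count` numbers to the caller.
    static List<Integer> transfer(int count) {
        // Capacity 2, so the producer blocks as soon as it gets ahead
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<Integer>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // waits while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

No explicit wait() or notify() call appears anywhere; the queue does the coordination.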

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
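
The three rules above can be observed with a deliberately tiny pool. This is only a sketch under assumed values (corePoolSize 1, maximumPoolSize 2, queue capacity 1); the class name PoolSizingSketch and its helpers are hypothetical. Every task blocks on a latch, so the pool state after each submission is stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingSketch {

    // Hypothetical helper: describes the pool after a submission.
    static String snapshot(ThreadPoolExecutor pool) {
        return "threads=" + pool.getPoolSize() + " queued=" + pool.getQueue().size();
    }

    static List<String> observe() {
        final CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until the latch is released
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        List<String> log = new ArrayList<String>();

        pool.execute(blocked); // rule 1: fewer than corePoolSize, new thread
        log.add(snapshot(pool));
        pool.execute(blocked); // rule 2: core thread busy, task is queued
        log.add(snapshot(pool));
        pool.execute(blocked); // rule 3: queue full, second thread created
        log.add(snapshot(pool));
        try {
            pool.execute(blocked); // queue full and maximumPoolSize reached
        } catch (RejectedExecutionException e) {
            log.add("rejected");
        }

        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        // prints [threads=1 queued=0, threads=1 queued=1, threads=2 queued=1, rejected]
        System.out.println(observe());
    }
}
```

Note that the second thread only appears once the queue is full, which is why a large or unbounded queue keeps the pool at corePoolSize.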

Explaining RejectedExecutionHandler

So the danger is that a task can be rejected as well. We need something in place to handle this situation, because no one wants to lose a single job in their application.

Can we do something about it? Yes, we can…[Borrowed from Obama]

When a task is rejected, ThreadPoolExecutor by default throws a RejectedExecutionException; we can add a handler for rejected tasks instead.

Adding a RejectedExecutionHandler is considered a good practice when using the new concurrent APIs.
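
Besides writing your own handler, the JDK ships a few ready-made ones as nested classes of ThreadPoolExecutor. The sketch below uses ThreadPoolExecutor.CallerRunsPolicy, which runs a rejected task in the submitting thread; this is an alternative to the custom handler used later in this post, not the same technique. The class name CallerRunsSketch, the runAll() helper and the pool sizes are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsSketch {

    // Hypothetical helper: submits taskCount tasks and returns how many ran.
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                // Rejected tasks run in the thread that called execute()
                new ThreadPoolExecutor.CallerRunsPolicy());

        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runAll(20)); // prints Completed: 20
    }
}
```

Because the submitting thread is busy while it runs a rejected task, it cannot submit more in the meantime, which slows the producer down instead of dropping work.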

Testing our code

I am done talking, and it's time to see whether what I said actually works. Let's write a test case.

We have some 100 tasks. We want to run them using ideally 10, and at most 20, threads. I have written the code as below. You might write it better, or you may already have such a solution.

package corejava.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor {

	public static void main(String[] args) {
		Integer threadCounter = 0;
		BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(
				50);

		CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10,
				20, 5000, TimeUnit.MILLISECONDS, blockingQueue);

		executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
			@Override
			public void rejectedExecution(Runnable r,
					ThreadPoolExecutor executor) {
				System.out.println("DemoTask Rejected : "
						+ ((DemoThread) r).getName());
				System.out.println("Waiting for a second !!");
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("Lets add another time : "
						+ ((DemoThread) r).getName());
				executor.execute(r);
			}
		});
		// Let start all core threads initially
		executor.prestartAllCoreThreads();
		while (threadCounter < 100) {
			threadCounter++;
			// Submitting tasks one by one
			System.out.println("Adding DemoTask : " + threadCounter);
			executor.execute(new DemoThread(threadCounter.toString()));
		}
		// Stop accepting new tasks; queued tasks still run, then the JVM can exit
		executor.shutdown();
	}

}

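Execute the above code and you will see that the result is as desired: every task runs. Once all 20 threads are busy and the 50 queue slots are full, further tasks are rejected, and the handler waits a second and resubmits them, so no task is lost.

I hope I was able to make my point here. Please let me know your thoughts as well.
Happy Learning !!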
