How to use BlockingQueue and ThreadPoolExecutor in java

Life has become very easy for java programmers working on multi-threaded applications after release of JDK 5. JDK 5 brought many features related to multi-threaded processing which were kind of nightmare for application developers, and even worse for those who had to debug this code in future for bug fixing. Sometimes, this resulted in deadlock situations as well.

In this post, i will suggest to use such a new feature ThreadPoolExecutor in combination with BlockingQueue. I will let you know the best practices to use above classes in your application.

Sections in this post:

  • Introducing DemoTask
  • Adding CustomThreadPoolExecutor
  • Explaining BlockingQueue
  • Explaining RejectedExecutionHandler
  • Testing our code

Introducing DemoTask

I will not take much time here as our DemoTask is just another test thread written to support our logic and code.

package corejava.thread;

public class DemoThread implements Runnable 
	private String name = null;

	public DemoThread(String name) { = name;

	public String getName() {

	public void run() {
		try {
		} catch (InterruptedException e) {
		System.out.println("Executing : " + name);

Adding CustomThreadPoolExecutor

This is important. Our CustomThreadPoolExecutor is extension of ThreadPoolExecutor. Even without extending the ThreadPoolExecutor, simply creating its instance and using it, will also work correctly. But, we will miss some extremely useful features in terms of control of execution.

ThreadPoolExecutor provides two excellent methods which i will highly recommend to override i.e. beforeExecute() and afterExecute() methods. They provide very good handle on execution life cycle of runnables to be executed. Lets see above methods inside our CustomThreadPoolExecutor.

package corejava.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

	public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		System.out.println("Perform beforeExecute() logic");

	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		if (t != null) {
			System.out.println("Perform exception handler logic");
		System.out.println("Perform afterExecute() logic");


Explaining BlockingQueue

If you remember solving the producer-consumer problem, before JDK 5, consumer had to wait until producer put something in resource queue. This problem can be easily solved using new BlockingQueue.

BlockingQueue is like another Queue implementations with additional capabilities. Any attempt, to retrieve something out of it, can be seen safe as it will not return empty handed. Consumer thread will automatically wait until BlockingQueue is not populated with some data. Once it fills, thread will consume the resource.

BlockingQueue works on following rules:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

Explaining RejectedExecutionHandler

So the danger is, a task can be rejected as well. We need to have something in place to resolve this situation because no one would like to miss any single job in his application.

Can we do something about it? Yes, we can…[Borrowed from Obama]

BlockingQueue in case of rejection throws RejectedExectionException, we can add a handler for it.

Adding RejectedExecutionHandler is considered a good practice when using new concurrent APIs.

Testing our code

I am done with talking and its time to see if actually, what i said, works? Lets write a test case.

We have some 100 tasks. We want to run them using ideally 10, and maximum 20 threads. I am trying to write code as below. You might write it better or you have this solution.

package corejava.thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor 
	public static void main(String[] args) 
		Integer threadCounter = 0;
		BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);

		CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10,
											20, 5000, TimeUnit.MILLISECONDS, blockingQueue);

		executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
			public void rejectedExecution(Runnable r,
					ThreadPoolExecutor executor) {
				System.out.println("DemoTask Rejected : "
						+ ((DemoThread) r).getName());
				System.out.println("Waiting for a second !!");
				try {
				} catch (InterruptedException e) {
				System.out.println("Lets add another time : "
						+ ((DemoThread) r).getName());
		// Let start all core threads initially
		while (true) {
			// Adding threads one by one
			System.out.println("Adding DemoTask : " + threadCounter);
			executor.execute(new DemoThread(threadCounter.toString()));

			if (threadCounter == 100)


Execute above code and you will see the result is as desired and performance is also good.

I hope i am able to make a point here. Please let me know of your thoughts also.

Happy Learning !!

33 thoughts on “How to use BlockingQueue and ThreadPoolExecutor in java”
  1. Hi,

    Great tutorial. Many thanks.

    In the section Explaining Blocking Queue, I wonder if the sentence “Consumer thread will automatically wait until BlockingQueue is not populated with some data.” should actually be “Consumer thread will automatically wait until BlockingQueue is populated with some data.”



  2. hi Lokesh,

    Nice tutorial!!!!

    When using ThreadPoolExecutor, Suppose if some thread is blocked, and i want to terminate all active threads and restart the ThreadPoolExecutor.

    Is it possible to terminate all active threads and start creating threads again to perform the task.

    Could you please explain. Thanks in advance..:)

    1. As stated in the documentation, you cannot reuse an ExecutorService that has been shut down. I’d recommend against any workarounds, since (a) they may not work as expected in all situations; and (b) you can achieve what you want using standard classes.

    1. Two thoughts come up immediately in my mind.
      1) If you want ordering, then no need to multi-thread your application. Multi-threading is inherently un-ordered and for parallel execution.
      2) If you still demand ordering based on object counter, doesn’t Queue process them sequentially in order they were added? Yes it does.

      Am I missing anything, Guys?

      1. I think they are not executing in the same order as they are created. For example, in my first run of the above code, thread 5 got executed before thread 1

  3. Hi Lokesh, Thanks for the beautiful explanation!!

    blockingQueue has never been used, why have you declared in first place?

  4. I have a scenario where I need a module based scheduling.. Probably I will consider separate BLOCKING QUEUE for each module for handling different their implementation. Do you think that creating a Map for module name to its Blocking queue would be the right solution ?

      1. Is there any way that I can override getTask() method of threadpool executor ?. Because I need to implement my own logic to get next task.


        1. NO. You can not. Reason is simple in two aspects. First the method getTask() is private so you cannot override. Second, this method plays a critical part in fetching the next executable/runnable from queue. If you are changing the logic here, then the whole idea of using a Queue falls apart. Queues use a fixed insertion/retrieval ordering and any program should honor this default behavior.

          Regarding your own logic to get next task, I will suggest you to research more on PriorityBlockingQueue concept. You can add a comparator to priority queue and it will re-arrange (theoretically i suppose, not tested myself) the runnables in queue and whenever executor ask for new task, queue can provide the next task as per your logic in comparator.

          Again, I have not tested it myself but this should be your area of research.

          1. Thanks Lokesh for your valuable feedback. I have a question about the second point that you mentioned, why do we need a complex method to fetch next executable from queue? . So you are saying if I have my own implementation for getTask(), I no need to extends ThreadPoolExecutor.


  5. Hi,
    Is there a possibility that I can reduce the number of threads of executor service once the threads are started? executor.shutdown() will terminate all the threads, but can I terminate one of the thread which was started by executor service?


  6. Hi,

    I am facing a problem in ThreadPoolExecutor, i am using ArrayBlockingQueue to queue the tasks. CorePoolSize is 50, maximum pool size is MAX value of integer. When I submit requests and number of threads reaches corePoolSize then it queue up the request in the ArrayBlockingQueue but never execute the run() method of the submitted requests. Could you please let me know what could be reason for this and possible solution. I’m facing this problem due to which my application is getting timeout as it does not get the response.


    1. Because your tasks are not getting executed. Try to put some log statements OR sysout statements to check that even a single task was executed.
      Then check if they are not being rejected. Add RejectedExecutionHandler.

      If still struck, paste the code here. I will try to get you a solution.

  7. I still don’t see why new tasks should be rejected if the queue is full. If new tasks are rejected when the queue is full, the purpose of using bounded queue is lost. The purpose of the bounded queue is that the put() method is blocked when the queue is full. It seems the ThreadPoolExecutor is not using this feature.

    Therefore, I think this is a design fault in ThreadPoolExecutor.

      1. New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated

    1. The RejectedExecutionHandler has one approach problem. When a very large number of executions come in at a very short time (lets say the implementation is a kind of server) the method may encounter a stack overflow as rejected calls execution which rejects the execution back to rejected and so on eventually reaching overflow. I write this from experience using such an implementation in high volume transaction system. Thinking of a different solution as I can’t loose executions.

      1. Pretty valid point. Any suggestion what you would like to do in this case? One solution may be increasing the number of size of thread pool or queue size, but it has it’s own disadvantages in resource critical systems. Another approach may be to let tasks reject and log them somewhere to get a report in future.

Note:- In comment box, please put your code inside [java] ... [/java] OR [xml] ... [/xml] tags otherwise it may not appear as intended.

Leave a Reply

Your email address will not be published. Required fields are marked *

8 − four =

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>