Learn to use Java ThreadPoolExecutor in combination with BlockingQueue.
1. Creating ThreadPoolExecutor
A ThreadPoolExecutor is a type of ExecutorService that executes each submitted task using one of the threads from a thread pool. This class provides many flexible ways to create a pool of threads in different contexts.
1.1. Constructors
The following constructors can be used to create a thread pool executor instance based on our requirements.
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
The constructor arguments are:
- corePoolSize : the number of threads to keep in the pool, even if they are idle.
- maximumPoolSize : the maximum number of threads to allow in the pool.
- keepAliveTime : when the number of threads is greater than corePoolSize, the maximum time that excess idle threads will wait for a new task before terminating.
- unit : the time unit for the keepAliveTime argument.
- workQueue : the queue to use for holding Runnable tasks before they are executed.
- threadFactory : an optional factory to use when the executor creates a new thread.
- handler : an optional handler to use when a task is rejected because the thread bounds and queue capacity have been reached.
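As a minimal sketch of how these arguments fit together, the following snippet calls the seven-argument constructor. The class name ConstructorSketch and the specific sizes (2, 4, 30 seconds, capacity 10) are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; the sizes below are arbitrary illustration values.
class ConstructorSketch {

    // Builds a pool using the seven-argument constructor.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                 // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(10),  // workQueue (bounded, capacity 10)
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core = " + pool.getCorePoolSize());
        System.out.println("max = " + pool.getMaximumPoolSize());
        System.out.println("keepAlive (s) = " + pool.getKeepAliveTime(TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```

The shorter constructors fall back to Executors.defaultThreadFactory() and AbortPolicy when the last two arguments are omitted.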
1.2. Custom ThreadPoolExecutor
Even without extending ThreadPoolExecutor, we can use it very effectively. However, we would miss some useful features for controlling the execution flow.
For example, the ThreadPoolExecutor class provides two hook methods that I highly recommend overriding. They are called around the execution of every Runnable, which gives us a good handle on its lifecycle.
- beforeExecute() : invoked just before a task runs, in the thread that will run it.
- afterExecute() : invoked after a task completes, with the Throwable that terminated it, or null if it completed normally.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println("Perform beforeExecute() logic");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            System.out.println("Perform exception handler logic");
        }
        System.out.println("Perform afterExecute() logic");
    }
}
2. Creating BlockingQueue
A BlockingQueue is a Queue implementation with additional capabilities. A blocking retrieval such as take() never returns an empty result: the consumer thread automatically waits until the BlockingQueue is populated with some data. Once an element arrives, the thread consumes it.
A BlockingQueue may be used to transfer and hold the tasks to be executed by the thread pool. The executor decides between creating a thread and queuing a task as follows:
- If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
- If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
- If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
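The three rules above can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1, and submits four tasks that block on a latch so that no worker becomes free in between. The class name QueueingRulesSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; sizes are chosen only to make each rule visible.
class QueueingRulesSketch {

    // Submits four blocking tasks and logs the pool state after each submission.
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        StringBuilder log = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                log.append("task ").append(i)
                   .append(": threads=").append(pool.getPoolSize())
                   .append(", queued=").append(pool.getQueue().size())
                   .append('\n');
            } catch (RejectedExecutionException e) {
                log.append("task ").append(i).append(": rejected\n");
            }
        }

        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

Task 1 starts a core thread, task 2 is queued, task 3 finds the queue full and starts a second thread, and task 4 is rejected by the default AbortPolicy.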
2.1. Queueing Strategies
ThreadPoolExecutor supports different kinds of blocking queues. Each queue type changes how submitted tasks are processed.
2.1.1. Direct Handoffs
This can be achieved with a SynchronousQueue, which does not have any internal capacity. We cannot insert a task (using any method) unless another thread is trying to take it.
With a synchronous queue, an attempt to queue a task fails if no thread is immediately available to run it. If the pool has not yet reached maximumPoolSize threads, a new thread is created for the task; otherwise, the task is rejected immediately.
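A small sketch of the direct-handoff behavior, assuming a pool with corePoolSize = 1 and maximumPoolSize = 2 (both values and the class name DirectHandoffSketch are illustrative). Each task blocks on a latch, so the third submission finds no idle thread and no room to grow.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; pool sizes are illustration values.
class DirectHandoffSketch {

    // Returns {largest pool size reached, number of rejected tasks}.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // occupy the worker thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        int rejected = 0;
        for (int i = 0; i < 3; i++) {
            try {
                pool.execute(blocker); // nothing is ever stored in the queue
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int largest = pool.getLargestPoolSize();

        release.countDown();
        pool.shutdown();
        return new int[] {largest, rejected};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("largest pool size = " + result[0] + ", rejected = " + result[1]);
    }
}
```

With these settings the pool grows to two threads and the third task is rejected, which is why direct handoffs are usually paired with a large maximumPoolSize.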
2.1.2. Unbounded Queues
An unbounded queue (for example, a LinkedBlockingQueue without a predefined capacity) causes newly submitted tasks to wait in the queue when all corePoolSize threads are busy. Because tasks can wait indefinitely, the executor never needs to create threads beyond corePoolSize, so maximumPoolSize has no effect when this kind of queue is used.
This style of queuing can be useful for smoothing out a sudden burst of requests to a server. However, it may lead to memory issues if requests continue to arrive faster than they are processed.
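To illustrate that maximumPoolSize is ignored with an unbounded queue, the following sketch (class name and sizes are assumptions) submits 20 blocking tasks to a pool configured with corePoolSize = 2 and maximumPoolSize = 10.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; sizes are illustration values.
class UnboundedQueueSketch {

    // Returns {pool size, queued tasks} after submitting 20 blocking tasks.
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep both core threads busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 20; i++) {
            pool.execute(blocker); // never rejected: the queue always has room
        }
        int[] state = {pool.getPoolSize(), pool.getQueue().size()};

        release.countDown();
        pool.shutdown();
        return state;
    }

    public static void main(String[] args) {
        int[] state = run();
        System.out.println("threads = " + state[0] + ", queued = " + state[1]);
    }
}
```

The pool stays at two threads while the remaining 18 tasks wait in the queue, even though up to ten threads are nominally allowed.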
2.1.3. Bounded Queues
Bounded queues (for example, ArrayBlockingQueue) help manage resources much better. Together with a finite maximumPoolSize, they cap both the number of threads and the number of waiting tasks, which prevents resource exhaustion.
For different scenarios, we can test custom pool sizes and queue sizes and, finally, use what is best suited for our use case.
- Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput.
- Using small queues generally requires larger pools, which keeps the CPUs busier but may incur scheduling overhead that also lowers throughput.
- So finding the right balance between the queue size and the pool size is important.
2.2. Handling Rejected Tasks
There may be situations when the submitted tasks cannot be executed by the executor service and thus have been rejected. Task rejection may occur when no more threads or queue slots are available because their bounds have been exceeded, or the executor has been shut down.
ThreadPoolExecutor provides the following four built-in handlers for rejected tasks. We can create our own custom handler as well.
- AbortPolicy : This is the default policy. It causes the executor to throw a RejectedExecutionException.
- CallerRunsPolicy : This policy runs the rejected task directly in the calling thread of the execute method. If the executor has been shut down, the task will be discarded.
- DiscardOldestPolicy : This policy discards the oldest unhandled request and then retries execute. If the executor has been shut down, the task will be discarded.
- DiscardPolicy : This policy silently discards the rejected task.
- Custom Policy : We can implement the RejectedExecutionHandler interface and provide our own logic to handle the rejected tasks.
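As a sketch of the custom policy option, the handler below counts and logs rejected tasks instead of throwing. The names RejectionHandlerSketch and LoggingHandler are hypothetical, and what to do with a rejected task (log, persist, retry) depends on the application.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class names; the logging strategy is an illustration only.
class RejectionHandlerSketch {

    // Custom policy: record the rejection rather than throwing an exception.
    static final class LoggingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejectedCount = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejectedCount.incrementAndGet();
            System.err.println("Rejected " + r
                    + " (queued = " + executor.getQueue().size() + ")");
        }
    }

    // Saturates a one-thread pool with a one-slot queue; returns the rejection count.
    static int run() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        LoggingHandler handler = new LoggingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), handler);
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker); // 1st runs, 2nd is queued, 3rd goes to the handler
        }

        release.countDown();
        pool.shutdown();
        return handler.rejectedCount.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks = " + run());
    }
}
```

A handler can also be swapped on a live executor with setRejectedExecutionHandler().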
3. Using ThreadPoolExecutor with BlockingQueue
To demonstrate the usage of ThreadPoolExecutor with BlockingQueue, we have created one task, DemoTask. This task does no real work: it simply waits for 500ms, prints its name, and completes.
public class DemoTask implements Runnable {

    private final String name;

    public DemoTask(String name) {
        this.name = name;
    }

    public String getName() {
        return this.name;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Executing : " + name);
    }
}
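Now let's suppose we have a total of 100 tasks. We want to run them using ideally 10, and a maximum of 20, threads. Note that with the unbounded LinkedBlockingQueue used here, the pool never grows beyond its 10 core threads.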
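import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>();

        // CustomThreadPoolExecutor declares only the five-argument constructor,
        // so the rejection policy is set separately (AbortPolicy is also the default).
        CustomThreadPoolExecutor executor =
                new CustomThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, blockingQueue);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        // Start all core threads up front so they are already waiting on the queue
        executor.prestartAllCoreThreads();

        // Tasks go straight into the work queue, where the prestarted threads pick them up
        for (int i = 1; i <= 100; i++) {
            blockingQueue.offer(new DemoTask("Task " + i));
        }

        // No new tasks are accepted; the queued tasks still run to completion
        executor.shutdown();
        executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
}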
Execute the above code and you will see that all 100 tasks get executed by the pool's 10 core threads.
Happy Learning !!
Hi Lokesh,
What is the difference between newFixedThreadPool and a blocking queue in multithreading? Please help.
I’m curious here,
Shouldn’t I wait till the executor is terminated??
Could you please give me idea on this one?
My mCustomBDThreadPoolEx.toString() shows 161k queued tasks.
[Shutting down, pool size = 10, active threads = 10, queued tasks = 16249, completed tasks = 9967]
16249,
Is this check mandatory?
What does it actually do?
mCustomBDThreadPoolEx.shutdown();
while (!mCustomBDThreadPoolEx.isTerminated()) {
    try {
        log.info("executor still running");
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        log.info("error at mCustomBDThreadPoolEx" + e);
    }
}
Hello, I am implementing a thread pool in a web service using the Jersey framework. The problem is that Jersey creates a new instance of the resource class for every request, and that is the class in which I instantiate the thread pool. How do I do this without creating a new instance of the thread pool every time a request comes in? Thank you.
Can you please share more information about how you have implemented it till now??
Hi,
Great tutorial. Many thanks.
In the section Explaining Blocking Queue, I wonder if the sentence “Consumer thread will automatically wait until BlockingQueue is not populated with some data.” should actually be “Consumer thread will automatically wait until BlockingQueue is populated with some data.”
Thanks,
Ron.
hi Lokesh,
Nice tutorial!!!!
When using ThreadPoolExecutor, suppose some thread is blocked and I want to terminate all active threads and restart the ThreadPoolExecutor.
Is it possible to terminate all active threads and start creating threads again to perform the tasks?
Could you please explain. Thanks in advance..:)
As stated in the documentation, you cannot reuse an ExecutorService that has been shut down. I’d recommend against any workarounds, since (a) they may not work as expected in all situations; and (b) you can achieve what you want using standard classes.
This line giving error :
BlockingQueue blockingQueue = new ArrayBlockingQueue(
50);
Yes, this line gives an error.
I am using this constructor.
Can i use LinkedBlockingQueue like LinkedBlockingQueue logQueue = new LinkedBlockingQueue(QUEUE_CAPACITY)?
I do not see any problem as far as it is implementation of BlockingQueue.
How to maintain the thread execution order? I want threads to be executed in the order they are created.
Two thoughts come up immediately in my mind.
1) If you want ordering, then no need to multi-thread your application. Multi-threading is inherently un-ordered and for parallel execution.
2) If you still demand ordering based on object counter, doesn’t Queue process them sequentially in order they were added? Yes it does.
Am I missing anything, Guys?
I think they are not executing in the same order as they are created. For example, in my first run of the above code, thread 5 got executed before thread 1
Hi Lokesh, Thanks for the beautiful explanation!!
blockingQueue has never been used, why have you declared in first place?
I passed it into CustomThreadPoolExecutor’s constructor.
Your main program never terminates.
Yes, it’s expected and intended behavior.
I have a scenario where I need module-based scheduling. I will probably consider a separate BlockingQueue for each module, to handle their different implementations. Do you think that creating a Map from module name to its BlockingQueue would be the right solution?
Seems ok to me. Basically you would like to hide it behind a factory kind of wrapper. It will help in replacing the logic if not proved correct in future.
Is there any way that I can override getTask() method of threadpool executor ?. Because I need to implement my own logic to get next task.
Thanks,
Dimal
NO. You can not. Reason is simple in two aspects. First the method getTask() is private so you cannot override. Second, this method plays a critical part in fetching the next executable/runnable from queue. If you are changing the logic here, then the whole idea of using a Queue falls apart. Queues use a fixed insertion/retrieval ordering and any program should honor this default behavior.
Regarding your own logic to get the next task, I suggest you research the PriorityBlockingQueue concept. You can add a comparator to the priority queue and it will re-arrange the runnables in the queue (theoretically, I suppose; not tested myself), so whenever the executor asks for a new task, the queue can provide the next task as per the logic in your comparator.
Again, I have not tested it myself but this should be your area of research.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html
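A minimal sketch of that PriorityBlockingQueue idea, under a few assumptions: the task type PrioritizedTask and its priority field are hypothetical, a lower number means "run first", and tasks are submitted with execute() so that the queue holds the original Runnable objects (submit() wraps them, and the cast in the comparator would then fail).

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class names; a lower priority number is assumed to run first.
class PriorityQueueSketch {

    static final class PrioritizedTask implements Runnable {
        final int priority;
        final List<Integer> executed;

        PrioritizedTask(int priority, List<Integer> executed) {
            this.priority = priority;
            this.executed = executed;
        }

        @Override
        public void run() {
            executed.add(priority); // record the order in which tasks actually ran
        }
    }

    // Queues tasks with priorities 3, 1, 2 behind a busy thread; returns the run order.
    static List<Integer> run() {
        List<Integer> executed = new CopyOnWriteArrayList<>();
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((PrioritizedTask) r).priority);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>(11, byPriority));

        CountDownLatch release = new CountDownLatch(1);
        // The first task is handed straight to the new core thread, not to the queue.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(new PrioritizedTask(3, executed));
        pool.execute(new PrioritizedTask(1, executed));
        pool.execute(new PrioritizedTask(2, executed));

        release.countDown(); // free the worker; it now drains the queue by priority
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println("execution order = " + run());
    }
}
```

The queued tasks run in the order 1, 2, 3 regardless of submission order. Keep in mind that PriorityBlockingQueue is unbounded, so maximumPoolSize has no effect with it.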
Thanks Lokesh for your valuable feedback. I have a question about the second point that you mentioned: why do we need a complex method to fetch the next executable from the queue? So you are saying that if I have my own implementation for getTask(), I do not need to extend ThreadPoolExecutor?
Thanks,
Dimal
Hi,
Is there a possibility that I can reduce the number of threads of executor service once the threads are started? executor.shutdown() will terminate all the threads, but can I terminate one of the thread which was started by executor service?
Thanks,
Bala
Hi,
I am facing a problem with ThreadPoolExecutor. I am using an ArrayBlockingQueue to queue the tasks. corePoolSize is 50 and the maximum pool size is Integer.MAX_VALUE. When I submit requests and the number of threads reaches corePoolSize, it queues up the requests in the ArrayBlockingQueue but never executes the run() method of the submitted requests. Could you please let me know what could be the reason for this and a possible solution? Because of this problem my application times out, as it does not get a response.
Thanks
AK
Because your tasks are not getting executed. Try to put some log statements OR sysout statements to check that even a single task was executed.
Then check if they are not being rejected. Add RejectedExecutionHandler.
If you are still stuck, paste the code here. I will try to get you a solution.
I still don’t see why new tasks should be rejected if the queue is full. If new tasks are rejected when the queue is full, the purpose of using bounded queue is lost. The purpose of the bounded queue is that the put() method is blocked when the queue is full. It seems the ThreadPoolExecutor is not using this feature.
Therefore, I think this is a design fault in ThreadPoolExecutor.
Hmmm, it’s good point. But I do not completely agree with you. Reason is that this is well documented behavior. Please refer to bounded queue and unbounded queue sections.
https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java
It is desired in many cases to prevent resource exhaustion. Otherwise waiting queue can keep growing and eat up whole system resources.
New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated
100% correct. You are right.
Try using the CallerRunsPolicy:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
From ThreadPoolExecutor JavaDoc:
“In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.”
It also ensures that tasks will never be discarded.
It’s good point.
You are right! execute(Runnable task) adds tasks to the blocking queue using offer(task), which returns false if the queue is full and does not block the producer: https://www.baeldung.com/java-blocking-queue . However, you can call setRejectedExecutionHandler with a handler that blocks the producer for a timeout before resubmitting the task.
Using RejectedExecutionHandler is good concept. Liked it
The RejectedExecutionHandler approach has one problem. When a very large number of executions come in within a very short time (let's say the implementation is a kind of server), the method may encounter a stack overflow: the handler calls execute, which rejects the task back to the handler, and so on until the stack overflows. I write this from experience using such an implementation in a high-volume transaction system. I am thinking of a different solution, as I can't lose executions.
Pretty valid point. Any suggestion on what you would like to do in this case? One solution may be increasing the size of the thread pool or the queue, but it has its own disadvantages in resource-critical systems. Another approach may be to let tasks be rejected and log them somewhere to get a report in the future.
How about using a BlockingQueue.put call, which will wait until a slot is free?
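One way to sketch that put() idea is a RejectedExecutionHandler that blocks the submitting thread until the queue has room. The names BlockingSubmitSketch and BlockWhenFull are hypothetical, and this is only an illustration: the handler bypasses the executor's own bookkeeping, so a task inserted while the pool is terminating may never run.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class names; a sketch of back-pressure via BlockingQueue.put().
class BlockingSubmitSketch {

    static final class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor has been shut down");
            }
            try {
                // Blocks the submitting thread until a queue slot is free.
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(
                        "Interrupted while waiting for a queue slot", e);
            }
        }
    }

    // Pushes 10 tasks through a one-thread pool with a two-slot queue.
    static int run() {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), new BlockWhenFull());

        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks = " + run());
    }
}
```

Because the handler waits instead of calling execute again, there is no recursive rejection and therefore no growing call stack; the producer simply slows down to the pace of the workers.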