Throttling Task Submission Rate in Java

1. Introduction

In the BlockingQueue and ThreadPoolExecutor example, we learned to create a CustomThreadPoolExecutor with the following capabilities:

  • Tasks are submitted to the blocking queue.
  • An executor picks up tasks from the queue and executes them.
  • It overrides the beforeExecute() and afterExecute() methods to perform pre- and post-execution activities if needed.
  • It attaches a RejectedExecutionHandler to handle a task that is rejected because the queue is full.

Our approach was already good enough to handle most practical scenarios. Now let’s add one more concept that may prove beneficial in some conditions: throttling the submission of tasks to the queue.

In this example, throttling keeps the number of tasks in the queue within a limit so that no task gets rejected. It essentially removes the need for a RejectedExecutionHandler as well.
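To see the problem that throttling addresses, the following minimal sketch submits more tasks than a small pool can hold. The class name UnthrottledSubmission and its helper method are hypothetical and only serve this illustration. The sketch uses a plain ThreadPoolExecutor with its default rejection policy (AbortPolicy) rather than the CustomThreadPoolExecutor from the previous example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the original example
class UnthrottledSubmission {

    // Submits taskCount blocking tasks to a single-thread pool with a bounded
    // queue and returns how many submissions were rejected.
    static int countRejections(int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch gate = new CountDownLatch(1); // keeps every task busy until opened
        int rejected = 0;

        for (int i = 0; i < taskCount; i++) {
            try {
                pool.execute(() -> {
                    try {
                        gate.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // the default AbortPolicy throws once the queue is full
            }
        }

        gate.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        // One task runs, two wait in the queue, the other seven are rejected
        System.out.println("Rejected: " + countRejections(2, 10));
    }
}
```

Nothing slows the submitting thread down here, so every task beyond the one running and the two queued is turned away.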

2. What is Throttling?

In a web server, we can configure the maximum number of concurrent connections to the server. If more connections than this limit arrive, they have to wait until some other connections are freed or closed. This limit is a form of throttling.

Throttling is the capability of regulating the rate of input for a system where the output rate is slower than the input. It protects the system from crashing or exhausting its resources.
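The connection limit can be sketched with a Semaphore in a few lines. The ConnectionGate class and its method names are hypothetical; a real web server manages connections in its own way. The sketch only shows the idea of admitting callers up to a fixed limit.

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration: admits at most maxConnections callers at a time
class ConnectionGate {
    private final Semaphore slots;

    ConnectionGate(int maxConnections) {
        this.slots = new Semaphore(maxConnections);
    }

    // Returns true if a slot was free; false means the caller has to wait and retry
    boolean tryOpen() {
        return slots.tryAcquire();
    }

    // Frees the slot held by a finished connection
    void close() {
        slots.release();
    }

    public static void main(String[] args) {
        ConnectionGate gate = new ConnectionGate(2);
        System.out.println(gate.tryOpen()); // true: first connection admitted
        System.out.println(gate.tryOpen()); // true: second connection admitted
        System.out.println(gate.tryOpen()); // false: the limit of 2 is reached
        gate.close();                       // one connection finishes
        System.out.println(gate.tryOpen()); // true: the freed slot is reused
    }
}
```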

3. Throttling Task Submissions using Semaphore

We will use a Semaphore whose permit count equals the maximum number of tasks that are permitted to run concurrently. The approach works like this:

  • Before executing a task, a permit is requested from the semaphore.
  • If the permit is acquired, execution proceeds normally; otherwise, the submitting thread waits until a permit becomes available.
  • Once the task is completed, the permit is released back to the semaphore.

Acquiring and releasing permits on the Semaphore ensures that no more than the configured number of tasks are in progress at the same time. Any further submission must wait until a task finishes and releases its permit.
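The three steps above can be tried out in isolation with a minimal sketch. The PermitPattern class and its peakConcurrency() helper are hypothetical names. The sketch uses a cached thread pool so that only the semaphore limits concurrency, and it records the highest number of tasks observed running together.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the acquire, run, release cycle
class PermitPattern {

    // Runs taskCount short tasks and returns the peak number running at once
    static int peakConcurrency(int permits, int taskCount) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();

        for (int i = 0; i < taskCount; i++) {
            semaphore.acquireUninterruptibly(); // step 1: request a permit, waiting if none is free
            pool.execute(() -> {
                try {
                    // step 2: the task runs while holding its permit
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    semaphore.release(); // step 3: return the permit when the task is done
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The reported peak never exceeds the permit count of 3
        System.out.println("Peak concurrency: " + peakConcurrency(3, 20));
    }
}
```

Releasing in a finally block is a design choice of this sketch: it returns the permit even when a task fails, so a failing task cannot shrink the limit over time.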

Our new throttling-enabled BlockingThreadPoolExecutor looks like the following class:

import java.util.concurrent.*;

public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
    // One permit for every task that may be queued or running at any moment
    private final Semaphore semaphore;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        semaphore = new Semaphore(corePoolSize);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    public void execute(final Runnable task) {
        // Block the submitting thread until a permit is free. An interrupt does not
        // abort the wait; the thread's interrupt status is set again on return.
        semaphore.acquireUninterruptibly();

        try {
            super.execute(task);
        } catch (final RejectedExecutionException e) {
            // The task will never run, so afterExecute() cannot return its permit
            System.out.println("Task Rejected");
            semaphore.release();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            t.printStackTrace();
        }
        // The task has finished, so hand its permit back to any waiting submitter
        semaphore.release();
    }
}

We can now submit any number of tasks through the executor's execute() method, and they will all execute without getting rejected. Tasks must not be offered to the blocking queue directly, because that bypasses the semaphore.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingThreadPoolExecutorDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10);
        BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS, blockingQueue);
        // CustomRejectedExecutionHandler and DemoTask are not defined here;
        // they are assumed to come from the previous example
        executor.setRejectedExecutionHandler(new CustomRejectedExecutionHandler());

        executor.prestartAllCoreThreads();

        for (int taskCounter = 1; taskCounter <= 100; taskCounter++) {
            // Adding tasks one by one; execute() blocks while no permit is free
            System.out.println("Adding DemoTask : " + taskCounter);
            executor.execute(new DemoTask(Integer.toString(taskCounter)));
        }

        // Let the submitted tasks finish, then stop the worker thread
        executor.shutdown();
        executor.awaitTermination(1000, TimeUnit.SECONDS);
    }
}

When you run the BlockingThreadPoolExecutorDemo program using BlockingThreadPoolExecutor, you will not see any task rejected, and all tasks will be executed successfully.

4. Conclusion

In this tutorial, we learned to limit the task submission and execution rate with the help of BlockingQueue, ThreadPoolExecutor and Semaphore.

We can control the number of tasks executing at any time by passing the appropriate permit count to the Semaphore constructor.
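How large the permit count can be depends on the pool and its queue. One conservative choice, which is an assumption of the sketch below rather than something the example above prescribes, is to keep the permit count no larger than the queue capacity. A permit is returned while the worker thread is still finishing its task, before it picks up the next queued one, so a larger count could briefly find the queue full. The PermitSizing class and the submitAll() helper are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of choosing the permit count
class PermitSizing {

    // Submits taskCount tasks through a semaphore sized to the queue capacity
    // and returns {completed, rejected}.
    static int[] submitAll(int threads, int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // Assumption of this sketch: permits <= queue capacity, so the queue
        // always has room for a task whose submitter holds a permit
        Semaphore permits = new Semaphore(queueCapacity);
        AtomicInteger completed = new AtomicInteger();
        int rejected = 0;

        for (int i = 0; i < taskCount; i++) {
            permits.acquireUninterruptibly(); // wait for room before submitting
            try {
                pool.execute(() -> {
                    try {
                        completed.incrementAndGet();
                    } finally {
                        permits.release(); // task done, free its permit
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
                permits.release(); // a rejected task never runs, so release here
            }
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), rejected};
    }

    public static void main(String[] args) {
        int[] result = submitAll(2, 4, 50);
        System.out.println("completed=" + result[0] + ", rejected=" + result[1]);
    }
}
```

With two threads, a queue of four, and four permits, all fifty submissions should complete and none should be rejected.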

Happy Learning !!
