Throttling Task Submission Rate with ThreadPoolExecutor and Semaphore

You may know that web servers let you configure the maximum number of concurrent connections. If more connections arrive than this limit allows, they have to wait until other connections are freed or closed. This limit is a form of throttling. Throttling is the ability to regulate the rate of input to a system whose output rate is slower than its input rate. It is needed to keep the system from crashing or exhausting its resources.

In one of my previous posts on BlockingQueue and ThreadPoolExecutor, we learned how to create a CustomThreadPoolExecutor with the following capabilities:

1) Tasks are submitted to a blocking queue
2) An executor picks up tasks from the queue and executes them
3) It overrides the beforeExecute() and afterExecute() methods to perform extra activities if needed
4) It has a RejectedExecutionHandler attached, which handles a task if it is rejected because the queue is full

That approach was already good enough to handle most practical scenarios. Now let’s add one more concept that may prove beneficial in some conditions: throttling of task submission to the queue.


In this example, throttling keeps the number of tasks in the queue within a limit so that no task gets rejected. It essentially removes the need for a RejectedExecutionHandler as well.

Previous Solution Using CustomThreadPoolExecutor with RejectedExecutionHandler

In this solution, we had the following classes:

DemoTask.java

public class DemoTask implements Runnable
{
   private String name = null;

   public DemoTask(String name) {
      this.name = name;
   }

   public String getName() {
      return this.name;
   }

   @Override
   public void run(){
      try {
         Thread.sleep(1000);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      System.out.println("Executing : " + name);
   }
}

CustomThreadPoolExecutor.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor
{
   public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
									TimeUnit unit, BlockingQueue<Runnable> workQueue)
   {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r)
   {
      super.beforeExecute(t, r);
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t)
   {
      super.afterExecute(r, t);
      if (t != null)
      {
         t.printStackTrace();
      }
   }
}

DemoExecutor.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor
{
   public static void main(String[] args)
   {
      Integer threadCounter = 0;
      BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
      CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
      executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
               System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
               try
               {
                  Thread.sleep(1000);
               } catch (InterruptedException e)
               {
                  e.printStackTrace();
               }
               System.out.println("Lets add another time : " + ((DemoTask) r).getName());
               executor.execute(r);
            }
         });
      // Start all core threads up front
      executor.prestartAllCoreThreads();
      while (true)
      {
         threadCounter++;
         // Add tasks one by one
         //System.out.println("Adding DemoTask : " + threadCounter);
         executor.execute(new DemoTask(threadCounter.toString()));
         if (threadCounter == 1000)
            break;
      }
   }
}

If we run the above program, we get output like the following:

DemoTask Rejected : 71
Executing : 3
Executing : 5
...
...

There will be multiple occurrences of “DemoTask Rejected”. In the next solution, we will add throttling so that no task is rejected.
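The first rejection in the sample output is task 71, which is consistent with the pool's capacity: at most 20 threads plus 50 queue slots. The following is a minimal sketch of that arithmetic, not part of the original solution. It assumes tasks that block until released, so none finishes while we are still submitting, and it relies on the default AbortPolicy, which throws RejectedExecutionException. The RejectionPoint class and countAccepted method are hypothetical names used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPoint
{
   // Submits blocking tasks and returns how many were accepted before the first rejection
   static int countAccepted(int corePoolSize, int maximumPoolSize, int queueCapacity)
   {
      final CountDownLatch release = new CountDownLatch(1);
      ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
            5000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
      int accepted = 0;
      try
      {
         while (true)
         {
            executor.execute(new Runnable()
            {
               @Override
               public void run()
               {
                  try
                  {
                     release.await(); // hold the worker so nothing drains from the queue
                  } catch (InterruptedException e)
                  {
                     Thread.currentThread().interrupt();
                  }
               }
            });
            accepted++;
         }
      } catch (RejectedExecutionException e)
      {
         // First rejection: all threads are busy and the queue is full
      } finally
      {
         release.countDown(); // let every accepted task finish
         executor.shutdown();
      }
      return accepted;
   }

   public static void main(String[] args)
   {
      System.out.println("Accepted before first rejection : " + countAccepted(10, 20, 50));
   }
}
```

With the same sizes as DemoExecutor (10 core threads, 20 maximum threads, queue of 50), the pool fills core threads first, then the queue, then the extra threads, so 70 tasks are accepted before one is turned away.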

Throttling Task Submission Rate Using ThreadPoolExecutor and Semaphore

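In this solution, we will create a Semaphore whose permit count caps the number of tasks that can be in flight (queued or running) at any given point in time. In the code below the count is corePoolSize + 50, where 50 matches the capacity of the blocking queue used in the demo. The approach works like this: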

1) Before a task is submitted for execution, a permit is requested from the semaphore
2) If the permit is acquired, execution proceeds normally; otherwise the submitting thread waits until a permit becomes available
3) Once the task is completed, the permit is released back to the semaphore
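The permit accounting behind the three steps above can be seen in isolation with a bare Semaphore. This is only an illustrative sketch: PermitSteps is a hypothetical class, and it uses tryAcquire() instead of a blocking acquire so that the demo cannot hang when no permit is free.

```java
import java.util.concurrent.Semaphore;

class PermitSteps
{
   // Returns { admittedWhileFull, admittedAfterRelease }
   static boolean[] demo()
   {
      Semaphore semaphore = new Semaphore(2);                // room for two tasks in flight
      semaphore.acquireUninterruptibly();                    // step 1: first task takes a permit
      semaphore.acquireUninterruptibly();                    // step 1: second task takes a permit
      boolean admittedWhileFull = semaphore.tryAcquire();    // step 2: no permit left for a third task
      semaphore.release();                                   // step 3: one task completes
      boolean admittedAfterRelease = semaphore.tryAcquire(); // the third task now gets a permit
      return new boolean[] { admittedWhileFull, admittedAfterRelease };
   }

   public static void main(String[] args)
   {
      boolean[] result = demo();
      System.out.println("Admitted while full : " + result[0]);    // false
      System.out.println("Admitted after release : " + result[1]); // true
   }
}
```

In the executor, the same accounting uses a blocking acquire, so a submitter simply waits where this demo gets false.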

Our new throttling-enabled BlockingThreadPoolExecutor looks like this:

package threadpoolDemo;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingThreadPoolExecutor extends ThreadPoolExecutor
{
   private final Semaphore semaphore;

   public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
   {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
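      // Permits cap the tasks in flight (queued plus running); 50 matches the queue capacity used in DemoExecutor
      semaphore = new Semaphore(corePoolSize + 50);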
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r)
   {
      super.beforeExecute(t, r);
   }

   @Override
   public void execute(final Runnable task)
   {
      // Block until a permit is free. Instead of catching and dropping InterruptedException,
      // acquireUninterruptibly() keeps waiting and leaves the thread's interrupt status set.
      semaphore.acquireUninterruptibly();
      try
      {
         super.execute(task);
      } catch (final RejectedExecutionException e)
      {
         System.out.println("Task Rejected");
         semaphore.release();
         throw e;
      }
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t)
   {
      super.afterExecute(r, t);
      if (t != null)
      {
         t.printStackTrace();
      }
      semaphore.release();
   }
}

Now test the code as below.

package threadpoolDemo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor
{
   public static void main(String[] args)
   {
      Integer threadCounter = 0;
      BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
      BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
      executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
               System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
               try
               {
                  Thread.sleep(1000);
               } catch (InterruptedException e)
               {
                  e.printStackTrace();
               }
               System.out.println("Lets add another time : " + ((DemoTask) r).getName());
               executor.execute(r);
            }
         });
      // Start all core threads up front
      executor.prestartAllCoreThreads();
      while (true)
      {
         threadCounter++;
         // Add tasks one by one
         System.out.println("Adding DemoTask : " + threadCounter);
         executor.execute(new DemoTask(threadCounter.toString()));
         if (threadCounter == 1000)
            break;
      }
   }
}

When you run the DemoExecutor program using BlockingThreadPoolExecutor in place of CustomThreadPoolExecutor, you will not see any rejected tasks, and all tasks will execute successfully.


You can control the number of tasks in flight at any time through the permit count passed to the Semaphore constructor.
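As a hedged sketch of that idea, the variant below takes the permit count as an explicit constructor argument instead of hard-coding it. The names ThrottleDemo, ThrottledExecutor and runAll are hypothetical and not part of the original solution. The demo deliberately picks a permit count no larger than the queue capacity, a conservative choice under which a submitter holding a permit always finds a free queue slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottleDemo
{
   // Hypothetical variant of BlockingThreadPoolExecutor with a configurable permit count
   static class ThrottledExecutor extends ThreadPoolExecutor
   {
      private final Semaphore semaphore;

      ThrottledExecutor(int corePoolSize, int maximumPoolSize, int queueCapacity, int permits)
      {
         super(corePoolSize, maximumPoolSize, 5000, TimeUnit.MILLISECONDS,
               new ArrayBlockingQueue<Runnable>(queueCapacity));
         this.semaphore = new Semaphore(permits);
      }

      @Override
      public void execute(Runnable task)
      {
         semaphore.acquireUninterruptibly(); // wait until a permit is free
         try
         {
            super.execute(task);
         } catch (RejectedExecutionException e)
         {
            semaphore.release(); // the task never started, so give the permit back
            throw e;
         }
      }

      @Override
      protected void afterExecute(Runnable r, Throwable t)
      {
         super.afterExecute(r, t);
         semaphore.release(); // the task finished, so free its permit
      }
   }

   // Submits taskCount short tasks through a throttled pool and returns how many completed
   static int runAll(int permits, int taskCount)
   {
      final AtomicInteger completed = new AtomicInteger();
      ThrottledExecutor executor = new ThrottledExecutor(2, 4, 5, permits);
      for (int i = 0; i < taskCount; i++)
      {
         executor.execute(new Runnable()
         {
            @Override
            public void run()
            {
               completed.incrementAndGet();
            }
         });
      }
      executor.shutdown();
      try
      {
         executor.awaitTermination(30, TimeUnit.SECONDS);
      } catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
      }
      return completed.get();
   }

   public static void main(String[] args)
   {
      System.out.println("Completed : " + runAll(5, 200));
   }
}
```

Here the pool has a queue of 5 and 5 permits, so the submitting loop blocks whenever five tasks are queued or running, and no rejection handler is registered at all.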

That’s all for this post. You should read more posts on concurrency to build confidence.

Happy Learning !!


15 thoughts on “Throttling Task Submission Rate with ThreadPoolExecutor and Semaphore”

  1. The post is really good. But I don’t understand one point. “In this solution, we will create a Semaphore with a number which must be equal to maximum number of tasks in blocking queue at any given point of time. So the approach works like this:”

    But in the code, you have added 50 to the core pool size. I thought you would set it based on the size of the queue. Can you clarify please?

  2. Hi,

    I’m a newbie in multithreading. Need help here on this topic. Why didn’t we use Executor Services and used newFixedThreadPool instead of writing this entire code? Am I missing anything here?

      • I mean we can specify something like newFixedThreadPool(6); to limit the no of running threads at a time without writing this code? I thought Executor framework will take care of the rest. I need to go through more documents and juming up a bit but am I completely off the track here?

  3. In the info section above you mentioned –
    “In this example, throttling will help in keeping the number of tasks in queue in limit so that no task get rejected. It essentially removes the necessity of RejectedExecutionHandler as well.”

    But in the example BlockingThreadPoolExecutor , RejectedExecutionHandler is still registered. Can’t we remove it?

      • I think it will be same. Please correct me if wrong.
        If in above example without semaphore one , if we use:-

        BlockingQueue blockingQueue = new LinkedBlockingQueue();

        instead of

        BlockingQueue blockingQueue = new ArrayBlockingQueue(50);

        then it will work fine which is done same by Executors.newFixedThreadPool();

  4. Hi

    This BlockingThreadPoolExecutor will be a problem for parallel execution when tasks are submitted by multiple threads, because each task has to acquire the lock before it can be submitted for execution. This will cause a drop in performance.



HowToDoInJava

A blog about Java and related technologies, the best practices, algorithms, and interview questions.