HowToDoInJava

  • Python
  • Java
  • Spring Boot
  • Dark Mode
Home / Java / Multi-threading / Throttling Task Submission Rate with ThreadPoolExecutor and Semaphore

Throttling Task Submission Rate with ThreadPoolExecutor and Semaphore

If you may know that in web-servers you can configure the maximum number of concurrent connections to the server. If more connections than this limit come to server, they have to wait until some other connections are freed or closed. This limitation can be taken as throttling. Throttling is the capability of regulating the rate of input for a system where output rate is slower than input. It is necessary to stop the system from crashing or resource exhaustion.

In one of my previous post related to BlockingQueue and ThreadPoolExecutor, We learned about creating a CustomThreadPoolExecutor which had following capabilities:

1) Tasks being submitted to blocking queue
2) An executor which picks up the task from queue and execute them
3) Had overridden beforeExecute() and afterExecute() methods to perform some extra activities if needed
4) Attached a RejectedExecutionHandler which handle the task if it got rejected because the queue was full

Our approach was good enough already and capable of handling most of the practical scenarios. Now let’s add one more concept into it which may prove beneficial in some conditions. This concept is around throttling of task submission in queue.


In this example, throttling will help in keeping the number of tasks in queue in limit so that no task get rejected. It essentially removes the necessity of RejectedExecutionHandler as well.

Previous Solution Using CustomThreadPoolExecutor with RejectedExecutionHandler

In this solution, we had following classes:

DemoTask.java

public class DemoTask implements Runnable
{
   private String name = null;

   public DemoTask(String name) {
      this.name = name;
   }

   public String getName() {
      return this.name;
   }

   @Override
   public void run(){
      try {
         Thread.sleep(1000);
      } catch (InterruptedException e){
         e.printStackTrace();
      }
      System.out.println("Executing : " + name);
   }
}

CustomThreadPoolExecutor.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutor extends ThreadPoolExecutor
{
   public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
									TimeUnit unit, BlockingQueue<Runnable> workQueue)
   {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r)
   {
      super.beforeExecute(t, r);
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t)
   {
      super.afterExecute(r, t);
      if (t != null)
      {
         t.printStackTrace();
      }
   }
}

DemoExecutor.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor
{
   public static void main(String[] args)
   {
      Integer threadCounter = 0;
      BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
      CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
      executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
               System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
               try
               {
                  Thread.sleep(1000);
               } catch (InterruptedException e)
               {
                  e.printStackTrace();
               }
               System.out.println("Lets add another time : " + ((DemoTask) r).getName());
               executor.execute(r);
            }
         });
      // Let start all core threads initially
      executor.prestartAllCoreThreads();
      while (true)
      {
         threadCounter++;
         // Adding threads one by one
         //System.out.println("Adding DemoTask : " + threadCounter);
         executor.execute(new DemoTask(threadCounter.toString()));
         if (threadCounter == 1000)
            break;
      }
   }
}

If we run the above program, we will get the output like below:

DemoTask Rejected : 71
Executing : 3
Executing : 5
...
...

There will be multiple occurrences of “DemoTask Rejected“. In next solution, we will put throttle technique so that no task should be rejected.

Throttling Task submission rate using ThreadPoolExecutor and Semaphore

In this solution, we will create a Semaphore with a number which must be equal to maximum number of tasks in blocking queue at any given point of time. So the approach works like this:

1) Before executing a task a lock in semaphore is requested
2) If lock is acquired then execution works normally; Otherwise retry will happen until lock is acquired
3) Once task is completed; lock is released to semaphore

Our new throttling enabled BlockingThreadPoolExecutor looks like below:

package threadpoolDemo;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingThreadPoolExecutor extends ThreadPoolExecutor
{
   private final Semaphore semaphore;

   public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
   {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
      semaphore = new Semaphore(corePoolSize + 50);
   }

   @Override
   protected void beforeExecute(Thread t, Runnable r)
   {
      super.beforeExecute(t, r);
   }

   @Override
   public void execute(final Runnable task)
   {
      boolean acquired = false;
      do
      {
         try
         {
            semaphore.acquire();
            acquired = true;
         } catch (final InterruptedException e)
         {
            //LOGGER.warn("InterruptedException whilst aquiring semaphore", e);
         }
      } while (!acquired);
      try
      {
         super.execute(task);
      } catch (final RejectedExecutionException e)
      {
         System.out.println("Task Rejected");
         semaphore.release();
         throw e;
      }
   }

   @Override
   protected void afterExecute(Runnable r, Throwable t)
   {
      super.afterExecute(r, t);
      if (t != null)
      {
         t.printStackTrace();
      }
      semaphore.release();
   }
}

Now test the code as below.

package threadpoolDemo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DemoExecutor
{
   public static void main(String[] args)
   {
      Integer threadCounter = 0;
      BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
      BlockingThreadPoolExecutor executor = new BlockingThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue);
      executor.setRejectedExecutionHandler(new RejectedExecutionHandler()
         {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
            {
               System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName());
               try
               {
                  Thread.sleep(1000);
               } catch (InterruptedException e)
               {
                  e.printStackTrace();
               }
               System.out.println("Lets add another time : " + ((DemoTask) r).getName());
               executor.execute(r);
            }
         });
      // Let start all core threads initially
      executor.prestartAllCoreThreads();
      while (true)
      {
         threadCounter++;
         // Adding threads one by one
         System.out.println("Adding DemoTask : " + threadCounter);
         executor.execute(new DemoTask(threadCounter.toString()));
         if (threadCounter == 1000)
            break;
      }
   }
}

When you run the DemoExecutor program using BlockingThreadPoolExecutor in place of CustomThreadPoolExecutor, you will not see any task rejected and all tasks will be executed successfully.


You can control the number of tasks executing at any time parameter passes in Semaphore constructor.

That’s all for this post. You should read more posts on concurrency for better confidence.

Happy Learning !!

Was this post helpful?

Let us know if you liked the post. That’s the only way we can improve.

Share this:

  • Twitter
  • Facebook
  • LinkedIn
  • Reddit

About Lokesh Gupta

A family guy with fun loving nature. Love computers, programming and solving everyday problems. Find me on Facebook and Twitter.

Feedback, Discussion and Comments

  1. Manoj Periathambi

    November 20, 2018

    The post is really good. But I don’t understand one point. “In this solution, we will create a Semaphore with a number which must be equal to maximum number of tasks in blocking queue at any given point of time. So the approach works like this:”

    But in code, you have added 50 to core pool size. I though you would add it based on the size of the queue. Can you clarify please?

  2. Alpha

    March 21, 2017

    Why didn’t you put the semaphore.acquire() in the beforeExecute

  3. ara

    October 23, 2016

    Hi,

    I’m a newbie in multithreading. Need help here on this topic. Why didn’t we use Executor Services and used newFixedThreadPool instead of writing this entire code? Am I missing anything here?

    • Lokesh Gupta

      October 24, 2016

      ThreadPoolExecutor is specific implementation of Executor interface. Is your question different??

      • ara

        October 25, 2016

        I mean we can specify something like newFixedThreadPool(6); to limit the no of running threads at a time without writing this code? I thought Executor framework will take care of the rest. I need to go through more documents and juming up a bit but am I completely off the track here?

  4. RamJane

    March 24, 2015

    Isnt the blockingthreadpoolexector same can be achieve using the fixedthreadpool exector.

  5. Manish

    March 5, 2015

    In the info section above you mentioned –
    “In this example, throttling will help in keeping the number of tasks in queue in limit so that no task get rejected. It essentially removes the necessity of RejectedExecutionHandler as well.”

    But in the example BlockingThreadPoolExecutor , RejectedExecutionHandler is still registered. Can’t we remove it?

    • Lokesh Gupta

      March 5, 2015

      Yes, you can remove RejectedExecutionHandler. I added it to log that no task is rejected now.

  6. tarun

    February 11, 2015

    cant we achieve this by limiting the number of threads in thread pool ?

    • Lokesh Gupta

      February 11, 2015

      Not really. Both are different things.

      • Jaspreet Singh

        July 5, 2019

        I think it will be same. Please correct me if wrong.
        If in above example without semaphore one , if we use:-

        BlockingQueue blockingQueue = new LinkedBlockingQueue();

        instead of

        BlockingQueue blockingQueue = new ArrayBlockingQueue(50);

        then it will work fine which is done same by Executors.newFixedThreadPool();

  7. Ramesh

    July 23, 2014

    Hi

    This BlockingThreadPoolExecutor , will be problem for parallel execution , for tasks by multiple threads , because each task to be submitted for execution first it needs to acqcuire the lock , this will cause drop in performance

    • Lokesh Gupta

      July 23, 2014

      True. But here we are trying to limit of rejected tasks to zero. In good performance, task will be added son but they may be rejected as well.

  8. Prashanth V

    May 24, 2014

    Hi,
    Where is the code for BlockingThreadPoolExecutor ? I don t see it in ur post. Pls update it. Thanks.

    • Lokesh Gupta

      May 24, 2014

      My bad. Weekend hangover. Updated the post. Thanks for pointing out. Much appreciated.

Comments are closed on this article!

Search Tutorials

Java Concurrency Tutorial

  • Java Concurrency – Introduction
  • Concurrency Evolution
  • Thread Safety
  • Concurrency vs. Parallelism
  • Compare and Swap [CAS]
  • synchronized keyword
  • Object vs. Class Level Locking
  • Runnable vs. Thread
  • wait(), notify() and notifyAll()
  • Yield() vs. Join()
  • Sleep() vs. Wait()
  • Lock vs. Monitor
  • Callable + Future
  • UncaughtExceptionHandler
  • Throttling Task Submission
  • Executor Best Practices
  • Inter-thread Communication
  • Write and Resolve Deadlock

Java Concurrency Utilities

  • AtomicInteger
  • Lock
  • ThreadFactory
  • ThreadLocal
  • ExecutorService
  • ThreadPoolExecutor
  • FixedSizeThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • Semaphore
  • Binary Semaphore
  • BlockingQueue
  • DelayQueue
  • ConcurrentLinkedDeque
  • CountDownLatch
  • ForkJoinPool

Java Tutorial

  • Java Introduction
  • Java Keywords
  • Java Flow Control
  • Java OOP
  • Java Inner Class
  • Java String
  • Java Enum
  • Java Collections
  • Java ArrayList
  • Java HashMap
  • Java Array
  • Java Sort
  • Java Clone
  • Java Date Time
  • Java Concurrency
  • Java Generics
  • Java Serialization
  • Java Input Output
  • Java New I/O
  • Java Exceptions
  • Java Annotations
  • Java Reflection
  • Java Garbage collection
  • Java JDBC
  • Java Security
  • Java Regex
  • Java Servlets
  • Java XML
  • Java Puzzles
  • Java Examples
  • Java Libraries
  • Java Resources
  • Java 14
  • Java 12
  • Java 11
  • Java 10
  • Java 9
  • Java 8
  • Java 7

Meta Links

  • About Me
  • Contact Us
  • Privacy policy
  • Advertise
  • Guest and Sponsored Posts

Recommended Reading

  • 10 Life Lessons
  • Secure Hash Algorithms
  • How Web Servers work?
  • How Java I/O Works Internally?
  • Best Way to Learn Java
  • Java Best Practices Guide
  • Microservices Tutorial
  • REST API Tutorial
  • How to Start New Blog

Copyright © 2020 · HowToDoInjava.com · All Rights Reserved. | Sitemap

  • Java 15 New Features
  • Sealed Classes and Interfaces
  • EdDSA (Ed25519 / Ed448)