BlockingQueue drainTo() – Polling Queue Items into Collection

Learn to use BlockingQueue.drainTo() method for draining a queue (polling all or a specific number of elements from the queue) into a Collection. Draining is needed in situations when multiple producer threads are adding elements into a BlockingQueue and a consumer thread, periodically, polls multiple items from the queue and processes them together.

One such example could be an excel report generator. There could be an ExecutorService with multiple threads processing records and putting them in the blocking queue. And a report writer thread periodically polling to queue and writing records in excel.

1. BlockingQueue drainTo() Method

  • The drainTo() removes all available elements from the specified queue and adds them to the given collection.
  • It provides better performance than polling all elements one by one.
  • The behavior of this method is undefined if the provided collection is modified while the draining is in progress.
  • If collection is immutable the method will throw UnsupportedOperationException.
  • For generic collections, incompatible class types will result in ClassCastException.

There are two versions of this method. The second method drains at most the maxElements number of available elements.

int drainTo(Collection c)
int drainTo(Collection c, int maxElements)

2. Demo

We are creating two tasks to demonstrate the usage of the drainTo () method. To keep things simple, let us call the Producer and Consumer. Producers will keep adding the items in a blocking queue, and the consumer will drain items, repeatedly, after some delay.

public class Producer implements Runnable {

  BlockingQueue queue;

  public Producer(BlockingQueue queue){
    this.queue = queue;
  }

  @Override
  @SneakyThrows
  public void run() {
    while (true){
      Thread.sleep(2000);
      System.out.println("Produced new message at : " + LocalDateTime.now());
      queue.offer("Test Message");
    }
  }
}
public class Consumer implements Runnable {

  BlockingQueue queue;

  public Consumer(BlockingQueue queue){
    this.queue = queue;
  }

  @Override
  @SneakyThrows
  public void run() {
    while (true) {
      Thread.sleep(10000);

      List<String> messages = new ArrayList<>();

      System.out.println("=========================================");
      System.out.println("Queue size before draining : " + queue.size());

      int messagesCount = queue.drainTo(messages, 20);

      System.out.println("Collection size : " + messagesCount);
      //messages.stream().forEach(System.out::println);

      System.out.println("Queue size after draining : " + queue.size());
      System.out.println("=========================================");
    }
  }
}

The following code creates an ExecutorService and starts two producer threads and one consumer thread. The Consumer thread executes every 10 seconds and drains all the messages from the queue.

public class QueueDrain {
  public static void main(String[] args) {
    BlockingQueue<String> queue = new ArrayBlockingQueue(20);

    ExecutorService executorService = Executors.newFixedThreadPool(3);

    executorService.submit(new Producer(queue));
    executorService.submit(new Producer(queue));
    executorService.submit(new Consumer(queue));
  }
}

The output of the program is as follows. Both producer threads produce 1 message each at 2 seconds intervals, thus a total of 10 messages. The consumer thread polls all the messages at every 10 seconds interval.

Produced new message at : 2022-08-10T15:45:58.627532600
Produced new message at : 2022-08-10T15:45:58.627532600
Produced new message at : 2022-08-10T15:46:00.631044400
Produced new message at : 2022-08-10T15:46:00.631044400
Produced new message at : 2022-08-10T15:46:02.646342
Produced new message at : 2022-08-10T15:46:02.646342
Produced new message at : 2022-08-10T15:46:04.647652800
Produced new message at : 2022-08-10T15:46:04.647790800

=========================================
Queue size before draining : 8
Collection size : 8
Queue size after draining : 0
=========================================

3. Conclusion

The BlockingQueue’s drainTo() method is a handy method for solving producer consumer problems where the producer threads’ rate is significantly less than consumer threads. In such cases, consumers can drain all or certain number of items from the queue and process them at once.

Happy Learning !!

Source Code on Github

Comments

Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.

Our Blogs

REST API Tutorial

Dark Mode

Dark Mode