Java DelayQueue – Blocking Queue for Delayed Elements

Java DelayQueue is an unbounded blocking queue of delayed elements, in which an element can only be taken when its delay has expired. DelayQueue class is part of java.util.concurrent package.

1. What is a Delayed Element?

  • An element will be considered delayed when it implements java.util.concurrent.Delayed interface and its getDelay() method returns a zero or negative value which indicates that the delay has already elapsed.
  • To make things more clear, we can consider that each element stores its activation date/time. As soon as this timestamp reaches, the element is ready to be picked up from the queue. The getDelay() method returns the time until the activation of the element.
  • Note that the implementation of Delayed interface must define a compareTo() method that provides an ordering consistent with its getDelay() method.
  • compareTo(Delayed o) method generally does not return the actual timestamp. It returns a value less than zero if the object that is executing the method has a delay smaller than the object passed as a parameter – otherwise a positive value greater than zero. It will return zero if both objects have the same delay.
public interface Delayed extends Comparable<Delayed> 
{
  /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     *
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

2. Creating a Delayed Element

In the given example, we have created an event object. Each event will have an event id, name and activation date (after this time it shall be processed).

class DelayedEvent implements Delayed 
{
  private long id;
  private String name;
  private LocalDateTime activationDateTime;
 
  //Constructor and getter methods
 
  @Override
  public int compareTo(Delayed that) 
  {
    long result = this.getDelay(TimeUnit.NANOSECONDS) 
            - that.getDelay(TimeUnit.NANOSECONDS);
    if (result < 0) {
      return -1;
    } else if (result > 0) {
      return 1;
    }
    return 0;
  }
 
  @Override
  public long getDelay(TimeUnit unit) {
    LocalDateTime now = LocalDateTime.now();
    long diff = now.until(activationDateTime, ChronoUnit.MILLIS);
    return unit.convert(diff, TimeUnit.MILLISECONDS);
  }
 
  @Override
  public String toString() {
    return "DelayedEvent [id=" + id + ", name=" + name + ", activationDateTime=" + activationDateTime + "]";
  }
}

3. What is a DelayQueue?

  • A DelayQueue is an unbounded blocking queue of Delayed elements. When a consumer of an element wants to take an element from the queue, it can take only when the delay for that particular element has expired.
  • DelayQueue is a specialized PriorityQueue that orders elements based on their delay time.
  • The head of the queue is the element whose delay expired furthest in the past.
  • If there is no element whose delay has expired yet, there is no head element in the queue and poll() will return null.
  • Even though unexpired elements cannot be removed using take() or poll(), they are otherwise treated as normal elements in the queue i.e. size() method returns the count of both expired and unexpired elements.
  • This queue does not permit null elements because their delay cannot be determined.

4. Java DelayQueue Example

To demo the DelayQueue, I have rewritten the producer consumer problem using ScheduledExecutorService. In this program, the producer thread adds events in a DelayQueue. Consumer thread invokes periodically and picks up all items that have expired activation time i.e. in past.

4.1. Event Producer

class DelayedEventProducer implements Runnable 
{
  private final DelayQueue<DelayedEvent> queue;
  private AtomicInteger counter;
 
  public DelayedEventProducer(DelayQueue<DelayedEvent> queue, AtomicInteger counter) {
    this.queue = queue;
    this.counter = counter;
  }
 
  @Override
  public void run() 
  {
    LocalDateTime now = LocalDateTime.now();
    int id = counter.incrementAndGet();
    DelayedEvent event = new DelayedEvent(id, "Task-" + id, now);
    System.out.println("Added to queue :: " + event);
    queue.add(event);
  }
}

4.2. Event Consumer

class DelayedEventConsumer implements Runnable 
{
  private final DelayQueue<DelayedEvent> queue;
 
  public DelayedEventConsumer(DelayQueue<DelayedEvent> queue) {
    this.queue = queue;
  }
 
  @Override
  public void run() 
  {
    List<DelayedEvent> events = new ArrayList<DelayedEvent>();
    queue.drainTo(events);
    System.out.println("\nEvent processing start **********\n");
    events.stream().forEach(System.out::println);
    System.out.println("\nEvent processing end **********\n");
  }
}

4.3. Main Program

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
public class Main 
{
  public static void main(String[] args) throws InterruptedException, ExecutionException 
  {
    DelayQueue<DelayedEvent> queue = new DelayQueue<>();
    AtomicInteger counter = new AtomicInteger();
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);
    ses.scheduleAtFixedRate(new DelayedEventProducer(queue, counter), 1, 2, TimeUnit.SECONDS);
    ses.scheduleAtFixedRate(new DelayedEventConsumer(queue), 1, 10, TimeUnit.SECONDS);
  }
}

Program output.

Added to queue :: DelayedEvent [id=1, name=Task-1, activationDateTime=2019-05-27T15:56:33.689]
Added to queue :: DelayedEvent [id=2, name=Task-2, activationDateTime=2019-05-27T15:56:35.619]
Added to queue :: DelayedEvent [id=3, name=Task-3, activationDateTime=2019-05-27T15:56:37.619]
Added to queue :: DelayedEvent [id=4, name=Task-4, activationDateTime=2019-05-27T15:56:39.619]
Added to queue :: DelayedEvent [id=5, name=Task-5, activationDateTime=2019-05-27T15:56:41.619]
Added to queue :: DelayedEvent [id=6, name=Task-6, activationDateTime=2019-05-27T15:56:43.619]

Event processing start **********

DelayedEvent [id=1, name=Task-1, activationDateTime=2019-05-27T15:56:33.689]
DelayedEvent [id=2, name=Task-2, activationDateTime=2019-05-27T15:56:35.619]
DelayedEvent [id=3, name=Task-3, activationDateTime=2019-05-27T15:56:37.619]
DelayedEvent [id=4, name=Task-4, activationDateTime=2019-05-27T15:56:39.619]
DelayedEvent [id=5, name=Task-5, activationDateTime=2019-05-27T15:56:41.619]
DelayedEvent [id=6, name=Task-6, activationDateTime=2019-05-27T15:56:43.619]

Event processing end **********

Added to queue :: DelayedEvent [id=7, name=Task-7, activationDateTime=2019-05-27T15:56:45.620]
Added to queue :: DelayedEvent [id=8, name=Task-8, activationDateTime=2019-05-27T15:56:47.618]
Added to queue :: DelayedEvent [id=9, name=Task-9, activationDateTime=2019-05-27T15:56:49.620]
Added to queue :: DelayedEvent [id=10, name=Task-10, activationDateTime=2019-05-27T15:56:51.618]
Added to queue :: DelayedEvent [id=11, name=Task-11, activationDateTime=2019-05-27T15:56:53.619]

Event processing start **********

DelayedEvent [id=7, name=Task-7, activationDateTime=2019-05-27T15:56:45.620]
DelayedEvent [id=8, name=Task-8, activationDateTime=2019-05-27T15:56:47.618]
DelayedEvent [id=9, name=Task-9, activationDateTime=2019-05-27T15:56:49.620]
DelayedEvent [id=10, name=Task-10, activationDateTime=2019-05-27T15:56:51.618]
DelayedEvent [id=11, name=Task-11, activationDateTime=2019-05-27T15:56:53.619]

Event processing end **********

Clearly, the program is working as intended.

Drop me your questions in the comments section related to this Java DelayQueue example program.

Happy Learning !!

Comments

Subscribe
Notify of
guest
0 Comments
Inline Feedbacks
View all comments

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.