Java DelayQueue is an unbounded blocking queue of delayed elements, in which an element can be taken only when its delay has expired. The DelayQueue class is part of the java.util.concurrent package.
1. What is a Delayed Element?
- An element is a delayed element when its class implements the java.util.concurrent.Delayed interface. Its delay is considered elapsed once its getDelay() method returns a zero or negative value.
- To make things clearer, consider that each element stores its activation date/time. As soon as this timestamp is reached, the element is ready to be picked up from the queue. The getDelay() method returns the time remaining until the activation of the element.
- Note that an implementation of the Delayed interface must define a compareTo() method that provides an ordering consistent with its getDelay() method.
- The compareTo(Delayed o) method generally does not return the actual timestamp. It returns a negative value if the object executing the method has a smaller delay than the object passed as a parameter, a positive value if its delay is larger, and zero if both objects have the same delay.
public interface Delayed extends Comparable<Delayed>
{
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     *
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
2. Creating a Delayed Element
In the following example, we create an event class. Each event has an id, a name and an activation date/time, after which it is ready to be processed.
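class DelayedEvent implements Delayed
{
    private final long id;
    private final String name;
    private final LocalDateTime activationDateTime;

    // Constructor; the argument order matches the call in DelayedEventProducer
    public DelayedEvent(long id, String name, LocalDateTime activationDateTime) {
        this.id = id;
        this.name = name;
        this.activationDateTime = activationDateTime;
    }

    // Getter methods
    public long getId() { return id; }
    public String getName() { return name; }
    public LocalDateTime getActivationDateTime() { return activationDateTime; }

    @Override
    public int compareTo(Delayed that)
    {
        // Long.compare avoids the overflow risk of subtracting two long delays
        return Long.compare(this.getDelay(TimeUnit.NANOSECONDS),
                that.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until activation; zero or negative once it has passed
        LocalDateTime now = LocalDateTime.now();
        long diff = now.until(activationDateTime, ChronoUnit.MILLIS);
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public String toString() {
        return "DelayedEvent [id=" + id + ", name=" + name + ", activationDateTime=" + activationDateTime + "]";
    }
}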
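DelayedEvent above reads LocalDateTime.now(), which follows the system wall clock. As a hedged alternative, the sketch below measures the delay with System.nanoTime() instead, and prints how getDelay() and compareTo() behave before and after the delay elapses. The names MonotonicDelaySketch and MonotonicEvent are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MonotonicDelaySketch {

    // Hypothetical element: becomes ready a fixed number of milliseconds after creation.
    static final class MonotonicEvent implements Delayed {
        final String name;
        private final long readyAtNanos;

        MonotonicEvent(String name, long delayMillis) {
            this.name = name;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Positive while waiting; zero or negative once the delay has elapsed.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Ordering is derived from getDelay(), so the two stay consistent.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MonotonicEvent soon = new MonotonicEvent("soon", 50);
        MonotonicEvent later = new MonotonicEvent("later", 5_000);

        System.out.println("soon still pending:  " + (soon.getDelay(TimeUnit.MILLISECONDS) > 0));
        System.out.println("soon before later:   " + (soon.compareTo(later) < 0));

        Thread.sleep(100); // wait past the 50 ms delay

        System.out.println("soon has elapsed:    " + (soon.getDelay(TimeUnit.NANOSECONDS) <= 0));
        System.out.println("later still pending: " + (later.getDelay(TimeUnit.SECONDS) > 0));
    }
}
```

Whether a wall-clock or an elapsed-time deadline fits better depends on the use case: a wall-clock activation time is natural for "run at 15:00", while an elapsed-time deadline is unaffected by changes to the system clock.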
3. What is a DelayQueue?
- A DelayQueue is an unbounded blocking queue of Delayed elements. A consumer can take an element from the queue only when the delay for that element has expired.
- DelayQueue is backed internally by a PriorityQueue that orders elements by their delay.
- The head of the queue is the element whose delay expired furthest in the past.
- If no element's delay has expired yet, there is no head element in the queue and poll() returns null.
- Even though unexpired elements cannot be removed using take() or poll(), they are otherwise treated as normal elements in the queue, e.g. the size() method returns the count of both expired and unexpired elements.
- This queue does not permit null elements because their delay cannot be determined.
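The rules listed above can be observed with a single pending element. This is a minimal sketch, not the article's example: the Ticket class and the 300 ms delay are assumptions chosen for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSemanticsSketch {

    // Hypothetical element that becomes ready delayMillis after creation.
    static final class Ticket implements Delayed {
        final String label;
        private final long readyAtNanos;

        Ticket(String label, long delayMillis) {
            this.label = label;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Ticket> queue = new DelayQueue<>();
        queue.add(new Ticket("pending", 300));

        // size() counts the element even though its delay has not expired.
        System.out.println("size = " + queue.size());

        // poll() does not wait: it returns null while nothing has expired.
        System.out.println("poll = " + queue.poll());

        // peek() still exposes the element that will expire next.
        System.out.println("peek = " + queue.peek().label);

        // take() blocks until the delay has expired, then removes the element.
        Ticket taken = queue.take();
        System.out.println("take = " + taken.label + ", size now = " + queue.size());

        // null elements are rejected.
        try {
            queue.add(null);
        } catch (NullPointerException e) {
            System.out.println("null rejected");
        }
    }
}
```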
4. Java DelayQueue Example
To demonstrate DelayQueue, I have rewritten the producer-consumer problem using ScheduledExecutorService. In this program, the producer thread adds events to a DelayQueue. The consumer thread runs periodically and picks up all events whose activation time has passed.
4.1. Event Producer
class DelayedEventProducer implements Runnable
{
    private final DelayQueue<DelayedEvent> queue;
    private final AtomicInteger counter;

    public DelayedEventProducer(DelayQueue<DelayedEvent> queue, AtomicInteger counter) {
        this.queue = queue;
        this.counter = counter;
    }

    @Override
    public void run()
    {
        // Activation time is "now", so each event is eligible for removal as soon as it is added
        LocalDateTime now = LocalDateTime.now();
        int id = counter.incrementAndGet();
        DelayedEvent event = new DelayedEvent(id, "Task-" + id, now);
        System.out.println("Added to queue :: " + event);
        queue.add(event);
    }
}
4.2. Event Consumer
class DelayedEventConsumer implements Runnable
{
    private final DelayQueue<DelayedEvent> queue;

    public DelayedEventConsumer(DelayQueue<DelayedEvent> queue) {
        this.queue = queue;
    }

    @Override
    public void run()
    {
        // drainTo() transfers only the elements whose delay has expired
        List<DelayedEvent> events = new ArrayList<>();
        queue.drainTo(events);
        System.out.println("\nEvent processing start **********\n");
        events.forEach(System.out::println);
        System.out.println("\nEvent processing end **********\n");
    }
}
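The consumer above relies on drainTo() leaving unexpired elements in the queue. A minimal sketch of that behavior follows; the Job class and its delays are hypothetical and stand in for DelayedEvent so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DrainToSketch {

    // Hypothetical element that becomes ready delayMillis after creation.
    static final class Job implements Delayed {
        final String label;
        private final long readyAtNanos;

        Job(String label, long delayMillis) {
            this.label = label;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) {
        DelayQueue<Job> queue = new DelayQueue<>();
        queue.add(new Job("ready", 0));        // delay already elapsed
        queue.add(new Job("not-yet", 60_000)); // expires in one minute

        List<Job> drained = new ArrayList<>();
        int moved = queue.drainTo(drained);

        System.out.println("moved = " + moved);                  // 1: only the expired job is transferred
        System.out.println("drained = " + drained.get(0).label); // ready
        System.out.println("left in queue = " + queue.size());   // 1: the unexpired job stays
    }
}
```

Because drainTo() never blocks, a periodically scheduled consumer like the one above simply finds nothing to process when no delay has expired. A consumer that should wait for the next element could call take() instead.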
4.3. Main Program
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main
{
    public static void main(String[] args)
    {
        DelayQueue<DelayedEvent> queue = new DelayQueue<>();
        AtomicInteger counter = new AtomicInteger();
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);

        // Producer: first run after 1 second, then every 2 seconds
        ses.scheduleAtFixedRate(new DelayedEventProducer(queue, counter), 1, 2, TimeUnit.SECONDS);
        // Consumer: first run after 1 second, then every 10 seconds
        ses.scheduleAtFixedRate(new DelayedEventConsumer(queue), 1, 10, TimeUnit.SECONDS);
        // The executor is never shut down, so the demo runs until the process is stopped
    }
}
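Program output. Timestamps will differ on each run, and the consumer's first run, about one second after startup, does not appear in this listing.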
Added to queue :: DelayedEvent [id=1, name=Task-1, activationDateTime=2019-05-27T15:56:33.689]
Added to queue :: DelayedEvent [id=2, name=Task-2, activationDateTime=2019-05-27T15:56:35.619]
Added to queue :: DelayedEvent [id=3, name=Task-3, activationDateTime=2019-05-27T15:56:37.619]
Added to queue :: DelayedEvent [id=4, name=Task-4, activationDateTime=2019-05-27T15:56:39.619]
Added to queue :: DelayedEvent [id=5, name=Task-5, activationDateTime=2019-05-27T15:56:41.619]
Added to queue :: DelayedEvent [id=6, name=Task-6, activationDateTime=2019-05-27T15:56:43.619]
Event processing start **********
DelayedEvent [id=1, name=Task-1, activationDateTime=2019-05-27T15:56:33.689]
DelayedEvent [id=2, name=Task-2, activationDateTime=2019-05-27T15:56:35.619]
DelayedEvent [id=3, name=Task-3, activationDateTime=2019-05-27T15:56:37.619]
DelayedEvent [id=4, name=Task-4, activationDateTime=2019-05-27T15:56:39.619]
DelayedEvent [id=5, name=Task-5, activationDateTime=2019-05-27T15:56:41.619]
DelayedEvent [id=6, name=Task-6, activationDateTime=2019-05-27T15:56:43.619]
Event processing end **********
Added to queue :: DelayedEvent [id=7, name=Task-7, activationDateTime=2019-05-27T15:56:45.620]
Added to queue :: DelayedEvent [id=8, name=Task-8, activationDateTime=2019-05-27T15:56:47.618]
Added to queue :: DelayedEvent [id=9, name=Task-9, activationDateTime=2019-05-27T15:56:49.620]
Added to queue :: DelayedEvent [id=10, name=Task-10, activationDateTime=2019-05-27T15:56:51.618]
Added to queue :: DelayedEvent [id=11, name=Task-11, activationDateTime=2019-05-27T15:56:53.619]
Event processing start **********
DelayedEvent [id=7, name=Task-7, activationDateTime=2019-05-27T15:56:45.620]
DelayedEvent [id=8, name=Task-8, activationDateTime=2019-05-27T15:56:47.618]
DelayedEvent [id=9, name=Task-9, activationDateTime=2019-05-27T15:56:49.620]
DelayedEvent [id=10, name=Task-10, activationDateTime=2019-05-27T15:56:51.618]
DelayedEvent [id=11, name=Task-11, activationDateTime=2019-05-27T15:56:53.619]
Event processing end **********
Clearly, the program is working as intended: each consumer run picks up every event whose activation time has passed.
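Every event in the run above is created with an activation time of now, so the ordering by delay is not visible in the output. The sketch below assumes future delays instead and adds elements out of order; take() then hands them back in expiry order. The Reminder class and the delay values are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class ExpiryOrderSketch {

    // Hypothetical element that becomes ready delayMillis after creation.
    static final class Reminder implements Delayed {
        final String label;
        private final long readyAtNanos;

        Reminder(String label, long delayMillis) {
            this.label = label;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Reminder> queue = new DelayQueue<>();

        // Insertion order differs from expiry order on purpose.
        queue.add(new Reminder("third", 300));
        queue.add(new Reminder("first", 100));
        queue.add(new Reminder("second", 200));

        // Each take() blocks until the element with the smallest remaining delay has expired.
        for (int i = 0; i < 3; i++) {
            System.out.println(queue.take().label);
        }
    }
}
```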
Drop me your questions in the comments section related to this Java DelayQueue example program.
Happy Learning !!