JavaBlog.fr / Java.lu

Java: BlockingQueue implementation DelayQueue (Part7)

Following my post introducing Queue/BlockingQueue, I continue with a post about one BlockingQueue implementation: java.util.concurrent.DelayQueue.

This implementation of BlockingQueue holds its elements internally until a given delay has expired. It is an unbounded blocking queue of Delayed elements, in which an element can only be taken once its delay has expired. The head of the queue is the Delayed element whose delay expired furthest in the past. If no delay has expired, there is no head and poll returns null. Expiration occurs when an element’s getDelay(…) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.
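The behaviour described above can be sketched in a few lines. This is only an illustration: the class DelayQueueBasics and its element type Item are hypothetical names that are not part of the article's example, and the delays (500 ms and 60 s) are arbitrary.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayQueueBasics {

    // Hypothetical element type, used only for this illustration.
    static final class Item implements Delayed {
        final String label;
        private final long expiryNanos; // absolute expiry on the System.nanoTime() clock

        Item(String label, long delayMillis) {
            this.label = label;
            this.expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns { poll() result before expiry, size before expiry, label taken, size afterwards }.
    static Object[] demo() {
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(new Item("slow", 60_000)); // will not expire during the demo
        queue.put(new Item("fast", 500));

        Item early = queue.poll();     // nothing has expired yet, so this is null
        int sizeBefore = queue.size(); // size() counts unexpired elements too
        try {
            Item taken = queue.take(); // blocks until "fast" expires
            return new Object[] { early, sizeBefore, taken.label, queue.size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object[] r = demo();
        System.out.println("poll() before expiry : " + r[0]);
        System.out.println("size() before expiry : " + r[1]);
        System.out.println("element taken        : " + r[2]);
        System.out.println("size() afterwards    : " + r[3]);
    }
}
```

The element "slow" stays in the queue and is still counted by size(), although neither poll nor take can remove it yet.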

We will illustrate this with a queue containing business objects MyNoTheadSafeTask, which will be accessed by:
– one producer thread which puts 10 elements in the queue in a loop,
– 10 consumer threads which each take one of these elements from the queue once its delay has expired.

In our example, this business object is a simple POJO named MyNoTheadSafeTask which implements the interface Delayed:

public class MyNoTheadSafeTask implements Delayed {
...
}
  • has the attributes instanceName, queueInsertTime and endOfDelay, a random delay between 0 and 5000 milliseconds that must elapse after the insertion time of the element in the queue before it can be taken:
  • private String instanceName; // name of instance   
    private static Random generator = new Random();
    private long queueInsertTime; // DateTime corresponding to the insertion time in Queue
    private long endOfDelay; // Max Delay admitted between the current time and the insertion time in Queue
    
    public MyNoTheadSafeTask(String name) {
    	// set name of instance
    	this.instanceName = name;
    	// random time between 0 and 5000 milliseconds
    	this.endOfDelay = generator.nextInt( 5000 );
    	this.queueInsertTime = System.currentTimeMillis();
    }
    
  • provides the mandatory method compareTo, which orders two delayed MyNoTheadSafeTask elements by expiration time (the queue uses this order to choose its head):
  • @Override
    public int compareTo(Delayed o) {
    	MyNoTheadSafeTask other = (MyNoTheadSafeTask) o;
    	// Compare absolute expiration times, consistently with getDelay:
    	// comparing endOfDelay alone would ignore the differing insertion times
    	long thisExpiry = this.getQueueInsertTime() + this.endOfDelay;
    	long otherExpiry = other.getQueueInsertTime() + other.endOfDelay;
    	return Long.compare(thisExpiry, otherExpiry);
    }
    
  • provides the mandatory method getDelay, which returns the remaining delay in the requested unit; a value of zero or less means that the delay has expired:
  • @Override
    public long getDelay(TimeUnit unit) {
    	long tmp = unit.convert(getQueueInsertTime() + endOfDelay - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    	return tmp;
    }
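To make the unit conversion in getDelay concrete, here is a small sketch of the same arithmetic with the clock passed in explicitly, so that the results are reproducible. The class GetDelayUnits and the helper remaining are hypothetical names introduced for this illustration only.

```java
import java.util.concurrent.TimeUnit;

final class GetDelayUnits {

    // Same arithmetic as the getDelay method above; the current time is a
    // parameter (an assumption made here so the values can be checked by hand).
    static long remaining(long insertMillis, long delayMillis, long nowMillis, TimeUnit unit) {
        return unit.convert(insertMillis + delayMillis - nowMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Element inserted at t=0 with a 1500 ms delay, observed at t=0, t=600 and t=2000.
        System.out.println(remaining(0, 1500, 0, TimeUnit.MILLISECONDS));    // 1500
        System.out.println(remaining(0, 1500, 0, TimeUnit.SECONDS));         // 1 (truncated)
        System.out.println(remaining(0, 1500, 600, TimeUnit.SECONDS));       // 0, although 900 ms remain
        System.out.println(remaining(0, 1500, 600, TimeUnit.NANOSECONDS));   // 900000000
        System.out.println(remaining(0, 1500, 2000, TimeUnit.MILLISECONDS)); // -500, expired
    }
}
```

TimeUnit.convert truncates when converting to a coarser unit, so a caller asking for seconds can see 0 while some milliseconds remain. As far as I know, the OpenJDK DelayQueue asks for the delay in nanoseconds, so a millisecond-based element like the one above should not be affected by this truncation.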
    

Then, the MySingletonDelegate singleton:

  • contains the pool/queue component DelayQueue:
  • private DelayQueue<MyNoTheadSafeTask> noTheadSafeTaskQueue = new DelayQueue<MyNoTheadSafeTask>();
    
  • an empty constructor:
  • private MySingletonDelegate() {}
    
  • provides a public synchronized method ‘getNoTheadSafeTaskFromPool’ which takes an expired element “NoTheadSafeTask” from the queue, blocking until one is available:
  • public synchronized MyNoTheadSafeTask getNoTheadSafeTaskFromPool() {
    	try {
    		MyNoTheadSafeTask noTheadSafeTask = noTheadSafeTaskQueue.take();
    
    		long dequeueTime = System.currentTimeMillis();
    			
    		StringBuffer sb = new StringBuffer();
    		sb.append("——————–"+"\n");
    		sb.append("--> '"+noTheadSafeTask.getInstanceName()+"' instance has been taken from the queue (new size: "+noTheadSafeTaskQueue.size()+") \n");
    		sb.append("Queue Size ( Cons ) :"+MySingletonDelegate.getInstance().getNoTheadSafeTaskQueue().size()+"\n");
    		sb.append("Inserted Element :"+noTheadSafeTask.getInstanceName()+"\n");
    		sb.append("Queue Insertion Time :"+noTheadSafeTask.getQueueInsertTime()+"\n");
    		sb.append("Now ( dequeue time ) :"+dequeueTime+"\n");
    		sb.append("Expected Delay (ms):"+noTheadSafeTask.getEndOfDelay()+"\n");
    		sb.append("Actual Delay (ms):"+(dequeueTime-noTheadSafeTask.getQueueInsertTime())+"\n");
    		sb.append("Differences Actual and Expected Delay (ms):"+((dequeueTime-noTheadSafeTask.getQueueInsertTime())-noTheadSafeTask.getEndOfDelay())+"\n");
    		sb.append("——————–"+"\n");
    		System.out.println(sb.toString());
    			
    		return noTheadSafeTask;
    	} catch (Throwable e) {
    		System.out.println("getNoTheadSafeTaskFromPool : a Throwable occurred:"+e.getMessage());
    	} // end-try
    	return null;
    }
    
  • provides a public method putNoTheadSafeTaskToPool which puts an instance of “NoTheadSafeTask” in the queue:
  • public final void putNoTheadSafeTaskToPool(MyNoTheadSafeTask noTheadSafeTask) {
    	try {
    		noTheadSafeTaskQueue.put(noTheadSafeTask);
    			
    		StringBuffer sb = new StringBuffer();
    		sb.append("——————–"+"\n");
    		sb.append("--> '"+ noTheadSafeTask.getInstanceName()+"' instance has been put in the queue\n");
    		sb.append("Queue Size ( Cons ) :"+MySingletonDelegate.getInstance().getNoTheadSafeTaskQueue().size()+"\n");
    		sb.append("Inserted Element :"+noTheadSafeTask.getInstanceName()+"\n");
    		sb.append("Queue Insertion Time :"+noTheadSafeTask.getQueueInsertTime()+"\n");
    		sb.append("Expected Delay (ms):"+noTheadSafeTask.getEndOfDelay()+"\n");
    		sb.append("——————–"+"\n");
    		System.out.println(sb.toString());
    					
    	} catch (Throwable e) {
    		System.out.println("putNoTheadSafeTaskToPool : a Throwable occurred:"+e.getMessage());
    	} // end-try
    }
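The take method used above waits indefinitely while no element has expired. When a consumer should give up after a while, DelayQueue also offers poll with a timeout. The following is a minimal sketch of that variant, not the article's delegate: the class TimedPollSketch, its element type Job and the method takeOrGiveUp are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class TimedPollSketch {

    // Hypothetical stand-in for the business object of the article.
    static final class Job implements Delayed {
        final String name;
        private final long expiryMillis;

        Job(String name, long delayMillis) {
            this.name = name;
            this.expiryMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<Job> queue = new DelayQueue<>();

    void put(Job job) {
        queue.put(job); // never blocks: the queue is unbounded
    }

    // Waits at most timeoutMillis for an expired element; returns null if none expired in time.
    Job takeOrGiveUp(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
    }

    public static void main(String[] args) {
        TimedPollSketch pool = new TimedPollSketch();
        pool.put(new Job("later", 60_000));
        System.out.println(pool.takeOrGiveUp(100)); // null: "later" has not expired yet
        pool.put(new Job("soon", 50));
        System.out.println(pool.takeOrGiveUp(5_000).name); // soon
    }
}
```

With this variant a consumer can log the timeout or stop cleanly instead of waiting forever on an empty queue.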
    

Finally, the main method will:
– create the singleton MySingletonDelegate, which creates the queue,
– create one producer thread which puts 10 elements “NoTheadSafeTask” in the queue,
– create 10 consumer threads, launched one second apart, which each take an expired element “NoTheadSafeTask” from the queue,
– execute these 11 threads through an ExecutorService:

// Counter of consumer threads created
private static AtomicInteger consumerCounterAtomic = new AtomicInteger(0);
	
public static void main(String[] args) {
		
	try {
		// create singleton which will create QUEUE
		MySingletonDelegate.getInstance();

		// Consumers and producer
		ExecutorService es = Executors.newFixedThreadPool(11);
			
		// Producer
		es.submit(new Runnable() { 
			public void run() { 
				int i = 0;
				while(i<10){ // Test with 10 elements
					try {
						// This runnable will create the instances of "NoTheadSafeTask" in the pool component
						String instanceName = "instanceName_"+i;
						// Put an instance of "NoTheadSafeTask" to the pool 
						MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
						MySingletonDelegate.getInstance().putNoTheadSafeTaskToPool(noTheadSafeTask);
							
						Thread.sleep(1);
						//
						i++;
					} catch (Throwable e) {
						e.printStackTrace();
					}
				}
			} 
		});

			
		//Consumers
		while(consumerCounterAtomic.intValue() < 10) {
				
			//Thread
			es.submit(new Runnable() { 
				public void run() { 
					try {
						// Retrieve an instance of "NoTheadSafeTask" from the pool 
						MyNoTheadSafeTask noTheadSafeTask = MySingletonDelegate.getInstance().getNoTheadSafeTaskFromPool();
					} catch (Throwable e) {
						e.printStackTrace();
					}
				} 
			});
				
			// Wait 1s between the threads' launching
			Thread.sleep( 1000 );  
				
			consumerCounterAtomic.incrementAndGet();
		} // end-while
					
		es.shutdown(); 

	} catch (Throwable e) {
		e.printStackTrace();
	}
}

… and the output would look like this:

——————–
--> 'instanceName_0' instance has been put in the queue
Queue Size ( Cons ) :1
Inserted Element :instanceName_0
Queue Insertion Time :1339716126491
Expected Delay (ms):3363
——————–

——————–
--> 'instanceName_1' instance has been put in the queue
Queue Size ( Cons ) :2
Inserted Element :instanceName_1
Queue Insertion Time :1339716126522
Expected Delay (ms):229
——————–

——————–
--> 'instanceName_2' instance has been put in the queue
Queue Size ( Cons ) :3
Inserted Element :instanceName_2
Queue Insertion Time :1339716126538
Expected Delay (ms):1642
——————–

——————–
--> 'instanceName_3' instance has been put in the queue
Queue Size ( Cons ) :4
Inserted Element :instanceName_3
Queue Insertion Time :1339716126539
Expected Delay (ms):2309
——————–

——————–
--> 'instanceName_4' instance has been put in the queue
Queue Size ( Cons ) :5
Inserted Element :instanceName_4
Queue Insertion Time :1339716126540
Expected Delay (ms):3241
——————–

——————–
--> 'instanceName_5' instance has been put in the queue
Queue Size ( Cons ) :6
Inserted Element :instanceName_5
Queue Insertion Time :1339716126541
Expected Delay (ms):3881
——————–

——————–
--> 'instanceName_6' instance has been put in the queue
Queue Size ( Cons ) :7
Inserted Element :instanceName_6
Queue Insertion Time :1339716126542
Expected Delay (ms):4995
——————–

——————–
--> 'instanceName_7' instance has been put in the queue
Queue Size ( Cons ) :8
Inserted Element :instanceName_7
Queue Insertion Time :1339716126543
Expected Delay (ms):4834
——————–

——————–
--> 'instanceName_8' instance has been put in the queue
Queue Size ( Cons ) :9
Inserted Element :instanceName_8
Queue Insertion Time :1339716126544
Expected Delay (ms):4998
——————–

——————–
--> 'instanceName_9' instance has been put in the queue
Queue Size ( Cons ) :10
Inserted Element :instanceName_9
Queue Insertion Time :1339716126545
Expected Delay (ms):4002
——————–

——————–
--> 'instanceName_1' instance has been taken from the queue (new size: 9) 
Queue Size ( Cons ) :9
Inserted Element :instanceName_1
Queue Insertion Time :1339716126522
Now ( dequeue time ) :1339716126766
Expected Delay (ms):229
Actual Delay (ms):244
Differences Actual and Expected Delay (ms):15
——————–

——————–
--> 'instanceName_2' instance has been taken from the queue (new size: 8) 
Queue Size ( Cons ) :8
Inserted Element :instanceName_2
Queue Insertion Time :1339716126538
Now ( dequeue time ) :1339716128186
Expected Delay (ms):1642
Actual Delay (ms):1648
Differences Actual and Expected Delay (ms):6
——————–

——————–
--> 'instanceName_3' instance has been taken from the queue (new size: 7) 
Queue Size ( Cons ) :7
Inserted Element :instanceName_3
Queue Insertion Time :1339716126539
Now ( dequeue time ) :1339716128857
Expected Delay (ms):2309
Actual Delay (ms):2318
Differences Actual and Expected Delay (ms):9
——————–

——————–
--> 'instanceName_4' instance has been taken from the queue (new size: 6) 
Queue Size ( Cons ) :6
Inserted Element :instanceName_4
Queue Insertion Time :1339716126540
Now ( dequeue time ) :1339716129793
Expected Delay (ms):3241
Actual Delay (ms):3253
Differences Actual and Expected Delay (ms):12
——————–

——————–
--> 'instanceName_0' instance has been taken from the queue (new size: 5) 
Queue Size ( Cons ) :5
Inserted Element :instanceName_0
Queue Insertion Time :1339716126491
Now ( dequeue time ) :1339716130543
Expected Delay (ms):3363
Actual Delay (ms):4052
Differences Actual and Expected Delay (ms):689
——————–

——————–
--> 'instanceName_5' instance has been taken from the queue (new size: 4) 
Queue Size ( Cons ) :4
Inserted Element :instanceName_5
Queue Insertion Time :1339716126541
Now ( dequeue time ) :1339716131557
Expected Delay (ms):3881
Actual Delay (ms):5016
Differences Actual and Expected Delay (ms):1135
——————–

——————–
--> 'instanceName_9' instance has been taken from the queue (new size: 3) 
Queue Size ( Cons ) :3
Inserted Element :instanceName_9
Queue Insertion Time :1339716126545
Now ( dequeue time ) :1339716132572
Expected Delay (ms):4002
Actual Delay (ms):6027
Differences Actual and Expected Delay (ms):2025
——————–

——————–
--> 'instanceName_7' instance has been taken from the queue (new size: 2) 
Queue Size ( Cons ) :2
Inserted Element :instanceName_7
Queue Insertion Time :1339716126543
Now ( dequeue time ) :1339716133587
Expected Delay (ms):4834
Actual Delay (ms):7044
Differences Actual and Expected Delay (ms):2210
——————–

——————–
--> 'instanceName_6' instance has been taken from the queue (new size: 1) 
Queue Size ( Cons ) :1
Inserted Element :instanceName_6
Queue Insertion Time :1339716126542
Now ( dequeue time ) :1339716134602
Expected Delay (ms):4995
Actual Delay (ms):8060
Differences Actual and Expected Delay (ms):3065
——————–

——————–
--> 'instanceName_8' instance has been taken from the queue (new size: 0) 
Queue Size ( Cons ) :0
Inserted Element :instanceName_8
Queue Insertion Time :1339716126544
Now ( dequeue time ) :1339716135617
Expected Delay (ms):4998
Actual Delay (ms):9073
Differences Actual and Expected Delay (ms):4075
——————–


Conclusion
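In this example, we can see that once the producer has added the instances of “NoTheadSafeTask” to the queue, the consumer threads take each element only after its delay has expired. Because the consumer threads are launched one second apart, the last elements are taken well after their expiration, which explains the growing difference between actual and expected delay. The order of removal is neither FIFO (First In, First Out) nor FILO (First In, Last Out): the elements leave the queue in order of expiration, which here depends on the random delays: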

--> 'instanceName_1' instance has been taken from the queue (new size: 9) 
Expected Delay (ms):229
--> 'instanceName_2' instance has been taken from the queue (new size: 8) 
Expected Delay (ms):1642
--> 'instanceName_3' instance has been taken from the queue (new size: 7) 
Expected Delay (ms):2309
--> 'instanceName_4' instance has been taken from the queue (new size: 6) 
Expected Delay (ms):3241
--> 'instanceName_0' instance has been taken from the queue (new size: 5) 
Expected Delay (ms):3363
--> 'instanceName_5' instance has been taken from the queue (new size: 4) 
Expected Delay (ms):3881
--> 'instanceName_9' instance has been taken from the queue (new size: 3) 
Expected Delay (ms):4002
--> 'instanceName_7' instance has been taken from the queue (new size: 2) 
Expected Delay (ms):4834
--> 'instanceName_6' instance has been taken from the queue (new size: 1) 
Expected Delay (ms):4995
--> 'instanceName_8' instance has been taken from the queue (new size: 0) 
Expected Delay (ms):4998
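The same ordering can be reproduced with fixed delays instead of random ones. The sketch below uses hypothetical names (ExpiryOrderDemo, Item, takeAll) and a single thread: three elements are inserted in the order A, B, C and come out in order of expiration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class ExpiryOrderDemo {

    // Hypothetical element with a fixed delay measured from a common base time.
    static final class Item implements Delayed {
        final String label;
        private final long expiryNanos;

        Item(String label, long baseNanos, long delayMillis) {
            this.label = label;
            this.expiryNanos = baseNanos + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Inserts A, B, C and returns the labels in the order in which take() delivers them.
    static List<String> takeAll() {
        DelayQueue<Item> queue = new DelayQueue<>();
        long base = System.nanoTime();
        queue.put(new Item("A-300ms", base, 300));
        queue.put(new Item("B-100ms", base, 100));
        queue.put(new Item("C-200ms", base, 200));

        List<String> order = new ArrayList<>();
        try {
            while (!queue.isEmpty()) {
                order.add(queue.take().label); // blocks until the next element expires
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(takeAll()); // [B-100ms, C-200ms, A-300ms]
    }
}
```

The result is neither the insertion order (A, B, C) nor its reverse (C, B, A), which matches what the output above shows with random delays.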

The source code of this article and the test cases are in the attached ZIP file.

Download: java_queue.zip

Best regards,

Huseyin OZVEREN.
