After my post concerning presentation of Queue/BlockingQueue, I would continue with a post about the BlockingQueue implementation: java.util.concurrent.PriorityBlockingQueue.

This implementation of BlockingQueue stores and keeps the elements internally in a unbounded concurrent queue. These elements are ordered and weighted in accordance with a order based on a priority heap. This order is not necessarily the FIFO (First In, First Out) order. Of course, this type of BlockingQueue supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryError). The elements are ordered according to their natural ordering, or by the priority decided in the Comparable/Comparator implementation.
The insertion of non-comparable objects are not permit (doing so results in ClassCastException). More, the PriorityBlockingQueue doesn’t enforce any specific behaviour for elements that have equal priority (compare() == 0). The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.

We will illustrate our writing by example of queue containing ‘no thread-safe’ business objects MyNoTheadSafeTask which will be accessed by a series of consumer/producer threads: each thread takes an instance from queue, calls an action on this instance and release (put) it again in the queue.

Example n°1: FIFO (First In, First Out) order
In our example, this business object is a simple POJO, however, this POJO could be a ‘no thread-safe’ bean Spring with the scope “prototype”:

<bean id="fileBean" class="com.ho.utils.businessobject.MyNoTheadSafeTask" scope="prototype">
        <property name="property1" value="value1" />
</bean>

So, our business object :

  • has the attribut name instanceName and random sleep time between 0 and 5 seconds:
  • private int sleepTime; // random sleep time for instance   
    private String instanceName; // name of instance   
    private static Random generator = new Random();
    	
    public MyNoTheadSafeTask(String name){
    	// set name of instance
    	this.instanceName = name;
    	// random sleep time between 0 and 5 seconds
    	this.sleepTime = generator.nextInt( 5000 ); 
    }
    
  • provides a method action corresponding to a System.out output and a random sleep time:
  • public boolean action(String paramValue1, int paramValue2) throws Throwable{
    	try{ 
    		// put instance to sleep for sleepTime amount of time
    		System.out.printf("--> '%s' starting 'action' and going to sleep for %d milliseconds.\n", this.instanceName, this.sleepTime);
    		// put instance to sleep
    		Thread.sleep( this.sleepTime );   
    	} catch ( InterruptedException exception )  {  
    		exception.printStackTrace(); 
    	}
    	return true;
    }
    

Then, the MySingletonDelegate singleton:

  • contains the pool/queue component PriorityBlockingQueue:
  • private PriorityBlockingQueue<MyNoTheadSafeTask> noTheadSafeTaskQueue = new PriorityBlockingQueue<MyNoTheadSafeTask>();
    
  • create several instances of “NoTheadSafeTask” in this pool component:
  • private MySingletonDelegate() {
    	for (int i=0; i < 5; i++) {
    		String instanceName = "instanceName_"+i;
    		System.out.println("NoTheadSafeTask.createInstance("+instanceName+");");
    		MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
    		noTheadSafeTaskQueue.add(noTheadSafeTask);
    	} // end-if
    }
    
  • provides a private method ‘getNoTheadSafeTaskFromPool’ to take the unique instance of “NoTheadSafeTask” in queue/pool.
  • private synchronized MyNoTheadSafeTask getNoTheadSafeTaskFromPool() {
    	try {
    		MyNoTheadSafeTask noTheadSafeTask = noTheadSafeTaskQueue.take();
    		System.out.printf("--> '%s' instance has been taken from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
    		return noTheadSafeTask;
    	} catch (InterruptedException e) {
    		System.out.println("getNoTheadSafeTaskFromPool : an InterruptedException occured:"+e.getMessage());
    	} // end-try
    	return null;
    }
    
  • provides a private method releaseNoTheadSafeTaskToPool to put an instance of “NoTheadSafeTask” in queue/pool:
  • private final void releaseNoTheadSafeTaskToPool(MyNoTheadSafeTask noTheadSafeTask) {
    	try {
    		noTheadSafeTaskQueue.put(noTheadSafeTask);
    		System.out.printf("--> '%s' instance has been released from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
    	} catch (Throwable e) {
    		System.out.println("releaseNoTheadSafeTaskToPool : an InterruptedException occured:"+e.getMessage());
    	} // end-try
    }
    
  • provides a public method ‘completeAction’ allowing a complete action with the retrieve of a instance of “NoTheadSafeTask” from the pool, the execution of action on this retrieved instance and the release of this retrieved instance of “NoTheadSafeTask” to the pool:
  • public boolean completeAction(String paramValue1, int paramValue2) throws Throwable {
    	MyNoTheadSafeTask noTheadSafeTask = getNoTheadSafeTaskFromPool();
    	try {
    		return noTheadSafeTask.action(paramValue1, paramValue2);
    	} finally {
    		releaseNoTheadSafeTaskToPool(noTheadSafeTask);
    	}
    }
    

At last, the main method which will:
– create the singleton MySingletonDelegate which will create QUEUE,
– create 5 threads which will each call the business method completeAction provided by the above singleton MySingletonDelegate,
– execute these 5 threads due to an ExecutorService;

public static void main(String[] args) {
	try {
		// create singleton which will create QUEUE
		MySingletonDelegate.getInstance();
			
		// Client classes of the  ClientDelegate
		ExecutorService es = Executors.newFixedThreadPool(5);
		//
		for (int i=0; i < 5; i++) {
				
			//Thread
			es.submit(new Runnable() { 
				public void run() { 
					try {
						MySingletonDelegate.getInstance().completeAction(null, 1);
					} catch (Throwable e) {
						e.printStackTrace();
					}
				} 
			});
			
		} // end-for
			
		es.shutdown(); 

	} catch (Throwable e) {
		e.printStackTrace();
	}
}

… and the outputs would be:

NoTheadSafeTask.createInstance(instanceName_0);
NoTheadSafeTask.createInstance(instanceName_1);
NoTheadSafeTask.createInstance(instanceName_2);
NoTheadSafeTask.createInstance(instanceName_3);
NoTheadSafeTask.createInstance(instanceName_4);
--> 'instanceName_0' instance has been taken from the queue (new size: 4) 
--> 'instanceName_0' starting 'action' and going to sleep for 1294 milliseconds.
--> 'instanceName_1' instance has been taken from the queue (new size: 3) 
--> 'instanceName_1' starting 'action' and going to sleep for 4434 milliseconds.
--> 'instanceName_2' instance has been taken from the queue (new size: 2) 
--> 'instanceName_2' starting 'action' and going to sleep for 2170 milliseconds.
--> 'instanceName_3' instance has been taken from the queue (new size: 1) 
--> 'instanceName_3' starting 'action' and going to sleep for 2675 milliseconds.
--> 'instanceName_4' instance has been taken from the queue (new size: 0) 
--> 'instanceName_4' starting 'action' and going to sleep for 106 milliseconds.
--> 'instanceName_4' instance has been released from the queue (new size: 1) 
--> 'instanceName_0' instance has been released from the queue (new size: 2) 
--> 'instanceName_2' instance has been released from the queue (new size: 3) 
--> 'instanceName_3' instance has been released from the queue (new size: 4) 
--> 'instanceName_1' instance has been released from the queue (new size: 5) 

Conclusion n°1
In this example, we could analyze that the natural order is similar to FIFO order (or natural ordering) which is kept because the instances taken by the consumer threads are in the same order than the adding’s order in the queue/pool.

NoTheadSafeTask.createInstance(instanceName_0);
NoTheadSafeTask.createInstance(instanceName_1);
NoTheadSafeTask.createInstance(instanceName_2);
NoTheadSafeTask.createInstance(instanceName_3);
NoTheadSafeTask.createInstance(instanceName_4);
...
--> 'instanceName_0' instance has been taken from the queue (new size: 4) 
--> 'instanceName_1' instance has been taken from the queue (new size: 3) 
--> 'instanceName_2' instance has been taken from the queue (new size: 2) 
--> 'instanceName_3' instance has been taken from the queue (new size: 1) 
--> 'instanceName_4' instance has been taken from the queue (new size: 0) 

Example n°2: FILO (First In, Last Out) order
In our second example, our previous business object will be modified to implement the Comparable interface. This new POJO will be named MyNoTheadSafeTaskFILO and will extend the previous POJO MyNoTheadSafeTask. So, this new POJO:

  • has the attribut name seqNum corresponding to a object ID,
  • has a static attribut AtomicLong seq used to generate the above seqNum,
  • provides a method compareTo in order to compare an instance of ‘MyNoTheadSafeTaskFILO’ to another,
/**
 * New Business object which extends the previous object 'MyNoTheadSafeTask' and implements the interface Comparable
 * @author huseyin
 *
 */
public class MyNoTheadSafeTaskFILO extends MyNoTheadSafeTask implements Comparable<MyNoTheadSafeTaskFILO>{

	// Sequence of seqNum number 
	private final static AtomicLong seq = new AtomicLong();
	// seqNum or ID number 
	private final long seqNum;

	public MyNoTheadSafeTaskFILO(String name){
		super(name);
		// SeqNum
		this.seqNum = seq.getAndIncrement();
	}
	
	/**
	 * Comparable method FILA order
	 * @param other
	 * @return
	 */
	public int compareTo(MyNoTheadSafeTaskFILO other) {
		int res = (seqNum > other.seqNum ? -1 : 1);
		return res;
	}
	
	public long getSeqNum() {
		return seqNum;
	}	
}

Then, a new MySingletonDelegateFILO singleton:

  • contains the pool/queue component PriorityBlockingQueue:
  • private PriorityBlockingQueue <MyNoTheadSafeTaskFILO> noTheadSafeTaskQueue = new PriorityBlockingQueue<MyNoTheadSafeTaskFILO>();
    
  • create several instances of “MyNoTheadSafeTaskFILO” in this pool component:
  • private MySingletonDelegate() {
    	for (int i=0; i < 5; i++) {
    		String instanceName = "instanceName_"+i;
    		System.out.println("NoTheadSafeTask.createInstance("+instanceName+");");
    		MyNoTheadSafeTaskFILO noTheadSafeTask = new MyNoTheadSafeTaskFILO(instanceName);
    		noTheadSafeTaskQueue.add(noTheadSafeTask);
    	} // end-if
    }
    
  • provides a private method ‘getNoTheadSafeTaskFromPool’ to take the unique instance of “MyNoTheadSafeTaskFILO” in queue/pool.
  • private synchronized MyNoTheadSafeTaskFILO getNoTheadSafeTaskFromPool() {
    	try {
    		MyNoTheadSafeTaskFILO noTheadSafeTask = noTheadSafeTaskQueue.take();
    		System.out.printf("--> '%s' instance has been taken from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
    		return noTheadSafeTask;
    	} catch (InterruptedException e) {
    		System.out.println("getNoTheadSafeTaskFromPool : an InterruptedException occured:"+e.getMessage());
    	} // end-try
    	return null;
    }
    
  • provides a private method releaseNoTheadSafeTaskToPool to put an instance of “MyNoTheadSafeTaskFILO” in queue/pool:
  • private final void releaseNoTheadSafeTaskToPool(MyNoTheadSafeTaskFILO noTheadSafeTask) {
    	try {
    		noTheadSafeTaskQueue.put(noTheadSafeTask);
    		System.out.printf("--> '%s' instance has been released from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
    	} catch (Throwable e) {
    		System.out.println("releaseNoTheadSafeTaskToPool : an InterruptedException occured:"+e.getMessage());
    	} // end-try
    }
    
  • provides a public method ‘completeAction’ allowing a complete action with the retrieve of a instance of “MyNoTheadSafeTaskFILO” from the pool, the execution of action on this retrieved instance and the release of this retrieved instance of “MyNoTheadSafeTaskFILO” to the pool:
  • public boolean completeAction(String paramValue1, int paramValue2) throws Throwable {
    	MyNoTheadSafeTaskFILO noTheadSafeTask = getNoTheadSafeTaskFromPool();
    	try {
    		return noTheadSafeTask.action(paramValue1, paramValue2);
    	} finally {
    		releaseNoTheadSafeTaskToPool(noTheadSafeTask);
    	}
    }
    

At last, the main method which will:
– create the singleton MySingletonDelegateFILO which will create QUEUE,
– create 5 threads which will each call the business method completeAction provided by the above singleton MySingletonDelegateFILO,
– execute these 5 threads due to an ExecutorService;

public static void main(String[] args) {
	try {
		// create singleton which will create QUEUE
		MySingletonDelegateFILO.getInstance();
			
		// Client classes of the  ClientDelegate
		ExecutorService es = Executors.newFixedThreadPool(5);
		//
		for (int i=0; i < 5; i++) {
				
			//Thread
			es.submit(new Runnable() { 
				public void run() { 
					try {
						MySingletonDelegateFILO.getInstance().completeAction(null, 1);
					} catch (Throwable e) {
						e.printStackTrace();
					}
				} 
			});
			
		} // end-for
			
		es.shutdown(); 

	} catch (Throwable e) {
		e.printStackTrace();
	}
}

… and the outputs would be:

NoTheadSafeTask.createInstance(instanceName_0);
NoTheadSafeTask.createInstance(instanceName_1);
NoTheadSafeTask.createInstance(instanceName_2);
NoTheadSafeTask.createInstance(instanceName_3);
NoTheadSafeTask.createInstance(instanceName_4);
--> 'instanceName_4' instance has been taken from the queue (new size: 4) 
--> 'instanceName_4' starting 'action' and going to sleep for 636 milliseconds.
--> 'instanceName_3' instance has been taken from the queue (new size: 3) 
--> 'instanceName_3' starting 'action' and going to sleep for 1500 milliseconds.
--> 'instanceName_2' instance has been taken from the queue (new size: 2) 
--> 'instanceName_2' starting 'action' and going to sleep for 459 milliseconds.
--> 'instanceName_1' instance has been taken from the queue (new size: 1) 
--> 'instanceName_1' starting 'action' and going to sleep for 3959 milliseconds.
--> 'instanceName_0' instance has been taken from the queue (new size: 0) 
--> 'instanceName_0' starting 'action' and going to sleep for 2230 milliseconds.
--> 'instanceName_2' instance has been released from the queue (new size: 1) 
--> 'instanceName_4' instance has been released from the queue (new size: 2) 
--> 'instanceName_3' instance has been released from the queue (new size: 3) 
--> 'instanceName_0' instance has been released from the queue (new size: 4) 
--> 'instanceName_1' instance has been released from the queue (new size: 5) 

Conclusion n°2
In this example, we could analyze that the PriorityBlockingQueue queue has been customized with a FILO (First In, Last Out) order. The instances taken by the consumer threads are in the inverse order (FILO) than the adding’s order in the queue/pool. Of course, we could implement a more complex logic in the comparable business.

NoTheadSafeTask.createInstance(instanceName_0);
NoTheadSafeTask.createInstance(instanceName_1);
NoTheadSafeTask.createInstance(instanceName_2);
NoTheadSafeTask.createInstance(instanceName_3);
NoTheadSafeTask.createInstance(instanceName_4);
...
--> 'instanceName_4' instance has been taken from the queue (new size: 4) 
--> 'instanceName_3' instance has been taken from the queue (new size: 3) 
--> 'instanceName_2' instance has been taken from the queue (new size: 2) 
--> 'instanceName_1' instance has been taken from the queue (new size: 1) 
--> 'instanceName_0' instance has been taken from the queue (new size: 0) 

That’s all!
The source code of this article and the a test cases are in the ZIP file attachement.

Download: java_queue.zip

Best regards,

Huseyin.