JavaBlog.fr / Java.lu DEVELOPMENT,Java Java: BlockingQueue implementation SynchronousQueue (Part5)

Java: BlockingQueue implementation SynchronousQueue (Part5)

After my post concerning presentation of Queue/BlockingQueue, I would continue with a post about the BlockingQueue implementation: java.util.concurrent.SynchronousQueue.

This implementation of BlockingQueue stores and keeps ONLY a single element internally. For example, a producer thread inserting an element into the queue is blocked until another consumer thread takes that element from the queue. Similarly, if the consumer thread tries to take an element whereas no element is present, then, the consumer thread is blocked until the producer thread inserts a new element into the queue.

We will illustrate our writing by example of queue containing ‘no thread-safe’ business objects MyNoTheadSafeTask which will be accessed by:
– one producer thread which cyclically, will try to put an instance in the queue,
– several consumer threads which will take the unique instance from queue and will call an action on this instance.

In our example, this business object is a simple POJO, however, this POJO could be a ‘no thread-safe’ bean Spring with the scope “prototype”:

<bean id="fileBean" class="com.ho.utils.businessobject.MyNoTheadSafeTask" scope="prototype">
        <property name="property1" value="value1" />
</bean>

So, our business object :

  • has the attribut name instanceName and random sleep time between 0 and 5 seconds:
  • private int sleepTime; // random sleep time for instance   
    private String instanceName; // name of instance   
    private static Random generator = new Random();
    	
    public MyNoTheadSafeTask(String name){
    	// set name of instance
    	this.instanceName = name;
    	// random sleep time between 0 and 5 seconds
    	this.sleepTime = generator.nextInt( 5000 ); 
    }
    
  • provides a method action corresponding to a System.out output and a random sleep time:
  • public boolean action(String paramValue1, int paramValue2) throws Throwable{
    	try{ 
    		// put instance to sleep for sleepTime amount of time
    		System.out.printf("--> '%s' starting 'action' and going to sleep for %d milliseconds.\n", this.instanceName, this.sleepTime);
    		// put instance to sleep
    		Thread.sleep( this.sleepTime );   
    	} catch ( InterruptedException exception )  {  
    		exception.printStackTrace(); 
    	}
    	return true;
    }
    

Then, the MySingletonDelegate singleton:

  • contains the pool/queue component SynchronousQueue:
  • private SynchronousQueue <MyNoTheadSafeTask> noTheadSafeTaskQueue = new SynchronousQueue<MyNoTheadSafeTask>();
    
  • create several instances of “NoTheadSafeTask” in this pool component:
  • private MySingletonDelegate() {
    	for (int i=0; i < 5; i++) {
    		String instanceName = "instanceName_"+i;
    		System.out.println("NoTheadSafeTask.createInstance("+instanceName+");");
    		MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
    		noTheadSafeTaskQueue.add(noTheadSafeTask);
    	} // end-if
    }
    

    If we execute this method, then the following errors will appear in outputs because the SynchronousQueue can contain only an unique element:

    NoTheadSafeTask.createInstance(instanceName_0);
    java.lang.IllegalStateException: Queue full
    	at java.util.AbstractQueue.add(AbstractQueue.java:71)
    	at ho.tests.thread.queue.synchronous.MySingletonDelegate.<init>(MySingletonDelegate.java:40)
    	at ho.tests.thread.queue.synchronous.MySingletonDelegate.getInstance(MySingletonDelegate.java:47)
    	at ho.tests.thread.queue.synchronous.ApplicationLaunch.main(ApplicationLaunch.java:16)
    

    So, we will replace the ‘add’ method by ‘offer’:

    private MySingletonDelegate() {
    	try {
    		for (int i=0; i < 5; i++) {
    			String instanceName = "instanceName_"+i;
    			System.out.println("NoTheadSafeTask.createInstance("+instanceName+");");
    			MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
    			noTheadSafeTaskQueue.offer(noTheadSafeTask);
    		} // end-if
    
    		System.out.println("NoTheadSafeTask creation of instances is done !");
    	} catch (Throwable e) {
    		e.printStackTrace();
    	}
    }
    

    … then the following traces will appear in outputs because the producer is not blocked because the ‘offer’ method doesn’t do anything if there are no consumers:

    NoTheadSafeTask.createInstance(instanceName_0);
    NoTheadSafeTask.createInstance(instanceName_1);
    NoTheadSafeTask.createInstance(instanceName_2);
    NoTheadSafeTask.createInstance(instanceName_3);
    NoTheadSafeTask.createInstance(instanceName_4);
    NoTheadSafeTask creation of instances is done !
    

    We will replace the ‘offer’ method by ‘put’:

    private MySingletonDelegate() {
    	try {
    		for (int i=0; i < 5; i++) {
    			String instanceName = "instanceName_"+i;
    			System.out.println("NoTheadSafeTask.createInstance("+instanceName+");");
    			MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
    			noTheadSafeTaskQueue.put(noTheadSafeTask);
    		} // end-if
    
    		System.out.println("NoTheadSafeTask creation of instances is done !");
    	} catch (Throwable e) {
    		e.printStackTrace();
    	}
    }
    

    … then the following traces will appear in outputs because the producer is blocked because the ‘put’ method adds the specified element to this queue, by waiting if necessary for another thread to receive it:

    NoTheadSafeTask.createInstance(instanceName_0);
    

    So, to conclude, all adding methods of SynchronousQueue wait for another thread to receive the element.
    For this, we will modify this constructor ‘MySingletonDelegate()’ like:

    private MySingletonDelegate() {}
    
  • provides a private method ‘getNoTheadSafeTaskFromPool’ to take the unique instance of “NoTheadSafeTask” in queue/pool.
  • public synchronized MyNoTheadSafeTask getNoTheadSafeTaskFromPool(String threadName) {
    	try {
    		MyNoTheadSafeTask noTheadSafeTask = noTheadSafeTaskQueue.take();
    		System.out.printf("--> '%s' instance has been taken from the queue by the consumer thread '%s'\n", noTheadSafeTask.getInstanceName(), threadName);
    		return noTheadSafeTask;
    	} catch (InterruptedException e) {
    		System.out.println("getNoTheadSafeTaskFromPool : an InterruptedException occured:"+e.getMessage());
    	} // end-try
    	return null;
    }
    
  • provides a private method putNoTheadSafeTaskToPool to put an instance of “NoTheadSafeTask” in queue/pool:
  • public final void putNoTheadSafeTaskToPool(MyNoTheadSafeTask noTheadSafeTask, String threadName) {
    	try {
    		noTheadSafeTaskQueue.put(noTheadSafeTask);
    		System.out.printf("--> '%s' instance has been put in the queue by the producer thread '%s'\n", noTheadSafeTask.getInstanceName(), threadName);
    	} catch (InterruptedException e) {
    		System.out.println("putNoTheadSafeTaskToPool : an InterruptedException occured:"+e.getMessage());
    	} // end-try
    }
    

At last, the main method which will:
– create the singleton MySingletonDelegate which will create QUEUE,
– create one producer thread which will put 10 elements of “NoTheadSafeTask” in queue/pool,
– create 10 consumer threads which will each take the unique instance of “NoTheadSafeTask” from queue/pool,
– execute these 11 threads due to an ExecutorService;

// Counter of consumer threads created
private static AtomicInteger consumerCounterAtomic = new AtomicInteger(0);

public static void main(String[] args) {
		
	try {
		// create singleton which will create QUEUE
		MySingletonDelegate.getInstance();

		// Consumers and producer
		ExecutorService es = Executors.newFixedThreadPool(11);
			
		// Producer
		es.submit(new Runnable() { 
			public void run() { 
				int i = 0;
				while(i<10){ // Test with 10 elements
					try {
						// This runnable will create the instances of "NoTheadSafeTask" in the pool component
						// in order to create only ONE instance in the queue:
						String instanceName = "instanceName_"+i;
						// Put an instance of "NoTheadSafeTask" to the pool 
						MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
						MySingletonDelegate.getInstance().putNoTheadSafeTaskToPool(noTheadSafeTask, "ProducerThread_1");
							
						//
						i++;
					} catch (Throwable e) {
						e.printStackTrace();
					}
				}
			} 
		});

			
		//Consumers
		while(consumerCounterAtomic.intValue() < 10) {
				
			//Thread
			es.submit(new Runnable() { 
				public void run() { 
					try {
						// Retrieve of a instance of "NoTheadSafeTask" from the pool 
						MyNoTheadSafeTask noTheadSafeTask = MySingletonDelegate.getInstance().getNoTheadSafeTaskFromPool("ConsumerThread_"+consumerCounterAtomic.intValue());

						// Execution of action on this retrieved instance,
						noTheadSafeTask.action(null, 1);

					} catch (Throwable e) {
						e.printStackTrace();
					}
				} 
			});
				
			// Wait 1s between the threads' launching
			Thread.sleep( 1000 );  
				
			consumerCounterAtomic.incrementAndGet();
		} // end-for
			
		es.shutdown(); 

	} catch (Throwable e) {
		e.printStackTrace();
	}
}

… and the outputs would be:

--> 'instanceName_0' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_0' instance has been taken from the queue by the consumer thread 'ConsumerThread_0'
--> 'instanceName_0' starting 'action' and going to sleep for 2397 milliseconds.
--> 'instanceName_1' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_1' instance has been taken from the queue by the consumer thread 'ConsumerThread_1'
--> 'instanceName_1' starting 'action' and going to sleep for 1158 milliseconds.
--> 'instanceName_2' instance has been taken from the queue by the consumer thread 'ConsumerThread_2'
--> 'instanceName_2' starting 'action' and going to sleep for 2886 milliseconds.
--> 'instanceName_2' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_3' instance has been taken from the queue by the consumer thread 'ConsumerThread_3'
--> 'instanceName_3' starting 'action' and going to sleep for 3273 milliseconds.
--> 'instanceName_3' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_4' instance has been taken from the queue by the consumer thread 'ConsumerThread_4'
--> 'instanceName_4' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_4' starting 'action' and going to sleep for 323 milliseconds.
--> 'instanceName_5' instance has been taken from the queue by the consumer thread 'ConsumerThread_5'
--> 'instanceName_5' starting 'action' and going to sleep for 3120 milliseconds.
--> 'instanceName_5' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_6' instance has been taken from the queue by the consumer thread 'ConsumerThread_6'
--> 'instanceName_6' starting 'action' and going to sleep for 2503 milliseconds.
--> 'instanceName_6' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_7' instance has been taken from the queue by the consumer thread 'ConsumerThread_7'
--> 'instanceName_7' starting 'action' and going to sleep for 1259 milliseconds.
--> 'instanceName_7' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_8' instance has been taken from the queue by the consumer thread 'ConsumerThread_8'
--> 'instanceName_8' starting 'action' and going to sleep for 1921 milliseconds.
--> 'instanceName_8' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_9' instance has been taken from the queue by the consumer thread 'ConsumerThread_9'
--> 'instanceName_9' starting 'action' and going to sleep for 3665 milliseconds.
--> 'instanceName_9' instance has been put in the queue by the producer thread 'ProducerThread_1' 

Conclusion
In this example, we could analyze that after the adding of an instance of “NoTheadSafeTask” in queue/pool the producer waits the taking (Synchronous aspect) of it by the consumer thread:

--> 'instanceName_0' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_0' instance has been taken from the queue by the consumer thread 'ConsumerThread_0'
--> 'instanceName_1' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_1' instance has been taken from the queue by the consumer thread 'ConsumerThread_1'
--> 'instanceName_2' instance has been taken from the queue by the consumer thread 'ConsumerThread_2'
--> 'instanceName_2' instance has been put in the queue by the producer thread 'ProducerThread_1'
--> 'instanceName_3' instance has been taken from the queue by the consumer thread 'ConsumerThread_3'
...

The source code of this article and the a test cases are in the ZIP file attachement.

Download: java_queue.zip

Best regards,

Leave a Reply

Your email address will not be published.

Time limit is exhausted. Please reload CAPTCHA.

Related Post

Documentum : Audit trail entries dm_audittrail / SessionConfig / application_codeDocumentum : Audit trail entries dm_audittrail / SessionConfig / application_code

Hi, After my previous posts concerning the Documentum audit trail entries (dm_audittrail) http://www.javablog.fr/documentum-creation-of-audit-trail-entries-dm_audittrail.html and http://www.javablog.fr/deleting-of-audit-trail-entries-dm_audittrail.html, here, I would like to expose a solution in order to force content server to