JavaBlog.fr / Java.lu DEVELOPMENT,Java Java: Queue implementation ConcurrentLinkedQueue (Part4)

Java: Queue implementation ConcurrentLinkedQueue (Part4)

After my post concerning presentation of Queue/BlockingQueue, I would continue with a post about the Queue implementation: java.util.concurrent.ConcurrentLinkedQueue.

This implementation of Queue (NOT BlockingQueue) is thread-safe queue which stores and keeps the elements internally in an unbounded queue based on linked nodes and in FIFO (First In, First Out) order. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. This queue does not permit null elements.

We will illustrate our writing by the example of pool/queue containing ‘no thread-safe’ business objects MyNoTheadSafeTask which will be accessed by a series of consumer/producer threads: each thread takes an instance from queue, calls an action on this instance and release (put) it again in the queue.

In our example, this business object is a simple POJO, however, this POJO could be a ‘no thread-safe’ bean Spring with the scope “prototype”:

1<bean id="fileBean" class="com.ho.utils.businessobject.MyNoTheadSafeTask" scope="prototype">
2        <property name="property1" value="value1" />
3</bean>

So, our business object :

  • has the attribut name instanceName and random sleep time between 0 and 5 seconds:
  • 01private int sleepTime; // random sleep time for instance  
    02private String instanceName; // name of instance  
    03private static Random generator = new Random();
    04     
    05public MyNoTheadSafeTask(String name){
    06    // set name of instance
    07    this.instanceName = name;
    08    // random sleep time between 0 and 5 seconds
    09    this.sleepTime = generator.nextInt( 5000 );
    10}
  • provides a method action corresponding to a System.out output and a random sleep time:
  • 01public boolean action(String paramValue1, int paramValue2) throws Throwable{
    02    try{
    03        // put instance to sleep for sleepTime amount of time
    04        System.out.printf("--> '%s' starting 'action' and going to sleep for %d milliseconds.\n", this.instanceName, this.sleepTime);
    05        // put instance to sleep
    06        Thread.sleep( this.sleepTime );  
    07    } catch ( InterruptedException exception )  { 
    08        exception.printStackTrace();
    09    }
    10    return true;
    11}

Then, the MySingletonDelegate singleton:

  • contains the pool/queue component ConcurrentLinkedQueue:
  • 1private ConcurrentLinkedQueue <MyNoTheadSafeTask> noTheadSafeTaskQueue = new ConcurrentLinkedQueue<MyNoTheadSafeTask>();
  • create several instances of “NoTheadSafeTask” in this pool component:
  • 1private MySingletonDelegate() {
    2    for (int i=0; i < 5; i++) {
    3        String instanceName = "instanceName_"+i;
    4        System.out.println("NoTheadSafeTask.createInstance("+instanceName+");");
    5        MyNoTheadSafeTask noTheadSafeTask = new MyNoTheadSafeTask(instanceName);
    6        noTheadSafeTaskQueue.add(noTheadSafeTask);
    7    } // end-if
    8}
  • provides a private method ‘getNoTheadSafeTaskFromPool’ to take an instance of “NoTheadSafeTask” in queue/pool. This method needn’t be synchronized because the pool ConcurrentLinkedQueue is a thread-safe queue.
  • 01private MyNoTheadSafeTask getNoTheadSafeTaskFromPool() {
    02    try {
    03        MyNoTheadSafeTask noTheadSafeTask = noTheadSafeTaskQueue.poll();
    04        System.out.printf("--> '%s' instance has been taken from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
    05        return noTheadSafeTask;
    06    } catch (Throwable e) {
    07        System.out.println("getNoTheadSafeTaskFromPool : an InterruptedException occured:"+e.getMessage());
    08    } // end-try
    09    return null;
    10}
  • provides a private method ‘releaseNoTheadSafeTaskToPool’ to release an instance of “NoTheadSafeTask” in queue/pool:
  • 1private final void releaseNoTheadSafeTaskToPool(MyNoTheadSafeTask noTheadSafeTask) {
    2    try {
    3        noTheadSafeTaskQueue.add(noTheadSafeTask);
    4        System.out.printf("--> '%s' instance has been released from the queue (new size: %d) \n", noTheadSafeTask.getInstanceName(), noTheadSafeTaskQueue.size());
    5    } catch (Throwable e) {
    6        System.out.println("releaseNoTheadSafeTaskToPool : an InterruptedException occured:"+e.getMessage());
    7    } // end-try
    8}
  • provides a public method ‘completeAction’ allowing a complete action with the retrieve of a instance of “NoTheadSafeTask” from the pool, the execution of action on this retrieved instance and the release of this retrieved instance of “NoTheadSafeTask” to the pool:
  • 1public boolean completeAction(String paramValue1, int paramValue2) throws Throwable {
    2    MyNoTheadSafeTask noTheadSafeTask = getNoTheadSafeTaskFromPool();
    3    try {
    4        return noTheadSafeTask.action(paramValue1, paramValue2);
    5    } finally {
    6        releaseNoTheadSafeTaskToPool(noTheadSafeTask);
    7    }
    8}

At last, the main method which will:
– create the singleton MySingletonDelegate which will create QUEUE,
– create 5 threads which will each call the business method completeAction provided by the above singleton MySingletonDelegate,
– execute these 5 threads due to an ExecutorService;

01public static void main(String[] args) {
02         
03    try {
04        // create singleton which will create QUEUE
05        MySingletonDelegate.getInstance();
06             
07        // Client classes of the  ClientDelegate
08        ExecutorService es = Executors.newFixedThreadPool(5);
09        //
10        for (int i=0; i < 5; i++) {
11                 
12            //Thread
13            es.submit(new Runnable() {
14                public void run() {
15                    try {
16                        MySingletonDelegate.getInstance().completeAction(null, 1);
17                    } catch (Throwable e) {
18                        e.printStackTrace();
19                    }
20                }
21            });
22 
23            // Wait 1s between the threads' launching
24            Thread.sleep( 1000 );  
25                 
26        } // end-for
27             
28        es.shutdown();
29 
30    } catch (Throwable e) {
31        e.printStackTrace();
32    }
33}

… and the outputs would be:

01NoTheadSafeTask.createInstance(instanceName_0);
02NoTheadSafeTask.createInstance(instanceName_1);
03NoTheadSafeTask.createInstance(instanceName_2);
04NoTheadSafeTask.createInstance(instanceName_3);
05NoTheadSafeTask.createInstance(instanceName_4);
06--> 'instanceName_0' instance has been taken from the queue (new size: 4)
07--> 'instanceName_0' starting 'action' and going to sleep for 4531 milliseconds.
08--> 'instanceName_1' instance has been taken from the queue (new size: 3)
09--> 'instanceName_1' starting 'action' and going to sleep for 3548 milliseconds.
10--> 'instanceName_2' instance has been taken from the queue (new size: 2)
11--> 'instanceName_2' starting 'action' and going to sleep for 3968 milliseconds.
12--> 'instanceName_3' instance has been taken from the queue (new size: 1)
13--> 'instanceName_3' starting 'action' and going to sleep for 4853 milliseconds.
14--> 'instanceName_4' instance has been taken from the queue (new size: 0)
15--> 'instanceName_4' starting 'action' and going to sleep for 1787 milliseconds.
16--> 'instanceName_1' instance has been released from the queue (new size: 1)
17--> 'instanceName_0' instance has been released from the queue (new size: 2)
18--> 'instanceName_4' instance has been released from the queue (new size: 3)
19--> 'instanceName_2' instance has been released from the queue (new size: 4)
20--> 'instanceName_3' instance has been released from the queue (new size: 5)

Conclusion
In this example, we could analyze that the FIFO order is kept because the instances taken by the consumer threads are in the same order than the adding’s order in the queue/pool.

01NoTheadSafeTask.createInstance(instanceName_0);
02NoTheadSafeTask.createInstance(instanceName_1);
03NoTheadSafeTask.createInstance(instanceName_2);
04NoTheadSafeTask.createInstance(instanceName_3);
05NoTheadSafeTask.createInstance(instanceName_4);
06...
07--> 'instanceName_0' instance has been taken from the queue (new size: 4)
08--> 'instanceName_1' instance has been taken from the queue (new size: 3)
09--> 'instanceName_2' instance has been taken from the queue (new size: 2)
10--> 'instanceName_3' instance has been taken from the queue (new size: 1)
11--> 'instanceName_4' instance has been taken from the queue (new size: 0)

The source code of this article and the a test cases are in the ZIP file attachement.

Download: java_queue.zip

Best regards,

Leave a Reply

Your email address will not be published.

Time limit is exhausted. Please reload CAPTCHA.

Related Post