JavaBlog.fr / Java.lu

Java: Multi tasks, Multi threading, Synchronization, Semaphore, Mutex, Barrier

Hi,

This post covers multi-tasking, or multi-threading, a programming technique that takes advantage of tasks (threads). In this tutorial we will see a practical implementation of multi-tasking.

First, to illustrate what multi-tasking is, here are several examples within an application:

  • a GUI can start a thread to load an image while it continues to process events generated by user actions,
  • a server application that waits for connection requests from clients can start a process / thread per request to handle multiple clients simultaneously,
  • the multiplication of two matrices (m, p) and (p, n) may be carried out by m * n parallel threads,

Below are some goals of multi-tasking:

  • improve performance by distributing the different tasks across different processors,
  • take advantage of the idle time of a task (waiting for I/O or a user action) to do something else,
  • respond faster to user actions by offloading a long, non-interactive task to another thread (example: spell checking in a text editor),

However, multi-tasking also brings problems:

  • the difficulty of writing a multi-threaded program,
  • the difficulty of debugging a program that uses multi-tasking,

1) Notion of Process
Each process has a distinct address space (the memory space where its variables and objects are stored). Communication between processes can be achieved through:

  • Signals,
  • Pipes,
  • Shared Memory,
  • Sockets,
  • …,

The java.lang.Runtime class:

  • Gives control over the current process: Runtime.getRuntime(),
  • Methods to query the runtime environment: totalMemory(), freeMemory(), maxMemory(), availableProcessors(),
  • Process control: gc(), exit(), halt(),
  • Creation of other processes: exec(),
  • …,
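
As a small illustration of the methods listed above, here is a minimal sketch that queries the runtime environment. The class name RuntimeInfo and the method describe() are only names chosen for this example; the values printed depend on the machine and on the JVM settings.

```java
class RuntimeInfo {

    // Builds a one-line summary of the environment of the current JVM process.
    static String describe() {
        Runtime rt = Runtime.getRuntime();
        long mb = 1024L * 1024L;
        return String.format(
                "processors=%d, total=%d MB, free=%d MB, max=%d MB",
                rt.availableProcessors(),
                rt.totalMemory() / mb,
                rt.freeMemory() / mb,
                rt.maxMemory() / mb);
    }

    public static void main(String[] args) {
        System.out.println(describe());
        // gc() is only a request: the JVM decides if and when memory is reclaimed
        Runtime.getRuntime().gc();
        System.out.println(describe());
    }
}
```

Note that exit() and halt() are deliberately not called here, since they would stop the JVM immediately.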

The java.lang.Process class:

  • Gives control over a child process,
  • Creation of child processes: Runtime.getRuntime().exec("command"),
  • Control: waitFor(), exitValue(), destroy(), getInputStream(), getOutputStream(), getErrorStream(),
  • …,
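
To make the list above more concrete, here is a hedged sketch that starts a child process and waits for it. As an assumption for this example, the command launched is the java binary of the current JVM (located through the "java.home" system property) with the "-version" option; any other command available on your machine would work the same way. The names ChildProcessDemo and runJavaVersion are hypothetical.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;

class ChildProcessDemo {

    // Starts "java -version" as a child process and returns its exit value.
    static int runJavaVersion() {
        // Assumption: the running JVM ships a "bin/java" launcher under java.home
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        try {
            Process child = Runtime.getRuntime().exec(new String[] { javaBin, "-version" });
            // "java -version" writes to the error stream, so that is the one we read
            try (BufferedReader err = new BufferedReader(
                    new InputStreamReader(child.getErrorStream()))) {
                String line;
                while ((line = err.readLine()) != null) {
                    System.out.println("child: " + line);
                }
            }
            // waitFor() blocks until the child ends and returns its exit value
            return child.waitFor();
        } catch (IOException e) {
            throw new IllegalStateException("could not start the child process", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the child", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("exit value = " + runJavaVersion());
    }
}
```

Reading the child's output before calling waitFor() matters: a child that fills its output buffer while nobody reads it can block forever.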

2) Notion of Thread
A thread allows multiple portions of code to run simultaneously within the same process, i.e. sharing the same memory space.
Java has built-in support for multi-threading:

  • Creating Threads,
  • Synchronization of threads that share data,
  • …,
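
To show what "sharing the same memory space" means, here is a minimal sketch using java.lang.Thread directly (the rest of this article uses an ExecutorService instead). Each thread writes into a different cell of the same array, so no synchronization is needed beyond join(). The names SharedMemoryDemo and fillSquares are only illustrative.

```java
import java.util.Arrays;

class SharedMemoryDemo {

    // Starts one thread per cell; thread i stores i * i in the shared array.
    static int[] fillSquares(int n) {
        int[] shared = new int[n]; // visible to every thread of the process
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int index = i;
            workers[i] = new Thread(() -> shared[index] = index * index, "worker-" + i);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                // join() waits for the thread to end and makes its writes visible here
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return shared;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(fillSquares(4))); // prints [0, 1, 4, 9]
    }
}
```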

3) Creating and launching Threads
In Java, a task is a class that implements the Runnable interface. This class must provide a “run” method, which is called when the task actually executes; a constructor is optional but convenient for passing parameters such as a name:

public class MyTask implements Runnable{
	private String threadName; // name of thread
	
	// Constructor 
	public MyTask (String name){
		this.threadName = name; // set name of thread
	}

	public void run(){	
		// instructions
	}
}

Launching a task looks like this:

  • Create an ExecutorService instance via the static method newFixedThreadPool of the Executors class, passing the number of threads in the pool as parameter,
  • Instantiate the class “MyTask” with an identifier as parameter,
  • Launch the task with the execute method of ExecutorService,
  • Once all tasks have been submitted, call the shutdown method: it lets the tasks already submitted finish and stops the pool from accepting new ones,

4) A first example
The following class MyTask implements Runnable, so that each MyTask object can execute concurrently. A variable sleepTime stores a random integer value chosen when the MyTask constructor executes. Each thread running a MyTask object sleeps for the amount of time specified by the corresponding MyTask object’s sleepTime (for a certain number of milliseconds), then outputs its name.

MyTask.java

package ho.tests.thread.synchro.test1.order1.launch;

import java.util.Random;

/**
 * Mytask Runnable example
 * @author Huseyin OZVEREN
 */
public class MyTask implements Runnable {

	private int sleepTime; // random sleep time for thread   
	private String threadName; // name of thread   
	private static Random generator = new Random();
	
	public MyTask(String name) {
		// set name of thread
		this.threadName = name;
		// random sleep time between 0 and 5 seconds
		this.sleepTime = generator.nextInt( 5000 ); 
	}

	public void run() {
		try{ 
			// put thread to sleep for sleepTime amount of time
			System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
			// put thread to sleep
			Thread.sleep( this.sleepTime );   
		} catch ( InterruptedException exception )  {  
			exception.printStackTrace(); 
		}
		// print thread name
		System.out.printf( "%s done sleeping\n", this.threadName );  
	}		
}

ApplicationLaunch.java

package ho.tests.thread.synchro.test1.order1.launch;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Thread launching example
 * @author Huseyin OZVEREN
 */
public class ApplicationLaunch {
	public static void main(String[] args) {
		
		System.out.println( "Starting threads in main thread" );
		
		// create and name each runnable
		MyTask th1 = new MyTask("threadname_1");
		MyTask th2 = new MyTask("threadname_2");
		MyTask th3 = new MyTask("threadname_3");

		// create an ExecutorService with a fixed pool of 3 worker threads
		ExecutorService es = Executors.newFixedThreadPool(3);
		
		// start threads and place in runnable state 
		es.execute(th1);
		es.execute(th2);
		es.execute(th3);

		// shutdown worker threads  
		es.shutdown();
		
		System.out.println( "Threads started, main thread ends\n" );
	}
}

Outputs in console:

Starting threads in main thread
Threads started, main thread ends

threadname_1 starting and going to sleep for 624 milliseconds.
threadname_3 starting and going to sleep for 849 milliseconds.
threadname_2 starting and going to sleep for 719 milliseconds.
threadname_1 done sleeping
threadname_2 done sleeping
threadname_3 done sleeping

5) Synchronization
The use of threads creates synchronization requirements in order to avoid problems with concurrent access to shared (global) variables.
For example, suppose we launch 2 tasks:
– Their instructions will execute simultaneously,
– If these instructions modify a global variable of the program, the result may be inconsistent (a race condition).

In parallel programming, a “critical section” is a piece of code that accesses a shared resource and therefore must not be executed by more than one thread at a time, otherwise anomalies may occur. Code that can safely be executed concurrently by multiple threads is called thread-safe; this can apply to blocks of code, methods or components such as “Vector”, “Hashtable”, “ConcurrentHashMap”, “CopyOnWriteArrayList” or “CopyOnWriteArraySet” (whereas “ArrayList” and “HashMap” are not thread-safe).

Warning: thread-safe components should be used only when there is a real risk of concurrent modification by multiple threads, because they are generally less efficient than the “normal” (non-synchronized) components.
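
As an illustration of a thread-safe component, here is a minimal sketch in which several threads update the same entry of a ConcurrentHashMap. The names ThreadSafeMapDemo and countHits are hypothetical. With a plain HashMap in place of the ConcurrentHashMap, the same code would not be safe and updates could be lost.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadSafeMapDemo {

    // Lets several threads increment the same map entry and returns the final count.
    static int countHits(int threads, int incrementsPerThread) {
        Map<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < incrementsPerThread; k++) {
                    // merge() is performed atomically by ConcurrentHashMap
                    hits.merge("hits", 1, Integer::sum);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return hits.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println(countHits(4, 10_000)); // prints 40000
    }
}
```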

This is where synchronization comes in: it is necessary to prevent the simultaneous execution of critical sections by multiple threads:
– In Java, the synchronized keyword is used to synchronize threads and prevent them from executing critical sections of code at the same time,
– Several threads cannot concurrently run code synchronized on the same object: the others wait until the lock is released.

There are 2 ways to synchronize code on an object “myObject”:

  • Declare a synchronized method “myMethod” (the lock on myObject is held during the call to myObject.myMethod()):
    public synchronized int myMethod(...) { . . . }
    
  • Use a synchronized block on the object “myObject”:
    synchronized(myObject) {
    // synchronized code
    ...
    }
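
Both forms can be tried with a small counter. This is only a sketch: the class SynchronizedCounter and its methods are names invented for the example. The synchronized method and the synchronized(this) block use the same lock (the counter object itself), so they protect the same variable consistently.

```java
class SynchronizedCounter {

    private int value = 0;

    // Form 1: synchronized method, the lock is the object itself ("this")
    public synchronized void incrementWithMethod() {
        value++;
    }

    // Form 2: synchronized block on the same object, hence the same lock
    public void incrementWithBlock() {
        synchronized (this) {
            value++;
        }
    }

    public synchronized int get() {
        return value;
    }

    // Runs several threads that alternate between both forms; returns the final value.
    static int run(int threads, int incrementsPerThread) {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < incrementsPerThread; k++) {
                    if (k % 2 == 0) {
                        counter.incrementWithMethod();
                    } else {
                        counter.incrementWithBlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

Without the synchronized keyword, value++ (a read, an addition and a write) could interleave between threads and the final value would often be lower than expected.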
    

6) Semaphore
Java provides the class java.util.concurrent.Semaphore, whose constructor takes two parameters: an integer giving the initial number of permits, i.e. the number of resources initially available, and a boolean indicating whether waiting threads must be served in FIFO order (fairness).
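
Before the ordering example of this article, here is a hedged sketch of the more general use of these two parameters: a semaphore created with N permits lets at most N threads hold the resource at the same time, each one taking a permit with acquire() and giving it back with release(). The class SemaphorePermitsDemo, the method maxConcurrent and the 20 ms pause are assumptions made for this illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphorePermitsDemo {

    // Returns the highest number of threads observed holding a permit at once.
    static int maxConcurrent(int permits, int threads) {
        Semaphore sem = new Semaphore(permits, true); // true = FIFO (fair) ordering
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    sem.acquire(); // blocks while no permit is available
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // no permit obtained, so nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    Thread.sleep(20); // simulate some work with the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    sem.release(); // give the permit back
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent holders: " + maxConcurrent(2, 6));
    }
}
```

With 2 permits the value printed never exceeds 2, however many threads are started.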

Here we extend the previous class MyTask and synchronize two tasks to get consistent output in the console. In this example, the “acquire” method checks whether a permit of the semaphore is available; if so, the current task takes it and continues its execution. Otherwise the task is blocked until a permit is made available by a call to the “release” method (by another task).

MyTask.java

package ho.tests.thread.synchro.test1.order2.semaphore;

import java.util.Random;
import java.util.concurrent.Semaphore;

/**
 * Mytask Runnable example using semaphore synchronization
 * @author Huseyin OZVEREN
 */
public class MyTask implements Runnable {
	
	private int sleepTime; // random sleep time for thread   
	private String threadName; // name of thread   
	private static Random generator = new Random();
	
	Semaphore personalSem;
	Semaphore neighboringSem;

	public MyTask(String name, Semaphore personalSem, Semaphore neighboringSem) {
		// set name of thread
		this.threadName = name;
		// random sleep time between 0 and 5 seconds
		this.sleepTime = generator.nextInt( 5000 ); 
		//
		this.personalSem = personalSem;
		this.neighboringSem = neighboringSem;
	}

	public void run() {
		try {
			personalSem.acquire();
		} catch (InterruptedException ex) {
			System.out.println(ex);
		}

		try{ 
			// put thread to sleep for sleepTime amount of time
			System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
			// put thread to sleep
			Thread.sleep( this.sleepTime );   
		} catch ( InterruptedException exception )  {  
			exception.printStackTrace(); 
		}

		// print thread name
		System.out.printf( "%s done sleeping\n", this.threadName ); 

		// release semaphore => unblock the next task
		neighboringSem.release();
	}
}

ApplicationSemaphore.java

package ho.tests.thread.synchro.test1.order2.semaphore;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Thread launching example using semaphore synchronization
 * @author Huseyin OZVEREN
 */
public class ApplicationSemaphore {
	public static void main(String[] args) {
		ExecutorService es = Executors.newFixedThreadPool(2);
		// SEMAPHORE
		// Param1=initial number of permits, i.e. resources initially available (0 means acquire() blocks)
		// Param2=boolean indicating whether waiting threads are served in FIFO order
		Semaphore personalSem = new Semaphore(1, true);
		Semaphore neighboringSem = new Semaphore(0, true);

		System.out.println( "Starting threads in main thread" );

		// create and name each runnable
		// 1st thread
		MyTask th1 = new MyTask("threadname_1", personalSem, neighboringSem);
		es.execute(th1);

		// 2nd thread
		MyTask th2 = new MyTask("threadname_2", neighboringSem, personalSem);
		es.execute(th2);

		// shutdown worker threads  
		es.shutdown();

		System.out.println( "Threads started, main thread ends\n" );
	}
}

Outputs in console:

Starting threads in main thread
Threads started, main thread ends

threadname_1 starting and going to sleep for 2284 milliseconds.
threadname_1 done sleeping
threadname_2 starting and going to sleep for 2438 milliseconds.
threadname_2 done sleeping

Explanations:
The semaphore “personalSem” is initialized to 1 because at least one permit must be available initially so that the first task can proceed when it calls “acquire”. The semaphore “neighboringSem” is initialized to 0 so that the second task blocks when it calls “acquire”, since no (0) permit is available until the first task calls “release”.

Another example with 3 tasks:
ApplicationSemaphoreWith3Tasks.java

package ho.tests.thread.synchro.test1.order2.semaphore;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Thread launching example using semaphore synchronization
 * @author Huseyin OZVEREN
 */
public class ApplicationSemaphoreWith3Tasks {
	public static void main(String[] args) {
		ExecutorService es = Executors.newFixedThreadPool(2);
		// SEMAPHORE
		// Param1=initial number of permits, i.e. resources initially available (0 means acquire() blocks)
		// Param2=boolean indicating whether waiting threads are served in FIFO order
		Semaphore personalSem = new Semaphore(1, true);
		Semaphore neighboringSem2nd = new Semaphore(0, true);
		Semaphore neighboringSem3rd = new Semaphore(0, true);

		System.out.println( "Starting threads in main thread" );

		// create and name each runnable
		// 1st thread
		MyTask th1 = new MyTask("threadname_1", personalSem, neighboringSem2nd);
		es.execute(th1);

		// 2nd thread
		MyTask th2 = new MyTask("threadname_2", neighboringSem2nd, neighboringSem3rd);
		es.execute(th2);

		// 3rd thread
		MyTask th3 = new MyTask("threadname_3", neighboringSem3rd, personalSem);
		es.execute(th3);

		// shutdown worker threads  
		es.shutdown();

		System.out.println( "Threads started, main thread ends\n" );
	}
}

Outputs in console:

Starting threads in main thread
Threads started, main thread ends

threadname_1 starting and going to sleep for 2709 milliseconds.
threadname_1 done sleeping
threadname_2 starting and going to sleep for 3964 milliseconds.
threadname_2 done sleeping
threadname_3 starting and going to sleep for 3101 milliseconds.
threadname_3 done sleeping

However, when running these examples you will probably notice that the message “Threads started, main thread ends” does not appear after the tasks have finished: the main thread is not synchronized with them, so the message may appear at any time.

This leads us to a second mechanism: the synchronization barrier.

7) Barrier
Imagine a barrier which remains closed as long as the two tasks have not finished. Java provides a class named java.util.concurrent.CountDownLatch which behaves like an initially closed barrier. When this object is constructed, its blocking counter is initialized with the integer value passed as parameter.

The class “CountDownLatch” provides the method “await”, which lets tasks wait for the barrier to open. To decrement the counter, a task calls the method “countDown”. When the counter reaches 0, all tasks have completed their work and the barrier opens.
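
Reduced to its essentials, the await / countDown mechanism can be sketched as follows. The class LatchDemo and the method sumAfterAllWorkers are hypothetical names; each worker adds its own number to a shared sum, and the calling thread only reads the sum once the barrier has opened.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {

    // Starts the workers, waits for all of them, then returns 1 + 2 + ... + workers.
    static int sumAfterAllWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers); // counter starts at "workers"
        AtomicInteger sum = new AtomicInteger();
        for (int i = 1; i <= workers; i++) {
            final int contribution = i;
            new Thread(() -> {
                try {
                    sum.addAndGet(contribution);
                } finally {
                    latch.countDown(); // always decrement, even if the work fails
                }
            }).start();
        }
        try {
            latch.await(); // blocked until the counter reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumAfterAllWorkers(3)); // prints 6
    }
}
```

Calling countDown() in a finally block is a design choice worth keeping: a worker that fails without decrementing the counter would leave the waiting thread blocked forever.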

Here we extend the previous class MyTask in order to synchronize the main thread with the two tasks and get consistent output in the console.

MyTask.java

package ho.tests.thread.synchro.test1.order3.barrier;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.CountDownLatch;

/**
 * Mytask Runnable example using semaphore and barrier synchronization
 * @author Huseyin OZVEREN
 */
public class MyTask implements Runnable {
	private int sleepTime; // random sleep time for thread   
	private String threadName; // name of thread   
	private static Random generator = new Random();
	
	// Semaphores
	Semaphore personalSem;
	Semaphore neighboringSem;
	
	// Barrier
	CountDownLatch barrier; 
	
	public MyTask(String name, Semaphore personalSem, Semaphore neighboringSem, CountDownLatch barrier) {
		// set name of thread
		this.threadName = name;
		// random sleep time between 0 and 5 seconds
		this.sleepTime = generator.nextInt( 5000 ); 
		//
		this.personalSem = personalSem;
		this.neighboringSem = neighboringSem;
		//
		this.barrier = barrier;
	}
	

	public void run() {
		try {
			personalSem.acquire();
		} catch (InterruptedException ex) {
			System.out.println(ex);
		}

		try{ 
			// put thread to sleep for sleepTime amount of time
			System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
			// put thread to sleep
			Thread.sleep( this.sleepTime );   
		} catch ( InterruptedException exception )  {  
			exception.printStackTrace(); 
		}
		
		// print thread name
		System.out.printf( "%s done sleeping\n", this.threadName ); 
		
		// decrements the counter value of the barrier "bar"
		barrier.countDown();
		
		// release semaphore => unblock the next task
		neighboringSem.release();
	}	
}

ApplicationBarrierSemaphore.java

package ho.tests.thread.synchro.test1.order3.barrier;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


/**
 * Thread launching example using semaphore and barrier synchronization
 * @author Huseyin OZVEREN
 */
public class ApplicationBarrierSemaphore {
	public static void main(String[] args) {
		ExecutorService es = Executors.newFixedThreadPool(2);
		// SEMAPHORE
		// Param1=initial number of permits, i.e. resources initially available (0 means acquire() blocks)
		// Param2=boolean indicating whether waiting threads are served in FIFO order
		Semaphore personalSem = new Semaphore(1, true);
		Semaphore neighboringSem = new Semaphore(0, true);
		
		// BARRIER for 2 threads
		// Param1=blocking counter initialized with an integer value
		CountDownLatch barrier = new CountDownLatch(2);
		
		System.out.println( "Starting threads in main thread" );

		// create and name each runnable
		// 1st thread
		MyTask th1 = new MyTask("threadname_1", personalSem, neighboringSem, barrier);
		es.execute(th1);

		// 2nd thread
		MyTask th2 = new MyTask("threadname_2", neighboringSem, personalSem, barrier);
		es.execute(th2);
		
		try {
			barrier.await(); // current thread is blocked until all other threads have called barrier.countDown()
		} catch (InterruptedException ex) {
			System.out.println(ex);
		}

		// shutdown worker threads  
		es.shutdown();

		System.out.println( "Threads started, main thread ends\n" );
	}
}

Outputs in console:

Starting threads in main thread
threadname_1 starting and going to sleep for 42 milliseconds.
threadname_1 done sleeping
threadname_2 starting and going to sleep for 2610 milliseconds.
threadname_2 done sleeping
Threads started, main thread ends
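
As a side note, and as an alternative that is not the technique used in this article, the main thread can also wait for the end of all submitted tasks without any barrier, using the methods shutdown() and awaitTermination() of ExecutorService. The sketch below assumes a hypothetical class AwaitTerminationDemo and an arbitrary timeout of 10 seconds.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {

    // Submits the tasks, waits for the pool to terminate, returns how many tasks ran.
    static int runAndWait(int tasks) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            es.execute(() -> finished.incrementAndGet());
        }
        es.shutdown(); // no new tasks accepted, submitted tasks still run
        try {
            // wait until every submitted task has ended (or the timeout expires)
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                es.shutdownNow(); // timeout: try to cancel what is still running
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(5) + " tasks finished, main thread ends");
    }
}
```

A CountDownLatch remains the better fit when the pool must stay alive after the wait, or when the threads waited for are not managed by an executor.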

8) Mutex
Finally, I would like to present the Mutex (MUTual EXclusion). But what is it?
In its classic form, a mutex is a simple non-reentrant mutual exclusion lock. The lock is free upon construction. Each acquire gets the lock, and each release frees it; in that classic form, releasing a lock that is already free has no effect. In Java, mutual exclusion is usually achieved with the keyword “synchronized” when two or more methods access the same variables / data structures. An explicit lock object is useful for constructions that cannot be expressed with synchronized blocks, for example because the acquire / release pairs do not occur in the same method or code block.

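Java provides a class named java.util.concurrent.locks.ReentrantLock, described in its documentation as:
“A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.”
Unlike the classic mutex described above, this lock is reentrant: the thread that holds it can acquire it again, and calling unlock() from a thread that does not hold the lock throws an IllegalMonitorStateException.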

The main use of a mutex is to share data or resources in a multitasking environment, for example when a task modifies and displays a variable that must be protected during its modification. This requirement can be met with semaphores, but a lock may be more practical.
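
Two properties of ReentrantLock are worth checking by hand before using it as a mutex. The following sketch, with the hypothetical class ReentrantLockDemo, shows that the same thread can take the lock twice (the hold count becomes 2) and that unlocking a lock which the current thread does not hold raises an IllegalMonitorStateException.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockDemo {

    // Returns the hold count observed when the same thread locks twice.
    static int nestedHoldCount() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            lock.lock(); // same thread: does not block, the hold count becomes 2
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock(); // one unlock() per lock()
            }
        } finally {
            lock.unlock();
        }
    }

    // Returns true if unlock() fails when the current thread does not hold the lock.
    static boolean unlockWithoutHoldingFails() {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("hold count when nested: " + nestedHoldCount()); // prints 2
        System.out.println("unlock without lock fails: " + unlockWithoutHoldingFails()); // prints true
    }
}
```

Placing every unlock() in a finally block guarantees that the lock is released even if the protected code throws an exception.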

Here we extend the previous class MyTask in order to protect a shared variable with a mutex, while still synchronizing the main thread with the two tasks through the barrier.

MyTask.java

package ho.tests.thread.synchro.test1.order4.mutex;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Mytask Runnable example using mutex and barrier synchronization
 * @author Huseyin OZVEREN
 */
public class MyTask implements Runnable {
	private int sleepTime; // random sleep time for thread   
	private String threadName; // name of thread   
	private static Random generator = new Random();

	// Mutex
	ReentrantLock mutex;
	// Barrier
	CountDownLatch barrier; 
	
	public MyTask(String name, ReentrantLock mutex, CountDownLatch barrier) {
		// set name of thread
		this.threadName = name;
		// random sleep time between 0 and 5 seconds
		this.sleepTime = generator.nextInt( 5000 ); 
		//
		this.mutex = mutex;
		//
		this.barrier = barrier;
	}

	public void run() {
		// LOCK
		mutex.lock();

		try {
			// put thread to sleep for sleepTime amount of time
			System.out.printf( "%s starting and going to sleep for %d milliseconds.\n", this.threadName, this.sleepTime );
			// put thread to sleep
			Thread.sleep( this.sleepTime );   
			
			// Instruction to protect = "critical section"
			System.out.printf("%s before modif Application.objToProtectSynchro='%s'\n", this.threadName, ApplicationBarrierMutex.objToProtectSynchro);

			// Modify the "critical resource"
			ApplicationBarrierMutex.objToProtectSynchro = "value fixed by Task "+this.threadName;
			
			System.out.printf("%s after modif Application.objToProtectSynchro='%s'\n", this.threadName, ApplicationBarrierMutex.objToProtectSynchro);
		} catch (InterruptedException exception) {
			exception.printStackTrace();
		} finally {
			// UNLOCK
			mutex.unlock();

			// decrements the counter value of the barrier "bar"
			barrier.countDown();
		}
		
		// print thread name
		System.out.printf( "%s done sleeping\n", this.threadName ); 
	}
}

ApplicationBarrierMutex.java

package ho.tests.thread.synchro.test1.order4.mutex;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Thread launching example using mutex and barrier synchronization
 * @author Huseyin OZVEREN
 */
public class ApplicationBarrierMutex {
	
	// Variable which could be modified by several tasks = "critical resource"
	public static String objToProtectSynchro = "";
	
	public static void main(String[] args) {
		ExecutorService es = Executors.newFixedThreadPool(2);
		//MUTEX:
		// Param1=boolean indicating whether waiting threads acquire this lock in FIFO order (fairness)
		ReentrantLock mutex = new ReentrantLock(true);
		
		// BARRIER for 2 threads
		// Param1=blocking counter initialized with an integer value
		CountDownLatch barrier = new CountDownLatch(2);
		
		System.out.println( "Starting threads in main thread" );

		
		// create and name each runnable
		// 1st thread
		MyTask th1 = new MyTask("threadname_1", mutex, barrier);
		es.execute(th1);

		// 2nd thread
		MyTask th2 = new MyTask("threadname_2", mutex, barrier);
		es.execute(th2);
		
		try {
			barrier.await(); // current thread is blocked until all other threads have called barrier.countDown()
		} catch (InterruptedException ex) {
			System.out.println(ex);
		}

		// shutdown worker threads  
		es.shutdown();

		System.out.println( "Threads started, main thread ends\n" );
	}
}

Outputs in console:

Starting threads in main thread
threadname_1 starting and going to sleep for 239 milliseconds.
threadname_1 before modif Application.objToProtectSynchro=''
threadname_1 after modif Application.objToProtectSynchro='value fixed by Task threadname_1'
threadname_1 done sleeping
threadname_2 starting and going to sleep for 3142 milliseconds.
threadname_2 before modif Application.objToProtectSynchro='value fixed by Task threadname_1'
threadname_2 after modif Application.objToProtectSynchro='value fixed by Task threadname_2'
threadname_2 done sleeping
Threads started, main thread ends

That concludes this article.

Best regards,

Huseyin OZVEREN

Download the sources of this article: java_thread_synchro.zip
