Java Inter-thread Communication
In multi-threaded applications, threads often need to coordinate their activities or share information. This coordination, known as inter-thread communication, is essential for building efficient concurrent programs. In this tutorial, we'll explore how threads can communicate with each other in Java.
Why is Inter-thread Communication Needed?
When working with multiple threads, scenarios like the following are common:
- One thread needs to signal another that a particular task is complete
- Multiple threads need to coordinate access to shared resources
- One thread might need to wait for another thread to reach a certain state
Java provides several mechanisms to enable such communication, ensuring threads work together harmoniously.
The Core Methods for Inter-thread Communication
Java's Object
class provides three fundamental methods for inter-thread communication:
wait()
- Causes the current thread to wait until another thread callsnotify()
ornotifyAll()
notify()
- Wakes up a single thread waiting on this objectnotifyAll()
- Wakes up all threads waiting on this object
These methods must be called from within a synchronized context (block or method).
Understanding wait(), notify(), and notifyAll()
Let's examine how these methods work together:
Important Rules to Remember
- These methods must be called from within a synchronized context
wait()
causes the current thread to release the lock and waitnotify()
/notifyAll()
do not release the lock; the synchronized block must finish first- After being notified, a thread must reacquire the lock before continuing
Basic Example: Wait and Notify
Here's a simple example demonstrating wait and notify:
public class Message {
private String message;
private boolean empty = true;
public synchronized String read() {
while (empty) {
try {
wait(); // Wait until there's a message to read
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
empty = true;
notifyAll(); // Notify that the message has been read
return message;
}
public synchronized void write(String message) {
while (!empty) {
try {
wait(); // Wait until the previous message is read
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
empty = false;
this.message = message;
notifyAll(); // Notify that a message is available
}
}
public class Writer implements Runnable {
private Message message;
public Writer(Message message) {
this.message = message;
}
public void run() {
String[] messages = {
"Hello",
"How are you?",
"I'm doing great",
"Goodbye"
};
for (String msg : messages) {
message.write(msg);
System.out.println("Written: " + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public class Reader implements Runnable {
private Message message;
public Reader(Message message) {
this.message = message;
}
public void run() {
for (String msg = message.read(); !msg.equals("Goodbye"); msg = message.read()) {
System.out.println("Read: " + msg);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Read: Goodbye");
}
}
public class WaitNotifyDemo {
public static void main(String[] args) {
Message message = new Message();
Thread writerThread = new Thread(new Writer(message));
Thread readerThread = new Thread(new Reader(message));
writerThread.start();
readerThread.start();
}
}
Output:
Written: Hello
Read: Hello
Written: How are you?
Read: How are you?
Written: I'm doing great
Read: I'm doing great
Written: Goodbye
Read: Goodbye
In this example:
- We have a
Message
class that represents a shared resource - The
Writer
thread writes messages - The
Reader
thread reads those messages - They coordinate using
wait()
andnotifyAll()
The Producer-Consumer Pattern
One of the most common applications of inter-thread communication is the Producer-Consumer pattern. This pattern involves:
- Producer threads that create data and add it to a shared buffer
- Consumer threads that take data from the buffer and process it
Here's an example implementation:
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample {
public static void main(String[] args) {
Buffer buffer = new Buffer(5); // Shared buffer with capacity 5
// Create and start producer and consumer threads
Thread producerThread = new Thread(new Producer(buffer));
Thread consumerThread = new Thread(new Consumer(buffer));
producerThread.start();
consumerThread.start();
}
}
// Shared buffer between producer and consumer
class Buffer {
private Queue<Integer> queue;
private int capacity;
public Buffer(int capacity) {
this.capacity = capacity;
this.queue = new LinkedList<>();
}
public synchronized void produce(int item) throws InterruptedException {
// Wait if buffer is full
while (queue.size() == capacity) {
System.out.println("Buffer is full. Producer is waiting...");
wait();
}
// Add item to buffer
queue.add(item);
System.out.println("Produced: " + item + ", Buffer size: " + queue.size());
// Notify consumer that an item is available
notify();
}
public synchronized int consume() throws InterruptedException {
// Wait if buffer is empty
while (queue.isEmpty()) {
System.out.println("Buffer is empty. Consumer is waiting...");
wait();
}
// Remove and return the first item
int item = queue.remove();
System.out.println("Consumed: " + item + ", Buffer size: " + queue.size());
// Notify producer that space is available
notify();
return item;
}
}
// Producer thread that adds items to the buffer
class Producer implements Runnable {
private Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
buffer.produce(i);
Thread.sleep(500); // Simulate time to produce
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Consumer thread that removes items from the buffer
class Consumer implements Runnable {
private Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
for (int i = 1; i <= 10; i++) {
buffer.consume();
Thread.sleep(1000); // Simulate time to consume
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Output (will vary due to thread scheduling):
Produced: 1, Buffer size: 1
Produced: 2, Buffer size: 2
Consumed: 1, Buffer size: 1
Produced: 3, Buffer size: 2
Consumed: 2, Buffer size: 1
Produced: 4, Buffer size: 2
Consumed: 3, Buffer size: 1
Produced: 5, Buffer size: 2
Consumed: 4, Buffer size: 1
Produced: 6, Buffer size: 2
Consumed: 5, Buffer size: 1
Produced: 7, Buffer size: 2
Consumed: 6, Buffer size: 1
Produced: 8, Buffer size: 2
Consumed: 7, Buffer size: 1
Produced: 9, Buffer size: 2
Consumed: 8, Buffer size: 1
Produced: 10, Buffer size: 2
Consumed: 9, Buffer size: 1
Consumed: 10, Buffer size: 0
In this implementation:
- The
Buffer
class represents a shared, bounded buffer - The
Producer
adds items to the buffer - The
Consumer
removes items from the buffer - When the buffer is full, the producer waits
- When the buffer is empty, the consumer waits
- They signal each other when the state changes
Modern Approaches to Inter-thread Communication
While the wait/notify mechanism is fundamental, Java offers more modern approaches for inter-thread communication:
1. Using BlockingQueue
Java's java.util.concurrent
package provides thread-safe collections like BlockingQueue
that handle the synchronization for you:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) {
// Create a blocking queue with capacity 5
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// Producer thread
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
System.out.println("Producing: " + i);
queue.put(i); // Automatically blocks if queue is full
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
Integer value = queue.take(); // Automatically blocks if queue is empty
System.out.println("Consumed: " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
2. Using Semaphores
Semaphores can be used to control access to shared resources:
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
SharedResource resource = new SharedResource();
Thread thread1 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
resource.produce(i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread thread2 = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
resource.consume();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread1.start();
thread2.start();
}
}
class SharedResource {
private int sharedValue;
private Semaphore producerSemaphore = new Semaphore(1);
private Semaphore consumerSemaphore = new Semaphore(0);
public void produce(int value) throws InterruptedException {
producerSemaphore.acquire();
this.sharedValue = value;
System.out.println("Produced: " + value);
consumerSemaphore.release();
}
public void consume() throws InterruptedException {
consumerSemaphore.acquire();
System.out.println("Consumed: " + sharedValue);
producerSemaphore.release();
}
}
3. Using CountDownLatch
CountDownLatch
can be used for one-time synchronization between threads:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// Create a latch with count 3
CountDownLatch latch = new CountDownLatch(3);
// Start worker threads
for (int i = 1; i <= 3; i++) {
final int workerId = i;
new Thread(() -> {
try {
System.out.println("Worker " + workerId + " starting task");
Thread.sleep(1000 * workerId);
System.out.println("Worker " + workerId + " finished task");
latch.countDown(); // Signal that this worker is done
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
// Main thread waits for all workers to finish
System.out.println("Main thread waiting for workers...");
latch.await();
System.out.println("All workers have finished, main thread continues");
}
}
Common Pitfalls and Best Practices
When working with inter-thread communication, be aware of these potential issues:
1. Deadlocks
Deadlocks occur when two or more threads are waiting for each other to release locks.
// Potential deadlock situation
public void method1() {
synchronized(lockA) {
synchronized(lockB) {
// Code here
}
}
}
public void method2() {
synchronized(lockB) { // Different order of lock acquisition
synchronized(lockA) {
// Code here
}
}
}
Best Practice: Always acquire locks in the same order across all threads.
2. Spurious Wakeups
A thread can wake up from wait()
without being notified.
// Incorrect
synchronized(obj) {
if (condition) {
obj.wait();
}
// Process data
}
// Correct
synchronized(obj) {
while (condition) { // Always use a loop
obj.wait();
}
// Process data
}
Best Practice: Always use wait()
in a loop that checks the condition.
3. Lost Notifications
If notify()
is called before wait()
, the notification is lost.
Best Practice: Use appropriate state variables to track conditions, and use notifyAll()
when in doubt.
Practical Application: A Simple Task Scheduler
Here's a real-world application - a simple task scheduler that uses inter-thread communication:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
public class TaskSchedulerDemo {
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
// Start the scheduler
Thread schedulerThread = new Thread(scheduler);
schedulerThread.start();
// Submit some tasks
for (int i = 1; i <= 5; i++) {
final int taskId = i;
scheduler.scheduleTask(() -> System.out.println("Executing Task " + taskId));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// Let the tasks run for a while, then shutdown
try {
Thread.sleep(10000);
scheduler.shutdown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
class TaskScheduler implements Runnable {
private final Queue<Runnable> taskQueue = new LinkedList<>();
private boolean running = true;
// Schedule a task
public synchronized void scheduleTask(Runnable task) {
taskQueue.add(task);
System.out.println("Task scheduled, queue size: " + taskQueue.size());
notify(); // Signal that a new task is available
}
// Shutdown the scheduler
public synchronized void shutdown() {
running = false;
notify(); // Wake up the scheduler thread if it's waiting
System.out.println("Scheduler shutdown initiated");
}
@Override
public void run() {
while (running) {
Runnable task = null;
// Get a task from the queue
synchronized (this) {
while (running && taskQueue.isEmpty()) {
try {
System.out.println("Scheduler waiting for tasks...");
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
if (!running) {
break;
}
task = taskQueue.poll();
}
// Execute the task outside the synchronized block
try {
System.out.println("Scheduler executing task");
task.run();
// Simulate some processing time
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
System.out.println("Scheduler stopped");
}
}
This scheduler demonstrates:
- A worker thread that waits for tasks
- Other threads that submit tasks to the queue
- Synchronized methods to coordinate access to the task queue
- Using
wait()
andnotify()
for signaling between threads
Summary
Inter-thread communication is a fundamental aspect of Java multithreading that allows threads to coordinate their activities. In this tutorial, we've covered:
- The basic
wait()
,notify()
, andnotifyAll()
methods for thread communication - How to implement the Producer-Consumer pattern
- Modern approaches like
BlockingQueue
,Semaphore
, andCountDownLatch
- Common pitfalls and best practices
- A practical example of a task scheduler
Understanding inter-thread communication enables you to build more efficient and robust concurrent applications.
Additional Resources
- Java Documentation on wait/notify
- Java Concurrency in Practice - A book by Brian Goetz
- Oracle's Concurrency Tutorial
Exercises
- Modify the Producer-Consumer example to support multiple producers and consumers.
- Implement a simple thread pool using inter-thread communication techniques.
- Create a barrier implementation that allows a group of threads to wait until they all reach a common point.
- Build a resource pool (e.g., database connections) that uses inter-thread communication to manage available resources.
- Implement a reader-writer lock that gives preference to writers.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)