Monitors
Rather than having programmers protect critical sections with semaphores by hand, monitors provide a higher-level abstraction for synchronization. Monitors are a more object-oriented approach: a monitor is a class that encapsulates data that needs to be protected, so that only one thread at a time can access it.
Here's a timeline of the development of monitors:
- 1965: Edsger Dijkstra introduces the concept of semaphores in his paper Cooperating sequential processes, laying the foundation for synchronization primitives.
- 1972: Tony Hoare publishes Towards a theory of parallel programming, discussing the idea of conditional critical regions, which later evolve into monitors.
- 1973: Per Brinch Hansen develops the concept of monitors and implements them in the Concurrent Pascal programming language.
- 1974: Tony Hoare formalizes the concept of monitors in his paper Monitors: An Operating System Structuring Concept, describing them as a synchronization construct for shared data.
- 1980s–2000s: Monitors gain wider adoption and are incorporated into various programming languages, such as Ada, Java, and C#.
- Technically, Java uses an (over)simplified version of monitors.
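We'll use Java as an example for showing how monitors work, since it's probably familiar to you. We can say that a Java class acts as a monitor if its fields are private and all its methods use the synchronized keyword. Every Java object has a built-in lock, and a synchronized method holds that lock while it runs, so only one thread at a time can be executing the object's synchronized methods, and thus only one thread at a time can access the object's data.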
Here's an example BankAccount class that protects its balance by being a monitor:
class BankAccount {
    private int balance;
    public BankAccount(int initialBalance) {
        balance = initialBalance;
    }
    public synchronized void deposit(int amount) {
        balance += amount;
    }
    public synchronized void withdraw(int amount) {
        balance -= amount;
    }
    public synchronized int getBalance() {
        return balance;
    }
}
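To see the "one thread at a time" guarantee in action, here's a small sketch that isn't from the original page. It rewrites each synchronized method as an explicit synchronized (this) block (for an instance method, this is the lock that synchronized uses), then has several threads hammer on deposit. The names ExplicitBankAccount and MonitorStressDemo and the thread and iteration counts are arbitrary choices for illustration.

```java
// Sketch: BankAccount with explicit synchronized (this) blocks, plus a stress test.
// ExplicitBankAccount, MonitorStressDemo, and the counts are illustrative choices.
class ExplicitBankAccount {
    private int balance;

    ExplicitBankAccount(int initialBalance) {
        balance = initialBalance;
    }

    void deposit(int amount) {
        synchronized (this) {   // same lock a synchronized instance method would use
            balance += amount;
        }
    }

    void withdraw(int amount) {
        synchronized (this) {
            balance -= amount;
        }
    }

    int getBalance() {
        synchronized (this) {
            return balance;
        }
    }
}

public class MonitorStressDemo {
    static final int THREADS = 4;
    static final int DEPOSITS_PER_THREAD = 100_000;

    // Starts THREADS threads that each deposit 1, DEPOSITS_PER_THREAD times,
    // waits for all of them, and returns the final balance.
    static int run() throws InterruptedException {
        ExplicitBankAccount account = new ExplicitBankAccount(0);
        Thread[] workers = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < DEPOSITS_PER_THREAD; i++) {
                    account.deposit(1);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return account.getBalance();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final balance: " + run()); // prints 400000
    }
}
```

With the locking in place, the final balance should always be 4 × 100,000 = 400,000. If you delete the synchronized blocks, balance += amount is no longer atomic and some deposits may be lost, although whether and how often that shows up depends on your machine.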
Notice that other than adding a keyword to our code, we didn't have to do anything special to make this class thread-safe. Contrast that with semaphores, where we'd have to declare a semaphore variable for each object, initialize it to the right value, and then remember to acquire and release it in all the right places.
But behind the scenes, Java is using semaphores or something like them, right?
Right. Monitors are a higher-level abstraction that can be implemented using semaphores or other lower-level synchronization primitives.
The key is that it's easy to use monitors, and less error-prone. I like it when people make things easier for me!
It solves the critical-section problem, but is it good enough to solve the producer–consumer problem?
Let's see…
Producer–Consumer, First Try
Here's a first crack at the producer–consumer problem we solved on the previous page with C and semaphores, this time written in Java with a monitor class, BoundedBuffer.
Your goal is to skim this code and get the big ideas, not carefully check every detail.
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
class BoundedBuffer {
    private final Queue<String> buffer;
    private final int capacity;
    public BoundedBuffer(int capacity) {
        this.buffer = new LinkedList<>();
        this.capacity = capacity;
    }
    public synchronized boolean put(String message) {
        if (buffer.size() == capacity) { // Fail if buffer is full
            return false;
        }
        buffer.add(message);
        return true;
    }
    public synchronized String get() {
        if (buffer.isEmpty()) { // Fail if buffer is empty
            return null;
        }
        return buffer.remove();
    }
}
class Producer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();
    Producer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i + " from Producer " + id;
                while (!buffer.put(message)) { // Wait until successful
                    Thread.yield();
                }
                System.out.println("Producer " + id + " added: " + message);
                Thread.sleep(random.nextInt(1000));
            }
            while (!buffer.put("Bye")) { // Wait until successful
                Thread.yield();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
class Consumer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();
    Consumer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = null;
                while (message == null) {
                    message = buffer.get();
                    if (message == null) { // Wait until successful
                        Thread.yield();
                    }
                }
                System.out.println("Consumer " + id + ": " + message);
                if ("Bye".equals(message)) {
                    break;
                }
                Thread.sleep(random.nextInt(100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class ProducerConsumerDemo {
    private static final int BUFFER_SIZE = 32;
    private static final int NUM_PRODUCERS = 2;
    private static final int NUM_CONSUMERS = 2;
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(BUFFER_SIZE);
        Thread[] producers = new Thread[NUM_PRODUCERS];
        Thread[] consumers = new Thread[NUM_CONSUMERS];
        for (int i = 0; i < NUM_PRODUCERS; i++) {
            producers[i] = new Thread(new Producer(buffer, i));
            producers[i].start();
        }
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumers[i] = new Thread(new Consumer(buffer, i));
            consumers[i].start();
        }
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}
Busy Waiting
A problem with this code is that while it solves the critical-section problem (i.e., protects access to the buffer), the code is inefficient. The producer and consumer threads are both spinning in busy-wait loops, checking whether the buffer is full or empty. This problem is exactly the same one we saw with spinlocks, where a thread is spinning in a loop, checking if the lock is available.
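To put a rough number on the waste, here's a sketch of our own (PollingSlot and BusyWaitCostDemo are hypothetical names) that mimics the first try's polling loop: a consumer spins on a non-blocking poll() while a slow producer takes 200 ms to deliver a single item. It counts how many times the consumer grabbed the lock only to find nothing there.

```java
// Sketch: measure how much work a busy-waiting consumer wastes.
// PollingSlot, BusyWaitCostDemo, and the 200 ms delay are illustrative choices.
class PollingSlot {
    private String value;                  // null means "empty"

    synchronized void put(String v) {
        value = v;
    }

    // Non-blocking, like the first try's get(): returns null if nothing is there yet.
    synchronized String poll() {
        String v = value;
        value = null;
        return v;
    }
}

public class BusyWaitCostDemo {
    // Starts a consumer that spins on poll(), delivers one item after
    // delayMillis, and returns how many polls came back empty.
    static long countEmptyPolls(long delayMillis) throws InterruptedException {
        PollingSlot slot = new PollingSlot();
        long[] emptyPolls = {0};           // written only by the consumer thread
        Thread consumer = new Thread(() -> {
            while (slot.poll() == null) {  // busy-wait, as in the first try
                emptyPolls[0]++;
                Thread.yield();
            }
        });
        consumer.start();
        Thread.sleep(delayMillis);         // the producer is "slow"
        slot.put("hello");
        consumer.join();                   // join() makes emptyPolls[0] visible here
        return emptyPolls[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Empty polls while waiting 200 ms: " + countEmptyPolls(200));
    }
}
```

The exact count varies from run to run and machine to machine, but it is typically large, and every one of those polls was CPU time that some other thread could have used.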
We need a way for threads to go to sleep (a.k.a. block) and be woken up when the buffer is no longer full or empty.
Java's got you covered!!
Adding Wait and NotifyAll
Monitor classes in Java have an additional trick up their sleeve: the wait and notifyAll methods.
wait() causes the current thread to release the object's lock and sleep until another thread calls notifyAll() on the same object. It's stepping away from the critical section (so other threads can get in) but ready to leap back in, after reacquiring the lock, when it's notified.
notifyAll() wakes up all threads that are waiting on the object. It's like saying, “Hey, everyone, the buffer is no longer full (or no longer empty), come take a look!”
You can think of it a bit like there being a bell that can be rung. When you ring the bell, everyone who's waiting for it to be rung wakes up. (Each Java object has its own “bell”, but only one each.)
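Before we look at the whole buffer, here's a minimal sketch of the bell on its own (Gate and GateDemo are made-up names, not part of the original example). Threads call pass() and sleep in wait() until some thread calls open(), which changes the state and rings the bell with notifyAll(). It also checks one rule the bell metaphor glosses over: you can only call wait() on an object whose lock you hold; otherwise Java throws IllegalMonitorStateException.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a one-shot "gate": threads wait() until some thread opens it
// and rings the bell with notifyAll(). Gate and GateDemo are illustrative names.
class Gate {
    private boolean isOpen = false;

    synchronized void pass() throws InterruptedException {
        while (!isOpen) {   // re-check the condition after every wake-up
            wait();         // releases this object's lock while asleep
        }
    }

    synchronized void open() {
        isOpen = true;
        notifyAll();        // wake every thread sleeping in pass()
    }
}

public class GateDemo {
    // Starts numThreads threads that wait at the gate, opens it,
    // and returns how many threads got through.
    static int run(int numThreads) throws InterruptedException {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.pass();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        Thread.sleep(100);  // let threads start waiting (not needed for correctness)
        gate.open();
        for (Thread t : threads) {
            t.join();
        }
        return passed.get();
    }

    // wait() may only be called while holding the object's lock.
    static boolean waitWithoutLockThrows() {
        Object bell = new Object();
        try {
            bell.wait();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Threads through the gate: " + run(3));                   // prints 3
        System.out.println("wait() without the lock throws: " + waitWithoutLockThrows()); // prints true
    }
}
```

A thread that arrives after the gate is already open never calls wait() at all, which is another reason to test the condition in the loop rather than waiting unconditionally.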
Here's the code:
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
class BoundedBuffer {
    private final Queue<String> buffer;
    private final int capacity;
    public BoundedBuffer(int capacity) {
        this.buffer = new LinkedList<>();
        this.capacity = capacity;
    }
    public synchronized void put(String message) throws InterruptedException {
        while (buffer.size() == capacity) { // Sleep until buffer is not full
            wait();
        }
        buffer.add(message);
        notifyAll(); // Wake everyone up
    }
    public synchronized String get() throws InterruptedException {
        while (buffer.isEmpty()) { // Sleep until buffer is not empty
            wait();
        }
        String message = buffer.remove();
        notifyAll(); // Wake everyone up
        return message;
    }
}
class Producer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();
    Producer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i + " from Producer " + id;
                buffer.put(message);
                Thread.sleep(random.nextInt(1000));
            }
            buffer.put("Bye");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
class Consumer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();
    Consumer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = buffer.get();
                System.out.println("Consumer " + id + ": " + message);
                if ("Bye".equals(message)) {
                    break;
                }
                Thread.sleep(random.nextInt(100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class BoundedBufferDemo {
    private static final int BUFFER_SIZE = 32;
    private static final int NUM_PRODUCERS = 2;
    private static final int NUM_CONSUMERS = 2;
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(BUFFER_SIZE);
        Thread[] producers = new Thread[NUM_PRODUCERS];
        Thread[] consumers = new Thread[NUM_CONSUMERS];
        for (int i = 0; i < NUM_PRODUCERS; i++) {
            producers[i] = new Thread(new Producer(buffer, i));
            producers[i].start();
        }
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumers[i] = new Thread(new Consumer(buffer, i));
            consumers[i].start();
        }
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}
An important aspect of this design is that there is only one bell to ring, so we ring the same bell whether the buffer has just become not full (after a get) or not empty (after a put). As a result, some threads get woken up needlessly. That's why all the wait() calls are inside while loops: when a thread wakes up, it checks whether it still needs to wait.
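If you'd like to see those needless wake-ups rather than take our word for it, here's a sketch that instruments the one-bell buffer with a counter (CountingBuffer, NeedlessWakeUpDemo, and the counter itself are our additions, not part of the original code). It counts each time a thread woke up from wait() only to find its condition still false.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: the one-bell buffer (wait() + notifyAll()), instrumented to count
// "needless" wake-ups: a thread woke up but its condition was still false.
class CountingBuffer {
    private final Queue<Integer> buffer = new LinkedList<>();
    private final int capacity;
    private int needlessWakeUps = 0;

    CountingBuffer(int capacity) { this.capacity = capacity; }

    synchronized void put(int item) throws InterruptedException {
        boolean wokeUp = false;
        while (buffer.size() == capacity) {
            if (wokeUp) needlessWakeUps++;   // woke up, but the buffer is still full
            wait();
            wokeUp = true;
        }
        buffer.add(item);
        notifyAll();                         // rings the one bell for everyone
    }

    synchronized int get() throws InterruptedException {
        boolean wokeUp = false;
        while (buffer.isEmpty()) {
            if (wokeUp) needlessWakeUps++;   // woke up, but the buffer is still empty
            wait();
            wokeUp = true;
        }
        int item = buffer.remove();
        notifyAll();
        return item;
    }

    synchronized int needlessWakeUps() { return needlessWakeUps; }
}

public class NeedlessWakeUpDemo {
    static final int ITEMS_PER_THREAD = 2_000;

    // Two producers and two consumers share a size-1 buffer.
    // Returns {sum of all consumed items, needless wake-ups}.
    static long[] run() throws InterruptedException {
        CountingBuffer buffer = new CountingBuffer(1);
        AtomicLong sum = new AtomicLong();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 1; i <= ITEMS_PER_THREAD; i++) buffer.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < 2; c++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < ITEMS_PER_THREAD; i++) sum.addAndGet(buffer.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        return new long[] { sum.get(), buffer.needlessWakeUps() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] result = run();
        System.out.println("Sum of consumed items: " + result[0]); // 2 * (1 + ... + 2000) = 4002000
        System.out.println("Needless wake-ups: " + result[1]);      // varies from run to run
    }
}
```

The sum check confirms that every item got through. The needless-wake-up count depends on scheduling; with a one-slot buffer and four threads it is often nonzero, since every notifyAll() wakes producers and consumers alike.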
Waking everyone up seems terrible. Why not just wake up one thread?
Java's got you covered!!!
Java also provides another method, notify(), which wakes up only one thread.
In OnlineGDB, fork the code and try changing notifyAll() to notify() in the put and get methods, then see what happens.
I'm pretty sure it should work. I know the worry is that we'll wake up the wrong kind of thread (wake up the put() code when the bell is saying that there's an opportunity to get()), but I don't think that can happen, because the buffer can't be both full and empty at the same time.
If you want to see the problem actually happen, make the following changes:
- Remove the Thread.sleep lines from the Producer and Consumer classes.
- Change the BUFFER_SIZE to 1.
When you try these changes, be kind to OnlineGDB! If/when it gets stuck, use its stop button to stop the program (or press Ctrl+C in the output terminal).
The Lost-Wake-Up Problem
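The problem here is that we only wake up one thread, but it might not be the right kind of thread (a producer when a consumer is needed, or vice versa). That thread finds it still can't proceed and goes back to sleep, and the thread that could have made progress never hears the bell. Dropping a needed wake-up on the floor is called the lost-wake-up problem.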
In this specific problem, we could reason that lost wake-ups can't happen when there is a single producer and consumer, but when there is more than one of either, we can run into trouble.
Surely you're going to give more explanation than that, right?
Actually, no. While we could lay out the exact sequence of events, we'd get lost in the details. The reality is that human beings are terrible at envisioning possible interleavings of concurrent tasks and being confident they're correct.
If you really want to know, catch Prof. Melissa in the lab and she'll walk you through it.
Meh. I don't need to know. I'll just use notifyAll() and be done with it.
The crux of our problem in this case was that we had two different events we were looking for:
- The buffer is no longer full
- The buffer is no longer empty
It would be better if instead of having one bell to ring, we had two bells, one for each event. That way, we could ring the right bell for the right event.
Condition Variables
Let's remember two things. First, we now want separate named conditions: one for “not full” and one for “not empty”. Second, this functionality is being designed by programming-language designers. To programming-language folks, the obvious kind of named thing is a variable, so they called this feature condition variables.
We can imagine declaring them as
private final ConditionVariable notFull = new ConditionVariable();
private final ConditionVariable notEmpty = new ConditionVariable();
and instead of writing wait() and notify(), we'd write wait(notFull) and notify(notFull).
What's stored in these condition variables?
We don't have to worry about that to use them. We read the above lines as “wait until the buffer is not full” and “notify that the buffer is not full”. The details of how that's implemented are hidden from us.
But it would be some kind of queue of sleeping threads waiting for the condition to be true.
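As an aside on what's inside, here's a toy sketch of the "queue of sleeping threads" idea. The classes ToyConditionVariable, Mailbox, and ToyConditionDemo are hypothetical, built from a java.util.concurrent.locks.ReentrantLock plus one java.util.concurrent.Semaphore per waiting thread; real implementations are more careful (for example, about interrupts and reentrant locking).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Toy sketch (hypothetical class): a condition variable as a FIFO queue of sleeping
// threads, each sleeping on its own private semaphore "ticket". Both methods must be
// called while holding `lock` exactly once; `lock` also guards `waiters`.
class ToyConditionVariable {
    private final Lock lock;
    private final Queue<Semaphore> waiters = new ArrayDeque<>();

    ToyConditionVariable(Lock lock) { this.lock = lock; }

    void await() {
        Semaphore ticket = new Semaphore(0);
        waiters.add(ticket);              // join the queue while still holding the lock
        lock.unlock();                    // let other threads into the monitor
        ticket.acquireUninterruptibly();  // sleep; a release() that came first is not lost
        lock.lock();                      // re-enter the monitor before returning
    }

    void signal() {
        Semaphore ticket = waiters.poll(); // wake the longest-waiting thread, if any
        if (ticket != null) {
            ticket.release();
        }
    }
}

// A one-slot mailbox guarded by two toy condition variables.
class Mailbox {
    private final Lock lock = new ReentrantLock();
    private final ToyConditionVariable notFull = new ToyConditionVariable(lock);
    private final ToyConditionVariable notEmpty = new ToyConditionVariable(lock);
    private String slot = null;

    void put(String message) {
        lock.lock();
        try {
            while (slot != null) notFull.await();
            slot = message;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    String take() {
        lock.lock();
        try {
            while (slot == null) notEmpty.await();
            String message = slot;
            slot = null;
            notFull.signal();
            return message;
        } finally {
            lock.unlock();
        }
    }
}

public class ToyConditionDemo {
    // One producer sends n messages through the mailbox; returns how many arrived.
    static int run(int n) throws InterruptedException {
        Mailbox mailbox = new Mailbox();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < n; i++) mailbox.put("Message " + i);
        });
        producer.start();
        int received = 0;
        for (int i = 0; i < n; i++) {
            mailbox.take();
            received++;
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Received " + run(100) + " messages"); // prints 100
    }
}
```

The key design choice is that a waiter joins the queue before it releases the lock, and sleeps on a semaphore rather than a plain flag. If signal() happens to run in the gap between unlock() and acquireUninterruptibly(), the permit is already there, so the wake-up isn't lost.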
Other than that, our code is exactly the same. Here's the full version if you want to check it out:
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import alternatereality.java.concurrent.ConditionVariable;
class BoundedBuffer {
    private final Queue<String> buffer;
    private final int capacity;
    private final ConditionVariable notFull = new ConditionVariable();
    private final ConditionVariable notEmpty = new ConditionVariable();
    public BoundedBuffer(int capacity) {
        this.buffer = new LinkedList<>();
        this.capacity = capacity;
    }
    public synchronized void put(String message) throws InterruptedException {
        while (buffer.size() == capacity) { // Sleep until buffer is not full
            wait(notFull);
        }
        buffer.add(message);
        notify(notEmpty); // Wake one thread waiting for "not empty"
    }
    public synchronized String get() throws InterruptedException {
        while (buffer.isEmpty()) { // Sleep until buffer is not empty
            wait(notEmpty);
        }
        String message = buffer.remove();
        notify(notFull); // Wake one thread waiting for "not full"
        return message;
    }
}
class Producer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();
    Producer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i + " from Producer " + id;
                buffer.put(message);
                Thread.sleep(random.nextInt(1000));
            }
            buffer.put("Bye");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
class Consumer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();
    Consumer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = buffer.get();
                System.out.println("Consumer " + id + ": " + message);
                if ("Bye".equals(message)) {
                    break;
                }
                Thread.sleep(random.nextInt(100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class ProducerConsumerDemo {
    private static final int BUFFER_SIZE = 32;
    private static final int NUM_PRODUCERS = 2;
    private static final int NUM_CONSUMERS = 2;
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(BUFFER_SIZE);
        Thread[] producers = new Thread[NUM_PRODUCERS];
        Thread[] consumers = new Thread[NUM_CONSUMERS];
        for (int i = 0; i < NUM_PRODUCERS; i++) {
            producers[i] = new Thread(new Producer(buffer, i));
            producers[i].start();
        }
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumers[i] = new Thread(new Consumer(buffer, i));
            consumers[i].start();
        }
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}
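Technically, this code is from an alternate reality where Java has condition variables built into the core language. In our reality, you get the same effect by using an explicit lock object from java.util.concurrent.locks (such as ReentrantLock) in place of synchronized, and asking that lock for Condition objects with newCondition().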
Meh. We don't need to know that. I'm happy to pretend Java is more sane than it actually is.
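For the curious, here's a sketch of what the two-condition BoundedBuffer can look like in our reality, using java.util.concurrent.locks. It's modeled on the bounded-buffer example in the Condition interface documentation: lock()/unlock() stand in for synchronized, and await()/signal() on Condition objects stand in for the imagined wait(notFull)/notify(notEmpty). The demo class and message counts are our own choices.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: the two-condition BoundedBuffer using real Lock/Condition objects.
class BoundedBuffer {
    private final Queue<String> buffer = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();   // the "not full" bell
    private final Condition notEmpty = lock.newCondition();  // the "not empty" bell

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    void put(String message) throws InterruptedException {
        lock.lock();                         // takes the place of `synchronized`
        try {
            while (buffer.size() == capacity) {
                notFull.await();             // like wait(notFull); releases the lock while asleep
            }
            buffer.add(message);
            notEmpty.signal();               // like notify(notEmpty)
        } finally {
            lock.unlock();                   // release even if await() throws
        }
    }

    String get() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                notEmpty.await();            // like wait(notEmpty)
            }
            String message = buffer.remove();
            notFull.signal();                // like notify(notFull)
            return message;
        } finally {
            lock.unlock();
        }
    }
}

public class ConditionBufferDemo {
    static final int MESSAGES_PER_PRODUCER = 5;

    // Two producers and two consumers share a size-1 buffer; returns
    // everything the consumers received, including the "Bye"s.
    static List<String> run() throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(1);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < 2; p++) {
            final int id = p;
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < MESSAGES_PER_PRODUCER; i++) {
                        buffer.put("Message " + i + " from Producer " + id);
                    }
                    buffer.put("Bye");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < 2; c++) {
            threads.add(new Thread(() -> {
                try {
                    String message;
                    do {
                        message = buffer.get();
                        received.add(message);
                    } while (!"Bye".equals(message));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Received " + run().size() + " messages"); // 10 messages + 2 "Bye"s = 12
    }
}
```

Note the try/finally: unlike synchronized, an explicit lock isn't released automatically when a method exits, so each lock() needs a matching unlock() in a finally block.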
Spurious Wake-Ups
Now that we have it working with named conditions (notFull and notEmpty) and using notify rather than notifyAll, it's efficient, but can we make it even better? Can we get rid of the while loops around the wait calls?
Actually, no…
Although we've now eliminated the issue where we send a wake-up to the wrong kind of thread that it needs to ignore (and all the problems that can cause), we still need the while loop because of the guarantees made by monitors.
- When you send a notify to a condition variable, if any threads are waiting on that condition variable, one of them will be woken up. No wake-ups are lost by the system.
- But there is no promise that the notify wake-up is the only wake-up that will happen. A thread could wake up for no reason at all, or because an implementation decided to implement notify with notifyAll. These are called spurious wake-ups.
So, whenever a waiting thread wakes up, it needs to explicitly check if the scenario it was waiting for is actually true. If not, it should go back to sleep.
Thus, wait calls should always go in a loop that re-checks the condition they're waiting for.
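Java won't produce a genuine spurious wake-up on request, so this sketch fakes one: an observer calls notifyAll() while the condition is still false, and we count how many times the waiter tests the condition. ReadyFlag and SpuriousWakeUpDemo are hypothetical names, and the extra notifyAll() inside awaitReady() is there only so the observer can tell when each check has happened.

```java
// Contrived sketch: the observer manufactures a "fake" wake-up (notifyAll()
// while ready is still false) to stand in for a spurious one.
class ReadyFlag {
    private boolean ready = false;
    private int checks = 0;            // how many times the waiter has tested `ready`

    // Waiter side: the canonical wait-in-a-loop pattern.
    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {
            checks++;
            notifyAll();               // only so the observer can see each check
            wait();                    // releases the lock until someone notifies
        }
    }

    // Observer side: block until the waiter has checked at least n times.
    synchronized void awaitChecks(int n) throws InterruptedException {
        while (checks < n) {
            wait();
        }
    }

    synchronized void fakeWakeUp() { notifyAll(); }          // condition unchanged
    synchronized void setReady() { ready = true; notifyAll(); }
    synchronized int checks() { return checks; }
}

public class SpuriousWakeUpDemo {
    // Returns how many times the waiter tested its condition before proceeding.
    static int run() throws InterruptedException {
        ReadyFlag flag = new ReadyFlag();
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        flag.awaitChecks(1);  // waiter has checked once and is now asleep in wait()
        flag.fakeWakeUp();    // wake it while `ready` is still false
        flag.awaitChecks(2);  // the while loop re-checked and went back to sleep
        flag.setReady();      // now the condition really holds
        waiter.join();
        return flag.checks();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Waiter checked the condition " + run() + " times");
    }
}
```

Because of the while loop, the fake wake-up just costs one extra check (you'll typically see 2), and the waiter returns only once ready is really true. With an if in place of the while, the waiter would have returned after the fake wake-up even though ready was still false.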
I last used Java in CS 60. I don't think I'm going to be writing any monitor-based code.
Well, some programming languages, like Java, build in monitors (or other more sophisticated synchronization mechanisms, like “actors” in Swift or “channels” in Go), but there is a way to bring the monitor concept to other languages, even plain old C. We'll see that in the next section.