CS 134

Monitors

Rather than using semaphores to protect critical sections, monitors provide a higher-level abstraction for synchronization. Monitors are a more object-oriented approach to synchronization, where a monitor is a class that encapsulates data that needs to be protected so that only one thread can access it at a time.

Here's a timeline of the development of monitors:

  • 1965: Edsger Dijkstra introduces the concept of semaphores in his paper Cooperating sequential processes, laying the foundation for synchronization primitives.
  • 1972: Tony Hoare publishes Towards a theory of parallel programming, discussing the idea of conditional critical regions, which later evolve into monitors.
  • 1973: Per Brinch Hansen develops the concept of monitors and implements them in the Concurrent Pascal programming language.
  • 1974: Tony Hoare formalizes the concept of monitors in his paper Monitors: An Operating System Structuring Concept, describing them as a synchronization construct for shared data.
  • 1980s–2000s: Monitors gain wider adoption and are incorporated into various programming languages, such as Ada, Java, and C#.
    • Technically, Java uses an (over)simplified version of monitors.

We'll use Java as an example for showing how monitors work, since it's probably familiar to you. We can say that a Java class acts as a monitor if its data is private and all its methods use the synchronized keyword. This keyword ensures that only one thread at a time can run any of the object's synchronized methods, so only one thread can access the object's data at a time.

Here's an example BankAccount class that protects its balance by being a monitor:

class BankAccount {
    private int balance;

    public BankAccount(int initialBalance) {
        balance = initialBalance;
    }

    public synchronized void deposit(int amount) {
        balance += amount;
    }

    public synchronized void withdraw(int amount) {
        balance -= amount;
    }

    public synchronized int getBalance() {
        return balance;
    }
}

Notice that other than adding a keyword to our code, we didn't have to do anything special to make this class thread-safe. With semaphores, by contrast, we'd have to declare a semaphore variable for each object, initialize it to the right value, and then remember to acquire and release the semaphore in the right places.
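To make the contrast concrete, here is a minimal sketch of what a semaphore-based version might look like, assuming Java's java.util.concurrent.Semaphore. The class name SemaphoreBankAccount and the hammer helper are made up for illustration; they are not part of the course code.

```java
import java.util.concurrent.Semaphore;

// Hypothetical semaphore-based counterpart to BankAccount, for comparison only.
class SemaphoreBankAccount {
    private final Semaphore mutex = new Semaphore(1); // binary semaphore: 1 permit = unlocked
    private int balance;

    SemaphoreBankAccount(int initialBalance) {
        balance = initialBalance;
    }

    void deposit(int amount) {
        mutex.acquireUninterruptibly();   // enter the critical section
        try {
            balance += amount;
        } finally {
            mutex.release();              // forget this and every later caller blocks forever
        }
    }

    void withdraw(int amount) {
        mutex.acquireUninterruptibly();
        try {
            balance -= amount;
        } finally {
            mutex.release();
        }
    }

    int getBalance() {
        mutex.acquireUninterruptibly();
        try {
            return balance;
        } finally {
            mutex.release();
        }
    }

    // Illustrative helper: several threads each deposit 1 repeatedly; returns the final balance.
    static int hammer(int threads, int depositsPerThread) {
        SemaphoreBankAccount account = new SemaphoreBankAccount(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < depositsPerThread; j++) {
                    account.deposit(1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return account.getBalance();
    }
}
```

Every method repeats the same acquire/try/finally/release boilerplate, which is exactly the bookkeeping that synchronized does for us.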

  • Duck speaking

    But behind the scenes, Java is using semaphores or something like them, right?

  • PinkRobot speaking

    Right. Monitors are a higher-level abstraction that can be implemented using semaphores or other lower-level synchronization primitives.

  • Dog speaking

    The key is that it's easy to use monitors, and less error-prone. I like it when people make things easier for me!

  • Cat speaking

    It solves the critical section problem, but is it good enough to do the producer–consumer problem?

  • PinkRobot speaking

    Let's see…

Producer–Consumer, First Try

Here's a first crack at recreating the producer–consumer problem we did on the previous page using C and semaphores, but this time using Java with a monitor class BoundedBuffer.

  • BlueRobot speaking

    Your goal is to skim this code and get the big ideas, not carefully check every detail.

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

class BoundedBuffer {
    private final Queue<String> buffer;
    private final int capacity;

    public BoundedBuffer(int capacity) {
        this.buffer = new LinkedList<>();
        this.capacity = capacity;
    }

    public synchronized boolean put(String message) {
        if (buffer.size() == capacity) {        // Fail if buffer is full
            return false;
        }
        buffer.add(message);
        return true;
    }

    public synchronized String get() {
        if (buffer.isEmpty()) {                 // Fail if buffer is empty
            return null;
        }
        return buffer.remove();
    }
}

class Producer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();

    Producer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i + " from Producer " + id;
                while (!buffer.put(message)) {      // Wait until successful
                    Thread.yield();
                }
                System.out.println("Producer " + id + " added: " + message);
                Thread.sleep(random.nextInt(1000));
            }
            while (!buffer.put("Bye")) {            // Wait until successful
                Thread.yield();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();

    Consumer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = null;
                while (message == null) {
                    message = buffer.get();
                    if (message == null) {          // Wait until successful
                        Thread.yield();
                    }
                }
                System.out.println("Consumer " + id + ": " + message);
                if ("Bye".equals(message)) {
                    break;
                }
                Thread.sleep(random.nextInt(100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerDemo {
    private static final int BUFFER_SIZE = 32;
    private static final int NUM_PRODUCERS = 2;
    private static final int NUM_CONSUMERS = 2;

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(BUFFER_SIZE);
        Thread[] producers = new Thread[NUM_PRODUCERS];
        Thread[] consumers = new Thread[NUM_CONSUMERS];

        for (int i = 0; i < NUM_PRODUCERS; i++) {
            producers[i] = new Thread(new Producer(buffer, i));
            producers[i].start();
        }

        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumers[i] = new Thread(new Consumer(buffer, i));
            consumers[i].start();
        }

        for (Thread producer : producers) {
            producer.join();
        }

        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}

This code for the bounded buffer gets the job done because the synchronized keyword ensures that only one thread can access the BoundedBuffer object at a time. But something about it is inefficient. (Hint: Spin spin spin.)

Busy Waiting

A problem with this code is that while it solves the critical-section problem (i.e., protects access to the buffer), the code is inefficient. The producer and consumer threads are both spinning in busy-wait loops, checking whether the buffer is full or empty. This problem is exactly the same one we saw with spinlocks, where a thread is spinning in a loop, checking if the lock is available.
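To get a feel for the waste, here is a small sketch that counts how many times a polling consumer asks an empty buffer for a message before one finally arrives. PollingBuffer and SpinCounter are hypothetical names invented for this illustration, and the buffer is simplified (unbounded) to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the first-try BoundedBuffer: get() fails with null when empty.
class PollingBuffer {
    private final Queue<String> items = new ArrayDeque<>();

    synchronized void put(String message) {
        items.add(message);
    }

    synchronized String get() {
        return items.poll();   // null if the buffer is empty
    }
}

class SpinCounter {
    // Returns how many failed get() calls the consumer made while waiting for one message.
    static long countFailedPolls(long delayMillis) {
        PollingBuffer buffer = new PollingBuffer();
        AtomicLong failedPolls = new AtomicLong();
        CountDownLatch firstMiss = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            while (buffer.get() == null) {      // busy-wait, as in the first-try Consumer
                failedPolls.incrementAndGet();
                firstMiss.countDown();          // tell main we've started spinning
                Thread.yield();
            }
        });
        consumer.start();

        try {
            firstMiss.await();                  // make sure the consumer is already polling
            Thread.sleep(delayMillis);          // the producer is "slow"
            buffer.put("hello");
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failedPolls.get();
    }
}
```

Calling SpinCounter.countFailedPolls(20) will typically report a large number of failed polls for a single 20 ms wait; the exact count depends on the machine and scheduler. All of that is CPU time spent accomplishing nothing.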

We need a way for threads to go to sleep (a.k.a. block) and be woken up when the buffer is no longer full or empty.

  • Jo speaking

    Java's got you covered!!

Adding Wait and NotifyAll

Monitor classes in Java have an additional trick up their sleeve: the wait and notifyAll methods.

  • wait() releases the monitor's lock and puts the current thread to sleep until another thread calls notifyAll() on the same object. The thread steps away from the critical section but is ready to leap back in (after reacquiring the lock) when it's notified.
  • notifyAll() wakes up all threads that are waiting on the object. It's like saying, “Hey, everyone, the buffer is no longer full or empty, come take a look!”

You can think of it a bit like there being a bell that can be rung. When you ring the bell, everyone who's waiting for it to be rung wakes up. (Each Java object has its own “bell”, but only one each.)
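As a warm-up, here is a minimal sketch of the bell idea on its own, separate from the bounded buffer. Gate and GateDemo are hypothetical classes made up for this illustration: threads that call await() sleep until some other thread calls open().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-shot gate: await() blocks until open() has been called.
class Gate {
    private boolean isOpen = false;

    synchronized void await() throws InterruptedException {
        while (!isOpen) {   // re-check the condition after every wake-up
            wait();         // releases the monitor lock while asleep
        }
    }

    synchronized void open() {
        isOpen = true;
        notifyAll();        // ring the bell: wake every thread waiting on this object
    }
}

class GateDemo {
    // Starts some waiting threads, opens the gate, and returns how many got through.
    static int releaseWaiters(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open();
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }
}
```

Because the isOpen flag records that the bell has been rung, a thread that reaches await() after open() has already run simply walks through instead of sleeping forever.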

Here's the bounded-buffer code, now using wait() and notifyAll():

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;

class BoundedBuffer {
    private final Queue<String> buffer;
    private final int capacity;

    public BoundedBuffer(int capacity) {
        this.buffer = new LinkedList<>();
        this.capacity = capacity;
    }

    public synchronized void put(String message) throws InterruptedException {
        while (buffer.size() == capacity) { // Sleep until buffer is not full
            wait();
        }
        buffer.add(message);
        notifyAll();                        // Wake everyone up
    }

    public synchronized String get() throws InterruptedException {
        while (buffer.isEmpty()) {          // Sleep until buffer is not empty
            wait();
        }
        String message = buffer.remove();
        notifyAll();                        // Wake everyone up
        return message;
    }
}

class Producer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();

    Producer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i + " from Producer " + id;
                buffer.put(message);
                Thread.sleep(random.nextInt(1000));
            }
            buffer.put("Bye");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();

    Consumer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = buffer.get();
                System.out.println("Consumer " + id + ": " + message);
                if ("Bye".equals(message)) {
                    break;
                }
                Thread.sleep(random.nextInt(100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class BoundedBufferDemo {
    private static final int BUFFER_SIZE = 32;
    private static final int NUM_PRODUCERS = 2;
    private static final int NUM_CONSUMERS = 2;

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(BUFFER_SIZE);
        Thread[] producers = new Thread[NUM_PRODUCERS];
        Thread[] consumers = new Thread[NUM_CONSUMERS];

        for (int i = 0; i < NUM_PRODUCERS; i++) {
            producers[i] = new Thread(new Producer(buffer, i));
            producers[i].start();
        }

        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumers[i] = new Thread(new Consumer(buffer, i));
            consumers[i].start();
        }

        for (Thread producer : producers) {
            producer.join();
        }

        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}

An important aspect of this design is that there is only one bell to ring, so we ring the same bell whether the buffer has just become no longer full or no longer empty. As a result, some threads get woken up needlessly. For that reason, all the wait() calls are inside while loops, so that when a thread wakes up, it checks to see if it still needs to wait.

  • Duck speaking

    Waking everyone up seems terrible. Why not just wake up one thread?

  • Jo speaking

    Java's got you covered!!!

Java also provides another method, notify(), which wakes up only one thread.

In OnlineGDB, fork the code, then try changing notifyAll() to notify() in the put and get methods and see what happens.

What do you think about this change from notifyAll() to notify()?

  • Duck speaking

    I'm pretty sure it should work. I know the worry is that we'll wake up the wrong kind of thread (wake up the put() code when the bell is saying that there's an opportunity to get()), but I don't think that can happen, because the buffer can't be both full and empty at the same time.

If you want to see the problem actually happen, make the following changes:

  • Remove the Thread.sleep lines from the Producer and Consumer classes.
  • Change the BUFFER_SIZE to 1.

When you try these changes, be kind to OnlineGDB! If/when it gets stuck, hit the Stop button to stop the program (or press Ctrl+C in the output terminal).

The Lost-Wake-Up Problem

The problem here is that we only wake up one thread, but it might not be the right thread. Dropping a needed wake-up on the floor is called the lost-wake-up problem.

In this specific problem, we could reason that lost wake-ups can't happen when there is a single producer and consumer, but when there is more than one of either, we can run into trouble.

  • Duck speaking

    Surely you're going to give more explanation than that, right?

  • PinkRobot speaking

    Actually, no. While we could lay out the exact sequence of events, we'd get lost in the details. The reality is that human beings are terrible at envisioning possible interleavings of concurrent tasks and being confident they're correct.

  • BlueRobot speaking

    If you really want to know, catch Prof. Melissa in the lab and she'll walk you through it.

  • Goat speaking

    Meh. I don't need to know. I'll just use notifyAll() and be done with it.

The crux of our problem in this case was that we had two different events we were looking for:

  • The buffer is no longer full
  • The buffer is no longer empty

It would be better if instead of having one bell to ring, we had two bells, one for each event. That way, we could ring the right bell for the right event.

Condition Variables

Let's remember two things. First, we now want to have separate named conditions, one for being full and one for being empty. Second, this functionality is being designed by programming-language designers. To programming language folks, the obvious named thing is a variable, so they called this feature condition variables.

We can imagine declaring them as

    private final ConditionVariable notFull = new ConditionVariable();
    private final ConditionVariable notEmpty = new ConditionVariable();

and instead of writing wait() and notify(), we'd write wait(notFull) and notify(notFull).

  • Duck speaking

    What's stored in these condition variables?

  • PinkRobot speaking

    We don't have to worry about that to use them. We read the above lines as “wait until the buffer is not full” and “notify that the buffer is not full”. The details of how that's implemented are hidden from us.

  • BlueRobot speaking

    But it would be some kind of queue of sleeping threads waiting for the condition to be true.

Other than that, our code is exactly the same. Here's the full version if you want to check it out.

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import alternatereality.java.concurrent.ConditionVariable;

class BoundedBuffer {
    private final Queue<String> buffer;
    private final int capacity;
    private final ConditionVariable notFull = new ConditionVariable();
    private final ConditionVariable notEmpty = new ConditionVariable();

    public BoundedBuffer(int capacity) {
        this.buffer = new LinkedList<>();
        this.capacity = capacity;
    }

    public synchronized void put(String message) throws InterruptedException {
        while (buffer.size() == capacity) { // Sleep until buffer is not full
            wait(notFull);
        }
        buffer.add(message);
        notify(notEmpty);                   // Wake one thread waiting to get()
    }

    public synchronized String get() throws InterruptedException {
        while (buffer.isEmpty()) {          // Sleep until buffer is not empty
            wait(notEmpty);
        }
        String message = buffer.remove();
        notify(notFull);                    // Wake one thread waiting to put()
        return message;
    }
}

class Producer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();

    Producer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i + " from Producer " + id;
                buffer.put(message);
                Thread.sleep(random.nextInt(1000));
            }
            buffer.put("Bye");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BoundedBuffer buffer;
    private final int id;
    private final Random random = new Random();

    Consumer(BoundedBuffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = buffer.get();
                System.out.println("Consumer " + id + ": " + message);
                if ("Bye".equals(message)) {
                    break;
                }
                Thread.sleep(random.nextInt(100));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerDemo {
    private static final int BUFFER_SIZE = 32;
    private static final int NUM_PRODUCERS = 2;
    private static final int NUM_CONSUMERS = 2;

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(BUFFER_SIZE);
        Thread[] producers = new Thread[NUM_PRODUCERS];
        Thread[] consumers = new Thread[NUM_CONSUMERS];

        for (int i = 0; i < NUM_PRODUCERS; i++) {
            producers[i] = new Thread(new Producer(buffer, i));
            producers[i].start();
        }

        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumers[i] = new Thread(new Consumer(buffer, i));
            consumers[i].start();
        }

        for (Thread producer : producers) {
            producer.join();
        }

        for (Thread consumer : consumers) {
            consumer.join();
        }
    }
}

  • Rabbit speaking

    Technically, this code is from an alternate reality where Java has condition variables built into the core language. In our reality, you need an explicit lock with its own condition objects (ReentrantLock and Condition from java.util.concurrent.locks) to get the same effect.

  • Goat speaking

    Meh. We don't need to know that. I'm happy to pretend Java is more sane than it actually is.
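For the curious, here is a minimal sketch of how the two-bell buffer might look in real Java, assuming ReentrantLock and Condition from java.util.concurrent.locks. The class names LockedBoundedBuffer and LockedBufferDemo are made up for this illustration. In this API, await() plays the role of wait(condition) and signal() plays the role of notify(condition).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class LockedBoundedBuffer {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();     // replaces synchronized
    private final Condition notFull = lock.newCondition();      // bell #1
    private final Condition notEmpty = lock.newCondition();     // bell #2

    LockedBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(String message) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == capacity) {
                notFull.await();            // sleep until buffer is not full
            }
            buffer.add(message);
            notEmpty.signal();              // wake one thread waiting to get()
        } finally {
            lock.unlock();
        }
    }

    String get() throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                notEmpty.await();           // sleep until buffer is not empty
            }
            String message = buffer.remove();
            notFull.signal();               // wake one thread waiting to put()
            return message;
        } finally {
            lock.unlock();
        }
    }
}

class LockedBufferDemo {
    // Two producers and two consumers share a one-slot buffer; returns messages consumed.
    static long transfer(int perThread) {
        LockedBoundedBuffer buffer = new LockedBoundedBuffer(1);
        AtomicLong consumed = new AtomicLong();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            boolean producer = i < 2;
            threads[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < perThread; j++) {
                        if (producer) {
                            buffer.put("m" + j);
                        } else {
                            buffer.get();
                            consumed.incrementAndGet();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed.get();
    }
}
```

The price of leaving the synchronized keyword behind is that locking and unlocking are manual again, hence the try/finally in every method.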

Spurious Wake-Ups

  • Duck speaking

    Now that we have it working with named conditions (notFull and notEmpty) and using notify rather than notifyAll, it's efficient, but can we make it even better? Can we get rid of the while loops around the wait calls?

  • PinkRobot speaking

    Actually, no…

Although we've now eliminated the issue where a wake-up goes to the wrong kind of thread, which then has to ignore it (and all the problems that can cause), we still need the while loop because of the guarantees made by monitors.

  • When you send a notify to a condition variable, if any threads are waiting on that condition variable, one of them will be woken up. No wake-ups are lost by the system.
  • But there is no promise that the notify wake-up is the only wake-up that will happen. It's possible that a thread could wake up for no reason at all, or because an implementation decided to implement notify with notifyAll. These are called spurious wake-ups.

So, whenever a waiting thread wakes up, it needs to explicitly check if the scenario it was waiting for is actually true. If not, it should go back to sleep.

Thus wait calls are always in a loop, checking the condition they're waiting for.
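Here is a small sketch of that pattern in plain Java, using a hypothetical Tally class invented for this illustration. The waiter may be woken on every increment, and possibly at other times too, but the loop means it only returns once the thing it was waiting for is actually true.

```java
// Hypothetical counter monitor: awaitAtLeast() sleeps until the count reaches a target.
class Tally {
    private int count = 0;

    synchronized void increment() {
        count++;
        notifyAll();   // wakes waiters even if their target hasn't been reached yet
    }

    synchronized int awaitAtLeast(int target) throws InterruptedException {
        while (count < target) {   // woken too early (or spuriously)? go back to sleep
            wait();
        }
        return count;
    }
}

class TallyDemo {
    // One thread waits for the tally to reach 3 while the caller increments it 3 times.
    static int waitForThree() {
        Tally tally = new Tally();
        int[] seen = new int[1];
        Thread waiter = new Thread(() -> {
            try {
                seen[0] = tally.awaitAtLeast(3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        for (int i = 0; i < 3; i++) {
            tally.increment();
        }
        try {
            waiter.join();   // join makes the waiter's write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

With an if in place of the while, the waiter could return after the first increment, when the count is only 1.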

  • Hedgehog speaking

    I last used Java in CS 60. I don't think I'm going to be writing any monitor-based code.

  • PinkRobot speaking

    Well, some programming languages, like Java, build in monitors (or other more sophisticated synchronization mechanisms, like “actors” in Swift or “channels” in Go), but there is a way to bring the monitor concept to other languages, even plain old C. We'll see that in the next section.
