CS 134

Locks and Condition Variables

Although it's a little less elegant than building synchronization primitives into the programming language itself, we can provide the same functionality as monitors using ordinary language features such as objects and functions.

Python Example

Python, for example, has a threading module that provides a Lock class and a Condition class.

  • A Lock is a simple mutual-exclusion lock that can be acquired and released. It can be used to protect a critical section.
  • If we want the full power of monitors, we can use a Condition object. A Condition object is a synchronization primitive that allows threads to wait until a certain condition is true, just like we saw with monitors. Just as we could only call wait and notify from inside a synchronized method, we need to be holding the lock associated with the Condition object before we can call wait or notify on it.
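To make the first bullet concrete, here is a minimal sketch of a Lock protecting a critical section. The names counter, counter_lock, increment, and run_counter_demo are made up for this illustration; the critical section is the read-modify-write on the shared counter.

```python
import threading

counter = 0                        # shared state (illustrative)
counter_lock = threading.Lock()    # protects counter

def increment(times):
    """Add 1 to the shared counter `times` times."""
    global counter
    for _ in range(times):
        counter_lock.acquire()     # enter the critical section
        try:
            counter += 1           # read-modify-write on shared state
        finally:
            counter_lock.release() # leave the critical section

def run_counter_demo(num_threads=4, times=10_000):
    """Run several incrementing threads and return the final count."""
    global counter
    counter = 0
    threads = [
        threading.Thread(target=increment, args=(times,))
        for _ in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter
```

Because every update happens while holding counter_lock, run_counter_demo(4, 10_000) should return exactly 40000; without the lock, updates from different threads could be lost.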

Here's an example of how we might use these in Python:

import threading
import queue
import time
import random

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = queue.Queue(capacity)
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

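    def put(self, item):
        with self.lock:
            # Re-check the condition in a loop: by the time a woken thread
            # reacquires the lock, the buffer may be full again.
            while self.buffer.full():
                self.not_full.wait()  # releases the lock while waiting
            self.buffer.put(item)
            self.not_empty.notify()  # wake one waiting consumer, if any

    def get(self):
        with self.lock:
            while self.buffer.empty():
                self.not_empty.wait()  # releases the lock while waiting
            item = self.buffer.get()
            self.not_full.notify()  # wake one waiting producer, if any
            return item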

def producer(buffer, id):
    for i in range(10):
        message = f"Message {i} from Producer {id}"
        buffer.put(message)
        time.sleep(random.random())
    buffer.put("Bye")

def consumer(buffer, id):
    while True:
        message = buffer.get()
        print(f"Consumer {id}: {message}")
        if message == "Bye":
            break
        time.sleep(random.random() * 0.1)

def main():
    BUFFER_SIZE = 32
    NUM_PRODUCERS = 2
    NUM_CONSUMERS = 2

    buffer = BoundedBuffer(BUFFER_SIZE)

    producers = [
        threading.Thread(target=producer, args=(buffer, i))
        for i in range(NUM_PRODUCERS)
    ]
    consumers = [
        threading.Thread(target=consumer, args=(buffer, i))
        for i in range(NUM_CONSUMERS)
    ]

    for t in producers + consumers:
        t.start()

    for t in producers + consumers:
        t.join()

if __name__ == "__main__":
    main()

In Python, with self.lock: uses the lock as a context manager: the lock is acquired before entering the block and released when the block is exited, even if the block exits by raising an exception. This is equivalent to calling self.lock.acquire() before the block and self.lock.release() in a finally clause after it (and is also the moral equivalent of synchronized in Java).
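As a small sketch of that equivalence, the two update functions below do the same thing; the function names and the shared list are invented for the illustration. The third function shows that the lock is released even when the block raises.

```python
import threading

lock = threading.Lock()

def update_with_context_manager(shared, value):
    # The lock is acquired on entry and released on exit.
    with lock:
        shared.append(value)

def update_with_explicit_calls(shared, value):
    # Hand-written equivalent of the `with` block above.
    lock.acquire()
    try:
        shared.append(value)
    finally:
        lock.release()

def fail_inside_block():
    # The exception propagates, but the lock is still released.
    with lock:
        raise ValueError("something went wrong")
```

After calling fail_inside_block() and catching the ValueError, lock.locked() is False, so other threads can still acquire the lock.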

In addition, in Python when we create a Condition object, we pass in the lock that's associated with the condition. This is the lock that needs to already be held before we can call wait or notify.
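We can see this requirement in action with a short sketch (the names ready, notify_without_lock, and notify_with_lock are made up here). Python's threading module raises a RuntimeError if we call notify or wait on a Condition whose lock is not held.

```python
import threading

lock = threading.Lock()
ready = threading.Condition(lock)   # `ready` is tied to `lock`

def notify_without_lock():
    """Call notify() without holding the lock and report what happens."""
    try:
        ready.notify()
        return "no error"
    except RuntimeError:
        return "RuntimeError"

def notify_with_lock():
    """Call notify() correctly, while holding the associated lock."""
    with lock:
        ready.notify()   # no threads are waiting, so this does nothing
    return "ok"
```

Here notify_without_lock() returns "RuntimeError" and notify_with_lock() returns "ok". Because both Condition objects in BoundedBuffer share one lock, holding self.lock is enough to use either of them.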

  • Goat speaking

    Meh. In this class we're coding in C, not Python.

  • PinkRobot speaking

    C can provide the same functionality, but it's a little more mannered about it, as C doesn't have objects, only structs and functions.

C: Mutexes and Condition Variables

POSIX provides a set of functions for creating and using mutexes and condition variables. These are the C equivalents of Python's Lock and Condition classes.

For locks, POSIX provides pthread_mutex_t and functions like pthread_mutex_init, pthread_mutex_lock, and pthread_mutex_unlock.

For condition variables, POSIX provides pthread_cond_t and functions like pthread_cond_init, pthread_cond_wait, and pthread_cond_signal.

Here's an example of how we might use these in C:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define BUFFER_SIZE 32
#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2

struct message {
        char *content;
};

struct circular_buffer {
        struct message buffer[BUFFER_SIZE];
        int head;
        int tail;
        int count;
        pthread_mutex_t mutex;
        pthread_cond_t not_full;
        pthread_cond_t not_empty;
};

struct thread_args {
        struct circular_buffer *cb;
        int id;
};

/* Function prototypes */
void message_init(struct message *msg, const char *content);
void message_destroy(struct message *msg);
void circular_buffer_init(struct circular_buffer *cb);
void circular_buffer_destroy(struct circular_buffer *cb);
void *producer(void *arg);
void *consumer(void *arg);

int
main(void)
{
        struct circular_buffer cb;
        pthread_t prod_threads[NUM_PRODUCERS];
        pthread_t cons_threads[NUM_CONSUMERS];
        struct thread_args prod_args[NUM_PRODUCERS];
        struct thread_args cons_args[NUM_CONSUMERS];
        int i;

        circular_buffer_init(&cb);

        for (i = 0; i < NUM_PRODUCERS; i++) {
                prod_args[i].cb = &cb;
                prod_args[i].id = i;
                pthread_create(&prod_threads[i], NULL, producer, &prod_args[i]);
        }

        for (i = 0; i < NUM_CONSUMERS; i++) {
                cons_args[i].cb = &cb;
                cons_args[i].id = i;
                pthread_create(&cons_threads[i], NULL, consumer, &cons_args[i]);
        }

        for (i = 0; i < NUM_PRODUCERS; i++)
                pthread_join(prod_threads[i], NULL);

        for (i = 0; i < NUM_CONSUMERS; i++)
                pthread_join(cons_threads[i], NULL);

        circular_buffer_destroy(&cb);

        return 0;
}

void
message_init(struct message *msg, const char *content)
{
        msg->content = strdup(content);
}

void
message_destroy(struct message *msg)
{
        free(msg->content);
        msg->content = NULL;
}

void
circular_buffer_init(struct circular_buffer *cb)
{
        cb->head = 0;
        cb->tail = 0;
        cb->count = 0;
        pthread_mutex_init(&cb->mutex, NULL);
        pthread_cond_init(&cb->not_full, NULL);
        pthread_cond_init(&cb->not_empty, NULL);
}

void
circular_buffer_destroy(struct circular_buffer *cb)
{
        pthread_mutex_destroy(&cb->mutex);
        pthread_cond_destroy(&cb->not_full);
        pthread_cond_destroy(&cb->not_empty);
}

void *
producer(void *arg)
{
        struct thread_args *args = (struct thread_args *) arg;
        struct circular_buffer *cb = args->cb;
        int id = args->id;
        char message_content[50];

        for (int i = 0; i < 10; i++) {
                snprintf(message_content, sizeof(message_content),
                         "Message %d from Producer %d", i, id);

                pthread_mutex_lock(&cb->mutex);
                while (cb->count == BUFFER_SIZE) {
                        pthread_cond_wait(&cb->not_full, &cb->mutex);
                }

                message_init(&cb->buffer[cb->tail], message_content);
                cb->tail = (cb->tail + 1) % BUFFER_SIZE;
                cb->count++;

                pthread_cond_signal(&cb->not_empty);
                pthread_mutex_unlock(&cb->mutex);

                usleep(rand() % 1000000); /* Sleep up to 1 second */
        }

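        /*
         * Send termination message. Each producer sends one "Bye" and each
         * consumer exits after receiving one, so this shutdown scheme relies
         * on NUM_PRODUCERS == NUM_CONSUMERS.
         */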
        pthread_mutex_lock(&cb->mutex);
        while (cb->count == BUFFER_SIZE) {
                pthread_cond_wait(&cb->not_full, &cb->mutex);
        }

        message_init(&cb->buffer[cb->tail], "Bye");
        cb->tail = (cb->tail + 1) % BUFFER_SIZE;
        cb->count++;

        pthread_cond_signal(&cb->not_empty);
        pthread_mutex_unlock(&cb->mutex);

        return NULL;
}

void *
consumer(void *arg)
{
        struct thread_args *args = (struct thread_args *) arg;
        struct circular_buffer *cb = args->cb;
        int id = args->id;
        struct message msg;

        while (1) {
                pthread_mutex_lock(&cb->mutex);
                while (cb->count == 0) {
                        pthread_cond_wait(&cb->not_empty, &cb->mutex);
                }

                msg = cb->buffer[cb->head];
                cb->head = (cb->head + 1) % BUFFER_SIZE;
                cb->count--;

                pthread_cond_signal(&cb->not_full);
                pthread_mutex_unlock(&cb->mutex);

                printf("Consumer %d: %s\n", id, msg.content);

                if (strcmp(msg.content, "Bye") == 0) {
                        message_destroy(&msg);
                        break;
                }

                message_destroy(&msg);
                usleep(rand() % 100000); /* Sleep up to 1/10 second */
        }

        return NULL;
}

Do you dimly recall seeing these functions before, perhaps in CS 105?

  • Hedgehog speaking

    When we were talking about monitors, I had a sense of deja vu. Now that we've seen the C code, I realize I did this in CS 105.

  • PinkRobot speaking

    Exactly. Hopefully now you know where these things come from, which I think helps you understand why they are the way they are.

  • Duck speaking

    Can we do more with mutexes and condition variables than we could with semaphores?

  • PinkRobot speaking

    Actually, no. We can implement mutexes and condition variables using semaphores and vice versa. But mutexes and condition variables are less error-prone and easier to use, so they're more widely used in practice.
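To illustrate one direction of PinkRobot's claim, here is a minimal sketch of a counting semaphore built from a lock and a condition variable. SimpleSemaphore, down, up, and run_semaphore_demo are names invented for this example (Python already ships threading.Semaphore; this is only to show the idea).

```python
import threading
import time

class SimpleSemaphore:
    """A counting semaphore built from a Lock and a Condition (sketch)."""

    def __init__(self, initial=1):
        if initial < 0:
            raise ValueError("initial count must be non-negative")
        self._count = initial
        self._lock = threading.Lock()
        self._nonzero = threading.Condition(self._lock)

    def down(self):
        """P operation: wait until the count is positive, then decrement."""
        with self._lock:
            while self._count == 0:
                self._nonzero.wait()
            self._count -= 1

    def up(self):
        """V operation: increment the count and wake one waiter, if any."""
        with self._lock:
            self._count += 1
            self._nonzero.notify()

def run_semaphore_demo(num_threads=6, limit=2):
    """Let at most `limit` threads work at once; return the peak observed."""
    sem = SimpleSemaphore(limit)
    stats_lock = threading.Lock()
    stats = {"active": 0, "max_active": 0}

    def worker():
        sem.down()
        try:
            with stats_lock:
                stats["active"] += 1
                stats["max_active"] = max(stats["max_active"], stats["active"])
            time.sleep(0.01)           # pretend to do some work
            with stats_lock:
                stats["active"] -= 1
        finally:
            sem.up()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stats["max_active"]
```

The peak returned by run_semaphore_demo() never exceeds limit. Going the other way, a semaphore initialized to 1 behaves like a mutex; building a correct condition variable out of semaphores is possible too, but noticeably trickier.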
