Locks and Condition Variables
Although it's a little bit less elegant than building synchronization primitives into the programming language itself, it's possible to provide the same functionality as monitors using normal language features like objects or functions.
Python Example
Python, for example, has a threading module that provides a Lock class and a Condition class.
- A Lock is a simple mutual-exclusion lock that can be acquired and released. It can be used to protect a critical section.
- If we want the full power of monitors, we can use a Condition object. A Condition object is a synchronization primitive that allows threads to wait until a certain condition is true, just like we saw with monitors. In the same way that we used wait and notify inside a synchronized method, we likewise need to be holding the lock associated with the Condition object before we can call wait or notify.
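As a small warm-up for the first bullet, here is a minimal sketch of a Lock protecting a critical section. The function name count_with_lock and its parameters are made up for illustration; only threading.Lock and threading.Thread come from the standard library.

```python
import threading


def count_with_lock(num_threads=4, increments=10_000):
    """Increment a shared counter from several threads, guarded by a Lock."""
    counter = 0
    lock = threading.Lock()

    def worker():
        nonlocal counter
        for _ in range(increments):
            with lock:        # critical section: one thread at a time
                counter += 1  # read-modify-write on shared state

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter
```

Because every increment happens while the lock is held, the final count should equal num_threads * increments. A Lock alone gives mutual exclusion only; it offers no way to wait for the shared state to change, which is where Condition comes in.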
Here's an example of how we might use these in Python:
import threading
import queue
import time
import random


class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = queue.Queue(capacity)
        # One lock protects the buffer; both conditions share it.
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        with self.lock:
            # Recheck in a loop: the buffer may be full again when we wake.
            while self.buffer.full():
                self.not_full.wait()
            self.buffer.put(item)
            self.not_empty.notify()

    def get(self):
        with self.lock:
            # Recheck in a loop: another consumer may have taken the item.
            while self.buffer.empty():
                self.not_empty.wait()
            item = self.buffer.get()
            self.not_full.notify()
            return item


def producer(buffer, id):
    for i in range(10):
        message = f"Message {i} from Producer {id}"
        buffer.put(message)
        time.sleep(random.random())
    # Each producer sends one termination message.
    buffer.put("Bye")


def consumer(buffer, id):
    while True:
        message = buffer.get()
        print(f"Consumer {id}: {message}")
        if message == "Bye":
            break
        time.sleep(random.random() * 0.1)


def main():
    BUFFER_SIZE = 32
    NUM_PRODUCERS = 2
    NUM_CONSUMERS = 2
    buffer = BoundedBuffer(BUFFER_SIZE)
    producers = [
        threading.Thread(target=producer, args=(buffer, i))
        for i in range(NUM_PRODUCERS)
    ]
    consumers = [
        threading.Thread(target=consumer, args=(buffer, i))
        for i in range(NUM_CONSUMERS)
    ]
    for t in producers + consumers:
        t.start()
    for t in producers + consumers:
        t.join()


if __name__ == "__main__":
    main()
In Python, with self.lock: uses the lock as a context manager: it acquires the lock before entering the block and releases it when the block is exited, even if the block exits by raising an exception. This is equivalent to calling self.lock.acquire() before the block and self.lock.release() after it (and is also the moral equivalent of synchronized in Java).
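To make that equivalence concrete, here is a minimal sketch showing the two forms side by side. The names shared, append_with_context_manager, append_with_explicit_calls, and failing_section are hypothetical, chosen only for this illustration.

```python
import threading

lock = threading.Lock()
shared = []


def append_with_context_manager(item):
    # Acquires on entry, releases on exit.
    with lock:
        shared.append(item)


def append_with_explicit_calls(item):
    # Roughly what the with statement does for us.
    lock.acquire()
    try:
        shared.append(item)
    finally:
        lock.release()  # runs even if the body raises


def failing_section():
    # The lock is still released when the block exits via an exception.
    with lock:
        raise ValueError("boom")


append_with_context_manager("a")
append_with_explicit_calls("b")
```

The try/finally in the explicit version is the part that is easy to forget, which is one reason the with form is usually preferred.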
In addition, in Python when we create a Condition object, we pass in the lock that's associated with the condition. This is the lock that needs to already be held before we can call wait or notify.
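The requirement to hold the lock can be seen directly in a small experiment. This is a sketch, with the helper names notify_without_lock and notify_with_lock invented for illustration; it relies on the documented behavior that Condition methods raise RuntimeError when the associated lock is not held.

```python
import threading

lock = threading.Lock()
# Two conditions sharing one lock, as in the bounded buffer.
not_full = threading.Condition(lock)
not_empty = threading.Condition(lock)


def notify_without_lock():
    # Calling notify() without holding the lock is an error.
    try:
        not_empty.notify()
    except RuntimeError:
        return "RuntimeError"
    return "ok"


def notify_with_lock():
    with lock:
        not_empty.notify()  # no waiters here, so nothing is woken
    return "ok"
```

Entering a Condition with a with statement acquires its underlying lock, so with not_full: and with lock: guard the same critical section here.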
Meh. In this class we're coding in C, not Python.
C can provide the same functionality, but it's a little more mannered about it, as C doesn't have objects, only structs and functions.
C: Mutexes and Condition Variables
POSIX provides a set of functions for creating and using mutexes and condition variables. These are the C equivalents of Python's Lock and Condition classes.
For locks, POSIX provides pthread_mutex_t and functions like pthread_mutex_init, pthread_mutex_lock, and pthread_mutex_unlock.
For condition variables, POSIX provides pthread_cond_t and functions like pthread_cond_init, pthread_cond_wait, and pthread_cond_signal.
Here's an example of how we might use these in C:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define BUFFER_SIZE 32
#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2

struct message {
    char *content;
};

struct circular_buffer {
    struct message buffer[BUFFER_SIZE];
    int head;
    int tail;
    int count;
    pthread_mutex_t mutex;      /* Protects every field above */
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
};

struct thread_args {
    struct circular_buffer *cb;
    int id;
};

/* Function prototypes */
void message_init(struct message *msg, const char *content);
void message_destroy(struct message *msg);
void circular_buffer_init(struct circular_buffer *cb);
void circular_buffer_destroy(struct circular_buffer *cb);
void *producer(void *arg);
void *consumer(void *arg);

int
main(void)
{
    struct circular_buffer cb;
    pthread_t prod_threads[NUM_PRODUCERS];
    pthread_t cons_threads[NUM_CONSUMERS];
    struct thread_args prod_args[NUM_PRODUCERS];
    struct thread_args cons_args[NUM_CONSUMERS];
    int i;

    circular_buffer_init(&cb);

    for (i = 0; i < NUM_PRODUCERS; i++) {
        prod_args[i].cb = &cb;
        prod_args[i].id = i;
        pthread_create(&prod_threads[i], NULL, producer, &prod_args[i]);
    }
    for (i = 0; i < NUM_CONSUMERS; i++) {
        cons_args[i].cb = &cb;
        cons_args[i].id = i;
        pthread_create(&cons_threads[i], NULL, consumer, &cons_args[i]);
    }

    for (i = 0; i < NUM_PRODUCERS; i++)
        pthread_join(prod_threads[i], NULL);
    for (i = 0; i < NUM_CONSUMERS; i++)
        pthread_join(cons_threads[i], NULL);

    circular_buffer_destroy(&cb);
    return 0;
}

void
message_init(struct message *msg, const char *content)
{
    msg->content = strdup(content);
}

void
message_destroy(struct message *msg)
{
    free(msg->content);
    msg->content = NULL;
}

void
circular_buffer_init(struct circular_buffer *cb)
{
    cb->head = 0;
    cb->tail = 0;
    cb->count = 0;
    pthread_mutex_init(&cb->mutex, NULL);
    pthread_cond_init(&cb->not_full, NULL);
    pthread_cond_init(&cb->not_empty, NULL);
}

void
circular_buffer_destroy(struct circular_buffer *cb)
{
    pthread_mutex_destroy(&cb->mutex);
    pthread_cond_destroy(&cb->not_full);
    pthread_cond_destroy(&cb->not_empty);
}

void *
producer(void *arg)
{
    struct thread_args *args = (struct thread_args *) arg;
    struct circular_buffer *cb = args->cb;
    int id = args->id;
    char message_content[50];

    for (int i = 0; i < 10; i++) {
        snprintf(message_content, sizeof(message_content),
                 "Message %d from Producer %d", i, id);

        pthread_mutex_lock(&cb->mutex);
        /* Recheck in a loop: pthread_cond_wait may wake spuriously */
        while (cb->count == BUFFER_SIZE) {
            pthread_cond_wait(&cb->not_full, &cb->mutex);
        }
        message_init(&cb->buffer[cb->tail], message_content);
        cb->tail = (cb->tail + 1) % BUFFER_SIZE;
        cb->count++;
        pthread_cond_signal(&cb->not_empty);
        pthread_mutex_unlock(&cb->mutex);

        usleep(rand() % 1000000);   /* Sleep up to 1 second */
    }

    /* Send termination message */
    pthread_mutex_lock(&cb->mutex);
    while (cb->count == BUFFER_SIZE) {
        pthread_cond_wait(&cb->not_full, &cb->mutex);
    }
    message_init(&cb->buffer[cb->tail], "Bye");
    cb->tail = (cb->tail + 1) % BUFFER_SIZE;
    cb->count++;
    pthread_cond_signal(&cb->not_empty);
    pthread_mutex_unlock(&cb->mutex);

    return NULL;
}

void *
consumer(void *arg)
{
    struct thread_args *args = (struct thread_args *) arg;
    struct circular_buffer *cb = args->cb;
    int id = args->id;
    struct message msg;

    while (1) {
        pthread_mutex_lock(&cb->mutex);
        while (cb->count == 0) {
            pthread_cond_wait(&cb->not_empty, &cb->mutex);
        }
        /* Shallow copy: this consumer now owns msg.content */
        msg = cb->buffer[cb->head];
        cb->head = (cb->head + 1) % BUFFER_SIZE;
        cb->count--;
        pthread_cond_signal(&cb->not_full);
        pthread_mutex_unlock(&cb->mutex);

        /* Print outside the critical section to keep it short */
        printf("Consumer %d: %s\n", id, msg.content);
        if (strcmp(msg.content, "Bye") == 0) {
            message_destroy(&msg);
            break;
        }
        message_destroy(&msg);
        usleep(rand() % 100000);    /* Sleep up to 1/10 second */
    }

    return NULL;
}
When we were talking about monitors, I had a sense of déjà vu. Now that we've seen the C code, I realize I did this in CS 105.
Exactly. Hopefully now you know where these things come from, which I think helps you understand why they are the way they are.
Can we do more with mutexes and condition variables than we could with semaphores?
Actually, no. We can implement mutexes and condition variables using semaphores and vice versa. But mutexes and condition variables are less error-prone and easier to use, so they're more widely used in practice.
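One direction of that equivalence can be sketched in a few lines: a counting semaphore built from a lock and a condition variable. The class name SimpleSemaphore and the method names down and up are hypothetical, picked for this illustration; Python's standard library already provides threading.Semaphore, so this is purely a demonstration.

```python
import threading


class SimpleSemaphore:
    """Counting semaphore sketch built from one Lock and one Condition."""

    def __init__(self, initial=1):
        if initial < 0:
            raise ValueError("initial count must be non-negative")
        self._count = initial
        self._lock = threading.Lock()
        self._nonzero = threading.Condition(self._lock)

    def down(self):
        """Block until the count is positive, then decrement it."""
        with self._lock:
            while self._count == 0:   # recheck after every wakeup
                self._nonzero.wait()
            self._count -= 1

    def up(self):
        """Increment the count and wake one waiter, if any."""
        with self._lock:
            self._count += 1
            self._nonzero.notify()
```

Going the other way, building condition variables out of semaphores, is also possible but noticeably trickier to get right, which fits the point that mutexes and condition variables are the less error-prone tools.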