Using the Condition Variable for Producer-Consumer Synchronization
Condition variables are ideal for implementing producer-consumer synchronization, a common form of condition synchronization used to mediate the transfer of information between threads. Under this form of synchronization, a consumer thread attempts to get information from a producer thread or threads. If that information is not immediately available, the consumer waits for a producer thread to signal that the information has become available.
Similarly, a producer thread attempts to send information to a consumer thread or threads. If no consumer thread is waiting for the information or if the information cannot be buffered, then the producer thread waits for a consumer thread to signal that it has received or is ready to receive information.
Example 31 uses the linked-list template RWTValSlist, from the Essential Tools Module, and the Synchronization package classes RWMutexLock and RWCondition to illustrate how producer-consumer synchronization can turn a simple queue into an efficient mechanism for communicating between threads.
Example 31 – Using condition variables for producer-consumer synchronization
#include <rw/tvslist.h>
#include <rw/sync/RWMutexLock.h>
#include <rw/sync/RWCondition.h>
 
template <class T> class PCQueue {
private:
    RWMutexLock mutex; // 1
    RWCondition notEmpty; // 2
    RWCondition notFull;
    RWTValSlist<T> queue; // 3
    size_t maxSize;
public:
    PCQueue(size_t size=0) // 4
        : notEmpty(mutex), notFull(mutex), maxSize(size) // 5
    {
    }
    T read(void)
    {
        RWMutexLock::LockGuard guard(mutex); // 6
        while(queue.isEmpty()) { // 7
            notEmpty.wait(); // 8
        }
        T result = queue.removeFirst(); // 9
        notFull.signal(); // 10
        return result;
    }
    void write(T t)
    {
        RWMutexLock::LockGuard guard(mutex);
        // Wait only while the queue is bounded and already holds maxSize entries.
        while(maxSize != 0 && queue.entries() >= maxSize) { // 11
            notFull.wait(); // 12
        }
        queue.append(t); // 13
        notEmpty.signal(); // 14
    }
};
//1 Declares a mutex member that gives mutually exclusive access to the internal linked list.
//2 Declares two condition variables to represent the two queue conditions of interest: a queue that is not empty and a queue that is not full.
//3 Declares a linked list collection to use as the internal implementation of the actual queue.
//4 The PCQueue constructor accepts a maximum size argument. This size is used to limit the growth of the queue, if desired. A value of zero implies that no preset limit is defined for the number of entries that can be written.
//5 The mutex must be passed to the member initializers for the RWCondition instances so that they know what mutex to release and reacquire during a wait() operation.
//6 Acquires the mutex to prohibit other threads from changing the internal linked list while the list condition is evaluated and the next available entry is removed.
//7 Tests whether the list is empty. If it is, proceeds to the wait operation described in the next comment. Otherwise, proceeds to step #9 and removes an entry from the beginning of the list. Because the test is the condition of a while loop, it is repeated each time wait() returns.
//8 Calls the wait() member on the notEmpty condition variable. The wait function unlocks the mutex and begins waiting for another thread to signal that an entry has been added to the list. The mutex is reacquired before wait() returns.
//9 Removes an entry from the beginning of the list.
//10 Calls the signal() member on the notFull condition variable to inform a waiting writer, if any, that the list is no longer full because an entry has just been removed.
//11 If the maxSize is zero, which indicates that the queue is not size limited, or if the current number of entries in the list is less than the established maximum size, proceeds to step #13 and writes the new value at the end of the list. Otherwise, proceeds to the wait operation described in the next comment.
//12 Calls the wait() member on the notFull condition variable. The wait function unlocks the mutex and begins waiting for another thread to signal that an entry has been removed from the list.
//13 Adds the new value to the end of the list.
//14 Calls the signal() member on the notEmpty condition variable to inform a waiting reader, if any, that the list is no longer empty because an entry has just been added.
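For readers who want to try the pattern without the library at hand, the following is a minimal sketch of the same queue written against the C++ standard library instead of RWMutexLock, RWCondition, and RWTValSlist. The names StdPCQueue and sumThroughQueue are hypothetical and exist only for this illustration. The sketch assumes that std::condition_variable::wait plays the role of RWCondition::wait (release the mutex, block, reacquire before returning) and that notify_one plays the role of signal().

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical standard-library analog of PCQueue; not a Rogue Wave class.
template <class T>
class StdPCQueue {
public:
    // A size of zero means the queue has no preset limit.
    explicit StdPCQueue(std::size_t size = 0) : maxSize_(size) {}

    T read() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Re-test the condition every time wait() returns.
        while (queue_.empty()) {
            notEmpty_.wait(lock);  // releases the mutex while blocked
        }
        T result = queue_.front();
        queue_.pop_front();
        notFull_.notify_one();     // a waiting writer may now proceed
        return result;
    }

    void write(T t) {
        std::unique_lock<std::mutex> lock(mutex_);
        // Block only while the queue is bounded and already full.
        while (maxSize_ != 0 && queue_.size() >= maxSize_) {
            notFull_.wait(lock);
        }
        queue_.push_back(t);
        notEmpty_.notify_one();    // a waiting reader may now proceed
    }

private:
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    std::condition_variable notFull_;
    std::deque<T> queue_;
    std::size_t maxSize_;
};

// Sends 1..count through a queue bounded at two entries and returns
// the sum that the consumer thread accumulated.
long sumThroughQueue(int count) {
    assert(count >= 0);
    StdPCQueue<int> q(2);
    long sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) sum += q.read();
    });
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) q.write(i);
    });
    producer.join();
    consumer.join();  // sum is read only after the consumer has finished
    return sum;
}
```

Because the queue in sumThroughQueue holds at most two entries, the producer blocks on the not-full condition whenever it gets ahead of the consumer, so both wait paths are exercised.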
A full implementation of a producer-consumer queue would also have member functions for testing whether the queue is currently empty or full, thereby allowing threads to avoid blocking during a read or write operation. These functions and more have already been defined in an existing family of producer-consumer queue and stack classes in the Interthread Communication package. These classes are described in The Producer‑Consumer Classes.
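As a rough illustration of what non-blocking access might look like, the sketch below folds the empty or full test and the operation into a single call made under the mutex, so the answer cannot go stale between the test and the read or write. It uses the C++ standard library rather than the Synchronization package, and the names TryQueue, tryRead, tryWrite, and tryQueueDemo are assumptions made for this example, not a description of the Interthread Communication package interface.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical bounded queue that reports failure instead of blocking.
template <class T>
class TryQueue {
public:
    // A size of zero means the queue has no preset limit.
    explicit TryQueue(std::size_t size = 0) : maxSize_(size) {}

    // Appends t and returns true, or returns false if the queue is full.
    bool tryWrite(const T& t) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (maxSize_ != 0 && queue_.size() >= maxSize_) return false;
        queue_.push_back(t);
        return true;
    }

    // Moves the oldest entry into out and returns true,
    // or returns false (leaving out untouched) if the queue is empty.
    bool tryRead(T& out) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> queue_;
    std::size_t maxSize_;
};

// Single-threaded walk-through: returns true if every call behaved as described.
bool tryQueueDemo(std::size_t capacity) {
    assert(capacity > 0);  // zero would mean "unbounded" and never report full
    TryQueue<int> q(capacity);
    int value = 0;
    if (q.tryRead(value)) return false;           // empty: read must fail
    for (std::size_t i = 0; i < capacity; ++i) {
        if (!q.tryWrite(static_cast<int>(i))) return false;
    }
    if (q.tryWrite(-1)) return false;             // full: write must fail
    for (std::size_t i = 0; i < capacity; ++i) {  // entries come back in FIFO order
        if (!q.tryRead(value) || value != static_cast<int>(i)) return false;
    }
    return !q.tryRead(value);                     // drained: read fails again
}
```

A separate isEmpty() or isFull() query would be only advisory in a multithreaded program, since another thread may change the queue before the caller acts on the result. That is the reason this sketch combines the test with the operation.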