This section uses examples to introduce how to coordinate the activities of two or more threads.
You can find more examples in examples\thr0200osfam\thread\sync. The following list gives the names of those examples and the classes that they exercise:
RWBarrierExample: RWBarrier, RWMutexLock
RWConditionExample: RWCondition, RWTLockGuard
RWCriticalSectionExample: RWCriticalSection, RWTLockGuard
RWFIFOMutexLockExample: RWFIFOMutexLock, RWTLockGuard
RWMutexLockExample: RWMutexLock, RWTLockGuard
RWReadersWriterLockExample: RWReadersWriterLock, RWMutexLock
RWSemaphoreExample: RWSemaphore, RWTLockGuard, RWMutexLock
RWThreadIdExample: RWThreadId, RWCondition, RWTMonitor
In Example 1 in Section 3.4.1, "Creating Threads," a single thread was created that printed a message to the console. The original thread and the newly created thread were not competing for any shared resources.
Example 28 shows how unwanted interference can occur between two or more threads accessing the same data or resources. In it, two threads simultaneously attempt to write messages to the console using standard C++ iostreams.
#include <rw/rstream.h> #include <rw/thread/rwtMakeThreadFunction.h> void hello(void) { for(int i=0;i<100;i++) { cout << "Hello " << "World!" << endl; } } int main() { // Construct two thread objects RWThreadFunction myThread1 = rwMakeThreadFunction(hello); RWThreadFunction myThread2 = rwMakeThreadFunction(hello); // Start the threads myThread1.start(); myThread2.start(); // Wait for the threads to finish myThread1.join(); myThread2.join(); return 0; }
The "Hello World!" message is intentionally broken into separate output operations for each word. When the code is executed on a system with time-slicing or multiple processors, the two new threads execute simultaneously and produce jumbled output similar to the following:
Hello World! Hello World! Hello World! Hello Hello World! World! Hello Hello World! World! Hello Hello World! World! . . .
The output from the two threads has become interleaved at the word level. The words themselves remain intact because a stream only allows one thread at a time to execute an output operation. In this example, however, a complete message requires three separate output operations. This sequence of operations is not protected from interference by other threads.
To eliminate undesirable interference, a mechanism is needed that prohibits one thread from attempting output while another thread is in the middle of its message output sequence. One such mechanism is called a mutex.
A mutex implements a form of synchronization called mutual exclusion, where the presence of one thread within a protected or critical section of code prohibits the entry of other threads into that section. A mutex can be used to synchronize thread access to shared resources and data.
A thread must acquire ownership of a mutex prior to entering the section of code protected by the mutex. Once a thread is granted ownership of the mutex, it is allowed to proceed into the protected section of code. Other threads entering this same section also attempt to acquire ownership of the mutex, but these threads are forced to wait until the current owner exits the section of code and releases ownership of the mutex.
The RWMutexLock class is the basic mutex mechanism for the Synchronization package. See Section 4.5.1, "The RWMutexLock Class," for more information about this class.
By using the RWMutexLock class, the hello() function can be modified, as in Example 29, so that each "Hello World!" message is produced without interference.
void hello(void) { // Declare a static mutex instance that will // be shared by all threads calling this function static RWMutexLock mutex; for(int i=0;i<100;i++) { // Acquire mutex to block other threads mutex.acquire(); cout << "Hello" << "World!" << endl; // Release the mutex so other threads can say "hello" mutex.release(); } }
By adding synchronization, the code now produces the desired output. Each message appears without being interleaved with the messages produced by the other thread:
Hello World! Hello World! Hello World! Hello World! Hello World! Hello World! Hello World! Hello World! Hello World! . . .
A potential problem with this second implementation is that if one of the iostream operations produces an exception, the mutex.release() statement is not executed, leaving the mutex locked forever.
One way to fix this problem is to add a try-catch block around the output operation and code a second mutex release in the catch block:
void hello(void) { // Declare a static mutex instance that will // be shared by all threads calling this function static RWMutexLock mutex; for(int i=0;i<100;i++) { // Acquire mutex to block other threads mutex.acquire(); try { cout << "Hello" << "World!" << endl; } catch(...) { // Exception occurred! // Release the mutex and rethrow mutex.release(); throw; } // Release the mutex so other threads can say "hello" mutex.release(); } }
With this code, the mutex is released, even if an exception is thrown by an iostream operation. (The mutex state might not be important once an exception has occurred, but assume that it is for the purpose of this discussion.)
This solution is sufficient for a simple example, but if unique handlers for several different exceptions are required, you must remember to code the mutex release in each catch block that you define. This can be cumbersome and can introduce difficult-to-detect coding mistakes.
The guard classes use the "resource acquisition is initialization" idiom where the object constructor is used to acquire a resource, and its destructor is used to release an acquired resource. The RWTLockGuard template class uses this idiom for automatic acquisition and release of various synchronization resources, including instances of RWMutexLock.
To use RWTLockGuard, declare and initialize a named instance of the class within the code block that the mutex is intended to protect, as in Example 31.
void hello(void) { static RWMutexLock mutex; for(int i=0;i<100;i++) { RWTLockGuard<RWMutexLock> guard(mutex); // 1 cout << "Hello" << "World!" << endl; } // 2 }
//1 | The declaration of an RWTLockGuard<RWMutexLock> instance invokes a constructor that acquires the mutex passed as an argument. |
//2 | The RWTLockGuard<RWMutexLock> instance is automatically destroyed when it goes out-of-scope-upon exit from the block where it was declared. The RWTLockGuard<RWMutexLock> destructor releases the mutex that was previously acquired. The guard instance is also destroyed if an exception occurs within this block, thus insuring that the mutex is always released. |
This version of hello() is functionally equivalent to the previous implementation, but is easier to write and easier to understand than the previous version that used the try-catch block.
In addition to the lock guard class, the Synchronization package also has try-lock and unlock guard classes. For more information on each of these guard classes see Section 4.6, "The Guard Classes."
A monitor is a passive object that has a synchronized interface for accessing a shared resource. A simple form of monitor can be constructed by encapsulating a resource or data in a C++ class that also includes a mutex. Each member function that gives access to the resource or data uses the mutex to synchronize or serialize the access operations.
In Example 32, a simple monitor class is constructed to implement a multithread-safe counter.
#include <rw/sync/RWMutexLock.h> class Counter { private: RWMutexLock mutex; int count_; public: Counter(int count=0) : count_(count) {} Counter& operator++(void) { RWMutexLock::LockGuard guard(mutex); count_++; return *this; } Counter& operator--(void) { RWMutexLock::LockGuard guard(mutex); count_--; return *this; } operator int(void) const { RWMutexLock::LockGuard guard(mutex); return count_; } };
In this example, each member function that accesses the count member first acquires the mutex to insure that other threads do not simultaneously attempt to change the count value.
NOTE: Instead of the RWTLockGuard class, this example uses the public type LockGuard that is defined by the RWMutexLock class. All of the Synchronization package classes have pre-defined guard types.
Count synchronization is necessary because, in all likelihood, the integer increment and decrement operators are not atomic. A compiler must typically generate a non-atomic sequence of instructions to read the count variable, increment it, then write it back. Without synchronization, two different threads could interleave these sequences producing undesirable results, as illustrated in Figure 23.
Thread 1 | Thread 2 | count | ||
0 | ||||
Read count |
0 | |||
Increment count |
0 | |||
Read count |
0 |
Thread 2 preempts 1 | ||
Increment count |
0 | |||
Write count |
1 |
OK so far... | ||
Write count |
1 |
Oops! Should be 2! | ||
1 |
The use of the mutex to protect the counter operations ensures that each read-modify-write sequence is not interleaved with counter instruction sequences executing in other threads.
In some environments, the operator int() function is atomic because an int value can always be fetched with a single indivisible memory access (see the previous example). However, this is only true if the environment or compiler forces the count member to be aligned on a word boundary.
Many architectures allow data fetches across word boundaries. These fetches are typically non-atomic, and while most compilers do allow you to control the word alignment for class and structure members, the typical default is to pack the members without regard to alignment. For this reason, you should synchronize any read operations involving data types whose size is greater than the smallest indivisible memory fetch (usually a byte).
To simplify the development of monitor classes, the Synchronization package defines a template class, RWTMonitor<Lock>, that can be used as a base class in new monitor class implementations. This class includes a mutex member and has public definitions for LockGuard and UnlockGuard types that accept a self-reference returned by a monitor() member function.
Using the RWTMonitor as a base class gives the implementation in Example 33.
#include <rw/sync/RWMutexLock.h> #include <rw/sync/RWTMonitor.h> class Counter : public RWTMonitor<RWMutexLock> { private: int count; public: Counter(int count=0) : count(count) {} Counter& operator++(void) { RWMutexLock::LockGuard lock(monitor()); count++; return *this; } Counter& operator--(void) { RWMutexLock::LockGuard lock(monitor()); count--; return *this; } operator int(void) const { RWMutexLock::LockGuard lock(monitor()); return count; } };
In addition to mutual exclusion, you can also synchronize threads using condition synchronization.
In condition synchronization, a thread is delayed (blocked) until the program state satisfies some predicate or condition. A key mechanism for implementing condition synchronization is the condition variable.
A condition variable is an efficient mechanism for waiting for and signaling changes in program state. The basic condition variable defines two operations:
wait-Causes the calling thread to block until another thread calls the signal function.
signal-Wakes up one waiting thread.
A condition variable also includes a signal-all, or broadcast operation that can be used to awaken all waiting threads, not just one.
In this form of condition synchronization, the mutex is used to prohibit changes to the program state upon which the synchronization condition depends. A thread subjected to condition synchronization acquires the mutex, tests the condition, and if the condition is found to be false, enters the condition-variable wait. This wait operation temporarily unlocks the mutex so that other threads can access and update the program state.
A thread that is going to change the program state acquires the mutex, updates the state, and calls the condition-variable's signal operation to wake-up a thread that is waiting. When the waiting thread is awakened, it reacquires the mutex in preparation for a reevaluation of the synchronization condition. If the condition is still found to be false, the thread again enters the wait. If the condition is found to be true, the thread has achieved synchronization and can now release the mutex and proceed.
In the Synchronization package, condition variable synchronization is included in the RWCondition class. This class combines the mutex and condition-variable interface. An RWCondition class instance does not possess its own mutex; you must supply a reference to an RWMutexLock instance when you construct each condition instance. See Section 4.5.8, "The RWCondition Class," for more information about using this class.
Condition variables are ideal for implementing producer-consumer synchronization, a common form of condition synchronization used to mediate the transfer of information between threads. Under this form of synchronization, a consumer thread attempts to get information from a producer thread or threads. If that information is not immediately available, it waits for a producer thread to signal when the information becomes available.
Similarly, a producer thread attempts to send information to a consumer thread or threads. If no consumer thread is waiting for the information or if the information cannot be buffered, then the producer thread waits for a consumer thread to signal that it has received or is ready to receive information.
Example 34 uses the Tools.h++ linked-list template RWTValSlist and the Synchronization package classes RWMutexLock and RWCondition to illustrate how producer-consumer synchronization can be used to turn a simple queue into an elegant and simple mechanism for efficiently communicating between threads.
#include <rw/tvslist.h> #include <rw/sync/RWMutexLock.h> #include <rw/sync/RWCondition.h> template <class T> class PCQueue { private: RWMutexLock mutex; // 1 RWCondition notEmpty; // 2 RWCondition notFull; RWTValSlist<T> queue; // 3 size_t maxSize; public: PCQueue(size_t size=0) // 4 : maxSize(size), notEmpty(mutex), notFull(mutex) // 5 { } T read(void) { RWMutexLock::LockGuard guard(mutex); // 6 while(queue.isEmpty()) { // 7 notEmpty.wait(); // 8 } T result = queue.removeFirst(); // 9 notFull.signal(); // 10 return result; } void write(T t) { RWMutexLock::LockGuard guard(mutex); while(maxSize != 0 && queue.entries() <= maxSize) {// 11 notFull.wait(); // 12 } queue.append(t); // 13 notEmpty.signal(); // 14 } };
//1 | Declare a mutex member that gives mutually exclusive access to the internal linked list. |
//2 | Declare two condition variables to represent the two queue conditions of interest: a queue that is not empty and a queue that is not full. |
//3 | Declare a linked list collection to use as the internal implementation of the actual queue. |
//4 | The PCQueue constructor accepts a maximum size argument. This size is used to limit the growth of the queue, if desired. A value of zero implies that no preset limit is defined for the number of entries that can be written. |
//5 | The mutex must be passed to the member initializers for the RWCondition instances so that they know what mutex to release and reacquire during a wait() operation. |
//6 | Acquire the mutex to prohibit other threads from changing the internal linked list while the list condition is evaluated and the next available entry is removed. |
//7 | If the list is empty, proceed to the wait operation described in the next comment. Otherwise, proceed to step #9 and remove an entry from the beginning of the list. |
//8 | Call the wait() member on the notEmpty condition variable. The wait function unlocks the mutex and begins waiting for another thread to signal that an entry has been added to the list. |
//9 | Remove an entry from the beginning of the list. |
//10 | Call the signal() member on the notFull condition variable to inform a waiting writer, if any, that the list is no longer full because an entry has just been removed! |
//11 | If the maxSize is zero, which indicates that the queue is not size limited, or if the current number of entries in the list is less than the established maximum size, proceed to step #13 and write the new value at the end of the list. |
//12 | Call the wait() member on the notFull condition variable. The wait function unlocks the mutex and begins waiting for another thread to signal that an entry has been removed from the list. |
//13 | Add the new value to the end of the list. |
//14 | Call the signal() member on the notEmpty condition variable to inform a waiting reader, if any, that the list can no longer be empty because an entry has just been added! |
A full implementation of a producer-consumer queue would also have member functions for testing whether the queue was currently empty or full, thereby allowing threads to avoid blocking during a read or write operation. These functions and more have already been defined in an existing family of producer-consumer queue and stack classes in the Interthread Communication package. These classes are described in Section 5.6, "The Producer-Consumer Classes."
©Copyright 2000, Rogue Wave Software, Inc.
Contact Rogue Wave about documentation or support issues.