Using Interthread Communication
This section uses a series of simple examples to introduce and explain the concepts and classes involved in interthread communication.
You can find more examples in examples\thr0200osfam\itc. The following list gives the names of those examples and the classes that they exercise:
activobj: RWTIOUResult, RWTIOUTrap, RWTPCValQueue
balance: RWTPCValQueue
iouescro: rwtMakeIOUCallback, RWTIOUResult, RWTIOUEscrow
ioureslt: rwtMakeIOUCallback, RWTIOUResult
ioutrap: rwtMakeIOUCallback, RWTIOUTrap, RWTIOUResult
prodcons: RWTPCValQueue
servpool: rwtMakeIOUCallback,RWTIOUTrap, RWTIOUResult
The concept of producer-consumer synchronization was introduced in Section 4.4.5, "Using Condition Variables." This form of synchronization can be used to coordinate communication between two or more threads. By combining producer-consumer synchronization semantics with a simple templatized queue, you can link threads together into a network of cooperating processing elements.
For more information on producer-consumer synchronization, see Section 5.6, "The Producer-Consumer Classes."
The RWTPCValQueue class is one of a family of producer-consumer classes that includes thread-safe, synchronized, and buffered exchange of data. Each of the classes uses:
Read operations that block the calling thread if the underlying buffer is empty.
Write operations that block if the underlying buffer is full.
The maximum capacity of the buffer can be specified at the time of construction. The default is a buffer whose capacity is limited only by memory.
Example 46 simulates a manufacturing operation where an RWTPCValQueue is used to represent a warehouse of limited capacity that accepts widgets from a production function and then sends these widgets to a shipping function. The production of widgets occurs at a fixed rate, while the shipping rate of widgets varies from widget to widget. This difference in read and write rates eventually results in one function being forced to wait on the other.
#include <rw/rstream.h> #include <stdlib.h> // For rand() and srand() #include <rw/thread/rwtMakeThreadFunction.h> #include <rw/itc/RWTPCValQueue.h> #include <rw/sync/RWMutexLock.h> #include <rw/cstring.h> typedef int Widget; void message(RWCString& message) { // 1 static RWMutexLock mutex; // Make output operation atomic! RWMutexLock::LockGuard lock(mutex); cout << message << endl; } void produce(RWTPCValQueue<Widget>& warehouse, size_t runSize) { for (size_t i=0; i<runSize; i++) { if (!warehouse.canWrite()) // 2 message("Warehouse full!"); // Print bar graph of inventory size message(RWCString('*',warehouse.entries())); warehouse.write(Widget(i)); // 3 rwSleep(100); // Fixed time to produce widgets } warehouse.write(Widget(-1)); // Tell shipping the run is done! message("Production done!"); } void ship(RWTPCValQueue<Widget>& warehouse) { // Seed random number sequence using thread id ::srand(rwThreadHash(rwThreadId())); Widget widget; do { rwSleep(::rand()%200); // Random time for each shipment if (!warehouse.canRead()) // 4 message("Warehouse empty!"); widget = warehouse.read(); // 5 } while (widget != -1); message("Shipping done!"); } void main() { // The warehouse can only hold five widgets! RWTPCValQueue<Widget> warehouse(5); // 6 RWThreadFunction production; production = rwtMakeThreadFunctionGA2(void,produce, RWTPCValQueue<Widget>&, warehouse, size_t,100); RWThreadFunction shipping; shipping = rwtMakeThreadFunctionGA1(void,ship, RWTPCValQueue<Widget>&, warehouse); production.start(); rwSleep(5*100); // Let production fill the warehouse! shipping.start(); production.join(); shipping.join(); }
//1 | A simple routine for synchronizing message output. |
//2 | Test to see if the warehouse queue is currently full, so that an alert can be sent before being blocked inside the write function. |
//3 | Put a widget into the warehouse, and wait if it is full. |
//4 | Test to see if the warehouse queue is currently empty, so that an alert can be sent before being blocked inside the read function. |
//5 | Get a widget from the warehouse, and wait if it is empty. |
//6 | Construct a queue to represent the warehouse and limit the queue length to five entries. |
©Copyright 2000, Rogue Wave Software, Inc.
Contact Rogue Wave about documentation or support issues.