Creating and Managing Threads Using Active Objects
The example uses active objects to carry out asynchronous operations on a unique thread-safe data stream. Active objects execute in their own thread. They start their execution upon construction and wait for their processing to terminate upon destruction, if necessary. This example uses two different kinds of active objects. The first one writes ten elements of one of the C++ base types to a data output stream, ensuring that the ten elements are written in one synchronized operation. The second type of active object writes twenty elements to a data output stream, ensuring that each individual operation on the data output stream is synchronized. The code for both active object classes is presented below.
 
template <class T>
class synchronizeABunch {
 
public:
 
synchronizeABunch(RWDataOutputStream& stream, T data) // 1
:data_(data)
,dataOutputStream_(stream)
{
thread_= rwMakeThreadFunctionM(synchronizeABunch<T>,
*this,void,&synchronizeABunch<T>::func); // 2
thread_.start();
}
 
~synchronizeABunch() {
thread_.join(); // 3
thread_.raise();
}
 
void func() {
RWDataOutputStream tmpStream =
RWGuardedDataOutputStreamImp::make(dataOutputStream_); // 4
for(int i=0; i<10; i++) {
tmpStream << data_; // 5
}
}
 
private:
 
synchronizeABunch(const synchronizeABunch<T>&);
synchronizeABunch<T>& operator=(const synchronizeABunch<T>&);
 
RWDataOutputStream dataOutputStream_;
T data_;
RWThreadFunction thread_;
};
//1 The active object’s constructor takes a handle to the data output stream that is used internally as the sink of data. The handle points to the thread-safe chain of streaming elements. If the first element of the chain of streaming elements pointed to by the data output stream handle is not of type RWSynchronizedDataOutputStreamImp, then the active object does not enforce proper thread synchronization. The second parameter is the C++ base type’s value that is inserted ten times into the data output stream.
//2 Creates and starts a thread that executes the member function func().
//3 Upon destruction the active object waits for its thread to terminate execution and re-throws any exception raised while executing.
//4 Creates a temporary guarded output stream. All the operations carried out on the guarded output stream until its destruction are synchronized. The guarded output stream uses the internal lock provided by the synchronized data output stream class to which it is attached to enforce synchronization. The guarded output stream acquires the synchronized data output stream lock in its constructor and releases it in its destructor. Once the guarded output stream is constructed, any other threads accessing the guarded output stream’s synchronized data output stream are blocked.
//5 Inserts the C++ base type’s value provided at construction time into the temporary guarded data output stream.
The code for the second active object is similar to the code presented above with the exception of the func() function, which enforces synchronization at the operation level.
 
template <class T>
class synchronizeSingle {
 
public:
 
synchronizeSingle(RWDataOutputStream& stream, T data)
:data_(data)
,dataOutputStream_(stream)
{
thread_= rwMakeThreadFunctionM(synchronizeSingle<T>,
*this,void,&synchronizeSingle<T>::func);
thread_.start();
}
 
~synchronizeSingle() {
thread_.join();
thread_.raise();
}
 
void func() {
for(int i=0; i<20; i++) {
dataOutputStream_ << data_; // 1
rwYield();
}
}
 
private:
 
synchronizeSingle(const synchronizeSingle<T>&);
synchronizeSingle<T>& operator=(const synchronizeSingle<T>&);
 
RWDataOutputStream dataOutputStream_;
T data_;
RWThreadFunction thread_;
};
//1 Inserts the C++ base type’s value provided at construction time into the data output stream. After each insertion, the active object’s thread yields execution to any other thread candidate for execution. Therefore, only individual insertions are guaranteed to be synchronized because a synchronized data output stream is used as the sink of data.