Creating and Managing Threads Using Active Objects
The example uses active objects to carry out asynchronous operations on a unique thread-safe data stream. Active objects execute in their own thread. They start their execution upon construction and wait for their processing to terminate upon destruction, if necessary. This example uses two different kinds of active objects. The first one writes ten elements of one of the C++ base types to a data output stream, ensuring that the ten elements are written in one synchronized operation. The second type of active object writes twenty elements to a data output stream, ensuring that each individual operation on the data output stream is synchronized. The code for both active object classes is presented below.
template <class T>
class synchronizeABunch {
public:
synchronizeABunch(RWDataOutputStream& stream, T data) // 1
:data_(data)
,dataOutputStream_(stream)
{
thread_= rwMakeThreadFunctionM(synchronizeABunch<T>,
*this,void,&synchronizeABunch<T>::func); // 2
thread_.start();
}
~synchronizeABunch() {
thread_.join(); // 3
thread_.raise();
}
void func() {
RWDataOutputStream tmpStream =
RWGuardedDataOutputStreamImp::make(dataOutputStream_); // 4
for(int i=0; i<10; i++) {
tmpStream << data_; // 5
}
}
private:
synchronizeABunch(const synchronizeABunch<T>&);
synchronizeABunch<T>& operator=(const synchronizeABunch<T>&);
RWDataOutputStream dataOutputStream_;
T data_;
RWThreadFunction thread_;
};
The code for the second active object is similar to the code presented above with the exception of the func() function, which enforces synchronization at the operation level.
template <class T>
class synchronizeSingle {
public:
synchronizeSingle(RWDataOutputStream& stream, T data)
:data_(data)
,dataOutputStream_(stream)
{
thread_= rwMakeThreadFunctionM(synchronizeSingle<T>,
*this,void,&synchronizeSingle<T>::func);
thread_.start();
}
~synchronizeSingle() {
thread_.join();
thread_.raise();
}
void func() {
for(int i=0; i<20; i++) {
dataOutputStream_ << data_; // 1
rwYield();
}
}
private:
synchronizeSingle(const synchronizeSingle<T>&);
synchronizeSingle<T>& operator=(const synchronizeSingle<T>&);
RWDataOutputStream dataOutputStream_;
T data_;
RWThreadFunction thread_;
};