Active Objects and Data Input Streams
The example in this section implements the input operation that corresponds to the previous example. It creates a synchronized data input stream, RWSynchronizedDataInputStreamImp, that connects to an RWNativeDataFromByteInputStreamImp stream, which restores C++ base types from a sequence of bytes. The RWNativeDataFromByteInputStreamImp stream is in turn connected to a buffered binary input stream that reads bytes from the file created in the previous example. The buffered byte input stream is provided by class RWBufferedByteInputStreamImp connected to class RWByteFromStreambufInputStreamImp.
This example requires the Streams package, as well as the Execution Tracing, Thread-compatible Exception, Synchronization, Smart Pointer, Threading, Functor, and Interthread Communication packages from the Threads Module.
Figure 16 is a representation of the chain of streaming elements used in this example.
As in the previous example, this example uses active objects to carry out asynchronous operations on a single thread-safe data stream. Only one type of active object is used. Its purpose is to read ten elements of one of the C++ base types from a data input stream, ensuring that all ten elements are read in one synchronized operation.
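The idea of an active object that reads ten elements as one synchronized operation can be sketched with only the C++ standard library. This is an illustrative analogue, not the Streams or Threads Module API: SharedInput, ReadTen, and readTwoBatches are hypothetical names, a std::istringstream stands in for the data input stream, and a std::mutex held for the whole loop plays the role that the guarded stream is assumed to play in the real example.

```cpp
#include <cassert>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a synchronized data input stream:
// an input stream paired with the mutex that protects it.
struct SharedInput {
    explicit SharedInput(const std::string& text) : stream(text) {}
    std::istringstream stream;
    std::mutex lock;
};

// Hypothetical active object: the constructor starts a thread and the
// destructor joins it.
template <class T>
class ReadTen {
public:
    explicit ReadTen(SharedInput& in)
        : in_(in), thread_(&ReadTen::run, this) {}

    ~ReadTen() { join(); }

    ReadTen(const ReadTen&) = delete;
    ReadTen& operator=(const ReadTen&) = delete;

    // Wait for the worker thread to finish (safe to call more than once).
    void join() { if (thread_.joinable()) thread_.join(); }

    // Only meaningful after join() has returned.
    const std::vector<T>& values() const { return values_; }

private:
    void run() {
        // The lock is held across all ten reads, so no other reader
        // can interleave with this batch.
        std::lock_guard<std::mutex> guard(in_.lock);
        for (int i = 0; i < 10; ++i) {
            T value;
            if (in_.stream >> value) values_.push_back(value);
            std::this_thread::yield();  // invite interleaving; the lock prevents it
        }
    }

    SharedInput& in_;
    std::vector<T> values_;
    std::thread thread_;  // declared last so the other members exist before it starts
};

// Runs two active objects against one shared stream holding 0..19
// and returns the batch each of them read.
std::vector<std::vector<int>> readTwoBatches() {
    std::string text;
    for (int i = 0; i < 20; ++i) text += std::to_string(i) + ' ';
    SharedInput input(text);

    ReadTen<int> first(input);
    ReadTen<int> second(input);
    first.join();
    second.join();

    assert(first.values().size() == 10 && second.values().size() == 10);
    return {first.values(), second.values()};
}
```

Calling readTwoBatches() yields two batches of ten consecutive integers each. Which object receives 0 through 9 and which receives 10 through 19 depends on which thread acquires the lock first, but neither batch contains values from the other.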
The complete example is located in directory ...\examples\stream in the file dataFilteredRead.cpp. The code for the active object class is presented below:
template <class T>
class readABunch {
  public:
    readABunch(RWDataInputStream& stream)                       // 1
      :dataInputStream_(stream)
    {
      thread_= rwMakeThreadFunctionM(readABunch<T>,*this,
                                     void,&readABunch<T>::func); // 2
      thread_.start();
    }

    ~readABunch() {
      thread_.join();                                           // 3
      thread_.raise();
    }

    void func() {
      RWDataInputStream tmpStream =
        RWGuardedDataInputStreamImp::make(dataInputStream_);    // 4
      for(int i=0; i<10; i++) {
        tmpStream >> data_;                                     // 5
        cout << data_ << ' ';
        rwYield();
      }
    }

  private:
    readABunch(const readABunch<T>&);
    readABunch<T>& operator=(const readABunch<T>&);

    RWDataInputStream dataInputStream_;
    T data_;
    RWThreadFunction thread_;
};