An Example Using the Threads Module and the DB Interface Module
SourcePro DB is designed to work with the other SourcePro products to supply database access. The following example combines class
RWDBDatabase, from the DB Interface Module of SourcePro DB, with class
RWServerPool, from the Threads Module of SourcePro Core, to create a higher-level abstraction, the multithreaded class
QueryServer. In just a few lines of code, this class manages a controlled number of threads that can perform database queries for the whole application.
In the code using
QueryServer, the server is constructed with an
RWDBDatabase object and the number of threads to be used at one time for database queries is specified. Following that, application code can submit queries to the
QueryServer using its
enqueue() method. Internally,
QueryServer manages its threads using
RWServerPool, and asynchronously gives results to users via the
RWTIOUResult template class. When the application is done with the
QueryServer, its destructor cleans up all the worker threads it created, and automatically frees up any used connections to the database.
QueryServer shows one way of easily wrapping up multithreaded database programming chores into an easy-to-use class for other programmers. However, the usage in main() does not exhibit its full capability. If you submitted 20 queries, the server would queue them and then send them to the database as connections are freed. This procedure allows you to keep strict control over the access and the number of connections made to your database.
Example 3 – Using the Threads Module and the DB Interface Module
// Header file for class QueryServer
// From the DB Interface Module
#include <rw/db/db.h>
// From the Threads Module
#include <rw/functor/rwBind.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/thread/RWRunnableSelf.h> // for rwSleep()
#include <rw/thread/RWServerPool.h>
#include <rw/thread/RWTRunnableIOUFunction.h>
// From the Essential Tools Module
#include <rw/tools/bustring.h>
// From the C++ Standard Library
#include <iostream>
using namespace std;
#define DBSERVER_TYPE "<servertype>"
#define DBSERVER_NAME "<servername>"
#define DBUSER_NAME "<username>"
#define DBPASSWORD "<password>"
#define DBDATABASE_NAME "<database>"
#define DBDATABASE_PSTRING "<property_string>"
class QueryServer
{
public:
QueryServer(const RWDBDatabase& db, unsigned concurrency)
: database_(db), pool_(RWServerPool::make(concurrency)) {
// Set the database's connection pool size to our
// concurrency level for better performance.
database_.defaultConnections(concurrency);
// Start the pool running to accept requests.
pool_.start();
}
~QueryServer() {
// Wait for the pool to finish processing its queue.
pool_.stop();
pool_.join();
}
RWTIOUResult<RWDBResult> enqueue(const RWCString& sql) {
typedef RWTRunnableIOUFunction<RWDBResult> Runnable;
// Make a new runnable to handle the request and
// throw it on the queue.
Runnable rf =
Runnable::make(rwBind(&QueryServer::serviceFunction,
this, sql));
pool_.enqueue(rf);
return rf.result();
}
private:
RWDBDatabase database_;
RWServerPool pool_;
// Service function enqueued on the RWServerPool to handle
// the request. Private, because users should use enqueue()
// to place a query on the queue.
RWDBResult serviceFunction(const RWCString& sql) {
// Grab a connection, go for it.
return database_.connection().executeSql(sql);
}
// Not defined on purpose
QueryServer(const QueryServer&);
QueryServer& operator=(const QueryServer&);
};
void outputStatus(const RWDBStatus& aStatus)
{
if (aStatus.errorCode() == 0) {
return;
}
// Print out the error.
cout << "Error code: " << (int) aStatus.errorCode() << endl
<< "Error message " << aStatus.message() << endl
<< "Is terminal: " << (aStatus.isTerminal() ? "Yes" : "No")
<< endl
<< "Vendor error 1: " << aStatus.vendorError1() << endl
<< "Vendor error 2 : " << aStatus.vendorError2() << endl
<< "Vendor message 1: " << aStatus.vendorMessage1() << endl
<< "Vendor message 2: " << aStatus.vendorMessage2() << endl;
}
int main()
{
RWDBManager::setErrorHandler(outputStatus);
RWDBDatabase db = RWDBManager::database(DBSERVER_TYPE,
DBSERVER_NAME,
DBUSER_NAME,
DBPASSWORD,
DBDATABASE_NAME,
DBDATABASE_PSTRING);
RWDBTracer& trc = db.tracer();
trc.setOn(RWDBTracer::SQL);
trc.stream(cout);
RWDBConnection conn = db.connection();
// Create and populate table
{
RWDBSchema schema;
schema.appendColumn("col1", RWDBValue::UString, 20);
db.createTable("testServ", schema, conn);
}
RWDBTable table = db.table("testServ");
RWDBInserter inserter = table.inserter(5);
inserter << RWBasicUString("Oregon State Beavers");
inserter.execute();
inserter << RWBasicUString("Colorado Buffaloes");
inserter.execute();
inserter.flush();
// Create QueryServer
QueryServer server(db, 5);
RWCString query("SELECT * FROM testServ");
RWTIOUResult<RWDBResult> resultIOU = server.enqueue(query);
// Poll until redeemable
while (!resultIOU.redeemable()) {
cout << "Waiting for result..." << endl;
::rwSleep(50);
}
// Obtain RWDBResult object from RWTIOUResult
RWDBResult res = resultIOU.redeem();
// Create RWDBReader
RWDBReader rdr = res.table().reader();
// Output data from query
RWBasicUString output;
// Create a file to write out the results
RWFile fileOut("queryServerOut.txt");
while (rdr()) {
rdr >> output;
// Output UString
fileOut << output;
// Print out UTF 8 version of UString
cout << output.toUtf8() << endl;
}
// Cleanup
table.drop();
return 0;
}
Program output:
06/13/2013 10:49:17 SQL> CREATE TABLE testServ (col1 NVARCHAR(20))
06/13/2013 10:49:17 SQL> INSERT INTO testServ VALUES ( ? )
06/13/2013 10:49:17 SQL> INSERT INTO testServ VALUES ( ? )
Waiting for result...
06/13/2013 10:49:17 SQL> SELECT * FROM testServ
Oregon State Beavers
Colorado Buffaloes
06/13/2013 10:49:17 SQL> DROP TABLE testServ