/////////////////////////////////////////////////////////////////
// sample 12 : active object(actor)
// 非同期な要求を受理する能動的オブジェクト
// バックグラウンドでworker threadが1つ走っていて、
// そいつが処理を行なう
namespace test12 {
// スマートポインタいちいち書くの面倒
#define p(T) shared_ptr<T>
// 引数いちいち書くの面倒
#define p_(T) const shared_ptr<T>&
// Java風にしておこう
#define Object shared_ptr<void>
#define Object_ const shared_ptr<void>&
struct IResult {
// futureパターンで返し値をいただくための基底クラス
virtual Object getResultValue()=0;
};
struct CRealResult : public IResult {
// 返し値を保持するためのクラス
CRealResult(Object_ resultValue){
this->resultValue = resultValue;
}
Object getResultValue() {
return resultValue;
}
protected:
Object resultValue;
};
struct CFutureResult : public IResult {
// futureパターンでResultを返す
CFutureResult():ready(false){}
void setResult(p_(IResult) result) {
synchronized g(&lock);
this->result = result;
this->ready = true;
lock.notifyAll();
}
Object getResultValue() {
synchronized g(&lock);
while (!ready) {
try {
lock.wait();
} catch (CInterruptedException&) {
}
}
return result->getResultValue();
}
protected:
CLockObject lock;
p(IResult) result;
bool ready;
};
/* // ↑のfutureクラスを使うためのサンプル
void test1(){
Object obj(new string("あびばびば〜"));
p(IResult) real(new CRealResult(obj));
p(CFutureResult) result(new CFutureResult);
// このresultに対してgetResultValueを行なうと
// 他のスレッドがsetResultするまで待たされる
// a.「引換券」と交換する「データ」を渡す
result->setResult(real);
// b.「引換券」と「データ」を交換する
obj = result->getResultValue();
// 「データ」をunboxingする
string str = *smart_ptr_static_cast<string>(obj);
CDbg().Out(str);
}
*/
struct IActiveObject {
// このクラスが、active object基底クラス
// このクラスのメソッドは非同期に呼び出される
virtual void displayString(const string&) = 0;
// 要求1.
virtual p(IResult) makeString(int count, char fillchar) = 0;
// 要求2.結果は、futureパターンでもらう
virtual void shutdown() =0;
// シャットダウンリクエスト(これが来たらactive objectを停止する)
};
struct CServant : public IActiveObject {
// これは、アクティブオブジェクトの実装例
virtual p(IResult) makeString(int count, char fillchar) {
smart_ptr<char> buffer;
buffer.AddArray(count+1);
buffer[count] = '\0';
for (int i = 0; i < count; i++) {
buffer[i] = fillchar;
try {
IThread::getThread()->sleep(100);
} catch (CInterruptedException&) {
}
}
Object obj(new string(buffer.get()));
// stringをObject型にboxing
return p(IResult)(new CRealResult(obj));
}
virtual void displayString(const string&str) {
try {
CDbg().Out("displayString: " + str);
// CThread()->getMyThread()->sleep(10);
} catch (CInterruptedException &) {
}
}
virtual void shutdown() {}
};
struct IMethodRequest {
// active objectに投げられる要求を保存しておくための基底クラス
virtual void execute()=0;
// これで実行する
protected:
IMethodRequest(p_(CServant) servant,p_(CFutureResult) future) {
this->servant = servant;
this->future = future;
}
p(CServant) servant;
p(CFutureResult) future;
};
struct CActivationQueue {
// スケジュールを保持するためのキュー
enum { MAX_METHOD_REQUEST = 100 }; // 最大リクエスト数
void putRequest(const shared_ptr<IMethodRequest>& request) {
synchronized g(&lock);
while (MAX_METHOD_REQUEST < requestQueue.size()) {
// これ以上queuing不可能なら仕方ないので待つ
lock.wait();
}
requestQueue.push_back(request);
lock.notifyAll();
}
p(IMethodRequest) takeRequest() {
synchronized g(&lock);
while (requestQueue.size() <= 0) {
// 仕事がないなら仕方ないので待つ
lock.wait();
}
p(IMethodRequest) request = *requestQueue.begin();
requestQueue.pop_front();
lock.notifyAll();
return request;
}
protected:
CLockObject lock;
list<p(IMethodRequest) > requestQueue;
};
struct CSchedulerThread : public CThread {
// スケジュールを保持しておき順番に実行するスレッド
CSchedulerThread(p_(CActivationQueue) queue) {
this->queue = queue;
}
// スケジューラーに要求を積む
void invoke(p_(IMethodRequest) request) {
queue->putRequest(request);
}
virtual void ThreadProc() {
try {
while (true) {
p(IMethodRequest) request = queue->takeRequest();
request->execute();
// キューに積まれている内容をひとつずつ実行するだけで良い
} // 終了するときはこのスレッドに対して割り込みがかかる
} catch (CInterruptedException&){
// shutdownによってIThread::interruptが呼び出され、その結果
// このスレッドがCLockObject::waitしていればそこでinterrupt例外が
// 発生する。(よってここに抜けてくる)
}
CDbg().Out("ActiveObjectを終了しました");
}
virtual void shutdown() {
interrupt();
// このスレッドに対してCInterruptedExceptionを発生させる
}
protected:
p(CActivationQueue) queue;
};
struct CMakeStringRequest : public IMethodRequest {
// 要求1をIMethodRequest派生クラスにしたもの
CMakeStringRequest(p_(CServant) servant,p_(CFutureResult) future,
int count, char fillchar)
: IMethodRequest(servant, future)
{
this->count = count;
this->fillchar = fillchar;
}
void execute() {
p(IResult) result = servant->makeString(count, fillchar);
future->setResult(result);
// futureパターンで受け渡しする
}
protected:
int count;
char fillchar;
};
struct CDisplayStringRequest : public IMethodRequest {
CDisplayStringRequest(p_(CServant) servant,const string& str)
: IMethodRequest(servant,p(CFutureResult)())
{
this->str = str;
}
void execute() {
servant->displayString(str);
}
protected:
string str;
};
struct CProxy : public IActiveObject {
CProxy(p_(CSchedulerThread) scheduler,p_(CServant) servant) {
this->scheduler = scheduler;
this->servant = servant;
}
virtual p(IResult) makeString(int count, char fillchar) {
p(CFutureResult) future(new CFutureResult());
scheduler->invoke(p(IMethodRequest)
(new CMakeStringRequest(servant, future, count, fillchar))
);
// スケジューラーにお願いして、futureパターンで
// 引換券だけ渡して制御をすぐに戻す
return smart_ptr_static_cast<IResult>(future);
// IResultに変換して戻す必要あり
}
virtual void displayString(const string& str) {
scheduler->invoke(p(IMethodRequest)
(new CDisplayStringRequest(servant, str))
);
// スケジューラーにお願いしてすぐに制御を戻す
}
virtual void shutdown(){
// スケジューラにシャットダウン要請
scheduler->shutdown();
}
protected:
p(CSchedulerThread) scheduler;
p(CServant) servant;
};
struct CMakerClientThread : public CThread {
CMakerClientThread(const string&name,p_(IActiveObject)actor)
:name_(name),actor_(actor){}
virtual void ThreadProc() {
try {
for(int i=1;i<=5;++i){
p(IResult) result = actor_->makeString(i,'C');
// 戻り値つきのactive objectの呼び出し
Object obj = result->getResultValue();
const string& str = *smart_ptr_static_cast<string>(obj);
// unboxing
// (Object型に封入されたstring型オブジェクトを取り出す)
// string str = **(smart_ptr<string>*)(obj.get());
// でもいいのだがコピーコストがもったいないので
CDbg().Out(name_ + "は" + str + "を作った");
IThread::getThread()->sleep(200);
}
} catch(CInterruptedException&){
// sleep中に割り込み例外が発生した?
}
}
protected:
string name_;
p(IActiveObject) actor_;
};
struct CDisplayClientThread : public CThread {
CDisplayClientThread(const string&name,p_(IActiveObject)actor)
:name_(name),actor_(actor){}
virtual void ThreadProc() {
try {
for(int i=1;i<=10;++i){
string data = name_ + "は"
+ CStringScanner::NumToString(i) + "を表示する";
actor_->displayString(data);
IThread::getThread()->sleep(200);
// このsleep中に割り込み例外が発生する(かも)
}
} catch(CInterruptedException&){
// sleep中に割り込み例外が発生した?
}
}
protected:
string name_;
p(IActiveObject) actor_;
};
struct CActiveObjectFactory {
static p(IActiveObject) createActiveObject() {
p(CServant) servant(new CServant());
// active object
p(CActivationQueue) queue(new CActivationQueue());
// 要求queue
p(CSchedulerThread) scheduler(new CSchedulerThread(queue));
// 要求を順番に保持しておくためのスレッド
// proxyオブジェクトが、スケジューラ(とactive object)に橋渡しする
p(IActiveObject) proxy(new CProxy(scheduler, servant));
CThreadManager::CreateThread(
smart_ptr_static_cast<IThread>(scheduler));
// ↑shared_ptr<CSchedulerThread>をshared_ptr<IThread>にupcastする
return proxy;
}
};
struct CShutdownRequester : public CThread {
// 10秒後にshutdown requestを出す
CShutdownRequester(p_(IActiveObject) actor){
this->actor = actor;
}
virtual void ThreadProc() {
sleep(10000);
CDbg().Out("shutdown要求をだします");
actor->shutdown();
}
protected:
p(IActiveObject) actor;
};
void Sample()
{
p(IActiveObject) actor(CActiveObjectFactory::createActiveObject());
CThreadManager::CreateThread(new CMakerClientThread("デジコ",actor));
CThreadManager::CreateThread(new CMakerClientThread("シュガー",actor));
CThreadManager::CreateThread(new CDisplayClientThread("ゲマゲマ",actor));
CThreadManager::CreateThread(new CShutdownRequester(actor));
}
// 変なdefineしたのをはずしておこう
#undef p
#undef p_
#undef Object
#undef Object_
} // end of namespace test12
|