32 #include <boost/format.hpp>
33 #include <boost/thread/mutex.hpp>
35 #include <rsc/runtime/TypeStringTools.h>
36 #include <rsc/misc/UUID.h>
38 #include "../Factory.h"
39 #include "../Handler.h"
40 #include "../MetaData.h"
41 #include "../EventId.h"
45 using namespace boost;
47 using namespace rsc::runtime;
48 using namespace rsc::logging;
49 using namespace rsc::threading;
// Fragment: the header of the enclosing function lies outside this excerpt.
// Returns the fixed string literal below.
70 return "WaitingEventHandler";
// Fragment of an event-handling routine; its header and the leading
// operand(s) of the condition below lie outside this excerpt.
// Visible part of the filter: an event without causes, or whose method is
// not "REPLY", is trace-logged as uninteresting.
79 || event->getCauses().empty()
80 || (
event->getMethod() !=
"REPLY")) {
81 RSCTRACE(logger,
"Received uninteresting event " << event);
// The first cause of the event is used as the id of the originating request.
84 EventId requestId = *
event->getCauses().begin();
// Acquire `mutex` before consulting the `inprogress` container.
86 MutexType::scoped_lock lock(mutex);
// No entry is registered under this request id: trace-logged as well.
88 if (!this->inprogress.count(requestId)) {
89 RSCTRACE(logger,
"Received uninteresting event " << event);
93 RSCDEBUG(logger,
"Received reply event " << event);
// A user-info entry named "rsb:error?" in the event's meta data selects the
// error path: the payload is read as a string and passed to setError().
// NOTE(review): the payload type is verified only through assert(), which is
// compiled out when NDEBUG is defined; the static_pointer_cast<string> is
// then unchecked -- confirm whether a runtime check is wanted here.
// NOTE(review): the %1% slot of the message is filled with a placeholder
// literal rather than an actual method name.
96 if (event->mutableMetaData().hasUserInfo(
"rsb:error?")) {
97 assert(event->getType() == typeName<string>());
98 result->setError(str(format(
"Error calling remote method '%1%': %2%")
99 %
"TODO: obtain method name"
100 % *(boost::static_pointer_cast<string>(event->getData()))));
// Remove the entry for this request id from `inprogress`.
104 this->inprogress.erase(requestId);
// Fragment: presumably the body of addCall(requestId, result), which is
// invoked as handler->addCall(...) further down -- header not visible, confirm.
// Takes this->mutex, then stores `result` under `requestId` in `inprogress`.
// NOTE(review): if `inprogress` is a std::map, insert() leaves an already
// present entry for the same id untouched -- verify that is intended.
109 MutexType::scoped_lock lock(this->mutex);
110 this->inprogress.insert(make_pair(requestId, result));
// Constructor fragment: the remaining parameters, the head of the first
// member initializer, and the body lie outside this excerpt.
// Visible initializers: a string formatted as
// "rsb.patterns.RemoteServer[<scope.toString()>]" is handed to an initializer
// whose head is not shown, followed by the members `scope`, `listenerConfig`
// and `informerConfig`, each initialized from an identically named
// identifier (presumably the constructor parameters -- confirm).
115 RemoteServer::RemoteServer(
const Scope& scope,
121 format(
"rsb.patterns.RemoteServer[%1%]")
122 % scope.toString()))), scope(scope), listenerConfig(
123 listenerConfig), informerConfig(informerConfig) {
// Fragment of a function that receives (at least) `sendType`; its name, the
// other parameters and several statements lie outside this excerpt.
133 const string& sendType) {
// A scope component "/<methodName>" is part of the reply scope expression
// (its beginning is not visible); a listener is created on `replyScope`
// through the Factory singleton.
141 Scope(
"/" + methodName));
142 ListenerPtr listener = Factory::getInstance().createListener(replyScope,
// A WaitingEventHandler is held by shared_ptr and attached to that listener.
145 boost::shared_ptr<WaitingEventHandler> handler(
147 listener->addHandler(handler);
// Populate the fields of `set`: send type, handler, reply listener and
// request informer (creation of `informer` is not visible here).
157 set.sendType = sendType;
158 set.handler = handler;
159 set.replyListener = listener;
160 set.requestInformer = informer;
// Reject a request whose send type differs from the one stored for this
// method name by throwing runtime_error.
// NOTE(review): methodSets[methodName] is evaluated twice; if methodSets is a
// std::map, operator[] default-inserts a missing key -- confirm the entry is
// guaranteed to exist at this point.
166 if (
methodSets[methodName].sendType != sendType) {
167 throw runtime_error(
"Illegal send type. Method previously accepted "
168 +
methodSets[methodName].sendType +
" but now " + sendType
// Fragment of the asynchronous call path; header, the lookup of `methodSet`
// and the return statement lie outside this excerpt.
178 RSCDEBUG(
logger,
"Calling method " << methodName <<
" with data " << data);
// Lock the handler's mutex (obtained via getMutex()) before the request is
// marked and its future is registered.
// NOTE(review): addCall() is invoked below, apparently while this lock is
// still held, and the registration code earlier in this file locks
// this->mutex too. If both refer to the same mutex, MutexType must be
// recursive to avoid self-deadlock -- confirm.
184 WaitingEventHandler::MutexType::scoped_lock
185 lock(methodSet.
handler->getMutex());
// Tag the outgoing event with method "REQUEST".
188 data->setMethod(
"REQUEST");
// Create a fresh future and register it under the event id of `data`.
190 result.reset(
new Future<EventPtr>());
191 methodSet.
handler->addCall(data->getEventId(), result);
// Fragment of the synchronous call variant; its name and leading parameters
// lie outside this excerpt. Delegates to callAsync() and returns whatever
// get(maxReplyWaitTime) on the resulting future yields.
// The unit of maxReplyWaitTime is not visible here.
199 unsigned int maxReplyWaitTime) {
200 return callAsync(methodName, data)->get(maxReplyWaitTime);