RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RemoteServer.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of RSB project
4  *
5  * Copyright (C) 2010 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  * 2011 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include "RemoteServer.h"
29 
30 #include <stdexcept>
31 
32 #include <boost/format.hpp>
33 #include <boost/thread/mutex.hpp>
34 
35 #include <rsc/runtime/TypeStringTools.h>
36 #include <rsc/misc/UUID.h>
37 
38 #include "../Factory.h"
39 #include "../Handler.h"
40 #include "../MetaData.h"
41 #include "../EventId.h"
42 
43 using namespace std;
44 
45 using namespace boost;
46 
47 using namespace rsc::runtime;
48 using namespace rsc::logging;
49 using namespace rsc::threading;
50 
51 namespace rsb {
52 namespace patterns {
53 
55 public:
56  typedef boost::recursive_mutex MutexType;
57 private:
58  LoggerPtr logger;
59 
61 
62  map<EventId, RemoteServer::FuturePtr> inprogress;
63 public:
64 
65  WaitingEventHandler(LoggerPtr logger) :
66  logger(logger) {
67  }
68 
69  string getClassName() const {
70  return "WaitingEventHandler";
71  }
72 
74  return this->mutex;
75  }
76 
77  void handle(EventPtr event) {
78  if (!event
79  || event->getCauses().empty()
80  || (event->getMethod() != "REPLY")) {
81  RSCTRACE(logger, "Received uninteresting event " << event);
82  return;
83  }
84  EventId requestId = *event->getCauses().begin();
85  {
86  MutexType::scoped_lock lock(mutex);
87 
88  if (!this->inprogress.count(requestId)) {
89  RSCTRACE(logger, "Received uninteresting event " << event);
90  return;
91  }
92 
93  RSCDEBUG(logger, "Received reply event " << event);
94 
95  RemoteServer::FuturePtr result = this->inprogress[requestId];
96  if (event->mutableMetaData().hasUserInfo("rsb:error?")) {
97  assert(event->getType() == typeName<string>());
98  result->setError(str(format("Error calling remote method '%1%': %2%")
99  % "TODO: obtain method name"
100  % *(boost::static_pointer_cast<string>(event->getData()))));
101  } else {
102  result->set(event);
103  }
104  this->inprogress.erase(requestId);
105  }
106  }
107 
108  void addCall(const EventId& requestId, RemoteServer::FuturePtr result) {
109  MutexType::scoped_lock lock(this->mutex);
110  this->inprogress.insert(make_pair(requestId, result));
111  }
112 
113 };
114 
115 RemoteServer::RemoteServer(const Scope& scope,
116  const ParticipantConfig &listenerConfig,
117  const ParticipantConfig &informerConfig) :
118  logger(
119  Logger::getLogger(
120  str(
121  format("rsb.patterns.RemoteServer[%1%]")
122  % scope.toString()))), scope(scope), listenerConfig(
123  listenerConfig), informerConfig(informerConfig) {
124  // TODO check that this server is alive...
125  // TODO probably it would be a good idea to request some method infos from
126  // the server, e.g. for type checking
127 }
128 
130 }
131 
133  const string& sendType) {
134 
135  boost::mutex::scoped_lock lock(methodSetMutex);
136 
137  if (!methodSets.count(methodName)) {
138 
139  // start a listener to wait for the reply
140  const Scope replyScope = scope.concat(Scope("/reply")).concat(
141  Scope("/" + methodName));
142  ListenerPtr listener = Factory::getInstance().createListener(replyScope,
144 
145  boost::shared_ptr<WaitingEventHandler> handler(
147  listener->addHandler(handler);
148 
149  // informer for requests
150  Informer<void>::Ptr informer = Factory::getInstance().createInformer<
151  void>(
152  scope.concat(Scope("/request")).concat(Scope("/" + methodName)),
153  informerConfig, sendType);
154 
155  MethodSet set;
156  set.methodName = methodName;
157  set.sendType = sendType;
158  set.handler = handler;
159  set.replyListener = listener;
160  set.requestInformer = informer;
161 
162  methodSets[methodName] = set;
163 
164  }
165 
166  if (methodSets[methodName].sendType != sendType) {
167  throw runtime_error("Illegal send type. Method previously accepted "
168  + methodSets[methodName].sendType + " but now " + sendType
169  + " was requested");
170  }
171 
172  return methodSets[methodName];
173 
174 }
175 
177 
178  RSCDEBUG(logger, "Calling method " << methodName << " with data " << data);
179 
180  // TODO check that the desired method exists
181  MethodSet methodSet = getMethodSet(methodName, data->getType());
182  FuturePtr result;
183  {
184  WaitingEventHandler::MutexType::scoped_lock
185  lock(methodSet.handler->getMutex());
186 
187  data->setScopePtr(methodSet.requestInformer->getScope());
188  data->setMethod("REQUEST");
189  methodSet.requestInformer->publish(data);
190  result.reset(new Future<EventPtr>());
191  methodSet.handler->addCall(data->getEventId(), result);
192  }
193 
194  return result;
195 }
196 
197 EventPtr RemoteServer::call(const string& methodName,
198  EventPtr data,
199  unsigned int maxReplyWaitTime) {
200  return callAsync(methodName, data)->get(maxReplyWaitTime);
201 }
202 
203 }
204 }