RSB  0.19.0
IntrospectionSender.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2014, 2015 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "IntrospectionSender.h"
28 
29 #include <boost/format.hpp>
30 
31 #include <rsc/misc/UUID.h>
32 
33 #include <rsb/protocol/introspection/Hello.pb.h>
34 #include <rsb/protocol/introspection/Bye.pb.h>
35 
36 #include "../EventId.h"
37 #include "../MetaData.h"
38 #include "../Handler.h"
39 #include "../Factory.h"
40 
41 #include "Types.h"
42 #include "Model.h"
43 
44 namespace rsb {
45 namespace introspection {
46 
47 // Internal
48 // @author jmoringe
49 struct QueryHandler : public Handler {
51  : sender(sender) {
52  }
53 
54  void handle(EventPtr query) {
55  RSCDEBUG(this->sender.logger, "Processing introspection query " << query);
56 
57  boost::mutex::scoped_lock lock(sender.mutex);
58 
59  if (query->getType() == rsc::runtime::typeName<void>()) {
60  if (query->getMethod() == "SURVEY") { // TODO check scope
61  handleSurvey(query);
62  } else if (query->getMethod() == "REQUEST") {
63  handleRequest(query);
64  } else {
65  RSCWARN(this->sender.logger, "Introspection query not understood: " << query);
66  }
67  } else if ((query->getType() == rsc::runtime::typeName<std::string>())
68  && (*boost::static_pointer_cast<std::string>(query->getData())
69  == "ping")) {
70  handlePing(query);
71  }
72  }
73 
74  void handleSurvey(EventPtr query) {
75  for (IntrospectionSender::ParticipantList::const_iterator it
76  = this->sender.participants.begin();
77  it != this->sender.participants.end();
78  ++it) {
79  this->sender.sendHello(*it, query);
80  }
81  }
82 
83  void handleRequest(EventPtr query) {
84  std::string idString
85  = (query->getScope().getComponents()
86  [query->getScope().getComponents().size() - 1]);
87  rsc::misc::UUID id(idString);
88  for (IntrospectionSender::ParticipantList::const_iterator it
89  = this->sender.participants.begin();
90  it != this->sender.participants.end(); ++it) {
91  if (it->getId() == id) {
92  this->sender.sendHello(*it, query);
93  break;
94  }
95  }
96  }
97 
98  void handlePing(EventPtr query) {
99  for (IntrospectionSender::ParticipantList::const_iterator it
100  = this->sender.participants.begin();
101  it != this->sender.participants.end();
102  ++it) {
103  sendPong(*it, query);
104  }
105  }
106 
107  void sendPong(const ParticipantInfo& participant, EventPtr /*query*/) {
108  EventPtr pongEvent(new Event());
109  pongEvent->setScope(this->sender.informer->getScope()->concat(participant.getId().getIdAsString()));
110  pongEvent->setType(rsc::runtime::typeName<std::string>());
111  pongEvent->setData(boost::shared_ptr<std::string>(new std::string("pong")));
112  this->sender.informer->publish(pongEvent);
113  }
114 
116 };
117 
118 // Internal
119 // @author jmoringe
121  EventPtr call(const std::string& /*methodName*/, EventPtr request) {
122  EventPtr reply(new Event(request->getScope(),
123  request->getData(),
124  request->getType()));
125  reply->mutableMetaData().setUserTime("request.send",
126  request->getMetaData().getSendTime());
127  reply->mutableMetaData().setUserTime("request.receive",
128  request->getMetaData().getReceiveTime());
129  return reply;
130  }
131 };
132 
133 IntrospectionSender::IntrospectionSender(boost::shared_ptr<std::string> processDisplayName,
134  const ParticipantConfig& listenerConfig,
135  const ParticipantConfig& informerConfig)
136  : logger(rsc::logging::Logger::getLogger("rsb.introspection.IntrospectionSender")),
137  processDisplayName(processDisplayName),
139  listenerConfig)),
140  informer(getFactory().createInformerBase(INTROSPECTION_PARTICIPANTS_SCOPE,
141  "", informerConfig)),
143  .concat(boost::str(boost::format("/%1%/%2%")
144  % host.getId() % process.getPid())),
145  listenerConfig, informerConfig)) {
146  listener->addHandler(HandlerPtr(new QueryHandler(*this)));
147  server->registerMethod("echo", patterns::LocalServer::CallbackPtr(new EchoCallback()));
148 }
149 
151  ParticipantPtr parent) {
152  RSCDEBUG(this->logger, boost::format("Adding participant %1%") % participant);
153 
154  boost::mutex::scoped_lock lock(this->mutex);
155 
156  ParticipantInfo info(participant->getKind(),
157  participant->getId(),
158  (parent ? parent->getId() : rsc::misc::UUID(false)),
159  *participant->getScope(),
160  "TODO", // TODO type
161  participant->getTransportURLs());
162  this->participants.push_back(info);
163 
164  sendHello(info);
165 }
166 
168  RSCDEBUG(this->logger, boost::format("Removing participant %1%") % participant);
169 
170  boost::mutex::scoped_lock lock(this->mutex);
171 
172  ParticipantList::iterator it;
173  for (it = this->participants.begin(); it != this->participants.end(); ++it) {
174  if (it->getId() == participant.getId()) {
175  break;
176  }
177  }
178  if (it == this->participants.end()) {
179  RSCWARN(this->logger, boost::format("Trying to remove unknown participant %1%")
180  % participant);
181  } else {
182  sendBye(*it);
183 
184  this->participants.erase(it);
185  }
186 
187  RSCDEBUG(this->logger, boost::format("%1% participant(s) remain(s)")
188  % this->participants.size());
189 
190  return !this->participants.empty();
191 }
192 
193 const boost::posix_time::ptime UNIX_EPOCH
194  = boost::posix_time::ptime(boost::gregorian::date(1970, boost::date_time::Jan, 1));
195 
197  EventPtr query) {
198  boost::shared_ptr<rsb::protocol::introspection::Hello> hello(
199  new rsb::protocol::introspection::Hello());
200 
201  // Add participant information.
202  hello->set_id(participant.getId().getId().data,
203  participant.getId().getId().size());
204  if (participant.getParentId() != rsc::misc::UUID(false)) {
205  hello->set_parent(participant.getParentId().getId().data,
206  participant.getParentId().getId().size());
207  }
208  hello->set_kind(participant.getKind());
209  hello->set_scope(participant.getScope().toString());
210  for (std::set<std::string>::const_iterator it
211  = participant.getTransportURLs().begin();
212  it != participant.getTransportURLs().end(); ++it) {
213  hello->add_transport(*it);
214  }
215 
216  // Add process information.
217  rsb::protocol::operatingsystem::Process* process
218  = hello->mutable_process();
219  process->set_id(boost::lexical_cast<std::string>(this->process.getPid()));
220  process->set_program_name(this->process.getProgramName());
221  std::vector<std::string> arguments = this->process.getArguments();
222  for (std::vector<std::string>::const_iterator it = arguments.begin();
223  it != arguments.end(); ++it) {
224  process->add_commandline_arguments(*it);
225  }
226  process->set_start_time((this->process.getStartTime() - UNIX_EPOCH)
227  .total_microseconds());
228  process->set_rsb_version(this->process.getRSBVersion());
229  if (!this->process.getExecutingUser().empty()) {
230  process->set_executing_user(this->process.getExecutingUser());
231  }
232  if (this->processDisplayName) {
233  process->set_display_name(*this->processDisplayName);
234  }
235 
236  // Add host information.
237  rsb::protocol::operatingsystem::Host* host = hello->mutable_host();
238  if (!this->host.getId().empty()) {
239  host->set_id(this->host.getId());
240  }
241  host->set_hostname(this->host.getHostname());
242  if (!this->host.getMachineType().empty()) {
243  host->set_machine_type(this->host.getMachineType());
244  }
245  if (!this->host.getMachineVersion().empty()) {
246  host->set_machine_version(this->host.getMachineVersion());
247  }
248  if (!this->host.getSoftwareType().empty()) {
249  host->set_software_type(this->host.getSoftwareType());
250  }
251  if (!this->host.getSoftwareVersion().empty()) {
252  host->set_software_version(this->host.getSoftwareVersion());
253  }
254 
255  // Construct event.
256  EventPtr helloEvent(new Event());
257  helloEvent->setScope(this->informer->getScope()->concat(boost::str(boost::format("/%1%")
258  % participant.getId().getIdAsString())));
259  helloEvent->setData(hello);
260  helloEvent->setType(rsc::runtime::typeName(*hello.get()));
261  if (query) {
262  helloEvent->addCause(query->getId());
263  }
264 
265  this->informer->publish(helloEvent);
266 }
267 
269  boost::shared_ptr<rsb::protocol::introspection::Bye> bye(
270  new rsb::protocol::introspection::Bye());
271  bye->set_id(participant.getId().getId().data,
272  participant.getId().getId().size());
273  EventPtr byeEvent(new Event());
274  byeEvent->setScope(this->informer->getScope()
275  ->concat(boost::str(boost::format("/%1%")
276  % participant.getId().getIdAsString())));
277  byeEvent->setData(bye);
278  byeEvent->setType(rsc::runtime::typeName(*bye.get()));
279  this->informer->publish(byeEvent);
280 }
281 
282 }
283 }
Instances of this class store information about participants in the current process.
Definition: Model.h:53
patterns::LocalServerPtr createLocalServer(const Scope &scope)
Creates and returns a Server object that exposes methods under the Scope scope.
const Scope INTROSPECTION_PARTICIPANTS_SCOPE
const std::string & getKind() const
Definition: Model.cpp:60
EventPtr call(const std::string &, EventPtr request)
Basic message that is exchanged between informers and listeners.
Definition: Event.h:60
Objects of this class participate in the exchange of notifications on one channel of the bus...
Definition: Participant.h:65
boost::shared_ptr< std::string > processDisplayName
Instances of this class publish information about the local host, the current process and its partici...
Asynchronously called handler interface on the client level.
Definition: Handler.h:53
void sendHello(const ParticipantInfo &participant, EventPtr query=EventPtr())
const std::string & toString() const
Reconstructs a fully formal string representation of the scope with leading an trailing slashes...
Definition: Scope.cpp:143
void handle(EventPtr query)
Handle event.
const std::set< std::string > & getTransportURLs() const
Definition: Model.cpp:80
Callback class with receives and returns events.
Definition: LocalServer.h:78
const rsc::misc::UUID & getParentId() const
Definition: Model.cpp:68
Factory & getFactory()
Returns a factory for client-level RSB objects.
Definition: Factory.cpp:163
void addParticipant(ParticipantPtr participant, ParticipantPtr parent)
const boost::posix_time::ptime UNIX_EPOCH
const std::vector< std::string > & getArguments() const
Definition: Model.cpp:151
void sendPong(const ParticipantInfo &participant, EventPtr)
boost::shared_ptr< Participant > ParticipantPtr
Definition: Participant.h:122
const Scope & getScope() const
Definition: Model.cpp:72
boost::shared_ptr< IntlCallback > CallbackPtr
Definition: LocalServer.h:71
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config)
Creates and returns a new Listener for the Scope scope.
boost::shared_ptr< Handler > HandlerPtr
Definition: Handler.h:92
rsb::patterns::LocalServerPtr server
bool removeParticipant(const Participant &participant)
A class describing the configuration of Participant instances.
QueryHandler(IntrospectionSender &sender)
IntrospectionSender(boost::shared_ptr< std::string > processDisplayName, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig())
rsc::misc::UUID getId() const
Returns the unique id of the participant.
Definition: Participant.cpp:60
boost::shared_ptr< Event > EventPtr
Definition: Event.h:264
const Scope INTROSPECTION_HOSTS_SCOPE
void sendBye(const ParticipantInfo &participant)
const rsc::misc::UUID & getId() const
Definition: Model.cpp:64