RSB  0.19.0
BusImpl.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011-2018 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
27 #include "BusImpl.h"
28 
29 #include <boost/asio/ip/tcp.hpp>
30 
31 #include <rsc/runtime/ContainerIO.h>
32 
33 #include "../../MetaData.h"
34 
35 #include "InConnector.h"
36 
37 using namespace std;
38 
39 using namespace rsc::logging;
40 
41 using namespace boost::asio;
42 using boost::asio::ip::tcp;
43 
44 namespace rsb {
45 namespace transport {
46 namespace socket {
47 
48 BusImpl::BusImpl(AsioServiceContextPtr asioService, bool tcpnodelay) :
49  logger(Logger::getLogger("rsb.transport.socket.BusImpl")),
50  asioService(asioService), tcpnodelay(tcpnodelay) {
51 }
52 
54  RSCDEBUG(logger, "Destructing bus instance");
55 
56  // Sinks should be empty.
57  if (!this->sinkDispatcher.empty()) {
58  RSCWARN(logger, "" << this->sinkDispatcher.size() << " non-empty scopes when destructing");
59  }
60 
61  // Active connections hold a shared_ptr to themselves and would
62  // thus not be destructed. Disconnecting resolves this problem.
63  for (ConnectionList::iterator it = this->connections.begin();
64  it != this->connections.end(); ++it) {
65  try {
66  (*it)->shutdown();
67  } catch (const std::exception& e) {
68  RSCDEBUG(logger, "Failed to disconnect connection " << *it
69  << ": " << e.what());
70  }
71  }
72 
73  RSCDEBUG(logger, "BusImpl destruction finished");
74 }
75 
77  return this->asioService;
78 }
79 
80 bool BusImpl::isTcpnodelay() const {
81  return this->tcpnodelay;
82 }
83 
85  return this->connections;
86 }
87 
88 boost::recursive_mutex& BusImpl::getConnectionLock() {
89  return this->connectionLock;
90 }
91 
93  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
94 
95  Scope scope = sink->getScope();
96  RSCDEBUG(logger, "Adding sink " << sink << " to scope " << scope);
97  this->sinkDispatcher.addSink(scope, sink);
98 }
99 
100 void BusImpl::removeSink(const InConnector* sink) {
101  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
102 
103  Scope scope = sink->getScope();
104  RSCDEBUG(logger, "Removing sink " << sink << " from scope " << scope);
105  this->sinkDispatcher.removeSink(scope, sink);
106 }
107 
109  RSCDEBUG(logger, "Adding connection " << connection);
110 
111  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
112 
113  this->connections.push_back(connection);
114 }
115 
117  RSCDEBUG(logger, "Removing connection " << connection);
118 
119  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
120 
121  this->connections.remove(connection);
122 }
123 
124 // Cannot be a local struct in the handle() method since some
125 // compilers (or standard versions?) don't support that.
126 namespace {
127 
128 struct PoorPersonsLambda1 {
130  PoorPersonsLambda1(EventPtr event) : event(event) {}
131  void operator()(InConnector& sink) {
132  sink.handle(this->event);
133  }
134 };
135 
136 }
137 
139  // Dispatch to our own connectors.
140  RSCDEBUG(logger, "Delivering outgoing event to connectors " << event);
141 
142  vector<Scope> scopes = event->getScopePtr()->superScopes(true);
143  RSCDEBUG(logger, "Relevant scopes " << scopes);
144 
145  {
146  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
147 
148  this->sinkDispatcher.mapSinks(event->getScope(),
149  PoorPersonsLambda1(event));
150  }
151 
152  // Dispatch to outgoing connections.
153  {
154  boost::recursive_mutex::scoped_lock lock(this->connectionLock);
155 
156  RSCDEBUG(logger, "Dispatching outgoing event " << event << " to connections");
157 
158  string wireSchema = event->getMetaData().getUserInfo("rsb.wire-schema");
159  list<BusConnectionPtr> failing;
160  for (list<BusConnectionPtr>::iterator it = this->connections.begin();
161  it != this->connections.end(); ++it) {
162  RSCDEBUG(logger, "Dispatching to connection " << *it);
163  try {
164  (*it)->sendEvent(event, wireSchema);
165  } catch (const std::exception& e) {
166  RSCWARN(logger, "Send failure (" << e.what() << "); will close connection later");
167  // We record failing connections instead of closing them
168  // immediately to avoid invalidating the iterator.
169  failing.push_back(*it);
170  }
171  }
172 
173  // This should remove all references to the connection objects.
174  for (list<BusConnectionPtr>::const_iterator it = failing.begin();
175  it != failing.end(); ++it) {
176  removeConnection(*it);
177  }
178  }
179 }
180 
181 // Cannot be a local struct in the handle() method since some
182 // compilers (or standard versions?) don't support that.
183 namespace {
184 
185 struct PoorPersonsLambda2 {
186  EventPtr event;
187  PoorPersonsLambda2(EventPtr event) : event(event) {};
188  void operator()(InConnector& sink) {
189  sink.handle(this->event);
190  }
191 };
192 
193 }
194 
196  BusConnectionPtr /*connection*/) {
197  RSCDEBUG(logger, "Delivering received event to connectors " << event);
198 
199  vector<Scope> scopes = event->getScopePtr()->superScopes(true);
200  RSCDEBUG(logger, "Relevant scopes " << scopes);
201 
202  {
203  boost::recursive_mutex::scoped_lock lock(this->connectorLock);
204 
205  this->sinkDispatcher.mapSinks(event->getScope(),
206  PoorPersonsLambda2(event));
207  }
208 }
209 
210 void BusImpl::printContents(ostream& stream) const {
211  stream << "connections = " << this->connections
212  << ", sinks = " << this->sinkDispatcher.size();
213 }
214 
215 const std::string BusImpl::getTransportURL() const {
216  assert(this->connections.size() == 1);
217  return (*this->connections.begin())->getTransportURL();
218 }
219 
220 }
221 }
222 }
virtual const std::string getTransportURL() const
Definition: BusImpl.cpp:215
virtual void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
Definition: BusImpl.cpp:108
virtual void printContents(std::ostream &stream) const
Definition: BusImpl.cpp:210
AsioServiceContextPtr asioService
Definition: BusImpl.h:115
boost::shared_ptr< InConnector > InConnectorPtr
Definition: Bus.h:43
STL namespace.
EventPtr event
Definition: BusImpl.cpp:129
virtual void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
Definition: BusImpl.cpp:116
virtual void handle(EventPtr event)
Handle event.
Definition: BusImpl.cpp:138
boost::shared_ptr< AsioServiceContext > AsioServiceContextPtr
virtual void handle(EventPtr event)=0
Handle event.
boost::recursive_mutex connectorLock
Definition: BusImpl.h:121
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
Definition: BusImpl.cpp:195
bool empty() const
Indicates whether there are scopes with associated sinks.
Instances of this class receive events from a bus that is accessed via a socket connection.
Definition: InConnector.h:61
ConnectionList connections
Definition: BusImpl.h:117
rsc::logging::LoggerPtr logger
Definition: BusImpl.h:111
boost::recursive_mutex connectionLock
Definition: BusImpl.h:118
virtual bool isTcpnodelay() const
Definition: BusImpl.cpp:80
size_t size() const
Returns number of scopes with associated sinks.
void addSink(const Scope &scope, const T &sink)
Associates sink with scope.
virtual AsioServiceContextPtr getService() const
Definition: BusImpl.cpp:76
boost::shared_ptr< BusConnection > BusConnectionPtr
Definition: Bus.h:46
virtual void removeSink(const InConnector *sink)
Definition: BusImpl.cpp:100
void mapSinks(const Scope &scope, boost::function< void(T &)> function) const
SinkDispatcher sinkDispatcher
Definition: BusImpl.h:120
boost::shared_ptr< Event > EventPtr
Definition: Event.h:264
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
void removeSink(const Scope &scope, const T *sink)
std::list< BusConnectionPtr > ConnectionList
Definition: BusImpl.h:102
virtual void addSink(InConnectorPtr sink)
Definition: BusImpl.cpp:92
ConnectionList getConnections() const
Definition: BusImpl.cpp:84
boost::recursive_mutex & getConnectionLock()
Definition: BusImpl.cpp:88