29 #include <boost/asio/ip/tcp.hpp> 31 #include <rsc/runtime/ContainerIO.h> 33 #include "../../MetaData.h" 42 using boost::asio::ip::tcp;
49 logger(Logger::getLogger(
"rsb.transport.socket.BusImpl")),
50 asioService(asioService), tcpnodelay(tcpnodelay) {
54 RSCDEBUG(
logger,
"Destructing bus instance");
63 for (ConnectionList::iterator it = this->
connections.begin();
67 }
catch (
const std::exception& e) {
68 RSCDEBUG(
logger,
"Failed to disconnect connection " << *it
73 RSCDEBUG(
logger,
"BusImpl destruction finished");
93 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
95 Scope scope = sink->getScope();
96 RSCDEBUG(
logger,
"Adding sink " << sink <<
" to scope " << scope);
101 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
104 RSCDEBUG(
logger,
"Removing sink " << sink <<
" from scope " << scope);
109 RSCDEBUG(
logger,
"Adding connection " << connection);
117 RSCDEBUG(
logger,
"Removing connection " << connection);
128 struct PoorPersonsLambda1 {
130 PoorPersonsLambda1(
EventPtr event) : event(event) {}
131 void operator()(InConnector& sink) {
132 sink.handle(this->event);
140 RSCDEBUG(
logger,
"Delivering outgoing event to connectors " << event);
142 vector<Scope> scopes =
event->getScopePtr()->superScopes(
true);
143 RSCDEBUG(
logger,
"Relevant scopes " << scopes);
146 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
149 PoorPersonsLambda1(event));
156 RSCDEBUG(
logger,
"Dispatching outgoing event " << event <<
" to connections");
158 string wireSchema =
event->getMetaData().getUserInfo(
"rsb.wire-schema");
159 list<BusConnectionPtr> failing;
160 for (list<BusConnectionPtr>::iterator it = this->
connections.begin();
162 RSCDEBUG(
logger,
"Dispatching to connection " << *it);
164 (*it)->sendEvent(event, wireSchema);
165 }
catch (
const std::exception& e) {
166 RSCWARN(
logger,
"Send failure (" << e.what() <<
"); will close connection later");
169 failing.push_back(*it);
174 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
175 it != failing.end(); ++it) {
185 struct PoorPersonsLambda2 {
197 RSCDEBUG(
logger,
"Delivering received event to connectors " << event);
199 vector<Scope> scopes =
event->getScopePtr()->superScopes(
true);
200 RSCDEBUG(
logger,
"Relevant scopes " << scopes);
203 boost::recursive_mutex::scoped_lock lock(this->
connectorLock);
206 PoorPersonsLambda2(event));
virtual const std::string getTransportURL() const
virtual void addConnection(BusConnectionPtr connection)
Adds connection to the list of connections of the bus.
virtual void printContents(std::ostream &stream) const
AsioServiceContextPtr asioService
boost::shared_ptr< InConnector > InConnectorPtr
virtual void removeConnection(BusConnectionPtr connection)
Removes connection from the list of connections of this bus.
virtual void handle(EventPtr event)
Handle event.
boost::shared_ptr< AsioServiceContext > AsioServiceContextPtr
virtual void handle(EventPtr event)=0
Handle event.
boost::recursive_mutex connectorLock
virtual Scope getScope() const
virtual void handleIncoming(EventPtr event, BusConnectionPtr connection)
bool empty() const
Indicates whether there are scopes with associated sinks.
Instances of this class receive events from a bus that is accessed via a socket connection.
ConnectionList connections
rsc::logging::LoggerPtr logger
boost::recursive_mutex connectionLock
virtual bool isTcpnodelay() const
size_t size() const
Returns number of scopes with associated sinks.
void addSink(const Scope &scope, const T &sink)
Associates sink with scope.
virtual AsioServiceContextPtr getService() const
boost::shared_ptr< BusConnection > BusConnectionPtr
virtual void removeSink(const InConnector *sink)
void mapSinks(const Scope &scope, boost::function< void(T &)> function) const
SinkDispatcher sinkDispatcher
boost::shared_ptr< Event > EventPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
void removeSink(const Scope &scope, const T *sink)
std::list< BusConnectionPtr > ConnectionList
virtual void addSink(InConnectorPtr sink)
ConnectionList getConnections() const
boost::recursive_mutex & getConnectionLock()