29 #include <rsc/runtime/ContainerIO.h>
31 #include <boost/bind.hpp>
33 #include <boost/thread.hpp>
35 #include <boost/asio/ip/tcp.hpp>
38 #include "../../MetaData.h"
42 using namespace rsc::logging;
44 using namespace boost::asio;
45 using boost::asio::ip::tcp;
51 Bus::Bus(io_service& service,
bool tcpnodelay) :
52 logger(Logger::getLogger(
"rsb.transport.socket.Bus")),
53 service(service), tcpnodelay(tcpnodelay) {
58 if (!this->sinks.empty()) {
59 RSCWARN(logger,
"" << this->sinks.size() <<
" non-empty scopes when destructing");
64 for (ConnectionList::iterator it = this->connections.begin();
65 it != this->connections.end(); ++it) {
68 }
catch (
const std::exception& e) {
69 RSCDEBUG(logger,
"Failed to disconnect connection " << *it
75 bool Bus::isTcpnodelay()
const {
76 return this->tcpnodelay;
80 return this->connections;
83 boost::recursive_mutex& Bus::getConnectionLock() {
84 return this->connectionLock;
88 boost::recursive_mutex::scoped_lock lock(this->connectorLock);
90 Scope scope = sink->getScope();
91 RSCDEBUG(logger,
"Adding sink " << sink <<
" to scope " << scope);
92 SinkList& connectors = this->sinks[scope];
93 connectors.push_back(sink);
97 boost::recursive_mutex::scoped_lock lock(this->connectorLock);
100 RSCDEBUG(logger,
"Removing sink " << sink <<
" from scope " << scope);
101 SinkList& connectors = this->sinks[scope];
102 RSCDEBUG(logger,
"Scope " << scope <<
" has "
103 << connectors.size() <<
" connectors (before removing)");
104 for (SinkList::iterator it = connectors.begin(); it != connectors.end(); ++it) {
108 if (!ptr || (ptr.get() == sink)) {
109 RSCDEBUG(logger,
"Found connector " << sink <<
" in scope " << scope);
110 connectors.erase(it);
114 RSCDEBUG(logger,
"Scope " << scope <<
" has "
115 << connectors.size() <<
" connectors (after removing)");
119 if (connectors.empty()) {
120 RSCDEBUG(logger,
"Removing empty scope " << scope);
121 this->sinks.erase(scope);
126 boost::recursive_mutex::scoped_lock lock(this->connectorLock);
128 RSCDEBUG(logger,
"Adding connector " << connector);
129 this->connectors.push_back(connector);
133 boost::recursive_mutex::scoped_lock lock(this->connectorLock);
135 RSCDEBUG(logger,
"Removing connector " << connector);
136 this->connectors.remove(connector);
139 if (this->connectors.empty()) {
140 RSCINFO(logger,
"No more connectors; suiciding");
146 RSCDEBUG(logger,
"Adding connection " << connection);
148 boost::recursive_mutex::scoped_lock lock(this->connectionLock);
150 this->connections.push_back(connection);
154 RSCDEBUG(logger,
"Removing connection " << connection);
156 boost::recursive_mutex::scoped_lock lock(this->connectionLock);
158 this->connections.remove(connection);
163 RSCDEBUG(logger,
"Delivering received event to connectors " << event);
165 vector<Scope> scopes =
event->getScopePtr()->superScopes(
true);
166 RSCDEBUG(logger,
"Relevant scopes " << scopes);
169 boost::recursive_mutex::scoped_lock lock(this->connectorLock);
171 for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
172 SinkMap::const_iterator it_ = this->sinks.find(*it);
173 if (it_ != this->sinks.end()) {
174 const SinkList& connectors = it_->second;
176 for (SinkList::const_iterator it__ = connectors.begin(); it__
177 != connectors.end(); ++it__) {
180 RSCDEBUG(logger,
"Delivering to connector " << connector <<
" in " << *it);
181 connector->handle(event);
191 RSCDEBUG(logger,
"Delivering outgoing event to connectors " << event);
193 vector<Scope> scopes =
event->getScopePtr()->superScopes(
true);
194 RSCDEBUG(logger,
"Relevant scopes " << scopes);
197 boost::recursive_mutex::scoped_lock lock(this->connectorLock);
199 for (vector<Scope>::const_iterator it = scopes.begin(); it != scopes.end(); ++it) {
200 SinkMap::const_iterator it_ = this->sinks.find(*it);
201 if (it_ != this->sinks.end()) {
202 const SinkList& connectors = it_->second;
204 for (SinkList::const_iterator it__ = connectors.begin(); it__
205 != connectors.end(); ++it__) {
208 RSCDEBUG(logger,
"Delivering to connector " << connector <<
" in " << *it);
209 connector->handle(event);
218 boost::recursive_mutex::scoped_lock lock(this->connectionLock);
220 RSCDEBUG(logger,
"Dispatching outgoing event " << event <<
" to connections");
222 string wireSchema =
event->getMetaData().getUserInfo(
"rsb.wire-schema");
223 list<BusConnectionPtr> failing;
224 for (list<BusConnectionPtr>::iterator it = this->connections.begin();
225 it != this->connections.end(); ++it) {
226 RSCDEBUG(logger,
"Dispatching to connection " << *it);
228 (*it)->sendEvent(event, wireSchema);
229 }
catch (
const std::exception& e) {
230 RSCWARN(logger,
"Send failure (" << e.what() <<
"); will close connection later");
233 failing.push_back(*it);
238 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
239 it != failing.end(); ++it) {
240 removeConnection(*it);
245 void Bus::suicide() {
246 Factory::getInstance().removeBusClient(shared_from_this());
249 void Bus::printContents(ostream& stream)
const {
250 stream <<
"connections = " << this->connections
251 <<
", sinks = " << this->sinks;