29 #include <boost/lexical_cast.hpp>
31 #include <boost/asio/ip/address.hpp>
33 #include <boost/format.hpp>
37 using namespace boost;
39 using namespace boost::asio;
40 using boost::asio::ip::tcp;
42 using namespace rsc::logging;
// Constructor fragment: member initializers and the first body statement.
// The signature and the closing brace are not part of this excerpt.
// The logger is looked up under the name "rsb.transport.socket.Factory".
51 logger(Logger::getLogger(
"rsb.transport.socket.Factory")),
// A work object is created on `service`. NOTE(review): presumably this keeps
// io_service::run() from returning while no handlers are pending -- confirm
// against the Boost.Asio documentation.
52 keepAlive(new io_service::work(service)),
// `thread` is initialized with a callable that invokes io_service::run() on
// `service`.
53 thread(boost::bind(&boost::asio::io_service::run, &service)) {
54 RSCINFO(
logger,
"Started service thread");
// Two log statements bracketing the shutdown of the service thread. The
// statements that actually stop the thread lie between them (embedded line
// numbers 63-64) and are not part of this excerpt.
62 RSCINFO(
logger,
"Stopping service thread");
65 RSCINFO(
logger,
"Stopped service thread");
// Fragment of the routine that hands out a bus client for host:port. The
// signature, the map lookups, the return statements and several closing
// braces are not part of this excerpt; only the visible statements are
// described below.
72 RSCDEBUG(
logger,
"Was asked for a bus client for " << host <<
":" << port);
// Step 1: reuse an already known bus client without resolving the host name.
// On a hit, the connector is registered with the existing bus.
79 BusClientMap::const_iterator it;
81 BusPtr result = it->second;
83 result->addConnector(connector);
84 RSCDEBUG(
logger,
"Found existing bus client "
85 << result <<
" without resolving");
89 RSCDEBUG(
logger,
"Did not find bus client without resolving");
// Step 2: resolve host/port and retry the lookup once per resolved entry.
// The port is converted to a string and the query is flagged
// numeric_service.
97 RSCDEBUG(
logger,
"Resolving endpoint")
98 tcp::resolver resolver(this->
service);
99 tcp::resolver::query query(host, lexical_cast<string>(port),
100 tcp::resolver::query::numeric_service);
// `endpoint` is rebuilt from the resolved host name and the requested port
// before each lookup.
101 for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
102 endpointIterator != tcp::resolver::iterator();
103 ++endpointIterator) {
104 endpoint =
Endpoint(endpointIterator->host_name(), port);
107 BusClientMap::const_iterator it;
109 BusPtr result = it->second;
111 result->addConnector(connector);
112 RSCDEBUG(
logger,
"Found existing bus client "
113 << it->second <<
" after resolving");
// Step 3: no existing client matched, so the same query is resolved again
// and a fresh socket is connected to each resolved endpoint in turn.
// connect() is given an error_code, so a failed attempt is reported through
// `error` (and logged) rather than thrown.
120 for (tcp::resolver::iterator endpointIterator = resolver.resolve(query);
121 endpointIterator != tcp::resolver::iterator();
122 ++endpointIterator) {
123 endpoint =
Endpoint(endpointIterator->host_name(), port);
124 RSCDEBUG(
logger,
"Trying endpoint " << endpointIterator->endpoint());
125 socket.reset(
new tcp::socket(this->
service));
126 boost::system::error_code error;
127 socket->connect(endpointIterator->endpoint(), error);
129 RSCDEBUG(
logger,
"Success");
132 RSCDEBUG(
logger,
"Failed: " << error.message());
// Raised as runtime_error; per the message text, this is the case in which
// none of the resolved endpoints could be connected. The %1%/%2% arguments
// are supplied on a line outside this excerpt.
136 throw runtime_error(str(format(
"Could not connect to any of the endpoints to which %1%:%2% resolved.")
// Step 4: a new bus client is set up. The connection is added to the bus
// before it starts receiving, and the connector is registered afterwards.
142 RSCDEBUG(
logger,
"Did not find bus client after resolving; creating a new one");
148 result->addConnection(connection);
149 connection->startReceiving();
151 result->addConnector(connector);
153 RSCDEBUG(
logger,
"Created new bus client " << result);
// Fragment of the routine that removes a client bus from `busClients`. The
// signature, the rest of the loop header and the statement acting on a match
// are not part of this excerpt.
159 RSCDEBUG(
logger,
"Removing client bus " << bus);
// Locks `busMutex` via a scoped lock.
161 boost::mutex::scoped_lock lock(this->
busMutex);
// Walks `busClients` from the beginning, looking for the entry whose mapped
// value equals `bus`.
163 for (BusClientMap::iterator it = this->
busClients.begin();
165 if (it->second == bus) {
// NOTE(review): presumably logged right after the entry has been erased
// (embedded line 166 is not visible) -- confirm.
167 RSCDEBUG(
logger,
"Removed");
// Fragment of the routine that hands out a bus server for host:port. The
// signature, the map lookup, the construction of the new server and the
// return statements are not part of this excerpt.
177 RSCDEBUG(
logger,
"Was asked for a bus server for " << host <<
":" << port);
// Existing server: register the connector with it.
182 BusServerMap::const_iterator it;
184 RSCDEBUG(
logger,
"Found existing bus server " << it->second);
186 it->second->addConnector(connector);
// No existing server: a new one is created (creation itself is not visible)
// and the connector is registered with it.
192 RSCDEBUG(
logger,
"Did not find bus server; creating a new one");
198 result->addConnector(connector);
// This path creates a bus server, so the log message says "server".
200 RSCDEBUG(
logger,
"Created new bus server " << result);
// Fragment of the routine that removes a server bus from `busServers`. The
// signature, the rest of the loop header and the statement removing the
// entry are not part of this excerpt.
206 RSCDEBUG(
logger,
"Removing server bus " << bus);
// Locks `busMutex` via a scoped lock.
208 boost::mutex::scoped_lock lock(this->
busMutex);
// Walks `busServers` from the beginning, looking for the entry whose mapped
// value equals `bus`.
210 for (BusServerMap::iterator it = this->
busServers.begin();
212 if (it->second == bus) {
// The matching bus is downcast to BusServer and deactivated. The result of
// the cast is dereferenced without a null check.
213 boost::dynamic_pointer_cast<
BusServer>(bus)->deactivate();
215 RSCDEBUG(
logger,
"Removed");
// Fragment of a routine that takes (at least) `host` and `port` and
// dispatches on `serverMode`. The function name, the remaining parameters,
// the case labels and the return statements are not part of this excerpt.
222 const std::string& host,
223 const boost::uint16_t& port,
// Locks `busMutex` via a scoped lock.
227 boost::mutex::scoped_lock lock(this->
busMutex);
229 switch (serverMode) {
// A std::exception from the preceding try block (not visible) is caught
// here. According to the log text, the bus is then accessed as a client.
237 }
catch (
const std::exception& e) {
239 "Could not create server for bus: " << e.what() <<
"; trying to access bus as client");
// NOTE(review): presumably the default branch for a `serverMode` value that
// no case handles -- the case labels are not visible, confirm.
244 throw invalid_argument(
"Impossible Server enum value received");
// Throws invalid_argument when the requested `tcpnodelay` value differs from
// the value reported by `bus`. The message carries the requested value (%1%)
// and the bus's value (%2%). The enclosing function's signature and closing
// braces are not part of this excerpt.
250 if (bus->isTcpnodelay() != tcpnodelay) {
251 throw invalid_argument(str(format(
"Requested tcpnodelay option %1% does not match existing option %2%")
252 % tcpnodelay % bus->isTcpnodelay()));