31 #include <boost/bind.hpp>
33 #include <boost/thread/thread_time.hpp>
35 #include "../../MetaData.h"
40 using namespace boost::asio;
41 using boost::asio::ip::tcp;
43 using namespace rsc::logging;
// Constructor. The initializer list:
//  - forwards `service` and `tcpnodelay` to the Bus base class,
//  - obtains the logger named "rsb.transport.socket.BusServer",
//  - constructs the acceptor from `service` and an IPv4 TCP endpoint on `port`,
//  - starts with both the `active` and `shutdown` flags set to false.
// NOTE(review): `service` and `tcpnodelay` are presumably further constructor
// parameters; their declarations are not visible here - confirm.
49 BusServer::BusServer(boost::uint16_t port,
52 :
Bus(service, tcpnodelay),
53 logger(Logger::getLogger(
"rsb.transport.socket.BusServer")),
54 acceptor(service, tcp::endpoint(tcp::v4(), port)),
56 active(false), shutdown(false) {
// Call acceptOne() with a shared pointer to this object, obtained from
// shared_from_this() and downcast to BusServer.
// NOTE(review): presumably the pointer keeps the server alive while an accept
// is outstanding, and this object must already be owned by a shared pointer
// when this runs - confirm against acceptOne() and the class declaration.
67 acceptOne(boost::dynamic_pointer_cast<BusServer>(shared_from_this()));
// Log (info level) the local endpoint reported by the acceptor.
86 RSCINFO(
logger,
"Listening on " << this->
acceptor.local_endpoint());
// Final argument of a call whose beginning is not visible here: the Boost.Asio
// error placeholder, so the bound handler receives the operation's error code.
// NOTE(review): presumably the end of an async_accept(..., boost::bind(...))
// call - confirm.
89 boost::asio::placeholders::error));
// `error`: error code handed to this handler.
// NOTE(review): presumably the statements below are the success and failure
// branches of a check on `error`; the condition is not visible here - confirm.
94 const boost::system::error_code& error) {
// Log (info level) the remote endpoint of the socket.
97 RSCINFO(
logger,
"Got connection from " << socket->remote_endpoint());
// Start receiving on the connection.
101 connection->startReceiving();
// Log (warning level) that accepting failed; per the message, the server
// tries to continue.
103 RSCWARN(
logger,
"Accept failure, trying to continue");
// Log (debug level) the event that is about to be delivered to the connections.
122 RSCDEBUG(
logger,
"Delivering received event to connections " << event);
// Connections whose send threw; collected here during the delivery loop
// instead of being handled inside it.
127 list<BusConnectionPtr> failing;
// Walk every entry of `connections`.
128 for (ConnectionList::iterator it = connections.begin();
129 it != connections.end(); ++it) {
// Skip the entry equal to `connection`.
// NOTE(review): presumably `connection` is where the event came from, so it
// is not sent back to its origin - confirm.
130 if (*it != connection) {
131 RSCDEBUG(
logger,
"Delivering to connection " << *it);
// Send the event together with the "rsb.wire-schema" entry of its meta-data
// user info.
133 (*it)->sendEvent(event, event->getMetaData().getUserInfo(
"rsb.wire-schema"));
134 }
// A std::exception from the send is logged as a warning and the connection is
// recorded in `failing`; it is not rethrown in the lines visible here.
catch (
const std::exception& e) {
135 RSCWARN(
logger,
"Send failure (" << e.what() <<
"); will close connection later");
139 failing.push_back(*it);
// Second pass over the connections recorded in `failing`.
// NOTE(review): the loop body is not visible here; presumably it removes or
// closes each failing connection - confirm.
146 for (list<BusConnectionPtr>::const_iterator it = failing.begin();
147 it != failing.end(); ++it) {
// Ask the Factory instance returned by Factory::getInstance() to remove this
// bus server, passing a shared pointer to this object.
// NOTE(review): the enclosing function is not visible here; presumably this is
// how the server unregisters itself - confirm.
154 Factory::getInstance().removeBusServer(shared_from_this());