29 #include "../../MetaData.h"
35 using namespace boost;
37 using namespace rsc::logging;
38 using namespace rsc::runtime;
39 using namespace rsc::threading;
// Factory function: builds an InPullConnector with `new` from the supplied
// property set and returns it as a raw transport::InPullConnector*.
// NOTE(review): the caller presumably takes ownership of the returned
// pointer -- confirm against the call sites.
// NOTE(review): only part of this definition is visible in this excerpt (the
// constructor arguments between "converters" and "tcpnodelay" and the closing
// brace are missing), so those arguments are intentionally not documented.
45 transport::InPullConnector* InPullConnector::create(
const Properties& args) {
// Logger looked up under the fixed name of this connector class.
46 LoggerPtr logger = Logger::getLogger(
"rsb.transport.socket.InPullConnector");
// Debug-log the complete property set the connector is created from.
47 RSCDEBUG(logger,
"Creating InPullConnector with properties " << args);
// First constructor argument: the "converters" property, read with
// args.get<ConverterSelectionStrategyPtr>.
49 return new InPullConnector(args.get<ConverterSelectionStrategyPtr>(
"converters"),
// Last constructor argument: the "tcpnodelay" property read as bool via
// getAs; the second argument (false) is presumably the fallback used when the
// property is absent -- confirm against the Properties API.
53 args.getAs<
bool> (
"tcpnodelay",
false));
// Member-initializer list of the InPullConnector constructor (the constructor
// signature itself is on lines not visible in this excerpt).
// Forwards converters, host, port, server and tcpnodelay unchanged to
// InConnector (presumably the base class -- confirm in the header).
62 InConnector(converters, host, port, server, tcpnodelay),
// Initializes the logger member under the same name that create() uses.
63 logger(Logger::getLogger(
"rsb.transport.socket.InPullConnector")) {
// NOTE(review): fragment of an event-handling function body. Its signature,
// the condition guarding the throw below, and the declaration of `d` are on
// lines not visible in this excerpt.
// Rejects the call with std::runtime_error; per the message this is the
// "connector not active" case.
71 throw std::runtime_error(
"Cannot handle events when not active");
// Calls setReceiveTime() with no argument on the event's mutable metadata
// (presumably stamping the current time -- confirm in MetaData.h).
78 event->mutableMetaData().setReceiveTime();
// Treat the event's current data as a shared_ptr<string>. This is an
// unchecked static_pointer_cast, so the data must really be a string here.
82 boost::shared_ptr<string> wireData = static_pointer_cast<
string>(
event->getData());
// The wire schema is read from the metadata user-info entry
// "rsb.wire-schema".
83 string wireSchema =
event->getMetaData().getUserInfo(
"rsb.wire-schema");
// Look up the converter for that wire schema and deserialize the string.
// The result is stored in `d`, whose declaration is not visible here.
87 =
getConverter(wireSchema)->deserialize(wireSchema, *wireData);
// Replace the event's data with d.second and its type with d.first.
88 event->setData(d.second);
89 event->setType(d.first);
// Hand the converted event over to this connector's queue.
91 this->
queue.push(event);
// NOTE(review): fragment of an event-retrieval function body. Its signature,
// the conditions selecting between the two returns, and the body of the catch
// handler are on lines not visible in this excerpt.
// First path: returns the result of queue.pop() (presumably waits until an
// event is available -- confirm against the queue type's documentation).
96 return this->
queue.pop();
// Second path: returns the result of queue.tryPop(). The catch clause below
// suggests tryPop() reports an empty queue by throwing QueueEmptyException.
99 return this->
queue.tryPop();
100 }
// QueueEmptyException is caught by const reference; the exception object is
// unnamed, so the handler does not inspect it.
catch (
const QueueEmptyException&) {