30 #include <rsc/threading/ThreadedTaskExecutor.h>
32 #include "../../Scope.h"
36 using namespace rsc::logging;
37 using namespace rsc::runtime;
38 using namespace rsc::threading;
40 using namespace rsb::eventprocessing;
41 using namespace rsb::converter;
46 transport::InPushConnector* InPushConnector::create(
const Properties& args) {
47 static LoggerPtr logger = Logger::getLogger(
"rsb.spread.InConnector");
48 RSCDEBUG(logger,
"creating InConnector with properties " << args);
50 return new InPushConnector(args.get<ConverterSelectionStrategyPtr>(
"converters"),
55 InPushConnector::InPushConnector(
const ConverterSelectionStrategyPtr converters,
58 transport::ConverterSelectingConnector<string>(converters), logger(
59 Logger::getLogger(
"rsb.spread.InPushConnector")), active(false),
60 connector(new SpreadConnector(host, port)) {
61 this->exec = TaskExecutorPtr(
new ThreadedTaskExecutor);
62 this->rec = boost::shared_ptr<ReceiverTask>(
new ReceiverTask(
63 this->connector->getConnection(),
HandlerPtr(),
this));
72 string InPushConnector::getClassName()
const {
73 return "InPushConnector";
77 stream <<
"active = " << this->
active
78 <<
"connector = " << this->connector;
82 this->connector->activate();
85 this->exec->schedule(rec);
90 if (activationScope) {
92 activationScope.reset();
99 if (this->connector->getConnection()->isActive()) {
100 this->connector->getConnection()->interruptReceive();
101 this->rec->waitDone();
103 this->connector->deactivate();
108 this->connector->setQualityOfServiceSpecs(specs);
110 this->rec->setPruning(
false);
112 this->rec->setPruning(
true);
119 this->rec->setHandler(this->
handlers.front());
129 activationScope.reset(
new Scope(scope));
131 connector->join(connector->makeGroupName(scope));