29 #include <rsc/misc/langutils.h>
31 #include "../../MetaData.h"
32 #include "../../EventId.h"
33 #include "../../Scope.h"
35 #include "../../protocol/ProtocolException.h"
36 #include "../../protocol/FragmentedNotification.h"
41 using namespace rsc::runtime;
42 using namespace rsc::logging;
44 using namespace rsb::protocol;
45 using namespace rsb::converter;
// Factory function: logs the supplied properties at debug level and returns
// a heap-allocated OutConnector (created with `new`) configured from them.
50 transport::OutConnector* OutConnector::create(
const Properties& args) {
51 static LoggerPtr logger = Logger::getLogger(
"rsb.spread.OutConnector");
52 RSCDEBUG(logger,
"creating OutConnector with properties " << args);
// The "converters" entry is read as a ConverterSelectionStrategyPtr and
// handed to the constructor.
54 return new OutConnector(
55 args.get<ConverterSelectionStrategyPtr>(
"converters"),
// "maxfragmentsize" is read as unsigned int; 100000 is passed as the second
// argument to getAs -- presumably the fallback when the key is absent
// (NOTE(review): confirm against Properties::getAs).
58 args.getAs<
unsigned int>(
"maxfragmentsize", 100000));
62 const string& host,
unsigned int port,
unsigned int maxFragmentSize) :
63 transport::ConverterSelectingConnector<string>(converters), logger(
64 Logger::getLogger(
"rsb.spread.OutConnector")), active(false), connector(
66 maxFragmentSize), minDataSpace(5) {
76 return "OutConnector";
94 this->
connector->setQualityOfServiceSpecs(specs);
// Serialize the event's (type, data) pair into `wire` through converter `c`;
// the call returns the wire schema as a string.
102 string wireSchema = c->serialize(
103 make_pair(event->getType(),
event->getData()), wire);
// Stamp the event's mutable meta data with the current time in microseconds
// as its send time.
105 event->mutableMetaData().setSendTime(rsc::misc::currentTimeMicros());
// All fragments are collected here first; they are processed in a separate
// loop further down.
108 vector<FragmentedNotificationPtr> fragments;
111 unsigned int currentDataPart = 0;
// Keep producing fragments while payload remains past curPos. The
// `currentDataPart == 0` clause lets the body run even when `wire` is empty.
114 while (curPos < wire.size() || currentDataPart == 0) {
121 if (currentDataPart == 0) {
// Size reported by ByteSize() for the notification; taken before the payload
// is attached via set_data below.
127 unsigned int headerByteSize = notification->ByteSize();
131 "The meta data of this event are too big for spread!");
// Take the next slice of the payload. curPos always advances by the full
// maxDataPartSize, even if the slice taken was shorter (assuming `wire` is a
// std::string, substr clamps the count to the remaining length).
136 string dataPart = wire.substr(curPos, maxDataPartSize);
137 curPos += maxDataPartSize;
139 notification->mutable_notification()->set_data(dataPart);
140 notification->set_data_part(currentDataPart);
// Provisional count of 1; overwritten below when more than one fragment
// was produced.
142 notification->set_num_data_parts(1);
144 fragments.push_back(notification);
// At least one fragment is expected to exist at this point.
152 assert(!fragments.empty());
// With several fragments, stamp each one with the final fragment count.
153 if (fragments.size() > 1) {
154 for (vector<FragmentedNotificationPtr>::iterator fragmentIt =
155 fragments.begin(); fragmentIt != fragments.end();
157 (*fragmentIt)->set_num_data_parts(fragments.size());
// Per fragment: serialize it into a string, then walk the group names.
163 for (vector<FragmentedNotificationPtr>::const_iterator fragmentIt =
164 fragments.begin(); fragmentIt != fragments.end(); ++fragmentIt) {
167 string serializedMessageData;
168 if (!(*fragmentIt)->SerializeToString(&serializedMessageData)) {
// Group names are computed by the connector from the event's scope.
174 const vector<string>& groupNames =
connector->makeGroupNames(
175 *event->getScopePtr());
176 for (vector<string>::const_iterator groupIt = groupNames.begin();
177 groupIt != groupNames.end(); ++groupIt) {
184 "This is the serialized message size before send: " << spreadMessage.
getSize());
190 RSCDEBUG(logger,
"event sent to spread");