31 #include <rsc/misc/Registry.h>
33 #include "../../CommException.h"
34 #include "../../UnsupportedQualityOfServiceException.h"
35 #include "../../util/MD5.h"
36 #include "../../Scope.h"
38 #include "../../converter/Converter.h"
47 using namespace rsc::logging;
48 using namespace rsc::runtime;
51 using namespace rsb::util;
52 using namespace rsb::transport;
57 const SpreadConnector::QoSMap SpreadConnector::qosMapping =
58 SpreadConnector::buildQoSMapping();
60 SpreadConnector::SpreadConnector(
const string& host,
unsigned int port) {
64 void SpreadConnector::init(
const string& host,
unsigned int port) {
65 this->logger = Logger::getLogger(
"rsb.spread.SpreadConnector");
66 RSCDEBUG(logger,
"SpreadConnector::init() entered");
67 this->activated =
false;
76 void SpreadConnector::activate() {
78 this->con->activate();
79 this->activated =
true;
82 void SpreadConnector::deactivate() {
83 RSCDEBUG(logger,
"deactivate() entered");
84 if (this->con->isActive()) {
85 this->con->deactivate();
88 RSCTRACE(logger,
"deactivate() finished");
89 this->activated =
false;
92 void SpreadConnector::join(
const string& name) {
93 this->memberships->join(name, this->con);
96 void SpreadConnector::leave(
const string& name) {
97 this->memberships->leave(name, this->con);
101 this->con->send(msg);
105 this->con->receive(msg);
108 SpreadConnector::~SpreadConnector() {
109 if (this->activated) {
119 return this->messageQoS;
123 map<QualityOfServiceSpec::Reliability, SpreadMessage::QOS> unorderedMap;
125 make_pair(QualityOfServiceSpec::UNRELIABLE,
126 SpreadMessage::UNRELIABLE));
128 make_pair(QualityOfServiceSpec::RELIABLE, SpreadMessage::RELIABLE));
130 map<QualityOfServiceSpec::Reliability, SpreadMessage::QOS> orderedMap;
132 make_pair(QualityOfServiceSpec::UNRELIABLE, SpreadMessage::FIFO));
134 make_pair(QualityOfServiceSpec::RELIABLE, SpreadMessage::FIFO));
138 table.insert(make_pair(QualityOfServiceSpec::UNORDERED, unorderedMap));
139 table.insert(make_pair(QualityOfServiceSpec::ORDERED, orderedMap));
144 void SpreadConnector::setQualityOfServiceSpecs(
147 QoSMap::const_iterator orderMapIt = qosMapping.find(specs.
getOrdering());
148 if (orderMapIt == qosMapping.end()) {
151 map<QualityOfServiceSpec::Reliability, SpreadMessage::QOS>::const_iterator
153 if (mapIt == orderMapIt->second.end()) {
157 messageQoS = mapIt->second;
159 RSCDEBUG(logger,
"Selected new message type " << messageQoS);
162 const vector<string>& SpreadConnector::makeGroupNames(
163 const Scope& scope)
const {
165 boost::upgrade_lock<boost::shared_mutex> lock(groupNameCacheMutex);
167 GroupNameCache::const_iterator it = this->groupNameCache.find(scope);
168 if (it != this->groupNameCache.end()) {
172 boost::upgrade_to_unique_lock<boost::shared_mutex> uniqueLock(lock);
178 if (groupNameCache.size() > 300) {
179 RSCDEBUG(logger,
"Flushing group name cache");
180 groupNameCache.clear();
184 vector<string>& cacheItem = this->groupNameCache[scope];
186 for (vector<Scope>::const_iterator scopeIt = scopes.begin(); scopeIt
187 != scopes.end(); ++scopeIt) {
188 cacheItem.push_back(this->makeGroupName(*scopeIt));
195 std::string SpreadConnector::makeGroupName(
const Scope& scope)
const {
196 return MD5(scope.
toString()).toHexString().substr(0, MAX_GROUP_NAME - 1);