32 #include <boost/lexical_cast.hpp>
33 #include <boost/format.hpp>
35 #include <rsc/misc/IllegalStateException.h>
39 #include "../../CommException.h"
43 using namespace boost;
45 using namespace rsc::logging;
// Numeric limits for this Spread transport (100 and 180000).
// NOTE(review): the use sites are outside this excerpt. Presumably these are
// the maximum number of groups accepted per received message and the maximum
// message payload length in bytes -- confirm against the receive/send code.
50 #define SPREAD_MAX_GROUPS 100
51 #define SPREAD_MAX_MESSLEN 180000
53 SpreadConnection::SpreadConnection(
const string&
id,
const string& host,
55 logger(Logger::getLogger(
"rsb.transport.spread.SpreadConnection")), connected(false),
56 host(host), port(port),
58 spreadname(str(format(
"%1%@%2%") % port % host)),
61 ? lexical_cast<string>(port)
62 : str(format(
"%1%@%2%") % port % host)),
64 conId(id), msgCount(0) {
65 RSCDEBUG(
logger,
"instantiated spread connection with id " <<
conId
70 RSCDEBUG(
logger,
"destroying SpreadConnection object");
79 throw rsc::misc::IllegalStateException(
"Connection with id " +
conId
80 +
" is already active.");
// Buffer of MAX_GROUP_NAME chars passed as the last argument of SP_connect;
// its contents are copied into spreadpg below.
84 char spreadPrivateGroup[MAX_GROUP_NAME];
// Call SP_connect with spreadname as the first argument and zeros for the
// next three; the mailbox handle is written to con. The result code is
// compared against ACCEPT_SESSION afterwards.
85 int ret = SP_connect(
spreadname.c_str(), 0, 0, 0, &
con, spreadPrivateGroup);
// NOTE(review): this copy runs before ret is compared with ACCEPT_SESSION.
// If SP_connect does not fill the buffer on failure, this reads an
// uninitialized char array -- confirm, and consider moving the assignment
// after the success check.
86 spreadpg = string(spreadPrivateGroup);
87 stringstream errorString;
88 if (ret != ACCEPT_SESSION) {
89 errorString <<
"Error connecting to '" <<
spreadname <<
"': ";
93 <<
"connection to spread daemon at "
94 << spreadname <<
" failed, check port and hostname";
96 case COULD_NOT_CONNECT:
98 <<
"connection to spread daemon failed due to socket errors, check port and hostname";
100 case CONNECTION_CLOSED:
102 <<
"communication errors occurred during setup of connection";
106 <<
"daemon or library version mismatch";
109 errorString <<
"protocol error during setup";
111 case REJECT_ILLEGAL_NAME:
113 <<
"name provided violated requirement, length or illegal character";
115 case REJECT_NOT_UNIQUE:
117 <<
"name provided is not unique on this daemon";
120 errorString <<
"unknown spread connect error, value: " << ret;
123 RSCFATAL(
logger, errorString.str());
126 RSCDEBUG(
logger,
"success, private group id is " << spreadpg);
128 RSCINFO(
logger,
"connected to spread daemon");
137 throw rsc::misc::IllegalStateException(
"Connection with id " +
conId
138 +
" is not active.");
152 throw rsc::misc::IllegalStateException(
"Connection with id " +
conId
153 +
" is not active.");
161 char sender[MAX_GROUP_NAME];
164 int dummyEndianMismatch;
167 &numGroups, retGroups, &messType, &dummyEndianMismatch,
175 case ILLEGAL_SESSION:
176 err =
"spread receive error: mbox given to receive on was illegal";
178 case ILLEGAL_MESSAGE:
179 err =
"spread receive error: message had an illegal structure";
181 case CONNECTION_CLOSED:
182 err =
"spread receive error: message communication errors occurred";
184 case GROUPS_TOO_SHORT:
186 =
"spread receive error: groups array too short to hold list of groups";
188 case BUFFER_TOO_SHORT:
190 =
"spread receive error: message body buffer too short to hold the message received";
193 err =
"unknown spread receive error";
195 throw CommException(
"Spread communication error. Reason: " + err);
200 if (Is_regular_mess(serviceType)) {
202 RSCDEBUG(
logger,
"regular spread message received");
// A regular message whose single destination group equals spreadpg is not
// handled as data: boost::thread_interrupted is thrown instead.
// NOTE(review): presumably this is how a blocked receive is woken up by a
// message the connection sends to its own private group -- confirm.
205 if (numGroups == 1 &&
string(retGroups[0]) ==
string(
spreadpg)) {
206 throw boost::thread_interrupted();
210 sm->setData(
string(buf, ret));
214 "error during message reception, group array too large, requested size "
217 for (
int i = 0; i < numGroups; i++) {
218 if (retGroups[i] != NULL) {
219 string group = string(retGroups[i]);
221 "received message, addressed at group with name "
227 }
else if (Is_membership_mess(serviceType)) {
231 RSCINFO(
logger,
"received spread membership message type");
236 RSCFATAL(
logger,
"received unknown spread message type with code " << serviceType);
239 "Received a message that is neither membership nor data message. "
240 "This should never happen according to the spread documentation.");
250 throw rsc::misc::IllegalStateException(
"Connection with id " +
conId
251 +
" is not active.");
255 if (groupCount == 0) {
260 if (groupCount == 1) {
263 RSCDEBUG(
logger,
"sending message to group with name " << group);
// Flat buffer with groupCount slots of MAX_GROUP_NAME bytes each, zero-filled
// so every slot starts out as an empty C string.
// NOTE(review): no matching delete[] is visible in this excerpt -- confirm the
// buffer is released on every path, including when an exception is thrown.
268 char* groups =
new char[groupCount * MAX_GROUP_NAME];
269 memset(groups, 0, groupCount * MAX_GROUP_NAME);
// Copy the group name into slot j of the flat buffer.
// NOTE(review): strcpy performs no bounds check; a name of MAX_GROUP_NAME
// characters or more would overflow its slot -- confirm names are validated
// before this point.
274 strcpy(groups + j * MAX_GROUP_NAME, group.c_str());
277 ret = SP_multigroup_multicast(
con, msg.
getQOS(), groupCount,
278 (
const char(*)[MAX_GROUP_NAME]) groups, 0, msg.
getSize(),
290 case ILLEGAL_SESSION:
291 err <<
"Illegal Session";
293 case ILLEGAL_MESSAGE:
294 err <<
"Illegal Message";
296 case CONNECTION_CLOSED:
297 err <<
"Connection Closed";
300 err <<
"Unknown spread error with code " << ret;
313 throw rsc::misc::IllegalStateException(
"Connection with id " +
conId
314 +
" is not active.");
// Multicast on con with service type RELIABLE_MESS to this connection's own
// private group (spreadpg); the three trailing arguments are all 0.
// NOTE(review): per the Spread API these are message type, length and payload,
// i.e. an empty message -- presumably the wake-up signal for a blocked
// receive; confirm. The return value of SP_multicast is not checked here.
317 SP_multicast(
con, RELIABLE_MESS,
spreadpg.c_str(), 0, 0, 0);
336 throw rsc::misc::IllegalStateException(
"Connection with id " +
conId
337 +
" is not active.");
347 return DEFAULT_SPREAD_PORT;