29 #include <rsc/misc/langutils.h>
30 #include <rsc/debug/DebugTools.h>
32 #include "../../CommException.h"
33 #include "../../MetaData.h"
34 #include "../../EventId.h"
36 #include "../../converter/Converter.h"
43 using namespace rsc::logging;
46 using namespace rsb::eventprocessing;
47 using namespace rsb::transport;
48 using namespace rsb::protocol;
55 logger(rsc::logging::Logger::getLogger(
"rsb.spread.ReceiverTask")), con(
56 s), connector(connector), assemblyPool(new
AssemblyPool()), handler(
59 RSCTRACE(
logger,
"ReceiverTask::ReceiverTask, SpreadConnection: " <<
con);
71 con->receive(message);
74 "Receiving a SpreadMessage returned a zero pointer, why?");
78 "ReceiverTask::execute new SpreadMessage received " << message);
85 if (!notification->ParseFromString(message->getDataAsString())) {
86 throw CommException(
"Failed to parse notification in pbuf format");
90 "Parsed event seqnum: " << notification->notification().event_id().sequence_number());
92 "Binary length: " << notification->notification().data().length());
94 "Number of split message parts: " << notification->num_data_parts());
96 "... received message part : " << notification->data_part());
101 if (completeNotification) {
103 "ReceiverTask::execute fragmented notification joined, last message " << message);
110 rsc::debug::DebugToolsPtr tools = rsc::debug::DebugTools::newInstance();
112 "Error receiving spread message: " << e.what() << endl << tools->exceptionInfo(e));
113 }
catch (boost::thread_interrupted& e) {
124 bool multiPartNotification = notification->num_data_parts() > 1;
125 if (multiPartNotification) {
126 completeNotification = this->
assemblyPool->add(notification);
128 completeNotification.reset(
129 notification->mutable_notification(),
130 rsc::misc::ParentSharedPtrDeleter
131 < rsb::protocol::FragmentedNotification
134 return completeNotification;
145 notification->wire_schema());
147 notification->wire_schema(), notification->data());
149 fillEvent(e, *notification, deserialized.second, deserialized.first);
151 e->mutableMetaData().setReceiveTime();
157 RSCINFO(
logger,
"No handler");
159 }
catch (
const std::exception& ex) {
160 RSCWARN(
logger,
"ReceiverTask::notifyHandler catched std exception: " << ex.what() );
162 RSCWARN(
logger,
"ReceiverTask::notifyHandler catched unknown exception" );