29 #include <boost/format.hpp>
30 #include <boost/date_time/microsec_time_clock.hpp>
32 #include "../../protocol/ProtocolException.h"
36 using namespace boost;
37 using namespace boost::posix_time;
39 using namespace rsc::logging;
40 using namespace rsc::threading;
42 using namespace rsb::protocol;
51 format(
"rsb.spread.Assembly[%1%]")
52 % n->notification().event_id().sequence_number()))), receivedParts(
53 0), birthTime(microsec_clock::local_time()) {
54 store.resize(n->num_data_parts());
62 RSCTRACE(
logger,
"Joining fragments");
66 store[0]->mutable_notification(),
67 rsc::misc::ParentSharedPtrDeleter
68 < rsb::protocol::FragmentedNotification > (
store[0]));
71 string* resultData = notification->mutable_data();
72 for (
unsigned int i = 1; i < this->
store.size(); ++i) {
73 resultData->append(
store[i]->notification().data());
81 "Adding notification " << n->notification().event_id().sequence_number() <<
" (part " << n->data_part() <<
"/" << this->
store.size() <<
") to assembly");
82 assert(n->num_data_parts() ==
store.size());
84 if (
store[n->data_part()]) {
88 "Received fragment (%d/%d) of notification for event with sender id %x and sequence number %d twice!.")
89 % n->data_part() % n->num_data_parts()
90 % n->notification().event_id().sender_id()
91 % n->notification().event_id().sequence_number()));
93 store[n->data_part()] = n;
103 return (microsec_clock::local_time() - this->
birthTime).total_seconds();
107 boost::recursive_mutex& poolMutex,
const unsigned& ageS,
108 const unsigned int& pruningIntervalMs) :
109 PeriodicTask(pruningIntervalMs), logger(
110 Logger::getLogger(
"rsb.spread.AssemblyPool.PruningTask")), pool(
111 pool), poolMutex(poolMutex), maxAge(ageS) {
115 boost::recursive_mutex::scoped_lock lock(this->
poolMutex);
117 RSCDEBUG(
logger,
"Scanning for old assemblies");
118 Pool::iterator it = this->
pool.begin();
119 while (it != this->
pool.end()) {
120 if (it->second->age() > maxAge) {
121 RSCDEBUG(
logger,
"Pruning old assembly " << it->second);
122 this->
pool.erase(it++);
132 logger(Logger::getLogger(
"rsb.spread.AssemblyPool")),
pruningAgeS(ageS), pruningIntervalMs(
135 throw domain_error(
"Age must not be 0.");
137 if (pruningIntervalMs == 0) {
138 throw domain_error(
"Pruning interval must not be 0");
155 RSCDEBUG(
logger,
"Starting Assembly pruning");
161 RSCDEBUG(
logger,
"Stopping Assembly pruning");
165 RSCDEBUG(
logger,
"Assembly pruning stopped");
172 boost::recursive_mutex::scoped_lock lock(this->
poolMutex);
174 string key = notification->notification().event_id().sender_id();
176 notification->notification().event_id().sequence_number()
179 notification->notification().event_id().sequence_number()
182 notification->notification().event_id().sequence_number()
185 notification->notification().event_id().sequence_number()
187 Pool::iterator it = this->
pool.find(key);
190 if (it != this->
pool.end()) {
192 assembly = it->second;
195 "Adding notification " << notification->notification().event_id().sequence_number() <<
" to existing assembly " << assembly);
196 assembly->add(notification);
201 "Creating new assembly for notification " << notification->notification().event_id().sequence_number());
202 assembly.reset(
new Assembly(notification));
203 it = this->
pool.insert(make_pair(key, assembly)).first;
206 if (assembly->isComplete()) {
207 result = assembly->getCompleteNotification();
208 this->
pool.erase(it);
211 RSCTRACE(
logger,
"dataPool size: " << this->
pool.size());