29 #include <boost/thread/shared_mutex.hpp>
30 #include <boost/thread/locks.hpp>
32 #include <rsc/debug/DebugTools.h>
33 #include <rsc/runtime/ContainerIO.h>
34 #include <rsc/misc/langutils.h>
36 #include "../MetaData.h"
37 #include "../filter/Filter.h"
41 using namespace boost;
43 using namespace rsc::runtime;
44 using namespace rsc::logging;
47 namespace eventprocessing {
// Constructor head. The visible initializers obtain the logger registered
// under "rsb.eventprocessing.DirectEventReceivingStrategy" and store the
// caller-supplied singleThreaded flag in the member of the same name.
// NOTE(review): the initializer list has a gap between the two visible
// entries and the constructor body is not in view - confirm against the
// full file before changing anything here.
53 DirectEventReceivingStrategy::DirectEventReceivingStrategy(
bool singleThreaded) :
54 logger(Logger::getLogger(
"rsb.eventprocessing.DirectEventReceivingStrategy")),
56 singleThreaded(singleThreaded) {
// Acquire a shared (reader) lock on filtersMutex; it is released when
// filtersLock is destroyed. The enclosing function is not in view.
63 boost::shared_lock<boost::shared_mutex> filtersLock(this->
filtersMutex);
// Write the label "filters = " followed by the filters member to `stream`.
// The statement continues past the visible lines.
65 stream <<
"filters = " << this->
filters
// Acquire a shared (reader) lock on filtersMutex; released when `lock` is
// destroyed. Presumably guards the filter-matching step whose try block is
// not in view - confirm against the full file.
82 boost::shared_lock<boost::shared_mutex> lock(
filtersMutex);
// End of a try block whose body is not visible here.
86 }
catch (
const std::exception& ex) {
// Build a diagnostic in stream `s`: a header naming the event `e`, then the
// exception's what() text, then whatever DebugTools::exceptionInfo(ex)
// produces for this exception.
89 s <<
"Exception matching event " << e <<
":" << endl;
90 s << ex.what() << endl;
91 s << rsc::debug::DebugTools::newInstance()->exceptionInfo(ex);
// Diagnostic for the non-std::exception path (judging by the message text,
// this sits in a catch-all handler that is not visible): a header line
// followed by a formatted backtrace obtained from DebugTools.
// NOTE(review): unlike the dispatch counterpart further down, this message
// does not stream the event `e` - confirm whether that omission is intended.
98 s <<
"Catch-all handler called matching event " << endl;
99 rsc::debug::DebugToolsPtr tool = rsc::debug::DebugTools::newInstance();
100 vector<string> trace = tool->createBacktrace();
101 s << tool->formatBacktrace(trace);
// Iterate over a set of filter::FilterPtr with a const iterator; the loop
// header's initializer, condition and increment are cut off in this view.
112 for (set<filter::FilterPtr>::const_iterator filterIt =
// Branch taken when the current filter's match(e) returns false for event
// `e`; the body of the branch is not visible.
114 if (!(*filterIt)->match(e)) {
// Error reporting for `message`. Four separate fragments are visible: two
// pass the message to the RSCERROR logging macro with this object's logger,
// two write it to cerr followed by endl.
// NOTE(review): the conditions selecting between these paths are not in
// view - presumably they depend on a configured error-handling strategy;
// confirm against the full file.
126 RSCERROR(
logger, message);
129 cerr << message << endl;
132 cerr << message << endl;
137 RSCERROR(
logger, message);
// Log which event `e` is about to be passed to which `handler`, using the
// RSCDEBUG logging macro with this object's logger.
144 RSCDEBUG(
logger,
"Delivering event " << e <<
" to handler " << handler);
// End of a try block whose body is not visible here (presumably the actual
// handler invocation - confirm against the full file).
150 }
catch (
const std::exception& ex) {
// Build a diagnostic in stream `s`: a header naming the event and the
// handler (the statement's tail is cut off in this view), then the
// exception's what() text and the output of DebugTools::exceptionInfo(ex).
153 s <<
"Exception dispatching event " << e <<
" to handler " << handler
155 s << ex.what() << endl;
156 s << rsc::debug::DebugTools::newInstance()->exceptionInfo(ex);
// Diagnostic for the non-std::exception path (judging by the message text,
// this sits in a catch-all handler that is not visible): a header naming
// the event and handler, followed by a formatted backtrace from DebugTools.
163 s <<
"Catch-all handler called dispatching event " << e
164 <<
" to handler " << handler << endl;
165 rsc::debug::DebugToolsPtr tool = rsc::debug::DebugTools::newInstance();
166 vector<string> trace = tool->createBacktrace();
167 s << tool->formatBacktrace(trace);
// Acquire a shared (reader) lock on handlerMutex; released when `lock` is
// destroyed. The enclosing function is not in view.
179 boost::shared_lock<boost::shared_mutex> lock(this->
handlerMutex);
// Stamp the event's mutable meta data with the value returned by
// rsc::misc::currentTimeMicros() as its deliver time.
186 event->mutableMetaData().setDeliverTime(rsc::misc::currentTimeMicros());
// Walk the handlers container from begin() with a const iterator; the loop
// condition, increment and body are cut off in this view.
189 for (HandlerList::const_iterator it = this->
handlers.begin(); it
// Two further shared (reader) locks on handlerMutex, each belonging to a
// different function whose signature and body are not in view.
// NOTE(review): if either enclosing function modifies `handlers`, a
// shared_lock does not exclude concurrent readers or other shared holders;
// a boost::unique_lock would be required - confirm against the full file.
197 boost::shared_lock<boost::shared_mutex> lock(this->
handlerMutex);
204 boost::shared_lock<boost::shared_mutex> lock(this->
handlerMutex);
// Two exclusive (writer) locks on filtersMutex, each belonging to a
// different function whose signature and body are not in view. Presumably
// these guard modifications of the filter set - confirm against the full
// file.
210 boost::unique_lock<boost::shared_mutex> lock(
filtersMutex);
216 boost::unique_lock<boost::shared_mutex> lock(
filtersMutex);