RSB  0.7.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
DirectEventReceivingStrategy.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is part of the RSB project
4  *
5  * Copyright (C) 2011 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
6  *
7  * This file may be licensed under the terms of the
8  * GNU Lesser General Public License Version 3 (the ``LGPL''),
9  * or (at your option) any later version.
10  *
11  * Software distributed under the License is distributed
12  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
13  * express or implied. See the LGPL for the specific language
14  * governing rights and limitations.
15  *
16  * You should have received a copy of the LGPL along with this
17  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
18  * or write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
20  *
21  * The development of this software was supported by:
22  * CoR-Lab, Research Institute for Cognition and Robotics
23  * Bielefeld University
24  *
25  * ============================================================ */
26 
28 
29 #include <boost/thread/shared_mutex.hpp>
30 #include <boost/thread/locks.hpp>
31 
32 #include <rsc/debug/DebugTools.h>
33 #include <rsc/runtime/ContainerIO.h>
34 #include <rsc/misc/langutils.h>
35 
36 #include "../MetaData.h"
37 #include "../filter/Filter.h"
38 
39 using namespace std;
40 
41 using namespace boost;
42 
43 using namespace rsc::runtime;
44 using namespace rsc::logging;
45 
46 namespace rsb {
47 namespace eventprocessing {
48 
49 EventReceivingStrategy* DirectEventReceivingStrategy::create(const Properties& props) {
50  return new DirectEventReceivingStrategy(props.getAs<bool>("singlethreaded"));
51 }
52 
53 DirectEventReceivingStrategy::DirectEventReceivingStrategy(bool singleThreaded) :
54  logger(Logger::getLogger("rsb.eventprocessing.DirectEventReceivingStrategy")),
55  errorStrategy(ParticipantConfig::LOG),
56  singleThreaded(singleThreaded) {
57 }
58 
60 }
61 
62 void DirectEventReceivingStrategy::printContents(ostream& stream) const {
63  boost::shared_lock<boost::shared_mutex> filtersLock(this->filtersMutex);
64  boost::shared_lock<boost::shared_mutex> errorLock(this->errorStrategyMutex);
65  stream << "filters = " << this->filters
66  << ", errorStrategy = " << this->errorStrategy
67  << ", singleThreaded = " << this->singleThreaded;
68 }
69 
71  const ParticipantConfig::ErrorStrategy& strategy) {
72  boost::shared_lock<boost::shared_mutex> lock(errorStrategyMutex);
73  this->errorStrategy = strategy;
74 }
75 
77  // match event
78  try {
79  if (this->singleThreaded) {
80  return filterNoLock(e);
81  } else {
82  boost::shared_lock<boost::shared_mutex> lock(filtersMutex);
83 
84  return filterNoLock(e);
85  }
86  } catch (const std::exception& ex) {
87 
88  stringstream s;
89  s << "Exception matching event " << e << ":" << endl;
90  s << ex.what() << endl;
91  s << rsc::debug::DebugTools::newInstance()->exceptionInfo(ex);
92 
93  handleDispatchError(s.str());
94 
95  } catch (...) {
96 
97  stringstream s;
98  s << "Catch-all handler called matching event " << endl;
99  rsc::debug::DebugToolsPtr tool = rsc::debug::DebugTools::newInstance();
100  vector<string> trace = tool->createBacktrace();
101  s << tool->formatBacktrace(trace);
102 
103  handleDispatchError(s.str());
104 
105  }
106 
107  return false;
108 
109 }
110 
112  for (set<filter::FilterPtr>::const_iterator filterIt =
113  filters.begin(); filterIt != filters.end(); ++filterIt) {
114  if (!(*filterIt)->match(e)) {
115  return false;
116  }
117  }
118  return true;
119 }
120 
122  boost::shared_lock<boost::shared_mutex> strategyLock(errorStrategyMutex);
123 
124  switch (errorStrategy) {
126  RSCERROR(logger, message);
127  break;
129  cerr << message << endl;
130  break;
132  cerr << message << endl;
133  exit(1);
134  break;
135  default:
136  RSCWARN(logger, "Unknown error strategy: " << errorStrategy);
137  RSCERROR(logger, message);
138  break;
139  }
140 
141 }
142 
144  RSCDEBUG(logger, "Delivering event " << e << " to handler " << handler);
145 
146  try {
147 
148  handler->handle(e);
149 
150  } catch (const std::exception& ex) {
151 
152  stringstream s;
153  s << "Exception dispatching event " << e << " to handler " << handler
154  << ":" << endl;
155  s << ex.what() << endl;
156  s << rsc::debug::DebugTools::newInstance()->exceptionInfo(ex);
157 
158  handleDispatchError(s.str());
159 
160  } catch (...) {
161 
162  stringstream s;
163  s << "Catch-all handler called dispatching event " << e
164  << " to handler " << handler << endl;
165  rsc::debug::DebugToolsPtr tool = rsc::debug::DebugTools::newInstance();
166  vector<string> trace = tool->createBacktrace();
167  s << tool->formatBacktrace(trace);
168 
169  handleDispatchError(s.str());
170 
171  }
172 
173 }
174 
176  if (this->singleThreaded) {
177  handleNoLock(event);
178  } else {
179  boost::shared_lock<boost::shared_mutex> lock(this->handlerMutex);
180 
181  handleNoLock(event);
182  }
183 }
184 
186  event->mutableMetaData().setDeliverTime(rsc::misc::currentTimeMicros());
187 
188  if (filter(event)) {
189  for (HandlerList::const_iterator it = this->handlers.begin(); it
190  != this->handlers.end(); ++it)
191  deliver(*it, event);
192  }
193 }
194 
196  const bool& /*wait*/) {
197  boost::shared_lock<boost::shared_mutex> lock(this->handlerMutex);
198 
199  this->handlers.push_back(handler);
200 }
201 
203  const bool& /*wait*/) {
204  boost::shared_lock<boost::shared_mutex> lock(this->handlerMutex);
205 
206  this->handlers.remove(handler);
207 }
208 
210  boost::unique_lock<boost::shared_mutex> lock(filtersMutex);
211 
212  this->filters.insert(filter);
213 }
214 
216  boost::unique_lock<boost::shared_mutex> lock(filtersMutex);
217 
218  this->filters.erase(filter);
219 }
220 
221 }
222 }