RSB  0.19.0
ParallelEventReceivingStrategy.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project
4  *
5  * Copyright (C) 2010 by Sebastian Wrede <swrede at techfak dot uni-bielefeld dot de>
6  * 2011 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
29 
30 #include <rsc/debug/DebugTools.h>
31 #include <rsc/runtime/ContainerIO.h>
32 #include <rsc/misc/langutils.h>
33 
34 #include "../MetaData.h"
35 #include "../filter/Filter.h"
36 
37 using namespace std;
38 
39 using namespace rsc::runtime;
40 using namespace rsc::logging;
41 using namespace rsc::debug;
42 
43 namespace rsb {
44 namespace eventprocessing {
45 
46 EventReceivingStrategy* ParallelEventReceivingStrategy::create(const Properties& props) {
47  return new ParallelEventReceivingStrategy(props.getAs<unsigned int>("threads", 5),
48  props.getAs<bool>("parallelhandlercalls", false));
49 }
50 
51 ParallelEventReceivingStrategy::ParallelEventReceivingStrategy(unsigned int numThreads,
52  bool parallelHandlerCalls) :
53  logger(Logger::getLogger("rsb.eventprocessing.ParallelEventReceivingStrategy")),
54  pool(numThreads,
55  boost::bind(&ParallelEventReceivingStrategy::deliver, this, _1, _2),
56  boost::bind(&ParallelEventReceivingStrategy::filter, this, _1, _2)),
57  errorStrategy(ParticipantConfig::ERROR_STRATEGY_LOG) {
58  pool.setParallelCalls(parallelHandlerCalls);
59  pool.start();
60 }
61 
63  pool.stop();
64 }
65 
67  return "ParallelEventReceivingStrategy";
68 }
69 
70 void ParallelEventReceivingStrategy::printContents(ostream& stream) const {
71  boost::shared_lock<boost::shared_mutex> filtersLock(filtersMutex);
72  boost::recursive_mutex::scoped_lock errorLock(errorStrategyMutex);
73  stream << "filters = " << filters << ", errorStrategy = " << errorStrategy;
74 }
75 
77  const ParticipantConfig::ErrorStrategy& strategy) {
78  boost::recursive_mutex::scoped_lock lock(errorStrategyMutex);
79  this->errorStrategy = strategy;
80 }
81 
83  RSCDEBUG(logger, "Matching event " << e << " for handler " << handler);
84 
85  // match event
86  try {
87 
88  if (!handler->acceptsMethod(e->getMethod())) {
89  return false;
90  }
91 
92  boost::shared_lock<boost::shared_mutex> lock(filtersMutex);
93  for (set<filter::FilterPtr>::const_iterator filterIt = filters.begin(); filterIt
94  != filters.end(); ++filterIt) {
95  if (!(*filterIt)->match(e)) {
96  return false;
97  }
98  }
99 
100  return true;
101 
102  } catch (const std::exception& ex) {
103 
104  stringstream s;
105  s << "Exception matching event " << e << " for handler " << handler
106  << ":" << endl;
107  s << ex.what() << endl;
108  s << DebugTools::newInstance()->exceptionInfo(ex);
109 
110  handleDispatchError(s.str());
111 
112  } catch (...) {
113 
114  stringstream s;
115  s << "Catch-all handler called matching event " << e << " for handler "
116  << handler << endl;
117  DebugToolsPtr tool = DebugTools::newInstance();
118  vector<string> trace = tool->createBacktrace();
119  s << tool->formatBacktrace(trace);
120 
121  handleDispatchError(s.str());
122 
123  }
124 
125  return false;
126 
127 }
128 
130 
131  boost::recursive_mutex::scoped_lock strategyLock(errorStrategyMutex);
132  switch (errorStrategy) {
134  RSCERROR(logger, message)
135  ;
136  break;
138  cerr << message << endl;
139  break;
141  cerr << message << endl;
142  exit(1);
143  break;
144  default:
145  RSCWARN(logger, "Unknown error strategy: " << errorStrategy)
146  ;
147  RSCERROR(logger, message)
148  ;
149  break;
150  }
151 
152 }
153 
155  RSCDEBUG(logger, "Delivering event " << e << " to handler " << handler);
156 
157  try {
158 
159  handler->handle(e);
160 
161  } catch (const std::exception& ex) {
162 
163  stringstream s;
164  s << "Exception dispatching event " << e << " to handler " << handler
165  << ":" << endl;
166  s << ex.what() << endl;
167  s << DebugTools::newInstance()->exceptionInfo(ex);
168 
169  handleDispatchError(s.str());
170 
171  } catch (...) {
172 
173  stringstream s;
174  s << "Catch-all handler called dispatching event " << e
175  << " to handler " << handler << endl;
176  DebugToolsPtr tool = DebugTools::newInstance();
177  vector<string> trace = tool->createBacktrace();
178  s << tool->formatBacktrace(trace);
179 
180  handleDispatchError(s.str());
181 
182  }
183 
184 }
185 
187  event->mutableMetaData().setDeliverTime(rsc::misc::currentTimeMicros());
188  pool.push(event);
189 }
190 
192  const bool& /*wait*/) {
193  // wait can be ignored because the pool always ensures this
194  pool.registerReceiver(handler);
195 }
196 
198  const bool& /*wait*/) {
199  // wait can be ignored because the pool always ensures this
200  pool.unregisterReceiver(handler);
201 }
202 
204  boost::unique_lock<boost::shared_mutex> lock(filtersMutex);
205  filters.insert(filter);
206 }
207 
209  boost::unique_lock<boost::shared_mutex> lock(filtersMutex);
210  filters.erase(filter);
211 }
212 
213 }
214 }
Uses stderr for printing a message.
Implementations of this interface organize the receiving of events via rsb::transport::InConnector s...
STL namespace.
rsc::threading::OrderedQueueDispatcherPool< EventPtr, rsb::Handler > pool
EventPtr event
Definition: BusImpl.cpp:129
boost::shared_ptr< Filter > FilterPtr
virtual void addHandler(rsb::HandlerPtr handler, const bool &wait)
Adds a new handler that will be notified about new events.
void setHandlerErrorStrategy(const ParticipantConfig::ErrorStrategy &strategy)
Defines the strategy to use for handling dispatching errors to the client handler.
This push-style event receiving strategy uses one or more threads to filter rsb::Event s and dispatch...
boost::shared_ptr< Handler > HandlerPtr
Definition: Handler.h:92
ErrorStrategy
Possible error handling strategies in user-provided code like event handlers.
Logs a message using the logging mechanism.
virtual void removeHandler(rsb::HandlerPtr handler, const bool &wait)
Removes a handler that will will then not be notified anymore.
A class describing the configuration of Participant instances.
boost::shared_ptr< Event > EventPtr
Definition: Event.h:264
void handle(EventPtr e)
Dispatches the event to the listener.