RSB  0.19.0
Factory.cpp
Go to the documentation of this file.
1 /* ============================================================
2  *
3  * This file is a part of the RSB project.
4  *
5  * Copyright (C) 2011 by Johannes Wienke <jwienke at techfak dot uni-bielefeld dot de>
6  * Copyright (C) 2012, 2013, 2014, 2016 Jan Moringen <jmoringe@techfak.uni-bielefeld.de>
7  *
8  * This file may be licensed under the terms of the
9  * GNU Lesser General Public License Version 3 (the ``LGPL''),
10  * or (at your option) any later version.
11  *
12  * Software distributed under the License is distributed
13  * on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
14  * express or implied. See the LGPL for the specific language
15  * governing rights and limitations.
16  *
17  * You should have received a copy of the LGPL along with this
18  * program. If not, go to http://www.gnu.org/licenses/lgpl.html
19  * or write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21  *
22  * The development of this software was supported by:
23  * CoR-Lab, Research Institute for Cognition and Robotics
24  * Bielefeld University
25  *
26  * ============================================================ */
27 
28 #include "Factory.h"
29 
30 #include <boost/filesystem/fstream.hpp>
31 
32 #include <rsc/config/Configuration.h>
33 #include <rsc/config/Environment.h>
34 
35 #include <rsc/logging/OptionBasedConfigurator.h>
36 
37 #include <rsc/plugins/Manager.h>
38 #include <rsc/plugins/Configurator.h>
39 
40 #include <rsb/Version.h>
41 
43 
44 #include "converter/converters.h"
45 #include "converter/Repository.h"
47 
48 #include "transport/transports.h"
49 
50 using namespace std;
51 
52 using namespace rsc::config;
53 using namespace rsc::logging;
54 using namespace rsc::runtime;
55 
56 using namespace rsb::converter;
57 using namespace rsb::transport;
58 
59 namespace { // anonymous namespace for file-local helper functions.
60 
62  SERIALIZATION,
63  DESERIALIZATION
64 };
65 
66 template<typename C>
67 std::map<typename C::value_type::first_type,
68  typename C::value_type::second_type>
69 converterSelectionToMap(const C& container, ConverterDirection direction) {
70  typedef typename C::value_type::first_type first_type;
71  typedef typename C::value_type::second_type second_type;
72 
73  typedef typename C::const_iterator const_iterator;
74 
75  std::map<first_type, second_type> result;
76  for (const_iterator it = container.begin(); it != container.end(); ++it) {
77  switch (direction) {
78  case DESERIALIZATION:
79  result[it->first] = it->second;
80  break;
81  case SERIALIZATION:
82  if (result.find(it->second) != result.end()) {
83  throw std::invalid_argument(
84  boost::str(
85  boost::format(
86  "Multiple wire-schemas (%1%, %2%) selected for data-type %3%.\n"
87  "Probably you wrote the lines transport.<name>.cpp.%1% = %3% "
88  "and transport.<name>.cpp.%2% = %3% in your rsb config. One of "
89  "these should be deleted.")
90  % it->first
91  % result[it->second]
92  % it->second
93  )
94  );
95  }
96  result[it->second] = it->first;
97  break;
98  default:
99  assert(false);
100  throw std::runtime_error("got unexpected serialization direction");
101  }
102  }
103  return result;
104 }
105 
106 Properties
107 prepareConnectorOptions(const rsb::ParticipantConfig::Transport& config,
108  ConverterDirection direction,
109  rsc::logging::LoggerPtr logger) {
110  Properties options = config.getOptions();
111  RSCDEBUG(logger, "Supplied connector options " << options);
112 
113  // For local transport, we do not mess with the converter
114  // configuration since it is not used (or even touched) anyway.
115  //
116  // For remote transports, we build a converter selection strategy
117  // - suitable for direction - and put it into the "converters"
118  // property of the transport configuration.
119  //
120  // Note that config.getConverters() only returns /converter
121  // disambiguations/. These are used to guide the actual converter
122  // selection in converterSelectionToMap().
123  if (rsb::transport::isRemote(config.getName()) && !options.has("converters")) {
124  RSCDEBUG(logger, "Converter configuration for transport `"
125  << config.getName() << "': " << config.getConverters());
126 
127  ConverterSelectionStrategy<string>::Ptr converters; // TODO we should not have to know the transport's wire-type here
128  switch (direction) {
129  case SERIALIZATION:
130  converters
131  = converterRepository<string>() // TODO wire-type
132  ->getConvertersForSerialization
133  (converterSelectionToMap(config.getConverters(), direction));
134  break;
135  case DESERIALIZATION:
136  converters
137  = converterRepository<string>() // TODO wire-type
138  ->getConvertersForDeserialization
139  (converterSelectionToMap(config.getConverters(), direction));
140  break;
141  default:
142  assert(false);
143  throw std::runtime_error("got unexpected serialization direction");
144  }
145  RSCDEBUG(logger, "Selected converters for transport `"
146  << config.getName() << "': " << converters);
147  options["converters"] = converters;
148  }
149 
150  return options;
151 }
152 
153 }
154 
155 namespace rsb {
156 
157 const std::string CONFIG_DEBUG_ENVIRONMENT_VARIABLE = "RSB_CONFIG_DEBUG";
158 
159 const std::string CONFIG_FILES_ENVIRONEMNT_VARIABLE = "RSB_CONFIG_FILES";
160 
162 
164  if (factoryWhileLoadingPlugins) {
166  } else {
167  static Factory factory;
168  return factory;
169  }
170 }
171 
173  ConfigDebugPrinter(const std::string& phase, bool enabled)
174  : phase(phase), enabled(enabled) {
175  if (this->enabled){
176  std::cerr << "Starting processing " << this->phase << " configuration"
177  << std::endl << std::endl;
178  }
179  }
180 
182  if (this->enabled) {
183  std::cerr << std::endl
184  << "Finished processing " << this->phase << " configuration"
185  << std::endl << std::endl;
186  }
187  }
188 
189  std::string phase;
190  bool enabled;
191 };
192 
193 Factory::Factory() :
194  logger(Logger::getLogger("rsb.Factory")),
195  pluginManager(new rsc::plugins::Manager()),
196  signalParticipantCreated(new SignalParticipantCreated),
197  signalParticipantDestroyed(new SignalParticipantDestroyed) {
198 
199  bool debugConfig = getEnvironmentVariable("RSB_CONFIG_DEBUG").get();
200 
201  // Configure RSC-based logging.
202  {
203  ConfigDebugPrinter printer("RSC-based logging", debugConfig);
204 
205  rsc::logging::OptionBasedConfigurator configurator;
206  provideConfigOptions("RSC_", configurator, false);
207  }
208 
209  // Register default implementation for all extension points.
210  RSCINFO(this->logger, "Registering default implementations");
214 
215  // Configure plugin path and load plugins to register additional
216  // implementations for extension points.
217  //
218  // We use the following default plugin path:
219  // 1. $HOME/.$RSB_PLUGIN_PATH_SUFFIX
220  // 2. $libdir/$RSB_PLUGIN_PATH_SUFFIX
221  RSCINFO(this->logger, "Processing plugin configuration");
222  try {
223  ConfigDebugPrinter printer("plugin", debugConfig);
224 
225  factoryWhileLoadingPlugins = this;
226 
227  vector<boost::filesystem::path> defaultPath;
228  // It may be impossible to determine a home directory for the
229  // current user. Warn, but don't throw.
230  try {
231  defaultPath.push_back(userHomeDirectory()
232  / ("." + Version::buildPluginPathSuffix()));
233  } catch (const runtime_error& e) {
234  RSCWARN(this->logger,
235  "Failed to determine user-specific plugin directory: "
236  << e.what());
237  }
238  defaultPath.push_back(Version::libdir() / Version::buildPluginPathSuffix());
239  rsc::plugins::Configurator configurator(pluginManager, defaultPath);
240  provideConfigOptions("RSB_", configurator);
241  configurator.execute(true);
242  } catch (...) {
243  factoryWhileLoadingPlugins = NULL;
244  throw;
245  }
246  factoryWhileLoadingPlugins = NULL;
247 
248  // Setup default participant config
249  //
250  // Collect all available connector implementations:
251  // + In-push
252  // + In-pull
253  // + Out
254  // Disable discovered connectors with the exception of the
255  // socket transport.
256  set<string> availableTransports = getAvailableTransports(DIRECTION_IN_PUSH
258  | DIRECTION_OUT);
259 
261  for (set<string>::const_iterator it = availableTransports.begin();
262  it != availableTransports.end(); ++it) {
263  this->defaultConfig.addTransport(ParticipantConfig::Transport(*it, *it == "socket"));
264  }
265 
266  // If there is only one transport, we can blindly enable it since
267  // the user could end up without any enabled transport otherwise.
268  if (this->defaultConfig.getTransports().size() == 1) {
269  string name = this->defaultConfig.getTransports().begin()->getName();
270  this->defaultConfig.mutableTransport(name).setEnabled(true);
271  }
272 
273  // Merge with user configuration (configuration files, environment
274  // variables)
275  {
276  ConfigDebugPrinter printer("default participant", debugConfig);
277 
278  provideConfigOptions("RSB_", this->defaultConfig);
279  }
280  if (debugConfig) {
281  std::cerr << "Default participant configuration" << std::endl
282  << defaultConfig << std::endl << std::endl;
283  }
284 
285  // Issue a warning if the combination of available transport
286  // implementations and user configuration leads to no enabled
287  // transports.
288  if (this->defaultConfig.getTransports().empty()) {
289  RSCWARN(logger, "No transports are enabled. This is probably a"
290  " configuration error or an internal RSB error.");
291  }
292 
293  RSCDEBUG(logger, "Default config " << defaultConfig);
294 }
295 
297 }
298 
299 void Factory::provideConfigOptions(const std::string& environmentVariablePrefix,
300  OptionHandler& handler,
301  bool stripPrefix) {
302  configure(handler, "rsb.conf", environmentVariablePrefix, 0, 0, stripPrefix,
303  Version::installPrefix(), CONFIG_DEBUG_ENVIRONMENT_VARIABLE,
304  defaultConfigurationFiles(CONFIG_FILES_ENVIRONEMNT_VARIABLE));
305 }
306 
308  return this->signalParticipantCreated;
309 }
310 
312  return this->signalParticipantDestroyed;
313 }
314 
316  const string& dataType,
317  const ParticipantConfig& config,
318  ParticipantPtr parent) {
319  InformerBasePtr informer(
320  new InformerBase(createOutConnectors(config), scope, config, dataType));
321  informer->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
322  (*this->signalParticipantCreated)(informer, parent);
323  return informer;
324 }
325 
326 
328  const ParticipantConfig& config,
329  ParticipantPtr parent) {
330  ListenerPtr listener(
331  new Listener(createInPushConnectors(config), scope, config));
332  listener->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
333  (*this->signalParticipantCreated)(listener, parent);
334  return listener;
335 }
336 
338  const ParticipantConfig& config,
339  ParticipantPtr parent) {
340  ReaderPtr reader(
341  new Reader(createInPullConnectors(config), scope, config));
342  reader->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
343  (*this->signalParticipantCreated)(reader, parent);
344  return reader;
345 }
346 
348 (const Scope& scope,
350  const ParticipantConfig& listenerConfig,
351  const ParticipantConfig& informerConfig,
352  ParticipantPtr parent) {
355  (scope, scope.getComponents()[scope.getComponents().size() -1],
356  listenerConfig, informerConfig, callback));
357  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
358  (*this->signalParticipantCreated)(method, parent);
359  return method;
360 }
361 
363  const ParticipantConfig &listenerConfig,
364  const ParticipantConfig &informerConfig,
365  ParticipantPtr parent) {
367  new patterns::LocalServer(scope, listenerConfig, informerConfig));
368  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
369  (*this->signalParticipantCreated)(server, parent);
370  return server;
371 }
372 
374 (const Scope& scope,
375  const ParticipantConfig& listenerConfig,
376  const ParticipantConfig& informerConfig,
377  ParticipantPtr parent) {
380  (scope, scope.getComponents()[scope.getComponents().size() -1],
381  listenerConfig, informerConfig));
382  method->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
383  (*this->signalParticipantCreated)(method, parent);
384  return method;
385 }
386 
388  const ParticipantConfig &listenerConfig,
389  const ParticipantConfig &informerConfig,
390  ParticipantPtr parent) {
392  new patterns::RemoteServer(scope, listenerConfig, informerConfig));
393  server->setSignalParticipantDestroyed(this->signalParticipantDestroyed);
394  (*this->signalParticipantCreated)(server, parent);
395  return server;
396 }
397 
399  boost::recursive_mutex::scoped_lock lock(configMutex);
400  return defaultConfig;
401 }
402 
404  boost::recursive_mutex::scoped_lock lock(configMutex);
405  this->defaultConfig = config;
406 }
407 
408 vector<InPullConnectorPtr>
410  // Note: getTransports() only returns *enabled* transports.
411  vector<InPullConnectorPtr> connectors;
412  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
413  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
414  configuredTransports.begin(); transportIt
415  != configuredTransports.end(); ++transportIt) {
416  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
417  try {
418  connectors.push_back(InPullConnectorPtr(getInPullFactory()
419  .createInst(transportIt->getName(),
420  prepareConnectorOptions(*transportIt,
421  DESERIALIZATION,
422  this->logger))));
423  } catch (const exception& e) {
424  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-pull: %2%")
425  % transportIt->getName() % e.what()));
426  }
427  }
428  return connectors;
429 }
430 
431 vector<InPushConnectorPtr>
433  // Note: getTransports() only returns *enabled* transports.
434  vector<InPushConnectorPtr> connectors;
435  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
436  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
437  configuredTransports.begin(); transportIt
438  != configuredTransports.end(); ++transportIt) {
439  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
440  try {
441  connectors.push_back(InPushConnectorPtr(getInPushFactory()
442  .createInst(transportIt->getName(),
443  prepareConnectorOptions(*transportIt,
444  DESERIALIZATION,
445  this->logger))));
446  } catch (const exception& e) {
447  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', in-push: %2%")
448  % transportIt->getName() % e.what()));
449  }
450  }
451  return connectors;
452 }
453 
454 vector<OutConnectorPtr>
456  // Note: getTransports() only returns *enabled* transports.
457  vector<OutConnectorPtr> connectors;
458  set<ParticipantConfig::Transport> configuredTransports = config.getTransports();
459  for (set<ParticipantConfig::Transport>::const_iterator transportIt =
460  configuredTransports.begin(); transportIt
461  != configuredTransports.end(); ++transportIt) {
462  RSCDEBUG(logger, "Trying to add connector " << *transportIt);
463  try {
464  connectors.push_back(OutConnectorPtr(getOutFactory()
465  .createInst(transportIt->getName(),
466  prepareConnectorOptions(*transportIt,
467  SERIALIZATION,
468  this->logger))));
469  } catch (const exception& e) {
470  throw runtime_error(boost::str(boost::format("Error configuring connector `%1%', out: %2%")
471  % transportIt->getName() % e.what()));
472  }
473  }
474  return connectors;
475 }
476 
477 rsc::plugins::ManagerPtr Factory::getPluginManager() const {
478  return this->pluginManager;
479 }
480 
481 }
rsc::runtime::Properties getOptions() const
Returns the specified options for the transport.
void registerDefaultEventProcessingStrategies()
Definition: strategies.cpp:44
A derived Method class which can be used to invoke methods on a remote LocalServer object...
Definition: RemoteServer.h:90
bool isRemote(const string &transportName)
Returns true if transportName names a remote transport.
Definition: transports.cpp:229
std::vector< transport::OutConnectorPtr > createOutConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:455
boost::shared_ptr< RemoteMethod > RemoteMethodPtr
Definition: RemoteServer.h:115
boost::shared_ptr< LocalServer > LocalServerPtr
Definition: LocalServer.h:489
boost::shared_ptr< LocalMethod > LocalMethodPtr
Definition: LocalServer.h:387
const std::string CONFIG_DEBUG_ENVIRONMENT_VARIABLE
Definition: Factory.cpp:157
std::string getName() const
Returns the name of this transport description.
STL namespace.
boost::signals2::signal< void(Participant *)> SignalParticipantDestroyed
Definition: Participant.h:45
ParticipantConfig getDefaultParticipantConfig() const
Returns the default configuration for new participants.
Definition: Factory.cpp:398
ConfigDebugPrinter(const std::string &phase, bool enabled)
Definition: Factory.cpp:173
SignalParticipantDestroyedPtr getSignalParticipantDestroyed()
Definition: Factory.cpp:311
void addTransport(const Transport &transport)
Adds a transport to the list of desired transport mechanisms.
ConverterNames getConverters() const
boost::shared_ptr< Reader > ReaderPtr
Definition: Reader.h:106
An informer to publish data.
Definition: Informer.h:95
ListenerPtr createListener(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new listener for the specified scope.
Definition: Factory.cpp:327
Factory & getFactory()
Returns a factory for client-level RSB objects.
Definition: Factory.cpp:163
ConverterDirection
Definition: Factory.cpp:61
virtual ~Factory()
Definition: Factory.cpp:296
std::vector< transport::InPushConnectorPtr > createInPushConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:432
patterns::LocalServerPtr createLocalServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a Server object that exposes methods under the scope scope.
Definition: Factory.cpp:362
set< string > getAvailableTransports(unsigned int requiredDirections)
Returns the names of all available transports which support requiredDirections.
Definition: transports.cpp:142
static void provideConfigOptions(const std::string &environmentVariablePrefix, rsc::config::OptionHandler &handler, bool stripPrefix=true)
Provides the default configuration options for RSB to the specified handler.
Definition: Factory.cpp:299
SignalParticipantDestroyedPtr signalParticipantDestroyed
Definition: Factory.h:339
boost::shared_ptr< Listener > ListenerPtr
Definition: Listener.h:155
boost::shared_ptr< OutConnector > OutConnectorPtr
const std::vector< std::string > & getComponents() const
Returns all components of the scope as an ordered list.
Definition: Scope.cpp:139
boost::shared_ptr< Participant > ParticipantPtr
Definition: Participant.h:122
std::vector< transport::InPullConnectorPtr > createInPullConnectors(const ParticipantConfig &config)
Definition: Factory.cpp:409
A derived Method class which can be called from the remote side and implements its behavior by invoki...
Definition: LocalServer.h:366
boost::shared_ptr< ConverterSelectionStrategy< WireType > > Ptr
boost::shared_ptr< IntlCallback > CallbackPtr
Definition: LocalServer.h:71
Description of a desired transport.
The server side of a request-reply-based communication channel.
Definition: LocalServer.h:54
void registerDefaultTransports()
Definition: transports.cpp:53
boost::shared_ptr< SignalParticipantCreated > SignalParticipantCreatedPtr
Definition: Factory.h:60
Factory for RSB user-level domain objects for communication patterns.
Definition: Factory.h:77
A Listener receives events published by rsb::Informer objects by participating in a channel with a su...
Definition: Listener.h:81
A Reader receives events published by informers by participating in a channel with a suitable scope...
Definition: Reader.h:62
Transport & mutableTransport(const std::string &name)
Returns a single configured transport which can be modified in place.
InformerBasePtr createInformerBase(const Scope &scope, const std::string &dataType="", const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates and returns a new Informer that publishes Event s under the Scope scope.
Definition: Factory.cpp:315
boost::shared_ptr< InPushConnector > InPushConnectorPtr
Definition: Listener.h:59
patterns::RemoteServer::RemoteMethodPtr createRemoteMethod(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::RemoteServer::RemoteMethod.
Definition: Factory.cpp:374
const std::string CONFIG_FILES_ENVIRONEMNT_VARIABLE
Definition: Factory.cpp:159
ParticipantConfig defaultConfig
Always acquire configMutex before reading or writing the config.
Definition: Factory.h:335
A class describing the configuration of Participant instances.
InPullFactory & getInPullFactory()
Definition: Factory.cpp:32
SignalParticipantCreatedPtr getSignalParticipantCreated()
Definition: Factory.cpp:307
ReaderPtr createReader(const Scope &scope, const ParticipantConfig &config=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a new Reader object for the specified scope.
Definition: Factory.cpp:337
void setDefaultParticipantConfig(const ParticipantConfig &config)
Sets the default config for participants that will be used for every new participant that is created ...
Definition: Factory.cpp:403
boost::shared_ptr< InformerBase > InformerBasePtr
Definition: Informer.h:217
SignalParticipantCreatedPtr signalParticipantCreated
Definition: Factory.h:338
InPushFactory & getInPushFactory()
Definition: Factory.cpp:37
boost::shared_ptr< RemoteServer > RemoteServerPtr
Definition: RemoteServer.h:311
rsc::logging::LoggerPtr logger
Definition: Factory.h:328
OutFactory & getOutFactory()
Definition: Factory.cpp:42
void registerDefaultConverters()
Definition: converters.cpp:50
rsc::plugins::ManagerPtr pluginManager
Definition: Factory.h:330
boost::recursive_mutex configMutex
Definition: Factory.h:336
patterns::LocalServer::LocalMethodPtr createLocalMethod(const Scope &scope, patterns::LocalServer::CallbackPtr callback, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a patterns::LocalServer::LocalMethod.
Definition: Factory.cpp:348
boost::shared_ptr< SignalParticipantDestroyed > SignalParticipantDestroyedPtr
Definition: Participant.h:48
boost::shared_ptr< InPullConnector > InPullConnectorPtr
Scope is a descriptor for a hierarchical channel of the unified bus.
Definition: Scope.h:46
std::set< Transport > getTransports(bool includeDisabled=false) const
Returns the set of desired transports for a participant.
rsc::plugins::ManagerPtr getPluginManager() const
Returns the plugin manager instance used by the RSB core.
Definition: Factory.cpp:477
patterns::RemoteServerPtr createRemoteServer(const Scope &scope, const ParticipantConfig &listenerConfig=getFactory().getDefaultParticipantConfig(), const ParticipantConfig &informerConfig=getFactory().getDefaultParticipantConfig(), ParticipantPtr parent=ParticipantPtr())
Creates a RemoteServer object for the server at scope scope.
Definition: Factory.cpp:387
Factory * factoryWhileLoadingPlugins
Definition: Factory.cpp:161
boost::signals2::signal< void(ParticipantPtr, ParticipantPtr)> SignalParticipantCreated
Definition: Factory.h:59
The client side of a request-reply-based communication channel.
Definition: RemoteServer.h:60