29 #include <boost/filesystem/fstream.hpp>
31 #include <rsc/config/ConfigFileSource.h>
32 #include <rsc/config/Environment.h>
33 #include <rsc/logging/OptionBasedConfigurator.h>
47 using namespace rsc::config;
48 using namespace rsc::logging;
49 using namespace rsc::runtime;
51 using namespace rsb::converter;
52 using namespace rsb::transport;
// Builds a std::map<first_type, second_type> from the pairs stored in
// `container`, where first_type/second_type are the member types of
// C::value_type.
//
// Two assignment directions are visible in the loop below (first -> second and
// second -> first). NOTE(review): the statement that chooses between them is
// not visible in this excerpt — presumably it dispatches on the `which`
// template parameter (this file instantiates pairsToMap<1> and pairsToMap<2>);
// confirm against the full source.
56 template<
unsigned int which,
typename C>
57 std::map<
typename C::value_type::first_type,
58 typename C::value_type::second_type> pairsToMap(
const C& container) {
59 typedef typename C::value_type::first_type first_type;
60 typedef typename C::value_type::second_type second_type;
62 typedef typename C::const_iterator const_iterator;
64 std::map<first_type, second_type> result;
65 for (const_iterator it = container.begin(); it != container.end(); ++it) {
// Forward direction: the pair's first member is the key.
67 result[it->first] = it->second;
// Reversed direction: the pair's second member is the key. Since `result` is
// keyed by first_type, this requires second_type to be usable as first_type
// (and vice versa) — assumes both are the same or convertible; TODO confirm.
69 result[it->second] = it->first;
// Initializes `logger` with the logger named "rsb.Factory".
// NOTE(review): the enclosing declaration is not visible in this excerpt —
// presumably this is a constructor's member initializer; confirm.
79 logger(Logger::getLogger(
"rsb.Factory")) {
// Collects the names reported by the connector infos of all three connector
// kinds (in-pull, in-push, out); being a set, each name is kept once.
93 set<string> availableTransports;
// In-pull connector names. The declaration of `infos` is not visible here.
96 for (set<InPullFactory::ConnectorInfo>::const_iterator it
97 = infos.begin(); it != infos.end(); ++it) {
98 availableTransports.insert(it->getName());
// In-push connector names (again from an `infos` declared on a line not shown).
102 for (set<InPushFactory::ConnectorInfo>::const_iterator it
103 = infos.begin(); it != infos.end(); ++it) {
104 availableTransports.insert(it->getName());
// Out connector names (again from an `infos` declared on a line not shown).
108 for (set<OutFactory::ConnectorInfo>::const_iterator it
109 = infos.begin(); it != infos.end(); ++it) {
110 availableTransports.insert(it->getName());
// Visits every collected transport name; the loop body is not visible in this
// excerpt.
115 for (set<string>::const_iterator it = availableTransports.begin();
116 it != availableTransports.end(); ++it) {
// Logs a warning that no transports are enabled.
// NOTE(review): the condition guarding this warning is not visible here.
136 RSCWARN(
logger,
"No transports are enabled. This is probably a configuration error or an internal RSB error.");
// Feeds logging options into `loggingConfigurator` from four sources, in this
// order: system-wide rsb.conf, user-specific rsb.conf, rsb.conf given as a
// relative path, and an EnvironmentVariableSource.
// NOTE(review): the enclosing function header and the `try {` lines are not
// visible in this excerpt.
153 OptionBasedConfigurator loggingConfigurator;
// 1) rsb.conf located in systemConfigDirectory().
156 boost::filesystem::ifstream stream(
157 systemConfigDirectory() /
"rsb.conf");
159 ConfigFileSource source(stream);
160 source.provideOptions(loggingConfigurator);
162 }
// A runtime_error from the guarded region is caught here; only the beginning
// of the handler's message text is visible in this excerpt.
catch (
const runtime_error& e) {
164 "Could not find a system-wide configuration file ("
// 2) rsb.conf located in userConfigDirectory().
170 boost::filesystem::ifstream stream(userConfigDirectory() /
"rsb.conf");
172 ConfigFileSource source(stream);
173 source.provideOptions(loggingConfigurator);
175 }
// Same pattern as above for the user-specific file.
catch (
const runtime_error& e) {
177 "Could not find a user-specific configuration file ("
// 3) rsb.conf given as a bare relative path.
183 boost::filesystem::ifstream stream(
"rsb.conf");
185 ConfigFileSource source(stream);
186 source.provideOptions(loggingConfigurator);
// 4) Environment variables; the source is constructed with "RSC_"
// (presumably the variable-name prefix — confirm against rsc's
// EnvironmentVariableSource).
191 EnvironmentVariableSource source(
"RSC_");
192 source.provideOptions(loggingConfigurator);
198 return transport::OutFactory::getInstance();
202 const string& dataType,
233 boost::recursive_mutex::scoped_lock lock(
configMutex);
238 boost::recursive_mutex::scoped_lock lock(
configMutex);
// Return type of a definition whose name and parameter list are not visible in
// this excerpt. The visible part prepares, for every configured transport, the
// option set used for an in-pull connector.
246 vector<InPullConnectorPtr>
// Result container; the code that fills and returns it is not visible here.
249 vector<InPullConnectorPtr> connectors;
// Local copy of the transports listed in `config`.
250 set<ParticipantConfig::Transport> configuredTransports = config.
getTransports();
251 for (set<ParticipantConfig::Transport>::const_iterator transportIt =
252 configuredTransports.begin(); transportIt
253 != configuredTransports.end(); ++transportIt) {
254 RSCDEBUG(
logger,
"Trying to add connector " << *transportIt);
// Local copy of the transport's options; a "converters" entry may be added to
// this copy below.
255 Properties options = transportIt->getOptions();
256 RSCDEBUG(
logger,
"Supplied connector options " << transportIt->getOptions());
// Converters are only selected here when the options do not already contain a
// "converters" entry.
259 if (!options.has(
"converters")) {
// The transport's converter configuration is turned into a map with
// pairsToMap<1> and passed to getConvertersForDeserialization() of
// converterRepository<string>(). The declaration that receives the result
// (`converters`) starts on a line not visible in this excerpt.
260 RSCDEBUG(
logger,
"Converter configuration for transport `"
261 << transportIt->getName() <<
"': " << transportIt->getConverters());
264 = converterRepository<string>()
265 ->getConvertersForDeserialization(pairsToMap<1> (transportIt->getConverters()));
266 RSCDEBUG(
logger,
"Selected converters for transport `"
267 << transportIt->getName() <<
"': " << converters);
// Store the selection in the local option set under "converters".
268 options[
"converters"] = converters;
// Return type of a definition whose name and parameter list are not visible in
// this excerpt. The visible part prepares, for every configured transport, the
// option set used for an in-push connector.
275 vector<InPushConnectorPtr>
// Result container; the code that fills and returns it is not visible here.
278 vector<InPushConnectorPtr> connectors;
// Local copy of the transports listed in `config`.
279 set<ParticipantConfig::Transport> configuredTransports = config.
getTransports();
280 for (set<ParticipantConfig::Transport>::const_iterator transportIt =
281 configuredTransports.begin(); transportIt
282 != configuredTransports.end(); ++transportIt) {
283 RSCDEBUG(
logger,
"Trying to add connector " << *transportIt);
// Local copy of the transport's options; a "converters" entry may be added to
// this copy below.
284 Properties options = transportIt->getOptions();
285 RSCDEBUG(
logger,
"Supplied connector options " << transportIt->getOptions());
// Converters are only selected here when the options do not already contain a
// "converters" entry.
288 if (!options.has(
"converters")) {
// The transport's converter configuration is turned into a map with
// pairsToMap<1> and passed to getConvertersForDeserialization() of
// converterRepository<string>(). The declaration that receives the result
// (`converters`) starts on a line not visible in this excerpt.
289 RSCDEBUG(
logger,
"Converter configuration for transport `"
290 << transportIt->getName() <<
"': " << transportIt->getConverters());
293 = converterRepository<string>()
294 ->getConvertersForDeserialization(pairsToMap<1> (transportIt->getConverters()));
295 RSCDEBUG(
logger,
"Selected converters for transport `"
296 << transportIt->getName() <<
"': " << converters);
// Store the selection in the local option set under "converters".
297 options[
"converters"] = converters;
// Return type of a definition whose name and parameter list are not visible in
// this excerpt. The visible part prepares, for every configured transport, the
// option set used for an out connector.
304 vector<OutConnectorPtr>
// Result container; the code that fills and returns it is not visible here.
307 vector<OutConnectorPtr> connectors;
// Local copy of the transports listed in `config`.
308 set<ParticipantConfig::Transport> configuredTransports = config.
getTransports();
309 for (set<ParticipantConfig::Transport>::const_iterator transportIt =
310 configuredTransports.begin(); transportIt
311 != configuredTransports.end(); ++transportIt) {
312 RSCDEBUG(
logger,
"Trying to add connector " << *transportIt);
// Local copy of the transport's options; a "converters" entry may be added to
// this copy below.
313 Properties options = transportIt->getOptions();
314 RSCDEBUG(
logger,
"Supplied connector options " << transportIt->getOptions());
// Converters are only selected here when the options do not already contain a
// "converters" entry.
317 if (!options.has(
"converters")) {
// Unlike the in-direction variants in this file, this one uses pairsToMap<2>
// and getConvertersForSerialization() of converterRepository<string>(). The
// declaration that receives the result (`converters`) starts on a line not
// visible in this excerpt.
318 RSCDEBUG(
logger,
"Converter configuration for transport `"
319 << transportIt->getName() <<
"': " << transportIt->getConverters());
322 = converterRepository<string>()
323 ->getConvertersForSerialization(pairsToMap<2> (transportIt->getConverters()));
324 RSCDEBUG(
logger,
"Selected converters for transport `"
325 << transportIt->getName() <<
"': " << converters);
// Store the selection in the local option set under "converters".
326 options[
"converters"] = converters;