Basic Communication

The essential form of communication in RSB consists in participant s sending and receiving event s. The following sections explain:

Sending Data

To send data in RSB, in the simplest case, an informer has to be created for the desired destination scope and the data then has to be passed to it.

A rsb.Informer object is created by calling rsb.createInformer() with

  • the desired scope (which can be specified as str object, for example, a string literal)
  • a data type (which can be object to allow any kind of data)

Once the informer has been created, data is published by calling rsb.Informer.publishData().

After use, the rsb.Informer object has to be deactivated using its :py:meth:`rsb.Informer.deactivate method.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import logging

import rsb

if __name__ == '__main__':
    # Pacify logger.
    logging.basicConfig()

    # Create an informer for strings on scope "/example/informer".
    informer = rsb.createInformer("/example/informer", dataType=str)

    # Send and event using a method that directly accepts data.
    informer.publishData("example payload")

    # Deactivate the informer after use.
    informer.deactivate()

Download this example

A rsb::Informer object is created by calling obtaining the RSB factory via rsb::Factory::getInstance and then calling its rsb::Factory::createInformer method with

  • the desired scope (which can be specified as std::string object, for example, a string literal)
  • a data type (which can be rsb::AnyType to allow any kind of data)

Once the informer has been created, data is published by calling rsb::Informer::publish.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdlib.h>

#include <rsb/Informer.h>
#include <rsb/Factory.h>

using namespace std;
using namespace rsb;

int main(void) {

    // First get a factory instance that is used to create RSB domain
    // objects.
    Factory& factory = Factory::getInstance();

    // Create an informer that is capable of sending events containing
    // string data on the scope "/example/informer".
    Informer<string>::Ptr informer
        = factory.createInformer<string> ("/example/informer");

    // Create data to send over the informer. Data is always
    // maintained in shared_ptr instances. Informer provides a typedef
    // DataPtr of the appropriate type according to its template
    // parameter
    Informer<string>::DataPtr s(new string("example payload"));

    // Send the data.
    informer->publish(s);

    return EXIT_SUCCESS;
}

Download this example

A rsb.Informer object is created by obtaining the RSB factory via rsb.Factory.getInstance and then calling its rsb.Factory.createInformer method with the desired scope (which can be specified as a string literal). The generic parameter of the rsb.Informer class determines the data type of the informer.

The rsb.Informer has to activated before and deactivated after use via the rsb.Informer.activate and rsb.Informer.deactivate methods.

Once the informer has been created and activated, data is published by calling rsb.Informer.send.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import rsb.Factory;
import rsb.Informer;

public class InformerExample {

    public static void main(String[] args) throws Throwable {

        // Get a factory instance to create RSB objects.
        Factory factory = Factory.getInstance();

        // Create an informer on scope "/exmaple/informer".
        Informer<Object> informer = factory.createInformer("/example/informer");

        // Activate the informer to be ready for work
        informer.activate();

        // Send and event using a method that accepts the data and
        // automatically creates an appropriate event internally.
        informer.send("example payload");

        // As there is no explicit removal model in java, always manually
        // deactivate the informer if it is not needed anymore
        informer.deactivate();

    }

}

Download this example

The macro rsb:with-informer can be used to create an informer for a particular scope and data type (which can be cl:t). The method rsb:send can then be used to send data. rsb:with-informer takes care of destroying the informer after use.

1
2
(rsb:with-informer (informer "/example/informer" 'string)
  (rsb:send informer "example payload"))

Alternatively, rsb:make-informer can be used to obtain an informer without automatic destruction:

1
2
3
(defvar *informer* (rsb:make-informer "/example/informer" 'string))

(rsb:send *informer* "example payload")

Download this example

Receiving Data

Receiving data can be performed in two different ways in RSB:

Synchronous

Wait until event s are received.

Asynchronous

Continue execution and execute a callback function (called handler in RSB) when event s are received.

The following two sections explain the two ways of receiving data.

Receiving Data Synchronously

To receive data synchronously, a reader object has to be created for the scope from which event s should be received. Then, individual event s have to be retrieved explicitly from the reader object, hence synchronous receiving.

Note

Synchronous receiving of data is not currently implemented in Python.

A reader is created by obtaining the RSB factory via rsb::Factory::getInstance (line 16) and then calling its rsb::Factory::createReader method with the desired scope (which can be specified as std::string object, for example, a string literal, line 17).

Once the reader has been created, individual events are received by calling the rsb::Reader::read method (line 21).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdlib.h>

#include <iostream>

#include <rsb/Factory.h>

using namespace rsb;

int main(int argc, char** argv) {
    // Set up the scope to receive on either from the command line
    // argument or use the default scope of the informer example.
    Scope scope(argc > 1 ? argv[1] : "/example/informer");

    // Create a reader which synchronously receives events on the
    // specified scope.
    Factory& factory = Factory::getInstance();
    ReaderPtr reader = factory.createReader(scope);

    // Print events as they are received.
    while (true) {
        EventPtr event = reader->read();
        std::cout << event << std::endl;
    }

    return EXIT_SUCCESS;
}

Download this example

Note

Synchronous receiving of data is not currently implemented in Java.

The macro rsb:with-reader can be used to create a reader for a particular scope. The method rsb:receive can then be used to receive individual events data. rsb:with-reader takes care of destroying the reader after use.

1
2
3
4
(rsb:with-reader (reader "/example/reader")
  (let ((event (rsb:receive reader :block? t)))
    (format t "Received event: ~A~%" event)
    event)) ;; return the event

Alternatively, rsb:make-reader can be used to obtain a reader without automatic destruction:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
(defvar *reader* (rsb:make-reader "/example/reader"))
;; mark-end::variable

;; The reader will participate in the channel until it is garbage
;; collected or explicitly detached from he channel.

;; mark-start::receive/block
(let ((event (rsb:receive *reader* :block? t))) ;; block? defaults to t
  (format t "Received event: ~A~%" event)
  event) ;; return the event

Download this example

Receiving Data Asynchronously

To receive data asynchronously, a listener object has to be created for the scope from which event s should be received. Then, individual event s are received automatically and in parallel to the execution of the program. For each received event, a user-supplied callback function (a handler in RSB terminology) is executed to process the event.

A rsb.Listener object is created by calling rsb.createListener() with the desired scope (which can be specified as str object, for example, a string literal, line 16)

Once the listener has been created, handlers can be added by calling rsb.Listener.addHandler() (line 20). Any callable() can be used as a handler.

After use, the rsb.Listener object has to be deactivated using its rsb.Listener.deactivate() method (line 27).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import logging

import rsb

def handle(event):
    print("Received event: %s" % event)

if __name__ == '__main__':
    # Pacify logger.
    logging.basicConfig()

    # Create a listener on the specified scope. The listener will
    # dispatch all received events asynchronously to all registered
    # handlers.
    listener = rsb.createListener("/example/informer")

    # Add a handler to handle received events. Handlers are callable
    # objects with the received event as the single argument.
    listener.addHandler(handle)

    # Wait for events; clean up when interrupted.
    try:
        while True:
            time.sleep(1)
    finally:
        listener.deactivate()

Download this example

A listener is created by obtaining the RSB factory via rsb::Factory::getInstance (line 19) and then calling its rsb::Factory::createListener method with the desired scope (which can be specified as std::string object, for example, a string literal, line 27).

Once the listener has been created, individual handlers can be added by calling the rsb::Listener::addHandler method (line 36). In general, handlers are objects which implement the rsb::Handler interface. However, there are specialized handlers such as rsb::DataFunctionHandler which allow using different things such as ordinary functions as handlers.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdlib.h>

#include <iostream>

#include <rsb/Handler.h>
#include <rsb/Listener.h>
#include <rsb/Factory.h>

using namespace rsb;

void printData(boost::shared_ptr<std::string> e) {
    std::cout << "Received event: " << *e << std::endl;
}

int main(int argc, char** argv) {

    // First get a factory instance that is used to create RSB
    // objects.
    Factory& factory = Factory::getInstance();

    // Set up the scope to receive on either from the command line
    // argument or use the default scope of the informer example.
    Scope scope((argc > 1) ? argv[1] : "/example/informer");

    // Create a listener that asynchronously receives events from the
    // bus and dispatches them to registered handlers.
    ListenerPtr listener = factory.createListener(scope);

    // Add a handler that is notified about every new event.  This
    // time a special handler instance is used that wraps a function
    // pointer of a function that is only interested in the received
    // data contained in the event and not the additional meta data
    // provided by the event instance. Other handlers exist that also
    // receive Event instances, either as class instances or by
    // wrapping function pointers.
    listener->addHandler(HandlerPtr(new DataFunctionHandler<std::string> (&printData)));

    // As events are received asynchronously we have to wait here for
    // them.
    while (true) {
        boost::this_thread::sleep(boost::posix_time::seconds(1000));
    }

    return EXIT_SUCCESS;
}

Download this example

A rsb.Listener object is created by obtaining the RSB factory via rsb.Factory.getInstance (line 18) and then calling its rsb.Factory.createListener method with the desired scope (which can be specified as a string literal, line 23).

The rsb.Listener has to activated before and deactivated after use via the rsb.Listener.activate (line 24) and rsb.Listener.deactivate (line 37) methods.

Once the listener has been created and activated, handlers can be added by calling the rsb.Listener.addHandler method (line 29). Objects implementing the rsb.Handler interface can be used as handlers.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import java.lang.System;
import java.lang.Thread;

import rsb.AbstractEventHandler;
import rsb.Factory;
import rsb.Event;
import rsb.Listener;

public class EventListenerExample extends AbstractEventHandler {

    @Override
    public void handleEvent(Event event) {
        System.out.println("Received event " + event.toString());
    }

    public static void main(String[] args) throws Throwable {
        // Get a factory instance to create new RSB objects.
        Factory factory = Factory.getInstance();

        // Create a Listener instance on the specified scope that will
        // receive events and dispatch them asynchronously to all
        // registered handlers; activate the listener.
        Listener listener = factory.createListener("/example/informer");
        listener.activate();

        try {
            // Add an EventHandler that will print events when they
            // are received.
            listener.addHandler(new EventListenerExample(), true);

            // Wait for events.
            while (true) {
                Thread.sleep(1);
            }
        } finally {
            // Deactivate the listener after use.
            listener.deactivate();
        }
    }

}

Download this example

The macro rsb:with-listener can be used to create a listener for a particular scope. Inside the lexical scope of rsb:with-listener (or for listeners created differently), the macro rsb:with-handler can be used to add a handler to the listener. While the body of rsb:with-handler executes, events are handled by the supplied code.

1
2
3
4
5
(rsb:with-listener (listener "/example/listener2")
  (rsb::with-handler listener
      ((event)
       (format t "Received event: ~A~%" event))
    (sleep 20)))

Alternatively, rsb:make-listener can be used to obtain a listener without automatic destruction:

1
2
3
4
5
(defvar *listener* (rsb:make-listener "/example/listener"))

(push (lambda (event)
	(format t "Received event: ~A~%" event))
      (rsb.ep:handlers *listener*))

Download this example

Remote Procedure Calls

See also

specification-request-reply
For a detailed description of the underlying implementation.

Remote procedure calls (RPCs) execute methods of objects located in different processes, and potentially different computers, than the calling entity. Some things are easier to implement using RPCs than using event s. However, using RPCs generally makes a system less flexible and often more error-prone. RSB includes means for providing and using a simple form of remote procedure calls.

The following two sections describe

Client

The RPC client calls methods provided by one or more RPC servers. In RSB, such an RPC client is implemented as a remote server object which is similar to other participant s . Such an object has to be created in order to perform method calls.

After the remote server object has been created, a method can be called by supplying its name as string and, optionally, the parameter (there are only one or zero parameters). Methods can be called in blocking and non-blocking way:

  • When called in a blocking way, the method call returns only after the server has processed the request and returned a result.
  • When called in a non-blocking way, the method call returns immediately and the result can be obtained later, when the server completes its processing.

Important

When a non-existent method is called (for example, because the name of the method has been misspelled), nothing happens: blocking calls block forever and non-blocking calls never provide a result.

Conversely, if a method is provided by multiple servers, all servers process the request but only one reply is returned to the caller. It is unspecified, which reply is received by the caller, in such a situation.

A rsb.patterns.RemoteServer object is created by calling rsb.Factory.createRemoteServer() with the scope on which the service is provided (line 12). Remote methods can then be called on the rsb.patterns.RemoteServer object as if they were ordinary Python methods using the function call syntax OBJECT.METHOD(ARGUMENTS) (see line 17). Asynchronous calls can be made by using the syntax OBJECT.METHOD.async(ARGUMENTS) (see line 20).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import logging

import rsb

if __name__ == "__main__":
    # Pacify logger.
    logging.basicConfig()

    # Create a RemoteServer object for the remote server at scope
    # /example/server. Method calls should complete within five
    # seconds.
    server = rsb.createRemoteServer('/example/server')

    # Call the method 'methodOne' on the remote server passing it a
    # string argument. The server's reply is returned from the call as
    # for a regular function call.
    print 'server replied to synchronous call: "%s"' % server.echo('bla')

    # Call the method 'methodOne' again, this time asynchronously.
    future = server.echo.async('bla')
    # do other things
    print 'server replied to asynchronous call: "%s"' % future.get(timeout = 10)

Download this example

A rsb::patterns::RemoteServer object is created by calling rsb::Factory::createRemoteServer with the scope on which the service is provided (lines 12 and 13). Remote methods can then be called using the rsb::patterns::RemoteServer::call method (see line 21) and the rsb::patterns::RemoteServer::callAsync method. The expected return type is specified as a template argument to the function call while the argument type is derived from the supplied argument.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdlib.h>

#include <rsb/Factory.h>

using namespace rsb;
using namespace rsb::patterns;

int main(int /*argc*/, char** /*argv*/) {
    // Use the RSB factory to create a RemoteServer instance for the
    // server at scope /example/server.
    Factory& factory = Factory::getInstance();
    RemoteServerPtr remoteServer
        = factory.createRemoteServer("/example/server");

    // Call the method "echo", passing it a string value as argument
    // and accepting a string value as result. Note that the types of
    // arguments and return values are defined by the server providing
    // the respective methods and have to be matched in method calls.
    boost::shared_ptr<std::string> request(new std::string("bla"));
    boost::shared_ptr<std::string> result
        = remoteServer->call<std::string>("echo", request);
    std::cout << "Server replied: " << *result << std::endl;

    return EXIT_SUCCESS;
}

Download this example

A rsb.patterns.RemoteServer object is created by calling rsb.Factory.createRemoteServer with the scope on which the service is provided (line 16). Remote methods can then be called using the rsb.patterns.RemoteServer.call method (see line 21) and the rsb.patterns.RemoteServer.callAsync method.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.lang.System;

import java.util.concurrent.ExecutionException;

import rsb.InitializeException;
import rsb.RSBException;
import rsb.Factory;
import rsb.patterns.RemoteServer;

public class ClientExample {

    public static void main(String[] args) throws RSBException, InterruptedException, ExecutionException, InitializeException {
        // Get remote server object to call exposed request methods of
        // participants
        RemoteServer server
            = Factory.getInstance().createRemoteServer("/example/server");
        server.activate();

        // Call remote method and deactivate the server.
        try {

            System.out.println("Server replied: " + server.call("echo", "bla"));
        } finally {
            server.deactivate();
        }
    }

}

Download this example

A remote server can be created and managed with the rsb.patterns:with-remote-server macro. The rsb.patterns:call method can be used on the remote server object to call remote methods. The method name and the argument of the call have to be passed as the second and third argument respectively.

1
2
3
(rsb.patterns:with-remote-server (remote-server "/example/clientserver")
  (format t "Server replied: ~A~%"
	  (rsb.patterns:call remote-server "echo" "bla")))

Alternatively, rsb:make-remote-server can be used to obtain a remote server without automatic destruction:

1
2
3
4
5
(defvar *remote-server* (rsb.patterns:make-remote-server "/example/clientserver"))

(rsb.patterns:call *remote-server* "echo" "bla")

(rsb:detach *remote-server*)

Download this example

Server

Methods which are callable via RPC are provided by local server objects which are similar to other participant s. To provide such methods a local server object has be created.

After the local server object has been created, methods have to be registered, supplying the desired method name as a string and a callback function which implements the desired behavior of the method.

A rsb.patterns.LocalServer object is created by calling rsb.Factory.createLocalServer() with the scope on which the service is provided (line 12). Remote methods can then be called on the rsb.patterns.RemoteServer object as if they were ordinary Python methods using the function call syntax OBJECT.METHOD(ARGUMENTS) (see line 17). Asynchronous calls can be made by using the syntax OBJECT.METHOD.async(ARGUMENTS) (see line 20).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import logging

import rsb

if __name__ == '__main__':
    # Pacify logger.
    logging.basicConfig()

    # Create a LocalServer object that exposes its methods under the
    # scope /example/server.
    server = rsb.createServer('/example/server')

    # Create a function which processes requests and returns a
    # result. Note that the name of the function does not determine
    # the name of the exposed method. See addMethod below.
    def echo(x):
        return x

    # Add the function to the server under the name "echo".
    server.addMethod('echo', echo, str, str)

    # Wait for method calls by clients.
    try:
        while True:
            time.sleep(1)
    finally:
        server.deactivate()

Download this example

A rsb::patterns::Server object is created by calling rsb::Factory::createServer with the scope on which the server should provide its service (line 20). Methods and the callback objects implementing their behavior can be registered using the rsb::patterns::LocalServer::registerMethod method (see line 23). Callback classes are derived from rsb::patterns::Server::Callback (with template arguments specifying the request and reply data type s) and override the rsb::patterns::Server::Callback::call method (see lines 8 to 14).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <boost/thread.hpp>

#include <rsb/Factory.h>

using namespace rsb;
using namespace rsb::patterns;

class EchoCallback: public Server::Callback<std::string, std::string> {
public:
    boost::shared_ptr<std::string> call(const std::string& /*methodName*/,
                                        boost::shared_ptr<std::string> input) {
        return input;
    }
};

int main(int /*argc*/, char** /*argv*/) {
    // Use the RSB factory to create a Server instance that provides
    // callable methods under the scope /example/server.
    Factory& factory = Factory::getInstance();
    ServerPtr server = factory.createServer("/example/server");

    // Register method with name and implementing callback object.
    server->registerMethod("echo", Server::CallbackPtr(new EchoCallback()));

    // Wait here so incoming method calls can be processed.
    boost::this_thread::sleep(boost::posix_time::seconds(1000));

    return EXIT_SUCCESS;
}

Download this example

A rsb.patterns.LocalServer object is created by calling rsb.Factory.createLocalServer with the scope on which server should provide its service (line 22). Methods are registered by calling the rsb.patterns.LocalServer.addMethod method (see line 26) with a suitable callback object. The callback class supplies the behavior of server methods by overriding the rsb.patterns.EventCallback.invoke method (see lines 9 to 17).

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import rsb.Event;
import rsb.Factory;
import rsb.InitializeException;
import rsb.patterns.EventCallback;
import rsb.patterns.LocalServer;

public class ServerExample {

    public static class EchoCallback implements EventCallback {

        @Override
        public Event invoke(Event request) throws Throwable {
            request.setData(((String) request.getData()));
            return request;
        }

    }

    public static void main(String[] args) throws InitializeException {
        // Get local server object which allows to expose remotely
        // callable methods.
        LocalServer server = Factory.getInstance().createLocalServer("/example/server");
        server.activate();

        // Add method an "echo" method, implemented by EchoCallback.
        server.addMethod("echo", new EchoCallback());

        // Block until server.deactivate or process shutdown
        server.waitForShutdown();
    }

}

Download this example

A local server can be created and managed with the rsb.patterns:with-local-server macro. The rsb.patterns:with-methods macro can be used to register methods and their implementations in the local server.

1
2
3
4
(rsb.patterns:with-local-server (server "/example/clientserver")
  (rsb.patterns:with-methods (server)
      (("echo" (arg string)
	 arg))))

Alternatively, rsb.patterns:make-local-server can be used to obtain a local server without automatic destruction. Similarly, methods can be added without the rsb.patterns:with-methods macro:

1
2
3
4
5
6
7
(defvar *local-server* (rsb.patterns:make-local-server "/example/clientserver"))

(setf (rsb.patterns:server-method *local-server* "echo3")
      #'(lambda (arg)
	  arg))

(rsb:detach *local-server*)

Download this example