4.4. Topic Communication in C++

Note

The example uses C++14. If you need to know more about which versions of C++ are supported, see section Language Standards we use and which are supported by LN in the tutorial. Section Prerequisites gives details about the required installation.

This section shows a minimal, self-contained example of how processes can exchange information in real time via topics, which carry messages using a publish/subscribe pattern. (If you want to look up what these terms mean, they are defined in the communication bullet point of the Introduction chapter.)

Note

This chapter mostly matches the corresponding example in Python, so if you need to get started on topic communication in Python, please see Topic Communication in Python instead.

To summarize, the main points we will demonstrate are these:

  • Different processes can exchange information via messages, which are compound data structures with a fixed data type.

  • Messages are associated with topics. A topic is basically a label for a specific stream of messages.

  • One process can send messages. It is the publisher with respect to that topic.

  • Other processes (one or more) can receive messages. They are the subscribers.
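The roles listed above can be illustrated with a small in-process sketch. It is not the LN API: the class toy_bus, the struct count_message and all method names are hypothetical and only show how one publisher and several subscribers relate to a named topic.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message type, standing in for a generated message struct.
struct count_message {
  double time;
  unsigned num_counts;
};

// Hypothetical in-process "bus" that maps a topic name to its subscribers.
// It only illustrates the roles; real LN clients are separate processes.
class toy_bus {
public:
  using handler = std::function<void(const count_message &)>;

  // Register a subscriber callback for a topic.
  void subscribe(const std::string &topic, handler h) {
    subscribers_[topic].push_back(std::move(h));
  }

  // Deliver one message to every subscriber of the topic and return the
  // number of subscribers that received it.
  std::size_t publish(const std::string &topic, const count_message &msg) {
    const auto it = subscribers_.find(topic);
    if (it == subscribers_.end()) return 0;
    for (const auto &h : it->second) h(msg);
    return it->second.size();
  }

private:
  std::map<std::string, std::vector<handler>> subscribers_;
};
```

Publishing to a topic without subscribers simply delivers nothing in this sketch, which mirrors the decoupling between publisher and subscribers.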

We will now show how that works with an example in C++. The first thing we need to do is to define which data elements the messages we want to send will contain. This is done using a piece of text which we call a message definition.

4.4.1. Creating Message Definitions

Message definitions are usually stored in plain text files which are part of the set-up of a distributed system.

So, to create a message definition with name counter/count_message, we create a plain ASCII text file with this content:

double time
uint32_t num_counts

and store this text file in quickstart/cpp/topics/msg_defs/counter/count_message.

What does the content of the message definition mean? It means that we have two fields:

  • time

  • num_counts

and that the “time” field has the type double (which is a 64-bit floating point value), and the “num_counts” field has the type uint32_t (which is the C type name for an unsigned integer value with 32 bits).

So, we can imagine messages of this type as a packet of data which has the same structure as a C or C++ struct: Each name is mapped to a value, and each value has a fixed type.

However, we can use the same definition in other languages such as Python, so that messages can be passed back and forth between different processes, including processes on different computers. We can even share messages between systems which have a different endianness, for example.
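As a rough illustration of the two points above, the following sketch shows a struct with the same two fields and a byte-order reversal for a 32-bit value. The name count_message_sketch is hypothetical. The real type is generated by ln_generate, its exact layout is an assumption here, and how LN itself handles endianness is not described in this section.

```cpp
#include <cstdint>
#include <type_traits>

// Illustrative stand-in for the generated message type. The field names and
// types follow the message definition counter/count_message.
struct count_message_sketch {
  double time;          // 64-bit floating point value
  uint32_t num_counts;  // unsigned 32-bit integer
};

static_assert(sizeof(uint32_t) == 4, "uint32_t is 32 bits wide");
static_assert(std::is_trivially_copyable<count_message_sketch>::value,
              "plain data that can be copied byte by byte");

// Reverse the byte order of a 32-bit value. A conversion of this kind is
// needed when sender and receiver disagree on endianness.
inline uint32_t byteswap32(uint32_t v) {
  return ((v & 0x000000FFu) << 24) | ((v & 0x0000FF00u) << 8) |
         ((v & 0x00FF0000u) >> 8) | ((v & 0xFF000000u) >> 24);
}
```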

4.4.2. The Publisher Program

We start with the publisher program, which we call counter_publisher, by creating a source code file quickstart/cpp/topics/counter_publisher.cpp like this:

 1#include <chrono>
 2#include <iostream>
 3#include <ln/ln.h>
 4#include <thread>
 5
 6// ln_messages.h is generated by the CMake generate_ln_message_headers()
 7// helper or by an equivalent Makefile rule.
 8
 9#include "ln_messages.h"
10
11int main(int argc, char *argv[]) {
12  using namespace std::chrono_literals;
13
14  ln::client clnt("counter publisher", argc, argv);
15  // ln::outport *out_port =
16  //     clnt.publish("counter1.count_message", "counter/count_message");
17
18  // To have the message definition hash checked at runtime use <NAME>_MD_NAME variable:
19  // you can use #ifdef HAVE_LN_MD_NAME to check whether a new-enough ln_generate
20  // was used to generate your ln_messages.h
21  ln::outport *out_port =
22      clnt.publish("counter1.count_message", COUNTER_COUNT_MESSAGE_MD_NAME);
23
24  uint32_t val = 0u;
25  std::cout << "counter publisher running\n";
26  while (true) {
27
28    counter_count_message_t out_data{};
29
30    val = (val + 1) % 1024;
31    out_data.num_counts = val;
32
33    time_t t = time(nullptr);
34    out_data.time = t;
35
36    out_port->write(&out_data);
37
38    std::this_thread::sleep_for(100ms);
39  };
40}

Let’s go through just a few interesting aspects of the program:

  • In line 3, the links_and_nodes C++ header is included as ln/ln.h (it also works for C).

  • The header ln_messages.h is generated from the message definitions, and defines the type counter_count_message_t, which is the type of the message buffer. That buffer is allocated on the stack in line 28, and zero-initialized using “{}”.

  • In line 14, an ln::client instance is initialized. The first parameter passed to the constructor is the default name of the client, which can be used by the LN Manager to display its state.

  • In line 21, a publisher port is initialized, using the ln::client::publish() method. The method returns a handle to the port, and the object is owned by the LN client instance. Note that the name of the message definition and the name of the topic that uses it are distinct: the two arguments are the name of the topic (counter1.count_message) and the name of the message definition (counter/count_message, passed here via the generated constant COUNTER_COUNT_MESSAGE_MD_NAME).

    Here, the name of the topic is almost equal to the name of the message definition. However, this is optional: while it is often practical and makes things simpler to keep names in sync, they can be different. The reason for this is that message definitions and topic names serve two different purposes. The topic is like the frequency of a radio broadcast, which determines which content we will get where, and under which title. (For further reading, the section on Message definitions and topics in the tutorial gives a more in-depth explanation of the relationships between message definitions, topics, and their names.)

    The message definition is, in contrast, just the type of the transmitted data. This means that one can use the same message definitions in different places. For example, a message definition which defines a vector of three floating point elements could be used to signal the place of a vehicle, its speed vector, its acceleration, and perhaps also the place it should go to.

  • After the initialization, the program starts its main processing loop in line 26. Each iteration can contain any kind of activity (here, the sleep call), followed by generating and sending data. The latter is done in two steps:

  • The assignment to out_data.time puts data into the message buffer. The elements of the message are member values of out_data, and the time field is defined by the message definition. In the same way, the assignment to out_data.num_counts generates some data which is updated within each step.

  • When the message data is prepared, it is sent using the ln::outport::write() method. This function atomically transmits the message and returns after the transmission has been initiated. Both ln::outport::write() and ln::inport::read() shown below are safe to use from multiple threads.
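The two data-generating steps of the loop can be isolated into small helpers, as in the sketch below. The function names are hypothetical. Note that time() has a resolution of one second, so at ten messages per second several messages carry the same time value. The chrono-based variant is one possible alternative and assumes that std::chrono::system_clock counts from the Unix epoch, which holds on common platforms.

```cpp
#include <chrono>
#include <cstdint>
#include <ctime>

// Advance the counter as the publisher loop does: values cycle through
// 0..1023 and wrap back to 0 after 1023.
inline uint32_t next_count(uint32_t val) { return (val + 1u) % 1024u; }

// Wall-clock time in whole seconds, converted to the double that is stored
// in the time field of the message.
inline double now_seconds() {
  return static_cast<double>(std::time(nullptr));
}

// Alternative with sub-second resolution (an assumption, not part of the
// original example).
inline double now_seconds_fine() {
  using namespace std::chrono;
  return duration<double>(system_clock::now().time_since_epoch()).count();
}
```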

4.4.3. The Subscriber Program

The following code, in quickstart/cpp/topics/counter_subscriber.cpp, is an example of a matching subscriber program:

 1#include <iostream>
 2#include <ln/ln.h>
 3
 4// ln_messages.h is generated by the CMake generate_ln_message_headers()
 5// helper or by an equivalent Makefile rule.
 6
 7#include "ln_messages.h"
 8
 9int main(int argc, char *argv[]) {
10
11  ln::client clnt("counter subscriber");
12  // ln::inport *in_port =
13  //     clnt.subscribe("counter1.count_message", "counter/count_message");
14
15  // To have the message definition hash checked at runtime use <NAME>_MD_NAME variable:
16  // you can use #ifdef HAVE_LN_MD_NAME to check whether a new-enough ln_generate
17  // was used to generate your ln_messages.h
18  ln::inport *in_port =
19      clnt.subscribe("counter1.count_message", COUNTER_COUNT_MESSAGE_MD_NAME);
20
21  std::cout.precision(1);
22  while (true) {
23    counter_count_message_t in_data{};
24    const double time_out = 0.5; /* seconds */
25
26    if (!in_port->read(&in_data, time_out)) {
27      std::cout << "Warning: no data was received!\n";
28    } else {
29      std::cout << "received time = " << std::fixed << in_data.time
30                << std::scientific << ", num_counts = " << in_data.num_counts
31                << "\n";
32    }
33  }
34}

  • The initialization of the program is very similar to the initialization of the publisher, with one difference: the method clnt.publish(...) is replaced by the method ln::client::subscribe() in line 18. This means that this port will wait for data which was written by the publisher process.

    Note

    This port initialization is done only once. This is recommended, since the initialization of a publisher or subscriber client (or other communication channels) can cost much more time than using the communication link.

  • The main loop uses the ln::inport::read() method in order to receive data. If there is no data available, the call waits for at most time_out seconds and returns false if no data was received.

  • When the ln::inport::read() method has completed, it returns a packet of data in the in_data buffer, whose members time and num_counts are populated with the values that have been sent by the publisher. If no data was received, the existing data in the buffer remains unchanged.
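The read semantics described above (wait up to a timeout, return false on timeout, leave the buffer untouched when nothing arrives) can be sketched with a condition variable. The class toy_port is hypothetical and is not how ln::inport is implemented; it only demonstrates the contract within a single process.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical single-slot mailbox with a timed read.
template <typename Message>
class toy_port {
public:
  // Store a new message and wake up one waiting reader.
  void write(const Message &msg) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      slot_ = msg;
      fresh_ = true;
    }
    cond_.notify_one();
  }

  // Wait up to time_out seconds for a new message. On success, copy it into
  // *out and return true. On timeout, return false and leave *out untouched.
  bool read(Message *out, double time_out) {
    std::unique_lock<std::mutex> lock(mutex_);
    const std::chrono::duration<double> limit(time_out);
    if (!cond_.wait_for(lock, limit, [this] { return fresh_; })) {
      return false;
    }
    *out = slot_;
    fresh_ = false;
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  Message slot_{};
  bool fresh_ = false;
};
```

Because a timed-out read leaves the buffer unchanged, a caller that needs to distinguish old from new data has to check the return value, as the subscriber program does.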

4.4.4. Configuring the LN Manager for Message Exchange

Now, we add a minimal configuration for the LN manager that supports exchange of these messages. First, we will do that in a bare minimum way, without using the manager for process management. After that, we will add automatic process management again.

Create a ln-manager config file quickstart/cpp/topics/main.lnc:

instance

name: counter topics communication example (C++) for %(env USER)@%(hostname)
manager: :%(get_port_from_string "%(instance_name)")

add_message_definition_dir: %(CURDIR)/msg_defs/

This creates a unique instance name and manager listening port. As in the previous quickstart example on basic process configuration, it sets the instance name and derives the port number from it, using values taken from environment variables.
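The idea of deriving a stable port number from a string can be sketched as follows. This is only an illustration: the function port_from_string is hypothetical, and the algorithm that get_port_from_string actually uses is not described here and may differ. The sketch hashes the string with 32-bit FNV-1a and folds the result into the unprivileged port range.

```cpp
#include <cstdint>
#include <string>

// Hypothetical illustration: map a string to a port number in 1024..65535.
// The same string always yields the same port, so every user and host
// combination gets a reproducible port.
inline uint16_t port_from_string(const std::string &text) {
  uint32_t hash = 2166136261u;  // FNV-1a offset basis
  for (unsigned char c : text) {
    hash ^= c;
    hash *= 16777619u;  // FNV-1a prime
  }
  const uint32_t first = 1024u;
  const uint32_t last = 65535u;
  return static_cast<uint16_t>(first + hash % (last - first + 1u));
}
```

Two different strings can still map to the same port in a scheme like this, so a collision check would be needed in practice.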

In addition to that example, there is also a directive which tells the LN manager where it can find the message definition which we defined in the first step. This is necessary because the LN manager needs to start some background processing that supports message exchange, and for this it needs to know the message definitions which we are using.

The %(CURDIR) expression defines the folder in which the search path starts, which is the folder in which the configuration file we are discussing is located. We have put the message definition into the folder “msg_defs”, so we need to add its name. Once the LN manager knows this folder, it and the client programs can find the correct message definition, as the rest of the path is defined by the name of the message definition. (And, of course, it is possible to use multiple folders with message definitions, so that we can compose larger systems from smaller ones.)
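How a definition name and a list of definition folders combine into file paths can be sketched like this. The helper names are hypothetical, and the search order shown (folders in the order given) is an assumption rather than documented LN behaviour.

```cpp
#include <string>
#include <vector>

// Join a message definition folder and a definition name into a file path.
inline std::string md_path(const std::string &dir, const std::string &name) {
  if (dir.empty() || dir.back() == '/') return dir + name;
  return dir + "/" + name;
}

// List the candidate paths for a definition name, one per folder, in the
// order in which the folders were given.
inline std::vector<std::string> md_candidates(
    const std::vector<std::string> &dirs, const std::string &name) {
  std::vector<std::string> result;
  for (const auto &dir : dirs) result.push_back(md_path(dir, name));
  return result;
}
```

With the folder msg_defs/ and the name counter/count_message, the candidate is msg_defs/counter/count_message, which matches where the file was stored in the first step.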

Finally, we need a short Makefile, like this:

# Note: To build this under the conan package manager,
# you might need to set the environment using:
#
# conan2 install --requires="links_and_nodes_python/[~2]@common/stable" \
# -pr:a $DLRRM_HOST_PLATFORM -of conan -g AutotoolsDeps
# links_and_nodes_python -> for ln_generate
# liblinks_and_nodes -> to build the cpp examples (direct dep of ln python)
# source conan/conanbuild.sh for build env
# source conan/conanrun.sh for run env

all:	counter_publisher counter_subscriber

CPPFLAGS += -std=c++14

LIBS ?= -lln

counter_publisher: counter_publisher.cpp ln_messages.h
	g++ $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) counter_publisher.cpp $(LIBS) -o counter_publisher


counter_subscriber: counter_subscriber.cpp ln_messages.h
	g++ $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) counter_subscriber.cpp $(LIBS) -o counter_subscriber


NEEDED_MDS = \
  counter/count_message

OWN_MDS_DIR = msg_defs/

VPATH = $(OWN_MDS_DIR)

ln_messages.h: $(NEEDED_MDS) Makefile
	ln_generate -o $@ --md-dir $(OWN_MDS_DIR) $(NEEDED_MDS)

clean:
	rm -f ln_messages.h *.o counter_subscriber counter_publisher

4.4.4.1. Running Publisher and Subscriber with Manual Process Management

First, compile both publisher and subscriber, using “make”:

cd documentation/examples/quickstart/cpp/topics/
make clean
make

Start the ln_manager in one terminal:

ln_manager -c quickstart/cpp/topics/main.lnc

The manager will open its GUI as shown below. In the terminal, it will output a log message similar to Manager: listening on '<hostname>:<port>' for ln-clients!. We need the <port> from this message for the next steps.

_images/quickstart_ln_manager_on_start.png

The LN manager serving a publisher and subscriber

In another terminal, start the publisher:

cd documentation/examples/quickstart/cpp/topics/

LN_MANAGER=localhost:<port> ./counter_publisher

If all goes right, no output should be generated except "counter publisher running".

In a third terminal, now start the subscriber:

LN_MANAGER=localhost:<port> ./counter_subscriber

The process should output lines like this:

received time = 1647435215.0, num_counts = 15
...

Within the ln-manager GUI, when clicking on the “topics” tab, you should see our counter topic published with a rate of ~10Hz as shown below:

_images/quickstart_ln_manager_with_subscriber.png

The counter topic, when selecting the “topics” tab.

This shows in the top half a list of topics. In the bottom half, after selecting the “counter” topic, it shows a list of the clients and the averaged frequency of exchanged messages.

4.4.4.2. Delegating the Process Management to the LN Manager

The example above requires that we start the processes with the right port number, so that they can reach the manager. This is not very practical, as we need to copy the port number each time.
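To make the format of the LN_MANAGER value concrete, the following sketch splits a host:port string into its two parts. The struct manager_address and both functions are hypothetical. The LN client library performs its own parsing, and the details of that parsing are not shown in this section.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical result type for a parsed "host:port" value.
struct manager_address {
  std::string host;
  int port;
  bool valid;
};

// Split "host:port" at the last colon and validate the port number.
inline manager_address parse_manager_address(const std::string &text) {
  manager_address result{"", 0, false};
  const std::string::size_type colon = text.rfind(':');
  if (colon == std::string::npos || colon + 1 >= text.size()) return result;
  const std::string digits = text.substr(colon + 1);
  if (digits.size() > 5) return result;
  if (digits.find_first_not_of("0123456789") != std::string::npos) {
    return result;
  }
  const int port = std::atoi(digits.c_str());
  if (port < 1 || port > 65535) return result;
  result.host = text.substr(0, colon);
  result.port = port;
  result.valid = true;
  return result;
}

// Read the value from the environment; an unset variable gives an invalid
// result.
inline manager_address manager_from_environment() {
  const char *value = std::getenv("LN_MANAGER");
  return parse_manager_address(value ? value : "");
}
```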

We can do this with less manual typing, and in a more comfortable way, when we use the LN manager for process management, as shown in section Process Management.

To do this, we can put those command lines in the ln-manager config file so that we don’t have to remember them:

instance
name: counter topics communication example (C++) for %(env USER)@%(hostname)
manager: :%(get_port_from_string "%(instance_name)")

add_message_definition_dir: %(CURDIR)/msg_defs/

process publisher
change_directory: %(CURDIR)
pass_environment: PATH, PYTHONPATH, LD_LIBRARY_PATH
command: ./counter_publisher
ready_regex: counter publisher running
node: localhost

process subscriber
change_directory: %(CURDIR)
pass_environment: PATH, PYTHONPATH, LD_LIBRARY_PATH
command: ./counter_subscriber
node: localhost
depends_on: publisher

The example uses a few additional flags for the processes. The details are explained in the reference part, in the process section. To explain and highlight the flags and attributes used here:

  • As before, each process has a name, which is given at the start of each process section. This name overrides the name given by the LN client, which is passed as a constructor parameter.

  • The change_directory directive defines the working directory of the process. This makes it easy to start the compiled programs, whose names are given as relative paths in the command directive.

  • The LN manager keeps the environment of each process as clean as possible. However, certain environment variables sometimes need to be passed, so that the program can find shared libraries. These are defined with the directive pass_environment, which, for example, tells the LN Manager to pass the environment variable LD_LIBRARY_PATH to the defined process. This can be necessary if a required shared library is installed in a non-standard location, like $HOME/.local/lib.

  • As shown in the previous quickstart chapter Process Management in section Configuring processes and their dependencies, we can define dependencies between processes which need to be started in a specific order.

  • The instruction “depends_on: publisher” tells the LN manager that the “publisher” process needs to be started first, before the “subscriber” process is started.

  • Here, the directive “ready_regex” for the publisher tells the LN manager to wait for that specific string in the output of the publisher before it starts the subscriber.

See also

Details on the last two points can be found on the page process section.
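The effect of a ready pattern can be sketched with std::regex: a process counts as ready once one of its output lines matches the pattern. The function below is a hypothetical illustration. It does not show how the LN manager implements this, and the regular expression dialect used by the manager may differ from the ECMAScript default of std::regex.

```cpp
#include <cstddef>
#include <regex>
#include <string>
#include <vector>

// Return the index of the first output line that matches the ready
// pattern, or -1 if no line matches. A dependent process would be started
// only after such a line has been seen.
inline int first_ready_line(const std::vector<std::string> &lines,
                            const std::string &pattern) {
  const std::regex re(pattern);
  for (std::size_t i = 0; i < lines.size(); ++i) {
    if (std::regex_search(lines[i], re)) return static_cast<int>(i);
  }
  return -1;
}
```

For the publisher in this chapter, the pattern is the literal text counter publisher running, which the program prints once before entering its main loop.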

After starting both processes you should get something like this:

_images/quickstart_ln_manager_topics_processes_running.png

Processes communicating as publisher and subscriber which are managed by the LN Manager

A final note: We have simplified a lot to make the main points clearer. The chapters C++ Tutorial and User Guide give a lot more information on details. One thing worth noting here is that a process can be both a subscriber and a publisher, and can of course access more than one topic.

See also

See section Some hints on how to use topics and messages for some hints on how to use message definitions effectively.