7.2.3. Protobuf: Person

In the last section you learned how to send strings to an eCAL Topic. Strings are great for simple data that has a textual representation. Quite often, however, your data will be more complex, so you need a protocol that defines how it is structured.

We recommend Google Protocol Buffers (protobuf) for this, because:

  • It solves the problem of how to serialize and deserialize data for you

  • You get backward compatibility out of the box (if you follow the guidelines)

  • It is maintained by Google and the API is stable

  • The eCAL Monitor can display a reflection view of the data
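To make the serialization and compatibility points more tangible, here is a minimal sketch of how protobuf writes a single integer field on the wire. It is not part of eCAL or of the protobuf library; the helper names EncodeVarint and EncodeVarintField are made up for this illustration, and only non-negative values are covered. The point to take away is that a field is identified by its number, not by its name, which is why field numbers should stay stable once a message is in use.

```cpp
#include <cstdint>
#include <vector>

// Encode an unsigned integer as a protobuf base-128 varint:
// 7 payload bits per byte, least significant group first,
// high bit set on every byte except the last one.
// (Hypothetical helper, written only for this illustration.)
std::vector<std::uint8_t> EncodeVarint(std::uint64_t value)
{
  std::vector<std::uint8_t> out;
  while (value >= 0x80)
  {
    out.push_back(static_cast<std::uint8_t>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out.push_back(static_cast<std::uint8_t>(value));
  return out;
}

// Encode one varint field. The key is (field_number << 3) | wire_type,
// where wire type 0 means "varint".
// (Hypothetical helper, written only for this illustration.)
std::vector<std::uint8_t> EncodeVarintField(std::uint32_t field_number, std::uint64_t value)
{
  const std::uint64_t key = (static_cast<std::uint64_t>(field_number) << 3) | 0u;
  std::vector<std::uint8_t> out = EncodeVarint(key);
  const std::vector<std::uint8_t> payload = EncodeVarint(value);
  out.insert(out.end(), payload.begin(), payload.end());
  return out;
}

// EncodeVarintField(1, 150) yields the three bytes 08 96 01.
```

In real applications you never write this code yourself; the classes generated by the protobuf compiler do it for you.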

Important

Remember that all of your applications must agree on the data format. Because protobuf messages are defined in .proto files, all of your applications should share the same files.

eCAL supports protobuf serialization natively for C++, C# and Python.

Using protobuf for data exchange in eCAL is simple. From the “String: Hello World” section you already know how to send and receive simple string data. The basic setup stays the same, but instead of the string publisher and subscriber we will use the protobuf publisher and subscriber.

7.2.3.1. Person Protobuf

As the sender and receiver need the same .proto files, we place them in a separate directory next to the source directories for the sender and the receiver.

 Person Protobuf File
├─  person.proto
│
├─  animal.proto
│
└─  house.proto

Let’s start with the protobuf/person.proto file!

syntax = "proto3";

// import message definitions from different proto files
import "animal.proto";
import "house.proto";

// assign a package name that will appear as a namespace
package pb.People;

// create the message "Person" with
//   - base types like int32 and string
//   - an individually defined enum SType
//   - imported message types
message Person
{
  enum SType
  {
    MALE   = 0;
    FEMALE = 1;
  }

  int32  id               = 1;
  string name             = 2;
  SType  stype            = 3;
  string email            = 4;

  Animal.Dog        dog   = 5;
  Environment.House house = 6;
}
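The package name matters later, because it determines the names you use in your code: the fully qualified message name pb.People.Person becomes the C++ type pb::People::Person used in the publisher and subscriber below. The following sketch only illustrates that naming rule with plain string handling; ProtoNameToCppScope is a hypothetical helper and not part of protobuf or eCAL.

```cpp
#include <string>

// Convert a fully qualified protobuf name such as "pb.People.Person"
// into the C++ scope notation "pb::People::Person".
// (Hypothetical helper, written only to illustrate the naming rule.)
std::string ProtoNameToCppScope(const std::string& proto_name)
{
  std::string cpp_name;
  for (const char c : proto_name)
  {
    if (c == '.')
    {
      cpp_name += "::";
    }
    else
    {
      cpp_name += c;
    }
  }
  return cpp_name;
}

// ProtoNameToCppScope("pb.People.Person") returns "pb::People::Person".
```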

As you can see, person.proto also imports two other message definitions, animal.proto and house.proto, so we need those files as well. Their definitions are straightforward. For more information about protobuf, please refer to the official documentation.

// animal.proto
syntax = "proto3";

package pb.Animal;

message Dog
{
  string name   = 1;
  string colour = 2;
}

// house.proto
syntax = "proto3";

package pb.Environment;

message House
{
  int32 rooms = 1;
}

7.2.3.2. Person Publisher

The main differences to the string publisher are:

  • we need to include / import the protobuf-specific publisher

  • we also need to include / import the compiled protobuf message definitions for the respective programming language

  • we need to use the protobuf message class Person instead of the string class

// Including the eCAL convenience header
#include <ecal/ecal.h>
// In addition we include the msg protobuf publisher
#include <ecal/msg/protobuf/publisher.h>

#include <iostream>

// Here we include the compiled protobuf header for the message "Person"
#include "person.pb.h"

int main()
{
  std::cout << "--------------------" << std::endl;
  std::cout << " C++: PERSON SENDER"  << std::endl;
  std::cout << "--------------------" << std::endl;

  /*
    Initialize eCAL. You always have to initialize eCAL before using its API.
    The name of our eCAL Process will be "person send".
    This name will be visible in the eCAL Monitor, once the process is running.
  */
  eCAL::Initialize("person send");

  /*
    Print some eCAL version information.
  */
  std::cout << "eCAL " << eCAL::GetVersionString() << " (" << eCAL::GetVersionDateString() << ")" << "\n";

  /*
    Set the state for the program.
    You can vary between different states like healthy, warning, critical ...
    This can be used to communicate the application state to applications like eCAL Monitor/Sys.
  */
  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good!");

  /*
    Now we create a new publisher that will publish the topic "person".
    The data type is "pb.People.Person", generated from the protobuf definition.
  */
  eCAL::protobuf::CPublisher<pb::People::Person> publisher("person");

  /*
    Construct a message. The message is a protobuf struct that will be sent to the subscribers.
  */
  pb::People::Person person;
  person.set_id(0);
  person.set_name("Max");
  person.set_stype(pb::People::Person_SType_MALE);
  person.set_email("max@mail.net");
  person.mutable_dog()->set_name("Brandy");
  person.mutable_house()->set_rooms(4);

  /*
    Creating an infinite publish-loop.
    eCAL supports a stop signal; when an eCAL Process is stopped, eCAL::Ok() will return false.
  */
  auto loop_count = 0;
  while(eCAL::Ok())
  {
    /*
      Change the content of the message in each loop iteration to see a difference per message.
    */
    person.set_id(loop_count++);

    /*
      Send the message. The message is sent to all subscribers that are currently connected to the topic "person".
    */
    if (publisher.Send(person))
    {
      std::cout << "----------------------------------"        << "\n";
      std::cout << "Sent protobuf message in C++: "            << "\n";
      std::cout << "----------------------------------"        << "\n";
      std::cout << "person id    : " << person.id()            << "\n";
      std::cout << "person name  : " << person.name()          << "\n";
      std::cout << "person stype : " << person.stype()         << "\n";
      std::cout << "person email : " << person.email()         << "\n";
      std::cout << "dog.name     : " << person.dog().name()    << "\n";
      std::cout << "house.rooms  : " << person.house().rooms() << "\n";
      std::cout << "----------------------------------"        << "\n";
      std::cout                                                << "\n";
    }
    else
    {
      std::cout << "Failed to send Protobuf message in C++!"   << "\n";
    }

    /*
      Sleep for 500 ms to send at a frequency of 2 Hz.
    */
    eCAL::Process::SleepMS(500);
  }

  /*
    Finalize eCAL. This will stop all eCAL processes and free all resources.
    You should always finalize eCAL when you are done using it.
  */
  eCAL::Finalize();

  return(0);
}

├─  C++
│  └─  person_send.cpp
│
├─  C#
│  └─  person_send_csharp.cs
│
├─  Python
│  └─  person_send.py
│
└─  Python (legacy)
   └─  person_send.py

7.2.3.3. Person Subscriber

For the subscriber the same changes apply as for the publisher.

// Including the eCAL convenience header
#include <ecal/ecal.h>
// In addition we include the msg protobuf subscriber
#include <ecal/msg/protobuf/subscriber.h>

#include <iostream>

// Here we include the compiled protobuf header for the message "Person"
#include "person.pb.h"

/*
  Here we create the subscriber callback function that is called every time
  a new message arrives from a publisher.
*/
void OnPerson(const eCAL::STopicId& topic_id_, const pb::People::Person& person_, const long long time_, const long long clock_)
{
  std::cout << "------------------------------------------"                    << "\n";
  std::cout << " Received Protobuf message in C++ "                            << "\n";
  std::cout << "------------------------------------------"                    << "\n";
  std::cout << " topic name   : " << topic_id_.topic_name                      << "\n";
  std::cout << " topic time   : " << time_                                     << "\n";
  std::cout << " topic clock  : " << clock_                                    << "\n";
  std::cout << ""                                                              << "\n";
  std::cout << " Content of message type \"" << person_.GetTypeName()  << "\"" << "\n";
  std::cout << "------------------------------------------"                    << "\n";
  std::cout << " id          : " << person_.id()                               << "\n";
  std::cout << " name        : " << person_.name()                             << "\n";
  std::cout << " stype       : " << person_.stype()                            << "\n";
  std::cout << " email       : " << person_.email()                            << "\n";
  std::cout << " dog.name    : " << person_.dog().name()                       << "\n";
  std::cout << " house.rooms : " << person_.house().rooms()                    << "\n";
  std::cout << "------------------------------------------"                    << "\n";
  std::cout                                                                    << "\n";
}

int main()
{
  std::cout << "----------------------" << std::endl;
  std::cout << " C++: PERSON RECEIVER"  << std::endl;
  std::cout << "----------------------" << std::endl;

  /*
    Initialize eCAL. You always have to initialize eCAL before using its API.
    The name of our eCAL Process will be "person receive".
    This name will be visible in the eCAL Monitor, once the process is running.
  */
  eCAL::Initialize("person receive");

  /*
    Print some eCAL version information.
  */
  std::cout << "eCAL " << eCAL::GetVersionString() << " (" << eCAL::GetVersionDateString() << ")" << "\n";

  /*
    Set the state for the program.
    You can vary between different states like healthy, warning, critical ...
    This can be used to communicate the application state to applications like eCAL Monitor/Sys.
  */
  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good!");

  /*
    Creating the eCAL Subscriber. An eCAL Process can create multiple subscribers (and publishers).
    The topic we are going to receive is called "person".
    The data type is "pb.People.Person", generated from the protobuf definition.
  */
  eCAL::protobuf::CSubscriber<pb::People::Person> subscriber("person");

  /*
    Register the receive callback. The callback will be called whenever a new message is received.
  */
  subscriber.SetReceiveCallback(&OnPerson);

  /*
    Creating an infinite loop.
    eCAL supports a stop signal; when an eCAL Process is stopped, eCAL::Ok() will return false.
  */
  while(eCAL::Ok())
  {
    /*
      Sleep for 500 ms to avoid busy waiting.
    */
    eCAL::Process::SleepMS(500);
  }

  /*
    Finalize eCAL. This will stop all eCAL processes and free all resources.
    You should always finalize eCAL before exiting your application.
  */
  eCAL::Finalize();

  return(0);
}

├─  C++
│  └─  person_receive.cpp
│
├─  C#
│  └─  person_receive_csharp.cs
│
├─  Python
│  └─  person_receive.py
│
└─  Python (legacy)
   └─  person_receive.py
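If the callback pattern is new to you, the following stand-alone sketch may help. It does not use eCAL at all: MockSubscriber and PersonMsg are hypothetical stand-ins for the subscriber and for the generated Person class, and the message is delivered synchronously by calling Deliver by hand. It only shows the idea that you register a function once and that this function is then invoked for every incoming message, which is why the main loop of the subscriber above has nothing to do but sleep.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the generated pb::People::Person class.
struct PersonMsg
{
  int         id = 0;
  std::string name;
};

// Hypothetical stand-in for a typed subscriber. This is NOT the eCAL class.
class MockSubscriber
{
public:
  using Callback = std::function<void(const std::string& topic_name, const PersonMsg& person)>;

  explicit MockSubscriber(std::string topic_name) : topic_name_(std::move(topic_name)) {}

  // Store the function that should be called for every received message.
  void SetReceiveCallback(Callback callback) { callback_ = std::move(callback); }

  // Simulate the arrival of one message. Returns false if no callback is registered.
  bool Deliver(const PersonMsg& person) const
  {
    if (!callback_) return false;
    callback_(topic_name_, person);
    return true;
  }

private:
  std::string topic_name_;
  Callback    callback_;
};

// Register a callback that records the ids it sees, deliver `count`
// messages with ids 0 .. count-1, and return the recorded ids.
std::vector<int> CollectIds(int count)
{
  std::vector<int> ids;
  MockSubscriber subscriber("person");
  subscriber.SetReceiveCallback(
    [&ids](const std::string& /*topic_name*/, const PersonMsg& person) { ids.push_back(person.id); });

  for (int i = 0; i < count; ++i)
  {
    PersonMsg person;
    person.id   = i;
    person.name = "Max";
    subscriber.Deliver(person);
  }
  return ids;
}
```

Calling CollectIds(3) returns the ids 0, 1 and 2 in that order, one per delivered message.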

7.2.3.4. Person Dynamic Subscriber

Using eCAL and Protobuf, it is possible to receive data without knowing the structure of the incoming data in advance. Hence you can use a dynamic subscriber to receive Protobuf data, even if you do not have access to the corresponding .proto file. This is useful for generic applications, such as the eCAL Monitor, which can display all data types without knowing them in advance.

The dynamic protobuf API is currently only available for C++ and Python. The C# implementation of Protocol Buffers does not support dynamic messages at this time. Progress on this feature can be tracked in the corresponding GitHub issue.
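Why does a receiver need a message description at all? The protobuf wire format only carries field numbers and wire types, not field names. The following simplified sketch decodes the top-level fields of a serialized message with the help of a name table that is supplied at runtime. It is not how eCAL or the protobuf library is implemented: ReadVarint, DecodeFields and the name table are hypothetical, only varint and length-delimited fields are handled, and nested messages would simply show up as raw bytes.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Read one base-128 varint starting at `pos` and advance `pos`.
// Returns false if the input ends before the varint is complete.
bool ReadVarint(const std::vector<std::uint8_t>& data, std::size_t& pos, std::uint64_t& value)
{
  value = 0;
  for (int shift = 0; shift < 64 && pos < data.size(); shift += 7)
  {
    const std::uint8_t byte = data[pos++];
    value |= static_cast<std::uint64_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) return true;
  }
  return false;
}

// Decode top-level fields into "name : value" lines. `field_names` plays the
// role of a (hypothetical, heavily simplified) message description that is
// only known at runtime. Fields without an entry are printed as "#<number>".
std::vector<std::string> DecodeFields(const std::vector<std::uint8_t>& data,
                                      const std::map<std::uint32_t, std::string>& field_names)
{
  std::vector<std::string> lines;
  std::size_t pos = 0;
  while (pos < data.size())
  {
    std::uint64_t key = 0;
    if (!ReadVarint(data, pos, key)) break;
    const std::uint32_t field_number = static_cast<std::uint32_t>(key >> 3);
    const std::uint32_t wire_type    = static_cast<std::uint32_t>(key & 0x7);

    const auto it = field_names.find(field_number);
    const std::string name = (it != field_names.end()) ? it->second : "#" + std::to_string(field_number);

    if (wire_type == 0)       // varint, e.g. int32 or enum
    {
      std::uint64_t value = 0;
      if (!ReadVarint(data, pos, value)) break;
      lines.push_back(name + " : " + std::to_string(value));
    }
    else if (wire_type == 2)  // length-delimited, treated as a string here
    {
      std::uint64_t length = 0;
      if (!ReadVarint(data, pos, length) || length > data.size() - pos) break;
      const auto first = data.begin() + static_cast<std::ptrdiff_t>(pos);
      lines.push_back(name + " : " + std::string(first, first + static_cast<std::ptrdiff_t>(length)));
      pos += static_cast<std::size_t>(length);
    }
    else                      // other wire types are out of scope for this sketch
    {
      break;
    }
  }
  return lines;
}
```

For the bytes 08 07 12 03 4D 61 78 and the table {1: "id", 2: "name"}, DecodeFields returns the lines "id : 7" and "name : Max". The dynamic subscriber spares you all of this and hands you a ready-to-use google::protobuf::Message object.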

#include <ecal/ecal.h>
#include <ecal/msg/protobuf/dynamic_subscriber.h>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

const std::string MESSAGE_NAME("person");

// Print a numeric value as "<group>.<name>[<index>] : <value>"
void ProcValue(const std::string& group_, const std::string& name_, const double value_, size_t index_)
{
  std::string var_name;
  if(!group_.empty()) var_name += group_ + ".";
  var_name += name_;
  if(index_ > 0) var_name += "[" + std::to_string(index_) + "]";
  std::cout << var_name << " : " << value_ << std::endl;
}

// Print a string value as "<group>.<name>[<index>] : <value>"
void ProcString(const std::string& group_, const std::string& name_, const std::string& value_, size_t index_)
{
  std::string var_name;
  if(!group_.empty()) var_name += group_ + ".";
  var_name += name_;
  if(index_ > 0) var_name += "[" + std::to_string(index_) + "]";
  std::cout << var_name << " : " << value_ << std::endl;
}

// One overload per protobuf C++ type; all numeric types are printed as double
void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::int32 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::int64 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::uint32 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::uint64 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, float value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, double value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, bool value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, const std::string& value_, size_t index_)
{
  ProcString(group_, name_, value_, index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, const google::protobuf::EnumValueDescriptor* value_, size_t index_)
{
  ProcValue(group_, name_, double(value_->number()), index_);
}

// Walk over all fields of a message via reflection and print them; nested messages are processed recursively
void ProcProtoMsg(const google::protobuf::Message& msg_, const std::string& prefix_ /* = "" */)
{
  int count = msg_.GetDescriptor()->field_count();
  const google::protobuf::Reflection* ref_ptr = msg_.GetReflection();

  if(ref_ptr)
  {
    for (int i = 0; i < count; ++i)
    {
      auto field = msg_.GetDescriptor()->field(i);

      const google::protobuf::FieldDescriptor::CppType fdt = field->cpp_type();
      switch(fdt)
      {
      case google::protobuf::FieldDescriptor::CPPTYPE_INT32:      // TYPE_INT32, TYPE_SINT32, TYPE_SFIXED32
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedInt32(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetInt32(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_INT64:      // TYPE_INT64, TYPE_SINT64, TYPE_SFIXED64
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedInt64(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetInt64(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:     // TYPE_UINT32, TYPE_FIXED32
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedUInt32(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetUInt32(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:     // TYPE_UINT64, TYPE_FIXED64
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedUInt64(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetUInt64(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:     // TYPE_DOUBLE
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedDouble(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetDouble(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:      // TYPE_FLOAT
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedFloat(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetFloat(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:       // TYPE_BOOL
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedBool(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetBool(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_ENUM:       // TYPE_ENUM
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedEnum(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetEnum(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_STRING:     // TYPE_STRING, TYPE_BYTES
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedString(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetString(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE:    // TYPE_MESSAGE, TYPE_GROUP
        {
          if(field->is_repeated())
          {
            int fsize = ref_ptr->FieldSize(msg_, field);
            for(int fnum = 0; fnum < fsize; ++fnum)
            {
              const google::protobuf::Message& msg = ref_ptr->GetRepeatedMessage(msg_, field, fnum);
              std::string prefix = field->name();
              prefix += "[";
              prefix += std::to_string(fnum);
              prefix += "]";
              if(!prefix_.empty()) prefix = prefix_ + "." + prefix;

              // do not process default messages to avoid infinite recursions.
              std::vector<const google::protobuf::FieldDescriptor*> msg_fields;
              msg.GetReflection()->ListFields(msg, &msg_fields);

              if (prefix_.find(field->name()) == std::string::npos || !msg_fields.empty())
                ProcProtoMsg(msg, prefix);
            }
          }
          else
          {
            const google::protobuf::Message& msg = ref_ptr->GetMessage(msg_, field);
            std::string prefix = field->name();
            if(!prefix_.empty()) prefix = prefix_ + "." + prefix;

            // do not process default messages to avoid infinite recursions.
            std::vector<const google::protobuf::FieldDescriptor*> msg_fields;
            msg.GetReflection()->ListFields(msg, &msg_fields);

            if (prefix_.find(field->name()) == std::string::npos || !msg_fields.empty())
              ProcProtoMsg(msg, prefix);
          }
        }
        break;
      default:
        break;
      }
    }
  }
}

// Receive callback: print all fields of the message, using the topic name as prefix
void ProtoMsgCallback(const eCAL::STopicId& topic_id_, const std::shared_ptr<google::protobuf::Message>& msg_)
{
  ProcProtoMsg(*msg_, topic_id_.topic_name);
  std::cout << std::endl;
}

int main()
{
  // initialize eCAL API
  eCAL::Initialize("proto_dyn");

  // create dynamic subscribers for receiving and decoding messages
  eCAL::protobuf::CDynamicSubscriber sub(MESSAGE_NAME);
  sub.SetReceiveCallback(std::bind(ProtoMsgCallback, std::placeholders::_1, std::placeholders::_2));

  // enter main loop
  while(eCAL::Ok())
  {
    // sleep main thread for 1 second
    eCAL::Process::SleepMS(1000);
  }

  // finalize eCAL API
  eCAL::Finalize();

  return(0);
}

├─  C++
│  └─  person_receive.cpp
│
└─  Python
   └─  person_receive.py
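The core idea of ProcProtoMsg in the C++ sample is a recursive walk that builds dotted names such as person.dog.name from nested messages. The sketch below shows that idea in isolation, without protobuf: Node is a hypothetical, simplified message tree, and Flatten and FlattenPersonExample are made-up helpers. It leaves out repeated fields and the per-type handling of the real sample.

```cpp
#include <string>
#include <vector>

// Hypothetical, simplified message tree: a node is either a leaf
// carrying a value, or an inner node with child fields.
struct Node
{
  std::string       name;
  std::string       value;     // only used when `children` is empty
  std::vector<Node> children;
};

// Recursively collect "<dotted.path> : <value>" lines for all leaves.
void Flatten(const Node& node, const std::string& prefix, std::vector<std::string>& out)
{
  const std::string path = prefix.empty() ? node.name : prefix + "." + node.name;
  if (node.children.empty())
  {
    out.push_back(path + " : " + node.value);
    return;
  }
  for (const Node& child : node.children)
  {
    Flatten(child, path, out);
  }
}

// Build a small tree that mimics part of the Person message and flatten it.
std::vector<std::string> FlattenPersonExample()
{
  const Node person{"person", "", {
    Node{"id", "0", {}},
    Node{"dog", "", { Node{"name", "Brandy", {}} }}
  }};

  std::vector<std::string> lines;
  Flatten(person, "", lines);
  return lines;
}
```

FlattenPersonExample returns the two lines "person.id : 0" and "person.dog.name : Brandy", which has the same shape as the output of the dynamic subscriber for these fields.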