7.2.3. Protobuf: Person
In the last section you learned how to send strings to an eCAL topic. Strings are great for simple data that has a textual representation. Quite often, however, your data will be more complex, so you need some kind of protocol that defines how your data is structured.
Our recommended way to do that is Google protobuf, because:
- It solves the problem of how to serialize and deserialize data for you
- You get backward compatibility out of the box (if you follow the guidelines)
- It is maintained by Google and the API is stable
- The eCAL Monitor can display a reflection view of the data
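To make the compatibility point more concrete, here is a minimal Python sketch of the idea behind it. It is not eCAL or protobuf library code: the helper names (`encode_varint`, `encode_field`, `decode_known`) are made up for this illustration, and only two protobuf wire types (varint and length-delimited strings) are handled. It shows that every field on the wire is tagged with its field number, so a receiver built against an older message definition can skip fields it does not know.

```python
def encode_varint(value):
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf, pos):
    """Decode a varint starting at buf[pos]; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_field(number, value):
    """Encode one field: ints as wire type 0, strings as wire type 2."""
    if isinstance(value, int):
        return encode_varint((number << 3) | 0) + encode_varint(value)
    data = value.encode("utf-8")
    return encode_varint((number << 3) | 2) + encode_varint(len(data)) + data


def decode_known(buf, known):
    """Decode only the fields listed in `known` (field number -> name).

    Fields with other numbers are parsed far enough to be skipped.
    """
    result = {}
    pos = 0
    while pos < len(buf):
        key, pos = decode_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x07
        if wire_type == 0:
            value, pos = decode_varint(buf, pos)
        elif wire_type == 2:
            length, pos = decode_varint(buf, pos)
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError("wire type not handled in this sketch")
        if number in known:
            result[known[number]] = value
    return result


# A "newer" sender writes id (1), name (2) and email (4).
payload = encode_field(1, 7) + encode_field(2, "Max") + encode_field(4, "max@mail.net")

# An "older" receiver only knows id and name; the email field is skipped.
old_view = decode_known(payload, {1: "id", 2: "name"})
print(old_view)
```

In a real application the protobuf library does all of this for you. The sketch only illustrates why field numbers should not be reused or changed once a message definition is in use.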
Important
Remember that all of your applications must agree on the data format.
Because protobuf messages are defined in .proto files, all of your applications should share the same files.
eCAL supports protobuf serialization natively for C++, C# and Python.
Using protobuf for data exchange in eCAL is simple. You already know from the “String: Hello World” section how to send and receive simple string data. The basic setup is the same, but instead of the string publisher and subscriber, we will use the protobuf publisher and subscriber.
7.2.3.1. Person Protobuf
As the sender and receiver need the same .proto files, we place them in a separate directory next to the source directories for the sender and the receiver.
Person Protobuf File
├─ person.proto
├─ animal.proto
└─ house.proto
Let’s start with the protobuf/person.proto file!
syntax = "proto3";

// import message definitions from different proto files
import "animal.proto";
import "house.proto";

// assign a package name, that will appear as a namespace
package pb.People;

// create the message "Person" with
// - base types like int32 and string
// - individual defined enum SType
// - imported message types
message Person
{
  enum SType
  {
    MALE   = 0;
    FEMALE = 1;
  }

  int32  id    = 1;
  string name  = 2;
  SType  stype = 3;
  string email = 4;

  Animal.Dog        dog   = 5;
  Environment.House house = 6;
}
As you can see, the person.proto file also imports other message definitions: animal.proto and house.proto.
So we need them as well. The definitions are straightforward. For more information about protobuf, please refer to the detailed official documentation.
// animal.proto
syntax = "proto3";

package pb.Animal;

message Dog
{
  string name   = 1;
  string colour = 2;
}

// house.proto
syntax = "proto3";

package pb.Environment;

message House
{
  int32 rooms = 1;
}
7.2.3.2. Person Publisher
The main differences to the string publisher are:
- we need to include / import the protobuf-specific publishers
- we also need to include / import the compiled protobuf message definitions for the specific programming language
- we need to use the protobuf message class Person instead of the string class
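The "compiled protobuf message definitions" mentioned above are produced by the protobuf compiler `protoc`. The following Python sketch only assembles the command line for the three languages used in this tutorial. The function name `protoc_command`, the directory names and the default file list are assumptions made for this illustration. In a real project this step is usually handled by the build system rather than called by hand.

```python
# Output flags that protoc provides for the languages used in this tutorial.
PROTOC_OUT_FLAGS = {
    "cpp": "--cpp_out",        # generates person.pb.h / person.pb.cc
    "csharp": "--csharp_out",  # generates C# classes
    "python": "--python_out",  # generates person_pb2.py
}


def protoc_command(proto_dir, out_dir, language,
                   proto_files=("person.proto", "animal.proto", "house.proto")):
    """Build the protoc argument list for one target language.

    proto_dir is passed as --proto_path so that the imports inside
    person.proto ("animal.proto", "house.proto") can be resolved.
    """
    flag = PROTOC_OUT_FLAGS[language]
    inputs = [f"{proto_dir}/{name}" for name in proto_files]
    return ["protoc", f"--proto_path={proto_dir}", f"{flag}={out_dir}", *inputs]


cmd = protoc_command("protobuf", "generated", "python")
print(" ".join(cmd))

# To actually run the compiler (requires protoc to be installed and the
# output directory to exist), something like this could be used:
#   import subprocess
#   subprocess.run(cmd, check=True)
```

All applications that exchange the `Person` message should generate their classes from the same three .proto files, whichever language they are written in.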
// person_send.cpp
// Including the eCAL convenience header
#include <ecal/ecal.h>
// In addition we include the msg protobuf publisher
#include <ecal/msg/protobuf/publisher.h>

#include <iostream>

// Here we include the compiled protobuf header for the message "Person"
#include "person.pb.h"

int main()
{
  std::cout << "--------------------" << std::endl;
  std::cout << " C++: PERSON SENDER" << std::endl;
  std::cout << "--------------------" << std::endl;

  /*
    Initialize eCAL. You always have to initialize eCAL before using its API.
    The name of our eCAL Process will be "person send".
    This name will be visible in the eCAL Monitor, once the process is running.
  */
  eCAL::Initialize("person send");

  /*
    Print some eCAL version information.
  */
  std::cout << "eCAL " << eCAL::GetVersionString() << " (" << eCAL::GetVersionDateString() << ")" << "\n";

  /*
    Set the state for the program.
    You can vary between different states like healthy, warning, critical ...
    This can be used to communicate the application state to applications like eCAL Monitor/Sys.
  */
  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good!");

  /*
    Now we create a new publisher that will publish the topic "person".
    The data type is "pb.People.Person", generated from the protobuf definition.
  */
  eCAL::protobuf::CPublisher<pb::People::Person> publisher("person");

  /*
    Construct a message. The message is a protobuf struct that will be sent to the subscribers.
  */
  pb::People::Person person;
  person.set_id(0);
  person.set_name("Max");
  person.set_stype(pb::People::Person_SType_MALE);
  person.set_email("max@mail.net");
  person.mutable_dog()->set_name("Brandy");
  person.mutable_house()->set_rooms(4);

  /*
    Creating an infinite publish-loop.
    eCAL supports a stop signal; when an eCAL Process is stopped, eCAL::Ok() will return false.
  */
  auto loop_count = 0;
  while(eCAL::Ok())
  {
    /*
      Change the content of the message in each loop to see a difference per message.
    */
    person.set_id(loop_count++);

    /*
      Send the message. The message is sent to all subscribers that are currently connected to the topic "person".
    */
    if (publisher.Send(person))
    {
      std::cout << "----------------------------------" << "\n";
      std::cout << "Sent protobuf message in C++: " << "\n";
      std::cout << "----------------------------------" << "\n";
      std::cout << "person id : " << person.id() << "\n";
      std::cout << "person name : " << person.name() << "\n";
      std::cout << "person stype : " << person.stype() << "\n";
      std::cout << "person email : " << person.email() << "\n";
      std::cout << "dog.name : " << person.dog().name() << "\n";
      std::cout << "house.rooms : " << person.house().rooms() << "\n";
      std::cout << "----------------------------------" << "\n";
      std::cout << "\n";
    }
    else
    {
      std::cout << "Failed to send Protobuf message in C++!" << "\n";
    }

    /*
      Sleep for 500 ms to send with a frequency of 2 Hz.
    */
    eCAL::Process::SleepMS(500);
  }

  /*
    Finalize eCAL. This will stop all eCAL processes and free all resources.
    You should always finalize eCAL when you are done using it.
  */
  eCAL::Finalize();

  return(0);
}
// person_send_csharp.cs
using System;
// Include the eCAL API namespace
using Eclipse.eCAL.Core;

public class PersonSend
{
    static void Main()
    {
        Console.WriteLine("-------------------");
        Console.WriteLine(" C#: PERSON SENDER");
        Console.WriteLine("-------------------");

        /*
          Initialize eCAL. You always have to initialize eCAL before using its API.
          The name of our eCAL Process will be "person send c#".
          This name will be visible in the eCAL Monitor, once the process is running.
        */
        Core.Initialize("person send c#");

        /*
          Print version info.
        */
        Console.WriteLine(String.Format("eCAL {0} ({1})\n", Core.GetVersionString(), Core.GetVersionDateString()));

        /*
          Set the state for the program.
          You can vary between different states like healthy, warning, critical ...
          This can be used to communicate the application state to applications like eCAL Monitor/Sys.
        */
        Process.SetState(eProcessSeverity.Healthy, eProcessSeverityLevel.Level1, "I feel good!");

        /*
          Now we create a new publisher that will publish the topic "person".
          The data type is "Pb.People.Person", generated from the protobuf definition.
        */
        var publisher = new ProtobufPublisher<Pb.People.Person>("person");

        /*
          Construct a message. The message is a protobuf struct that will be sent to the subscribers.
        */
        var person = new Pb.People.Person
        {
            Id = 0,
            Email = "max@online.de",
            Name = "Max",
            Stype = Pb.People.Person.Types.SType.Female,
            Dog = new Pb.Animal.Dog { Name = "Brandy" },
            House = new Pb.Environment.House { Rooms = 4 }
        };

        /*
          Creating an infinite publish-loop.
          eCAL supports a stop signal; when an eCAL Process is stopped, Core.Ok() will return false.
        */
        int loop_count = 0;
        while (Core.Ok())
        {
            /*
              Change the content of the message in each loop to see a difference per message.
            */
            person.Id = loop_count++;

            /*
              Send the message. The message is sent to all subscribers that are currently connected to the topic "person".
            */
            if (publisher.Send(person))
            {
                Console.WriteLine("----------------------------------");
                Console.WriteLine("Sent protobuf message in C#: ");
                Console.WriteLine("----------------------------------");
                Console.WriteLine("person id : {0}", person.Id);
                Console.WriteLine("person name : {0}", person.Name);
                Console.WriteLine("person stype : {0}", person.Stype);
                Console.WriteLine("person email : {0}", person.Email);
                Console.WriteLine("dog.name : {0}", person.Dog.Name);
                Console.WriteLine("house.rooms : {0}", person.House.Rooms);
                Console.WriteLine("----------------------------------");
                Console.WriteLine("");
            }
            else
            {
                Console.WriteLine("Sending protobuf message in C# failed!");
            }

            /*
              Sleep for 500 ms to send with a frequency of 2 Hz.
            */
            System.Threading.Thread.Sleep(500);
        }

        /*
          Cleanup. Dispose the publisher to free resources.
        */
        publisher.Dispose();

        /*
          Terminate eCAL. This will stop all eCAL processes and free all resources.
          You should always terminate eCAL when you are done using it.
        */
        Core.Terminate();
    }
}
# person_send.py
import os
import sys
import time
# import the eCAL core API
import ecal.nanobind_core as ecal_core
# import the eCAL publisher API
from ecal.msg.proto.core import Publisher as ProtobufPublisher

import google.protobuf
from packaging import version
proto_version = version.parse(google.protobuf.__version__)
sys.path.insert(1, os.path.join(sys.path[0], f"./_protobuf/v{proto_version.major}"))
# import the compiled protobuf message classes
import person_pb2

def main():
    print("-------------------------------")
    print(" Python: PERSON SENDER ")
    print("-------------------------------")

    # Initialize eCAL. You always have to initialize eCAL before using it.
    # The name of our eCAL Process will be "person send python".
    # This name will be visible in the eCAL Monitor, once the process is running.
    ecal_core.initialize("person send python")

    # Print used eCAL version and date
    print("eCAL {} ({})\n".format(ecal_core.get_version_string(), ecal_core.get_version_date_string()))

    # Set the state for the program.
    # You can vary between different states like healthy, warning, critical ...
    # This can be used to communicate the application state to applications like eCAL Monitor/Sys.
    ecal_core.process.set_state(ecal_core.process.Severity.HEALTHY, ecal_core.process.SeverityLevel.LEVEL1, "I feel good!")

    # Creating the eCAL Publisher. An eCAL Process can create multiple publishers (and subscribers).
    # The topic we are going to publish is called "person".
    pub = ProtobufPublisher[person_pb2.Person](person_pb2.Person, "person")

    # Create a new message object of type "Person" and fill it with some data.
    person = person_pb2.Person()
    person.name = "Max"
    person.stype = person_pb2.Person.MALE
    person.email = "max@mail.net"
    person.dog.name = "Brandy"
    person.house.rooms = 4

    loop_count = 0
    # Creating an infinite publish-loop.
    # eCAL supports a stop signal; when an eCAL Process is stopped, ecal_core.ok() will return False.
    while ecal_core.ok():
        # Change something in the message so we can see that the message is changing, e.g. in eCAL Monitor.
        loop_count = loop_count + 1
        person.id = loop_count

        # Send the content to other eCAL Processes that have subscribed to the topic "person".
        pub.send(person)

        print("Sending: {}".format(person))

        # Sleep for 500 ms so we send with a frequency of 2 Hz.
        time.sleep(0.5)

    # Finalize eCAL.
    # You should always do that before your application exits.
    ecal_core.finalize()

if __name__ == "__main__":
    main()
# person_send.py (legacy)
import os
import sys
import time

# import the eCAL core API
import ecal.core.core as ecal_core
# import the eCAL publisher API supporting protobuf
from ecal.core.publisher import ProtoPublisher

sys.path.insert(1, os.path.join(sys.path[0], '../_protobuf'))
# import the compiled protobuf message classes
import person_pb2

def main():
    print("--------------------------------")
    print(" Python (legacy): PERSON SENDER ")
    print("--------------------------------")

    # Initialize eCAL. You always have to initialize eCAL before using it.
    # The name of our eCAL Process will be "person send python (legacy)".
    # This name will be visible in the eCAL Monitor, once the process is running.
    ecal_core.initialize("person send python (legacy)")

    # Print used eCAL version and date
    print("eCAL {} ({})\n".format(ecal_core.getversion(), ecal_core.getdate()))

    # Set the state for the program.
    # You can vary between different states like healthy, warning, critical ...
    # This can be used to communicate the application state to applications like eCAL Monitor/Sys.
    ecal_core.set_process_state(1, 1, "I feel good!")

    # Creating the eCAL Publisher. An eCAL Process can create multiple publishers (and subscribers).
    # The topic we are going to publish is called "person".
    # Furthermore, we want to tell the publisher that we want to use the protobuf message class "Person" from the file "person.proto".
    pub = ProtoPublisher("person", person_pb2.Person)

    # Create a new message object of type "Person" and fill it with some data.
    person = person_pb2.Person()
    person.name = "Max"
    person.stype = person_pb2.Person.MALE
    person.email = "max@mail.net"
    person.dog.name = "Brandy"
    person.house.rooms = 4

    # Creating an infinite publish-loop.
    # eCAL supports a stop signal; when an eCAL Process is stopped, ecal_core.ok() will return False.
    loop_count = 0
    while ecal_core.ok():

        # Change something in the message so we can see that the message is changing, e.g. in eCAL Monitor.
        loop_count = loop_count + 1
        person.id = loop_count

        # Send the content to other eCAL Processes that have subscribed to the topic "person".
        pub.send(person)
        print("Sending person {}".format(person.id))

        # Sleep for 500 ms so we send with a frequency of 2 Hz.
        time.sleep(0.5)

    # Finalize eCAL.
    # You should always do that before your application exits.
    ecal_core.finalize()

if __name__ == "__main__":
    main()
├─ C++
│  └─ person_send.cpp
│
├─ C#
│  └─ person_send_csharp.cs
│
├─ Python
│  └─ person_send.py
│
└─ Python (legacy)
   └─ person_send.py
7.2.3.3. Person Subscriber
For the subscriber the same changes apply as for the publisher.
// person_receive.cpp
// Including the eCAL convenience header
#include <ecal/ecal.h>
// In addition we include the msg protobuf subscriber
#include <ecal/msg/protobuf/subscriber.h>

#include <iostream>

// Here we include the compiled protobuf header for the message "Person"
#include "person.pb.h"

/*
  Here we create the subscriber callback function that is called every time
  a new message arrives from a publisher.
*/
void OnPerson(const eCAL::STopicId& topic_id_, const pb::People::Person& person_, const long long time_, const long long clock_)
{
  std::cout << "------------------------------------------" << "\n";
  std::cout << " Received Protobuf message in C++ " << "\n";
  std::cout << "------------------------------------------" << "\n";
  std::cout << " topic name : " << topic_id_.topic_name << "\n";
  std::cout << " topic time : " << time_ << "\n";
  std::cout << " topic clock : " << clock_ << "\n";
  std::cout << "" << "\n";
  std::cout << " Content of message type \"" << person_.GetTypeName() << "\"" << "\n";
  std::cout << "------------------------------------------" << "\n";
  std::cout << " id : " << person_.id() << "\n";
  std::cout << " name : " << person_.name() << "\n";
  std::cout << " stype : " << person_.stype() << "\n";
  std::cout << " email : " << person_.email() << "\n";
  std::cout << " dog.name : " << person_.dog().name() << "\n";
  std::cout << " house.rooms : " << person_.house().rooms() << "\n";
  std::cout << "------------------------------------------" << "\n";
  std::cout << "\n";
}

int main()
{
  std::cout << "----------------------" << std::endl;
  std::cout << " C++: PERSON RECEIVER" << std::endl;
  std::cout << "----------------------" << std::endl;

  /*
    Initialize eCAL. You always have to initialize eCAL before using its API.
    The name of our eCAL Process will be "person receive".
    This name will be visible in the eCAL Monitor, once the process is running.
  */
  eCAL::Initialize("person receive");

  /*
    Print some eCAL version information.
  */
  std::cout << "eCAL " << eCAL::GetVersionString() << " (" << eCAL::GetVersionDateString() << ")" << "\n";

  /*
    Set the state for the program.
    You can vary between different states like healthy, warning, critical ...
    This can be used to communicate the application state to applications like eCAL Monitor/Sys.
  */
  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good!");

  /*
    Creating the eCAL Subscriber. An eCAL Process can create multiple subscribers (and publishers).
    The topic we are going to receive is called "person".
    The data type is "pb.People.Person", generated from the protobuf definition.
  */
  eCAL::protobuf::CSubscriber<pb::People::Person> subscriber("person");

  /*
    Create and register a receive callback. The callback will be called whenever a new message is received.
  */
  subscriber.SetReceiveCallback(&OnPerson);

  /*
    Creating an infinite loop.
    eCAL supports a stop signal; when an eCAL Process is stopped, eCAL::Ok() will return false.
  */
  while(eCAL::Ok())
  {
    /*
      Sleep for 500 ms to avoid busy waiting.
    */
    eCAL::Process::SleepMS(500);
  }

  /*
    Finalize eCAL. This will stop all eCAL processes and free all resources.
    You should always finalize eCAL before exiting your application.
  */
  eCAL::Finalize();

  return(0);
}
// person_receive_csharp.cs
using System;
// include ecal core namespace
using Eclipse.eCAL.Core;

public class PersonReceive
{
    public static void Main()
    {
        Console.WriteLine("---------------------");
        Console.WriteLine(" C#: PERSON RECEIVER");
        Console.WriteLine("---------------------");

        /*
          Initialize eCAL. You always have to initialize eCAL before using its API.
          The name of our eCAL Process will be "person receive c#".
          This name will be visible in the eCAL Monitor, once the process is running.
        */
        Core.Initialize("person receive c#");

        /*
          Print version info.
        */
        Console.WriteLine(String.Format("eCAL {0} ({1})\n", Core.GetVersionString(), Core.GetVersionDateString()));

        /*
          Set the state for the program.
          You can vary between different states like healthy, warning, critical ...
          This can be used to communicate the application state to applications like eCAL Monitor/Sys.
        */
        Process.SetState(eProcessSeverity.Healthy, eProcessSeverityLevel.Level1, "I feel good!");

        /*
          Creating the eCAL Subscriber. An eCAL Process can create multiple subscribers (and publishers).
          The topic we are going to receive is called "person".
          The data type is "Pb.People.Person", generated from the protobuf definition.
        */
        var subscriber = new ProtobufSubscriber<Pb.People.Person>("person");

        /*
          Create and register a receive callback. The callback will be called whenever a new message is received.
        */
        subscriber.SetReceiveCallback((topicId, dataTypeInfo, person) =>
        {
            Console.WriteLine("------------------------------------------");
            Console.WriteLine(" Received Protobuf message in C#");
            Console.WriteLine("------------------------------------------");
            Console.WriteLine(" topic name : {0}", topicId.TopicName);
            Console.WriteLine(" topic time : n/a");
            Console.WriteLine(" topic clock : n/a");
            Console.WriteLine("");
            Console.WriteLine(" Content of message type \"{0}\"", person.Message.GetType());
            Console.WriteLine("------------------------------------------");
            Console.WriteLine(" id : {0}", person.Message.Id);
            Console.WriteLine(" name : {0}", person.Message.Name);
            Console.WriteLine(" stype : {0}", person.Message.Stype);
            Console.WriteLine(" email : {0}", person.Message.Email);
            Console.WriteLine(" dog.name : {0}", person.Message.Dog.Name);
            Console.WriteLine(" house.rooms : {0}", person.Message.House.Rooms);
            Console.WriteLine("------------------------------------------");
            Console.WriteLine("");
        });

        /*
          Creating an infinite loop.
          eCAL supports a stop signal; when an eCAL Process is stopped, Core.Ok() will return false.
        */
        while (Core.Ok())
        {
            /*
              Sleep for 500 ms to avoid busy waiting.
            */
            System.Threading.Thread.Sleep(500);
        }

        /*
          Cleanup. Dispose the subscriber to free resources.
        */
        subscriber.Dispose();

        /*
          Terminate eCAL. This will stop all eCAL processes and free all resources.
          You should always terminate eCAL before exiting your application.
        */
        Core.Terminate();
    }
}
# person_receive.py
import os
import sys
import time
import typing
# import the eCAL core API
import ecal.nanobind_core as ecal_core
# import the eCAL subscriber API
from ecal.msg.proto.core import Subscriber as ProtobufSubscriber
from ecal.msg.common.core import ReceiveCallbackData

import google.protobuf
from packaging import version
proto_version = version.parse(google.protobuf.__version__)
sys.path.insert(1, os.path.join(sys.path[0], f"./_protobuf/v{proto_version.major}"))
# import the compiled protobuf message classes
import person_pb2

# eCAL receive callback
def data_callback(publisher_id : ecal_core.TopicId, data : ReceiveCallbackData[person_pb2.Person]) -> None:
    output = f"""
    ----------------------------------------------
    Received person message from topic {publisher_id.topic_name} in Python
    ----------------------------------------------
    Time : {data.send_timestamp}
    Clock : {data.send_clock}
    Message :
    {data.message}
    """
    print(output)

def main():
    print("--------------------------------")
    print(" Python: PERSON RECEIVE ")
    print("--------------------------------")

    # Initialize eCAL. You always have to initialize eCAL before using its API.
    # The name of our eCAL Process will be "person receive python".
    # This name will be visible in the eCAL Monitor, once the process is running.
    ecal_core.initialize("person receive python")

    # print eCAL version and date
    print("eCAL {} ({})\n".format(ecal_core.get_version_string(), ecal_core.get_version_date_string()))

    # Set the state for the program.
    # You can vary between different states like healthy, warning, critical ...
    # This can be used to communicate the application state to applications like eCAL Monitor/Sys.
    ecal_core.process.set_state(ecal_core.process.Severity.HEALTHY, ecal_core.process.SeverityLevel.LEVEL1, "I feel good!")

    # Creating the eCAL Subscriber. An eCAL Process can create multiple subscribers (and publishers).
    # The topic we are going to receive is called "person".
    sub = ProtobufSubscriber[person_pb2.Person](person_pb2.Person, "person")
    sub.set_receive_callback(data_callback)

    # Creating an infinite loop.
    # eCAL supports a stop signal; when an eCAL Process is stopped, ecal_core.ok() will return False.
    while ecal_core.ok():
        time.sleep(0.5) # Sleep for 500 ms to reduce CPU usage.

    # Finalize eCAL.
    # You should always do that before your application exits.
    ecal_core.finalize()

if __name__ == "__main__":
    main()
# person_receive.py (legacy)
import os
import sys
import time
# import the eCAL core API
import ecal.core.core as ecal_core
# import the eCAL subscriber API
from ecal.core.subscriber import ProtoSubscriber

sys.path.insert(1, os.path.join(sys.path[0], '../_protobuf'))
# import the protobuf generated classes
import person_pb2

# Create here the eCAL receive callback. This will be called whenever a new message is received.
def callback(topic_name, person, time):
    print("")
    print("Received person ..")
    print("person id : {}".format(person.id))
    print("person name : {}".format(person.name))
    print("person stype : {}".format(person.stype))
    print("person email : {}".format(person.email))
    print("dog.name : {}".format(person.dog.name))
    print("house.rooms : {}".format(person.house.rooms))

def main():
    print("----------------------------------")
    print(" Python (legacy): PERSON RECEIVER ")
    print("----------------------------------")

    # Initialize eCAL. You always have to initialize eCAL before using it.
    # The name of our eCAL Process will be "person receive python (legacy)".
    # This name will be visible in the eCAL Monitor, once the process is running.
    ecal_core.initialize("person receive python (legacy)")

    # Print used eCAL version and date
    print("eCAL {} ({})\n".format(ecal_core.getversion(), ecal_core.getdate()))

    # Set the state for the program.
    # You can vary between different states like healthy, warning, critical ...
    # This can be used to communicate the application state to applications like eCAL Monitor/Sys.
    ecal_core.set_process_state(1, 1, "I feel good!")

    # Creating the eCAL Subscriber. An eCAL Process can create multiple subscribers (and publishers).
    # The topic we are going to receive is called "person".
    # The topic type is specified by the protobuf message class.
    sub = ProtoSubscriber("person", person_pb2.Person)

    # Register the callback with the subscriber so it can be called.
    # The callback will be called whenever a new message is received.
    sub.set_callback(callback)

    # Creating an infinite loop.
    # eCAL supports a stop signal; when an eCAL Process is stopped, ecal_core.ok() will return False.
    while ecal_core.ok():
        # Sleep for 500 ms to avoid busy waiting.
        # time.sleep() takes seconds, so 0.5 corresponds to 500 ms.
        time.sleep(0.5)

    # Finalize eCAL.
    # You should always do that before your application exits.
    ecal_core.finalize()

if __name__ == "__main__":
    main()
├─ C++
│  └─ person_receive.cpp
│
├─ C#
│  └─ person_receive_csharp.cs
│
├─ Python
│  └─ person_receive.py
│
└─ Python (legacy)
   └─ person_receive.py
7.2.3.4. Person Dynamic Subscriber
Using eCAL and Protobuf, it is possible to receive data without knowing the structure of the incoming data in advance.
Hence, you can use a dynamic subscriber to receive Protobuf data even if you do not have access to the corresponding .proto file.
This is useful for generic applications, such as the eCAL Monitor, which can display all data types without knowing them in advance.
The dynamic Protobuf API is currently only available for C++ and Python. C# does not support dynamic Protocol Buffers messages at this time. Progress on this feature can be tracked in the following GitHub issue.
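A generic receiver cannot rely on compile-time field names, so it has to walk whatever structure arrives and name the values as it goes. The following Python sketch illustrates only that traversal idea. It uses a plain nested dict as a stand-in for a received message, which is an assumption made for this illustration: the real dynamic subscriber hands over a protobuf message that is inspected through protobuf reflection. The function name `flatten` and the naming scheme are hypothetical as well.

```python
def flatten(message, prefix=""):
    """Yield (dotted_name, value) pairs for a nested dict / list structure."""
    for name, value in message.items():
        full_name = f"{prefix}.{name}" if prefix else name
        if isinstance(value, dict):
            # Nested message: descend and extend the prefix.
            yield from flatten(value, full_name)
        elif isinstance(value, list):
            # Repeated field: append the element index to the name.
            for index, item in enumerate(value):
                indexed = f"{full_name}[{index}]"
                if isinstance(item, dict):
                    yield from flatten(item, indexed)
                else:
                    yield indexed, item
        else:
            yield full_name, value


# Stand-in for a received "Person" message.
person = {
    "id": 3,
    "name": "Max",
    "dog": {"name": "Brandy"},
    "house": {"rooms": 4},
}

for name, value in flatten(person):
    print(f"{name} : {value}")
```

The code never mentions `id`, `dog` or `rooms` by name, so the same loop works for any message layout. That is the property a monitoring tool needs.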
#include <ecal/ecal.h>
#include <ecal/msg/protobuf/dynamic_subscriber.h>

#include <iostream>
#include <string>

const std::string MESSAGE_NAME("person");

// Print a numeric value as "group.name[index] : value"
void ProcValue(const std::string& group_, const std::string& name_, const double value_, size_t index_)
{
  std::string var_name;
  if(!group_.empty()) var_name += group_ + ".";
  var_name += name_;
  if(index_ > 0) var_name += "[" + std::to_string(index_) + "]";
  std::cout << var_name << " : " << value_ << std::endl;
}

// Print a string value as "group.name[index] : value"
void ProcString(const std::string& group_, const std::string& name_, const std::string& value_, size_t index_)
{
  std::string var_name;
  if(!group_.empty()) var_name += group_ + ".";
  var_name += name_;
  if(index_ > 0) var_name += "[" + std::to_string(index_) + "]";
  std::cout << var_name << " : " << value_ << std::endl;
}

// One ProcProtoType overload per protobuf C++ field type
void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::int32 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::int64 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::uint32 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, google::protobuf::uint64 value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, float value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, double value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, bool value_, size_t index_)
{
  ProcValue(group_, name_, double(value_), index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, const std::string& value_, size_t index_)
{
  ProcString(group_, name_, value_, index_);
}

void ProcProtoType(const std::string& group_, const std::string& name_, const google::protobuf::EnumValueDescriptor* value_, size_t index_)
{
  ProcValue(group_, name_, double(value_->number()), index_);
}

// Walk all fields of a message via protobuf reflection and print them
void ProcProtoMsg(const google::protobuf::Message& msg_, const std::string& prefix_ /* = "" */)
{
  int count = msg_.GetDescriptor()->field_count();
  const google::protobuf::Reflection* ref_ptr = msg_.GetReflection();

  if(ref_ptr)
  {
    for (int i = 0; i < count; ++i)
    {
      auto field = msg_.GetDescriptor()->field(i);

      const google::protobuf::FieldDescriptor::CppType fdt = field->cpp_type();
      switch(fdt)
      {
      case google::protobuf::FieldDescriptor::CPPTYPE_INT32:      // TYPE_INT32, TYPE_SINT32, TYPE_SFIXED32
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedInt32(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetInt32(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_INT64:      // TYPE_INT64, TYPE_SINT64, TYPE_SFIXED64
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedInt64(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetInt64(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:     // TYPE_UINT32, TYPE_FIXED32
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedUInt32(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetUInt32(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:     // TYPE_UINT64, TYPE_FIXED64
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedUInt64(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetUInt64(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:     // TYPE_DOUBLE
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedDouble(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetDouble(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:      // TYPE_FLOAT
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedFloat(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetFloat(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:       // TYPE_BOOL
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedBool(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetBool(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_ENUM:       // TYPE_ENUM
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedEnum(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetEnum(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_STRING:     // TYPE_STRING, TYPE_BYTES
        if(field->is_repeated())
        {
          int fsize = ref_ptr->FieldSize(msg_, field);
          for(int fnum = 0; fnum < fsize; ++fnum)
          {
            ProcProtoType(prefix_, field->name(), ref_ptr->GetRepeatedString(msg_, field, fnum), static_cast<size_t>(fnum));
          }
        }
        else
        {
          ProcProtoType(prefix_, field->name(), ref_ptr->GetString(msg_, field), 0);
        }
        break;
      case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE:    // TYPE_MESSAGE, TYPE_GROUP
      {
        if(field->is_repeated())
214 {
215 int fsize = ref_ptr->FieldSize(msg_, field);
216 for(int fnum = 0; fnum < fsize; ++fnum)
217 {
218 const google::protobuf::Message& msg = ref_ptr->GetRepeatedMessage(msg_, field, fnum);
219 std::string prefix = field->name();
220 prefix += "[";
221 prefix += std::to_string(fnum);
222 prefix += "]";
223 if(!prefix_.empty()) prefix = prefix_ + "." + prefix;
224
225 // do not process default messages to avoid infinite recursions.
226 std::vector<const google::protobuf::FieldDescriptor*> msg_fields;
227 msg.GetReflection()->ListFields(msg, &msg_fields);
228
229 if (prefix_.find(field->name()) == std::string::npos || !msg_fields.empty())
230 ProcProtoMsg(msg, prefix);
231 }
232 }
233 else
234 {
235 const google::protobuf::Message& msg = ref_ptr->GetMessage(msg_, field);
236 std::string prefix = field->name();
237 if(!prefix_.empty()) prefix = prefix_ + "." + prefix;
238
239 // do not process default messages to avoid infinite recursions.
240 std::vector<const google::protobuf::FieldDescriptor*> msg_fields;
241 msg.GetReflection()->ListFields(msg, &msg_fields);
242
243 if (prefix_.find(field->name()) == std::string::npos || !msg_fields.empty())
244 ProcProtoMsg(msg, prefix);
245 }
246 }
247 break;
248 default:
249 break;
250 }
251 }
252 }
253}
254
255void ProtoMsgCallback(const eCAL::STopicId& topic_id_, const std::shared_ptr<google::protobuf::Message>& msg_)
256{
257 ProcProtoMsg(*msg_, topic_id_.topic_name);
258 std::cout << std::endl;
259}
260
261int main()
262{
263 // initialize eCAL API
264 eCAL::Initialize("proto_dyn");
265
266 // create dynamic subscribers for receiving and decoding messages
267 eCAL::protobuf::CDynamicSubscriber sub(MESSAGE_NAME);
268 sub.SetReceiveCallback(std::bind(ProtoMsgCallback, std::placeholders::_1, std::placeholders::_2));
269
270 // enter main loop
271 while(eCAL::Ok())
272 {
273 // sleep main thread for 1 second
274 eCAL::Process::SleepMS(1000);
275 }
276
277 // finalize eCAL API
278 eCAL::Finalize();
279
280 return(0);
281}
import sys
import time
import typing
# import the eCAL core API
import ecal.nanobind_core as ecal_core
# import the eCAL subscriber API
from ecal.msg.proto.core import DynamicSubscriber as ProtobufDynamicSubscriber
from ecal.msg.common.core import ReceiveCallbackData

# eCAL receive callback
def data_callback(publisher_id : ecal_core.TopicId, data : ReceiveCallbackData[typing.Any]) -> None:
  output = f"""
  ----------------------------------------------
   Received dynamic person message from topic {publisher_id.topic_name} in Python
  ----------------------------------------------
   Time    : {data.send_timestamp}
   Clock   : {data.send_clock}
   Message :
    {data.message}
  """
  print(output)

def main():
  print("--------------------------------")
  print(" Python: DYNAMIC PERSON RECEIVE ")
  print("--------------------------------")

  # Initialize eCAL. You always have to initialize eCAL before using its API.
  # The name of our eCAL Process will be "dynamic person receive python".
  # This name will be visible in the eCAL Monitor once the process is running.
  ecal_core.initialize("dynamic person receive python")

  # Print the eCAL version and date.
  print("eCAL {} ({})\n".format(ecal_core.get_version_string(), ecal_core.get_version_date_string()))

  # Set the state for the program.
  # You can vary between different states like healthy, warning, critical ...
  # This can be used to communicate the application state to applications like eCAL Monitor/Sys.
  ecal_core.process.set_state(ecal_core.process.Severity.HEALTHY, ecal_core.process.SeverityLevel.LEVEL1, "I feel good!")

  # Create the eCAL Subscriber. An eCAL Process can create multiple subscribers (and publishers).
  # The topic we are going to receive is called "person".
  sub = ProtobufDynamicSubscriber("person")
  sub.set_receive_callback(data_callback)

  # Loop until eCAL is stopped.
  # eCAL supports a stop signal; when an eCAL Process is stopped, ecal_core.ok() will return False.
  while ecal_core.ok():
    time.sleep(0.5) # Sleep for 500ms to reduce CPU usage.

  # Finalize eCAL.
  # You should always do that before your application exits.
  ecal_core.finalize()

if __name__ == "__main__":
  main()
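The C++ listing above walks a message recursively and builds a dotted path for every field, appending an index such as `[0]` for repeated entries. As a rough illustration of that traversal only, the following sketch applies the same idea to nested Python dicts and lists. It does not use eCAL or protobuf; the `flatten` helper, the sample `person` dictionary and its `visits` field are hypothetical stand-ins for a decoded message.

```python
from typing import Any, Iterator, Tuple


def flatten(value: Any, prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (path, scalar) pairs for a nested structure of dicts and lists."""
    if isinstance(value, dict):
        # Nested "message": extend the prefix with the field name.
        for name, child in value.items():
            path = f"{prefix}.{name}" if prefix else name
            yield from flatten(child, path)
    elif isinstance(value, list):
        # "Repeated" field: append the element index to the prefix.
        for index, child in enumerate(value):
            yield from flatten(child, f"{prefix}[{index}]")
    else:
        # Scalar leaf: report the accumulated path and the value.
        yield prefix, value


# Hypothetical stand-in for a decoded "person" message (not real eCAL output).
person = {
    "id": 1,
    "name": "Max",
    "dog": {"name": "Brandy"},
    "visits": [{"city": "Berlin"}, {"city": "Ulm"}],
}

for path, scalar in flatten(person, "person"):
    print(f"{path} : {scalar}")
```

Passing the topic name as the initial prefix mirrors how `ProtoMsgCallback` starts the recursion with `topic_id_.topic_name`.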
├─ C++
│  └─ person_receive.cpp
│
└─ Python
   └─ person_receive.py