7.2.4. Cap’n Proto: AddressBook#
“Cap’n Proto is an insanely fast data interchange format and capability-based RPC system.” eCAL has implemented Cap’n Proto serialization on top of the binary publisher and subscriber API, such that users can conveniently send Cap’n Proto objects. Currently, the eCAL Capnproto Message API is only available for C++.
In the Cap’n Proto example, we will use their default serialization example and send an address book with multiple entries.
7.2.4.1. AddressBook Cap’n Proto#
Cap’n Proto uses its own schema definition language to define the data structures, which is not unlike the Protobuf schema definition language.
As the sender and receiver need the same .capnp files, we place them in a separate directory next to the source directories for the sender and the receiver.
Let’s start with the capnp/addressbook.capnp file!
1@0x9eb32e19f86ee174;
2
3# using Cxx = import "/capnp/c++.capnp";
4# $Cxx.namespace("addressbook");
5
struct Person {
  # One entry of the address book (element type of AddressBook.people).

  id @0 :UInt32;
  name @1 :Text;
  email @2 :Text;
  phones @3 :List(PhoneNumber);

  struct PhoneNumber {
    # A phone number string together with its category.

    number @0 :Text;
    type @1 :Type;

    enum Type {
      mobile @0;
      home @1;
      work @2;
    }
  }

  employment :union {
    # We assume that a person is only one of these.

    unemployed @4 :Void;
    employer @5 :Text;
    school @6 :Text;
    selfEmployed @7 :Void;
  }

  weight @8 :Float32;
  data @9 :Data;
}
34
struct AddressBook {
  # Root message of the example: a list of Person entries.

  people @0 :List(Person);
}
AddressBook Cap'n Proto File
└─ addressbook.capnp
7.2.4.2. AddressBook Publisher#
1#include <ecal/ecal.h>
2#include <ecal/msg/capnproto/publisher.h>
3#include <ecal/msg/capnproto/helper.h>
4
5#include <iostream>
6#include <iterator>
7#include <chrono>
8#include <thread>
9
10// capnp includes
11#ifdef _MSC_VER
12#pragma warning(push, 0)
13#endif /*_MSC_VER*/
14#include "addressbook.capnp.h"
15#ifdef _MSC_VER
16#pragma warning(pop)
17#endif
18
19void printAddressBook(AddressBook::Builder addressBook)
20{
21 auto m_string = addressBook.toString().flatten();
22 std::cout << "our string: " << m_string.cStr() << std::endl;
23}
24
25int main()
26{
27 // initialize eCAL API
28 eCAL::Initialize("addressbook_send");
29
30 // set process state
31 eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good !");
32
33 // create a publisher (topic name "addressbook")
34 eCAL::capnproto::CPublisher<AddressBook> pub("addressbook");
35
36 capnp::MallocMessageBuilder message;
37 AddressBook::Builder addressBook = message.initRoot<AddressBook>();
38 auto people = addressBook.initPeople(2);
39
40 auto alice = people[0];
41 alice.setId(123);
42 alice.setName("Alice");
43 alice.setEmail("alice@example.com");
44
45 auto alicePhones = alice.initPhones(1);
46 alicePhones[0].setNumber("555-1212");
47 alicePhones[0].setType(Person::PhoneNumber::Type::MOBILE);
48 alice.getEmployment().setSchool("MIT");
49 alice.setWeight(60.4f);
50
51 kj::byte* data = new kj::byte[6];
52 data[0] = 0x1F;
53 data[1] = 0x00;
54 data[2] = 0xA1;
55 data[3] = 0xB4;
56 data[4] = 0x14;
57 data[5] = 0x54;
58 capnp::Data::Builder aliceData(data, 6);
59 alice.setData(aliceData);
60
61 auto bob = people[1];
62 int bobid = 456;
63 bob.setName("Bob");
64 bob.setEmail("bob@example.com");
65
66 auto bobPhones = bob.initPhones(2);
67 bobPhones[0].setNumber("555-4567");
68 bobPhones[0].setType(Person::PhoneNumber::Type::HOME);
69 bobPhones[1].setNumber("555-7654");
70 bobPhones[1].setType(Person::PhoneNumber::Type::WORK);
71 bob.getEmployment().setUnemployed();
72 bob.setWeight(80.8f);
73
74 // enter main loop
75 while (eCAL::Ok())
76 {
77 bob.setId(++bobid);
78 // send content
79 pub.Send(message);
80
81 // print content
82 printAddressBook(addressBook);
83 std::cout << std::endl;
84
85 // sleep 500 ms
86 std::this_thread::sleep_for(std::chrono::milliseconds(500));
87 }
88
89 // finalize eCAL API
90 eCAL::Finalize();
91}
└─ C++
└─ addressbook_send.cpp
7.2.4.3. AddressBook Subscriber#
1#include <ecal/ecal.h>
2#include <ecal/msg/capnproto/subscriber.h>
3
4#include <iostream>
5#include <chrono>
6#include <thread>
7
8
9
10void printAddressBook(const AddressBook::Reader& addressBook)
11{
12 for (Person::Reader person : addressBook.getPeople())
13 {
14 std::cout << person.getName().cStr() << ": " << person.getEmail().cStr() << std::endl;
15 for (Person::PhoneNumber::Reader phone : person.getPhones())
16 {
17 const char* typeName = "UNKNOWN";
18 switch (phone.getType()) {
19 case Person::PhoneNumber::Type::MOBILE: typeName = "mobile"; break;
20 case Person::PhoneNumber::Type::HOME: typeName = "home"; break;
21 case Person::PhoneNumber::Type::WORK: typeName = "work"; break;
22 }
23 std::cout << " " << typeName << " phone: " << phone.getNumber().cStr() << std::endl;
24 }
25 Person::Employment::Reader employment = person.getEmployment();
26
27 switch (employment.which())
28 {
29 case Person::Employment::UNEMPLOYED:
30 std::cout << " unemployed" << std::endl;
31 break;
32 case Person::Employment::EMPLOYER:
33 std::cout << " employer: "
34 << employment.getEmployer().cStr() << std::endl;
35 break;
36 case Person::Employment::SCHOOL:
37 std::cout << " student at: "
38 << employment.getSchool().cStr() << std::endl;
39 break;
40 case Person::Employment::SELF_EMPLOYED:
41 std::cout << " self-employed" << std::endl;
42 break;
43 }
44 }
45}
46
47void OnAddressbook(const eCAL::STopicId& topic_id_, AddressBook::Reader msg_, const long long time_)
48{
49 // print content
50 std::cout << "topic name : " << topic_id_.topic_name << std::endl;
51 std::cout << "time : " << time_ << std::endl;
52 std::cout << std::endl;
53 printAddressBook(msg_);
54 std::cout << std::endl;
55}
56
57int main()
58{
59 // initialize eCAL API
60 eCAL::Initialize("addressbook_receive");
61
62 // set process state
63 eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good !");
64
65 // create a subscriber (topic name "addressbook")
66 eCAL::capnproto::CSubscriber<AddressBook> sub("addressbook");
67
68 // add receive callback function (_1 = topic_id, _2 = msg, _3 = time)
69 auto callback = std::bind(OnAddressbook, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
70 sub.SetReceiveCallback(callback);
71
72 // enter main loop
73 while (eCAL::Ok())
74 {
75 // sleep 500 ms
76 std::this_thread::sleep_for(std::chrono::milliseconds(500));
77 }
78
79 // finalize eCAL API
80 eCAL::Finalize();
81}
└─ C++
└─ addressbook_receive.cpp
7.2.4.4. AddressBook Dynamic Subscriber#
Using Cap’n Proto, it is possible to receive data without knowing the structure of the data in advance.
Hence, you can use a dynamic subscriber to receive Cap’n Proto data even if you do not have access to the corresponding .capnp file.
This is useful for generic applications, such as the eCAL Monitor, which can display all data types without knowing them in advance.
1#include <iostream>
2#include <chrono>
3#include <thread>
4
5#include <ecal/ecal.h>
6#include <ecal/msg/capnproto/dynamic.h>
7
8
9void dynamicPrintValue(const capnp::DynamicValue::Reader& value)
10{
11 using namespace capnp;
12 // Print an arbitrary message via the dynamic API by
13 // iterating over the schema. Look at the handling
14 // of STRUCT in particular.
15
16 switch (value.getType()) {
17 case DynamicValue::VOID:
18 std::cout << "";
19 break;
20 case DynamicValue::BOOL:
21 std::cout << (value.as<bool>() ? "true" : "false");
22 break;
23 case DynamicValue::INT:
24 std::cout << value.as<int64_t>();
25 break;
26 case DynamicValue::UINT:
27 std::cout << value.as<uint64_t>();
28 break;
29 case DynamicValue::FLOAT:
30 std::cout << value.as<double>();
31 break;
32 case DynamicValue::TEXT:
33 std::cout << '\"' << value.as<Text>().cStr() << '\"';
34 break;
35 case DynamicValue::LIST: {
36 std::cout << "[";
37 bool first = true;
38 for (auto element : value.as<DynamicList>()) {
39 if (first) {
40 first = false;
41 }
42 else {
43 std::cout << ", ";
44 }
45 dynamicPrintValue(element);
46 }
47 std::cout << "]";
48 break;
49 }
50 case DynamicValue::ENUM: {
51 auto enumValue = value.as<DynamicEnum>();
52 KJ_IF_MAYBE(enumerant, enumValue.getEnumerant()) {
53 std::cout <<
54 enumerant->getProto().getName().cStr();
55 }
56 else {
57 // Unknown enum value; output raw number.
58 std::cout << enumValue.getRaw();
59 }
60 break;
61 }
62 case DynamicValue::STRUCT: {
63 std::cout << "(";
64 auto structValue = value.as<DynamicStruct>();
65 bool first = true;
66 for (auto field : structValue.getSchema().getFields()) {
67 if (!structValue.has(field)) continue;
68 if (first) {
69 first = false;
70 }
71 else {
72 std::cout << ", ";
73 }
74 std::cout << field.getProto().getName().cStr()
75 << " = ";
76 dynamicPrintValue(structValue.get(field));
77 }
78 std::cout << ")";
79 std::cout << "\n";
80 break;
81 }
82 default:
83 // There are other types, we aren't handling them.
84 std::cout << "?";
85 break;
86 }
87}
88
89int main()
90{
91 // initialize eCAL API
92 eCAL::Initialize("addressbook_receive_dynamic");
93
94 eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good !");
95
96 // create a subscriber (topic name "addressbook")
97 eCAL::capnproto::CDynamicSubscriber sub("addressbook");
98
99 auto lambda = [](const eCAL::STopicId& /*topic_id_*/, const capnp::DynamicValue::Reader& msg_, long long /*time_*/, long long /*clock_*/) -> void {
100 dynamicPrintValue(msg_);
101 };
102 sub.SetReceiveCallback(lambda);
103
104 // enter main loop
105 while (eCAL::Ok())
106 {
107 // sleep 500 ms
108 std::this_thread::sleep_for(std::chrono::milliseconds(500));
109 }
110
111 // finalize eCAL API
112 eCAL::Finalize();
113
114 return(0);
115}
└─ C++
└─ addressbook_receive_dynamic.cpp