7.2.4. Cap’n Proto: AddressBook#

“Cap’n Proto is an insanely fast data interchange format and capability-based RPC system.” eCAL has implemented Cap’n Proto serialization on top of the binary publisher and subscriber API, so that users can conveniently send Cap’n Proto objects. Currently, the eCAL Cap’n Proto message API is only available for C++.

In the Cap’n Proto example, we will use their default serialization example and send an address book with multiple entries.

7.2.4.1. AddressBook Cap’n Proto#

Cap’n Proto uses its own schema definition language to define the data structures, which is not unlike the Protobuf schema definition language.

As the sender and receiver need the same .capnp files, we place them in a separate directory next to the source directories for the sender and the receiver.

Let’s start with the capnp/addressbook.capnp file!

 1@0x9eb32e19f86ee174;
 2
 3# using Cxx = import "/capnp/c++.capnp";
 4# $Cxx.namespace("addressbook");
 5
 6struct Person {
    # One address book entry: id, name, e-mail, a list of phone numbers,
    # an employment status, a weight and a raw data blob.
 7  id @0 :UInt32;
 8  name @1 :Text;
 9  email @2 :Text;
10  phones @3 :List(PhoneNumber);
11
12  struct PhoneNumber {
      # A phone number (stored as text) together with its category.
13    number @0 :Text;
14    type @1 :Type;
15
16    enum Type {
        # Category of a phone number.
17      mobile @0;
18      home @1;
19      work @2;
20    }
21  }
22
23  employment :union {
      # Employment status; the Void members carry no payload, the Text
      # members carry the name of the employer / school.
24    unemployed @4 :Void;
25    employer @5 :Text;
26    school @6 :Text;
27    selfEmployed @7 :Void;
28    # We assume that a person is only one of these.
29  }
30  
    # The schema does not state a unit for the weight.
31  weight @8 :Float32;
    # Arbitrary bytes attached to the person.
32  data @9 :Data;  
33}
34
35struct AddressBook {
    # Container for all Person entries; this is the type the example
    # programs publish and subscribe to.
36  people @0 :List(Person);
37}
 AddressBook Cap'n Proto File
└─  addressbook.capnp

7.2.4.2. AddressBook Publisher#

 1#include <ecal/ecal.h>
 2#include <ecal/msg/capnproto/publisher.h>
 3#include <ecal/msg/capnproto/helper.h>
 4
 5#include <iostream>
 6#include <iterator>
 7#include <chrono>
 8#include <thread>
 9
10// capnp includes
11#ifdef _MSC_VER
12#pragma warning(push, 0)
13#endif /*_MSC_VER*/
14#include "addressbook.capnp.h" 
15#ifdef _MSC_VER
16#pragma warning(pop)
17#endif
18
// Writes the textual representation of the given address book to stdout,
// prefixed with "our string: ".
//
// addressBook: builder of the message that should be printed.
19void printAddressBook(AddressBook::Builder addressBook)
20{
    // toString() yields a string tree; flatten() turns it into one string
21  auto m_string = addressBook.toString().flatten();
22  std::cout << "our string: " << m_string.cStr() << std::endl;
23}
24
// Publisher sample: builds an address book with two people ("Alice" and
// "Bob") once, then sends it on topic "addressbook" every 500 ms for as long
// as eCAL::Ok() holds. Only Bob's id changes between two sends.
25int main()
26{
27  // initialize eCAL API
28  eCAL::Initialize("addressbook_send");
29
30  // set process state
31  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good !");
32
33  // create a publisher (topic name "addressbook")
34  eCAL::capnproto::CPublisher<AddressBook> pub("addressbook");
35
    // the message owns the AddressBook root and everything below it
36  capnp::MallocMessageBuilder message;
37  AddressBook::Builder addressBook = message.initRoot<AddressBook>();
38  auto people = addressBook.initPeople(2);
39
40  auto alice = people[0];
41  alice.setId(123);
42  alice.setName("Alice");
43  alice.setEmail("alice@example.com");
44
45  auto alicePhones = alice.initPhones(1);
46  alicePhones[0].setNumber("555-1212");
47  alicePhones[0].setType(Person::PhoneNumber::Type::MOBILE);
48  alice.getEmployment().setSchool("MIT");
49  alice.setWeight(60.4f);
50  
    // The payload bytes live in a stack array instead of a new[] buffer:
    // the array stays valid for the whole of main() and nothing has to be
    // freed (the former heap buffer was never deleted).
51  kj::byte data[6];
52  data[0] = 0x1F;
53  data[1] = 0x00;
54  data[2] = 0xA1;
55  data[3] = 0xB4;
56  data[4] = 0x14;
57  data[5] = 0x54;
58  capnp::Data::Builder aliceData(data, 6);
59  alice.setData(aliceData);
60
61  auto bob = people[1];
    // Bob's id is set inside the send loop, starting with 457
62  int bobid = 456;
63  bob.setName("Bob");
64  bob.setEmail("bob@example.com");
65
66  auto bobPhones = bob.initPhones(2);
67  bobPhones[0].setNumber("555-4567");
68  bobPhones[0].setType(Person::PhoneNumber::Type::HOME);
69  bobPhones[1].setNumber("555-7654");
70  bobPhones[1].setType(Person::PhoneNumber::Type::WORK);
71  bob.getEmployment().setUnemployed();
72  bob.setWeight(80.8f);
73
74  // enter main loop
75  while (eCAL::Ok())
76  {
      // make every sent message distinguishable by incrementing Bob's id
77    bob.setId(++bobid);
78    // send content
79    pub.Send(message);
80
81    // print content
82    printAddressBook(addressBook);
83    std::cout << std::endl;
84
85    // sleep 500 ms
86    std::this_thread::sleep_for(std::chrono::milliseconds(500));
87  }
88
89  // finalize eCAL API
90  eCAL::Finalize();
91}

└─  C++
  └─  addressbook_send.cpp

7.2.4.3. AddressBook Subscriber#

 1#include <ecal/ecal.h>
 2#include <ecal/msg/capnproto/subscriber.h>
 3
 4#include <iostream>
 5#include <chrono>
 6#include <thread>
 7
 8
 9
// Prints every person of the received address book to stdout: one line with
// name and e-mail, one line per phone number, and one line describing the
// employment status.
//
// addressBook: reader of the received message.
10void printAddressBook(const AddressBook::Reader& addressBook)
11{
12  for (Person::Reader person : addressBook.getPeople())
13  {
14    std::cout << person.getName().cStr() << ": " << person.getEmail().cStr() << std::endl;
15    for (Person::PhoneNumber::Reader phone : person.getPhones())
16    {
        // stays "UNKNOWN" if the type matches none of the cases below
17      const char* typeName = "UNKNOWN";
18      switch (phone.getType()) {
19      case Person::PhoneNumber::Type::MOBILE: typeName = "mobile"; break;
20      case Person::PhoneNumber::Type::HOME:   typeName = "home";   break;
21      case Person::PhoneNumber::Type::WORK:   typeName = "work";   break;
22      }
23      std::cout << "  " << typeName << " phone: " << phone.getNumber().cStr() << std::endl;
24    }
25    Person::Employment::Reader employment = person.getEmployment();
26
      // which() tells which union member is set; only EMPLOYER and SCHOOL
      // carry a text that is printed
27    switch (employment.which())
28    {
29    case Person::Employment::UNEMPLOYED:
30      std::cout << "  unemployed" << std::endl;
31      break;
32    case Person::Employment::EMPLOYER:
33      std::cout << "  employer: "
34        << employment.getEmployer().cStr() << std::endl;
35      break;
36    case Person::Employment::SCHOOL:
37      std::cout << "  student at: "
38        << employment.getSchool().cStr() << std::endl;
39      break;
40    case Person::Employment::SELF_EMPLOYED:
41      std::cout << "  self-employed" << std::endl;
42      break;
43    }
44  }
45}
46
// Receive callback: prints the topic name and the time value, followed by
// the content of the received address book.
//
// topic_id_: identifies the topic the message arrived on (only its
//            topic_name is used here).
// msg_:      reader of the received AddressBook message.
// time_:     time value handed in by the subscriber; printed as is.
47void OnAddressbook(const eCAL::STopicId& topic_id_, AddressBook::Reader msg_, const long long time_)
48{
49  // print content
50  std::cout << "topic name : " << topic_id_.topic_name << std::endl;
51  std::cout << "time       : " << time_                << std::endl;
52  std::cout << std::endl;
53  printAddressBook(msg_);
54  std::cout << std::endl;
55}
56  
// Subscriber sample: subscribes to topic "addressbook", registers
// OnAddressbook as receive callback and then idles in 500 ms steps for as
// long as eCAL::Ok() holds.
57int main()
58{
59  // initialize eCAL API
60  eCAL::Initialize("addressbook_receive");
61
62  // set process state
63  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good !");
64
65  // create a subscriber (topic name "addressbook")
66  eCAL::capnproto::CSubscriber<AddressBook> sub("addressbook");
67
68  // add receive callback function (_1 = topic_id, _2 = msg, _3 = time)
    // the bind expression forwards exactly these three arguments to OnAddressbook
69  auto callback = std::bind(OnAddressbook, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
70  sub.SetReceiveCallback(callback);
71
72  // enter main loop
    // the loop body only sleeps; all output is produced by the callback
73  while (eCAL::Ok())
74  {
75    // sleep 500 ms
76    std::this_thread::sleep_for(std::chrono::milliseconds(500));
77  }
78
79  // finalize eCAL API
80  eCAL::Finalize();
81}

└─  C++
   └─  addressbook_receive.cpp

7.2.4.4. AddressBook Dynamic Subscriber#

Using Cap’n Proto, it is possible to receive data without knowing the structure of the data in advance. Hence, you can use a dynamic subscriber to receive Cap’n Proto data, even if you do not have access to the corresponding .capnp file. This is useful for generic applications, such as the eCAL Monitor, which can display all data types without knowing them in advance.

  1#include <iostream>
  2#include <chrono>
  3#include <thread>
  4
  5#include <ecal/ecal.h>
  6#include <ecal/msg/capnproto/dynamic.h>
  7
  8
  // Recursively prints a dynamically typed Cap'n Proto value to stdout.
  //
  // Scalars are printed directly, text is put in double quotes, lists are
  // printed as "[a, b, ...]" and structs as "(name = value, ...)" followed by
  // a newline. Value types without a case below are printed as "?".
  //
  // value: the value to print; lists and structs are descended into.
  9void dynamicPrintValue(const capnp::DynamicValue::Reader& value)
 10{
 11  using namespace capnp;
 12  // Print an arbitrary message via the dynamic API by
 13  // iterating over the schema.  Look at the handling
 14  // of STRUCT in particular.
 15
 16  switch (value.getType()) {
 17  case DynamicValue::VOID:
       // nothing to show for a void value
 18    std::cout << "";
 19    break;
 20  case DynamicValue::BOOL:
 21    std::cout << (value.as<bool>() ? "true" : "false");
 22    break;
 23  case DynamicValue::INT:
 24    std::cout << value.as<int64_t>();
 25    break;
 26  case DynamicValue::UINT:
 27    std::cout << value.as<uint64_t>();
 28    break;
 29  case DynamicValue::FLOAT:
 30    std::cout << value.as<double>();
 31    break;
 32  case DynamicValue::TEXT:
 33    std::cout << '\"' << value.as<Text>().cStr() << '\"';
 34    break;
 35  case DynamicValue::LIST: {
 36    std::cout << "[";
       // 'first' suppresses the separator in front of the first element
 37    bool first = true;
 38    for (auto element : value.as<DynamicList>()) {
 39      if (first) {
 40        first = false;
 41      }
 42      else {
 43        std::cout << ", ";
 44      }
 45      dynamicPrintValue(element);
 46    }
 47    std::cout << "]";
 48    break;
 49  }
 50  case DynamicValue::ENUM: {
 51    auto enumValue = value.as<DynamicEnum>();
       // print the enumerant's name if the schema knows the value
 52    KJ_IF_MAYBE(enumerant, enumValue.getEnumerant()) {
 53      std::cout <<
 54        enumerant->getProto().getName().cStr();
 55    }
 56 else {
 57   // Unknown enum value; output raw number.
 58   std::cout << enumValue.getRaw();
 59 }
 60 break;
 61  }
 62  case DynamicValue::STRUCT: {
 63    std::cout << "(";
 64    auto structValue = value.as<DynamicStruct>();
 65    bool first = true;
 66    for (auto field : structValue.getSchema().getFields()) {
         // fields for which has() reports false are left out of the output
 67      if (!structValue.has(field)) continue;
 68      if (first) {
 69        first = false;
 70      }
 71      else {
 72        std::cout << ", ";
 73      }
 74      std::cout << field.getProto().getName().cStr()
 75        << " = ";
 76      dynamicPrintValue(structValue.get(field));
 77    }
 78    std::cout << ")";
       // the newline is written after every struct, nested ones included
 79    std::cout << "\n";
 80    break;
 81  }
 82  default:
 83    // There are other types, we aren't handling them.
 84    std::cout << "?";
 85    break;
 86  }
 87}
 88
 // Dynamic subscriber sample: subscribes to topic "addressbook" without
 // using the generated AddressBook type and prints every received message
 // via dynamicPrintValue(). Idles in 500 ms steps while eCAL::Ok() holds.
 89int main()
 90{
 91  // initialize eCAL API
 92  eCAL::Initialize("addressbook_receive_dynamic");
 93
     // set process state
 94  eCAL::Process::SetState(eCAL::Process::eSeverity::healthy, eCAL::Process::eSeverityLevel::level1, "I feel good !");
 95
 96  // create a subscriber (topic name "addressbook")
 97  eCAL::capnproto::CDynamicSubscriber sub("addressbook");
 98
     // receive callback: only the message itself is used, the topic id,
     // time and clock arguments are ignored
 99  auto lambda = [](const eCAL::STopicId& /*topic_id_*/, const capnp::DynamicValue::Reader& msg_, long long /*time_*/, long long /*clock_*/) -> void {
100    dynamicPrintValue(msg_);
101  };
102  sub.SetReceiveCallback(lambda);
103
104  // enter main loop
105  while (eCAL::Ok())
106  {
107    // sleep 500 ms
108    std::this_thread::sleep_for(std::chrono::milliseconds(500));
109  }
110
111  // finalize eCAL API
112  eCAL::Finalize();
113  
114  return(0);
115}

└─  C++
   └─  addressbook_receive_dynamic.cpp