Consuming From Kafka with Apache Avro and Schema Registry in C++ (Part 2)
This project builds upon my previous work, Reading CSV and Publishing to Kafka with Apache Avro and Schema Registry in C++ (Part 1).
In this blog post, I will demonstrate how to consume data from Kafka that is encoded in the Avro format. Specifically, I will walk through the process of extracting the Avro schema ID, fetching the corresponding schema from the schema registry, and decoding the data. As always, the complete source code for this tutorial is available in the GitHub repository linked above. The project is fully developed, including makefiles, logging components, and more.
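Before diving in, it helps to know what is actually on the wire: messages produced with the Confluent serializers start with a magic byte (0), followed by the 4-byte, big-endian schema ID, followed by the Avro-encoded record. The schema ID can then be resolved against the registry (GET /schemas/ids/&lt;id&gt;). In this project libserdes takes care of all of that, but purely as an illustration, a hand-rolled extraction of the schema ID could look like the following sketch (not part of the project code):
/* Illustrative sketch only: the project itself relies on Serdes::Avro::deserialize(). */
#include <cstddef>
#include <cstdint>

// Returns the schema registry ID embedded in a Confluent-framed payload, or -1 on malformed input.
int32_t extract_schema_id(const uint8_t *payload, size_t len) {
    if (len < 5 || payload[0] != 0x00) {  // byte 0 must be the magic byte (0)
        return -1;
    }
    // Bytes 1-4 carry the schema ID in big-endian (network) byte order.
    return (static_cast<int32_t>(payload[1]) << 24) | (static_cast<int32_t>(payload[2]) << 16) |
           (static_cast<int32_t>(payload[3]) << 8) | static_cast<int32_t>(payload[4]);
}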
The decoded data is persisted into a PostgreSQL database. The project reads triples from a Kafka topic representing subject-predicate-object data. For this example, I generated sample data modeling an organized crime family. The data includes relationships (predicates) among family members, as well as connections to telecommunication identifiers such as email addresses, phone numbers, and social media accounts (e.g., Facebook IDs). Other relationships, such as crimes committed and affiliations with other organizations, are also represented.
These triples are then stored in two database tables: one for objects and another for relationships. An example input file containing such triples might look like this:
Source | Target | Relationship |
---|---|---|
Don Vito | Consigliere Carlo | Advisor |
Don Vito | Underboss Marco | Underboss |
Underboss Marco | Capo Salvatore | Captain |
Looking at the first row, we see that the subject “Don Vito” is an Advisor (relationship or predicate) to “Consigliere Carlo.” The objects table will include entries for both “Don Vito” (with id=1) and “Consigliere Carlo” (with id=2) as distinct objects. Meanwhile, the relationships table will contain a row with source_id=1, target_id=2, and relationship_name='Advisor', representing the described relationship. The complete files are included in the repository associated with the first part of this series.
To ensure everything is set up smoothly, here is the docker-compose.yml file to help you get started with the database. Simply run docker-compose up to initialize the environment.
version: "3.9"
services:
  db:
    image: postgres
    restart: always
    shm_size: 128mb
    environment:
      POSTGRES_DB: odynet
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: example
    ports:
      - 5432:5432
  adminer:
    image: adminer
    restart: always
    ports:
      - 8080:8080
The database schema is also provided below:
DROP TABLE IF EXISTS relationships;
DROP TABLE IF EXISTS objects;

CREATE TABLE IF NOT EXISTS objects(
    id SERIAL PRIMARY KEY,
    object_name VARCHAR(50),
    object_type VARCHAR(50),
    created_at DATE
);

ALTER TABLE objects ADD CONSTRAINT objects_unique_constraint UNIQUE (object_name, object_type);

CREATE TABLE IF NOT EXISTS relationships(
    id SERIAL PRIMARY KEY,
    source_id integer REFERENCES objects (id),
    target_id integer REFERENCES objects (id),
    relationship_name VARCHAR(50)
);

ALTER TABLE relationships ADD CONSTRAINT relationships_unique_constraint UNIQUE (source_id, target_id, relationship_name);
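With the containers running, you can load this schema with, for example, psql -h 127.0.0.1 -U postgres -d odynet -f schema.sql (password example), assuming you saved it as schema.sql, or simply paste it into the Adminer UI exposed on port 8080.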
Database
The database client is implemented using the Singleton pattern. It provides methods for accessing the singleton instance, as well as for inserting and retrieving objects and relationships. The constructor accepts a PostgreSQL connection string, which is used to initialize the internal pqxx::connection member variable.
Let’s start with the interface:
/* Database.h */
#ifndef DATABASE_H
#define DATABASE_H

#include <iostream>
#include <pqxx/pqxx>
#include <string>

class Database {
   public:
    ~Database();

    bool insert_object(const std::string &object_name,
                       const std::string &object_type,
                       const std::string &created_at);
    int get_object_id(const std::string &object_name);
    bool insert_relationship(const int source_id,
                             const int target_id,
                             const std::string &relationship_name);

    static Database &instance();
    static void init(const std::string &url);

   private:
    Database(const std::string *url);
    void prepare_insert(pqxx::connection &conn);
    static Database &instance_impl(const std::string *url);

    pqxx::connection m_conn;

    const std::string m_insert_object_stmt{
        "INSERT INTO objects(object_name, object_type, created_at) VALUES ($1, $2, $3::date) ON CONFLICT ON CONSTRAINT "
        "objects_unique_constraint DO NOTHING RETURNING id"};
    const std::string m_object_id_for_name_stmt{"SELECT id FROM objects WHERE object_name = $1"};
    const std::string m_insert_relationship_stmt{
        "INSERT INTO relationships(source_id, target_id, relationship_name) VALUES ($1, $2, $3) ON CONFLICT ON "
        "CONSTRAINT relationships_unique_constraint DO NOTHING"};
};

#endif
Let’s take a closer look at the actual implementation.
/* Database.cpp */
#include "Database.h"

#include <iostream>

Database::Database(const std::string *url) : m_conn(*url) { prepare_insert(m_conn); }

/* Meyers singleton: the static local is constructed on the first call,
 * which should be the one triggered by init() so that a valid URL is passed in. */
Database &Database::instance_impl(const std::string *url = nullptr) {
    static Database i{url};
    return i;
}

void Database::init(const std::string &url) { instance_impl(&url); }

Database &Database::instance() {
    Database &i = instance_impl();
    return i;
}

/* Register the prepared statements on the connection. */
void Database::prepare_insert(pqxx::connection &conn) {
    conn.prepare("insert_object", m_insert_object_stmt);
    conn.prepare("insert_relationship", m_insert_relationship_stmt);
    conn.prepare("select_object_id", m_object_id_for_name_stmt);
}

int Database::get_object_id(const std::string &object_name) {
    if (m_conn.is_open()) {
        pqxx::work transaction{m_conn};
        pqxx::result r = transaction.exec_prepared("select_object_id", object_name);
        transaction.commit();
        for (auto const &row : r) {
            const pqxx::field field = row[0];
            return field.as<int>();
        }
    } else {
        std::cout << "Database connection is not open!" << std::endl;
    }
    return 0;
}

bool Database::insert_object(const std::string &object_name,
                             const std::string &object_type,
                             const std::string &created_at) {
    if (m_conn.is_open()) {
        pqxx::work transaction{m_conn};
        pqxx::result r;
        for (int i = 0; i < 2; i++) {
            r = transaction.exec_prepared("insert_object",
                                          object_name,
                                          object_type,
                                          created_at);
        }
        transaction.commit();
    } else {
        std::cout << "Database connection is not open!" << std::endl;
        return false;
    }
    return true;
}

bool Database::insert_relationship(const int source_id,
                                   const int target_id,
                                   const std::string &relationship_name) {
    if (m_conn.is_open()) {
        pqxx::work transaction{m_conn};
        pqxx::result r;
        for (int i = 0; i < 2; i++) {
            r = transaction.exec_prepared("insert_relationship",
                                          source_id,
                                          target_id,
                                          relationship_name);
        }
        transaction.commit();
    } else {
        std::cout << "Database connection is not open!" << std::endl;
        return false;
    }
    return true;
}

Database::~Database() { m_conn.close(); }
This is fairly straightforward and thoroughly documented in the libpqxx documentation.
To initialize the database client, simply include the following code in the main.cpp file:
Database::init("hostaddr=127.0.0.1 port=5432 dbname=odynet user=postgres password=example");
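To make the interface concrete, here is a minimal, hypothetical usage sketch for the first example triple; the object type and date are placeholder values chosen for illustration, not taken from the project code:
/* Hypothetical usage sketch, assuming Database::init() has already been called as shown above. */
Database &db = Database::instance();

// "Don Vito" and "Consigliere Carlo" become rows in the objects table (id=1 and id=2 on an empty database).
db.insert_object("Don Vito", "MyObjectType", "2024-01-01");
db.insert_object("Consigliere Carlo", "MyObjectType", "2024-01-01");

// Link them in the relationships table, mirroring the "Advisor" predicate from the example input.
int source_id = db.get_object_id("Don Vito");
int target_id = db.get_object_id("Consigliere Carlo");
db.insert_relationship(source_id, target_id, "Advisor");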
Kafka Consumer
Let’s start by examining the main() method. The configuration file, passed as a command-line argument, is read to extract essential Kafka properties such as bootstrap servers, schema registry URL, and client ID. We use librdkafka as our Kafka client library.
Once the configuration is loaded and the RdKafka::Consumer is set up, we initialize our custom implementation of KafkaConsumerCallback. This class is responsible for handling incoming messages and performs the core tasks of decoding and persisting them.
Finally, we enter the main loop, where the application waits for new messages to arrive and processes them as they are consumed.
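For reference, the Kafka-related portion of such a configuration file might look like the snippet below. The exact file format and section names are defined by the ConfigParser from Part 1, so treat this layout as an assumption rather than the canonical format:
# Assumed layout -- see the Part 1 repository for the actual format
[kafka]
bootstrap.servers=localhost:9092
client.id=spo-consumer
schema.registry.url=http://localhost:8081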
/* main.cpp */
int main(int argc, char *argv[]) {
    std::string config_file;
    parse_args(argc, argv, config_file);
    ConfigParser &config = ConfigParser::instance(config_file);
    Database::init("hostaddr=127.0.0.1 port=5432 dbname=odynet user=postgres password=example");

    /*************************************************************************
     *
     * KAFKA
     *
     *************************************************************************/
    std::map<std::string, std::string> kafka_config = config.kafka();
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    std::string errstr;
    if (conf->set("bootstrap.servers", kafka_config["bootstrap.servers"], errstr) != RdKafka::Conf::CONF_OK) {
        exit(1);
    }
    if (conf->set("client.id", kafka_config["client.id"], errstr) != RdKafka::Conf::CONF_OK) {
        exit(1);
    }
    conf->set("enable.partition.eof", "true", errstr);

    // Create a consumer handle
    RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
    if (!consumer) {
        exit(1);
    }

    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    std::string topic_str = "spo";
    RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
    if (!topic) {
        exit(1);
    }

    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    int32_t partition = 0;
    RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        exit(1);
    }

    KafkaConsumerCallback consumer_cb;
    int use_ccb = 0;
    size_t errors = 0;
    while (1) {
        if (use_ccb) {
            consumer->consume_callback(topic, partition, 1000, &consumer_cb, &use_ccb);
        } else {
            RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
            if (!consumer_cb.consume_message(msg)) {
                ++errors;
            }
            delete msg;
        }
        consumer->poll(0);
    }

    consumer->stop(topic, partition);
    consumer->poll(1000);
    delete topic;
    delete consumer;
    return 0;
}
Next, let’s look at the KafkaConsumerCallback class. This class is a subclass of RdKafka::ConsumeCb and implements the RdKafka::ConsumeCb::consume_cb() method. Implementing this method is necessary if we want to leverage the RdKafka::Consumer::consume_callback() interface.
Additionally, we define our own KafkaConsumerCallback::consume_message(RdKafka::Message *message) method, which is the method invoked in the main loop described earlier (i.e., non-callback mode).
/* KafkaConsumerCallback.h */
#ifndef KAFKA_CONSUMER_CALLBACK_H
#define KAFKA_CONSUMER_CALLBACK_H

#include <librdkafka/rdkafkacpp.h>

#include <cstring>
#include <iostream>

#include "SchemaRegistry.h"

class KafkaConsumerCallback : public RdKafka::ConsumeCb {
   public:
    KafkaConsumerCallback();
    void consume_cb(RdKafka::Message &msg, void *opaque) override;
    bool consume_message(RdKafka::Message *message);
    ~KafkaConsumerCallback();

   private:
    const std::string m_name = "KafkaConsumerCallback";
    Serdes::Schema *m_schema;

    int avro2json(Serdes::Schema *schema, const avro::GenericDatum *datum, std::string &str, std::string &errstr);
    size_t deserialize(RdKafka::Message *message);
};

#endif
/* KafkaConsumerCallback.cpp */
#include "KafkaConsumerCallback.h"

#include <cpprest/http_client.h>  // web::json::value

#include <ctime>
#include <iomanip>
#include <sstream>

#include "Database.h"
#include "logging/Logging.h"

KafkaConsumerCallback::KafkaConsumerCallback() {
    m_schema = SchemaRegistry::instance().fetch_value_schema("spo");
}

bool KafkaConsumerCallback::consume_message(RdKafka::Message *message) {
    bool exit_eof = false;
    switch (message->err()) {
        case RdKafka::ERR__TIMED_OUT:
            break;
        case RdKafka::ERR_NO_ERROR:
            return deserialize(message) > 0;
            break;
        case RdKafka::ERR__PARTITION_EOF:
            /* Last message */
            if (exit_eof) {
                return false;
            }
            break;
        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
            Logging::ERROR("Consume failed: " + message->errstr(), m_name);
            return false;
            break;
        default:
            /* Errors */
            Logging::ERROR("Consume failed: " + message->errstr(), m_name);
            return false;
    }
    return true;
}

void KafkaConsumerCallback::consume_cb(RdKafka::Message &msg, void *opaque) {
    consume_message(&msg);
}

int KafkaConsumerCallback::avro2json(Serdes::Schema *schema,
                                     const avro::GenericDatum *datum,
                                     std::string &str,
                                     std::string &errstr) {
    avro::ValidSchema *avro_schema = schema->object();

    /* JSON encoder */
    avro::EncoderPtr json_encoder = avro::jsonEncoder(*avro_schema);

    /* JSON output stream */
    std::ostringstream oss;
    auto json_os = avro::ostreamOutputStream(oss);

    try {
        /* Encode Avro datum to JSON */
        json_encoder->init(*json_os.get());
        avro::encode(*json_encoder, *datum);
        json_encoder->flush();
    } catch (const avro::Exception &e) {
        errstr = std::string("Binary to JSON transformation failed: ") + e.what();
        Logging::ERROR(errstr, m_name);
        return -1;
    }
    str = oss.str();
    return 0;
}

size_t KafkaConsumerCallback::deserialize(RdKafka::Message *message) {
    std::string errstr;
    std::string out;
    avro::GenericDatum *d = NULL;
    ssize_t bytes_read =
        SchemaRegistry::instance().m_serdes->deserialize(&m_schema, &d, message->payload(), message->len(), errstr);
    if (bytes_read == -1) {
        Logging::ERROR("Serdes::Avro::deserialize() failed to deserialize: " + errstr, m_name);
        /* Stop processing this message: the datum was never populated. */
        return 0;
    }
    Logging::INFO("Serdes::Avro::deserialize() read : " + std::to_string(bytes_read) + " bytes", m_name);

    if (avro2json(m_schema, d, out, errstr) == -1) {
        Logging::ERROR("KafkaConsumerCallback::avro2json() failed to deserialize: " + errstr, m_name);
    }

    if (!out.empty()) {
        web::json::value json_value = web::json::value::parse(out);
        std::string subject = json_value["subject"].as_string();
        std::string predicate = json_value["predicate"].as_string();
        std::string object = json_value["object"].as_string();

        auto t = std::time(nullptr);
        auto tm = *std::localtime(&t);
        std::ostringstream oss;
        oss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S");
        std::string created_at = oss.str();

        if (Database::instance().insert_object(subject, "MyObjectType", created_at) &&
            Database::instance().insert_object(object, "MyObjectType", created_at)) {
            int source_id = Database::instance().get_object_id(subject);
            int target_id = Database::instance().get_object_id(object);
            if (!Database::instance().insert_relationship(source_id, target_id, predicate)) {
                Logging::ERROR("Could not persist predicate", m_name);
            }
        } else {
            Logging::ERROR("Could not persist either subject or object", m_name);
        }
    }
    delete d;
    return out.length();
}

KafkaConsumerCallback::~KafkaConsumerCallback() {
    delete m_schema;
}
The KafkaConsumerCallback::consume_message() method is straightforward. Since RdKafka::Consumer::consume() can return either a proper message or an error event, we first check the message’s error code to determine whether it represents an error event. If no error is detected, we proceed with deserializing the message.
We begin deserialization using the Serdes::Avro::deserialize() method. This method attempts to deserialize the message’s binary payload into a generic Avro datum. On success, it returns the number of bytes read from the payload; on failure, it returns -1 and sets errstr accordingly. If an error occurs (i.e., Serdes::Avro::deserialize() returns -1), we stop processing the message.
If Serdes::Avro::deserialize() succeeds, we proceed with further processing using our custom serialization method, KafkaConsumerCallback::avro2json(). This method operates on the populated generic Avro datum and converts it into a JSON representation, writing the decoded message to the provided std::string output parameter.
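For the first example triple, the resulting JSON would look roughly like {"subject": "Don Vito", "predicate": "Advisor", "object": "Consigliere Carlo"}, assuming the value schema from Part 1 with exactly these three string fields.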
If the output parameter is not empty, we convert it into a JSON object and persist it to the database, as described earlier. I’ll leave the specifics of the persistence process to you.
I hope you found this post informative and learned something new. As always, you can explore the complete code in the GitHub repository linked above.