Reading CSV and Publishing to Kafka with Apache Avro and Schema Registry in C++ (Part 1)
This project builds upon my previous work, Enhancing CSV Processing in C++: Leveraging the Decorator Design Pattern for Transformative Column Operations. This tutorial consists of two parts, and every section, including the previously mentioned post, uses the same GitHub repository referenced above. Let’s get started.
I will not delve into the process of reading the CSV file, as this has been covered in several previous posts. For this tutorial, I will assume the CSV data has already been processed and is ready to be published to Kafka. Each row will be published as a separate message to Kafka, encoded in Avro format, using a topic and schema defined in our input YAML configuration file. In a subsequent tutorial, I will demonstrate how to subscribe to the topic, read, and decode the messages.
The schema will be automatically published to a schema registry, with the schema ID included in each message. This approach eliminates the need to prepend the entire JSON schema to every message, significantly saving space and processing time by including only the schema ID.
Let’s begin by setting up our infrastructure, which includes Kafka and the Schema Registry. We will use Docker to manage these components. Simply create a docker-compose.yml file and run docker-compose up -d to start the services.
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  kafka:
    image: confluentinc/cp-kafka
    hostname: kafka
    container_name: kafka
    command: ["tail", "-f", "/dev/null"]
    network_mode: "host"

  schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
With the infrastructure in place, we can now focus on the code implementation. As mentioned earlier, I assume the CSVRow is already prepared for processing. Our task will be to serialize and encode it, and then publish it to Kafka. To achieve this, we will focus on the following:
- Developing a Schema Registry client capable of registering a new Avro schema with the registry and returning its ID.
- Enhancing the Schema Registry client to fetch a schema from the registry using a schema ID.
- Extending the CsvProcessor to include methods for serializing and publishing a CSV row to Kafka using a specified Avro schema.
Schema Registry Client
In this tutorial, we will implement the Schema Registry client using the Singleton pattern. The client will feature three primary methods: one for accessing the singleton instance, another for registering a JSON schema by name, and a third for retrieving the schema ID based on the schema name. Since this tutorial focuses solely on publishing data to Kafka and does not involve consuming data at this stage, we will not include functionality for fetching the actual schema using a schema ID. This aspect will be explored in the second part of the series.
Let’s start with the interface:
/* SchemaRegistry.h */
#ifndef SCHEMA_REGISTRY_H
#define SCHEMA_REGISTRY_H

#include <string>
#include <atomic>
#include <mutex>

#include <libserdes/serdescpp.h>
#include <libserdes/serdescpp-avro.h>

class SchemaRegistry
{
private:
    /* Starts out uninitialized; the constructor flips this to false
     * once libserdes has been configured successfully. */
    std::atomic<bool> m_uninitialized{true};
    Serdes::Avro *m_serdes = nullptr;

    SchemaRegistry(const std::string *h);
    static SchemaRegistry &instance_impl(const std::string *h);

public:
    SchemaRegistry(const SchemaRegistry &) = delete;
    void operator=(const SchemaRegistry &) = delete;

    static void init(const std::string h);
    static SchemaRegistry &instance();

    int fetch_value_schema_id(const std::string &schema_name);
    int register_value_schema(const std::string &schema_name, const std::string &schema_def);
};

#endif
We are utilizing libserdes to handle the integration with Avro and the registry components. Rest assured, a comprehensive README is included in the repository, providing step-by-step instructions for installation on macOS or Linux environments.
Let’s take a closer look at the actual implementation.
/* SchemaRegistry.cpp */
#include "SchemaRegistry.h"
#include "logging/Logging.h"

#include <thread>
#include <iostream>
#include <signal.h>
#include <unistd.h> /* getpid */
#include <sstream>

static const std::string name = "SchemaRegistry";

SchemaRegistry::SchemaRegistry(const std::string *h)
{
    if (h)
    {
        Serdes::Conf *m_sconf = Serdes::Conf::create();

        std::string errstr;
        if (m_sconf->set("schema.registry.url", *h, errstr))
        {
            Logging::ERROR("Configuration failed: " + errstr, name);
            kill(getpid(), SIGINT);
        }

        /* Default framing CP1 */
        if (m_sconf->set("serializer.framing", "cp1", errstr))
        {
            Logging::ERROR("Configuration failed: " + errstr, name);
            kill(getpid(), SIGINT);
        }

        m_serdes = Serdes::Avro::create(m_sconf, errstr);
        if (!m_serdes)
        {
            Logging::ERROR("Failed to create Serdes handle: " + errstr, name);
            kill(getpid(), SIGINT);
        }

        m_uninitialized.store(false);
    }
}

SchemaRegistry &SchemaRegistry::instance_impl(const std::string *h = nullptr)
{
    static SchemaRegistry i{h};
    return i;
}

void SchemaRegistry::init(const std::string h)
{
    instance_impl(&h);
}

SchemaRegistry &SchemaRegistry::instance()
{
    SchemaRegistry &i = instance_impl();
    if (i.m_uninitialized.load())
    {
        throw std::logic_error("SchemaRegistry was not initialized");
    }
    return i;
}

int SchemaRegistry::fetch_value_schema_id(const std::string &schema_name)
{
    if (!schema_name.empty())
    {
        std::string errstr;
        Serdes::Schema *schema = Serdes::Schema::get(m_serdes, schema_name + "-value", errstr);
        if (schema)
        {
            std::stringstream ss;
            ss << "Fetched schema: '" << schema->name() << "'"
               << ", id: " << schema->id();
            Logging::INFO(ss.str(), name);
            return schema->id();
        }
        else
        {
            std::stringstream ss;
            ss << "No schema with name '" << schema_name << "' found";
            Logging::INFO(ss.str(), name);
        }
    }
    return -1;
}

int SchemaRegistry::register_value_schema(const std::string &schema_name, const std::string &schema_def)
{
    std::string errstr;
    Serdes::Schema *schema = Serdes::Schema::add(m_serdes, schema_name + "-value", schema_def, errstr);
    if (schema)
    {
        std::stringstream ss;
        ss << "Registered schema: '" << schema->name() << "'"
           << ", id: " << schema->id();
        Logging::INFO(ss.str(), name);
        return schema->id();
    }
    else
    {
        /* 'schema' is null on failure, so report the libserdes error instead of dereferencing it */
        std::stringstream ss;
        ss << "Failed to register new schema: '" << schema_name << "'"
           << ", error: " << errstr
           << ", definition: " << schema_def;
        Logging::ERROR(ss.str(), name);
    }
    return -1;
}
In the Singleton implementation, I use a function-local static object, which is constructed on the first call to SchemaRegistry::instance_impl(), triggered via SchemaRegistry::init(). Function-local statics avoid static initialization order issues and are guaranteed to be initialized exactly once, even if multiple threads race to call the function. In the constructor, I configure libserdes with the necessary properties, such as schema.registry.url and the serializer framing.
The SchemaRegistry::register_value_schema(const std::string &, const std::string &) method accepts two parameters: a schema name and the schema itself as a JSON string. It invokes Serdes::Schema::add to submit the schema to the registry and returns the assigned schema ID. Its counterpart, SchemaRegistry::fetch_value_schema_id(const std::string &), accepts a schema name and invokes Serdes::Schema::get to look up the registered schema and return its ID.
To verify the fetched schema, you can simply print it:
#include <avro/Schema.hh>
#include <avro/ValidSchema.hh>
#include <iostream>
// ...
/* object() returns the schema object held by the Serdes schema handle */
avro::ValidSchema *avro_schema = static_cast<avro::ValidSchema *>(schema->object());
avro_schema->toJson(std::cout);
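Putting the client together, a typical call sequence looks roughly like the sketch below. The registry URL, topic name, and schema JSON are placeholders for illustration, not values from the repository.

/* Usage sketch -- the registry URL, topic name, and schema JSON are placeholders */
#include "SchemaRegistry.h"

int main()
{
    /* Must run once before any call to SchemaRegistry::instance() */
    SchemaRegistry::init("http://localhost:8081");

    const std::string schema_def =
        "{\"type\":\"record\",\"name\":\"example\","
        "\"fields\":[{\"name\":\"A\",\"type\":\"string\"}]}";

    /* Registers the schema under the subject "my-topic-value" and returns its ID */
    int id = SchemaRegistry::instance().register_value_schema("my-topic", schema_def);

    /* The ID can later be looked up again by subject name */
    int fetched_id = SchemaRegistry::instance().fetch_value_schema_id("my-topic");

    return (id != -1 && id == fetched_id) ? 0 : 1;
}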
That concludes the discussion on the SchemaRegistry. Next, we will explore how the CsvProcessor is extended to include methods for serializing and publishing a CSV row to Kafka using a specified Avro schema.
Serializing and Publishing a CSV Row to Kafka
To maintain focus and avoid cluttering the blog post with redundant details, I will not be including the full interface and implementation here, as these have already been covered in the previous post. Additionally, the complete code is available in the repository and is largely self-explanatory.
I will be focusing on two specific methods: ssize_t CsvProcessor::serialize(avro::ValidSchema, const int32_t, const avro::GenericDatum, std::vector<char> &, std::string &) and void CsvProcessor::publish(CSVRow &, size_t &). These methods are central to the functionality we’re discussing and will be explored in detail.
/* impl/CsvProcessor.cpp */
ssize_t CsvProcessor::serialize(avro::ValidSchema schema, const int32_t schema_id, const avro::GenericDatum datum, std::vector<char> &out, std::string &errstr)
{
    auto output_stream = avro::memoryOutputStream();
    auto encoder = avro::validatingEncoder(schema, avro::binaryEncoder());

    try
    {
        // Encode Avro datum to Avro binary format
        encoder->init(*output_stream.get());
        avro::encode(*encoder, datum);
        encoder->flush();
    }
    catch (const avro::Exception &e)
    {
        errstr = e.what();
        return -1;
    }

    // Extract written bytes
    auto encoded = avro::snapshot(*output_stream.get());

    // Write framing [<magic byte> <schema id> <bytesavro>]
    ssize_t framing_size = 5; // 1 for magic byte + 4 for schema id
    int pos = out.size();
    out.resize(out.size() + framing_size);
    char *payload = &out[pos];

    // Magic byte
    payload[0] = 0;

    // Schema ID in network byte order (htonl from <arpa/inet.h>, memcpy from <cstring>)
    int32_t id = htonl(schema_id);
    memcpy(payload + 1, &id, 4);

    // Append binary encoded Avro to output vector
    out.insert(out.end(), encoded->cbegin(), encoded->cend());

    return out.size();
}
The key aspect of the serialization and deserialization process is that the payload must be prepended with 5 bytes: 1 byte for the magic byte and 4 bytes for the schema ID. This ensures proper identification and handling of the data schema during processing.
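On the consuming side (covered in part two), this framing is peeled off before handing the rest of the buffer to the Avro decoder. Here is a minimal sketch of that step, assuming the buffer was produced by serialize() above:

/* Sketch: reading back the framing written by serialize() */
#include <arpa/inet.h> /* ntohl */
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

static int32_t extract_schema_id(const std::vector<char> &msg)
{
    if (msg.size() < 5 || msg[0] != 0)
        throw std::runtime_error("missing or invalid framing");

    /* Bytes 1..4 hold the schema ID in network byte order;
     * everything from msg.data() + 5 onward is the binary-encoded Avro payload. */
    int32_t id;
    std::memcpy(&id, msg.data() + 1, 4);
    return ntohl(id);
}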
Once the serialization is complete, the next step is straightforward: publishing the serialized message to Kafka.
/* impl/CsvProcessor.cpp */
void CsvProcessor::publish(CSVRow &row, size_t &old_count)
{
    // Create Avro record
    std::vector<char> out_data;
    for (auto &[topic, schema_config] : *m_schemas)
    {
        const avro::ValidSchema &schema = schema_config.schema;
        avro::GenericDatum datum(schema);
        avro::GenericRecord &record = datum.value<avro::GenericRecord>();

        for (const auto &field : schema_config.columns)
        {
            std::string field_name = field;
            auto name_it = schema_config.column_map.find(field);
            if (name_it != schema_config.column_map.end())
            {
                field_name.assign(name_it->second);
            }

            /* Check max age now that we got the new field name */
            if (m_max_age_config)
            {
                if (!field_name.compare(m_max_age_config->first))
                {
                    long event_timestamp = stol(row[field]);
                    time_t now = time(NULL);
                    int days_since_event = (now - event_timestamp) / (60 * 60 * 24);
                    if (days_since_event > m_max_age_config->second)
                    {
                        ++old_count;
                        return;
                    }
                }
            }

            std::string type = "string";
            auto type_it = schema_config.column_type_transforms.find(field);
            if (type_it != schema_config.column_type_transforms.end())
            {
                type.assign(type_it->second);
            }

            record.setFieldAt(record.fieldIndex(field_name), Util::create_datum_for_type(row[field], type));
        }

        // Register schema
        if (schema_config.schema_id == -1)
        {
            schema_config.schema_id = SchemaRegistry::instance().register_value_schema(topic, schema_config.schema.toJson());
        }

        std::string errstr;
        if (serialize(schema, schema_config.schema_id, datum, out_data, errstr) == -1)
        {
            Logging::ERROR("Avro serialization failed: " + errstr, m_name);
            kill(getpid(), SIGINT);
        }
        else
        {
        retry:
            RdKafka::ErrorCode err = m_kafka_producer->produce(topic,
                                                               RdKafka::Topic::PARTITION_UA,
                                                               RdKafka::Producer::RK_MSG_COPY,
                                                               /* Value */
                                                               out_data.data(),
                                                               out_data.size(),
                                                               /* Key */
                                                               row[schema_config.key_column].c_str(),
                                                               /* Key len */
                                                               row[schema_config.key_column].length(),
                                                               /* Timestamp (defaults to current time) */
                                                               0,
                                                               /* Message headers, if any */
                                                               NULL,
                                                               /* Per-message opaque value passed to
                                                                * delivery report */
                                                               NULL);
            if (err != RdKafka::ERR_NO_ERROR)
            {
                Logging::ERROR("Failed to produce to topic '" + topic + "': " + RdKafka::err2str(err), m_name);
                if (err == RdKafka::ERR__QUEUE_FULL)
                {
                    /* If the internal queue is full, wait for
                     * messages to be delivered and then retry.
                     * The internal queue represents both
                     * messages to be sent and messages that have
                     * been sent or failed, awaiting their
                     * delivery report callback to be called.
                     *
                     * The internal queue is limited by the
                     * configuration properties
                     * queue.buffering.max.messages and queue.buffering.max.kbytes */
                    m_kafka_producer->poll(1000 /*block for max 1000ms*/);
                    goto retry;
                }
            }
            else
            {
                Logging::DEBUG("Enqueued message (" + std::to_string(out_data.size()) + " bytes) for topic '" + topic + "'", m_name);
            }

            /* Wait for final messages to be delivered or fail.
             * flush() is an abstraction over poll() which
             * waits for all messages to be delivered. */
            Logging::DEBUG("Flushing final messages...", m_name);
            m_kafka_producer->flush(10 * 1000 /* wait for max 10 seconds */);

            if (m_kafka_producer->outq_len() > 0)
            {
                Logging::ERROR(std::to_string(m_kafka_producer->outq_len()) + " message(s) were not delivered", m_name);
            }
        }
    }
}
There’s quite a bit of code involved, more than what is strictly necessary for simply publishing to Kafka. However, since this is a fully-fledged, ready-to-go project that also handles CSV processing, I opted not to strip out the CSV handling and transformation logic. Instead, I’ll walk you through it, and you can decide whether you’d like to skip it and go directly to the m_kafka_producer->produce() part.
The first step is constructing an Avro record from the CSV row, using the schema defined in our YAML configuration. Keep in mind that if the schema is not already in our registry, it is submitted automatically; there is no need to create it in the registry by hand. You can think of the CSV row as a map, where each column value is easily accessible by the column name.
One transformation we allow is renaming CSV columns in our schema before publishing them to Kafka. To handle this, we need to check if any renaming steps are specified in our YAML configuration file, specifically under the column_map member of the SchemaConfig. For example, if a CSV column named “A” needs to be renamed to “MyA” in the Kafka message, this is where that change happens.
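For orientation, the SchemaConfig used throughout publish() simply bundles the per-topic settings read from the YAML file. Below is a simplified sketch of its shape, inferred from how it is used in the code above; the repository’s actual definition may differ.

/* Simplified sketch of the per-topic configuration; inferred from its usage
 * in publish(), the repository's actual SchemaConfig may hold more fields. */
#include <avro/ValidSchema.hh>
#include <map>
#include <string>
#include <vector>

struct SchemaConfig
{
    avro::ValidSchema schema;                                  // Avro schema for this topic
    std::vector<std::string> columns;                          // CSV columns to publish
    std::map<std::string, std::string> column_map;             // CSV column -> renamed field ("A" -> "MyA")
    std::map<std::string, std::string> column_type_transforms; // CSV column -> Avro type ("int", "long", ...)
    std::string key_column;                                    // CSV column used as the Kafka message key
    int schema_id = -1;                                        // Set after the schema is registered
};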
We also need to check the timestamp field, assuming the CSV file contains one. We discard rows that are considered too old, with the maximum age configured in our input YAML configuration file using the max_age property. Rows that exceed the age threshold are ignored, and we keep track of the count of discarded rows.
After applying the necessary transformations and validations, we set the type for each field as per the YAML configuration. Once the schema is finalized, we check if it already exists in the registry. If it doesn’t, we submit the schema using the methods we’ve previously discussed.
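The Util::create_datum_for_type() helper used above is not reproduced here; conceptually it turns a raw CSV string plus a configured type name into an avro::GenericDatum, roughly along these lines (a simplified sketch, not the repository’s exact implementation):

/* Simplified sketch of a create_datum_for_type helper; the repository's
 * Util implementation may handle more types and error cases. */
#include <avro/GenericDatum.hh>
#include <cstdint>
#include <string>

static avro::GenericDatum create_datum_for_type(const std::string &value, const std::string &type)
{
    if (type == "int")
        return avro::GenericDatum(static_cast<int32_t>(std::stoi(value)));
    if (type == "long")
        return avro::GenericDatum(static_cast<int64_t>(std::stol(value)));
    if (type == "double")
        return avro::GenericDatum(std::stod(value));
    if (type == "boolean")
        return avro::GenericDatum(value == "true" || value == "1");

    /* Default: keep the CSV value as a string */
    return avro::GenericDatum(value);
}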
Finally, we serialize the data using the schema and schema ID, and then publish it to Kafka using the RdKafka::Producer.
I won’t go into the details of how we instantiate the RdKafka::Producer, as that is handled in main.cpp. However, here’s a brief glimpse of how the setup would look:
/* main.cpp */
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

std::string errstr;
if (conf->set("bootstrap.servers", kafka_config["bootstrap.servers"], errstr) != RdKafka::Conf::CONF_OK)
{
    Logging::ERROR(errstr, name);
    kill(getpid(), SIGINT);
}

if (conf->set("client.id", kafka_config["client.id"], errstr) != RdKafka::Conf::CONF_OK)
{
    Logging::ERROR(errstr, name);
    kill(getpid(), SIGINT);
}

/* Set the delivery report callback.
 * This callback will be called once per message to inform
 * the application if delivery succeeded or failed.
 * See dr_msg_cb() above.
 * The callback is only triggered from ::poll() and ::flush().
 *
 * IMPORTANT:
 * Make sure the DeliveryReport instance outlives the Producer object,
 * either by putting it on the heap or as in this case as a stack variable
 * that will NOT go out of scope for the duration of the Producer object.
 */
KafkaDeliveryReportCb ex_dr_cb;
if (conf->set("dr_cb", &ex_dr_cb, errstr) != RdKafka::Conf::CONF_OK)
{
    Logging::ERROR(errstr, name);
    kill(getpid(), SIGINT);
}

RdKafka::Producer *kafka_producer = RdKafka::Producer::create(conf, errstr);
if (!kafka_producer)
{
    Logging::ERROR("Failed to create Kafka producer: " + errstr, name);
    kill(getpid(), SIGINT);
}
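The KafkaDeliveryReportCb referenced above derives from RdKafka::DeliveryReportCb. I won’t reproduce the repository’s version here, but a minimal sketch looks like this (the log messages are illustrative):

/* Minimal sketch of a delivery report callback; the repository's
 * KafkaDeliveryReportCb may differ in what it logs. */
class KafkaDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
    void dr_cb(RdKafka::Message &message) override
    {
        if (message.err())
            Logging::ERROR("Delivery failed: " + message.errstr(), "KafkaDeliveryReportCb");
        else
            Logging::DEBUG("Delivered to " + message.topic_name() +
                           " [" + std::to_string(message.partition()) + "] at offset " +
                           std::to_string(message.offset()), "KafkaDeliveryReportCb");
    }
};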
That concludes this part of the series. For a more comprehensive understanding of the code, feel free to check out the repository. Enjoy exploring the project!