Module 19 — Kafka Producer (CDR Export)
Requires librdkafka. The full demo also needs a running Kafka broker.
What you learn
How to implement a Kafka producer in C using librdkafka for CDR
(Charging Data Record) export — the pattern used in the DP application to publish
every policy decision to a Kafka topic for downstream billing, analytics,
and SIEM systems.
Covers producer config, delivery report callbacks, per-worker CDR batching, timer-based flush, back-pressure handling, and graceful shutdown with flush.
Kafka in the DP application architecture
the DP application (this module: producer)
│
│ rd_kafka_produce() CDRs → Kafka broker
│ Topic: dp_cdr
│
├── Consumer: Billing microservice (charges subscribers)
├── Consumer: Analytics (traffic patterns, blocked categories)
└── Consumer: SIEM (security incidents from sinkhole events)
the DP application (Module 20: consumer)
│
│ rd_kafka_consumer_poll() ← Kafka broker
│ Topic: policy_updates (from the Provisioning Module)
│
└── Update domain_details_table, recompile Hyperscan DB
Setup
# Install librdkafka
dnf install librdkafka librdkafka-devel # RedHat/Rocky
apt-get install librdkafka-dev # Ubuntu
# Start a test Kafka broker (Docker)
docker run -d -p 9092:9092 --name kafka apache/kafka:3.7.0
# Build and run
make
./kafka_producer localhost:9092 dp_cdr
Files
| File | Purpose |
|---|---|
| kafka_producer.c | Full producer: config, CDR struct, batch, delivery callback, shutdown |
| Makefile | Links with -lrdkafka (pkg-config aware) |
Key concepts
1. Delivery report callback — the only true confirmation
rd_kafka_conf_set_dr_msg_cb(conf, delivery_report_cb);
static void delivery_report_cb(rd_kafka_t *rk,
const rd_kafka_message_t *msg, void *opaque)
{
if (msg->err) {
LOG_WARN("CDR delivery failed: %s", rd_kafka_err2str(msg->err));
} else {
/* msg->offset: broker confirmed receipt at this offset */
}
}
rd_kafka_produce() returning 0 means the message entered the local queue.
The broker has NOT received it yet. The delivery callback fires (via
rd_kafka_poll()) only after the broker ACKs the message.
2. rd_kafka_poll() — must be called regularly
/* In the main lcore control loop (every 100ms): */
rd_kafka_poll(g_producer, 0); /* 0 = non-blocking, process pending callbacks */
Without rd_kafka_poll(), the delivery callback never fires, even if the
broker has acknowledged every message. Delivery reports pile up unserved, the
local queue fills, and subsequent rd_kafka_produce() calls fail with
RD_KAFKA_RESP_ERR__QUEUE_FULL.
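One way to keep the poll cadence explicit in a busy control loop is a small elapsed-time check. The sketch below is illustrative only: `interval_due` is a hypothetical helper, not part of librdkafka or of kafka_producer.c, and the caller is assumed to supply a monotonic millisecond timestamp.

```c
#include <stdint.h>

/* Hypothetical helper: returns 1 (and restarts the interval) when at least
 * interval_ms has elapsed since *last_ms, otherwise returns 0.
 * now_ms is assumed to come from a monotonic clock. */
static int interval_due(uint64_t now_ms, uint64_t *last_ms, uint64_t interval_ms)
{
    if (now_ms - *last_ms < interval_ms)
        return 0;
    *last_ms = now_ms;
    return 1;
}
```

In the control loop this would guard the call shown above: when `interval_due(now, &last_poll_ms, 100)` returns 1, call rd_kafka_poll(g_producer, 0).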
3. Batching — reducing per-message overhead
With CDR_BATCH_MAX=256:
Accumulate 256 records → burst to Kafka queue in one tight loop
Timer-based flush (CDR_FLUSH_MS=100):
Even at low traffic (10 CDRs/sec):
100ms timer fires → flush whatever has accumulated (about 1 record at this rate) → no CDR waits in the batch longer than ~100ms
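The two flush triggers (batch full, batch aged out) can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in kafka_producer.c: `struct cdr_item`, `struct cdr_batch`, the sink callback, and the counting sink are all hypothetical names, and timestamps are passed in by the caller. In a real producer the sink would loop over the records and enqueue each one.

```c
#include <stddef.h>
#include <stdint.h>

#define CDR_BATCH_MAX 256   /* size threshold from the text */
#define CDR_FLUSH_MS  100   /* age threshold from the text */

/* Hypothetical trimmed-down record; the real CDR struct has more fields. */
struct cdr_item {
    uint64_t ts_ms;
    uint32_t subscriber_ip;
};

/* Hypothetical sink: receives a full or partial batch. */
typedef void (*cdr_sink_fn)(const struct cdr_item *recs, size_t n, void *ctx);

struct cdr_batch {
    struct cdr_item recs[CDR_BATCH_MAX];
    size_t   count;
    uint64_t oldest_ms;   /* arrival time of recs[0]; valid when count > 0 */
};

/* Hand the buffered records to the sink and reset the batch. */
static void cdr_batch_flush(struct cdr_batch *b, cdr_sink_fn sink, void *ctx)
{
    if (b->count == 0)
        return;
    sink(b->recs, b->count, ctx);
    b->count = 0;
}

/* Append one record; flush immediately once the batch is full. */
static void cdr_batch_add(struct cdr_batch *b, const struct cdr_item *rec,
                          uint64_t now_ms, cdr_sink_fn sink, void *ctx)
{
    if (b->count == 0)
        b->oldest_ms = now_ms;
    b->recs[b->count++] = *rec;
    if (b->count == CDR_BATCH_MAX)
        cdr_batch_flush(b, sink, ctx);
}

/* Call from the periodic timer; flushes a partial batch that has aged out. */
static void cdr_batch_tick(struct cdr_batch *b, uint64_t now_ms,
                           cdr_sink_fn sink, void *ctx)
{
    if (b->count > 0 && now_ms - b->oldest_ms >= CDR_FLUSH_MS)
        cdr_batch_flush(b, sink, ctx);
}

/* Example sink for illustration: tallies flushes and records. */
struct cdr_sink_stats { size_t flushes; size_t records; };

static void cdr_counting_sink(const struct cdr_item *recs, size_t n, void *ctx)
{
    struct cdr_sink_stats *s = ctx;
    (void)recs;
    s->flushes++;
    s->records += n;
}
```

Keeping one `struct cdr_batch` per worker avoids locking on the hot path; only the sink touches shared producer state.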
4. Message key — per-subscriber ordering
uint32_t key_ip = htonl(rec->subscriber_ip);
rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
payload, len,
&key_ip, sizeof(key_ip), /* KEY */
NULL);
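Kafka assigns a keyed message to a partition by hashing the key. Using the subscriber IP as the key means all CDRs for one subscriber land in the same partition, where Kafka preserves their order, as long as the topic's partition count does not change. Retries can still reorder messages unless max.in.flight is 1 or enable.idempotence is on.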
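To make the key-to-partition relationship concrete, here is a small sketch of "partition = hash(key) mod partition count". The FNV-1a hash is a stand-in chosen for brevity and is not the hash librdkafka applies; the real one is selected by librdkafka's partitioner configuration. The function names are hypothetical.

```c
#include <arpa/inet.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Serialize a host-order IPv4 address into a 4-byte big-endian key, so that
 * producers on hosts with different endianness emit identical key bytes. */
static void cdr_make_key(uint32_t subscriber_ip_host, uint8_t key[4])
{
    uint32_t be = htonl(subscriber_ip_host);
    memcpy(key, &be, sizeof(be));
}

/* Stand-in hash (FNV-1a, 32-bit). NOT the hash librdkafka uses. */
static uint32_t fnv1a32(const uint8_t *p, size_t n)
{
    uint32_t h = 2166136261u;
    for (size_t i = 0; i < n; i++) {
        h ^= p[i];
        h *= 16777619u;
    }
    return h;
}

/* Map a subscriber to a partition; partition_cnt must be > 0. */
static int32_t cdr_pick_partition(uint32_t subscriber_ip_host, int32_t partition_cnt)
{
    uint8_t key[4];
    cdr_make_key(subscriber_ip_host, key);
    return (int32_t)(fnv1a32(key, sizeof(key)) % (uint32_t)partition_cnt);
}
```

Because the result depends only on the key bytes and the partition count, every CDR for 192.168.1.42 maps to the same partition until the topic is repartitioned.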
5. RD_KAFKA_MSG_F_COPY vs RD_KAFKA_MSG_F_FREE
RD_KAFKA_MSG_F_COPY: librdkafka copies the payload, so you can reuse the buffer immediately
RD_KAFKA_MSG_F_FREE: librdkafka takes ownership and calls free() on the payload when it is done with the message, so the payload must be heap-allocated
In the DP application, MSG_F_COPY is used because the JSON buffer is stack-allocated
and reused for each CDR.
6. Back-pressure: QUEUE_FULL
if (err == -1 && rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
rd_kafka_poll(producer, 100); /* block up to 100ms, serve callbacks, free slots */
/* retry once */
}
In the DP application, CDR loss is acceptable (< 0.01% target). If the queue stays full, CDRs are dropped rather than blocking the packet processing pipeline.
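The retry-once-then-drop policy can be expressed independently of the Kafka client. The sketch below is an assumption-laden illustration: `struct cdr_producer_ops` is a hypothetical seam (in a real build `try_enqueue` would wrap rd_kafka_produce() and `drain` would wrap rd_kafka_poll()), and the fake queue exists only to exercise the logic without a broker.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical seam over the producer. try_enqueue returns 0 on success and
 * -1 when the local queue is full; drain serves callbacks for up to
 * timeout_ms so that queue slots can be released. */
struct cdr_producer_ops {
    int  (*try_enqueue)(void *ctx, const void *payload, size_t len);
    void (*drain)(void *ctx, int timeout_ms);
    void *ctx;
};

struct cdr_send_stats {
    uint64_t sent;
    uint64_t dropped;
};

/* Enqueue one CDR; on a full queue, drain once and retry once, then drop.
 * Returns 0 if enqueued, -1 if dropped. There is no loop, so the added
 * latency is bounded by a single drain timeout. */
static int cdr_send(const struct cdr_producer_ops *ops, const void *payload,
                    size_t len, struct cdr_send_stats *st)
{
    if (ops->try_enqueue(ops->ctx, payload, len) == 0) { st->sent++; return 0; }
    ops->drain(ops->ctx, 100);
    if (ops->try_enqueue(ops->ctx, payload, len) == 0) { st->sent++; return 0; }
    st->dropped++;
    return -1;
}

/* Loss ratio over everything offered so far; 0.0 when nothing was offered. */
static double cdr_loss_ratio(const struct cdr_send_stats *st)
{
    uint64_t total = st->sent + st->dropped;
    return total ? (double)st->dropped / (double)total : 0.0;
}

/* In-memory stand-in for the producer queue, for dry runs only. */
struct fake_queue { int free_slots; int slots_freed_by_drain; };

static int fake_enqueue(void *ctx, const void *payload, size_t len)
{
    struct fake_queue *q = ctx;
    (void)payload; (void)len;
    if (q->free_slots == 0)
        return -1;
    q->free_slots--;
    return 0;
}

static void fake_drain(void *ctx, int timeout_ms)
{
    struct fake_queue *q = ctx;
    (void)timeout_ms;
    q->free_slots += q->slots_freed_by_drain;
}
```

Comparing `cdr_loss_ratio()` against the 0.0001 (0.01%) target gives a simple alarm condition for sustained back-pressure.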
7. Graceful shutdown
/* WRONG: destroy immediately — queued CDRs lost */
rd_kafka_destroy(producer);
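/* CORRECT: flush first, then report anything still queued */
rd_kafka_flush(producer, 5000); /* wait up to 5s for delivery */
if (rd_kafka_outq_len(producer) > 0)
LOG_WARN("%d CDRs undelivered at shutdown", rd_kafka_outq_len(producer));
rd_kafka_topic_destroy(topic);
rd_kafka_destroy(producer);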
CDR JSON format
{
"ts": 1717430400123,
"ip": "192.168.1.42",
"subscriber_id": "subscriber-42",
"domain": "malware.ru",
"qtype": 1,
"action": "sinkhole",
"group": 3,
"category": "0x00000001"
}
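A record of this shape can be rendered into a reusable stack buffer with snprintf. The following is a sketch rather than the serializer in kafka_producer.c: the `struct cdr_record` layout and `cdr_to_json` are hypothetical, the output is compact (no whitespace) rather than pretty-printed, and the string fields are assumed to need no JSON escaping.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical in-memory CDR; field names mirror the JSON keys above. */
struct cdr_record {
    uint64_t    ts_ms;          /* epoch milliseconds */
    uint32_t    subscriber_ip;  /* IPv4, host byte order */
    const char *subscriber_id;
    const char *domain;
    uint16_t    qtype;          /* DNS query type, 1 = A */
    const char *action;
    uint32_t    group;
    uint32_t    category;       /* bitmask, rendered as 8 hex digits */
};

/* Render one CDR as compact JSON into buf.
 * Returns the number of bytes written (excluding the terminator),
 * or -1 if buf is too small or formatting fails.
 * Assumes the string fields contain no characters that need escaping. */
static int cdr_to_json(const struct cdr_record *r, char *buf, size_t cap)
{
    uint32_t ip = r->subscriber_ip;
    int n = snprintf(buf, cap,
        "{\"ts\":%" PRIu64 ",\"ip\":\"%u.%u.%u.%u\","
        "\"subscriber_id\":\"%s\",\"domain\":\"%s\",\"qtype\":%u,"
        "\"action\":\"%s\",\"group\":%" PRIu32
        ",\"category\":\"0x%08" PRIx32 "\"}",
        r->ts_ms,
        (unsigned)(ip >> 24), (unsigned)((ip >> 16) & 0xff),
        (unsigned)((ip >> 8) & 0xff), (unsigned)(ip & 0xff),
        r->subscriber_id, r->domain, (unsigned)r->qtype,
        r->action, r->group, r->category);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```

Returning -1 on truncation lets the caller drop an oversized record instead of publishing malformed JSON.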
Kafka config reference
| Property | DP application value | Effect |
|---|---|---|
| bootstrap.servers | from config | Broker address(es) |
| acks | "1" | Leader ACK only (speed over durability) |
| linger.ms | 5 | Wait up to 5ms to fill a batch |
| compression.type | snappy | Reduce network + disk by ~3-5× |
| queue.buffering.max.messages | 100000 | Local queue limit; beyond it rd_kafka_produce() fails with QUEUE_FULL and the CDR is dropped |
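One way to apply these properties is to keep them in a table and stop at the first rejected value. The sketch below assumes a hypothetical setter seam, `conf_set_fn`, so that it compiles without librdkafka; a thin wrapper that calls rd_kafka_conf_set() and maps RD_KAFKA_CONF_OK to 0 would fit that shape. The dry-run setter is illustrative only.

```c
#include <stddef.h>
#include <stdio.h>

struct kv { const char *name; const char *value; };

/* Values from the table above; bootstrap.servers is supplied at runtime. */
static const struct kv cdr_producer_props[] = {
    { "acks",                         "1"      },
    { "linger.ms",                    "5"      },
    { "compression.type",             "snappy" },
    { "queue.buffering.max.messages", "100000" },
};

/* Hypothetical setter seam: returns 0 on success; on failure returns
 * non-zero and writes a human-readable reason into errstr. */
typedef int (*conf_set_fn)(void *conf, const char *name, const char *value,
                           char *errstr, size_t errstr_size);

/* Apply bootstrap.servers plus the fixed properties, stopping at the first
 * error. Returns 0 on success, -1 on failure (errstr holds the reason). */
static int cdr_apply_config(void *conf, const char *brokers, conf_set_fn set,
                            char *errstr, size_t errstr_size)
{
    size_t n = sizeof(cdr_producer_props) / sizeof(cdr_producer_props[0]);

    if (set(conf, "bootstrap.servers", brokers, errstr, errstr_size) != 0)
        return -1;
    for (size_t i = 0; i < n; i++) {
        if (set(conf, cdr_producer_props[i].name, cdr_producer_props[i].value,
                errstr, errstr_size) != 0)
            return -1;
    }
    return 0;
}

/* Stand-in setter for dry runs: treats conf as an int call counter and
 * rejects empty values. */
static int conf_set_dry_run(void *conf, const char *name, const char *value,
                            char *errstr, size_t errstr_size)
{
    int *calls = conf;
    if (value == NULL || value[0] == '\0') {
        snprintf(errstr, errstr_size, "empty value for %s", name);
        return -1;
    }
    (*calls)++;
    return 0;
}
```

Failing fast on a bad property surfaces configuration mistakes at startup instead of leaving the producer running with silently different defaults.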
Next module
Module 20 — Kafka Consumer (Policy Sync): The Kafka consumer that receives policy updates from the Provisioning Module (PM) and applies them to the in-memory data plane.
Source files
| File | Download |
|---|---|
| kafka_producer.c | kafka_producer.c |
| Makefile | Makefile |