Module 20 — Kafka Consumer (Policy Sync)

Requires librdkafka. Standalone demo runs without a broker.

What you learn

How the DP application Kafka consumer receives policy updates from the PM (Provisioning Module) and applies them to the in-memory domain tables — the SYNC_COMPLETE atomic swap protocol, manual offset commit, partition rebalance callbacks, and the RCU QSBR write-side pattern.


Build and run

make

# Standalone demo (no Kafka broker needed):
./kafka_consumer --demo

# With a real Kafka broker:
./kafka_consumer localhost:9092 policy_updates

Standalone output:

=== Module 20: Kafka Consumer (Policy Sync) ===

  [BEGIN_SYNC]    group=enterprise_a — building pending table
  [ADD_DOMAIN]    group=enterprise_a  domain=google.com        action=whitelist
  [ADD_DOMAIN]    group=enterprise_a  domain=facebook.com      action=blacklist
  ...
  [SYNC_COMPLETE] group=enterprise_a — 5 domains now active

Policy lookup verification:
  [enterprise_a] google.com           → WHITELIST
  [enterprise_a] facebook.com         → BLACKLIST
  [enterprise_a] youtube.com          → NOT FOUND (removed)

The SYNC_COMPLETE protocol — why it matters

WITHOUT SYNC_COMPLETE (naive immediate apply):

  Main lcore applies ADD_DOMAIN(google.com=whitelist)
  Worker lcore processes tracker.com — MISS (not yet in table)
  → tracker.com is ALLOWED when it should be blocked
  This is a race condition: partial policy state visible to workers.

WITH SYNC_COMPLETE (buffered apply):

  BEGIN_SYNC: all updates go to pending table (invisible to workers)
  ADD_DOMAIN(google.com=whitelist)  → pending
  ADD_DOMAIN(tracker.com=blacklist) → pending
  ...
  SYNC_COMPLETE:
    1. Build new Hyperscan DB from pending domains
    2. atomic_swap(active_table ← pending_table)
    3. RCU synchronize (wait for all workers to quiesce)
    4. free(old_active_table)

  Workers only ever see either the full old policy or the full new policy.

Where this fits in the real application

the DP application main lcore control loop:
  while (running) {
      rd_kafka_poll(cdr_producer, 0);         ← Module 19

      msg = rd_kafka_consumer_poll(policy_consumer, 100);
      if (msg) {
          apply_policy_message(parse(msg));   ← THIS MODULE
          if (SYNC_COMPLETE)
              rd_kafka_commit_message(consumer, msg, 0);
      }

      cdr_batch_flush_if_timeout(&batches);   ← Module 19
      print_stats();
  }

Files

File Purpose
kafka_consumer.c Full consumer: SYNC_COMPLETE protocol, RCU pattern, partition rebalance
Makefile Links with -lrdkafka

Key concepts

1. auto.offset.reset = "earliest" for policy topics

{ "auto.offset.reset", "earliest" }

On DP restart, the consumer re-reads all policy messages from the beginning of the topic, rebuilding domain tables from scratch.

"latest" would miss all policies published before the restart, leaving domain tables empty until the next full sync.

2. enable.auto.commit = false — manual commit

{ "enable.auto.commit", "false" }

We commit ONLY at SYNC_COMPLETE:

if (pmsg.type == MSG_SYNC_COMPLETE) {
    rd_kafka_commit_message(rk, msg, 0);  /* sync commit */
}

A crash before SYNC_COMPLETE replays from BEGIN_SYNC — ensuring the entire sync unit is re-applied atomically.

3. Atomic swap + RCU QSBR (the real write-side pattern)

/* What happens at SYNC_COMPLETE in the real DP application app: */

/* Step 1: build pending Hyperscan DB from pending domain table */
hs_db_compile_for_groups(group);

/* Step 2: atomic pointer swap */
rte_atomic64_set(
    (rte_atomic64_t *)&group->domain_details_table,
    (int64_t)pending_rte_hash_table
);

/* Step 3: wait for all readers to quiesce */
rte_rcu_qsbr_synchronize(qsbr_var, RTE_QSBR_THRID_INVALID);

/* Step 4: free old table */
rte_hash_free(old_table);

Workers must call rte_rcu_qsbr_quiescent() regularly in their poll loop (between bursts). This is the RCU QSBR design — the writer side waits for all readers to checkpoint before freeing the old data.

4. Partition rebalance callback — must be registered

rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
        rd_kafka_assign(rk, partitions);
    else
        rd_kafka_assign(rk, NULL);
}

Without this callback, the consumer never actually starts receiving messages. This silent failure is a common bug when first using librdkafka.

5. rd_kafka_consumer_close() — graceful shutdown

/* CORRECT: close consumer (sends final offset commit, leaves group) */
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);

Without rd_kafka_consumer_close(), the broker waits for session.timeout.ms (30s) before reassigning partitions to another consumer — blocking policy updates to other DP instances.


Message format (JSON from PM)

{"type":"BEGIN_SYNC",   "group_id":"enterprise_a"}
{"type":"ADD_DOMAIN",   "group_id":"enterprise_a",
 "domain":"blocked.com","action":"blacklist","category":2}
{"type":"SYNC_COMPLETE","group_id":"enterprise_a"}
{"type":"ADD_MALICIOUS","domain":"c2.evil.net","category":4,"confidence":98}
type Effect
BEGIN_SYNC Start buffering into pending table
ADD_DOMAIN Add domain to pending (or active if no sync)
REMOVE_DOMAIN Remove from active table immediately
SYNC_COMPLETE Atomic swap pending→active + commit offset
ADD_MALICIOUS Add to global malicious table (from IDPS feed)

Next module

Module 21 — Full Pipeline (Annotated Assembly): A single annotated file showing how all 20 modules connect into a complete dataplane application.


Source files

File Download
kafka_consumer.c kafka_consumer.c
Makefile Makefile