M13 — Event-Driven Architecture
Phase 5
Kafka internals & delivery semantics · RabbitMQ exchange patterns · Saga & Outbox · CQRS & Event Sourcing · Idempotent consumers · C/librdkafka implementations
🎯 Why Event-Driven Architecture?
Synchronous request/response creates temporal coupling — the caller blocks until the callee responds, and a slow callee cascades latency. Event-driven architecture severs this coupling: producers emit events without knowing who consumes them, and consumers process at their own pace.
Key motivations:
- Temporal decoupling: producer and consumer run independently — if the consumer is down, events buffer in the broker
- Fanout: one event reaches multiple consumers simultaneously (notifications, analytics, search indexing — all from one order-placed event)
- Audit log / replay: full event history is replayable — rebuild any read model from scratch, debug production issues with real data
- Reduces synchronous blocking chains: checkout doesn't wait for email service; email service consumes the event asynchronously
- Enables eventual consistency: services converge on consistent state over time, trading strong consistency for availability and partition tolerance
Analogy — Event streaming vs work queue:
A work queue (RabbitMQ) is like a restaurant ticket system — a ticket is torn off by one chef, cooked, and discarded. A log-based stream (Kafka) is like a newspaper printing press — each edition is stamped with a page number (offset), archived forever, and any subscriber can re-read any past edition at any time.
📦 Message Queue Model (RabbitMQ)
- Message consumed → deleted from queue
- Push-based delivery to consumers
- Competing consumers share load
- At-most-once or at-least-once via ACK
- Dead-letter exchange handles failures
- Complex routing via exchanges
- Great for: task queues, RPC, work distribution
📜 Event Stream Model (Kafka)
- Messages retained for configurable period
- Pull-based: consumers control their pace
- Consumer groups: each partition → one consumer
- Exactly-once via idempotent producer + transactions
- Any consumer can replay from any offset
- Ordered within partition
- Great for: event sourcing, audit log, stream processing
📐 Pattern Landscape
| Pattern | Problem Solved | Trade-off |
|---|---|---|
| Outbox | Atomic: DB write + event publish in one transaction | Extra table + relay process |
| Saga (Orchestration) | Multi-service transactions without 2PC | Central coordinator = SPOF risk |
| Saga (Choreography) | Distributed coordination via events | Hard to trace overall flow |
| CQRS | Separate write model from read model | Eventual consistency on reads |
| Event Sourcing | State as immutable event log | Complex queries, snapshot management |
| Idempotent Consumer | Handle at-least-once delivery safely | Dedup table storage + lookup cost |
🗺️ Phase 5 Module Map
| Module | Topic | Key Concepts |
|---|---|---|
| M13 (this) | Event-Driven Architecture | Kafka, RabbitMQ, Saga, Outbox, CQRS, Event Sourcing |
| M14 | Stream Processing | Kafka Streams, Flink windowing, stateful operators, exactly-once |
Prerequisites: Phase 2 (databases — you need to understand transactions for the Outbox pattern), Phase 4 (concurrency — consumer thread pools, back-pressure)
⚙️ Kafka Architecture — Internals
Kafka's fundamental unit is the topic — a logical feed of records. Topics are split into partitions, each an ordered, immutable append-only log on disk.
Topic: orders
(3 partitions, replication-factor=2)
Partition 0 [offset 0][offset 1][offset 2]...[offset 847] ← append-only
Partition 1 [offset 0][offset 1]...[offset 391]
Partition 2 [offset 0]...[offset 1203]
┌─── Broker 1 (leader P0, follower P1) ───┐
│ Broker 2 (leader P1, follower P2) │
│ Broker 3 (leader P2, follower P0) │
└─────────────────────────────────────────┘
Producer → assigns partition by: hash(key) % num_partitions (key set)
or: round-robin (key=null)
Consumer Group A: consumer-A1 reads P0, consumer-A2 reads P1, consumer-A3 reads P2
Consumer Group B: consumer-B1 reads P0+P1, consumer-B2 reads P2 (independent read positions)
🔑 Partition Key Assignment
- Key set: `partition = murmur2(key) % num_partitions` — all events for the same key go to the same partition → ordering guaranteed per key
- Key null: sticky partitioner (batch to one partition, rotate after batch) — maximizes throughput
- Custom partitioner: implement the `Partitioner` interface for business logic (e.g., tenant-based routing — see the sketch after this list)
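In librdkafka, a custom partitioner is a callback rather than a Java `Partitioner` class. A minimal sketch — the `tenant:order_id` key format and the djb2 hash are illustrative assumptions, not part of the client:
/* Sketch: tenant-based custom partitioner for librdkafka */
#include <librdkafka/rdkafka.h>
#include <stdint.h>
#include <string.h>
static int32_t tenant_partitioner(const rd_kafka_topic_t *rkt,
                                  const void *keydata, size_t keylen,
                                  int32_t partition_cnt,
                                  void *rkt_opaque, void *msg_opaque) {
    (void)rkt; (void)rkt_opaque; (void)msg_opaque;
    if (!keydata || partition_cnt <= 0)
        return 0;
    const char *key = (const char *)keydata;
    size_t n = keylen;
    const char *colon = memchr(key, ':', keylen);
    if (colon)
        n = (size_t)(colon - key);   /* hash only the tenant prefix */
    uint32_t h = 5381;               /* djb2 — illustrative choice */
    for (size_t i = 0; i < n; i++)
        h = h * 33u + (unsigned char)key[i];
    return (int32_t)(h % (uint32_t)partition_cnt);
}
/* Registration (topic-level config):
 * rd_kafka_topic_conf_set_partitioner_cb(tconf, tenant_partitioner); */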
Hot partition: if 10% of keys account for 80% of traffic, their partition becomes a bottleneck. Spread hot keys with a suffix: `order_id + "_" + random(0,N)`.
🔄 Consumer Group Rebalancing
When a consumer joins or leaves, the group coordinator (a broker) triggers a rebalance:
- Eager rebalance (stop-the-world): all consumers stop, revoke all partitions, reassign — processing pauses
- Cooperative rebalance (incremental): only affected partitions are moved; others keep processing — reduces pause
Set `partition.assignment.strategy = CooperativeStickyAssignor` for incremental rebalancing. `session.timeout.ms`: consumer must send a heartbeat within this window or be considered dead → rebalance.
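The Java assignor name above maps to a strategy string in librdkafka; a minimal sketch (the 45s timeout is an illustrative value):
/* Sketch: cooperative rebalancing in librdkafka — the Java
   CooperativeStickyAssignor corresponds to "cooperative-sticky" here */
rd_kafka_conf_set(conf, "partition.assignment.strategy",
                  "cooperative-sticky", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "session.timeout.ms", "45000",
                  errstr, sizeof(errstr));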
📊 ISR — In-Sync Replicas
Each partition has one leader (handles all reads/writes) and N follower replicas that pull from the leader. The ISR set tracks which replicas are caught up within `replica.lag.time.max.ms`.
| Setting | Meaning | Trade-off |
|---|---|---|
| `acks=0` | Producer doesn't wait for any ack | Fastest, may lose messages |
| `acks=1` | Leader acknowledges write | Lost if leader fails before replication |
| `acks=all` (`-1`) | All ISR members acknowledge | Slowest, strongest durability |
| `min.insync.replicas=2` | Minimum ISR size for `acks=all` to succeed | Prevents silent data loss with small ISR |
Production recommendation: `acks=all` + `min.insync.replicas=2` + `replication.factor=3` — survives one broker failure without data loss.
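As a sketch, the topic-level side of these settings at creation time (broker address and topic name are placeholders):
# Create a durable topic: RF=3, min ISR=2
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic orders --partitions 3 --replication-factor 3 \
  --config min.insync.replicas=2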
📍 Offset Management
Each consumer group tracks its position (offset) per partition in the `__consumer_offsets` internal topic.
Commit strategies:
- Auto-commit (`enable.auto.commit=true`): commits periodically. Risk: commit before processing → data loss on crash; process but don't commit → reprocess on restart
- Manual sync commit (`commitSync()`): blocks until broker confirms. Safe but slower
- Manual async commit (`commitAsync()`): fire-and-forget. Higher throughput, but retry on failure is tricky (a stale offset may overwrite a newer commit)
- Best practice: manual sync commit after batch processing; async for throughput-critical paths with idempotent processing
Log compaction: Kafka can compact topics (`cleanup.policy=compact`) — keeps only the latest value per key, discards older records. Used for changelogs (KTable in Kafka Streams).
📐 Kafka Producer Batching & Compression
| Config | Effect | Tuning Guidance |
|---|---|---|
| `batch.size` | Max bytes per batch per partition | Increase (e.g. 64KB) for throughput |
| `linger.ms` | Wait up to N ms to fill a batch | 5–50ms trades latency for throughput |
| `compression.type` | snappy/lz4/zstd per batch | zstd for best ratio, lz4 for speed |
| `buffer.memory` | Total producer buffer bytes | Increase if producers block frequently |
| `max.in.flight.requests.per.connection` | Concurrent unacked requests | Set to 1 for strict ordering (without idempotence) |
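A sketch of throughput-oriented tuning with librdkafka — the values are illustrative, not universal recommendations:
/* Throughput-oriented producer tuning (illustrative values) */
rd_kafka_conf_set(conf, "batch.size", "65536", errstr, sizeof(errstr));        /* 64KB */
rd_kafka_conf_set(conf, "linger.ms", "20", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "compression.type", "zstd", errstr, sizeof(errstr));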
📬 The Three Delivery Guarantees
Message delivery semantics describe what happens when things go wrong (network timeout, broker restart, producer crash mid-send).
| Guarantee | How | Risk | Use When |
|---|---|---|---|
| At-most-once | acks=0, no retry | Messages lost if broker is down | Metrics where loss is acceptable |
| At-least-once | acks=all + retries | Duplicates on retry after timeout | Most use-cases with idempotent consumers |
| Exactly-once | Idempotent producer + transactions | Higher latency, more complex | Financial transfers, order processing |
🔑 At-Least-Once: The Duplicate Problem
With retries enabled, a producer sends a message. The broker receives it, writes it, but the network fails before sending the ACK. The producer retries — the broker receives a duplicate.
Producer ──[msg #1]──► Broker (writes to log)
│
network blip ──┘ (ACK lost)
Producer ──[msg #1 retry]──► Broker (writes duplicate!)
offset 5: msg#1
offset 6: msg#1 ← duplicate!
Solution: the idempotent producer assigns each message a producer ID (PID) and a per-partition sequence number. The broker tracks the sequence numbers of the last five batches per (PID, partition) and discards duplicates.
✅ Exactly-Once: Idempotent Producer + Transactions
Idempotent producer (single-partition, single-session):
// Producer config for idempotence
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sz);
// Automatically sets: acks=all, retries=INT_MAX, max.in.flight=5
Transactional producer (multi-partition, multi-message atomic write):
// Transactional config
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sz);
rd_kafka_conf_set(conf, "transactional.id", "my-txn-producer-1", errstr, sz);
// Usage pattern
rd_kafka_init_transactions(rk, 10000); // once at startup
rd_kafka_begin_transaction(rk);
// ... produce messages ...
err = rd_kafka_commit_transaction(rk, 10000);
// on error:
rd_kafka_abort_transaction(rk, 10000);
Transactions guarantee atomicity across partitions: all messages commit or none do. Consumers must set `isolation.level=read_committed` to skip uncommitted messages (aborted transaction leftovers).
EOS (Exactly-Once Semantics) end-to-end requires: idempotent producer + transactions + `read_committed` isolation + atomic offset commit within the transaction. This is what Kafka Streams provides out-of-the-box with `processing.guarantee=exactly_once_v2`.
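A minimal sketch of the atomic offset commit piece with librdkafka, assuming `rk_p` is the transactional producer configured above and `rk_c` a separate consumer handle (error handling abbreviated):
/* Consume-transform-produce: offsets committed inside the transaction */
rd_kafka_message_t *m = rd_kafka_consumer_poll(rk_c, 1000);
if (m && !m->err) {
    rd_kafka_begin_transaction(rk_p);
    /* ... produce the transformed result via rd_kafka_producev(rk_p, ...) ... */
    /* Commit the consumed offset atomically with the produced messages */
    rd_kafka_topic_partition_list_t *offs = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(offs, rd_kafka_topic_name(m->rkt),
                                      m->partition)->offset = m->offset + 1;
    rd_kafka_consumer_group_metadata_t *cgmd =
        rd_kafka_consumer_group_metadata(rk_c);
    rd_kafka_send_offsets_to_transaction(rk_p, offs, cgmd, 10000);
    rd_kafka_consumer_group_metadata_destroy(cgmd);
    rd_kafka_topic_partition_list_destroy(offs);
    if (rd_kafka_commit_transaction(rk_p, 10000) != NULL)
        rd_kafka_abort_transaction(rk_p, 10000);  /* all-or-nothing */
    rd_kafka_message_destroy(m);
}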
⚡ Consumer-Side Delivery Semantics
Delivery guarantees apply to both producer→broker and broker→consumer:
- At-most-once consumer: commit offset before processing. If processing fails, message is lost (offset already moved forward)
- At-least-once consumer: commit offset after processing. If crash after process but before commit, message is reprocessed on restart → make consumers idempotent
- Exactly-once consumer: atomic commit — use Kafka transactions to write processing result AND commit offset in one transaction
Practical guidance: for most systems, at-least-once delivery + idempotent consumers is the right balance. Pure exactly-once adds significant complexity — use only when duplicates cause real business harm (billing, double-shipping).
🐰 RabbitMQ — AMQP Model
RabbitMQ implements the AMQP 0-9-1 protocol. Messages are published to exchanges, which route to queues via bindings. Consumers subscribe to queues.
Publisher ──[msg + routing_key]──► Exchange
│
├──[binding: routing_key=*.error]──► Queue: errors
├──[binding: routing_key=orders.*]──► Queue: orders
├──[binding: fanout]──────────────► Queue: analytics
└──[binding: fanout]──────────────► Queue: audit-log
Queue: errors ──► Consumer A (ACK → message deleted)
└──► DLX → dead-letter-queue (NACK/TTL expired)
🔀 Exchange Types
| Type | Routing Logic |
|---|---|
| direct | Exact routing key match — one queue per key |
| topic | Pattern match: * (one word) / # (zero+). e.g. order.*.created |
| fanout | Ignores routing key — copies to ALL bound queues |
| headers | Match on message headers (key-value), ignores routing key |
| default | Built-in direct exchange — routes to queue named = routing key |
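A sketch with rabbitmq-c: declaring a topic exchange and binding a queue with a wildcard pattern (names follow the examples above; `conn` and channel 1 assumed open):
/* Topic exchange + pattern binding (rabbitmq-c) */
amqp_exchange_declare(conn, 1,
    amqp_cstring_bytes("orders-exchange"),
    amqp_cstring_bytes("topic"),
    0 /*passive*/, 1 /*durable*/, 0 /*auto_delete*/, 0 /*internal*/,
    amqp_empty_table);
amqp_queue_bind(conn, 1,
    amqp_cstring_bytes("orders"),
    amqp_cstring_bytes("orders-exchange"),
    amqp_cstring_bytes("order.*.created"),   /* '*' matches exactly one word */
    amqp_empty_table);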
💀 Dead Letter Exchange (DLX)
Messages are dead-lettered when:
- Consumer sends NACK with `requeue=false`
- Message TTL expires in queue
- Queue length overflow (`x-max-length`)
Common uses of the DLX:
- Retry queues with exponential backoff (TTL + DLX chain)
- Poison message parking for inspection
- Alerting on repeated failures
🔁 Retry with Exponential Backoff via DLX Chain
/* Declare retry queue with TTL + DLX pointing back to main exchange */
/* main queue */
amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders"),
                   0, 1, 0, 0, amqp_empty_table);
/* retry-30s: TTL=30000ms, dead-letter back to "orders-exchange" */
amqp_table_entry_t args[2] = {
    { .key = amqp_cstring_bytes("x-message-ttl"),
      .value = { .kind = AMQP_FIELD_KIND_I32, .value.i32 = 30000 } },
    { .key = amqp_cstring_bytes("x-dead-letter-exchange"),
      .value = { .kind = AMQP_FIELD_KIND_UTF8,
                 .value.bytes = amqp_cstring_bytes("orders-exchange") } }
};
amqp_table_t tbl = { .num_entries = 2, .entries = args };
amqp_queue_declare(conn, 1,
                   amqp_cstring_bytes("orders.retry.30s"),
                   0 /*passive*/, 1 /*durable*/, 0 /*exclusive*/,
                   0 /*auto_delete*/, tbl);
Pattern: main queue → failure → NACK → DLX(retry-5s) → TTL expires → DLX(retry-30s) → DLX(retry-5m) → DLX(dead-letter-final). Each level increases the delay (5s → 30s → 5m) — classic exponential backoff without consumer sleep loops.
⚖️ Consumer ACK Modes
| Mode | Behavior | Risk |
|---|---|---|
| `autoAck=true` | Broker removes message as soon as delivered (at-most-once) | Lost if consumer crashes before processing |
| Manual ACK | Consumer confirms success; broker removes message | None — correct at-least-once |
| Manual NACK `requeue=true` | Message returned to front of queue | Infinite loop on poison messages |
| Manual NACK `requeue=false` | Message sent to DLX (if configured) or dropped | Dropped if no DLX |
| Reject | Same as NACK for a single message | — |
Prefetch count (`basicQos`): limits unacked messages per consumer. Set to 1 for strict round-robin fairness; increase (e.g. 20–100) for throughput when processing is fast.
🆚 Kafka vs RabbitMQ — When to Use Which
| Criterion | Use Kafka | Use RabbitMQ |
|---|---|---|
| Message retention | Need replay / audit log | Process-and-forget |
| Multiple consumers | Independent consumer groups reading same events | One consumer per message (competing consumers) |
| Ordering | Strict ordering per key (partition) | Best-effort (priority queues) |
| Throughput | Millions of msg/sec (sequential disk I/O) | Tens of thousands/sec |
| Routing complexity | Simple (partition key) | Rich (exchange types, header matching) |
| RPC / request-reply | Awkward | Built-in (correlation ID + reply-to) |
| Stream processing | Kafka Streams, ksqlDB native | Needs external framework |
🔄 Distributed Transactions: Why Not 2PC?
Two-Phase Commit (2PC) coordinates distributed transactions but has critical problems: the coordinator is a single point of failure, participants hold locks during phase 1 (blocking), and the protocol is blocking — a coordinator crash leaves participants in limbo. In a microservices system spanning multiple databases and services, 2PC is impractical.
The Saga pattern replaces distributed ACID transactions with a sequence of local transactions, each publishing an event. If a step fails, compensating transactions undo the completed steps.
🎭 Orchestration Saga
A central Saga Orchestrator sends commands and receives events, maintaining the state machine.
Orchestrator
│─[ReserveInventory]──► Inventory Svc
│◄─[InventoryReserved]──┘
│─[ChargePayment]──────► Payment Svc
│◄─[PaymentFailed]──────┘ ← failure!
│─[ReleaseInventory]───► Inventory Svc compensation
Pros: easy to trace, centralized state, clear compensations
Cons: orchestrator = potential SPOF; centralized logic may become a god object
💃 Choreography Saga
No central coordinator — each service listens for events and emits new events.
Order Svc emits [OrderCreated]
▼
Inventory Svc listens → reserves → emits [InventoryReserved]
▼
Payment Svc listens → charges → emits [PaymentFailed]
▼
Inventory Svc listens → releases → emits [InventoryReleased]
Order Svc listens → marks order failed
Pros: decentralized, services fully independent
Cons: hard to understand overall flow; compensations scattered across services
📤 Outbox Pattern — Atomic DB Write + Event Publish
Problem: you write to the database and then publish to Kafka. What if the process crashes between the two operations? The DB write committed but the event never published → downstream services miss the event → distributed inconsistency.
Solution: write to an `outbox` table in the same local transaction as the business data. A separate relay process publishes outbox records to the broker and marks them as sent.
Business Transaction (atomic, single DB)
┌─────────────────────────────────────────────┐
│ INSERT INTO orders (id, …) │
│ INSERT INTO outbox (event_type, payload, │
│ status='PENDING') │ ← same txn!
└─────────────────────────────────────────────┘
Relay Process (separate, runs continuously)
SELECT * FROM outbox WHERE status='PENDING' LIMIT 100
FOR EACH row:
publish to Kafka/RabbitMQ
UPDATE outbox SET status='SENT', sent_at=NOW() WHERE id=row.id
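The producing side in SQL, as a sketch (column values are illustrative):
BEGIN;
INSERT INTO orders (id, customer_id, amount)
  VALUES (42, 7, 99.50);
INSERT INTO outbox (event_type, payload, status)
  VALUES ('OrderPlaced', '{"order_id": 42, "amount": 99.50}', 'PENDING');
COMMIT;  -- both rows commit atomically, or neither does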
Alternative to the polling relay: CDC (Change Data Capture) via Debezium — tails the database's WAL directly. Low latency, no polling overhead; works with Postgres/MySQL logical replication.
⚠️ Outbox: Failure Modes and Guarantees
| Failure | Outcome | Why Safe |
|---|---|---|
| Process crashes after DB write, before relay runs | Relay picks up PENDING row on restart | Row persists in outbox |
| Relay publishes to Kafka but crashes before marking SENT | Relay re-publishes on restart → duplicate | Consumer must be idempotent |
| DB transaction rolled back | Outbox row never created → no event | Atomicity maintained |
| Kafka broker down during relay | Relay retries; PENDING rows accumulate | Broker recovery unblocks relay |
The Outbox pattern provides at-least-once delivery. To prevent business harm from duplicates, combine with idempotent consumers (see the Idempotent Consumers section below).
✂️ CQRS — Command Query Responsibility Segregation
Traditional CRUD uses one model for reads and writes. As systems scale, reads and writes have very different access patterns: writes need normalized data for integrity; reads need denormalized projections for performance.
CQRS separates them into two explicit models:
- Command side: mutates state, normalized DB optimized for writes and integrity (e.g., PostgreSQL with foreign keys)
- Query side: returns projections, denormalized read model optimized for queries (e.g., Elasticsearch, Redis, materialized views)
Client
│
├──[Command: PlaceOrder(items)]──► Command Handler
│ │
│ ▼
│ Write DB (PostgreSQL)
│ orders, order_items, inventory
│ │
│ [Domain Event: OrderPlaced]
│ │
│ ┌─────────────┼───────────────┐
│ ▼ ▼ ▼
│ Read DB Search Index Analytics DB
│ (denormalized (Elasticsearch) (ClickHouse)
│ order view)
│
└──[Query: GetOrderSummary(userId)]──► Query Handler
│
▼
Read DB (fast!)
✅ CQRS Benefits
- Read model can be independently scaled (horizontal replicas)
- Read model optimized for each query pattern (denormalized)
- Write model optimized for correctness (normalized, transactions)
- Multiple specialized read models from the same events
- Replay events to rebuild read models after bugs
⚠️ CQRS Trade-offs
- Eventual consistency: read model lags behind writes (typically milliseconds)
- Additional infrastructure (separate read store)
- Synchronization complexity (events must be reliably published)
- Read-your-own-writes: a user who just placed an order may see stale data on immediate reload → handle with version tokens or direct write-model reads for the write's own session
📜 Event Sourcing — State as Event Log
Instead of storing the current state, store the sequence of events that led to the current state. Current state is derived by replaying events.
Normal CRUD:
accounts: {id:1, balance:850}
Event Sourced:
event_log (account_id=1):
AccountOpened { amount: 1000 } // balance → 1000
MoneyWithdrawn { amount: 200 } // balance → 800
InterestEarned { amount: 50 } // balance → 850
// replay → current balance = 850
📸 Snapshots
Replaying 10 years of events on every read is slow. Snapshots periodically capture current state:
- Every N events, save a snapshot: `{account_id:1, balance:850, version:3}`
- On read: load the latest snapshot, then replay only the events since that snapshot's version
- Typical threshold: every 100–1000 events
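A sketch of the read path in SQL — the `snapshots` / `event_log` tables and columns are assumptions for illustration:
-- 1) Load the latest snapshot for the aggregate
SELECT state, version FROM snapshots
 WHERE account_id = 1
 ORDER BY version DESC LIMIT 1;
-- 2) Replay only events newer than the snapshot
SELECT event_type, payload FROM event_log
 WHERE account_id = 1 AND version > 3   -- the snapshot's version
 ORDER BY version;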
🔄 Schema Evolution
Events are immutable — you can't change past events. Handling schema changes:
- Upcasting: transform old event format to new format at read time
- Versioned events: `MoneyWithdrawn_v1` vs `MoneyWithdrawn_v2`
- Additive changes only: add fields, never remove or rename
- Use Avro/Protobuf with Schema Registry for versioned schemas
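A sketch of upcasting in C — the event structs and the defaulted `currency` field are illustrative assumptions:
#include <string.h>
/* v1 events predate multi-currency support */
typedef struct { long amount_cents; } money_withdrawn_v1;
typedef struct { long amount_cents; char currency[4]; } money_withdrawn_v2;
/* Upcaster: applied at read time — old events are never rewritten */
static money_withdrawn_v2 upcast_v1(money_withdrawn_v1 old) {
    money_withdrawn_v2 ev = { .amount_cents = old.amount_cents };
    strcpy(ev.currency, "USD");   /* assumed default for pre-v2 events */
    return ev;
}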
📐 When to Use Event Sourcing (vs When Not To)
| Use Event Sourcing When… | Avoid When… |
|---|---|
| Complete audit log is required by regulation | Simple CRUD with no history requirement |
| State reconstruction / time-travel debugging needed | Team unfamiliar with the pattern — steep learning curve |
| Multiple read models from the same data | Queries span multiple aggregates (event sourcing is aggregate-scoped) |
| Business domain is naturally event-oriented (banking, order management) | Simple content management, settings, static data |
🔑 Idempotent Consumers: Handling At-Least-Once Delivery
In any reliable messaging system (Kafka with retries, RabbitMQ with NACK+requeue, Outbox relay), messages may be delivered more than once. An idempotent consumer produces the same result regardless of how many times it processes the same message.
Two approaches:
- Natural idempotency: the operation is inherently idempotent — e.g., `SET balance=850` vs `balance = balance - 200`. If the operation can be expressed as an absolute state (upsert), duplicates are safe — see the sketch after this list.
- Deduplication table: track processed message IDs; reject re-processed messages.
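A sketch of the upsert form of natural idempotency (table name is illustrative):
-- Absolute-state write: applying it twice yields the same result
INSERT INTO account_balances (account_id, balance)
VALUES (1, 850)
ON CONFLICT (account_id) DO UPDATE SET balance = EXCLUDED.balance;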
🗃️ Deduplication Table Pattern
Each message carries a unique idempotency key (message ID, event ID, or a business key). The consumer records processed keys in a `processed_events` table.
-- Schema
CREATE TABLE processed_events (
event_id TEXT PRIMARY KEY,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
-- Atomic check-and-process (PostgreSQL)
BEGIN;
-- Attempt to insert event ID (fails with UNIQUE violation if duplicate)
INSERT INTO processed_events (event_id)
VALUES ($1)
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id;
-- If no row returned: already processed, skip
-- If row returned: new event, proceed with business logic
UPDATE accounts SET balance = balance - $2 WHERE id = $3;
COMMIT;
The `INSERT ... ON CONFLICT DO NOTHING` + business logic must be in the same transaction for atomicity. If the transaction rolls back, the event ID is not recorded and the event can be safely reprocessed.
⚡ Redis-based Deduplication (High Throughput)
For very high throughput, use Redis with a TTL-based dedup set instead of a DB table:
/* Check + mark atomically with SET NX EX (hiredis): a single atomic
   command avoids the race window between separate SETNX and EXPIRE.
   TTL 24h — set beyond message retention / retry window. */
redisReply *reply = redisCommand(r, "SET %s 1 NX EX %d", event_id, 86400);
if (reply && reply->type == REDIS_REPLY_STATUS) {
    process_event(event);   /* "OK": key newly set — first delivery */
}
/* REDIS_REPLY_NIL: key already existed — duplicate, skip */
if (reply) freeReplyObject(reply);
Redis dedup is best-effort — Redis can lose data if not persisted (AOF/RDB). For financial events, prefer the database dedup table for durable deduplication.
🧠 Designing Idempotency Keys
| Source | Key Strategy | Notes |
|---|---|---|
| Kafka messages | topic:partition:offset | Globally unique per message position |
| RabbitMQ messages | Set message-id AMQP property at publish time | Producer responsibility; use UUID |
| Business events | order_id:event_type | Natural key; handles schema-level dedup |
| HTTP API calls | Client-supplied Idempotency-Key header (UUID) | Store response and return cached result on repeat |
TTL management: set the dedup key TTL longer than your maximum retry window. If retries can span 24h, use 48h TTL. If your Kafka retention is 7 days, set TTL to 8 days.
🔧 Libraries Used
- librdkafka — official C Kafka client; used by the Python/Go/Ruby clients under the hood
- rabbitmq-c (amqp.h) — C AMQP 0-9-1 client for RabbitMQ
# Install (Ubuntu)
apt-get install librdkafka-dev librabbitmq-dev
# Compile
gcc -o kafka_producer kafka_producer.c -lrdkafka
gcc -o amqp_consumer amqp_consumer.c -lrabbitmq
── Implementation 1 — Kafka Producer with Delivery Reports ──
📤 Kafka Producer (librdkafka) — Full Implementation
/* kafka_producer.c — production-ready Kafka producer */
#include <librdkafka/rdkafka.h>
#include <inttypes.h>  /* PRId32 / PRId64 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
static volatile int run = 1;
static void sigterm(int sig) { (void)sig; run = 0; }
/* Called for every message after produce attempt */
static void delivery_report_cb(rd_kafka_t *rk,
const rd_kafka_message_t *msg,
void *opaque) {
(void)rk; (void)opaque;
if (msg->err) {
fprintf(stderr, "[DR] FAILED: topic=%s err=%s key=%.*s\n",
rd_kafka_topic_name(msg->rkt),
rd_kafka_err2str(msg->err),
(int)msg->key_len, (const char *)msg->key);
} else {
fprintf(stdout, "[DR] OK: topic=%s partition=%" PRId32
" offset=%" PRId64 " key=%.*s\n",
rd_kafka_topic_name(msg->rkt),
msg->partition, msg->offset,
(int)msg->key_len, (const char *)msg->key);
}
}
int main(int argc, char *argv[]) {
if (argc < 4) {
fprintf(stderr, "Usage: %s <brokers> <topic> <message>\n", argv[0]);
return 1;
}
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
/* Broker list */
if (rd_kafka_conf_set(conf, "bootstrap.servers", argv[1],
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
/* Idempotent producer: deduplicates retries automatically */
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));
/* Delivery callback for per-message success/failure reporting */
rd_kafka_conf_set_dr_msg_cb(conf, delivery_report_cb);
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
return 1;
}
const char *topic = argv[2];
const char *msg = argv[3];
const char *key = "order-key-001"; /* determines partition */
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
retry_produce:
rd_kafka_resp_err_t err = rd_kafka_producev(
rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_KEY(key, strlen(key)),
RD_KAFKA_V_VALUE((void *)msg, strlen(msg)),
RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END
);
if (err) {
fprintf(stderr, "Produce failed: %s\n", rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
rd_kafka_poll(rk, 1000); /* drain delivery queue, then retry */
goto retry_produce;
}
}
/* Poll for delivery reports (fires delivery_report_cb) */
rd_kafka_poll(rk, 0); /* non-blocking poll */
/* Wait for all outstanding messages to be delivered */
fprintf(stdout, "Flushing...\n");
rd_kafka_flush(rk, 10 * 1000); /* wait up to 10s */
if (rd_kafka_outq_len(rk) > 0)
fprintf(stderr, "%d message(s) not delivered\n",
rd_kafka_outq_len(rk));
rd_kafka_destroy(rk);
return 0;
}
── Implementation 2 — Kafka Consumer with Manual Offset Commit ──
📥 Kafka Consumer (librdkafka) — At-Least-Once with Manual Commit
/* kafka_consumer.c — manual offset commit consumer */
#include <librdkafka/rdkafka.h>
#include <inttypes.h>  /* PRId32 / PRId64 */
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static volatile int run = 1;
static void sigterm(int sig) { (void)sig; run = 0; }
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *parts,
void *opaque) {
switch (err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
fprintf(stdout, "Rebalance: assigned %d partitions\n", parts->cnt);
rd_kafka_assign(rk, parts);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
fprintf(stdout, "Rebalance: revoking partitions, committing offsets\n");
rd_kafka_commit(rk, NULL, 0); /* sync-commit stored offsets before revoke */
rd_kafka_assign(rk, NULL);
break;
default:
rd_kafka_assign(rk, NULL);
break;
}
}
int main(int argc, char *argv[]) {
if (argc < 4) {
fprintf(stderr, "Usage: %s <brokers> <group> <topic>\n", argv[0]);
return 1;
}
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", argv[1], errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "group.id", argv[2], errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
/* Disable auto-commit: we commit manually after processing */
rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr));
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
rd_kafka_poll_set_consumer(rk);
/* Subscribe to topic */
rd_kafka_topic_partition_list_t *topics =
rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, argv[3], RD_KAFKA_PARTITION_UA);
rd_kafka_subscribe(rk, topics);
rd_kafka_topic_partition_list_destroy(topics);
signal(SIGINT, sigterm);
signal(SIGTERM, sigterm);
int msg_count = 0;
while (run) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
if (!msg) continue;
if (msg->err) {
if (msg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
fprintf(stdout, "Reached end of partition\n");
else
fprintf(stderr, "Consumer error: %s\n", rd_kafka_message_errstr(msg));
} else {
/* Process message */
fprintf(stdout, "Message: partition=%" PRId32
" offset=%" PRId64 " key=%.*s value=%.*s\n",
msg->partition, msg->offset,
(int)msg->key_len, (const char *)msg->key,
(int)msg->len, (const char *)msg->payload);
/* TODO: process_message(msg->payload, msg->len); */
/* Commit offset after successful processing (at-least-once) */
if (++msg_count % 100 == 0) {
/* Sync commit every 100 messages for durability */
rd_kafka_commit_message(rk, msg, /*async=*/0);
}
}
rd_kafka_message_destroy(msg);
}
/* Final commit before shutdown */
rd_kafka_commit(rk, NULL, 0);
rd_kafka_consumer_close(rk);
rd_kafka_destroy(rk);
return 0;
}
── Implementation 3 — RabbitMQ Consumer with DLX (rabbitmq-c) ──
🐰 RabbitMQ Consumer with Dead-Letter Exchange (rabbitmq-c)
/* amqp_consumer.c — RabbitMQ consumer with manual ACK and DLX */
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <inttypes.h>  /* PRIu64 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static void die_on_amqp_error(amqp_rpc_reply_t x, const char *context) {
if (x.reply_type != AMQP_RESPONSE_NORMAL) {
fprintf(stderr, "%s: AMQP error\n", context);
exit(1);
}
}
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
amqp_socket_open(socket, "localhost", 5672);
die_on_amqp_error(
amqp_login(conn, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Login");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Open channel");
/* Declare dead-letter exchange */
amqp_exchange_declare(conn, 1,
amqp_cstring_bytes("dlx.orders"),
amqp_cstring_bytes("fanout"),
0, 1, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declare DLX");
/* Declare dead-letter queue */
amqp_queue_declare(conn, 1,
amqp_cstring_bytes("orders.dead-letter"),
0, 1, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declare DLQ");
amqp_queue_bind(conn, 1,
amqp_cstring_bytes("orders.dead-letter"),
amqp_cstring_bytes("dlx.orders"),
amqp_cstring_bytes(""), amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Bind DLQ");
/* Declare main queue with DLX configured */
amqp_table_entry_t dlx_arg = {
.key = amqp_cstring_bytes("x-dead-letter-exchange"),
.value = {
.kind = AMQP_FIELD_KIND_UTF8,
.value.bytes = amqp_cstring_bytes("dlx.orders")
}
};
amqp_table_t dlx_args = { .num_entries = 1, .entries = &dlx_arg };
amqp_queue_declare(conn, 1,
amqp_cstring_bytes("orders"),
0, 1, 0, 0, dlx_args);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declare orders queue");
/* Set prefetch count — process 10 at a time */
amqp_basic_qos(conn, 1, 0, 10, 0);
/* Start consuming (no-ack=false: manual ACK mode) */
amqp_basic_consume(conn, 1,
amqp_cstring_bytes("orders"),
amqp_empty_bytes,
0, 0 /*no_ack=false*/, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consume");
while (1) {
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, NULL, 0);
if (res.reply_type != AMQP_RESPONSE_NORMAL) break;
fprintf(stdout, "Delivery tag: %" PRIu64 " body: %.*s\n",
envelope.delivery_tag,
(int)envelope.message.body.len,
(const char *)envelope.message.body.bytes);
int success = 1; /* TODO: replace with actual processing */
if (success) {
/* ACK: remove from queue */
amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
} else {
/* NACK + requeue=false: sends to DLX */
amqp_basic_nack(conn, 1, envelope.delivery_tag, 0, 0);
}
amqp_destroy_envelope(&envelope);
}
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
── Implementation 4 — Outbox Relay (libpq) ──
📤 Outbox Relay: PostgreSQL → Kafka (libpq + librdkafka)
/* outbox_relay.c — polls outbox table and publishes to Kafka */
#include <libpq-fe.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#define BATCH_SIZE_STR "100"  /* rows per relay batch (spliced into SQL LIMIT) */
#define POLL_INTERVAL_MS 100
typedef struct {
PGconn *pg;
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
} relay_ctx_t;
static void relay_batch(relay_ctx_t *ctx) {
    /* One transaction per batch: FOR UPDATE SKIP LOCKED only shields
       concurrent relays while this transaction stays open */
    PQclear(PQexec(ctx->pg, "BEGIN"));
    /* Fetch pending outbox rows */
    PGresult *res = PQexec(ctx->pg,
        "SELECT id, event_type, payload FROM outbox "
        "WHERE status = 'PENDING' ORDER BY created_at LIMIT "
        BATCH_SIZE_STR " FOR UPDATE SKIP LOCKED");
    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
        fprintf(stderr, "Query failed: %s\n", PQerrorMessage(ctx->pg));
        PQclear(res);
        PQclear(PQexec(ctx->pg, "ROLLBACK"));
        return;
    }
int rows = PQntuples(res);
for (int i = 0; i < rows; i++) {
const char *id = PQgetvalue(res, i, 0);
const char *event_type = PQgetvalue(res, i, 1);
const char *payload = PQgetvalue(res, i, 2);
/* Produce to Kafka (fire-and-forget here; use delivery callback for ACK) */
rd_kafka_produce(ctx->rkt,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
(void *)payload, strlen(payload),
event_type, strlen(event_type),
NULL);
        /* Mark as sent inside the batch transaction. Still at-least-once:
           a crash before COMMIT re-publishes the whole batch on restart */
        const char *params[1] = { id };
        PQclear(PQexecParams(ctx->pg,
            "UPDATE outbox SET status='SENT', sent_at=NOW() WHERE id=$1",
            1, NULL, params, NULL, NULL, 0));
}
    PQclear(res);
    /* Flush Kafka before COMMIT so rows are marked SENT only after the
       broker has acknowledged the produced messages */
    rd_kafka_flush(ctx->rk, 5000);
    PQclear(PQexec(ctx->pg, "COMMIT"));
}
int main() {
relay_ctx_t ctx;
char errstr[512];
ctx.pg = PQconnectdb("host=localhost dbname=myapp user=relay_user");
if (PQstatus(ctx.pg) != CONNECTION_OK) {
fprintf(stderr, "PG connect failed: %s\n", PQerrorMessage(ctx.pg));
return 1;
}
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));
ctx.rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
ctx.rkt = rd_kafka_topic_new(ctx.rk, "domain-events", NULL);
fprintf(stdout, "Outbox relay started\n");
while (1) {
relay_batch(&ctx);
usleep(POLL_INTERVAL_MS * 1000);
}
}
FOR UPDATE SKIP LOCKED: when running multiple relay processes for redundancy, this PostgreSQL clause ensures each PENDING row is claimed by only one relay at a time (the locks are held for the duration of the batch transaction) — no duplicate publications from concurrent relays.
🔬 Lab 1 — Kafka Producer / Consumer Pipeline with Delivery Guarantees
Build a Kafka pipeline and observe the behavior of different delivery semantics.
1. Start Kafka locally: `docker compose up kafka zookeeper`. Create topic `events` with 3 partitions, replication-factor 1.
2. Write a producer using librdkafka with `enable.idempotence=true`. Send 10,000 messages with keys `user-{i%100}` (100 distinct keys).
3. Observe partition distribution: messages with the same key always go to the same partition. Verify with `kafka-console-consumer --partition` output.
4. Write two consumer processes in the same group. Start both. Observe partition rebalancing in the logs when the second consumer joins.
5. Kill one consumer mid-stream. Observe the rebalance and that the surviving consumer picks up all partitions and continues from the committed offset (not the beginning).
6. Bonus: write a transactional producer that sends batches of 10 messages atomically. Verify with an `isolation.level=read_committed` consumer that aborted batches are not visible.
🔬 Lab 2 — RabbitMQ Dead-Letter Exchange Chain
Build a retry-with-backoff pipeline using DLX chaining.
1. Start RabbitMQ: `docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management`. Open the management UI at http://localhost:15672.
2. Declare the following chain using rabbitmq-c or the management UI:
   - `orders` queue → DLX: `retry-exchange`
   - `orders.retry.5s` queue → TTL: 5000ms → DLX: `orders-exchange` (routes back to `orders`)
   - `orders.dead-final` queue for permanently failed messages (retry limit exceeded)
3. Write a consumer that processes messages. Increment a retry-count header. After 3 retries, NACK to the final dead-letter queue; otherwise NACK to the retry queue.
4. Publish 10 messages. Fail the first 3 (to trigger retries). Verify in the management UI that messages move through the retry chain with a 5s delay.
5. Bonus: make the retry delay exponential (5s → 30s → 5m) using three distinct retry queues.
🔬 Lab 3 — Outbox Pattern with PostgreSQL + Kafka
Demonstrate atomic event publishing using the Outbox pattern.
1. Create PostgreSQL tables:
CREATE TABLE orders (id SERIAL PRIMARY KEY, customer_id INT, amount NUMERIC);
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'PENDING',
created_at TIMESTAMPTZ DEFAULT NOW(),
sent_at TIMESTAMPTZ
);
2. Write a transaction in C (libpq) that inserts an order AND an outbox record atomically. Simulate a crash (call abort() after the DB commit) — verify the outbox row persists even though Kafka was never published.
3. Build the Outbox Relay from Implementation 4. Run it — observe it pick up the PENDING row and publish to Kafka. Verify the message arrives at a Kafka consumer.
4. Test the duplicate scenario: manually reset a row to PENDING after it was marked SENT. Run the relay again — verify Kafka receives a duplicate. Make the Kafka consumer idempotent using a Redis SETNX dedup check.
🔬 Lab 4 — CQRS Read Model Projection
Build a minimal CQRS system: write to PostgreSQL, project events to a Redis read model.
1. Write model: an `orders` table in PostgreSQL with a normalized schema. Each INSERT/UPDATE publishes an event to Kafka topic `order-events`.
2. Read model consumer: subscribe to `order-events`. For each event, upsert a denormalized Redis hash: `HSET order:{id} customer_name "..." status "..." total "..."`.
3. Query handler: read from the Redis hash for single-order lookups. Measure latency vs a direct PostgreSQL query under load (use Apache Bench or wrk).
4. Simulate read model rebuild: delete the Redis keys, reset the consumer offset to earliest, re-run the consumer — verify Redis is repopulated from the event history.
5. Bonus: add a "leaderboard" read model: top 10 customers by order total, stored as a Redis sorted set (`ZADD`). Update on every OrderPlaced event.
── Phase 5 Mastery Checklist ──
Kafka
- Explain topic, partition, offset, consumer group, ISR
- Describe producer partition assignment: key hash vs round-robin
- Explain rebalance: eager vs cooperative, triggers
- Configure `acks=all` + `min.insync.replicas=2` for durability
- Implement idempotent producer with librdkafka
- Implement manual offset commit consumer
- Explain log compaction and when to use it
RabbitMQ
- Explain the 5 exchange types and when to use each
- Configure dead-letter exchange for failed messages
- Build retry chain with TTL + DLX
- Implement manual ACK/NACK consumer
- Set prefetch count for flow control
Delivery Semantics
- Explain at-most-once, at-least-once, exactly-once trade-offs
- Configure idempotent + transactional producer for EOS
- Set `isolation.level=read_committed` for EOS consumers
Patterns
- Implement Outbox: atomic DB write + relay process
- Explain Saga orchestration vs choreography trade-offs
- Design CQRS write/read model split for a given domain
- Explain Event Sourcing: replay, snapshots, schema versioning
- Implement idempotent consumer with dedup table (PostgreSQL)
- Design idempotency keys for Kafka, RabbitMQ, HTTP APIs