M13 — Event-Driven Architecture

Phase 5 Kafka internals & delivery semantics · RabbitMQ exchange patterns · Saga & Outbox · CQRS & Event Sourcing · Idempotent consumers · C/librdkafka implementations
🎯 Why Event-Driven Architecture?
Synchronous request/response creates temporal coupling — the caller blocks until the callee responds, and a slow callee cascades latency. Event-driven architecture severs this coupling: producers emit events without knowing who consumes them, and consumers process at their own pace.

Key motivations:
  • Temporal decoupling: producer and consumer run independently — if the consumer is down, events buffer in the broker
  • Fanout: one event reaches multiple consumers simultaneously (notifications, analytics, search indexing — all from one order-placed event)
  • Audit log / replay: full event history is replayable — rebuild any read model from scratch, debug production issues with real data
  • Reduces synchronous blocking chains: checkout doesn't wait for email service; email service consumes the event asynchronously
  • Enables eventual consistency: services converge on consistent state over time, trading strong consistency for availability and partition tolerance
Analogy — Event streaming vs work queue:
A work queue (RabbitMQ) is like a restaurant ticket system — a ticket is torn off by one chef, cooked, and discarded. A log-based stream (Kafka) is like a newspaper printing press — each edition is stamped with a page number (offset), archived forever, and any subscriber can re-read any past edition at any time.
📦 Message Queue Model (RabbitMQ)
  • Message consumed → deleted from queue
  • Push-based delivery to consumers
  • Competing consumers share load
  • At-most-once or at-least-once via ACK
  • Dead-letter exchange handles failures
  • Complex routing via exchanges
  • Great for: task queues, RPC, work distribution
📜 Event Stream Model (Kafka)
  • Messages retained for configurable period
  • Pull-based: consumers control their pace
  • Consumer groups: each partition → one consumer
  • Exactly-once via idempotent producer + transactions
  • Any consumer can replay from any offset
  • Ordered within partition
  • Great for: event sourcing, audit log, stream processing
📐 Pattern Landscape
Pattern | Problem Solved | Trade-off
Outbox | Atomic: DB write + event publish in one transaction | Extra table + relay process
Saga (Orchestration) | Multi-service transactions without 2PC | Central coordinator = SPOF risk
Saga (Choreography) | Distributed coordination via events | Hard to trace overall flow
CQRS | Separate write model from read model | Eventual consistency on reads
Event Sourcing | State as immutable event log | Complex queries, snapshot management
Idempotent Consumer | Handle at-least-once delivery safely | Dedup table storage + lookup cost
🗺️ Phase 5 Module Map
Module | Topic | Key Concepts
M13 (this) | Event-Driven Architecture | Kafka, RabbitMQ, Saga, Outbox, CQRS, Event Sourcing
M14 | Stream Processing | Kafka Streams, Flink windowing, stateful operators, exactly-once
Prerequisites: Ph2 (databases — you need to understand transactions for Outbox), Ph4 (concurrency — consumer thread pools, back-pressure)
⚙️ Kafka Architecture — Internals
Kafka's fundamental unit is the topic — a logical feed of records. Topics are split into partitions, each an ordered, immutable append-only log on disk.
Topic: orders (3 partitions, replication-factor=2)

  Partition 0  [offset 0][offset 1][offset 2]...[offset 847]   ← append-only
  Partition 1  [offset 0][offset 1]...[offset 391]
  Partition 2  [offset 0]...[offset 1203]

  ┌─── Broker 1 (leader P0, follower P1) ───┐
  │     Broker 2 (leader P1, follower P2)   │
  │     Broker 3 (leader P2, follower P0)   │
  └─────────────────────────────────────────┘

Producer → assigns partition by:  hash(key) % num_partitions   (key set)
                             or:  round-robin                  (key=null)

Consumer Group A: consumer-A1 reads P0, consumer-A2 reads P1, consumer-A3 reads P2
Consumer Group B: consumer-B1 reads P0+P1, consumer-B2 reads P2   (independent read positions)
🔑 Partition Key Assignment
  • Key set: partition = murmur2(key) % num_partitions — all events for the same key go to the same partition → ordering guaranteed per key
  • Key null: sticky partitioner (batch to one partition, rotate after batch) — maximizes throughput
  • Custom partitioner: implement Partitioner interface for business logic (e.g., tenant-based routing)
Hot partition: if 10% of keys account for 80% of traffic, their partition becomes a bottleneck. Spread hot keys with a suffix: order_id + "_" + random(0,N) — note that salting trades away strict per-key ordering, because the salted variants land on different partitions.
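A minimal C sketch of the key-salting idea described above (illustrative helper, not part of librdkafka):

/* Sketch: salt a hot key across N sub-keys so it spreads over several partitions */
#include <stdio.h>
#include <stdlib.h>

void make_salted_key(char *out, size_t out_sz, const char *order_id, int n_salts) {
    int salt = rand() % n_salts;                     /* 0..N-1 */
    snprintf(out, out_sz, "%s_%d", order_id, salt);  /* e.g. "order-123_7" */
}

Consumers that need all events for one order_id must then read from every salted variant, which is the cost of breaking up the hot key.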
🔄 Consumer Group Rebalancing
When a consumer joins or leaves, the group coordinator (a broker) triggers a rebalance:
  • Eager rebalance (stop-the-world): all consumers stop, revoke all partitions, reassign — processing pauses
  • Cooperative rebalance (incremental): only affected partitions are moved; others keep processing — reduces pause
  • partition.assignment.strategy = CooperativeStickyAssignor for incremental
  • session.timeout.ms: consumer must send heartbeat within this window or be considered dead → rebalance
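A librdkafka consumer-config sketch for the settings above — note that the C client spells the strategy "cooperative-sticky" (CooperativeStickyAssignor is the Java name); the timeout values are illustrative:

/* Cooperative (incremental) rebalancing + liveness tuning (sketch) */
rd_kafka_conf_set(conf, "partition.assignment.strategy", "cooperative-sticky",
                  errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "session.timeout.ms", "10000", errstr, sizeof(errstr));   /* dead after 10s without heartbeat */
rd_kafka_conf_set(conf, "heartbeat.interval.ms", "3000", errstr, sizeof(errstr)); /* heartbeat every 3s */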
📊 ISR — In-Sync Replicas
Each partition has one leader (handles all reads/writes) and N follower replicas that pull from the leader. The ISR set tracks which replicas are caught up within replica.lag.time.max.ms.

Setting | Meaning | Trade-off
acks=0 | Producer doesn't wait for any ack | Fastest, may lose messages
acks=1 | Leader acknowledges write | Lost if leader fails before replication
acks=all (-1) | All ISR members acknowledge | Slowest, strongest durability
min.insync.replicas=2 | Minimum ISR size for acks=all to succeed | Prevents silent data loss with a small ISR
Production recommendation: acks=all + min.insync.replicas=2 + replication.factor=3 — survives one broker failure without data loss.
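A minimal librdkafka sketch of the producer side of that recommendation (conf/errstr as in the implementations below). replication.factor and min.insync.replicas are topic/broker-level settings applied when the topic is created, not producer config:

/* Producer side of the durability recommendation (sketch) */
rd_kafka_conf_set(conf, "acks", "all", errstr, sizeof(errstr));                 /* wait for all ISR members */
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr)); /* safe retries; implies acks=all */
/* replication.factor=3 and min.insync.replicas=2 are set on the topic itself,
   e.g. at topic creation time via kafka-topics or the Admin API */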
📍 Offset Management
Each consumer group tracks its position (offset) per partition in the __consumer_offsets internal topic.

Commit strategies:
  • Auto-commit (enable.auto.commit=true): commits periodically. Risk: commit before processing → data loss on crash; or process but not commit → reprocess on restart
  • Manual sync commit (commitSync()): blocks until broker confirms. Safe but slower
  • Manual async commit (commitAsync()): fire-and-forget. Higher throughput, retry on failure is tricky (stale offset may overwrite newer commit)
  • Best practice: manual sync commit after batch processing, async for throughput-critical paths with idempotent processing
Log compaction: Kafka can compact topics (cleanup.policy=compact) — keeps only the latest value per key, discards older records. Used for changelogs (KTable in Kafka Streams).
📐 Kafka Producer Batching & Compression
Config | Effect | Tuning Guidance
batch.size | Max bytes per batch per partition | Increase (e.g. 64KB) for throughput
linger.ms | Wait up to N ms to fill a batch | 5–50ms trades latency for throughput
compression.type | snappy/lz4/zstd per batch | zstd for best ratio, lz4 for speed
buffer.memory | Total producer buffer bytes | Increase if producers block frequently
max.in.flight.requests.per.connection | Concurrent unacked requests | Set to 1 for strict ordering (without idempotence)
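A hedged librdkafka sketch of the table above — values are illustrative; batch.size requires a reasonably recent librdkafka (1.5+), and librdkafka has no buffer.memory setting, its closest equivalent being queue.buffering.max.kbytes:

/* Throughput-oriented producer tuning (sketch; values illustrative) */
rd_kafka_conf_set(conf, "batch.size", "65536", errstr, sizeof(errstr));        /* 64 KB batches */
rd_kafka_conf_set(conf, "linger.ms", "20", errstr, sizeof(errstr));            /* wait up to 20 ms to fill a batch */
rd_kafka_conf_set(conf, "compression.type", "zstd", errstr, sizeof(errstr));   /* best ratio; use lz4 for speed */
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "1048576",
                  errstr, sizeof(errstr));                                     /* librdkafka analogue of buffer.memory */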
📬 The Three Delivery Guarantees
Message delivery semantics describe what happens when things go wrong (network timeout, broker restart, producer crash mid-send).
Guarantee | How | Risk | Use When
At-most-once | acks=0, no retry | Messages lost if broker is down | Metrics where loss is acceptable
At-least-once | acks=all + retries | Duplicates on retry after timeout | Most use-cases with idempotent consumers
Exactly-once | Idempotent producer + transactions | Higher latency, more complex | Financial transfers, order processing
🔑 At-Least-Once: The Duplicate Problem
With retries enabled, a producer sends a message. The broker receives it, writes it, but the network fails before sending the ACK. The producer retries — the broker receives a duplicate.
Producer ──[msg #1]──────► Broker (writes to log)
            │ network blip ──┘ (ACK lost)
Producer ──[msg #1 retry]──► Broker (writes duplicate!)

  offset 5: msg#1
  offset 6: msg#1   ← duplicate!
Solution: idempotent producer assigns each message a sequence number + producer ID (PID). The broker deduplicates within a 5-message window per (PID, partition).
✅ Exactly-Once: Idempotent Producer + Transactions
Idempotent producer (single-partition, single-session):
// Producer config for idempotence
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sz);
// Automatically sets: acks=all, retries=INT_MAX, max.in.flight=5
Transactional producer (multi-partition, multi-message atomic write):
// Transactional config
rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sz);
rd_kafka_conf_set(conf, "transactional.id", "my-txn-producer-1", errstr, sz);

// Usage pattern
rd_kafka_init_transactions(rk, 10000);            // once at startup
rd_kafka_begin_transaction(rk);
// ... produce messages ...
err = rd_kafka_commit_transaction(rk, 10000);
// on error: rd_kafka_abort_transaction(rk, 10000);
Transactions guarantee atomicity across partitions: all messages commit or none do. Consumers must set isolation.level=read_committed to skip uncommitted messages (aborted transaction leftovers).
EOS (Exactly-Once Semantics) end-to-end requires: idempotent producer + transactions + read_committed isolation + atomic offset commit within the transaction. This is what Kafka Streams provides out-of-the-box with processing.guarantee=exactly_once_v2.
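A minimal consume-transform-produce sketch of that EOS loop with librdkafka, assuming a consumer handle and a transactional producer already initialized as in the snippets above; error handling omitted and the output topic name is illustrative:

/* Sketch: one EOS iteration — output record and consumer offset commit in one txn */
rd_kafka_begin_transaction(producer);

rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000);
if (msg && !msg->err) {
    /* produce the (transformed) result inside the transaction */
    rd_kafka_producev(producer,
        RD_KAFKA_V_TOPIC("output-topic"),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_VALUE(msg->payload, msg->len),
        RD_KAFKA_V_END);

    /* commit the consumed offset as part of the same transaction */
    rd_kafka_topic_partition_list_t *offsets = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(offsets,
        rd_kafka_topic_name(msg->rkt), msg->partition)->offset = msg->offset + 1;
    rd_kafka_consumer_group_metadata_t *cgmd = rd_kafka_consumer_group_metadata(consumer);
    rd_kafka_send_offsets_to_transaction(producer, offsets, cgmd, 10000);
    rd_kafka_consumer_group_metadata_destroy(cgmd);
    rd_kafka_topic_partition_list_destroy(offsets);
    rd_kafka_message_destroy(msg);
}

rd_kafka_commit_transaction(producer, 10000);   /* output + offset become visible atomically */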
⚡ Consumer-Side Delivery Semantics
Delivery guarantees apply to both producer→broker and broker→consumer:
  • At-most-once consumer: commit offset before processing. If processing fails, message is lost (offset already moved forward)
  • At-least-once consumer: commit offset after processing. If crash after process but before commit, message is reprocessed on restart → make consumers idempotent
  • Exactly-once consumer: atomic commit — use Kafka transactions to write processing result AND commit offset in one transaction
Practical guidance: for most systems, at-least-once delivery + idempotent consumers is the right balance. Pure exactly-once adds significant complexity — use only when duplicates cause real business harm (billing, double-shipping).
🐰 RabbitMQ — AMQP Model
RabbitMQ implements the AMQP 0-9-1 protocol. Messages are published to exchanges, which route to queues via bindings. Consumers subscribe to queues.
Publisher ──[msg + routing_key]──► Exchange
                                      │
                                      ├──[binding: routing_key=*.error]───► Queue: errors
                                      ├──[binding: routing_key=orders.*]──► Queue: orders
                                      └──[binding: fanout]────────────────► Queue: analytics
                                                                            Queue: audit-log

Queue: errors ──► Consumer A (ACK → message deleted)
              └──► DLX → dead-letter-queue (NACK/TTL expired)
🔀 Exchange Types
Type | Routing Logic
direct | Exact routing key match — one queue per key
topic | Pattern match: * (one word) / # (zero+). e.g. order.*.created
fanout | Ignores routing key — copies to ALL bound queues
headers | Match on message headers (key-value), ignores routing key
default | Built-in direct exchange — routes to the queue whose name equals the routing key
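A short rabbitmq-c sketch of declaring a topic exchange and binding queues with patterns — exchange, queue, and routing-key names here are illustrative:

/* Sketch: topic exchange with pattern bindings (rabbitmq-c) */
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("orders-exchange"),
                      amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);

/* orders.*.created matches orders.eu.created, but not orders.eu.retail.created */
amqp_queue_bind(conn, 1, amqp_cstring_bytes("created-orders"),
                amqp_cstring_bytes("orders-exchange"),
                amqp_cstring_bytes("orders.*.created"), amqp_empty_table);

/* orders.# matches everything under orders. (zero or more words) */
amqp_queue_bind(conn, 1, amqp_cstring_bytes("all-orders"),
                amqp_cstring_bytes("orders-exchange"),
                amqp_cstring_bytes("orders.#"), amqp_empty_table);

/* Publish: the routing key decides which bound queues receive a copy */
amqp_basic_publish(conn, 1, amqp_cstring_bytes("orders-exchange"),
                   amqp_cstring_bytes("orders.eu.created"), 0, 0, NULL,
                   amqp_cstring_bytes("{\"order_id\": 42}"));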
💀 Dead Letter Exchange (DLX)
Messages are dead-lettered when:
  • Consumer sends NACK with requeue=false
  • Message TTL expires in queue
  • Queue length overflow (x-max-length)
Dead-lettered messages go to the DLX (if configured), enabling:
  • Retry queues with exponential backoff (TTL + DLX chain)
  • Poison message parking for inspection
  • Alerting on repeated failures
🔁 Retry with Exponential Backoff via DLX Chain
/* Declare retry queue with TTL + DLX pointing back to the main exchange */

/* main queue */
amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders"), ...);

/* retry-30s: TTL=30000ms, dead-letter back to "orders-exchange" */
amqp_table_entry_t args[2] = {
    { amqp_cstring_bytes("x-message-ttl"),
      { .kind = AMQP_FIELD_KIND_I32,  .value.i32   = 30000 } },
    { amqp_cstring_bytes("x-dead-letter-exchange"),
      { .kind = AMQP_FIELD_KIND_UTF8, .value.bytes = amqp_cstring_bytes("orders-exchange") } }
};
amqp_table_t tbl = { .num_entries = 2, .entries = args };
amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders.retry.30s"),
                   0, 1, 0, 0, tbl);
Pattern: main queue → failure → NACK → DLX(retry-5s) → TTL expires → DLX(retry-30s) → DLX(retry-5m) → DLX(dead-letter-final). Each level lengthens the delay — exponential-style backoff without consumer sleep loops.
⚖️ Consumer ACK Modes
Mode | Behavior | Risk
autoAck=true | Broker removes message as soon as delivered (at-most-once) | Lost if consumer crashes before processing
Manual ACK | Consumer confirms success; broker removes message | None — correct at-least-once
Manual NACK requeue=true | Message returned to front of queue | Infinite loop on poison messages
Manual NACK requeue=false | Message sent to DLX (if configured) or dropped | Dropped if no DLX
Reject | Same as NACK, but for a single message only | Same as the corresponding NACK
prefetch count (basicQos): limits unacked messages per consumer. Set to 1 for strict round-robin fairness; increase (e.g. 20–100) for throughput when processing is fast.
🆚 Kafka vs RabbitMQ — When to Use Which
Criterion | Use Kafka | Use RabbitMQ
Message retention | Need replay / audit log | Process-and-forget
Multiple consumers | Independent consumer groups reading same events | One consumer per message (competing consumers)
Ordering | Strict ordering per key (partition) | Best-effort (priority queues)
Throughput | Millions of msg/sec (sequential disk I/O) | Tens of thousands/sec
Routing complexity | Simple (partition key) | Rich (exchange types, header matching)
RPC / request-reply | Awkward | Built-in (correlation ID + reply-to)
Stream processing | Kafka Streams, ksqlDB native | Needs external framework
🔄 Distributed Transactions: Why Not 2PC?
Two-Phase Commit (2PC) coordinates distributed transactions but has critical problems: the coordinator is a single point of failure, participants hold locks during phase 1 (blocking), and the protocol is blocking — a coordinator crash leaves participants in limbo. In a microservices system spanning multiple databases and services, 2PC is impractical.

The Saga pattern replaces distributed ACID transactions with a sequence of local transactions, each publishing an event. If a step fails, compensating transactions undo the completed steps.
🎭 Orchestration Saga
A central Saga Orchestrator sends commands and receives events, maintaining the state machine.
Orchestrator
  │─[ReserveInventory]──► Inventory Svc
  │◄─[InventoryReserved]──┘
  │─[ChargePayment]──────► Payment Svc
  │◄─[PaymentFailed]──────┘   ← failure!
  │─[ReleaseInventory]───► Inventory Svc   (compensation)
Pros: easy to trace, centralized state, clear compensations
Cons: orchestrator is a potential SPOF; centralized logic can grow into a god object
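A sketch of what the orchestrator's state machine might look like in C — saga_state_t, the event names, and send_command() are illustrative, not a prescribed design:

/* Sketch: saga orchestrator as an explicit state machine */
#include <string.h>

typedef enum {
    SAGA_START,
    SAGA_INVENTORY_RESERVED,
    SAGA_COMPLETED,
    SAGA_COMPENSATING,
    SAGA_FAILED
} saga_state_t;

void send_command(const char *service, const char *command);   /* hypothetical transport */

/* Advance the saga on each incoming event and emit the next command */
saga_state_t on_event(saga_state_t state, const char *event) {
    if (state == SAGA_START && strcmp(event, "InventoryReserved") == 0) {
        send_command("payment-svc", "ChargePayment");
        return SAGA_INVENTORY_RESERVED;
    }
    if (state == SAGA_INVENTORY_RESERVED && strcmp(event, "PaymentCharged") == 0)
        return SAGA_COMPLETED;
    if (state == SAGA_INVENTORY_RESERVED && strcmp(event, "PaymentFailed") == 0) {
        send_command("inventory-svc", "ReleaseInventory");      /* compensation */
        return SAGA_COMPENSATING;
    }
    if (state == SAGA_COMPENSATING && strcmp(event, "InventoryReleased") == 0)
        return SAGA_FAILED;                                      /* saga rolled back */
    return state;                                                /* ignore unrelated events */
}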
💃 Choreography Saga
No central coordinator — each service listens for events and emits new events.
Order Svc emits [OrderCreated]
        ▼
Inventory Svc listens → reserves → emits [InventoryReserved]
        ▼
Payment Svc listens → charges → emits [PaymentFailed]
        ▼
Inventory Svc listens → releases → emits [InventoryReleased]
Order Svc listens → marks order failed
Pros: decentralized, services fully independent
Cons: hard to understand overall flow; compensations scattered across services
📤 Outbox Pattern — Atomic DB Write + Event Publish
Problem: you write to the database and then publish to Kafka. What if the process crashes between the two operations? The DB write committed but the event never published → downstream services miss the event → distributed inconsistency.

Solution: write to an outbox table in the same local transaction as the business data. A separate relay process publishes outbox records to the broker and marks them as sent.
Business Transaction (atomic, single DB)
┌─────────────────────────────────────────────┐
│ INSERT INTO orders (id, …)                  │
│ INSERT INTO outbox (event_type, payload,    │
│                     status='PENDING')       │  ← same txn!
└─────────────────────────────────────────────┘

Relay Process (separate, runs continuously)
  SELECT * FROM outbox WHERE status='PENDING' LIMIT 100
  FOR EACH row: publish to Kafka/RabbitMQ
                UPDATE outbox SET status='SENT', sent_at=NOW() WHERE id=row.id

Alternative to the polling relay: CDC (Change Data Capture) via Debezium — reads the DB WAL directly, so there is no polling overhead and added latency is minimal; works with Postgres/MySQL logical replication.
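A minimal libpq sketch of the producing side (the same-transaction write), assuming an open PGconn *pg and the table layout used in Lab 3; error handling omitted:

/* Sketch: business row + outbox row committed in one local transaction */
PGresult *r;
r = PQexec(pg, "BEGIN");                                                     PQclear(r);
r = PQexec(pg, "INSERT INTO orders (customer_id, amount) VALUES (42, 99.90)"); PQclear(r);
r = PQexec(pg,
    "INSERT INTO outbox (event_type, payload) "
    "VALUES ('OrderPlaced', '{\"customer_id\": 42, \"amount\": 99.90}'::jsonb)");
PQclear(r);
r = PQexec(pg, "COMMIT");   /* both rows commit atomically, or neither does */
PQclear(r);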
⚠️ Outbox: Failure Modes and Guarantees
Failure | Outcome | Why Safe
Process crashes after DB write, before relay runs | Relay picks up PENDING row on restart | Row persists in outbox
Relay publishes to Kafka but crashes before marking SENT | Relay re-publishes on restart → duplicate | Consumer must be idempotent
DB transaction rolled back | Outbox row never created → no event | Atomicity maintained
Kafka broker down during relay | Relay retries; PENDING rows accumulate | Broker recovery unblocks relay
The Outbox pattern provides at-least-once delivery. To prevent business harm from duplicates, combine with idempotent consumers (Tab 7).
✂️ CQRS — Command Query Responsibility Segregation
Traditional CRUD uses one model for reads and writes. As systems scale, reads and writes have very different access patterns: writes need normalized data for integrity; reads need denormalized projections for performance.

CQRS separates them into two explicit models:
  • Command side: mutates state, normalized DB optimized for writes and integrity (e.g., PostgreSQL with foreign keys)
  • Query side: returns projections, denormalized read model optimized for queries (e.g., Elasticsearch, Redis, materialized views)
Client
  │
  ├──[Command: PlaceOrder(items)]──► Command Handler
  │                                       │
  │                                       ▼
  │                                Write DB (PostgreSQL)
  │                                orders, order_items, inventory
  │                                       │
  │                          [Domain Event: OrderPlaced]
  │                                       │
  │                    ┌──────────────────┼────────────────┐
  │                    ▼                  ▼                ▼
  │                 Read DB          Search Index      Analytics DB
  │             (denormalized       (Elasticsearch)    (ClickHouse)
  │              order view)
  │
  └──[Query: GetOrderSummary(userId)]──► Query Handler
                                              │
                                              ▼
                                        Read DB (fast!)
✅ CQRS Benefits
  • Read model can be independently scaled (horizontal replicas)
  • Read model optimized for each query pattern (denormalized)
  • Write model optimized for correctness (normalized, transactions)
  • Multiple specialized read models from the same events
  • Replay events to rebuild read models after bugs
⚠️ CQRS Trade-offs
  • Eventual consistency: read model lags behind writes (typically milliseconds)
  • Additional infrastructure (separate read store)
  • Synchronization complexity (events must be reliably published)
  • Read-your-own-writes: a user who just placed an order may see stale data on immediate reload → handle with version tokens or direct write-model reads for the write's own session
📜 Event Sourcing — State as Event Log
Instead of storing the current state, store the sequence of events that led to the current state. Current state is derived by replaying events.

Normal CRUD: accounts: {id:1, balance:850}
Event Sourced:
event_log (account_id=1):
  AccountOpened   { amount: 1000 }   // balance → 1000
  MoneyWithdrawn  { amount: 200 }    // balance → 800
  InterestEarned  { amount: 50 }     // balance → 850
// replay → current balance = 850
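A tiny C sketch of that replay — fold the event list into the current balance (the event_t type and event names are illustrative):

/* Sketch: derive current state by folding over the event log */
#include <stddef.h>
#include <string.h>

typedef struct { const char *type; double amount; } event_t;

double replay_balance(const event_t *events, size_t n) {
    double balance = 0.0;
    for (size_t i = 0; i < n; i++) {
        if      (strcmp(events[i].type, "AccountOpened")  == 0) balance += events[i].amount;
        else if (strcmp(events[i].type, "MoneyWithdrawn") == 0) balance -= events[i].amount;
        else if (strcmp(events[i].type, "InterestEarned") == 0) balance += events[i].amount;
    }
    return balance;   /* 1000 - 200 + 50 = 850 for the log above */
}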
📸 Snapshots
Replaying 10 years of events on every read is slow. Snapshots periodically capture current state:
  • Every N events, save a snapshot: {account_id:1, balance:850, version:3}
  • On read: load latest snapshot, replay events since that snapshot version
  • Typical threshold: every 100–1000 events
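A hedged sketch of a snapshot-aware load: restore the latest snapshot, then replay only the events recorded after it. The persistence helpers and types here are hypothetical; apply_event is the same per-event fold as in the replay sketch above:

/* Sketch: snapshot + incremental replay (hypothetical helpers) */
#include <stddef.h>

typedef struct { const char *type; double amount; } event_t;
typedef struct { double balance; long version; } snapshot_t;

snapshot_t load_latest_snapshot(long account_id);                             /* e.g. {850, 3} */
event_t   *load_events_since(long account_id, long after_version, size_t *n); /* events > version */
double     apply_event(double balance, const event_t *e);                     /* one fold step */

double load_account(long account_id) {
    snapshot_t snap = load_latest_snapshot(account_id);
    size_t n = 0;
    event_t *tail = load_events_since(account_id, snap.version, &n);
    double balance = snap.balance;
    for (size_t i = 0; i < n; i++)
        balance = apply_event(balance, &tail[i]);
    return balance;   /* snapshot state + only the events written since */
}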
🔄 Schema Evolution
Events are immutable — you can't change past events. Handling schema changes:
  • Upcasting: transform old event format to new format at read time
  • Versioned events: MoneyWithdrawn_v1 vs MoneyWithdrawn_v2
  • Additive changes only: add fields, never remove or rename
  • Use Avro/Protobuf with Schema Registry for versioned schemas
📐 When to Use Event Sourcing (vs When Not To)
Use Event Sourcing When… | Avoid When…
Complete audit log is required by regulation | Simple CRUD with no history requirement
State reconstruction / time-travel debugging needed | Team unfamiliar with the pattern — steep learning curve
Multiple read models from the same data | Queries span multiple aggregates (event sourcing is aggregate-scoped)
Business domain is naturally event-oriented (banking, order management) | Simple content management, settings, static data
🔑 Idempotent Consumers: Handling At-Least-Once Delivery
In any reliable messaging system (Kafka with retries, RabbitMQ with NACK+requeue, Outbox relay), messages may be delivered more than once. An idempotent consumer produces the same result regardless of how many times it processes the same message.

Two approaches:
  • Natural idempotency: the operation is inherently idempotent — e.g., SET balance=850 vs balance = balance - 200. If the operation can be expressed as an absolute state (upsert), duplicates are safe.
  • Deduplication table: track processed message IDs; reject re-processed messages.
🗃️ Deduplication Table Pattern
Each message carries a unique idempotency key (message ID, event ID, or a business key). The consumer records processed keys in a processed_events table.
-- Schema
CREATE TABLE processed_events (
    event_id     TEXT PRIMARY KEY,
    processed_at TIMESTAMPTZ DEFAULT NOW()
);

-- Atomic check-and-process (PostgreSQL)
BEGIN;

-- Attempt to insert the event ID (returns no row if it is a duplicate)
INSERT INTO processed_events (event_id) VALUES ($1)
ON CONFLICT (event_id) DO NOTHING
RETURNING event_id;
-- If no row returned: already processed, skip
-- If row returned: new event, proceed with business logic

UPDATE accounts SET balance = balance - $2 WHERE id = $3;

COMMIT;
The INSERT ... ON CONFLICT DO NOTHING + business logic must be in the same transaction for atomicity. If the transaction rolls back, the event ID is not recorded and can be safely reprocessed.
⚡ Redis-based Deduplication (High Throughput)
For very high throughput, use Redis with a TTL-based dedup set instead of a DB table:
/* Check-and-mark atomically with SET NX EX (hiredis sketch).
   One command replaces the SETNX + EXPIRE pair, so the key can never
   end up without a TTL after a crash between the two calls. */
redisReply *reply = redisCommand(ctx, "SET %s 1 NX EX %d", event_id, 86400 /* 24h TTL */);
int is_new = reply && reply->type == REDIS_REPLY_STATUS;   /* "OK" if set, nil if it existed */

if (is_new) {
    process_event(event);   /* first time we see this event */
} else {
    /* duplicate: skip */
}
freeReplyObject(reply);
Redis dedup is best-effort — Redis can lose data if not persisted (AOF/RDB). For financial events, prefer the database dedup table for durable deduplication.
🧠 Designing Idempotency Keys
Source | Key Strategy | Notes
Kafka messages | topic:partition:offset | Globally unique per message position
RabbitMQ messages | Set message-id AMQP property at publish time | Producer responsibility; use UUID
Business events | order_id:event_type | Natural key; handles schema-level dedup
HTTP API calls | Client-supplied Idempotency-Key header (UUID) | Store response and return cached result on repeat
TTL management: set the dedup key TTL longer than your maximum retry window. If retries can span 24h, use 48h TTL. If your Kafka retention is 7 days, set TTL to 8 days.
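For example, a consumer might build the Kafka-style key like this (fragment; assumes msg is the rd_kafka_message_t* just polled, with <stdio.h> and <inttypes.h> included):

/* Sketch: dedup key from a consumed Kafka message — topic:partition:offset */
char dedup_key[256];
snprintf(dedup_key, sizeof(dedup_key), "%s:%" PRId32 ":%" PRId64,
         rd_kafka_topic_name(msg->rkt), msg->partition, msg->offset);
/* use dedup_key with the processed_events table or the Redis SET NX check above */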
🔧 Libraries Used
  • librdkafka — official C Kafka client; used by the Python/Go/Ruby clients under the hood
  • rabbitmq-c (amqp.h) — C AMQP 0-9-1 client for RabbitMQ
# Install (Ubuntu)
apt-get install librdkafka-dev librabbitmq-dev

# Compile
gcc -o kafka_producer kafka_producer.c -lrdkafka
gcc -o amqp_consumer amqp_consumer.c -lrabbitmq
── Implementation 1 — Kafka Producer with Delivery Reports ──
📤 Kafka Producer (librdkafka) — Full Implementation
/* kafka_producer.c — production-ready Kafka producer */
#include <librdkafka/rdkafka.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

static volatile int run = 1;
static void sigterm(int sig) { (void)sig; run = 0; }

/* Called for every message after produce attempt */
static void delivery_report_cb(rd_kafka_t *rk,
                               const rd_kafka_message_t *msg, void *opaque) {
    (void)rk; (void)opaque;
    if (msg->err) {
        fprintf(stderr, "[DR] FAILED: topic=%s err=%s key=%.*s\n",
                rd_kafka_topic_name(msg->rkt), rd_kafka_err2str(msg->err),
                (int)msg->key_len, (const char *)msg->key);
    } else {
        fprintf(stdout, "[DR] OK: topic=%s partition=%" PRId32
                " offset=%" PRId64 " key=%.*s\n",
                rd_kafka_topic_name(msg->rkt), msg->partition, msg->offset,
                (int)msg->key_len, (const char *)msg->key);
    }
}

int main(int argc, char *argv[]) {
    if (argc < 4) {
        fprintf(stderr, "Usage: %s <brokers> <topic> <message>\n", argv[0]);
        return 1;
    }

    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* Broker list */
    if (rd_kafka_conf_set(conf, "bootstrap.servers", argv[1],
                          errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%s\n", errstr);
        return 1;
    }

    /* Idempotent producer: deduplicates retries automatically */
    rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));

    /* Delivery callback for per-message success/failure reporting */
    rd_kafka_conf_set_dr_msg_cb(conf, delivery_report_cb);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
    }

    const char *topic = argv[2];
    const char *msg   = argv[3];
    const char *key   = "order-key-001";   /* determines partition */

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    rd_kafka_resp_err_t err;
retry_produce:
    err = rd_kafka_producev(
        rk,
        RD_KAFKA_V_TOPIC(topic),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_KEY(key, strlen(key)),
        RD_KAFKA_V_VALUE((void *)msg, strlen(msg)),
        RD_KAFKA_V_OPAQUE(NULL),
        RD_KAFKA_V_END);

    if (err) {
        fprintf(stderr, "Produce failed: %s\n", rd_kafka_err2str(err));
        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            rd_kafka_poll(rk, 1000);   /* drain delivery queue, then retry */
            goto retry_produce;
        }
    }

    /* Poll for delivery reports (fires delivery_report_cb) */
    rd_kafka_poll(rk, 0);   /* non-blocking poll */

    /* Wait for all outstanding messages to be delivered */
    fprintf(stdout, "Flushing...\n");
    rd_kafka_flush(rk, 10 * 1000);   /* wait up to 10s */

    if (rd_kafka_outq_len(rk) > 0)
        fprintf(stderr, "%d message(s) not delivered\n", rd_kafka_outq_len(rk));

    rd_kafka_destroy(rk);
    return 0;
}
── Implementation 2 — Kafka Consumer with Manual Offset Commit ──
📥 Kafka Consumer (librdkafka) — At-Least-Once with Manual Commit
/* kafka_consumer.c — manual offset commit consumer */
#include <librdkafka/rdkafka.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static volatile int run = 1;

static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *parts, void *opaque) {
    (void)opaque;
    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stdout, "Rebalance: assigned %d partitions\n", parts->cnt);
        rd_kafka_assign(rk, parts);
        break;
    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stdout, "Rebalance: revoking partitions, committing offsets\n");
        rd_kafka_commit(rk, parts, 0);   /* sync commit before revoke */
        rd_kafka_assign(rk, NULL);
        break;
    default:
        rd_kafka_assign(rk, NULL);
        break;
    }
}

int main(int argc, char *argv[]) {
    if (argc < 4) {
        fprintf(stderr, "Usage: %s <brokers> <group> <topic>\n", argv[0]);
        return 1;
    }

    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", argv[1], errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", argv[2], errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));

    /* Disable auto-commit: we commit manually after processing */
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr));

    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    rd_kafka_poll_set_consumer(rk);

    /* Subscribe to topic */
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, argv[3], RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(rk, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    int msg_count = 0;
    while (run) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
        if (!msg) continue;

        if (msg->err) {
            if (msg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
                fprintf(stdout, "Reached end of partition\n");
            else
                fprintf(stderr, "Consumer error: %s\n", rd_kafka_message_errstr(msg));
        } else {
            /* Process message */
            fprintf(stdout, "Message: partition=%" PRId32 " offset=%" PRId64
                    " key=%.*s value=%.*s\n",
                    msg->partition, msg->offset,
                    (int)msg->key_len, (const char *)msg->key,
                    (int)msg->len, (const char *)msg->payload);
            /* TODO: process_message(msg->payload, msg->len); */

            /* Commit offset after successful processing (at-least-once) */
            if (++msg_count % 100 == 0) {
                /* Sync commit every 100 messages for durability */
                rd_kafka_commit_message(rk, msg, /*async=*/0);
            }
        }
        rd_kafka_message_destroy(msg);
    }

    /* Final commit before shutdown */
    rd_kafka_commit(rk, NULL, 0);
    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
    return 0;
}
── Implementation 3 — RabbitMQ Consumer with DLX (rabbitmq-c) ──
🐰 RabbitMQ Consumer with Dead-Letter Exchange (rabbitmq-c)
/* amqp_consumer.c — RabbitMQ consumer with manual ACK and DLX */
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static void die_on_amqp_error(amqp_rpc_reply_t x, const char *context) {
    if (x.reply_type != AMQP_RESPONSE_NORMAL) {
        fprintf(stderr, "%s: AMQP error\n", context);
        exit(1);
    }
}

int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);
    amqp_socket_open(socket, "localhost", 5672);

    die_on_amqp_error(
        amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                   "guest", "guest"),
        "Login");

    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Open channel");

    /* Declare dead-letter exchange */
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes("dlx.orders"),
                          amqp_cstring_bytes("fanout"), 0, 1, 0, 0,
                          amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declare DLX");

    /* Declare dead-letter queue and bind it to the DLX */
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders.dead-letter"),
                       0, 1, 0, 0, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declare DLQ");

    amqp_queue_bind(conn, 1, amqp_cstring_bytes("orders.dead-letter"),
                    amqp_cstring_bytes("dlx.orders"), amqp_cstring_bytes(""),
                    amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Bind DLQ");

    /* Declare main queue with DLX configured */
    amqp_table_entry_t dlx_arg = {
        .key = amqp_cstring_bytes("x-dead-letter-exchange"),
        .value = { .kind = AMQP_FIELD_KIND_UTF8,
                   .value.bytes = amqp_cstring_bytes("dlx.orders") }
    };
    amqp_table_t dlx_args = { .num_entries = 1, .entries = &dlx_arg };
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders"),
                       0, 1, 0, 0, dlx_args);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declare orders queue");

    /* Set prefetch count — process 10 at a time */
    amqp_basic_qos(conn, 1, 0, 10, 0);

    /* Start consuming (no-ack=false: manual ACK mode) */
    amqp_basic_consume(conn, 1, amqp_cstring_bytes("orders"), amqp_empty_bytes,
                       0, 0 /*no_ack=false*/, 0, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consume");

    while (1) {
        amqp_envelope_t envelope;
        amqp_maybe_release_buffers(conn);

        amqp_rpc_reply_t res = amqp_consume_message(conn, &envelope, NULL, 0);
        if (res.reply_type != AMQP_RESPONSE_NORMAL) break;

        fprintf(stdout, "Delivery tag: %" PRIu64 " body: %.*s\n",
                envelope.delivery_tag,
                (int)envelope.message.body.len,
                (const char *)envelope.message.body.bytes);

        int success = 1;   /* TODO: replace with actual processing */
        if (success) {
            /* ACK: remove from queue */
            amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
        } else {
            /* NACK + requeue=false: sends to DLX */
            amqp_basic_nack(conn, 1, envelope.delivery_tag, 0, 0);
        }
        amqp_destroy_envelope(&envelope);
    }

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}
── Implementation 4 — Outbox Relay (libpq) ──
📤 Outbox Relay: PostgreSQL → Kafka (libpq + librdkafka)
/* outbox_relay.c — polls outbox table and publishes to Kafka */
#include <libpq-fe.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#define BATCH_SIZE_STR    "100"   /* rows fetched per poll (string for SQL concatenation) */
#define POLL_INTERVAL_MS  100

typedef struct {
    PGconn *pg;
    rd_kafka_t *rk;
    rd_kafka_topic_t *rkt;
} relay_ctx_t;

static void relay_batch(relay_ctx_t *ctx) {
    /* One explicit transaction so the FOR UPDATE SKIP LOCKED row locks are held
       until the batch is marked SENT (each PQexec would otherwise auto-commit) */
    PQclear(PQexec(ctx->pg, "BEGIN"));

    /* Fetch pending outbox rows */
    PGresult *res = PQexec(ctx->pg,
        "SELECT id, event_type, payload FROM outbox "
        "WHERE status = 'PENDING' ORDER BY created_at "
        "LIMIT " BATCH_SIZE_STR " FOR UPDATE SKIP LOCKED");
    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
        fprintf(stderr, "Query failed: %s\n", PQerrorMessage(ctx->pg));
        PQclear(res);
        PQclear(PQexec(ctx->pg, "ROLLBACK"));
        return;
    }

    int rows = PQntuples(res);
    for (int i = 0; i < rows; i++) {
        const char *id         = PQgetvalue(res, i, 0);
        const char *event_type = PQgetvalue(res, i, 1);
        const char *payload    = PQgetvalue(res, i, 2);

        /* Produce to Kafka (fire-and-forget here; use a delivery callback for ACK) */
        rd_kafka_produce(ctx->rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, strlen(payload),
                         event_type, strlen(event_type), NULL);

        /* Mark as sent on the same connection (no distributed txn needed:
           at-least-once — may re-publish if we crash before the UPDATE commits) */
        const char *params[1] = { id };
        PQclear(PQexecParams(ctx->pg,
            "UPDATE outbox SET status='SENT', sent_at=NOW() WHERE id=$1",
            1, NULL, params, NULL, NULL, 0));
    }
    PQclear(res);
    PQclear(PQexec(ctx->pg, "COMMIT"));

    rd_kafka_flush(ctx->rk, 5000);
}

int main() {
    relay_ctx_t ctx;
    char errstr[512];

    ctx.pg = PQconnectdb("host=localhost dbname=myapp user=relay_user");
    if (PQstatus(ctx.pg) != CONNECTION_OK) {
        fprintf(stderr, "PG connect failed: %s\n", PQerrorMessage(ctx.pg));
        return 1;
    }

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));
    ctx.rk  = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    ctx.rkt = rd_kafka_topic_new(ctx.rk, "domain-events", NULL);

    fprintf(stdout, "Outbox relay started\n");
    while (1) {
        relay_batch(&ctx);
        usleep(POLL_INTERVAL_MS * 1000);
    }
}
FOR UPDATE SKIP LOCKED: if running multiple relay processes for redundancy, this PostgreSQL clause ensures each row is only processed by one relay at a time — no duplicate publications from concurrent relays.
🔬 Lab 1 — Kafka Producer / Consumer Pipeline with Delivery Guarantees
Build a Kafka pipeline and observe the behavior of different delivery semantics.
1 Start Kafka locally: docker compose up kafka zookeeper. Create topic events with 3 partitions, replication-factor 1.
2 Write a producer using librdkafka with enable.idempotence=true. Send 10,000 messages with keys user-{i%100} (100 distinct keys).
3 Observe partition distribution: messages with the same key always go to the same partition. Verify with kafka-console-consumer --partition output.
4 Write two consumer processes in the same group. Start both. Observe partition rebalancing in logs when the second consumer joins.
5 Kill one consumer mid-stream. Observe rebalance and that the surviving consumer picks up all partitions and continues from the committed offset (not the beginning).
6 Bonus: write a transactional producer that sends batches of 10 messages atomically. Verify with isolation.level=read_committed consumer that aborted batches are not visible.
🔬 Lab 2 — RabbitMQ Dead-Letter Exchange Chain
Build a retry-with-backoff pipeline using DLX chaining.
1 Start RabbitMQ: docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management. Open management UI at http://localhost:15672.
2 Declare the following chain using rabbitmq-c or the management UI:
  • orders queue → DLX: retry-exchange
  • orders.retry.5s queue → TTL: 5000ms → DLX: orders-exchange (routes back to orders)
  • orders.dead-final queue for permanently failed messages (retry limit exceeded)
3 Write a consumer that processes messages. Increment a retry count header. After 3 retries, NACK to final dead-letter queue. Otherwise NACK to retry queue.
4 Publish 10 messages. Fail the first 3 (to trigger retries). Verify in management UI that messages move through the retry chain with 5s delay.
5 Bonus: modify retry delay to be exponential: 5s → 30s → 5m using three distinct retry queues.
🔬 Lab 3 — Outbox Pattern with PostgreSQL + Kafka
Demonstrate atomic event publishing using the Outbox pattern.
1 Create PostgreSQL tables:
CREATE TABLE orders (
    id          SERIAL PRIMARY KEY,
    customer_id INT,
    amount      NUMERIC
);

CREATE TABLE outbox (
    id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type TEXT NOT NULL,
    payload    JSONB NOT NULL,
    status     TEXT NOT NULL DEFAULT 'PENDING',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    sent_at    TIMESTAMPTZ
);
2 Write a transaction in C (libpq) that inserts an order AND an outbox record atomically. Simulate a crash (call abort() after DB commit) — verify the outbox row persists even though Kafka was never published.
3 Build the Outbox Relay from Tab 8. Run it — observe it picks up the PENDING row and publishes to Kafka. Verify message arrives in Kafka consumer.
4 Test the duplicate scenario: manually reset a row to PENDING after it was marked SENT. Run relay again — verify Kafka receives a duplicate. Make the Kafka consumer idempotent using a Redis SETNX dedup check.
🔬 Lab 4 — CQRS Read Model Projection
Build a minimal CQRS system: write to PostgreSQL, project events to a Redis read model.
1 Write model: orders table in PostgreSQL with normalized schema. Each INSERT/UPDATE publishes an event to Kafka topic order-events.
2 Read model consumer: subscribe to order-events. For each event, upsert a denormalized Redis hash: HSET order:{id} customer_name "..." status "..." total "...".
3 Query handler: read from Redis hash for single order lookups. Measure latency vs direct PostgreSQL query under load (use Apache Bench or wrk).
4 Simulate read model rebuild: delete Redis keys, reset consumer offset to earliest, re-run consumer — verify Redis is repopulated from event history.
5 Bonus: add a "leaderboard" read model: top 10 customers by order total, stored as a Redis sorted set (ZADD). Update on every OrderPlaced event.
── Phase 5 Mastery Checklist ──
Kafka
  • Explain topic, partition, offset, consumer group, ISR
  • Describe producer partition assignment: key hash vs round-robin
  • Explain rebalance: eager vs cooperative, triggers
  • Configure acks=all + min.insync.replicas=2 for durability
  • Implement idempotent producer with librdkafka
  • Implement manual offset commit consumer
  • Explain log compaction and when to use it
RabbitMQ
  • Explain the 5 exchange types and when to use each
  • Configure dead-letter exchange for failed messages
  • Build retry chain with TTL + DLX
  • Implement manual ACK/NACK consumer
  • Set prefetch count for flow control
Delivery Semantics
  • Explain at-most-once, at-least-once, exactly-once trade-offs
  • Configure idempotent + transactional producer for EOS
  • Set isolation.level=read_committed for EOS consumers
Patterns
  • Implement Outbox: atomic DB write + relay process
  • Explain Saga orchestration vs choreography trade-offs
  • Design CQRS write/read model split for a given domain
  • Explain Event Sourcing: replay, snapshots, schema versioning
  • Implement idempotent consumer with dedup table (PostgreSQL)
  • Design idempotency keys for Kafka, RabbitMQ, HTTP APIs