TRACK A · LLD · MODULE A5 · WEEK 8 · LIVE

Concurrency
in LLD

Threads · Locks · Semaphores · Producer-Consumer
Rate Limiter · Deadlock · Thread Pool · Pub/Sub Queue
9
TOPICS
2
PROJECTS
4
TASKS
A5
MODULE
Java Memory Model — why threads see stale data
// THREAD EXECUTION — RACE CONDITION VISUALISED
THREAD-1
GETFIELD → IADD → PUTFIELD
count?
THREAD-2
GETFIELD → IADD → PUTFIELD
count?
THREAD-3
GETFIELD → IADD → PUTFIELD
count?
⚠ count++ is 3 bytecode ops — threads interleave, updates get lost
PROBLEM 01
Visibility
Thread writes to local CPU cache. Other threads see stale value from main memory. JMM does NOT guarantee when (or if) cache is flushed without synchronization.
PROBLEM 02
Atomicity
count++ is 3 bytecode instructions: read → modify → write. Threads can interleave between any two, causing lost updates.
GUARANTEE
Happens-Before
JMM guarantee: if A happens-before B, B sees all of A's memory writes. Established by: synchronized exit/enter, volatile write/read, thread start/join, lock release/acquire.
FIX 01
volatile
Ensures visibility only. Writes immediately flushed to main memory; reads always from main memory. Does NOT fix atomicity — count++ still broken with volatile.
FIX 02
synchronized
Ensures both visibility AND atomicity. Only one thread executes synchronized block at a time. Establishes happens-before on exit/enter. Correct but heavier than atomic ops.
FIX 03
AtomicInteger
Lock-free via CAS (Compare-And-Swap). Hardware instruction: atomic read-modify-write. Best for single counters/references under moderate contention. No blocking.
volatile vs synchronized vs AtomicJAVA
// ❌ BROKEN — race condition on count++
class BrokenCounter   { private int count = 0; void inc() { count++; } }

// ❌ STILL BROKEN — volatile fixes visibility, not atomicity
class VolatileCounter  { private volatile int count = 0; void inc() { count++; } }

// ✅ FIXED — synchronized ensures atomicity + visibility
class SyncCounter      { private int count = 0; synchronized void inc() { count++; } }

// ✅ FIXED — lock-free CAS, better than synchronized for single counter
class AtomicCounter    { private final AtomicInteger count = new AtomicInteger(0);
                           void inc() { count.incrementAndGet(); } }

// volatile IS correct for simple flags (single writer, no compound op)
class Service {
    private volatile boolean running = true;   // ✅ correct: simple write/read
    public void stop()  { running = false; }     // single writer
    public void run()   { while (running) work(); } // multi-reader
}
Common interview trap: "Is volatile enough for a counter?" — No. volatile fixes visibility but count++ still has the read-modify-write race. Use AtomicInteger or synchronized.
Synchronization Primitives
PRIMITIVEATOMICITYVISIBILITYTRY-LOCKTIMEOUTFAIRNESSBEST FOR
synchronizedSimple mutual exclusion
ReentrantLockComplex locking needs
volatileN/AN/AN/ASimple flags, single writer
AtomicIntegerCASLock-free single counter
SemaphoreLimiting concurrent access (N>1)
ReadWriteLockRead-heavy access patterns
LongAdderN/AN/AN/AHigh-contention counter (striped)
ReentrantLock — full APIJAVA
class BankAccount {
    private double              balance = 0;
    private final ReentrantLock lock = new ReentrantLock(true); // fair=true

    // Basic lock — ALWAYS unlock in finally
    public void deposit(double amount) {
        lock.lock();
        try     { balance += amount; }
        finally { lock.unlock(); }  // ← never skip this
    }

    // tryLock — non-blocking, returns false if unavailable
    public boolean tryDeposit(double amount) {
        if (lock.tryLock()) {
            try     { balance += amount; return true; }
            finally { lock.unlock(); }
        }
        return false;
    }

    // tryLock with timeout — blocks at most N ms
    public boolean tryDepositTimeout(double amount, long ms)
            throws InterruptedException {
        if (lock.tryLock(ms, TimeUnit.MILLISECONDS)) {
            try     { balance += amount; return true; }
            finally { lock.unlock(); }
        }
        return false;  // Timeout — didn't acquire
    }
}
ReadWriteLock — concurrent reads, exclusive writesJAVA
class ConfigCache {
    private final Map<String,String> cache  = new HashMap<>();
    private final ReadWriteLock        rwLock = new ReentrantReadWriteLock();
    private final Lock                 rLock  = rwLock.readLock();
    private final Lock                 wLock  = rwLock.writeLock();

    // ✅ MANY threads can read simultaneously
    public String get(String key) {
        rLock.lock();
        try     { return cache.get(key); }
        finally { rLock.unlock(); }
    }

    // ✅ EXCLUSIVE — blocks all readers + other writers
    public void put(String key, String val) {
        wLock.lock();
        try     { cache.put(key, val); }
        finally { wLock.unlock(); }
    }

    // Double-checked pattern — read fast path, write fallback
    public String computeIfAbsent(String key, Function<String,String> fn) {
        rLock.lock();                             // 1. Try read (fast path)
        try { if (cache.get(key) != null) return cache.get(key); }
        finally { rLock.unlock(); }
        wLock.lock();                             // 2. Write lock
        try {
            if (cache.get(key) != null) return cache.get(key); // 3. Double-check
            String v = fn.apply(key);
            cache.put(key, v); return v;
        } finally { wLock.unlock(); }
    }
}
ReadWriteLock rule: Lock downgrade (write→read) is allowed in Java. Lock upgrade (read→write) is NOT — it deadlocks. Always release read lock before acquiring write lock.
Concurrency Patterns
Producer-Consumer — BlockingQueue
LogPipeline.java — classic producer-consumerJAVA
class LogPipeline {
    // LinkedBlockingQueue: bounded, separate head/tail locks — high throughput
    private final BlockingQueue<LogEvent> queue =
        new LinkedBlockingQueue<>(1000); // Bounded — backpressure!
    private volatile boolean running = true;

    // PRODUCER — any app thread calls this
    public void log(String level, String msg) {
        try {
            queue.put(new LogEvent(level, msg)); // Blocks if queue full (backpressure)
        } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public boolean tryLog(String level, String msg) {
        return queue.offer(new LogEvent(level, msg)); // Non-blocking, drops if full
    }

    // CONSUMER — background thread drains queue
    private void consume() {
        while (running || !queue.isEmpty()) {
            try {
                LogEvent e = queue.poll(100, TimeUnit.MILLISECONDS); // timeout: re-check 'running'
                if (e != null) writeToSink(e);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; }
        }
    }
}
Condition Variables — await() ALWAYS in while
BoundedBuffer.java — two conditionsJAVA
class BoundedBuffer<T> {
    private final Object[]      buf;
    private int                  count=0, put=0, take=0;
    private final ReentrantLock lock     = new ReentrantLock();
    private final Condition     notFull  = lock.newCondition();
    private final Condition     notEmpty = lock.newCondition();

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buf.length) notFull.await();  // ← WHILE not IF! (spurious wakeups)
            buf[put] = item; put = (put+1)%buf.length; count++;
            notEmpty.signal();  // Wake one consumer
        } finally { lock.unlock(); }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();         // ← WHILE not IF!
            T item = (T) buf[take]; buf[take]=null; take=(take+1)%buf.length; count--;
            notFull.signal();   // Wake one producer
            return item;
        } finally { lock.unlock(); }
    }
}
Spurious wakeups: The JVM can wake a thread from await() without signal() — allowed by spec. ALWAYS re-check the guard condition in a while loop after await(). This is one of the most common concurrency bugs.
Semaphore — bounded resource pool
DBConnectionPool.javaJAVA
class DBConnectionPool {
    private final Semaphore         sem;
    private final Queue<Connection>  pool = new ConcurrentLinkedQueue<>();

    public DBConnectionPool(int max) {
        sem = new Semaphore(max, true);  // fair=true: FIFO, no starvation
        for (int i=0; i<max; i++) pool.offer(createConnection());
    }

    public Connection acquire() throws InterruptedException {
        sem.acquire();         // Blocks until a slot is free
        return pool.poll();
    }

    public Connection acquire(long ms) throws InterruptedException {
        if (!sem.tryAcquire(ms, TimeUnit.MILLISECONDS))
            throw new TimeoutException("No connection in "+ms+"ms");
        return pool.poll();
    }

    public void release(Connection c) {
        pool.offer(c);
        sem.release();         // Signal one slot free → unblocks next waiter
    }
}
Deadlock — Detection, Prevention, Avoidance
Deadlock: Thread A holds Lock-1 waiting for Lock-2. Thread B holds Lock-2 waiting for Lock-1. Neither can proceed. Program hangs forever.
❌ BROKEN — circular wait
// Thread-1: lock(from) then lock(to)
// Thread-2: lock(to)   then lock(from)
// → DEADLOCK when called concurrently
void transfer(Account from, Account to, double amt) {
    synchronized (from) {
        synchronized (to) {   // ← order varies!
            from.debit(amt);
            to.credit(amt);
        }
    }
}
✅ FIX 1 — consistent lock ordering
// Always lock lower account ID first
void transfer(Account from, Account to, double amt) {
    Account first  = from.id < to.id ? from : to;
    Account second = from.id < to.id ? to   : from;
    synchronized (first) {
        synchronized (second) { // ← same order always
            from.debit(amt);
            to.credit(amt);
        }
    }
}
✅ FIX 2 — tryLock with backoff
void transfer(Account f, Account t, double amt) throws Exception {
    while (true) {
        if (f.lock.tryLock(50, TimeUnit.MILLISECONDS)) {
            try {
                if (t.lock.tryLock(50, TimeUnit.MILLISECONDS)) {
                    try { f.debit(amt); t.credit(amt); return; }
                    finally { t.lock.unlock(); }
                }
            } finally { f.lock.unlock(); }
        }
        Thread.sleep((long)(Math.random()*10)); // random backoff
    }
}
🔑 Coffman Conditions
ALL four must hold for deadlock:
1. Mutual Exclusion — resource held exclusively
2. Hold & Wait — holds one, waits for another
3. No Preemption — resource can't be taken away
4. Circular Wait — A→B→C→A dependency cycle

Break ANY ONE to prevent deadlock:
→ Ordering breaks Circular Wait
→ tryLock breaks Hold & Wait + No Preemption
Rate Limiter — Token Bucket Algorithm
Bucket
capacity N tokens
Refill
+rate/ms tokens
Request →
Check
tokens ≥ 1?
→ YES →
Allow
tokens--
→ NO →
Reject
429 / wait
TokenBucketRateLimiter.javaJAVA
class TokenBucketRateLimiter {
    private final long   capacity;
    private final double refillRatePerMs;
    private double       tokens;
    private long         lastRefill;

    public TokenBucketRateLimiter(long capacity, long rps) {
        this.capacity        = capacity;
        this.refillRatePerMs = rps / 1000.0;
        this.tokens          = capacity;   // Start full
        this.lastRefill      = System.currentTimeMillis();
    }

    private void refill() {
        long now = System.currentTimeMillis();
        tokens = Math.min(capacity, tokens + (now - lastRefill) * refillRatePerMs);
        lastRefill = now;
    }

    // Non-blocking — returns false if rate exceeded
    public synchronized boolean tryAcquire() {
        refill();
        if (tokens >= 1) { tokens--; return true; }
        return false;
    }

    // Blocking — waits until token available
    public synchronized void acquire() throws InterruptedException {
        while (true) {
            refill();
            if (tokens >= 1) { tokens--; return; }
            long waitMs = (long) Math.ceil((1-tokens)/refillRatePerMs);
            wait(waitMs);
        }
    }
}

// Per-user limiter — each user gets own bucket
class UserRateLimiter {
    private final ConcurrentHashMap<String, TokenBucketRateLimiter> buckets
        = new ConcurrentHashMap<>();
    private final Map<Tier,Long> limits = Map.of(
        Tier.FREE,10L, Tier.PRO,100L, Tier.ENTERPRISE,1000L);

    public boolean tryAcquire(String userId, Tier tier) {
        return buckets.computeIfAbsent(userId,
            k -> new TokenBucketRateLimiter(limits.get(tier)*10, limits.get(tier))
        ).tryAcquire();
    }
}
ALGORITHMBURST ALLOWEDRATE SMOOTHINGBEST FOR
Token Bucket✓ (saved tokens)PartialAPIs, HTTP rate limiting — most common
Leaky Bucket✗ (constant drain)✓ StrictNetwork traffic shaping
Sliding Window✓ AccuratePer-user quotas, billing accuracy
Fixed Window✓ at window start✗ Bursty edgesSimple quotas (daily/hourly limits)
Production LLD Projects
🚗
Thread-Safe Parking Lot
PROJECT 1 · PARKING · CONCURRENCY
50 concurrent threads, 3 vehicle types, zero double-bookings. Combines Semaphore + AtomicBoolean CAS + ReadWriteLock for the display board + Token Bucket for entry rate limiting.
Semaphore AtomicBoolean.CAS ReadWriteLock TokenBucket ConcurrentHashMap
Key concurrency designJAVA
// 1. Semaphore limits concurrent parkers per type
sem.acquire();  // blocks if no spots

// 2. CAS claims specific spot — no explicit lock
if (spot.occupied.compareAndSet(false, true)) { ...claim... }

// 3. ReadWriteLock on display board
//    many readers, write only on park/unpark

// 4. AtomicInteger for available count — no lock
availableCar.decrementAndGet();

// 5. TokenBucket at entry gate — 5 entries/sec
if (!entryLimiter.tryAcquire()) throw rateLimitEx;
📨
Pub/Sub Message Queue
PROJECT 2 · MESSAGING · ASYNC
Thread-safe publish/subscribe with async fan-out dispatch. Producers don't block consumers. One failing handler doesn't affect others. Uses CopyOnWriteArrayList for safe iteration during subscribe/unsubscribe.
LinkedBlockingQueue CopyOnWriteArrayList ExecutorService ConcurrentHashMap volatile
Key concurrency designJAVA
// 1. Producer: non-blocking publish
queue.offer(msg);  // false if full — backpressure

// 2. CopyOnWriteArrayList: safe to iterate
//    while other threads subscribe/unsubscribe
subscribers.computeIfAbsent(topic,
    k -> new CopyOnWriteArrayList<>()).add(h);

// 3. Dispatcher fans out to thread pool
for (MessageHandler h : handlers) {
    dispatchPool.submit(() -> {
        try { h.handle(msg); }
        catch (Exception e) { /* isolated */ }
    });
}

// 4. Shutdown: drain queue before stopping
running = false;
pool.awaitTermination(30, TimeUnit.SECONDS);
Why CopyOnWriteArrayList for subscribers? Iteration is snapshot-based — adding/removing subscribers mid-dispatch doesn't throw ConcurrentModificationException. Write operations are O(n) but dispatch iteration is lock-free. Perfect for read-heavy, write-rare lists like subscriber registries.
01
Race Condition Identification — 4 Snippets
~1.5 hrs

Name the concurrency bug in each snippet and write a correct fix.

// A — Lazy Singleton
class Config {
    private static Config instance;
    public static Config getInstance() {
        if (instance == null) { instance = new Config(); }  // Bug?
        return instance;
    }
}

// B — Check-then-act
class TicketSeller {
    private int tickets = 100;
    public boolean sell() {
        if (tickets > 0) { tickets--; return true; }        // Bug?
        return false;
    }
}

// C — Compound AtomicInteger
AtomicInteger count = new AtomicInteger(0);
public int getAndDoubleIfEven() {
    if (count.get() % 2 == 0)                              // Bug?
        return count.getAndAdd(count.get());
    return count.get();
}

// D — Visibility
class Worker {
    boolean done = false;
    void finish() { done = true; }
    void run()    { while (!done) work(); }                 // Bug?
}
02
Thread-Safe LRU Cache
~2.5 hrs · code

Implement an LRU Cache with concurrent access from multiple threads.

API:
  int  get(int key)           // O(1), returns -1 if absent
  void put(int key, int val)  // O(1), evicts LRU on capacity exceeded

Approach: LinkedHashMap (accessOrder=true) + ReentrantReadWriteLock
          OR: ConcurrentHashMap + ConcurrentLinkedDeque + explicit sync

Requirements:
  - Correct under 8 concurrent threads × 100k operations
  - No ConcurrentModificationException
  - LRU eviction order correct under concurrent access

Bonus: Benchmark vs Collections.synchronizedMap(new LinkedHashMap())
  Measure: throughput (ops/sec), latency p50/p99
03
Dining Philosophers — Two Solutions
~2 hrs · code

Implement and then fix the classic deadlock problem.

Setup: 5 philosophers, 5 forks (shared between adjacent pairs)
Lifecycle: think() → pickBothForks() → eat() → putDownForks()

Step 1: Implement naive version — show it deadlocks
  (5 threads all pick left fork simultaneously → circular wait)

Step 2: Fix with lock ordering
  Odd philosophers:  pick left, then right
  Even philosophers: pick right, then left
  → Breaks circular wait

Step 3: Fix with arbitrator (Semaphore)
  Only 4 philosophers allowed to try picking up forks at once
  → At most 4 can compete, guaranteeing one can always complete
  new Semaphore(4) wrapping pickBothForks()

Verify: 100 rounds, each philosopher eats at least once (no starvation)
Mini Project — Production Parking Lot
~5 hrs · full LLD
Requirements:
  - 3 types: Car(100), Bike(50), Truck(20) spots
  - 50 concurrent threads simulating arrivals/departures
  - Display board with real-time counts (read-heavy)
  - Ticket: vehicleId, spotId, entryTime, UUID
  - Fee: first 2h free, ₹50/hour after
  - Entry rate limit: 5 vehicles/sec (Token Bucket)

Concurrency mechanisms:
  Spot claim:      AtomicBoolean.compareAndSet (lock-free)
  Type counting:   AtomicInteger (lock-free)
  Capacity guard:  Semaphore(N, fair=true)
  Display board:   ReadWriteLock (concurrent reads)
  Entry gate:      TokenBucketRateLimiter (synchronized)

Correctness proof (JUnit assertions):
  1. Zero double-bookings: assert each spotId assigned to ≤ 1 vehicle
  2. Count invariant: final available + occupied == initial capacity
  3. All tickets have valid entry timestamps
  4. Fee calculation correct for 0, 2, 3, 5 hour durations

Deliverable: Full Java code + JUnit test + UML with sync annotations
0 / 13 completedMODULE A5 · CONCURRENCY IN LLD
Understand JMM: visibility, atomicity, happens-before — and their differences
Know when to use volatile vs synchronized vs AtomicXxx vs LongAdder
synchronized vs ReentrantLock: know every advantage of each
Can implement Producer-Consumer with BlockingQueue from memory
ReadWriteLock: know rule — read downgrade allowed, upgrade not
Semaphore: can implement bounded pool and explain acquire/release semantics
Condition variables: know await() MUST be in while loop, and why
All 4 Coffman conditions + how to break each one
Can implement Token Bucket rate limiter from memory
ExecutorService types: fixed, cached, scheduled, custom — when each
✏️ Task 1–3 completed (race conditions, LRU, dining philosophers)
✏️ Mini Project: Parking Lot — zero double-bookings verified under load
✏️ Pub/Sub Queue implemented and tested with concurrent publishers
// TRACK A COMPLETE → MOVING TO TRACK B
Track B — High-Level System Design
B1: HLD Fundamentals (CAP, consistency models, availability patterns)
B2: Databases at Scale (sharding, replication, SQL vs NoSQL tradeoffs)
B3: Caching (Redis, CDN, cache invalidation strategies)
B4: Message Queues (Kafka, RabbitMQ, at-least-once vs exactly-once)
B5: URL Shortener · Pastebin · TinyURL design
B6: Design Twitter Feed · Instagram · Netflix
← PREVIOUS: LLD A4 📄 READ STUDY NOTES ↑ ROADMAP NEXT: LLD A6 →