Part 4 — LLM API Mastery  ·  Module 13 of 14
Streaming & Conversation State
Stream tokens in real-time and manage multi-turn conversations without losing context
⏱ 1 Week 🟡 Intermediate 🔧 SSE · FastAPI · tiktoken · Redis 📋 Prerequisite: P4-M12
🎯 What This Module Covers

UX + Architecture

Two skills that separate prototype AI apps from production ones: streaming responses and conversation state management. Without streaming, users stare at a blank screen for 10 seconds. Without proper state management, your chatbot forgets everything after one message.

  • Streaming basics — how Server-Sent Events work, streaming from Anthropic and OpenAI SDKs
  • Streaming in FastAPI — exposing a streaming endpoint that the browser or client can consume
  • Conversation state — the messages array pattern, multi-turn management, turn limits
  • Context window management — counting tokens with tiktoken, sliding window, summarisation strategies
  • Persistent history — storing and retrieving conversation history from SQLite and Redis
🔗 Why These Skills Matter

Context
  • Streaming — users perceive a streaming response as 3-5× faster than waiting for the same content to appear all at once. Every production LLM app streams.
  • Conversation state — LLMs are stateless. Each API call is independent. Your code is responsible for maintaining the illusion of memory by sending the full message history with every request.
  • Context management — context is expensive. Sending 200k tokens of history costs roughly 20× more per request than sending 10k, and adds latency. You need strategies to keep context lean without losing important information.
  • Persistent history — users expect their conversation to survive a page refresh. You need a storage layer.

How Streaming Works — Server-Sent Events

Concept First

LLM streaming uses Server-Sent Events (SSE) — an HTTP connection stays open and the server pushes data chunks as they are generated. Each chunk contains a few tokens. The client appends them to build the final response.
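
On the wire, SSE is plain text over one long-lived HTTP response: each event is a "data:" line terminated by a blank line. A chat stream might arrive like this (the JSON payload shape here is illustrative, not a fixed format):

data: {"text": "Retrieval-"}

data: {"text": "Augmented Generation (RAG) is..."}

data: {"done": true}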

# Without streaming — user waits 8 seconds, then sees everything at once
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG in detail"}]
)
print(response.content[0].text)   # appears all at once after 8s

# With streaming — first token appears in ~300ms, rest stream in
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG in detail"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)   # each chunk printed as it arrives

    # Get final message after streaming completes — still inside the with block
    final_message = stream.get_final_message()
    print(f"\nTokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out")
🔧 Streaming Events — What Actually Arrives

Under the Hood
# Low-level event iteration — see every event type
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=512,
    messages=[{"role": "user", "content": "Say hello"}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                # First event — contains model, message id
                print(f"Started: {event.message.id}")
            case "content_block_start":
                # A new content block begins (text or tool_use)
                print(f"Block type: {event.content_block.type}")
            case "content_block_delta":
                # A chunk of text (text_delta) or tool input (input_json_delta)
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
            case "content_block_stop":
                # Content block finished
                pass
            case "message_delta":
                # Stop reason and final token counts
                print(f"\nStop: {event.delta.stop_reason}")
            case "message_stop":
                # Streaming complete
                print("Stream finished")

# OpenAI streaming — pass stream=True; client here is an openai.OpenAI() instance
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
🔄 Streaming with Tool Calls

Advanced
import json

# Streaming tool use — tool input arrives in chunks too
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Mumbai?"}]
) as stream:
    current_tool_input = ""
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                tool_name = event.content_block.name
                tool_id   = event.content_block.id
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                current_tool_input += event.delta.partial_json  # accumulate partial JSON
        elif event.type == "content_block_stop":
            if current_tool_input:
                tool_args = json.loads(current_tool_input)
                # Now execute the tool...
                current_tool_input = ""

    final = stream.get_final_message()
    # Use final.content to build the next turn of the conversation

💡 Always call get_final_message() after streaming. This gives you the complete, assembled message including all content blocks — safe to append directly to your conversation history. Never try to assemble the message yourself from streaming chunks.
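
A sketch of that continuation — assuming you keep a messages list, with tool_id and tool_args coming from the loop above, and run_tool() as a hypothetical executor of your own:

final = stream.get_final_message()

# Append the assembled assistant message, then answer the tool call
messages.append({"role": "assistant", "content": final.content})
messages.append({"role": "user", "content": [{
    "type": "tool_result",
    "tool_use_id": tool_id,
    "content": run_tool(tool_args),   # run_tool is your own executor (hypothetical)
}]})
# The next messages.create() or messages.stream() call continues the conversation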

🌐 FastAPI Streaming Endpoint — SSE Pattern

Production Pattern
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic, asyncio, json

app = FastAPI()
client = anthropic.AsyncAnthropic()   # async client for FastAPI

class ChatRequest(BaseModel):
    message:  str
    session_id: str = "default"

# ── Streaming endpoint ────────────────────────────────
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):

    async def generate():
        async with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": request.message}]
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: "data: {payload}\n\n"
                yield f"data: {json.dumps({'text': text})}\n\n"

            # Send final event with usage stats
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",   # disable nginx buffering
        }
    )
🖥 Consuming the Stream — Browser JavaScript

Frontend Pattern
// Browser JavaScript — read the SSE stream with fetch (EventSource only supports GET, so a POST body needs fetch + ReadableStream)
async function streamChat(message) {
  const response = await fetch('/chat/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({message})
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const {done, value} = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, {stream: true});
    const lines = buffer.split('\n');
    buffer = lines.pop();   // keep incomplete line in buffer

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = JSON.parse(line.slice(6));

      if (data.text) {
        document.getElementById('response').textContent += data.text;
      }
      if (data.done) {
        console.log('Tokens:', data.usage);
      }
    }
  }
}

// Python client consuming the stream
import httpx, json

async def consume_stream(message: str):
    async with httpx.AsyncClient(timeout=60) as http:
        async with http.stream("POST", "http://localhost:8000/chat/stream",
                               json={"message": message}) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = json.loads(line[6:])
                if data.get("text"):
                    print(data["text"], end="", flush=True)
⚠️ Streaming Gotchas

Common Issues
  • Nginx buffering — nginx buffers responses by default. Add an X-Accel-Buffering: no response header to disable it, or set proxy_buffering off in the nginx config.
  • Error handling mid-stream — errors can occur after streaming has started (e.g. rate limit hit at token 500). Wrap your generator in try/except and send an error SSE event before closing.
  • Connection drops — if the client disconnects mid-stream, Starlette cancels the response task, which typically surfaces as asyncio.CancelledError inside your generator. Handle this gracefully — see the disconnect sketch after the error-safe generator below.
  • Buffered proxies — some cloud platforms (AWS ALB, certain CDNs) buffer SSE. Use WebSockets instead if SSE is unreliable in your deployment.
# Error-safe streaming generator
async def generate_safe():
    try:
        async with client.messages.stream(...) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'text': text})}\n\n"
    except anthropic.RateLimitError:
        yield f"data: {json.dumps({'error': 'rate_limit', 'message': 'Too many requests. Please wait a moment.'})}\n\n"
    except anthropic.APIStatusError as e:
        yield f"data: {json.dumps({'error': 'api_error', 'status': e.status_code})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'error': 'unknown', 'message': str(e)})}\n\n"
    finally:
        yield 'data: {"done": true}\n\n'   # always send done event
💬 The Messages Array — LLMs Are Stateless

Critical Concept

LLMs have no memory between API calls. Every call is completely independent. The only "memory" is the messages array you send. Your code is entirely responsible for maintaining conversation state.

# ── WRONG — no conversation state ─────────────────────
response1 = client.messages.create(
    messages=[{"role": "user", "content": "My name is Ajay"}], ...
)
response2 = client.messages.create(
    messages=[{"role": "user", "content": "What is my name?"}], ...
)
# Model: "I don't know your name" — it never saw the first message

# ── CORRECT — full history sent every call ─────────────
messages = []

# Turn 1
messages.append({"role": "user", "content": "My name is Ajay"})
response = client.messages.create(model="...", max_tokens=512, messages=messages)
messages.append({"role": "assistant", "content": response.content[0].text})

# Turn 2
messages.append({"role": "user", "content": "What is my name?"})
response = client.messages.create(model="...", max_tokens=512, messages=messages)
messages.append({"role": "assistant", "content": response.content[0].text})
# Model: "Your name is Ajay" — it sees the full history
🏗 ConversationManager — Clean State Pattern

Production Class
import anthropic
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime

@dataclass
class Turn:
    role:      str
    content:   str
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    tokens:    Optional[int] = None

class ConversationManager:
    def __init__(
        self,
        system_prompt: str = "",
        model: str = "claude-3-5-sonnet-20241022",
        max_tokens: int = 2048,
    ):
        self.client        = anthropic.Anthropic()
        self.system_prompt = system_prompt
        self.model         = model
        self.max_tokens    = max_tokens
        self.history: list[Turn] = []

    def _build_messages(self) -> list[dict]:
        return [
            {"role": t.role, "content": t.content}
            for t in self.history
        ]

    def chat(self, user_message: str) -> str:
        self.history.append(Turn(role="user", content=user_message))

        response = self.client.messages.create(
            model=self.model,
            max_tokens=self.max_tokens,
            system=self.system_prompt,
            messages=self._build_messages()
        )

        reply = response.content[0].text
        self.history.append(Turn(
            role="assistant",
            content=reply,
            tokens=response.usage.output_tokens
        ))
        return reply

    def chat_stream(self, user_message: str):
        self.history.append(Turn(role="user", content=user_message))
        full_reply = ""

        with self.client.messages.stream(
            model=self.model,
            max_tokens=self.max_tokens,
            system=self.system_prompt,
            messages=self._build_messages()
        ) as stream:
            for text in stream.text_stream:
                full_reply += text
                yield text
            final = stream.get_final_message()

        self.history.append(Turn(
            role="assistant",
            content=full_reply,
            tokens=final.usage.output_tokens
        ))

    def clear(self):
        self.history = []

    @property
    def turn_count(self) -> int:
        return len([t for t in self.history if t.role == "user"])

# Usage
conv = ConversationManager(system_prompt="You are a helpful coding assistant.")
print(conv.chat("My name is Ajay and I work with DPDK"))
print(conv.chat("What is my name and what technology do I work with?"))
print(conv.turn_count)   # 2
🎯 System Prompt Design for Multi-Turn

Best Practice
# System prompt rules for multi-turn conversations

# 1. State what information the model should REMEMBER across turns
system = """You are a helpful coding assistant.

Remember and use throughout the conversation:
- The user's name and role (if they tell you)
- Programming languages they work with
- Specific codebase or project context they share
- Decisions made in earlier turns of this conversation

When the user references "the function" or "the code" without specifying,
use context from earlier in the conversation."""

# 2. Define how to handle ambiguous references
system += """

If a reference is ambiguous and cannot be resolved from context,
ask for clarification before answering."""

# 3. Set turn-specific behaviour
system += """

For code reviews: always reference specific line numbers.
For debugging: always ask to see the error message if not provided."""
📏 Context Windows and Token Counting

Cost Control

Every token in your context costs money and increases latency. As conversations grow, managing context becomes essential.

A typical context budget: system prompt ~15%, conversation history ~55%, reserved for the response ~20%, available for the next turn ~10%.
pip install tiktoken   # OpenAI's fast token counter — works for Claude too (approx)

import tiktoken

def count_tokens(text: str, encoding_name: str = "cl100k_base") -> int:
    """Approximate token count. cl100k_base matches GPT-4; for Claude it is an estimate."""
    enc = tiktoken.get_encoding(encoding_name)
    return len(enc.encode(text))

def count_messages_tokens(messages: list[dict]) -> int:
    total = 0
    for msg in messages:
        total += count_tokens(msg["content"])
        total += 4   # overhead per message (role + formatting)
    return total + 2   # reply priming tokens

# Check context usage before sending
MAX_CONTEXT = 180_000   # safety margin below Claude 3.5 Sonnet's 200k window
RESERVE_TOKENS = 4_096  # always reserve for response

def will_fit(messages: list, system: str) -> bool:
    used = count_messages_tokens(messages) + count_tokens(system)
    return used + RESERVE_TOKENS < MAX_CONTEXT

# Anthropic's own token counter (exact, not approximate)
response = client.messages.count_tokens(
    model="claude-3-5-sonnet-20241022",
    system=system_prompt,
    messages=messages
)
print(response.input_tokens)   # exact count before sending
🪟 Sliding Window — Keep Last N Turns

Simple Strategy
def sliding_window(
    messages: list[dict],
    max_tokens: int = 100_000,
    min_turns: int = 2
) -> list[dict]:
    """
    Keep as many recent messages as fit within max_tokens.
    Always keep at least min_turns (user+assistant pairs).
    """
    # Group history into (user, assistant) pairs, oldest first
    pairs = []
    i = 0
    while i < len(messages) - 1:
        if messages[i]["role"] == "user" and messages[i + 1]["role"] == "assistant":
            pairs.append((messages[i], messages[i + 1]))
            i += 2
        else:
            i += 1

    # Always include the trailing, not-yet-answered user message if present
    tail = []
    token_count = 0
    if messages and messages[-1]["role"] == "user":
        tail = [messages[-1]]
        token_count = count_tokens(messages[-1]["content"])

    # Walk pairs from most recent backwards until the budget is hit,
    # but never keep fewer than min_turns pairs
    included = []
    for user_msg, asst_msg in reversed(pairs):
        pair_tokens = count_tokens(user_msg["content"]) + count_tokens(asst_msg["content"])
        if token_count + pair_tokens > max_tokens and len(included) >= min_turns:
            break
        included.insert(0, (user_msg, asst_msg))   # keep chronological order
        token_count += pair_tokens

    # Flatten pairs (already chronological) and append the trailing user message
    return [m for pair in included for m in pair] + tail
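
A usage sketch — trim before every call so the request stays within budget (client is the Anthropic client from earlier tabs; the 50k budget and model name are arbitrary examples):

trimmed = sliding_window(messages, max_tokens=50_000)
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=trimmed,
)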
📝 Summarisation Strategy — Compress Old History

Better Than Sliding Window
import anthropic

async_client = anthropic.AsyncAnthropic()   # module-level async client

async def summarise_old_turns(
    messages: list[dict],
    keep_recent: int = 6
) -> list[dict]:
    """
    When context gets long: summarise all but the last N turns,
    then continue with [summary_message, ...recent_turns].
    """
    if len(messages) <= keep_recent * 2:
        return messages   # not long enough to summarise

    old_turns = messages[:-(keep_recent * 2)]
    recent_turns = messages[-(keep_recent * 2):]

    old_text = "\n".join(
        f"{m['role'].upper()}: {m['content']}"
        for m in old_turns
    )

    summary_response = await async_client.messages.create(
        model="claude-3-haiku-20240307",   # use cheaper model for summarisation
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""Summarise this conversation history concisely.
Preserve: key facts stated by the user, decisions made, context needed for future turns.
Do not include: greetings, filler, repeated information.

{old_text}

Provide a dense 3-5 sentence summary:"""
        }]
    )
    summary = summary_response.content[0].text

    # Replace old history with summary message
    summary_msg = {
        "role": "user",
        "content": f"[Previous conversation summary]: {summary}"
    }
    ack_msg = {
        "role": "assistant",
        "content": "Understood. I've noted the conversation history."
    }
    return [summary_msg, ack_msg] + recent_turns

💡 Use a cheap fast model (Haiku, GPT-4o-mini) for summarisation. Summarising a 20-turn conversation with Claude Haiku costs ~$0.001. Using Sonnet would cost ~$0.05. The quality difference for summarisation is negligible, but the cost difference is 50×.
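
A sketch of when to trigger it, inside an async handler, reusing count_messages_tokens from the token-counting tab (the 50k threshold is an arbitrary example):

if count_messages_tokens(messages) > 50_000:
    messages = await summarise_old_turns(messages, keep_recent=6)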

💾 Storing Conversation History — SQLite

Persistence
import sqlite3, json
from contextlib import contextmanager
from datetime import datetime

@contextmanager
def get_db():
    conn = sqlite3.connect("conversations.db")
    conn.row_factory = sqlite3.Row
    conn.execute("""
        CREATE TABLE IF NOT EXISTS conversations (
            id          TEXT PRIMARY KEY,
            created_at  TEXT NOT NULL,
            updated_at  TEXT NOT NULL,
            system_prompt TEXT DEFAULT ''
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id              INTEGER PRIMARY KEY AUTOINCREMENT,
            conversation_id TEXT NOT NULL REFERENCES conversations(id),
            role            TEXT NOT NULL,
            content         TEXT NOT NULL,
            created_at      TEXT NOT NULL,
            token_count     INTEGER
        )
    """)
    conn.commit()
    try:
        yield conn
    finally:
        conn.close()

def save_turn(conv_id: str, user_msg: str, assistant_msg: str, tokens: int = 0):
    now = datetime.utcnow().isoformat()
    with get_db() as conn:
        # Upsert conversation record
        conn.execute("""
            INSERT INTO conversations (id, created_at, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET updated_at = ?
        """, (conv_id, now, now, now))
        # Insert both messages
        conn.executemany("""
            INSERT INTO messages (conversation_id, role, content, created_at, token_count)
            VALUES (?, ?, ?, ?, ?)
        """, [
            (conv_id, "user",      user_msg,      now, 0),
            (conv_id, "assistant", assistant_msg, now, tokens),
        ])
        conn.commit()

def load_history(conv_id: str, last_n: int = 0) -> list[dict]:
    with get_db() as conn:
        query = "SELECT role, content FROM messages WHERE conversation_id = ? ORDER BY id"
        rows = conn.execute(query, (conv_id,)).fetchall()
    messages = [{"role": r["role"], "content": r["content"]} for r in rows]
    return messages[-last_n * 2:] if last_n else messages
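
Wiring these into a single chat turn might look like this — client is an anthropic.Anthropic() instance and the session ID is a placeholder:

history = load_history("session-123", last_n=10)
history.append({"role": "user", "content": "Hello again"})
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=history,
)
save_turn("session-123", "Hello again",
          response.content[0].text, response.usage.output_tokens)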

Redis for High-Traffic Conversations

Scalable
import redis, json

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def save_message_redis(session_id: str, role: str, content: str, ttl: int = 3600):
    """Store message in Redis list. TTL in seconds (default: 1 hour)."""
    key = f"conv:{session_id}"
    message = json.dumps({"role": role, "content": content})
    r.rpush(key, message)        # append to list
    r.expire(key, ttl)           # reset TTL on each message

def load_history_redis(session_id: str, last_n: int = 0) -> list[dict]:
    """Load conversation history from Redis."""
    key = f"conv:{session_id}"
    start = -last_n * 2 if last_n else 0
    raw = r.lrange(key, start, -1)
    return [json.loads(m) for m in raw]

def clear_history_redis(session_id: str):
    r.delete(f"conv:{session_id}")

# When to use SQLite vs Redis:
# SQLite  — single server, persistent storage, needs query/search, < 1k concurrent users
# Redis   — multi-server, ephemeral (TTL), fast read/write, > 1k concurrent sessions
# Both    — use Redis as cache + SQLite/PostgreSQL for durable archive
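
If you adopt the "both" approach, a minimal write-through sketch combining this page's helpers might be:

def save_turn_everywhere(session_id: str, user_msg: str, reply: str):
    save_message_redis(session_id, "user", user_msg)        # hot path, TTL-bound
    save_message_redis(session_id, "assistant", reply)
    save_turn(session_id, user_msg, reply)                  # durable SQLite archive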
🔄 Full Stateful Chat API — Putting It Together

Complete Pattern
from fastapi import FastAPI
from pydantic import BaseModel
import anthropic, uuid

app = FastAPI()
client = anthropic.AsyncAnthropic()

class ChatRequest(BaseModel):
    message:    str
    session_id: str = ""   # empty = new session

class ChatResponse(BaseModel):
    session_id: str
    reply:      str
    turn_count: int

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    session_id = request.session_id or str(uuid.uuid4())

    # Load existing history
    history = load_history_redis(session_id, last_n=10)

    # Add new user message
    history.append({"role": "user", "content": request.message})

    # Call LLM with full history
    response = await client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2048,
        system="You are a helpful AI assistant.",
        messages=history
    )
    reply = response.content[0].text

    # Persist both turns
    save_message_redis(session_id, "user",      request.message)
    save_message_redis(session_id, "assistant", reply)

    turn_count = len(history) // 2 + 1
    return ChatResponse(session_id=session_id, reply=reply, turn_count=turn_count)
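
The endpoint above returns the whole reply at once. A sketch of a streaming variant that reuses the Redis helpers — the route name is hypothetical, it needs json and StreamingResponse imported, and persisting only after a clean finish is one design choice among several:

from fastapi.responses import StreamingResponse
import json

@app.post("/chat/stream-session")
async def chat_stream_session(request: ChatRequest):
    session_id = request.session_id or str(uuid.uuid4())
    history = load_history_redis(session_id, last_n=10)
    history.append({"role": "user", "content": request.message})

    async def generate():
        full_reply = ""
        async with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            system="You are a helpful AI assistant.",
            messages=history,
        ) as stream:
            async for text in stream.text_stream:
                full_reply += text
                yield f"data: {json.dumps({'text': text})}\n\n"
        # Persist both turns only once the stream finished cleanly
        save_message_redis(session_id, "user", request.message)
        save_message_redis(session_id, "assistant", full_reply)
        yield f"data: {json.dumps({'done': True, 'session_id': session_id})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")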

FREE LEARNING RESOURCES

  • Docs — Anthropic Streaming Reference (docs.anthropic.com/en/api/messages-streaming): complete SSE event reference for Claude streaming, including all event types and formats.
  • Docs — OpenAI Streaming Docs (platform.openai.com): OpenAI's streaming API reference with chunk formats.
  • Docs — FastAPI StreamingResponse (fastapi.tiangolo.com): official FastAPI documentation on streaming responses.
  • Library — tiktoken (github.com/openai/tiktoken): fast token counter by OpenAI; works for approximate Claude token counting too.
  • Docs — Redis Python Client (redis.io/docs): official redis-py documentation for conversation state storage at scale.

MILESTONE PROJECT

🛠 Streaming Chatbot API with Persistent Sessions [Intermediate] 3–4 days

Build a complete stateful chatbot API with streaming responses, session management, and context window control.

Requirements

  • POST /chat/stream/{session_id} — streaming SSE endpoint. New session if session_id = "new". Streams tokens as they arrive.
  • GET /sessions/{session_id}/history — return full conversation history as JSON
  • DELETE /sessions/{session_id} — clear a session
  • GET /sessions/{session_id}/stats — return: turn count, total tokens used, session age
  • Persist conversation to SQLite — history survives server restarts
  • Context window management: if history exceeds 50k tokens, apply sliding window (keep last 10 turns)
  • Custom system prompt per session — passed in POST body on first message
  • Error-safe streaming — proper SSE error events on API failures

Test it

  • Start a session, have a 10-turn conversation, restart the server, continue — history should persist
  • Test a 20-turn conversation — verify context management kicks in and older turns are dropped
  • Test error handling — disconnect mid-stream and observe how the server handles it

Skills: FastAPI StreamingResponse, SQLite persistence, context window management, SSE protocol, session lifecycle

LAB 1

Streaming — Measure Time to First Token

Objective: Quantify the user experience improvement from streaming with real measurements.

  1. Write a function that measures time-to-first-token (TTFT) for streaming: record a timestamp when you call messages.stream(), and another when you receive the first text chunk. TTFT = second - first.
  2. Write a function that measures total time for non-streaming: time from the API call to response.content being available.
  3. Run both on 5 different prompts ranging from 50 to 500 token outputs. For each: record TTFT (stream), total time (stream), total time (non-stream), and a subjective UX rating (1-5).
  4. Produce a simple text table: prompt length | stream TTFT | stream total | non-stream total | TTFT speedup.
  5. Key insight to document: at what output length does streaming provide the most meaningful UX improvement? At what length is it negligible?
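
A possible starting point for step 1 — returning from inside the with block aborts the rest of the stream, which is fine for a TTFT probe (model choice is an example):

import time, anthropic

client = anthropic.Anthropic()

def measure_ttft(prompt: str) -> float:
    start = time.perf_counter()
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for _ in stream.text_stream:
            return time.perf_counter() - start   # first chunk arrived
    return float("nan")   # stream produced no text
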
LAB 2

Context Window — Observe Forgetting Without History

Objective: Make the stateless nature of LLMs viscerally obvious — then fix it.

  1. Have a 5-turn conversation WITHOUT passing history: tell the model your name, your favourite language, a project you're working on. Then ask "What is my name?" in a new API call. Observe the failure.
  2. Repeat with the ConversationManager from Tab 3. Observe that the model correctly answers all questions.
  3. Now test context limits: build a very long conversation (50+ turns of substantial messages). Use client.messages.count_tokens() to track token usage after each turn. At what turn does the token count become concerning?
  4. Apply the sliding_window() function from Tab 4. Verify: (a) recent turns are preserved, (b) the model can still answer questions from recent context, (c) the model correctly says it cannot recall very old turns.
  5. Apply the summarisation strategy. Compare quality: does the model retain important facts from summarised turns? What information gets lost? Is it acceptable for your use case?
LAB 3

Build a Persistent Session Store

Objective: Implement and test the complete persistence pattern end-to-end.

  1. Implement the SQLite persistence functions from Tab 5: save_turn(), load_history(), and add a list_sessions() function that returns all session IDs and their last updated time.
  2. Build a simple CLI chat loop: ask for user input, call Claude, save both turns to SQLite, print the reply. Accept a --session flag to resume an existing session by ID.
  3. Test the persistence: start a conversation, note the session ID, kill the process (Ctrl+C), restart with --session [id], verify the model has context from the previous run.
  4. Add a --summarise flag that applies the summarisation strategy when history exceeds 20 turns. Verify the summarised history loads correctly on the next session.
  5. Extension: replace SQLite with Redis. Set TTL to 10 minutes. Verify that after 10 minutes of inactivity, the session is automatically cleaned up.

P4-M13 MASTERY CHECKLIST

When complete: Move to P4-M14 — Reliability, Cost & Security. This is the final Part 4 module — covering retries, rate limit handling, cost monitoring, and prompt injection defence before you move to RAG in Part 5.
