What This Module Covers
UX + Architecture
Two skills that separate prototype AI apps from production ones: streaming responses and conversation state management. Without streaming, users stare at a blank screen for 10 seconds. Without proper state management, your chatbot forgets everything after one message.
- Streaming basics — how Server-Sent Events work, streaming from Anthropic and OpenAI SDKs
- Streaming in FastAPI — exposing a streaming endpoint that the browser or client can consume
- Conversation state — the messages array pattern, multi-turn management, turn limits
- Context window management — counting tokens with tiktoken, sliding window, summarisation strategies
- Persistent history — storing and retrieving conversation history from SQLite and Redis
Why These Skills Matter
Context
- Streaming — users perceive a streaming response as 3-5× faster than waiting for the same content to appear all at once. Every production LLM app streams.
- Conversation state — LLMs are stateless. Each API call is independent. Your code is responsible for maintaining the illusion of memory by sending the full message history with every request.
- Context management — context windows are expensive. Input tokens are billed per token, so a request that fills a 200k-token context costs roughly 20× more than one that sends 10k tokens. You need strategies to keep context lean without losing important information. (A rough cost comparison follows this list.)
- Persistent history — users expect their conversation to survive a page refresh. You need a storage layer.
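To put rough numbers on the context-cost point, here is a back-of-the-envelope sketch. The price is an assumption (~$3 per million input tokens for Claude 3.5 Sonnet — check current rates before relying on it):

# Rough input-cost comparison per request (assumed pricing, not authoritative)
PRICE_PER_INPUT_TOKEN = 3.00 / 1_000_000  # ~$3 per million input tokens

lean_request = 10_000 * PRICE_PER_INPUT_TOKEN      # ≈ $0.03 per request
bloated_request = 200_000 * PRICE_PER_INPUT_TOKEN  # ≈ $0.60 per request

print(f"10k-token context:  ${lean_request:.2f}")
print(f"200k-token context: ${bloated_request:.2f}  (~{bloated_request / lean_request:.0f}x more)")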
How Streaming Works — Server-Sent Events
Concept First
LLM streaming uses Server-Sent Events (SSE) — an HTTP connection stays open and the server pushes data chunks as they are generated. Each chunk contains a few tokens. The client appends them to build the final response.
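Before the SDK helpers, a minimal sketch of what actually crosses the wire — hitting the Messages API directly with httpx and printing the raw SSE lines. The SDKs below do all of this for you; this is only to make the format concrete:

import os, httpx

with httpx.stream(
    "POST",
    "https://api.anthropic.com/v1/messages",
    headers={
        "x-api-key": os.environ["ANTHROPIC_API_KEY"],
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    },
    json={
        "model": "claude-3-5-sonnet-20241022",
        "max_tokens": 256,
        "stream": True,
        "messages": [{"role": "user", "content": "Say hello"}],
    },
) as response:
    for line in response.iter_lines():
        # Each event arrives as plain text lines, e.g.
        #   event: content_block_delta
        #   data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hel"}, ...}
        print(line)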
# Without streaming — user waits 8 seconds, then sees everything at once
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG in detail"}]
)
print(response.content[0].text)  # appears all at once after 8s

# With streaming — first token appears in ~300ms, rest stream in
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG in detail"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)  # each chunk printed as it arrives

    # Get final message after streaming completes
    final_message = stream.get_final_message()
    print(f"\nTokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out")
Streaming Events — What Actually Arrives
Under the Hood
# Low-level event iteration — see every event type
with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=512,
    messages=[{"role": "user", "content": "Say hello"}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                # First event — contains model, message id
                print(f"Started: {event.message.id}")
            case "content_block_start":
                # A new content block begins (text or tool_use)
                print(f"Block type: {event.content_block.type}")
            case "content_block_delta":
                # A chunk of text (text_delta) or tool input (input_json_delta)
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
            case "content_block_stop":
                # Content block finished
                pass
            case "message_delta":
                # Stop reason and final token counts
                print(f"\nStop: {event.delta.stop_reason}")
            case "message_stop":
                # Streaming complete
                print("Stream finished")

# OpenAI streaming — similar pattern, using stream=True on chat.completions.create
import openai

openai_client = openai.OpenAI()
stream = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
Streaming with Tool Calls
Advanced
# Streaming tool use — tool input arrives in chunks too
import json

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Mumbai?"}]
) as stream:
    current_tool_input = ""
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                tool_name = event.content_block.name
                tool_id = event.content_block.id
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                current_tool_input += event.delta.partial_json  # accumulate
        elif event.type == "content_block_stop":
            if current_tool_input:
                tool_args = json.loads(current_tool_input)
                # Now execute the tool...
                current_tool_input = ""

    final = stream.get_final_message()
    # Use final.content to build the next turn of the conversation
💡 Always call get_final_message() after streaming. This gives you the complete, assembled message including all content blocks — safe to append directly to your conversation history. Never try to assemble the message yourself from streaming chunks.
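For example, a sketch of the append-after-streaming pattern (assumes the client and a running messages list from the earlier snippets):

with client.messages.stream(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages,
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()

# final.content holds the complete, assembled content blocks (text and/or tool_use),
# so it can be appended to the history as-is for the next turn
messages.append({"role": "assistant", "content": final.content})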
FastAPI Streaming Endpoint — SSE Pattern
Production Pattern
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic, asyncio, json

app = FastAPI()
client = anthropic.AsyncAnthropic()  # async client for FastAPI

class ChatRequest(BaseModel):
    message: str
    session_id: str = "default"

# ── Streaming endpoint ────────────────────────────────
@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": request.message}]
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: "data: {payload}\n\n"
                yield f"data: {json.dumps({'text': text})}\n\n"

            # Send final event with usage stats
            final = await stream.get_final_message()
            yield f"data: {json.dumps({'done': True, 'usage': {'input': final.usage.input_tokens, 'output': final.usage.output_tokens}})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        }
    )
Consuming the Stream — Browser JavaScript
Frontend Pattern
// Browser JavaScript — read the SSE stream with fetch + ReadableStream
// (EventSource only supports GET, so a POST endpoint needs this approach)
async function streamChat(message) {
  const response = await fetch('/chat/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({message})
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const {done, value} = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, {stream: true});
    const lines = buffer.split('\n');
    buffer = lines.pop();  // keep incomplete line in buffer

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = JSON.parse(line.slice(6));
      if (data.text) {
        document.getElementById('response').textContent += data.text;
      }
      if (data.done) {
        console.log('Tokens:', data.usage);
      }
    }
  }
}

# Python client consuming the stream
import httpx, json

async def consume_stream(message: str):
    async with httpx.AsyncClient(timeout=60) as http:
        async with http.stream("POST", "http://localhost:8000/chat/stream",
                               json={"message": message}) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = json.loads(line[6:])
                if data.get("text"):
                    print(data["text"], end="", flush=True)
Streaming Gotchas
Common Issues
- Nginx buffering — nginx buffers responses by default. Add the X-Accel-Buffering: no header to disable it, or configure proxy_buffering off in your nginx config.
- Error handling mid-stream — errors can occur after streaming has started (e.g. rate limit hit at token 500). Wrap your generator in try/except and send an error SSE event before closing.
- Connection drops — if the client disconnects mid-stream, FastAPI's StreamingResponse will raise anyio.EndOfStream. Handle this gracefully — see the disconnect-handling sketch after the error-safe generator below.
- Buffered proxies — some cloud platforms (AWS ALB, certain CDNs) buffer SSE. Use WebSockets instead if SSE is unreliable in your deployment.
# Error-safe streaming generator
async def generate_safe():
    try:
        async with client.messages.stream(...) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'text': text})}\n\n"
    except anthropic.RateLimitError:
        yield f"data: {json.dumps({'error': 'rate_limit', 'message': 'Too many requests. Please wait a moment.'})}\n\n"
    except anthropic.APIStatusError as e:
        yield f"data: {json.dumps({'error': 'api_error', 'status': e.status_code})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'error': 'unknown', 'message': str(e)})}\n\n"
    finally:
        yield 'data: {"done": true}\n\n'  # always send done event
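For the connection-drop gotcha, one option is to check Starlette's request.is_disconnected() between chunks and stop generating when the client goes away. A sketch reusing the app, client and ChatRequest defined above — the route path and function name are illustrative:

from fastapi import Request

@app.post("/chat/stream/guarded")
async def chat_stream_guarded(body: ChatRequest, request: Request):
    async def generate():
        async with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            messages=[{"role": "user", "content": body.message}]
        ) as stream:
            async for text in stream.text_stream:
                if await request.is_disconnected():
                    return  # client went away — stop consuming tokens
                yield f"data: {json.dumps({'text': text})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")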
The Messages Array — LLMs Are Stateless
Critical Concept
LLMs have no memory between API calls. Every call is completely independent. The only "memory" is the messages array you send. Your code is entirely responsible for maintaining conversation state.
# ── WRONG — no conversation state ─────────────────────
response1 = client.messages.create(
    messages=[{"role": "user", "content": "My name is Ajay"}], ...
)
response2 = client.messages.create(
    messages=[{"role": "user", "content": "What is my name?"}], ...
)
# Model: "I don't know your name" — it never saw the first message

# ── CORRECT — full history sent every call ─────────────
messages = []

# Turn 1
messages.append({"role": "user", "content": "My name is Ajay"})
response = client.messages.create(model="...", max_tokens=512, messages=messages)
messages.append({"role": "assistant", "content": response.content[0].text})

# Turn 2
messages.append({"role": "user", "content": "What is my name?"})
response = client.messages.create(model="...", max_tokens=512, messages=messages)
messages.append({"role": "assistant", "content": response.content[0].text})
# Model: "Your name is Ajay" — it sees the full history
ConversationManager — Clean State Pattern
Production Class
import anthropic
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
@dataclass
class Turn:
role: str
content: str
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
tokens: Optional[int] = None
class ConversationManager:
def __init__(
self,
system_prompt: str = "",
model: str = "claude-3-5-sonnet-20241022",
max_tokens: int = 2048,
):
self.client = anthropic.Anthropic()
self.system_prompt = system_prompt
self.model = model
self.max_tokens = max_tokens
self.history: list[Turn] = []
def _build_messages(self) -> list[dict]:
return [
{"role": t.role, "content": t.content}
for t in self.history
]
def chat(self, user_message: str) -> str:
self.history.append(Turn(role="user", content=user_message))
response = self.client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
system=self.system_prompt,
messages=self._build_messages()
)
reply = response.content[0].text
self.history.append(Turn(
role="assistant",
content=reply,
tokens=response.usage.output_tokens
))
return reply
def chat_stream(self, user_message: str):
self.history.append(Turn(role="user", content=user_message))
full_reply = ""
with self.client.messages.stream(
model=self.model,
max_tokens=self.max_tokens,
system=self.system_prompt,
messages=self._build_messages()
) as stream:
for text in stream.text_stream:
full_reply += text
yield text
final = stream.get_final_message()
self.history.append(Turn(
role="assistant",
content=full_reply,
tokens=final.usage.output_tokens
))
def clear(self):
self.history = []
@property
def turn_count(self) -> int:
return len([t for t in self.history if t.role == "user"])
# Usage
conv = ConversationManager(system_prompt="You are a helpful coding assistant.")
print(conv.chat("My name is Ajay and I work with DPDK"))
print(conv.chat("What is my name and what technology do I work with?"))
print(conv.turn_count)  # 2
System Prompt Design for Multi-Turn
Best Practice
# System prompt rules for multi-turn conversations

# 1. State what information the model should REMEMBER across turns
system = """You are a helpful coding assistant.

Remember and use throughout the conversation:
- The user's name and role (if they tell you)
- Programming languages they work with
- Specific codebase or project context they share
- Decisions made in earlier turns of this conversation

When the user references "the function" or "the code" without specifying,
use context from earlier in the conversation."""

# 2. Define how to handle ambiguous references
system += """
If a reference is ambiguous and cannot be resolved from context,
ask for clarification before answering."""

# 3. Set turn-specific behaviour
system += """
For code reviews: always reference specific line numbers.
For debugging: always ask to see the error message if not provided."""
Context Windows and Token Counting
Cost Control
Every token in your context costs money and increases latency. As conversations grow, managing context becomes essential.
pip install tiktoken  # OpenAI's fast token counter — works for Claude too (approx)

import tiktoken

def count_tokens(text: str, model: str = "cl100k_base") -> int:
    """Approximate token count. cl100k_base works for GPT-4 and Claude."""
    enc = tiktoken.get_encoding(model)
    return len(enc.encode(text))

def count_messages_tokens(messages: list[dict]) -> int:
    total = 0
    for msg in messages:
        total += count_tokens(msg["content"])
        total += 4  # overhead per message (role + formatting)
    return total + 2  # reply priming tokens

# Check context usage before sending
MAX_CONTEXT = 180_000   # stay comfortably below Claude 3.5 Sonnet's 200k window
RESERVE_TOKENS = 4_096  # always reserve for response

def will_fit(messages: list, system: str) -> bool:
    used = count_messages_tokens(messages) + count_tokens(system)
    return used + RESERVE_TOKENS < MAX_CONTEXT

# Anthropic's own token counter (exact, not approximate)
response = client.messages.count_tokens(
    model="claude-3-5-sonnet-20241022",
    system=system_prompt,
    messages=messages
)
print(response.input_tokens)  # exact count before sending
Sliding Window — Keep Last N Turns
Simple Strategy
def sliding_window(
messages: list[dict],
max_tokens: int = 100_000,
min_turns: int = 2
) -> list[dict]:
"""
Keep as many recent messages as fit within max_tokens.
Always keep at least min_turns (user+assistant pairs).
"""
# Always keep messages in pairs (user + assistant)
# Start from most recent, work backwards
result = []
token_count = 0
pairs = []
# Group into user+assistant pairs
i = 0
while i < len(messages) - 1:
if messages[i]["role"] == "user" and messages[i+1]["role"] == "assistant":
pairs.append((messages[i], messages[i+1]))
i += 2
else:
i += 1
# Always include last user message
if messages and messages[-1]["role"] == "user":
result = [messages[-1]]
token_count = count_tokens(messages[-1]["content"])
pairs_to_check = pairs
else:
pairs_to_check = pairs
# Add pairs from most recent backwards until we hit the limit
included = []
for user_msg, asst_msg in reversed(pairs_to_check):
pair_tokens = count_tokens(user_msg["content"]) + count_tokens(asst_msg["content"])
if token_count + pair_tokens > max_tokens and len(included) >= min_turns:
break
included.insert(0, (user_msg, asst_msg))
token_count += pair_tokens
    # Prepend in reverse so the final list stays in chronological order
    for u, a in reversed(included):
        result = [u, a] + result
    return result
Summarisation Strategy — Compress Old History
Better Than Sliding Window
async def summarise_old_turns(
messages: list[dict],
keep_recent: int = 6
) -> list[dict]:
"""
When context gets long: summarise all but the last N turns,
then continue with [summary_message, ...recent_turns].
"""
if len(messages) <= keep_recent * 2:
return messages # not long enough to summarise
old_turns = messages[:-(keep_recent * 2)]
recent_turns = messages[-(keep_recent * 2):]
old_text = "\n".join(
f"{m['role'].upper()}: {m['content']}"
for m in old_turns
)
    summary_response = await async_client.messages.create(  # async_client = anthropic.AsyncAnthropic()
model="claude-3-haiku-20240307", # use cheaper model for summarisation
max_tokens=512,
messages=[{
"role": "user",
"content": f"""Summarise this conversation history concisely.
Preserve: key facts stated by the user, decisions made, context needed for future turns.
Do not include: greetings, filler, repeated information.
{old_text}
Provide a dense 3-5 sentence summary:"""
}]
)
summary = summary_response.content[0].text
# Replace old history with summary message
summary_msg = {
"role": "user",
"content": f"[Previous conversation summary]: {summary}"
}
ack_msg = {
"role": "assistant",
"content": "Understood. I've noted the conversation history."
}
    return [summary_msg, ack_msg] + recent_turns
💡 Use a cheap fast model (Haiku, GPT-4o-mini) for summarisation. Summarising a 20-turn conversation with Claude Haiku costs ~$0.001. Using Sonnet would cost ~$0.05. The quality difference for summarisation is negligible, but the cost difference is 50×.
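A sketch of one way to wire this in — reuse count_messages_tokens from the token-counting section and summarise only once the history crosses a threshold. The 50k figure is an assumption to tune for your app:

SUMMARISE_ABOVE_TOKENS = 50_000  # assumed threshold — tune for your app

async def prepare_context(messages: list[dict]) -> list[dict]:
    """Return the history to send: compressed only when it has grown expensive."""
    if count_messages_tokens(messages) > SUMMARISE_ABOVE_TOKENS:
        return await summarise_old_turns(messages, keep_recent=6)
    return messages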
Storing Conversation History — SQLite
Persistence
import sqlite3, json
from contextlib import contextmanager
from datetime import datetime
@contextmanager
def get_db():
conn = sqlite3.connect("conversations.db")
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
system_prompt TEXT DEFAULT ''
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id TEXT NOT NULL REFERENCES conversations(id),
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER
)
""")
conn.commit()
try:
yield conn
finally:
conn.close()
def save_turn(conv_id: str, user_msg: str, assistant_msg: str, tokens: int = 0):
now = datetime.utcnow().isoformat()
with get_db() as conn:
# Upsert conversation record
conn.execute("""
INSERT INTO conversations (id, created_at, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET updated_at = ?
""", (conv_id, now, now, now))
# Insert both messages
conn.executemany("""
INSERT INTO messages (conversation_id, role, content, created_at, token_count)
VALUES (?, ?, ?, ?, ?)
""", [
(conv_id, "user", user_msg, now, 0),
(conv_id, "assistant", assistant_msg, now, tokens),
])
conn.commit()
def load_history(conv_id: str, last_n: int = 0) -> list[dict]:
with get_db() as conn:
query = "SELECT role, content FROM messages WHERE conversation_id = ? ORDER BY id"
rows = conn.execute(query, (conv_id,)).fetchall()
messages = [{"role": r["role"], "content": r["content"]} for r in rows]
        return messages[-last_n * 2:] if last_n else messages
Redis for High-Traffic Conversations
Scalable
import redis, json

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def save_message_redis(session_id: str, role: str, content: str, ttl: int = 3600):
    """Store message in Redis list. TTL in seconds (default: 1 hour)."""
    key = f"conv:{session_id}"
    message = json.dumps({"role": role, "content": content})
    r.rpush(key, message)  # append to list
    r.expire(key, ttl)     # reset TTL on each message

def load_history_redis(session_id: str, last_n: int = 0) -> list[dict]:
    """Load conversation history from Redis."""
    key = f"conv:{session_id}"
    start = -last_n * 2 if last_n else 0
    raw = r.lrange(key, start, -1)
    return [json.loads(m) for m in raw]

def clear_history_redis(session_id: str):
    r.delete(f"conv:{session_id}")

# When to use SQLite vs Redis:
#   SQLite — single server, persistent storage, needs query/search, < 1k concurrent users
#   Redis  — multi-server, ephemeral (TTL), fast read/write, > 1k concurrent sessions
#   Both   — use Redis as cache + SQLite/PostgreSQL for durable archive
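If you want both, a minimal write-through sketch that combines the Redis helpers above with save_turn from the SQLite section — Redis serves fast reads, SQLite keeps the durable archive:

def save_turn_everywhere(session_id: str, user_msg: str, assistant_msg: str, tokens: int = 0):
    # Hot path: recent history stays in Redis for the chat endpoint
    save_message_redis(session_id, "user", user_msg)
    save_message_redis(session_id, "assistant", assistant_msg)
    # Durable path: full archive in SQLite survives restarts and Redis TTL expiry
    save_turn(session_id, user_msg, assistant_msg, tokens)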
Full Stateful Chat API — Putting It Together
Complete Pattern
from fastapi import FastAPI
from pydantic import BaseModel
import anthropic, uuid
app = FastAPI()
client = anthropic.AsyncAnthropic()
class ChatRequest(BaseModel):
message: str
session_id: str = "" # empty = new session
class ChatResponse(BaseModel):
session_id: str
reply: str
turn_count: int
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
session_id = request.session_id or str(uuid.uuid4())
# Load existing history
history = load_history_redis(session_id, last_n=10)
# Add new user message
history.append({"role": "user", "content": request.message})
# Call LLM with full history
response = await client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2048,
system="You are a helpful AI assistant.",
messages=history
)
reply = response.content[0].text
# Persist both turns
save_message_redis(session_id, "user", request.message)
save_message_redis(session_id, "assistant", reply)
turn_count = len(history) // 2 + 1
    return ChatResponse(session_id=session_id, reply=reply, turn_count=turn_count)
FREE LEARNING RESOURCES
| Type | Resource | Best For |
|---|---|---|
| Docs | Anthropic Streaming Reference — docs.anthropic.com/en/api/messages-streaming | Complete SSE event reference for Claude streaming including all event types and formats. |
| Docs | OpenAI Streaming Docs — platform.openai.com | OpenAI's streaming API reference with chunk formats. |
| Docs | FastAPI StreamingResponse — fastapi.tiangolo.com | Official FastAPI documentation on streaming responses. |
| Library | tiktoken — github.com/openai/tiktoken | Fast token counter by OpenAI. Works for approximate Claude token counting too. |
| Docs | Redis Python Client — redis.io/docs | Official redis-py documentation for conversation state storage at scale. |
MILESTONE PROJECT
Build a complete stateful chatbot API with streaming responses, session management, and context window control.
Requirements
- POST /chat/stream/{session_id} — streaming SSE endpoint. New session if session_id = "new". Streams tokens as they arrive.
- GET /sessions/{session_id}/history — return full conversation history as JSON
- DELETE /sessions/{session_id} — clear a session
- GET /sessions/{session_id}/stats — return: turn count, total tokens used, session age
- Persist conversation to SQLite — history survives server restarts
- Context window management: if history exceeds 50k tokens, apply sliding window (keep last 10 turns)
- Custom system prompt per session — passed in POST body on first message
- Error-safe streaming — proper SSE error events on API failures
Test it
- Start a session, have a 10-turn conversation, restart the server, continue — history should persist
- Test a 20-turn conversation — verify context management kicks in and older turns are dropped
- Test error handling — disconnect mid-stream and observe how the server handles it
Skills: FastAPI StreamingResponse, SQLite persistence, context window management, SSE protocol, session lifecycle
Streaming — Measure Time to First Token
Objective: Quantify the user experience improvement from streaming with real measurements.
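A possible starting point for the measurement — a sketch that times a non-streaming call against the same prompt streamed, recording when the first chunk lands (model and prompt are placeholders):

import time
import anthropic

client = anthropic.Anthropic()
prompt = [{"role": "user", "content": "Explain RAG in detail"}]

# Non-streaming: everything arrives at once
start = time.perf_counter()
client.messages.create(model="claude-3-5-sonnet-20241022", max_tokens=1024, messages=prompt)
print(f"Non-streaming total: {time.perf_counter() - start:.2f}s")

# Streaming: note when the first chunk arrives vs when the stream finishes
start = time.perf_counter()
first_token_at = None
with client.messages.stream(model="claude-3-5-sonnet-20241022", max_tokens=1024, messages=prompt) as stream:
    for _ in stream.text_stream:
        if first_token_at is None:
            first_token_at = time.perf_counter() - start
print(f"Streaming first token: {first_token_at:.2f}s, total: {time.perf_counter() - start:.2f}s")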
Context Window — Observe Forgetting Without History
Objective: Make the stateless nature of LLMs viscerally obvious — then fix it.
Build a Persistent Session Store
Objective: Implement and test the complete persistence pattern end-to-end.
P4-M13 MASTERY CHECKLIST
- Can explain why streaming feels faster to users even though total time is the same
- Know the SSE event types: message_start, content_block_start, content_block_delta, message_stop
- Can implement streaming with both Anthropic and OpenAI SDKs
- Can build a FastAPI StreamingResponse endpoint that correctly formats SSE data
- Know to add X-Accel-Buffering: no header to prevent nginx from buffering SSE
- Can handle errors mid-stream — sending error SSE events and always sending a done event
- Can explain why LLMs are stateless and what that means for conversation management
- Can implement a ConversationManager that correctly sends full history with every API call
- Can count tokens with tiktoken and with Anthropic's native count_tokens() method
- Can implement a sliding window strategy that keeps the last N turns within a token budget
- Can implement a summarisation strategy using a cheap model for old conversation turns
- Can persist conversation history to SQLite with proper schema and load it back
- Know when to use Redis vs SQLite for conversation storage — and can implement both
- Always call get_final_message() after streaming to get the complete assembled message
- Completed Lab 1: time-to-first-token measurement
- Completed Lab 2: context window observation and management
- Completed Lab 3: persistent session store end-to-end
- Milestone project pushed to GitHub with README
✅ When complete: Move to P4-M14 — Reliability, Cost & Security. This is the final Part 4 module — covering retries, rate limit handling, cost monitoring, and prompt injection defence before you move to RAG in Part 5.