Part 7 — Production & Deployment  ·  Module 23 of 27
FastAPI Production Patterns
Structure, middleware, dependency injection, and async patterns for AI-powered APIs
⏱ 1 Week 🟡 Intermediate 🔧 FastAPI · Pydantic v2 · uvicorn · gunicorn 📋 Prerequisite: P4-M14
🎯

What This Module Covers

Part 7 Start

Parts 1–6 taught you to build AI systems. Part 7 teaches you to ship them. FastAPI is the standard Python framework for AI APIs — it is async-native, type-safe, and generates OpenAPI docs automatically. This module covers the production patterns that take a working FastAPI app to a deployable service.

  • App structure — routers, lifespan events, settings, project layout for production
  • Dependency injection — sharing LLM clients, DB connections, and config across endpoints
  • Middleware — request logging, rate limiting, CORS, error handling
  • Async patterns — background tasks, concurrent requests, avoiding blocking calls
  • Authentication — API key validation, JWT tokens, per-user rate limiting
  • Deployment — uvicorn + gunicorn, health checks, graceful shutdown
🏗

Production App Structure

Layout
# Production FastAPI project layout
#
# app/
# ├── main.py          ← app factory, lifespan, mount routers
# ├── config.py        ← settings from environment variables
# ├── dependencies.py  ← shared clients (LLM, DB, cache)
# ├── middleware.py    ← logging, rate limiting, CORS
# ├── routers/
# │   ├── chat.py      ← /chat endpoints
# │   ├── rag.py       ← /search, /ask endpoints
# │   └── admin.py     ← /health, /metrics
# ├── models/
# │   ├── requests.py  ← Pydantic request models
# │   └── responses.py ← Pydantic response models
# └── services/
#     ├── llm.py       ← LLM call wrappers
#     └── rag.py       ← retrieval pipeline

# config.py — all settings from environment
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    anthropic_api_key: str
    openai_api_key:    str = ""
    database_url:      str = "sqlite:///./app.db"
    redis_url:         str = "redis://localhost:6379"
    api_key_secret:    str = "change-me-in-production"
    max_requests_per_minute: int = 60
    environment:       str = "development"

    model_config = SettingsConfigDict(env_file=".env")   # Pydantic v2 style

@lru_cache
def get_settings() -> Settings:
    return Settings()
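
# Example .env consumed by Settings above (values are placeholders; pydantic-settings
# matches field names to environment variables case-insensitively):
#
#   ANTHROPIC_API_KEY=sk-ant-placeholder
#   DATABASE_URL=postgresql://user:pass@localhost:5432/app
#   ENVIRONMENT=production
#   MAX_REQUESTS_PER_MINUTE=120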

# main.py — app factory with lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI
import anthropic, chromadb

from app.config import get_settings

app_state = {}   # optional module-level registry (see the alternative in dependencies.py)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialise shared resources once and attach them to app.state
    settings = get_settings()
    app.state.llm_client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
    app.state.vector_db  = chromadb.PersistentClient(path="./chroma_db")
    app_state["llm_client"] = app.state.llm_client   # same object, dict-style access
    print("✓ App started")
    yield
    # Shutdown: clean up
    await app.state.llm_client.close()
    print("✓ App stopped")

app = FastAPI(title="AI API", version="1.0.0", lifespan=lifespan)

# Mount routers
from app.routers import chat, rag, admin
app.include_router(chat.router,  prefix="/chat",  tags=["chat"])
app.include_router(rag.router,   prefix="/rag",   tags=["rag"])
app.include_router(admin.router, prefix="/admin", tags=["admin"])
⚙️

Dependency Injection — Share Without Global State

Core Pattern
# dependencies.py — all shared resource providers
from fastapi import Depends, Request
import anthropic

from app.config import Settings, get_settings

def get_llm_client(request: Request) -> anthropic.AsyncAnthropic:
    """Provide the shared LLM client initialised at startup."""
    return request.app.state.llm_client   # stored in lifespan

def get_settings_dep() -> Settings:
    return get_settings()

def get_vector_db(request: Request):
    return request.app.state.vector_db

# Alternative: read from the app_state dict populated in main.py's lifespan
def get_llm() -> anthropic.AsyncAnthropic:
    return app_state["llm_client"]   # requires: from app.main import app_state

# In routers/chat.py — inject dependencies cleanly
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from typing import Annotated
import anthropic

from app.config import Settings
from app.dependencies import get_llm_client, get_settings_dep

router = APIRouter()

class ChatRequest(BaseModel):
    message:    str
    session_id: str = ""
    max_tokens: int = 1024

class ChatResponse(BaseModel):
    reply:      str
    session_id: str
    tokens_used: int

# Type-aliased dependency for cleaner signatures
LLMDep      = Annotated[anthropic.AsyncAnthropic, Depends(get_llm_client)]
SettingsDep = Annotated[Settings, Depends(get_settings_dep)]

@router.post("/message", response_model=ChatResponse)
async def send_message(
    request: ChatRequest,
    client:  LLMDep,         # injected — no global state
    settings: SettingsDep,   # injected — type-safe settings
) -> ChatResponse:
    response = await client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=request.max_tokens,
        messages=[{"role": "user", "content": request.message}]
    )
    return ChatResponse(
        reply=response.content[0].text,
        session_id=request.session_id or "anon",
        tokens_used=response.usage.input_tokens + response.usage.output_tokens
    )

💡 Never create LLM clients inside endpoint functions. Creating a new anthropic.AsyncAnthropic() on every request means creating a new HTTP connection pool on every request — a significant performance penalty. Always initialise clients once at startup via lifespan and share via dependency injection.
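
A side benefit of injecting the client this way: tests can swap in a stub via FastAPI's dependency_overrides without touching endpoint code. A minimal sketch, assuming the /chat/message endpoint above and a test environment that provides the required settings (the stub classes are illustrative):

# tests/test_chat.py: replace the real LLM client with a stub (sketch)
from fastapi.testclient import TestClient
from app.main import app
from app.dependencies import get_llm_client

class _StubMessages:
    async def create(self, **kwargs):
        # Mimic only the attributes the endpoint reads
        block = type("Block", (), {"text": "stub reply"})()
        usage = type("Usage", (), {"input_tokens": 1, "output_tokens": 1})()
        return type("Resp", (), {"content": [block], "usage": usage})()

class _StubClient:
    messages = _StubMessages()

app.dependency_overrides[get_llm_client] = lambda: _StubClient()

def test_send_message():
    with TestClient(app) as client:          # `with` runs lifespan startup/shutdown
        r = client.post("/chat/message", json={"message": "hi"})
        assert r.status_code == 200
        assert r.json()["reply"] == "stub reply"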

🛡

Middleware — Request Lifecycle Hooks

Cross-Cutting
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
import time, uuid, structlog

logger = structlog.get_logger()

# ── 1. Request ID + Timing middleware ─────────────────
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        request_id = str(uuid.uuid4())[:8]
        start      = time.perf_counter()

        # Attach request_id to context for all logs in this request
        structlog.contextvars.bind_contextvars(request_id=request_id)

        response = await call_next(request)

        elapsed = round((time.perf_counter() - start) * 1000, 1)
        logger.info("http_request",
                    method=request.method,
                    path=request.url.path,
                    status=response.status_code,
                    latency_ms=elapsed,
                    request_id=request_id)

        response.headers["X-Request-ID"] = request_id
        structlog.contextvars.clear_contextvars()
        return response

# ── 2. Rate limiting middleware ───────────────────────
import asyncio
from collections import defaultdict

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, calls_per_minute: int = 60):
        super().__init__(app)
        self.calls_per_minute = calls_per_minute
        self._counts: dict[str, list] = defaultdict(list)

    def _get_client_id(self, request: Request) -> str:
        return request.headers.get("X-API-Key", request.client.host)

    async def dispatch(self, request: Request, call_next) -> Response:
        client_id = self._get_client_id(request)
        now       = time.time()
        window    = [t for t in self._counts[client_id] if now - t < 60]

        if len(window) >= self.calls_per_minute:
            return Response(
                content='{"detail":"Rate limit exceeded. Try again in 60 seconds."}',
                status_code=429,
                headers={"Retry-After": "60",
                         "Content-Type": "application/json"}
            )
        self._counts[client_id] = window + [now]
        return await call_next(request)

# ── 3. Global exception handler ───────────────────────
from fastapi import HTTPException
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error("unhandled_exception", path=request.url.path, error=str(exc))
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error",
                 "request_id": request.headers.get("X-Request-ID")}
    )

# ── Register all middleware ────────────────────────────
app.add_middleware(RequestLoggingMiddleware)
app.add_middleware(RateLimitMiddleware, calls_per_minute=60)
app.add_middleware(CORSMiddleware,
    allow_origins=["https://yourdomain.com"],   # never "*" in production
    allow_methods=["GET", "POST"],
    allow_headers=["*"])
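
One detail worth knowing (Starlette behaviour, not spelled out in the snippet above): middleware added later wraps middleware added earlier, so with this registration order CORSMiddleware is the outermost layer and sees each request first, then the rate limiter, then the logger. A quick way to inspect the effective order:

# Outermost first: the middleware added last via add_middleware() runs first
print([m.cls.__name__ for m in app.user_middleware])
# Expected with the registration above:
# ['CORSMiddleware', 'RateLimitMiddleware', 'RequestLoggingMiddleware']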

Async Patterns for AI APIs

Performance
from fastapi import BackgroundTasks
import asyncio

# ── Background tasks — fire and forget ───────────────
# Use for: logging, analytics, cache warming, notifications
# Do NOT use for: work the user needs to see in the response

async def log_usage_async(user_id: str, tokens: int, cost: float):
    """Run after response is sent — user doesn't wait for this."""
    # `db` stands in for an async client initialised at startup (e.g. an asyncpg pool or motor)
    await db.insert_usage(user_id, tokens, cost)

@router.post("/chat")
async def chat_with_logging(
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    client: LLMDep
):
    response = await client.messages.create(...)
    reply    = response.content[0].text

    # Schedule logging AFTER response is sent
    background_tasks.add_task(
        log_usage_async,
        user_id=request.session_id,
        tokens=response.usage.output_tokens,
        cost=response.usage.output_tokens * 15e-6
    )
    return {"reply": reply}   # returned immediately; logging runs after

# ── Never block the event loop ────────────────────────
import asyncio

# BAD: blocks the entire event loop — other requests wait
@router.get("/bad")
async def bad_endpoint():
    import time
    time.sleep(5)   # blocks! no other requests can run during this
    return {"ok": True}

# GOOD: yields to event loop
@router.get("/good")
async def good_endpoint():
    await asyncio.sleep(5)   # other requests run while waiting
    return {"ok": True}

# For CPU-bound work: run in thread pool
import functools

@router.post("/embed")
async def embed_text(text: str):
    loop  = asyncio.get_event_loop()
    # Run sync embedding model in thread pool — doesn't block event loop
    embed = await loop.run_in_executor(
        None,
        functools.partial(sync_embedding_model.encode, text)
    )
    return {"embedding": embed.tolist()}

⚠️ Every time.sleep(), synchronous DB call, or CPU-heavy operation inside an async def blocks the entire FastAPI event loop. While your endpoint sleeps, every other concurrent request waits. Use asyncio.sleep() for delays, run_in_executor() for CPU work, and async DB drivers (asyncpg, motor) for database calls.
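
To make the async-driver point concrete, a sketch using asyncpg (the pool attribute, table, and query are illustrative assumptions, not part of this module's project):

import asyncpg
from fastapi import Request

# In lifespan (assumes a PostgreSQL database_url):
#   app.state.pg_pool = await asyncpg.create_pool(settings.database_url)
# On shutdown:
#   await app.state.pg_pool.close()

@router.get("/usage/{user_id}")
async def get_usage(user_id: str, request: Request):
    pool = request.app.state.pg_pool
    # Awaiting the query yields the event loop, so other requests keep running
    rows = await pool.fetch(
        "SELECT tokens, cost FROM usage WHERE user_id = $1", user_id
    )
    return [dict(r) for r in rows]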

🔐

Authentication — API Keys and JWT

Security
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader
import secrets, hashlib

API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)

# ── Simple API key validation ─────────────────────────
VALID_KEYS = {  # in prod, store hashed keys in DB
    hashlib.sha256("sk-dev-key-1".encode()).hexdigest(): {"user_id": "user_1", "tier": "free"},
    hashlib.sha256("sk-prod-key-1".encode()).hexdigest(): {"user_id": "user_2", "tier": "pro"},
}

async def require_api_key(api_key: str = Security(API_KEY_HEADER)):
    if not api_key:
        raise HTTPException(status_code=401, detail="API key required")
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    user = VALID_KEYS.get(key_hash)
    if not user:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return user

# Type alias for clean signatures
AuthUser = Annotated[dict, Security(require_api_key)]

@router.post("/ask")
async def ask(request: RAGRequest, user: AuthUser, client: LLMDep):
    # user = {"user_id": "user_2", "tier": "pro"}
    if user["tier"] == "free" and len(request.question) > 500:
        raise HTTPException(status_code=402, detail="Upgrade to Pro for longer questions")
    ...

# ── JWT with python-jose ──────────────────────────────
# pip install "python-jose[cryptography]"

from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
from fastapi.security import OAuth2PasswordBearer

SECRET_KEY = "your-256-bit-secret"   # from environment in prod
ALGORITHM  = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")   # token-issuing route

def create_access_token(user_id: str, expires_minutes: int = 60) -> str:
    payload = {"sub": user_id,
               "exp": datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user_jwt(token: str = Security(oauth2_scheme)) -> str:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["sub"]
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
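
To complete the picture, a sketch of the token-issuing route the oauth2_scheme above could point at (the path, demo credentials, and user lookup are placeholders; OAuth2PasswordRequestForm needs python-multipart installed):

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm

auth_router = APIRouter()

@auth_router.post("/auth/token")
async def issue_token(form: OAuth2PasswordRequestForm = Depends()):
    # Placeholder check: verify against your real user store instead
    if form.username != "demo" or form.password != "demo":
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return {"access_token": create_access_token(form.username),
            "token_type": "bearer"}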
🚀

Deployment — uvicorn + gunicorn + Health Checks

Ship It
# ── Development server ────────────────────────────────
uvicorn app.main:app --reload --port 8000

# ── Production: gunicorn manages uvicorn workers ──────
# workers = (2 × CPU cores) + 1 is the standard formula
gunicorn app.main:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --access-logfile - \
  --error-logfile -
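
# ── Optional: the same settings as a gunicorn.conf.py ─
# (a sketch using gunicorn's standard config keys; the worker count assumes
#  the "(2 × cores) + 1" rule of thumb above)
import multiprocessing

bind             = "0.0.0.0:8000"
workers          = (2 * multiprocessing.cpu_count()) + 1
worker_class     = "uvicorn.workers.UvicornWorker"
timeout          = 120
graceful_timeout = 30
accesslog        = "-"
errorlog         = "-"

# Run with: gunicorn app.main:app -c gunicorn.conf.py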

# ── Health check endpoints ────────────────────────────
# /health — fast liveness check (load balancer uses this)
# /ready  — readiness check (DB connected, model loaded)

@router.get("/health")
async def health():
    return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}

@router.get("/ready")
async def readiness(client: LLMDep, request: Request):
    checks = {}
    # Check LLM API reachable
    try:
        await client.messages.count_tokens(
            model="claude-3-haiku-20240307",
            messages=[{"role": "user", "content": "ping"}]
        )
        checks["llm"] = "ok"
    except Exception as e:
        checks["llm"] = f"error: {e}"

    # Check vector DB
    try:
        vdb = request.app.state.vector_db
        vdb.heartbeat()
        checks["vector_db"] = "ok"
    except Exception as e:
        checks["vector_db"] = f"error: {e}"

    all_ok = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ready" if all_ok else "degraded", "checks": checks}
    )

FREE LEARNING RESOURCES

  • [Docs] FastAPI: Bigger Applications (fastapi.tiangolo.com): official guide on routers, dependencies, and project structure for production apps.
  • [Docs] FastAPI: Dependencies (fastapi.tiangolo.com): complete dependency injection documentation, including yield dependencies and lifespan.
  • [Docs] Uvicorn Deployment Guide (uvicorn.org/deployment): production deployment with gunicorn workers, systemd, and supervisor.
  • [Library] Pydantic Settings (docs.pydantic.dev): environment variable management with type safety; essential for production config.
🛠 Production-Ready AI API — Full FastAPI App [Intermediate] 3–4 days

Wrap your M18 RAG system and M21 agent in a production-grade FastAPI application.

Requirements

  • Structure — routers, models, services, config using pydantic-settings
  • Lifespan — LLM client and vector DB initialised once at startup, cleaned up on shutdown
  • Dependency injection — no global state; all resources injected via Depends()
  • Middleware — request logging (request_id, path, latency), rate limiting (60 req/min)
  • CORS — configured for your frontend domain only
  • Auth — API key validation via X-API-Key header
  • Endpoints — POST /rag/ask, POST /chat/message (streaming), GET /admin/health, GET /admin/ready
  • Deployment — gunicorn config, .env file, Procfile for cloud deployment

Skills: pydantic-settings, lifespan, dependency injection, BaseHTTPMiddleware, APIKeyHeader, gunicorn
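
A possible Procfile for the deployment requirement above (a one-line sketch; $PORT is the convention on Heroku-style platforms and the module string assumes the app/ layout from this module):

web: gunicorn app.main:app --worker-class uvicorn.workers.UvicornWorker --workers 4 --bind 0.0.0.0:$PORT --timeout 120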

LAB 1

Lifespan and Dependency Injection

Objective: Verify that shared resources are initialised once and injected correctly.

1. Build the lifespan function that creates an anthropic.AsyncAnthropic client. Add a print statement with an ID (id(client)) to confirm it is the same object across requests.
2. Create a dependency get_llm_client() and inject it into 3 endpoints. Add the same id() print. Verify all 3 print the same ID — proving the client is shared.
3. Load all settings from a .env file using pydantic-settings. Verify a missing required variable raises a clear ValidationError at startup (not at request time).
4. Test graceful shutdown: send a request, then Ctrl+C while it is in progress. Does the lifespan cleanup run? Does the in-progress request complete or get cut off?
LAB 2

Middleware Stack

Objective: Build and verify the middleware stack works correctly in combination.

1. Add RequestLoggingMiddleware. Verify every request produces one JSON log line with method, path, status, latency_ms, and request_id. Verify the X-Request-ID header appears in the response.
2. Add RateLimitMiddleware (5 req/minute for testing). Send 6 requests in rapid succession. Verify the 6th returns 429 with a Retry-After header.
3. Trigger the global exception handler: add a route that raises an unhandled ValueError. Verify the response is 500 JSON (not an HTML traceback) and the error is logged.
4. Test CORS: use a browser fetch() from a different origin. Verify that allowed origins work and blocked origins get a CORS error.
LAB 3

Async Safety

Objective: Verify that blocking code kills concurrency and fix it.

1. Create two endpoints: /slow-sync (time.sleep(3) inside async def) and /slow-async (await asyncio.sleep(3)). Send 3 concurrent requests to each using httpx.AsyncClient with asyncio.gather (a harness sketch follows this lab).
2. Measure total time for 3 concurrent requests to /slow-sync vs /slow-async. /slow-sync should take ~9s (sequential). /slow-async should take ~3s (parallel).
3. Fix /slow-sync using run_in_executor(). Re-measure. Verify it now takes ~3s for 3 concurrent requests.
4. Add a background task to an endpoint. Verify: the response arrives before the background task completes (add asyncio.sleep(2) in the task and confirm the response arrives in <1s).
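
A possible shape for the Lab 3 harness (endpoint paths match the lab; the base URL and request count are assumptions):

# lab3_concurrency.py: compare wall-clock time for 3 concurrent requests
import asyncio, time
import httpx

BASE_URL = "http://localhost:8000"    # assumed local dev server

async def hammer(path: str, n: int = 3) -> float:
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
        start = time.perf_counter()
        await asyncio.gather(*(client.get(path) for _ in range(n)))
        return time.perf_counter() - start

async def main():
    for path in ("/slow-sync", "/slow-async"):
        print(f"{path}: {await hammer(path):.1f}s for 3 concurrent requests")

if __name__ == "__main__":
    asyncio.run(main())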

P7-M23 MASTERY CHECKLIST

When complete: Move to P7-M24 — Docker & Background Jobs. Your API is production-structured. M24 covers containerisation and offloading heavy work to background workers.
