What This Module Covers
Parts 1–6 taught you to build AI systems. Part 7 teaches you to ship them. FastAPI is the standard Python framework for AI APIs — it is async-native, type-safe, and generates OpenAPI docs automatically. This module covers the production patterns that take a working FastAPI app to a deployable service.
- App structure — routers, lifespan events, settings, project layout for production
- Dependency injection — sharing LLM clients, DB connections, and config across endpoints
- Middleware — request logging, rate limiting, CORS, error handling
- Async patterns — background tasks, concurrent requests, avoiding blocking calls
- Authentication — API key validation, JWT tokens, per-user rate limiting
- Deployment — uvicorn + gunicorn, health checks, graceful shutdown
Production App Structure
```python
# Production FastAPI project layout
#
# app/
# ├── main.py           ← app factory, lifespan, mount routers
# ├── config.py         ← settings from environment variables
# ├── dependencies.py   ← shared clients (LLM, DB, cache)
# ├── middleware.py     ← logging, rate limiting, CORS
# ├── routers/
# │   ├── chat.py       ← /chat endpoints
# │   ├── rag.py        ← /search, /ask endpoints
# │   └── admin.py      ← /health, /metrics
# ├── models/
# │   ├── requests.py   ← Pydantic request models
# │   └── responses.py  ← Pydantic response models
# └── services/
#     ├── llm.py        ← LLM call wrappers
#     └── rag.py        ← retrieval pipeline
```

```python
# config.py — all settings from environment
from functools import lru_cache

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    anthropic_api_key: str  # required — startup fails fast if missing
    openai_api_key: str = ""
    database_url: str = "sqlite:///./app.db"
    redis_url: str = "redis://localhost:6379"
    api_key_secret: str = "change-me-in-production"
    max_requests_per_minute: int = 60
    environment: str = "development"

    class Config:
        env_file = ".env"


@lru_cache
def get_settings() -> Settings:
    return Settings()
```

```python
# main.py — app factory with lifespan
from contextlib import asynccontextmanager

import anthropic
import chromadb
from fastapi import FastAPI

from config import get_settings

app_state = {}  # module-level alternative to app.state (see dependencies below)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialise shared resources once
    settings = get_settings()
    app.state.llm_client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
    app.state.vector_db = chromadb.PersistentClient(path="./chroma_db")
    app_state["llm_client"] = app.state.llm_client  # mirror for dict-based access
    print("✓ App started")
    yield
    # Shutdown: clean up
    await app.state.llm_client.close()
    print("✓ App stopped")


app = FastAPI(title="AI API", version="1.0.0", lifespan=lifespan)

# Mount routers
from routers import chat, rag, admin

app.include_router(chat.router, prefix="/chat", tags=["chat"])
app.include_router(rag.router, prefix="/rag", tags=["rag"])
app.include_router(admin.router, prefix="/admin", tags=["admin"])
```
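For reference, a matching .env file might look like this; every value is a placeholder (pydantic-settings matches field names to environment variables case-insensitively):

```bash
# .env — never commit this file; values below are placeholders
ANTHROPIC_API_KEY=sk-ant-your-key-here
DATABASE_URL=postgresql://user:password@localhost:5432/app
REDIS_URL=redis://localhost:6379
API_KEY_SECRET=a-long-random-string
MAX_REQUESTS_PER_MINUTE=60
ENVIRONMENT=production
```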
Dependency Injection — Share Without Global State
```python
# dependencies.py — all shared resource providers
import anthropic
from fastapi import Depends, Request

from config import Settings, get_settings


def get_llm_client(request: Request) -> anthropic.AsyncAnthropic:
    """Provide the shared LLM client initialised at startup."""
    return request.app.state.llm_client  # stored on app.state in lifespan


def get_settings_dep() -> Settings:
    return get_settings()


def get_vector_db(request: Request):
    return request.app.state.vector_db


# Alternative: read from the module-level app_state dict filled in lifespan
def get_llm() -> anthropic.AsyncAnthropic:
    from main import app_state  # deferred import avoids a circular import
    return app_state["llm_client"]
```

```python
# routers/chat.py — inject dependencies cleanly
from typing import Annotated

import anthropic
from fastapi import APIRouter, Depends
from pydantic import BaseModel

from config import Settings
from dependencies import get_llm_client, get_settings_dep

router = APIRouter()


class ChatRequest(BaseModel):
    message: str
    session_id: str = ""
    max_tokens: int = 1024


class ChatResponse(BaseModel):
    reply: str
    session_id: str
    tokens_used: int


# Type-aliased dependencies for cleaner signatures
LLMDep = Annotated[anthropic.AsyncAnthropic, Depends(get_llm_client)]
SettingsDep = Annotated[Settings, Depends(get_settings_dep)]


@router.post("/message", response_model=ChatResponse)
async def send_message(
    request: ChatRequest,
    client: LLMDep,         # injected — no global state
    settings: SettingsDep,  # injected — type-safe settings
) -> ChatResponse:
    response = await client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=request.max_tokens,
        messages=[{"role": "user", "content": request.message}],
    )
    return ChatResponse(
        reply=response.content[0].text,
        session_id=request.session_id or "anon",
        tokens_used=response.usage.input_tokens + response.usage.output_tokens,
    )
```
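Dependencies can also use yield to run cleanup after the response, which is FastAPI's standard pattern for per-request resources such as DB sessions. A minimal sketch, assuming a hypothetical async_session_factory (e.g. from SQLAlchemy's async_sessionmaker):

```python
from collections.abc import AsyncIterator

from fastapi import Depends


# Hypothetical: async_session_factory would be created at startup,
# e.g. with SQLAlchemy's async_sessionmaker.
async def get_db_session() -> AsyncIterator:
    async with async_session_factory() as session:
        yield session  # the endpoint runs while the generator is suspended here
    # the async-with block closes the session after the response is sent


@router.get("/history")
async def history(db=Depends(get_db_session)):
    ...  # use db here; cleanup is automatic
```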
💡 Never create LLM clients inside endpoint functions. Creating a new anthropic.AsyncAnthropic() on every request means creating a new HTTP connection pool on every request — a significant performance penalty. Always initialise clients once at startup via lifespan and share via dependency injection.
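To make the contrast concrete, a minimal sketch of the anti-pattern next to the correct pattern (reusing the LLMDep alias from above):

```python
# BAD: a new AsyncAnthropic, and a new HTTP connection pool, on every call
@router.post("/slow")
async def slow_chat(request: ChatRequest):
    client = anthropic.AsyncAnthropic()  # rebuilt per request: slow
    ...


# GOOD: the one client created in lifespan, injected per request
@router.post("/fast")
async def fast_chat(request: ChatRequest, client: LLMDep):
    ...
```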
Middleware — Request Lifecycle Hooks
```python
import time
import uuid
from collections import defaultdict

import structlog
from fastapi import Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()


# ── 1. Request ID + timing middleware ─────────────────
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        request_id = str(uuid.uuid4())[:8]
        start = time.perf_counter()
        # Attach request_id to context for all logs in this request
        structlog.contextvars.bind_contextvars(request_id=request_id)
        response = await call_next(request)
        elapsed = round((time.perf_counter() - start) * 1000, 1)
        logger.info(
            "http_request",
            method=request.method,
            path=request.url.path,
            status=response.status_code,
            latency_ms=elapsed,
            request_id=request_id,
        )
        response.headers["X-Request-ID"] = request_id
        structlog.contextvars.clear_contextvars()
        return response


# ── 2. Rate limiting middleware ───────────────────────
class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, calls_per_minute: int = 60):
        super().__init__(app)
        self.calls_per_minute = calls_per_minute
        self._counts: dict[str, list[float]] = defaultdict(list)

    def _get_client_id(self, request: Request) -> str:
        return request.headers.get("X-API-Key", request.client.host)

    async def dispatch(self, request: Request, call_next) -> Response:
        client_id = self._get_client_id(request)
        now = time.time()
        window = [t for t in self._counts[client_id] if now - t < 60]
        if len(window) >= self.calls_per_minute:
            return Response(
                content='{"detail":"Rate limit exceeded. Try again in 60 seconds."}',
                status_code=429,
                headers={"Retry-After": "60", "Content-Type": "application/json"},
            )
        self._counts[client_id] = window + [now]
        return await call_next(request)


# ── 3. Global exception handler ───────────────────────
# (in main.py, where `app` is defined)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger.error("unhandled_exception", path=request.url.path, error=str(exc))
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Internal server error",
            "request_id": request.headers.get("X-Request-ID"),
        },
    )


# ── Register all middleware ───────────────────────────
app.add_middleware(RequestLoggingMiddleware)
app.add_middleware(RateLimitMiddleware, calls_per_minute=60)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],  # never "*" in production
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
```
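One ordering subtlety: add_middleware() wraps the existing stack, so the last middleware registered is the outermost and sees requests first. With the registration order above, CORS runs first, then rate limiting, then logging, which means a rate-limited 429 never passes through RequestLoggingMiddleware. If you want every request logged, register the logging middleware last.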
Async Patterns for AI APIs
```python
import asyncio
import functools

from fastapi import BackgroundTasks

# ── Background tasks — fire and forget ────────────────
# Use for: logging, analytics, cache warming, notifications
# Do NOT use for: work the user needs to see in the response


async def log_usage_async(user_id: str, tokens: int, cost: float):
    """Runs after the response is sent — the user doesn't wait for this."""
    await db.insert_usage(user_id, tokens, cost)  # assumes an async DB client `db`


@router.post("/chat")
async def chat_with_logging(
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    client: LLMDep,
):
    response = await client.messages.create(...)
    reply = response.content[0].text
    # Schedule logging AFTER the response is sent
    background_tasks.add_task(
        log_usage_async,
        user_id=request.session_id,
        tokens=response.usage.output_tokens,
        cost=response.usage.output_tokens * 15e-6,
    )
    return {"reply": reply}  # returned immediately; logging runs after


# ── Never block the event loop ────────────────────────

# BAD: blocks the entire event loop — other requests wait
@router.get("/bad")
async def bad_endpoint():
    import time
    time.sleep(5)  # blocks! no other requests can run during this
    return {"ok": True}


# GOOD: yields to the event loop
@router.get("/good")
async def good_endpoint():
    await asyncio.sleep(5)  # other requests run while waiting
    return {"ok": True}


# For CPU-bound work: run in a thread pool
@router.post("/embed")
async def embed_text(text: str):
    loop = asyncio.get_running_loop()
    # Run the sync embedding model in a thread pool — doesn't block the event loop
    embedding = await loop.run_in_executor(
        None, functools.partial(sync_embedding_model.encode, text)
    )
    return {"embedding": embedding.tolist()}
```
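Async also pays off inside a single request: independent LLM calls can run concurrently with asyncio.gather, so total latency is roughly the slowest call rather than the sum. A sketch (the endpoint path and prompt are illustrative):

```python
@router.post("/summarise-batch")
async def summarise_batch(texts: list[str], client: LLMDep):
    async def summarise(text: str) -> str:
        response = await client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=256,
            messages=[{"role": "user", "content": f"Summarise:\n\n{text}"}],
        )
        return response.content[0].text

    # All calls run concurrently; latency is roughly the slowest call
    summaries = await asyncio.gather(*(summarise(t) for t in texts))
    return {"summaries": summaries}
```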
⚠️ Every time.sleep(), synchronous DB call, or CPU-heavy operation inside an async def blocks the entire FastAPI event loop. While your endpoint sleeps, every other concurrent request waits. Use asyncio.sleep() for delays, run_in_executor() for CPU work, and async DB drivers (asyncpg, motor) for database calls.
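For the database case, this is what a non-blocking query looks like with asyncpg; a minimal sketch, assuming a connection pool created in lifespan and stored on app.state.pg_pool:

```python
import asyncpg
from fastapi import Request

# In lifespan (assumption: DATABASE_URL points at Postgres):
#   app.state.pg_pool = await asyncpg.create_pool(settings.database_url)
#   ...and `await app.state.pg_pool.close()` on shutdown.


@router.get("/usage/{user_id}")
async def usage(user_id: str, request: Request):
    pool: asyncpg.Pool = request.app.state.pg_pool
    async with pool.acquire() as conn:
        # fetch() awaits I/O, so other requests keep running meanwhile
        rows = await conn.fetch(
            "SELECT tokens, cost FROM usage WHERE user_id = $1", user_id
        )
    return {"usage": [dict(r) for r in rows]}
```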
Authentication — API Keys and JWT
```python
import hashlib
from typing import Annotated

from fastapi import HTTPException, Security
from fastapi.security import APIKeyHeader

API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)

# ── Simple API key validation ─────────────────────────
VALID_KEYS = {  # in prod, store hashed keys in a database
    hashlib.sha256("sk-dev-key-1".encode()).hexdigest(): {"user_id": "user_1", "tier": "free"},
    hashlib.sha256("sk-prod-key-1".encode()).hexdigest(): {"user_id": "user_2", "tier": "pro"},
}


async def require_api_key(api_key: str | None = Security(API_KEY_HEADER)) -> dict:
    if not api_key:
        raise HTTPException(status_code=401, detail="API key required")
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    user = VALID_KEYS.get(key_hash)
    if not user:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return user


# Type alias for clean signatures
AuthUser = Annotated[dict, Security(require_api_key)]


@router.post("/ask")
async def ask(request: RAGRequest, user: AuthUser, client: LLMDep):
    # user = {"user_id": "user_2", "tier": "pro"}
    if user["tier"] == "free" and len(request.question) > 500:
        raise HTTPException(status_code=402, detail="Upgrade to Pro for longer questions")
    ...


# ── JWT with python-jose ──────────────────────────────
# pip install "python-jose[cryptography]"
from datetime import datetime, timedelta

from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

SECRET_KEY = "your-256-bit-secret"  # from environment in prod
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def create_access_token(user_id: str, expires_minutes: int = 60) -> str:
    payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(minutes=expires_minutes)}
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user_jwt(token: str = Security(oauth2_scheme)) -> str:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["sub"]
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
```
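Rather than adding user: AuthUser to every endpoint, FastAPI can also guard a whole router through its dependencies parameter; a minimal sketch reusing require_api_key (the prefix and endpoint are illustrative):

```python
from fastapi import APIRouter, Security

# Every endpoint mounted on this router requires a valid API key
protected = APIRouter(
    prefix="/v1",
    dependencies=[Security(require_api_key)],
)


@protected.get("/models")
async def list_models():
    return {"models": ["claude-3-5-sonnet-20241022"]}
```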
Deployment — uvicorn + gunicorn + Health Checks
```bash
# ── Development server ────────────────────────────────
uvicorn app.main:app --reload --port 8000

# ── Production: gunicorn manages uvicorn workers ──────
# workers = (2 × CPU cores) + 1 is the standard formula
gunicorn app.main:app \
  --worker-class uvicorn.workers.UvicornWorker \
  --workers 4 \
  --bind 0.0.0.0:8000 \
  --timeout 120 \
  --graceful-timeout 30 \
  --access-logfile - \
  --error-logfile -
```

```python
# ── Health check endpoints ────────────────────────────
# /health — fast liveness check (load balancer uses this)
# /ready  — readiness check (DB connected, model loaded)
# routers/admin.py — uses LLMDep from the dependency section
from datetime import datetime

from fastapi import Request
from fastapi.responses import JSONResponse


@router.get("/health")
async def health():
    return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}


@router.get("/ready")
async def readiness(client: LLMDep, request: Request):
    checks = {}
    # Check the LLM API is reachable
    try:
        await client.messages.count_tokens(
            model="claude-3-haiku-20240307",
            messages=[{"role": "user", "content": "ping"}],
        )
        checks["llm"] = "ok"
    except Exception as e:
        checks["llm"] = f"error: {e}"
    # Check the vector DB
    try:
        vdb = request.app.state.vector_db
        vdb.heartbeat()
        checks["vector_db"] = "ok"
    except Exception as e:
        checks["vector_db"] = f"error: {e}"
    all_ok = all(v == "ok" for v in checks.values())
    return JSONResponse(
        status_code=200 if all_ok else 503,
        content={"status": "ready" if all_ok else "degraded", "checks": checks},
    )
```
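The milestone below asks for a gunicorn config file. The flags above translate directly into a gunicorn.conf.py, which gunicorn picks up automatically from the working directory; a sketch:

```python
# gunicorn.conf.py — loaded automatically by `gunicorn app.main:app`
import multiprocessing

worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count() * 2 + 1  # (2 × cores) + 1
bind = "0.0.0.0:8000"
timeout = 120
graceful_timeout = 30
accesslog = "-"  # "-" means stdout
errorlog = "-"
```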
FREE LEARNING RESOURCES
| Type | Resource | Best For |
|---|---|---|
| Docs | FastAPI: Bigger Applications — fastapi.tiangolo.com | Official guide on routers, dependencies, and project structure for production apps. |
| Docs | FastAPI: Dependencies — fastapi.tiangolo.com | Complete dependency injection documentation including yield dependencies and lifespan. |
| Docs | Uvicorn Deployment Guide — uvicorn.org/deployment | Production deployment with gunicorn workers, systemd, and supervisor. |
| Library | Pydantic Settings — docs.pydantic.dev | Environment variable management with type safety. Essential for production config. |
Wrap your M18 RAG system and M21 agent in a production-grade FastAPI application.
Requirements
- Structure — routers, models, services, config using pydantic-settings
- Lifespan — LLM client and vector DB initialised once at startup, cleaned up on shutdown
- Dependency injection — no global state; all resources injected via Depends()
- Middleware — request logging (request_id, path, latency), rate limiting (60 req/min)
- CORS — configured for your frontend domain only
- Auth — API key validation via X-API-Key header
- Endpoints — POST /rag/ask, POST /chat/message (streaming; see the sketch after this list), GET /admin/health, GET /admin/ready
- Deployment — gunicorn config, .env file, Procfile for cloud deployment
Skills: pydantic-settings, lifespan, dependency injection, BaseHTTPMiddleware, APIKeyHeader, gunicorn
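For the streaming /chat/message requirement, a minimal sketch using StreamingResponse with the Anthropic SDK's streaming helper (the route path is illustrative; models and aliases reuse the ones above):

```python
from fastapi.responses import StreamingResponse


@router.post("/message/stream")
async def stream_message(request: ChatRequest, client: LLMDep):
    async def token_stream():
        # The SDK's streaming helper yields text deltas as they arrive
        async with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=request.max_tokens,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            async for text in stream.text_stream:
                yield text

    return StreamingResponse(token_stream(), media_type="text/plain")
```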
Lifespan and Dependency Injection
Objective: Verify that shared resources are initialised once and injected correctly.
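One way to check this, sketched with FastAPI's TestClient; paths assume the admin router above, and the lifespan runs because TestClient is used as a context manager:

```python
from fastapi.testclient import TestClient

from app.main import app


def test_shared_client_initialised_once():
    with TestClient(app) as client:  # entering the context runs lifespan startup
        first = app.state.llm_client
        client.get("/admin/health")
        client.get("/admin/health")
        # The same client instance must survive across requests
        assert app.state.llm_client is first
```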
Middleware Stack
Objective: Build and verify the middleware stack works correctly in combination.
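A combined check might look like this sketch: request IDs on normal responses, then 429 with Retry-After once the per-client window is exhausted (limits assume the 60 req/min configuration above):

```python
from fastapi.testclient import TestClient

from app.main import app


def test_middleware_stack():
    with TestClient(app) as client:
        # Logging middleware stamps every response it sees
        resp = client.get("/admin/health")
        assert resp.status_code == 200
        assert "X-Request-ID" in resp.headers

        # Exceed the 60 req/min window: the rate limiter returns 429
        statuses = [client.get("/admin/health").status_code for _ in range(70)]
        assert 429 in statuses
        resp = client.get("/admin/health")
        assert resp.headers.get("Retry-After") == "60"
```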
Async Safety
Objective: Demonstrate that blocking code kills concurrency, then fix it.
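A sketch of the timing measurement, assuming the /bad and /good endpoints from the async section are running locally on a single worker:

```python
import asyncio
import time

import httpx


async def timed_burst(url: str, n: int = 5) -> float:
    # Fire n concurrent requests and measure wall-clock time
    async with httpx.AsyncClient() as client:
        start = time.perf_counter()
        await asyncio.gather(*(client.get(url, timeout=60) for _ in range(n)))
        return time.perf_counter() - start


async def main():
    # /bad uses time.sleep(5): requests serialise, so expect ~25 s for 5 calls
    print("blocking:", await timed_burst("http://localhost:8000/bad"))
    # /good uses asyncio.sleep(5): requests overlap, so expect ~5 s for 5 calls
    print("async:   ", await timed_burst("http://localhost:8000/good"))


asyncio.run(main())
```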
P7-M23 MASTERY CHECKLIST
- Can describe the production FastAPI project layout: main.py, config.py, dependencies.py, middleware.py, routers/, models/, services/
- Can implement pydantic-settings with env_file=".env" and required field validation at startup
- Can implement a lifespan context manager that creates shared clients at startup and cleans up on shutdown
- Know never to create LLM clients inside endpoint functions — always initialise once at startup
- Can implement dependency providers (get_llm_client, get_settings) and inject via Depends()
- Can use Annotated type aliases for clean endpoint signatures
- Can implement RequestLoggingMiddleware with request_id, latency_ms, and structlog
- Can implement RateLimitMiddleware with per-client sliding window and 429 response
- Can configure CORSMiddleware with specific origins (never allow_origins=["*"] in production)
- Can register a global exception_handler that returns JSON 500 (not HTML tracebacks)
- Know that time.sleep() in async def blocks the event loop — always use asyncio.sleep()
- Can offload CPU-bound work to run_in_executor() to avoid blocking the event loop
- Can use BackgroundTasks for fire-and-forget work that runs after the response is sent
- Can implement API key validation with hashed key storage and HTTPException 401/403
- Can run production server: gunicorn with UvicornWorker, correct worker count, graceful timeout
- Can implement /health (fast liveness) and /ready (full dependency check, returns 503 if degraded)
- Completed Lab 1: lifespan + dependency injection verified
- Completed Lab 2: middleware stack built and tested
- Completed Lab 3: async safety verified with timing measurements
- Milestone project pushed to GitHub: production AI API with all patterns
✅ When complete: Move to P7-M24 — Docker & Background Jobs. Your API is production-structured. M24 covers containerisation and offloading heavy work to background workers.