What This Module Covers
*See Everything*

A deployed AI system you can't see inside is a liability. When the LLM API starts returning 500 errors at 2am, or a user complains the chatbot is slow, you need instrumentation that lets you diagnose it within seconds — not hours. This module covers the full observability stack:
- Metrics with Prometheus — request rates, latency histograms, LLM token counters, error rates
- Dashboards with Grafana — visualising metrics, setting up standard AI API panels
- Structured logging — production log pipeline, correlation IDs, log aggregation
- Distributed tracing — tracing requests across API + worker + LLM calls
- Error tracking with Sentry — capturing exceptions with context in production
- Alerting — alert rules for error rate, p95 latency, and LLM cost spikes
Prometheus Metrics for AI APIs
*Measure Everything*

```bash
pip install prometheus-client prometheus-fastapi-instrumentator
```

```python
import time

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_fastapi_instrumentator import Instrumentator
from fastapi import FastAPI, Response
from anthropic import AsyncAnthropic

app = FastAPI()
llm_client = AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment

# ── Standard HTTP metrics (auto-instrumented) ─────────
Instrumentator().instrument(app).expose(app)
# Provides: http_requests_total, http_request_duration_seconds
# .expose(app) already registers GET /metrics; the manual endpoint at the
# bottom shows the equivalent by hand — use one or the other, not both

# ── Custom AI-specific metrics ────────────────────────

# Token usage counter — track cost per model per endpoint
llm_tokens = Counter(
    "llm_tokens_total",
    "Total LLM tokens used",
    labelnames=["model", "endpoint", "token_type"],  # input | output
)

# LLM call latency histogram
llm_latency = Histogram(
    "llm_call_duration_seconds",
    "LLM API call duration",
    labelnames=["model", "endpoint"],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)

# RAG retrieval quality gauge
rag_avg_score = Gauge(
    "rag_retrieval_score_avg",
    "Rolling average RAG retrieval similarity score",
)

# Active agent sessions
active_agents = Gauge(
    "agent_sessions_active",
    "Number of currently running agent sessions",
)

# Usage example inside an endpoint
async def call_llm_instrumented(prompt: str, model: str, endpoint: str) -> str:
    start = time.perf_counter()
    try:
        response = await llm_client.messages.create(
            model=model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        )
        reply = response.content[0].text
        llm_tokens.labels(model=model, endpoint=endpoint, token_type="input").inc(response.usage.input_tokens)
        llm_tokens.labels(model=model, endpoint=endpoint, token_type="output").inc(response.usage.output_tokens)
        return reply
    finally:
        # Observed even when the call raises — failed calls count towards latency too
        llm_latency.labels(model=model, endpoint=endpoint).observe(time.perf_counter() - start)

# Expose metrics endpoint (what .expose(app) does for you)
@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
```
💡 Four AI-specific metrics to always instrument: (1) LLM token counts by model — directly maps to cost. (2) LLM call latency histogram — detect when the API is slow. (3) RAG retrieval scores — quality degradation over time. (4) Agent session count — detect runaway loops. Standard HTTP metrics (request rate, latency, error rate) come free from the Instrumentator.
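Because the token counter carries model and token_type labels, it converts straight into a spend estimate in PromQL. A hedged sketch of a Grafana panel expression; the $3/M-input and $15/M-output prices are placeholders, not real rates:

```promql
# Estimated $/hour from token rates (placeholder per-million-token prices)
  sum(rate(llm_tokens_total{token_type="input"}[1h]))  * 3600 * 3  / 1e6
+ sum(rate(llm_tokens_total{token_type="output"}[1h])) * 3600 * 15 / 1e6
```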
Grafana Dashboard Setup
*Visualise*

```yaml
# docker-compose.yml — add Prometheus + Grafana alongside your api service
services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin   # change for anything non-local
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  grafana_data:   # named volume so dashboards survive container restarts
```

```yaml
# prometheus.yml — scrape your FastAPI app
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "ai-api"
    static_configs:
      - targets: ["api:8000"]   # Docker service name
```

Key Grafana panels for an AI API:

1. Request rate: `rate(http_requests_total[5m])`
2. p95 latency: `histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))`
3. Error rate: `rate(http_requests_total{status=~"5.."}[5m])`
4. LLM tokens/hour: `rate(llm_tokens_total[1h]) * 3600`
5. LLM p95 latency: `histogram_quantile(0.95, rate(llm_call_duration_seconds_bucket[5m]))`
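A datasource configured by hand in the Grafana UI is easy to lose when the container is recreated. A minimal provisioning sketch, assuming the file is mounted into the grafana service at /etc/grafana/provisioning/datasources/:

```yaml
# grafana-datasource.yml — auto-provision the Prometheus datasource on startup
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    url: http://prometheus:9090   # Docker service name from docker-compose.yml
    access: proxy
    isDefault: true
```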
Production Log Pipeline
*Queryable Logs*

```bash
pip install structlog python-json-logger
```

```python
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware

# ── Production structlog configuration ────────────────
def configure_logging(environment: str = "production"):
    if environment == "development":
        # Human-readable console output for dev
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.TimeStamper(fmt="%H:%M:%S"),
                structlog.dev.ConsoleRenderer(colors=True),
            ]
        )
    else:
        # JSON output for production — queryable by log aggregators
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ]
        )

logger = structlog.get_logger()

# ── Request correlation — trace a request through all logs
class CorrelationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Use the incoming X-Request-ID or generate a new one
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
        user_id = getattr(request.state, "user", {}).get("user_id", "anon")
        # Bind to context — all logs in this request include these fields
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            user_id=user_id,
            path=request.url.path,
        )
        try:
            response = await call_next(request)
        finally:
            # Clear even on exceptions so the IDs never leak into other requests
            structlog.contextvars.clear_contextvars()
        response.headers["X-Request-ID"] = request_id
        return response

# All logs now include request_id and user_id automatically
# Output: {"event":"llm_called","model":"claude...","request_id":"a3f7b2","user_id":"user_1",...}
# Query in CloudWatch/Datadog/Loki: request_id="a3f7b2" → all logs for that request
```
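Once the middleware is installed, handlers simply emit events and the correlation fields ride along automatically. A minimal usage sketch; the /ask route and generate_answer helper are illustrative, not part of the pipeline above:

```python
from fastapi import FastAPI
import structlog

app = FastAPI()
app.add_middleware(CorrelationMiddleware)  # from the snippet above
logger = structlog.get_logger()

async def generate_answer(question: str) -> str:  # hypothetical helper
    return "stub answer"

@app.post("/ask")
async def ask(question: str):
    logger.info("question_received", q_len=len(question))
    answer = await generate_answer(question)
    logger.info("answer_generated", a_len=len(answer))
    # Both log lines above carry the same request_id / user_id / path fields
    return {"answer": answer}
```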
Distributed Tracing with OpenTelemetry
*End-to-End*

```bash
# opentelemetry-instrumentation-httpx added: HTTPXClientInstrumentor needs it
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi \
    opentelemetry-instrumentation-httpx opentelemetry-exporter-otlp
```

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Setup tracing — sends to Jaeger or any OTLP-compatible backend
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# Auto-instrument FastAPI and all HTTPX calls (LLM API calls)
FastAPIInstrumentor.instrument_app(app)  # app is your existing FastAPI instance
HTTPXClientInstrumentor().instrument()
# Every FastAPI request and every LLM API call now has a trace span

# Manual spans for custom work
# (retrieve() and generate() are your app's own RAG helpers)
async def rag_pipeline(question: str) -> dict:
    with tracer.start_as_current_span("rag.retrieve") as span:
        span.set_attribute("query.length", len(question))
        chunks = await retrieve(question)
        span.set_attribute("chunks.count", len(chunks))
    with tracer.start_as_current_span("rag.generate") as span:
        span.set_attribute("model", "claude-3-5-sonnet-20241022")
        answer = await generate(question, chunks)
        span.set_attribute("answer.length", len(answer))
    return {"answer": answer, "chunks": chunks}

# Trace view in Jaeger UI:
# [GET /rag/ask 450ms]
#   ├─ [rag.retrieve 120ms]   chunks=5
#   └─ [rag.generate 320ms]   model=claude-3-5-sonnet
#        └─ [POST api.anthropic.com/v1/messages 310ms]
```
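Traces and logs pay off most when they can be cross-referenced. A hedged sketch: binding the active OTel trace ID into the structlog context (mirroring CorrelationMiddleware above), so a slow span in Jaeger can be matched to its log lines:

```python
from opentelemetry import trace
import structlog

def bind_trace_id() -> None:
    """Attach the current OTel trace ID to all subsequent structlog lines."""
    ctx = trace.get_current_span().get_span_context()
    if ctx.is_valid:
        # 32-char hex, the same form Jaeger shows in its UI
        structlog.contextvars.bind_contextvars(trace_id=format(ctx.trace_id, "032x"))

# Call inside a request handler (or the correlation middleware) after
# FastAPIInstrumentor has started the request span.
```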
Alerting — Know Before Your Users Do
*Proactive*

```bash
# Sentry for exception tracking
pip install "sentry-sdk[fastapi]"
```

```python
import os

import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    environment=os.environ.get("ENVIRONMENT", "production"),
    integrations=[FastApiIntegration(), CeleryIntegration()],
    traces_sample_rate=0.1,    # 10% of requests traced
    profiles_sample_rate=0.1,
)
# Any unhandled exception now appears in Sentry with full context:
# stack trace, request headers, user ID, environment, breadcrumbs

# Add user context so Sentry shows which user triggered the error
from sentry_sdk import set_user, set_extra

async def call_with_sentry_context(user: dict, func, *args):
    set_user({"id": user["user_id"], "email": user.get("email")})
    set_extra("request_tier", user.get("tier"))
    return await func(*args)
```

```yaml
# prometheus-alerts.yml — loaded by Prometheus via rule_files;
# Alertmanager then routes the firing alerts to PagerDuty / Slack / email
groups:
  - name: ai_api_alerts
    rules:
      - alert: HighErrorRate
        # Ratio of 5xx to all requests, so "5%" really means 5%
        expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Error rate above 5% for 2 minutes"

      - alert: HighLLMLatency
        expr: histogram_quantile(0.95, rate(llm_call_duration_seconds_bucket[5m])) > 15
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "LLM p95 latency above 15s"

      - alert: TokenCostSpike
        expr: sum(rate(llm_tokens_total[1h])) * 3600 > 500000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Token usage spike: >500k tokens/hour"
```
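For these rules to reach a human, Alertmanager needs a route and a receiver. A minimal alertmanager.yml sketch routing everything to a Slack webhook; the webhook URL and channel are placeholders:

```yaml
# alertmanager.yml — send all firing alerts to one Slack channel
route:
  receiver: slack-oncall
  group_by: [alertname]
  repeat_interval: 4h
receivers:
  - name: slack-oncall
    slack_configs:
      - api_url: https://hooks.slack.com/services/XXX/YYY/ZZZ   # placeholder webhook
        channel: "#ai-api-alerts"
        send_resolved: true
```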
Auth Patterns — OAuth2 and API Key Management
*Security*

```python
import secrets, hashlib, sqlite3
from datetime import datetime

# ── Rotating API keys — never embed keys in clients ───
def generate_api_key() -> tuple[str, str]:
    """Returns (raw_key, hashed_key). Store only the hash."""
    raw = f"sk-{secrets.token_urlsafe(32)}"
    hsh = hashlib.sha256(raw.encode()).hexdigest()
    return raw, hsh

def create_user_api_key(user_id: str, name: str = "default") -> str:
    raw, hsh = generate_api_key()
    with sqlite3.connect("keys.db") as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS api_keys (
            hash TEXT PRIMARY KEY, user_id TEXT, name TEXT,
            created_at TEXT, last_used TEXT, is_active INTEGER DEFAULT 1)""")
        conn.execute("INSERT INTO api_keys VALUES (?,?,?,?,?,1)",
                     (hsh, user_id, name, datetime.utcnow().isoformat(), None))
    return raw  # show the raw key to the user ONCE — never store it

async def validate_api_key(raw_key: str) -> dict | None:
    hsh = hashlib.sha256(raw_key.encode()).hexdigest()
    with sqlite3.connect("keys.db") as conn:
        row = conn.execute(
            "SELECT user_id, name FROM api_keys WHERE hash=? AND is_active=1",
            (hsh,)).fetchone()
        if row:
            conn.execute("UPDATE api_keys SET last_used=? WHERE hash=?",
                         (datetime.utcnow().isoformat(), hsh))
    return {"user_id": row[0], "key_name": row[1]} if row else None

def revoke_api_key(hsh: str):
    with sqlite3.connect("keys.db") as conn:
        conn.execute("UPDATE api_keys SET is_active=0 WHERE hash=?", (hsh,))
```

```bash
pip install authlib httpx
```

```python
# ── OAuth2 with Google (social login) ─────────────────
# NOTE: Authlib stores OAuth state in the session; the app must
# also add Starlette's SessionMiddleware for this flow to work.
from authlib.integrations.starlette_client import OAuth
from fastapi import APIRouter, Request
from starlette.config import Config

router = APIRouter()

config = Config(".env")
oauth = OAuth(config)
oauth.register(
    name="google",
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_id=config("GOOGLE_CLIENT_ID"),
    client_secret=config("GOOGLE_CLIENT_SECRET"),
    client_kwargs={"scope": "openid email profile"},
)

@router.get("/auth/google")
async def google_login(request: Request):
    redirect_uri = request.url_for("google_callback")
    return await oauth.google.authorize_redirect(request, redirect_uri)

@router.get("/auth/google/callback", name="google_callback")
async def google_callback(request: Request):
    token = await oauth.google.authorize_access_token(request)
    user = token["userinfo"]
    # Mint a first-party API key tied to the Google account ID
    api_key = create_user_api_key(user["sub"], name="google-oauth")
    return {"api_key": api_key, "email": user["email"]}
```
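To actually enforce these keys on routes, wrap validate_api_key in a FastAPI dependency. A minimal sketch using fastapi.security.APIKeyHeader; the X-API-Key header name and /me route are illustrative choices, not fixed conventions:

```python
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def require_api_key(raw_key: str | None = Security(api_key_header)) -> dict:
    user = await validate_api_key(raw_key) if raw_key else None
    if user is None:
        # 401 for both missing and revoked keys; don't reveal which
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return user

@router.get("/me")
async def me(user: dict = Depends(require_api_key)):
    return user  # {"user_id": ..., "key_name": ...}
```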
FREE LEARNING RESOURCES
| Type | Resource | Best For |
|---|---|---|
| Library | Prometheus FastAPI Instrumentator — github.com/trallnag | Zero-config HTTP metrics for FastAPI. Auto-instruments all routes. |
| Docs | OpenTelemetry Python — opentelemetry.io/docs | Complete Python OTel instrumentation guide for tracing and metrics. |
| Docs | Sentry FastAPI Integration — docs.sentry.io | Setting up Sentry error tracking with FastAPI and Celery. |
| Docs | Prometheus Alertmanager — prometheus.io/docs | Setting up alert rules and routing to Slack, PagerDuty, or email. |
Milestone Project: add the complete observability layer to your M24 containerised app.
Requirements
- Metrics — Prometheus scraping /metrics; custom counters for LLM tokens, latency histogram, active agents gauge
- Grafana — 5 panels: request rate, p95 latency, error rate, tokens/hour, LLM p95 latency
- Structured logs — JSON in production, request_id + user_id in every log line via CorrelationMiddleware
- Sentry — exception tracking with user context and environment tag
- Alerts — 3 Prometheus alert rules: high error rate, high LLM latency, token spike
- API keys — full DB-backed key management: create, validate, revoke, track last_used
Skills: Prometheus, Grafana, structlog, OpenTelemetry, Sentry, API key management
Lab 1: Metrics — Instrument and Dashboard
Objective: Instrument your API and build a Grafana dashboard that answers: is the system healthy right now?
Lab 2: Structured Logs — Trace a Request
Objective: Add correlation IDs and trace a single request through all your logs.
Lab 3: API Key Lifecycle
Objective: Build and test the complete API key management lifecycle.
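A minimal pytest sketch of what this lab should verify, assuming the key functions above live in an api_keys module (hypothetical module name):

```python
import asyncio
import hashlib

from api_keys import create_user_api_key, validate_api_key, revoke_api_key

def test_key_lifecycle():
    raw = create_user_api_key("user_1", name="lab3")
    # A freshly minted key resolves to its owner
    assert asyncio.run(validate_api_key(raw))["user_id"] == "user_1"
    # After revocation (by hash, since the raw key is never stored) it must fail
    revoke_api_key(hashlib.sha256(raw.encode()).hexdigest())
    assert asyncio.run(validate_api_key(raw)) is None
    # A fabricated key never validates
    assert asyncio.run(validate_api_key("sk-not-a-real-key")) is None
```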
P7-M25 MASTERY CHECKLIST
- Can add prometheus-fastapi-instrumentator for zero-config HTTP metrics
- Can define custom Prometheus metrics: Counter (tokens), Histogram (latency), Gauge (active sessions)
- Know the 4 AI-specific metrics to always instrument: token counts, LLM latency, RAG scores, agent sessions
- Can expose /metrics endpoint and configure Prometheus to scrape it
- Can add Prometheus + Grafana to docker-compose.yml
- Can write PromQL for: request rate, p95 latency, error rate, token usage
- Can configure structlog for JSON output in production and ConsoleRenderer(colors=True) in development
- Can implement CorrelationMiddleware that binds request_id + user_id to all log lines
- Know that all logs for a request should share the same request_id — enabling full request tracing
- Can set up OpenTelemetry auto-instrumentation for FastAPI and HTTPX
- Can add manual spans with tracer.start_as_current_span() and set_attribute()
- Can initialise Sentry with FastAPI and Celery integrations and set_user() context
- Know the 3 critical alert rules: high error rate (>5%), LLM p95 latency (>15s), token spike
- Can implement DB-backed API key management: generate (show once), hash-store, validate, revoke
- Know to store only SHA-256 hash of API keys — raw keys are never recoverable from the database
- Completed Lab 1: metrics instrumented, Grafana dashboard with 5 panels
- Completed Lab 2: structured logs with request tracing via correlation IDs
- Completed Lab 3: API key lifecycle including revocation and security verification
- Milestone project pushed to GitHub: full observability stack
✅ When complete: Move to P7-M26 — Prompt Versioning, Cost Monitoring & Caching. With observability in place, M26 covers the AI-specific production layer: managing prompt changes safely, tracking costs, and caching LLM calls.