Part 7 — Production & Deployment  ·  Module 25 of 27
Auth, Logging & Observability
See inside your running AI system — metrics, traces, alerts, and proper auth
⏱ 1 Week 🟡 Intermediate 🔧 Prometheus · Grafana · structlog · Sentry 📋 Prerequisite: P7-M24
🎯

What This Module Covers

See Everything

A deployed AI system you can't see inside is a liability. When the LLM API starts returning 500 errors at 2am, or a user complains the chatbot is slow, you need instrumentation to diagnose it within seconds — not hours. This module covers the full observability stack.

  • Metrics with Prometheus — request rates, latency histograms, LLM token counters, error rates
  • Dashboards with Grafana — visualising metrics, setting up standard AI API panels
  • Structured logging — production log pipeline, correlation IDs, log aggregation
  • Distributed tracing — tracing requests across API + worker + LLM calls
  • Error tracking with Sentry — capturing exceptions with context in production
  • Alerting — alert rules for error rate, p95 latency, and LLM cost spikes
📊

Prometheus Metrics for AI APIs

Measure Everything
pip install prometheus-client prometheus-fastapi-instrumentator

from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_fastapi_instrumentator import Instrumentator
from fastapi import FastAPI, Response

app = FastAPI()

# ── Standard HTTP metrics (auto-instrumented) ─────────
Instrumentator().instrument(app).expose(app)
# Provides: http_requests_total, http_request_duration_seconds

# ── Custom AI-specific metrics ────────────────────────
# Token usage counter — track cost per model per endpoint
llm_tokens = Counter(
    "llm_tokens_total",
    "Total LLM tokens used",
    labelnames=["model", "endpoint", "token_type"]   # input | output
)

# LLM call latency histogram
llm_latency = Histogram(
    "llm_call_duration_seconds",
    "LLM API call duration",
    labelnames=["model", "endpoint"],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

# RAG retrieval quality gauge
rag_avg_score = Gauge(
    "rag_retrieval_score_avg",
    "Rolling average RAG retrieval similarity score"
)

# Active agent sessions
active_agents = Gauge(
    "agent_sessions_active",
    "Number of currently running agent sessions"
)

# Usage example inside an endpoint
# (llm_client is assumed to be an async Anthropic-style client created at app startup)
import time

async def call_llm_instrumented(prompt: str, model: str, endpoint: str) -> str:
    start = time.perf_counter()
    try:
        response = await llm_client.messages.create(
            model=model, max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        reply = response.content[0].text
        llm_tokens.labels(model=model, endpoint=endpoint, token_type="input").inc(response.usage.input_tokens)
        llm_tokens.labels(model=model, endpoint=endpoint, token_type="output").inc(response.usage.output_tokens)
        return reply
    finally:
        llm_latency.labels(model=model, endpoint=endpoint).observe(time.perf_counter() - start)

# Expose the metrics endpoint manually (only needed if you skip .expose(app) above)
@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

💡 Four AI-specific metrics to always instrument: (1) LLM token counts by model — directly maps to cost. (2) LLM call latency histogram — detect when the API is slow. (3) RAG retrieval scores — quality degradation over time. (4) Agent session count — detect runaway loops. Standard HTTP metrics (request rate, latency, error rate) come free from the Instrumentator.
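
The rag_retrieval_score_avg gauge above is declared but never updated in the example. A minimal sketch of one way to feed it, using an exponential moving average over per-request similarity scores (the score extraction is an assumption; adapt it to whatever your retriever returns):

# Sketch: publish a rolling average of retrieval similarity scores to the gauge.
_ema_score: float | None = None

def record_retrieval_scores(scores: list[float], alpha: float = 0.1) -> None:
    """Update rag_avg_score with an exponential moving average of this request's scores."""
    global _ema_score
    if not scores:
        return
    batch_avg = sum(scores) / len(scores)
    _ema_score = batch_avg if _ema_score is None else alpha * batch_avg + (1 - alpha) * _ema_score
    rag_avg_score.set(_ema_score)

# Usage inside your retrieval step (assuming chunks carry a "score" field):
#   chunks = await retrieve(question)
#   record_retrieval_scores([c["score"] for c in chunks])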

📈

Grafana Dashboard Setup

Visualise
# Add Prometheus + Grafana to docker-compose.yml (nest these entries under the top-level services: key)

# prometheus service
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

# grafana service
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana   # declare grafana_data under the top-level volumes: key too

# prometheus.yml — scrape your FastAPI app
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: "ai-api"
    static_configs:
      - targets: ["api:8000"]   # Docker service name

# Key Grafana panels for an AI API:
# 1. Request rate: rate(http_requests_total[5m])
# 2. p95 latency: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
# 3. Error rate: rate(http_requests_total{status=~"5.."}[5m])
# 4. LLM tokens/hour: rate(llm_tokens_total[1h]) * 3600
# 5. LLM p95 latency: histogram_quantile(0.95, rate(llm_call_duration_seconds_bucket[5m]))
📋

Production Log Pipeline

Queryable Logs
pip install structlog

import structlog

# ── Production structlog configuration ────────────────
def configure_logging(environment: str = "production"):
    if environment == "development":
        # Human-readable console output for dev
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.TimeStamper(fmt="%H:%M:%S"),
                structlog.dev.ConsoleRenderer(colors=True)
            ]
        )
    else:
        # JSON output for production — queryable by log aggregators
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer()
            ]
        )

logger = structlog.get_logger()

# ── Request correlation — trace a request through all logs
import uuid
from starlette.middleware.base import BaseHTTPMiddleware

class CorrelationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Use incoming X-Request-ID or generate new one
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
        user_id    = getattr(request.state, "user", {}).get("user_id", "anon")

        # Bind to context — all logs in this request include these fields
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            user_id=user_id,
            path=request.url.path
        )
        response = await call_next(request)
        structlog.contextvars.clear_contextvars()
        response.headers["X-Request-ID"] = request_id
        return response

# All logs now include request_id and user_id automatically
# Output: {"event":"llm_called","model":"claude...","request_id":"a3f7b2","user_id":"user_1",...}
# Query in CloudWatch/Datadog/Loki: request_id="a3f7b2" → all logs for that request
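
For a local JSONL log file, the same correlation query is a few lines of Python. A minimal sketch (the file path and field names assume the JSON output format shown above):

import json
from collections import defaultdict

def group_logs_by_request(path: str = "app.log.jsonl") -> dict[str, list[dict]]:
    """Group structured log lines by request_id so a single request can be traced end to end."""
    by_request: dict[str, list[dict]] = defaultdict(list)
    with open(path) as f:
        for line in f:
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue   # skip any non-JSON lines (e.g. dev console output)
            by_request[event.get("request_id", "unknown")].append(event)
    return dict(by_request)

# Usage: print every log event for one request, in order
#   for event in group_logs_by_request()["a3f7b2"]:
#       print(event["timestamp"], event["event"])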
🔍

Distributed Tracing with OpenTelemetry

End-to-End
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi \
            opentelemetry-instrumentation-httpx opentelemetry-exporter-otlp

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

# Setup tracing — sends to Jaeger or any OTLP-compatible backend
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://jaeger:4317"))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# Auto-instrument FastAPI and all HTTPX calls (LLM API calls)
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()
# Every FastAPI request and every LLM API call now has a trace span

# Manual spans for custom work (retrieve() and generate() are your existing RAG helpers)
async def rag_pipeline(question: str) -> dict:
    with tracer.start_as_current_span("rag.retrieve") as span:
        span.set_attribute("query.length", len(question))
        chunks = await retrieve(question)
        span.set_attribute("chunks.count", len(chunks))

    with tracer.start_as_current_span("rag.generate") as span:
        span.set_attribute("model", "claude-3-5-sonnet-20241022")
        answer = await generate(question, chunks)
        span.set_attribute("answer.length", len(answer))

    return {"answer": answer, "chunks": chunks}

# Trace view in Jaeger UI:
# [GET /rag/ask 450ms]
#   └─ [rag.retrieve 120ms] chunks=5
#   └─ [rag.generate 320ms] model=claude-3-5-sonnet
#        └─ [POST api.anthropic.com/v1/messages 310ms]
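
Traces and logs are most useful when they share an identifier. A hedged sketch that binds the active OpenTelemetry trace ID into the structlog context from the logging section above, so a log line can be looked up directly in Jaeger (assumes the tracer setup above is active and the call happens inside a request span):

from opentelemetry import trace as otel_trace

def bind_trace_id_to_logs() -> None:
    """Attach the current trace_id to all subsequent structlog output for this request."""
    ctx = otel_trace.get_current_span().get_span_context()
    if ctx.is_valid:
        # 32-char hex string, the same format Jaeger shows in its UI
        structlog.contextvars.bind_contextvars(trace_id=format(ctx.trace_id, "032x"))

# Call it from CorrelationMiddleware.dispatch() right after bind_contextvars(),
# or anywhere else that runs inside an instrumented request.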
🚨

Alerting — Know Before Your Users Do

Proactive
# Sentry for exception tracking
pip install "sentry-sdk[fastapi]"

import os
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    environment=os.environ.get("ENVIRONMENT", "production"),
    integrations=[FastApiIntegration(), CeleryIntegration()],
    traces_sample_rate=0.1,   # 10% of requests traced
    profiles_sample_rate=0.1,
)
# Any unhandled exception now appears in Sentry with full context:
# stack trace, request headers, user ID, environment, breadcrumbs

# Add user context so Sentry shows which user triggered the error
from sentry_sdk import set_user, set_extra

async def call_with_sentry_context(user: dict, func, *args):
    set_user({"id": user["user_id"], "email": user.get("email")})
    set_extra("request_tier", user.get("tier"))
    return await func(*args)

# Prometheus alert rules (prometheus-alerts.yml)
# Load these into Prometheus via rule_files:; Alertmanager then routes fired alerts to PagerDuty / Slack / email

ALERT_RULES = """
groups:
  - name: ai_api_alerts
    rules:
      - alert: HighErrorRate
        expr: sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "5xx responses above 5% of traffic for 2 minutes"

      - alert: HighLLMLatency
        expr: histogram_quantile(0.95, rate(llm_call_duration_seconds_bucket[5m])) > 15
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "LLM p95 latency above 15s"

      - alert: TokenCostSpike
        expr: rate(llm_tokens_total[1h]) * 3600 > 500000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Token usage spike: >500k tokens/hour"
"""
🔐

Auth Patterns — OAuth2 and API Key Management

Security
# ── Rotating API keys — never embed keys in clients ───
import secrets, hashlib, sqlite3
from datetime import datetime

def generate_api_key() -> tuple[str, str]:
    """Returns (raw_key, hashed_key). Store only the hash."""
    raw = f"sk-{secrets.token_urlsafe(32)}"
    hsh = hashlib.sha256(raw.encode()).hexdigest()
    return raw, hsh

def create_user_api_key(user_id: str, name: str = "default") -> str:
    raw, hsh = generate_api_key()
    with sqlite3.connect("keys.db") as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS api_keys (
            hash TEXT PRIMARY KEY, user_id TEXT, name TEXT,
            created_at TEXT, last_used TEXT, is_active INTEGER DEFAULT 1)""")
        conn.execute("INSERT INTO api_keys VALUES (?,?,?,?,?,1)",
                     (hsh, user_id, name, datetime.utcnow().isoformat(), None))
    return raw   # show raw key to user ONCE — never store it

async def validate_api_key(raw_key: str) -> dict | None:
    hsh = hashlib.sha256(raw_key.encode()).hexdigest()
    with sqlite3.connect("keys.db") as conn:
        row = conn.execute(
            "SELECT user_id, name FROM api_keys WHERE hash=? AND is_active=1",
            (hsh,)).fetchone()
        if row:
            conn.execute("UPDATE api_keys SET last_used=? WHERE hash=?",
                         (datetime.utcnow().isoformat(), hsh))
    return {"user_id": row[0], "key_name": row[1]} if row else None

def revoke_api_key(hsh: str):
    with sqlite3.connect("keys.db") as conn:
        conn.execute("UPDATE api_keys SET is_active=0 WHERE hash=?", (hsh,))

# ── OAuth2 with Google (social login) ─────────────────
pip install authlib httpx

from authlib.integrations.starlette_client import OAuth
from starlette.config import Config
from fastapi import APIRouter, Request

config = Config(".env")
router = APIRouter()
oauth = OAuth(config)
oauth.register(
    name="google",
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_id=config("GOOGLE_CLIENT_ID"),
    client_secret=config("GOOGLE_CLIENT_SECRET"),
    client_kwargs={"scope": "openid email profile"},
)

@router.get("/auth/google")
async def google_login(request: Request):
    redirect_uri = request.url_for("google_callback")
    return await oauth.google.authorize_redirect(request, redirect_uri)

@router.get("/auth/google/callback", name="google_callback")
async def google_callback(request: Request):
    token   = await oauth.google.authorize_access_token(request)
    user    = token["userinfo"]
    api_key = create_user_api_key(user["sub"], name="google-oauth")
    return {"api_key": api_key, "email": user["email"]}

FREE LEARNING RESOURCES

  • Library: Prometheus FastAPI Instrumentator — github.com/trallnag · Zero-config HTTP metrics for FastAPI. Auto-instruments all routes.
  • Docs: OpenTelemetry Python — opentelemetry.io/docs · Complete Python OTel instrumentation guide for tracing and metrics.
  • Docs: Sentry FastAPI Integration — docs.sentry.io · Setting up Sentry error tracking with FastAPI and Celery.
  • Docs: Prometheus Alertmanager — prometheus.io/docs · Setting up alert rules and routing to Slack, PagerDuty, or email.
🛠 Full Observability Stack for Your AI API [Intermediate] 3–4 days

Add the complete observability layer to your M24 containerised app.

Requirements

  • Metrics — Prometheus scraping /metrics; custom counters for LLM tokens, latency histogram, active agents gauge
  • Grafana — 5 panels: request rate, p95 latency, error rate, tokens/hour, LLM p95 latency
  • Structured logs — JSON in production, request_id + user_id in every log line via CorrelationMiddleware
  • Sentry — exception tracking with user context and environment tag
  • Alerts — 3 Prometheus alert rules: high error rate, high LLM latency, token spike
  • API keys — full DB-backed key management: create, validate, revoke, track last_used

Skills: Prometheus, Grafana, structlog, OpenTelemetry, Sentry, API key management

LAB 1

Metrics — Instrument and Dashboard

Objective: Instrument your API and build a Grafana dashboard that answers: is the system healthy right now?

1
Add prometheus-fastapi-instrumentator. Start your app + Prometheus in Docker Compose. Verify /metrics returns data.
2
Add 3 custom metrics: llm_tokens_total (Counter), llm_call_duration_seconds (Histogram), active_agent_sessions (Gauge). Instrument your LLM calls to update them.
3
Open Grafana (localhost:3000). Add Prometheus as data source. Create a dashboard with 4 panels: request rate, error rate, p95 HTTP latency, LLM tokens/minute.
4
Generate load: send 100 requests via a simple script. Watch your dashboard update in real-time. Deliberately trigger some 500 errors and watch the error rate panel spike.
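
For step 4, a minimal load-generation sketch with httpx; the endpoint path and payload are placeholders for whatever your API exposes:

import asyncio, random
import httpx

async def generate_load(n: int = 100, base_url: str = "http://localhost:8000") -> None:
    """Fire n requests at the API; roughly 10% use a malformed payload to exercise error panels."""
    async with httpx.AsyncClient(base_url=base_url, timeout=30) as client:
        for i in range(n):
            payload = {"question": f"load test {i}"}
            if random.random() < 0.1:
                payload = {"bad_field": None}   # may surface as 422 or 500 depending on your validation
            try:
                resp = await client.post("/rag/ask", json=payload)   # "/rag/ask" is a placeholder endpoint
                print(i, resp.status_code)
            except httpx.HTTPError as exc:
                print(i, "request failed:", exc)

asyncio.run(generate_load())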
LAB 2

Structured Logs — Trace a Request

Objective: Add correlation IDs and trace a single request through all your logs.

1
Add CorrelationMiddleware that binds request_id and user_id to structlog context. Send a request and verify the JSON log includes both fields.
2
Add log statements at 3 levels: middleware (request received), service (LLM called), endpoint (response sent). Verify all 3 logs share the same request_id.
3
Parse your JSONL log file with Python: group all log lines by request_id. For one specific request, print the full trace — every log event in order.
4
Find all requests with status=500 in your logs. Extract their request_ids. For each, reconstruct what happened leading up to the error using correlated logs.
LAB 3

API Key Lifecycle

Objective: Build and test the complete API key management lifecycle.

1
Implement create_user_api_key(), validate_api_key(), revoke_api_key(). Create 2 keys for the same user.
2
Make 5 API calls with key 1. Query the database and verify last_used updated correctly for key 1 but not key 2.
3
Revoke key 1. Verify subsequent API calls with key 1 return 403. Verify key 2 still works.
4
Try to reconstruct the raw key from the database hash. Verify it's impossible — only SHA-256 is stored. This is the key security property: even database access doesn't expose user keys.

P7-M25 MASTERY CHECKLIST

When complete: Move to P7-M26 — Prompt Versioning, Cost Monitoring & Caching. With observability in place, M26 covers the AI-specific production layer: managing prompt changes safely, tracking costs, and caching LLM calls.

← P7-M24: Docker & Jobs 🗺️ All Modules Next: P7-M26 — Prompt Versioning →