Part 7 — Production & Deployment  ·  Module 27 of 27
MLOps Foundations
CI/CD for AI, data drift detection, model versioning, and the operational patterns for long-running AI products
⏱ 1 Week 🟡 Intermediate 🔧 GitHub Actions · DVC · MLflow · Evidently 📋 Prerequisite: P7-M26
🎯

What This Module Covers

Part 7 Complete

MLOps for AI engineers — not traditional ML practitioners. Your "models" are LLM APIs, your "training" is prompt engineering and RAG index updates, your "drift" is the distribution of user queries shifting away from your indexed documents. This module covers the operational disciplines that make AI products reliable over months and years.

  • CI/CD for AI — GitHub Actions pipeline: lint, test, eval, deploy gates
  • Data versioning with DVC — track document corpus versions alongside code
  • Experiment tracking with MLflow — logging prompt variants, eval scores, cost metrics
  • Data/query drift detection — detecting when user queries shift out of distribution
  • Deployment patterns — blue-green, canary, feature flags for AI apps
🔄

GitHub Actions CI/CD Pipeline for AI Apps

Automate
# .github/workflows/ai-ci.yml
name: AI App CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

jobs:

  # ── 1. Fast checks (no LLM calls) ─────────────────────
  lint-and-type:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - run: pip install ruff mypy
      - run: ruff check app/
      - run: mypy app/ --ignore-missing-imports

  # ── 2. Unit + integration tests (mocked LLM) ─────────
  test:
    needs: lint-and-type
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements.txt pytest pytest-asyncio
      - run: pytest tests/unit/ tests/integration/ -v --tb=short

  # ── 3. Prompt regression tests (real LLM calls) ───────
  prompt-eval:
    needs: test
    runs-on: ubuntu-latest
    # Only run on PRs that touch prompts/. GitHub Actions has no built-in
    # job-level path filter, so the gating is done with dorny/paths-filter
    # below (an alternative is a separate workflow with
    # `on: pull_request: paths: ["prompts/**"]`)
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v4
      - uses: dorny/paths-filter@v3
        id: filter
        with:
          filters: |
            prompts:
              - 'prompts/**'
      - if: steps.filter.outputs.prompts == 'true'
        run: pip install -r requirements.txt pytest
      - name: Run prompt eval suite
        if: steps.filter.outputs.prompts == 'true'
        run: pytest tests/test_prompts.py -v --tb=long
      - name: Post eval results as PR comment
        if: steps.filter.outputs.prompts == 'true'
        uses: actions/github-script@v7
        with:
          script: |
            const body = `## Prompt Eval Results
✅ All grounding tests passed
✅ Faithfulness: 0.91 >= 0.85`
            github.rest.issues.createComment({...context.repo, issue_number: context.issue.number, body})

  # ── 4. Build and push Docker image ────────────────────
  build:
    needs: [test, prompt-eval]
    # prompt-eval is skipped on direct pushes to main, so accept 'skipped'
    # here; otherwise the skipped needs-job would block the deploy chain
    if: |
      !cancelled() && github.ref == 'refs/heads/main' &&
      needs.test.result == 'success' &&
      (needs.prompt-eval.result == 'success' || needs.prompt-eval.result == 'skipped')
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v5
        with:
          push: true
          tags: ghcr.io/${{ github.repository }}:${{ github.sha }}

  # ── 5. Deploy (blue-green, see Tab 5) ─────────────────
  deploy:
    needs: build
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Deploy new image
        # SERVER and IMAGE are assumed to be supplied as env vars/secrets
        env:
          SHA: ${{ github.sha }}
        run: |
          ssh deploy@"$SERVER" \
            "docker pull ghcr.io/$IMAGE:$SHA && \
             docker service update --image ghcr.io/$IMAGE:$SHA ai_api"
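
The prompt-eval job above gates on pytest tests/test_prompts.py. A minimal sketch of what that suite can look like, assuming the rag_pipeline and judge_faithfulness helpers from earlier modules; the app.rag import path and the tests/eval_cases.json fixture are hypothetical:

# tests/test_prompts.py — minimal sketch of the eval gate
import json
import pytest

from app.rag import rag_pipeline, judge_faithfulness   # assumed helpers

FAITHFULNESS_THRESHOLD = 0.85

with open("tests/eval_cases.json") as f:    # hypothetical fixture file
    CASES = json.load(f)

@pytest.mark.parametrize("case", CASES, ids=lambda c: c["id"])
def test_faithfulness(case):
    result = rag_pipeline(case["question"])
    verdict = judge_faithfulness(result["context"], result["answer"])
    assert verdict.score >= FAITHFULNESS_THRESHOLD, (
        f"faithfulness {verdict.score:.2f} < {FAITHFULNESS_THRESHOLD}")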
📦

DVC — Version Your Document Corpus

Reproducibility

Your RAG index is data. When you add, remove, or update documents, the retrieval behaviour changes. DVC (Data Version Control) tracks your document corpus alongside your code so you can always reproduce any system state.

pip install dvc dvc-s3   # or dvc-gcs, dvc-azure

# Initialise DVC in your repo
git init && dvc init

# Add remote storage (S3, GCS, Azure, or local)
dvc remote add -d storage s3://my-bucket/dvc-store
# Or local for dev:
dvc remote add -d storage /tmp/dvc-store

# Track your document corpus
dvc add docs/corpus/          # creates docs/corpus.dvc (pointer file)
git add docs/corpus.dvc .gitignore
git commit -m "Add corpus v1: initial DPDK documentation"
dvc push                      # upload to remote storage

# Update the corpus
# ... add new PDF files to docs/corpus/ ...
dvc add docs/corpus/
git add docs/corpus.dvc
git commit -m "Update corpus v2: add VPP documentation"
dvc push

# On another machine or in CI: reproduce exact corpus version
git checkout "v1-tag"
dvc pull       # downloads the exact corpus for that commit

# DVC pipeline: define reproducible ingestion pipeline
# dvc.yaml
stages:
  ingest:
    cmd: python scripts/ingest.py --input docs/corpus/ --output chroma_db/
    deps:
      - scripts/ingest.py
      - docs/corpus/
    outs:
      - chroma_db/

# Run: dvc repro — reruns only stages where inputs changed
# dvc dag — visualise the pipeline

💡 The DVC pointer file (corpus.dvc) is tiny and goes in Git. The actual data goes in remote storage. This means your Git repo stays fast while your data is versioned and reproducible. Every commit of your code has a matching commit of your data — you can reproduce any system state from history.
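
One way to make that pairing visible at runtime is to read the content hash out of the pointer file and log it with each experiment. A minimal sketch, assuming the default .dvc pointer layout (a small YAML file with an outs list) and pyyaml installed:

import yaml   # pip install pyyaml

def corpus_version(dvc_file: str = "docs/corpus.dvc") -> str:
    """Content hash of the DVC-tracked corpus (assumes default .dvc layout)."""
    with open(dvc_file) as f:
        meta = yaml.safe_load(f)
    return meta["outs"][0]["md5"]

# e.g. tag every MLflow run with the corpus it ran against:
# mlflow.log_param("corpus_version", corpus_version())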

📈

MLflow — Track Prompt Experiments

Compare Everything
pip install mlflow

import mlflow

# MLflow tracks: parameters, metrics, artifacts, tags
# For AI/LLM work: prompt versions, eval scores, cost metrics

mlflow.set_tracking_uri("http://localhost:5000")   # or file:./mlruns
mlflow.set_experiment("rag-prompt-iterations")

def evaluate_prompt_variant(prompt_name: str, prompt_version: int,
                             test_cases: list) -> dict:
    with mlflow.start_run(run_name=f"{prompt_name}-v{prompt_version}"):
        # Log parameters
        mlflow.log_param("prompt_name", prompt_name)
        mlflow.log_param("prompt_version", prompt_version)
        mlflow.log_param("model", "claude-3-5-sonnet-20241022")
        mlflow.log_param("n_test_cases", len(test_cases))

        # Run evaluation
        prompt_content = get_prompt_version(prompt_name, prompt_version)
        faithfulness_scores, costs = [], []

        for case in test_cases:
            result = rag_pipeline(case["question"], prompt_content)
            faith  = judge_faithfulness(result["context"], result["answer"])
            faithfulness_scores.append(faith.score)
            costs.append(result["cost_usd"])

        # Log metrics
        mlflow.log_metric("faithfulness_mean",  sum(faithfulness_scores)/len(faithfulness_scores))
        mlflow.log_metric("faithfulness_min",   min(faithfulness_scores))
        mlflow.log_metric("cost_per_query_usd", sum(costs)/len(costs))
        mlflow.log_metric("total_cost_usd",     sum(costs))

        # Log the prompt as an artifact
        with open("prompt.txt", "w") as f:
            f.write(prompt_content)
        mlflow.log_artifact("prompt.txt")

        return {"faithfulness": sum(faithfulness_scores)/len(faithfulness_scores),
                "cost": sum(costs)/len(costs)}

# Compare runs in MLflow UI
# mlflow ui --port 5000
# → parallel coordinates plot shows which params produce best faithfulness
# → compare v1, v2, v3 side-by-side on all metrics
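
You can also pick the winner programmatically: mlflow.search_runs() returns a pandas DataFrame with one row per run and metrics.* / params.* columns. A sketch using the experiment and metric names from above (the $0.01 budget is an assumed threshold, and at least one affordable run is assumed to exist):

import mlflow

runs = mlflow.search_runs(experiment_names=["rag-prompt-iterations"])
affordable = runs[runs["metrics.cost_per_query_usd"] <= 0.01]
best = affordable.sort_values("metrics.faithfulness_mean",
                              ascending=False).iloc[0]
print(best["tags.mlflow.runName"], best["metrics.faithfulness_mean"])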
📉

Query Drift Detection — When Users Stop Asking What You Indexed

Production Health

For RAG systems, "data drift" means user queries are shifting toward topics not covered by your indexed documents. Retrieval scores drop, but nothing crashes — users just get worse answers. You need to detect this proactively.

import sqlite3

import numpy as np
from datetime import datetime, timedelta

# ── Track retrieval scores over time ──────────────────
# Log the top-1 similarity score for every query
# Degrading average = queries moving out of distribution

def log_retrieval_score(query: str, top_score: float, session_id: str):
    with sqlite3.connect("drift.db") as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS retrieval_log
            (ts TEXT, query TEXT, score REAL, session_id TEXT)""")
        conn.execute("INSERT INTO retrieval_log VALUES (?,?,?,?)",
                     (datetime.utcnow().isoformat(), query, top_score, session_id))

def check_retrieval_drift(window_days: int = 7, baseline_days: int = 30) -> dict:
    """Compare recent avg score vs baseline avg score."""
    recent  = (datetime.utcnow() - timedelta(days=window_days)).isoformat()
    old     = (datetime.utcnow() - timedelta(days=baseline_days)).isoformat()

    with sqlite3.connect("drift.db") as conn:
        recent_avg = conn.execute(
            "SELECT AVG(score) FROM retrieval_log WHERE ts > ?", (recent,)).fetchone()[0]
        baseline_avg = conn.execute(
            "SELECT AVG(score) FROM retrieval_log WHERE ts BETWEEN ? AND ?",
            (old, recent)).fetchone()[0]

    if recent_avg is None or baseline_avg is None:
        return {"status": "insufficient_data"}

    delta = recent_avg - baseline_avg
    return {
        "recent_avg_score":   round(recent_avg, 4),
        "baseline_avg_score": round(baseline_avg, 4),
        "delta":              round(delta, 4),
        "drifting":          delta < -0.05,   # >5% drop = significant drift
        "action":            "Re-index new documents" if delta < -0.05 else "Monitor"
    }

# ── Topic clustering — find what users are asking about ──
from sklearn.cluster import KMeans
from sklearn.preprocessing import normalize

def identify_drift_topics(n_clusters: int = 5) -> list[dict]:
    """Cluster low-scoring queries to find coverage gaps."""
    with sqlite3.connect("drift.db") as conn:
        rows = conn.execute(
            "SELECT query FROM retrieval_log WHERE score < 0.4 ORDER BY ts DESC LIMIT 500"
        ).fetchall()

    if len(rows) < 10:
        return []

    queries = [r[0] for r in rows]
    embeddings = embed_batch(queries)   # your embedding function
    X = normalize(np.array(embeddings))

    km = KMeans(n_clusters=n_clusters, random_state=42)
    km.fit(X)

    # Find representative query for each cluster
    clusters = []
    for cluster_id in range(n_clusters):
        mask = km.labels_ == cluster_id
        cluster_queries = [queries[i] for i, m in enumerate(mask) if m]
        clusters.append({
            "cluster_id":  cluster_id,
            "count":       len(cluster_queries),
            "sample_queries": cluster_queries[:3]
        })
    return sorted(clusters, key=lambda x: -x["count"])

💡 Query drift is how you know what to index next. When identify_drift_topics() shows 200 low-scoring queries clustering around "VPP DPDK integration", that's your signal to add VPP documentation to your corpus. Drift detection turns reactive support into proactive documentation improvement.
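
A small driver turns the two functions above into a recurring report; scheduling (cron, systemd timer) and the alerting channel are left as assumptions, with print standing in:

def weekly_drift_report():
    drift = check_retrieval_drift()
    print("Retrieval drift:", drift)
    if drift.get("drifting"):
        for cluster in identify_drift_topics(n_clusters=3):
            print(f"  coverage gap ({cluster['count']} queries), e.g.",
                  cluster["sample_queries"][0])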

🚀

Safe Deployment Patterns

Zero Downtime
# ── Blue-Green Deployment ─────────────────────────────
# Run two identical environments. Switch traffic between them.
# Zero downtime. Instant rollback (switch traffic back).

# docker-compose.prod.yml style blue-green
# Blue = current live version, Green = new version
#
# 1. Deploy green alongside blue
# 2. Run health checks on green
# 3. Switch load balancer: 100% → green
# 4. Keep blue running for 5 min (easy rollback)
# 5. Tear down blue
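
# A minimal cutover sketch for steps 1-5 (illustrative: the compose
# service names and health URL are assumptions, and the load balancer
# switch is delegated to a hypothetical switch_lb.sh helper)
import subprocess, time, urllib.request

def wait_healthy(url: str, attempts: int = 10) -> bool:
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            pass
        time.sleep(3)
    return False

def blue_green_cutover():
    subprocess.run(["docker", "compose", "up", "-d", "ai_api_green"], check=True)
    if not wait_healthy("http://localhost:8001/health"):
        raise RuntimeError("green failed health checks; blue stays live")
    subprocess.run(["./switch_lb.sh", "green"], check=True)    # step 3
    time.sleep(300)                                            # step 4
    subprocess.run(["docker", "compose", "stop", "ai_api_blue"], check=True)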

# ── Canary Deployment ─────────────────────────────────
# Route small % of traffic to new version first
# Monitor metrics. If good → increase %. If bad → 0%.

import hashlib

def _bucket(key: str) -> int:
    """Stable 0-99 bucket. Built-in hash() is salted per process, which
    would break sticky routing across restarts, so use a real hash."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % 100

class CanaryRouter:
    def __init__(self, canary_pct: float = 0.05):  # 5% to new version
        self.canary_pct = canary_pct

    def route(self, request) -> str:
        # Sticky routing: same user always gets same version
        if _bucket(request.headers.get("X-User-ID", "")) < self.canary_pct * 100:
            return "green"   # canary version
        return "blue"        # stable version

# ── Feature flags — safest for AI changes ─────────────
# Toggle behaviour without deploying new code
# Perfect for A/B testing prompt variants in production

FEATURE_FLAGS = {
    "use_reranker":        True,
    "use_hyde":            False,
    "contextual_retrieval": False,
    "new_prompt_v3":       False,   # flip to True after testing
}

def is_enabled(flag: str, user_id: str = "", rollout_pct: float = 1.0) -> bool:
    """Check if feature flag is enabled. Supports percentage rollout."""
    if not FEATURE_FLAGS.get(flag):
        return False
    if rollout_pct < 1.0 and user_id:
        # reuse the stable bucket so rollout cohorts survive restarts
        return _bucket(user_id + flag) < rollout_pct * 100
    return True

# In endpoint:
# if is_enabled("use_reranker", user_id, rollout_pct=0.2):
#     results = retrieve_and_rerank(query)
# else:
#     results = basic_retrieve(query)

# ── Index hot-swap ────────────────────────────────────
# Update the vector DB index without downtime

class IndexSwapper:
    def __init__(self):
        self.active = "index_a"   # current live index
        self.staging = "index_b"  # being rebuilt

    def rebuild_in_background(self, docs_dir: str):
        # Build into the staging index while the live index serves traffic
        pipeline = DocumentIngestionPipeline(
            config=IngestionConfig(collection_name=self.staging)
        )
        pipeline.ingest_directory(docs_dir)
        self.swap()

    def swap(self):
        self.active, self.staging = self.staging, self.active
        print(f"Swapped to {self.active}")

FREE LEARNING RESOURCES

  • [Docs] DVC: Getting Started — dvc.org/doc/start. Version control for data and ML pipelines; covers remote storage, pipelines, and experiments.
  • [Docs] MLflow Quickstart — mlflow.org/docs/latest. Experiment tracking, model registry, and artifacts; run mlflow ui to see the dashboard.
  • [Docs] Evidently AI: Getting Started — docs.evidentlyai.com. Data drift and ML monitoring; good for query distribution monitoring.
  • [Course] GitHub Actions Starter Workflows — github.com/actions/starter-workflows. Production-ready workflow templates for Python, Docker, and deployment.
🛠 Full MLOps Pipeline — CI/CD + Drift Detection + Feature Flags [Intermediate–Advanced] 4–5 days

Add the full MLOps layer to your production AI system.

Requirements

  • GitHub Actions — 5-stage pipeline: lint → test → prompt-eval → build → deploy; prompt-eval runs only when prompts/ changes
  • DVC — version your document corpus; create v1 and v2 with different documents; verify git checkout + dvc pull restores exact corpus
  • MLflow — track 3 prompt variants with faithfulness and cost metrics; compare in UI; identify winning variant
  • Drift detection — log retrieval scores for 7 days of simulated queries; compute drift; identify top 3 under-covered topics via clustering
  • Feature flags — implement is_enabled() for new_prompt_v3 and use_reranker; set up A/B test at 20% rollout

Skills: GitHub Actions, DVC, MLflow, KMeans clustering, canary deployment, feature flags

LAB 1

GitHub Actions — End-to-End CI Pipeline

Objective: Build and trigger a real CI pipeline that gates deployment on eval quality.

1. Create .github/workflows/ci.yml with 3 jobs: lint (ruff), test (pytest unit tests with mocked LLM), prompt-eval (pytest tests/test_prompts.py with real API calls). Make prompt-eval only trigger when the prompts/ directory changes.
2. Push a deliberately bad prompt (remove the grounding instructions). Observe the prompt-eval job fail. Check the Actions UI — you should see which test failed and why.
3. Fix the prompt. Push again. Verify all 3 jobs pass. Merge is now allowed (in a real setup you'd add branch protection rules).
4. Add a 4th job: cost-check. It runs cost_report() and fails the pipeline if the average cost per query in the eval run exceeds $0.01. Verify this gate works by temporarily using an expensive model.
LAB 2

MLflow — Compare Prompt Variants

Objective: Use MLflow to make a data-driven decision between 3 prompt variants.

1. Create 3 prompt variants: V1 (current), V2 (more specific grounding instructions), V3 (adds output format requirements). Run evaluate_prompt_variant() for each on your 20-case test set.
2. Open the MLflow UI (mlflow ui --port 5000). In the experiments view, compare all 3 runs on faithfulness_mean and cost_per_query_usd.
3. Use MLflow's "Compare Runs" feature to view a parallel coordinates plot. Which variant achieves the best faithfulness-to-cost ratio?
4. Register the winning prompt version in the MLflow Model Registry (even though it's a prompt, not a model). Tag it as "production". Document the decision in the run description.
LAB 3

Drift Detection — Simulate and Detect

Objective: Simulate query drift and verify your detection catches it.

1. Populate your drift.db with 100 simulated "baseline" queries from your indexed domain (high retrieval scores ~0.75–0.90). Set their timestamps 15 days in the past.
2. Add 50 "recent" queries about an uncovered topic (e.g. VPP if you only indexed DPDK). These should get low retrieval scores (~0.20–0.40). Set their timestamps within the last 7 days.
3. Call check_retrieval_drift(). Verify it correctly identifies drift (delta < -0.05) and suggests "Re-index new documents".
4. Call identify_drift_topics(n_clusters=3). Verify the largest cluster corresponds to VPP queries. This is your indexing backlog: a prioritised list of topics to add to your corpus.

P7-M27 MASTERY CHECKLIST

Part 7 Complete! Move to Part 8 — Specialisation Tracks. You now have the full production engineering foundation. Part 8 lets you go deep in one of four AI engineering specialisations.

🎉 Part 7 — Production & Deployment Complete!

You can now ship, operate, and evolve production-grade AI systems.

  • Structure FastAPI apps for production with DI and middleware
  • Containerise with multi-stage Docker + Docker Compose
  • Run background jobs with Celery + Redis, with job status polling
  • Instrument with Prometheus, Grafana, structlog, and Sentry
  • Version and test prompts before deployment with regression gates
  • Monitor costs per user/model/endpoint and optimise with caching
  • Detect query drift and maintain index coverage over time
  • Deploy safely with blue-green, canary, and feature flags