Part 7 — Production & Deployment  ·  Module 24 of 27
Docker & Background Jobs
Containerise your AI app, run background workers, and orchestrate with Docker Compose
⏱ 1 Week 🟡 Intermediate 🔧 Docker · Celery · Redis · Docker Compose 📋 Prerequisite: P7-M23
🎯

What This Module Covers

Container + Queue

Two infrastructure skills that every production AI app needs: Docker to make your app portable and reproducible, and background job queues to handle long-running AI tasks without making users wait.

  • Dockerfile — production multi-stage build for a FastAPI + AI app
  • Docker Compose — orchestrating API + worker + Redis + vector DB as a local stack
  • Background jobs — when to offload to a queue vs handle in-request
  • Celery — the standard Python task queue, with Redis as broker
  • Job status API — polling endpoint so clients can track async job progress
  • Retry and error handling — failed task retries, dead letter queues
🐳

Production Dockerfile — Multi-Stage Build

Container
# Dockerfile — production multi-stage build

# ── Stage 1: dependency builder ───────────────────────
FROM python:3.12-slim AS builder

WORKDIR /build
COPY requirements.txt .

# Install dependencies into /install — separate from app code
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# ── Stage 2: production image ─────────────────────────
FROM python:3.12-slim AS production

# Create non-root user — never run as root in production
RUN useradd --create-home --shell /bin/bash appuser

WORKDIR /app

# Copy installed packages from builder stage
COPY --from=builder /install /usr/local

# Copy only app code — not tests, docs, or dev files
COPY --chown=appuser:appuser app/ ./app/

# Switch to non-root user
USER appuser

# Expose port
EXPOSE 8000

# Health check — Docker monitors this
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
  CMD python -c "import httpx; httpx.get('http://localhost:8000/admin/health').raise_for_status()"

# Production command: gunicorn managing uvicorn workers
CMD ["gunicorn", "app.main:app", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--workers", "4", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "120", \
     "--graceful-timeout", "30"]
# .dockerignore — keep image small
__pycache__/
*.pyc
*.pyo
.env
.env.*
.git/
.pytest_cache/
tests/
*.md
chroma_db/         # mount as volume, not baked in
*.log
# Build and run
docker build -t ai-api:latest .
docker run -p 8000:8000 \
  --env-file .env \
  -v $(pwd)/chroma_db:/app/chroma_db \
  ai-api:latest

# Inspect image layers (find what's making it large)
docker history ai-api:latest
# Or use dive: https://github.com/wagoodman/dive

💡 Multi-stage builds keep production images small. The builder stage installs all build tools and dependencies. The production stage copies only the compiled packages — no pip, no build-essentials, no compiler. A typical FastAPI app goes from ~800MB (single stage) to ~200MB (multi-stage) with this pattern.

🗂

Docker Compose — Local Production Stack

Orchestration
# docker-compose.yml — complete AI app stack
# (top-level `version:` is obsolete in Compose v2 and can be omitted)

services:

  # ── FastAPI app ──────────────────────────────────────
  api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    environment:
      REDIS_URL: redis://redis:6379
    volumes:
      - chroma_data:/app/chroma_db
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8000/admin/health').raise_for_status()"]
      interval: 30s
      timeout: 10s
      retries: 3

  # ── Celery worker ────────────────────────────────────
  worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info --concurrency=4
    env_file: .env
    environment:
      REDIS_URL: redis://redis:6379
    volumes:
      - chroma_data:/app/chroma_db
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped

  # ── Celery Beat (scheduled tasks) ────────────────────
  beat:
    build: .
    command: celery -A app.worker.celery_app beat --loglevel=info
    env_file: .env
    environment:
      REDIS_URL: redis://redis:6379
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped

  # ── Redis (message broker + result backend) ──────────
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes   # persist to disk
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  chroma_data:
  redis_data:

# Commands
# docker compose up --build -d    # start all services detached
# docker compose logs -f api      # follow api logs
# docker compose ps               # show service status
# docker compose down -v          # stop and remove volumes
🔄

When to Use Background Jobs

Architecture Decision

Not all AI work belongs in the request-response cycle. Background jobs handle long-running, expensive, or retry-able work without blocking the API.

  Handle In-Request           | Offload to Queue
  ────────────────────────────┼──────────────────────────────
  Simple Q&A (< 5s)           | Document ingestion (minutes)
  Single-turn RAG query       | Batch embedding 10k documents
  Streaming chat response     | Running evaluation harness
  Short agent task (< 30s)    | Long research agent (5+ min)
  Classification / routing    | Report generation, exports
# The async job pattern
#
# 1. Client POSTs request → API returns job_id immediately (202 Accepted)
# 2. Worker processes job in background
# 3. Client polls GET /jobs/{job_id} → {"status": "pending" | "running" | "done" | "failed"}
# 4. When done, result available in job response
#
# Alternative: webhooks (POST to client URL when done)
# Alternative: SSE endpoint client subscribes to for job updates
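The client side of the polling pattern is a small loop. Here is a minimal stdlib-only sketch; the HTTP call is injected as a `get_status` callable (in practice, a wrapper around something like `httpx.get(f"{base}/jobs/{job_id}").json()`), so the loop itself stays easy to test:

```python
import time

def poll_job(get_status, interval: float = 1.0, timeout: float = 300.0) -> dict:
    """Poll until the job reaches a terminal state; return the final payload.

    get_status is any zero-argument callable returning the /jobs/{id} JSON,
    e.g. lambda: httpx.get(f"{base}/jobs/{job_id}").json()
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status["status"] in ("success", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError("job did not reach a terminal state in time")
```

Injecting the fetch function also makes it trivial to swap in retry logic or backoff on the HTTP call without touching the loop.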
🌿

Celery — Distributed Task Queue

Standard
pip install celery redis

# app/worker.py — Celery app and task definitions
from celery import Celery
from celery.utils.log import get_task_logger
import os

# DocumentIngestionPipeline, IngestionConfig, and guarded_agent come from
# earlier modules' code — import them from wherever they live in your app.

logger = get_task_logger(__name__)

# Celery app — Redis as broker AND result backend
celery_app = Celery(
    "ai_tasks",
    broker=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
)
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,    # report STARTED state (needed by the job status API)
    result_expires=3600,        # results expire after 1 hour
    task_soft_time_limit=300,   # raise SoftTimeLimitExceeded after 5 min
    task_time_limit=360,        # hard kill after 6 min
    worker_max_tasks_per_child=50,  # restart worker after 50 tasks (memory leak prevention)
)

# ── Task: ingest documents ────────────────────────────
@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,      # retry after 60s
    name="tasks.ingest_documents"
)
def ingest_documents(self, document_paths: list[str], collection: str) -> dict:
    logger.info(f"Ingesting {len(document_paths)} documents into {collection}")
    try:
        pipeline = DocumentIngestionPipeline(config=IngestionConfig(collection_name=collection))
        results  = pipeline.ingest_directory_files(document_paths)
        return {"status": "success", "chunks_added": results["chunks"], "files": results["files"]}
    except Exception as exc:
        logger.error(f"Ingestion failed: {exc}")
        raise self.retry(exc=exc, countdown=60)   # retry with 60s delay

# ── Task: run research agent ──────────────────────────
@celery_app.task(
    bind=True,
    max_retries=2,
    name="tasks.run_agent"
)
def run_agent_task(self, goal: str, session_id: str) -> dict:
    try:
        result = guarded_agent(goal)
        return result
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)

# ── Scheduled tasks (beat) ────────────────────────────
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "daily-index-cleanup": {
        "task": "tasks.cleanup_stale_documents",
        "schedule": crontab(hour=2, minute=0),   # 2am daily
    },
    "hourly-cache-warm": {
        "task": "tasks.warm_embedding_cache",
        "schedule": crontab(minute=0),           # every hour
    },
}
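Celery has no built-in dead-letter queue, but one can be sketched on top of the `task_failure` signal, which fires once a task's retries are exhausted. Below is a minimal stdlib-only version: the record function accepts any object with an `lpush(key, value)` method, so `redis.Redis` satisfies it in production and a stub satisfies it in tests. The `DEAD_LETTER_KEY` name and the wiring shown in the comment are assumptions, not part of the worker code above.

```python
import json
import time

DEAD_LETTER_KEY = "celery:dead_letter"

def record_dead_letter(store, task_id, task_name, exc, args, kwargs) -> str:
    """Serialise a permanently failed task and push it onto a list
    for later inspection or manual replay.

    `store` needs only an lpush(key, value) method.
    """
    entry = json.dumps({
        "task_id": task_id,
        "task": task_name,
        "error": repr(exc),
        "args": list(args or ()),
        "kwargs": kwargs or {},
        "failed_at": time.time(),
    })
    store.lpush(DEAD_LETTER_KEY, entry)
    return entry

# Wiring it up (sketch): task_failure fires after max_retries is exceeded.
#
# from celery.signals import task_failure
# import os, redis
#
# @task_failure.connect
# def on_task_failure(sender=None, task_id=None, exception=None,
#                     args=None, kwargs=None, **extra):
#     r = redis.Redis.from_url(os.environ["REDIS_URL"])
#     record_dead_letter(r, task_id, sender.name, exception, args, kwargs)
```

An operator (or a Celery Beat task) can then `lrange` the list, inspect the failures, and re-enqueue the recoverable ones.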
📬

Job Status API — Async Job Pattern

Production Pattern
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
from typing import Any, Optional

from app.worker import celery_app, ingest_documents

router = APIRouter(prefix="/jobs", tags=["jobs"])

class JobSubmitResponse(BaseModel):
    job_id:  str
    status:  str = "queued"
    message: str

class JobStatusResponse(BaseModel):
    job_id:   str
    status:   str       # queued | started | success | failure | revoked
    progress: Optional[float] = None   # 0.0 – 1.0
    result:   Optional[Any]  = None    # populated when status=success
    error:    Optional[str]  = None    # populated when status=failure

# ── Submit: returns job_id immediately ────────────────
class IngestRequest(BaseModel):
    document_paths: list[str]
    collection:     str = "default"

@router.post("/ingest", status_code=202, response_model=JobSubmitResponse)
async def submit_ingestion(request: IngestRequest):
    task = ingest_documents.delay(
        document_paths=request.document_paths,
        collection=request.collection
    )
    return JobSubmitResponse(
        job_id=task.id,
        status="queued",
        message=f"Ingestion job queued. Poll /jobs/{task.id} for status."
    )

# ── Poll: check job status ────────────────────────────
@router.get("/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str):
    result = AsyncResult(job_id, app=celery_app)

    match result.state:
        case "PENDING":
            return JobStatusResponse(job_id=job_id, status="queued")
        case "STARTED":
            meta = result.info or {}
            return JobStatusResponse(job_id=job_id, status="running",
                                     progress=meta.get("progress"))
        case "SUCCESS":
            return JobStatusResponse(job_id=job_id, status="success",
                                     result=result.result)
        case "FAILURE":
            return JobStatusResponse(job_id=job_id, status="failed",
                                     error=str(result.info))
        case _:
            return JobStatusResponse(job_id=job_id, status=result.state.lower())

# ── Cancel a job ──────────────────────────────────────
@router.delete("/{job_id}")
async def cancel_job(job_id: str):
    celery_app.control.revoke(job_id, terminate=True)
    return {"message": f"Job {job_id} cancelled"}

# ── Report progress from inside a task ────────────────
@celery_app.task(bind=True)
def batch_embed_task(self, texts: list[str]) -> dict:
    total = len(texts)
    for i, text in enumerate(texts):
        embed_and_store(text)
        # Update progress — visible in /jobs/{id}
        self.update_state(
            state="STARTED",
            meta={"progress": (i + 1) / total, "current": i + 1, "total": total}
        )
    return {"embedded": total}

FREE LEARNING RESOURCES

  • Docs — Docker: Multi-Stage Builds (docs.docker.com) — official guide on multi-stage builds for smaller, more secure production images.
  • Docs — Celery: First Steps (docs.celeryq.dev) — official Celery quickstart covering tasks, workers, the beat scheduler, and result backends.
  • Docs — Docker Compose Documentation (docs.docker.com/compose) — complete Compose reference, including healthchecks, depends_on, and volumes.
  • Article — FastAPI + Celery Tutorial (testdriven.io) — end-to-end tutorial combining FastAPI with Celery and Redis; includes a Docker Compose setup.
🛠 Containerised AI Stack with Async Document Ingestion [Intermediate] 3–4 days

Containerise your M23 FastAPI app and add an async document ingestion pipeline using Celery.

Requirements

  • Dockerfile — multi-stage build, non-root user, HEALTHCHECK, gunicorn CMD
  • docker-compose.yml — api, worker, beat, redis services with healthchecks and volumes
  • Celery tasks — ingest_documents task with retry logic and progress reporting
  • Job API — POST /jobs/ingest (202), GET /jobs/{id}, DELETE /jobs/{id}
  • Progress — task updates state with progress 0.0–1.0; client polls /jobs/{id}
  • Scheduled task — daily cleanup of stale embeddings via Celery Beat

Test It

  • docker compose up, submit 50-document ingestion job, poll until complete
  • Kill the worker mid-job. Restart it. Verify the job retries and completes.
  • Verify docker compose ps shows all services healthy

Skills: Multi-stage Docker, Docker Compose healthchecks, Celery tasks + retries + progress, job status polling API

LAB 1

Dockerfile — Build and Inspect

Objective: Build a production Docker image and understand what's inside it.

1
Write a single-stage Dockerfile for your FastAPI app. Build it with docker build -t ai-api:single . and check the size with docker image ls ai-api:single.
2
Rewrite it as a multi-stage build and build it with docker build -t ai-api:multi . and compare sizes; the multi-stage version should be 30–60% smaller.
3
Add the non-root user (RUN useradd + USER appuser). Verify: docker run ai-api:multi whoami → prints "appuser" not "root".
4
Add .dockerignore. Rebuild and verify chroma_db/, .git/, and __pycache__/ are not in the image: docker run ai-api:multi ls -la.
5
Trigger the HEALTHCHECK: start the container without the app running (override CMD). Verify docker ps shows "unhealthy" after 3 failed checks.
LAB 2

Docker Compose Stack

Objective: Bring up the full multi-service stack and verify all services communicate.

1
Write docker-compose.yml with api, worker, redis. Run: docker compose up --build -d. Check all services: docker compose ps.
2
Verify startup ordering: run docker compose down, then docker compose up, and watch docker compose ps. The api container should not start until redis reports healthy, confirming that depends_on with condition: service_healthy works.
3
Test volume persistence: ingest some documents. Stop and remove containers (docker compose down — NOT -v). Restart. Verify documents are still in ChromaDB.
4
Simulate a worker crash: docker compose kill worker. Submit an ingestion job via the API. Restart the worker: docker compose start worker. Verify the job eventually completes.
LAB 3

Celery Task — Retry and Progress

Objective: Build and test a Celery task with retry logic and progress reporting.

1
Write a batch_embed_task that embeds 20 texts. Report progress (0.0–1.0) after each. Poll the job status endpoint every second and print the progress.
2
Add a deliberate failure (raise an Exception if self.request.retries < 2). Submit the task and observe it fail twice, then succeed on the third attempt. Check /jobs/{id} during each retry.
3
Test task time limit: add a deliberate time.sleep(400) in your task (beyond the 300s soft limit). Verify Celery raises SoftTimeLimitExceeded and the job shows as "failed" with a timeout error.
4
Add Celery Beat with a task that runs every minute (for testing). Verify it fires on schedule: docker compose logs beat should show it enqueuing each minute.

P7-M24 MASTERY CHECKLIST

When complete: Move to P7-M25 — Auth, Logging & Observability. Your app is containerised and has async jobs. M25 covers what you need to see inside a running production system.

← P7-M23: FastAPI Production 🗺️ All Modules Next: P7-M25 — Auth, Logging & Observability →