What This Module Covers
Container + Queue
Two infrastructure skills that every production AI app needs: Docker to make your app portable and reproducible, and background job queues to handle long-running AI tasks without making users wait.
- Dockerfile — production multi-stage build for a FastAPI + AI app
- Docker Compose — orchestrating API + worker + Redis + vector DB as a local stack
- Background jobs — when to offload to a queue vs handle in-request
- Celery — the standard Python task queue, with Redis as broker
- Job status API — polling endpoint so clients can track async job progress
- Retry and error handling — failed task retries, dead letter queues
Production Dockerfile — Multi-Stage Build
Container

```dockerfile
# Dockerfile — production multi-stage build

# ── Stage 1: dependency builder ───────────────────────
FROM python:3.12-slim AS builder
WORKDIR /build
COPY requirements.txt .
# Install dependencies into /install — separate from app code
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# ── Stage 2: production image ─────────────────────────
FROM python:3.12-slim AS production

# Create non-root user — never run as root in production
RUN useradd --create-home --shell /bin/bash appuser
WORKDIR /app

# Copy installed packages from builder stage
COPY --from=builder /install /usr/local

# Copy only app code — not tests, docs, or dev files
COPY --chown=appuser:appuser app/ ./app/

# Switch to non-root user
USER appuser

# Expose port
EXPOSE 8000

# Health check — Docker monitors this
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
  CMD python -c "import httpx; httpx.get('http://localhost:8000/admin/health').raise_for_status()"

# Production command: gunicorn managing uvicorn workers
CMD ["gunicorn", "app.main:app", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--workers", "4", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "120", \
     "--graceful-timeout", "30"]
```
```
# .dockerignore — keep image small
__pycache__/
*.pyc
*.pyo
.env
.env.*
.git/
.pytest_cache/
tests/
*.md
# chroma_db is mounted as a volume, not baked into the image
chroma_db/
*.log
```
```bash
# Build and run
docker build -t ai-api:latest .
docker run -p 8000:8000 --env-file .env \
  -v $(pwd)/chroma_db:/app/chroma_db ai-api:latest

# Inspect image layers (find what's making it large)
docker history ai-api:latest
# Or use dive: https://github.com/wagoodman/dive
```
💡 Multi-stage builds keep production images small. The builder stage installs all build tools and dependencies. The production stage copies only the compiled packages — no pip, no build-essentials, no compiler. A typical FastAPI app goes from ~800MB (single stage) to ~200MB (multi-stage) with this pattern.
Docker Compose — Local Production Stack
Orchestration

```yaml
# docker-compose.yml — complete AI app stack
version: "3.9"

services:
  # ── FastAPI app ──────────────────────────────────────
  api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    environment:
      REDIS_URL: redis://redis:6379
    volumes:
      - chroma_data:/app/chroma_db
    depends_on:
      redis:
        condition: service_healthy
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "-c", "import httpx; httpx.get('http://localhost:8000/admin/health').raise_for_status()"]
      interval: 30s
      timeout: 10s
      retries: 3

  # ── Celery worker ────────────────────────────────────
  worker:
    build: .
    command: celery -A app.worker.celery_app worker --loglevel=info --concurrency=4
    env_file: .env
    environment:
      REDIS_URL: redis://redis:6379
    volumes:
      - chroma_data:/app/chroma_db
    depends_on:
      - redis
    restart: unless-stopped

  # ── Celery Beat (scheduled tasks) ────────────────────
  beat:
    build: .
    command: celery -A app.worker.celery_app beat --loglevel=info
    env_file: .env
    environment:
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
    restart: unless-stopped

  # ── Redis (message broker + result backend) ──────────
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes  # persist to disk
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  chroma_data:
  redis_data:
```

```bash
# Commands
docker compose up --build -d   # start all services detached
docker compose logs -f api     # follow api logs
docker compose ps              # show service status
docker compose down -v         # stop and remove volumes
```
When to Use Background Jobs
Architecture Decision
Not all AI work belongs in the request-response cycle. Background jobs handle long-running, expensive, or retryable work without blocking the API.
| Handle In-Request | Offload to Queue |
|---|---|
| Simple Q&A (< 5s) | Document ingestion (minutes) |
| Single-turn RAG query | Batch embedding 10k documents |
| Streaming chat response | Running evaluation harness |
| Short agent task (< 30s) | Long research agent (5+ min) |
| Classification / routing | Report generation, exports |
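The cut-offs in the table above can be captured in a small routing helper. This is a sketch with hypothetical inputs — `estimated_seconds` and `needs_retries` are assumptions; in practice you would derive them from the task type and your own latency data:

```python
# Sketch: decide in-request vs queue, based on the thresholds above.
# The function name and parameters are illustrative, not a real API.

def should_offload(estimated_seconds: float, needs_retries: bool = False) -> bool:
    """Offload any task expected to exceed ~30s, or one that must survive retries."""
    return needs_retries or estimated_seconds > 30

# Quick checks against the table
print(should_offload(3))                       # simple Q&A → in-request (False)
print(should_offload(300))                     # document ingestion → queue (True)
print(should_offload(5, needs_retries=True))   # retryable work → queue (True)
```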
```
# The async job pattern
#
# 1. Client POSTs request → API returns job_id immediately (202 Accepted)
# 2. Worker processes job in background
# 3. Client polls GET /jobs/{job_id} → {"status": "pending" | "running" | "done" | "failed"}
# 4. When done, result is available in the job response
#
# Alternative: webhooks (POST to a client-supplied URL when done)
# Alternative: SSE endpoint the client subscribes to for job updates
```
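The lifecycle above can be simulated without any infrastructure. A minimal in-memory sketch — the `JobStore` class and its method names are illustrative (Celery and Redis play this role in production):

```python
import uuid

class JobStore:
    """In-memory stand-in for the broker + result backend."""

    def __init__(self):
        self.jobs: dict[str, dict] = {}

    def submit(self, payload: dict) -> str:
        """API side: record the job and return a job_id immediately (the 202 response)."""
        job_id = str(uuid.uuid4())
        self.jobs[job_id] = {"status": "pending", "payload": payload, "result": None}
        return job_id

    def run_next(self, job_id: str) -> None:
        """Worker side: process the job in the background."""
        job = self.jobs[job_id]
        job["status"] = "running"
        job["result"] = {"chunks_added": len(job["payload"]["documents"])}
        job["status"] = "done"

    def status(self, job_id: str) -> dict:
        """Client side: the GET /jobs/{job_id} equivalent."""
        job = self.jobs[job_id]
        return {"job_id": job_id, "status": job["status"], "result": job["result"]}

store = JobStore()
jid = store.submit({"documents": ["a.pdf", "b.pdf"]})
print(store.status(jid)["status"])   # pending — the client got the id instantly
store.run_next(jid)                  # worker picks the job up later
print(store.status(jid)["status"])   # done
```

The key property to notice: `submit` never blocks on the work itself, which is exactly what lets the API return 202 in milliseconds regardless of how long ingestion takes.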
Celery — Distributed Task Queue
Standard

```bash
pip install celery redis
```

```python
# app/worker.py — Celery app and task definitions
import os

from celery import Celery
from celery.schedules import crontab
from celery.utils.log import get_task_logger

# Assumes DocumentIngestionPipeline, IngestionConfig, and guarded_agent
# are importable from the app code built in earlier modules.

logger = get_task_logger(__name__)

# Celery app — Redis as broker AND result backend
celery_app = Celery(
    "ai_tasks",
    broker=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    result_expires=3600,            # results expire after 1 hour
    task_soft_time_limit=300,       # raise SoftTimeLimitExceeded after 5 min
    task_time_limit=360,            # hard kill after 6 min
    worker_max_tasks_per_child=50,  # restart worker after 50 tasks (memory leak prevention)
)

# ── Task: ingest documents ────────────────────────────
@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # retry after 60s
    name="tasks.ingest_documents",
)
def ingest_documents(self, document_paths: list[str], collection: str) -> dict:
    logger.info(f"Ingesting {len(document_paths)} documents into {collection}")
    try:
        pipeline = DocumentIngestionPipeline(config=IngestionConfig(collection_name=collection))
        results = pipeline.ingest_directory_files(document_paths)
        return {"status": "success", "chunks_added": results["chunks"], "files": results["files"]}
    except Exception as exc:
        logger.error(f"Ingestion failed: {exc}")
        raise self.retry(exc=exc, countdown=60)  # retry with 60s delay

# ── Task: run research agent ──────────────────────────
@celery_app.task(bind=True, max_retries=2, name="tasks.run_agent")
def run_agent_task(self, goal: str, session_id: str) -> dict:
    try:
        return guarded_agent(goal)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)

# ── Scheduled tasks (beat) ────────────────────────────
celery_app.conf.beat_schedule = {
    "daily-index-cleanup": {
        "task": "tasks.cleanup_stale_documents",
        "schedule": crontab(hour=2, minute=0),  # 2am daily
    },
    "hourly-cache-warm": {
        "task": "tasks.warm_embedding_cache",
        "schedule": crontab(minute=0),  # every hour
    },
}
```
Job Status API — Async Job Pattern
Production Pattern

```python
# app/routes/jobs.py — async job API (202 + polling)
from typing import Any, Optional

from celery.result import AsyncResult
from fastapi import APIRouter
from pydantic import BaseModel

from app.worker import celery_app, ingest_documents

router = APIRouter(prefix="/jobs", tags=["jobs"])

class JobSubmitResponse(BaseModel):
    job_id: str
    status: str = "queued"
    message: str

class JobStatusResponse(BaseModel):
    job_id: str
    status: str                       # queued | running | success | failed | ...
    progress: Optional[float] = None  # 0.0 – 1.0
    result: Optional[Any] = None      # populated when status=success
    error: Optional[str] = None       # populated when status=failed

# ── Submit: returns job_id immediately ────────────────
class IngestRequest(BaseModel):
    document_paths: list[str]
    collection: str = "default"

@router.post("/ingest", status_code=202, response_model=JobSubmitResponse)
async def submit_ingestion(request: IngestRequest):
    task = ingest_documents.delay(
        document_paths=request.document_paths,
        collection=request.collection,
    )
    return JobSubmitResponse(
        job_id=task.id,
        status="queued",
        message=f"Ingestion job queued. Poll /jobs/{task.id} for status.",
    )

# ── Poll: check job status ────────────────────────────
@router.get("/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str):
    result = AsyncResult(job_id, app=celery_app)
    match result.state:
        case "PENDING":
            return JobStatusResponse(job_id=job_id, status="queued")
        case "STARTED":
            meta = result.info or {}
            return JobStatusResponse(job_id=job_id, status="running", progress=meta.get("progress"))
        case "SUCCESS":
            return JobStatusResponse(job_id=job_id, status="success", result=result.result)
        case "FAILURE":
            return JobStatusResponse(job_id=job_id, status="failed", error=str(result.info))
        case _:
            return JobStatusResponse(job_id=job_id, status=result.state.lower())

# ── Cancel a job ──────────────────────────────────────
@router.delete("/{job_id}")
async def cancel_job(job_id: str):
    celery_app.control.revoke(job_id, terminate=True)
    return {"message": f"Job {job_id} cancelled"}

# ── Report progress from inside a task ────────────────
@celery_app.task(bind=True)
def batch_embed_task(self, texts: list[str]) -> dict:
    total = len(texts)
    for i, text in enumerate(texts):
        embed_and_store(text)  # embedding helper from earlier modules
        # Update progress — visible in /jobs/{id}
        self.update_state(
            state="STARTED",
            meta={"progress": (i + 1) / total, "current": i + 1, "total": total},
        )
    return {"embedded": total}
```
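On the client side, polling GET /jobs/{job_id} usually pairs with exponential backoff so you don't hammer the API while a long job runs. A sketch — `fetch_status` is an injected callable (in a real client it would wrap something like `httpx.get(f"{base_url}/jobs/{job_id}")`), which also makes the loop easy to test:

```python
import time

def poll_until_done(fetch_status, job_id: str,
                    initial_delay: float = 0.5, max_delay: float = 8.0,
                    timeout: float = 600.0) -> dict:
    """Poll until the job reaches a terminal state, doubling the delay
    each round (capped at max_delay). fetch_status(job_id) -> dict."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while time.monotonic() < deadline:
        status = fetch_status(job_id)
        if status["status"] in ("success", "failed", "revoked"):
            return status
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")

# Usage with a fake server that finishes on the third poll
responses = iter([
    {"status": "queued"},
    {"status": "running", "progress": 0.5},
    {"status": "success", "result": {"embedded": 10}},
])
final = poll_until_done(lambda _id: next(responses), "job-123", initial_delay=0.01)
print(final["status"])  # success
```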
FREE LEARNING RESOURCES
| Type | Resource | Best For |
|---|---|---|
| Docs | Docker: Multi-Stage Builds — docs.docker.com | Official guide on multi-stage builds for smaller, secure production images. |
| Docs | Celery: First Steps — docs.celeryq.dev | Official Celery quickstart. Covers tasks, workers, beat scheduler, and result backends. |
| Docs | Docker Compose Documentation — docs.docker.com/compose | Complete Docker Compose reference including healthchecks, depends_on, and volumes. |
| Article | FastAPI + Celery Tutorial — testdriven.io | End-to-end tutorial combining FastAPI with Celery and Redis. Includes Docker Compose setup. |
Containerise your M23 FastAPI app and add an async document ingestion pipeline using Celery.
Requirements
- Dockerfile — multi-stage build, non-root user, HEALTHCHECK, gunicorn CMD
- docker-compose.yml — api, worker, beat, redis services with healthchecks and volumes
- Celery tasks — ingest_documents task with retry logic and progress reporting
- Job API — POST /jobs/ingest (202), GET /jobs/{id}, DELETE /jobs/{id}
- Progress — task updates state with progress 0.0–1.0; client polls /jobs/{id}
- Scheduled task — daily cleanup of stale embeddings via Celery Beat
Test It
- docker compose up, submit 50-document ingestion job, poll until complete
- Kill the worker mid-job. Restart it. Verify the job retries and completes.
- Verify docker compose ps shows all services healthy
Skills: Multi-stage Docker, Docker Compose healthchecks, Celery tasks + retries + progress, job status polling API
Dockerfile — Build and Inspect
Objective: Build a production Docker image and understand what's inside it.
- Build a single-stage version: `docker build -t ai-api:single .`. Check the size: `docker image ls ai-api:single`.
- Build the multi-stage version: `docker build -t ai-api:multi .`. Compare sizes. The multi-stage version should be 30–60% smaller.
- Verify the non-root user: `docker run ai-api:multi whoami` → prints "appuser", not "root".
- Inspect the image contents: `docker run ai-api:multi ls -la`.
- Break the health endpoint and verify `docker ps` shows "unhealthy" after 3 failed checks.

Docker Compose Stack
Objective: Bring up the full multi-service stack and verify all services communicate.
- `docker compose up --build -d`. Check all services: `docker compose ps`.
- Stop Redis (`docker compose stop redis`). Does the api service fail to start? Verify that `depends_on` with `condition: service_healthy` works.
- Bring the stack down without removing volumes (`docker compose down` — NOT `-v`). Restart. Verify documents are still in ChromaDB.
- Kill the worker (`docker compose kill worker`). Submit an ingestion job via the API. Restart the worker: `docker compose start worker`. Verify the job eventually completes.

Celery Task — Retry and Progress
Objective: Build and test a Celery task with retry logic and progress reporting.
- Schedule a test task via Celery Beat: `docker compose logs beat` should show it enqueuing each minute.

P7-M24 MASTERY CHECKLIST
- Can write a multi-stage Dockerfile: builder stage for dependencies, production stage for the app
- Always run containers as non-root user (RUN useradd + USER appuser)
- Always include a .dockerignore to exclude .git, __pycache__, .env, chroma_db
- Can add Docker HEALTHCHECK that calls /admin/health
- Know the production CMD: gunicorn with UvicornWorker, not uvicorn directly
- Can write a docker-compose.yml with api, worker, redis services
- Can configure depends_on with condition: service_healthy for startup ordering
- Can configure named volumes for ChromaDB and Redis data persistence
- Know when to offload to a queue: any AI task over 30s or requiring retries
- Can create a Celery app with Redis as broker and result backend
- Can write a Celery task with bind=True, max_retries, and retry on exception
- Can configure task_soft_time_limit and task_time_limit to prevent runaway tasks
- Can report task progress via self.update_state(state="STARTED", meta={"progress": x})
- Can implement a job status API: POST returns 202 + job_id, GET polls AsyncResult
- Can configure Celery Beat for scheduled tasks using crontab
- Completed Lab 1: Dockerfile built with multi-stage, non-root, healthcheck
- Completed Lab 2: Docker Compose stack tested with volume persistence and crash recovery
- Completed Lab 3: Celery task with retry + progress + time limit verified
- Milestone project: containerised AI stack with async ingestion pushed to GitHub
✅ When complete: Move to P7-M25 — Auth, Logging & Observability. Your app is containerised and has async jobs. M25 covers what you need to see inside a running production system.