Learn Python Series (#53) - Deployment & Production Best Practices

avatar

Learn Python Series (#53) - Deployment & Production Best Practices

python-logo.png

Repository

What will I learn

  • You will learn the difference between development and production ASGI servers;
  • how to configure applications for different environments using pydantic-settings;
  • essential production requirements: health checks, graceful shutdown, structured logging;
  • Docker deployment with multi-stage builds and docker-compose;
  • database connection pooling, migration strategy, and common production pitfalls;
  • monitoring, request tracing, and knowing what to watch when things go wrong.

Requirements

  • A working modern computer running macOS, Windows or Ubuntu;
  • An installed Python 3(.11+) distribution;
  • The ambition to learn Python programming.

Difficulty

  • Intermediate, advanced

Curriculum (of the Learn Python Series):

GitHub Account

https://github.com/realScipio

Learn Python Series (#53) - Deployment & Production Best Practices

Your API works on localhost. Tests pass. Authentication is solid. You're feeling good.

Then you deploy, and everything falls apart. Database connections time out under load. One slow external API call blocks all your workers. Logs are a wall of unstructured text that tells you nothing useful. A server crashes at 3 AM and nobody notices until users complain hours later.

The gap between "works on my machine" and "runs reliably in production" is where most developers get humbled — including me, more than once ;-). This episode is about closing that gap systematically.

Nota bene: Deployment is the easy part. A docker push and you're live. Staying live — that's the real skill. Monitoring, logging, graceful degradation, connection pooling, knowing what to do when things break at scale. This is what separates hobbyist projects from production systems, and it's what we'll dig into today.

Development vs Production Servers — Why It Matters

During development, you run Uvicorn directly:

uvicorn main:app --reload

The --reload flag watches for file changes and restarts the server automatically. Convenient for development. But here's what --reload actually does under the hood: it uses file system watchers (watchfiles on Linux, fsevents on macOS) to detect changes, then kills the running process and spawns a new one. Every active connection is dropped. Every in-flight request is abandoned. In development, that's a mild annoyance. In production, that's users getting 502 errors.

More importantly, a single Uvicorn process means a single Python event loop, which means a single point of failure. If that process crashes, your API is down — completely, instantly, silently (unless you've set up monitoring, which is the other half of this episode).

Production uses a process manager:

gunicorn main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --access-logfile - \
    --error-logfile -

Why Gunicorn + Uvicorn? Gunicorn is a mature, battle-tested process manager. It handles spawning workers, monitoring their health, restarting crashed workers, and distributing incoming connections. Uvicorn (via UvicornWorker) handles the actual async request processing within each worker. Together they give you: process supervision, graceful restarts, and horizontal scaling within a single machine.

Worker Count — The Formula and the Nuance

The standard formula is:

workers = (2 × CPU cores) + 1

For a 4-core machine: (2 × 4) + 1 = 9 workers. The reasoning: during I/O operations (database queries, external API calls, file reads), the worker is waiting — not computing. Having more workers than cores ensures CPUs stay busy while other workers are blocked on I/O.

But this formula has limits. Each worker is a separate OS process that loads the entire application into memory. A FastAPI app with SQLAlchemy, Pydantic models, and imported packages might use 50-150MB per worker. Nine workers = 450MB-1.3GB just for the application layer.

For CPU-bound workloads (image processing, heavy computation), use cores + 1. For I/O-bound workloads (most web APIs), (2 × cores) + 1 is the right starting point. But always profile — theory is no substitute for measuring actual memory usage and response times under realistic load:

# Check memory per worker
ps aux | grep gunicorn | awk '{print $6/1024 " MB", $0}'

Environment Configuration with pydantic-settings

Never hardcode configuration. Not "try not to" — never:

# settings.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    database_url: str
    secret_key: str
    debug: bool = False
    allowed_origins: list[str] = ["https://yourfrontend.com"]
    log_level: str = "INFO"

    # Worker config
    workers: int = 4
    db_pool_size: int = 10
    db_max_overflow: int = 20

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

@lru_cache
def get_settings() -> Settings:
    return Settings()

The @lru_cache ensures settings are loaded once and reused. In development, create a .env file:

# .env (NEVER commit this to git — add to .gitignore)
DATABASE_URL=postgresql://user:password@localhost/mydb
SECRET_KEY=dev-only-change-in-production
DEBUG=True
LOG_LEVEL=DEBUG

In production, set environment variables directly — via Docker, systemd, your cloud provider's secret manager, or whatever your infrastructure uses. The point is: the same code runs everywhere, only the configuration changes.

Using pydantic-settings instead of raw os.getenv() gives you type validation at startup. If DATABASE_URL is missing or DB_POOL_SIZE isn't an integer, the application fails immediately with a clear error — not thirty minutes later when the first database query runs and you get a cryptic traceback.

# In your FastAPI app
from fastapi import Depends

app = FastAPI()

@app.get("/debug/settings")
def show_settings(settings: Settings = Depends(get_settings)):
    # Only expose in debug mode
    if not settings.debug:
        raise HTTPException(status_code=404)
    return {"log_level": settings.log_level, "workers": settings.workers}

Health Check Endpoints — Simple and Deep

Load balancers, container orchestrators (Kubernetes), and monitoring tools need to know if your app is healthy. Two levels of health checks:

Liveness — "Is the process alive?" (should it be restarted?):

@app.get("/health/live")
async def liveness():
    return {"status": "alive"}

Readiness — "Can it handle requests?" (should traffic be routed here?):

from sqlalchemy import text

@app.get("/health/ready")
async def readiness(db: Session = Depends(get_db)):
    checks = {}
    try:
        db.execute(text("SELECT 1"))
        checks["database"] = "connected"
    except Exception:
        checks["database"] = "unavailable"
        raise HTTPException(
            status_code=503,
            detail={"status": "not ready", "checks": checks}
        )
    # Add more checks: Redis, external APIs, disk space...
    return {"status": "ready", "checks": checks}

The distinction matters. A liveness failure means "restart this container." A readiness failure means "stop sending it traffic, but don't kill it" — maybe the database is temporarily down and will recover. Kubernetes uses these differently, and getting it wrong causes cascading restarts that make outages worse.

Graceful Shutdown — Finish What You Started

When Gunicorn restarts a worker (deploy, scale-down, crash recovery), it sends SIGTERM. The worker should finish its current requests before exiting:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import logging

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Application starting up...")
    # Initialize connection pools, caches, background tasks
    yield
    # Shutdown
    logger.info("Application shutting down, cleaning up...")
    # Close database pools, flush logs, cancel background tasks

app = FastAPI(lifespan=lifespan)

FastAPI's lifespan context manager (replacing the old on_event("startup") / on_event("shutdown") pattern) runs code before the first request and after the last request. This is where you initialize and clean up shared resources — database connection pools, Redis clients, HTTP client sessions.

Gunicorn's --graceful-timeout 30 gives workers 30 seconds to finish in-flight requests after receiving SIGTERM. If they don't finish in time, SIGKILL ends them forcefully. Set this high enough for your slowest expected request, but not so high that deploys take forever.

Structured Logging — JSON or Go Home

In development, human-readable logs are fine:

2026-02-23 14:30:22 - INFO - User [email protected] logged in

In production, you need structured (JSON) logs that log aggregation tools can parse, index, and query:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info and record.exc_info[0]:
            log_entry["exception"] = self.formatException(record.exc_info)
        if hasattr(record, "request_id"):
            log_entry["request_id"] = record.request_id
        return json.dumps(log_entry)

def setup_logging(level: str = "INFO"):
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    root.setLevel(getattr(logging, level.upper()))

The request_id field is critical for production debugging. Add a unique ID to every request using middleware:

import uuid

@app.middleware("http")
async def add_request_id(request, call_next):
    request_id = str(uuid.uuid4())[:8]
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

When a user reports "something broke," they give you the request ID from the response header, and you can grep your logs for every event related to that specific request. Without request IDs, debugging production issues in a multi-worker, multi-instance system is like finding a needle in a haystack — while the haystack is on fire.

Database Connection Pooling — Don't Create Connections Per Request

Every database connection involves a TCP handshake, authentication, and protocol negotiation. On a fast network that's ~5ms. At 100 requests/second, that's 500ms of pure overhead every second — just connecting.

Connection pooling maintains a set of open connections that get reused:

from sqlalchemy import create_engine

engine = create_engine(
    settings.database_url,
    pool_size=10,           # Keep 10 connections alive
    max_overflow=20,        # Allow 20 more under burst load
    pool_pre_ping=True,     # Check connection health before use
    pool_recycle=3600,      # Recreate connections after 1 hour
    pool_timeout=30,        # Wait max 30s for a connection from pool
)

pool_pre_ping=True sends a lightweight query (SELECT 1) before handing a connection to your code. This catches stale connections (database restarted, network timeout) before they cause a cryptic "connection reset" error mid-query. The cost is one extra round-trip per checkout — worth it for the reliability.

pool_recycle=3600 prevents the "connection has been idle too long" problem. Many databases (and firewalls in between) silently drop connections after a timeout. Recycling forces reconnection before that happens.

Sizing the pool: pool_size should roughly match your worker count. Each worker handles one request at a time (in the synchronous path), so one connection per worker is the baseline. max_overflow handles burst traffic — these extra connections are created on demand and destroyed when returned to the pool.

Docker Deployment — Multi-Stage Builds

A production Dockerfile should be small, reproducible, and secure:

# Stage 1: Build dependencies
FROM python:3.11-slim AS builder

WORKDIR /build
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Stage 2: Runtime
FROM python:3.11-slim

# Don't run as root
RUN useradd --create-home appuser
USER appuser
WORKDIR /home/appuser/app

# Copy installed packages from builder
COPY --from=builder /root/.local /home/appuser/.local
ENV PATH=/home/appuser/.local/bin:$PATH

# Copy application code
COPY --chown=appuser:appuser . .

EXPOSE 8000

CMD ["gunicorn", "main:app", \
     "--workers", "4", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--access-logfile", "-", \
     "--error-logfile", "-", \
     "--graceful-timeout", "30"]

Key points: the multi-stage build keeps your final image small (no compiler toolchains, no pip cache). The appuser prevents the container from running as root — a basic security requirement. --no-cache-dir avoids storing pip's download cache in the image.

For a complete local setup with a database, use docker-compose:

# docker-compose.yml
services:
  api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      db:
        condition: service_healthy
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
      POSTGRES_DB: mydb
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U myuser"]
      interval: 5s
      timeout: 3s
      retries: 5

volumes:
  pgdata:

The depends_on with condition: service_healthy ensures the API doesn't start until PostgreSQL is actually accepting connections — not just when the container is running (the container starts before the database process is ready, which is a classic race condition).

Common Production Pitfalls — Learn from Others' Mistakes

No Request Timeouts

Without timeouts, a slow external API can hold your workers hostage indefinitely:

import httpx

# WRONG — hangs forever if the external service is down
response = await client.get("https://slow-api.example.com/data")

# RIGHT — fail fast, handle gracefully
async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=5.0)) as client:
    try:
        response = await client.get("https://slow-api.example.com/data")
    except httpx.TimeoutException:
        logger.warning("External API timeout, using cached data")
        return get_cached_data()

The Timeout(10.0, connect=5.0) sets 10 seconds for the overall request and 5 seconds just for establishing the TCP connection. Separate connect timeout catches DNS/network issues faster than waiting for the full timeout.

Synchronous Database Calls in Async Endpoints

This is a subtle but devastating mistake:

# BLOCKS THE EVENT LOOP — all other async requests wait
@app.get("/users/")
async def list_users(db: Session = Depends(get_db)):
    users = db.query(User).all()  # synchronous call in async context!
    return users

When you declare an endpoint with async def, FastAPI runs it on the asyncio event loop. A synchronous database call (db.query(...)) blocks that event loop — no other request can be processed until the query completes. With standard SQLAlchemy, either use def (not async def) so FastAPI runs it in a threadpool automatically, or use async SQLAlchemy:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10
)

@app.get("/users/")
async def list_users(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    return result.scalars().all()  # truly non-blocking

Missing Database Indexes

The difference between indexed and unindexed lookups on a million-row table can be 1ms vs 2000ms:

# Model definition — always index columns you filter or join on
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, index=True)      # filtered in login
    created_at = Column(DateTime, index=True)             # filtered in reports
    role = Column(String, index=True)                     # filtered in admin views

Rule of thumb: if you WHERE on it, ORDER BY it, or JOIN on it — index it. The write-time cost of maintaining indexes is almost always worth the read-time savings.

Database Migration Strategy

Never modify production schemas by hand. Use Alembic (which we introduced in episode #51):

# Generate migration from model changes
alembic revision --autogenerate -m "add user role column"

# Apply in production
alembic upgrade head

Always review autogenerated migrations before applying. Alembic doesn't detect everything (renamed columns look like a drop + add), and destructive operations need manual verification.

Run migrations before deploying new code that expects the new schema. The sequence is: migrate → deploy → verify. Never the other way around — new code hitting an old schema produces errors under load.

Monitoring — Know Before Your Users Do

The four golden signals of monitoring (per Google's SRE book):

SignalWhat it tells youHow to measure
LatencyHow long requests takep50, p95, p99 response times
TrafficHow much demand you're handlingRequests per second
ErrorsWhat's failing4xx and 5xx response rates
SaturationHow close to capacity you areCPU, memory, connection pool usage

A simple middleware to track request metrics:

import time

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start

    logger.info(
        "request_completed",
        extra={
            "method": request.method,
            "path": request.url.path,
            "status": response.status_code,
            "duration_ms": round(duration * 1000, 2),
        }
    )
    return response

For serious production monitoring, integrate with Prometheus (metrics collection) + Grafana (dashboards). The prometheus-fastapi-instrumentator package adds standard HTTP metrics with one line:

from prometheus_fastapi_instrumentator import Instrumentator

Instrumentator().instrument(app).expose(app)
# Now /metrics endpoint serves Prometheus-format metrics

Set up alerts for: error rate > 1%, p99 latency > 2 seconds, CPU > 80% sustained, database connection pool > 90% utilization. These thresholds catch problems before users notice them.

What to remember from this one

In this episode, we covered the full deployment lifecycle:

  • Gunicorn + Uvicorn provides process management, worker supervision, and graceful restarts that raw Uvicorn alone cannot
  • Worker count (2 × cores) + 1 balances CPU and I/O, but always profile memory usage under realistic load
  • pydantic-settings validates configuration at startup — fail fast, not at 3 AM when the first edge case hits
  • Liveness vs readiness health checks serve different purposes — getting this wrong causes cascading failures
  • Lifespan context managers replace the old startup/shutdown events for resource management
  • Structured JSON logging with request IDs makes production debugging tractable instead of hopeless
  • Connection pooling with pool_pre_ping and pool_recycle prevents stale connections from causing mysterious errors
  • Multi-stage Docker builds with non-root users produce small, secure images
  • docker-compose with health check conditions prevents startup race conditions
  • Request timeouts on external calls prevent worker starvation
  • async def + sync DB = blocked event loop — use def endpoints or async database drivers
  • Index every column you filter, sort, or join on
  • Migrate before deploying, never after — and always review autogenerated migrations
  • Four golden signals: latency, traffic, errors, saturation — know them before your users do

We've now come full circle on the web development arc. Over the last five episodes, we went from "hello world" FastAPI to a production-deployed, database-backed, authenticated API with proper logging and monitoring. That's not a tutorial toy — that's a real stack you could ship.

But here's the bigger picture: everything we've built — APIs, databases, authentication, deployment — is infrastructure. It moves data, stores data, protects data. The question we haven't answered yet is: what do you actually do with all that data once you have it?

Dusssssss, tot de volgende! Laters ;-)

@scipio



0
0
0.000
0 comments