Learn AI Series (#34) - ML Engineering - From Notebook to Production


What will I learn

  • the gap between "works in Jupyter" and "works in production" -- and how to bridge it;
  • model serialization with joblib -- saving the entire pipeline so preprocessing and model travel together;
  • batch vs real-time serving -- when precomputed predictions beat on-demand inference;
  • serving a model with FastAPI -- a minimal but real deployment you can build in an afternoon;
  • feature stores and shared feature modules -- the consistency problem that silently kills production ML;
  • model monitoring -- detecting data drift, concept drift, and prediction drift before they cost you;
  • A/B testing and shadow mode for validating new model versions in production.

Requirements

  • A working modern computer running macOS, Windows or Ubuntu;
  • An installed Python 3(.11+) distribution;
  • The ambition to learn AI and machine learning.

Difficulty

  • Beginner

Curriculum (of the Learn AI Series):

Learn AI Series (#34) - ML Engineering - From Notebook to Production

You've spent 33 episodes building models. Linear regression, random forests, gradient boosting, SVMs, clustering, NLP pipelines, Bayesian methods, and sophisticated ensemble stacks. Every single one of them worked beautifully in a notebook. You loaded data, trained a model, printed an accuracy score, and moved on to the next technique. But here's what we haven't addressed yet: nothing you've built so far is production-ready.

A model that lives in a Jupyter notebook is a prototype. A proof of concept. A science experiment. A model that serves predictions to real users, handles messy real-world input, degrades gracefully under load, and alerts you when it stops working -- that is an ML system. The gap between the two is where most ML projects die. Industry estimates suggest that as many as 87% of ML models never make it to production. Eighty-seven percent! That's a staggering failure rate, and it's not because the models are bad -- it's because going from "notebook accuracy" to "deployed system" requires an entirely different set of engineering skills.

Today we bridge that gap. Not with a comprehensive MLOps textbook (that would take ten episodes by itself), but with the essential patterns and a working deployment you can actually build in an afternoon. We're going to take the scikit-learn pipeline skills from episode #16, the model evaluation from episode #13, and the feature engineering from episode #15, and turn them into something that serves real predictions to real users.

Here we go!

The notebook-to-production gap

Let me paint the picture, because I think a lot of people underestimate just how different production is from the notebook world we've been living in.

In a notebook, you load a CSV that sits quietly on disk. In production, data arrives continuously -- from APIs, databases, message queues, user interactions. The data has missing fields, invalid types, unexpected categories your model has never seen, and encoding issues that turn "Sao Paulo" into garbled bytes. Remember the data preparation challenges from episode #14? Multiply those by ten and add the constraint that you can't stop and fix things manually anymore.

In a notebook, your model processes one dataset at a time. In production, it might need to handle 1,000 requests per second, each with sub-100ms latency requirements. Your beautiful scikit-learn pipeline that takes 50ms per prediction works fine when you call it once. Under concurrent load from hundreds of users simultaneously, it chokes.

In a notebook, you retrain whenever you want. In production, data distributions shift. The model you trained on January data might perform terribly by March because user behavior changed, the economy shifted, or a new product category appeared that doesn't match anything in your training set. You won't notice unless you're actively watching -- and by the time someone complains, the model has been silently producing garbage for weeks.

In a notebook, you're the only user. In production, a bad prediction can lose money, damage reputation, or worse. That spam classifier from episode #32's Naive Bayes section? If it starts letting spam through because the spammers changed their vocabulary, your users will notice immediately.

These aren't theoretical problems. They're why ML engineering exists as a discipline, and why companies pay ML engineers very well to solve them ;-)

Model serialization: saving what you built

The first step toward production is embarrassingly simple but critically important: save the trained model to disk so you don't need to retrain it every time you want a prediction. We touched on this briefly in episode #16 with scikit-learn pipelines, but now it's central to everything else we're going to build.

import os
import joblib
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification

# Generate a dataset (stand-in for your real training data)
X, y = make_classification(
    n_samples=1000, n_features=20, random_state=42
)

# Build a pipeline: preprocessing + model together
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('model', GradientBoostingClassifier(
        n_estimators=100, random_state=42
    ))
])
pipe.fit(X, y)

# Save the ENTIRE pipeline -- preprocessing + model as one artifact
joblib.dump(pipe, 'model_pipeline.joblib')
print(f"Pipeline saved. File size: "
      f"{os.path.getsize('model_pipeline.joblib') / 1024:.1f} KB")

# Load and predict -- identical results, no retraining needed
loaded_pipe = joblib.load('model_pipeline.joblib')
preds = loaded_pipe.predict(X[:5])
probs = loaded_pipe.predict_proba(X[:5])
print(f"Predictions: {preds}")
print(f"Probabilities: {probs[:, 1].round(3)}")

# Verify: loaded model gives EXACT same results
original_preds = pipe.predict(X)
loaded_preds = loaded_pipe.predict(X)
print(f"Predictions match: {np.array_equal(original_preds, loaded_preds)}")

Critical detail here (and I've seen people get this wrong more times than I can count): save the entire pipeline, not just the model. If you save only the GradientBoostingClassifier without the StandardScaler, you'll need to separately track how the scaler was fitted -- the mean and standard deviation of every feature. Forget one parameter, and your production predictions silently diverge from your notebook results. The pipeline bundles everything together. One file, one artifact, no discrepancies.

import os

# Why pipelines, not just models
# BAD: saving model alone
joblib.dump(pipe['model'], 'model_only.joblib')

# To use model_only, you need the EXACT scaler state:
print(f"Scaler means (20 features):")
print(f"  {pipe['scaler'].mean_[:5].round(4)}...")
print(f"Scaler stds (20 features):")
print(f"  {pipe['scaler'].scale_[:5].round(4)}...")
print(f"\nForget ANY of these numbers and your "
      f"predictions are silently wrong")
print(f"\nPipeline approach: ONE file, ZERO chance of mismatch")

# Cleanup
os.remove('model_only.joblib')

Versioning your models

In production, you'll have multiple model versions over time. The model trained today might perform differently than the one trained last month. You need to track which version is currently serving, what data it was trained on, and what its validation metrics were:

import json
from datetime import datetime

def save_model_with_metadata(pipeline, train_score,
                              val_score, feature_names,
                              version_dir='models'):
    """Save model with metadata for production tracking."""
    os.makedirs(version_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    model_path = f'{version_dir}/model_{timestamp}.joblib'
    meta_path = f'{version_dir}/model_{timestamp}_meta.json'

    # Save the pipeline
    joblib.dump(pipeline, model_path)

    # Save metadata
    metadata = {
        'version': timestamp,
        'created_at': datetime.now().isoformat(),
        'train_score': float(train_score),
        'val_score': float(val_score),
        'n_features': len(feature_names),
        'feature_names': list(feature_names),
        'pipeline_steps': [name for name, _ in pipeline.steps],
        'model_params': pipeline['model'].get_params(),
    }

    with open(meta_path, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)

    print(f"Model saved: {model_path}")
    print(f"Metadata saved: {meta_path}")
    print(f"  Train score: {train_score:.4f}")
    print(f"  Val score:   {val_score:.4f}")
    return model_path

# Example usage
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)
pipe.fit(X_train, y_train)

train_acc = pipe.score(X_train, y_train)
val_acc = pipe.score(X_val, y_val)
feature_names = [f'feature_{i}' for i in range(X.shape[1])]

model_path = save_model_with_metadata(
    pipe, train_acc, val_acc, feature_names
)

This metadata file is your audit trail. When something goes wrong in production (and it will -- it always does), you can trace back to exactly which model version was serving, what its validation metrics were, and what data it was trained on. Without this, debugging production ML issues is like navigating in the dark.
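The flip side of saving is loading the right version at serving time. Here's a minimal sketch of picking the newest artifact; it assumes the model_YYYYMMDD_HHMMSS.joblib naming scheme from save_model_with_metadata above, so lexicographic order equals chronological order:

```python
import os
import json
import joblib

def load_latest_model(version_dir='models'):
    """Load the most recent model version plus its metadata.

    Relies on the timestamped filename convention, so sorting
    the filenames sorts the versions chronologically.
    """
    candidates = sorted(
        f for f in os.listdir(version_dir)
        if f.startswith('model_') and f.endswith('.joblib')
    )
    if not candidates:
        raise FileNotFoundError(f"No models in {version_dir}")

    latest = candidates[-1]
    pipeline = joblib.load(os.path.join(version_dir, latest))

    # Metadata file sits next to the model artifact
    meta_path = os.path.join(
        version_dir, latest.replace('.joblib', '_meta.json')
    )
    with open(meta_path) as f:
        metadata = json.load(f)
    return pipeline, metadata
```

At startup, a serving process can call this instead of hardcoding a filename, and log metadata['version'] with every prediction -- that's what makes the audit trail actually traceable.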

Batch vs real-time serving

There are two fundamental ways to serve predictions, and choosing the right one saves you an enormous amount of engineering headache:

Batch prediction: run the model periodically (hourly, daily) on all pending data, store the results in a database, and serve precomputed predictions on request. This is simpler, more robust, and sufficient for more use cases than most people realize. If your recommendation system updates every 6 hours, users won't notice -- Netflix's recommendations don't change every second either. The anomaly detection system from episode #26? Perfectly suited for batch -- run it every hour on new transactions, flag the suspicious ones, done.

Real-time inference: run the model on each incoming request and return the prediction immediately. Necessary when the input data only exists at request time (a user typing a search query, a payment being processed right now) or when freshness matters so much that even a few minutes of delay is unacceptable (fraud detection on a live payment).

# Batch prediction pattern
def batch_predict(model_path, new_data_path, output_path):
    """Run predictions on a batch of data and store results."""
    pipe = joblib.load(model_path)

    # In reality, this would be a database query or file read
    # Here we simulate with random data
    n_samples = 500
    X_new = np.random.randn(n_samples, 20)

    predictions = pipe.predict(X_new)
    probabilities = pipe.predict_proba(X_new)[:, 1]

    # Store results (in practice: write to database)
    results = []
    for i in range(n_samples):
        results.append({
            'sample_id': i,
            'prediction': int(predictions[i]),
            'probability': float(probabilities[i]),
            'timestamp': datetime.now().isoformat(),
        })

    with open(output_path, 'w') as f:
        json.dump(results, f)

    pos_rate = predictions.mean()
    print(f"Batch prediction complete:")
    print(f"  Samples processed: {n_samples}")
    print(f"  Positive rate: {pos_rate:.1%}")
    print(f"  Results saved to: {output_path}")

batch_predict('model_pipeline.joblib', 'new_data.csv',
              'predictions.json')

My honest advice (and I know this might sound boring): start with batch prediction. It's easier to build, easier to debug, easier to monitor, and sufficient for the vast majority of ML applications. Move to real-time only when the business case genuinely demands it. I've seen teams spend months building a real-time serving infrastructure for a model that could have been a daily cron job.
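For completeness, here's the serving side of the batch pattern: prediction becomes a lookup, not an inference call. A sketch assuming the predictions.json layout written above -- in a real system this index would be a database table keyed by ID:

```python
import json

def load_prediction_index(path='predictions.json'):
    """Load precomputed batch predictions into a dict
    for O(1) lookup by sample ID."""
    with open(path) as f:
        results = json.load(f)
    return {r['sample_id']: r for r in results}

def serve_precomputed(index, sample_id):
    """'Serving' here is just a dictionary lookup --
    no model in memory, no latency worries."""
    entry = index.get(sample_id)
    if entry is None:
        # The batch job hasn't covered this ID yet
        return {'status': 'pending'}
    return {
        'status': 'ok',
        'prediction': entry['prediction'],
        'probability': entry['probability'],
        'computed_at': entry['timestamp'],
    }
```

Notice how much operational surface disappears: no model loading, no input validation against a pipeline, no inference latency -- the hard work happened offline in batch_predict.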

Serving a model with FastAPI

For those cases where you DO need real-time serving, you need an API endpoint that receives input data and returns predictions. FastAPI is the standard Python choice -- fast, type-safe, and automatically generates API documentation. If you followed the Learn Python Series through episode #50 (FastAPI advanced validation), you already know the framework. Here we're applying it to model serving specifically.

# serve_model.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import joblib
import numpy as np
import time

app = FastAPI(
    title="ML Model API",
    description="Serves predictions from a trained scikit-learn pipeline"
)

# Load model at startup -- NOT on every request
pipe = joblib.load('model_pipeline.joblib')
print(f"Model loaded. Pipeline steps: "
      f"{[name for name, _ in pipe.steps]}")


class PredictionRequest(BaseModel):
    features: list[float] = Field(
        ..., min_length=20, max_length=20,
        description="Exactly 20 numerical features"
    )


class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    latency_ms: float


@app.post("/predict", response_model=PredictionResponse)
def predict(req: PredictionRequest):
    start = time.time()

    X = np.array(req.features).reshape(1, -1)
    if X.shape[1] != 20:
        raise HTTPException(
            400,
            f"Expected 20 features, got {X.shape[1]}"
        )

    pred = pipe.predict(X)[0]
    prob = pipe.predict_proba(X)[0].max()
    latency = (time.time() - start) * 1000

    return PredictionResponse(
        prediction=int(pred),
        probability=float(prob),
        latency_ms=round(latency, 2)
    )


@app.get("/health")
def health():
    """Health check endpoint for monitoring."""
    return {
        "status": "healthy",
        "model_type": type(pipe['model']).__name__,
        "n_features": pipe['model'].n_features_in_,
    }

Run this with uvicorn serve_model:app and you have a working API serving real-time predictions. The Pydantic models validate input automatically -- wrong types, wrong number of features, missing fields all get rejected before reaching your model. The HTTPException catches dimension mismatches. FastAPI handles concurrency, serialization, and API documentation out of the box. Visit /docs in your browser and you get an interactive Swagger UI where you can test your API without writing a single line of client code.

This is the skeleton. A real deployment adds authentication, rate limiting, logging, health checks, and container packaging (Docker). But the core pattern never changes: load model at startup, validate input, predict, return structured output.

# Test the API locally (simulated -- in practice you'd
# use requests or httpx to call the actual endpoint)

def simulate_api_call(features):
    """Simulate what the API endpoint does."""
    X = np.array(features).reshape(1, -1)
    pred = pipe.predict(X)[0]
    prob = pipe.predict_proba(X)[0].max()
    return {'prediction': int(pred), 'probability': float(prob)}

# Generate some test inputs
np.random.seed(42)
test_inputs = np.random.randn(5, 20)

print("Simulated API calls:")
for i, features in enumerate(test_inputs):
    result = simulate_api_call(features.tolist())
    print(f"  Request {i+1}: prediction={result['prediction']}, "
          f"probability={result['probability']:.3f}")

Feature stores: the consistency problem

Here's one of the most insidious production ML bugs, and I guarantee you'll encounter it if you work in ML long enough: training-serving skew. Your model was trained with features computed one way, but production computes them slightly differently. Maybe training used pandas fillna(0) for missing values while production uses a database COALESCE(value, -1). Maybe training computed "average transaction amount over 30 days" using Python's datetime while production uses SQL's DATE_SUB. The numbers are almost the same but not quite, and your model sees data it never learned from and silently produces garbage predictions.

The scary part? No error is raised. No exception. The model happily returns a prediction -- just a wrong one. And since the skew is usually small, the predictions look plausible. You might not notice for weeks or months until someone finally checks the actual performance metrics and discovers the model has been underperforming since Tuesday three weeks ago.

A feature store is a centralized system that guarantees the same feature computation logic is used in both training and serving. Think of it as a single source of truth for features: the feature store computes "user X's average transaction amount over 30 days" once, using one piece of code, and both your training pipeline and your serving endpoint read from it. Same computation, same result, zero skew.

For most projects, a full feature store (like Feast or Tecton) is overkill. The DIY version is much simpler and captures 90% of the benefit: write your feature engineering as a shared Python module that both your training script and your serving endpoint import.

# features.py -- shared between training and serving
# This is your SINGLE SOURCE OF TRUTH for feature computation

import numpy as np

def compute_features(raw_data):
    """Compute features from raw input data.

    CRITICAL: this function is used by BOTH:
    - training pipeline (train_model.py)
    - serving endpoint (serve_model.py)

    If you change feature logic here, it changes everywhere.
    If you add a feature, add it in BOTH places by... well,
    there's only one place. That's the whole point.
    """
    features = {}

    # Numeric features with safe transformations
    features['log_amount'] = np.log1p(
        max(0, raw_data.get('amount', 0))
    )
    features['hour_of_day'] = raw_data.get('hour', 12)
    features['is_weekend'] = (
        1 if raw_data.get('day_of_week', 0) >= 5 else 0
    )
    features['txn_count_30d'] = raw_data.get('txn_count_30d', 0)
    features['avg_amount_7d'] = raw_data.get('avg_amount_7d', 0)

    # Derived features
    amount = raw_data.get('amount', 0)
    avg = features['avg_amount_7d']
    features['amount_vs_avg'] = (
        amount / avg if avg > 0 else 0
    )

    # Return features in SORTED order (guarantees consistent
    # column ordering between training and serving)
    return np.array([features[k] for k in sorted(features)])


# Demonstrate consistency
raw_input_1 = {
    'amount': 150.0,
    'hour': 14,
    'day_of_week': 2,
    'txn_count_30d': 45,
    'avg_amount_7d': 120.0,
}

raw_input_2 = {
    'amount': 5000.0,
    'hour': 3,
    'day_of_week': 6,
    'txn_count_30d': 2,
    'avg_amount_7d': 80.0,
}

f1 = compute_features(raw_input_1)
f2 = compute_features(raw_input_2)
print(f"Feature vector 1: {f1.round(3)}")
print(f"Feature vector 2: {f2.round(3)}")
print(f"Feature count: {len(f1)}")
print(f"\nKey insight: sorted(features.keys()) guarantees "
      f"consistent column ordering")

The principle is simple and connects directly to what we learned in episode #14 about data preparation: feature computation logic should exist in exactly one place. If you change how a feature is calculated, it changes everywhere simultaneously. No chance of training with one version and serving with another. No chance of a database query computing something slightly different from your Python code.
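Concretely, the sharing is just an import plus two thin wrappers. A sketch -- the wrapper names are illustrative; the point is that both sides call the very same compute_features:

```python
# Both train_model.py and serve_model.py start with:
#   from features import compute_features

import numpy as np

def build_training_matrix(raw_records, compute_features):
    """Training side: featurize historical records row by row
    with the shared feature function."""
    return np.vstack([compute_features(r) for r in raw_records])

def featurize_request(raw_record, compute_features):
    """Serving side: one incoming record, same function,
    same feature ordering as training."""
    return compute_features(raw_record).reshape(1, -1)
```

Because both paths go through one function, a record featurized at serving time is numerically identical to the same record featurized at training time -- skew has nowhere to hide.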

Input validation: the first line of defense

Before the model even sees the data, you need to catch garbage input. The data preparation lessons from episode #14 apply here, but with a twist: in a notebook, you can stop and inspect bad data. In production, you need automated validation that rejects bad input immediately and returns a useful error message:

def validate_input(features, expected_dim=20):
    """Validate input features before prediction.
    Returns (is_valid, error_message)."""

    # Check type and shape
    try:
        X = np.array(features, dtype=float)
    except (ValueError, TypeError) as e:
        return False, f"Cannot convert to numeric: {e}"

    if X.ndim != 1:
        return False, f"Expected 1D array, got {X.ndim}D"

    if len(X) != expected_dim:
        return False, (f"Expected {expected_dim} features, "
                       f"got {len(X)}")

    # Check for NaN and infinity
    if np.any(np.isnan(X)):
        nan_idx = np.where(np.isnan(X))[0]
        return False, f"NaN values at indices: {nan_idx.tolist()}"

    if np.any(np.isinf(X)):
        inf_idx = np.where(np.isinf(X))[0]
        return False, f"Inf values at indices: {inf_idx.tolist()}"

    # Check for extreme outliers (optional but recommended)
    extreme = np.abs(X) > 1e6
    if np.any(extreme):
        ext_idx = np.where(extreme)[0]
        return False, (f"Extreme values at indices: "
                       f"{ext_idx.tolist()}")

    return True, "OK"


# Test with various inputs
test_cases = [
    ([1.0] * 20, "Valid input"),
    ([1.0] * 19, "Wrong dimension"),
    ([1.0] * 19 + [float('nan')], "Contains NaN"),
    ([1.0] * 19 + [float('inf')], "Contains infinity"),
    ([1.0] * 19 + [1e8], "Extreme value"),
]

print("Input validation tests:")
for features, description in test_cases:
    valid, msg = validate_input(features)
    status = "PASS" if valid else "REJECT"
    print(f"  [{status}] {description}: {msg}")

Model monitoring: knowing when things break

Deployed models degrade. Always. The question isn't whether -- it's when and how fast. Monitoring catches degradation before it costs you. There are three types of drift to watch for, and they're all connected to concepts we've covered before:

Data drift: the distribution of incoming features shifts away from training data. If your model was trained on data where the average transaction amount was $50 and production transactions suddenly average $500, the model is extrapolating into territory it never learned from (remember the bias-variance discussion from episode #13 -- extrapolation is where models break). Detect this by comparing feature distributions between training data and recent production data.

Concept drift: the relationship between features and target changes. Even if feature distributions stay the same, the correct predictions might change. User behavior evolves, markets shift, regulations change. The fraud model trained in 2023 might miss 2025 fraud patterns entirely -- not because the data looks different, but because what constitutes fraud has evolved. This is the nastiest type of drift because you can't detect it from input features alone. You need actual outcomes (ground truth labels) to compare against.

Prediction drift: the distribution of model outputs changes. If your model suddenly predicts "positive" for 80% of inputs when it historically predicted 50%, something is probably wrong -- either the data shifted or the model broke. This one you can detect without ground truth labels, which makes it your early warning system.

from scipy import stats

def check_feature_drift(train_features, recent_features,
                         feature_names=None, threshold=0.05):
    """Detect data drift using Kolmogorov-Smirnov test.
    Compares training feature distributions against recent
    production features. Returns alerts for drifted features."""
    n_features = train_features.shape[1]
    if feature_names is None:
        feature_names = [f'feature_{i}'
                         for i in range(n_features)]

    alerts = []
    for col in range(n_features):
        stat, p_value = stats.ks_2samp(
            train_features[:, col],
            recent_features[:, col]
        )
        if p_value < threshold:
            alerts.append({
                'feature': feature_names[col],
                'ks_statistic': round(stat, 4),
                'p_value': round(p_value, 6),
                'train_mean': round(
                    train_features[:, col].mean(), 3),
                'recent_mean': round(
                    recent_features[:, col].mean(), 3),
            })

    return alerts


# Simulate: training data vs production data with drift
np.random.seed(42)
X_train_ref = np.random.randn(1000, 5)

# Production data: features 0-2 are fine,
# features 3-4 have shifted
X_production = np.random.randn(200, 5)
X_production[:, 3] += 1.5  # mean shift in feature 3
X_production[:, 4] *= 3.0  # variance change in feature 4

feature_names = ['amount', 'hour', 'day', 'txn_count',
                 'avg_amount']
alerts = check_feature_drift(
    X_train_ref, X_production, feature_names
)

print(f"Drift detection results ({len(alerts)} alerts):\n")
if alerts:
    for alert in alerts:
        print(f"  DRIFT: {alert['feature']}")
        print(f"    KS statistic: {alert['ks_statistic']}")
        print(f"    p-value: {alert['p_value']}")
        print(f"    Train mean: {alert['train_mean']} -> "
              f"Recent mean: {alert['recent_mean']}")
        print()
else:
    print("  No drift detected")

The Kolmogorov-Smirnov test (which we've been using since our statistics discussions in episode #9) compares two distributions and returns a p-value indicating whether they're significantly different. Low p-value = significant drift = time to investigate. But be careful with the threshold -- with enough samples, even tiny (meaningless) differences become statistically significant. In practice, I combine statistical tests with practical checks: did the mean shift by more than 10%? Did the variance double? A statistically significant but practically irrelevant change doesn't need action.
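Those practical checks take only a few lines on top of the KS test. A sketch -- the 10% mean-shift and 2x variance thresholds are illustrative defaults you would tune per feature, not canonical values:

```python
import numpy as np
from scipy import stats

def drift_is_actionable(train_col, recent_col,
                        p_threshold=0.05,
                        mean_shift_pct=0.10,
                        var_ratio_limit=2.0):
    """Flag drift only when it is BOTH statistically
    significant AND practically large."""
    _, p_value = stats.ks_2samp(train_col, recent_col)
    if p_value >= p_threshold:
        return False  # not even statistically significant

    # Practical check 1: relative mean shift
    denom = max(abs(train_col.mean()), 1e-9)
    rel_shift = abs(recent_col.mean() - train_col.mean()) / denom
    big_mean_shift = rel_shift > mean_shift_pct

    # Practical check 2: variance blow-up or collapse
    ratio = recent_col.var() / max(train_col.var(), 1e-12)
    big_var_change = (ratio > var_ratio_limit
                      or ratio < 1 / var_ratio_limit)

    return big_mean_shift or big_var_change
```

With enough production traffic, almost every feature will eventually fail a pure significance test; gating on effect size keeps your alerts meaningful.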

Prediction monitoring

Monitoring the model's output distribution is simpler and catches many problems early:

def check_prediction_drift(historical_preds,
                            recent_preds,
                            threshold=0.05):
    """Compare recent prediction distribution against
    historical baseline."""
    hist_pos_rate = historical_preds.mean()
    recent_pos_rate = recent_preds.mean()

    # KS test on prediction probabilities
    stat, p_value = stats.ks_2samp(
        historical_preds, recent_preds
    )

    report = {
        'historical_positive_rate': round(hist_pos_rate, 4),
        'recent_positive_rate': round(recent_pos_rate, 4),
        'rate_change': round(
            recent_pos_rate - hist_pos_rate, 4),
        'ks_statistic': round(stat, 4),
        'p_value': round(p_value, 6),
        'drift_detected': p_value < threshold,
    }

    return report


# Simulate: model predictions over time
np.random.seed(42)
# Historical: ~45% positive rate
hist_probs = np.random.beta(2, 2.5, 1000)
# Recent: shifted to ~60% positive rate (something changed!)
recent_probs = np.random.beta(3, 2, 200)

report = check_prediction_drift(hist_probs, recent_probs)

print("Prediction drift report:")
for key, value in report.items():
    print(f"  {key}: {value}")

if report['drift_detected']:
    print(f"\n  WARNING: Prediction distribution has shifted!")
    print(f"  Positive rate changed from "
          f"{report['historical_positive_rate']:.1%} to "
          f"{report['recent_positive_rate']:.1%}")
    print(f"  --> Investigate: data drift? concept drift? "
          f"model bug?")

The monitoring loop

In production, you'd run these checks periodically (hourly, daily) and alert when drift is detected:

def monitoring_loop(model_path, train_reference,
                     prediction_log):
    """Complete monitoring check -- run this on a schedule."""
    pipe = joblib.load(model_path)
    checks = {'timestamp': datetime.now().isoformat()}

    # 1. Feature drift
    recent_features = prediction_log['features']
    drift_alerts = check_feature_drift(
        train_reference, recent_features
    )
    checks['feature_drift'] = {
        'n_drifted': len(drift_alerts),
        'drifted_features': [a['feature']
                              for a in drift_alerts],
    }

    # 2. Prediction drift
    pred_report = check_prediction_drift(
        prediction_log['historical_probs'],
        prediction_log['recent_probs']
    )
    checks['prediction_drift'] = pred_report

    # 3. Summary
    n_issues = (len(drift_alerts) +
                (1 if pred_report['drift_detected'] else 0))
    checks['status'] = ('OK' if n_issues == 0
                         else f'ALERT ({n_issues} issues)')

    return checks


# Simulate a monitoring run
monitoring_data = {
    'features': X_production,
    'historical_probs': hist_probs,
    'recent_probs': recent_probs,
}
results = monitoring_loop(
    'model_pipeline.joblib', X_train_ref, monitoring_data
)

print("Monitoring report:")
print(f"  Status: {results['status']}")
print(f"  Feature drift: "
      f"{results['feature_drift']['n_drifted']} features")
print(f"  Prediction drift: "
      f"{results['prediction_drift']['drift_detected']}")

When drift is confirmed, the response depends on severity: mild drift might just mean retraining on fresh data (which you should be doing periodically anyway). Severe drift might mean the problem itself has changed and you need to revisit your entire approach. The monitoring system tells you when something needs attention -- the decision about what to do is still yours. At least for now ;-)
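Even the "retrain on fresh data" response can be guarded so a retrain never silently makes things worse. A sketch of a promote-only-if-not-worse policy (the function name and arguments are illustrative):

```python
def retrain_and_maybe_promote(pipeline_factory,
                              X_fresh, y_fresh,
                              X_val, y_val,
                              current_val_score,
                              min_improvement=0.0):
    """Retrain a candidate on fresh data and promote it only
    if it matches or beats the current model on the same
    validation set."""
    candidate = pipeline_factory()
    candidate.fit(X_fresh, y_fresh)
    new_score = candidate.score(X_val, y_val)

    promote = new_score >= current_val_score + min_improvement
    return candidate, new_score, promote
```

If promote comes back True, save the candidate (with metadata, as earlier) and point serving at the new artifact; if False, keep the current model and investigate why the fresh data hurt.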

A/B testing ML models

When you train a new model version, how do you know it's actually better in production? Offline metrics (accuracy on a test set, the evaluation techniques from episode #13) don't always translate to online performance. The test set is a snapshot of past data; production data is live and evolving.

A/B testing splits production traffic between models: 90% sees the current model (control), 10% sees the new model (treatment). Compare business metrics -- click-through rate, conversion, revenue, whatever you actually care about -- between the two groups over days or weeks. Statistical significance testing tells you whether the difference is real or noise.

def simulate_ab_test(model_a_accuracy, model_b_accuracy,
                      n_samples=10000, traffic_split=0.1):
    """Simulate an A/B test between two model versions."""
    np.random.seed(42)

    n_b = int(n_samples * traffic_split)
    n_a = n_samples - n_b

    # Simulate predictions (bernoulli outcomes)
    results_a = np.random.binomial(1, model_a_accuracy, n_a)
    results_b = np.random.binomial(1, model_b_accuracy, n_b)

    rate_a = results_a.mean()
    rate_b = results_b.mean()

    # Two-proportion z-test
    pooled = ((results_a.sum() + results_b.sum()) /
              (n_a + n_b))
    se = np.sqrt(pooled * (1 - pooled) *
                 (1/n_a + 1/n_b))
    z_stat = (rate_b - rate_a) / se if se > 0 else 0
    p_value = 2 * (1 - stats.norm.cdf(abs(z_stat)))

    print(f"A/B Test Results:")
    print(f"  Model A (control): {rate_a:.4f} "
          f"({n_a} samples)")
    print(f"  Model B (treatment): {rate_b:.4f} "
          f"({n_b} samples)")
    print(f"  Difference: {rate_b - rate_a:+.4f}")
    print(f"  Z-statistic: {z_stat:.3f}")
    print(f"  P-value: {p_value:.4f}")

    if p_value < 0.05:
        winner = "B" if rate_b > rate_a else "A"
        print(f"  --> Statistically significant! "
              f"Model {winner} wins")
    else:
        print(f"  --> Not significant yet. Keep running.")

    return rate_a, rate_b, p_value


# Test: model B is slightly better (0.5% improvement)
simulate_ab_test(0.850, 0.855)

print()

# Test: model B is clearly better (2% improvement)
simulate_ab_test(0.850, 0.870)

The key insight: ML A/B tests need more patience than UI A/B tests. A button color change affects every page view equally. A model change might only affect edge cases that appear infrequently -- the cases where model A and model B disagree. If they agree on 95% of inputs, you need a LOT of traffic to detect a 0.5% improvement on the remaining 5%. Run the test longer than you think you need to.
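You can put a number on "longer than you think" before launching, using the standard two-proportion sample-size approximation (alpha=0.05 and power=0.80 are the conventional defaults; the formula mirrors the z-test used above):

```python
import numpy as np
from scipy import stats

def samples_per_arm(p_a, p_b, alpha=0.05, power=0.80):
    """Approximate samples needed PER ARM to detect the
    difference between success rates p_a and p_b with a
    two-sided two-proportion z-test."""
    z_alpha = stats.norm.ppf(1 - alpha / 2)   # ~1.96
    z_beta = stats.norm.ppf(power)            # ~0.84
    variance = p_a * (1 - p_a) + p_b * (1 - p_b)
    n = (z_alpha + z_beta) ** 2 * variance / (p_a - p_b) ** 2
    return int(np.ceil(n))

# The 0.5% improvement from the example above needs roughly
# 17x more traffic than the 2% improvement:
print(samples_per_arm(0.850, 0.855))  # ~79,000 per arm
print(samples_per_arm(0.850, 0.870))  # ~4,700 per arm
```

And remember that with a 90/10 split, the 10% treatment arm is the bottleneck -- total traffic requirements are correspondingly higher.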

(Having said that, there's a simpler alternative that many teams use first: shadow mode. The new model runs on production traffic but its predictions are logged, not served. You compare the shadow predictions against the live model's actual predictions and outcomes. No user exposure, no risk, but slower feedback because you can't measure downstream business impact directly.)
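The shadow pattern itself is just a few lines wrapped around the predict call. A sketch -- the log format is illustrative, and in practice you'd write to your real observability stack rather than a local file:

```python
import json
from datetime import datetime

def predict_with_shadow(live_model, shadow_model, X, log_path):
    """Serve the LIVE model's prediction; record the shadow
    model's prediction next to it for offline comparison."""
    live_pred = live_model.predict(X)[0]
    shadow_pred = shadow_model.predict(X)[0]

    record = {
        'timestamp': datetime.now().isoformat(),
        'live_prediction': int(live_pred),
        'shadow_prediction': int(shadow_pred),
        'agree': bool(live_pred == shadow_pred),
    }
    # Append-only log: one JSON object per line
    with open(log_path, 'a') as f:
        f.write(json.dumps(record) + '\n')

    # Only the live prediction ever reaches the user
    return live_pred
```

An offline job then aggregates the agreement rate and, once ground-truth labels arrive, compares the two models' actual accuracy -- that comparison is the input to your promotion decision.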

Putting it all together: a complete ML system

Let's assemble everything into a coherent picture. This is what a minimal but real production ML system looks like -- using nothing but the tools we've built across this series:

# ml_system.py -- minimal production ML system
# Combines: pipeline (ep16), evaluation (ep13),
# feature engineering (ep15), and everything from today

import os
import json
import joblib
import numpy as np
from datetime import datetime
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score
from sklearn.datasets import make_classification
from scipy import stats


class MLSystem:
    """Minimal production ML system.
    Handles training, saving, loading, predicting,
    and monitoring -- all in one place."""

    def __init__(self, model_dir='models'):
        self.model_dir = model_dir
        self.pipeline = None
        self.metadata = None
        self.train_features = None  # reference for drift
        os.makedirs(model_dir, exist_ok=True)

    def train(self, X, y, n_folds=5):
        """Train pipeline with cross-validation."""
        self.pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('model', GradientBoostingClassifier(
                n_estimators=100, random_state=42
            ))
        ])

        scores = cross_val_score(
            self.pipeline, X, y, cv=n_folds
        )
        self.pipeline.fit(X, y)
        self.train_features = X.copy()

        self.metadata = {
            'cv_mean': float(scores.mean()),
            'cv_std': float(scores.std()),
            'n_samples': len(X),
            'n_features': X.shape[1],
            'trained_at': datetime.now().isoformat(),
        }

        print(f"Model trained: {scores.mean():.4f} "
              f"+/- {scores.std():.4f} ({n_folds}-fold CV)")
        return scores

    def save(self, version_name=None):
        """Save model + metadata + training reference."""
        if version_name is None:
            version_name = datetime.now().strftime(
                '%Y%m%d_%H%M%S')

        prefix = f'{self.model_dir}/{version_name}'
        joblib.dump(self.pipeline, f'{prefix}_pipeline.joblib')
        joblib.dump(self.train_features,
                    f'{prefix}_train_ref.joblib')
        with open(f'{prefix}_meta.json', 'w') as f:
            json.dump(self.metadata, f, indent=2)

        print(f"Saved model version: {version_name}")

    def load(self, version_name):
        """Load a saved model version."""
        prefix = f'{self.model_dir}/{version_name}'
        self.pipeline = joblib.load(
            f'{prefix}_pipeline.joblib')
        self.train_features = joblib.load(
            f'{prefix}_train_ref.joblib')
        with open(f'{prefix}_meta.json') as f:
            self.metadata = json.load(f)
        print(f"Loaded model version: {version_name}")

    def predict(self, X):
        """Predict with input validation."""
        X = np.atleast_2d(X)
        if np.any(np.isnan(X)) or np.any(np.isinf(X)):
            raise ValueError("Input contains NaN or Inf")
        pred = self.pipeline.predict(X)
        prob = self.pipeline.predict_proba(X)
        return pred, prob

    def check_drift(self, recent_features, threshold=0.05):
        """Check for data drift against training reference."""
        alerts = []
        for col in range(self.train_features.shape[1]):
            stat, p_val = stats.ks_2samp(
                self.train_features[:, col],
                recent_features[:, col]
            )
            if p_val < threshold:
                alerts.append(
                    f"Feature {col}: KS={stat:.3f}, "
                    f"p={p_val:.4f}"
                )
        return alerts


# Full workflow demonstration
print("=== ML System Demo ===\n")

# 1. Generate data
X, y = make_classification(
    n_samples=1000, n_features=10,
    n_informative=6, random_state=42
)

# 2. Train
system = MLSystem()
system.train(X, y)

# 3. Save
system.save('v1')

# 4. Load (simulates cold start)
system2 = MLSystem()
system2.load('v1')

# 5. Predict
test_input = np.random.randn(3, 10)
preds, probs = system2.predict(test_input)
print(f"\nPredictions: {preds}")
print(f"Max probabilities: {probs.max(axis=1).round(3)}")

# 6. Monitor for drift
drifted_data = np.random.randn(200, 10) + 0.5
alerts = system2.check_drift(drifted_data)
print(f"\nDrift check: {len(alerts)} alerts")
for alert in alerts[:3]:
    print(f"  {alert}")

The honest reality of ML in production

Here's what nobody tells you in ML courses: the model is the easy part. Data pipelines, feature engineering, monitoring, testing, versioning, and incident response consume the vast majority of a production ML team's time. A 2015 Google paper, "Hidden Technical Debt in Machine Learning Systems", makes this point with a now-famous architecture diagram: the actual ML code is a tiny rectangle in the middle, dwarfed by everything around it -- configuration, data collection, feature extraction, analysis tools, process management, machine resource management, serving infrastructure, and monitoring.

This doesn't mean you should over-engineer from day one. Quite the opposite. Start simple: train a model, save it with joblib, serve it with FastAPI, log predictions to a file, check the logs weekly. Add complexity only when the scale demands it. Most ML systems never need Kubernetes, feature stores, or automated retraining pipelines. They need a well-trained model, a reliable API, and someone checking that it still works.
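The "log predictions to a file" step really can be this small. A sketch -- the function name and the JSON-lines format are my own choices, not a standard:

```python
import json
from datetime import datetime, timezone


def log_prediction(features, prediction, probability,
                   path='predictions.jsonl'):
    """Append one prediction as a JSON line -- enough to
    audit, debug, and feed later drift checks."""
    record = {
        'ts': datetime.now(timezone.utc).isoformat(),
        'features': list(features),
        'prediction': int(prediction),
        'probability': float(probability),
    }
    with open(path, 'a') as f:
        f.write(json.dumps(record) + '\n')
    return record


rec = log_prediction([0.1, -1.2, 3.4], 1, 0.92)
print(rec['prediction'], rec['probability'])
```

One append-only file like this is also the natural input for the weekly check: load the logged feature vectors, run them through something like `check_drift` from the MLSystem class above, and you have monitoring without any extra infrastructure.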

The progression looks like this:

  1. Notebook prototype -- prove the model works (episodes #1-33)
  2. Script + joblib -- save model, predict from a script
  3. API + monitoring -- FastAPI endpoint, basic drift checking
  4. Pipeline automation -- scheduled retraining, automated tests
  5. Full MLOps -- CI/CD for models, A/B testing, feature stores, monitoring dashboards

Most teams stop at step 3, and that's perfectly fine. Each step adds complexity, and each step needs justification. Don't jump to step 5 because a blog post told you to -- jump when step 3 starts breaking under load.

So, what have we learned?

We've gone from "model works in a notebook" to "model serves predictions in production." Here's the full picture:

  • The notebook-to-production gap kills most ML projects. A model isn't useful until it serves real predictions reliably. The gap is about engineering, not algorithm quality;
  • Save the entire pipeline (preprocessing + model) as one artifact with joblib. Never save the model alone. Include metadata (version, metrics, training timestamp) for your future debugging self;
  • Batch prediction is simpler and sufficient for most use cases -- only go real-time when freshness genuinely matters. Starting with batch saves you months of engineering headache;
  • FastAPI + joblib gives you a production API in under 50 lines. Input validation, concurrency, and documentation included. Add authentication and rate limiting when you grow;
  • Shared feature modules ensure training and serving compute features identically. One function, one place, zero skew. Full feature stores are for large organizations with complex feature pipelines;
  • Monitor for data drift (feature distributions shift), concept drift (the relationship between features and target changes), and prediction drift (model output distribution changes). Models always degrade -- the question is when;
  • A/B testing or shadow mode validates that offline improvements translate to production gains. ML A/B tests need more patience than UI tests because the differences are often small;
  • Start simple: save model, serve with API, log predictions, check logs. Add infrastructure only when scale demands it. Most ML systems never need full MLOps.

This episode wraps up the classical ML toolkit (well, almost -- we have one more topic to cover before the big mini-project). Over 34 episodes, we've gone from "what is ML?" to building, evaluating, combining, and now deploying sophisticated models. That's a LOT of ground covered, and every concept connects to the others in ways that keep revealing themselves as you gain experience. The data preparation from episode #14, the evaluation metrics from #13, the pipelines from #16, the feature engineering from #15 -- they all converge here in the production story.

The path forward connects everything we've done to the bigger picture: how to think about responsibility and fairness in the models we build, and then a complete mini-project that ties all these individual techniques together into one coherent ML pipeline from raw data to deployed prediction.

Thanks, and until next time!

@scipio


