Learn AI Series (#91) - Mini Project - Building a Visual AI System

What will I learn

  • building an end-to-end visual AI pipeline that combines detection, OCR, embeddings, and captioning;
  • connecting object detection, text extraction, visual similarity search, and image captioning into one unified system;
  • model optimization strategies for practical deployment: concurrency, batching, and model selection tradeoffs;
  • building a visual search API with FastAPI;
  • performance benchmarking with percentile latencies instead of misleading averages;
  • tying together all Arc 5 concepts into a single working application.

Requirements

  • A working modern computer running macOS, Windows or Ubuntu;
  • An installed Python 3(.11+) distribution;
  • The ambition to learn AI and machine learning.

Difficulty

  • Beginner

Curriculum (of the Learn AI Series):

Learn AI Series (#91) - Mini Project - Building a Visual AI System

Solutions to Episode #90 Exercises

Exercise 1: Contrastive learning augmentation analyzer.

import numpy as np
from scipy.ndimage import gaussian_filter


class AugmentationAnalyzer:
    """Analyze overlap between augmented views
    for contrastive learning."""

    def __init__(self, size=64, seed=42):
        self.size = size
        self.rng = np.random.RandomState(seed)
        self.original = self._make_image()

    def _make_image(self):
        img = np.full(
            (self.size, self.size, 3),
            128, dtype=np.float64)
        y, x = np.ogrid[:self.size, :self.size]

        # Red circle top-left
        mask = ((x - 16) ** 2
                + (y - 16) ** 2) < 10 ** 2
        img[mask] = [220, 40, 40]

        # Blue rectangle bottom-right
        img[40:58, 38:58] = [40, 40, 220]

        # Green diagonal stripe
        for i in range(self.size):
            lo = max(0, i - 3)
            hi = min(self.size, i + 4)
            img[lo:hi, i] = [40, 200, 40]
        return img

    def aug_crop(self, img):
        s = self.size
        y0 = self.rng.randint(0, s - 48)
        x0 = self.rng.randint(0, s - 48)
        crop = img[y0:y0 + 48, x0:x0 + 48]
        # Nearest-neighbor resize to 64x64
        out = np.zeros_like(img)
        for r in range(s):
            for c in range(s):
                sr = int(r * 48 / s)
                sc = int(c * 48 / s)
                out[r, c] = crop[sr, sc]
        return out

    def aug_flip(self, img):
        return img[:, ::-1].copy()

    def aug_brightness(self, img):
        factor = 1.0 + self.rng.uniform(
            -0.2, 0.2)
        return np.clip(img * factor, 0, 255)

    def aug_blur(self, img):
        out = np.zeros_like(img)
        for c in range(3):
            out[:, :, c] = gaussian_filter(
                img[:, :, c], sigma=2.0)
        return out

    def aug_grayscale(self, img):
        gray = img.mean(axis=2, keepdims=True)
        return np.repeat(gray, 3, axis=2)

    def overlap(self, a, b):
        af = a.flatten()
        bf = b.flatten()
        if af.std() < 1e-8 or bf.std() < 1e-8:
            return 0.0
        return float(np.corrcoef(af, bf)[0, 1])

    def run(self):
        augs = {
            "crop": self.aug_crop,
            "flip": self.aug_flip,
            "bright": self.aug_brightness,
            "blur": self.aug_blur,
            "gray": self.aug_grayscale,
        }
        names = list(augs.keys())
        n = len(names)
        trials = 20
        matrix = np.zeros((n, n))

        for t in range(trials):
            views = {}
            for name, fn in augs.items():
                views[name] = fn(
                    self.original.copy())
            for i in range(n):
                for j in range(n):
                    matrix[i, j] += self.overlap(
                        views[names[i]],
                        views[names[j]])
        matrix /= trials

        header = "       " + " ".join(
            f"{n[:5]:>6}" for n in names)
        print(header)
        for i, name in enumerate(names):
            row = f"{name[:5]:>6} " + " ".join(
                f"{matrix[i, j]:>6.3f}"
                for j in range(n))
            print(row)

        best_i, best_j = 0, 1
        worst_i, worst_j = 0, 1
        best_val = -1
        worst_val = 2
        for i in range(n):
            for j in range(i + 1, n):
                if matrix[i, j] > best_val:
                    best_val = matrix[i, j]
                    best_i, best_j = i, j
                if matrix[i, j] < worst_val:
                    worst_val = matrix[i, j]
                    worst_i, worst_j = i, j

        print(f"\nHighest overlap: "
              f"{names[best_i]}/{names[best_j]}"
              f" = {best_val:.3f}")
        print(f"Lowest overlap:  "
              f"{names[worst_i]}/"
              f"{names[worst_j]}"
              f" = {worst_val:.3f}")


analyzer = AugmentationAnalyzer()
analyzer.run()

The diagonal entries all print as 1.000, and that includes crop: within a trial each augmentation is applied once and the resulting view is compared against itself, so the random crop region only varies between trials, never within a single comparison. The interesting numbers are off the diagonal. The lowest overlap pairs involve grayscale combined with crop or brightness -- grayscale destroys all color information (our image's main distinguishing features are color-coded), and crop additionally changes spatial content. Brightness and blur produce the highest cross-overlap because they preserve both spatial structure and relative color relationships. This directly illustrates why SimCLR uses aggressive composition of multiple augmentations: any single augmentation leaves too much mutual information between the views, which makes the contrastive task so easy that the model doesn't need to learn useful representations to solve it.

Exercise 2: Momentum encoder dynamics simulator.

import numpy as np


class MomentumSimulator:
    """Simulate momentum encoder (EMA) dynamics
    for different momentum values."""

    def __init__(self, dim=100, steps=500,
                 update_std=0.1, seed=42):
        self.dim = dim
        self.steps = steps
        self.update_std = update_std
        self.rng = np.random.RandomState(seed)

    def simulate(self, momentum):
        online = np.zeros(self.dim)
        target = np.ones(self.dim)
        online_history = [online.copy()]
        distances = []
        target_vars = []

        for step in range(self.steps):
            # Gradient update (simulated)
            online = online + self.rng.randn(
                self.dim) * self.update_std
            online_history.append(online.copy())

            # EMA update
            target = (momentum * target
                      + (1 - momentum) * online)

            dist = np.sqrt(
                ((online - target) ** 2).sum())
            distances.append(dist)
            target_vars.append(target.var())

        # Estimate staleness: find which
        # historical online state is closest
        # to current target
        final_target = target
        min_dist = float('inf')
        stale_step = 0
        for i, hist in enumerate(
                online_history[-50:]):
            d = np.sqrt(
                ((hist - final_target) ** 2
                 ).sum())
            if d < min_dist:
                min_dist = d
                # Index into the full history
                # (it has steps + 1 entries)
                stale_step = (
                    len(online_history) - 50 + i)

        staleness = self.steps - stale_step
        return {
            "momentum": momentum,
            "avg_dist": np.mean(distances[-100:]),
            "final_dist": distances[-1],
            "avg_var": np.mean(
                target_vars[-100:]),
            "staleness": staleness,
        }

    def run(self):
        momentums = [0.9, 0.99, 0.999, 0.9999]
        print(f"{'m':>8} {'AvgDist':>9} "
              f"{'FinalDist':>10} "
              f"{'AvgVar':>9} {'Stale':>6}")
        print("-" * 46)

        for m in momentums:
            r = self.simulate(m)
            print(f"{r['momentum']:>8.4f} "
                  f"{r['avg_dist']:>9.3f} "
                  f"{r['final_dist']:>10.3f} "
                  f"{r['avg_var']:>9.5f} "
                  f"{r['staleness']:>5}s")

        print(f"\nHigher momentum = more stable "
              f"(lower variance)")
        print(f"Higher momentum = more stale "
              f"(farther behind)")
        print(f"m=0.999 balances stability "
              f"and freshness")


sim = MomentumSimulator()
sim.run()

At m=0.9, the momentum encoder tracks the online encoder closely (low distance) but its parameters fluctuate heavily (high variance) because each update contributes 10% of the current online state. At m=0.9999, the target is incredibly stable (near-zero variance) but lags far behind the online encoder -- it's essentially seeing a heavily smoothed version of where the online encoder was many updates ago. Keep in mind that the staleness column only searches the last 50 online states, so it saturates at roughly 50 steps and understates the lag for the highest momentum values. m=0.999 (MoCo's default) hits the sweet spot: the target encoder is stable enough to provide consistent representations for the negative queue, but fresh enough that its representations aren't completely out of date. This tradeoff is exactly why the momentum coefficient matters so much in practice -- too low and the target is noisy, too high and you're comparing against stale representations.
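To put rough numbers on the staleness side of that tradeoff: in an EMA with coefficient m, the online state from k steps ago carries weight (1 - m) * m^k, so the averaging window is about 1 / (1 - m) steps. The sketch below computes that window and the half-life for the four momentum values from the exercise. The helper names `ema_horizon` and `ema_half_life` are illustrative and not part of the exercise code.

```python
import math


def ema_horizon(momentum):
    """Approximate number of steps the EMA averages over."""
    return 1.0 / (1.0 - momentum)


def ema_half_life(momentum):
    """Steps until the weight of an old state has halved."""
    return math.log(0.5) / math.log(momentum)


for m in [0.9, 0.99, 0.999, 0.9999]:
    print(f"m={m}: horizon ~{ema_horizon(m):.0f} steps, "
          f"half-life ~{ema_half_life(m):.1f} steps")
```

With only 500 simulated steps, a horizon of about 10,000 steps means the m=0.9999 target is still dominated by its initial value, which is worth remembering when reading the last row of the table.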

Exercise 3: Masking strategy comparator for MAE.

import numpy as np


class MaskingComparator:
    """Compare random, block, and grid masking
    strategies for MAE-style training."""

    def __init__(self, grid_size=8, seed=42):
        self.gs = grid_size
        self.total = grid_size ** 2
        self.rng = np.random.RandomState(seed)

    def _positions(self):
        return np.array([(r, c)
            for r in range(self.gs)
            for c in range(self.gs)])

    def random_mask(self, ratio):
        n_mask = int(self.total * ratio)
        indices = self.rng.permutation(
            self.total)
        masked = set(indices[:n_mask])
        return masked

    def block_mask(self, ratio):
        n_mask = int(self.total * ratio)
        side = int(np.ceil(np.sqrt(n_mask)))
        side = min(side, self.gs)
        r0 = self.rng.randint(
            0, max(self.gs - side + 1, 1))
        c0 = self.rng.randint(
            0, max(self.gs - side + 1, 1))
        masked = set()
        for r in range(r0, min(
                r0 + side, self.gs)):
            for c in range(c0, min(
                    c0 + side, self.gs)):
                masked.add(r * self.gs + c)
                if len(masked) >= n_mask:
                    return masked
        # Fill remaining randomly
        remaining = [i for i in range(
            self.total) if i not in masked]
        self.rng.shuffle(remaining)
        for idx in remaining:
            masked.add(idx)
            if len(masked) >= n_mask:
                break
        return masked

    def grid_mask(self, ratio):
        n_mask = int(self.total * ratio)
        # Round instead of truncating, so that
        # ratio 0.5 gives step 2 and 0.75 gives 4
        step = max(1, round(
            1 / max(1 - ratio, 1e-8)))
        visible = set(range(0, self.total, step))
        all_idx = set(range(self.total))
        masked = all_idx - visible
        # Adjust to exact count
        if len(masked) < n_mask:
            extra = list(visible)
            self.rng.shuffle(extra)
            for idx in extra:
                masked.add(idx)
                if len(masked) >= n_mask:
                    break
        elif len(masked) > n_mask:
            remove = list(masked)
            self.rng.shuffle(remove)
            for idx in remove:
                masked.discard(idx)
                if len(masked) <= n_mask:
                    break
        return masked

    def analyze(self, masked_set):
        positions = self._positions()
        visible = [i for i in range(self.total)
                   if i not in masked_set]
        masked = list(masked_set)
        n_vis = len(visible)

        # Average distance between visible
        vis_pos = positions[visible]
        if n_vis > 1:
            dists = []
            for i in range(n_vis):
                for j in range(i + 1, n_vis):
                    d = np.sqrt(
                        ((vis_pos[i] - vis_pos[j])
                         ** 2).sum())
                    dists.append(d)
            avg_vis_dist = np.mean(dists)
        else:
            avg_vis_dist = 0

        # Coverage uniformity (4 quadrants)
        half = self.gs // 2
        quads = [0, 0, 0, 0]
        for idx in visible:
            r, c = positions[idx]
            q = (0 if r < half else 2) + (
                0 if c < half else 1)
            quads[q] += 1
        uniformity = np.std(quads)

        # Reconstruction difficulty
        if n_vis > 0 and len(masked) > 0:
            vis_pos_arr = positions[visible]
            recon_dists = []
            for m_idx in masked:
                mp = positions[m_idx]
                d = np.sqrt(
                    ((vis_pos_arr - mp) ** 2
                     ).sum(axis=1))
                recon_dists.append(d.min())
            recon_diff = np.mean(recon_dists)
        else:
            recon_diff = 0

        return {
            "n_visible": n_vis,
            "avg_vis_dist": avg_vis_dist,
            "uniformity": uniformity,
            "recon_diff": recon_diff,
        }

    def run(self):
        strategies = {
            "random": self.random_mask,
            "block": self.block_mask,
            "grid": self.grid_mask,
        }
        ratios = [0.25, 0.50, 0.75, 0.90]
        trials = 100

        print(f"{'Strat':>7} {'Ratio':>6} "
              f"{'Vis':>4} {'VisDist':>8} "
              f"{'Uniform':>8} {'ReconD':>7}")
        print("-" * 44)

        for name, fn in strategies.items():
            for ratio in ratios:
                totals = {"n_visible": 0,
                          "avg_vis_dist": 0,
                          "uniformity": 0,
                          "recon_diff": 0}
                for _ in range(trials):
                    m = fn(ratio)
                    r = self.analyze(m)
                    for k in totals:
                        totals[k] += r[k]
                for k in totals:
                    totals[k] /= trials

                print(
                    f"{name:>7} {ratio:>6.2f} "
                    f"{totals['n_visible']:>4.0f} "
                    f"{totals['avg_vis_dist']:>8.2f}"
                    f" {totals['uniformity']:>8.2f}"
                    f" {totals['recon_diff']:>7.2f}"
                )
            print()


comp = MaskingComparator()
comp.run()

Random masking distributes visible patches evenly across the grid (low uniformity score, meaning the quadrant counts are balanced), which is why MAE uses it -- the model sees information from everywhere and has to reconstruct local structure. Block masking creates regions where many neighboring patches are all masked simultaneously, forcing the model to hallucinate large contiguous areas (high reconstruction difficulty locally), but leaves other parts of the image fully visible (poor coverage uniformity). Grid masking produces perfect uniformity (exactly the same number of visible patches in each quadrant) but trivially low reconstruction difficulty because every masked patch has a visible neighbor at a fixed, predictable distance -- the model can learn a simple interpolation shortcut instead of understanding semantics. At 75% masking, random is the clear winner for training a useful encoder, which is why Kaiming He's original MAE paper landed on exactly this combination.

On to today's episode

Here we go! Ninety-one episodes in, and it's time for the fifth mini project in this series. We've done these before -- predicting crypto market regimes (#21), building a complete ML pipeline (#36), building a transformer from scratch (#56), and building your own AI assistant (#76). Each one pulled together everything from its preceding arc. This one does the same for Arc 5: computer vision.

Over the past fourteen episodes (#77-90) we covered a LOT of ground. Image processing fundamentals, object detection (two parts!), segmentation, pose estimation, OCR, video understanding, diffusion models (two parts!), image editing, 3D vision, face analysis, medical imaging, and self-supervised learning. Each episode focused on one technique in isolation. But real visual AI systems don't use one model -- they combine multiple models into a pipeline that extracts complementary information from the same image.

That's what we're building today. A visual content analysis API that takes an image and returns: every object it detects, all the text it can read, a set of visually similar images from a database, and a natural language description of the scene. It's the kind of system that powers reverse image search, content moderation, visual commerce, and photo organization -- and we're building it from components we've already studied ;-)

The system architecture

Before writing any code, let's map out what we're building. Four independent models feed into one structured output:

Image input
    |
    v
+-----------------------------------+
|  1. Object Detection (YOLO)       | --> bounding boxes + class labels
|  2. OCR (text extraction)         | --> recognized text + positions
|  3. Visual Embedding (DINOv2)     | --> 384-dim feature vector
|  4. Image Captioning (BLIP)       | --> natural language description
+-----------------------------------+
    |
    v
Structured JSON response
  + similarity search results from FAISS index

Each component handles a different aspect of visual understanding. YOLO (episode #79) finds objects. OCR (episode #82) reads text. DINOv2 (episode #90) produces embeddings for similarity search. BLIP (episode #75) generates descriptions. Together, they extract far more than any single model could.

The key architectural insight: these four models are independent. Detection doesn't depend on OCR results. Captioning doesn't need the embedding. This independence means we can run them concurrently -- a big deal for latency.

Component 1: object detection with YOLO

YOLO gives us fast, accurate object detection. We wrap it in a clean class that returns structured results:

from ultralytics import YOLO
import numpy as np


class ObjectDetector:
    """Detect objects using YOLOv8.
    Returns list of dicts with label,
    confidence, and bounding box."""

    def __init__(self, model_name="yolov8n.pt"):
        self.model = YOLO(model_name)

    def detect(self, image_path,
               confidence=0.4):
        results = self.model(
            image_path, conf=confidence,
            verbose=False)
        detections = []
        for r in results:
            for box in r.boxes:
                detections.append({
                    "label": r.names[
                        int(box.cls)],
                    "confidence": float(
                        box.conf),
                    "bbox": box.xyxy[0].tolist(),
                })
        return detections


detector = ObjectDetector()
objects = detector.detect("street_scene.jpg")
for obj in objects:
    print(f"  {obj['label']}: "
          f"{obj['confidence']:.2f} at "
          f"{[int(x) for x in obj['bbox']]}")

We use YOLOv8 nano for speed. For higher accuracy at the cost of latency, swap yolov8n.pt for yolov8m.pt (medium) or yolov8l.pt (large) -- same API, just a different checkpoint file. The nano model detects 80 COCO classes (people, cars, dogs, chairs, bottles, the usual household and street objects) in single-digit milliseconds on a GPU. That's fast enough for real-time processing.
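Because detect() returns plain dicts, downstream code can post-process them without touching the model. A minimal sketch, assuming the dict layout shown above (label, confidence, bbox in xyxy order): the hypothetical helper `summarize_detections` filters by a stricter confidence threshold, counts labels, and computes box areas. The sample data is hand-written, not real model output.

```python
from collections import Counter


def summarize_detections(detections, min_conf=0.5):
    """Count labels and compute box areas for confident detections."""
    kept = [d for d in detections if d["confidence"] >= min_conf]
    counts = Counter(d["label"] for d in kept)
    areas = []
    for d in kept:
        # xyxy: top-left corner followed by bottom-right corner
        x1, y1, x2, y2 = d["bbox"]
        areas.append((d["label"], (x2 - x1) * (y2 - y1)))
    return dict(counts), areas


# Hand-written sample in the same shape detect() returns
sample = [
    {"label": "person", "confidence": 0.91,
     "bbox": [10.0, 20.0, 110.0, 220.0]},
    {"label": "person", "confidence": 0.45,
     "bbox": [300.0, 40.0, 350.0, 140.0]},
    {"label": "car", "confidence": 0.78,
     "bbox": [150.0, 100.0, 400.0, 250.0]},
]
counts, areas = summarize_detections(sample)
print(counts)
print(areas)
```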

Component 2: text extraction

OCR (episode #82) handles text detection and recognition. PaddleOCR is our weapon of choice because it handles detection and recognition in one call:

from paddleocr import PaddleOCR


class TextExtractor:
    """Extract text from images using
    PaddleOCR. Returns text strings with
    confidence scores and positions."""

    def __init__(self, lang="en"):
        self.ocr = PaddleOCR(
            use_angle_cls=True,
            lang=lang, show_log=False)

    def extract(self, image_path):
        results = self.ocr.ocr(
            image_path, cls=True)
        texts = []
        if results and results[0]:
            for line in results[0]:
                texts.append({
                    "text": line[1][0],
                    "confidence": float(
                        line[1][1]),
                    "bbox": line[0],
                })
        return texts


ocr = TextExtractor()
texts = ocr.extract("sign_photo.jpg")
for t in texts:
    print(f"  '{t['text']}' "
          f"(conf: {t['confidence']:.3f})")

PaddleOCR's use_angle_cls=True handles rotated text -- useful for photos of signs, documents, or product labels where text isn't always horizontal. The bounding box it returns is a 4-point polygon (not a simple rectangle), which more accurately represents the text region when the text is at an angle.
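If the rest of your code expects the same xyxy rectangles the detector returns, the polygon can be reduced to its enclosing axis-aligned box. A small sketch, assuming the polygon arrives as four [x, y] pairs as described above; `polygon_to_rect` is a hypothetical helper and the coordinates are made up.

```python
def polygon_to_rect(polygon):
    """Convert [[x, y], ...] to [x_min, y_min, x_max, y_max]."""
    xs = [p[0] for p in polygon]
    ys = [p[1] for p in polygon]
    return [min(xs), min(ys), max(xs), max(ys)]


# A slightly rotated text region (made-up coordinates)
poly = [[12.0, 30.0], [118.0, 22.0],
        [120.0, 48.0], [14.0, 56.0]]
rect = polygon_to_rect(poly)
print(rect)
```

The enclosing box is always at least as large as the polygon, so for strongly rotated text it will include some background around the characters.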

Component 3: visual embedding and similarity search

This is where self-supervised learning from last episode becomes practical. DINOv2 produces powerful visual embeddings without any fine-tuning, and FAISS (we used it in episode #63 for text embeddings) enables efficient similarity search:

import torch
import faiss
import numpy as np
from torchvision import transforms
from PIL import Image


class VisualSearchEngine:
    """Visual similarity search using DINOv2
    embeddings and FAISS indexing."""

    def __init__(self, embed_dim=384):
        self.model = torch.hub.load(
            "facebookresearch/dinov2",
            "dinov2_vits14")
        self.model.eval()
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]),
        ])
        # Inner product on normalized vectors
        # = cosine similarity
        self.index = faiss.IndexFlatIP(
            embed_dim)
        self.metadata = []

    def embed(self, image):
        tensor = self.transform(
            image).unsqueeze(0)
        with torch.no_grad():
            features = self.model(tensor)
        features = features / features.norm(
            dim=1, keepdim=True)
        return features.numpy()

    def add_to_index(self, image, meta):
        embedding = self.embed(image)
        self.index.add(
            embedding.astype(np.float32))
        self.metadata.append(meta)

    def search(self, query_image, k=5):
        query = self.embed(
            query_image).astype(np.float32)
        scores, indices = self.index.search(
            query, k)
        results = []
        for score, idx in zip(
                scores[0], indices[0]):
            # FAISS pads missing results with -1
            if 0 <= idx < len(self.metadata):
                results.append({
                    "metadata": self.metadata[
                        idx],
                    "similarity": float(score),
                })
        return results

The IndexFlatIP computes exact inner product (which equals cosine similarity for L2-normalized vectors). For a database of thousands of images, this is fast enough. For millions, you'd swap to IndexIVFFlat or IndexHNSWFlat for approximate nearest neighbor search -- a tradeoff between search speed and recall that we covered in episode #63.
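The equivalence between inner product and cosine similarity is easy to check by hand. The sketch below is a pure-Python stand-in for an exhaustive inner-product search over normalized vectors; it uses tiny made-up 3-dimensional vectors instead of real 384-dimensional embeddings, and the function names are illustrative rather than FAISS API.

```python
import math


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vec):
    norm = math.sqrt(dot(vec, vec))
    return [v / norm for v in vec]


def cosine(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def top_k(query, database, k=2):
    """Brute-force inner-product search over normalized vectors."""
    q = l2_normalize(query)
    scored = [(dot(q, l2_normalize(v)), i)
              for i, v in enumerate(database)]
    scored.sort(reverse=True)
    return scored[:k]


db = [[1.0, 0.0, 0.0], [0.9, 0.1, 0.0], [0.0, 1.0, 0.0]]
query = [2.0, 0.0, 0.0]  # same direction as db[0], different length
hits = top_k(query, db, k=2)
print(hits)
```

The query has twice the length of db[0] but still scores 1.0 against it, because normalization removes magnitude and leaves only direction.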

Why DINOv2 specifically? Because it was trained with self-supervised learning on 142 million curated images (the LVD-142M dataset) and produces representations that generalize to virtually any visual domain without fine-tuning. A DINOv2 embedding captures what is in the image at a semantic level -- it knows that two photos of dogs in different poses are similar, even though their pixel values are completely different. That's the power of the self-supervised pre-training we just studied.

Component 4: image captioning

A vision-language model produces a natural language description. BLIP (Bootstrapping Language-Image Pre-training) is a solid choice -- lightweight enough for CPU inference, accurate enough for production:

import torch
from PIL import Image
from transformers import BlipProcessor
from transformers import (
    BlipForConditionalGeneration)


class ImageCaptioner:
    """Generate natural language descriptions
    of images using BLIP."""

    def __init__(self):
        name = ("Salesforce/"
                "blip-image-captioning-base")
        self.processor = (
            BlipProcessor.from_pretrained(name))
        self.model = (
            BlipForConditionalGeneration
            .from_pretrained(name))
        self.model.eval()

    def caption(self, image):
        inputs = self.processor(
            image, return_tensors="pt")
        with torch.no_grad():
            output = self.model.generate(
                **inputs, max_length=50)
        return self.processor.decode(
            output[0],
            skip_special_tokens=True)


captioner = ImageCaptioner()
img = Image.open("beach.jpg")
description = captioner.caption(img)
print(f"Caption: {description}")

BLIP generates captions like "a dog running on a sandy beach with waves in the background" -- not poetry, but structured and accurate. For more detailed descriptions, you could swap to BLIP-2 or CoCa, which produce longer and more nuanced text at the cost of higher latency and memory.

Combining everything: the analysis pipeline

Now we wire the four components into a single pipeline class. Each image goes through all four models, and we time every stage:

from PIL import Image
import time


class VisualAnalysisPipeline:
    """Full visual analysis pipeline:
    detection + OCR + embedding search
    + captioning."""

    def __init__(self):
        print("Loading models...")
        t0 = time.time()
        self.detector = ObjectDetector()
        self.ocr = TextExtractor()
        self.search = VisualSearchEngine()
        self.captioner = ImageCaptioner()
        print(f"All models loaded in "
              f"{time.time() - t0:.1f}s")

    def analyze(self, image_path):
        image = Image.open(
            image_path).convert("RGB")
        result = {
            "image": image_path,
            "timings": {},
        }

        # Object detection
        t0 = time.time()
        result["objects"] = (
            self.detector.detect(image_path))
        result["timings"]["detection"] = (
            time.time() - t0)

        # Text extraction
        t0 = time.time()
        result["text"] = (
            self.ocr.extract(image_path))
        result["timings"]["ocr"] = (
            time.time() - t0)

        # Similarity search
        t0 = time.time()
        result["similar"] = (
            self.search.search(image, k=5))
        result["timings"]["search"] = (
            time.time() - t0)

        # Captioning
        t0 = time.time()
        result["caption"] = (
            self.captioner.caption(image))
        result["timings"]["captioning"] = (
            time.time() - t0)

        result["timings"]["total"] = sum(
            result["timings"].values())
        return result


pipeline = VisualAnalysisPipeline()
analysis = pipeline.analyze("sample.jpg")

print(f"\nCaption: {analysis['caption']}")
print(f"Objects: {[o['label'] for o in analysis['objects']]}")
print(f"Text: {[t['text'] for t in analysis['text']]}")
print(f"Similar: {len(analysis['similar'])} results")
print(f"\nTimings:")
for comp, t in analysis["timings"].items():
    print(f"  {comp:12s}: {t * 1000:.1f}ms")

The model loading happens once at startup (takes a few seconds for all four models). After that, each analyze() call only runs inference -- much faster. This is a critical pattern for any ML serving system: load models once, serve many requests.
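One common way to get the load-once behavior is to hide the model behind a cached factory function. A minimal sketch using only the standard library: `FakeModel` is a stand-in whose sleep mimics slow weight loading, and `get_model` is a hypothetical accessor, not something from the pipeline above.

```python
import functools
import time


class FakeModel:
    """Stand-in for a real model with slow weight loading."""
    load_count = 0

    def __init__(self):
        time.sleep(0.05)  # pretend to read weights from disk
        FakeModel.load_count += 1

    def predict(self, x):
        return x * 2


@functools.lru_cache(maxsize=1)
def get_model():
    """Create the model on first call, reuse it afterwards."""
    return FakeModel()


results = [get_model().predict(i) for i in range(5)]
print(results, "loads:", FakeModel.load_count)
```

Five predictions trigger a single load. The pipeline class above achieves the same thing by constructing all four models in `__init__` and keeping the instance alive for the lifetime of the process.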

Serving as an API

FastAPI (we touched on deployment in episode #34) wraps the pipeline in HTTP endpoints:

from fastapi import FastAPI, UploadFile
from fastapi.responses import JSONResponse
import tempfile
import os

app = FastAPI(
    title="Visual AI Analysis API")
pipeline = VisualAnalysisPipeline()


@app.post("/analyze")
async def analyze_image(file: UploadFile):
    """Analyze an uploaded image through
    the full pipeline."""
    suffix = os.path.splitext(
        file.filename)[1]
    with tempfile.NamedTemporaryFile(
            suffix=suffix,
            delete=False) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name

    try:
        result = pipeline.analyze(tmp_path)
        return JSONResponse(content=result)
    finally:
        os.unlink(tmp_path)


@app.post("/index")
async def index_image(file: UploadFile,
                      image_id: str = ""):
    """Add an image to the similarity
    search index."""
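    with tempfile.NamedTemporaryFile(
            delete=False) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp_path = tmp.name

    # The temp file is closed (and flushed)
    # here, so PIL reads the complete image
    try:
        with Image.open(tmp_path) as img:
            image = img.convert("RGB")
        pipeline.search.add_to_index(
            image,
            {"id": image_id,
             "filename": file.filename})
    finally:
        os.unlink(tmp_path)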
    return {
        "status": "indexed",
        "total_images": len(
            pipeline.search.metadata),
    }

# Run with:
# uvicorn visual_api:app --host 0.0.0.0 --port 8000

Two endpoints. /analyze takes an image and returns the full analysis result. /index adds an image to the FAISS similarity search index so future /analyze calls can find similar images. Simple, clean, functional.

You'd test it with curl:

# Analyze an image
curl -X POST http://localhost:8000/analyze \
  -F "file=@sample.jpg"

# Add to search index
# (image_id is a query parameter, not a form field)
curl -X POST "http://localhost:8000/index?image_id=ref001" \
  -F "file=@sample.jpg"

Performance optimization

The naive pipeline runs each model sequentially. For production, several optimizations matter -- and the most impactful one is concurrent execution.

Detection and OCR are independent. Captioning doesn't need the embedding result. So why run them one after another? We can run all four concurrently and wait for the slowest one:

import asyncio
from concurrent.futures import (
    ThreadPoolExecutor)

executor = ThreadPoolExecutor(max_workers=4)


async def analyze_concurrent(pipeline,
                              image_path):
    """Run all pipeline components
    concurrently instead of sequentially."""
    loop = asyncio.get_running_loop()
    image = Image.open(
        image_path).convert("RGB")

    # All four tasks are independent --
    # run them in parallel
    det_future = loop.run_in_executor(
        executor,
        pipeline.detector.detect,
        image_path)
    ocr_future = loop.run_in_executor(
        executor,
        pipeline.ocr.extract,
        image_path)
    cap_future = loop.run_in_executor(
        executor,
        pipeline.captioner.caption,
        image)
    search_future = loop.run_in_executor(
        executor,
        pipeline.search.search,
        image, 5)

    objects, texts, caption, similar = (
        await asyncio.gather(
            det_future, ocr_future,
            cap_future, search_future))

    return {
        "objects": objects,
        "text": texts,
        "caption": caption,
        "similar": similar,
    }

Sequential latency = detection + OCR + search + captioning. Concurrent latency = max(detection, OCR, search, captioning), at least in the ideal case where the four tasks don't compete for the same cores. If each component takes roughly 50ms, that's 200ms sequential vs ~50ms concurrent: up to a 4x improvement from pure parallelism, no model changes needed.
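The sum-versus-max arithmetic can be reproduced without any models. In the sketch below, `fake_stage` is a stand-in that just sleeps for 50ms, so it shows the ideal case only: real inference competes for CPU cores and will usually see a smaller speedup.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_stage(name, seconds):
    """Stand-in for one model call; sleeping releases the GIL."""
    time.sleep(seconds)
    return name


stages = [("detection", 0.05), ("ocr", 0.05),
          ("search", 0.05), ("captioning", 0.05)]

# Sequential: total time is the sum of all stages
t0 = time.perf_counter()
sequential_results = [fake_stage(n, s) for n, s in stages]
seq_time = time.perf_counter() - t0

# Concurrent: total time is roughly the slowest stage
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(fake_stage, n, s) for n, s in stages]
    parallel_results = [f.result() for f in futures]
con_time = time.perf_counter() - t0

print(f"sequential: {seq_time * 1000:.0f}ms")
print(f"concurrent: {con_time * 1000:.0f}ms")
```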

Having said that, there's a caveat with GPU-based models: if all four models run on the same GPU, concurrent execution helps far less, because the work from the different models largely queues up on that one device. Concurrency helps most when models run on CPU with cores to spare, when you have multiple GPUs, or when some stages spend most of their time waiting on disk or network I/O rather than computing (a flat FAISS search, by contrast, is primarily memory-bound).

Model selection is your other big lever. The table below shows the tradeoffs for YOLO specifically, but the same principle applies to every component:

import numpy as np


class ModelSelectionGuide:
    """Compare model variants for the
    speed/accuracy tradeoff."""

    def __init__(self):
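        # Approximate values. The accuracy
        # figures match the mAP50-95 column
        # of the YOLOv8 docs (key name kept
        # short); treat the latencies as
        # illustrative, they vary by hardware.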
        self.variants = {
            "YOLOv8n": {
                "params_m": 3.2,
                "mAP50": 37.3,
                "latency_ms": 1.2,
            },
            "YOLOv8s": {
                "params_m": 11.2,
                "mAP50": 44.9,
                "latency_ms": 2.3,
            },
            "YOLOv8m": {
                "params_m": 25.9,
                "mAP50": 50.2,
                "latency_ms": 5.5,
            },
            "YOLOv8l": {
                "params_m": 43.7,
                "mAP50": 52.9,
                "latency_ms": 8.7,
            },
            "YOLOv8x": {
                "params_m": 68.2,
                "mAP50": 53.9,
                "latency_ms": 14.2,
            },
        }

    def compare(self):
        print(f"{'Model':>10} {'Params':>8} "
              f"{'mAP50':>7} {'Latency':>10} "
              f"{'mAP/ms':>8}")
        print("-" * 47)
        for name, v in self.variants.items():
            efficiency = (
                v["mAP50"] / v["latency_ms"])
            print(f"{name:>10} "
                  f"{v['params_m']:>7.1f}M "
                  f"{v['mAP50']:>7.1f} "
                  f"{v['latency_ms']:>8.1f}ms "
                  f"{efficiency:>8.2f}")

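        # Derive the winners from the table
        # instead of hardcoding them
        variants = self.variants
        most_accurate = max(
            variants,
            key=lambda n: variants[n]["mAP50"])
        fastest = min(
            variants,
            key=lambda n:
                variants[n]["latency_ms"])
        most_efficient = max(
            variants,
            key=lambda n: (
                variants[n]["mAP50"]
                / variants[n]["latency_ms"]))
        print(f"\nBest accuracy: "
              f"{most_accurate}")
        print(f"Best speed: {fastest}")
        print(f"Best efficiency: "
              f"{most_efficient} "
              f"(highest mAP per ms)")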


guide = ModelSelectionGuide()
guide.compare()

For a content indexing pipeline where you process images in bulk overnight, use the large model -- accuracy matters, latency doesn't. For a real-time API that needs to respond within 100ms, use nano. For most situations, small or medium gives you a good balance. Profile YOUR specific use case on YOUR hardware before deciding.
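One way to turn that advice into a rule: pick the most accurate variant that still fits your latency budget. The sketch below assumes the approximate numbers from the table above; `pick_variant` and `VARIANTS` are hypothetical names, and on your own hardware you would substitute measured latencies.

```python
# (accuracy, latency_ms) pairs copied from the
# ModelSelectionGuide table; approximate values.
VARIANTS = {
    "YOLOv8n": (37.3, 1.2),
    "YOLOv8s": (44.9, 2.3),
    "YOLOv8m": (50.2, 5.5),
    "YOLOv8l": (52.9, 8.7),
    "YOLOv8x": (53.9, 14.2),
}


def pick_variant(budget_ms, variants=VARIANTS):
    """Return the most accurate variant whose
    latency fits the budget, or None if even
    the smallest model is too slow."""
    fitting = {
        name: stats
        for name, stats in variants.items()
        if stats[1] <= budget_ms}
    if not fitting:
        return None
    return max(fitting,
               key=lambda n: fitting[n][0])


for budget in (1.0, 3.0, 10.0, 50.0):
    print(f"{budget:>5.1f}ms budget -> "
          f"{pick_variant(budget)}")
```

Keep in mind that the budget here is for detection alone. In the full pipeline, detection only gets whatever is left after OCR, search, and captioning have taken their share.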

Benchmarking: measuring what matters

When you deploy a model, the number you care about is NOT average latency. It's the tail latency -- specifically the p95 and p99 percentiles. Here's why: if your average latency is 50ms but 1% of requests take 2 seconds (because of garbage collection, model warmup, CPU throttling, or other gremlins), your users experience those 2-second delays as the system "hanging."
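A tiny synthetic example makes the point before we get to the full benchmark. This is a sketch with made-up numbers: 985 requests at 50ms and 15 requests at 2 seconds. I use 1.5% slow requests rather than exactly 1% so that the p99 lands clearly inside the slow group instead of on the boundary between the two.

```python
import numpy as np

# Hypothetical sample: 985 fast requests at
# 50ms and 15 slow ones at 2000ms (1.5%).
latencies_ms = np.array(
    [50.0] * 985 + [2000.0] * 15)

mean_ms = latencies_ms.mean()
p50, p95, p99 = np.percentile(
    latencies_ms, [50, 95, 99])

print(f"mean: {mean_ms:7.1f}ms")
print(f"p50:  {p50:7.1f}ms")
print(f"p95:  {p95:7.1f}ms")
print(f"p99:  {p99:7.1f}ms")
```

The mean moves only a little above 50ms, and p50 and p95 don't move at all. Only the p99 exposes the 2-second requests, which are exactly the ones users complain about.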

import numpy as np
import time


class PipelineBenchmark:
    """Benchmark the visual analysis pipeline
    with proper percentile reporting."""

    def __init__(self):
        self.timings = {
            "detection": [],
            "ocr": [],
            "search": [],
            "captioning": [],
            "total": [],
        }

    def simulate_run(self, rng):
        """Simulate component latencies with
        realistic distributions: mostly fast,
        occasional spikes."""
        timings = {}
        # Base latencies in ms (GPU)
        bases = {
            "detection": 8,
            "ocr": 45,
            "search": 2,
            "captioning": 35,
        }
        for comp, base in bases.items():
            # Log-normal: right-skewed with
            # occasional spikes
            t = rng.lognormal(
                np.log(base), 0.3)
            # 2% chance of a major spike
            if rng.random() < 0.02:
                t *= rng.uniform(3, 10)
            timings[comp] = t
            self.timings[comp].append(t)

        total = sum(timings.values())
        self.timings["total"].append(total)
        return timings

    def run(self, n_requests=1000):
        rng = np.random.RandomState(42)
        for _ in range(n_requests):
            self.simulate_run(rng)

        print(f"Benchmarked {n_requests} "
              f"requests\n")
        print(f"{'Component':>12} {'p50':>9} "
              f"{'p95':>9} {'p99':>9} "
              f"{'max':>9}")
        print("-" * 52)

        for comp in self.timings:
            times = self.timings[comp]
            p50 = np.percentile(times, 50)
            p95 = np.percentile(times, 95)
            p99 = np.percentile(times, 99)
            mx = max(times)
            print(f"{comp:>12} {p50:>7.1f}ms "
                  f"{p95:>7.1f}ms "
                  f"{p99:>7.1f}ms "
                  f"{mx:>7.1f}ms")

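        # Find the slowest component from the
        # data instead of hardcoding it
        components = [
            c for c in self.timings
            if c != "total"]
        slowest = max(
            components,
            key=lambda c: np.percentile(
                self.timings[c], 50))
        print(f"\n{slowest} dominates latency "
              f"(highest p50)")
        print("Tail spikes (p99 vs p50) "
              "show why averages lie")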


bench = PipelineBenchmark()
bench.run()

Notice how the p99 latency can be several times the p50 (median) for each component. When you chain four components sequentially, every request pays for a spike in any of the four stages, so the tail of the total gets heavier with each stage you add. This is exactly why concurrent execution matters: if one component spikes but the others don't, the request costs only as much as that single slowest component, not the sum of all four. Report p50, p95, and p99 in any latency benchmark. Anyone who reports only the average (mean) is hiding the tail.
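To see the sum-versus-max effect on the tail, here is a minimal sketch that reuses the same latency shape as the benchmark (log-normal with a 2% spike chance) and compares the two ways of combining the components. It assumes perfect overlap in the concurrent case, which real hardware won't quite give you, and `simulate` is a hypothetical helper, not part of `PipelineBenchmark`.

```python
import numpy as np


def simulate(n_requests=10000, seed=0):
    """Draw per-component latencies (ms):
    log-normal around each base, with a 2%
    chance of a 3-10x spike per component."""
    rng = np.random.default_rng(seed)
    # detection, ocr, search, captioning
    bases = np.array([8.0, 45.0, 2.0, 35.0])
    t = rng.lognormal(
        np.log(bases), 0.3,
        size=(n_requests, len(bases)))
    spike = rng.random(t.shape) < 0.02
    factor = rng.uniform(3, 10, size=t.shape)
    return np.where(spike, t * factor, t)


timings = simulate()
sequential = timings.sum(axis=1)  # one by one
concurrent = timings.max(axis=1)  # ideal overlap

for name, total in (
        ("sequential", sequential),
        ("concurrent", concurrent)):
    p50, p95, p99 = np.percentile(
        total, [50, 95, 99])
    print(f"{name:>10} p50={p50:6.1f}ms "
          f"p95={p95:6.1f}ms p99={p99:6.1f}ms")
```

Every percentile of the concurrent total sits below its sequential counterpart, because for each individual request the slowest component can never exceed the sum of all four.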

Building the search index

The similarity search component needs images in its index before it can return results. Here's how you'd build an index from a directory of images:

from pathlib import Path

from PIL import Image


class IndexBuilder:
    """Build a FAISS index from a directory
    of images."""

    def __init__(self, search_engine):
        self.engine = search_engine

    def build_from_directory(self, image_dir,
                              extensions=None):
        if extensions is None:
            extensions = {
                ".jpg", ".jpeg", ".png",
                ".webp", ".bmp"}
        image_dir = Path(image_dir)
        indexed = 0
        failed = 0

        for path in sorted(
                image_dir.iterdir()):
            if path.suffix.lower() in extensions:
                try:
                    img = Image.open(
                        path).convert("RGB")
                    self.engine.add_to_index(
                        img,
                        {"path": str(path),
                         "name": path.name})
                    indexed += 1
                except Exception as e:
                    print(f"Failed: {path.name}"
                          f" ({e})")
                    failed += 1

        print(f"Indexed {indexed} images "
              f"({failed} failed)")
        print(f"Index size: "
              f"{self.engine.index.ntotal}")
        return indexed


# Usage:
# builder = IndexBuilder(pipeline.search)
# builder.build_from_directory("./photos/")

For a production system, you'd persist the FAISS index to disk (faiss.write_index()) and load it at startup instead of rebuilding from scratch every time. You'd also store the metadata in a proper database (SQLite, PostgreSQL) rather than an in-memory list. But for a prototype, the in-memory approach works fine up to tens of thousands of images.
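The index half of that is the `faiss.write_index()` call mentioned above, paired with `faiss.read_index()` at startup. The metadata half could look something like the sketch below, which assumes metadata is keyed by each vector's row position in the FAISS index. `MetadataStore`, the table name, and the column names are all hypothetical choices for illustration.

```python
import sqlite3


class MetadataStore:
    """Keep image metadata in SQLite, keyed by
    the vector's row position in the FAISS
    index (hypothetical schema)."""

    def __init__(self, db_path=":memory:"):
        # Pass a file path instead of ":memory:"
        # to persist across restarts
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS images ("
            "faiss_id INTEGER PRIMARY KEY, "
            "path TEXT NOT NULL, "
            "name TEXT NOT NULL)")

    def add(self, faiss_id, path, name):
        # "with" commits on success and rolls
        # back if the insert raises
        with self.conn:
            self.conn.execute(
                "INSERT INTO images "
                "VALUES (?, ?, ?)",
                (faiss_id, path, name))

    def get(self, faiss_id):
        row = self.conn.execute(
            "SELECT path, name FROM images "
            "WHERE faiss_id = ?",
            (faiss_id,)).fetchone()
        if row is None:
            return None
        return {"path": row[0], "name": row[1]}


store = MetadataStore()
store.add(0, "photos/cat.jpg", "cat.jpg")
print(store.get(0))
print(store.get(99))
```

The important design point is that the two stores must stay in sync: the id you write to SQLite has to be the position the vector actually received in the index, so add to both in the same step.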

Putting it all together: the complete application

Let's trace through what happens when a user uploads an image to our API:

  1. The image hits the /analyze endpoint
  2. FastAPI saves it to a temp file
  3. The pipeline runs all four models (sequentially or concurrently)
  4. Detection returns: "person (0.92), car (0.87), traffic_light (0.71)"
  5. OCR returns: "STOP" (from a sign), "Main St" (from a street sign)
  6. Embedding search returns: 5 most similar images from the index, with similarity scores
  7. Captioning returns: "a person walking across a street near a red traffic light"
  8. Everything gets packaged into a JSON response and sent back

The total latency depends on your hardware: on a modern GPU, the whole pipeline can run in around 100ms. On CPU only, expect 500ms-2s depending on model sizes. Either way, the response contains rich structured information that would take a human minutes to compile.
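The final packaging step from the walkthrough can be sketched as a plain function. This is only an illustration of the response shape, using the example values from the list above. The top-level keys follow the dict returned by `analyze_concurrent`, while the nested field names, the `build_response` helper, and the search hit are assumptions.

```python
import json


def build_response(objects, texts,
                   similar, caption):
    """Package the four component outputs
    into one JSON-serializable dict."""
    return {
        "objects": [
            {"label": label,
             "confidence": conf}
            for label, conf in objects],
        "text": list(texts),
        "similar": [
            {"path": path, "score": score}
            for path, score in similar],
        "caption": caption,
    }


response = build_response(
    objects=[("person", 0.92),
             ("car", 0.87),
             ("traffic_light", 0.71)],
    texts=["STOP", "Main St"],
    # Hypothetical search hit for illustration
    similar=[("photos/street_01.jpg", 0.83)],
    caption=("a person walking across a "
             "street near a red traffic light"),
)
print(json.dumps(response, indent=2))
```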

What this system can NOT do

I want to be honest about limitations, because knowing what a system can't do is as important as knowing what it can:

  • The pretrained YOLO model detects the 80 COCO categories. It won't find "electrical outlet" or "specific car model" -- just generic "car." Fine-tuning or a different detection model is needed for domain-specific objects.
  • OCR fails on handwriting and artistic fonts. It's reliable for printed text, street signs, and product labels. Messy handwriting, heavily styled fonts, or text embedded in complex backgrounds will produce garbage.
  • Similarity search only works for images in the index. If someone uploads a photo of a rare flower and your index contains only street scenes, the "similar images" will be irrelevant but can still come back with respectable-looking similarity scores, because nearest-neighbor search always returns its top k matches, however poor the best available match is.
  • BLIP captions are generic. "A dog sitting on a couch" is accurate but not insightful. It won't tell you the dog breed, the couch material, or whether the scene looks messy or tidy. For richer descriptions, you'd need a larger VLM.
  • None of these models understand context. The system doesn't know that a stop sign near a school zone has different implications than a stop sign on a rural highway. It processes pixels, not meaning.

These are NOT failures -- they're the natural boundaries of the current components. A real production system would be iteratively improved based on which limitations matter most for the specific use case.
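As an example of such an iteration, the similarity-search limitation can be softened by refusing to return neighbours below a minimum score. The sketch below uses plain NumPy on toy 3-dimensional vectors instead of the real FAISS index. `filter_hits` is a hypothetical helper, and the 0.5 threshold is an arbitrary placeholder that you would have to calibrate on real embeddings, where unrelated images may still score well above zero.

```python
import numpy as np


def filter_hits(query, vectors, k=5,
                min_score=0.5):
    """Return (index, cosine) pairs for the
    top-k neighbours that clear min_score."""
    q = query / np.linalg.norm(query)
    v = vectors / np.linalg.norm(
        vectors, axis=1, keepdims=True)
    scores = v @ q  # cosine similarities
    order = np.argsort(-scores)[:k]
    return [(int(i), float(scores[i]))
            for i in order
            if scores[i] >= min_score]


# Toy "index" of three vectors
index_vectors = np.array([
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
])

# A query close to the first two vectors
near = filter_hits(
    np.array([1.0, 0.05, 0.0]), index_vectors)
# A query unlike anything in the index
far = filter_hits(
    np.array([0.0, 0.0, 1.0]), index_vectors)

print("near:", near)
print("far: ", far)
```

An empty result for the second query is more honest than five irrelevant images, and the API can report "no similar images found" instead.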

To summarize

  • A visual AI system combines multiple specialized models (detection, OCR, embedding, captioning) into a single pipeline that extracts complementary information from images;
  • each component handles a different modality of visual understanding: YOLO finds objects (#79), PaddleOCR reads text (#82), DINOv2 produces embeddings for similarity search (#90), and BLIP generates descriptions (#75);
  • FAISS enables efficient similarity search over visual embeddings, using the same inner-product indexing we built for text in episode #63;
  • FastAPI wraps the pipeline in HTTP endpoints for serving as a microservice, with separate /analyze and /index routes;
  • concurrent execution of independent model calls reduces total latency from the sum to the maximum of component latencies -- a potentially 4x improvement with no model changes;
  • model selection (nano vs medium vs large) is the primary accuracy/speed tradeoff; profile your specific hardware and use case to choose;
  • benchmark with percentile latencies (p50, p95, p99), never averages -- tail latencies reveal the real user experience and compound across sequential pipeline stages;
  • knowing what the system cannot do (limited object categories, OCR failures on handwriting, generic captions) is as important as knowing what it can.

That wraps up Arc 5 -- the Computer Vision Deep Dive. From raw pixels (#77) through detection, segmentation, generation, 3D reconstruction, face analysis, medical imaging, self-supervised learning, and now a complete working system that ties it all together. Arc 6 takes us into a completely different sensory modality: sound. We'll start from the fundamentals of digital audio and work our way up to speech recognition, music generation, and audio-visual models. A whole new world of signal processing awaits.

Best regards!

@scipio


