FastAPI for Data Engineers: Building, Testing, and Debugging APIs That Don't Lie to You
De' Clerke · 2026-06-03 · via DEV Community

The JobSense project needed a FastAPI backend that served 604 job embeddings via semantic search, a Pydantic validation layer that stopped bad data before it reached pgvector, and a test suite that could be run without a live Ollama instance. Getting all three right took more time than the pipeline itself.

This article is the guide I wish I had then. It covers FastAPI setup for data engineering use cases, the Pydantic patterns that actually prevent bad data at the boundary, consuming external APIs without silent failures, testing patterns that catch real bugs, debugging the most common FastAPI errors, and the production patterns that most tutorials skip.


What FastAPI Is and Is Not in a Data Stack

Before building anything: FastAPI is a system boundary tool. It is not a scheduler, not a data processor, and not a database.

| Use FastAPI for | Use something else for |
|---|---|
| Ingestion endpoint (receive events, files, JSON) | Orchestration: use Airflow, Dagster, Prefect |
| Serving processed data to dashboards | Heavy transformation: use pandas, DuckDB, Spark |
| Triggering pipeline runs via HTTP | Real-time streaming: use Kafka, Flink |
| Health and metadata endpoints | Batch processing: use a DAG task, not an endpoint |
| Feature serving (ML embeddings, predictions) | Message queuing: use SQS, RabbitMQ |

The most common mistake I see in portfolio projects is using FastAPI where a dbt model and a BI tool would do the job in a third of the code. FastAPI belongs at the edges of your system where external clients need to push data in or pull data out.


App Setup

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(
    title="JobSense API",
    description="Kenyan jobs semantic search",
    version="1.0.0",
    debug=True,           # detailed error messages in dev — disable in prod
)

# CORS — required when a Streamlit or React frontend calls FastAPI
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8501", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

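# Lifecycle events: connect DB and warm caches at startup.
# `database` is assumed to be an async database client created elsewhere in the project.
# Note: on_event still works, but recent FastAPI releases deprecate it
# in favor of a lifespan handler passed to FastAPI(...).
@app.on_event("startup")
async def on_startup():
    await database.connect()

@app.on_event("shutdown")
async def on_shutdown():
    await database.disconnect()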
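
The startup and shutdown hooks can also be written as a single lifespan handler, which is the form recent FastAPI releases recommend. The sketch below shows only the shape of that handler. `FakeDatabase` and `simulate_server_run` are hypothetical stand-ins so the example runs without FastAPI installed; in a real app the function would be passed as `FastAPI(lifespan=lifespan)`.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeDatabase:
    """Hypothetical stand-in for an async database client."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def connect(self) -> None:
        self.events.append("connect")

    async def disconnect(self) -> None:
        self.events.append("disconnect")


database = FakeDatabase()


@asynccontextmanager
async def lifespan(app):
    # Code before `yield` runs once at startup.
    await database.connect()
    try:
        yield
    finally:
        # Code after `yield` runs once at shutdown, even if serving failed.
        await database.disconnect()


# In a real project: app = FastAPI(lifespan=lifespan)


async def simulate_server_run() -> list[str]:
    # Mimics what the ASGI server does around the lifetime of the app.
    async with lifespan(app=None):
        database.events.append("serving")
    return database.events
```

Keeping setup and teardown in one function makes it harder to add a resource at startup and forget to release it at shutdown.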

API versioning from day one

If your API will have external consumers, prefix routes with a version. It costs nothing now and avoids breaking changes later.

from fastapi import APIRouter

v1 = APIRouter(prefix="/api/v1", tags=["v1"])
v2 = APIRouter(prefix="/api/v2", tags=["v2"])  # Future — add when needed

@v1.get("/jobs")
def list_jobs_v1():
    return []

app.include_router(v1)

The alternative — changing /api/jobs to a different response shape after clients depend on it — is a breaking change that requires coordination. Versioning upfront avoids this.

Development server

uvicorn main:app --reload               # dev: auto-reload on save
uvicorn main:app --host 0.0.0.0 --port 8000  # expose to network
uvicorn main:app --workers 4            # production: multiple workers

Interactive docs are auto-generated at http://localhost:8000/docs (Swagger UI) and http://localhost:8000/redoc (ReDoc). Disable them in production, along with the raw schema at /openapi.json:

app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)


Pydantic: The Data Contract at the Boundary

Pydantic models are the most important part of a FastAPI data engineering setup. They are the point where your pipeline says "this is the shape data must have to enter my system." Everything downstream assumes this contract was enforced.

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional
from datetime import datetime
from enum import Enum

class JobSource(str, Enum):
    brightermonday = "brightermonday"
    linkedin       = "linkedin"
    jobwebkenya    = "jobwebkenya"

class JobSchema(BaseModel):
    title:      str            = Field(..., min_length=2, max_length=200)
    company:    str            = Field(..., min_length=1)
    salary_min: Optional[float] = Field(None, ge=0)
    salary_max: Optional[float] = Field(None, ge=0)
    source:     JobSource
    posted_at:  Optional[datetime] = None

    @field_validator("title")
    @classmethod
    def strip_title(cls, v: str) -> str:
        return v.strip()

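    @model_validator(mode="after")
    def salary_order(self):
        # Compare against None explicitly: 0 is a valid salary but is falsy,
        # so a plain truthiness check would skip the comparison.
        if self.salary_min is not None and self.salary_max is not None:
            if self.salary_min > self.salary_max:
                raise ValueError("salary_min must be <= salary_max")
        return self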

When validation fails, FastAPI returns a 422 with the exact field and reason. That is more useful than the silent data corruption you get when you skip validation.
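
A cross-field check such as salary_order has to distinguish "not provided" from 0, because the `ge=0` constraint allows a salary of zero. The two helper functions below are hypothetical and exist only to contrast a truthiness test with an explicit `None` test; they are plain Python, not Pydantic validators.

```python
from typing import Optional


def salary_range_ok_truthy(salary_min: Optional[float], salary_max: Optional[float]) -> bool:
    # Truthiness check: 0 counts as "missing", so the comparison is skipped.
    if salary_min and salary_max:
        return salary_min <= salary_max
    return True


def salary_range_ok_explicit(salary_min: Optional[float], salary_max: Optional[float]) -> bool:
    # Explicit None check: 0 is a real value and gets compared.
    if salary_min is not None and salary_max is not None:
        return salary_min <= salary_max
    return True
```

With `salary_min=50_000` and `salary_max=0`, the truthy version accepts the record while the explicit version rejects it.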

Idempotency keys for ingestion endpoints

Production ingestion APIs add an idempotency key requirement. If the client retries a failed POST, you need to recognize the duplicate and return the same result rather than inserting twice.

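from fastapi import Header, HTTPException
from typing import Optional

# EventSchema (a Pydantic model) and event_repo (a repository object) are
# assumed to be defined elsewhere in the project.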

@app.post("/api/v1/events", status_code=201)
def ingest_event(
    event: EventSchema,
    x_idempotency_key: Optional[str] = Header(None),
):
    if x_idempotency_key:
        # Check if we already processed this key
        existing = event_repo.find_by_idempotency_key(x_idempotency_key)
        if existing:
            return existing  # Return previous result, no re-insert

    result = event_repo.create(event, idempotency_key=x_idempotency_key)
    return result

Without this pattern, a retry creates a duplicate whenever the first insert succeeded but the client never saw the response, for example because of a network timeout. This is how pipelines end up with double-counted revenue.
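
The lookup-then-insert flow can be exercised without a database. The sketch below uses a hypothetical `InMemoryEventRepo` with the same two method names the route calls (`find_by_idempotency_key` and `create`); the `count` method and the `ingest` helper are additions for illustration.

```python
import itertools
from typing import Optional


class InMemoryEventRepo:
    """Hypothetical repository used only to demonstrate the control flow."""

    def __init__(self) -> None:
        self._by_key: dict[str, dict] = {}
        self._rows: list[dict] = []
        self._ids = itertools.count(1)

    def find_by_idempotency_key(self, key: str) -> Optional[dict]:
        return self._by_key.get(key)

    def create(self, payload: dict, idempotency_key: Optional[str] = None) -> dict:
        row = {"id": next(self._ids), **payload}
        self._rows.append(row)
        if idempotency_key is not None:
            self._by_key[idempotency_key] = row
        return row

    def count(self) -> int:
        return len(self._rows)


def ingest(repo: InMemoryEventRepo, payload: dict, key: Optional[str] = None) -> dict:
    # Same order as the route: look up the key first, insert only if it is new.
    if key is not None:
        existing = repo.find_by_idempotency_key(key)
        if existing is not None:
            return existing
    return repo.create(payload, idempotency_key=key)
```

Calling `ingest` twice with the same key returns the same row and stores it once. In a real service two retries can arrive concurrently, so a unique constraint on the key column is the usual backstop for this check.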

Common HTTP status codes

200 OK            — successful GET/PUT
201 Created       — successful POST
204 No Content    — successful DELETE
400 Bad Request   — client sent invalid data (use this for your own validation logic)
401 Unauthorized  — missing or invalid credentials
403 Forbidden     — authenticated but not permitted to access this resource
404 Not Found     — resource does not exist
422 Unprocessable — Pydantic validation failed (FastAPI default for bad body)
429 Too Many      — rate limited (from you or from upstream)
500 Server Error  — unhandled exception in your code
503 Unavailable   — your service is up but a dependency (DB) is down


Dependency Injection

Dependency injection lets you share resources (database sessions, auth checks, config) across route handlers without passing them around manually.

from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_job_or_404(job_id: int, db: Session = Depends(get_db)) -> JobModel:
    obj = db.get(JobModel, job_id)
    if not obj:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    return obj

@app.get("/api/v1/jobs/{job_id}", response_model=JobResponse)
def get_job(job: JobModel = Depends(get_job_or_404)):
    return job

API key authentication

from fastapi.security import APIKeyHeader
import os

api_key_header = APIKeyHeader(name="X-API-Key")

def verify_api_key(key: str = Depends(api_key_header)):
    if key != os.getenv("API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return key

@app.get("/admin/stats", dependencies=[Depends(verify_api_key)])
def admin_stats():
    return {"total_jobs": 604}
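
Comparing secrets with `!=` can, in principle, leak timing information, and it is worth deciding explicitly what happens when the `API_KEY` environment variable is not set. A minimal sketch of a stricter comparison, where `api_key_matches` is a hypothetical helper that `verify_api_key` could call:

```python
import hmac
from typing import Optional


def api_key_matches(presented: Optional[str], expected: Optional[str]) -> bool:
    # Reject outright if the server has no key configured or none was sent.
    if not presented or not expected:
        return False
    # compare_digest is designed to take the same time wherever the inputs differ.
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))
```

Treating a missing server-side key as a failure means a misconfigured deployment rejects every request instead of depending on how `None` happens to compare with a string.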

JWT authentication for multi-user APIs

API keys work for service-to-service auth. For user-facing APIs with multiple roles, use JWT.

import os
from datetime import datetime, timedelta, timezone

from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

SECRET_KEY = os.getenv("JWT_SECRET_KEY")
ALGORITHM  = "HS256"

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    payload["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token")
        return user_id
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

@app.get("/api/v1/profile")
def get_profile(user_id: str = Depends(get_current_user)):
    return {"user_id": user_id}

Install python-jose[cryptography] for the JWT library.


Consuming External APIs Without Silent Failures

Every external API call is a failure point. The pattern that works in all my projects: a session with a retry adapter, explicit timeout, structured error handling, and logging that tells you exactly what failed and why.

requests: the complete setup

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
import os

log = logging.getLogger(__name__)

def build_session() -> requests.Session:
    session = requests.Session()
    session.headers.update({
        "User-Agent": "DataPipeline/1.0",
        "Accept": "application/json",
    })
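    retry = Retry(
        total=3,
        # Exponential backoff. urllib3 does not sleep before the first retry,
        # so with a factor of 2 the waits are roughly 0s, 4s, 8s.
        backoff_factor=2,
        status_forcelist=[429, 500, 502, 503, 504],
        # POST is retried too: only safe if the upstream endpoint is idempotent.
        allowed_methods=["GET", "POST"],
    )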
    session.mount("https://", HTTPAdapter(max_retries=retry))
    session.mount("http://",  HTTPAdapter(max_retries=retry))
    return session

SESSION = build_session()

Error handling that tells you what actually happened

from requests.exceptions import HTTPError, ConnectionError, Timeout, JSONDecodeError

def fetch_eia_prices(fuel_type: str) -> list[dict]:
    url = "https://api.eia.gov/v2/petroleum/pri/gnd/data/"
    params = {
        "api_key":    os.getenv("EIA_API_KEY"),
        "frequency":  "monthly",
        "data[0]":    "value",
        "facets[product][]": fuel_type,
    }
    try:
        r = SESSION.get(url, params=params, timeout=15)
        r.raise_for_status()
        return r.json()["response"]["data"]
    except HTTPError as e:
        log.error(f"EIA API HTTP {e.response.status_code}: {e.response.text[:200]}")
        raise
    except ConnectionError:
        log.error("EIA API unreachable — check network or service status")
        raise
    except Timeout:
        log.error("EIA API timeout after 15s")
        raise
    except (JSONDecodeError, KeyError) as e:
        log.error(f"EIA API response parse error: {e}")
        raise

Never call r.json() without catching JSONDecodeError. When an API returns a 200 with an HTML error page (maintenance mode, Cloudflare challenge), .json() raises an exception with a confusing message. Catch it explicitly.
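
The same failure can be reproduced with the standard library's `json` module, which is a convenient way to unit-test the handling. In the sketch below, `UpstreamParseError` and `parse_json_body` are hypothetical names; the idea is to turn a confusing decode error into a message that names the source and shows the start of the body.

```python
import json


class UpstreamParseError(Exception):
    """Hypothetical error type for bodies that are not the JSON we expected."""


def parse_json_body(body: str, source: str) -> dict:
    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        # Keep a short preview so the log shows what actually came back.
        preview = body[:80].replace("\n", " ")
        raise UpstreamParseError(
            f"{source}: not JSON ({exc.msg}); body starts with {preview!r}"
        ) from exc
    if not isinstance(data, dict):
        raise UpstreamParseError(
            f"{source}: expected a JSON object, got {type(data).__name__}"
        )
    return data
```

An HTML maintenance page then fails with a message that includes the opening `<html>` tag, which is usually enough to recognize the problem from the log alone.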

Pagination patterns

Offset/limit (most REST APIs):

import time

def fetch_all_pages(base_url: str, params: dict, page_size: int = 100) -> list[dict]:
    all_results = []
    offset = 0
    while True:
        # Build a fresh dict per page so the caller's params are not mutated
        page_params = {**params, "limit": page_size, "offset": offset}
        r = SESSION.get(base_url, params=page_params, timeout=15)
        r.raise_for_status()
        data = r.json()

        # APIs use different response shapes: a bare list, or a dict with
        # the records under "results" or "data"
        if isinstance(data, list):
            items = data
        else:
            items = data.get("results") or data.get("data") or []
        if not items:
            break
        all_results.extend(items)
        offset += len(items)
        if len(items) < page_size:
            break        # reached last page
        time.sleep(0.5)  # polite delay
    log.info(f"Fetched {len(all_results)} total records from {base_url}")
    return all_results
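
The two stop conditions (an empty page, or a page shorter than the page size) are easier to test when the HTTP call is factored out. A minimal sketch, where `paginate` and `make_fake_api` are hypothetical helpers and the fake API simply slices an in-memory range:

```python
from typing import Callable, Iterator


def paginate(fetch_page: Callable[[int, int], list[dict]], page_size: int = 100) -> Iterator[dict]:
    """Yield records page by page; fetch_page(offset, limit) is supplied by the caller."""
    offset = 0
    while True:
        items = fetch_page(offset, page_size)
        if not items:
            return                      # empty page: nothing left
        yield from items
        offset += len(items)
        if len(items) < page_size:
            return                      # short page: this was the last one


def make_fake_api(total: int):
    """Hypothetical in-memory API holding `total` records, for demonstration."""
    calls: list[tuple[int, int]] = []

    def fetch_page(offset: int, limit: int) -> list[dict]:
        calls.append((offset, limit))
        return [{"id": i} for i in range(offset, min(offset + limit, total))]

    return fetch_page, calls
```

With 250 records and a page size of 100, the loop makes three calls and stops on the short page. With exactly 200 records it also makes three calls, because the last full page cannot be told apart from a middle page until the next request comes back empty.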

Cursor-based pagination:

def fetch_cursor_pages(base_url: str) -> list[dict]:
    all_results = []
    cursor = None
    while True:
        params = {"cursor": cursor} if cursor else {}
        r = SESSION.get(base_url, params=params, timeout=15)
        r.raise_for_status()   # fail loudly instead of parsing an error body
        data = r.json()
        all_results.extend(data["items"])
        cursor = data.get("next_cursor")
        if not cursor:
            break
    return all_results

Handling 429: rate limit responses

import time

def request_with_backoff(url: str, max_retries: int = 5) -> requests.Response:
    delay = 1
    for attempt in range(max_retries):
        r = SESSION.get(url, timeout=15)
        if r.status_code == 429:
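            # Respect the Retry-After header if the API sends one.
            # It can also be an HTTP date; fall back to our own delay in that case.
            retry_after = r.headers.get("Retry-After", "")
            wait = int(retry_after) if retry_after.isdigit() else delay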
            log.warning(f"Rate limited (attempt {attempt + 1}/{max_retries}). Waiting {wait}s")
            time.sleep(wait)
            delay = min(delay * 2, 60)
            continue
        r.raise_for_status()
        return r
    raise RuntimeError(f"Exceeded {max_retries} retries for {url}")
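
The Retry-After header comes in two forms: a number of seconds, or an HTTP date. A minimal sketch of a parser that accepts both, using only the standard library. `retry_after_seconds` is a hypothetical helper, and the `now` parameter exists only to make the function testable.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(header: Optional[str], default: float, now: Optional[datetime] = None) -> float:
    """Convert a Retry-After header value into a number of seconds to wait."""
    if not header:
        return default
    header = header.strip()
    if header.isdigit():
        return float(header)                    # delta-seconds form, e.g. "120"
    try:
        when = parsedate_to_datetime(header)    # HTTP-date form
    except (TypeError, ValueError):
        return default                          # unparseable: use our own delay
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means we can retry immediately.
    return max(0.0, (when - now).total_seconds())
```

The result can be passed straight to `time.sleep`, with the current backoff delay supplied as `default`.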

httpx for async pipelines

Use httpx.AsyncClient inside FastAPI async routes or asyncio-based pipelines. For the Ollama embedding calls in JobSense:

import httpx
import asyncio

async def fetch_embedding(text: str) -> list[float]:
    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.post(
            "http://localhost:11434/api/embeddings",
            json={"model": "nomic-embed-text", "prompt": text},
        )
        r.raise_for_status()
        return r.json()["embedding"]

# Fetch many embeddings concurrently
async def fetch_all_embeddings(texts: list[str]) -> list[list[float]]:
    async with httpx.AsyncClient(timeout=30) as client:
        tasks = [
            client.post(
                "http://localhost:11434/api/embeddings",
                json={"model": "nomic-embed-text", "prompt": t},
            )
            for t in texts
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        results = []
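        for r in responses:
            if isinstance(r, Exception):
                log.error(f"Embedding fetch failed: {r}")
                results.append([])
            elif r.status_code != 200:
                # An error response will usually not contain an "embedding" key
                log.error(f"Embedding fetch returned HTTP {r.status_code}")
                results.append([])
            else:
                results.append(r.json()["embedding"])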
        return results
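
Launching every request at once can overwhelm a local model server when the input list is long. One common way to cap the number of in-flight calls is an `asyncio.Semaphore`. The sketch below is not taken from JobSense: `gather_bounded` is a hypothetical helper and `FakeEmbedder` replaces the HTTP call so the example runs on its own.

```python
import asyncio


async def gather_bounded(items, worker, limit: int = 8):
    """Run worker(item) for every item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item):
        async with semaphore:
            return await worker(item)

    # gather preserves input order; failures come back as exception objects.
    return await asyncio.gather(*(run_one(i) for i in items), return_exceptions=True)


class FakeEmbedder:
    """Hypothetical stand-in for the embedding call; records peak concurrency."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def embed(self, text: str) -> list[float]:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)       # pretend to wait on the network
        self.in_flight -= 1
        if not text:
            raise ValueError("empty text")
        return [float(len(text))]
```

In a real pipeline the worker would wrap the `client.post(...)` call, and `limit` would be tuned to what the embedding server can handle.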


Quick Manual Testing with curl

Before writing a test, reach for curl to verify the endpoint works at all.

# Basic GET
curl http://localhost:8000/api/v1/jobs
curl -s http://localhost:8000/api/v1/jobs | python3 -m json.tool  # pretty print

# GET with query params and auth
curl -H "X-API-Key: abc123" \
     "http://localhost:8000/api/v1/jobs?keyword=data+engineer&limit=10"

# POST with JSON body
curl -X POST http://localhost:8000/api/v1/jobs \
     -H "Content-Type: application/json" \
     -d '{"title": "Data Engineer", "company": "Safaricom", "source": "linkedin"}'

# POST from a file
curl -X POST http://localhost:8000/api/v1/jobs \
     -H "Content-Type: application/json" \
     -d @payload.json

# Verbose: show request and response headers
curl -v http://localhost:8000/api/v1/jobs

# Status code only
curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8000/api/v1/jobs

# Test all services at once
for port in 8000 8080 8501; do
  echo -n ":$port → "
  curl -s --max-time 3 http://localhost:$port/health || echo "DOWN"
done


Testing FastAPI Endpoints

The conftest.py pattern

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import get_db, Base

TEST_DB_URL = "postgresql+psycopg2://user:pass@localhost:5432/test_jobsense"

@pytest.fixture(scope="session")
def test_engine():
    engine = create_engine(TEST_DB_URL)
    Base.metadata.create_all(bind=engine)
    yield engine
    Base.metadata.drop_all(bind=engine)

@pytest.fixture
def db_session(test_engine):
    Session = sessionmaker(bind=test_engine)
    session = Session()
    yield session
    session.rollback()   # undo every test's changes
    session.close()

@pytest.fixture
def client(db_session):
    def override_get_db():
        yield db_session
    app.dependency_overrides[get_db] = override_get_db
    with TestClient(app) as c:
        yield c
    app.dependency_overrides.clear()

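The session.rollback() in the fixture is critical. Without it, data written by one test leaks into the next, causing flaky tests that pass in isolation but fail in sequence. Keep in mind that rollback() only undoes uncommitted work: if a route handler calls db.commit(), those rows persist unless the test session is bound to a connection whose outer transaction the fixture rolls back.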
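
A rollback only discards work that has not been committed yet, which matters when a route handler commits on its own. The sketch below uses the standard library's `sqlite3` module rather than the project's PostgreSQL setup, purely to make the difference visible; `rows_after_rollback` is a hypothetical helper.

```python
import sqlite3


def rows_after_rollback(commit_first: bool) -> int:
    """Insert one row, optionally commit, then roll back; return the rows left."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE jobs (title TEXT)")
    conn.commit()

    conn.execute("INSERT INTO jobs (title) VALUES ('Data Engineer')")
    if commit_first:
        conn.commit()        # what a route handler calling db.commit() does
    conn.rollback()          # what the fixture teardown does

    remaining = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
    conn.close()
    return remaining
```

Without the commit the row is gone after the rollback; with the commit it survives. If test data leaks despite the fixture, a commit inside the code under test is a likely cause.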

Tests that actually catch bugs

# tests/test_jobs.py
class TestJobsEndpoint:

    def test_list_jobs_200(self, client):
        r = client.get("/api/v1/jobs")
        assert r.status_code == 200
        assert isinstance(r.json(), list)

    def test_create_job_201(self, client):
        payload = {"title": "Data Engineer", "company": "Safaricom", "source": "linkedin"}
        r = client.post("/api/v1/jobs", json=payload)
        assert r.status_code == 201
        assert r.json()["title"] == "Data Engineer"

    def test_missing_required_field_422(self, client):
        # Test that Pydantic validation catches missing company
        r = client.post("/api/v1/jobs", json={"title": "No company"})
        assert r.status_code == 422
        errors = r.json()["detail"]
        assert any("company" in str(e) for e in errors)

    def test_invalid_enum_422(self, client):
        payload = {"title": "DE", "company": "X", "source": "FAKE_SOURCE"}
        r = client.post("/api/v1/jobs", json=payload)
        assert r.status_code == 422

    def test_salary_validation(self, client):
        # salary_min > salary_max should fail
        payload = {
            "title": "DE", "company": "X", "source": "linkedin",
            "salary_min": 200_000, "salary_max": 100_000
        }
        r = client.post("/api/v1/jobs", json=payload)
        assert r.status_code == 422

    def test_job_not_found_404(self, client):
        r = client.get("/api/v1/jobs/99999")
        assert r.status_code == 404

    def test_delete_204(self, client):
        r = client.post("/api/v1/jobs", json={"title": "Temp", "company": "X", "source": "linkedin"})
        job_id = r.json()["id"]
        assert client.delete(f"/api/v1/jobs/{job_id}").status_code == 204
        assert client.get(f"/api/v1/jobs/{job_id}").status_code == 404

    def test_auth_required_401(self, client):
        r = client.get("/admin/stats")
        assert r.status_code in (401, 403)

    def test_auth_with_key(self, client, monkeypatch):
        monkeypatch.setenv("API_KEY", "test-key-123")
        r = client.get("/admin/stats", headers={"X-API-Key": "test-key-123"})
        assert r.status_code == 200

Mocking external API calls

Never call a live external API in tests. Live calls are slow and unreliable, may hit rate limits, and make your CI dependent on a third-party service being up.

# Using pytest-mock
from unittest.mock import MagicMock

def test_eia_endpoint(client, mocker):
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "response": {"data": [{"period": "2024-01", "value": "3.45"}]}
    }
    mock_response.status_code = 200
    mock_response.raise_for_status = lambda: None
    mocker.patch("app.services.eia.SESSION.get", return_value=mock_response)

    r = client.get("/api/v1/energy/prices?fuel=gasoline")
    assert r.status_code == 200

# Using the 'responses' library (cleaner for URL-level mocking)
import responses as mock_http

@mock_http.activate
def test_cbk_forex_fetch():
    mock_http.add(
        mock_http.GET,
        "https://www.centralbank.go.ke/api/forex",
        json={"rates": [{"pair": "USD/KES", "rate": 129.5}]},
        status=200,
    )
    from app.services.forex import fetch_rates
    data = fetch_rates()
    assert data[0]["rate"] == 129.5
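
Mocking gets simpler when the service function receives its session as an argument instead of reaching for a module-level one. A minimal sketch with the standard library's `unittest.mock`, where `latest_price` and `make_fake_session` are hypothetical and the payload shape mirrors the EIA response used earlier:

```python
from unittest.mock import MagicMock


def latest_price(session, url: str) -> float:
    """Hypothetical service function: fetch records and return the newest value."""
    response = session.get(url, timeout=15)
    response.raise_for_status()
    records = response.json()["response"]["data"]
    newest = max(records, key=lambda rec: rec["period"])
    return float(newest["value"])


def make_fake_session(payload: dict) -> MagicMock:
    # MagicMock auto-creates .get(), .raise_for_status() and .json() on access.
    session = MagicMock()
    session.get.return_value.json.return_value = payload
    return session
```

The test can then assert both on the returned value and on how the session was called, for example that a timeout was passed.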

Async endpoint tests

import pytest
import httpx
from app.main import app

@pytest.mark.asyncio
async def test_semantic_search():
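    # Recent httpx releases removed the app= shortcut; pass an ASGITransport instead
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: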
        r = await client.post(
            "/api/v1/search",
            json={"text": "python data engineer nairobi", "top_k": 5}
        )
        assert r.status_code == 200
        assert len(r.json()) <= 5

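This requires the pytest-asyncio plugin. Add the following to pytest.ini (in pyproject.toml the section is [tool.pytest.ini_options] and the value must be quoted: asyncio_mode = "auto"):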

[pytest]
asyncio_mode = auto

Useful pytest flags

pytest -v                              # verbose output
pytest -x                              # stop on first failure
pytest -s                              # show print/logging output
pytest -k "keyword"                    # run matching tests only
pytest -k "not slow"                   # skip slow tests
pytest --cov=app --cov-report=term-missing  # coverage
pytest -m integration                  # run marked tests


Debugging Common Errors

422 Unprocessable Entity

This is FastAPI's most common error. Pydantic validation failed. The response body tells you exactly what and where:

curl -X POST http://localhost:8000/api/v1/jobs \
     -H "Content-Type: application/json" \
     -d '{"title": "DE"}' | python3 -m json.tool

{
  "detail": [
    {
      "loc": ["body", "company"],
      "msg": "Field required",
      "type": "missing"
    }
  ]
}

Common causes:

  • Required field missing in the request body
  • Wrong type (sending a string where a number is expected)
  • Enum value not in the allowed list
  • min_length or max_length constraint violated
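
When a batch of records is rejected, reading raw 422 bodies gets tedious. A small sketch that flattens the `detail` list shown above into one line per error; `summarize_validation_errors` is a hypothetical helper and assumes the response shape from the example.

```python
def summarize_validation_errors(body: dict) -> list[str]:
    """Turn a FastAPI 422 response body into one readable line per error."""
    detail = body.get("detail", [])
    if not isinstance(detail, list):
        # Errors raised via HTTPException carry a plain value, not a list.
        return [str(detail)]
    lines = []
    for err in detail:
        # "loc" is a path such as ["body", "company"]; list indexes appear as ints.
        location = ".".join(str(part) for part in err.get("loc", []))
        lines.append(f"{location}: {err.get('msg', 'unknown error')} ({err.get('type', '?')})")
    return lines
```

For the response above this yields `body.company: Field required (missing)`, which is convenient to log next to the offending record.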

500 Internal Server Error

Check the uvicorn terminal. The full Python traceback is printed there. For dev, add a global exception handler that returns the trace in the response:

from fastapi import Request
from fastapi.responses import JSONResponse
import traceback

@app.exception_handler(Exception)
async def generic_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={"detail": str(exc), "trace": traceback.format_exc()},
    )

Disable this in production. Exposing tracebacks to external clients leaks implementation details.

Custom exception handlers (better than generic 500)

from datetime import datetime, timezone
from typing import Optional

class DataQualityError(Exception):
    def __init__(self, message: str, field: Optional[str] = None):
        super().__init__(message)
        self.message = message
        self.field   = field

@app.exception_handler(DataQualityError)
async def data_quality_handler(request: Request, exc: DataQualityError):
    return JSONResponse(
        status_code=400,
        content={"error": "data_quality_error", "message": exc.message, "field": exc.field},
    )

# In your route:
@app.post("/api/v1/events")
def ingest_event(event: EventSchema):
    # Assumes event.timestamp is timezone-aware; comparing aware and naive
    # datetimes raises TypeError
    if event.timestamp > datetime.now(timezone.utc):
        raise DataQualityError("Event timestamp is in the future", field="timestamp")

This pattern gives clients a structured error they can handle programmatically rather than a generic 500.

CORS errors in the browser

CORS errors appear in the browser console, not in the FastAPI terminal. The fix is nearly always the same:

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8501"],  # exact origin, no trailing slash
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Common CORS mistakes:

  • Including a trailing slash: "http://localhost:8501/" does not match "http://localhost:8501"
  • Using allow_origins=["*"] with allow_credentials=True (blocked by browsers for credentialed requests)
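  • Registering the middleware too late: add it during app setup, before the app starts serving requests (its position relative to route definitions does not matter, but its order relative to other middleware does)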
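
The first two mistakes can be caught before the app even starts. A minimal sketch of a configuration check; `check_cors_settings` is a hypothetical helper, not part of FastAPI or Starlette, and the missing-scheme check is an extra assumption based on the fact that an origin includes its scheme.

```python
def check_cors_settings(allow_origins: list[str], allow_credentials: bool) -> list[str]:
    """Return human-readable problems; an empty list means nothing obvious."""
    problems = []
    for origin in allow_origins:
        if origin != "*" and origin.endswith("/"):
            problems.append(f"trailing slash in origin {origin!r}")
        if origin != "*" and "://" not in origin:
            problems.append(f"origin {origin!r} is missing a scheme such as http://")
    if "*" in allow_origins and allow_credentials:
        problems.append("wildcard origin combined with allow_credentials=True")
    return problems
```

Running it at import time, or in a unit test over the production settings, turns a silent browser-side failure into an explicit message.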

Debugging what is running on a port

# Linux / WSL
lsof -i :8000
fuser -k 8000/tcp     # kill what is on port 8000

# PowerShell
netstat -ano | findstr ":8000"
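# $PID is a read-only automatic variable in PowerShell, so use a different name
$procId = (Get-NetTCPConnection -LocalPort 8000 -State Listen).OwningProcess
Stop-Process -Id $procId -Force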


Production Patterns

Health endpoint (add to every API)

from sqlalchemy import text

@app.get("/health")
def health(db: Session = Depends(get_db)):
    try:
        db.execute(text("SELECT 1"))
        return {"status": "ok", "database": "connected"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=str(e))

Airflow's HttpSensor can poll this endpoint before triggering downstream tasks, and Docker Compose health checks can call it too. It is only a few lines of code, and it saves real debugging time.
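
The same polling idea works in a plain script, for example before running integration tests. A minimal sketch using only the standard library; probe and wait_until_healthy are hypothetical names, and the URL in the usage note is an assumption about where the API is listening:

```python
import time
import urllib.error
import urllib.request

def probe(url: str, timeout: float = 2.0) -> bool:
    """Return True if the URL answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or an error status such as 503
        return False

def wait_until_healthy(url: str, attempts: int = 10, delay: float = 1.0, check=probe) -> bool:
    """Poll `url` until it is healthy or the attempts run out."""
    for attempt in range(attempts):
        if check(url):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)  # no need to sleep after the final attempt
    return False
```

Usage would look like wait_until_healthy("http://localhost:8000/health"); the check parameter exists so the loop can be tested without a running server.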

Rate limiting your own API

Protecting your API from abuse or accidental hammering requires a rate limiter. slowapi is a widely used rate-limiting library for FastAPI and Starlette:

pip install slowapi

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/v1/search")
@limiter.limit("10/minute")
def semantic_search(request: Request, q: str):
    # The `request` parameter is required by slowapi
    return search_jobs(q)

Without rate limiting, a single script hitting /search in a tight loop can exhaust your database connections or your embedding model's memory.
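
To make a limit such as "10/minute" concrete, here is a minimal fixed-window counter keyed by client address. This is an illustrative sketch, not slowapi's implementation; FixedWindowLimiter and its parameters are hypothetical names:

```python
import time

class FixedWindowLimiter:
    """Allow at most `limit` calls per key within each `window_seconds` window."""

    def __init__(self, limit: int, window_seconds: float, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._windows = {}  # key -> (window start time, calls counted so far)

    def allow(self, key: str) -> bool:
        now = self.clock()
        start, count = self._windows.get(key, (now, 0))
        if now - start >= self.window:
            start, count = now, 0  # the previous window expired, start a new one
        if count >= self.limit:
            return False
        self._windows[key] = (start, count + 1)
        return True

limiter = FixedWindowLimiter(limit=10, window_seconds=60)
print(limiter.allow("203.0.113.7"))
```

An in-process counter like this is per worker; with several Uvicorn workers each one would enforce its own limit, which is one reason to rely on a library with shared storage in production.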

Streaming responses for large data exports

When an endpoint returns a large dataset (thousands of rows), do not load the entire result into memory before sending the response. Use StreamingResponse:

import csv
import io
from fastapi.responses import StreamingResponse

@app.get("/api/v1/export/jobs.csv")
def export_jobs_csv(db: Session = Depends(get_db)):
    def generate():
        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["id", "title", "company", "source", "posted_at"])
        yield output.getvalue()
        output.seek(0)
        output.truncate(0)

        for job in db.query(JobModel).yield_per(1000):
            writer.writerow([job.id, job.title, job.company, job.source, job.posted_at])
            yield output.getvalue()
            output.seek(0)
            output.truncate(0)

    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=jobs.csv"},
    )

yield_per(1000) makes SQLAlchemy fetch ORM rows in batches of 1,000, so memory use stays roughly flat regardless of how large the table is (provided the database driver supports streaming results).
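
The export above yields one small chunk per row. If that turns out to be chatty, rows can be buffered and flushed in groups. A minimal sketch of that variation using only the standard library; csv_chunks and rows_per_chunk are hypothetical names, and any iterable of row tuples stands in for the database query:

```python
import csv
import io
from typing import Iterable, Iterator, Sequence

def csv_chunks(header: Sequence[str], rows: Iterable[Sequence], rows_per_chunk: int = 500) -> Iterator[str]:
    """Yield CSV text in chunks of up to `rows_per_chunk` rows."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(header)  # the header travels with the first chunk
    pending = 0
    for row in rows:
        writer.writerow(row)
        pending += 1
        if pending >= rows_per_chunk:
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
            pending = 0
    # Flush whatever is left: remaining rows, or just the header for an empty result
    if buffer.tell():
        yield buffer.getvalue()
```

The generator can be passed to StreamingResponse in place of generate(), with the query iterator supplied as rows.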

Response caching for expensive queries

For endpoints that run the same expensive query repeatedly (dashboard metrics, aggregate counts), cache the result in memory:

from datetime import datetime, timedelta

_cache: dict = {}

def get_cached(key: str, ttl_seconds: int = 300):
    entry = _cache.get(key)
    if entry and datetime.utcnow() - entry["ts"] < timedelta(seconds=ttl_seconds):
        return entry["value"]
    return None

def set_cached(key: str, value):
    _cache[key] = {"value": value, "ts": datetime.utcnow()}

@app.get("/api/v1/stats")
def get_stats(db: Session = Depends(get_db)):
    cached = get_cached("stats", ttl_seconds=300)
    if cached is not None:  # treat an empty but valid cached value as a hit
        return cached
    result = {
        "total_jobs":     db.query(JobModel).count(),
        "total_sources":  db.query(JobModel.source).distinct().count(),
    }
    set_cached("stats", result)
    return result

For production with multiple workers, replace the in-memory dict with Redis so all workers share the cache.
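
For a single-process setup, the same time-to-live idea can be packaged as a decorator. A minimal sketch, assuming the wrapped function takes only hashable positional arguments; ttl_cache and load_stats are hypothetical names, and the clock parameter exists so expiry can be tested without waiting:

```python
import time
from functools import wraps

def ttl_cache(ttl_seconds: float, clock=time.monotonic):
    """Cache a function's results per argument tuple for `ttl_seconds`."""
    def decorator(func):
        store = {}  # args -> (expiry time, cached value)

        @wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and now < hit[0]:
                return hit[1]
            value = func(*args)
            store[args] = (now + ttl_seconds, value)
            return value
        return wrapper
    return decorator

@ttl_cache(ttl_seconds=300)
def load_stats():
    # Placeholder for the expensive aggregate queries
    return {"total_jobs": 0, "total_sources": 0}
```

The decorator suits a plain helper that a route calls, rather than the route function itself, because a database session argument would become part of the cache key.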

Database connection pool configuration

The default SQLAlchemy pool is fine for development. For production with multiple Uvicorn workers:

import os

from sqlalchemy import create_engine

engine = create_engine(
    os.getenv("DATABASE_URL"),
    pool_size=5,          # connections kept open per worker
    max_overflow=10,      # extra connections above pool_size allowed in burst
    pool_pre_ping=True,   # test connection before use (handles DB restarts)
    pool_recycle=3600,    # recycle connections older than 1 hour (avoids stale TCP)
)

pool_pre_ping=True is the one you need most. Without it, workers that have been idle may hold dead connections and throw OperationalError on the first request after a database restart.
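
Because each worker process builds its own engine and pool, the settings multiply. A quick back-of-the-envelope check, assuming every worker uses the same pool_size and max_overflow; the function names and the reserved parameter are hypothetical:

```python
def max_db_connections(workers: int, pool_size: int = 5, max_overflow: int = 10) -> int:
    """Worst-case connections opened by all workers at peak load."""
    return workers * (pool_size + max_overflow)

def fits_database_limit(workers: int, pool_size: int, max_overflow: int,
                        db_max_connections: int, reserved: int = 0) -> bool:
    """True if the worst case stays within the database's connection limit.

    `reserved` leaves headroom for migrations, admin sessions, and other services.
    """
    return max_db_connections(workers, pool_size, max_overflow) <= db_max_connections - reserved

print(max_db_connections(workers=4))
print(fits_database_limit(workers=8, pool_size=5, max_overflow=10, db_max_connections=100))
```

With the values shown above, four workers can open up to 60 connections, and eight workers could exceed a limit of 100 (PostgreSQL's default max_connections), so lower pool_size or max_overflow as the worker count grows.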

Structured request logging

import time
import logging

log = logging.getLogger("api")

@app.middleware("http")
async def log_requests(request: Request, call_next):
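    start = time.perf_counter()  # monotonic clock, unaffected by system time changes
    response = await call_next(request)
    elapsed_ms = round((time.perf_counter() - start) * 1000, 2)
    # request.client can be None, for example behind some proxies or test clients
    client_ip = request.client.host if request.client else "unknown"
    log.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} "
        f"duration={elapsed_ms}ms "
        f"ip={client_ip}"
    )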
    return response

This middleware gives you one log line per request with the information you need to debug production issues: method, path, status code, duration, and client IP.
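
If your log aggregator expects JSON rather than key=value text, the same fields can be emitted through a custom formatter. A minimal sketch using only the standard library; JsonFormatter and the field names are assumptions for illustration, not something FastAPI provides:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    REQUEST_FIELDS = ("method", "path", "status", "duration_ms", "ip")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Values passed via `extra=` become attributes on the record
        for key in self.REQUEST_FIELDS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
log = logging.getLogger("api")
log.addHandler(handler)
log.setLevel(logging.INFO)

log.info("request", extra={"method": "GET", "path": "/health", "status": 200,
                           "duration_ms": 1.25, "ip": "127.0.0.1"})
```

Inside the middleware, the f-string call would become a log.info("request", extra={...}) call with the values taken from the request and response.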


The Profiling Section Nobody Reads Until They Need It

When an endpoint is slower than expected, do not guess. Measure.

import time
from functools import wraps

def timed(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        t = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = (time.perf_counter() - t) * 1000
        print(f"{func.__name__} took {elapsed:.2f}ms")
        return result
    return wrapper

@timed
def slow_query(db):
    return db.query(JobModel).filter(...).all()
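
One caveat: applied to an async def function, the decorator above would time only the creation of the coroutine object, not the awaited work. A minimal sketch of a variant that handles both kinds of callable; timed_any and fetch_rows are hypothetical names:

```python
import asyncio
import inspect
import time
from functools import wraps

def timed_any(func):
    """Time both `def` and `async def` callables."""
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                # Runs even if the call raises, so failures are timed too
                elapsed = (time.perf_counter() - start) * 1000
                print(f"{func.__name__} took {elapsed:.2f}ms")
        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = (time.perf_counter() - start) * 1000
            print(f"{func.__name__} took {elapsed:.2f}ms")
    return sync_wrapper

@timed_any
async def fetch_rows():
    await asyncio.sleep(0.01)  # stands in for an awaited database call
    return [1, 2, 3]

print(asyncio.run(fetch_rows()))
```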

# Full profiling with cProfile
import cProfile, pstats, io

pr = cProfile.Profile()
pr.enable()
slow_function()
pr.disable()

stream = io.StringIO()
pstats.Stats(pr, stream=stream).sort_stats("cumulative").print_stats(20)
print(stream.getvalue())

# Log all SQL queries from SQLAlchemy (enable during debugging, disable in prod)
import logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

The SQLAlchemy logging usually reveals the problem immediately: an N+1 query pattern where a route is running one query per row rather than a single JOIN.
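
The N+1 pattern is easy to reproduce outside the ORM. A minimal sketch using the standard library's sqlite3 module and an in-memory database; the tables and data are made up for illustration, and set_trace_callback plays the role of the SQLAlchemy engine log by recording every statement executed:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE companies (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE jobs (id INTEGER PRIMARY KEY, title TEXT, company_id INTEGER);
    INSERT INTO companies VALUES (1, 'Acme'), (2, 'Globex');
    INSERT INTO jobs VALUES (1, 'Data Engineer', 1), (2, 'Analyst', 1), (3, 'SRE', 2);
""")

statements = []
conn.set_trace_callback(statements.append)  # record each SQL statement as it runs

def titles_n_plus_one():
    """One query for the job list, then one more query per job."""
    result = []
    jobs = conn.execute("SELECT title, company_id FROM jobs ORDER BY id").fetchall()
    for title, company_id in jobs:
        name = conn.execute("SELECT name FROM companies WHERE id = ?", (company_id,)).fetchone()[0]
        result.append((title, name))
    return result

def titles_joined():
    """The same result from a single JOIN."""
    return conn.execute(
        "SELECT j.title, c.name FROM jobs j JOIN companies c ON c.id = j.company_id ORDER BY j.id"
    ).fetchall()

statements.clear()
slow = titles_n_plus_one()
n_plus_one_count = len(statements)

statements.clear()
fast = titles_joined()
join_count = len(statements)

print(n_plus_one_count, join_count, slow == fast)
```

Both functions return the same rows, but the first issues one statement for the list plus one per job, which is the repeating pattern to look for in the SQLAlchemy log.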


JobSense uses FastAPI for semantic job search with pgvector and Ollama. The Kenya Forex API uses FastAPI with DuckDB for sub-10ms query latency. Both are on GitHub.

Follow me on dev.to for more on data engineering, APIs, and pipelines.