Building a DAG Workflow Orchestration Engine from Scratch in Python
Haji Rufai · 2026-05-27 · via DEV Community

Every data engineer knows Apache Airflow. But how many have built a workflow orchestrator from scratch? Understanding the internals — topological sorting, parallel execution, trigger rules, retry logic — transforms you from a user into an architect.

In this article, I'll walk through building TaskFlow, a lightweight DAG workflow orchestration engine in Python. It's ~3,100 lines of production-quality code with 107 passing tests.

GitHub: hajirufai/taskflow


What We're Building

TaskFlow lets you define workflows as directed acyclic graphs (DAGs) of tasks, then executes them with:

  • Parallel execution of independent tasks (asyncio)
  • Dependency resolution via topological sort
  • Trigger rules (all_success, all_done, one_success, none_failed)
  • Retry with exponential backoff and jitter
  • Timeout enforcement per task
  • SQLite-backed run history
  • Cron scheduling
  • REST API (FastAPI) + Rich CLI

Example run output:

🚀 Starting DAG run: etl_pipeline
  ✅ extract              [2.30s]
  ✅ validate             [0.12s]
  ✅ transform            [1.50s]
  ✅ load                 [3.10s]
  ✅ notify               [0.05s]
✨ DAG run completed in 5.60s (5/5 tasks succeeded)



Core Architecture

┌─────────────────────────────────────────────┐
│              TaskFlow Engine                 │
├──────────┬──────────┬───────────┬───────────┤
│   DAG    │  Graph   │ Executor  │ Scheduler │
│ Registry │  Algos   │ (async)   │  (cron)   │
├──────────┴──────────┴───────────┴───────────┤
│         Storage (SQLite + aiosqlite)         │
├─────────────────────┬───────────────────────┤
│    REST API         │      Rich CLI         │
│   (FastAPI)         │     (Click)           │
└─────────────────────┴───────────────────────┘


The engine has four pillars:

  1. DAG Registry — stores workflow definitions with task configs
  2. Graph Algorithms — topological sort, cycle detection, parallel levels
  3. Async Executor — runs tasks with concurrency control
  4. Storage — persists run history to SQLite

Step 1: Graph Algorithms

The foundation of any workflow engine is graph theory. We need two algorithms: a topological sort and a parallel level assignment.

Topological Sort (Kahn's Algorithm)

This gives us a valid execution order — no task runs before its dependencies.

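from collections import deque


class CycleError(ValueError):
    """Raised when the task graph is not acyclic."""


def topological_sort(nodes, edges):
    """Kahn's algorithm: O(V + E) topological ordering.

    `edges` maps each node to the list of nodes that depend on it.
    """
    # Count incoming edges (unmet dependencies) for every node
    in_degree = {n: 0 for n in nodes}
    for src, dsts in edges.items():
        for dst in dsts:
            in_degree[dst] += 1

    # Start with the nodes that have no dependencies
    queue = deque(n for n in nodes if in_degree[n] == 0)
    result = []

    while queue:
        node = queue.popleft()
        result.append(node)
        # "Remove" the node: each downstream task has one fewer unmet dependency
        for downstream in edges.get(node, []):
            in_degree[downstream] -= 1
            if in_degree[downstream] == 0:
                queue.append(downstream)

    # Nodes on a cycle never reach in-degree 0, so they are missing from result
    if len(result) != len(nodes):
        raise CycleError("Graph contains a cycle!")
    return result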
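
As a cross-check on the hand-written version, Python's standard library (3.9+) ships `graphlib.TopologicalSorter`, which produces the same kind of ordering and raises its own `CycleError`. The sketch below is not part of TaskFlow. `to_predecessors` is a hypothetical helper that converts the node-to-downstream adjacency list used above into the node-to-predecessors mapping that `graphlib` expects.

```python
from graphlib import CycleError, TopologicalSorter


def to_predecessors(nodes, edges):
    """Invert node -> downstream lists into node -> set of upstream nodes."""
    preds = {n: set() for n in nodes}
    for src, dsts in edges.items():
        for dst in dsts:
            preds[dst].add(src)
    return preds


nodes = ["extract", "transform", "validate", "load"]
edges = {
    "extract": ["transform", "validate"],
    "transform": ["load"],
    "validate": ["load"],
}

order = list(TopologicalSorter(to_predecessors(nodes, edges)).static_order())

# Every edge must point forward in the computed order
position = {name: i for i, name in enumerate(order)}
edges_point_forward = all(
    position[src] < position[dst] for src, dsts in edges.items() for dst in dsts
)

# A two-node cycle is rejected
cyclic_edges = {"a": ["b"], "b": ["a"]}
try:
    list(TopologicalSorter(to_predecessors(["a", "b"], cyclic_edges)).static_order())
    cycle_found = False
except CycleError:
    cycle_found = True
```

Comparing a custom sort against `graphlib` on randomly generated graphs is a cheap way to gain confidence in the custom implementation.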


Parallel Level Assignment

This groups tasks that can run concurrently:

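def build_reverse_edges(edges):
    """Invert the adjacency list: map each node to its upstream nodes.

    (This helper is not shown in the original excerpt; this is a minimal version.)
    """
    reverse = {}
    for src, dsts in edges.items():
        for dst in dsts:
            reverse.setdefault(dst, []).append(src)
    return reverse


def parallel_levels(nodes, edges):
    """Assign tasks to execution levels.
    Level 0 = no dependencies. Level N = max(upstream levels) + 1.
    """
    order = topological_sort(nodes, edges)
    reverse_edges = build_reverse_edges(edges)

    # Walking in topological order guarantees every upstream level is already known
    levels_map = {}
    for node in order:
        upstreams = reverse_edges.get(node, [])
        if not upstreams:
            levels_map[node] = 0
        else:
            levels_map[node] = max(levels_map[u] for u in upstreams) + 1

    # Group by level
    max_level = max(levels_map.values(), default=0)
    return [
        [n for n in order if levels_map[n] == i]
        for i in range(max_level + 1)
    ]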


For a diamond DAG (extract → [transform, validate] → load), this produces:

  • Level 0: [extract]
  • Level 1: [transform, validate] ← run in parallel!
  • Level 2: [load]
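
The same levels can also be derived in a single pass by running Kahn's algorithm one frontier at a time: every node whose in-degree reaches zero in round N belongs to level N. The sketch below is an alternative formulation, not TaskFlow's code, and `frontier_levels` is a hypothetical name.

```python
def frontier_levels(nodes, edges):
    """Group nodes into levels by peeling off zero in-degree frontiers."""
    in_degree = {n: 0 for n in nodes}
    for dsts in edges.values():
        for dst in dsts:
            in_degree[dst] += 1

    frontier = [n for n in nodes if in_degree[n] == 0]
    levels = []
    while frontier:
        levels.append(frontier)
        next_frontier = []
        for node in frontier:
            for dst in edges.get(node, []):
                in_degree[dst] -= 1
                if in_degree[dst] == 0:
                    next_frontier.append(dst)
        frontier = next_frontier

    # Nodes on a cycle never become ready, so they are missing from the levels
    if sum(len(level) for level in levels) != len(in_degree):
        raise ValueError("Graph contains a cycle")
    return levels


diamond_nodes = ["extract", "transform", "validate", "load"]
diamond_edges = {
    "extract": ["transform", "validate"],
    "transform": ["load"],
    "validate": ["load"],
}
diamond_levels = frontier_levels(diamond_nodes, diamond_edges)
# [['extract'], ['transform', 'validate'], ['load']]
```

A node with a short path and a long path from the sources lands on the level of the longer path, which is what "max(upstream levels) + 1" requires.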

Step 2: DAG Definition with Decorators

The user-facing API uses Python decorators — familiar and Pythonic:

from taskflow import DAG, TriggerRule

dag = DAG("etl_pipeline", schedule="0 2 * * *")

@dag.task()
def extract():
    return {"records": fetch_data()}

@dag.task(depends_on=["extract"], retries=3, timeout=300)
def transform(extract=None):
    return clean(extract["records"])

@dag.task(depends_on=["transform"])
def load(transform=None):
    write_to_warehouse(transform)


Under the hood, each @dag.task() call:

  1. Creates a TaskConfig with retry/timeout/trigger settings
  2. Wraps the function in a TaskDefinition
  3. Registers edges in the DAG's adjacency list
  4. Adds to a global registry for CLI/API discovery
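
The four steps above can be sketched in a few dozen lines. This is a simplified illustration, not TaskFlow's implementation: `MiniDAG` and `REGISTRY` are hypothetical names, and the fields on `TaskConfig` and `TaskDefinition` are assumptions based on the decorator arguments shown earlier.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class TaskConfig:
    retries: int = 0
    timeout: Optional[float] = None


@dataclass
class TaskDefinition:
    task_id: str
    func: Callable
    depends_on: list
    config: TaskConfig


REGISTRY = {}  # hypothetical global registry: dag_id -> MiniDAG


class MiniDAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = {}  # task_id -> TaskDefinition
        self.edges = {}  # task_id -> list of downstream task_ids
        REGISTRY[dag_id] = self

    def task(self, depends_on=None, retries=0, timeout=None):
        upstreams = list(depends_on or [])

        def decorator(func):
            task_id = func.__name__
            config = TaskConfig(retries=retries, timeout=timeout)
            self.tasks[task_id] = TaskDefinition(task_id, func, upstreams, config)
            # Make sure the task has an adjacency entry, then link each upstream to it
            self.edges.setdefault(task_id, [])
            for upstream in upstreams:
                self.edges.setdefault(upstream, []).append(task_id)
            return func  # the function itself stays directly callable

        return decorator


dag = MiniDAG("demo")


@dag.task()
def extract():
    return [1, 2, 3]


@dag.task(depends_on=["extract"], retries=3, timeout=300)
def transform(extract=None):
    return [x * 2 for x in extract]
```

Returning the original function from the decorator keeps tasks easy to unit test, since `transform(extract=[1, 2])` still works without an executor.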

Step 3: The Async Executor

The executor is where the magic happens. It runs tasks level by level, with proper concurrency control:

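import asyncio


class Executor:
    async def execute(self, dag, run_id=None):
        # 1. Validate the DAG (check for cycles, missing deps)
        errors = dag.validate()
        if errors:
            raise ValueError(f"Invalid DAG: {errors}")

        # 2. Create the run record that tracks per-task state.
        #    (This step is not shown in the excerpt; DagRun and its
        #    arguments are assumed names.)
        dag_run = DagRun(dag_id=dag.dag_id, run_id=run_id)

        # 3. Get parallel execution levels
        levels = parallel_levels(dag.tasks.keys(), dag.edges)

        # 4. Execute each level; a level starts only after the previous one finishes
        for level_tasks in levels:
            coros = [
                self._execute_task(dag, task_id, dag_run)
                for task_id in level_tasks
            ]
            await asyncio.gather(*coros)  # Parallel!

        return dag_run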
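
To make the level-by-level loop concrete, here is a self-contained sketch that runs a small diamond of async functions and passes each task the results of its dependencies as keyword arguments. It mirrors the `transform(extract=None)` convention from Step 2, but how TaskFlow actually injects upstream results is not shown in the article, so treat `run_levels` and its arguments as assumptions.

```python
import asyncio


async def run_levels(levels, funcs, deps):
    """Run each level concurrently; levels themselves run in sequence."""
    results = {}

    async def run_one(task_id):
        # Upstream results are passed as keyword arguments named after the dependency
        kwargs = {dep: results[dep] for dep in deps.get(task_id, [])}
        results[task_id] = await funcs[task_id](**kwargs)

    for level in levels:
        await asyncio.gather(*(run_one(task_id) for task_id in level))
    return results


async def extract():
    await asyncio.sleep(0.01)
    return [1, 2, 3]


async def double(extract):
    return [x * 2 for x in extract]


async def total(extract):
    return sum(extract)


async def report(double, total):
    return {"double": double, "total": total}


levels = [["extract"], ["double", "total"], ["report"]]
funcs = {"extract": extract, "double": double, "total": total, "report": report}
deps = {"double": ["extract"], "total": ["extract"], "report": ["double", "total"]}

results = asyncio.run(run_levels(levels, funcs, deps))
```

One trade-off of strict levels is that a fast task in level N+1 waits for the slowest task in level N even when it does not depend on it.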


Trigger Rules

Before running a task, we evaluate its trigger rule against upstream states:

def _evaluate_trigger_rule(rule, depends_on, dag_run):
    # Collect the current state of every upstream task
    upstream_states = [dag_run.task_runs[dep].state for dep in depends_on]

    if rule == TriggerRule.ALL_SUCCESS:
        return all(s == TaskState.SUCCESS for s in upstream_states)
    elif rule == TriggerRule.ALL_DONE:
        return all(s.is_terminal for s in upstream_states)
    elif rule == TriggerRule.ONE_SUCCESS:
        return any(s == TaskState.SUCCESS for s in upstream_states)
    elif rule == TriggerRule.NONE_FAILED:
        return not any(
            s in (TaskState.FAILED, TaskState.TIMED_OUT) for s in upstream_states
        )
    # Fail loudly instead of silently returning None for an unknown rule
    raise ValueError(f"Unknown trigger rule: {rule}")


This enables patterns like cleanup tasks that run even when upstream tasks fail (ALL_DONE), or notification tasks that fire when at least one branch succeeds (ONE_SUCCESS).
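
A small standalone example shows how the four rules disagree when one upstream task succeeded and another failed. The enum members and the `is_terminal` property below are assumptions modeled on the names used in the snippet above, and `should_run` is a hypothetical function that takes the upstream states directly.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    TIMED_OUT = "timed_out"

    @property
    def is_terminal(self):
        return self not in (TaskState.PENDING, TaskState.RUNNING)


class TriggerRule(Enum):
    ALL_SUCCESS = "all_success"
    ALL_DONE = "all_done"
    ONE_SUCCESS = "one_success"
    NONE_FAILED = "none_failed"


def should_run(rule, upstream_states):
    if rule is TriggerRule.ALL_SUCCESS:
        return all(s is TaskState.SUCCESS for s in upstream_states)
    if rule is TriggerRule.ALL_DONE:
        return all(s.is_terminal for s in upstream_states)
    if rule is TriggerRule.ONE_SUCCESS:
        return any(s is TaskState.SUCCESS for s in upstream_states)
    if rule is TriggerRule.NONE_FAILED:
        return not any(
            s in (TaskState.FAILED, TaskState.TIMED_OUT) for s in upstream_states
        )
    raise ValueError(f"Unknown trigger rule: {rule}")


# One branch succeeded, the other failed
mixed = [TaskState.SUCCESS, TaskState.FAILED]
decisions = {rule.value: should_run(rule, mixed) for rule in TriggerRule}
# {'all_success': False, 'all_done': True, 'one_success': True, 'none_failed': False}
```

Note that a task with no upstream dependencies passes every rule except ONE_SUCCESS under this logic, because `any()` over an empty list is `False`.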


Step 4: Retry with Exponential Backoff

Production workflows need retries. We implement exponential backoff with jitter to avoid thundering herds:

import random


def compute_delay(attempt, base_delay=1.0, backoff=2.0, max_delay=300.0):
    # Exponential growth: base_delay, base_delay * backoff, base_delay * backoff^2, ...
    delay = base_delay * (backoff ** attempt)
    delay = min(delay, max_delay)
    # Add ±25% jitter to avoid synchronized retries
    jitter = delay * 0.25
    delay += random.uniform(-jitter, jitter)
    return max(0.0, delay)


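The retry sequence for base_delay=1.0, backoff=2.0 is roughly 1s → 2s → 4s → 8s → ..., with each value jittered by ±25%. The cap at max_delay is applied before the jitter, so a capped delay can still land up to 25% above max_delay.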
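
To see the numbers, the sketch below separates the deterministic part of the schedule from the jitter. `nominal_delay` and `jittered_delay` are hypothetical names that split `compute_delay` in two, and the seeded `random.Random` instance is only there to make the sampling repeatable.

```python
import random


def nominal_delay(attempt, base_delay=1.0, backoff=2.0, max_delay=300.0):
    """Delay before jitter: exponential growth, capped at max_delay."""
    return min(base_delay * (backoff ** attempt), max_delay)


def jittered_delay(attempt, rng, jitter_fraction=0.25):
    """Nominal delay plus or minus up to jitter_fraction of itself."""
    delay = nominal_delay(attempt)
    jitter = delay * jitter_fraction
    return max(0.0, delay + rng.uniform(-jitter, jitter))


schedule = [nominal_delay(attempt) for attempt in range(10)]
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 300.0]

rng = random.Random(42)
samples = [jittered_delay(3, rng) for _ in range(1000)]
lowest, highest = min(samples), max(samples)  # stays within 8s ± 25%, i.e. 6s to 10s
```

With the defaults, the cap takes effect at attempt 9, where the uncapped value would be 512 seconds.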


Step 5: SQLite Storage

Every run is persisted to SQLite for history and debugging:

async def save_dag_run(dag_run):
    db = await get_db()
    await db.execute(
        "INSERT INTO dag_runs (run_id, dag_id, state, ...) VALUES (?, ?, ?, ...)",
        (dag_run.run_id, dag_run.dag_id, dag_run.state.value, ...),
    )
    for task_run in dag_run.task_runs.values():
        await db.execute(
            "INSERT INTO task_runs (run_id, task_id, state, ...) VALUES (...)",
            ...
        )


We use aiosqlite for async compatibility, WAL mode for concurrent reads, and automatic cleanup of old runs.
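
The sketch below shows WAL mode and a parameterized insert using the standard-library `sqlite3` module rather than aiosqlite, so it runs without extra dependencies. The three columns are the ones named in the snippet above; the real schema has more fields that the article elides, and `open_store` and `save_run` are hypothetical helpers.

```python
import os
import sqlite3
import tempfile

SCHEMA = """
CREATE TABLE IF NOT EXISTS dag_runs (
    run_id TEXT PRIMARY KEY,
    dag_id TEXT NOT NULL,
    state  TEXT NOT NULL
);
"""


def open_store(path):
    """Open the database, switch to WAL journaling, and create the table."""
    conn = sqlite3.connect(path)
    # The PRAGMA returns the journal mode that is actually in effect
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    conn.executescript(SCHEMA)
    return conn, mode


def save_run(conn, run_id, dag_id, state):
    # "with conn" commits on success and rolls back if an exception is raised
    with conn:
        conn.execute(
            "INSERT INTO dag_runs (run_id, dag_id, state) VALUES (?, ?, ?)",
            (run_id, dag_id, state),
        )


# WAL needs a file-backed database, so use a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    conn, journal_mode = open_store(os.path.join(tmp, "taskflow_demo.db"))
    save_run(conn, "run-1", "etl_pipeline", "success")
    rows = conn.execute("SELECT run_id, dag_id, state FROM dag_runs").fetchall()
    conn.close()
```

WAL is a property of the database file, so it only needs to be enabled once, and in-memory databases do not support it.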


Step 6: Cron Scheduler

The built-in scheduler parses standard cron expressions:

class CronExpression:
    def __init__(self, expression):
        # Parse "0 */6 * * *" into sets of valid values
        parts = expression.split()
        self.minute = self._parse_field(parts[0], 0, 59)
        self.hour = self._parse_field(parts[1], 0, 23)
        # ... day, month, weekday

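    def matches(self, dt):
        # Cron numbers weekdays from Sunday = 0, while datetime.weekday()
        # numbers them from Monday = 0, so convert before comparing.
        cron_weekday = dt.isoweekday() % 7
        return (dt.minute in self.minute
                and dt.hour in self.hour
                and dt.day in self.day
                and dt.month in self.month
                and cron_weekday in self.weekday)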


Supports wildcards (*), steps (*/6), ranges (9-17), and lists (1,15,28).
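
The article does not show `_parse_field`, so the following is one plausible way to expand a single cron field into a set of integers. `parse_field` is a hypothetical standalone function that covers the four forms listed above, plus a step applied to a range.

```python
def parse_field(field, lo, hi):
    """Expand one cron field (e.g. "*/6", "9-17", "1,15,28") into a set of ints."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/", 1)
            step = int(step_text)

        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)

        if step < 1 or not (lo <= start <= end <= hi):
            raise ValueError(f"Invalid cron field {field!r} for range {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return values


every_six_hours = parse_field("*/6", 0, 23)   # {0, 6, 12, 18}
office_hours = parse_field("9-17", 0, 23)     # 9 through 17 inclusive
billing_days = parse_field("1,15,28", 1, 31)  # {1, 15, 28}
```

Storing each field as a set keeps `matches()` down to five membership checks per datetime.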


The REST API

FastAPI gives us automatic OpenAPI docs and async support:

from fastapi import FastAPI

app = FastAPI()


@app.post("/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    # Look up the DAG in the registry, run it, then persist the result
    dag = DAG.get(dag_id)
    executor = Executor()
    dag_run = await executor.execute(dag)
    await save_dag_run(dag_run)
    return {
        "run_id": dag_run.run_id,
        "state": dag_run.state.value,
        "summary": dag_run.summary(),
    }


Endpoints for listing DAGs, triggering runs, querying history, and getting task-level details.


Testing: 107 Tests in 0.94s

Comprehensive test coverage across all components:

Module             Tests  What's Covered
test_graph.py      18     Topo sort, cycles, levels, critical path
test_dag.py        14     Creation, validation, registry, context manager
test_executor.py   17     Execution, failures, timeouts, trigger rules, async
test_retry.py      10     Backoff, jitter, exhaustion
test_store.py      9      Save, query, filter, cleanup
test_scheduler.py  18     Cron parsing, matching, scheduling
test_api.py        11     All REST endpoints

Key testing patterns:

  • In-memory SQLite for fast, isolated storage tests
  • ASGITransport from httpx for testing FastAPI without a server
  • DAG registry cleanup between tests to prevent leakage
  • pytest-asyncio with asyncio_mode = "auto" for clean async tests

What I Learned

  1. Kahn's algorithm is elegant — just in-degree tracking and a queue. O(V+E) and naturally detects cycles.

  2. Trigger rules change everything: ALL_DONE for cleanup, ONE_SUCCESS for fan-in patterns, NONE_FAILED for conditional logic. These four rules cover most real-world patterns.

  3. Jitter in retries matters — without it, all failed tasks retry at the same time (thundering herd). A ±25% jitter spreads the load.

  4. asyncio.Semaphore is perfect for concurrency limits — combined with asyncio.gather() for level-based parallelism.

  5. SQLite + WAL mode is surprisingly capable — for single-process orchestrators, it's all you need. No Postgres required.
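
The semaphore pattern from point 4 can be sketched as follows. It is a minimal illustration, not TaskFlow's executor: `run_limited` is a hypothetical helper, and the `stats` dictionary exists only to record how many tasks were running at once.

```python
import asyncio


async def run_limited(factories, max_concurrency):
    """Run all coroutines via gather, but let only max_concurrency run at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory):
        async with semaphore:  # waits here while the limit is reached
            return await factory()

    return await asyncio.gather(*(guarded(f) for f in factories))


stats = {"active": 0, "peak": 0}


async def work(i):
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # stand-in for real task work
    stats["active"] -= 1
    return i * i


factories = [lambda i=i: work(i) for i in range(6)]
squares = asyncio.run(run_limited(factories, max_concurrency=2))
# squares == [0, 1, 4, 9, 16, 25]; stats["peak"] == 2
```

`asyncio.gather()` returns results in submission order regardless of completion order, so the limit does not affect how results line up with tasks.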


Try It

git clone https://github.com/hajirufai/taskflow.git
cd taskflow
pip install -e ".[dev]"
pytest tests/ -v
python examples/etl_pipeline.py


The full source is ~3,100 lines of Python with 107 tests. It's a great foundation for understanding how workflow orchestrators work under the hood.

GitHub: hajirufai/taskflow


Building things from scratch is the fastest way to truly understand them. If you're interviewing for data engineering roles, being able to explain topological sort and trigger rules from first principles sets you apart.