Every data engineer knows Apache Airflow. But how many have built a workflow orchestrator from scratch? Understanding the internals — topological sorting, parallel execution, trigger rules, retry logic — transforms you from a user into an architect.
In this article, I'll walk through building TaskFlow, a lightweight DAG workflow orchestration engine in Python. It's ~3,100 lines of production-quality code with 107 passing tests.
GitHub: hajirufai/taskflow
What We're Building
TaskFlow lets you define workflows as directed acyclic graphs (DAGs) of tasks, then executes them with:
- Parallel execution of independent tasks (asyncio)
- Dependency resolution via topological sort
- Trigger rules (all_success, all_done, one_success, none_failed)
- Retry with exponential backoff and jitter
- Timeout enforcement per task
- SQLite-backed run history
- Cron scheduling
- REST API (FastAPI) + Rich CLI
🚀 Starting DAG run: etl_pipeline
✅ extract [2.30s]
✅ validate [0.12s]
✅ transform [1.50s]
✅ load [3.10s]
✅ notify [0.05s]
✨ DAG run completed in 5.60s (5/5 tasks succeeded)
Core Architecture
┌─────────────────────────────────────────────┐
│               TaskFlow Engine               │
├──────────┬──────────┬───────────┬───────────┤
│   DAG    │  Graph   │ Executor  │ Scheduler │
│ Registry │  Algos   │  (async)  │  (cron)   │
├──────────┴──────────┴───────────┴───────────┤
│        Storage (SQLite + aiosqlite)         │
├─────────────────────┬───────────────────────┤
│      REST API       │       Rich CLI        │
│      (FastAPI)      │        (Click)        │
└─────────────────────┴───────────────────────┘
The engine has four pillars:
- DAG Registry — stores workflow definitions with task configs
- Graph Algorithms — topological sort, cycle detection, parallel levels
- Async Executor — runs tasks with concurrency control
- Storage — persists run history to SQLite
Step 1: Graph Algorithms
The foundation of any workflow engine is graph theory. We need:
Topological Sort (Kahn's Algorithm)
This gives us a valid execution order — no task runs before its dependencies.
from collections import deque

def topological_sort(nodes, edges):
    """Kahn's algorithm: O(V + E) topological ordering."""
    in_degree = {n: 0 for n in nodes}
    for src, dsts in edges.items():
        for dst in dsts:
            in_degree[dst] += 1
    queue = deque(n for n in nodes if in_degree[n] == 0)
    result = []
    while queue:
        node = queue.popleft()
        result.append(node)
        for downstream in edges.get(node, []):
            in_degree[downstream] -= 1
            if in_degree[downstream] == 0:
                queue.append(downstream)
    if len(result) != len(nodes):
        raise CycleError("Graph contains a cycle!")
    return result
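To see the ordering in action, here is a minimal, self-contained run on a diamond-shaped pipeline. The `CycleError` class is defined locally as a stand-in (its base class is an assumption, not taken from the TaskFlow source), and `edges` maps each task to its downstream tasks.

```python
from collections import deque


class CycleError(ValueError):
    """Stand-in for TaskFlow's exception; the base class is an assumption."""


def topological_sort(nodes, edges):
    """Kahn's algorithm: repeatedly emit nodes whose in-degree has dropped to 0."""
    in_degree = {n: 0 for n in nodes}
    for dsts in edges.values():
        for dst in dsts:
            in_degree[dst] += 1
    queue = deque(n for n in nodes if in_degree[n] == 0)
    result = []
    while queue:
        node = queue.popleft()
        result.append(node)
        for downstream in edges.get(node, []):
            in_degree[downstream] -= 1
            if in_degree[downstream] == 0:
                queue.append(downstream)
    if len(result) != len(nodes):
        raise CycleError("Graph contains a cycle!")
    return result


nodes = ["extract", "transform", "validate", "load"]
edges = {
    "extract": ["transform", "validate"],
    "transform": ["load"],
    "validate": ["load"],
}

order = topological_sort(nodes, edges)
print(order)  # ['extract', 'transform', 'validate', 'load']

# A back edge from load to extract leaves every in-degree above zero,
# so the queue starts empty and the length check raises.
cyclic = {**edges, "load": ["extract"]}
cycle_detected = False
try:
    topological_sort(nodes, cyclic)
except CycleError as exc:
    cycle_detected = True
    print(f"rejected: {exc}")
```

Cycle detection falls out of the algorithm for free: nodes on a cycle never reach in-degree 0, so they never enter the result.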
Parallel Level Assignment
This groups tasks that can run concurrently:
def parallel_levels(nodes, edges):
    """Assign tasks to execution levels.

    Level 0 = no dependencies. Level N = max(upstream levels) + 1.
    """
    order = topological_sort(nodes, edges)
    reverse_edges = build_reverse_edges(edges)
    levels_map = {}
    for node in order:
        upstreams = reverse_edges.get(node, [])
        if not upstreams:
            levels_map[node] = 0
        else:
            levels_map[node] = max(levels_map[u] for u in upstreams) + 1
    # Group by level
    max_level = max(levels_map.values(), default=0)
    return [
        [n for n in order if levels_map[n] == i]
        for i in range(max_level + 1)
    ]
For a diamond DAG (extract → [transform, validate] → load), this produces:
- Level 0: [extract]
- Level 1: [transform, validate] ← run in parallel!
- Level 2: [load]
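The level assignment relies on a `build_reverse_edges` helper that is not shown in this article. The sketch below fills it in with one plausible implementation (an assumption, not the TaskFlow source) and runs the whole thing on a diamond with an extra `report` task, a hypothetical name added for illustration.

```python
from collections import deque


def topological_sort(nodes, edges):
    """Compact Kahn's algorithm (cycle check omitted to keep the sketch short)."""
    in_degree = {n: 0 for n in nodes}
    for dsts in edges.values():
        for dst in dsts:
            in_degree[dst] += 1
    queue = deque(n for n in nodes if in_degree[n] == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for dst in edges.get(node, []):
            in_degree[dst] -= 1
            if in_degree[dst] == 0:
                queue.append(dst)
    return order


def build_reverse_edges(edges):
    """Assumed helper: map each task to the list of tasks it depends on."""
    reverse = {}
    for src, dsts in edges.items():
        for dst in dsts:
            reverse.setdefault(dst, []).append(src)
    return reverse


def parallel_levels(nodes, edges):
    order = topological_sort(nodes, edges)
    reverse_edges = build_reverse_edges(edges)
    levels_map = {}
    for node in order:
        upstreams = reverse_edges.get(node, [])
        # Roots get level 0; everything else sits one level below its deepest upstream.
        levels_map[node] = max((levels_map[u] for u in upstreams), default=-1) + 1
    max_level = max(levels_map.values(), default=0)
    return [[n for n in order if levels_map[n] == i] for i in range(max_level + 1)]


nodes = ["extract", "transform", "validate", "load", "report"]
edges = {
    "extract": ["transform", "validate"],
    "transform": ["load"],
    "validate": ["load", "report"],
}

levels = parallel_levels(nodes, edges)
print(levels)  # [['extract'], ['transform', 'validate'], ['load', 'report']]
```

Because the topological order guarantees every upstream is processed first, each node's level can be computed in a single pass.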
Step 2: DAG Definition with Decorators
The user-facing API uses Python decorators — familiar and Pythonic:
from taskflow import DAG, TriggerRule

dag = DAG("etl_pipeline", schedule="0 2 * * *")

@dag.task()
def extract():
    return {"records": fetch_data()}

@dag.task(depends_on=["extract"], retries=3, timeout=300)
def transform(extract=None):
    return clean(extract["records"])

@dag.task(depends_on=["transform"])
def load(transform=None):
    write_to_warehouse(transform)
Under the hood, each @dag.task() call:
- Creates a TaskConfig with retry/timeout/trigger settings
- Wraps the function in a TaskDefinition
- Registers edges in the DAG's adjacency list
- Adds to a global registry for CLI/API discovery
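The registration steps above can be sketched in a few dozen lines. This is a simplified illustration, not the TaskFlow implementation: the field names on `TaskConfig` and `TaskDefinition`, the `_registry` attribute, and registering the DAG at construction time are all assumptions.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class TaskConfig:
    retries: int = 0
    timeout: Optional[float] = None


@dataclass
class TaskDefinition:
    task_id: str
    func: Callable
    depends_on: List[str]
    config: TaskConfig


class DAG:
    _registry = {}  # assumed global registry: dag_id -> DAG

    def __init__(self, dag_id, schedule=None):
        self.dag_id = dag_id
        self.schedule = schedule
        self.tasks = {}  # task_id -> TaskDefinition
        self.edges = {}  # task_id -> list of downstream task_ids
        DAG._registry[dag_id] = self

    @classmethod
    def get(cls, dag_id):
        return cls._registry[dag_id]

    def task(self, depends_on=None, retries=0, timeout=None):
        upstreams = list(depends_on or [])

        def decorator(func):
            task_id = func.__name__
            config = TaskConfig(retries=retries, timeout=timeout)
            self.tasks[task_id] = TaskDefinition(task_id, func, upstreams, config)
            self.edges.setdefault(task_id, [])
            for upstream in upstreams:
                self.edges.setdefault(upstream, []).append(task_id)
            return func  # the function stays directly callable

        return decorator


dag = DAG("etl_pipeline", schedule="0 2 * * *")


@dag.task()
def extract():
    return {"records": [1, 2, 3]}


@dag.task(depends_on=["extract"], retries=3, timeout=300)
def transform(extract=None):
    return [r * 2 for r in extract["records"]]


print(dag.edges)  # {'extract': ['transform'], 'transform': []}
print(dag.tasks["transform"].config)
```

Returning the original function from the decorator keeps tasks easy to unit-test: `transform(extract=extract())` works without going through the engine.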
Step 3: The Async Executor
The executor is where the magic happens. It runs tasks level by level, with proper concurrency control:
import asyncio

class Executor:
    async def execute(self, dag, run_id=None):
        # 1. Validate the DAG (check for cycles, missing deps)
        errors = dag.validate()
        if errors:
            raise ValueError(f"Invalid DAG: {errors}")

        # 2. Get parallel execution levels
        levels = parallel_levels(dag.tasks.keys(), dag.edges)

        # (Creation of dag_run, the record that tracks this run's task
        # states, is omitted from this excerpt.)

        # 3. Execute each level; tasks within a level run concurrently
        for level_tasks in levels:
            coros = [
                self._execute_task(dag, task_id, dag_run)
                for task_id in level_tasks
            ]
            await asyncio.gather(*coros)  # Parallel!

        return dag_run
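The concurrency control mentioned above is not visible in the excerpt. One common way to get it is to wrap each task in an `asyncio.Semaphore` before handing the level to `asyncio.gather()`. The sketch below shows that pattern in isolation; `run_level`, `fake_task`, and the limit of 2 are hypothetical names and values chosen for the demo.

```python
import asyncio


async def run_level(task_ids, run_task, semaphore):
    """Run one level's tasks concurrently, bounded by the semaphore."""

    async def guarded(task_id):
        async with semaphore:  # waits here if the limit is already reached
            return await run_task(task_id)

    # gather() returns results in the order the coroutines were passed in.
    return await asyncio.gather(*(guarded(t) for t in task_ids))


async def main():
    active = 0
    peak = 0

    async def fake_task(task_id):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)  # track the highest observed concurrency
        await asyncio.sleep(0.01)
        active -= 1
        return task_id

    semaphore = asyncio.Semaphore(2)
    levels = [["extract"], ["transform", "validate", "profile"], ["load"]]
    finished = []
    for level in levels:
        finished.extend(await run_level(level, fake_task, semaphore))
    return finished, peak


finished, peak = asyncio.run(main())
print(finished)  # ['extract', 'transform', 'validate', 'profile', 'load']
print(peak)      # 2
```

Level 1 has three runnable tasks, but only two are ever in flight at once; the third starts as soon as a slot frees up.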
Trigger Rules
Before running a task, we evaluate its trigger rule against upstream states:
def _evaluate_trigger_rule(rule, depends_on, dag_run):
    upstream_states = [dag_run.task_runs[dep].state for dep in depends_on]
    if rule == TriggerRule.ALL_SUCCESS:
        return all(s == TaskState.SUCCESS for s in upstream_states)
    elif rule == TriggerRule.ALL_DONE:
        return all(s.is_terminal for s in upstream_states)
    elif rule == TriggerRule.ONE_SUCCESS:
        return any(s == TaskState.SUCCESS for s in upstream_states)
    elif rule == TriggerRule.NONE_FAILED:
        failed_states = (TaskState.FAILED, TaskState.TIMED_OUT)
        return not any(s in failed_states for s in upstream_states)
    # Fail loudly on an unrecognized rule instead of silently returning None
    raise ValueError(f"Unknown trigger rule: {rule}")
This enables patterns like cleanup tasks that run even when upstream tasks fail (ALL_DONE), or notification tasks that fire when at least one branch succeeds (ONE_SUCCESS).
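To make the differences between the four rules concrete, here is a small standalone table of decisions. The enum members beyond those named in this article (`RUNNING`, `SKIPPED`), the enum values, and the `should_run` function name are assumptions made for the demo.

```python
from enum import Enum


class TaskState(Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    TIMED_OUT = "timed_out"
    SKIPPED = "skipped"

    @property
    def is_terminal(self):
        return self is not TaskState.RUNNING


class TriggerRule(Enum):
    ALL_SUCCESS = "all_success"
    ALL_DONE = "all_done"
    ONE_SUCCESS = "one_success"
    NONE_FAILED = "none_failed"


def should_run(rule, upstream_states):
    if rule is TriggerRule.ALL_SUCCESS:
        return all(s is TaskState.SUCCESS for s in upstream_states)
    if rule is TriggerRule.ALL_DONE:
        return all(s.is_terminal for s in upstream_states)
    if rule is TriggerRule.ONE_SUCCESS:
        return any(s is TaskState.SUCCESS for s in upstream_states)
    if rule is TriggerRule.NONE_FAILED:
        failed = (TaskState.FAILED, TaskState.TIMED_OUT)
        return not any(s in failed for s in upstream_states)
    raise ValueError(f"Unknown trigger rule: {rule}")


scenarios = {
    "one branch failed": [TaskState.SUCCESS, TaskState.FAILED],
    "one branch skipped": [TaskState.SUCCESS, TaskState.SKIPPED],
    "still running": [TaskState.SUCCESS, TaskState.RUNNING],
}

decisions = {
    name: {rule.value: should_run(rule, states) for rule in TriggerRule}
    for name, states in scenarios.items()
}

for name, outcome in decisions.items():
    print(f"{name}: {outcome}")
```

With one failed branch, only `all_done` and `one_success` let the downstream task run; with a skipped branch, `none_failed` does too, while `all_success` still blocks.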
Step 4: Retry with Exponential Backoff
Production workflows need retries. We implement exponential backoff with jitter to avoid thundering herds:
import random

def compute_delay(attempt, base_delay=1.0, backoff=2.0, max_delay=300.0):
    # attempt is zero-based: the first retry waits about base_delay seconds
    delay = base_delay * (backoff ** attempt)
    delay = min(delay, max_delay)
    # Add ±25% jitter to avoid synchronized retries
    jitter = delay * 0.25
    delay += random.uniform(-jitter, jitter)
    return max(0.0, delay)
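The retry sequence for base_delay=1.0, backoff=2.0 is roughly 1s → 2s → 4s → 8s → ..., with the pre-jitter delay capped at max_delay. Because jitter is applied after the cap, an individual wait can land up to 25% above max_delay.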
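Here is a sketch of how the delay function might plug into a retry loop with a per-attempt timeout. The `run_with_retries` wrapper and its parameters are hypothetical, not TaskFlow's API; `asyncio.wait_for` is the standard-library call that enforces the timeout.

```python
import asyncio
import random


def compute_delay(attempt, base_delay=1.0, backoff=2.0, max_delay=300.0):
    delay = min(base_delay * (backoff ** attempt), max_delay)
    jitter = delay * 0.25
    return max(0.0, delay + random.uniform(-jitter, jitter))


async def run_with_retries(func, retries=3, timeout=None, base_delay=1.0):
    """Hypothetical wrapper: one initial attempt plus up to `retries` retries."""
    for attempt in range(retries + 1):
        try:
            # wait_for cancels the attempt and raises a timeout error if it overruns.
            return await asyncio.wait_for(func(), timeout=timeout)
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(compute_delay(attempt, base_delay=base_delay))


calls = 0


async def flaky():
    """Fails twice, then succeeds."""
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")
    return "ok"


# A tiny base_delay keeps the demo fast.
result = asyncio.run(run_with_retries(flaky, retries=3, timeout=1.0, base_delay=0.01))
print(result, calls)  # ok 3

# Each delay stays within ±25% of the un-jittered value 1, 2, 4, 8.
delays = [compute_delay(a) for a in range(4)]
print([round(d, 2) for d in delays])
```

Catching bare `Exception` is a simplification; a real executor would likely distinguish retryable errors from permanent ones.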
Step 5: SQLite Storage
Every run is persisted to SQLite for history and debugging:
async def save_dag_run(dag_run):
    db = await get_db()
    await db.execute(
        "INSERT INTO dag_runs (run_id, dag_id, state, ...) VALUES (?, ?, ?, ...)",
        (dag_run.run_id, dag_run.dag_id, dag_run.state.value, ...),
    )
    for task_run in dag_run.task_runs.values():
        await db.execute(
            "INSERT INTO task_runs (run_id, task_id, state, ...) VALUES (...)",
            ...
        )
We use aiosqlite for async compatibility, WAL mode for concurrent reads, and automatic cleanup of old runs.
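The WAL and cleanup pieces can be tried out with nothing but the standard library. The sketch below deliberately uses the synchronous `sqlite3` module instead of aiosqlite so it runs without extra dependencies; the table columns and the keep-the-newest-N retention policy in `cleanup_old_runs` are assumptions, not TaskFlow's actual schema.

```python
import os
import sqlite3
import tempfile

# WAL needs a file-backed database; an in-memory one cannot use it.
db_path = os.path.join(tempfile.mkdtemp(), "taskflow_demo.db")
conn = sqlite3.connect(db_path)

# The pragma returns the journal mode now in effect.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]

conn.execute(
    "CREATE TABLE dag_runs ("
    "run_id TEXT PRIMARY KEY, dag_id TEXT, state TEXT, started_at TEXT)"
)

rows = [
    ("run-1", "etl_pipeline", "success", "2024-01-01T02:00:00"),
    ("run-2", "etl_pipeline", "failed", "2024-01-02T02:00:00"),
    ("run-3", "etl_pipeline", "success", "2024-01-03T02:00:00"),
]
with conn:  # commits on success, rolls back on error
    conn.executemany("INSERT INTO dag_runs VALUES (?, ?, ?, ?)", rows)


def cleanup_old_runs(conn, keep=2):
    """Assumed retention policy: keep only the newest `keep` runs."""
    with conn:
        conn.execute(
            "DELETE FROM dag_runs WHERE run_id NOT IN ("
            "SELECT run_id FROM dag_runs ORDER BY started_at DESC LIMIT ?)",
            (keep,),
        )


cleanup_old_runs(conn)
remaining = [
    row[0] for row in conn.execute("SELECT run_id FROM dag_runs ORDER BY started_at")
]
conn.close()

print(mode)       # wal
print(remaining)  # ['run-2', 'run-3']
```

ISO 8601 timestamps stored as text sort chronologically, which is why a plain `ORDER BY started_at` is enough here.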
Step 6: Cron Scheduler
The built-in scheduler parses standard cron expressions:
class CronExpression:
    def __init__(self, expression):
        # Parse "0 */6 * * *" into sets of valid values
        parts = expression.split()
        self.minute = self._parse_field(parts[0], 0, 59)
        self.hour = self._parse_field(parts[1], 0, 23)
        # ... day, month, weekday

    def matches(self, dt):
        # Note: datetime.weekday() numbers Monday as 0, while cron numbers
        # Sunday as 0, so self.weekday must hold values in Python's convention.
        return (dt.minute in self.minute
                and dt.hour in self.hour
                and dt.day in self.day
                and dt.month in self.month
                and dt.weekday() in self.weekday)
Supports wildcards (*), steps (*/6), ranges (9-17), and lists (1,15,28).
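The field parser itself is not shown above, so here is one way it could work: a sketch covering the four syntaxes just listed. The standalone `parse_field` function and the `cron_weekday` helper are illustrative assumptions rather than TaskFlow's code.

```python
from datetime import datetime


def parse_field(field, lo, hi):
    """Expand one cron field into the set of integers it matches."""
    values = set()
    for part in field.split(","):          # lists: "1,15,28"
        step = 1
        if "/" in part:                    # steps: "*/6" or "0-30/10"
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":                    # wildcard: the full range
            start, end = lo, hi
        elif "-" in part:                  # ranges: "9-17"
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:                              # a single value
            start = end = int(part)
        if step < 1 or not (lo <= start <= end <= hi):
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


def cron_weekday(dt):
    """Convert Python's Monday=0 numbering to cron's Sunday=0 numbering."""
    return (dt.weekday() + 1) % 7


every_six_hours = parse_field("*/6", 0, 23)
office_hours = parse_field("9-17", 0, 23)
month_days = parse_field("1,15,28", 1, 31)

print(sorted(every_six_hours))  # [0, 6, 12, 18]
print(sorted(office_hours))     # [9, 10, 11, 12, 13, 14, 15, 16, 17]
print(sorted(month_days))       # [1, 15, 28]

# 2024-01-07 was a Sunday: Python reports 6, cron expects 0.
sunday = datetime(2024, 1, 7)
print(sunday.weekday(), cron_weekday(sunday))  # 6 0
```

Precomputing each field as a set makes `matches()` a handful of constant-time membership checks per tick.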
The REST API
FastAPI gives us automatic OpenAPI docs and async support:
@app.post("/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    dag = DAG.get(dag_id)
    executor = Executor()
    dag_run = await executor.execute(dag)
    await save_dag_run(dag_run)
    return {
        "run_id": dag_run.run_id,
        "state": dag_run.state.value,
        "summary": dag_run.summary(),
    }
The API exposes endpoints for listing DAGs, triggering runs, querying history, and fetching task-level details.
Testing: 107 Tests in 0.94s
Comprehensive test coverage across all components:
| Module | Tests | What's Covered |
|---|---|---|
| test_graph.py | 18 | Topo sort, cycles, levels, critical path |
| test_dag.py | 14 | Creation, validation, registry, context manager |
| test_executor.py | 17 | Execution, failures, timeouts, trigger rules, async |
| test_retry.py | 10 | Backoff, jitter, exhaustion |
| test_store.py | 9 | Save, query, filter, cleanup |
| test_scheduler.py | 18 | Cron parsing, matching, scheduling |
| test_api.py | 11 | All REST endpoints |
Key testing patterns:
- In-memory SQLite for fast, isolated storage tests
- ASGITransport from httpx for testing FastAPI without a server
- DAG registry cleanup between tests to prevent leakage
- pytest-asyncio with asyncio_mode = "auto" for clean async tests
What I Learned
- Kahn's algorithm is elegant: just in-degree tracking and a queue. O(V+E) and naturally detects cycles.
- Trigger rules change everything: ALL_DONE for cleanup, ONE_SUCCESS for fan-in patterns, NONE_FAILED for conditional logic. These four rules cover most real-world patterns.
- Jitter in retries matters: without it, all failed tasks retry at the same time (thundering herd). A ±25% jitter spreads the load.
- asyncio.Semaphore is perfect for concurrency limits, combined with asyncio.gather() for level-based parallelism.
- SQLite + WAL mode is surprisingly capable: for single-process orchestrators, it's all you need. No Postgres required.
Try It
git clone https://github.com/hajirufai/taskflow.git
cd taskflow
pip install -e ".[dev]"
pytest tests/ -v
python examples/etl_pipeline.py
The full source is ~3,100 lines of Python with 107 tests. It's a great foundation for understanding how workflow orchestrators work under the hood.
GitHub: hajirufai/taskflow
Building things from scratch is the fastest way to truly understand them. If you're interviewing for data engineering roles, being able to explain topological sort and trigger rules from first principles sets you apart.

















