慣性聚合 高效追蹤和閱讀你感興趣的部落格、新聞、科技資訊
閱讀原文 在慣性聚合中打開

推薦訂閱源

Google DeepMind News
Google DeepMind News
人人都是产品经理
人人都是产品经理
M
MIT News - Artificial intelligence
博客园 - 叶小钗
MyScale Blog
MyScale Blog
V
Visual Studio Blog
月光博客
月光博客
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
量子位
I
InfoQ
有赞技术团队
有赞技术团队
阮一峰的网络日志
阮一峰的网络日志
Jina AI
Jina AI
V
V2EX
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
Blog — PlanetScale
Blog — PlanetScale
Last Week in AI
Last Week in AI
雷峰网
雷峰网
Stack Overflow Blog
Stack Overflow Blog
博客园 - Franky

DEV Community

Authentication Security Deep Dive: From Brute Force to Salted Hashing (With Java Examples) Why AI Systems Don’t Fail — They Drift Spilling beans for how i learn for exam😁"Reinforcement Learning Cheat Sheet" I Replaced Chrome with Safari for AI Browser Automation. Here's What Broke (and What Finally Worked) How Python Borrows Other People's Work The $40 Architecture: Processing 1 Billion API Requests with 99.99% Uptime Vibe Coding: A Workflow Guide (From Zero to SaaS) Most webhook security guides protect the wrong side. The scary part is delivery. Headless CMS for TanStack Start: Build a Blog with Cosmic EU Age Verification App "Hacked in 2 Minutes" — What Actually Happened Comfy Cloud’s delete function does not actually remove files Running AI Models on GPU Cloud Servers: A Beginner Guide Event-driven media intelligence with AWS Step Functions and Bedrock I scored 500 AI prompts across 8 quality dimensions — here's what broke How to Call Google Gemini API from Next.js (Free Tier, No Backend Needed) The Portal Protocol: Reclaiming Human Connection in the Age of AI How to Fix Your Team's Scattered Knowledge Problem With a Self-Hosted Forum Intro to tc Cloud Functors: A Graph-First Mental Model for the Modern Cloud Designing Multi-Tenant Backends With Both Ownership and Team Access I Built a Neumorphic CSS Library with 77+ Components — Here's What I Learned PostgreSQL Performance Optimization: Why Connection Pooling Is Critical at Scale Cómo construí un SaaS multi-rubro para gestionar expensas en Argentina con FastAPI + Vue 3 🚀 I Built an Ethical Hacking Scanner Tool – Open Source Project I Replaced /usage and /context in Claude Code With a Single Statusline A Pythonic Way to Handle Emails (IMAP/SMTP) with Auto-Discovery and AI-Ready Design I Collected 8.9 Million Polymarket Price Points — Here's What I Found About How Markets Really Move EcoTrack AI — Carbon Footprint Tracker & Dashboard Everyone's Using AI. No One Agrees How. 5 self-hosted ebook managers worth trying in 2026 Building Your First AI Agent with LangChain: From Chatbot to Autonomous Assistant Common SOC 2 Failures (Real World) Stop Vibe-Checking Your AI App: A Practical Guide to Evals How to Use SonarQube and SonarScanner Locally to Level Up Your Code Quality Your Next To-Do App Is Dead — I Replaced Mine with an OpenClaw AI Sign a Nostr event in 60 lines of Python using coincurve — no nostr-sdk, no nbxplorer, no rust toolchain ITGC Audit Explained Like You’re in Big 4 Patch Tuesday abril 2026: Microsoft parcha 163 vulnerabilidades y un zero-day en SharePoint Stop scraping everything: a better way to track competitor price changes Listing on MCPize + the Official MCP Registry while routing payments OUTSIDE the marketplace — how I kept 100% of my x402 revenue Building an AI-Powered Risk Intelligence System Using Serverless Architecture Why We Ripped Function Overloading Out of Our AI Toolchain Testing AI-Generated Code: How to Actually Know If It Works SaaS Churn Is Killing Your Business. Here Is What to Do About It (Without a Support Team) The Speed of AI Is No Longer Linear - And Self-Improving Models Are Why How to Implement RBAC for MCP Tools: A Practical Guide for Engineering Teams From Standard Quote to Persuasive Proposal: AI Automation for Arborists I built a CLI that scaffolds complete multi-tenant SaaS apps Axios CVE-2025–62718: The Silent SSRF Bug That Could Be Hiding in Your Node.js App Right Now The dashboard that ended our friendship Data Pipelines Explained Simply (and How to Build Them with Python)
從 Python 到生產流程:一本人氣 Apache Airflow 的實用指南
Varun Joshi · 2026-05-25 · via DEV Community

你已經在使用 Python,並且你已經寫過腳本來抓取數據、清理數據並將其載入某處,或許你甚至在某個伺服器上運行著一個 while True 環路,或者你害怕觸碰的 cron 任務。

這個教學是為了你

Apache Airflow 是將您的 Python 腳本從 "在我的電腦上運行" 變為 "每天早上 6 點可靠運行、失敗時重試、出現問題時發送警報,並有 UI 來查看確切情況的工具。它是大多數數據工程團隊的標準協調工具,而且更易於使用,因為它看起來這樣.

到這篇文章結束時,您將理解 Airflow 是如何思考的,並且您將擁有一個可工作的 DAG,用於排程真實的數據管線。

Airflow 真正是什么(以及不是什么)?

Airflow 是一個工作流程排程器。它本身不會移動數據,但它排程並監控執行這些任務的程序。

考慮它像一個非常聰明的 CRON 工作:

  1. 知道任務應該執行的順序。
  2. 自動重試失敗的任務。
  3. 保留每個執行的完整歷史記錄。
  4. 顯示您在UI上的所有內容
  5. 當事情出錯時發送警報

它不是數據處理引擎:您仍然需要編寫Python來完成實際工作。Airflow決定何時運行它以及當它失敗時要做什么

兩分鐘內了解核心概念

在接觸代碼之前,您應該知道的三個術語:

有向無環圖(Directed Acyclic Graph):這是你的流程。有向無環圖是一個定義任務集及其執行順序的 Python 檔案。無環部分只是表示任務不能自我迴圈。

任務: DAG 內的一個單位工作,可以是執行一個 Python 函數、執行一個 SQL 查詢、呼叫一個 API 或其他 dozens of 事情。

操作員: 任務的構建塊。PythonOperator 執行 Python 函數。BashOperator 執行 shell 命令。PostGresOperator 執行 SQL,airflow 對幾乎所有事情都有操作員。

在本地設定 Airflow

在本地運行 Airflow 最快的方法是使用 pip:

pip install apache-airflow
airflow dbt init
airflow user create \
   --username admin \
   --password admin \
   --firstname your \
   --lastname name \
   --role admin \
   --email admin@x.com \

進入全屏模式 退出全屏模式

在兩個不同的終端啟動調度程序和伺服器

#Terminal 1
airflow scheduler

#Terminal 2
airflow webserver --port 8080

進入全螢幕模式 離開全螢幕模式

現在請訪問http://localhost:8080 -您將看到已經有許多範例 DAG 的 Airflow UI.

您的第一個 DAG:一個實際的數據流程

讓我們建立一些實用的東西,而不僅僅是「Hello world」,我們將建立一個流程:

1- 從公共 API 拉取數據
2- 清潔並轉換它
3- 存檔為 CSV

這反映了一個真實的消費流程,沒有企業級的複雜性。

建立一個名為weather_pipeline.py在你 airflow 內部dags/資料夾。

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os

default_args= {
    'owner': 'varun'
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_in_future': False,
}

#Define a DAG----

with DAG(
   dag_id='weather_pipeline'
   default_args = default_args,
   description = 'Fetch, transform and save daily weather data'
   schedule_interval = '0 6 * * *' # runs daily at 6 am
   start_date=datetime(2024,1,1)
   catchup = False,
   tags= ['Weather','Tutorial']
) as dag:
   pass

進入全螢幕模式 離開全螢幕模式

*幾點注意事項
*

  • schedule_interval 是一個 CRON 表達式.
  • catchup=False 告知 Airflow 不需執行 start_date 與今天之間的所有歷史日期. 您幾乎總是希望它 False
  • Retries =2 意味著 Airflow 將嘗試兩次才標記為失敗.

添加任務

現在添加三個任務進入 pass

def fetch_weather(**context):
    """Pull weather from open meteo (free, No API needed)"""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
       "latitude": 28.6,
       "longitude": 77.2,
       "daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
       "timezone":"Asia/Kolkata",
       "forecast_days": 1
 }
    response = request.get(url, params=params)
    response.raise_for_status() #raise an exception

    data = response.json()
    #Push data to Xcom so the next task can access it
    context['ti'].xcom_push(key='raw_weather', value =data)
    print(f"Fetched weather data for {data['daily']['time'][0]}")

def transform_weather(**context):
   """clean and reshape the raw API response"""
   raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
   daily = raw['daily']

   df = pd.DataFrame({
        'date': daily['time']
        'temp_max_c': daily['temperature_2m_max'],
        'temp_min_c':daily['temperature_2m_min'],
        'precipitation_mm':daily['precipitation_mm'],
        })
   context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
        print(f"✓ Transformed {len(df)} rows")


   df save_weather(**context):
      """Append the cleaned data to csv file"""
      records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')

      df= pd.DataFrame(records)
      output_path= '/tmp/weather_data.csv'
      file_exists = os.path.exists(output_path)

      df.to_csv(output_path, mode='a', header=not file_exists, index=False)

      print(f"✓ Saved to {output_path}")


   #Wiring the tasks
   t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
   t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
   t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)

Enter fullscreen mode 退出全螢幕模式

底下的t1>>t2>>t3是一種表示「按這個順序執行」的方式。這是一些可能看起來很奇怪但後來你會習慣的事情

Xcom:任務如何溝通

你一定注意到xcom_pushxcom_pull在程式碼中。
這就是 Airflow 執行任務之間如何傳遞資料的方式。

XCom(跨通訊)是 Airflow 中內建的輕量級儲存空間。
一個任務推送一個值,下一個任務透過 TaskID 和 Key 來拉取它。

#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')

Enter fullscreen mode Exit fullscreen mode

Xcom 會儲存在 Airflow 元資料資料庫中,所以它並非用於大型資料集。用於小型有效載荷 - 數字、檔案路徑、API 回應。對於大型資料,請寫入 S3/GCS,並透過 Xcom 傳遞檔案路徑。

如何執行

一旦檔案在您的 dags/ 資料夾中。Airflow 會自動抓取它。您應該會看到它在 http://localhost:8080 中出現。

要手動觸發:

  1. 在 DAG 清單中尋找weather_pipeline
  2. 開啟它。
  3. 觸發 DAG

查看圖形視圖,任務完成時應變為綠色。如果出現錯誤,任務會顯示紅色。

生產中重要的模式

  1. 對於幂等式管線使用execution_date
def fetch_weather(**context):
    run_date = context['execution_date'].strftime('%Y-%m-%d')
    #fetch data for that specific date

進入全屏模式 退出全螢幕模式

這讓它變成可重執行的任務,以便如果這個任務在星期二失敗,可以在星期三執行以獲取星期二的数据

  1. 務必設定retriesretry_delay

網絡呼叫失敗。APIs下線,資料庫變慢
設定重試可以讓你避免凌晨2點的通話

default_args = {
      'retries': 3
      'retry_delay' : timedelta(minutes=10)

}

進入全螢幕模式 退出全螢幕模式

  1. 使用 on_failure_callback 進行警告
def alert_on_failure(context):
    #send a slack message, email etc
    print(f"Task failed:{context['task_instance'].task_id}")

default_args = {
   'on_failure_callback': alert_on_failure,
}

開啟全螢幕模式 退出全螢幕模式

  1. 保持任務簡單集中

一個任務=一件事情,不要寫一個單一的巨大函數來做所有事情。小任務意味著更乾淨的重試和容易閱讀的日誌.

就這樣

你從在電腦上手動運行 Python 程式碼,變成使用重試功能排程執行,並自動逐步執行任務並發送失敗通知。

學習曲線確實存在,但主要是在設定階段。一旦你寫了兩個或三個 DAG,模式就清晰了,你就不再回頭使用 CRON。