你已經在使用 Python,並且你已經寫過腳本來抓取數據、清理數據並將其載入某處,或許你甚至在某個伺服器上運行著一個 while True 環路,或者你害怕觸碰的 cron 任務。
這個教學是為了你
Apache Airflow 是將您的 Python 腳本從 "在我的電腦上運行" 變為 "每天早上 6 點可靠運行、失敗時重試、出現問題時發送警報,並有 UI 來查看確切情況的工具。它是大多數數據工程團隊的標準協調工具,而且更易於使用,因為它看起來這樣.
到這篇文章結束時,您將理解 Airflow 是如何思考的,並且您將擁有一個可工作的 DAG,用於排程真實的數據管線。
Airflow 真正是什么(以及不是什么)?
Airflow 是一個工作流程排程器。它本身不會移動數據,但它排程並監控執行這些任務的程序。
考慮它像一個非常聰明的 CRON 工作:
- 知道任務應該執行的順序。
- 自動重試失敗的任務。
- 保留每個執行的完整歷史記錄。
- 顯示您在UI上的所有內容
- 當事情出錯時發送警報
它不是數據處理引擎:您仍然需要編寫Python來完成實際工作。Airflow決定何時運行它以及當它失敗時要做什么
兩分鐘內了解核心概念
在接觸代碼之前,您應該知道的三個術語:
有向無環圖(Directed Acyclic Graph):這是你的流程。有向無環圖是一個定義任務集及其執行順序的 Python 檔案。無環部分只是表示任務不能自我迴圈。
任務: DAG 內的一個單位工作,可以是執行一個 Python 函數、執行一個 SQL 查詢、呼叫一個 API 或其他 dozens of 事情。
操作員: 任務的構建塊。PythonOperator 執行 Python 函數。BashOperator 執行 shell 命令。PostGresOperator 執行 SQL,airflow 對幾乎所有事情都有操作員。
在本地設定 Airflow
在本地運行 Airflow 最快的方法是使用 pip:
pip install apache-airflow
airflow dbt init
airflow user create \
--username admin \
--password admin \
--firstname your \
--lastname name \
--role admin \
--email admin@x.com \
在兩個不同的終端啟動調度程序和伺服器
#Terminal 1
airflow scheduler
#Terminal 2
airflow webserver --port 8080
現在請訪問http://localhost:8080 -您將看到已經有許多範例 DAG 的 Airflow UI.
您的第一個 DAG:一個實際的數據流程
讓我們建立一些實用的東西,而不僅僅是「Hello world」,我們將建立一個流程:
1- 從公共 API 拉取數據
2- 清潔並轉換它
3- 存檔為 CSV
這反映了一個真實的消費流程,沒有企業級的複雜性。
建立一個名為weather_pipeline.py在你 airflow 內部dags/資料夾。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os
default_args= {
'owner': 'varun'
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_in_future': False,
}
#Define a DAG----
with DAG(
dag_id='weather_pipeline'
default_args = default_args,
description = 'Fetch, transform and save daily weather data'
schedule_interval = '0 6 * * *' # runs daily at 6 am
start_date=datetime(2024,1,1)
catchup = False,
tags= ['Weather','Tutorial']
) as dag:
pass
*幾點注意事項
*
- schedule_interval 是一個 CRON 表達式.
-
catchup=False告知 Airflow 不需執行start_date與今天之間的所有歷史日期. 您幾乎總是希望它False -
Retries =2意味著 Airflow 將嘗試兩次才標記為失敗.
添加任務
現在添加三個任務進入 pass
def fetch_weather(**context):
"""Pull weather from open meteo (free, No API needed)"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": 28.6,
"longitude": 77.2,
"daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
"timezone":"Asia/Kolkata",
"forecast_days": 1
}
response = request.get(url, params=params)
response.raise_for_status() #raise an exception
data = response.json()
#Push data to Xcom so the next task can access it
context['ti'].xcom_push(key='raw_weather', value =data)
print(f"Fetched weather data for {data['daily']['time'][0]}")
def transform_weather(**context):
"""clean and reshape the raw API response"""
raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
daily = raw['daily']
df = pd.DataFrame({
'date': daily['time']
'temp_max_c': daily['temperature_2m_max'],
'temp_min_c':daily['temperature_2m_min'],
'precipitation_mm':daily['precipitation_mm'],
})
context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
print(f"✓ Transformed {len(df)} rows")
df save_weather(**context):
"""Append the cleaned data to csv file"""
records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')
df= pd.DataFrame(records)
output_path= '/tmp/weather_data.csv'
file_exists = os.path.exists(output_path)
df.to_csv(output_path, mode='a', header=not file_exists, index=False)
print(f"✓ Saved to {output_path}")
#Wiring the tasks
t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)
底下的t1>>t2>>t3是一種表示「按這個順序執行」的方式。這是一些可能看起來很奇怪但後來你會習慣的事情
Xcom:任務如何溝通
你一定注意到xcom_push和xcom_pull在程式碼中。
這就是 Airflow 執行任務之間如何傳遞資料的方式。
XCom(跨通訊)是 Airflow 中內建的輕量級儲存空間。
一個任務推送一個值,下一個任務透過 TaskID 和 Key 來拉取它。
#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')
Xcom 會儲存在 Airflow 元資料資料庫中,所以它並非用於大型資料集。用於小型有效載荷 - 數字、檔案路徑、API 回應。對於大型資料,請寫入 S3/GCS,並透過 Xcom 傳遞檔案路徑。
如何執行
一旦檔案在您的 dags/ 資料夾中。Airflow 會自動抓取它。您應該會看到它在 http://localhost:8080 中出現。
要手動觸發:
- 在 DAG 清單中尋找
weather_pipeline。 - 開啟它。
- 觸發 DAG
查看圖形視圖,任務完成時應變為綠色。如果出現錯誤,任務會顯示紅色。
生產中重要的模式
- 對於幂等式管線使用
execution_date。
def fetch_weather(**context):
run_date = context['execution_date'].strftime('%Y-%m-%d')
#fetch data for that specific date
這讓它變成可重執行的任務,以便如果這個任務在星期二失敗,可以在星期三執行以獲取星期二的数据
- 務必設定
retries和retry_delay
網絡呼叫失敗。APIs下線,資料庫變慢
設定重試可以讓你避免凌晨2點的通話
default_args = {
'retries': 3
'retry_delay' : timedelta(minutes=10)
}
- 使用
on_failure_callback進行警告
def alert_on_failure(context):
#send a slack message, email etc
print(f"Task failed:{context['task_instance'].task_id}")
default_args = {
'on_failure_callback': alert_on_failure,
}
- 保持任務簡單集中
一個任務=一件事情,不要寫一個單一的巨大函數來做所有事情。小任務意味著更乾淨的重試和容易閱讀的日誌.
就這樣
你從在電腦上手動運行 Python 程式碼,變成使用重試功能排程執行,並自動逐步執行任務並發送失敗通知。
學習曲線確實存在,但主要是在設定階段。一旦你寫了兩個或三個 DAG,模式就清晰了,你就不再回頭使用 CRON。











