尔已习用Python,且撰脚本以取数据、洁之,载之某处,或竟有之。while True或有服务器上之循环,或为汝所惧之cron任务。
此教程乃为尔设
Apache Airflow者,乃将君之Python脚本自"运行于吾机"化而为"每日晨六时可靠运行,失败则重试,有故则发警,且具视之界面以明其状"之利器也。此乃数据工程之众团队所标准之编排工具,且更易近,盖其貌若此也。
及至是文之终,君将晓Airflow之思,且得可运作之DAG,以调度真实之数据管也。
究何为 Airflow (及其非是)?
Airflow 乃工作流调度之器。其不迁数据,惟调度监控为之者。
想之如极智之 CRON 作业:
- 知任务当行之序。
- 自行重试其败者。
- 留每行之全史。
- 显于界面诸事
- 事有乖蹇则鸣警
非数据处理之机也。犹须书Python以成实事。Airflow决其行时,及其败时何以为之
两分钟内明其要义
触文之前有三辞当晓:
有向无环图(Directed Acyclic Graph):此乃汝之流水线。有向无环者,谓任务不可自环也。
任务:一单元之工作,在图内,或运行一函数,或执行一查询,或调用一API,或诸般事务。
算子:任务之基石。PythonOperator 运行 Python 函數。BashOperator 執行 shell 命令。PostGresOperator 執行 SQL,airflow 對萬事皆有其運算符。
設置 Airflow 本地
得 Airflow 個人運行之捷徑,乃藉 pip:
pip install apache-airflow
airflow dbt init
airflow user create \
--username admin \
--password admin \
--firstname your \
--lastname name \
--role admin \
--email admin@x.com \
於二異終端啟動調度器及伺服器
#Terminal 1
airflow scheduler
#Terminal 2
airflow webserver --port 8080
今访http://localhost:8080——汝将见Airflow之界面,已备诸例DAG矣.
君之第一DAG:真实之数据管道
今营实用之物,非徒“Hello world”,吾辈将营一管道:
一、自公共API取数据
二、涤其垢而化其形
三、存为 CSV 文件
此若真实之摄取之脉,无企业之繁杂。
创一文件名曰weather_pipeline.py君之气流之内dags/文夹
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os
default_args= {
'owner': 'varun'
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_in_future': False,
}
#Define a DAG----
with DAG(
dag_id='weather_pipeline'
default_args = default_args,
description = 'Fetch, transform and save daily weather data'
schedule_interval = '0 6 * * *' # runs daily at 6 am
start_date=datetime(2024,1,1)
catchup = False,
tags= ['Weather','Tutorial']
) as dag:
pass
*数事须记
*
- 调度间隔为一种CRON表达式。
-
catchup=False告 airflow勿行历日期之全start_date今朝。汝几欲之。False -
Retries =2气流转而试之再,方以为败。
增补事目
今增三事,置于其处pass
def fetch_weather(**context):
"""Pull weather from open meteo (free, No API needed)"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": 28.6,
"longitude": 77.2,
"daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
"timezone":"Asia/Kolkata",
"forecast_days": 1
}
response = request.get(url, params=params)
response.raise_for_status() #raise an exception
data = response.json()
#Push data to Xcom so the next task can access it
context['ti'].xcom_push(key='raw_weather', value =data)
print(f"Fetched weather data for {data['daily']['time'][0]}")
def transform_weather(**context):
"""clean and reshape the raw API response"""
raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
daily = raw['daily']
df = pd.DataFrame({
'date': daily['time']
'temp_max_c': daily['temperature_2m_max'],
'temp_min_c':daily['temperature_2m_min'],
'precipitation_mm':daily['precipitation_mm'],
})
context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
print(f"✓ Transformed {len(df)} rows")
df save_weather(**context):
"""Append the cleaned data to csv file"""
records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')
df= pd.DataFrame(records)
output_path= '/tmp/weather_data.csv'
file_exists = os.path.exists(output_path)
df.to_csv(output_path, mode='a', header=not file_exists, index=False)
print(f"✓ Saved to {output_path}")
#Wiring the tasks
t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)
下方之t1>>t2>>t3,乃言“依此序行之事”。此等事初见或觉怪诞,然久则习之。
Xcom:任务如何互语
汝必已察xcom_push与xcom_pull于码中。
此乃气流任务相授数据之道也。
XCom(cross-communication)乃Airflow内置之小键存储也。
一务推值,次务引之,依务号与键。
#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')
Xcom存于Airflow元数据数据库,故非为大数据集而设。宜用于小数据负载——计数、文件路径、API响应。若遇大数据,当写入S3/GCS,而以文件路径经Xcom传递之。
何如行之
既文件至汝处dags/文件夹Airflow自会取之。汝当见其现于http://localhost:8080。
手触而发之
- 觅之
weather_pipeline于DAG之列。 - 启之。
- 触发有向无环图
观图示,任务当随其成而转绿。若有所败,则任务显赤。
生产中要紧之模式
- 用之
execution_date为幂等管道。
def fetch_weather(**context):
run_date = context['execution_date'].strftime('%Y-%m-%d')
#fetch data for that specific date
是汝之责,可重行之,使若此失败于星期二,则可于星期三行之,以取星期二之数据
- 常设
retries与retry_delay
网络之呼不达。API崩,数据库缓
设重试之,可免子时之扰
default_args = {
'retries': 3
'retry_delay' : timedelta(minutes=10)
}
- 用之
on_failure_callback以警之
def alert_on_failure(context):
#send a slack message, email etc
print(f"Task failed:{context['task_instance'].task_id}")
default_args = {
'on_failure_callback': alert_on_failure,
}
- 任事以小,专意以精
一事一务,勿作巨函数以包揽万端。小务之务,则重试之洁,日志之易读也。
斯可矣
汝自手动于机中运行 Python 代码,转而调度之,兼具重试之能,并自动依序执行任务,且败则通知.
学之曲实有,然多在布置。既撰二三 DAG,则法成而不可复归 CRON矣。











