慣性聚合 高效追讀感興趣之博客、新聞、科技資訊
閱原文 以慣性聚合開啟

推薦訂閱源

Google DeepMind News
Google DeepMind News
人人都是产品经理
人人都是产品经理
M
MIT News - Artificial intelligence
博客园 - 叶小钗
MyScale Blog
MyScale Blog
V
Visual Studio Blog
月光博客
月光博客
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
量子位
I
InfoQ
有赞技术团队
有赞技术团队
阮一峰的网络日志
阮一峰的网络日志
Jina AI
Jina AI
V
V2EX
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
Blog — PlanetScale
Blog — PlanetScale
Last Week in AI
Last Week in AI
雷峰网
雷峰网
Stack Overflow Blog
Stack Overflow Blog
博客园 - Franky

DEV Community

Authentication Security Deep Dive: From Brute Force to Salted Hashing (With Java Examples) Why AI Systems Don’t Fail — They Drift Spilling beans for how i learn for exam😁"Reinforcement Learning Cheat Sheet" I Replaced Chrome with Safari for AI Browser Automation. Here's What Broke (and What Finally Worked) How Python Borrows Other People's Work The $40 Architecture: Processing 1 Billion API Requests with 99.99% Uptime Vibe Coding: A Workflow Guide (From Zero to SaaS) Most webhook security guides protect the wrong side. The scary part is delivery. Headless CMS for TanStack Start: Build a Blog with Cosmic EU Age Verification App "Hacked in 2 Minutes" — What Actually Happened Comfy Cloud’s delete function does not actually remove files Running AI Models on GPU Cloud Servers: A Beginner Guide Event-driven media intelligence with AWS Step Functions and Bedrock I scored 500 AI prompts across 8 quality dimensions — here's what broke How to Call Google Gemini API from Next.js (Free Tier, No Backend Needed) The Portal Protocol: Reclaiming Human Connection in the Age of AI How to Fix Your Team's Scattered Knowledge Problem With a Self-Hosted Forum Intro to tc Cloud Functors: A Graph-First Mental Model for the Modern Cloud Designing Multi-Tenant Backends With Both Ownership and Team Access I Built a Neumorphic CSS Library with 77+ Components — Here's What I Learned PostgreSQL Performance Optimization: Why Connection Pooling Is Critical at Scale Cómo construí un SaaS multi-rubro para gestionar expensas en Argentina con FastAPI + Vue 3 🚀 I Built an Ethical Hacking Scanner Tool – Open Source Project I Replaced /usage and /context in Claude Code With a Single Statusline A Pythonic Way to Handle Emails (IMAP/SMTP) with Auto-Discovery and AI-Ready Design I Collected 8.9 Million Polymarket Price Points — Here's What I Found About How Markets Really Move EcoTrack AI — Carbon Footprint Tracker & Dashboard Everyone's Using AI. No One Agrees How. 5 self-hosted ebook managers worth trying in 2026 Building Your First AI Agent with LangChain: From Chatbot to Autonomous Assistant Common SOC 2 Failures (Real World) Stop Vibe-Checking Your AI App: A Practical Guide to Evals How to Use SonarQube and SonarScanner Locally to Level Up Your Code Quality Your Next To-Do App Is Dead — I Replaced Mine with an OpenClaw AI Sign a Nostr event in 60 lines of Python using coincurve — no nostr-sdk, no nbxplorer, no rust toolchain ITGC Audit Explained Like You’re in Big 4 Patch Tuesday abril 2026: Microsoft parcha 163 vulnerabilidades y un zero-day en SharePoint Stop scraping everything: a better way to track competitor price changes Listing on MCPize + the Official MCP Registry while routing payments OUTSIDE the marketplace — how I kept 100% of my x402 revenue Building an AI-Powered Risk Intelligence System Using Serverless Architecture Why We Ripped Function Overloading Out of Our AI Toolchain Testing AI-Generated Code: How to Actually Know If It Works SaaS Churn Is Killing Your Business. Here Is What to Do About It (Without a Support Team) The Speed of AI Is No Longer Linear - And Self-Improving Models Are Why How to Implement RBAC for MCP Tools: A Practical Guide for Engineering Teams From Standard Quote to Persuasive Proposal: AI Automation for Arborists I built a CLI that scaffolds complete multi-tenant SaaS apps Axios CVE-2025–62718: The Silent SSRF Bug That Could Be Hiding in Your Node.js App Right Now The dashboard that ended our friendship Data Pipelines Explained Simply (and How to Build Them with Python)
自 Python 至生产流程:Apache Airflow 实践指南
Varun Joshi · 2026-05-25 · via DEV Community

尔已习用Python,且撰脚本以取数据、洁之,载之某处,或竟有之。while True或有服务器上之循环,或为汝所惧之cron任务。

此教程乃为尔设

Apache Airflow者,乃将君之Python脚本自"运行于吾机"化而为"每日晨六时可靠运行,失败则重试,有故则发警,且具视之界面以明其状"之利器也。此乃数据工程之众团队所标准之编排工具,且更易近,盖其貌若此也。

及至是文之终,君将晓Airflow之思,且得可运作之DAG,以调度真实之数据管也。

究何为 Airflow (及其非是)?

Airflow 乃工作流调度之器。其不迁数据,惟调度监控为之者。

想之如极智之 CRON 作业:

  1. 知任务当行之序。
  2. 自行重试其败者。
  3. 留每行之全史。
  4. 显于界面诸事
  5. 事有乖蹇则鸣警

非数据处理之机也。犹须书Python以成实事。Airflow决其行时,及其败时何以为之

两分钟内明其要义

触文之前有三辞当晓:

有向无环图(Directed Acyclic Graph):此乃汝之流水线。有向无环者,谓任务不可自环也。

任务:一单元之工作,在图内,或运行一函数,或执行一查询,或调用一API,或诸般事务。

算子:任务之基石。PythonOperator 运行 Python 函數。BashOperator 執行 shell 命令。PostGresOperator 執行 SQL,airflow 對萬事皆有其運算符。

設置 Airflow 本地

得 Airflow 個人運行之捷徑,乃藉 pip:

pip install apache-airflow
airflow dbt init
airflow user create \
   --username admin \
   --password admin \
   --firstname your \
   --lastname name \
   --role admin \
   --email admin@x.com \

進入全屏模式 退出全屏模式

於二異終端啟動調度器及伺服器

#Terminal 1
airflow scheduler

#Terminal 2
airflow webserver --port 8080

入全屏模式 出全屏模式

今访http://localhost:8080——汝将见Airflow之界面,已备诸例DAG矣.

君之第一DAG:真实之数据管道

今营实用之物,非徒“Hello world”,吾辈将营一管道:

一、自公共API取数据
二、涤其垢而化其形
三、存为 CSV 文件

此若真实之摄取之脉,无企业之繁杂。

创一文件名曰weather_pipeline.py君之气流之内dags/文夹

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os

default_args= {
    'owner': 'varun'
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_in_future': False,
}

#Define a DAG----

with DAG(
   dag_id='weather_pipeline'
   default_args = default_args,
   description = 'Fetch, transform and save daily weather data'
   schedule_interval = '0 6 * * *' # runs daily at 6 am
   start_date=datetime(2024,1,1)
   catchup = False,
   tags= ['Weather','Tutorial']
) as dag:
   pass

入全景模式 出全屏模式

*数事须记
*

  • 调度间隔为一种CRON表达式。
  • catchup=False告 airflow勿行历日期之全start_date今朝。汝几欲之。False
  • Retries =2气流转而试之再,方以为败。

增补事目

今增三事,置于其处pass

def fetch_weather(**context):
    """Pull weather from open meteo (free, No API needed)"""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
       "latitude": 28.6,
       "longitude": 77.2,
       "daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
       "timezone":"Asia/Kolkata",
       "forecast_days": 1
 }
    response = request.get(url, params=params)
    response.raise_for_status() #raise an exception

    data = response.json()
    #Push data to Xcom so the next task can access it
    context['ti'].xcom_push(key='raw_weather', value =data)
    print(f"Fetched weather data for {data['daily']['time'][0]}")

def transform_weather(**context):
   """clean and reshape the raw API response"""
   raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
   daily = raw['daily']

   df = pd.DataFrame({
        'date': daily['time']
        'temp_max_c': daily['temperature_2m_max'],
        'temp_min_c':daily['temperature_2m_min'],
        'precipitation_mm':daily['precipitation_mm'],
        })
   context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
        print(f"✓ Transformed {len(df)} rows")


   df save_weather(**context):
      """Append the cleaned data to csv file"""
      records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')

      df= pd.DataFrame(records)
      output_path= '/tmp/weather_data.csv'
      file_exists = os.path.exists(output_path)

      df.to_csv(output_path, mode='a', header=not file_exists, index=False)

      print(f"✓ Saved to {output_path}")


   #Wiring the tasks
   t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
   t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
   t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)

入全景模式 退出全屏模式

下方之t1>>t2>>t3,乃言“依此序行之事”。此等事初见或觉怪诞,然久则习之。

Xcom:任务如何互语

汝必已察xcom_pushxcom_pull于码中。
此乃气流任务相授数据之道也。

XCom(cross-communication)乃Airflow内置之小键存储也。
一务推值,次务引之,依务号与键。

#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')

入全屏模式 出全屏模式

Xcom存于Airflow元数据数据库,故非为大数据集而设。宜用于小数据负载——计数、文件路径、API响应。若遇大数据,当写入S3/GCS,而以文件路径经Xcom传递之。

何如行之

既文件至汝处dags/文件夹Airflow自会取之。汝当见其现于http://localhost:8080

手触而发之

  1. 觅之weather_pipeline于DAG之列。
  2. 启之。
  3. 触发有向无环图

观图示,任务当随其成而转绿。若有所败,则任务显赤。

生产中要紧之模式

  1. 用之execution_date为幂等管道。
def fetch_weather(**context):
    run_date = context['execution_date'].strftime('%Y-%m-%d')
    #fetch data for that specific date

入全景模式 退出全屏模式

是汝之责,可重行之,使若此失败于星期二,则可于星期三行之,以取星期二之数据

  1. 常设retriesretry_delay

网络之呼不达。API崩,数据库缓
设重试之,可免子时之扰

default_args = {
      'retries': 3
      'retry_delay' : timedelta(minutes=10)

}

进入全屏模式 退出全屏模式

  1. 用之on_failure_callback以警之
def alert_on_failure(context):
    #send a slack message, email etc
    print(f"Task failed:{context['task_instance'].task_id}")

default_args = {
   'on_failure_callback': alert_on_failure,
}

入全景模式 出全屏模式

  1. 任事以小,专意以精

一事一务,勿作巨函数以包揽万端。小务之务,则重试之洁,日志之易读也。

斯可矣

汝自手动于机中运行 Python 代码,转而调度之,兼具重试之能,并自动依序执行任务,且败则通知.

学之曲实有,然多在布置。既撰二三 DAG,则法成而不可复归 CRON矣。