慣性聚合 関心のあるブログ、ニュース、テクノロジーを効率的に追跡
原文を読む 慣性聚合で開く

おすすめ購読元

Google DeepMind News
Google DeepMind News
人人都是产品经理
人人都是产品经理
M
MIT News - Artificial intelligence
博客园 - 叶小钗
MyScale Blog
MyScale Blog
V
Visual Studio Blog
月光博客
月光博客
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
量子位
I
InfoQ
有赞技术团队
有赞技术团队
阮一峰的网络日志
阮一峰的网络日志
Jina AI
Jina AI
V
V2EX
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
Blog — PlanetScale
Blog — PlanetScale
Last Week in AI
Last Week in AI
雷峰网
雷峰网
Stack Overflow Blog
Stack Overflow Blog
博客园 - Franky

DEV Community

Authentication Security Deep Dive: From Brute Force to Salted Hashing (With Java Examples) Why AI Systems Don’t Fail — They Drift Spilling beans for how i learn for exam😁"Reinforcement Learning Cheat Sheet" I Replaced Chrome with Safari for AI Browser Automation. Here's What Broke (and What Finally Worked) How Python Borrows Other People's Work The $40 Architecture: Processing 1 Billion API Requests with 99.99% Uptime Vibe Coding: A Workflow Guide (From Zero to SaaS) Most webhook security guides protect the wrong side. The scary part is delivery. Headless CMS for TanStack Start: Build a Blog with Cosmic EU Age Verification App "Hacked in 2 Minutes" — What Actually Happened Comfy Cloud’s delete function does not actually remove files Running AI Models on GPU Cloud Servers: A Beginner Guide Event-driven media intelligence with AWS Step Functions and Bedrock I scored 500 AI prompts across 8 quality dimensions — here's what broke How to Call Google Gemini API from Next.js (Free Tier, No Backend Needed) The Portal Protocol: Reclaiming Human Connection in the Age of AI How to Fix Your Team's Scattered Knowledge Problem With a Self-Hosted Forum Intro to tc Cloud Functors: A Graph-First Mental Model for the Modern Cloud Designing Multi-Tenant Backends With Both Ownership and Team Access I Built a Neumorphic CSS Library with 77+ Components — Here's What I Learned PostgreSQL Performance Optimization: Why Connection Pooling Is Critical at Scale Cómo construí un SaaS multi-rubro para gestionar expensas en Argentina con FastAPI + Vue 3 🚀 I Built an Ethical Hacking Scanner Tool – Open Source Project I Replaced /usage and /context in Claude Code With a Single Statusline A Pythonic Way to Handle Emails (IMAP/SMTP) with Auto-Discovery and AI-Ready Design I Collected 8.9 Million Polymarket Price Points — Here's What I Found About How Markets Really Move EcoTrack AI — Carbon Footprint Tracker & Dashboard Everyone's Using AI. No One Agrees How. 5 self-hosted ebook managers worth trying in 2026 Building Your First AI Agent with LangChain: From Chatbot to Autonomous Assistant Common SOC 2 Failures (Real World) Stop Vibe-Checking Your AI App: A Practical Guide to Evals How to Use SonarQube and SonarScanner Locally to Level Up Your Code Quality Your Next To-Do App Is Dead — I Replaced Mine with an OpenClaw AI Sign a Nostr event in 60 lines of Python using coincurve — no nostr-sdk, no nbxplorer, no rust toolchain ITGC Audit Explained Like You’re in Big 4 Patch Tuesday abril 2026: Microsoft parcha 163 vulnerabilidades y un zero-day en SharePoint Stop scraping everything: a better way to track competitor price changes Listing on MCPize + the Official MCP Registry while routing payments OUTSIDE the marketplace — how I kept 100% of my x402 revenue Building an AI-Powered Risk Intelligence System Using Serverless Architecture Why We Ripped Function Overloading Out of Our AI Toolchain Testing AI-Generated Code: How to Actually Know If It Works SaaS Churn Is Killing Your Business. Here Is What to Do About It (Without a Support Team) The Speed of AI Is No Longer Linear - And Self-Improving Models Are Why How to Implement RBAC for MCP Tools: A Practical Guide for Engineering Teams From Standard Quote to Persuasive Proposal: AI Automation for Arborists I built a CLI that scaffolds complete multi-tenant SaaS apps Axios CVE-2025–62718: The Silent SSRF Bug That Could Be Hiding in Your Node.js App Right Now The dashboard that ended our friendship Data Pipelines Explained Simply (and How to Build Them with Python)
Pythonからプロダクションパイプラインへ:Apache Airflowの実践ガイド
Varun Joshi · 2026-05-25 · via DEV Community

Pythonを使用しており、データを取得し、クリーンアップしてどこかにロードするスクリプトを書いていたかもしれません。おそらくサーバーのどこかにwhile Trueのループが走っているか、またはcronジョブに触れられないと思っているかもしれません。

このチュートリアルはあなたのためです。

Apache Airflowは、あなたのPythonスクリプトを「私のマシンで実行する」から「毎朝6時に確実に実行し、失敗した場合に再試行し、何か問題が発生した場合にアラートを送信し、何が正確に起こったかを確認できるUIがある」ツールです。これは大多数のデータエンジニアリングチームで標準的なオーケストレーションツールであり、見た目が分かりやすいため、よりアクセスしやすくなっています。

この投稿の終わりには、Airflowがどのように動作するかを理解し、実際のデータパイプラインをスケジュールする動作するDAGを持つようになります。

実際のAirflowとは(何ではないか)?

Airflowはワークフロー・オーケストレーターです。データを移動することはありませんが、タスクを実行するものをスケジュールし監視します。

非常に賢いCRONジョブのように考えてください:

  1. タスクが実行される順序を知っています。
  2. 失敗したタスクを自動的に再試行します。
  3. 各実行の完全な履歴を保持します。
  4. UI上のすべてを表示します。
  5. 問題が発生したときにアラートを送信します。

それは何ではありません:データ処理エンジン。実際の作業を行うためにPythonをまだ書きます。Airflowはそれをいつ実行するかと失敗したときに何をするかを決定します。

2分間でコアコンセプト

コードを触る前に知っておくべき3つの用語:

DAG(有向非巡回グラフ):これはあなたのパイプラインです。DAGはタスクのセットとその実行順序を定義するPythonファイルです。非巡回部分は、タスクが自分自身にループしないことを意味します。

Task: DAG内の単一の作業単位で、Python関数を実行するか、SQLクエリを実行するか、APIを呼び出すか、他にも数十種類のことを行うことができます。

Operator: タスクの構成要素です。PythonOperator は Python 関数を実行します。BashOperator はシェルコマンドを実行します。PostGresOperator は SQL を実行し、airflow にはほぼすべての操作に対応する operator があります。

ローカルで Airflow を設定する

Airflow をローカルで実行する最も簡単な方法は pip です:

pip install apache-airflow
airflow dbt init
airflow user create \
   --username admin \
   --password admin \
   --firstname your \
   --lastname name \
   --role admin \
   --email admin@x.com \

フルスクリーンモードに入る フルスクリーンモードから退出する

別々のターミナルでスケジューラとサーバーを起動する

#Terminal 1
airflow scheduler

#Terminal 2
airflow webserver --port 8080

フルスクリーンモードを開始 フルスクリーンモードを終了

今、http://localhost:8080にアクセスしてください - すでに多くのサンプルDAGが含まれたAirflow UIが表示されます.

あなたの最初のDAG: 実際のデータパイプライン

実用的なものを作成しましょう「Hello world」だけではなく、パイプラインを作成します

1- 公開APIからデータを取得します
2- 清掃し、変換します
3- CSV形式で保存

これは実際のインジェストパイプラインに似ており、企業の複雑さはありません。

ファイルを作成してweather_pipeline.pyあなたのairflowの内部にdags/フォルダ。

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os

default_args= {
    'owner': 'varun'
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_in_future': False,
}

#Define a DAG----

with DAG(
   dag_id='weather_pipeline'
   default_args = default_args,
   description = 'Fetch, transform and save daily weather data'
   schedule_interval = '0 6 * * *' # runs daily at 6 am
   start_date=datetime(2024,1,1)
   catchup = False,
   tags= ['Weather','Tutorial']
) as dag:
   pass

フルスクリーンモードに入る フルスクリーンモードを終了する

*注意点いくつか
*

  • スケジュール間隔はCRON式です。
  • catchup=FalseはAirflowにstart_dateから今日までのすべての歴史の日付を実行しないように伝えます。ほとんどの場合、それをFalse
  • Retries =2に設定する必要があります。これにより、Airflowは失敗とマークする前に2回試行します。

タスクの追加

今、pass

def fetch_weather(**context):
    """Pull weather from open meteo (free, No API needed)"""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
       "latitude": 28.6,
       "longitude": 77.2,
       "daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
       "timezone":"Asia/Kolkata",
       "forecast_days": 1
 }
    response = request.get(url, params=params)
    response.raise_for_status() #raise an exception

    data = response.json()
    #Push data to Xcom so the next task can access it
    context['ti'].xcom_push(key='raw_weather', value =data)
    print(f"Fetched weather data for {data['daily']['time'][0]}")

def transform_weather(**context):
   """clean and reshape the raw API response"""
   raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
   daily = raw['daily']

   df = pd.DataFrame({
        'date': daily['time']
        'temp_max_c': daily['temperature_2m_max'],
        'temp_min_c':daily['temperature_2m_min'],
        'precipitation_mm':daily['precipitation_mm'],
        })
   context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
        print(f"✓ Transformed {len(df)} rows")


   df save_weather(**context):
      """Append the cleaned data to csv file"""
      records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')

      df= pd.DataFrame(records)
      output_path= '/tmp/weather_data.csv'
      file_exists = os.path.exists(output_path)

      df.to_csv(output_path, mode='a', header=not file_exists, index=False)

      print(f"✓ Saved to {output_path}")


   #Wiring the tasks
   t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
   t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
   t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)

の代わりに3つのタスクを追加しています。Enter fullscreen mode 全画面モードを終了

下のt1>>t2>>t3は「この順番で実行する」という意味の方法です。これは少し奇妙に思えるかもしれませんが、後で慣れますよ.

Xcom:タスクがどう話し合うか

コードでxcom_pushxcom_pullに気づいたでしょう。
このように Airflow のタスクはお互いにデータを渡します。

XCom(クロスコミュニケーション)は Airflow に組み込まれた小さなキー付きストアです。
一つのタスクが値をプッシュし、次のタスクは TaskID と Key でそれをプルします。

#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')

フルスクリーンモードに入る フルスクリーンモードから退出する

XcomはAirflowメタデータデータベースに格納されるため、大規模なデータセットには適していない。小さなペイロード(カウント、ファイルパス、APIレスポンス)のために使用する。大規模なデータの場合は、S3/GCSに書き込み、ファイルパスをXcom経由で渡す。

実行方法

ファイルがあなたのdags/フォルダにあると一旦、Airflowが自動的にそれを拾う。http://localhost:8080に表示されることを確認する。

手動でトリガーするには:

  1. はDAGリストでweather_pipelineを探してください.
  2. をオンにします.
  3. DAGをトリガーします.

グラフビューを確認し、タスクが完了すると緑色になります。何か失敗するとタスクは赤色になります.

生産環境で重要なパターン

  1. execution_dateを使用して、idempotentなパイプラインを作成します.
def fetch_weather(**context):
    run_date = context['execution_date'].strftime('%Y-%m-%d')
    #fetch data for that specific date

フルスクリーンモードに入ります フルスクリーンモードを終了

これにより、再実行可能なものになりますので、火曜日に失敗した場合、水曜日に火曜日のデータを取得するために実行できます

  1. 常にretriesretry_delay

ネットワーク呼び出しが失敗します。APIがダウンし、データベースが遅くなります
再試行を設定することで、午前2時の電話を節約できます

default_args = {
      'retries': 3
      'retry_delay' : timedelta(minutes=10)

}

フルスクリーンモードに入る フルスクリーンモードを終了

  1. はアラートにon_failure_callbackを使用してください
def alert_on_failure(context):
    #send a slack message, email etc
    print(f"Task failed:{context['task_instance'].task_id}")

default_args = {
   'on_failure_callback': alert_on_failure,
}

フルスクリーンモードを開始 フルスクリーンモードを終了

  1. タスクは小さく集中させなさい

一つのタスクは一つのこと、全てをやる巨大な関数を書かないで。小さなタスクはクリーンな再試行と読みやすいログを意味する

それだけ

あなたは自分のマシンで手動でPythonコードを実行していたものを、リトライ機能付きでスケジューリングするように変更し、タスクを自動的にステップバイステップで実行し、失敗通知を伴うようにしました

学習曲線は本物ですが、それは主にセットアップの段階です。2つか3つほどのDAGを書いたら、パターンがクリックしてCRONに戻ることはありません