인셔셔RSS 관심 있는 블로그, 뉴스, 기술 정보를 효율적으로 추적하고 읽으세요
원문 읽기 InertiaRSS에서 열기

추천 피드

Google DeepMind News
Google DeepMind News
人人都是产品经理
人人都是产品经理
M
MIT News - Artificial intelligence
博客园 - 叶小钗
MyScale Blog
MyScale Blog
V
Visual Studio Blog
月光博客
月光博客
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
量子位
I
InfoQ
有赞技术团队
有赞技术团队
阮一峰的网络日志
阮一峰的网络日志
Jina AI
Jina AI
V
V2EX
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
Blog — PlanetScale
Blog — PlanetScale
Last Week in AI
Last Week in AI
雷峰网
雷峰网
Stack Overflow Blog
Stack Overflow Blog
博客园 - Franky

DEV Community

Authentication Security Deep Dive: From Brute Force to Salted Hashing (With Java Examples) Why AI Systems Don’t Fail — They Drift Spilling beans for how i learn for exam😁"Reinforcement Learning Cheat Sheet" I Replaced Chrome with Safari for AI Browser Automation. Here's What Broke (and What Finally Worked) How Python Borrows Other People's Work The $40 Architecture: Processing 1 Billion API Requests with 99.99% Uptime Vibe Coding: A Workflow Guide (From Zero to SaaS) Most webhook security guides protect the wrong side. The scary part is delivery. Headless CMS for TanStack Start: Build a Blog with Cosmic EU Age Verification App "Hacked in 2 Minutes" — What Actually Happened Comfy Cloud’s delete function does not actually remove files Running AI Models on GPU Cloud Servers: A Beginner Guide Event-driven media intelligence with AWS Step Functions and Bedrock I scored 500 AI prompts across 8 quality dimensions — here's what broke How to Call Google Gemini API from Next.js (Free Tier, No Backend Needed) The Portal Protocol: Reclaiming Human Connection in the Age of AI How to Fix Your Team's Scattered Knowledge Problem With a Self-Hosted Forum Intro to tc Cloud Functors: A Graph-First Mental Model for the Modern Cloud Designing Multi-Tenant Backends With Both Ownership and Team Access I Built a Neumorphic CSS Library with 77+ Components — Here's What I Learned PostgreSQL Performance Optimization: Why Connection Pooling Is Critical at Scale Cómo construí un SaaS multi-rubro para gestionar expensas en Argentina con FastAPI + Vue 3 🚀 I Built an Ethical Hacking Scanner Tool – Open Source Project I Replaced /usage and /context in Claude Code With a Single Statusline A Pythonic Way to Handle Emails (IMAP/SMTP) with Auto-Discovery and AI-Ready Design I Collected 8.9 Million Polymarket Price Points — Here's What I Found About How Markets Really Move EcoTrack AI — Carbon Footprint Tracker & Dashboard Everyone's Using AI. No One Agrees How. 5 self-hosted ebook managers worth trying in 2026 Building Your First AI Agent with LangChain: From Chatbot to Autonomous Assistant Common SOC 2 Failures (Real World) Stop Vibe-Checking Your AI App: A Practical Guide to Evals How to Use SonarQube and SonarScanner Locally to Level Up Your Code Quality Your Next To-Do App Is Dead — I Replaced Mine with an OpenClaw AI Sign a Nostr event in 60 lines of Python using coincurve — no nostr-sdk, no nbxplorer, no rust toolchain ITGC Audit Explained Like You’re in Big 4 Patch Tuesday abril 2026: Microsoft parcha 163 vulnerabilidades y un zero-day en SharePoint Stop scraping everything: a better way to track competitor price changes Listing on MCPize + the Official MCP Registry while routing payments OUTSIDE the marketplace — how I kept 100% of my x402 revenue Building an AI-Powered Risk Intelligence System Using Serverless Architecture Why We Ripped Function Overloading Out of Our AI Toolchain Testing AI-Generated Code: How to Actually Know If It Works SaaS Churn Is Killing Your Business. Here Is What to Do About It (Without a Support Team) The Speed of AI Is No Longer Linear - And Self-Improving Models Are Why How to Implement RBAC for MCP Tools: A Practical Guide for Engineering Teams From Standard Quote to Persuasive Proposal: AI Automation for Arborists I built a CLI that scaffolds complete multi-tenant SaaS apps Axios CVE-2025–62718: The Silent SSRF Bug That Could Be Hiding in Your Node.js App Right Now The dashboard that ended our friendship Data Pipelines Explained Simply (and How to Build Them with Python)
파이썬에서 생산 파이프라인으로: Apache Airflow에 대한 실용 가이드
Varun Joshi · 2026-05-25 · via DEV Community

파이썬을 사용해 왔고, 데이터를 가져오고, 정리하고, 어딘가에 로드하는 스크립트를 작성했을 수 있습니다. 아마도 서버 어딘가에 while True 루프가 실행 중이거나, 혹은触碰하지 않고 싶은 cron job이 있을 수도 있습니다.

이 튜토리얼은 당신을 위해입니다.

Apache Airflow는 Python 스크립트를 "나의 머신에서 실행"에서 "매일 아침 6시에 신뢰성 있게 실행, 실패 시 재시도, 문제가 발생하면 알림 전송, 정확히 무슨 일이 일어났는지 확인할 수 있는 UI를 가진 도구로 변환하는 도구입니다. 대부분의 데이터 엔지니어링 팀에서 표준 오케스트레이션 도구이며, 보이는 것처럼 더 접근하기 쉽습니다.

이 포스트의 끝에 이해하게 될 것입니다. Airflow가 어떻게 생각하는지와 실제 데이터 파이프라인을 스케줄하는 작동하는 DAG를 가질 것입니다.

실제로 Airflow가 무엇인지 (그리고 아닌지)는 무엇인가요?

Airflow는 작업 조정자입니다. 데이터를 직접 이동하지는 않지만, 작업을 일정하고 모니터링합니다.

매우 똑똑한 CRON 작업처럼 생각해보세요:

  1. 작업이 실행되어야 할 순서를 알고 있습니다.
  2. 실패한 작업을 자동으로 다시 시도합니다.
  3. 모든 실행의 전체 이력을 유지합니다.
  4. UI에 표시되는 모든 내용을 보여줍니다.
  5. 문제가 발생할 때 경고를 보냅니다.

이것이 아닌 것: 데이터 처리 엔진입니다. 실제 작업을 수행하기 위해 여전히 Python을 작성합니다. Airflow는 실행할 때를 결정하고 실패했을 때 할 일을 결정합니다.

2분 안에 핵심 개념

코드를 다루기 전에 알아야 할 세 가지 용어:

DAG(방향성 비순환 그래프): 이것이 당신의 파이프라인입니다. DAG는 일련의 작업과 그 순서를 정의하는 파이썬 파일입니다. 비순환 부분은 작업이 스스로 루프를 돌 수 없음을 의미합니다.

작업: DAG 내의 단일 작업 단위, 파이썬 함수를 실행하거나, SQL 쿼리를 실행하거나, API를 호출하거나 다른 다수의 작업을 수행할 수 있습니다.

연산자: 작업의 구성 요소입니다.PythonOperator는 파이썬 함수를 실행합니다.BashOperator는 셸 명령어를 실행합니다.PostGresOperator는 SQL을 실행하며, Airflow는 거의 모든 작업에 operator를 제공합니다.

로컬에서 Airflow 설정하기

로컬에서 Airflow를 실행하는 가장 빠른 방법은 pip를 사용하는 것입니다.

pip install apache-airflow
airflow dbt init
airflow user create \
   --username admin \
   --password admin \
   --firstname your \
   --lastname name \
   --role admin \
   --email admin@x.com \

전체 화면 모드로 입력 전체 화면 모드 종료

두 개의 다른 터미널에서 스케줄러와 서버를 시작합니다

#Terminal 1
airflow scheduler

#Terminal 2
airflow webserver --port 8080

전체 화면 모드를 입력하세요 전체 화면 모드를 종료하세요

이제 http://localhost:8080를 방문하세요 - 이미 여러 예제 DAG가 Airflow UI에 표시됩니다.

첫 번째 DAG: 실제 데이터 파이프라인

유용한 것을 만들어보고 'Hello world'만은 넘어서겠습니다. 파이프라인을 만들겠습니다:

1- 공개 API에서 데이터를 가져옵니다
2- 정리하고 변환합니다
3- CSV로 저장

이는 실제 소비 파이프라인과 유사하지만 기업적인 복잡성은 없습니다.

파일을 만드세요名叫weather_pipeline.py너의 airflow 안에dags/폴더.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os

default_args= {
    'owner': 'varun'
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_in_future': False,
}

#Define a DAG----

with DAG(
   dag_id='weather_pipeline'
   default_args = default_args,
   description = 'Fetch, transform and save daily weather data'
   schedule_interval = '0 6 * * *' # runs daily at 6 am
   start_date=datetime(2024,1,1)
   catchup = False,
   tags= ['Weather','Tutorial']
) as dag:
   pass

전체 화면 모드로 입력하세요 전체 화면 모드 종료

*주의해야 할 몇 가지 사항이 있습니다
*

  • schedule_interval은 크론 표현식입니다.
  • catchup=False는 Airflow가 start_date과 오늘 사이의 모든 역사적 날짜를 실행하지 않도록 합니다. 거의 항상 그것을 False
  • Retries =2를 원합니다.__JHSNS_SEG_199307cb_48__는 Airflow가 실패로 표시하기 전에 두 번 시도하려고 합니다.

작업 추가

이제 pass

def fetch_weather(**context):
    """Pull weather from open meteo (free, No API needed)"""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
       "latitude": 28.6,
       "longitude": 77.2,
       "daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
       "timezone":"Asia/Kolkata",
       "forecast_days": 1
 }
    response = request.get(url, params=params)
    response.raise_for_status() #raise an exception

    data = response.json()
    #Push data to Xcom so the next task can access it
    context['ti'].xcom_push(key='raw_weather', value =data)
    print(f"Fetched weather data for {data['daily']['time'][0]}")

def transform_weather(**context):
   """clean and reshape the raw API response"""
   raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
   daily = raw['daily']

   df = pd.DataFrame({
        'date': daily['time']
        'temp_max_c': daily['temperature_2m_max'],
        'temp_min_c':daily['temperature_2m_min'],
        'precipitation_mm':daily['precipitation_mm'],
        })
   context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
        print(f"✓ Transformed {len(df)} rows")


   df save_weather(**context):
      """Append the cleaned data to csv file"""
      records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')

      df= pd.DataFrame(records)
      output_path= '/tmp/weather_data.csv'
      file_exists = os.path.exists(output_path)

      df.to_csv(output_path, mode='a', header=not file_exists, index=False)

      print(f"✓ Saved to {output_path}")


   #Wiring the tasks
   t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
   t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
   t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)

의 위치에 세 가지 작업을 추가합니다. Enter fullscreen mode 전체 화면 모드 종료

아래의 t1>>t2>>t3는 '이 순서대로 실행하라'는 의미로 표현하는 방법입니다. 이것은 나중에 익숙해질 수 있지만 처음에는 이상하게 느껴질 수 있는 일 중 하나입니다.

Xcom:작업 간 통신 방법

코드에서 xcom_pushxcom_pull를 본 적이 있을 것입니다.
이것은 airflow 작업이 서로 데이터를 전달하는 방식입니다.

XCom(크로스커뮤니케이션)은 Airflow에 내장된 작은 저장소입니다.
한 작업이 값을 전송하면 다음 작업은 TaskID와 Key를 통해 값을 가져옵니다.

#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')

전체 화면 모드로 진입 전체 화면 모드 종료

Xcom은 Airflow 메타데이터 데이터베이스에 저장되므로 큰 데이터셋용이 아닙니다. 작은 페이로드용으로 사용하세요 - 카운트, 파일 경로, API 응답. 큰 데이터는 S3/GCS에 쓰고 파일 경로를 Xcom을 통해 전달하세요.

실행 방법

파일이 dags/ 폴더에 있으면 Airflow가 자동으로 가져옵니다. http://localhost:8080에 나타나야 합니다.

수동으로 트리거하려면:

  1. DAG 목록에서 weather_pipeline를 찾으세요.
  2. 를 켜세요.
  3. DAG를 트리거하세요.

그래프 뷰를 확인하고 작업이 완료되면 녹색으로 변해야 합니다. 만약 무언가 실패하면 작업은 빨강색으로 표시됩니다.

생산 환경에서 중요한 패턴

  1. execution_date를 무효화 가능한 파이프라인에 사용하세요.
def fetch_weather(**context):
    run_date = context['execution_date'].strftime('%Y-%m-%d')
    #fetch data for that specific date

전체 화면 모드를 입력하세요 전체 화면 모드 종료

이렇게 하면 다시 실행할 수 있도록 만들어서, 이가 화요일에 실패하면 수요일에 실행하여 화요일 데이터를 가져올 수 있습니다.

  1. 항상 retriesretry_delay

를 설정하세요. 네트워크 호출이 실패하면 APIs가 다운되고, 데이터베이스가 느려집니다.
재시도를 설정하면 새벽 2시에 통화할 필요가 없어집니다.

default_args = {
      'retries': 3
      'retry_delay' : timedelta(minutes=10)

}

전체 화면 모드 입력 전체 화면 모드 종료

  1. on_failure_callback을 사용하여 경고합니다
def alert_on_failure(context):
    #send a slack message, email etc
    print(f"Task failed:{context['task_instance'].task_id}")

default_args = {
   'on_failure_callback': alert_on_failure,
}

전체 화면 모드로 전환 전체 화면 모드 종료

  1. 작업을 작고 집중적으로 유지

하나의 작업=하나의 일, 모든 일을 하는 거대한 함수를 작성하지 마세요. 작은 작업은 더 깨끗한 재시도와 쉬운 로그 읽기를 의미합니다.

그것이 전부입니다

당신은 컴퓨터에서 수동으로 파이썬 코드를 실행하는 것에서 스케줄링으로 전환했으며, 재시도 기능과 함께 자동으로 작업을 단계별로 실행하고 실패 알림을 받았습니다.

학습 곡선은 현실이지만, 대부분은 설정 단계입니다. 두 개 또는 세 개의 DAG를 작성한 후에는 패턴을 클릭하고 CRON으로 돌아가지 않습니다.