파이썬을 사용해 왔고, 데이터를 가져오고, 정리하고, 어딘가에 로드하는 스크립트를 작성했을 수 있습니다. 아마도 서버 어딘가에 while True 루프가 실행 중이거나, 혹은触碰하지 않고 싶은 cron job이 있을 수도 있습니다.
이 튜토리얼은 당신을 위해입니다.
Apache Airflow는 Python 스크립트를 "나의 머신에서 실행"에서 "매일 아침 6시에 신뢰성 있게 실행, 실패 시 재시도, 문제가 발생하면 알림 전송, 정확히 무슨 일이 일어났는지 확인할 수 있는 UI를 가진 도구로 변환하는 도구입니다. 대부분의 데이터 엔지니어링 팀에서 표준 오케스트레이션 도구이며, 보이는 것처럼 더 접근하기 쉽습니다.
이 포스트의 끝에 이해하게 될 것입니다. Airflow가 어떻게 생각하는지와 실제 데이터 파이프라인을 스케줄하는 작동하는 DAG를 가질 것입니다.
실제로 Airflow가 무엇인지 (그리고 아닌지)는 무엇인가요?
Airflow는 작업 조정자입니다. 데이터를 직접 이동하지는 않지만, 작업을 일정하고 모니터링합니다.
매우 똑똑한 CRON 작업처럼 생각해보세요:
- 작업이 실행되어야 할 순서를 알고 있습니다.
- 실패한 작업을 자동으로 다시 시도합니다.
- 모든 실행의 전체 이력을 유지합니다.
- UI에 표시되는 모든 내용을 보여줍니다.
- 문제가 발생할 때 경고를 보냅니다.
이것이 아닌 것: 데이터 처리 엔진입니다. 실제 작업을 수행하기 위해 여전히 Python을 작성합니다. Airflow는 실행할 때를 결정하고 실패했을 때 할 일을 결정합니다.
2분 안에 핵심 개념
코드를 다루기 전에 알아야 할 세 가지 용어:
DAG(방향성 비순환 그래프): 이것이 당신의 파이프라인입니다. DAG는 일련의 작업과 그 순서를 정의하는 파이썬 파일입니다. 비순환 부분은 작업이 스스로 루프를 돌 수 없음을 의미합니다.
작업: DAG 내의 단일 작업 단위, 파이썬 함수를 실행하거나, SQL 쿼리를 실행하거나, API를 호출하거나 다른 다수의 작업을 수행할 수 있습니다.
연산자: 작업의 구성 요소입니다.PythonOperator는 파이썬 함수를 실행합니다.BashOperator는 셸 명령어를 실행합니다.PostGresOperator는 SQL을 실행하며, Airflow는 거의 모든 작업에 operator를 제공합니다.
로컬에서 Airflow 설정하기
로컬에서 Airflow를 실행하는 가장 빠른 방법은 pip를 사용하는 것입니다.
pip install apache-airflow
airflow dbt init
airflow user create \
--username admin \
--password admin \
--firstname your \
--lastname name \
--role admin \
--email admin@x.com \
두 개의 다른 터미널에서 스케줄러와 서버를 시작합니다
#Terminal 1
airflow scheduler
#Terminal 2
airflow webserver --port 8080
이제 http://localhost:8080를 방문하세요 - 이미 여러 예제 DAG가 Airflow UI에 표시됩니다.
첫 번째 DAG: 실제 데이터 파이프라인
유용한 것을 만들어보고 'Hello world'만은 넘어서겠습니다. 파이프라인을 만들겠습니다:
1- 공개 API에서 데이터를 가져옵니다
2- 정리하고 변환합니다
3- CSV로 저장
이는 실제 소비 파이프라인과 유사하지만 기업적인 복잡성은 없습니다.
파일을 만드세요名叫weather_pipeline.py너의 airflow 안에dags/폴더.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os
default_args= {
'owner': 'varun'
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_in_future': False,
}
#Define a DAG----
with DAG(
dag_id='weather_pipeline'
default_args = default_args,
description = 'Fetch, transform and save daily weather data'
schedule_interval = '0 6 * * *' # runs daily at 6 am
start_date=datetime(2024,1,1)
catchup = False,
tags= ['Weather','Tutorial']
) as dag:
pass
*주의해야 할 몇 가지 사항이 있습니다
*
- schedule_interval은 크론 표현식입니다.
-
catchup=False는 Airflow가start_date과 오늘 사이의 모든 역사적 날짜를 실행하지 않도록 합니다. 거의 항상 그것을False -
Retries =2를 원합니다.__JHSNS_SEG_199307cb_48__는 Airflow가 실패로 표시하기 전에 두 번 시도하려고 합니다.
작업 추가
이제 pass
def fetch_weather(**context):
"""Pull weather from open meteo (free, No API needed)"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": 28.6,
"longitude": 77.2,
"daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
"timezone":"Asia/Kolkata",
"forecast_days": 1
}
response = request.get(url, params=params)
response.raise_for_status() #raise an exception
data = response.json()
#Push data to Xcom so the next task can access it
context['ti'].xcom_push(key='raw_weather', value =data)
print(f"Fetched weather data for {data['daily']['time'][0]}")
def transform_weather(**context):
"""clean and reshape the raw API response"""
raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
daily = raw['daily']
df = pd.DataFrame({
'date': daily['time']
'temp_max_c': daily['temperature_2m_max'],
'temp_min_c':daily['temperature_2m_min'],
'precipitation_mm':daily['precipitation_mm'],
})
context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
print(f"✓ Transformed {len(df)} rows")
df save_weather(**context):
"""Append the cleaned data to csv file"""
records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')
df= pd.DataFrame(records)
output_path= '/tmp/weather_data.csv'
file_exists = os.path.exists(output_path)
df.to_csv(output_path, mode='a', header=not file_exists, index=False)
print(f"✓ Saved to {output_path}")
#Wiring the tasks
t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)
아래의 t1>>t2>>t3는 '이 순서대로 실행하라'는 의미로 표현하는 방법입니다. 이것은 나중에 익숙해질 수 있지만 처음에는 이상하게 느껴질 수 있는 일 중 하나입니다.
Xcom:작업 간 통신 방법
코드에서 xcom_push와 xcom_pull를 본 적이 있을 것입니다.
이것은 airflow 작업이 서로 데이터를 전달하는 방식입니다.
XCom(크로스커뮤니케이션)은 Airflow에 내장된 작은 저장소입니다.
한 작업이 값을 전송하면 다음 작업은 TaskID와 Key를 통해 값을 가져옵니다.
#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')
Xcom은 Airflow 메타데이터 데이터베이스에 저장되므로 큰 데이터셋용이 아닙니다. 작은 페이로드용으로 사용하세요 - 카운트, 파일 경로, API 응답. 큰 데이터는 S3/GCS에 쓰고 파일 경로를 Xcom을 통해 전달하세요.
실행 방법
파일이 dags/ 폴더에 있으면 Airflow가 자동으로 가져옵니다. http://localhost:8080에 나타나야 합니다.
수동으로 트리거하려면:
- DAG 목록에서
weather_pipeline를 찾으세요. - 를 켜세요.
- DAG를 트리거하세요.
그래프 뷰를 확인하고 작업이 완료되면 녹색으로 변해야 합니다. 만약 무언가 실패하면 작업은 빨강색으로 표시됩니다.
생산 환경에서 중요한 패턴
-
execution_date를 무효화 가능한 파이프라인에 사용하세요.
def fetch_weather(**context):
run_date = context['execution_date'].strftime('%Y-%m-%d')
#fetch data for that specific date
이렇게 하면 다시 실행할 수 있도록 만들어서, 이가 화요일에 실패하면 수요일에 실행하여 화요일 데이터를 가져올 수 있습니다.
- 항상
retries와retry_delay
를 설정하세요. 네트워크 호출이 실패하면 APIs가 다운되고, 데이터베이스가 느려집니다.
재시도를 설정하면 새벽 2시에 통화할 필요가 없어집니다.
default_args = {
'retries': 3
'retry_delay' : timedelta(minutes=10)
}
- 는
on_failure_callback을 사용하여 경고합니다
def alert_on_failure(context):
#send a slack message, email etc
print(f"Task failed:{context['task_instance'].task_id}")
default_args = {
'on_failure_callback': alert_on_failure,
}
- 작업을 작고 집중적으로 유지
하나의 작업=하나의 일, 모든 일을 하는 거대한 함수를 작성하지 마세요. 작은 작업은 더 깨끗한 재시도와 쉬운 로그 읽기를 의미합니다.
그것이 전부입니다
당신은 컴퓨터에서 수동으로 파이썬 코드를 실행하는 것에서 스케줄링으로 전환했으며, 재시도 기능과 함께 자동으로 작업을 단계별로 실행하고 실패 알림을 받았습니다.
학습 곡선은 현실이지만, 대부분은 설정 단계입니다. 두 개 또는 세 개의 DAG를 작성한 후에는 패턴을 클릭하고 CRON으로 돌아가지 않습니다.











