Pythonを使用しており、データを取得し、クリーンアップしてどこかにロードするスクリプトを書いていたかもしれません。おそらくサーバーのどこかにwhile Trueのループが走っているか、またはcronジョブに触れられないと思っているかもしれません。
このチュートリアルはあなたのためです。
Apache Airflowは、あなたのPythonスクリプトを「私のマシンで実行する」から「毎朝6時に確実に実行し、失敗した場合に再試行し、何か問題が発生した場合にアラートを送信し、何が正確に起こったかを確認できるUIがある」ツールです。これは大多数のデータエンジニアリングチームで標準的なオーケストレーションツールであり、見た目が分かりやすいため、よりアクセスしやすくなっています。
この投稿の終わりには、Airflowがどのように動作するかを理解し、実際のデータパイプラインをスケジュールする動作するDAGを持つようになります。
実際のAirflowとは(何ではないか)?
Airflowはワークフロー・オーケストレーターです。データを移動することはありませんが、タスクを実行するものをスケジュールし監視します。
非常に賢いCRONジョブのように考えてください:
- タスクが実行される順序を知っています。
- 失敗したタスクを自動的に再試行します。
- 各実行の完全な履歴を保持します。
- UI上のすべてを表示します。
- 問題が発生したときにアラートを送信します。
それは何ではありません:データ処理エンジン。実際の作業を行うためにPythonをまだ書きます。Airflowはそれをいつ実行するかと失敗したときに何をするかを決定します。
2分間でコアコンセプト
コードを触る前に知っておくべき3つの用語:
DAG(有向非巡回グラフ):これはあなたのパイプラインです。DAGはタスクのセットとその実行順序を定義するPythonファイルです。非巡回部分は、タスクが自分自身にループしないことを意味します。
Task: DAG内の単一の作業単位で、Python関数を実行するか、SQLクエリを実行するか、APIを呼び出すか、他にも数十種類のことを行うことができます。
Operator: タスクの構成要素です。PythonOperator は Python 関数を実行します。BashOperator はシェルコマンドを実行します。PostGresOperator は SQL を実行し、airflow にはほぼすべての操作に対応する operator があります。
ローカルで Airflow を設定する
Airflow をローカルで実行する最も簡単な方法は pip です:
pip install apache-airflow
airflow dbt init
airflow user create \
--username admin \
--password admin \
--firstname your \
--lastname name \
--role admin \
--email admin@x.com \
別々のターミナルでスケジューラとサーバーを起動する
#Terminal 1
airflow scheduler
#Terminal 2
airflow webserver --port 8080
今、http://localhost:8080にアクセスしてください - すでに多くのサンプルDAGが含まれたAirflow UIが表示されます.
あなたの最初のDAG: 実際のデータパイプライン
実用的なものを作成しましょう「Hello world」だけではなく、パイプラインを作成します
1- 公開APIからデータを取得します
2- 清掃し、変換します
3- CSV形式で保存
これは実際のインジェストパイプラインに似ており、企業の複雑さはありません。
ファイルを作成してweather_pipeline.pyあなたのairflowの内部にdags/フォルダ。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os
default_args= {
'owner': 'varun'
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_in_future': False,
}
#Define a DAG----
with DAG(
dag_id='weather_pipeline'
default_args = default_args,
description = 'Fetch, transform and save daily weather data'
schedule_interval = '0 6 * * *' # runs daily at 6 am
start_date=datetime(2024,1,1)
catchup = False,
tags= ['Weather','Tutorial']
) as dag:
pass
*注意点いくつか
*
- スケジュール間隔はCRON式です。
-
catchup=FalseはAirflowにstart_dateから今日までのすべての歴史の日付を実行しないように伝えます。ほとんどの場合、それをFalse -
Retries =2に設定する必要があります。これにより、Airflowは失敗とマークする前に2回試行します。
タスクの追加
今、pass
def fetch_weather(**context):
"""Pull weather from open meteo (free, No API needed)"""
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": 28.6,
"longitude": 77.2,
"daily":"temperature_2m_max, temperature_2m_min, precipitation_sum",
"timezone":"Asia/Kolkata",
"forecast_days": 1
}
response = request.get(url, params=params)
response.raise_for_status() #raise an exception
data = response.json()
#Push data to Xcom so the next task can access it
context['ti'].xcom_push(key='raw_weather', value =data)
print(f"Fetched weather data for {data['daily']['time'][0]}")
def transform_weather(**context):
"""clean and reshape the raw API response"""
raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
daily = raw['daily']
df = pd.DataFrame({
'date': daily['time']
'temp_max_c': daily['temperature_2m_max'],
'temp_min_c':daily['temperature_2m_min'],
'precipitation_mm':daily['precipitation_mm'],
})
context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
print(f"✓ Transformed {len(df)} rows")
df save_weather(**context):
"""Append the cleaned data to csv file"""
records = context["ti"].xcom_pull(key='clean_weather', task_ids='transform_weather')
df= pd.DataFrame(records)
output_path= '/tmp/weather_data.csv'
file_exists = os.path.exists(output_path)
df.to_csv(output_path, mode='a', header=not file_exists, index=False)
print(f"✓ Saved to {output_path}")
#Wiring the tasks
t1 = PythonOperator(task_id = 'fetch_weather', python_callable=fetch_weather)
t2 = PythonOperator(task_id = 'transform_weather', python_callable=transform_weather)
t3 = PythonOperator(task_id = 'Save_weather', python_callable=Save_weather)
下のt1>>t2>>t3は「この順番で実行する」という意味の方法です。これは少し奇妙に思えるかもしれませんが、後で慣れますよ.
Xcom:タスクがどう話し合うか
コードでxcom_pushとxcom_pullに気づいたでしょう。
このように Airflow のタスクはお互いにデータを渡します。
XCom(クロスコミュニケーション)は Airflow に組み込まれた小さなキー付きストアです。
一つのタスクが値をプッシュし、次のタスクは TaskID と Key でそれをプルします。
#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')
XcomはAirflowメタデータデータベースに格納されるため、大規模なデータセットには適していない。小さなペイロード(カウント、ファイルパス、APIレスポンス)のために使用する。大規模なデータの場合は、S3/GCSに書き込み、ファイルパスをXcom経由で渡す。
実行方法
ファイルがあなたのdags/フォルダにあると一旦、Airflowが自動的にそれを拾う。http://localhost:8080に表示されることを確認する。
手動でトリガーするには:
- はDAGリストで
weather_pipelineを探してください. - をオンにします.
- DAGをトリガーします.
グラフビューを確認し、タスクが完了すると緑色になります。何か失敗するとタスクは赤色になります.
生産環境で重要なパターン
execution_dateを使用して、idempotentなパイプラインを作成します.
def fetch_weather(**context):
run_date = context['execution_date'].strftime('%Y-%m-%d')
#fetch data for that specific date
これにより、再実行可能なものになりますので、火曜日に失敗した場合、水曜日に火曜日のデータを取得するために実行できます
- 常に
retriesとretry_delay
ネットワーク呼び出しが失敗します。APIがダウンし、データベースが遅くなります
再試行を設定することで、午前2時の電話を節約できます
default_args = {
'retries': 3
'retry_delay' : timedelta(minutes=10)
}
- はアラートに
on_failure_callbackを使用してください
def alert_on_failure(context):
#send a slack message, email etc
print(f"Task failed:{context['task_instance'].task_id}")
default_args = {
'on_failure_callback': alert_on_failure,
}
- タスクは小さく集中させなさい
一つのタスクは一つのこと、全てをやる巨大な関数を書かないで。小さなタスクはクリーンな再試行と読みやすいログを意味する
それだけ
あなたは自分のマシンで手動でPythonコードを実行していたものを、リトライ機能付きでスケジューリングするように変更し、タスクを自動的にステップバイステップで実行し、失敗通知を伴うようにしました
学習曲線は本物ですが、それは主にセットアップの段階です。2つか3つほどのDAGを書いたら、パターンがクリックしてCRONに戻ることはありません











