Deploying the Airflow Task Workflow Framework
Author: 冯小贤 · 2024-06-25 · via 彩虹海是我哒

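Airflow lets you define a group of interdependent tasks in a DAG definition file and runs them in dependency order.

It is a Python-based workflow management platform that ships with a web UI, a command-line interface, and a scheduler.

Tasks are written in Python, and many operators work out of the box; the supported operators are listed in the Airflow documentation.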
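To make "runs them in dependency order" concrete, here is a minimal sketch that uses only Python's standard `graphlib` module. It is not Airflow code. The task names and the `execution_order` helper are hypothetical and only illustrate how a dependency graph determines a run order.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(deps):
    """Return an order in which every task comes after its dependencies."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

In a real Airflow DAG the same relationships are declared between operators, and the scheduler decides when each task may start.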

System dependencies

Update the system packages and install the build dependencies.

sudo apt update && sudo apt upgrade

sudo apt install build-essential unzip bzip2 zlib1g-dev pkg-config libncurses5-dev libffi-dev libreadline-dev libbz2-dev libsqlite3-dev libssl-dev liblzma-dev libmysqlclient-dev libkrb5-dev unixodbc

Install pyenv

curl https://pyenv.run | bash

pyenv install 3.12.3

pyenv versions

pyenv global 3.12.3

python -V

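pyenv virtualenv 3.12.3 airflow

# Activate the virtualenv so that the packages below are installed into it
# (run.sh and health.sh activate this same path)
source ~/.pyenv/versions/airflow/bin/activate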

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

python get-pip.py

Runtime dependencies

requirements.txt

apache-airflow==2.9.0
airflow-clickhouse-plugin==1.4.0
apache-airflow-providers-apache-flink==1.5.1
apache-airflow-providers-apache-hdfs==4.6.0
apache-airflow-providers-apache-hive==8.2.1
apache-airflow-providers-apache-kafka==1.6.1
apache-airflow-providers-apache-spark==4.11.3
apache-airflow-providers-elasticsearch==5.5.3
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-postgres==5.14.0
apache-airflow-providers-influxdb==2.7.1
apache-airflow-providers-jdbc==4.5.3
apache-airflow-providers-microsoft-azure==11.1.0
apache-airflow-providers-mysql==5.7.4
apache-airflow-providers-mongo==4.2.2
apache-airflow-providers-neo4j==3.7.0
apache-airflow-providers-odbc==4.8.1
apache-airflow-providers-trino==5.9.0
apache-airflow-providers-ssh==3.14.0
apache-airflow-providers-amazon==9.1.0
apache-airflow-providers-cncf-kubernetes==10.0.1
apache-airflow-providers-http==4.13.3

Install the dependencies

pip install -r requirements.txt
# Extras for metrics (StatsD) and Celery; quoted so the shell does not expand the brackets
pip install "apache-airflow[statsd]"
pip install "apache-airflow[celery]"
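After installing, it can be worth confirming that the installed versions match the pins in requirements.txt. The following is a minimal sketch using the standard `importlib.metadata` module. The helper names `parse_pins` and `find_mismatches` are hypothetical, and the parser only understands simple `name==version` lines like the ones above.

```python
from importlib.metadata import PackageNotFoundError, version


def parse_pins(text):
    """Parse 'name==version' lines into a dict, ignoring comments and blanks."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if "==" in line:
            name, pinned = line.split("==", 1)
            pins[name.strip()] = pinned.strip()
    return pins


def find_mismatches(pins):
    """Return {name: (pinned, installed)} for packages that differ from their pin.

    installed is None when the package is not installed at all.
    """
    mismatches = {}
    for name, pinned in pins.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None
        if installed != pinned:
            mismatches[name] = (pinned, installed)
    return mismatches
```

A possible use is `find_mismatches(parse_pins(open("requirements.txt").read()))`, run inside the airflow virtualenv; an empty result means every pin is satisfied.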

Run

export AIRFLOW_HOME=~/airflow

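# Initialize the metadata database on first run
# (since Airflow 2.7, `airflow db migrate` is the non-deprecated equivalent)
airflow db init

# -D runs the process in the background (daemon mode)
airflow webserver --port 9988 -D

# Create an admin user
airflow users create -e admin@abc.com -f my -l admin -r Admin -u admin -p Pa55w0rd

# Resulting admin credentials:
# username: admin
# password: Pa55w0rd

# -D runs the process in the background
airflow scheduler -D

# -D runs the process in the background
airflow triggerer -D

# Requires standalone_dag_processor = True in the [scheduler] section of airflow.cfg
airflow dag-processor -D

# -D runs the process in the background
airflow celery flower -D

# -D runs the process in the background
airflow celery worker -D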
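The `standalone_dag_processor` setting mentioned above lives in airflow.cfg, and Airflow also accepts any option as an environment variable named `AIRFLOW__{SECTION}__{KEY}`. The sketch below shows both forms using the standard `configparser` module. The `env_var_name` helper is hypothetical, and the config fragment is an illustrative excerpt rather than a complete airflow.cfg.

```python
import configparser

# Illustrative excerpt of airflow.cfg, not a complete file.
CFG_FRAGMENT = """
[scheduler]
standalone_dag_processor = True
"""


def env_var_name(section, key):
    """Build the environment variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


parser = configparser.ConfigParser()
parser.read_string(CFG_FRAGMENT)
enabled = parser.getboolean("scheduler", "standalone_dag_processor")

print(enabled)
print(env_var_name("scheduler", "standalone_dag_processor"))
```

Exporting the environment variable is convenient when the same setting must be applied on several machines without editing each airflow.cfg.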

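High availability

Repeat all of the steps above on each new machine. The only process that needs to be started there is the Celery worker:

# -D runs the process in the background
airflow celery worker -D

Because tasks are scheduled and executed across machines, the DAG files must be kept fully in sync on every worker.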
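One way to check that the DAG files really are identical on every worker is to compute a fingerprint of the DAG folder on each machine and compare the results. This is a minimal sketch, not part of Airflow. The function name `dag_folder_fingerprint` is hypothetical, and it assumes that only `.py` files matter.

```python
import hashlib
from pathlib import Path


def dag_folder_fingerprint(dag_dir):
    """Hash the relative paths and contents of all .py files under dag_dir."""
    digest = hashlib.sha256()
    root = Path(dag_dir)
    for path in sorted(root.rglob("*.py")):
        # Include the relative path so that renamed files change the fingerprint.
        digest.update(path.relative_to(root).as_posix().encode("utf-8"))
        digest.update(b"\0")
        digest.update(path.read_bytes())
        digest.update(b"\0")
    return digest.hexdigest()
```

Running `dag_folder_fingerprint("/home/ubuntu/airflow/dags")` on each machine (the path is an assumption based on the AIRFLOW_HOME used here) and comparing the hex strings shows whether a worker has drifted. The sync itself still needs a separate mechanism such as rsync, git, or shared storage.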

Maintenance

# Start the services
/home/ubuntu/airflow/run.sh start

# Stop the services
/home/ubuntu/airflow/run.sh stop

Start/stop script run.sh

#!/bin/bash

set -eux

source /home/ubuntu/.pyenv/versions/airflow/bin/activate

pid_dir="/home/ubuntu/airflow"

# Send SIGTERM to every process whose command line matches the pattern in $1.
# If any were found, also remove the pid files named by the remaining arguments.
stop_component() {
    local pattern=$1
    shift
    local pids
    pids=$(ps -ef | grep -E "$pattern" | grep -v grep | awk '{print $2}')
    if [[ -n "$pids" ]]; then
        echo "$pids" | xargs kill -15
        local name
        for name in "$@"; do
            rm -f "$pid_dir/$name.pid"
        done
    fi
}

case "${1:-}" in
"start")
    echo " -------- Starting airflow -------"
    airflow webserver --port 9988 -D
    airflow scheduler -D
    airflow triggerer -D
    airflow dag-processor -D
    airflow celery worker -D
    airflow celery flower -D
    ;;
"stop")
    echo " -------- Stopping airflow -------"
    airflow celery stop

    stop_component 'airflow celery flower' airflow-flower
    stop_component 'airflow scheduler' airflow-scheduler
    stop_component 'airflow triggerer' airflow-triggerer
    # The webserver runs under gunicorn; stop its master process
    stop_component 'gunicorn: master' airflow-webserver airflow-webserver-monitor airflow-master airflow-worker
    ;;
*)
    echo "Usage: $0 {start|stop}" >&2
    exit 1
    ;;
esac

Health check script health.sh

Install antonmedv/fx first; the script uses it to process JSON. Any other JSON tool can be substituted.
#!/bin/bash

# Print a message prefixed with the current date and time
print() {
  echo -e "$(date) $1"
}
print "Checking airflow health"

source ~/.pyenv/versions/airflow/bin/activate

echo
health_resp=$(curl -sL http://127.0.0.1:9988/health)
echo "$health_resp" | /usr/local/bin/fx .
echo

print "Status of each service"
dag_processor_status=$(echo "$health_resp" | /usr/local/bin/fx .dag_processor.status)
metadatabase_status=$(echo "$health_resp" | /usr/local/bin/fx .metadatabase.status)
scheduler_status=$(echo "$health_resp" | /usr/local/bin/fx .scheduler.status)
trigger_status=$(echo "$health_resp" | /usr/local/bin/fx .triggerer.status)
printf "%20s: %10s\n" "dag_processor_status" "$dag_processor_status" "metadatabase_status" "$metadatabase_status" "scheduler_status" "$scheduler_status" "trigger_status" "$trigger_status"
echo

if [[ "$scheduler_status" != "healthy" ]]; then
    print "Restarting the airflow scheduler..."
    airflow scheduler -D
    print "airflow scheduler started!"
fi

if [[ "$trigger_status" != "healthy" ]]; then
    print "Restarting the airflow triggerer..."
    airflow triggerer -D
    print "airflow triggerer started!"
fi

# crontab entry (runs at minute 1 of every hour)
# 1 * * * * /home/ubuntu/airflow/health.sh
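Since fx can be swapped for another JSON tool, the status check can also be done with Python's standard `json` module. The sketch below is one possible replacement for the parsing step only. The sample payload is illustrative and contains just the fields health.sh reads from /health, and the `unhealthy_components` helper is hypothetical.

```python
import json

# Illustrative payload with the fields that health.sh reads from /health.
SAMPLE_HEALTH = """
{
  "dag_processor": {"status": "healthy"},
  "metadatabase": {"status": "healthy"},
  "scheduler": {"status": "healthy"},
  "triggerer": {"status": null}
}
"""


def unhealthy_components(health_json, components=("scheduler", "triggerer")):
    """Return the listed components whose status is anything other than 'healthy'."""
    health = json.loads(health_json)
    return [
        name
        for name in components
        if (health.get(name) or {}).get("status") != "healthy"
    ]


print(unhealthy_components(SAMPLE_HEALTH))
```

A wrapper could fetch the real response with curl or `urllib.request` and restart each component the function returns, mirroring the two `if` blocks in health.sh.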

Third-party services

https://www.astronomer.io/product/

References