


























Originally published on Towards AI.
One of the main goals of creating my home lab is to gain a deeper understanding of Machine Learning Operations (MLOps) and how to productionalize AI workflows. Generally speaking, MLOps and productionalization deal with moving AI models from research into a real-life environment, with automation and the ability to handle errors gracefully.
In my previous articles, I set up a PostgreSQL server and an Airflow server. These serve as the data foundation for how to get datasets that will be used to train AI models. Now we need to start filling out our PostgreSQL databases with data. We can use Airflow to orchestrate data pipelines so that up-to-date data is loaded into our PostgreSQL database. Setting up this data foundation is typically the first step in the machine learning (ML) process after planning, as you need data to train your models.
A major reason behind my home lab setup is to show that you can self-host the whole ML process with a couple of VMs and containers. Since I have been doing a lot of personal investing lately, let’s work with finance data. With finance data, you can analyze trends, correlate prices, and even try forecasting, which makes it useful across many scenarios.
Before we start, here are things I learnt and implemented that will make life easier with Airflow. When I set up the Airflow server previously, I used the following commands:
nohup airflow scheduler > scheduler.log 2>&1 &
nohup airflow dag-processor > dag-processor.log 2>&1 &
nohup airflow triggerer > triggerer.log 2>&1 &
nohup airflow api-server --port 8080 > api-server.log 2>&1 &
The nohup command allows you to run a process continuously even after you log out or close the shell. The issue I ran into was that the Airflow components would crash and I would have to go to the Airflow server and re-run the commands. Another issue was that starting up all the components required 4 commands which could be annoying to run all the time.
To solve the latter issue, you can write a script to start or restart Airflow.
nano airflow_restart.sh
In the airflow_restart.sh file, copy the below code and save it.
#!/bin/bash
pkill -f "airflow" --ignore-ancestors
sleep 2
echo "Starting scheduler..."
nohup airflow scheduler > scheduler.log 2>&1 & echo $! | tee scheduler.pid
echo "Starting dag-processor..."
nohup airflow dag-processor > dag-processor.log 2>&1 & echo $! | tee dag-processor.pid
echo "Starting triggerer..."
nohup airflow triggerer > triggerer.log 2>&1 & echo $! | tee triggerer.pid
echo "Starting api-server..."
nohup airflow api-server --port 8080 > api-server.log 2>&1 & echo $! | tee api-server.pid
echo "Airflow restarted"
This script will kill any processes whose command line contains “airflow” and restart all 4 components. So instead of running all 4 commands, you can now just run the script.
To go a step further and solve the issue of having to restart the Airflow components every time it crashes, you can set up Airflow as a system daemon process. A daemon is a background process that typically starts when you boot up your system, restarts when there are failures, and is detached from the shell so you can keep it running at all times.
First, create a file using:
sudo nano /etc/systemd/system/airflow-scheduler.service
Next, copy the below text, put it in the file, and save it.
[Unit]
Description=Airflow Scheduler
After=network.target postgresql.service
Wants=postgresql.service
[Service]
User=<USER>
Group=<GROUP>
Environment=PATH=<AIRFLOW_PATH>:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
Environment=AIRFLOW_HOME=<AIRFLOW_FOLDER>
ExecStart=<AIRFLOW_PATH> scheduler
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Replace <USER>, <GROUP>, <AIRFLOW_PATH>, and <AIRFLOW_FOLDER> depending on how you set up your Airflow server. Repeat this for all the other components of Airflow: dag-processor, triggerer, api-server.
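Writing four nearly identical unit files by hand is easy to get wrong, so here is a minimal sketch that renders all four from one template. It mirrors the unit file above as-is, including its use of the same placeholder in both PATH and ExecStart. The function name render_units and the generated Description titles are my own assumptions, and the api-server arguments are taken from the nohup command used earlier.

```python
from string import Template

# Template mirroring the unit file above; $-placeholders are filled per component.
UNIT_TEMPLATE = Template("""\
[Unit]
Description=Airflow $title
After=network.target postgresql.service
Wants=postgresql.service

[Service]
User=$user
Group=$group
Environment=PATH=$airflow_path:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
Environment=AIRFLOW_HOME=$airflow_home
ExecStart=$airflow_path $command
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
""")

# Component name -> command-line arguments, matching the nohup commands used earlier.
COMPONENTS = {
    "scheduler": "scheduler",
    "dag-processor": "dag-processor",
    "triggerer": "triggerer",
    "api-server": "api-server --port 8080",
}


def render_units(user, group, airflow_path, airflow_home):
    """Return a dict mapping unit file names to their rendered contents."""
    units = {}
    for name, command in COMPONENTS.items():
        title = name.replace("-", " ").title()
        units[f"airflow-{name}.service"] = UNIT_TEMPLATE.substitute(
            title=title,
            user=user,
            group=group,
            airflow_path=airflow_path,
            airflow_home=airflow_home,
            command=command,
        )
    return units


if __name__ == "__main__":
    # Placeholder values kept from the article; replace them before use.
    rendered = render_units("<USER>", "<GROUP>", "<AIRFLOW_PATH>", "<AIRFLOW_FOLDER>")
    for filename, text in rendered.items():
        print(f"--- {filename} ---")
        print(text)
```

The script only prints the unit text, so you can review each file before saving it under /etc/systemd/system yourself.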
Run the below to pick up new system daemon processes.
sudo systemctl daemon-reload
Then, enable and start all the services.
# Enable services on boot
sudo systemctl enable airflow-scheduler
sudo systemctl enable airflow-dag-processor
sudo systemctl enable airflow-triggerer
sudo systemctl enable airflow-api-server
# Start services
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-dag-processor
sudo systemctl start airflow-triggerer
sudo systemctl start airflow-api-server
Voila! You now have Airflow running on your server in a more robust way. You can use systemctl status to check if each component is running and journalctl to see logging information.
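If you would rather check all four services at once, the sketch below wraps systemctl is-active in a small helper. The service names assume you named the unit files as in the enable commands above, and the function names are hypothetical.

```python
import subprocess

# Unit names as used in the systemctl enable/start commands above.
SERVICES = [
    "airflow-scheduler",
    "airflow-dag-processor",
    "airflow-triggerer",
    "airflow-api-server",
]


def service_state(name, run=subprocess.run):
    """Return the state string printed by `systemctl is-active` for one unit."""
    try:
        result = run(["systemctl", "is-active", name], capture_output=True, text=True)
    except OSError:
        return "unknown"  # systemctl is not available on this machine
    return result.stdout.strip() or "unknown"


def report(services=SERVICES, run=subprocess.run):
    """Map each service name to its state, such as 'active' or 'failed'."""
    return {name: service_state(name, run=run) for name in services}


if __name__ == "__main__":
    for name, state in report().items():
        print(f"{name}: {state}")
```

The run parameter exists only so the helper can be exercised without systemd; in normal use you would call report() with no arguments.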
Another thing to do is to set up the PostgreSQL connection in your Airflow server so that it can talk to your PostgreSQL database. Airflow has a handy PostgreSQL connection that lets you specify the connection parameters once, and this allows you to re-use the connection anytime you want to connect to PostgreSQL.
Edit the airflow.cfg file.
nano airflow.cfg
Find where it says test_connection and set it equal to Enabled. Restart Airflow (either using the script or restart the daemon processes). This allows you to test your connections through the Airflow UI.
Now, let’s go to the Airflow UI and add the PostgreSQL connection.
On the left panel, select Admin > Connections.
On the top right, select Add Connection. Under Connection Type, select Postgres. Note you may need to run pip install apache-airflow-providers-postgres in your Airflow server if you do not see Postgres as an option. Fill in your PostgreSQL Host (IP address), Login, Password, Port, and Database.
You should see your new PostgreSQL connection. To test the connection, click on the graph-like icon. After clicking, it should become like a wi-fi icon. There should also be a green pop-up message on the bottom-right indicating your test connection was successful.
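As an alternative to the UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI. The sketch below builds that URI for the fields filled in above. The host, login, password, and database values are made up for illustration, and pg_data_science_conn is the connection ID used by the DAG later in this article. Note that connections defined this way do not appear in the Connections list in the UI.

```python
from urllib.parse import quote


def postgres_conn_uri(host, login, password, port=5432, database=""):
    """Build an Airflow-style connection URI for a Postgres connection."""
    # Percent-encode values so characters like '@' or '/' do not break the URI.
    user = quote(login, safe="")
    pwd = quote(password, safe="")
    return f"postgres://{user}:{pwd}@{host}:{port}/{quote(database, safe='')}"


def conn_env_var(conn_id):
    """Return the environment variable name Airflow checks for this connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


if __name__ == "__main__":
    # Hypothetical values; substitute your own host and credentials.
    uri = postgres_conn_uri("192.168.1.10", "mike", "p@ss/word", 5432, "data_science")
    print(f"export {conn_env_var('pg_data_science_conn')}='{uri}'")
```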
Airflow has been set up and configured properly, so now we can get to writing the code for the Airflow pipeline.
First, a little bit about Airflow. Airflow works through Directed Acyclic Graphs (DAGs), which are structured workflows that can contain multiple data processing tasks. Within a DAG, tasks run in a specified order according to their dependencies, so you can set a task to run after other tasks complete or in parallel with them.
To get finance data, I used yfinance, a Python package that leverages Yahoo! Finance’s APIs to get market data. We can use this to extract and write stock ticker price data to our PostgreSQL database.
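To make the ordering idea concrete, here is a small sketch that uses Python's standard graphlib module rather than Airflow itself. The task names are invented for illustration. It groups tasks into batches, where every task in a batch has all of its dependencies satisfied and could run in parallel with the others in that batch.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
dependencies = {
    "extract_prices": set(),
    "extract_dividends": set(),
    "transform": {"extract_prices", "extract_dividends"},
    "load_to_postgres": {"transform"},
}


def execution_batches(deps):
    """Group tasks into batches; tasks in the same batch can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(execution_batches(dependencies))
# [['extract_dividends', 'extract_prices'], ['transform'], ['load_to_postgres']]
```

The "acyclic" part matters: if two tasks depended on each other, no valid order would exist, which is why prepare() rejects cycles.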
Here is the final code for our Airflow DAG which I put in a file named dag.py.
import pandas as pd
import yfinance as yf
import os
import csv
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def cad_ticker_price_task():
    """Fetch 5-year historical price data for Canadian equities and load into PostgreSQL.

    Reads ticker symbols from watchlist/watchlist_cad.csv, appends the '.TO' suffix
    for the Toronto Stock Exchange, downloads OHLCV data via yfinance starting 5 years
    ago, and writes the result to the finance.watchlist_cad_ticker_price table using
    the Airflow connection pg_data_science_conn. Replaces the table on each run.
    """
    # Open csv file to get watchlist and read into list
    dag_folder = os.path.dirname(os.path.abspath(__file__))
    csv_path = os.path.join(dag_folder, "watchlist/watchlist_cad.csv")

    with open(csv_path, mode="r", encoding="utf-8") as file:
        csv_reader = csv.reader(file)
        watchlist_cad = list(csv_reader)[0]

    watchlist_cad = [x.upper() + ".TO" for x in watchlist_cad]

    # Create pandas dataframe with data starting from 5 years ago
    start_dt = date.today() - relativedelta(years=5)

    finance_df = yf.download(watchlist_cad, start=start_dt)
    finance_df = finance_df.stack(level=1)
    finance_df = finance_df.reset_index()
    finance_df = finance_df.rename(columns={'level_0': 'Date'})
    finance_df.index.name = None

    # Write dataframe to SQL table
    # Note: pandas treats this whole string as the table name; pass schema=
    # to to_sql if you want a table inside a schema named finance instead.
    db_hook = PostgresHook(postgres_conn_id='pg_data_science_conn')
    engine = db_hook.get_sqlalchemy_engine()
    table_name = "finance.watchlist_cad_ticker_price"

    finance_df.to_sql(
        name=table_name,
        con=engine,
        if_exists='replace',
        index=False,
        chunksize=1000
    )
    print(f"{table_name} written to PostgreSQL")


# Default arguments for Airflow DAG
default_args = {
    "owner": "mike",
    "depends_on_past": False,
    "start_date": datetime(2026, 6, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Airflow DAG
with DAG(
    dag_id="finance_pipeline_to_postgres",
    default_args=default_args,
    schedule="@daily",
    catchup=False
) as dag:
    write_to_db = PythonOperator(
        task_id="watchlist_cad_ticker_price_to_postgres",
        python_callable=cad_ticker_price_task
    )
    write_to_db
In simple terms, the code creates an Airflow DAG named finance_pipeline_to_postgres that runs daily. The task takes a list of stock ticker symbols from watchlist_cad.csv and extracts stock ticker price data starting from 5 years before the current date via yfinance. This is loaded into a pandas dataframe, where it is also transformed to the correct structure. The code then uses the PostgreSQL connection we configured earlier and writes the dataframe to a table named finance.watchlist_cad_ticker_price.
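The "transformed to the correct structure" step is the least obvious part, so here is a small sketch of it on synthetic data. It assumes the multi-ticker download has two column levels named Price and Ticker and an index named Date. The tickers and numbers are made up, and no call to yfinance is made.

```python
import pandas as pd

# Synthetic stand-in for a multi-ticker download: columns are (Price, Ticker) pairs.
columns = pd.MultiIndex.from_product(
    [["Close", "Volume"], ["AAA.TO", "BBB.TO"]], names=["Price", "Ticker"]
)
index = pd.DatetimeIndex(["2025-01-02", "2025-01-03"], name="Date")
wide = pd.DataFrame(
    [[10.0, 20.0, 100, 200], [11.0, 21.0, 110, 210]], index=index, columns=columns
)

# Move the Ticker level from the columns into the rows, then flatten the index.
long = wide.stack(level=1).reset_index()
long.columns.name = None

print(long)
```

The wide frame has one row per date and one column per price field and ticker pair. The long frame has one row per date and ticker, with Date, Ticker, Close, and Volume as ordinary columns, which maps naturally onto a SQL table.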
After we have created the dag.py file, we need to get it to our Airflow server. By default, on the Airflow server, there is a dedicated dags folder where the server looks for Python files to parse and load DAGs. Since I developed the DAG code on a Windows machine separate from the Airflow server, we need to now move it onto the Airflow server into the dags folder.
One simple way is to use scp, a command-line utility to transfer files between local and remote machines. If you decide to go this route, you would simply need to run:
scp <LOCAL_DAG_PATH>/dag.py <AIRFLOW_USER>@<AIRFLOW_IP>:<AIRFLOW_FOLDER>/dags
Here, <LOCAL_DAG_PATH> is where your dag.py file is located, <AIRFLOW_USER> and <AIRFLOW_IP> are the username and IP address of your Airflow server, and <AIRFLOW_FOLDER> is the folder on the Airflow server where your dags folder is located. After running the command, you should be prompted for the password for your Airflow user, and you will need to input the correct password to transfer the file.
The problem with the scp approach is that you need to run the command every time you edit the dag.py file. To better mirror production settings, I decided to put the code in a GitHub repository. The development workflow now becomes: edit the code on my Windows machine, push the changes to GitHub, then pull them onto the Airflow server.
From a production point of view, this streamlines the process, as I can commit many files to the GitHub repo from my Windows machine and then pull all the new files and changes onto my Airflow server. It also offers version control, so I can roll back changes if things fail.
To set this up on the Airflow server, we run the following commands.
# Install Git
sudo apt install git
# Clone the remote GitHub repo
git clone https://github.com/miketcchung/homelab-data-engineering.git
# Pull changes as needed
git pull
One thing to note is that on the Airflow server, you will need to have all the required packages already installed to run the dag.py code. To fix this, on my Windows machine I wrote a requirements.txt file with all the required packages using:
pip freeze > requirements.txt
Then, on the Airflow server, I pulled the requirements.txt file and updated the Python environment with:
pip install -r requirements.txt
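After installing, it can help to confirm that the server's environment actually matches the pinned versions. The sketch below compares name==version lines against what is installed, using the standard importlib.metadata module. The function names are hypothetical, and lines that are not simple pins are skipped rather than interpreted.

```python
from importlib import metadata


def parse_pins(requirements_text):
    """Return {package: version} for lines of the form name==version."""
    pins = {}
    for line in requirements_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments
        if "==" not in line:
            continue  # skip blank lines and anything that is not a simple pin
        name, version = line.split("==", 1)
        pins[name.strip().lower()] = version.split(";", 1)[0].strip()
    return pins


def installed_version(name):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(name)
    except metadata.PackageNotFoundError:
        return None


def find_mismatches(pins, lookup=installed_version):
    """Return {package: (wanted, found)} for every pin that is not satisfied."""
    mismatches = {}
    for name, wanted in pins.items():
        found = lookup(name)
        if found != wanted:
            mismatches[name] = (wanted, found)
    return mismatches
```

On the server you could read requirements.txt, pass its text to parse_pins, and print the result of find_mismatches. An empty dict means every pinned package is installed at the expected version.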
Now that we have our DAG code on our Airflow server and required packages are installed, we just need to make sure that the server reads the DAG correctly.
Run the following. It should output “No Data Found”, meaning there are no import errors.
airflow dags list-import-errors
Below are additional commands you can run to ensure the Airflow server is reading the DAG properly.
In the Airflow UI, you should also be able to see the DAG that you created. On the left panel, select Dags. Filter for Active DAGs, as there are a lot of example DAGs that come with Airflow.
Trigger the DAG and verify that the run succeeds. If it fails, refer to the task logs to fix your DAG.
You now have a working Airflow DAG writing stock ticker price data to your PostgreSQL database.
You can find the code I used to set up the Airflow DAG and write the stock ticker price dataset to PostgreSQL in this GitHub repo. In the future, I plan to put more data engineering code and workflows into this repo, so stay updated for more.
This is one of my beginning steps toward building production data engineering workflows, and there are additional considerations I came across when developing the code and setting up Airflow.
One pain point when developing the DAG code is that testing is decoupled from the development process. To test the DAG code, I had to commit changes to GitHub first, and then pull the changes on the Airflow server, trigger the DAG, and monitor the logs for errors. Ideally, there should be a way to test the DAG code on my Windows machine before committing changes to GitHub.
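A partial step in that direction, which needs nothing beyond the Python standard library, is to parse the DAG file locally before committing. The sketch below only catches syntax errors and lists the top-level modules the file imports. It does not execute the DAG or validate it against Airflow, and the function name is hypothetical.

```python
import ast


def check_dag_source(source, filename="dag.py"):
    """Parse DAG source code and return (syntax_ok, imported_modules)."""
    try:
        tree = ast.parse(source, filename=filename)
    except SyntaxError as exc:
        print(f"{filename}:{exc.lineno}: {exc.msg}")
        return False, set()

    # Collect the top-level package of every absolute import in the file.
    modules = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            modules.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            modules.add(node.module.split(".")[0])
    return True, modules
```

The list of imported modules can be compared with requirements.txt, so a missing package is noticed before the DAG reaches the Airflow server.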
Another pain point I know will become a larger issue later on is how to manage the Python environments, especially as I scale up Airflow to run multiple DAGs. With the current setup, any time I add a new DAG or edit a DAG to use new packages, I need to install the package in the Airflow server’s Python environment. Python packages have different dependencies, so using the same Python environment for all the DAGs will likely cause conflicts as I scale up. The obvious fix is to use Docker operators, which are supported in Airflow. With Docker, we can spin up a container with its own environment for each DAG or task.
Lastly, it would be nice to have alerting in case DAGs fail, as models depend on the data and would be affected as well. Airflow supports email and Slack notifications, and this is something I will develop further as the home lab is built out.
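As a starting point, Airflow lets you attach an on_failure_callback that receives the task's context when a task fails. The sketch below formats a short alert message from that context. The message format and function names are my own assumptions, and delivery is left as a print so you can swap in email or Slack later.

```python
def format_failure_message(context):
    """Build a one-line alert from the context Airflow passes to a failure callback."""
    ti = context.get("task_instance")
    dag_id = getattr(ti, "dag_id", "unknown_dag")
    task_id = getattr(ti, "task_id", "unknown_task")
    error = context.get("exception")
    return f"Airflow task failed: {dag_id}.{task_id} ({error!r})"


def notify_on_failure(context):
    # Placeholder delivery: replace print with an email or Slack call of your choice.
    print(format_failure_message(context))


# Sketch of how the callback could be wired into the DAG's default arguments.
default_args = {
    "owner": "mike",
    "retries": 1,
    "on_failure_callback": notify_on_failure,
}
```

Putting the callback in default_args applies it to every task in the DAG, so a new task added later is covered without extra wiring.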



