






















1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。
2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是数据转换的一部分)
3、数据转换:进行空值处理、数据标准统一、数据拆分、数据验证、数据替换和数据关联等操作。
4、规则检查:根据业务需求进行数据质量和业务规则的校验。
5、数据加载:将数据缓冲区的数据加载到目标数据库或数据仓库中,可能是全量加载或增量加载。
数据库抽取
文件抽取
API 抽取
requests)发送请求,获取 API 返回的数据。消息队列抽取
Apache NiFi
Apache Kafka
假设我们需要从以下三个数据源中抽取数据:
users)中的数据。orders.csv)。pymysql 或 SQLAlchemy。import pandas as pd from sqlalchemy import create_engine # 数据库连接配置 db_config = { 'host': 'localhost', 'user': 'root', 'password': 'password', 'database': 'test_db' } # 创建数据库连接 engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}") # 执行 SQL 查询 query = "SELECT * FROM users" # 假设 users 表包含 id, name, age 字段 df_users = pd.read_sql(query, engine) # 输出结果 print("从 MySQL 抽取的用户数据:") print(df_users)
pandas。import pandas as pd # 读取 CSV 文件 df_orders = pd.read_csv('orders.csv') # 假设 orders.csv 包含 order_id, user_id, amount 字段 # 输出结果 print("从 CSV 文件抽取的订单数据:") print(df_orders)
requests。import requests import pandas as pd # API 配置 api_url = "https://api.weatherapi.com/v1/current.json" api_key = "your_api_key" # 替换为你的 API Key params = { 'key': api_key, 'q': 'Beijing' # 查询北京的天气 } # 发送 HTTP 请求 response = requests.get(api_url, params=params) data = response.json() # 解析 JSON 数据 # 将数据保存到 DataFrame weather_data = { 'location': data['location']['name'], 'temperature': data['current']['temp_c'], 'condition': data['current']['condition']['text'] } df_weather = pd.DataFrame([weather_data]) # 输出结果 print("从 API 抽取的天气数据:") print(df_weather)
以下是整合了上述三种数据抽取方式的完整代码:
import pandas as pd from sqlalchemy import create_engine import requests # 1. 从 MySQL 数据库抽取数据 def extract_from_mysql(): # 数据库连接配置 db_config = { 'host': 'localhost', 'user': 'root', 'password': 'password', 'database': 'test_db' } # 创建数据库连接 engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}") # 执行 SQL 查询 query = "SELECT * FROM users" df_users = pd.read_sql(query, engine) return df_users # 2. 从 CSV 文件抽取数据 def extract_from_csv(): df_orders = pd.read_csv('orders.csv') return df_orders # 3. 从 API 抽取数据 def extract_from_api(): api_url = "https://api.weatherapi.com/v1/current.json" api_key = "your_api_key" # 替换为你的 API Key params = { 'key': api_key, 'q': 'Beijing' } response = requests.get(api_url, params=params) data = response.json() weather_data = { 'location': data['location']['name'], 'temperature': data['current']['temp_c'], 'condition': data['current']['condition']['text'] } df_weather = pd.DataFrame([weather_data]) return df_weather # 主函数 if __name__ == "__main__": # 抽取数据 df_users = extract_from_mysql() df_orders = extract_from_csv() df_weather = extract_from_api() # 输出结果 print("从 MySQL 抽取的用户数据:") print(df_users) print("\n从 CSV 文件抽取的订单数据:") print(df_orders) print("\n从 API 抽取的天气数据:") print(df_weather)
假设数据如下:
| id | name | age |
|---|---|---|
| order_id | user_id | amount |
|---|---|---|
| location | temperature | condition |
|---|---|---|
运行代码后,输出如下:
从 MySQL 抽取的用户数据: id name age 0 1 Alice 25 1 2 Bob 30 从 CSV 文件抽取的订单数据: order_id user_id amount 0 101 1 100.0 1 102 2 200.0 从 API 抽取的天气数据: location temperature condition 0 Beijing 20.0 Sunny
数据抽取是 ETL 流程的第一步,通常涉及从多种数据源(如数据库、文件、API)中提取数据。通过 Python 和相关库(如 pandas、SQLAlchemy、requests),可以轻松实现数据抽取任务。你可以根据实际需求扩展这个例子,比如支持增量抽取、处理异常情况等。希望这个例子对你有帮助!
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。