惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

酷 壳 – CoolShell
酷 壳 – CoolShell
H
Hacker News: Front Page
P
Palo Alto Networks Blog
T
ThreatConnect
Apple Machine Learning Research
Apple Machine Learning Research
博客园_首页
T
True Tiger Recordings
P
Privacy & Cybersecurity Law Blog
B
Blog
IT之家
IT之家
Last Week in AI
Last Week in AI
F
Full Disclosure
Hacker News: Ask HN
Hacker News: Ask HN
C
Comments on: Blog
Microsoft Azure Blog
Microsoft Azure Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Microsoft Security Blog
Microsoft Security Blog
博客园 - 【当耐特】
N
News and Events Feed by Topic
NISL@THU
NISL@THU
腾讯CDC
雷峰网
雷峰网
Security Latest
Security Latest
李成银的技术随笔
M
Microsoft Research Blog - Microsoft Research
L
LangChain Blog
L
Lohrmann on Cybersecurity
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Check Point Blog
Y
Y Combinator Blog
Recent Announcements
Recent Announcements
博客园 - Franky
N
News | PayPal Newsroom
V
V2EX
A
About on SuperTechFans
The Register - Security
The Register - Security
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Google Online Security Blog
Google Online Security Blog
MyScale Blog
MyScale Blog
Cisco Talos Blog
Cisco Talos Blog
Vercel News
Vercel News
WordPress大学
WordPress大学
C
Cyber Attacks, Cyber Crime and Cyber Security
The Hacker News
The Hacker News
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
爱范儿
爱范儿
A
Arctic Wolf
L
LINUX DO - 最新话题
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More

博客园 - 每天进步多一点

C#学习相关系列之Linq用法---group和join相关用法 linq group by having 实现 常用知识-T-SQL优化 mysql窗口函数、Mysql分析函数 MySQL系列三(定位慢SQL、索引优化、SQL优化)Using filesort MySQL 内存相关参数设置 MySQL COALESCE 函数使用详解 SQL性能优化指南:如何优化MySQL多表join场景 MySQL内部临时表(Using temporary)案例详解及优化解决方法 cookie操作类(加密,获取,删除) MySql 5.7 索引不存在则创建,存在则忽略 SQL SERVER年月周日超止时间 数据抽取的常见理论方法 常用时间sql语句 数据库运维:mysql 数据库迁移方法-mysqldump 了解MySQL中的JSON_ARRAYAGG和JSON_OBJECT函数 MySQL的IFNULL()、ISNULL()、NULLIF()函数用法说明 如何看懂explain工具信息,使用explain工具来分析索引 mysql 如何查看sql语句执行时间和效率
ETL系列-数据抽取(Extract)
每天进步多一点 · 2025-11-29 · via 博客园 - 每天进步多一点

1、数据抽取:确定数据源,定义数据接口,选择数据抽取方法(主动抽取或由源系统推送)。
2、数据清洗:处理不完整数据、错误数据、重复数据等,确保数据的准确性和一致性。(是数据转换的一部分)
3、数据转换:进行空值处理、数据标准统一、数据拆分、数据验证、数据替换和数据关联等操作。
4、规则检查:根据业务需求进行数据质量和业务规则的校验。
5、数据加载:将数据缓冲区的数据加载到目标数据库或数据仓库中,可能是全量加载或增量加载。

  • 全量抽取
    • 特点:一次性抽取所有数据,适合数据量较小或首次抽取的场景。
    • 实现方式:直接查询整个表或读取整个文件。
  • 增量抽取
    • 特点:仅抽取发生变化的数据,适合数据量较大且需要频繁更新的场景。
    • 常用技术:
    • 时间戳:通过记录最后更新时间来抽取新增或修改的数据。
    • CDC(Change Data Capture):通过数据库日志或触发器捕获数据变化
  • 数据库抽取

    • mysql、oracle等
  • 文件抽取

    • 读取文件:使用文件读取库(如csv)。
  • API 抽取

    • HTTP 请求:使用 HTTP 客户端库(如 requests)发送请求,获取 API 返回的数据。
  • 消息队列抽取

    • 订阅消息:使用消息队列客户端(如 Kafka Consumer)订阅消息并获取数据。
  1. Apache NiFi

    • 特点:提供可视化界面,支持实时和批量数据抽取,内置多种数据源连接器(如数据库、API、文件系统等)。
    • 适用场景:适合需要实时数据流处理的场景,支持复杂的数据路由和转换逻辑。
  2. Apache Kafka

  • 特点:分布式流处理平台,支持高吞吐量的实时数据抽取和传输。
  • 适用场景:适合需要实时数据流处理的场景,常与 Spark Streaming 或 Flink 结合使用。
  1. Talend Open Studio
  • 特点:提供图形化界面,支持多种数据源(如数据库、文件、API)的抽取,支持代码生成。
  • 适用场景:适合中小型项目,支持快速开发和部署。
  1. Apache Sqoop
  • 特点:专门用于在 Hadoop 和关系型数据库之间传输数据,支持增量抽取。
  • 适用场景:适合大数据场景,尤其是 Hadoop 生态系统的数据抽取。
  1. Logstash
  • 特点:主要用于日志数据的抽取和传输,支持多种输入和输出插件。
  • 适用场景:适合日志数据的实时抽取和处理。

一、数据抽取的场景

假设我们需要从以下三个数据源中抽取数据:

  1. MySQL 数据库:抽取用户表(users)中的数据。
  2. CSV 文件:抽取一个包含订单信息的文件(orders.csv)。
  3. API:从一个公开的 API 中抽取天气数据。

二、数据抽取的实现

1. 从 MySQL 数据库抽取数据
  • 工具:Python + pymysql 或 SQLAlchemy
  • 步骤:
    1. 连接数据库。
    2. 执行 SQL 查询。
    3. 将查询结果保存到 DataFrame 或文件中。
import pandas as pd
from sqlalchemy import create_engine

# 数据库连接配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'test_db'
}

# 创建数据库连接
engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")

# 执行 SQL 查询
query = "SELECT * FROM users"  # 假设 users 表包含 id, name, age 字段
df_users = pd.read_sql(query, engine)

# 输出结果
print("从 MySQL 抽取的用户数据:")
print(df_users)
2. 从 CSV 文件抽取数据
  • 工具:Python + pandas
  • 步骤:
    1. 读取 CSV 文件。
    2. 将数据加载到 DataFrame 中。
import pandas as pd

# 读取 CSV 文件
df_orders = pd.read_csv('orders.csv')  # 假设 orders.csv 包含 order_id, user_id, amount 字段

# 输出结果
print("从 CSV 文件抽取的订单数据:")
print(df_orders)
3. 从 API 抽取数据
  • 工具:Python + requests
  • 步骤:
    1. 发送 HTTP 请求到 API。
    2. 解析返回的 JSON 数据。
    3. 将数据保存到 DataFrame 中。
import requests
import pandas as pd

# API 配置
api_url = "https://api.weatherapi.com/v1/current.json"
api_key = "your_api_key"  # 替换为你的 API Key
params = {
    'key': api_key,
    'q': 'Beijing'  # 查询北京的天气
}

# 发送 HTTP 请求
response = requests.get(api_url, params=params)
data = response.json()  # 解析 JSON 数据

# 将数据保存到 DataFrame
weather_data = {
    'location': data['location']['name'],
    'temperature': data['current']['temp_c'],
    'condition': data['current']['condition']['text']
}
df_weather = pd.DataFrame([weather_data])

# 输出结果
print("从 API 抽取的天气数据:")
print(df_weather)

三、完整代码示例

以下是整合了上述三种数据抽取方式的完整代码:

import pandas as pd
from sqlalchemy import create_engine
import requests

# 1. 从 MySQL 数据库抽取数据
def extract_from_mysql():
    # 数据库连接配置
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'test_db'
    }
    # 创建数据库连接
    engine = create_engine(f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}")
    # 执行 SQL 查询
    query = "SELECT * FROM users"
    df_users = pd.read_sql(query, engine)
    return df_users

# 2. 从 CSV 文件抽取数据
def extract_from_csv():
    df_orders = pd.read_csv('orders.csv')
    return df_orders

# 3. 从 API 抽取数据
def extract_from_api():
    api_url = "https://api.weatherapi.com/v1/current.json"
    api_key = "your_api_key"  # 替换为你的 API Key
    params = {
        'key': api_key,
        'q': 'Beijing'
    }
    response = requests.get(api_url, params=params)
    data = response.json()
    weather_data = {
        'location': data['location']['name'],
        'temperature': data['current']['temp_c'],
        'condition': data['current']['condition']['text']
    }
    df_weather = pd.DataFrame([weather_data])
    return df_weather

# 主函数
if __name__ == "__main__":
    # 抽取数据
    df_users = extract_from_mysql()
    df_orders = extract_from_csv()
    df_weather = extract_from_api()

    # 输出结果
    print("从 MySQL 抽取的用户数据:")
    print(df_users)
    print("\n从 CSV 文件抽取的订单数据:")
    print(df_orders)
    print("\n从 API 抽取的天气数据:")
    print(df_weather)

四、运行结果

假设数据如下:

  1. MySQL 用户表:
1 Alice 25 2 Bob 30
idnameage
  1. CSV 订单文件:
101 1 100.0 102 2 200.0
order_iduser_idamount
  1. API 天气数据:
Beijing 20.0 Sunny
locationtemperaturecondition

运行代码后,输出如下:

从 MySQL 抽取的用户数据:
   id   name  age
0   1  Alice   25
1   2    Bob   30

从 CSV 文件抽取的订单数据:
   order_id  user_id  amount
0       101        1   100.0
1       102        2   200.0

从 API 抽取的天气数据:
  location  temperature condition
0  Beijing         20.0     Sunny

五、总结

数据抽取是 ETL 流程的第一步,通常涉及从多种数据源(如数据库、文件、API)中提取数据。通过 Python 和相关库(如 pandasSQLAlchemyrequests),可以轻松实现数据抽取任务。你可以根据实际需求扩展这个例子,比如支持增量抽取、处理异常情况等。希望这个例子对你有帮助!