惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

酷 壳 – CoolShell
酷 壳 – CoolShell
H
Hacker News: Front Page
P
Palo Alto Networks Blog
T
ThreatConnect
Apple Machine Learning Research
Apple Machine Learning Research
博客园_首页
T
True Tiger Recordings
P
Privacy & Cybersecurity Law Blog
B
Blog
IT之家
IT之家
Last Week in AI
Last Week in AI
F
Full Disclosure
Hacker News: Ask HN
Hacker News: Ask HN
C
Comments on: Blog
Microsoft Azure Blog
Microsoft Azure Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Microsoft Security Blog
Microsoft Security Blog
博客园 - 【当耐特】
N
News and Events Feed by Topic
NISL@THU
NISL@THU
腾讯CDC
雷峰网
雷峰网
Security Latest
Security Latest
李成银的技术随笔
M
Microsoft Research Blog - Microsoft Research
L
LangChain Blog
L
Lohrmann on Cybersecurity
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Check Point Blog
Y
Y Combinator Blog
Recent Announcements
Recent Announcements
博客园 - Franky
N
News | PayPal Newsroom
V
V2EX
A
About on SuperTechFans
The Register - Security
The Register - Security
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Google Online Security Blog
Google Online Security Blog
MyScale Blog
MyScale Blog
Cisco Talos Blog
Cisco Talos Blog
Vercel News
Vercel News
WordPress大学
WordPress大学
C
Cyber Attacks, Cyber Crime and Cyber Security
The Hacker News
The Hacker News
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
爱范儿
爱范儿
A
Arctic Wolf
L
LINUX DO - 最新话题
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More

博客园 - 青石路

Claude Code安装,接入阿里云百炼模型,蹭蹭免费额度 异源数据同步 → 记一次 DataX 已同步数据量优化 明明连接的是Redis的DB0,为什么能查到DB3的数据? TINYINT(1) 类型的字段,明明数据存的是 2,为什么查出来是 true 用了MySQL的INSERT ON DUPLICATE KEY UPDATE,怎么还报唯一索引冲突错误 那些年不该放到事务中的操作,你实现过哪些 关于布尔类型的变量不要加 is 前缀,被网友们吐槽了,特来完善下 都说了布尔类型的变量不要加 is 前缀,非要加,这不是坑我了嘛 安全漏洞修复导致SpringBoot2.7与Springfox不兼容,问题排查与处理 记一次SQL隐式转换导致精度丢失问题的排查 → 不规范就踩坑 经由同个文件多次压缩的文件MD5都不一样问题排查,感慨AI的强大! 记一次cannot access its superinterface问题的的排查 → 强如Spring也一样写Bug SpringBoot支持Kafka多源配置的同时还要支持启停配置化,是真的会玩 如果XXL-JOB执行器在执行某任务中被重启了,重启后该任务能够被自动弥补调度吗 Spring Boot读取外部配置文件失败,原因绝对出乎你意料 不依赖 Spring,你会如何自实现 RabbitMQ 消息的消费(一) 异源数据同步 → DataX 同步启动后如何手动终止? 记一次 RabbitMQ 消费者莫名消失问题的排查 不升级 POI 版本,如何生成符合新版标准的Excel 2007文件 以MySQL为例,来看看maven-shade-plugin如何解决多版本驱动共存的问题? maven 插件之 maven-shade-plugin,解决同包同名 class 共存问题的神器
异源数据同步 → 如何获取 DataX 已同步数据量?
青石路 · 2024-11-04 · via 博客园 - 青石路

开心一刻

今天,表妹问我:哥,我男朋友过两天要生日了,你们男生一般喜欢什么,帮忙推荐个礼物呗
我:预算多少
表妹:预算300
我:20块买条黑丝,剩下280给自己买支口红,你男朋友生日那天你都给自己用上
表妹:秒啊,哥
我:必须的嘛,你要知道男人最懂男人!

开心一刻

前情回顾

关于异源数据同步工具 DataX,我已经写了好几篇文章

异构数据源同步之数据同步 → datax 改造,有点意思
异构数据源同步之数据同步 → datax 再改造,开始触及源码
异构数据源同步之数据同步 → DataX 使用细节
异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密
异源数据同步 → DataX 为什么要支持 kafka?

推荐大家去看看,可以对 DataX 有个基本的了解,掌握其基本使用;示例代码:qsl-datax

需求背景

假设我们基于 XXL-JOB 实现调度,qsl-datax-hook 作为 XXL-JOB 执行器的同时也充当 DataX 的拉起方,三者调用关系如下

datax同步数据量

离线同步的数据量往往会很大,少则上万,多则上亿,所以同步过程一般会持续很长时间,如何确认同步仍在进行中呢?我们可以看日志,也可以查目标表的记录数,但都不够直观,好的实现方式应该是有同步任务查看页面,通过该页面可以查看到正在同步中的任务,以及这些任务已同步的数据量,所以问题就来到

如何获取 DataX 已同步数据量?

已同步数据量

换做是你们,你们会如何实现?或者说有什么思路?我提供下我的方案,是不是也是你们所想

DataX 的 Writer 往目标源写数据的时候,一次写多少数据我们就记录多少,然后累加并持久化,是不是就可以实时查看当前的已同步数据量呢?

具体如何实现了,我们可以基于 DataX 的日志来实现;我们在讲 异构数据源同步之数据同步 → datax 再改造,开始触及源码 的时候,对日志进行了调整,qsl-datax-hook 能够获取 DataX 进程的日志输出,所以我们只需要在 DataX 往目标源写入数据完成后往日志中写入一条记录(写入了多少数量),qsl-datax-hook 就能够获取该记录,从而得到写入数据量,然后进行累加操作;我们以 mysqlwriter 为例,来看看通过代码到底如何实现

  1. writer 往日志中写 同步数据量

    从哪里找切入点,我就不绕弯子了

    writer写日志起点

    跟进 startWriteWithConnection,有如下代码

    doBatchInsert

    可以看到是批量写入的,继续跟进 doBatchInsert

    doBatchInsert关键点

    分两种情况

    1. 正常情况,批量插入并 commit 成功

    2. 异常情况,先回滚批量插入,然后通过 doOneInsert

      doOneInsert

      逐条插入

    所以在哪里写 同步数据量 的日志是不是清楚了,有两个地方需要写

    1. doBatchInsert 批量插入 commit 之后写日志

      批量插入记录同步数据量

    2. doOneInsert 单条插入后写日志

      单挑插入记录同步数据量

    DataX 就算改造好了,是不是很简单?

  2. qsl-datax-hook 读取 DataX 日志中的 同步数据量 并持久化

    com.qsl.hook.DataXManager#exec 适配改造下即可

    hook适配改造

    做持久化的时候一定要采用

    update table_name set sync_rows = sync_rows + syncRows;
    

    的方式,利用数据库的锁来避免并发问题,而采用 set 具体的值

    update table_name set sync_rows = totalSyncRows;
    

    会有并发覆盖问题,比如第一次将总量更新成 50000,而第二次可能将总量更新成 48000

至此,需求就算基本完成了;其他类型的 DataX writer 可以采用类似的方式来实现,具体就不演示了,你们自行去实现

总结

如果目标源支持事务,那么 已同步数据量 可以实现的很准确,如果目标源不支持事务,那么 已同步数据量 实现的就不会很准确,或者说实现难度非常高;文中讲到的日志方式,只是实现方式之一,还有其他的实现方式,例如

  1. 定时读取目标源的数据量

  2. 改造DataX,直接持久化 已同步数据量

    update table_name set sync_rows = sync_rows + syncRows;
    

各种方式都有其优势,也存在其弊端,需要结合业务选择合适的方式