


























AI 概述
本文介绍PostgreSQL 18新增逻辑复制原生冲突检测能力,梳理增删改七类冲突场景,对比旧版靠触发器处理的短板,讲解底层源码实现、时间戳参数、冲突统计视图、订阅配置,附带实操案例、性能开销与后续版本规划。
目录
文章目录隐藏

PostgreSQL 18 重磅升级了逻辑复制能力,新增系统级冲突检测与日志统计功能,彻底解决旧版本无统一冲突机制、需手动写触发器处理的痛点。本文详细梳理多源、双向复制中的各类冲突场景,全面讲解七种冲突类型、底层实现原理、配置方式、统计视图与实战案例,并补充性能优化及未来版本的功能展望。
在双向复制或多源复制中,冲突不可避免:
场景 1:INSERT 冲突 ┌─────────────────────────────────────────────────────────────┐ │ 数据库 A:INSERT INTO users (id, name) VALUES (1, 'Alice') │ │ 数据库 B:INSERT INTO users (id, name) VALUES (1, 'Bob') │ │ │ │ 同步时: │ │ A → B:B 收到 INSERT (id=1),但 id=1 已存在! │ │ 冲突:唯一键冲突 │ └─────────────────────────────────────────────────────────────┘ 场景 2:UPDATE 冲突 ┌─────────────────────────────────────────────────────────────┐ │ 数据库 A:UPDATE users SET name='Alice' WHERE id=1 │ │ (修改的是原始数据) │ │ │ │ 数据库 B:UPDATE users SET name='Bob' WHERE id=1 │ │ (修改的是同样的行) │ │ │ │ 同步时: │ │ B 先修改了 id=1 → name='Bob' │ │ A 的修改到达 B: │ │ - B 查询 WHERE id=1,找到的是 name='Bob' │ │ - 但 A 的修改基于的是 name='原始值' │ │ - 冲突:更新的是"错误"的行版本 │ └─────────────────────────────────────────────────────────────┘ 场景 3:DELETE 冲突 ┌─────────────────────────────────────────────────────────────┐ │ 数据库 A:DELETE FROM users WHERE id=1 │ │ 数据库 B:UPDATE users SET name='Modified' WHERE id=1 │ │ │ │ 同步时: │ │ B 先执行 UPDATE,id=1 仍然存在 │ │ A 的 DELETE 到达 B: │ │ - 查找 WHERE id=1 │ │ - 找到行,但行已被 B 修改 │ │ - 冲突:删除的行已被其他人修改 │ │ │ │ 或者另一种情况: │ │ A 的 DELETE 先到达 B,删除成功 │ │ B 的 UPDATE 到达 A: │ │ - 查找 WHERE id=1 │ │ - 找不到行! │ │ - 冲突:更新的行不存在 │ └─────────────────────────────────────────────────────────────┘
在 PostgreSQL 17 及之前:
-- 面对冲突,订阅端只能: -- 1. 默认行为:错误中止,整个事务失败 -- 2. 使用 trigger:手动处理冲突 -- 3. 使用应用层逻辑:在应用代码中处理 -- 示例:使用 trigger 处理 INSERT 冲突 CREATE OR REPLACE FUNCTION handle_insert_conflict() RETURNS TRIGGER AS $$ BEGIN -- 如果 INSERT 冲突,改为 UPDATE UPDATE users SET name = NEW.name WHERE id = NEW.id; RETURN NULL; END; $$ LANGUAGE plpgsql; CREATE TRIGGER conflict_handler BEFORE INSERT ON users FOR EACH ROW WHEN (EXISTS (SELECT 1 FROM users WHERE id = NEW.id)) EXECUTE FUNCTION handle_insert_conflict();
痛点:
PostgreSQL 18 定义了完整的冲突类型枚举:
// 源码位置: src/include/replication/conflict.h
typedef enum ConflictType {
// INSERT 冲突
CT_INSERT_EXISTS, // INSERT 违反唯一约束
// UPDATE 冲突
CT_UPDATE_ORIGIN_DIFFERS, // UPDATE 的行被其他 origin 修改
CT_UPDATE_EXISTS, // UPDATE 违反唯一约束
CT_UPDATE_MISSING, // UPDATE 的行不存在
// DELETE 冲突
CT_DELETE_ORIGIN_DIFFERS, // DELETE 的行被其他 origin 修改
CT_DELETE_MISSING, // DELETE 的行不存在
// 多重冲突
CT_MULTIPLE_UNIQUE_CONFLICTS, // 同时违反多个唯一约束
} ConflictType;
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users (id PRIMARY KEY) │ │ 已有数据:id=1, name='Alice' │ │ │ │ 远端发送:INSERT (id=1, name='Bob') │ │ │ │ 检测:id=1 已存在,违反主键约束 │ │ 冲突类型:CT_INSERT_EXISTS │ └─────────────────────────────────────────────────────────────┘
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users │ │ 行数据:id=1, name='Alice' │ │ 行来源:本地修改(origin=InvalidRepOriginId) │ │ │ │ 远端发送:UPDATE (id=1) SET name='Bob' │ │ 远端 origin:origin_A │ │ │ │ 检测: │ │ - 行存在 │ │ - 行的 origin ≠ 远端 origin │ │ - 冲突:行被不同来源修改 │ │ │ │ 冲突类型:CT_UPDATE_ORIGIN_DIFFERS │ └─────────────────────────────────────────────────────────────┘
为什么这算冲突?
因为 UPDATE 的语义假设”行没有被其他人修改”。如果行被其他来源修改,当前 UPDATE 可能不是你想要的。
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users │ │ 索引:(email) UNIQUE, (phone) UNIQUE │ │ 行数据:id=1, email='a@x.com', phone='111' │ │ │ │ 远端发送:UPDATE (id=1) SET email='b@x.com', phone='222' │ │ │ │ 但本地已有:email='b@x.com'(在另一行) │ │ │ │ 检测:UPDATE 后违反唯一约束 │ │ 冲突类型:CT_UPDATE_EXISTS │ └─────────────────────────────────────────────────────────────┘
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users │ │ 数据:无 id=1 的行 │ │ │ │ 远端发送:UPDATE (id=1) SET name='Bob' │ │ │ │ 检测:WHERE id=1 找不到行 │ │ 冲突类型:CT_UPDATE_MISSING │ └─────────────────────────────────────────────────────────────┘
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users │ │ 行数据:id=1, name='Alice' │ │ 行来源:origin_B │ │ │ │ 远端发送:DELETE (id=1) │ │ 远端 origin:origin_A │ │ │ │ 检测: │ │ - 行存在 │ │ - 行的 origin ≠ 远端 origin │ │ - 冲突:试图删除被其他来源修改的行 │ │ │ │ 冲突类型:CT_DELETE_ORIGIN_DIFFERS │ └─────────────────────────────────────────────────────────────┘
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users │ │ 数据:无 id=1 的行 │ │ │ │ 远端发送:DELETE (id=1) │ │ │ │ 检测:WHERE id=1 找不到行 │ │ 冲突类型:CT_DELETE_MISSING │ └─────────────────────────────────────────────────────────────┘
场景: ┌─────────────────────────────────────────────────────────────┐ │ 本地表:users │ │ 索引:(email) UNIQUE, (username) UNIQUE │ │ │ │ 行 A:email='a@x.com', username='user_a' │ │ 行 B:email='b@x.com', username='user_b' │ │ │ │ 远端发送:INSERT email='a@x.com', username='user_b' │ │ │ │ 检测: │ │ - email='a@x.com' 与行 A 冲突 │ │ - username='user_b' 与行 B 冲突 │ │ - 同时违反多个唯一约束 │ │ │ │ 冲突类型:CT_MULTIPLE_UNIQUE_CONFLICTS │ └─────────────────────────────────────────────────────────────┘
// 源码位置: src/include/replication/conflict.h
typedef struct ConflictTupleInfo {
TupleTableSlot *slot; // 冲突的本地行数据
Oid indexoid; // 冲突发生的索引 OID
TransactionId xmin; // 修改本地行的事务 ID
RepOriginId origin; // 修改本地行的 origin
TimestampTz ts; // 修改本地行的时间戳
} ConflictTupleInfo;
// 源码位置: src/backend/replication/logical/conflict.c
bool GetTupleTransactionInfo(
TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
TimestampTz *localts
) {
// 1. 获取行的 xmin(系统列)
Datum xminDatum = slot_getsysattr(localslot,
MinTransactionIdAttributeNumber,
&isnull);
*xmin = DatumGetTransactionId(xminDatum);
// 2. 如果 track_commit_timestamp 启用,获取更多信息
if (track_commit_timestamp) {
return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
}
// 3. 否则只能获取 xmin
*localorigin = InvalidRepOriginId;
*localts = 0;
return false;
}
void ReportApplyConflict(
EState *estate,
ResultRelInfo *relinfo,
int elevel, // 日志级别
ConflictType type,
TupleTableSlot *searchslot, // 远端查询的行
TupleTableSlot *remoteslot, // 远端新行
List *conflicttuples // 冲突的本地行列表
) {
StringInfoData err_detail;
initStringInfo(&err_detail);
// 构建详细信息
foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) {
errdetail_apply_conflict(estate, relinfo, type,
searchslot, conflicttuple->slot, remoteslot,
conflicttuple->indexoid,
conflicttuple->xmin,
conflicttuple->origin,
conflicttuple->ts,
&err_detail);
}
// 报告统计
pgstat_report_subscription_conflict(MySubscription->oid, type);
// 发出日志
ereport(elevel,
errcode_apply_conflict(type),
errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
get_namespace_name(RelationGetNamespace(localrel)),
RelationGetRelationName(localrel),
ConflictTypeNames[type]),
errdetail_internal("%s", err_detail.data));
}
-- 启用提交时间戳追踪 ALTER SYSTEM SET track_commit_timestamp = on; SELECT pg_reload_conf(); -- 需要重启生效 -- 重启 PostgreSQL pg_ctl restart
不启用 track_commit_timestamp 时: ┌─────────────────────────────────────────────────────────────┐ │ 冲突报告: │ │ "Key already exists in unique index 'users_pkey', │ │ modified in transaction 1234." │ │ │ │ 信息有限: │ │ - 只知道事务 ID │ │ - 不知道什么时候修改的 │ │ - 不知道是哪个 origin 修改的 │ └─────────────────────────────────────────────────────────────┘ 启用 track_commit_timestamp 后: ┌─────────────────────────────────────────────────────────────┐ │ 冲突报告: │ │ "Key already exists in unique index 'users_pkey', │ │ modified by origin 'node_B' in transaction 1234 │ │ at 2024-01-15 10:30:45." │ │ │ │ 信息丰富: │ │ - 知道事务 ID │ │ - 知道修改时间 │ │ - 知道是哪个 origin 修改的 │ │ │ │ 这对于调试和决策非常重要! │ └─────────────────────────────────────────────────────────────┘
PostgreSQL 使用 pg_commit_ts SLRU 存储时间戳:
$PGDATA/pg_commit_ts/ ├── 0000 # 存储事务时间戳和 origin ├── 0001 └── ...
\d pg_stat_subscription_stats View "pg_catalog.pg_stat_subscription_stats" ┌─────────────────────────────────────────────────────────────────────┬ │ Column │ Type │ ├─────────────────────────────────────────────────────────────────────┤ │ subid │ oid │ │ subname │ text │ │ conflict_insert_exists │ bigint │ │ conflict_update_origin_differs │ bigint │ │ conflict_update_exists │ bigint │ │ conflict_update_missing │ bigint │ │ conflict_delete_origin_differs │ bigint │ │ conflict_delete_missing │ bigint │ │ conflict_multiple_unique │ bigint │ └─────────────────────────────────────────────────────────────────────┘
SELECT subname, conflict_insert_exists AS insert_conflicts, conflict_update_origin_differs AS update_origin_conflicts, conflict_update_missing AS update_missing, conflict_delete_missing AS delete_missing FROM pg_stat_subscription_stats; ┌─────────────────────────────────────────────────────────────────────┐ │ subname │ insert │ update_origin │ update_missing │ delete │ ├─────────────────────────────────────────────────────────────────────┤ │ my_subscription │ 5 │ 3 │ 2 │ 1 │ │ other_sub │ 0 │ 10 │ 0 │ 0 │ └─────────────────────────────────────────────────────────────────────┘ -- 重置统计 SELECT pg_stat_reset_subscription_stats(sub_oid);
CREATE SUBSCRIPTION my_sub CONNECTION 'host=primary port=5432 dbname=mydb' PUBLICATION my_pub WITH ( -- 冲突检测相关参数 conflict_resolution = 'error', -- 默认:出错中止 -- 或其他选项... );
-- 当前 PG 18 的选项 conflict_resolution = 'error' -- 默认:出错,事务中止 -- 未来可能支持的选项(正在讨论中) conflict_resolution = 'ignore' -- 忽略冲突,继续 conflict_resolution = 'skip' -- 跳过冲突行,继续事务 conflict_resolution = 'apply_if_newer' -- 如果远端更新,则应用 conflict_resolution = 'apply_always' -- 强制应用,覆盖本地
-- 准备环境 -- 数据库 A(发布端) CREATE TABLE users (id INT PRIMARY KEY, name TEXT); INSERT INTO users VALUES (1, 'Alice'); CREATE PUBLICATION pub_users FOR TABLE users; -- 数据库 B(订阅端) CREATE TABLE users (id INT PRIMARY KEY, name TEXT); -- 假设已经有数据(可能是手动插入或另一条复制) INSERT INTO users VALUES (1, 'Bob'); CREATE SUBSCRIPTION sub_users CONNECTION 'host=A port=5432 dbname=mydb' PUBLICATION pub_users; -- 启用 track_commit_timestamp(两边都要) ALTER SYSTEM SET track_commit_timestamp = on;
结果:
-- 在 B 上查看日志 -- PostgreSQL 输出: 2024-01-15 10:30:45 [ERROR] conflict detected on relation "public.users": conflict=insert_exists DETAIL: Key already exists in unique index "users_pkey", modified locally in transaction 1234 at 2024-01-15 10:00:00. Key (id)=(1). Remote tuple: (1, 'Alice'). Local tuple: (1, 'Bob').
-- 场景:多源复制 -- A → B → C 三节点 -- 在 C 上,一个行可能被 A 或 B 修改 -- 如果 A 和 B 都修改同一行,就会产生冲突 -- 查看 B 发送来的 UPDATE SELECT * FROM pg_stat_subscription_stats; ┌─────────────────────────────────────────────────────────────────────┐ │ subname │ conflict_update_origin_differs │ │ ├─────────────────────────────────────────────────────────────────────┤ │ sub_from_A │ 5 │ │ │ sub_from_B │ 0 │ │ └─────────────────────────────────────────────────────────────────────┘ -- 日志输出: 2024-01-15 11:00:00 [ERROR] conflict detected on relation "public.products": conflict=update_origin_differs DETAIL: Updating the row that was modified by a different origin "node_A" in transaction 5678 at 2024-01-15 10:50:00. Key (id)=(100). Remote new tuple: (100, 'Product from B', 50.00). Local tuple: (100, 'Product from A', 45.00).
冲突检测需要知道哪些索引可能产生冲突:
// 源码位置: src/backend/replication/logical/conflict.c
void InitConflictIndexes(ResultRelInfo *relInfo) {
List *uniqueIndexes = NIL;
// 遍历所有索引
for (int i = 0; i < relInfo->ri_NumIndices; i++) {
Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
if (indexRelation == NULL)
continue;
// 只检测唯一索引
if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
continue;
// 不支持延迟约束索引
if (!indexRelation->rd_index->indimmediate)
continue;
// 加入列表
uniqueIndexes = lappend_oid(uniqueIndexes,
RelationGetRelid(indexRelation));
}
// 存储到 ResultRelInfo
relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
}
应用 INSERT 时: ┌─────────────────────────────────────────────────────────────┐ │ 1. 遍历 ri_onConflictArbiterIndexes │ │ │ │ 2. 对每个唯一索引,尝试查找匹配的行 │ │ │ │ 3. 如果找到匹配行: │ │ - 获取行的 xmin, origin, timestamp │ │ - 构建 ConflictTupleInfo │ │ - 加入冲突列表 │ │ │ │ 4. 如果冲突列表非空: │ │ - 调用 ReportApplyConflict() │ │ - 根据配置决定如何处理 │ └─────────────────────────────────────────────────────────────┘
如果使用 pg_replicate 写入目标 PostgreSQL:
// 在你的 Sink 实现中
async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn> {
for event in events {
match event {
CdcEvent::Insert((tid, row, xid)) => {
// 执行 INSERT
// PostgreSQL 会自动检测唯一约束冲突
let result = self.db.execute(
"INSERT INTO users VALUES (?, ?)",
&[&row.values[0], &row.values[1]]
).await;
match result {
Ok(_) => continue,
Err(e) if is_unique_violation(&e) => {
// 这是冲突!
// 记录日志,决定如何处理
log::warn!(
"INSERT conflict: table={}, key={}",
tid, row.values[0]
);
// 可以选择:
// 1. 跳过这条 INSERT
// 2. 转换为 UPDATE
// 3. 报错中止
// 示例:转换为 UPDATE(类似 UPSERT)
self.db.execute(
"UPDATE users SET name = ? WHERE id = ?",
&[&row.values[1], &row.values[0]]
).await?;
}
Err(e) => return Err(e.into()),
}
}
// 其他事件...
}
}
}
// 在 pg_replicate 中监控冲突统计
async fn check_conflict_stats(&self) -> Result<()> {
let stats = self.db.query(
"SELECT conflict_insert_exists,
conflict_update_origin_differs,
conflict_update_missing,
conflict_delete_missing
FROM pg_stat_subscription_stats
WHERE subname = 'my_sub'",
&[]
).await?;
// 如果冲突数量过多,发出告警
if stats[0].get::<i64, _>("conflict_insert_exists") > 100 {
log::warn!("High number of INSERT conflicts detected!");
}
}
根据 PostgreSQL 邮件列表讨论,未来可能增加:
1. 自动冲突解决策略 ┌─────────────────────────────────────────────────────────────┐ │ CREATE SUBSCRIPTION ... WITH ( │ │ conflict_resolution = 'apply_if_newer' -- 时间戳优先 │ │ ); │ │ │ │ 原理:比较本地和远端的时间戳,选择更新的 │ └─────────────────────────────────────────────────────────────┘ 2. 冲突日志持久化 ┌─────────────────────────────────────────────────────────────┐ │ -- 冲突信息写入专门的表 │ │ SELECT * FROM pg_replication_conflicts; │ │ │ │ -- 可以事后分析冲突 │ └─────────────────────────────────────────────────────────────┘ 3. 冲突回调函数 ┌─────────────────────────────────────────────────────────────┐ │ -- 用户定义如何处理冲突 │ │ CREATE FUNCTION my_conflict_handler(conflict_type, ...) │ │ AS $$ ... $$; │ │ │ │ CREATE SUBSCRIPTION ... WITH ( │ │ conflict_handler = 'my_conflict_handler' │ │ ); │ └─────────────────────────────────────────────────────────────┘
每次 INSERT/UPDATE/DELETE 应用时: ┌─────────────────────────────────────────────────────────────┐ │ 1. 查找唯一索引:O(log n) │ │ - 对于每个唯一索引 │ │ │ │ 2. 获取行元数据(如果冲突): │ │ - xmin 系统列获取 │ │ - commit_ts 查询(如果启用) │ │ │ │ 3. 构建错误消息: │ │ - 格式化行数据 │ │ - 字符串操作 │ └─────────────────────────────────────────────────────────────┘ 额外开销: - 有冲突时:中等(日志记录) - 无冲突时:很小(仅索引查找)
-- 1. 只对必要的表启用冲突检测 -- 如果表不会有冲突,不需要检测 -- 2. 避免过多唯一索引 -- 每个唯一索引都需要检测 -- 3. 使用 REPLICA IDENTITY 设置 ALTER TABLE users REPLICA IDENTITY FULL; -- 确保 DELETE/UPDATE 有足够信息检测冲突 -- 4. 监控冲突统计,调整策略 SELECT * FROM pg_stat_subscription_stats; -- 如果某类冲突频繁,考虑数据分区或其他策略
| 配置项 | 命令 | 说明 |
|---|---|---|
| track_commit_timestamp | ALTER SYSTEM SET track_commit_timestamp = on | 启用时间戳追踪 |
| conflict_resolution | CREATE SUBSCRIPTION … WITH (conflict_resolution = …) | 冲突处理策略 |
| REPLICA IDENTITY | ALTER TABLE … REPLICA IDENTITY FULL | 设置副本标识 |
| 文件 | 路径 | 内容 |
|---|---|---|
| conflict.c | src/backend/replication/logical/ | 冲突检测实现(15KB) |
| conflict.h | src/include/replication/ | 冲突类型定义 |
| worker.c | src/backend/replication/logical/ | 应用变更时调用冲突检测 |
| pgstat_relation.c | src/backend/pgstat/ | 统计收集 |
| logicalproto.h | src/include/replication/ | 协议消息类型 |
PostgreSQL 18 的冲突检测机制,补齐了逻辑复制的核心短板,实现冲突可检测、可日志、可统计、可追溯。通过开启时间戳追踪、合理配置副本标识与冲突策略,能精准定位复制异常。该功能大幅降低了多源复制运维难度,也为后续自动冲突修复、持久化日志等进阶能力打下坚实基础。
推荐阅读:
SQLite、MySQL、PostgreSQL 三者区别是什么?
新手必看!PostgreSQL 入门超实用避坑秘籍,让你轻松少走弯路
MySQL 与 PostgreSQL 软删除进阶的唯一索引模式
以上关于详解PostgreSQL 18 逻辑复制冲突检测与解决方法的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。
声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » 详解PostgreSQL 18 逻辑复制冲突检测与解决方法
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。