























@@ -63,6 +63,13 @@ type PersistedMessageReadResult = TelegramMessageCacheBucket & {
6363needsRewrite: boolean;
6464};
6565
66+type TelegramMessageObservationMode = "authoritative" | "partial";
67+
68+type TelegramCachedMessageObservation = {
69+node: TelegramCachedMessageNode;
70+mode: TelegramMessageObservationMode;
71+};
72+
6673const DEFAULT_MAX_MESSAGES = 5000;
6774const COMPACT_THRESHOLD_RATIO = 2;
6875const persistedMessageCacheBuckets = new Map<string, TelegramMessageCacheBucket>();
@@ -164,14 +171,18 @@ function resolveMessageThreadId(msg: Message): number | undefined {
164171function normalizeMessageNodes(
165172msg: Message,
166173params: { threadId?: number },
167-): TelegramCachedMessageNode[] {
168-const nodes: TelegramCachedMessageNode[] = [];
174+): TelegramCachedMessageObservation[] {
175+const observations: TelegramCachedMessageObservation[] = [];
169176const visited = new Set<string>();
170177const nodeThreadId = (node: TelegramCachedMessageNode) => {
171178const threadId = Number(node.threadId);
172179return Number.isFinite(threadId) ? threadId : undefined;
173180};
174-const visit = (message: Message, inheritedThreadId?: number) => {
181+const visit = (
182+message: Message,
183+inheritedThreadId: number | undefined,
184+mode: TelegramMessageObservationMode,
185+) => {
175186const node = normalizeMessageNode(message, {
176187threadId: resolveMessageThreadId(message) ?? inheritedThreadId,
177188});
@@ -181,12 +192,12 @@ function normalizeMessageNodes(
181192visited.add(node.messageId);
182193const replyMessage = resolveEmbeddedReplyMessage(message);
183194if (replyMessage?.message_id != null) {
184-visit(replyMessage, nodeThreadId(node) ?? inheritedThreadId);
195+visit(replyMessage, nodeThreadId(node) ?? inheritedThreadId, "partial");
185196}
186-nodes.push(node);
197+observations.push({ node, mode });
187198};
188-visit(msg, params.threadId);
189-return nodes;
199+visit(msg, params.threadId, "authoritative");
200+return observations;
190201}
191202
192203function isRecord(value: unknown): value is Record<string, unknown> {
@@ -215,6 +226,7 @@ function isTelegramSourceMessage(value: unknown): value is Message {
215226function parsePersistedEntry(value: unknown): Array<{
216227key: string;
217228node: TelegramCachedMessageNode;
229+mode: TelegramMessageObservationMode;
218230}> {
219231if (!isRecord(value) || !isString(value.key)) {
220232return [];
@@ -229,10 +241,15 @@ function parsePersistedEntry(value: unknown): Array<{
229241}
230242const keyPrefix = value.key.slice(0, separatorIndex + 1);
231243const threadId = Number(readOptionalString(value.node, "threadId"));
244+const sourceMessageId = String(value.node.sourceMessage.message_id);
232245return normalizeMessageNodes(
233246value.node.sourceMessage,
234247Number.isFinite(threadId) ? { threadId } : {},
235-).map((node) => ({ key: `${keyPrefix}${node.messageId}`, node }));
248+).map(({ node, mode }) => ({
249+key: `${keyPrefix}${node.messageId}`,
250+ node,
251+mode: node.messageId === sourceMessageId ? "authoritative" : mode,
252+}));
236253}
237254
238255function findJsonArrayEnd(text: string): number {
@@ -337,26 +354,41 @@ function mergeTelegramSourceMessage(existing: Message, incoming: Message): Messa
337354return merged;
338355}
339356
357+function mergeAuthoritativeTelegramSourceMessage(existing: Message, incoming: Message): Message {
358+const existingReply = resolveEmbeddedReplyMessage(existing);
359+const incomingReply = resolveEmbeddedReplyMessage(incoming);
360+if (existingReply?.message_id != null && incomingReply?.message_id === existingReply.message_id) {
361+return {
362+ ...incoming,
363+reply_to_message: mergeTelegramSourceMessage(existingReply, incomingReply),
364+};
365+}
366+return incoming;
367+}
368+
340369function mergeCachedMessageNode(
341370existing: TelegramCachedMessageNode,
342371incoming: TelegramCachedMessageNode,
372+mode: TelegramMessageObservationMode,
343373): TelegramCachedMessageNode {
344374const threadId = Number(incoming.threadId ?? existing.threadId);
345-return normalizeRequiredMessageNode(
346-mergeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage),
347-{
348- ...(Number.isFinite(threadId) ? { threadId } : {}),
349-},
350-);
375+const sourceMessage =
376+mode === "authoritative"
377+ ? mergeAuthoritativeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage)
378+ : mergeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage);
379+return normalizeRequiredMessageNode(sourceMessage, {
380+ ...(Number.isFinite(threadId) ? { threadId } : {}),
381+});
351382}
352383
353384function upsertCachedMessageNode(params: {
354385messages: Map<string, TelegramCachedMessageNode>;
355386key: string;
356387node: TelegramCachedMessageNode;
388+mode: TelegramMessageObservationMode;
357389}): TelegramCachedMessageNode {
358390const existing = params.messages.get(params.key);
359-const node = existing ? mergeCachedMessageNode(existing, params.node) : params.node;
391+const node = existing ? mergeCachedMessageNode(existing, params.node, params.mode) : params.node;
360392params.messages.delete(params.key);
361393params.messages.set(params.key, node);
362394return node;
@@ -375,7 +407,12 @@ function readPersistedMessages(filePath: string, maxMessages: number): Persisted
375407for (const value of persisted.values) {
376408for (const entry of parsePersistedEntry(value)) {
377409persistedEntryCount++;
378-upsertCachedMessageNode({ messages, key: entry.key, node: entry.node });
410+upsertCachedMessageNode({
411+ messages,
412+key: entry.key,
413+node: entry.node,
414+mode: entry.mode,
415+});
379416trimMessages(messages, maxMessages);
380417}
381418}
@@ -515,16 +552,16 @@ export function createTelegramMessageCache(params?: {
515552
516553return {
517554record: ({ accountId, chatId, msg, threadId }) => {
518-const entries = normalizeMessageNodes(msg, { threadId });
519-const entry = entries.at(-1);
520-if (!entry) {
555+const observations = normalizeMessageNodes(msg, { threadId });
556+const currentObservation = observations.at(-1);
557+if (!currentObservation) {
521558return null;
522559}
523560let recordedEntry: TelegramCachedMessageNode | null = null;
524-for (const node of entries) {
561+for (const { node, mode } of observations) {
525562const key = telegramMessageCacheKey({ accountId, chatId, messageId: node.messageId });
526-const cachedNode = upsertCachedMessageNode({ messages, key, node });
527-if (node.messageId === entry.messageId) {
563+const cachedNode = upsertCachedMessageNode({ messages, key, node, mode });
564+if (node.messageId === currentObservation.node.messageId) {
528565recordedEntry = cachedNode;
529566}
530567trimMessages(messages, maxMessages);
@@ -544,7 +581,7 @@ export function createTelegramMessageCache(params?: {
544581logVerbose(`telegram: failed to persist message cache: ${String(error)}`);
545582}
546583}
547-return recordedEntry ?? entry;
584+return recordedEntry ?? currentObservation.node;
548585},
549586 get,
550587recentBefore: ({ accountId, chatId, messageId, threadId, limit }) => {
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。