





















@@ -22,6 +22,7 @@ import { resolveGatewaySessionStoreTarget } from "./session-utils.js";
/** Logger scoped to the compaction-checkpoint subsystem of the gateway. */
const log = createSubsystemLogger("gateway/session-compaction-checkpoints");

/** Maximum number of checkpoint entries retained per session (newest win). */
const MAX_COMPACTION_CHECKPOINTS_PER_SESSION = 25;

/**
 * 64 MiB. NOTE(review): presumably the size cap for a single checkpoint
 * snapshot; the code that enforces it is outside this view.
 */
export const MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES = 64 * 1024 ** 2;

/**
 * 128 MiB. Byte budget for the snapshot files of all checkpoints retained for
 * one session; `trimSessionCheckpoints` drops older checkpoints that would
 * push the running total past this value.
 */
export const MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION = 128 * 1024 ** 2;
25262627export type CapturedCompactionCheckpointSnapshot = {
2728sessionId: string;
@@ -34,17 +35,58 @@ type ForkedCompactionCheckpointTranscript = {
3435sessionFile: string;
3536};
363737-function trimSessionCheckpoints(checkpoints: SessionCompactionCheckpoint[] | undefined): {
/**
 * Returns the whitespace-trimmed path of the checkpoint's pre-compaction
 * session file, or `undefined` when the field is missing or blank.
 */
function checkpointSnapshotPath(checkpoint: SessionCompactionCheckpoint): string | undefined {
  const trimmedPath = checkpoint.preCompaction.sessionFile?.trim();
  // An empty string after trimming counts as "no path".
  return trimmedPath ? trimmedPath : undefined;
}

/**
 * Looks up the recorded snapshot size for a checkpoint.
 *
 * @param checkpoint - Checkpoint whose pre-compaction snapshot is being sized.
 * @param snapshotBytesByPath - Sizes in bytes keyed by trimmed snapshot path.
 * @returns The recorded size when it is a finite positive number; otherwise 0
 *   (no path, no entry for the path, or a non-finite / non-positive value).
 */
function checkpointSnapshotBytes(
  checkpoint: SessionCompactionCheckpoint,
  snapshotBytesByPath: ReadonlyMap<string, number>,
): number {
  const snapshotPath = checkpointSnapshotPath(checkpoint);
  const recorded = snapshotPath ? snapshotBytesByPath.get(snapshotPath) : undefined;
  if (typeof recorded !== "number" || !Number.isFinite(recorded) || recorded <= 0) {
    return 0;
  }
  return recorded;
}

/**
 * Applies the per-session retention limits to a checkpoint list.
 *
 * Trimming happens in two passes:
 * 1. Count: only the last `MAX_COMPACTION_CHECKPOINTS_PER_SESSION` entries
 *    stay candidates; everything before them is removed.
 * 2. Bytes: candidates are visited newest to oldest. The newest one is always
 *    kept; each older one is kept only while the running snapshot total stays
 *    within `MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION`. A
 *    candidate that does not fit is removed, but the walk continues, so a
 *    smaller, older candidate can still be kept.
 *
 * @param checkpoints - Checkpoints in stored order (oldest first).
 * @param snapshotBytesByPath - Snapshot sizes keyed by path. With the default
 *   empty map every checkpoint counts as 0 bytes, so only the count limit applies.
 * @returns `kept` (oldest first, `undefined` when nothing is kept) and
 *   `removed` (count-trimmed entries followed by byte-trimmed ones, each
 *   group oldest first).
 */
function trimSessionCheckpoints(
  checkpoints: SessionCompactionCheckpoint[] | undefined,
  snapshotBytesByPath: ReadonlyMap<string, number> = new Map(),
): {
  kept: SessionCompactionCheckpoint[] | undefined;
  removed: SessionCompactionCheckpoint[];
} {
  if (!Array.isArray(checkpoints) || checkpoints.length === 0) {
    return { kept: undefined, removed: [] };
  }

  // Pass 1: count limit.
  const candidates = checkpoints.slice(-MAX_COMPACTION_CHECKPOINTS_PER_SESSION);
  const droppedByCount = checkpoints.slice(0, Math.max(0, checkpoints.length - candidates.length));

  // Pass 2: byte budget, newest first. `unshift` keeps both result lists in
  // oldest-first order without a final reversal.
  const retained: SessionCompactionCheckpoint[] = [];
  const droppedByBytes: SessionCompactionCheckpoint[] = [];
  let budgetUsed = 0;
  for (const candidate of [...candidates].reverse()) {
    if (!candidate) {
      continue;
    }
    const size = checkpointSnapshotBytes(candidate, snapshotBytesByPath);
    const isNewest = retained.length === 0;
    const fitsBudget =
      budgetUsed + size <= MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION;
    if (isNewest || fitsBudget) {
      retained.unshift(candidate);
      budgetUsed += size;
    } else {
      droppedByBytes.unshift(candidate);
    }
  }

  return {
    kept: retained.length > 0 ? retained : undefined,
    removed: [...droppedByCount, ...droppedByBytes],
  };
}
5092@@ -54,6 +96,27 @@ function sessionStoreCheckpoints(
5496return Array.isArray(entry?.compactionCheckpoints) ? [...entry.compactionCheckpoints] : [];
5597}

/**
 * Measures the on-disk size of every distinct pre-compaction snapshot file
 * referenced by `checkpoints`.
 *
 * @param checkpoints - Checkpoints whose snapshot files should be measured.
 * @returns A map from trimmed snapshot path to size in bytes. A path maps to 0
 *   when `stat` fails or the path is not a regular file. Checkpoints without a
 *   snapshot path contribute no entry.
 */
async function statCheckpointSnapshotBytes(
  checkpoints: readonly SessionCompactionCheckpoint[],
): Promise<Map<string, number>> {
  // Deduplicate before starting any I/O. Checking the result map from inside
  // concurrently started async callbacks cannot dedupe: every callback runs up
  // to its first `await` before any stat settles, so the map is still empty
  // when it is checked and a shared path would be stat'ed once per checkpoint.
  const uniquePaths = new Set<string>();
  for (const checkpoint of checkpoints) {
    const sessionFile = checkpointSnapshotPath(checkpoint);
    if (sessionFile) {
      uniquePaths.add(sessionFile);
    }
  }

  const entries = await Promise.all(
    [...uniquePaths].map(async (sessionFile): Promise<[string, number]> => {
      try {
        const stat = await fs.stat(sessionFile);
        return [sessionFile, stat.isFile() ? stat.size : 0];
      } catch {
        // Best effort: a missing or unreadable snapshot counts as 0 bytes
        // instead of failing the whole lookup.
        return [sessionFile, 0];
      }
    }),
  );
  return new Map(entries);
}
119+57120export function resolveSessionCompactionCheckpointReason(params: {
58121trigger?: "budget" | "overflow" | "manual";
59122timedOut?: boolean;
@@ -443,14 +506,15 @@ export async function persistSessionCompactionCheckpoint(params: {
443506removed: SessionCompactionCheckpoint[];
444507}
445508| undefined;
446-await updateSessionStore(target.storePath, (store) => {
509+await updateSessionStore(target.storePath, async (store) => {
447510const existing = store[target.canonicalKey];
448511if (!existing?.sessionId) {
449512return;
450513}
451514const checkpoints = sessionStoreCheckpoints(existing);
452515checkpoints.push(checkpoint);
453-trimmedCheckpoints = trimSessionCheckpoints(checkpoints);
516+const snapshotBytesByPath = await statCheckpointSnapshotBytes(checkpoints);
517+trimmedCheckpoints = trimSessionCheckpoints(checkpoints, snapshotBytesByPath);
454518store[target.canonicalKey] = {
455519 ...existing,
456520updatedAt: Math.max(existing.updatedAt ?? 0, createdAt),
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。