


























@@ -1808,6 +1808,99 @@ describe("TelegramPollingSession", () => {
18081808});
18091809});
18101810
1811+it("recovers a lone active spooled handler owned by a replaced session (#84158)", async () => {
1812+vi.useFakeTimers({ shouldAdvanceTime: true });
1813+const firstAbort = new AbortController();
1814+const secondAbort = new AbortController();
1815+const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
1816+const log = vi.fn();
1817+let releaseTurn: (() => void) | undefined;
1818+const turnDone = new Promise<void>((resolve) => {
1819+releaseTurn = resolve;
1820+});
1821+const handleUpdate = vi.fn(async () => {
1822+await turnDone;
1823+});
1824+createTelegramBotMock.mockImplementation(() => ({
1825+api: {
1826+deleteWebhook: vi.fn(async () => true),
1827+config: { use: vi.fn() },
1828+},
1829+init: vi.fn(async () => undefined),
1830+ handleUpdate,
1831+stop: vi.fn(async () => undefined),
1832+}));
1833+await writeSpooledTestUpdates(tempDir, [topicUpdate(42, 10, "lone active topic turn")]);
1834+const createWorker = vi.fn(() => {
1835+let stopWorker: (() => void) | undefined;
1836+const workerDone = new Promise<void>((resolve) => {
1837+stopWorker = resolve;
1838+});
1839+return {
1840+onMessage: vi.fn(() => () => undefined),
1841+stop: vi.fn(async () => {
1842+stopWorker?.();
1843+}),
1844+task: vi.fn(async () => {
1845+await workerDone;
1846+}),
1847+};
1848+});
1849+
1850+try {
1851+const firstSession = createPollingSession({
1852+abortSignal: firstAbort.signal,
1853+ log,
1854+isolatedIngress: {
1855+enabled: true,
1856+spoolDir: tempDir,
1857+ createWorker,
1858+drainIntervalMs: 100,
1859+spooledUpdateHandlerTimeoutMs: 100,
1860+spooledUpdateHandlerAbortGraceMs: 5_000,
1861+},
1862+});
1863+const firstRunPromise = firstSession.runUntilAbort();
1864+await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1));
1865+firstAbort.abort();
1866+await vi.advanceTimersByTimeAsync(16_000);
1867+await firstRunPromise;
1868+
1869+const secondSession = createPollingSession({
1870+abortSignal: secondAbort.signal,
1871+ log,
1872+isolatedIngress: {
1873+enabled: true,
1874+spoolDir: tempDir,
1875+ createWorker,
1876+drainIntervalMs: 100,
1877+spooledUpdateHandlerTimeoutMs: 100,
1878+spooledUpdateHandlerAbortGraceMs: 5_000,
1879+},
1880+});
1881+const secondRunPromise = secondSession.runUntilAbort();
1882+
1883+await vi.advanceTimersByTimeAsync(1_000);
1884+await vi.waitFor(async () => expect(await failedUpdateIds(tempDir)).toEqual([42]));
1885+expect(log.mock.calls).toEqual(
1886+expect.arrayContaining([
1887+[expect.stringContaining("timed out spooled update 42 did not stop")],
1888+]),
1889+);
1890+expect(handleUpdate).toHaveBeenCalledTimes(1);
1891+
1892+secondAbort.abort();
1893+await vi.advanceTimersByTimeAsync(20_000);
1894+await secondRunPromise;
1895+} finally {
1896+releaseTurn?.();
1897+firstAbort.abort();
1898+secondAbort.abort();
1899+vi.useRealTimers();
1900+await fs.rm(tempDir, { recursive: true, force: true });
1901+}
1902+});
1903+
18111904it("lets isolated ingress drain interleave different Telegram topic lanes", async () => {
18121905const abort = new AbortController();
18131906const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-"));
@@ -3224,8 +3317,13 @@ describe("TelegramPollingSession", () => {
32243317String(patch.lastError).includes("isolated polling spool handler timed out"),
32253318),
32263319).toBe(true);
3227-await vi.waitFor(async () => expect(await failedUpdateIds(tempDir)).toEqual([42]));
3228-expect(createWorker).toHaveBeenCalledTimes(2);
3320+// 42 (the backlog handler) recovers first; after the restart 43 becomes a
3321+// lone active handler on the same lane, hangs the same way, and is now also
3322+// recovered on timeout rather than stranded with no backlog behind it (#84158).
3323+// Each recovery restarts ingress, so the worker is created once more per
3324+// recovered handler (initial + two restarts).
3325+await vi.waitFor(async () => expect(await failedUpdateIds(tempDir)).toEqual([42, 43]));
3326+expect(createWorker).toHaveBeenCalledTimes(3);
32293327
32303328abort.abort();
32313329await vi.advanceTimersByTimeAsync(20_000);
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。