





















@@ -4,10 +4,16 @@ import {
44onInternalDiagnosticEvent,
55onTrustedInternalDiagnosticEvent,
66resetDiagnosticEventsForTest,
7+setDiagnosticsEnabledForProcess,
78type DiagnosticEventPrivateData,
89type DiagnosticEventPayload,
10+waitForDiagnosticEventsDrained,
911} from "../../../infra/diagnostic-events.js";
1012import { createDiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js";
13+import {
14+getDiagnosticSessionActivitySnapshot,
15+resetDiagnosticRunActivityForTest,
16+} from "../../../logging/diagnostic-run-activity.js";
1117import {
1218initializeGlobalHookRunner,
1319resetGlobalHookRunner,
@@ -100,11 +106,16 @@ function requireMockRecordArg(
100106describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
101107beforeEach(() => {
102108resetDiagnosticEventsForTest();
109+resetDiagnosticRunActivityForTest();
103110resetGlobalHookRunner();
104111});
105112106113afterEach(() => {
114+resetDiagnosticEventsForTest();
107115resetGlobalHookRunner();
116+resetDiagnosticRunActivityForTest();
117+vi.restoreAllMocks();
118+vi.useRealTimers();
108119});
109120110121it("emits started and completed events for async streams", async () => {
@@ -182,6 +193,117 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
182193expect(JSON.stringify(events)).not.toContain("sk-test-secret-value");
183194});
184195196+it("updates diagnostic run activity from throttled stream chunks", async () => {
197+let now = 1_000_000;
198+vi.spyOn(Date, "now").mockImplementation(() => now);
199+async function* stream() {
200+yield { type: "text_delta", delta: "first" };
201+yield { type: "text_delta", delta: "second" };
202+yield { type: "text_delta", delta: "third" };
203+}
204+const runProgressEvents: DiagnosticEventPayload[] = [];
205+const stop = onInternalDiagnosticEvent((event) => {
206+if (event.type === "run.progress") {
207+runProgressEvents.push(event);
208+}
209+});
210+const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
211+(() => stream()) as unknown as StreamFn,
212+{
213+runId: "run-1",
214+sessionKey: "session-key",
215+sessionId: "session-id",
216+provider: "vllm",
217+model: "qwen/qwen3.5-9b",
218+trace: createDiagnosticTraceContext(),
219+nextCallId: () => "call-stream",
220+},
221+);
222+223+const returned = wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>;
224+const iterator = returned[Symbol.asyncIterator]();
225+226+try {
227+await iterator.next();
228+await waitForDiagnosticEventsDrained();
229+let snapshot = getDiagnosticSessionActivitySnapshot({
230+sessionKey: "session-key",
231+sessionId: "session-id",
232+});
233+expect(snapshot.activeWorkKind).toBe("model_call");
234+expect(snapshot.lastProgressReason).toBe("model_call:stream_progress");
235+expect(snapshot.lastProgressAgeMs).toBe(0);
236+expect(runProgressEvents).toHaveLength(1);
237+238+now += 10_000;
239+await iterator.next();
240+await waitForDiagnosticEventsDrained();
241+snapshot = getDiagnosticSessionActivitySnapshot({
242+sessionKey: "session-key",
243+sessionId: "session-id",
244+});
245+expect(snapshot.lastProgressReason).toBe("model_call:stream_progress");
246+expect(snapshot.lastProgressAgeMs).toBe(0);
247+expect(runProgressEvents).toHaveLength(1);
248+249+now += 30_000;
250+await iterator.next();
251+await waitForDiagnosticEventsDrained();
252+snapshot = getDiagnosticSessionActivitySnapshot({
253+sessionKey: "session-key",
254+sessionId: "session-id",
255+});
256+expect(snapshot.lastProgressReason).toBe("model_call:stream_progress");
257+expect(snapshot.lastProgressAgeMs).toBe(0);
258+expect(runProgressEvents).toHaveLength(2);
259+} finally {
260+await iterator.return?.();
261+await waitForDiagnosticEventsDrained();
262+stop();
263+}
264+});
265+266+it("does not retain stream progress activity when diagnostics are disabled", async () => {
267+setDiagnosticsEnabledForProcess(false);
268+const runProgressEvents: DiagnosticEventPayload[] = [];
269+const stop = onInternalDiagnosticEvent((event) => {
270+if (event.type === "run.progress") {
271+runProgressEvents.push(event);
272+}
273+});
274+async function* stream() {
275+yield { type: "text_delta", delta: "first" };
276+yield { type: "text_delta", delta: "second" };
277+}
278+const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
279+(() => stream()) as unknown as StreamFn,
280+{
281+runId: "run-1",
282+sessionKey: "session-key",
283+sessionId: "session-id",
284+provider: "vllm",
285+model: "qwen/qwen3.5-9b",
286+trace: createDiagnosticTraceContext(),
287+nextCallId: () => "call-disabled-diagnostics",
288+},
289+);
290+291+try {
292+await drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>);
293+await waitForDiagnosticEventsDrained();
294+} finally {
295+stop();
296+}
297+298+expect(
299+getDiagnosticSessionActivitySnapshot({
300+sessionKey: "session-key",
301+sessionId: "session-id",
302+}),
303+).toEqual({});
304+expect(runProgressEvents).toEqual([]);
305+});
306+185307it("counts async onPayload replacements instead of raw payload content", async () => {
186308async function* stream() {
187309yield { type: "text_delta", delta: "safe" };
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。