




















@@ -85,6 +85,7 @@ type ModelCallObservationState = {
8585outputMessages?: unknown[];
8686contentCapture?: DiagnosticModelContentCapturePolicy;
8787lastStreamProgressAt?: number;
88+terminalEventEmitted?: boolean;
8889};
89909091const MODEL_CALL_STREAM_PROGRESS_INTERVAL_MS = 30_000;
@@ -184,6 +185,23 @@ function observeOutputMessageContent(state: ModelCallObservationState, chunk: un
184185}
185186}
/**
 * Folds a final result value into the observation state of a model call.
 *
 * Only fields that are still unset are filled in:
 * - `timeToFirstByteMs` is set when it is currently null/undefined, using the
 *   elapsed time since `startedAt` (clamped to be non-negative).
 * - `outputMessages` is set to a one-element array holding a diagnostic clone
 *   of `result` when output capture is enabled and nothing was captured yet.
 * - `responseStreamBytes` is set to the UTF-8 JSON byte length of `result`
 *   when it is still exactly 0 and that length could be computed.
 *
 * @param state - Mutable observation state for the call; updated in place.
 * @param startedAt - Start timestamp, on the same clock as `Date.now()`.
 * @param result - The final value to record.
 */
function observeResultMessageContent(
  state: ModelCallObservationState,
  startedAt: number,
  result: unknown,
): void {
  if (state.timeToFirstByteMs == null) {
    state.timeToFirstByteMs = Math.max(0, Date.now() - startedAt);
  }

  const captureOutput = Boolean(state.contentCapture?.outputMessages);
  if (captureOutput && state.outputMessages === undefined) {
    state.outputMessages = [cloneDiagnosticContentValue(result)];
  }

  // A non-zero count is kept as-is; only a still-zero counter is backfilled.
  if (state.responseStreamBytes !== 0) {
    return;
  }
  const byteLength = utf8JsonByteLength(result);
  if (byteLength !== undefined) {
    state.responseStreamBytes = byteLength;
  }
}
204+187205function observeResponseChunk(
188206state: ModelCallObservationState,
189207startedAt: number,
@@ -419,6 +437,10 @@ function emitModelCallCompleted(
419437startedAt: number,
420438state: ModelCallObservationState,
421439): void {
440+if (state.terminalEventEmitted) {
441+return;
442+}
443+state.terminalEventEmitted = true;
422444const durationMs = Date.now() - startedAt;
423445const sizeTimingFields = modelCallSizeTimingFields(state);
424446emitTrustedDiagnosticEventWithPrivateData(
@@ -443,6 +465,10 @@ function emitModelCallError(
443465state: ModelCallObservationState,
444466fields: ModelCallErrorFields,
445467): void {
468+if (state.terminalEventEmitted) {
469+return;
470+}
471+state.terminalEventEmitted = true;
446472const durationMs = Date.now() - startedAt;
447473const sizeTimingFields = modelCallSizeTimingFields(state);
448474emitTrustedDiagnosticEventWithPrivateData(
@@ -548,33 +574,80 @@ async function* observeModelCallIterator<T>(
548574startedAt: number,
549575state: ModelCallObservationState,
550576): AsyncIterable<T> {
551-let terminalEmitted = false;
577+// Tracks whether the underlying iterator terminated on its own (done or threw).
578+// This is independent of state.terminalEventEmitted: result() can emit the
579+// terminal event first, but the abandoned iterator still needs return() cleanup.
580+let iteratorSettled = false;
552581try {
553582for (;;) {
554583const next = await iterator.next();
555584if (next.done) {
585+iteratorSettled = true;
556586break;
557587}
558588observeResponseChunk(state, startedAt, next.value);
559589maybeEmitModelCallStreamProgress(eventBase, state);
560590yield next.value;
561591}
562-terminalEmitted = true;
563592emitModelCallCompleted(eventBase, startedAt, state);
564593} catch (err) {
565-terminalEmitted = true;
594+iteratorSettled = true;
566595emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err));
567596throw err;
568597} finally {
569-if (!terminalEmitted) {
570-// A consumer can stop reading before the provider emits done/error. Close
571-// the iterator best-effort and record the call as completed with observed bytes.
598+if (!iteratorSettled) {
599+// A consumer can stop reading before the provider emits done/error — e.g.
600+// the agent loop returns on the terminal event after awaiting result().
601+// Close the underlying iterator for provider cleanup (idle-timeout abort
602+// listeners, SSE readers) even when result() already emitted the terminal
603+// event; emitModelCallCompleted self-dedupes via state.terminalEventEmitted.
572604await safeReturnIterator(iterator);
573605emitModelCallCompleted(eventBase, startedAt, state);
574606}
575607}
576608}
/**
 * Records a final result on the observation state, emits the "completed"
 * event for the call, and returns the result unchanged so the helper can be
 * dropped into a return path or a promise chain.
 *
 * @param result - The final value produced by the model call.
 * @param eventBase - Fields shared by every diagnostic event of this call.
 * @param startedAt - Start timestamp of the call.
 * @param state - Mutable observation state for the call.
 * @returns The same `result` value that was passed in.
 */
function observeModelCallFinalResult<T>(
  result: T,
  eventBase: ModelCallEventBase,
  startedAt: number,
  state: ModelCallObservationState,
): T {
  // Update the state first so the completed event sees the result's contribution.
  observeResultMessageContent(state, startedAt, result);
  emitModelCallCompleted(eventBase, startedAt, state);
  return result;
}
/**
 * Builds an instrumented replacement for a stream's `result()` method.
 *
 * The wrapper calls the original `result` with `stream` as `this`, then:
 * - for a promise-like return value, chains handlers that record the resolved
 *   value (completed event) or report the rejection (error event, rethrown);
 * - for any other return value, records it immediately and returns it;
 * - for a synchronous throw inside the wrapper's try block, reports the error
 *   and rethrows it.
 *
 * @param stream - Candidate stream object; may be any value.
 * @param eventBase - Fields shared by every diagnostic event of this call.
 * @param startedAt - Start timestamp of the call.
 * @param state - Mutable observation state for the call.
 * @returns The wrapper, or `undefined` when `stream` is not a record or has no
 *   callable `result` property.
 */
function createObservedResultFunction(
  stream: unknown,
  eventBase: ModelCallEventBase,
  startedAt: number,
  state: ModelCallObservationState,
): ((...args: unknown[]) => unknown) | undefined {
  if (!isRecord(stream) || typeof stream.result !== "function") {
    return undefined;
  }
  const resultFn = stream.result;

  // Success path: fold the value into state, emit completion, pass it through.
  const reportSuccess = <R>(value: R): R =>
    observeModelCallFinalResult(value, eventBase, startedAt, state);

  // Failure path: emit the error event, then propagate the original error.
  const reportFailure = (err: unknown): never => {
    emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err));
    throw err;
  };

  return (...args: unknown[]) => {
    try {
      const outcome = resultFn.apply(stream, args);
      // `.then` stays inside the try so a synchronous throw from a thenable is
      // reported the same way as a throw from `result()` itself.
      return isPromiseLike(outcome)
        ? outcome.then(reportSuccess, reportFailure)
        : reportSuccess(outcome);
    } catch (err) {
      return reportFailure(err);
    }
  };
}
650+578651function observeModelCallStream<T extends AsyncIterable<unknown>>(
579652stream: T,
580653createIterator: () => AsyncIterator<unknown>,
@@ -584,6 +657,7 @@ function observeModelCallStream<T extends AsyncIterable<unknown>>(
584657): T {
585658const observedIterator = () =>
586659observeModelCallIterator(createIterator(), eventBase, startedAt, state)[Symbol.asyncIterator]();
660+const observedResult = createObservedResultFunction(stream, eventBase, startedAt, state);
587661let hasNonConfigurableIterator;
588662try {
589663hasNonConfigurableIterator =
@@ -594,13 +668,17 @@ function observeModelCallStream<T extends AsyncIterable<unknown>>(
594668if (hasNonConfigurableIterator) {
595669return {
596670[Symbol.asyncIterator]: observedIterator,
671+ ...(observedResult ? { result: observedResult } : {}),
597672} as T;
598673}
599674return new Proxy(stream, {
600675get(target, property, receiver) {
601676if (property === Symbol.asyncIterator) {
602677return observedIterator;
603678}
679+if (property === "result" && observedResult) {
680+return observedResult;
681+}
604682const value = Reflect.get(target, property, receiver);
605683return typeof value === "function" ? value.bind(target) : value;
606684},
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。