























@@ -5,6 +5,7 @@ import {
55type AcpRuntime,
66type AcpRuntimeCapabilities,
77type AcpRuntimeDoctorReport,
8+type AcpRuntimeEvent,
89type AcpRuntimeStatus,
910} from "openclaw/plugin-sdk/acp-runtime-backend";
1011import type { OpenClawPluginService, OpenClawPluginServiceContext } from "openclaw/plugin-sdk/core";
@@ -23,6 +24,9 @@ type AcpxRuntimeLike = AcpRuntime & {
2324doctor?(): Promise<AcpRuntimeDoctorReport>;
2425isHealthy(): boolean;
2526};
/** First argument accepted by `AcpRuntime.runTurn`; this file passes the same value to `startTurn`. */
type AcpRuntimeTurnInput = Parameters<AcpRuntime["runTurn"]>[0];
/** The `startTurn` member of `AcpRuntime` with `null`/`undefined` stripped. */
type AcpRuntimeStartTurn = NonNullable<AcpRuntime["startTurn"]>;
/** Value returned by `AcpRuntime.startTurn`. */
type AcpRuntimeTurn = ReturnType<AcpRuntimeStartTurn>;
/** Value that a turn's `result` promise resolves to. */
type AcpRuntimeTurnResult = Awaited<AcpRuntimeTurn["result"]>;
26302731type DeferredServiceState = {
2832ctx: OpenClawPluginServiceContext | null;
@@ -43,6 +47,157 @@ function shouldRunStartupProbe(env: NodeJS.ProcessEnv = process.env): boolean {
4347return env[ENABLE_STARTUP_PROBE_ENV] !== "0" && env[SKIP_RUNTIME_PROBE_ENV] !== "1";
4448}
/**
 * Builds a promise together with the functions that settle it from outside.
 *
 * @returns An object holding the pending `promise` plus the `resolve` and
 *   `reject` functions that settle it.
 */
function createDeferredResult<T>() {
  // The Promise executor runs synchronously, so both slots are filled in
  // before the values are read for the return object below.
  const settlers = {} as {
    resolve: (value: T) => void;
    reject: (error: unknown) => void;
  };
  const promise = new Promise<T>((fulfill, fail) => {
    settlers.resolve = fulfill;
    settlers.reject = fail;
  });
  return { promise, resolve: settlers.resolve, reject: settlers.reject };
}
/**
 * In-memory async queue of runtime events.
 *
 * Producers call `push`, then finish with either `close` (clean end) or
 * `fail` (error end). Consumers read through `iterate()`. Events buffered
 * before the queue ended are still delivered before the end is reported.
 */
class LegacyRunTurnEventQueue {
  /** Events pushed while no reader was waiting. */
  private readonly items: AcpRuntimeEvent[] = [];
  /** Readers currently blocked waiting for the next event. */
  private readonly waits: Array<{
    resolve: (value: AcpRuntimeEvent | null) => void;
    reject: (error: unknown) => void;
  }> = [];
  private closed = false;
  /**
   * Set by `fail`. Tracked separately from `error` so that a falsy failure
   * reason (for example `undefined`) is still reported as a failure.
   */
  private failed = false;
  private error: unknown;

  /** Delivers `item` to a waiting reader, or buffers it. Ignored once the queue has ended. */
  push(item: AcpRuntimeEvent): void {
    if (this.closed) {
      return;
    }
    const waiter = this.waits.shift();
    if (waiter) {
      waiter.resolve(item);
      return;
    }
    this.items.push(item);
  }

  /** Drops every buffered event that has not been read yet. */
  clear(): void {
    this.items.length = 0;
  }

  /** Ends the queue cleanly; blocked readers are released with `null`. */
  close(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    for (const waiter of this.waits.splice(0)) {
      waiter.resolve(null);
    }
  }

  /** Ends the queue with `error`; blocked readers are rejected with it. No-op once ended. */
  fail(error: unknown): void {
    if (this.closed) {
      return;
    }
    this.failed = true;
    this.error = error;
    this.closed = true;
    for (const waiter of this.waits.splice(0)) {
      waiter.reject(error);
    }
  }

  /**
   * Resolves with the next event, or `null` once the queue is closed and
   * drained.
   *
   * @throws The value passed to `fail`, after buffered events are drained.
   */
  private async next(): Promise<AcpRuntimeEvent | null> {
    const item = this.items.shift();
    if (item !== undefined) {
      return item;
    }
    if (this.failed) {
      throw this.error;
    }
    if (this.closed) {
      return null;
    }
    return await new Promise<AcpRuntimeEvent | null>((resolve, reject) => {
      this.waits.push({ resolve, reject });
    });
  }

  /** Yields events until the queue is closed; throws if the queue was failed. */
  async *iterate(): AsyncIterable<AcpRuntimeEvent> {
    for (;;) {
      const item = await this.next();
      if (item === null) {
        return;
      }
      yield item;
    }
  }
}
/**
 * Adapts the streaming `runTurn` API of `runtime` to the object shape returned
 * by `startTurn`.
 *
 * The `runTurn` stream is consumed in the background. `done` and `error`
 * events are not forwarded; the first one seen settles `result`. Every other
 * event is forwarded through `events`.
 *
 * @param runtime Runtime whose `runTurn` and `cancel` are used.
 * @param input Turn input; its `requestId` and `handle` are read here.
 * @returns A turn object with `requestId`, `events`, `result`, `cancel`, and
 *   `closeStream`.
 */
function legacyRunTurnAsStartTurn(runtime: AcpRuntime, input: AcpRuntimeTurnInput): AcpRuntimeTurn {
  const outcome = createDeferredResult<AcpRuntimeTurnResult>();
  // Keep a rejected `result` from being reported as unhandled when the caller
  // never attaches a handler to it.
  outcome.promise.catch(() => {});
  const events = new LegacyRunTurnEventQueue();

  let settled = false;
  // Only the first terminal outcome wins; later ones are ignored.
  const settleOnce = (value: AcpRuntimeTurnResult): void => {
    if (!settled) {
      settled = true;
      outcome.resolve(value);
    }
  };

  const pump = async (): Promise<void> => {
    try {
      for await (const event of runtime.runTurn(input)) {
        switch (event.type) {
          case "done":
            settleOnce({
              status: "completed",
              ...(event.stopReason ? { stopReason: event.stopReason } : {}),
            });
            break;
          case "error":
            settleOnce({
              status: "failed",
              error: {
                message: event.message,
                ...(event.code ? { code: event.code } : {}),
                ...(event.detailCode ? { detailCode: event.detailCode } : {}),
                ...(event.retryable === undefined ? {} : { retryable: event.retryable }),
              },
            });
            break;
          default:
            events.push(event);
        }
      }
      // Stream ended; this is a no-op if a terminal event already settled the result.
      settleOnce({
        status: "failed",
        error: {
          code: "ACP_TURN_FAILED",
          message: "ACP turn ended without a terminal done event.",
        },
      });
    } catch (error) {
      outcome.reject(error);
      events.fail(error);
      return;
    }
    events.close();
  };
  void pump();

  return {
    requestId: input.requestId,
    events: events.iterate(),
    result: outcome.promise,
    async cancel(inputArgs) {
      await runtime.cancel({ handle: input.handle, reason: inputArgs?.reason });
    },
    async closeStream() {
      // Drop unread events and end the event stream; the background pump keeps
      // draining `runTurn`, so `result` can still settle.
      events.clear();
      events.close();
    },
  };
}
/**
 * Starts a turn on `runtime`, using its own `startTurn` when that method
 * exists and returns a value, and otherwise falling back to the
 * `runTurn`-based adapter.
 *
 * @param runtime Runtime to start the turn on.
 * @param input Turn input forwarded unchanged.
 * @returns The turn object produced by either path.
 */
function startRuntimeTurn(runtime: AcpRuntime, input: AcpRuntimeTurnInput): AcpRuntimeTurn {
  const nativeTurn = runtime.startTurn?.(input);
  return nativeTurn ?? legacyRunTurnAsStartTurn(runtime, input);
}
200+46201async function startRealService(state: DeferredServiceState): Promise<AcpxRuntimeLike> {
47202if (state.realRuntime) {
48203return state.realRuntime;
@@ -70,6 +225,26 @@ function createDeferredRuntime(state: DeferredServiceState): AcpxRuntimeLike {
/** Waits for the real runtime to be available, then forwards `ensureSession` to it. */
async ensureSession(input) {
  const runtime = await startRealService(state);
  return await runtime.ensureSession(input);
},
/**
 * Starts a turn without waiting for the real runtime to be available.
 *
 * A turn object is returned synchronously. Each of its members waits on the
 * same pending start (`startRealService` followed by `startRuntimeTurn`)
 * before delegating to the real turn.
 */
startTurn(input) {
  const turnPromise = startRealService(state).then((runtime) =>
    startRuntimeTurn(runtime, input),
  );
  const result = turnPromise.then((turn) => turn.result);
  // `result` is a new promise derived from `turnPromise`. A caller that only
  // reads `events` never attaches a handler to it, so a startup or turn failure
  // would be reported as an unhandled rejection. Mark it handled here; callers
  // that do await `result` still observe the rejection.
  result.catch(() => {});
  return {
    requestId: input.requestId,
    events: {
      async *[Symbol.asyncIterator]() {
        yield* (await turnPromise).events;
      },
    },
    result,
    cancel(inputArgs) {
      return turnPromise.then((turn) => turn.cancel(inputArgs));
    },
    closeStream(inputArgs) {
      return turnPromise.then((turn) => turn.closeStream(inputArgs));
    },
  };
},
/** Waits for the real runtime to be available, then re-yields every event from its `runTurn`. */
async *runTurn(input) {
  const runtime = await startRealService(state);
  yield* runtime.runTurn(input);
},
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。