



























@@ -1,7 +1,12 @@
11import {
22getAcpRuntimeBackend,
3+registerAcpRuntimeBackend,
34unregisterAcpRuntimeBackend,
45type AcpRuntime,
6+type AcpRuntimeEvent,
7+type AcpRuntimeTurn,
8+type AcpRuntimeTurnInput,
9+type AcpRuntimeTurnResult,
510} from "openclaw/plugin-sdk/acp-runtime-backend";
611import type { OpenClawPluginService, OpenClawPluginServiceContext } from "openclaw/plugin-sdk/core";
712@@ -22,6 +27,89 @@ type DeferredServiceState = {
22272328let serviceModulePromise: Promise<RealAcpxServiceModule> | null = null;
242930+function createDeferredResult<T>() {
31+let resolve!: (value: T) => void;
32+let reject!: (error: unknown) => void;
33+const promise = new Promise<T>((resolvePromise, rejectPromise) => {
34+resolve = resolvePromise;
35+reject = rejectPromise;
36+});
37+return { promise, resolve, reject };
38+}
39+40+class LegacyRunTurnEventQueue {
41+private readonly items: AcpRuntimeEvent[] = [];
42+private readonly waits: Array<{
43+resolve: (value: AcpRuntimeEvent | null) => void;
44+reject: (error: unknown) => void;
45+}> = [];
46+private closed = false;
47+private error: unknown;
48+49+push(item: AcpRuntimeEvent): void {
50+if (this.closed) {
51+return;
52+}
53+const waiter = this.waits.shift();
54+if (waiter) {
55+waiter.resolve(item);
56+return;
57+}
58+this.items.push(item);
59+}
60+61+clear(): void {
62+this.items.length = 0;
63+}
64+65+close(): void {
66+if (this.closed) {
67+return;
68+}
69+this.closed = true;
70+for (const waiter of this.waits.splice(0)) {
71+waiter.resolve(null);
72+}
73+}
74+75+fail(error: unknown): void {
76+if (this.closed) {
77+return;
78+}
79+this.error = error;
80+this.closed = true;
81+for (const waiter of this.waits.splice(0)) {
82+waiter.reject(error);
83+}
84+}
85+86+private async next(): Promise<AcpRuntimeEvent | null> {
87+const item = this.items.shift();
88+if (item) {
89+return item;
90+}
91+if (this.error) {
92+throw this.error;
93+}
94+if (this.closed) {
95+return null;
96+}
97+return await new Promise<AcpRuntimeEvent | null>((resolve, reject) => {
98+this.waits.push({ resolve, reject });
99+});
100+}
101+102+async *iterate(): AsyncIterable<AcpRuntimeEvent> {
103+for (;;) {
104+const item = await this.next();
105+if (!item) {
106+return;
107+}
108+yield item;
109+}
110+}
111+}
112+25113function loadServiceModule(): Promise<RealAcpxServiceModule> {
26114serviceModulePromise ??= import("./src/service.js");
27115return serviceModulePromise;
@@ -46,7 +134,143 @@ async function startRealService(state: DeferredServiceState): Promise<AcpRuntime
46134state.realRuntime = backend.runtime;
47135return state.realRuntime;
48136})();
49-return await state.startPromise;
137+try {
138+return await state.startPromise;
139+} catch (error) {
140+state.startPromise = null;
141+state.realService = null;
142+throw error;
143+}
144+}
145+146+function lazyStartTurn(
147+resolveRuntime: () => Promise<AcpRuntime>,
148+input: AcpRuntimeTurnInput,
149+): AcpRuntimeTurn {
150+const turnPromise: Promise<AcpRuntimeTurn> = resolveRuntime().then((runtime) => {
151+if (runtime.startTurn) {
152+return runtime.startTurn(input);
153+}
154+return legacyRunTurnAsStartTurn(runtime, input);
155+});
156+return {
157+requestId: input.requestId,
158+events: {
159+async *[Symbol.asyncIterator]() {
160+yield* (await turnPromise).events;
161+},
162+},
163+result: turnPromise.then((turn) => turn.result),
164+cancel(inputArgs) {
165+return turnPromise.then((turn) => turn.cancel(inputArgs));
166+},
167+closeStream(inputArgs) {
168+return turnPromise.then((turn) => turn.closeStream(inputArgs));
169+},
170+};
171+}
172+173+function legacyRunTurnAsStartTurn(runtime: AcpRuntime, input: AcpRuntimeTurnInput): AcpRuntimeTurn {
174+const result = createDeferredResult<AcpRuntimeTurnResult>();
175+result.promise.catch(() => {});
176+const queue = new LegacyRunTurnEventQueue();
177+let resultSettled = false;
178+const settleResult = (next: AcpRuntimeTurnResult) => {
179+if (resultSettled) {
180+return;
181+}
182+resultSettled = true;
183+result.resolve(next);
184+};
185+void (async () => {
186+try {
187+for await (const event of runtime.runTurn(input)) {
188+if (event.type === "done") {
189+settleResult({
190+status: "completed",
191+ ...(event.stopReason ? { stopReason: event.stopReason } : {}),
192+});
193+continue;
194+}
195+if (event.type === "error") {
196+settleResult({
197+status: "failed",
198+error: {
199+message: event.message,
200+ ...(event.code ? { code: event.code } : {}),
201+ ...(event.detailCode ? { detailCode: event.detailCode } : {}),
202+ ...(event.retryable === undefined ? {} : { retryable: event.retryable }),
203+},
204+});
205+continue;
206+}
207+queue.push(event);
208+}
209+settleResult({
210+status: "failed",
211+error: {
212+code: "ACP_TURN_FAILED",
213+message: "ACP turn ended without a terminal done event.",
214+},
215+});
216+} catch (error) {
217+result.reject(error);
218+queue.fail(error);
219+return;
220+}
221+queue.close();
222+})();
223+return {
224+requestId: input.requestId,
225+events: queue.iterate(),
226+result: result.promise,
227+async cancel(inputArgs) {
228+await runtime.cancel({ handle: input.handle, reason: inputArgs?.reason });
229+},
230+async closeStream() {
231+queue.clear();
232+queue.close();
233+},
234+};
235+}
236+237+function createDeferredRuntime(state: DeferredServiceState): AcpRuntime {
238+const resolveRuntime = () => startRealService(state);
239+return {
240+async ensureSession(input) {
241+return await (await resolveRuntime()).ensureSession(input);
242+},
243+startTurn(input) {
244+return lazyStartTurn(resolveRuntime, input);
245+},
246+async *runTurn(input) {
247+yield* (await resolveRuntime()).runTurn(input);
248+},
249+async getCapabilities(input) {
250+return (await (await resolveRuntime()).getCapabilities?.(input)) ?? { controls: [] };
251+},
252+async getStatus(input) {
253+return (await (await resolveRuntime()).getStatus?.(input)) ?? {};
254+},
255+async setMode(input) {
256+await (await resolveRuntime()).setMode?.(input);
257+},
258+async setConfigOption(input) {
259+await (await resolveRuntime()).setConfigOption?.(input);
260+},
261+async doctor() {
262+return (await (await resolveRuntime()).doctor?.()) ?? { ok: true, message: "ok" };
263+},
264+async prepareFreshSession(input) {
265+await (await resolveRuntime()).prepareFreshSession?.(input);
266+},
267+async cancel(input) {
268+await (await resolveRuntime()).cancel(input);
269+},
270+async close(input) {
271+await (await resolveRuntime()).close(input);
272+},
273+};
50274}
5127552276export function createAcpxRuntimeService(
@@ -69,7 +293,11 @@ export function createAcpxRuntimeService(
69293}
7029471295state.ctx = ctx;
72-await startRealService(state);
296+registerAcpRuntimeBackend({
297+id: ACPX_BACKEND_ID,
298+runtime: createDeferredRuntime(state),
299+});
300+ctx.logger.info("embedded acpx runtime backend registered lazily");
73301},
74302async stop(ctx) {
75303if (state.realService) {
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。