

















@@ -8,6 +8,7 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:ht
88import { Socket } from "node:net";
99import { tmpdir } from "node:os";
1010import path from "node:path";
11+import { pathToFileURL } from "node:url";
1112import { gunzipSync } from "node:zlib";
12131314type CollectorMode = "local" | "docker";
@@ -92,9 +93,7 @@ const REQUIRED_SPAN_NAMES = [
9293"openclaw.context.assembled",
9394"openclaw.message.delivery",
9495] as const;
95-const REQUIRED_METRIC_NAMES = [
96-"openclaw.harness.duration_ms",
97-] as const;
96+const REQUIRED_METRIC_NAMES = ["openclaw.harness.duration_ms"] as const;
9897const DISALLOWED_ATTRIBUTE_KEYS = new Set([
9998"openclaw.runId",
10099"openclaw.chatId",
@@ -111,11 +110,37 @@ const DISALLOWED_ATTRIBUTE_KEYS = new Set([
111110"openclaw.call_id",
112111"openclaw.tool_call_id",
113112]);
114-const DISALLOWED_BODY_NEEDLES = [
115-"OTEL-QA-SECRET",
116-"OTEL-QA-OK",
117-];
113+const DISALLOWED_BODY_NEEDLES = ["OTEL-QA-SECRET", "OTEL-QA-OK"];
118114const COLLECTOR_OUTPUT_TAIL_BYTES = 16_000;
115+const MAX_OTLP_COMPRESSED_BODY_BYTES = readPositiveIntegerEnv(
116+"OPENCLAW_QA_OTEL_MAX_COMPRESSED_BODY_BYTES",
117+2 * 1024 * 1024,
118+);
119+const MAX_OTLP_DECODED_BODY_BYTES = readPositiveIntegerEnv(
120+"OPENCLAW_QA_OTEL_MAX_DECODED_BODY_BYTES",
121+8 * 1024 * 1024,
122+);
123+const MAX_CAPTURED_BODY_TEXT_BYTES = readPositiveIntegerEnv(
124+"OPENCLAW_QA_OTEL_MAX_CAPTURED_BODY_TEXT_BYTES",
125+512 * 1024,
126+);
127+128+function readPositiveIntegerEnv(name: string, fallback: number): number {
129+const parsed = Number.parseInt(process.env[name] ?? "", 10);
130+return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
131+}
132+133+function oversizedBodyError(
134+label: string,
135+actualBytes: number,
136+maxBytes: number,
137+): Error & {
138+ statusCode: number;
139+} {
140+return Object.assign(new Error(`${label} exceeded ${maxBytes} bytes: ${actualBytes} bytes`), {
141+statusCode: 413,
142+});
143+}
119144120145function usage(): string {
121146return `Usage: pnpm qa:otel:smoke [--collector local|docker] [--output-dir <path>] [--provider-mode <mode>] [--scenario <id>] [--model <ref>] [--alt-model <ref>]
@@ -184,10 +209,20 @@ function disallowedBodyNeedles(options: CliOptions): string[] {
184209return [...needles];
185210}
186211187-async function readRequestBody(req: IncomingMessage): Promise<Buffer> {
212+async function readRequestBody(
213+req: IncomingMessage,
214+maxBytes = MAX_OTLP_COMPRESSED_BODY_BYTES,
215+): Promise<Buffer> {
188216const chunks: Buffer[] = [];
217+let totalBytes = 0;
189218for await (const chunk of req) {
190-chunks.push(Buffer.from(chunk));
219+const buffer = Buffer.from(chunk);
220+totalBytes += buffer.length;
221+if (totalBytes > maxBytes) {
222+req.destroy();
223+throw oversizedBodyError("compressed OTLP request body", totalBytes, maxBytes);
224+}
225+chunks.push(buffer);
191226}
192227return Buffer.concat(chunks);
193228}
@@ -196,17 +231,68 @@ function headerValue(value: string | string[] | undefined): string | undefined {
196231return Array.isArray(value) ? value[0] : value;
197232}
198233199-function decodeRequestBody(body: Buffer, contentEncoding: string | undefined): Buffer {
234+function decodeRequestBody(
235+body: Buffer,
236+contentEncoding: string | undefined,
237+maxBytes = MAX_OTLP_DECODED_BODY_BYTES,
238+): Buffer {
200239const normalizedEncoding = contentEncoding?.trim().toLowerCase();
240+if (body.length > maxBytes && (!normalizedEncoding || normalizedEncoding === "identity")) {
241+throw oversizedBodyError("OTLP request body", body.length, maxBytes);
242+}
201243if (!normalizedEncoding || normalizedEncoding === "identity") {
202244return body;
203245}
204246if (normalizedEncoding === "gzip") {
205-return gunzipSync(body);
247+ let decoded: Buffer;
248+try {
249+decoded = gunzipSync(body, { maxOutputLength: maxBytes });
250+} catch (error) {
251+const code = (error as { code?: unknown }).code;
252+const message = error instanceof Error ? error.message : String(error);
253+if (code === "ERR_BUFFER_TOO_LARGE" || /maxOutputLength|larger than/u.test(message)) {
254+throw oversizedBodyError("decoded OTLP request body", maxBytes + 1, maxBytes);
255+}
256+throw error;
257+}
258+if (decoded.length > maxBytes) {
259+throw oversizedBodyError("decoded OTLP request body", decoded.length, maxBytes);
260+}
261+return decoded;
206262}
207263throw new Error(`unsupported OTLP content-encoding ${contentEncoding}`);
208264}
209265266+function appendCapturedBodyText(
267+capturedBodyText: Partial<Record<OtlpSignal, string[]>>,
268+signal: OtlpSignal,
269+body: Buffer,
270+maxBytes = MAX_CAPTURED_BODY_TEXT_BYTES,
271+disallowedNeedles: string[] = [],
272+): void {
273+const currentEntries = capturedBodyText[signal] ?? [];
274+const leakEntries = currentEntries.filter((entry) => entry.startsWith("[detected leak needle] "));
275+const currentTail = currentEntries
276+.filter((entry) => !entry.startsWith("[detected leak needle] "))
277+.join("\n");
278+const bodyText = body.toString("utf8");
279+const next = currentTail ? `${currentTail}\n${bodyText}` : bodyText;
280+const buffer = Buffer.from(next);
281+const nextLeakEntries = [
282+ ...leakEntries,
283+ ...disallowedNeedles
284+.filter((needle) => bodyText.includes(needle))
285+.map((needle) => `[detected leak needle] ${needle}`),
286+].slice(-20);
287+const tailEntry =
288+buffer.length > maxBytes
289+ ? `[captured body text truncated to last ${maxBytes} bytes]\n${buffer
290+ .subarray(buffer.length - maxBytes)
291+ .toString("utf8")}`
292+ : next;
293+capturedBodyText[signal] = [...nextLeakEntries, tailEntry];
294+}
295+210296function normalizeOtlpValue(value: OtlpAnyValue | undefined): string | number | boolean | string[] {
211297if (!value) {
212298return "";
@@ -250,9 +336,12 @@ function spanAttributes(span: OtlpSpan): Record<string, string | number | boolea
250336}
251337252338class ProtoReader {
339+private readonly buffer: Uint8Array;
253340private offset = 0;
254341255-constructor(private readonly buffer: Uint8Array) {}
342+constructor(buffer: Uint8Array) {
343+this.buffer = buffer;
344+}
256345257346done(): boolean {
258347return this.offset >= this.buffer.length;
@@ -580,7 +669,7 @@ function decodeLogRequest(body: Buffer): CapturedLogRecord[] {
580669return records;
581670}
582671583-function startLocalOtlpReceiver() {
672+function startLocalOtlpReceiver(disallowedBodyNeedles: string[] = []) {
584673const capturedRequests: CapturedRequest[] = [];
585674const capturedSpans: CapturedSpan[] = [];
586675const capturedMetrics: CapturedMetric[] = [];
@@ -600,9 +689,30 @@ function startLocalOtlpReceiver() {
600689return;
601690}
602691603-const compressedBody = await readRequestBody(req);
604692const contentEncoding = headerValue(req.headers["content-encoding"]);
605-const body = decodeRequestBody(compressedBody, contentEncoding);
693+ let body: Buffer;
694+try {
695+const compressedBody = await readRequestBody(req);
696+body = decodeRequestBody(compressedBody, contentEncoding);
697+} catch (error) {
698+const statusCode =
699+typeof (error as { statusCode?: unknown }).statusCode === "number"
700+ ? (error as { statusCode: number }).statusCode
701+ : 400;
702+capturedRequests.push({
703+path: requestPath,
704+ signal,
705+bytes: 0,
706+ contentEncoding,
707+status: statusCode,
708+spanCount: 0,
709+metricCount: 0,
710+logCount: 0,
711+});
712+res.writeHead(statusCode, { "content-type": "text/plain" });
713+res.end(error instanceof Error ? error.message : String(error));
714+return;
715+}
606716const spans = signal === "traces" ? decodeTraceRequest(body) : [];
607717const metrics = signal === "metrics" ? decodeMetricRequest(body) : [];
608718const logRecords = signal === "logs" ? decodeLogRequest(body) : [];
@@ -615,8 +725,7 @@ function startLocalOtlpReceiver() {
615725if (logRecords.length > 0) {
616726capturedLogRecords.push(...logRecords);
617727}
618-capturedBodyText[signal] ??= [];
619-capturedBodyText[signal]?.push(body.toString("utf8"));
728+appendCapturedBodyText(capturedBodyText, signal, body, undefined, disallowedBodyNeedles);
620729capturedRequests.push({
621730path: requestPath,
622731 signal,
@@ -774,21 +883,13 @@ service:
774883containerName,
775884 ...(useHostNetwork
776885 ? ["--network", "host"]
777- : [
778-"--add-host=host.docker.internal:host-gateway",
779-"-p",
780-`127.0.0.1:${collectorPort}:4318`,
781-]),
886+ : ["--add-host=host.docker.internal:host-gateway", "-p", `127.0.0.1:${collectorPort}:4318`]),
782887"-v",
783888`${configPath}:/etc/otelcol/config.yaml:ro`,
784889DEFAULT_DOCKER_COLLECTOR_IMAGE,
785890"--config=/etc/otelcol/config.yaml",
786891];
787-const child = spawn(
788-"docker",
789-dockerArgs,
790-{ stdio: ["ignore", "pipe", "pipe"] },
791-);
892+const child = spawn("docker", dockerArgs, { stdio: ["ignore", "pipe", "pipe"] });
792893child.stdout?.on("data", (chunk) => stdout.push(String(chunk)));
793894child.stderr?.on("data", (chunk) => stderr.push(String(chunk)));
794895child.on("error", (err) => {
@@ -936,7 +1037,7 @@ function hasRequiredSmokeSignals(receiver: ReturnType<typeof startLocalOtlpRecei
9361037REQUIRED_METRIC_NAMES.every((name) => metricNames.has(name)) &&
9371038receiver.capturedLogRecords.length > 0 &&
9381039["traces", "metrics", "logs"].every((signal) =>
939-receiver.capturedRequests.some((request) => request.signal === signal)
1040+receiver.capturedRequests.some((request) => request.signal === signal),
9401041)
9411042);
9421043}
@@ -1021,9 +1122,7 @@ function assertSmoke(params: {
10211122.map((record) => record.body)
10221123.filter((body) => body !== "log");
10231124if (rawLogBodies.length > 0) {
1024-failures.push(
1025-`OTLP log records exported ${rawLogBodies.length} non-placeholder bodies`,
1026-);
1125+failures.push(`OTLP log records exported ${rawLogBodies.length} non-placeholder bodies`);
10271126}
1028112710291128const attributeKeys = collectAttributeKeys(params.spans);
@@ -1094,7 +1193,7 @@ async function main() {
10941193}
1095119410961195await mkdir(options.outputDir, { recursive: true });
1097-const receiver = startLocalOtlpReceiver();
1196+const receiver = startLocalOtlpReceiver(disallowedBodyNeedles(options));
10981197const port = await receiver.listen();
10991198process.stdout.write(
11001199`qa-otel-smoke: local OTLP receiver listening on http://127.0.0.1:${port}\n`,
@@ -1216,9 +1315,17 @@ async function main() {
12161315);
12171316}
121813171219-main().catch((error) => {
1220-process.stderr.write(
1221-`qa-otel-smoke: ${error instanceof Error ? error.stack || error.message : String(error)}\n`,
1222-);
1223-process.exitCode = 1;
1224-});
1318+export const testing = {
1319+ appendCapturedBodyText,
1320+ decodeRequestBody,
1321+ readRequestBody,
1322+};
1323+1324+if (import.meta.url === pathToFileURL(process.argv[1] ?? "").href) {
1325+main().catch((error) => {
1326+process.stderr.write(
1327+`qa-otel-smoke: ${error instanceof Error ? error.stack || error.message : String(error)}\n`,
1328+);
1329+process.exitCode = 1;
1330+});
1331+}
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。