























@@ -1,10 +1,129 @@
11package ai.openclaw.app.gateway
223+import kotlinx.coroutines.CompletableDeferred
4+import kotlinx.coroutines.CoroutineScope
5+import kotlinx.coroutines.Dispatchers
6+import kotlinx.coroutines.Job
7+import kotlinx.coroutines.SupervisorJob
8+import kotlinx.coroutines.cancelAndJoin
9+import kotlinx.coroutines.runBlocking
10+import kotlinx.coroutines.withTimeout
11+import kotlinx.serialization.json.Json
12+import kotlinx.serialization.json.jsonObject
13+import kotlinx.serialization.json.jsonPrimitive
14+import okhttp3.Response
15+import okhttp3.WebSocket
16+import okhttp3.WebSocketListener
17+import okhttp3.mockwebserver.Dispatcher
18+import okhttp3.mockwebserver.MockResponse
19+import okhttp3.mockwebserver.MockWebServer
20+import okhttp3.mockwebserver.RecordedRequest
21+import org.junit.Assert.assertEquals
322import org.junit.Assert.assertFalse
423import org.junit.Assert.assertTrue
524import org.junit.Test
25+import org.junit.runner.RunWith
26+import org.robolectric.RobolectricTestRunner
27+import org.robolectric.RuntimeEnvironment
28+import org.robolectric.annotation.Config
29+import java.util.concurrent.ConcurrentLinkedQueue
63031+private const val LIFECYCLE_TEST_TIMEOUT_MS = 8_000L
32+private const val LIFECYCLE_CONNECT_CHALLENGE_FRAME =
33+"""{"type":"event","event":"connect.challenge","payload":{"nonce":"android-test-nonce"}}"""
34+35+private class ReconnectDeviceAuthStore : DeviceAuthTokenStore {
36+override fun loadEntry(
37+deviceId: String,
38+role: String,
39+ ): DeviceAuthEntry? = null
40+41+override fun saveToken(
42+deviceId: String,
43+role: String,
44+token: String,
45+scopes: List<String>,
46+ ) = Unit
47+48+override fun clearToken(
49+deviceId: String,
50+role: String,
51+ ) = Unit
52+}
53+54+private data class ReconnectHarness(
55+val session: GatewaySession,
56+val sessionJob: Job,
57+)
58+59+private data class ReconnectServer(
60+val server: MockWebServer,
61+val sockets: ConcurrentLinkedQueue<WebSocket>,
62+) {
63+val port: Int
64+ get() = server.port
65+66+val requestCount: Int
67+ get() = server.requestCount
68+69+fun shutdown() {
70+ sockets.forEach { runCatching { it.cancel() } }
71+ runCatching { server.shutdown() }
72+ .onFailure { err ->
73+if (err.message != "Gave up waiting for queue to shut down") throw err
74+ }
75+ }
76+}
77+78+@RunWith(RobolectricTestRunner::class)
79+@Config(sdk = [34])
780class GatewaySessionReconnectTest {
81+ @Test
82+fun connectToNewGatewayClosesActiveConnectionAndStartsReplacement() =
83+ runBlocking {
84+val json = Json { ignoreUnknownKeys = true }
85+val firstConnect = CompletableDeferred<Unit>()
86+val firstClosed = CompletableDeferred<Unit>()
87+val secondConnect = CompletableDeferred<Unit>()
88+val secondClosed = CompletableDeferred<Unit>()
89+val firstServer =
90+ startGatewayServer(
91+ json = json,
92+ onClosed = { firstClosed.complete(Unit) },
93+ ) { webSocket, id, method ->
94+if (method == "connect") {
95+ firstConnect.complete(Unit)
96+ webSocket.send(connectResponseFrame(id))
97+ }
98+ }
99+val secondServer =
100+ startGatewayServer(
101+ json = json,
102+ onClosed = { secondClosed.complete(Unit) },
103+ ) { webSocket, id, method ->
104+if (method == "connect") {
105+ secondConnect.complete(Unit)
106+ webSocket.send(connectResponseFrame(id))
107+ }
108+ }
109+val harness = createReconnectHarness()
110+111+try {
112+ connectNodeSession(harness.session, firstServer.port)
113+ withTimeout(LIFECYCLE_TEST_TIMEOUT_MS) { firstConnect.await() }
114+115+ connectNodeSession(harness.session, secondServer.port)
116+117+ withTimeout(LIFECYCLE_TEST_TIMEOUT_MS) { firstClosed.await() }
118+ withTimeout(LIFECYCLE_TEST_TIMEOUT_MS) { secondConnect.await() }
119+ assertEquals(1, secondServer.requestCount)
120+ harness.session.disconnect()
121+ withTimeout(LIFECYCLE_TEST_TIMEOUT_MS) { secondClosed.await() }
122+ } finally {
123+ shutdownReconnectHarness(harness, firstServer, secondServer)
124+ }
125+ }
126+8127 @Test
9128fun bootstrapNodePairingRequiredKeepsReconnectActive() {
10129val error =
@@ -113,4 +232,125 @@ class GatewaySessionReconnectTest {
113232 ),
114233 )
115234 }
235+236+private fun createReconnectHarness(): ReconnectHarness {
237+val app = RuntimeEnvironment.getApplication()
238+val sessionJob = SupervisorJob()
239+val session =
240+GatewaySession(
241+ scope = CoroutineScope(sessionJob + Dispatchers.Default),
242+ identityStore = DeviceIdentityStore(app),
243+ deviceAuthStore = ReconnectDeviceAuthStore(),
244+ onConnected = { _, _, _ -> },
245+ onDisconnected = { _ -> },
246+ onEvent = { _, _ -> },
247+ onInvoke = { GatewaySession.InvokeResult.ok("""{"handled":true}""") },
248+ )
249+return ReconnectHarness(session = session, sessionJob = sessionJob)
250+ }
251+252+private suspend fun connectNodeSession(
253+session: GatewaySession,
254+port: Int,
255+ ) {
256+ session.connect(
257+ endpoint =
258+GatewayEndpoint(
259+ stableId = "manual|127.0.0.1|$port",
260+ name = "test",
261+ host = "127.0.0.1",
262+ port = port,
263+ tlsEnabled = false,
264+ ),
265+ token = "test-token",
266+ bootstrapToken = null,
267+ password = null,
268+ options =
269+GatewayConnectOptions(
270+ role = "node",
271+ scopes = listOf("node:invoke"),
272+ caps = emptyList(),
273+ commands = emptyList(),
274+ permissions = emptyMap(),
275+ client =
276+GatewayClientInfo(
277+ id = "openclaw-android-test",
278+ displayName = "Android Test",
279+ version = "1.0.0-test",
280+ platform = "android",
281+ mode = "node",
282+ instanceId = "android-test-instance",
283+ deviceFamily = "android",
284+ modelIdentifier = "test",
285+ ),
286+ ),
287+ tls = null,
288+ )
289+ }
290+291+private suspend fun shutdownReconnectHarness(
292+harness: ReconnectHarness,
293+vararg servers: ReconnectServer,
294+ ) {
295+ harness.session.disconnect()
296+ harness.sessionJob.cancelAndJoin()
297+ servers.forEach { it.shutdown() }
298+ }
299+300+private fun connectResponseFrame(id: String): String = """{"type":"res","id":"$id","ok":true,"payload":{"snapshot":{"sessionDefaults":{"mainSessionKey":"main"}}}}"""
301+302+private fun startGatewayServer(
303+json: Json,
304+onClosed: () -> Unit = {},
305+onRequestFrame: (webSocket: WebSocket, id: String, method: String) -> Unit,
306+ ): ReconnectServer {
307+val sockets = ConcurrentLinkedQueue<WebSocket>()
308+val server =
309+MockWebServer().apply {
310+ dispatcher =
311+object : Dispatcher() {
312+override fun dispatch(request: RecordedRequest): MockResponse =
313+MockResponse().withWebSocketUpgrade(
314+object : WebSocketListener() {
315+override fun onOpen(
316+webSocket: WebSocket,
317+response: Response,
318+ ) {
319+ sockets += webSocket
320+ webSocket.send(LIFECYCLE_CONNECT_CHALLENGE_FRAME)
321+ }
322+323+override fun onMessage(
324+webSocket: WebSocket,
325+text: String,
326+ ) {
327+val frame = json.parseToJsonElement(text).jsonObject
328+if (frame["type"]?.jsonPrimitive?.content != "req") return
329+val id = frame["id"]?.jsonPrimitive?.content ?: return
330+val method = frame["method"]?.jsonPrimitive?.content ?: return
331+ onRequestFrame(webSocket, id, method)
332+ }
333+334+override fun onClosing(
335+webSocket: WebSocket,
336+code: Int,
337+reason: String,
338+ ) {
339+ onClosed()
340+ }
341+342+override fun onClosed(
343+webSocket: WebSocket,
344+code: Int,
345+reason: String,
346+ ) {
347+ onClosed()
348+ }
349+ },
350+ )
351+ }
352+ start()
353+ }
354+return ReconnectServer(server = server, sockets = sockets)
355+ }
116356}
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。