From 63d0483ab99dd3596f9a39b34e72381e7e82c6cb Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 28 Feb 2023 14:02:15 +0700 Subject: [PATCH] TSK-728: Server reconnect support (#2689) Signed-off-by: Andrey Sobolev --- plugins/client-resources/src/connection.ts | 4 +- server/ws/src/server.ts | 48 ++++++++++++++++++---- server/ws/src/types.ts | 5 +++ 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 422f54ad13..8c76f6c094 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -24,6 +24,7 @@ import core, { Domain, FindOptions, FindResult, + generateId, Ref, Tx, TxHandler, @@ -48,6 +49,7 @@ class Connection implements ClientConnection { private readonly requests = new Map() private lastId = 0 private readonly interval: number + private readonly sessionId = generateId() as string constructor ( private readonly url: string, @@ -106,7 +108,7 @@ class Connection implements ClientConnection { const clientSocketFactory = getMetadata(client.metadata.ClientSocketFactory) ?? ((url: string) => new WebSocket(url) as ClientSocket) - const websocket = clientSocketFactory(this.url) + const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`) const socketId = this.sockets++ websocket.onmessage = (event: MessageEvent) => { const resp = readResponse(event.data) diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index df3acba198..798b2049b2 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -58,7 +58,8 @@ class SessionManager { ws: WebSocket, token: Token, pipelineFactory: PipelineFactory, - productId: string + productId: string, + sessionId?: string ): Promise { const wsString = toWorkspaceString(token.workspace, '@') @@ -98,7 +99,22 @@ class SessionManager { } const pipeline = await workspace.pipeline + + if (sessionId !== undefined) { + // try restore session + const existingSession = workspace.sessions.find((it) => it[0].sessionId === sessionId) + if (existingSession !== undefined) { + if (LOGGING_ENABLED) console.log('found existing session', existingSession) + // Update websocket + clearTimeout(existingSession[0].closeTimeout) + existingSession[0].closeTimeout = undefined + existingSession[1] = ws + return existingSession[0] + } + } + const session = this.createSession(token, pipeline) + session.sessionId = sessionId workspace.sessions.push([session, ws]) await this.setStatus(ctx, session, true) return session @@ -210,6 +226,7 @@ class SessionManager { workspace.sessions = [] const closeS = async (s: Session, webSocket: WebSocket): Promise => { + clearTimeout(s.closeTimeout) // await for message to go to client. await new Promise((resolve) => { // Override message handler, to wait for upgrading response from clients. @@ -283,6 +300,12 @@ async function handleRequest ( const request = readRequest(msg) if (request.id === -1 && request.method === 'hello') { ws.send(serialize({ id: -1, result: 'hello' })) + + // Push result buffer messages to client. + for (const r of service.resultBuffer ?? []) { + ws.send(serialize(r)) + } + service.resultBuffer = [] return } if (request.id === -1 && request.method === '#upgrade') { @@ -294,7 +317,12 @@ async function handleRequest ( const params = [ctx, ...request.params] const result = await f.apply(service, params) const resp: Response = { id: request.id, result } - ws.send(serialize(resp)) + ws.send(serialize(resp), (err) => { + if (err !== undefined) { + // It seems we failed to send to client. + service.resultBuffer = [...(service.resultBuffer ?? []), resp] + } + }) } catch (err: any) { const resp: Response = { id: request.id, @@ -337,18 +365,21 @@ export function start ( } }) // eslint-disable-next-line @typescript-eslint/no-misused-promises - wss.on('connection', async (ws: WebSocket, request: any, token: Token) => { + wss.on('connection', async (ws: WebSocket, request: any, token: Token, sessionId?: string) => { let buffer: string[] | undefined = [] ws.on('message', (msg: string) => { buffer?.push(msg) }) - const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId) + const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId, sessionId) // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('message', async (msg: string) => await handleRequest(ctx, session, ws, msg)) // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('close', (code: number, reason: string) => { - void sessions.close(ctx, ws, token.workspace, code, reason) + // remove session after 1seconds, give a time to reconnect. + session.closeTimeout = setTimeout(() => { + void sessions.close(ctx, ws, token.workspace, code, reason) + }, 1000) }) const b = buffer buffer = undefined @@ -359,16 +390,19 @@ export function start ( const server = createServer() server.on('upgrade', (request: IncomingMessage, socket: any, head: Buffer) => { - const token = request.url?.substring(1) // remove leading '/' + const url = new URL('http://localhost' + (request.url ?? '')) + const token = url.pathname.substring(1) + try { const payload = decodeToken(token ?? '') console.log('client connected with payload', payload) + const sessionId = url.searchParams.get('sessionId') if (payload.workspace.productId !== productId) { throw new Error('Invalid workspace product') } - wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload)) + wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId)) } catch (err) { console.error('invalid token', err) wss.handleUpgrade(request, socket, head, (ws) => { diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 2e6094e77d..c04b44770a 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -27,6 +27,11 @@ export interface Session { options?: FindOptions ) => Promise> tx: (ctx: MeasureContext, tx: Tx) => Promise + + // Session restore information + sessionId?: string + resultBuffer?: TxResult[] + closeTimeout?: any } /**