TSK-728: Server reconnect support (#2689)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-02-28 14:02:15 +07:00 committed by GitHub
parent a089cb9198
commit 63d0483ab9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 8 deletions

View File

@ -24,6 +24,7 @@ import core, {
Domain,
FindOptions,
FindResult,
generateId,
Ref,
Tx,
TxHandler,
@ -48,6 +49,7 @@ class Connection implements ClientConnection {
private readonly requests = new Map<ReqId, DeferredPromise>()
private lastId = 0
private readonly interval: number
private readonly sessionId = generateId() as string
constructor (
private readonly url: string,
@ -106,7 +108,7 @@ class Connection implements ClientConnection {
const clientSocketFactory =
getMetadata(client.metadata.ClientSocketFactory) ?? ((url: string) => new WebSocket(url) as ClientSocket)
const websocket = clientSocketFactory(this.url)
const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`)
const socketId = this.sockets++
websocket.onmessage = (event: MessageEvent) => {
const resp = readResponse(event.data)

View File

@ -58,7 +58,8 @@ class SessionManager {
ws: WebSocket,
token: Token,
pipelineFactory: PipelineFactory,
productId: string
productId: string,
sessionId?: string
): Promise<Session> {
const wsString = toWorkspaceString(token.workspace, '@')
@ -98,7 +99,22 @@ class SessionManager {
}
const pipeline = await workspace.pipeline
if (sessionId !== undefined) {
// try restore session
const existingSession = workspace.sessions.find((it) => it[0].sessionId === sessionId)
if (existingSession !== undefined) {
if (LOGGING_ENABLED) console.log('found existing session', existingSession)
// Update websocket
clearTimeout(existingSession[0].closeTimeout)
existingSession[0].closeTimeout = undefined
existingSession[1] = ws
return existingSession[0]
}
}
const session = this.createSession(token, pipeline)
session.sessionId = sessionId
workspace.sessions.push([session, ws])
await this.setStatus(ctx, session, true)
return session
@ -210,6 +226,7 @@ class SessionManager {
workspace.sessions = []
const closeS = async (s: Session, webSocket: WebSocket): Promise<void> => {
clearTimeout(s.closeTimeout)
// await for message to go to client.
await new Promise((resolve) => {
// Override message handler, to wait for upgrading response from clients.
@ -283,6 +300,12 @@ async function handleRequest<S extends Session> (
const request = readRequest(msg)
if (request.id === -1 && request.method === 'hello') {
ws.send(serialize({ id: -1, result: 'hello' }))
// Push result buffer messages to client.
for (const r of service.resultBuffer ?? []) {
ws.send(serialize(r))
}
service.resultBuffer = []
return
}
if (request.id === -1 && request.method === '#upgrade') {
@ -294,7 +317,12 @@ async function handleRequest<S extends Session> (
const params = [ctx, ...request.params]
const result = await f.apply(service, params)
const resp: Response<any> = { id: request.id, result }
ws.send(serialize(resp))
ws.send(serialize(resp), (err) => {
if (err !== undefined) {
// It seems we failed to send to client.
service.resultBuffer = [...(service.resultBuffer ?? []), resp]
}
})
} catch (err: any) {
const resp: Response<any> = {
id: request.id,
@ -337,18 +365,21 @@ export function start (
}
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
wss.on('connection', async (ws: WebSocket, request: any, token: Token) => {
wss.on('connection', async (ws: WebSocket, request: any, token: Token, sessionId?: string) => {
let buffer: string[] | undefined = []
ws.on('message', (msg: string) => {
buffer?.push(msg)
})
const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId)
const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId, sessionId)
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', async (msg: string) => await handleRequest(ctx, session, ws, msg))
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', (code: number, reason: string) => {
void sessions.close(ctx, ws, token.workspace, code, reason)
// remove session after 1seconds, give a time to reconnect.
session.closeTimeout = setTimeout(() => {
void sessions.close(ctx, ws, token.workspace, code, reason)
}, 1000)
})
const b = buffer
buffer = undefined
@ -359,16 +390,19 @@ export function start (
const server = createServer()
server.on('upgrade', (request: IncomingMessage, socket: any, head: Buffer) => {
const token = request.url?.substring(1) // remove leading '/'
const url = new URL('http://localhost' + (request.url ?? ''))
const token = url.pathname.substring(1)
try {
const payload = decodeToken(token ?? '')
console.log('client connected with payload', payload)
const sessionId = url.searchParams.get('sessionId')
if (payload.workspace.productId !== productId) {
throw new Error('Invalid workspace product')
}
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload))
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId))
} catch (err) {
console.error('invalid token', err)
wss.handleUpgrade(request, socket, head, (ws) => {

View File

@ -27,6 +27,11 @@ export interface Session {
options?: FindOptions<T>
) => Promise<FindResult<T>>
tx: (ctx: MeasureContext, tx: Tx) => Promise<TxResult>
// Session restore information
sessionId?: string
resultBuffer?: TxResult[]
closeTimeout?: any
}
/**