diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index e7e673de42..aca8d47ea4 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -15,7 +15,13 @@ // import { Analytics } from '@hcengineering/analytics' -import client, { ClientSocket, ClientSocketReadyState, type ClientFactoryOptions } from '@hcengineering/client' +import client, { + ClientSocket, + ClientSocketReadyState, + pingConst, + pongConst, + type ClientFactoryOptions +} from '@hcengineering/client' import core, { Account, Class, @@ -160,7 +166,7 @@ class Connection implements ClientConnection { if (!this.closed) { // eslint-disable-next-line @typescript-eslint/no-floating-promises void this.sendRequest({ - method: 'ping', + method: pingConst, params: [], once: true, handleResult: async (result) => { @@ -317,8 +323,8 @@ class Connection implements ClientConnection { } return } - if (resp.result === 'ping') { - void this.sendRequest({ method: 'ping', params: [] }) + if (resp.result === pingConst) { + void this.sendRequest({ method: pingConst, params: [] }) return } if (resp.id !== undefined) { @@ -461,6 +467,27 @@ class Connection implements ClientConnection { if (this.websocket !== wsocket) { return } + if (event.data === pongConst) { + this.pingResponse = Date.now() + return + } + if (event.data === pingConst) { + void this.sendRequest({ method: pingConst, params: [] }) + return + } + if ( + event.data instanceof ArrayBuffer && + (event.data.byteLength === pingConst.length || event.data.byteLength === pongConst.length) + ) { + const text = new TextDecoder().decode(event.data) + if (text === pingConst) { + void this.sendRequest({ method: pingConst, params: [] }) + } + if (text === pongConst) { + this.pingResponse = Date.now() + } + return + } if (event.data instanceof Blob) { void event.data.arrayBuffer().then((data) => { const resp = this.rpcHandler.readResponse(data, this.binaryMode) @@ -546,23 +573,30 @@ class Connection implements ClientConnection { if (w instanceof Promise) { await w } - this.requests.set(id, promise) + if (data.method !== pingConst) { + this.requests.set(id, promise) + } const sendData = (): void => { if (this.websocket?.readyState === ClientSocketReadyState.OPEN) { promise.startTime = Date.now() - const dta = ctx.withSync('serialize', {}, () => - this.rpcHandler.serialize( - { - method: data.method, - params: data.params, - id, - time: Date.now() - }, - this.binaryMode + if (data.method !== pingConst) { + const dta = ctx.withSync('serialize', {}, () => + this.rpcHandler.serialize( + { + method: data.method, + params: data.params, + id, + time: Date.now() + }, + this.binaryMode + ) ) - ) - ctx.withSync('send-data', {}, () => this.websocket?.send(dta)) + + ctx.withSync('send-data', {}, () => this.websocket?.send(dta)) + } else { + this.websocket?.send(pingConst) + } } } if (data.allowReconnect ?? true) { @@ -579,7 +613,9 @@ class Connection implements ClientConnection { sendData() }) void ctx.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size)) - return await promise.promise + if (data.method !== pingConst) { + return await promise.promise + } }) } diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts index 1dac0ee3ca..3bd5ccccc2 100644 --- a/plugins/client/src/index.ts +++ b/plugins/client/src/index.ts @@ -77,6 +77,9 @@ export type ClientFactory = (token: string, endpoint: string, opt?: ClientFactor // ui - will filter out all server element's and all UI disabled elements. export type FilterMode = 'none' | 'client' | 'ui' +export const pingConst = 'ping' +export const pongConst = 'pong!' + export default plugin(clientId, { metadata: { ClientSocketFactory: '' as Metadata, diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 193216ac81..2a45c7869d 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -497,6 +497,7 @@ export interface SessionRequest { export interface ClientSessionCtx { ctx: MeasureContext sendResponse: (msg: any) => Promise + sendPong: () => void sendError: (msg: any, error: any) => Promise } @@ -553,6 +554,8 @@ export interface ConnectionSocket { isClosed: boolean close: () => void send: (ctx: MeasureContext, msg: Response, binary: boolean, compression: boolean) => void + + sendPong: () => void data: () => Record readRequest: (buffer: Buffer, binary: boolean) => Request @@ -664,7 +667,7 @@ export interface SessionManager { ws: ConnectionSocket, request: Request, workspace: string // wsId, toWorkspaceString() - ) => void + ) => Promise } /** @@ -692,3 +695,6 @@ export type ServerFactory = ( accountsUrl: string, externalStorage: StorageAdapter ) => () => Promise + +export const pingConst = 'ping' +export const pongConst = 'pong!' diff --git a/server/server/src/client.ts b/server/server/src/client.ts index 046231fdfa..0c8b7ed32b 100644 --- a/server/server/src/client.ts +++ b/server/server/src/client.ts @@ -39,8 +39,8 @@ import core, { import { PlatformError, unknownError } from '@hcengineering/platform' import { BackupClientOps, - SessionDataImpl, createBroadcastEvent, + SessionDataImpl, type ClientSessionCtx, type ConnectionSocket, type Pipeline, @@ -98,9 +98,8 @@ export class ClientSession implements Session { } async ping (ctx: ClientSessionCtx): Promise { - // console.log('ping') this.lastRequest = Date.now() - await ctx.sendResponse('pong!') + ctx.sendPong() } async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise { diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 024b14babc..d93c572e75 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -15,8 +15,6 @@ import { Analytics } from '@hcengineering/analytics' import core, { - TxFactory, - WorkspaceEvent, cutObjectArray, generateId, isArchivingMode, @@ -25,8 +23,10 @@ import core, { isWorkspaceCreating, systemAccountEmail, toWorkspaceString, + TxFactory, versionToString, withContext, + WorkspaceEvent, type BaseWorkspaceInfo, type Branding, type BrandingMap, @@ -40,6 +40,7 @@ import { unknownError, type Status } from '@hcengineering/platform' import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' import { LOGGING_ENABLED, + pingConst, Pipeline, PipelineFactory, ServerFactory, @@ -240,7 +241,7 @@ class TSessionManager implements SessionManager { if (s[1].socket.checkState()) { s[1].socket.send( workspace.context, - { result: 'ping' }, + { result: pingConst }, s[1].session.binaryMode, s[1].session.useCompression ) @@ -724,7 +725,8 @@ class TSessionManager implements SessionManager { ctx, sendError: async (msg, error: Status) => { // Assume no error send - } + }, + sendPong: () => {} } const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0] @@ -933,7 +935,7 @@ class TSessionManager implements SessionManager { ws: ConnectionSocket, request: Request, workspace: string // wsId, toWorkspaceString() - ): void { + ): Promise { const backupMode = service.getMode() === 'backup' const userCtx = requestCtx.newChild( @@ -949,7 +951,7 @@ class TSessionManager implements SessionManager { const reqId = generateId() const st = Date.now() - void userCtx + return userCtx .with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { if (request.time != null) { const delta = Date.now() - request.time @@ -1024,6 +1026,9 @@ class TSessionManager implements SessionManager { }) userCtx.end() }, + sendPong: () => { + ws.sendPong() + }, ctx, sendError: async (msg, error: Status) => { await sendResponse(ctx, service, ws, { @@ -1137,7 +1142,7 @@ export function startSessionManager ( shutdown: opt.serverFactory( sessions, (rctx, service, ws, msg, workspace) => { - sessions.handleRequest(rctx, service, ws, msg, workspace) + void sessions.handleRequest(rctx, service, ws, msg, workspace) }, ctx, opt.pipelineFactory, diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index ddbe35544d..9018a7f476 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -29,6 +29,8 @@ import { } from '@hcengineering/server' import { LOGGING_ENABLED, + pingConst, + pongConst, type ConnectionSocket, type HandleRequestFunction, type PipelineFactory, @@ -552,9 +554,18 @@ function createWebsocketClientSocket ( return true }, readRequest: (buffer: Buffer, binary: boolean) => { + if (buffer.length === pingConst.length && buffer.toString() === pingConst) { + return { method: pingConst, params: [], id: -1, time: Date.now() } + } return rpcHandler.readRequest(buffer, binary) }, data: () => data, + sendPong: () => { + if (ws.readyState !== ws.OPEN || cs.isClosed) { + return + } + ws.send(pongConst) + }, send: (ctx: MeasureContext, msg, binary, compression) => { const smsg = rpcHandler.serialize(msg, binary) diff --git a/workers/transactor/src/transactor.ts b/workers/transactor/src/transactor.ts index ff2bd652fe..17710ac387 100644 --- a/workers/transactor/src/transactor.ts +++ b/workers/transactor/src/transactor.ts @@ -21,7 +21,9 @@ import { createDummyStorageAdapter, initStatisticsContext, loadBrandingMap, + pingConst, Pipeline, + pongConst, Session, type ConnectionSocket, type PipelineFactory, @@ -48,7 +50,6 @@ export const PREFERRED_SAVE_SIZE = 500 export const PREFERRED_SAVE_INTERVAL = 30 * 1000 export class Transactor extends DurableObject { - rpcHandler = new RPCHandler() private workspace: string = '' private sessionManager!: SessionManager @@ -72,6 +73,8 @@ export class Transactor extends DurableObject { registerServerPlugins() this.accountsUrl = env.ACCOUNTS_URL ?? 'http://127.0.0.1:3000' + this.ctx.setWebSocketAutoResponse(new WebSocketRequestResponsePair(pingConst, pongConst)) + this.measureCtx = this.measureCtx = initStatisticsContext('cloud-transactor', { statsUrl: this.env.STATS_URL ?? 'http://127.0.0.1:4900', serviceName: () => 'cloud-transactor: ' + this.workspace @@ -79,7 +82,8 @@ export class Transactor extends DurableObject { setMetadata(serverPlugin.metadata.Secret, env.SERVER_SECRET ?? 'secret') - console.log(`Connecting DB to ${env.DB_URL !== '' ? 'Direct ' : 'Hyperdrive'}`) + console.log({ message: 'Connecting DB', mode: env.DB_URL !== '' ? 'Direct ' : 'Hyperdrive' }) + console.log({ message: 'use stats: ' + (this.env.STATS_URL ?? 'http://127.0.0.1:4900') }) // TODO: const storage = createDummyStorageAdapter() @@ -157,7 +161,13 @@ export class Transactor extends DurableObject { s.context.measure('receive-data', buff?.length ?? 0) // processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest) const request = cs.readRequest(buff, s.session.binaryMode) - this.sessionManager.handleRequest(this.measureCtx, s.session, cs, request, this.workspace) + console.log({ + message: 'handle-request', + method: request.method, + workspace: s.workspaceId, + user: s.session.getUser() + }) + this.ctx.waitUntil(this.sessionManager.handleRequest(this.measureCtx, s.session, cs, request, this.workspace)) }, typeof message === 'string' ? Buffer.from(message) : Buffer.from(message) ) @@ -168,7 +178,9 @@ export class Transactor extends DurableObject { await this.handleClose(ws, 1011, 'error') } - async alarm (): Promise {} + async alarm (): Promise { + console.log({ message: 'alarm' }) + } async handleSession ( ws: WebSocket, @@ -238,6 +250,7 @@ export class Transactor extends DurableObject { model: any } ): ConnectionSocket { + const rpcHandler = new RPCHandler() const cs: ConnectionSocket = { id: generateId(), isClosed: false, @@ -253,23 +266,35 @@ export class Transactor extends DurableObject { return true }, readRequest: (buffer: Buffer, binary: boolean) => { - return this.rpcHandler.readRequest(buffer, binary) + if (buffer.length === pingConst.length) { + if (buffer.toString() === pingConst) { + return { method: pingConst, params: [], id: -1, time: Date.now() } + } + } + return rpcHandler.readRequest(buffer, binary) }, data: () => data, send: (ctx: MeasureContext, msg, binary, compression) => { - const smsg = this.rpcHandler.serialize(msg, binary) + const smsg = rpcHandler.serialize(msg, binary) ctx.measure('send-data', smsg.length) if (ws.readyState !== WebSocket.OPEN || cs.isClosed) { return } ws.send(smsg) + }, + sendPong: () => { + if (ws.readyState !== WebSocket.OPEN || cs.isClosed) { + return + } + ws.send(pongConst) } } return cs } async broadcastMessage (message: Uint8Array, origin?: any): Promise { + console.log({ message: 'broadcast' }) const wss = this.ctx.getWebSockets().filter((ws) => ws.readyState === WebSocket.OPEN) await Promise.all( wss.map(async (ws) => { @@ -315,7 +340,8 @@ export class Transactor extends DurableObject { data: () => { return {} }, - send: (ctx: MeasureContext, msg, binary, compression) => {} + send: (ctx: MeasureContext, msg, binary, compression) => {}, + sendPong: () => {} } return cs } @@ -380,6 +406,9 @@ export class Transactor extends DurableObject { // it just logs them to console and return an empty result sendError: async (msg, error) => { result = { error: `${msg}`, status: `${error}` } + }, + sendPong: () => { + cs.sendPong() } } await session.tx(sessionCtx, tx) @@ -411,7 +440,8 @@ export class Transactor extends DurableObject { }, sendError: async (msg, error) => { result = { error: `${msg}`, status: `${error}` } - } + }, + sendPong: () => {} } await (session as any).getAccount(sessionCtx) } catch (error: any) {