From bb6ee39174f008baf8df395c683b3e37d62875a5 Mon Sep 17 00:00:00 2001
From: Andrey Sobolev <haiodo@users.noreply.github.com>
Date: Mon, 23 Dec 2024 13:26:16 +0700
Subject: [PATCH] UBERF-8532: Rework how ping work (#7522)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
---
 plugins/client-resources/src/connection.ts | 70 ++++++++++++++++------
 plugins/client/src/index.ts                |  3 +
 server/core/src/types.ts                   |  8 ++-
 server/server/src/client.ts                |  5 +-
 server/server/src/sessionManager.ts        | 19 +++---
 server/ws/src/server_http.ts               | 11 ++++
 workers/transactor/src/transactor.ts       | 46 +++++++++++---
 7 files changed, 126 insertions(+), 36 deletions(-)

diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts
index e7e673de42..aca8d47ea4 100644
--- a/plugins/client-resources/src/connection.ts
+++ b/plugins/client-resources/src/connection.ts
@@ -15,7 +15,13 @@
 //
 
 import { Analytics } from '@hcengineering/analytics'
-import client, { ClientSocket, ClientSocketReadyState, type ClientFactoryOptions } from '@hcengineering/client'
+import client, {
+  ClientSocket,
+  ClientSocketReadyState,
+  pingConst,
+  pongConst,
+  type ClientFactoryOptions
+} from '@hcengineering/client'
 import core, {
   Account,
   Class,
@@ -160,7 +166,7 @@ class Connection implements ClientConnection {
       if (!this.closed) {
         // eslint-disable-next-line @typescript-eslint/no-floating-promises
         void this.sendRequest({
-          method: 'ping',
+          method: pingConst,
           params: [],
           once: true,
           handleResult: async (result) => {
@@ -317,8 +323,8 @@ class Connection implements ClientConnection {
       }
       return
     }
-    if (resp.result === 'ping') {
-      void this.sendRequest({ method: 'ping', params: [] })
+    if (resp.result === pingConst) {
+      void this.sendRequest({ method: pingConst, params: [] })
       return
     }
     if (resp.id !== undefined) {
@@ -461,6 +467,27 @@ class Connection implements ClientConnection {
       if (this.websocket !== wsocket) {
         return
       }
+      if (event.data === pongConst) {
+        this.pingResponse = Date.now()
+        return
+      }
+      if (event.data === pingConst) {
+        void this.sendRequest({ method: pingConst, params: [] })
+        return
+      }
+      if (
+        event.data instanceof ArrayBuffer &&
+        (event.data.byteLength === pingConst.length || event.data.byteLength === pongConst.length)
+      ) {
+        const text = new TextDecoder().decode(event.data)
+        if (text === pingConst) {
+          void this.sendRequest({ method: pingConst, params: [] })
+        }
+        if (text === pongConst) {
+          this.pingResponse = Date.now()
+        }
+        return
+      }
       if (event.data instanceof Blob) {
         void event.data.arrayBuffer().then((data) => {
           const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
@@ -546,23 +573,30 @@ class Connection implements ClientConnection {
       if (w instanceof Promise) {
         await w
       }
-      this.requests.set(id, promise)
+      if (data.method !== pingConst) {
+        this.requests.set(id, promise)
+      }
       const sendData = (): void => {
         if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
           promise.startTime = Date.now()
 
-          const dta = ctx.withSync('serialize', {}, () =>
-            this.rpcHandler.serialize(
-              {
-                method: data.method,
-                params: data.params,
-                id,
-                time: Date.now()
-              },
-              this.binaryMode
+          if (data.method !== pingConst) {
+            const dta = ctx.withSync('serialize', {}, () =>
+              this.rpcHandler.serialize(
+                {
+                  method: data.method,
+                  params: data.params,
+                  id,
+                  time: Date.now()
+                },
+                this.binaryMode
+              )
             )
-          )
-          ctx.withSync('send-data', {}, () => this.websocket?.send(dta))
+
+            ctx.withSync('send-data', {}, () => this.websocket?.send(dta))
+          } else {
+            this.websocket?.send(pingConst)
+          }
         }
       }
       if (data.allowReconnect ?? true) {
@@ -579,7 +613,9 @@ class Connection implements ClientConnection {
         sendData()
       })
       void ctx.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size))
-      return await promise.promise
+      if (data.method !== pingConst) {
+        return await promise.promise
+      }
     })
   }
 
diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts
index 1dac0ee3ca..3bd5ccccc2 100644
--- a/plugins/client/src/index.ts
+++ b/plugins/client/src/index.ts
@@ -77,6 +77,9 @@ export type ClientFactory = (token: string, endpoint: string, opt?: ClientFactor
 // ui - will filter out all server element's and all UI disabled elements.
 export type FilterMode = 'none' | 'client' | 'ui'
 
+export const pingConst = 'ping'
+export const pongConst = 'pong!'
+
 export default plugin(clientId, {
   metadata: {
     ClientSocketFactory: '' as Metadata<ClientSocketFactory>,
diff --git a/server/core/src/types.ts b/server/core/src/types.ts
index 193216ac81..2a45c7869d 100644
--- a/server/core/src/types.ts
+++ b/server/core/src/types.ts
@@ -497,6 +497,7 @@ export interface SessionRequest {
 export interface ClientSessionCtx {
   ctx: MeasureContext
   sendResponse: (msg: any) => Promise<void>
+  sendPong: () => void
   sendError: (msg: any, error: any) => Promise<void>
 }
 
@@ -553,6 +554,8 @@ export interface ConnectionSocket {
   isClosed: boolean
   close: () => void
   send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => void
+
+  sendPong: () => void
   data: () => Record<string, any>
 
   readRequest: (buffer: Buffer, binary: boolean) => Request<any>
@@ -664,7 +667,7 @@ export interface SessionManager {
     ws: ConnectionSocket,
     request: Request<any>,
     workspace: string // wsId, toWorkspaceString()
-  ) => void
+  ) => Promise<void>
 }
 
 /**
@@ -692,3 +695,6 @@ export type ServerFactory = (
   accountsUrl: string,
   externalStorage: StorageAdapter
 ) => () => Promise<void>
+
+export const pingConst = 'ping'
+export const pongConst = 'pong!'
diff --git a/server/server/src/client.ts b/server/server/src/client.ts
index 046231fdfa..0c8b7ed32b 100644
--- a/server/server/src/client.ts
+++ b/server/server/src/client.ts
@@ -39,8 +39,8 @@ import core, {
 import { PlatformError, unknownError } from '@hcengineering/platform'
 import {
   BackupClientOps,
-  SessionDataImpl,
   createBroadcastEvent,
+  SessionDataImpl,
   type ClientSessionCtx,
   type ConnectionSocket,
   type Pipeline,
@@ -98,9 +98,8 @@ export class ClientSession implements Session {
   }
 
   async ping (ctx: ClientSessionCtx): Promise<void> {
-    // console.log('ping')
     this.lastRequest = Date.now()
-    await ctx.sendResponse('pong!')
+    ctx.sendPong()
   }
 
   async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> {
diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts
index 024b14babc..d93c572e75 100644
--- a/server/server/src/sessionManager.ts
+++ b/server/server/src/sessionManager.ts
@@ -15,8 +15,6 @@
 
 import { Analytics } from '@hcengineering/analytics'
 import core, {
-  TxFactory,
-  WorkspaceEvent,
   cutObjectArray,
   generateId,
   isArchivingMode,
@@ -25,8 +23,10 @@ import core, {
   isWorkspaceCreating,
   systemAccountEmail,
   toWorkspaceString,
+  TxFactory,
   versionToString,
   withContext,
+  WorkspaceEvent,
   type BaseWorkspaceInfo,
   type Branding,
   type BrandingMap,
@@ -40,6 +40,7 @@ import { unknownError, type Status } from '@hcengineering/platform'
 import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
 import {
   LOGGING_ENABLED,
+  pingConst,
   Pipeline,
   PipelineFactory,
   ServerFactory,
@@ -240,7 +241,7 @@ class TSessionManager implements SessionManager {
             if (s[1].socket.checkState()) {
               s[1].socket.send(
                 workspace.context,
-                { result: 'ping' },
+                { result: pingConst },
                 s[1].session.binaryMode,
                 s[1].session.useCompression
               )
@@ -724,7 +725,8 @@ class TSessionManager implements SessionManager {
         ctx,
         sendError: async (msg, error: Status) => {
           // Assume no error send
-        }
+        },
+        sendPong: () => {}
       }
 
       const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0]
@@ -933,7 +935,7 @@ class TSessionManager implements SessionManager {
     ws: ConnectionSocket,
     request: Request<any>,
     workspace: string // wsId, toWorkspaceString()
-  ): void {
+  ): Promise<void> {
     const backupMode = service.getMode() === 'backup'
 
     const userCtx = requestCtx.newChild(
@@ -949,7 +951,7 @@ class TSessionManager implements SessionManager {
     const reqId = generateId()
 
     const st = Date.now()
-    void userCtx
+    return userCtx
       .with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
         if (request.time != null) {
           const delta = Date.now() - request.time
@@ -1024,6 +1026,9 @@ class TSessionManager implements SessionManager {
             })
             userCtx.end()
           },
+          sendPong: () => {
+            ws.sendPong()
+          },
           ctx,
           sendError: async (msg, error: Status) => {
             await sendResponse(ctx, service, ws, {
@@ -1137,7 +1142,7 @@ export function startSessionManager (
     shutdown: opt.serverFactory(
       sessions,
       (rctx, service, ws, msg, workspace) => {
-        sessions.handleRequest(rctx, service, ws, msg, workspace)
+        void sessions.handleRequest(rctx, service, ws, msg, workspace)
       },
       ctx,
       opt.pipelineFactory,
diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts
index ddbe35544d..9018a7f476 100644
--- a/server/ws/src/server_http.ts
+++ b/server/ws/src/server_http.ts
@@ -29,6 +29,8 @@ import {
 } from '@hcengineering/server'
 import {
   LOGGING_ENABLED,
+  pingConst,
+  pongConst,
   type ConnectionSocket,
   type HandleRequestFunction,
   type PipelineFactory,
@@ -552,9 +554,18 @@ function createWebsocketClientSocket (
       return true
     },
     readRequest: (buffer: Buffer, binary: boolean) => {
+      if (buffer.length === pingConst.length && buffer.toString() === pingConst) {
+        return { method: pingConst, params: [], id: -1, time: Date.now() }
+      }
       return rpcHandler.readRequest(buffer, binary)
     },
     data: () => data,
+    sendPong: () => {
+      if (ws.readyState !== ws.OPEN || cs.isClosed) {
+        return
+      }
+      ws.send(pongConst)
+    },
     send: (ctx: MeasureContext, msg, binary, compression) => {
       const smsg = rpcHandler.serialize(msg, binary)
 
diff --git a/workers/transactor/src/transactor.ts b/workers/transactor/src/transactor.ts
index ff2bd652fe..17710ac387 100644
--- a/workers/transactor/src/transactor.ts
+++ b/workers/transactor/src/transactor.ts
@@ -21,7 +21,9 @@ import {
   createDummyStorageAdapter,
   initStatisticsContext,
   loadBrandingMap,
+  pingConst,
   Pipeline,
+  pongConst,
   Session,
   type ConnectionSocket,
   type PipelineFactory,
@@ -48,7 +50,6 @@ export const PREFERRED_SAVE_SIZE = 500
 export const PREFERRED_SAVE_INTERVAL = 30 * 1000
 
 export class Transactor extends DurableObject<Env> {
-  rpcHandler = new RPCHandler()
   private workspace: string = ''
 
   private sessionManager!: SessionManager
@@ -72,6 +73,8 @@ export class Transactor extends DurableObject<Env> {
     registerServerPlugins()
     this.accountsUrl = env.ACCOUNTS_URL ?? 'http://127.0.0.1:3000'
 
+    this.ctx.setWebSocketAutoResponse(new WebSocketRequestResponsePair(pingConst, pongConst))
+
     this.measureCtx = this.measureCtx = initStatisticsContext('cloud-transactor', {
       statsUrl: this.env.STATS_URL ?? 'http://127.0.0.1:4900',
       serviceName: () => 'cloud-transactor: ' + this.workspace
@@ -79,7 +82,8 @@ export class Transactor extends DurableObject<Env> {
 
     setMetadata(serverPlugin.metadata.Secret, env.SERVER_SECRET ?? 'secret')
 
-    console.log(`Connecting DB to ${env.DB_URL !== '' ? 'Direct ' : 'Hyperdrive'}`)
+    console.log({ message: 'Connecting DB', mode: env.DB_URL !== '' ? 'Direct ' : 'Hyperdrive' })
+    console.log({ message: 'use stats: ' + (this.env.STATS_URL ?? 'http://127.0.0.1:4900') })
 
     // TODO:
     const storage = createDummyStorageAdapter()
@@ -157,7 +161,13 @@ export class Transactor extends DurableObject<Env> {
         s.context.measure('receive-data', buff?.length ?? 0)
         // processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest)
         const request = cs.readRequest(buff, s.session.binaryMode)
-        this.sessionManager.handleRequest(this.measureCtx, s.session, cs, request, this.workspace)
+        console.log({
+          message: 'handle-request',
+          method: request.method,
+          workspace: s.workspaceId,
+          user: s.session.getUser()
+        })
+        this.ctx.waitUntil(this.sessionManager.handleRequest(this.measureCtx, s.session, cs, request, this.workspace))
       },
       typeof message === 'string' ? Buffer.from(message) : Buffer.from(message)
     )
@@ -168,7 +178,9 @@ export class Transactor extends DurableObject<Env> {
     await this.handleClose(ws, 1011, 'error')
   }
 
-  async alarm (): Promise<void> {}
+  async alarm (): Promise<void> {
+    console.log({ message: 'alarm' })
+  }
 
   async handleSession (
     ws: WebSocket,
@@ -238,6 +250,7 @@ export class Transactor extends DurableObject<Env> {
       model: any
     }
   ): ConnectionSocket {
+    const rpcHandler = new RPCHandler()
     const cs: ConnectionSocket = {
       id: generateId(),
       isClosed: false,
@@ -253,23 +266,35 @@ export class Transactor extends DurableObject<Env> {
         return true
       },
       readRequest: (buffer: Buffer, binary: boolean) => {
-        return this.rpcHandler.readRequest(buffer, binary)
+        if (buffer.length === pingConst.length) {
+          if (buffer.toString() === pingConst) {
+            return { method: pingConst, params: [], id: -1, time: Date.now() }
+          }
+        }
+        return rpcHandler.readRequest(buffer, binary)
       },
       data: () => data,
       send: (ctx: MeasureContext, msg, binary, compression) => {
-        const smsg = this.rpcHandler.serialize(msg, binary)
+        const smsg = rpcHandler.serialize(msg, binary)
 
         ctx.measure('send-data', smsg.length)
         if (ws.readyState !== WebSocket.OPEN || cs.isClosed) {
           return
         }
         ws.send(smsg)
+      },
+      sendPong: () => {
+        if (ws.readyState !== WebSocket.OPEN || cs.isClosed) {
+          return
+        }
+        ws.send(pongConst)
       }
     }
     return cs
   }
 
   async broadcastMessage (message: Uint8Array, origin?: any): Promise<void> {
+    console.log({ message: 'broadcast' })
     const wss = this.ctx.getWebSockets().filter((ws) => ws.readyState === WebSocket.OPEN)
     await Promise.all(
       wss.map(async (ws) => {
@@ -315,7 +340,8 @@ export class Transactor extends DurableObject<Env> {
       data: () => {
         return {}
       },
-      send: (ctx: MeasureContext, msg, binary, compression) => {}
+      send: (ctx: MeasureContext, msg, binary, compression) => {},
+      sendPong: () => {}
     }
     return cs
   }
@@ -380,6 +406,9 @@ export class Transactor extends DurableObject<Env> {
         // it just logs them to console and return an empty result
         sendError: async (msg, error) => {
           result = { error: `${msg}`, status: `${error}` }
+        },
+        sendPong: () => {
+          cs.sendPong()
         }
       }
       await session.tx(sessionCtx, tx)
@@ -411,7 +440,8 @@ export class Transactor extends DurableObject<Env> {
         },
         sendError: async (msg, error) => {
           result = { error: `${msg}`, status: `${error}` }
-        }
+        },
+        sendPong: () => {}
       }
       await (session as any).getAccount(sessionCtx)
     } catch (error: any) {