Merge remote-tracking branch 'origin/develop' into staging

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-23 13:26:40 +07:00
commit 0829555939
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
7 changed files with 126 additions and 36 deletions

View File

@ -15,7 +15,13 @@
// //
import { Analytics } from '@hcengineering/analytics' import { Analytics } from '@hcengineering/analytics'
import client, { ClientSocket, ClientSocketReadyState, type ClientFactoryOptions } from '@hcengineering/client' import client, {
ClientSocket,
ClientSocketReadyState,
pingConst,
pongConst,
type ClientFactoryOptions
} from '@hcengineering/client'
import core, { import core, {
Account, Account,
Class, Class,
@ -160,7 +166,7 @@ class Connection implements ClientConnection {
if (!this.closed) { if (!this.closed) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
void this.sendRequest({ void this.sendRequest({
method: 'ping', method: pingConst,
params: [], params: [],
once: true, once: true,
handleResult: async (result) => { handleResult: async (result) => {
@ -317,8 +323,8 @@ class Connection implements ClientConnection {
} }
return return
} }
if (resp.result === 'ping') { if (resp.result === pingConst) {
void this.sendRequest({ method: 'ping', params: [] }) void this.sendRequest({ method: pingConst, params: [] })
return return
} }
if (resp.id !== undefined) { if (resp.id !== undefined) {
@ -461,6 +467,27 @@ class Connection implements ClientConnection {
if (this.websocket !== wsocket) { if (this.websocket !== wsocket) {
return return
} }
if (event.data === pongConst) {
this.pingResponse = Date.now()
return
}
if (event.data === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
return
}
if (
event.data instanceof ArrayBuffer &&
(event.data.byteLength === pingConst.length || event.data.byteLength === pongConst.length)
) {
const text = new TextDecoder().decode(event.data)
if (text === pingConst) {
void this.sendRequest({ method: pingConst, params: [] })
}
if (text === pongConst) {
this.pingResponse = Date.now()
}
return
}
if (event.data instanceof Blob) { if (event.data instanceof Blob) {
void event.data.arrayBuffer().then((data) => { void event.data.arrayBuffer().then((data) => {
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode) const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
@ -546,23 +573,30 @@ class Connection implements ClientConnection {
if (w instanceof Promise) { if (w instanceof Promise) {
await w await w
} }
this.requests.set(id, promise) if (data.method !== pingConst) {
this.requests.set(id, promise)
}
const sendData = (): void => { const sendData = (): void => {
if (this.websocket?.readyState === ClientSocketReadyState.OPEN) { if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
promise.startTime = Date.now() promise.startTime = Date.now()
const dta = ctx.withSync('serialize', {}, () => if (data.method !== pingConst) {
this.rpcHandler.serialize( const dta = ctx.withSync('serialize', {}, () =>
{ this.rpcHandler.serialize(
method: data.method, {
params: data.params, method: data.method,
id, params: data.params,
time: Date.now() id,
}, time: Date.now()
this.binaryMode },
this.binaryMode
)
) )
)
ctx.withSync('send-data', {}, () => this.websocket?.send(dta)) ctx.withSync('send-data', {}, () => this.websocket?.send(dta))
} else {
this.websocket?.send(pingConst)
}
} }
} }
if (data.allowReconnect ?? true) { if (data.allowReconnect ?? true) {
@ -579,7 +613,9 @@ class Connection implements ClientConnection {
sendData() sendData()
}) })
void ctx.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size)) void ctx.with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size))
return await promise.promise if (data.method !== pingConst) {
return await promise.promise
}
}) })
} }

View File

@ -77,6 +77,9 @@ export type ClientFactory = (token: string, endpoint: string, opt?: ClientFactor
// ui - will filter out all server element's and all UI disabled elements. // ui - will filter out all server element's and all UI disabled elements.
export type FilterMode = 'none' | 'client' | 'ui' export type FilterMode = 'none' | 'client' | 'ui'
export const pingConst = 'ping'
export const pongConst = 'pong!'
export default plugin(clientId, { export default plugin(clientId, {
metadata: { metadata: {
ClientSocketFactory: '' as Metadata<ClientSocketFactory>, ClientSocketFactory: '' as Metadata<ClientSocketFactory>,

View File

@ -497,6 +497,7 @@ export interface SessionRequest {
export interface ClientSessionCtx { export interface ClientSessionCtx {
ctx: MeasureContext ctx: MeasureContext
sendResponse: (msg: any) => Promise<void> sendResponse: (msg: any) => Promise<void>
sendPong: () => void
sendError: (msg: any, error: any) => Promise<void> sendError: (msg: any, error: any) => Promise<void>
} }
@ -553,6 +554,8 @@ export interface ConnectionSocket {
isClosed: boolean isClosed: boolean
close: () => void close: () => void
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => void send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => void
sendPong: () => void
data: () => Record<string, any> data: () => Record<string, any>
readRequest: (buffer: Buffer, binary: boolean) => Request<any> readRequest: (buffer: Buffer, binary: boolean) => Request<any>
@ -664,7 +667,7 @@ export interface SessionManager {
ws: ConnectionSocket, ws: ConnectionSocket,
request: Request<any>, request: Request<any>,
workspace: string // wsId, toWorkspaceString() workspace: string // wsId, toWorkspaceString()
) => void ) => Promise<void>
} }
/** /**
@ -692,3 +695,6 @@ export type ServerFactory = (
accountsUrl: string, accountsUrl: string,
externalStorage: StorageAdapter externalStorage: StorageAdapter
) => () => Promise<void> ) => () => Promise<void>
export const pingConst = 'ping'
export const pongConst = 'pong!'

View File

@ -39,8 +39,8 @@ import core, {
import { PlatformError, unknownError } from '@hcengineering/platform' import { PlatformError, unknownError } from '@hcengineering/platform'
import { import {
BackupClientOps, BackupClientOps,
SessionDataImpl,
createBroadcastEvent, createBroadcastEvent,
SessionDataImpl,
type ClientSessionCtx, type ClientSessionCtx,
type ConnectionSocket, type ConnectionSocket,
type Pipeline, type Pipeline,
@ -98,9 +98,8 @@ export class ClientSession implements Session {
} }
async ping (ctx: ClientSessionCtx): Promise<void> { async ping (ctx: ClientSessionCtx): Promise<void> {
// console.log('ping')
this.lastRequest = Date.now() this.lastRequest = Date.now()
await ctx.sendResponse('pong!') ctx.sendPong()
} }
async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> { async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> {

View File

@ -15,8 +15,6 @@
import { Analytics } from '@hcengineering/analytics' import { Analytics } from '@hcengineering/analytics'
import core, { import core, {
TxFactory,
WorkspaceEvent,
cutObjectArray, cutObjectArray,
generateId, generateId,
isArchivingMode, isArchivingMode,
@ -25,8 +23,10 @@ import core, {
isWorkspaceCreating, isWorkspaceCreating,
systemAccountEmail, systemAccountEmail,
toWorkspaceString, toWorkspaceString,
TxFactory,
versionToString, versionToString,
withContext, withContext,
WorkspaceEvent,
type BaseWorkspaceInfo, type BaseWorkspaceInfo,
type Branding, type Branding,
type BrandingMap, type BrandingMap,
@ -40,6 +40,7 @@ import { unknownError, type Status } from '@hcengineering/platform'
import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
import { import {
LOGGING_ENABLED, LOGGING_ENABLED,
pingConst,
Pipeline, Pipeline,
PipelineFactory, PipelineFactory,
ServerFactory, ServerFactory,
@ -240,7 +241,7 @@ class TSessionManager implements SessionManager {
if (s[1].socket.checkState()) { if (s[1].socket.checkState()) {
s[1].socket.send( s[1].socket.send(
workspace.context, workspace.context,
{ result: 'ping' }, { result: pingConst },
s[1].session.binaryMode, s[1].session.binaryMode,
s[1].session.useCompression s[1].session.useCompression
) )
@ -724,7 +725,8 @@ class TSessionManager implements SessionManager {
ctx, ctx,
sendError: async (msg, error: Status) => { sendError: async (msg, error: Status) => {
// Assume no error send // Assume no error send
} },
sendPong: () => {}
} }
const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0] const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0]
@ -933,7 +935,7 @@ class TSessionManager implements SessionManager {
ws: ConnectionSocket, ws: ConnectionSocket,
request: Request<any>, request: Request<any>,
workspace: string // wsId, toWorkspaceString() workspace: string // wsId, toWorkspaceString()
): void { ): Promise<void> {
const backupMode = service.getMode() === 'backup' const backupMode = service.getMode() === 'backup'
const userCtx = requestCtx.newChild( const userCtx = requestCtx.newChild(
@ -949,7 +951,7 @@ class TSessionManager implements SessionManager {
const reqId = generateId() const reqId = generateId()
const st = Date.now() const st = Date.now()
void userCtx return userCtx
.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { .with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
if (request.time != null) { if (request.time != null) {
const delta = Date.now() - request.time const delta = Date.now() - request.time
@ -1024,6 +1026,9 @@ class TSessionManager implements SessionManager {
}) })
userCtx.end() userCtx.end()
}, },
sendPong: () => {
ws.sendPong()
},
ctx, ctx,
sendError: async (msg, error: Status) => { sendError: async (msg, error: Status) => {
await sendResponse(ctx, service, ws, { await sendResponse(ctx, service, ws, {
@ -1137,7 +1142,7 @@ export function startSessionManager (
shutdown: opt.serverFactory( shutdown: opt.serverFactory(
sessions, sessions,
(rctx, service, ws, msg, workspace) => { (rctx, service, ws, msg, workspace) => {
sessions.handleRequest(rctx, service, ws, msg, workspace) void sessions.handleRequest(rctx, service, ws, msg, workspace)
}, },
ctx, ctx,
opt.pipelineFactory, opt.pipelineFactory,

View File

@ -29,6 +29,8 @@ import {
} from '@hcengineering/server' } from '@hcengineering/server'
import { import {
LOGGING_ENABLED, LOGGING_ENABLED,
pingConst,
pongConst,
type ConnectionSocket, type ConnectionSocket,
type HandleRequestFunction, type HandleRequestFunction,
type PipelineFactory, type PipelineFactory,
@ -552,9 +554,18 @@ function createWebsocketClientSocket (
return true return true
}, },
readRequest: (buffer: Buffer, binary: boolean) => { readRequest: (buffer: Buffer, binary: boolean) => {
if (buffer.length === pingConst.length && buffer.toString() === pingConst) {
return { method: pingConst, params: [], id: -1, time: Date.now() }
}
return rpcHandler.readRequest(buffer, binary) return rpcHandler.readRequest(buffer, binary)
}, },
data: () => data, data: () => data,
sendPong: () => {
if (ws.readyState !== ws.OPEN || cs.isClosed) {
return
}
ws.send(pongConst)
},
send: (ctx: MeasureContext, msg, binary, compression) => { send: (ctx: MeasureContext, msg, binary, compression) => {
const smsg = rpcHandler.serialize(msg, binary) const smsg = rpcHandler.serialize(msg, binary)

View File

@ -21,7 +21,9 @@ import {
createDummyStorageAdapter, createDummyStorageAdapter,
initStatisticsContext, initStatisticsContext,
loadBrandingMap, loadBrandingMap,
pingConst,
Pipeline, Pipeline,
pongConst,
Session, Session,
type ConnectionSocket, type ConnectionSocket,
type PipelineFactory, type PipelineFactory,
@ -48,7 +50,6 @@ export const PREFERRED_SAVE_SIZE = 500
export const PREFERRED_SAVE_INTERVAL = 30 * 1000 export const PREFERRED_SAVE_INTERVAL = 30 * 1000
export class Transactor extends DurableObject<Env> { export class Transactor extends DurableObject<Env> {
rpcHandler = new RPCHandler()
private workspace: string = '' private workspace: string = ''
private sessionManager!: SessionManager private sessionManager!: SessionManager
@ -72,6 +73,8 @@ export class Transactor extends DurableObject<Env> {
registerServerPlugins() registerServerPlugins()
this.accountsUrl = env.ACCOUNTS_URL ?? 'http://127.0.0.1:3000' this.accountsUrl = env.ACCOUNTS_URL ?? 'http://127.0.0.1:3000'
this.ctx.setWebSocketAutoResponse(new WebSocketRequestResponsePair(pingConst, pongConst))
this.measureCtx = this.measureCtx = initStatisticsContext('cloud-transactor', { this.measureCtx = this.measureCtx = initStatisticsContext('cloud-transactor', {
statsUrl: this.env.STATS_URL ?? 'http://127.0.0.1:4900', statsUrl: this.env.STATS_URL ?? 'http://127.0.0.1:4900',
serviceName: () => 'cloud-transactor: ' + this.workspace serviceName: () => 'cloud-transactor: ' + this.workspace
@ -79,7 +82,8 @@ export class Transactor extends DurableObject<Env> {
setMetadata(serverPlugin.metadata.Secret, env.SERVER_SECRET ?? 'secret') setMetadata(serverPlugin.metadata.Secret, env.SERVER_SECRET ?? 'secret')
console.log(`Connecting DB to ${env.DB_URL !== '' ? 'Direct ' : 'Hyperdrive'}`) console.log({ message: 'Connecting DB', mode: env.DB_URL !== '' ? 'Direct ' : 'Hyperdrive' })
console.log({ message: 'use stats: ' + (this.env.STATS_URL ?? 'http://127.0.0.1:4900') })
// TODO: // TODO:
const storage = createDummyStorageAdapter() const storage = createDummyStorageAdapter()
@ -157,7 +161,13 @@ export class Transactor extends DurableObject<Env> {
s.context.measure('receive-data', buff?.length ?? 0) s.context.measure('receive-data', buff?.length ?? 0)
// processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest) // processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest)
const request = cs.readRequest(buff, s.session.binaryMode) const request = cs.readRequest(buff, s.session.binaryMode)
this.sessionManager.handleRequest(this.measureCtx, s.session, cs, request, this.workspace) console.log({
message: 'handle-request',
method: request.method,
workspace: s.workspaceId,
user: s.session.getUser()
})
this.ctx.waitUntil(this.sessionManager.handleRequest(this.measureCtx, s.session, cs, request, this.workspace))
}, },
typeof message === 'string' ? Buffer.from(message) : Buffer.from(message) typeof message === 'string' ? Buffer.from(message) : Buffer.from(message)
) )
@ -168,7 +178,9 @@ export class Transactor extends DurableObject<Env> {
await this.handleClose(ws, 1011, 'error') await this.handleClose(ws, 1011, 'error')
} }
async alarm (): Promise<void> {} async alarm (): Promise<void> {
console.log({ message: 'alarm' })
}
async handleSession ( async handleSession (
ws: WebSocket, ws: WebSocket,
@ -238,6 +250,7 @@ export class Transactor extends DurableObject<Env> {
model: any model: any
} }
): ConnectionSocket { ): ConnectionSocket {
const rpcHandler = new RPCHandler()
const cs: ConnectionSocket = { const cs: ConnectionSocket = {
id: generateId(), id: generateId(),
isClosed: false, isClosed: false,
@ -253,23 +266,35 @@ export class Transactor extends DurableObject<Env> {
return true return true
}, },
readRequest: (buffer: Buffer, binary: boolean) => { readRequest: (buffer: Buffer, binary: boolean) => {
return this.rpcHandler.readRequest(buffer, binary) if (buffer.length === pingConst.length) {
if (buffer.toString() === pingConst) {
return { method: pingConst, params: [], id: -1, time: Date.now() }
}
}
return rpcHandler.readRequest(buffer, binary)
}, },
data: () => data, data: () => data,
send: (ctx: MeasureContext, msg, binary, compression) => { send: (ctx: MeasureContext, msg, binary, compression) => {
const smsg = this.rpcHandler.serialize(msg, binary) const smsg = rpcHandler.serialize(msg, binary)
ctx.measure('send-data', smsg.length) ctx.measure('send-data', smsg.length)
if (ws.readyState !== WebSocket.OPEN || cs.isClosed) { if (ws.readyState !== WebSocket.OPEN || cs.isClosed) {
return return
} }
ws.send(smsg) ws.send(smsg)
},
sendPong: () => {
if (ws.readyState !== WebSocket.OPEN || cs.isClosed) {
return
}
ws.send(pongConst)
} }
} }
return cs return cs
} }
async broadcastMessage (message: Uint8Array, origin?: any): Promise<void> { async broadcastMessage (message: Uint8Array, origin?: any): Promise<void> {
console.log({ message: 'broadcast' })
const wss = this.ctx.getWebSockets().filter((ws) => ws.readyState === WebSocket.OPEN) const wss = this.ctx.getWebSockets().filter((ws) => ws.readyState === WebSocket.OPEN)
await Promise.all( await Promise.all(
wss.map(async (ws) => { wss.map(async (ws) => {
@ -315,7 +340,8 @@ export class Transactor extends DurableObject<Env> {
data: () => { data: () => {
return {} return {}
}, },
send: (ctx: MeasureContext, msg, binary, compression) => {} send: (ctx: MeasureContext, msg, binary, compression) => {},
sendPong: () => {}
} }
return cs return cs
} }
@ -380,6 +406,9 @@ export class Transactor extends DurableObject<Env> {
// it just logs them to console and return an empty result // it just logs them to console and return an empty result
sendError: async (msg, error) => { sendError: async (msg, error) => {
result = { error: `${msg}`, status: `${error}` } result = { error: `${msg}`, status: `${error}` }
},
sendPong: () => {
cs.sendPong()
} }
} }
await session.tx(sessionCtx, tx) await session.tx(sessionCtx, tx)
@ -411,7 +440,8 @@ export class Transactor extends DurableObject<Env> {
}, },
sendError: async (msg, error) => { sendError: async (msg, error) => {
result = { error: `${msg}`, status: `${error}` } result = { error: `${msg}`, status: `${error}` }
} },
sendPong: () => {}
} }
await (session as any).getAccount(sessionCtx) await (session as any).getAccount(sessionCtx)
} catch (error: any) { } catch (error: any) {