From 9c460d63880d15c96938fcf04fc6fa4540e4f702 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 26 Feb 2025 15:13:24 +0700 Subject: [PATCH] UBERF-9522: Fix memory backpressure (#8098) Signed-off-by: Andrey Sobolev --- .vscode/launch.json | 2 +- dev/tool/src/benchmark.ts | 105 +++++++++------ .../src/components/AdminWorkspaces.svelte | 14 +- pods/stats/src/stats.ts | 2 +- server/account-service/src/index.ts | 9 +- server/core/src/base.ts | 10 +- server/core/src/stats.ts | 17 ++- server/core/src/types.ts | 5 +- server/front/src/index.ts | 13 +- server/middleware/src/domainFind.ts | 8 +- server/middleware/src/queryJoin.ts | 122 ++++++++++-------- .../middleware/src/tests/queryJoiner.spec.ts | 9 +- server/server/src/blobs.ts | 9 +- server/server/src/client.ts | 4 +- server/server/src/sessionManager.ts | 33 +++-- server/server/src/stats.ts | 2 + server/server/src/utils.ts | 3 +- server/ws/src/rpc.ts | 6 +- server/ws/src/server_http.ts | 78 +++++++---- workers/transactor/src/transactor.ts | 10 +- ws-tests/tool.sh | 2 +- 21 files changed, 298 insertions(+), 165 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 77330c4067..d0977e7b7c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -91,7 +91,7 @@ "FRONT_URL": "http://localhost:8083", "ACCOUNTS_URL": "http://localhost:3003", "MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json", - "MODEL_VERSION": "0.6.435", + "MODEL_VERSION": "0.6.436", "STATS_URL": "http://host.docker.internal:4901" }, "runtimeArgs": ["--nolazy", "-r", "ts-node/register"], diff --git a/dev/tool/src/benchmark.ts b/dev/tool/src/benchmark.ts index 7143746c0e..8d22dcb6e7 100644 --- a/dev/tool/src/benchmark.ts +++ b/dev/tool/src/benchmark.ts @@ -135,6 +135,9 @@ export async function benchmark ( } } }) + worker.on('error', (err) => { + console.error('worker error', err) + }) }) const m = newMetrics() @@ -147,6 +150,9 @@ export async function benchmark ( moment: number mem: number memTotal: number + memRSS: number + memFree: number + memArrays: number cpu: number requestTime: number operations: number @@ -158,6 +164,9 @@ export async function benchmark ( moment: 'Moment Time', mem: 'Mem', memTotal: 'Mem total', + memRSS: 'Mem RSS', + memFree: 'Mem Free', + memArrays: 'Mem Arrays', cpu: 'CPU', requestTime: 'Request time', operations: 'OPS', @@ -170,6 +179,9 @@ export async function benchmark ( let cpu: number = 0 let memUsed: number = 0 let memTotal: number = 0 + let memRSS: number = 0 + const memFree: number = 0 + let memArrays: number = 0 let elapsed = 0 let requestTime: number = 0 let operations = 0 @@ -204,6 +216,7 @@ export async function benchmark ( } } if (!found) { + console.log('no measurements found for path', path, p) return null } } @@ -211,47 +224,60 @@ export async function benchmark ( } let timer: any + let p: Promise | undefined if (isMainThread && monitorConnection !== undefined) { timer = setInterval(() => { - const st = Date.now() + const st = performance.now() try { - const fetchUrl = endpoint.replace('ws:/', 'http:/') + '/api/v1/statistics?token=' + token - void fetch(fetchUrl) - .then((res) => { - void res - .json() - .then((json) => { - memUsed = json.statistics.memoryUsed - memTotal = json.statistics.memoryTotal - cpu = json.statistics.cpuUsage - // operations = 0 - requestTime = 0 - // transfer = 0 - const r = extract( - json.metrics as Metrics, - '🧲 session', - 'client', - 'handleRequest', - 'process', - 'find-all' - ) - operations = (r?.operations ?? 0) - oldOperations - oldOperations = r?.operations ?? 0 - - requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1) - - const tr = extract(json.metrics as Metrics, '🧲 session', '#send-data') - transfer = (tr?.value ?? 0) - oldTransfer - oldTransfer = tr?.value ?? 0 - }) - .catch((err) => { - console.log(err) - }) - }) - .catch((err) => { - console.log(err) + const fetchUrl = endpoint.replace('ws:/', 'http:/') + '/api/v1/statistics' + if (p === undefined) { + p = fetch(fetchUrl, { + headers: { + Authorization: 'Bearer ' + token + }, + keepalive: true }) + .then((res) => { + void res + .json() + .then((json) => { + memUsed = json.statistics.memoryUsed + memTotal = json.statistics.memoryTotal + memRSS = json.statistics.memoryRSS + memArrays = json.statistics.memoryArrayBuffers + cpu = json.statistics.cpuUsage + // operations = 0 + requestTime = 0 + // transfer = 0 + const r = extract( + json.metrics as Metrics, + '🧲 session', + 'client', + 'handleRequest', + 'process', + 'find-all' + ) + operations = (r?.operations ?? 0) - oldOperations + oldOperations = r?.operations ?? 0 + + requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1) + + const tr = extract(json.metrics as Metrics, '🧲 session', 'client', '#send-data') + transfer = (tr?.value ?? 0) - oldTransfer + oldTransfer = tr?.value ?? 0 + p = undefined + }) + .catch((err) => { + console.log(err) + p = undefined + }) + }) + .catch((err) => { + console.log(err) + p = undefined + }) + } } catch (err) { console.log(err) } @@ -285,7 +311,10 @@ export async function benchmark ( moment, average: Math.round(opTime / (ops + 1)), mem: memUsed, + memRSS, memTotal, + memFree, + memArrays, cpu, requestTime, operations, @@ -360,7 +389,9 @@ export function benchmarkWorker (): void { if (!isMainThread) { parentPort?.on('message', (msg: StartMessage) => { console.log('starting worker', msg.workId) - void perform(msg) + void perform(msg).catch((err) => { + console.error('failed to perform', err) + }) }) } diff --git a/plugins/login-resources/src/components/AdminWorkspaces.svelte b/plugins/login-resources/src/components/AdminWorkspaces.svelte index 2d91fc4764..a127765c9b 100644 --- a/plugins/login-resources/src/components/AdminWorkspaces.svelte +++ b/plugins/login-resources/src/components/AdminWorkspaces.svelte @@ -12,13 +12,14 @@ type BaseWorkspaceInfo } from '@hcengineering/core' import { getEmbeddedLabel } from '@hcengineering/platform' - import { isAdminUser, MessageBox } from '@hcengineering/presentation' + import { copyTextToClipboard, isAdminUser, MessageBox } from '@hcengineering/presentation' import { Button, ButtonMenu, CheckBox, Expandable, IconArrowRight, + IconCopy, IconOpen, IconStart, IconStop, @@ -383,14 +384,21 @@
- {wsName} -
+
+
+
+ {wsName}
{workspace.createdBy} diff --git a/pods/stats/src/stats.ts b/pods/stats/src/stats.ts index 8a4bce1cac..46bf6a31fe 100644 --- a/pods/stats/src/stats.ts +++ b/pods/stats/src/stats.ts @@ -19,7 +19,7 @@ import Koa from 'koa' import bodyParser from 'koa-bodyparser' import Router from 'koa-router' -const serviceTimeout = 30000 +const serviceTimeout = 5 * 60000 interface ServiceStatisticsEx extends ServiceStatistics { lastUpdate: number // Last updated diff --git a/server/account-service/src/index.ts b/server/account-service/src/index.ts index 884289f5e9..9cb7df7832 100644 --- a/server/account-service/src/index.ts +++ b/server/account-service/src/index.ts @@ -138,7 +138,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap router.get('/api/v1/statistics', (req, res) => { try { - const token = req.query.token as string + const token = (req.query.token as string) ?? extractToken(req.headers) const payload = decodeToken(token) const admin = payload.extra?.admin === 'true' const data: Record = { @@ -146,8 +146,11 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap statistics: {} } data.statistics.totalClients = 0 - data.statistics.memoryUsed = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100 - data.statistics.memoryTotal = Math.round((process.memoryUsage().heapTotal / 1024 / 1024) * 100) / 100 + const mem = process.memoryUsage() + data.statistics.memoryUsed = Math.round((mem.heapUsed / 1024 / 1024) * 100) / 100 + data.statistics.memoryTotal = Math.round((mem.heapTotal / 1024 / 1024) * 100) / 100 + data.statistics.memoryRSS = Math.round((mem.rss / 1024 / 1024) * 100) / 100 + data.statistics.memoryArrayBuffers = Math.round((mem.arrayBuffers / 1024 / 1024) * 100) / 100 data.statistics.cpuUsage = Math.round(os.loadavg()[0] * 100) / 100 data.statistics.freeMem = Math.round((os.freemem() / 1024 / 1024) * 100) / 100 data.statistics.totalMem = Math.round((os.totalmem() / 1024 / 1024) * 100) / 100 diff --git a/server/core/src/base.ts b/server/core/src/base.ts index 571764cba4..070d1d5cff 100644 --- a/server/core/src/base.ts +++ b/server/core/src/base.ts @@ -57,7 +57,7 @@ export abstract class BaseMiddleware implements Middleware { return this.provideFindAll(ctx, _class, query, options) } - loadModel ( + provideLoadModel ( ctx: MeasureContext, lastModelTx: Timestamp, hash?: string @@ -65,6 +65,14 @@ export abstract class BaseMiddleware implements Middleware { return this.next?.loadModel(ctx, lastModelTx, hash) ?? emptyModelResult } + loadModel ( + ctx: MeasureContext, + lastModelTx: Timestamp, + hash?: string + ): Promise { + return this.provideLoadModel(ctx, lastModelTx, hash) + } + provideGroupBy( ctx: MeasureContext, domain: Domain, diff --git a/server/core/src/stats.ts b/server/core/src/stats.ts index 959155a35f..4f1853dff0 100644 --- a/server/core/src/stats.ts +++ b/server/core/src/stats.ts @@ -7,6 +7,8 @@ import os from 'os' export interface MemoryStatistics { memoryUsed: number memoryTotal: number + + memoryArrayBuffers: number memoryRSS: number freeMem: number totalMem: number @@ -56,6 +58,7 @@ export function getMemoryInfo (): MemoryStatistics { memoryUsed: Math.round((memU.heapUsed / 1024 / 1024) * 100) / 100, memoryRSS: Math.round((memU.rss / 1024 / 1024) * 100) / 100, memoryTotal: Math.round((memU.heapTotal / 1024 / 1024) * 100) / 100, + memoryArrayBuffers: Math.round((memU.arrayBuffers / 1024 / 1024) * 100) / 100, freeMem: Math.round((os.freemem() / 1024 / 1024) * 100) / 100, totalMem: Math.round((os.totalmem() / 1024 / 1024) * 100) / 100 } @@ -103,6 +106,7 @@ export function initStatisticsContext ( let oldMetricsValue = '' const serviceId = encodeURIComponent(os.hostname() + '-' + serviceName) + let prev: Promise | undefined const handleError = (err: any): void => { errorToSend++ if (errorToSend % 2 === 0) { @@ -110,6 +114,7 @@ export function initStatisticsContext ( console.error(err) } } + prev = undefined } const intTimer = setInterval(() => { @@ -128,6 +133,10 @@ export function initStatisticsContext ( } } } + if (prev !== undefined) { + // In case of high load, skip + return + } if (statsUrl !== undefined) { const token = generateToken(systemAccountEmail, { name: '' }, { service: 'true' }) const data: ServiceStatistics = { @@ -140,7 +149,7 @@ export function initStatisticsContext ( const statData = JSON.stringify(data) - void fetch( + prev = fetch( concatLink(statsUrl, '/api/v1/statistics') + `/?token=${encodeURIComponent(token)}&name=${serviceId}`, { method: 'PUT', @@ -149,7 +158,11 @@ export function initStatisticsContext ( }, body: statData } - ).catch(handleError) + ) + .catch(handleError) + .then(() => { + prev = undefined + }) } } catch (err: any) { handleError(err) diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 53e0db939d..af0053c2bd 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -591,13 +591,15 @@ export interface ConnectionSocket { id: string isClosed: boolean close: () => void - send: (ctx: MeasureContext, msg: Response, binary: boolean, compression: boolean) => void + send: (ctx: MeasureContext, msg: Response, binary: boolean, compression: boolean) => Promise sendPong: () => void data: () => Record readRequest: (buffer: Buffer, binary: boolean) => Request + isBackpressure: () => boolean // In bytes + backpressure: (ctx: MeasureContext) => Promise checkState: () => boolean } @@ -715,6 +717,7 @@ export interface SessionManager { createOpContext: ( ctx: MeasureContext, + sendCtx: MeasureContext, pipeline: Pipeline, requestId: Request['id'], service: Session, diff --git a/server/front/src/index.ts b/server/front/src/index.ts index 904c2599c1..95cd2e433e 100644 --- a/server/front/src/index.ts +++ b/server/front/src/index.ts @@ -178,8 +178,8 @@ async function getFile ( etag: stat.etag, 'last-modified': new Date(stat.modifiedOn).toISOString(), 'cache-control': cacheControlValue, - Connection: 'keep-alive', - 'Keep-Alive': 'timeout=5' + connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' }) res.end() return @@ -191,8 +191,8 @@ async function getFile ( etag: stat.etag, 'last-modified': new Date(stat.modifiedOn).toISOString(), 'cache-control': cacheControlValue, - Connection: 'keep-alive', - 'Keep-Alive': 'timeout=5' + connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' }) res.end() return @@ -211,8 +211,8 @@ async function getFile ( Etag: stat.etag, 'Last-Modified': new Date(stat.modifiedOn).toISOString(), 'Cache-Control': cacheControlValue, - Connection: 'keep-alive', - 'Keep-Alive': 'timeout=5' + connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' }) dataStream.pipe(res) @@ -442,6 +442,7 @@ export function start ( if (req.method === 'HEAD') { res.writeHead(200, { 'accept-ranges': 'bytes', + connection: 'keep-alive', 'Keep-Alive': 'timeout=5', 'content-length': blobInfo.size, 'content-security-policy': "default-src 'none';", diff --git a/server/middleware/src/domainFind.ts b/server/middleware/src/domainFind.ts index cf73fdf46b..6678bd2408 100644 --- a/server/middleware/src/domainFind.ts +++ b/server/middleware/src/domainFind.ts @@ -18,6 +18,7 @@ import { type Doc, type DocumentQuery, type Domain, + type FindOptions, type FindResult, type MeasureContext, type Ref, @@ -44,6 +45,11 @@ export class DomainFindMiddleware extends BaseMiddleware implements Middleware { return middleware } + toPrintableOptions (options?: ServerFindOptions): FindOptions { + const { ctx, allowedSpaces, associations, ...opt } = options ?? {} + return opt + } + findAll( ctx: MeasureContext, _class: Ref>, @@ -65,7 +71,7 @@ export class DomainFindMiddleware extends BaseMiddleware implements Middleware { (ctx) => { return this.adapterManager.getAdapter(domain, false).findAll(ctx, _class, query, options) }, - { _class, query, options } + { _class, query, options: this.toPrintableOptions(options) } ) } diff --git a/server/middleware/src/queryJoin.ts b/server/middleware/src/queryJoin.ts index 6cc71787cc..25090fb92b 100644 --- a/server/middleware/src/queryJoin.ts +++ b/server/middleware/src/queryJoin.ts @@ -17,19 +17,23 @@ import { type Class, type Doc, DocumentQuery, - FindOptions, + type Domain, FindResult, + type LoadModelResponse, type MeasureContext, - Ref + Ref, + type SearchOptions, + type SearchQuery, + type SearchResult, + type SessionData, + type Timestamp, + type Tx } from '@hcengineering/core' -import { BaseMiddleware, Middleware, ServerFindOptions, type PipelineContext } from '@hcengineering/server-core' -import { deepEqual } from 'fast-equals' +import { BaseMiddleware, Middleware, type PipelineContext, ServerFindOptions } from '@hcengineering/server-core' interface Query { - _class: Ref> - query: DocumentQuery - result: FindResult | Promise> | undefined - options?: FindOptions + key: string + result: object | Promise | undefined callbacks: number max: number } @@ -37,27 +41,20 @@ interface Query { * @public */ export class QueryJoiner { - private readonly queries: Map>, Query[]> = new Map>, Query[]>() + private readonly queries: Map = new Map() - constructor (readonly _findAll: Middleware['findAll']) {} - - async findAll( - ctx: MeasureContext, - _class: Ref>, - query: DocumentQuery, - options?: ServerFindOptions - ): Promise> { + async query(ctx: MeasureContext, key: string, retrieve: (ctx: MeasureContext) => Promise): Promise { // Will find a query or add + 1 to callbacks - const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options) + const q = this.getQuery(key) try { if (q.result === undefined) { - q.result = this._findAll(ctx, _class, query, options) + q.result = retrieve(ctx) } if (q.result instanceof Promise) { q.result = await q.result } - return q.result as FindResult + return q.result as T } finally { q.callbacks-- @@ -65,46 +62,27 @@ export class QueryJoiner { } } - private findQuery( - _class: Ref>, - query: DocumentQuery, - options?: FindOptions - ): Query | undefined { - const queries = this.queries.get(_class) - if (queries === undefined) return - for (const q of queries) { - if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) { - continue + private getQuery (key: string): Query { + const query = this.queries.get(key) + if (query === undefined) { + const q: Query = { + key, + result: undefined, + callbacks: 1, + max: 1 } - q.callbacks++ - q.max++ + this.queries.set(key, q) return q } - } - private createQuery(_class: Ref>, query: DocumentQuery, options?: FindOptions): Query { - const queries = this.queries.get(_class) ?? [] - const q: Query = { - _class, - query, - result: undefined, - options: options as FindOptions, - callbacks: 1, - max: 1 - } - - queries.push(q) - this.queries.set(_class, queries) - return q + query.callbacks++ + query.max++ + return query } private removeFromQueue (q: Query): void { if (q.callbacks === 0) { - const queries = this.queries.get(q._class) ?? [] - this.queries.set( - q._class, - queries.filter((it) => it !== q) - ) + this.queries.delete(q.key) } } } @@ -117,8 +95,16 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware { private constructor (context: PipelineContext, next?: Middleware) { super(context, next) - this.joiner = new QueryJoiner((ctx, _class, query, options) => { - return this.provideFindAll(ctx, _class, query, options) + this.joiner = new QueryJoiner() + } + + loadModel ( + ctx: MeasureContext, + lastModelTx: Timestamp, + hash?: string + ): Promise { + return this.joiner.query(ctx, `model-${lastModelTx}${hash ?? ''}`, async (ctx) => { + return await this.provideLoadModel(ctx, lastModelTx, hash) }) } @@ -136,7 +122,31 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware { query: DocumentQuery, options?: ServerFindOptions ): Promise> { - // Will find a query or add + 1 to callbacks - return this.joiner.findAll(ctx, _class, query, options) + const opt = { ...options } + delete opt.ctx + return this.joiner.query( + ctx, + `findAll-${_class}-${JSON.stringify(query)}-${JSON.stringify(options)}`, + async (ctx) => { + return await this.provideFindAll(ctx, _class, query, options) + } + ) + } + + groupBy( + ctx: MeasureContext, + domain: Domain, + field: string, + query?: DocumentQuery

+ ): Promise> { + return this.joiner.query(ctx, `groupBy-${domain}-${field}-${JSON.stringify(query ?? {})})`, async (ctx) => { + return await this.provideGroupBy(ctx, domain, field, query) + }) + } + + searchFulltext (ctx: MeasureContext, query: SearchQuery, options: SearchOptions): Promise { + return this.joiner.query(ctx, `searchFulltext-${JSON.stringify(query)}-${JSON.stringify(options)}`, async (ctx) => { + return await this.provideSearchFulltext(ctx, query, options) + }) } } diff --git a/server/middleware/src/tests/queryJoiner.spec.ts b/server/middleware/src/tests/queryJoiner.spec.ts index e6402ec15f..b289cd8850 100644 --- a/server/middleware/src/tests/queryJoiner.spec.ts +++ b/server/middleware/src/tests/queryJoiner.spec.ts @@ -12,14 +12,13 @@ describe('test query joiner', () => { }) return toFindResult([]) } - const joiner = new QueryJoiner(findT) + const joiner = new QueryJoiner() const ctx = new MeasureMetricsContext('test', {}) - const p1 = joiner.findAll(ctx, core.class.Class, {}) - const p2 = joiner.findAll(ctx, core.class.Class, {}) + const p1 = joiner.query(ctx, core.class.Class, (ctx) => findT(ctx, core.class.Class, {})) + const p2 = joiner.query(ctx, core.class.Class, (ctx) => findT(ctx, core.class.Class, {})) await Promise.all([p1, p2]) expect(count).toBe(1) - expect((joiner as any).queries.size).toBe(1) - expect((joiner as any).queries.get(core.class.Class).length).toBe(0) + expect((joiner as any).queries.size).toBe(0) }) }) diff --git a/server/server/src/blobs.ts b/server/server/src/blobs.ts index c55660b3a4..3bf152afb7 100644 --- a/server/server/src/blobs.ts +++ b/server/server/src/blobs.ts @@ -42,6 +42,8 @@ export async function getFile ( res.writeHead(200, { 'Content-Type': stat.contentType, Etag: stat.etag, + connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000', 'Last-Modified': new Date(stat.modifiedOn).toISOString(), 'Cache-Control': cacheControlNoCache }) @@ -118,7 +120,9 @@ export async function getFileRange ( if (start >= size) { res.cork(() => { res.writeHead(416, { - 'Content-Range': `bytes */${size}` + 'Content-Range': `bytes */${size}`, + connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' }) res.end() }) @@ -139,9 +143,10 @@ export async function getFileRange ( await new Promise((resolve, reject) => { res.cork(() => { res.writeHead(206, { - Connection: 'keep-alive', 'Content-Range': `bytes ${start}-${end}/${size}`, 'Accept-Ranges': 'bytes', + connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000', // 'Content-Length': end - start + 1, 'Content-Type': stat.contentType, Etag: stat.etag, diff --git a/server/server/src/client.ts b/server/server/src/client.ts index 875ad702f4..7bc77f0cab 100644 --- a/server/server/src/client.ts +++ b/server/server/src/client.ts @@ -234,7 +234,7 @@ export class ClientSession implements Session { } } const bevent = createBroadcastEvent(Array.from(classes)) - socket.send( + void socket.send( ctx, { result: [bevent] @@ -243,7 +243,7 @@ export class ClientSession implements Session { this.useCompression ) } else { - socket.send(ctx, { result: tx }, this.binaryMode, this.useCompression) + void socket.send(ctx, { result: tx }, this.binaryMode, this.useCompression) } } diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 500eb3f5d8..c4dafb3f87 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -237,7 +237,7 @@ export class TSessionManager implements SessionManager { // And ping other wize s[1].session.lastPing = now if (s[1].socket.checkState()) { - s[1].socket.send( + void s[1].socket.send( workspace.context, { result: pingConst }, s[1].session.binaryMode, @@ -504,7 +504,7 @@ export class TSessionManager implements SessionManager { } if (this.timeMinutes > 0) { - ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression) + void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression) } return { session, context: workspace.context, workspaceId: wsString } } @@ -884,7 +884,7 @@ export class TSessionManager implements SessionManager { } private sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean, compression: boolean): void { - webSocket.send( + void webSocket.send( ctx, { result: { @@ -951,6 +951,7 @@ export class TSessionManager implements SessionManager { createOpContext ( ctx: MeasureContext, + sendCtx: MeasureContext, pipeline: Pipeline, requestId: Request['id'], service: Session, @@ -962,7 +963,7 @@ export class TSessionManager implements SessionManager { pipeline, requestId, sendResponse: (reqId, msg) => - sendResponse(ctx, service, ws, { + sendResponse(sendCtx, service, ws, { id: reqId, result: msg, time: Date.now() - st, @@ -973,7 +974,7 @@ export class TSessionManager implements SessionManager { ws.sendPong() }, sendError: (reqId, msg, error: Status) => - sendResponse(ctx, service, ws, { + sendResponse(sendCtx, service, ws, { id: reqId, result: msg, error, @@ -1004,7 +1005,7 @@ export class TSessionManager implements SessionManager { requestCtx.measure('msg-receive-delta', delta) } if (service.workspace.closing !== undefined) { - ws.send( + await ws.send( ctx, { id: request.id, @@ -1033,7 +1034,7 @@ export class TSessionManager implements SessionManager { id: request.id, result: done } - ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression) + await ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression) return } @@ -1054,16 +1055,20 @@ export class TSessionManager implements SessionManager { try { const params = [...request.params] + if (ws.isBackpressure()) { + await ws.backpressure(ctx) + } + await ctx.with('🧨 process', {}, (callTx) => - f.apply(service, [this.createOpContext(callTx, pipeline, request.id, service, ws), ...params]) + f.apply(service, [this.createOpContext(callTx, userCtx, pipeline, request.id, service, ws), ...params]) ) } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { this.ctx.error('error handle request', { error: err, request }) } - ws.send( - ctx, + await ws.send( + userCtx, { id: request.id, error: unknownError(err), @@ -1108,15 +1113,15 @@ export class TSessionManager implements SessionManager { service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline try { - const uctx = this.createOpContext(ctx, pipeline, reqId, service, ws) + const uctx = this.createOpContext(ctx, userCtx, pipeline, reqId, service, ws) await operation(uctx) } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { this.ctx.error('error handle request', { error: err }) } - ws.send( - ctx, + await ws.send( + userCtx, { id: reqId, error: unknownError(err), @@ -1174,7 +1179,7 @@ export class TSessionManager implements SessionManager { account: service.getRawAccount(pipeline), useCompression: service.useCompression } - ws.send(requestCtx, helloResponse, false, false) + await ws.send(requestCtx, helloResponse, false, false) // We do not need to wait for set-status, just return session to client const _workspace = service.workspace diff --git a/server/server/src/stats.ts b/server/server/src/stats.ts index b77d2c506c..cc7c3ae280 100644 --- a/server/server/src/stats.ts +++ b/server/server/src/stats.ts @@ -43,6 +43,8 @@ export function getStatistics (ctx: MeasureContext, sessions: SessionManager, ad const memU = process.memoryUsage() data.statistics.memoryUsed = Math.round(((memU.heapUsed + memU.rss) / 1024 / 1024) * 100) / 100 data.statistics.memoryTotal = Math.round((memU.heapTotal / 1024 / 1024) * 100) / 100 + data.statistics.memoryRSS = Math.round((memU.rss / 1024 / 1024) * 100) / 100 + data.statistics.memoryArrayBuffers = Math.round((memU.arrayBuffers / 1024 / 1024) * 100) / 100 data.statistics.cpuUsage = Math.round(os.loadavg()[0] * 100) / 100 data.statistics.freeMem = Math.round((os.freemem() / 1024 / 1024) * 100) / 100 data.statistics.totalMem = Math.round((os.totalmem() / 1024 / 1024) * 100) / 100 diff --git a/server/server/src/utils.ts b/server/server/src/utils.ts index 429029d659..fe44d5402e 100644 --- a/server/server/src/utils.ts +++ b/server/server/src/utils.ts @@ -85,6 +85,5 @@ export function sendResponse ( socket: ConnectionSocket, resp: Response ): Promise { - socket.send(ctx, resp, session.binaryMode, session.useCompression) - return Promise.resolve() + return socket.send(ctx, resp, session.binaryMode, session.useCompression) } diff --git a/server/ws/src/rpc.ts b/server/ws/src/rpc.ts index 2d2685ff5f..e633cb0fc8 100644 --- a/server/ws/src/rpc.ts +++ b/server/ws/src/rpc.ts @@ -26,6 +26,7 @@ const sendError = (res: ExpressResponse, code: number, data: any): void => { res.writeHead(code, { 'Content-Type': 'application/json', 'Cache-Control': 'no-cache', + Connection: 'keep-alive', 'keep-alive': 'timeout=5, max=1000' }) res.end(JSON.stringify(data)) @@ -35,6 +36,7 @@ async function sendJson (req: Request, res: ExpressResponse, result: any): Promi const headers: OutgoingHttpHeaders = { 'Content-Type': 'application/json', 'Cache-Control': 'no-cache', + Connection: 'keep-alive', 'keep-alive': 'timeout=5, max=1000' } let body: any = JSON.stringify(result) @@ -173,7 +175,9 @@ function createClosingSocket (rawToken: string, rpcSessions: Map { rpcSessions.delete(rawToken) }, - send: (ctx, msg, binary, compression) => {}, + send: async (ctx, msg, binary, compression) => {}, + isBackpressure: () => false, + backpressure: async (ctx) => {}, sendPong: () => {}, data: () => ({}), readRequest: (buffer, binary) => ({ method: '', params: [], id: -1, time: Date.now() }), diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 82609f67d5..d9961c4806 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -50,8 +50,12 @@ import 'utf-8-validate' import { registerRPC } from './rpc' import { retrieveJson } from './utils' +import { setImmediate } from 'timers/promises' + let profiling = false const rpcHandler = new RPCHandler() + +const backpressureSize = 100 * 1024 /** * @public * @param sessionFactory - @@ -81,7 +85,11 @@ export function startHttpServer ( const getUsers = (): any => Array.from(sessions.sessions.entries()).map(([k, v]) => v.session.getUser()) app.get('/api/v1/version', (req, res) => { - res.writeHead(200, { 'Content-Type': 'application/json' }) + res.writeHead(200, { + 'Content-Type': 'application/json', + Connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' + }) res.end( JSON.stringify({ version: process.env.MODEL_VERSION @@ -91,7 +99,7 @@ export function startHttpServer ( app.get('/api/v1/statistics', (req, res) => { try { - const token = req.query.token as string + const token = (req.query.token as string) ?? (req.headers.authorization ?? '').split(' ')[1] const payload = decodeToken(token) const admin = payload.extra?.admin === 'true' const jsonData = { @@ -101,7 +109,11 @@ export function startHttpServer ( profiling } const json = JSON.stringify(jsonData) - res.writeHead(200, { 'Content-Type': 'application/json' }) + res.writeHead(200, { + 'Content-Type': 'application/json', + Connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' + }) res.end(json) } catch (err: any) { Analytics.handleError(err) @@ -130,7 +142,7 @@ export function startHttpServer ( }) app.put('/api/v1/manage', (req, res) => { try { - const token = req.query.token as string + const token = (req.query.token as string) ?? (req.headers.authorization ?? '').split(' ')[1] const payload = decodeToken(token) if (payload.extra?.admin !== 'true' && payload.email !== systemAccountEmail) { console.warn('Non admin attempt to maintenance action', { payload }) @@ -246,7 +258,11 @@ export function startHttpServer ( { file: name, contentType } ) .then(() => { - res.writeHead(200, { 'Cache-Control': 'no-cache' }) + res.writeHead(200, { + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'keep-alive': 'timeout=5, max=1000' + }) res.end() }) .catch((err) => { @@ -373,7 +389,7 @@ export function startHttpServer ( void webSocketData.session.then((s) => { if ('error' in s) { if (s.specialError === 'archived') { - cs.send( + void cs.send( ctx, { id: -1, @@ -386,7 +402,7 @@ export function startHttpServer ( false ) } else if (s.specialError === 'migration') { - cs.send( + void cs.send( ctx, { id: -1, @@ -399,7 +415,7 @@ export function startHttpServer ( false ) } else { - cs.send( + void cs.send( ctx, { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate }, false, @@ -412,7 +428,7 @@ export function startHttpServer ( }, 1000) } if ('upgrade' in s) { - cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) + void cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) setTimeout(() => { cs.close() }, 5000) @@ -557,6 +573,17 @@ function createWebsocketClientSocket ( ws.close() ws.terminate() }, + isBackpressure: () => ws.bufferedAmount > backpressureSize, + backpressure: async (ctx) => { + if (ws.bufferedAmount < backpressureSize) { + return + } + await ctx.with('backpressure', {}, async () => { + while (ws.bufferedAmount > backpressureSize) { + await setImmediate() + } + }) + }, checkState: () => { if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) { ws.terminate() @@ -577,7 +604,7 @@ function createWebsocketClientSocket ( } ws.send(pongConst) }, - send: (ctx: MeasureContext, msg, binary, _compression) => { + send: async (ctx: MeasureContext, msg, binary, _compression): Promise => { const smsg = rpcHandler.serialize(msg, binary) ctx.measure('send-data', smsg.length) @@ -586,23 +613,28 @@ function createWebsocketClientSocket ( return } - const handleErr = (err?: Error): void => { - ctx.measure('msg-send-delta', Date.now() - st) - if (err != null) { - if (!`${err.message}`.includes('WebSocket is not open')) { - ctx.error('send error', { err }) - Analytics.handleError(err) - } - } + // We need to be sure all data is send before we will send more. + if (cs.isBackpressure()) { + await cs.backpressure(ctx) } + let sendMsg = smsg if (_compression) { - void compress(smsg).then((msg: any) => { - ws.send(msg, { binary: true }, handleErr) - }) - } else { - ws.send(smsg, { binary: true }, handleErr) + sendMsg = await compress(smsg) } + await new Promise((resolve) => { + const handleErr = (err?: Error): void => { + ctx.measure('msg-send-delta', Date.now() - st) + if (err != null) { + if (!`${err.message}`.includes('WebSocket is not open')) { + ctx.error('send error', { err }) + Analytics.handleError(err) + } + } + resolve() // In any case we need to resolve. + } + ws.send(sendMsg, { binary: true }, handleErr) + }) } } return cs diff --git a/workers/transactor/src/transactor.ts b/workers/transactor/src/transactor.ts index 1b516c265e..ebad370d06 100644 --- a/workers/transactor/src/transactor.ts +++ b/workers/transactor/src/transactor.ts @@ -300,7 +300,7 @@ export class Transactor extends DurableObject { throw session.error } if ('upgrade' in session) { - cs.send( + await cs.send( this.measureCtx, { id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } }, false, @@ -352,6 +352,8 @@ export class Transactor extends DurableObject { } return true }, + backpressure: async (ctx) => {}, + isBackpressure: () => false, readRequest: (buffer: Buffer, binary: boolean) => { if (buffer.length === pingConst.length) { if (buffer.toString() === pingConst) { @@ -361,7 +363,7 @@ export class Transactor extends DurableObject { return rpcHandler.readRequest(buffer, binary) }, data: () => data, - send: (ctx: MeasureContext, msg, binary, _compression) => { + send: async (ctx: MeasureContext, msg, binary, _compression) => { let smsg = rpcHandler.serialize(msg, binary) ctx.measure('send-data', smsg.length) @@ -435,7 +437,9 @@ export class Transactor extends DurableObject { data: () => { return {} }, - send: (ctx: MeasureContext, msg, binary, compression) => {}, + isBackpressure: () => false, + backpressure: async (ctx) => {}, + send: async (ctx: MeasureContext, msg, binary, compression) => {}, sendPong: () => {} } return cs diff --git a/ws-tests/tool.sh b/ws-tests/tool.sh index 779df41367..c5003d0c73 100755 --- a/ws-tests/tool.sh +++ b/ws-tests/tool.sh @@ -13,4 +13,4 @@ export ELASTIC_URL=http://localhost:9201 export SERVER_SECRET=secret export DB_URL=postgresql://root@localhost:26258/defaultdb?sslmode=disable -node ${TOOL_OPTIONS} ../dev/tool/bundle/bundle.js $@ \ No newline at end of file +node ${TOOL_OPTIONS} --max-old-space-size=8096 ../dev/tool/bundle/bundle.js $@ \ No newline at end of file