UBERF-9522: Fix memory backpressure (#8098)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-02-26 15:13:24 +07:00 committed by GitHub
parent c2f8eaad12
commit 9c460d6388
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 298 additions and 165 deletions

2
.vscode/launch.json vendored
View File

@ -91,7 +91,7 @@
"FRONT_URL": "http://localhost:8083",
"ACCOUNTS_URL": "http://localhost:3003",
"MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json",
"MODEL_VERSION": "0.6.435",
"MODEL_VERSION": "0.6.436",
"STATS_URL": "http://host.docker.internal:4901"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],

View File

@ -135,6 +135,9 @@ export async function benchmark (
}
}
})
worker.on('error', (err) => {
console.error('worker error', err)
})
})
const m = newMetrics()
@ -147,6 +150,9 @@ export async function benchmark (
moment: number
mem: number
memTotal: number
memRSS: number
memFree: number
memArrays: number
cpu: number
requestTime: number
operations: number
@ -158,6 +164,9 @@ export async function benchmark (
moment: 'Moment Time',
mem: 'Mem',
memTotal: 'Mem total',
memRSS: 'Mem RSS',
memFree: 'Mem Free',
memArrays: 'Mem Arrays',
cpu: 'CPU',
requestTime: 'Request time',
operations: 'OPS',
@ -170,6 +179,9 @@ export async function benchmark (
let cpu: number = 0
let memUsed: number = 0
let memTotal: number = 0
let memRSS: number = 0
const memFree: number = 0
let memArrays: number = 0
let elapsed = 0
let requestTime: number = 0
let operations = 0
@ -204,6 +216,7 @@ export async function benchmark (
}
}
if (!found) {
console.log('no measurements found for path', path, p)
return null
}
}
@ -211,47 +224,60 @@ export async function benchmark (
}
let timer: any
let p: Promise<void> | undefined
if (isMainThread && monitorConnection !== undefined) {
timer = setInterval(() => {
const st = Date.now()
const st = performance.now()
try {
const fetchUrl = endpoint.replace('ws:/', 'http:/') + '/api/v1/statistics?token=' + token
void fetch(fetchUrl)
.then((res) => {
void res
.json()
.then((json) => {
memUsed = json.statistics.memoryUsed
memTotal = json.statistics.memoryTotal
cpu = json.statistics.cpuUsage
// operations = 0
requestTime = 0
// transfer = 0
const r = extract(
json.metrics as Metrics,
'🧲 session',
'client',
'handleRequest',
'process',
'find-all'
)
operations = (r?.operations ?? 0) - oldOperations
oldOperations = r?.operations ?? 0
requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
const tr = extract(json.metrics as Metrics, '🧲 session', '#send-data')
transfer = (tr?.value ?? 0) - oldTransfer
oldTransfer = tr?.value ?? 0
})
.catch((err) => {
console.log(err)
})
})
.catch((err) => {
console.log(err)
const fetchUrl = endpoint.replace('ws:/', 'http:/') + '/api/v1/statistics'
if (p === undefined) {
p = fetch(fetchUrl, {
headers: {
Authorization: 'Bearer ' + token
},
keepalive: true
})
.then((res) => {
void res
.json()
.then((json) => {
memUsed = json.statistics.memoryUsed
memTotal = json.statistics.memoryTotal
memRSS = json.statistics.memoryRSS
memArrays = json.statistics.memoryArrayBuffers
cpu = json.statistics.cpuUsage
// operations = 0
requestTime = 0
// transfer = 0
const r = extract(
json.metrics as Metrics,
'🧲 session',
'client',
'handleRequest',
'process',
'find-all'
)
operations = (r?.operations ?? 0) - oldOperations
oldOperations = r?.operations ?? 0
requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
const tr = extract(json.metrics as Metrics, '🧲 session', 'client', '#send-data')
transfer = (tr?.value ?? 0) - oldTransfer
oldTransfer = tr?.value ?? 0
p = undefined
})
.catch((err) => {
console.log(err)
p = undefined
})
})
.catch((err) => {
console.log(err)
p = undefined
})
}
} catch (err) {
console.log(err)
}
@ -285,7 +311,10 @@ export async function benchmark (
moment,
average: Math.round(opTime / (ops + 1)),
mem: memUsed,
memRSS,
memTotal,
memFree,
memArrays,
cpu,
requestTime,
operations,
@ -360,7 +389,9 @@ export function benchmarkWorker (): void {
if (!isMainThread) {
parentPort?.on('message', (msg: StartMessage) => {
console.log('starting worker', msg.workId)
void perform(msg)
void perform(msg).catch((err) => {
console.error('failed to perform', err)
})
})
}

View File

@ -12,13 +12,14 @@
type BaseWorkspaceInfo
} from '@hcengineering/core'
import { getEmbeddedLabel } from '@hcengineering/platform'
import { isAdminUser, MessageBox } from '@hcengineering/presentation'
import { copyTextToClipboard, isAdminUser, MessageBox } from '@hcengineering/presentation'
import {
Button,
ButtonMenu,
CheckBox,
Expandable,
IconArrowRight,
IconCopy,
IconOpen,
IconStart,
IconStop,
@ -383,14 +384,21 @@
<div class="flex fs-title cursor-pointer focused-button bordered" id={`${workspace.workspace}`}>
<div class="flex p-2">
<span class="label overflow-label flex-row-center" style:width={'12rem'}>
{wsName}
<div class="ml-1">
<div class="mr-1">
<Button
icon={IconOpen}
size={'small'}
on:click={() => select(workspace.workspaceUrl ?? workspace.workspace)}
/>
</div>
<div class="mr-1">
<Button
icon={IconCopy}
size={'small'}
on:click={() => copyTextToClipboard(workspace.workspace)}
/>
</div>
{wsName}
</span>
<div class="ml-1" style:width={'12rem'}>
{workspace.createdBy}

View File

@ -19,7 +19,7 @@ import Koa from 'koa'
import bodyParser from 'koa-bodyparser'
import Router from 'koa-router'
const serviceTimeout = 30000
const serviceTimeout = 5 * 60000
interface ServiceStatisticsEx extends ServiceStatistics {
lastUpdate: number // Last updated

View File

@ -138,7 +138,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap
router.get('/api/v1/statistics', (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? extractToken(req.headers)
const payload = decodeToken(token)
const admin = payload.extra?.admin === 'true'
const data: Record<string, any> = {
@ -146,8 +146,11 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap
statistics: {}
}
data.statistics.totalClients = 0
data.statistics.memoryUsed = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100
data.statistics.memoryTotal = Math.round((process.memoryUsage().heapTotal / 1024 / 1024) * 100) / 100
const mem = process.memoryUsage()
data.statistics.memoryUsed = Math.round((mem.heapUsed / 1024 / 1024) * 100) / 100
data.statistics.memoryTotal = Math.round((mem.heapTotal / 1024 / 1024) * 100) / 100
data.statistics.memoryRSS = Math.round((mem.rss / 1024 / 1024) * 100) / 100
data.statistics.memoryArrayBuffers = Math.round((mem.arrayBuffers / 1024 / 1024) * 100) / 100
data.statistics.cpuUsage = Math.round(os.loadavg()[0] * 100) / 100
data.statistics.freeMem = Math.round((os.freemem() / 1024 / 1024) * 100) / 100
data.statistics.totalMem = Math.round((os.totalmem() / 1024 / 1024) * 100) / 100

View File

@ -57,7 +57,7 @@ export abstract class BaseMiddleware implements Middleware {
return this.provideFindAll(ctx, _class, query, options)
}
loadModel (
provideLoadModel (
ctx: MeasureContext<SessionData>,
lastModelTx: Timestamp,
hash?: string
@ -65,6 +65,14 @@ export abstract class BaseMiddleware implements Middleware {
return this.next?.loadModel(ctx, lastModelTx, hash) ?? emptyModelResult
}
loadModel (
ctx: MeasureContext<SessionData>,
lastModelTx: Timestamp,
hash?: string
): Promise<Tx[] | LoadModelResponse> {
return this.provideLoadModel(ctx, lastModelTx, hash)
}
provideGroupBy<T, P extends Doc>(
ctx: MeasureContext<SessionData>,
domain: Domain,

View File

@ -7,6 +7,8 @@ import os from 'os'
export interface MemoryStatistics {
memoryUsed: number
memoryTotal: number
memoryArrayBuffers: number
memoryRSS: number
freeMem: number
totalMem: number
@ -56,6 +58,7 @@ export function getMemoryInfo (): MemoryStatistics {
memoryUsed: Math.round((memU.heapUsed / 1024 / 1024) * 100) / 100,
memoryRSS: Math.round((memU.rss / 1024 / 1024) * 100) / 100,
memoryTotal: Math.round((memU.heapTotal / 1024 / 1024) * 100) / 100,
memoryArrayBuffers: Math.round((memU.arrayBuffers / 1024 / 1024) * 100) / 100,
freeMem: Math.round((os.freemem() / 1024 / 1024) * 100) / 100,
totalMem: Math.round((os.totalmem() / 1024 / 1024) * 100) / 100
}
@ -103,6 +106,7 @@ export function initStatisticsContext (
let oldMetricsValue = ''
const serviceId = encodeURIComponent(os.hostname() + '-' + serviceName)
let prev: Promise<void> | undefined
const handleError = (err: any): void => {
errorToSend++
if (errorToSend % 2 === 0) {
@ -110,6 +114,7 @@ export function initStatisticsContext (
console.error(err)
}
}
prev = undefined
}
const intTimer = setInterval(() => {
@ -128,6 +133,10 @@ export function initStatisticsContext (
}
}
}
if (prev !== undefined) {
// In case of high load, skip
return
}
if (statsUrl !== undefined) {
const token = generateToken(systemAccountEmail, { name: '' }, { service: 'true' })
const data: ServiceStatistics = {
@ -140,7 +149,7 @@ export function initStatisticsContext (
const statData = JSON.stringify(data)
void fetch(
prev = fetch(
concatLink(statsUrl, '/api/v1/statistics') + `/?token=${encodeURIComponent(token)}&name=${serviceId}`,
{
method: 'PUT',
@ -149,7 +158,11 @@ export function initStatisticsContext (
},
body: statData
}
).catch(handleError)
)
.catch(handleError)
.then(() => {
prev = undefined
})
}
} catch (err: any) {
handleError(err)

View File

@ -591,13 +591,15 @@ export interface ConnectionSocket {
id: string
isClosed: boolean
close: () => void
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => void
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => Promise<void>
sendPong: () => void
data: () => Record<string, any>
readRequest: (buffer: Buffer, binary: boolean) => Request<any>
isBackpressure: () => boolean // In bytes
backpressure: (ctx: MeasureContext) => Promise<void>
checkState: () => boolean
}
@ -715,6 +717,7 @@ export interface SessionManager {
createOpContext: (
ctx: MeasureContext,
sendCtx: MeasureContext,
pipeline: Pipeline,
requestId: Request<any>['id'],
service: Session,

View File

@ -178,8 +178,8 @@ async function getFile (
etag: stat.etag,
'last-modified': new Date(stat.modifiedOn).toISOString(),
'cache-control': cacheControlValue,
Connection: 'keep-alive',
'Keep-Alive': 'timeout=5'
connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end()
return
@ -191,8 +191,8 @@ async function getFile (
etag: stat.etag,
'last-modified': new Date(stat.modifiedOn).toISOString(),
'cache-control': cacheControlValue,
Connection: 'keep-alive',
'Keep-Alive': 'timeout=5'
connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end()
return
@ -211,8 +211,8 @@ async function getFile (
Etag: stat.etag,
'Last-Modified': new Date(stat.modifiedOn).toISOString(),
'Cache-Control': cacheControlValue,
Connection: 'keep-alive',
'Keep-Alive': 'timeout=5'
connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
dataStream.pipe(res)
@ -442,6 +442,7 @@ export function start (
if (req.method === 'HEAD') {
res.writeHead(200, {
'accept-ranges': 'bytes',
connection: 'keep-alive',
'Keep-Alive': 'timeout=5',
'content-length': blobInfo.size,
'content-security-policy': "default-src 'none';",

View File

@ -18,6 +18,7 @@ import {
type Doc,
type DocumentQuery,
type Domain,
type FindOptions,
type FindResult,
type MeasureContext,
type Ref,
@ -44,6 +45,11 @@ export class DomainFindMiddleware extends BaseMiddleware implements Middleware {
return middleware
}
toPrintableOptions (options?: ServerFindOptions<Doc>): FindOptions<Doc> {
const { ctx, allowedSpaces, associations, ...opt } = options ?? {}
return opt
}
findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
@ -65,7 +71,7 @@ export class DomainFindMiddleware extends BaseMiddleware implements Middleware {
(ctx) => {
return this.adapterManager.getAdapter(domain, false).findAll(ctx, _class, query, options)
},
{ _class, query, options }
{ _class, query, options: this.toPrintableOptions(options) }
)
}

View File

@ -17,19 +17,23 @@ import {
type Class,
type Doc,
DocumentQuery,
FindOptions,
type Domain,
FindResult,
type LoadModelResponse,
type MeasureContext,
Ref
Ref,
type SearchOptions,
type SearchQuery,
type SearchResult,
type SessionData,
type Timestamp,
type Tx
} from '@hcengineering/core'
import { BaseMiddleware, Middleware, ServerFindOptions, type PipelineContext } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
import { BaseMiddleware, Middleware, type PipelineContext, ServerFindOptions } from '@hcengineering/server-core'
interface Query {
_class: Ref<Class<Doc>>
query: DocumentQuery<Doc>
result: FindResult<Doc> | Promise<FindResult<Doc>> | undefined
options?: FindOptions<Doc>
key: string
result: object | Promise<object> | undefined
callbacks: number
max: number
}
@ -37,27 +41,20 @@ interface Query {
* @public
*/
export class QueryJoiner {
private readonly queries: Map<Ref<Class<Doc>>, Query[]> = new Map<Ref<Class<Doc>>, Query[]>()
private readonly queries: Map<string, Query> = new Map<string, Query>()
constructor (readonly _findAll: Middleware['findAll']) {}
async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
): Promise<FindResult<T>> {
async query<T>(ctx: MeasureContext, key: string, retrieve: (ctx: MeasureContext) => Promise<T>): Promise<T> {
// Will find a query or add + 1 to callbacks
const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options)
const q = this.getQuery(key)
try {
if (q.result === undefined) {
q.result = this._findAll(ctx, _class, query, options)
q.result = retrieve(ctx)
}
if (q.result instanceof Promise) {
q.result = await q.result
}
return q.result as FindResult<T>
return q.result as T
} finally {
q.callbacks--
@ -65,46 +62,27 @@ export class QueryJoiner {
}
}
private findQuery<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Query | undefined {
const queries = this.queries.get(_class)
if (queries === undefined) return
for (const q of queries) {
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) {
continue
private getQuery (key: string): Query {
const query = this.queries.get(key)
if (query === undefined) {
const q: Query = {
key,
result: undefined,
callbacks: 1,
max: 1
}
q.callbacks++
q.max++
this.queries.set(key, q)
return q
}
}
private createQuery<T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Query {
const queries = this.queries.get(_class) ?? []
const q: Query = {
_class,
query,
result: undefined,
options: options as FindOptions<Doc>,
callbacks: 1,
max: 1
}
queries.push(q)
this.queries.set(_class, queries)
return q
query.callbacks++
query.max++
return query
}
private removeFromQueue (q: Query): void {
if (q.callbacks === 0) {
const queries = this.queries.get(q._class) ?? []
this.queries.set(
q._class,
queries.filter((it) => it !== q)
)
this.queries.delete(q.key)
}
}
}
@ -117,8 +95,16 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
private constructor (context: PipelineContext, next?: Middleware) {
super(context, next)
this.joiner = new QueryJoiner((ctx, _class, query, options) => {
return this.provideFindAll(ctx, _class, query, options)
this.joiner = new QueryJoiner()
}
loadModel (
ctx: MeasureContext<SessionData>,
lastModelTx: Timestamp,
hash?: string
): Promise<Tx[] | LoadModelResponse> {
return this.joiner.query(ctx, `model-${lastModelTx}${hash ?? ''}`, async (ctx) => {
return await this.provideLoadModel(ctx, lastModelTx, hash)
})
}
@ -136,7 +122,31 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
): Promise<FindResult<T>> {
// Will find a query or add + 1 to callbacks
return this.joiner.findAll(ctx, _class, query, options)
const opt = { ...options }
delete opt.ctx
return this.joiner.query(
ctx,
`findAll-${_class}-${JSON.stringify(query)}-${JSON.stringify(options)}`,
async (ctx) => {
return await this.provideFindAll(ctx, _class, query, options)
}
)
}
groupBy<T, P extends Doc>(
ctx: MeasureContext<SessionData>,
domain: Domain,
field: string,
query?: DocumentQuery<P>
): Promise<Map<T, number>> {
return this.joiner.query(ctx, `groupBy-${domain}-${field}-${JSON.stringify(query ?? {})})`, async (ctx) => {
return await this.provideGroupBy(ctx, domain, field, query)
})
}
searchFulltext (ctx: MeasureContext<SessionData>, query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
return this.joiner.query(ctx, `searchFulltext-${JSON.stringify(query)}-${JSON.stringify(options)}`, async (ctx) => {
return await this.provideSearchFulltext(ctx, query, options)
})
}
}

View File

@ -12,14 +12,13 @@ describe('test query joiner', () => {
})
return toFindResult([])
}
const joiner = new QueryJoiner(findT)
const joiner = new QueryJoiner()
const ctx = new MeasureMetricsContext('test', {})
const p1 = joiner.findAll(ctx, core.class.Class, {})
const p2 = joiner.findAll(ctx, core.class.Class, {})
const p1 = joiner.query(ctx, core.class.Class, (ctx) => findT(ctx, core.class.Class, {}))
const p2 = joiner.query(ctx, core.class.Class, (ctx) => findT(ctx, core.class.Class, {}))
await Promise.all([p1, p2])
expect(count).toBe(1)
expect((joiner as any).queries.size).toBe(1)
expect((joiner as any).queries.get(core.class.Class).length).toBe(0)
expect((joiner as any).queries.size).toBe(0)
})
})

View File

@ -42,6 +42,8 @@ export async function getFile (
res.writeHead(200, {
'Content-Type': stat.contentType,
Etag: stat.etag,
connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000',
'Last-Modified': new Date(stat.modifiedOn).toISOString(),
'Cache-Control': cacheControlNoCache
})
@ -118,7 +120,9 @@ export async function getFileRange (
if (start >= size) {
res.cork(() => {
res.writeHead(416, {
'Content-Range': `bytes */${size}`
'Content-Range': `bytes */${size}`,
connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end()
})
@ -139,9 +143,10 @@ export async function getFileRange (
await new Promise<void>((resolve, reject) => {
res.cork(() => {
res.writeHead(206, {
Connection: 'keep-alive',
'Content-Range': `bytes ${start}-${end}/${size}`,
'Accept-Ranges': 'bytes',
connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000',
// 'Content-Length': end - start + 1,
'Content-Type': stat.contentType,
Etag: stat.etag,

View File

@ -234,7 +234,7 @@ export class ClientSession implements Session {
}
}
const bevent = createBroadcastEvent(Array.from(classes))
socket.send(
void socket.send(
ctx,
{
result: [bevent]
@ -243,7 +243,7 @@ export class ClientSession implements Session {
this.useCompression
)
} else {
socket.send(ctx, { result: tx }, this.binaryMode, this.useCompression)
void socket.send(ctx, { result: tx }, this.binaryMode, this.useCompression)
}
}

View File

@ -237,7 +237,7 @@ export class TSessionManager implements SessionManager {
// And ping other wize
s[1].session.lastPing = now
if (s[1].socket.checkState()) {
s[1].socket.send(
void s[1].socket.send(
workspace.context,
{ result: pingConst },
s[1].session.binaryMode,
@ -504,7 +504,7 @@ export class TSessionManager implements SessionManager {
}
if (this.timeMinutes > 0) {
ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
}
return { session, context: workspace.context, workspaceId: wsString }
}
@ -884,7 +884,7 @@ export class TSessionManager implements SessionManager {
}
private sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean, compression: boolean): void {
webSocket.send(
void webSocket.send(
ctx,
{
result: {
@ -951,6 +951,7 @@ export class TSessionManager implements SessionManager {
createOpContext (
ctx: MeasureContext,
sendCtx: MeasureContext,
pipeline: Pipeline,
requestId: Request<any>['id'],
service: Session,
@ -962,7 +963,7 @@ export class TSessionManager implements SessionManager {
pipeline,
requestId,
sendResponse: (reqId, msg) =>
sendResponse(ctx, service, ws, {
sendResponse(sendCtx, service, ws, {
id: reqId,
result: msg,
time: Date.now() - st,
@ -973,7 +974,7 @@ export class TSessionManager implements SessionManager {
ws.sendPong()
},
sendError: (reqId, msg, error: Status) =>
sendResponse(ctx, service, ws, {
sendResponse(sendCtx, service, ws, {
id: reqId,
result: msg,
error,
@ -1004,7 +1005,7 @@ export class TSessionManager implements SessionManager {
requestCtx.measure('msg-receive-delta', delta)
}
if (service.workspace.closing !== undefined) {
ws.send(
await ws.send(
ctx,
{
id: request.id,
@ -1033,7 +1034,7 @@ export class TSessionManager implements SessionManager {
id: request.id,
result: done
}
ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression)
await ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression)
return
}
@ -1054,16 +1055,20 @@ export class TSessionManager implements SessionManager {
try {
const params = [...request.params]
if (ws.isBackpressure()) {
await ws.backpressure(ctx)
}
await ctx.with('🧨 process', {}, (callTx) =>
f.apply(service, [this.createOpContext(callTx, pipeline, request.id, service, ws), ...params])
f.apply(service, [this.createOpContext(callTx, userCtx, pipeline, request.id, service, ws), ...params])
)
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err, request })
}
ws.send(
ctx,
await ws.send(
userCtx,
{
id: request.id,
error: unknownError(err),
@ -1108,15 +1113,15 @@ export class TSessionManager implements SessionManager {
service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline
try {
const uctx = this.createOpContext(ctx, pipeline, reqId, service, ws)
const uctx = this.createOpContext(ctx, userCtx, pipeline, reqId, service, ws)
await operation(uctx)
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err })
}
ws.send(
ctx,
await ws.send(
userCtx,
{
id: reqId,
error: unknownError(err),
@ -1174,7 +1179,7 @@ export class TSessionManager implements SessionManager {
account: service.getRawAccount(pipeline),
useCompression: service.useCompression
}
ws.send(requestCtx, helloResponse, false, false)
await ws.send(requestCtx, helloResponse, false, false)
// We do not need to wait for set-status, just return session to client
const _workspace = service.workspace

View File

@ -43,6 +43,8 @@ export function getStatistics (ctx: MeasureContext, sessions: SessionManager, ad
const memU = process.memoryUsage()
data.statistics.memoryUsed = Math.round(((memU.heapUsed + memU.rss) / 1024 / 1024) * 100) / 100
data.statistics.memoryTotal = Math.round((memU.heapTotal / 1024 / 1024) * 100) / 100
data.statistics.memoryRSS = Math.round((memU.rss / 1024 / 1024) * 100) / 100
data.statistics.memoryArrayBuffers = Math.round((memU.arrayBuffers / 1024 / 1024) * 100) / 100
data.statistics.cpuUsage = Math.round(os.loadavg()[0] * 100) / 100
data.statistics.freeMem = Math.round((os.freemem() / 1024 / 1024) * 100) / 100
data.statistics.totalMem = Math.round((os.totalmem() / 1024 / 1024) * 100) / 100

View File

@ -85,6 +85,5 @@ export function sendResponse (
socket: ConnectionSocket,
resp: Response<any>
): Promise<void> {
socket.send(ctx, resp, session.binaryMode, session.useCompression)
return Promise.resolve()
return socket.send(ctx, resp, session.binaryMode, session.useCompression)
}

View File

@ -26,6 +26,7 @@ const sendError = (res: ExpressResponse, code: number, data: any): void => {
res.writeHead(code, {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end(JSON.stringify(data))
@ -35,6 +36,7 @@ async function sendJson (req: Request, res: ExpressResponse, result: any): Promi
const headers: OutgoingHttpHeaders = {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
}
let body: any = JSON.stringify(result)
@ -173,7 +175,9 @@ function createClosingSocket (rawToken: string, rpcSessions: Map<string, RPCClie
close: () => {
rpcSessions.delete(rawToken)
},
send: (ctx, msg, binary, compression) => {},
send: async (ctx, msg, binary, compression) => {},
isBackpressure: () => false,
backpressure: async (ctx) => {},
sendPong: () => {},
data: () => ({}),
readRequest: (buffer, binary) => ({ method: '', params: [], id: -1, time: Date.now() }),

View File

@ -50,8 +50,12 @@ import 'utf-8-validate'
import { registerRPC } from './rpc'
import { retrieveJson } from './utils'
import { setImmediate } from 'timers/promises'
let profiling = false
const rpcHandler = new RPCHandler()
const backpressureSize = 100 * 1024
/**
* @public
* @param sessionFactory -
@ -81,7 +85,11 @@ export function startHttpServer (
const getUsers = (): any => Array.from(sessions.sessions.entries()).map(([k, v]) => v.session.getUser())
app.get('/api/v1/version', (req, res) => {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.writeHead(200, {
'Content-Type': 'application/json',
Connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end(
JSON.stringify({
version: process.env.MODEL_VERSION
@ -91,7 +99,7 @@ export function startHttpServer (
app.get('/api/v1/statistics', (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? (req.headers.authorization ?? '').split(' ')[1]
const payload = decodeToken(token)
const admin = payload.extra?.admin === 'true'
const jsonData = {
@ -101,7 +109,11 @@ export function startHttpServer (
profiling
}
const json = JSON.stringify(jsonData)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.writeHead(200, {
'Content-Type': 'application/json',
Connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end(json)
} catch (err: any) {
Analytics.handleError(err)
@ -130,7 +142,7 @@ export function startHttpServer (
})
app.put('/api/v1/manage', (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? (req.headers.authorization ?? '').split(' ')[1]
const payload = decodeToken(token)
if (payload.extra?.admin !== 'true' && payload.email !== systemAccountEmail) {
console.warn('Non admin attempt to maintenance action', { payload })
@ -246,7 +258,11 @@ export function startHttpServer (
{ file: name, contentType }
)
.then(() => {
res.writeHead(200, { 'Cache-Control': 'no-cache' })
res.writeHead(200, {
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'keep-alive': 'timeout=5, max=1000'
})
res.end()
})
.catch((err) => {
@ -373,7 +389,7 @@ export function startHttpServer (
void webSocketData.session.then((s) => {
if ('error' in s) {
if (s.specialError === 'archived') {
cs.send(
void cs.send(
ctx,
{
id: -1,
@ -386,7 +402,7 @@ export function startHttpServer (
false
)
} else if (s.specialError === 'migration') {
cs.send(
void cs.send(
ctx,
{
id: -1,
@ -399,7 +415,7 @@ export function startHttpServer (
false
)
} else {
cs.send(
void cs.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
@ -412,7 +428,7 @@ export function startHttpServer (
}, 1000)
}
if ('upgrade' in s) {
cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
void cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
setTimeout(() => {
cs.close()
}, 5000)
@ -557,6 +573,17 @@ function createWebsocketClientSocket (
ws.close()
ws.terminate()
},
isBackpressure: () => ws.bufferedAmount > backpressureSize,
backpressure: async (ctx) => {
if (ws.bufferedAmount < backpressureSize) {
return
}
await ctx.with('backpressure', {}, async () => {
while (ws.bufferedAmount > backpressureSize) {
await setImmediate()
}
})
},
checkState: () => {
if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
ws.terminate()
@ -577,7 +604,7 @@ function createWebsocketClientSocket (
}
ws.send(pongConst)
},
send: (ctx: MeasureContext, msg, binary, _compression) => {
send: async (ctx: MeasureContext, msg, binary, _compression): Promise<void> => {
const smsg = rpcHandler.serialize(msg, binary)
ctx.measure('send-data', smsg.length)
@ -586,23 +613,28 @@ function createWebsocketClientSocket (
return
}
const handleErr = (err?: Error): void => {
ctx.measure('msg-send-delta', Date.now() - st)
if (err != null) {
if (!`${err.message}`.includes('WebSocket is not open')) {
ctx.error('send error', { err })
Analytics.handleError(err)
}
}
// We need to be sure all data is send before we will send more.
if (cs.isBackpressure()) {
await cs.backpressure(ctx)
}
let sendMsg = smsg
if (_compression) {
void compress(smsg).then((msg: any) => {
ws.send(msg, { binary: true }, handleErr)
})
} else {
ws.send(smsg, { binary: true }, handleErr)
sendMsg = await compress(smsg)
}
await new Promise<void>((resolve) => {
const handleErr = (err?: Error): void => {
ctx.measure('msg-send-delta', Date.now() - st)
if (err != null) {
if (!`${err.message}`.includes('WebSocket is not open')) {
ctx.error('send error', { err })
Analytics.handleError(err)
}
}
resolve() // In any case we need to resolve.
}
ws.send(sendMsg, { binary: true }, handleErr)
})
}
}
return cs

View File

@ -300,7 +300,7 @@ export class Transactor extends DurableObject<Env> {
throw session.error
}
if ('upgrade' in session) {
cs.send(
await cs.send(
this.measureCtx,
{ id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } },
false,
@ -352,6 +352,8 @@ export class Transactor extends DurableObject<Env> {
}
return true
},
backpressure: async (ctx) => {},
isBackpressure: () => false,
readRequest: (buffer: Buffer, binary: boolean) => {
if (buffer.length === pingConst.length) {
if (buffer.toString() === pingConst) {
@ -361,7 +363,7 @@ export class Transactor extends DurableObject<Env> {
return rpcHandler.readRequest(buffer, binary)
},
data: () => data,
send: (ctx: MeasureContext, msg, binary, _compression) => {
send: async (ctx: MeasureContext, msg, binary, _compression) => {
let smsg = rpcHandler.serialize(msg, binary)
ctx.measure('send-data', smsg.length)
@ -435,7 +437,9 @@ export class Transactor extends DurableObject<Env> {
data: () => {
return {}
},
send: (ctx: MeasureContext, msg, binary, compression) => {},
isBackpressure: () => false,
backpressure: async (ctx) => {},
send: async (ctx: MeasureContext, msg, binary, compression) => {},
sendPong: () => {}
}
return cs

View File

@ -13,4 +13,4 @@ export ELASTIC_URL=http://localhost:9201
export SERVER_SECRET=secret
export DB_URL=postgresql://root@localhost:26258/defaultdb?sslmode=disable
node ${TOOL_OPTIONS} ../dev/tool/bundle/bundle.js $@
node ${TOOL_OPTIONS} --max-old-space-size=8096 ../dev/tool/bundle/bundle.js $@