UBERF-8102: Remove client timeout on broadcast (#6560)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-09-13 17:25:32 +07:00 committed by GitHub
parent 52e930cd07
commit 939103a116
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 26 additions and 98 deletions

View File

@ -17,7 +17,6 @@ import core, {
AccountRole,
TxFactory,
TxProcessor,
reduceCalls,
type Account,
type Branding,
type Class,
@ -55,8 +54,6 @@ export class ClientSession implements Session {
sessionId = ''
lastRequest = Date.now()
broadcastTx: Tx[] = []
total: StatisticsElement = { find: 0, tx: 0 }
current: StatisticsElement = { find: 0, tx: 0 }
mins5: StatisticsElement = { find: 0, tx: 0 }
@ -92,22 +89,7 @@ export class ClientSession implements Session {
}
async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> {
const contextData = new SessionDataImpl(
this.token.email,
this.sessionId,
this.token.extra?.admin === 'true',
{
txes: [],
targets: {}
},
this.workspaceId,
this.branding,
false,
new Map(),
new Map(),
this._pipeline.context.modelDb
)
ctx.ctx.contextData = contextData
this.includeSessionContext(ctx.ctx)
const result = await ctx.ctx.with('load-model', {}, () => this._pipeline.loadModel(ctx.ctx, lastModelTx, hash))
await ctx.sendResponse(result)
}
@ -129,22 +111,7 @@ export class ClientSession implements Session {
},
this.token.email as Ref<Account>
)
const contextData = new SessionDataImpl(
this.token.email,
this.sessionId,
this.token.extra?.admin === 'true',
{
txes: [],
targets: {}
},
this.workspaceId,
this.branding,
false,
new Map(),
new Map(),
this._pipeline.context.modelDb
)
ctx.ctx.contextData = contextData
this.includeSessionContext(ctx.ctx)
await this._pipeline.tx(ctx.ctx, [createTx])
const acc = TxProcessor.createDoc2Doc(createTx)
await ctx.sendResponse(acc)
@ -157,15 +124,7 @@ export class ClientSession implements Session {
await ctx.sendResponse(account)
}
findAllRaw<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
this.lastRequest = Date.now()
this.total.find++
this.current.find++
includeSessionContext (ctx: MeasureContext): void {
const contextData = new SessionDataImpl(
this.token.email,
this.sessionId,
@ -182,6 +141,18 @@ export class ClientSession implements Session {
this._pipeline.context.modelDb
)
ctx.contextData = contextData
}
findAllRaw<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
this.lastRequest = Date.now()
this.total.find++
this.current.find++
this.includeSessionContext(ctx)
return this._pipeline.findAll(ctx, _class, query, options)
}
@ -196,22 +167,7 @@ export class ClientSession implements Session {
async searchFulltext (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions): Promise<void> {
this.lastRequest = Date.now()
const contextData = new SessionDataImpl(
this.token.email,
this.sessionId,
this.token.extra?.admin === 'true',
{
txes: [],
targets: {}
},
this.workspaceId,
this.branding,
false,
new Map(),
new Map(),
this._pipeline.context.modelDb
)
ctx.ctx.contextData = contextData
this.includeSessionContext(ctx.ctx)
await ctx.sendResponse(await this._pipeline.searchFulltext(ctx.ctx, query, options))
}
@ -219,22 +175,7 @@ export class ClientSession implements Session {
this.lastRequest = Date.now()
this.total.tx++
this.current.tx++
const contextData = new SessionDataImpl(
this.token.email,
this.sessionId,
this.token.extra?.admin === 'true',
{
txes: [],
targets: {}
},
this.workspaceId,
this.branding,
false,
new Map(),
new Map(),
this._pipeline.context.modelDb
)
ctx.ctx.contextData = contextData
this.includeSessionContext(ctx.ctx)
const result = await this._pipeline.tx(ctx.ctx, [tx])
@ -245,10 +186,10 @@ export class ClientSession implements Session {
await this._pipeline.handleBroadcast(ctx.ctx)
}
doBroadcast = reduceCalls(async (ctx: MeasureContext, socket: ConnectionSocket) => {
if (this.broadcastTx.length > 10000) {
broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void {
if (this.tx.length > 10000) {
const classes = new Set<Ref<Class<Doc>>>()
for (const dtx of this.broadcastTx) {
for (const dtx of tx) {
if (TxProcessor.isExtendsCUD(dtx._class)) {
classes.add((dtx as TxCUD<Doc>).objectClass)
}
@ -258,7 +199,6 @@ export class ClientSession implements Session {
}
}
const bevent = createBroadcastEvent(Array.from(classes))
this.broadcastTx = []
socket.send(
ctx,
{
@ -268,21 +208,7 @@ export class ClientSession implements Session {
this.useCompression
)
} else {
const txes = [...this.broadcastTx]
this.broadcastTx = []
await handleSend(ctx, socket, { result: txes }, 32 * 1024, this.binaryMode, this.useCompression)
void handleSend(ctx, socket, { result: tx }, 1024 * 1024, this.binaryMode, this.useCompression)
}
})
timeout: any
broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void {
this.broadcastTx.push(...tx)
// We need to put into client broadcast queue, to send user requests first
// Collapse events in 1 second interval
clearTimeout(this.timeout)
this.timeout = setTimeout(() => {
void this.doBroadcast(ctx, socket)
}, 1)
}
}

View File

@ -206,7 +206,11 @@ export function startHttpServer (
const contentType = req.query.contentType as string
const size = parseInt((req.query.size as string) ?? '-1')
if (Number.isNaN(size)) {
ctx.error('/api/v1/blob put error', { message: 'invalid NaN file size' })
ctx.error('/api/v1/blob put error', {
message: 'invalid NaN file size',
name,
workspace: payload.workspace.name
})
res.writeHead(404, {})
res.end()
return

View File

@ -67,8 +67,6 @@ export interface Session {
requests: Map<string, SessionRequest>
broadcastTx: Tx[]
binaryMode: boolean
useCompression: boolean
total: StatisticsElement