UBERF-7620: Send broadcast on delay with combine (#6094)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-07-19 15:32:53 +07:00 committed by GitHub
parent 3f5d1ceef8
commit 0a65d46eec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 100 additions and 18 deletions

View File

@ -34,6 +34,7 @@ import core, {
Data,
Doc,
DocumentUpdate,
generateId,
MeasureContext,
MixinUpdate,
Ref,
@ -73,8 +74,8 @@ import serverNotification, {
getPersonAccount,
getPersonAccountById,
NOTIFICATION_BODY_SIZE,
UserInfo,
NOTIFICATION_TITLE_SIZE
NOTIFICATION_TITLE_SIZE,
UserInfo
} from '@hcengineering/server-notification'
import serverView from '@hcengineering/server-view'
import { stripTags } from '@hcengineering/text'
@ -362,9 +363,11 @@ export async function pushInboxNotifications (
lastUpdateTimestamp: shouldUpdateTimestamp ? modifiedOn : undefined
})
await control.apply([createContextTx])
control.operationContext.derived.targets['docNotifyContext' + createContextTx._id] = (it) => {
if (it._id === createContextTx._id) {
return [account.email]
if (target.account?.email !== undefined) {
control.operationContext.derived.targets['docNotifyContext' + createContextTx._id] = (it) => {
if (it._id === createContextTx._id) {
return [target.account?.email as string]
}
}
}
docNotifyContextId = createContextTx.objectId
@ -374,12 +377,15 @@ export async function pushInboxNotifications (
lastUpdateTimestamp: modifiedOn
})
await control.apply([updateTx])
control.operationContext.derived.targets['docNotifyContext' + updateTx._id] = (it) => {
if (it._id === updateTx._id) {
return [account.email]
if (target.account?.email !== undefined) {
control.operationContext.derived.targets['docNotifyContext' + updateTx._id] = (it) => {
if (it._id === updateTx._id) {
return [target.account?.email as string]
}
}
}
}
docNotifyContextId = context._id
}
@ -805,9 +811,27 @@ export async function createCollabDocInfo (
if (info === undefined) continue
res = res.concat(
await getNotificationTxes(control, object, tx, originTx, info, sender, params, notifyContexts, docMessages)
const targetRes = await getNotificationTxes(
control,
object,
tx,
originTx,
info,
sender,
params,
notifyContexts,
docMessages
)
const ids = new Set(targetRes.map((it) => it._id))
if (info.account?.email !== undefined) {
const id = generateId() as string
control.operationContext.derived.targets[id] = (it) => {
if (ids.has(it._id)) {
return [info.account?.email as string]
}
}
}
res = res.concat(targetRes)
}
return res
}

View File

@ -781,13 +781,16 @@ export class TServerStorage implements ServerStorage {
this.options.branding,
true
)
const result = await performAsync(applyCtx)
const aresult = await performAsync(applyCtx)
if (applyTxes.length > 0) {
await this.apply(applyCtx, applyTxes)
}
// We need to broadcast changes
await this.broadcastCtx(applyCtx.derived.txes.concat(result), applyCtx.derived.targets)
const combinedTxes = applyCtx.derived.txes.concat(aresult)
if (combinedTxes.length > 0) {
await this.broadcastCtx(combinedTxes, applyCtx.derived.targets)
}
},
{ count: txes.length }
)

View File

@ -17,6 +17,7 @@ import core, {
AccountRole,
TxFactory,
TxProcessor,
reduceCalls,
toIdMap,
type Account,
type Class,
@ -36,7 +37,14 @@ import core, {
} from '@hcengineering/core'
import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types'
import {
type ClientSessionCtx,
type ConnectionSocket,
type Session,
type SessionRequest,
type StatisticsElement
} from './types'
import { handleSend } from './utils'
/**
* @public
*/
@ -48,6 +56,8 @@ export class ClientSession implements Session {
sessionId = ''
lastRequest = Date.now()
broadcastTx: Tx[] = []
total: StatisticsElement = { find: 0, tx: 0 }
current: StatisticsElement = { find: 0, tx: 0 }
mins5: StatisticsElement = { find: 0, tx: 0 }
@ -297,6 +307,47 @@ export class ClientSession implements Session {
void handleSend(toSendAll, undefined, Array.from(toSendTarget.keys()))
}
doBroadcast = reduceCalls(async (ctx: MeasureContext, socket: ConnectionSocket) => {
if (this.broadcastTx.length > 10000) {
const classes = new Set<Ref<Class<Doc>>>()
for (const dtx of this.broadcastTx) {
if (TxProcessor.isExtendsCUD(dtx._class)) {
classes.add((dtx as TxCUD<Doc>).objectClass)
}
const etx = TxProcessor.extractTx(dtx)
if (TxProcessor.isExtendsCUD(etx._class)) {
classes.add((etx as TxCUD<Doc>).objectClass)
}
}
const bevent = createBroadcastEvent(Array.from(classes))
this.broadcastTx = []
await socket.send(
ctx,
{
result: [bevent]
},
this.binaryMode,
this.useCompression
)
} else {
const txes = [...this.broadcastTx]
this.broadcastTx = []
await handleSend(ctx, socket, { result: txes }, 32 * 1024, this.binaryMode, this.useCompression)
}
})
timeout: any
broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void {
this.broadcastTx.push(...tx)
// We need to put into client broadcast queue, to send user requests first
// Collapse events in 1 second interval
clearTimeout(this.timeout)
this.timeout = setTimeout(() => {
void this.doBroadcast(ctx, socket)
}, 5)
}
private async sendWithPart (
derived: Tx[],
ctx: ClientSessionCtx,

View File

@ -511,11 +511,11 @@ class TSessionManager implements SessionManager {
const sessions = [...workspace.sessions.values()]
const ctx = this.ctx.newChild('📭 broadcast', {})
function send (): void {
const send = (): void => {
for (const sessionRef of sessions) {
const tt = sessionRef.session.getUser()
if ((target === undefined && !(exclude ?? []).includes(tt)) || (target?.includes(tt) ?? false)) {
void sendResponse(ctx, sessionRef.session, sessionRef.socket, { result: resp })
sessionRef.session.broadcast(ctx, sessionRef.socket, resp)
}
}
ctx.end()
@ -547,8 +547,8 @@ class TSessionManager implements SessionManager {
pipelineCtx,
{ ...token.workspace, workspaceUrl, workspaceName },
upgrade,
(tx, targets) => {
this.broadcastAll(workspace, tx, targets)
(tx, targets, exclude) => {
this.broadcastAll(workspace, tx, targets, exclude)
},
branding
),

View File

@ -70,6 +70,8 @@ export interface Session {
requests: Map<string, SessionRequest>
broadcastTx: Tx[]
binaryMode: boolean
useCompression: boolean
total: StatisticsElement
@ -81,6 +83,8 @@ export interface Session {
isUpgradeClient: () => boolean
getMode: () => string
broadcast: (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]) => void
}
/**

View File

@ -22,7 +22,7 @@
"uitest": "cross-env LOCAL_URL=http://localhost:3003/ DEV_URL= playwright test -c ./tests/playwright.config.ts",
"staging-uitest": "cross-env PLATFORM_URI=https://front.hc.engineering/ playwright test -c ./tests/playwright.config.ts --grep @staging",
"dev-uitest": "cross-env PLATFORM_URI=http://localhost:8080 PLATFORM_TRANSACTOR=ws://localhost:3333 SETTING=storage-dev.json SETTING_SECOND=storageSecond-dev.json DEV_URL=http://localhost:8080/account playwright test -c ./tests/playwright.config.ts",
"debug": "playwright test -c ./tests/playwright.config.ts --debug --headed",
"debug": "cross-env LOCAL_URL=http://localhost:3003/ DEV_URL= playwright test -c ./tests/playwright.config.ts --debug --headed",
"dev-debug": "cross-env PLATFORM_URI=http://localhost:8080 PLATFORM_TRANSACTOR=ws://localhost:3333 SETTING=storage-dev.json SETTING_SECOND=storageSecond-dev.json playwright test -c ./tests/playwright.config.ts --debug --headed",
"codegen": "playwright codegen --load-storage storage.json http://localhost:8083/workbench/sanity-ws/",
"dev-codegen": "cross-env playwright codegen --load-storage storage-dev.json http://localhost:8080/workbench/sanity-ws/",