diff --git a/server-plugins/notification-resources/src/index.ts b/server-plugins/notification-resources/src/index.ts index e8c6e9e537..5988b75642 100644 --- a/server-plugins/notification-resources/src/index.ts +++ b/server-plugins/notification-resources/src/index.ts @@ -34,6 +34,7 @@ import core, { Data, Doc, DocumentUpdate, + generateId, MeasureContext, MixinUpdate, Ref, @@ -73,8 +74,8 @@ import serverNotification, { getPersonAccount, getPersonAccountById, NOTIFICATION_BODY_SIZE, - UserInfo, - NOTIFICATION_TITLE_SIZE + NOTIFICATION_TITLE_SIZE, + UserInfo } from '@hcengineering/server-notification' import serverView from '@hcengineering/server-view' import { stripTags } from '@hcengineering/text' @@ -362,9 +363,11 @@ export async function pushInboxNotifications ( lastUpdateTimestamp: shouldUpdateTimestamp ? modifiedOn : undefined }) await control.apply([createContextTx]) - control.operationContext.derived.targets['docNotifyContext' + createContextTx._id] = (it) => { - if (it._id === createContextTx._id) { - return [account.email] + if (target.account?.email !== undefined) { + control.operationContext.derived.targets['docNotifyContext' + createContextTx._id] = (it) => { + if (it._id === createContextTx._id) { + return [target.account?.email as string] + } } } docNotifyContextId = createContextTx.objectId @@ -374,12 +377,15 @@ export async function pushInboxNotifications ( lastUpdateTimestamp: modifiedOn }) await control.apply([updateTx]) - control.operationContext.derived.targets['docNotifyContext' + updateTx._id] = (it) => { - if (it._id === updateTx._id) { - return [account.email] + if (target.account?.email !== undefined) { + control.operationContext.derived.targets['docNotifyContext' + updateTx._id] = (it) => { + if (it._id === updateTx._id) { + return [target.account?.email as string] + } } } } + docNotifyContextId = context._id } @@ -805,9 +811,27 @@ export async function createCollabDocInfo ( if (info === undefined) continue - res = res.concat( - await getNotificationTxes(control, object, tx, originTx, info, sender, params, notifyContexts, docMessages) + const targetRes = await getNotificationTxes( + control, + object, + tx, + originTx, + info, + sender, + params, + notifyContexts, + docMessages ) + const ids = new Set(targetRes.map((it) => it._id)) + if (info.account?.email !== undefined) { + const id = generateId() as string + control.operationContext.derived.targets[id] = (it) => { + if (ids.has(it._id)) { + return [info.account?.email as string] + } + } + } + res = res.concat(targetRes) } return res } diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index 9e17fa77e0..44f04bedd4 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -781,13 +781,16 @@ export class TServerStorage implements ServerStorage { this.options.branding, true ) - const result = await performAsync(applyCtx) + const aresult = await performAsync(applyCtx) if (applyTxes.length > 0) { await this.apply(applyCtx, applyTxes) } // We need to broadcast changes - await this.broadcastCtx(applyCtx.derived.txes.concat(result), applyCtx.derived.targets) + const combinedTxes = applyCtx.derived.txes.concat(aresult) + if (combinedTxes.length > 0) { + await this.broadcastCtx(combinedTxes, applyCtx.derived.targets) + } }, { count: txes.length } ) diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 789960c71c..a8ba459442 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -17,6 +17,7 @@ import core, { AccountRole, TxFactory, TxProcessor, + reduceCalls, toIdMap, type Account, type Class, @@ -36,7 +37,14 @@ import core, { } from '@hcengineering/core' import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' -import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types' +import { + type ClientSessionCtx, + type ConnectionSocket, + type Session, + type SessionRequest, + type StatisticsElement +} from './types' +import { handleSend } from './utils' /** * @public */ @@ -48,6 +56,8 @@ export class ClientSession implements Session { sessionId = '' lastRequest = Date.now() + broadcastTx: Tx[] = [] + total: StatisticsElement = { find: 0, tx: 0 } current: StatisticsElement = { find: 0, tx: 0 } mins5: StatisticsElement = { find: 0, tx: 0 } @@ -297,6 +307,47 @@ export class ClientSession implements Session { void handleSend(toSendAll, undefined, Array.from(toSendTarget.keys())) } + doBroadcast = reduceCalls(async (ctx: MeasureContext, socket: ConnectionSocket) => { + if (this.broadcastTx.length > 10000) { + const classes = new Set>>() + for (const dtx of this.broadcastTx) { + if (TxProcessor.isExtendsCUD(dtx._class)) { + classes.add((dtx as TxCUD).objectClass) + } + const etx = TxProcessor.extractTx(dtx) + if (TxProcessor.isExtendsCUD(etx._class)) { + classes.add((etx as TxCUD).objectClass) + } + } + const bevent = createBroadcastEvent(Array.from(classes)) + this.broadcastTx = [] + await socket.send( + ctx, + { + result: [bevent] + }, + this.binaryMode, + this.useCompression + ) + } else { + const txes = [...this.broadcastTx] + this.broadcastTx = [] + await handleSend(ctx, socket, { result: txes }, 32 * 1024, this.binaryMode, this.useCompression) + } + }) + + timeout: any + + broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void { + this.broadcastTx.push(...tx) + // We need to put into client broadcast queue, to send user requests first + // Collapse events in 1 second interval + clearTimeout(this.timeout) + this.timeout = setTimeout(() => { + void this.doBroadcast(ctx, socket) + }, 5) + } + private async sendWithPart ( derived: Tx[], ctx: ClientSessionCtx, diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index b7fb1dc133..c19664b36b 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -511,11 +511,11 @@ class TSessionManager implements SessionManager { const sessions = [...workspace.sessions.values()] const ctx = this.ctx.newChild('📭 broadcast', {}) - function send (): void { + const send = (): void => { for (const sessionRef of sessions) { const tt = sessionRef.session.getUser() if ((target === undefined && !(exclude ?? []).includes(tt)) || (target?.includes(tt) ?? false)) { - void sendResponse(ctx, sessionRef.session, sessionRef.socket, { result: resp }) + sessionRef.session.broadcast(ctx, sessionRef.socket, resp) } } ctx.end() @@ -547,8 +547,8 @@ class TSessionManager implements SessionManager { pipelineCtx, { ...token.workspace, workspaceUrl, workspaceName }, upgrade, - (tx, targets) => { - this.broadcastAll(workspace, tx, targets) + (tx, targets, exclude) => { + this.broadcastAll(workspace, tx, targets, exclude) }, branding ), diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 3eb21aced1..8d997c19a5 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -70,6 +70,8 @@ export interface Session { requests: Map + broadcastTx: Tx[] + binaryMode: boolean useCompression: boolean total: StatisticsElement @@ -81,6 +83,8 @@ export interface Session { isUpgradeClient: () => boolean getMode: () => string + + broadcast: (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]) => void } /** diff --git a/tests/sanity/package.json b/tests/sanity/package.json index 2f25891c13..af4066221c 100644 --- a/tests/sanity/package.json +++ b/tests/sanity/package.json @@ -22,7 +22,7 @@ "uitest": "cross-env LOCAL_URL=http://localhost:3003/ DEV_URL= playwright test -c ./tests/playwright.config.ts", "staging-uitest": "cross-env PLATFORM_URI=https://front.hc.engineering/ playwright test -c ./tests/playwright.config.ts --grep @staging", "dev-uitest": "cross-env PLATFORM_URI=http://localhost:8080 PLATFORM_TRANSACTOR=ws://localhost:3333 SETTING=storage-dev.json SETTING_SECOND=storageSecond-dev.json DEV_URL=http://localhost:8080/account playwright test -c ./tests/playwright.config.ts", - "debug": "playwright test -c ./tests/playwright.config.ts --debug --headed", + "debug": "cross-env LOCAL_URL=http://localhost:3003/ DEV_URL= playwright test -c ./tests/playwright.config.ts --debug --headed", "dev-debug": "cross-env PLATFORM_URI=http://localhost:8080 PLATFORM_TRANSACTOR=ws://localhost:3333 SETTING=storage-dev.json SETTING_SECOND=storageSecond-dev.json playwright test -c ./tests/playwright.config.ts --debug --headed", "codegen": "playwright codegen --load-storage storage.json http://localhost:8083/workbench/sanity-ws/", "dev-codegen": "cross-env playwright codegen --load-storage storage-dev.json http://localhost:8080/workbench/sanity-ws/",