UBERF-5364: Fix targeted broadcast on server (#4565)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-02-07 15:48:54 +07:00 committed by GitHub
parent d98fb41ac3
commit 2d9eb8ec32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 12 additions and 11 deletions

View File

@ -69,6 +69,7 @@ import { FullTextPipelineStage } from './indexer/types'
import serverCore from './plugin' import serverCore from './plugin'
import { Triggers } from './triggers' import { Triggers } from './triggers'
import type { import type {
BroadcastFunc,
ContentTextAdapter, ContentTextAdapter,
ContentTextAdapterConfiguration, ContentTextAdapterConfiguration,
FullTextAdapter, FullTextAdapter,
@ -626,11 +627,11 @@ class TServerStorage implements ServerStorage {
findAllCtx: findAll, findAllCtx: findAll,
modelDb: this.modelDb, modelDb: this.modelDb,
hierarchy: this.hierarchy, hierarchy: this.hierarchy,
apply: async (tx, broadcast) => { apply: async (tx, broadcast, target) => {
return await this.apply(ctx, tx, broadcast) return await this.apply(ctx, tx, broadcast, target)
}, },
applyCtx: async (ctx, tx, broadcast) => { applyCtx: async (ctx, tx, broadcast, target) => {
return await this.apply(ctx, tx, broadcast) return await this.apply(ctx, tx, broadcast, target)
}, },
// Will create a live query if missing and return values immediately if already asked. // Will create a live query if missing and return values immediately if already asked.
queryFind: async (_class, query, options) => { queryFind: async (_class, query, options) => {
@ -719,14 +720,14 @@ class TServerStorage implements ServerStorage {
return { passed, onEnd } return { passed, onEnd }
} }
async apply (ctx: MeasureContext, txes: Tx[], broadcast: boolean): Promise<TxResult> { async apply (ctx: MeasureContext, txes: Tx[], broadcast: boolean, target?: string[]): Promise<TxResult> {
const result = await this.processTxes(ctx, txes) const result = await this.processTxes(ctx, txes)
let derived: Tx[] = [] let derived: Tx[] = []
derived = result[1] derived = result[1]
if (broadcast) { if (broadcast) {
this.options?.broadcast?.([...txes, ...derived]) this.options?.broadcast?.([...txes, ...derived], target)
} }
return result[0] return result[0]
@ -867,7 +868,7 @@ export interface ServerStorageOptions {
// Indexing is not required to be started for upgrade mode. // Indexing is not required to be started for upgrade mode.
upgrade: boolean upgrade: boolean
broadcast?: (tx: Tx[]) => void broadcast?: BroadcastFunc
} }
/** /**
* @public * @public

View File

@ -76,8 +76,8 @@ export class Triggers {
ctx, ctx,
txFactory: new TxFactory(tx.modifiedBy, true), txFactory: new TxFactory(tx.modifiedBy, true),
findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx, clazz, query, options), findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx, clazz, query, options),
apply: async (tx, broadcast) => { apply: async (tx, broadcast, target) => {
return await ctrl.applyCtx(ctx, tx, broadcast) return await ctrl.applyCtx(ctx, tx, broadcast, target)
} }
})) }))
) )

View File

@ -135,8 +135,8 @@ export interface TriggerControl {
fx: (f: () => Promise<void>) => void fx: (f: () => Promise<void>) => void
// Bulk operations in case trigger require some // Bulk operations in case trigger require some
apply: (tx: Tx[], broadcast: boolean) => Promise<TxResult> apply: (tx: Tx[], broadcast: boolean, target?: string[]) => Promise<TxResult>
applyCtx: (ctx: MeasureContext, tx: Tx[], broadcast: boolean) => Promise<TxResult> applyCtx: (ctx: MeasureContext, tx: Tx[], broadcast: boolean, target?: string[]) => Promise<TxResult>
// Will create a live query if missing and return values immediately if already asked. // Will create a live query if missing and return values immediately if already asked.
queryFind: <T extends Doc>( queryFind: <T extends Doc>(