mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-20 07:10:02 +00:00
UBERF-7690: Use query joiner for server/trigger requests (#6339)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
87ded4d797
commit
f0727bf42f
@ -199,3 +199,4 @@ export function createNullStorageFactory (): StorageAdapter {
|
|||||||
|
|
||||||
export { AggregatorStorageAdapter, buildStorage } from './aggregator'
|
export { AggregatorStorageAdapter, buildStorage } from './aggregator'
|
||||||
export { DomainIndexHelperImpl } from './domainHelper'
|
export { DomainIndexHelperImpl } from './domainHelper'
|
||||||
|
export { QueryJoiner } from './utils'
|
||||||
|
@ -78,6 +78,7 @@ import type {
|
|||||||
TriggerControl
|
TriggerControl
|
||||||
} from '../types'
|
} from '../types'
|
||||||
import { SessionContextImpl, createBroadcastEvent } from '../utils'
|
import { SessionContextImpl, createBroadcastEvent } from '../utils'
|
||||||
|
import { QueryJoiner } from './utils'
|
||||||
|
|
||||||
interface DomainInfo {
|
interface DomainInfo {
|
||||||
exists: boolean
|
exists: boolean
|
||||||
@ -101,6 +102,8 @@ export class TServerStorage implements ServerStorage {
|
|||||||
|
|
||||||
emptyAdapter = new DummyDbAdapter()
|
emptyAdapter = new DummyDbAdapter()
|
||||||
|
|
||||||
|
joiner: QueryJoiner
|
||||||
|
|
||||||
constructor (
|
constructor (
|
||||||
private readonly _domains: Record<string, string>,
|
private readonly _domains: Record<string, string>,
|
||||||
private readonly defaultAdapter: string,
|
private readonly defaultAdapter: string,
|
||||||
@ -122,6 +125,9 @@ export class TServerStorage implements ServerStorage {
|
|||||||
this.hierarchy = hierarchy
|
this.hierarchy = hierarchy
|
||||||
this.fulltext = indexFactory(this)
|
this.fulltext = indexFactory(this)
|
||||||
this.branding = options.branding
|
this.branding = options.branding
|
||||||
|
this.joiner = new QueryJoiner((ctx, _class, query, options) => {
|
||||||
|
return this.liveQuery.findAll(_class, query, { ...options, ctx } as any)
|
||||||
|
})
|
||||||
|
|
||||||
this.setModel(model)
|
this.setModel(model)
|
||||||
}
|
}
|
||||||
@ -949,12 +955,20 @@ export class TServerStorage implements ServerStorage {
|
|||||||
async processTxes (ctx: SessionOperationContext, txes: Tx[]): Promise<TxResult> {
|
async processTxes (ctx: SessionOperationContext, txes: Tx[]): Promise<TxResult> {
|
||||||
// store tx
|
// store tx
|
||||||
const _findAll: ServerStorage['findAll'] = async <T extends Doc>(
|
const _findAll: ServerStorage['findAll'] = async <T extends Doc>(
|
||||||
ctx: MeasureContext,
|
_ctx: MeasureContext,
|
||||||
clazz: Ref<Class<T>>,
|
clazz: Ref<Class<T>>,
|
||||||
query: DocumentQuery<T>,
|
query: DocumentQuery<T>,
|
||||||
options?: FindOptions<T>
|
options?: FindOptions<T>
|
||||||
): Promise<FindResult<T>> => {
|
): Promise<FindResult<T>> => {
|
||||||
return await this.findAll(ctx, clazz, query, { ...options, prefix: 'server' })
|
return await _ctx.with(
|
||||||
|
'findAll',
|
||||||
|
{ _class: clazz },
|
||||||
|
async (ctx) => await this.joiner.findAll(ctx, clazz, query, { ...options, prefix: 'server' } as any),
|
||||||
|
{
|
||||||
|
query,
|
||||||
|
options
|
||||||
|
}
|
||||||
|
)
|
||||||
}
|
}
|
||||||
const txToStore: Tx[] = []
|
const txToStore: Tx[] = []
|
||||||
const modelTx: Tx[] = []
|
const modelTx: Tx[] = []
|
||||||
|
107
server/core/src/server/utils.ts
Normal file
107
server/core/src/server/utils.ts
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
//
|
||||||
|
// Copyright © 2024 Hardcore Engineering Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License. You may
|
||||||
|
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
//
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
import {
|
||||||
|
type Class,
|
||||||
|
type Doc,
|
||||||
|
type DocumentQuery,
|
||||||
|
type FindOptions,
|
||||||
|
type FindResult,
|
||||||
|
type MeasureContext,
|
||||||
|
type Ref
|
||||||
|
} from '@hcengineering/core'
|
||||||
|
|
||||||
|
import { deepEqual } from 'fast-equals'
|
||||||
|
import type { ServerStorage } from '../types'
|
||||||
|
|
||||||
|
interface Query {
|
||||||
|
_class: Ref<Class<Doc>>
|
||||||
|
query: DocumentQuery<Doc>
|
||||||
|
result: FindResult<Doc> | Promise<FindResult<Doc>> | undefined
|
||||||
|
options?: FindOptions<Doc>
|
||||||
|
callbacks: number
|
||||||
|
max: number
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @public
|
||||||
|
*/
|
||||||
|
export class QueryJoiner {
|
||||||
|
private readonly queries: Map<Ref<Class<Doc>>, Query[]> = new Map<Ref<Class<Doc>>, Query[]>()
|
||||||
|
|
||||||
|
constructor (readonly _findAll: ServerStorage['findAll']) {}
|
||||||
|
|
||||||
|
async findAll<T extends Doc>(
|
||||||
|
ctx: MeasureContext,
|
||||||
|
_class: Ref<Class<T>>,
|
||||||
|
query: DocumentQuery<T>,
|
||||||
|
options?: FindOptions<T>
|
||||||
|
): Promise<FindResult<T>> {
|
||||||
|
// Will find a query or add + 1 to callbacks
|
||||||
|
const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options)
|
||||||
|
if (q.result === undefined) {
|
||||||
|
q.result = this._findAll(ctx, _class, query, options)
|
||||||
|
}
|
||||||
|
if (q.result instanceof Promise) {
|
||||||
|
q.result = await q.result
|
||||||
|
q.callbacks--
|
||||||
|
}
|
||||||
|
this.removeFromQueue(q)
|
||||||
|
|
||||||
|
return q.result as FindResult<T>
|
||||||
|
}
|
||||||
|
|
||||||
|
private findQuery<T extends Doc>(
|
||||||
|
_class: Ref<Class<T>>,
|
||||||
|
query: DocumentQuery<T>,
|
||||||
|
options?: FindOptions<T>
|
||||||
|
): Query | undefined {
|
||||||
|
const queries = this.queries.get(_class)
|
||||||
|
if (queries === undefined) return
|
||||||
|
for (const q of queries) {
|
||||||
|
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
q.callbacks++
|
||||||
|
q.max++
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private createQuery<T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Query {
|
||||||
|
const queries = this.queries.get(_class) ?? []
|
||||||
|
const q: Query = {
|
||||||
|
_class,
|
||||||
|
query,
|
||||||
|
result: undefined,
|
||||||
|
options: options as FindOptions<Doc>,
|
||||||
|
callbacks: 1,
|
||||||
|
max: 1
|
||||||
|
}
|
||||||
|
|
||||||
|
queries.push(q)
|
||||||
|
this.queries.set(_class, queries)
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
|
||||||
|
private removeFromQueue (q: Query): void {
|
||||||
|
if (q.callbacks === 0) {
|
||||||
|
const queries = this.queries.get(q._class) ?? []
|
||||||
|
this.queries.set(
|
||||||
|
q._class,
|
||||||
|
queries.filter((it) => it !== q)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -13,28 +13,32 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
//
|
//
|
||||||
|
|
||||||
import { Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, Ref, Tx } from '@hcengineering/core'
|
import {
|
||||||
import { Middleware, SessionContext, TxMiddlewareResult, type ServerStorage } from '@hcengineering/server-core'
|
type Class,
|
||||||
|
type Doc,
|
||||||
|
DocumentQuery,
|
||||||
|
FindOptions,
|
||||||
|
FindResult,
|
||||||
|
type MeasureContext,
|
||||||
|
Ref,
|
||||||
|
type Tx
|
||||||
|
} from '@hcengineering/core'
|
||||||
|
import { Middleware, type ServerStorage, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||||
import { BaseMiddleware } from './base'
|
import { BaseMiddleware } from './base'
|
||||||
|
|
||||||
import { deepEqual } from 'fast-equals'
|
import { QueryJoiner } from '@hcengineering/server-core'
|
||||||
|
|
||||||
interface Query {
|
|
||||||
_class: Ref<Class<Doc>>
|
|
||||||
query: DocumentQuery<Doc>
|
|
||||||
result: FindResult<Doc> | Promise<FindResult<Doc>> | undefined
|
|
||||||
options?: FindOptions<Doc>
|
|
||||||
callbacks: number
|
|
||||||
max: number
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* @public
|
* @public
|
||||||
*/
|
*/
|
||||||
export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
|
export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
|
||||||
private readonly queries: Map<Ref<Class<Doc>>, Query[]> = new Map<Ref<Class<Doc>>, Query[]>()
|
private readonly joiner: QueryJoiner
|
||||||
|
|
||||||
private constructor (storage: ServerStorage, next?: Middleware) {
|
private constructor (storage: ServerStorage, next?: Middleware) {
|
||||||
super(storage, next)
|
super(storage, next)
|
||||||
|
this.joiner = new QueryJoiner((ctx, _class, query, options) => {
|
||||||
|
return storage.findAll(ctx, _class, query, options)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<QueryJoinMiddleware> {
|
static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<QueryJoinMiddleware> {
|
||||||
@ -52,59 +56,6 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
|
|||||||
options?: FindOptions<T>
|
options?: FindOptions<T>
|
||||||
): Promise<FindResult<T>> {
|
): Promise<FindResult<T>> {
|
||||||
// Will find a query or add + 1 to callbacks
|
// Will find a query or add + 1 to callbacks
|
||||||
const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options)
|
return await this.joiner.findAll(ctx.ctx, _class, query, options)
|
||||||
if (q.result === undefined) {
|
|
||||||
q.result = this.provideFindAll(ctx, _class, query, options)
|
|
||||||
}
|
|
||||||
if (q.result instanceof Promise) {
|
|
||||||
q.result = await q.result
|
|
||||||
q.callbacks--
|
|
||||||
}
|
|
||||||
this.removeFromQueue(q)
|
|
||||||
|
|
||||||
return q.result as FindResult<T>
|
|
||||||
}
|
|
||||||
|
|
||||||
private findQuery<T extends Doc>(
|
|
||||||
_class: Ref<Class<T>>,
|
|
||||||
query: DocumentQuery<T>,
|
|
||||||
options?: FindOptions<T>
|
|
||||||
): Query | undefined {
|
|
||||||
const queries = this.queries.get(_class)
|
|
||||||
if (queries === undefined) return
|
|
||||||
for (const q of queries) {
|
|
||||||
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
q.callbacks++
|
|
||||||
q.max++
|
|
||||||
return q
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private createQuery<T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Query {
|
|
||||||
const queries = this.queries.get(_class) ?? []
|
|
||||||
const q: Query = {
|
|
||||||
_class,
|
|
||||||
query,
|
|
||||||
result: undefined,
|
|
||||||
options: options as FindOptions<Doc>,
|
|
||||||
callbacks: 1,
|
|
||||||
max: 1
|
|
||||||
}
|
|
||||||
|
|
||||||
queries.push(q)
|
|
||||||
this.queries.set(_class, queries)
|
|
||||||
return q
|
|
||||||
}
|
|
||||||
|
|
||||||
private removeFromQueue (q: Query): void {
|
|
||||||
if (q.callbacks === 0) {
|
|
||||||
const queries = this.queries.get(q._class) ?? []
|
|
||||||
this.queries.set(
|
|
||||||
q._class,
|
|
||||||
queries.filter((it) => it !== q)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user