From c3a5b88b6eeebf5da188bb12b10e9285314425a3 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 24 Jul 2024 17:27:09 +0700 Subject: [PATCH] Fix mongo indexes (#6122) Signed-off-by: Andrey Sobolev --- dev/tool/src/index.ts | 41 +- models/activity/src/index.ts | 5 +- models/core/src/core.ts | 7 +- models/core/src/index.ts | 59 +- models/core/src/tx.ts | 4 +- models/notification/src/index.ts | 3 +- packages/core/src/classes.ts | 12 +- packages/core/src/measurements/context.ts | 32 +- packages/core/src/measurements/metrics.ts | 68 +- packages/core/src/measurements/types.ts | 5 +- packages/core/src/utils.ts | 17 +- .../components/ServerManagerGeneral.svelte | 3 +- .../components/statistics/MetricsInfo.svelte | 14 +- .../src/components/statistics/Params.svelte | 80 +++ pods/account/Dockerfile | 2 +- pods/backup/Dockerfile | 2 +- pods/collaborator/Dockerfile | 2 +- pods/front/Dockerfile | 2 +- pods/server/Dockerfile | 2 +- server-plugins/tracker-resources/src/index.ts | 48 +- server/core/src/adapter.ts | 6 +- server/core/src/indexer/indexer.ts | 52 +- server/core/src/mem.ts | 2 +- server/core/src/server/domainHelper.ts | 51 +- server/elastic/src/backup.ts | 2 +- server/middleware/src/spaceSecurity.ts | 9 +- server/mongo/src/rawAdapter.ts | 5 +- server/mongo/src/storage.ts | 582 ++++++++++-------- server/mongo/src/utils.ts | 41 +- server/server-storage/src/blobStorage.ts | 2 +- server/server/src/apm.ts | 4 +- server/server/src/starter.ts | 2 +- server/tool/src/index.ts | 66 +- tests/sanity/tests/playwright.config.ts | 5 +- 34 files changed, 797 insertions(+), 440 deletions(-) create mode 100644 plugins/workbench-resources/src/components/statistics/Params.svelte diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 554051c231..87016d0288 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -46,7 +46,7 @@ import { createStorageBackupStorage, restore } from '@hcengineering/server-backup' -import serverClientPlugin, { BlobClient, 
getTransactorEndpoint } from '@hcengineering/server-client' +import serverClientPlugin, { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client' import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token' import toolPlugin from '@hcengineering/server-tool' @@ -244,13 +244,29 @@ export function devTool ( const { mongodbUri } = prepareTools() await withDatabase(mongodbUri, async (db, client) => { console.log(`assigning user ${email} to ${workspace}...`) - const workspaceInfo = await getWorkspaceById(db, productId, workspace) - if (workspaceInfo === null) { - throw new Error(`workspace ${workspace} not found`) - } - console.log('assigning to workspace', workspaceInfo) try { - await assignWorkspace(toolCtx, db, productId, null, email, workspaceInfo.workspace, AccountRole.User) + const workspaceInfo = await getWorkspaceById(db, productId, workspace) + if (workspaceInfo === null) { + throw new Error(`workspace ${workspace} not found`) + } + const token = generateToken(systemAccountEmail, { name: workspaceInfo.workspace, productId }) + const endpoint = await getTransactorEndpoint(token, 'external') + console.log('assigning to workspace', workspaceInfo, endpoint) + const client = await createClient(endpoint, token) + console.log('assigning to workspace connected', workspaceInfo, endpoint) + await assignWorkspace( + toolCtx, + db, + productId, + null, + email, + workspaceInfo.workspace, + AccountRole.User, + undefined, + undefined, + client + ) + await client.close() } catch (err: any) { console.error(err) } @@ -328,7 +344,16 @@ export function devTool ( const { mongodbUri } = prepareTools() console.log(`set user ${email} role for ${workspace}...`) await withDatabase(mongodbUri, async (db) => { - await setRole(toolCtx, db, email, workspace, productId, role) + const workspaceInfo = await getWorkspaceById(db, productId, workspace) + if (workspaceInfo === null) { + throw new Error(`workspace ${workspace} not found`) + } 
+ console.log('assigning to workspace', workspaceInfo) + const token = generateToken(systemAccountEmail, { name: workspaceInfo.workspace, productId }) + const endpoint = await getTransactorEndpoint(token, 'external') + const client = await createClient(endpoint, token) + await setRole(toolCtx, db, email, workspace, productId, role, client) + await client.close() }) }) diff --git a/models/activity/src/index.ts b/models/activity/src/index.ts index 81c652c5f7..5e1e3f1192 100644 --- a/models/activity/src/index.ts +++ b/models/activity/src/index.ts @@ -362,10 +362,7 @@ export function createModel (builder: Builder): void { builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, { domain: DOMAIN_ACTIVITY, - indexes: [ - { attachedTo: 1, createdOn: 1 }, - { attachedTo: 1, createdOn: -1 } - ], + indexes: [{ keys: { attachedTo: 1, createdOn: 1 } }, { keys: { attachedTo: 1, createdOn: -1 } }], disabled: [ { modifiedOn: 1 }, { createdOn: -1 }, diff --git a/models/core/src/core.ts b/models/core/src/core.ts index dffd58af23..783c86ab83 100644 --- a/models/core/src/core.ts +++ b/models/core/src/core.ts @@ -36,7 +36,7 @@ import { type DomainIndexConfiguration, type Enum, type EnumOf, - type FieldIndex, + type FieldIndexConfig, type FullTextSearchContext, type IndexStageState, type IndexingConfiguration, @@ -134,7 +134,7 @@ export class TAttachedDoc extends TDoc implements AttachedDoc { export class TBlob extends TDoc implements Blob { @Prop(TypeString(), core.string.Blob) @ReadOnly() - @Index(IndexKind.Indexed) + // @Index(IndexKind.Indexed) provider!: string @Prop(TypeString(), core.string.BlobContentType) @@ -340,7 +340,6 @@ export class TDocIndexState extends TDoc implements DocIndexState { stages!: Record @Prop(TypeString(), getEmbeddedLabel('Generation')) - @Index(IndexKind.Indexed) @Hidden() generationId?: string } @@ -371,7 +370,7 @@ export class TConfiguration extends TDoc implements Configuration { @MMixin(core.mixin.IndexConfiguration, core.class.Class) 
export class TIndexConfiguration extends TClass implements IndexingConfiguration { - indexes!: FieldIndex[] + indexes!: (string | FieldIndexConfig)[] searchDisabled!: boolean } diff --git a/models/core/src/index.ts b/models/core/src/index.ts index c4df62fdb6..a409e7bbcd 100644 --- a/models/core/src/index.ts +++ b/models/core/src/index.ts @@ -194,26 +194,32 @@ export function createModel (builder: Builder): void { core.class.Class, core.mixin.IndexConfiguration, { - indexes: [ - 'tx.objectId', - 'tx.operations.attachedTo', - 'space', - { - objectSpace: 1, - _id: 1, - modifiedOn: 1 - }, - { - objectSpace: 1, - modifiedBy: 1, - objectClass: 1 - } - ] + indexes: ['tx.objectId', 'tx.operations.attachedTo'] } ) builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, { domain: DOMAIN_TX, - disabled: [{ space: 1 }, { objectClass: 1 }, { createdBy: 1 }, { createdBy: -1 }, { createdOn: -1 }] + disabled: [ + { space: 1 }, + { objectClass: 1 }, + { createdBy: 1 }, + { createdBy: -1 }, + { createdOn: -1 }, + { modifiedBy: 1 }, + { objectSpace: 1 } + ], + indexes: [ + { + keys: { + objectSpace: 1, + _id: 1, + modifiedOn: 1 + }, + filter: { + objectSpace: core.space.Model + } + } + ] }) builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, { @@ -299,20 +305,13 @@ export function createModel (builder: Builder): void { { indexes: [ { - _class: 1, - stages: 1, - _id: 1, - modifiedOn: 1 - }, - { - _class: 1, - _id: 1, - modifiedOn: 1 - }, - { - _class: 1, - _id: 1, - objectClass: 1 + keys: { + _class: 1, + stages: 1, + _id: 1, + modifiedOn: 1 + }, + sparse: true } ] } diff --git a/models/core/src/tx.ts b/models/core/src/tx.ts index 956ccccd09..c12cfd1e09 100644 --- a/models/core/src/tx.ts +++ b/models/core/src/tx.ts @@ -46,7 +46,7 @@ import { TDoc } from './core' @Model(core.class.Tx, core.class.Doc, DOMAIN_TX) export class TTx extends TDoc implements Tx { @Prop(TypeRef(core.class.Space), core.string.Space) - @Index(IndexKind.Indexed) + // 
@Index(IndexKind.Indexed) @Hidden() objectSpace!: Ref } @@ -62,7 +62,7 @@ export class TTxCUD extends TTx implements TxCUD { objectId!: Ref @Prop(TypeRef(core.class.Class), core.string.ClassLabel) - @Index(IndexKind.Indexed) + // @Index(IndexKind.Indexed) @Hidden() objectClass!: Ref> } diff --git a/models/notification/src/index.ts b/models/notification/src/index.ts index e673ce41ba..5e7ecfb620 100644 --- a/models/notification/src/index.ts +++ b/models/notification/src/index.ts @@ -619,9 +619,10 @@ export function createModel (builder: Builder): void { }, presenter: notification.component.ReactionNotificationPresenter }) + builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, { domain: DOMAIN_NOTIFICATION, - indexes: [{ user: 1, archived: 1 }], + indexes: [{ keys: { user: 1, archived: 1 } }], disabled: [{ modifiedOn: 1 }, { modifiedBy: 1 }, { createdBy: 1 }, { isViewed: 1 }, { hidden: 1 }] }) diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index 5c979c590e..df6802b66a 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -15,6 +15,7 @@ // import type { Asset, IntlString, Plugin } from '@hcengineering/platform' +import type { DocumentQuery } from './storage' /** * @public @@ -122,6 +123,7 @@ export enum IndexKind { * Also mean to include into Elastic search. */ Indexed, + // Same as indexed but for descending IndexedDsc } @@ -623,6 +625,12 @@ export type FieldIndex = { [P in keyof T]?: IndexOrder } & Record +export interface FieldIndexConfig { + sparse?: boolean + filter?: Omit, '$search'> + keys: FieldIndex | string +} + /** * @public * @@ -630,7 +638,7 @@ export type FieldIndex = { */ export interface IndexingConfiguration extends Class { // Define a list of extra index definitions. 
- indexes: (FieldIndex | string)[] + indexes: (string | FieldIndexConfig)[] searchDisabled?: boolean } @@ -643,7 +651,7 @@ export interface DomainIndexConfiguration extends Doc { disabled?: (FieldIndex | string)[] // Additional indexes we could like to enabled - indexes?: (FieldIndex | string)[] + indexes?: (FieldIndexConfig | string)[] skip?: string[] } diff --git a/packages/core/src/measurements/context.ts b/packages/core/src/measurements/context.ts index 7ce37d48de..d88054cf68 100644 --- a/packages/core/src/measurements/context.ts +++ b/packages/core/src/measurements/context.ts @@ -11,12 +11,12 @@ export class MeasureMetricsContext implements MeasureContext { private readonly params: ParamsType logger: MeasureLogger metrics: Metrics - private readonly done: (value?: number) => void + private readonly done: (value?: number, override?: boolean) => void constructor ( name: string, params: ParamsType, - fullParams: FullParamsType = {}, + fullParams: FullParamsType | (() => FullParamsType) = {}, metrics: Metrics = newMetrics(), logger?: MeasureLogger, readonly parent?: MeasureContext, @@ -25,8 +25,21 @@ export class MeasureMetricsContext implements MeasureContext { this.name = name this.params = params this.metrics = metrics + this.metrics.namedParams = this.metrics.namedParams ?? {} + for (const [k, v] of Object.entries(params)) { + if (this.metrics.namedParams[k] !== v) { + this.metrics.namedParams[k] = v + } else { + this.metrics.namedParams[k] = '*' + } + } this.done = measure(metrics, params, fullParams, (spend) => { - this.logger.logOperation(this.name, spend, { ...params, ...fullParams, ...(this.logParams ?? {}) }) + this.logger.logOperation(this.name, spend, { + ...params, + ...(typeof fullParams === 'function' ? fullParams() : fullParams), + ...fullParams, + ...(this.logParams ?? 
{}) + }) }) const errorPrinter = ({ message, stack, ...rest }: Error): object => ({ @@ -63,12 +76,17 @@ export class MeasureMetricsContext implements MeasureContext { } } - measure (name: string, value: number): void { + measure (name: string, value: number, override?: boolean): void { const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger, this) - c.done(value) + c.done(value, override) } - newChild (name: string, params: ParamsType, fullParams?: FullParamsType, logger?: MeasureLogger): MeasureContext { + newChild ( + name: string, + params: ParamsType, + fullParams?: FullParamsType | (() => FullParamsType), + logger?: MeasureLogger + ): MeasureContext { return new MeasureMetricsContext( name, params, @@ -84,7 +102,7 @@ export class MeasureMetricsContext implements MeasureContext { name: string, params: ParamsType, op: (ctx: MeasureContext) => T | Promise, - fullParams?: ParamsType + fullParams?: ParamsType | (() => FullParamsType) ): Promise { const c = this.newChild(name, params, fullParams, this.logger) try { diff --git a/packages/core/src/measurements/metrics.ts b/packages/core/src/measurements/metrics.ts index e77e2fcd82..fbef8f77f9 100644 --- a/packages/core/src/measurements/metrics.ts +++ b/packages/core/src/measurements/metrics.ts @@ -18,7 +18,8 @@ export function newMetrics (): Metrics { operations: 0, value: 0, measurements: {}, - params: {} + params: {}, + namedParams: {} } } @@ -27,18 +28,32 @@ function getUpdatedTopResult ( time: number, params: FullParamsType ): Metrics['topResult'] { - if (current === undefined || current.length < 3 || current.some((it) => it.value < time)) { - const result = [ - ...(current ?? []), - { - value: time, - params: cutObjectArray(params) - } - ] - result.sort((a, b) => b.value - a.value) - return result.slice(0, 3) + if (time === 0) { + return current + } + const result: Metrics['topResult'] = current ?? 
[] + + const newValue = { + value: time, + params: cutObjectArray(params) + } + + if (result.length > 6) { + if (result[0].value < newValue.value) { + result[0] = newValue + return result + } + if (result[result.length - 1].value > newValue.value) { + result[result.length - 1] = newValue + return result + } + + // Shift the middle + return [result[0], newValue, ...result.slice(1, 3), result[5]] + } else { + result.push(newValue) + return result } - return current } /** @@ -48,12 +63,14 @@ function getUpdatedTopResult ( export function measure ( metrics: Metrics, params: ParamsType, - fullParams: FullParamsType = {}, + fullParams: FullParamsType | (() => FullParamsType) = {}, endOp?: (spend: number) => void ): () => void { const st = Date.now() - return (value?: number) => { + return (value?: number, override?: boolean) => { const ed = Date.now() + + const fParams = typeof fullParams === 'function' ? fullParams() : fullParams // Update params if required for (const [k, v] of Object.entries(params)) { let params = metrics.params[k] @@ -70,16 +87,24 @@ export function measure ( } params[vKey] = param } - param.value += value ?? ed - st - param.operations++ + if (override === true) { + metrics.operations = value ?? ed - st + } else { + param.value += value ?? ed - st + param.operations++ + } - param.topResult = getUpdatedTopResult(param.topResult, ed - st, fullParams) + param.topResult = getUpdatedTopResult(param.topResult, ed - st, fParams) } // Update leaf data - metrics.value += value ?? ed - st - metrics.operations++ + if (override === true) { + metrics.operations = value ?? ed - st + } else { + metrics.value += value ?? 
ed - st + metrics.operations++ + } - metrics.topResult = getUpdatedTopResult(metrics.topResult, ed - st, fullParams) + metrics.topResult = getUpdatedTopResult(metrics.topResult, ed - st, fParams) endOp?.(ed - st) } } @@ -136,7 +161,8 @@ export function metricsAggregate (m: Metrics, limit: number = -1): Metrics { measurements: ms, params: m.params, value: sumVal, - topResult: m.topResult + topResult: m.topResult, + namedParams: m.namedParams } } diff --git a/packages/core/src/measurements/types.ts b/packages/core/src/measurements/types.ts index e07fa2d1d2..fbe77be055 100644 --- a/packages/core/src/measurements/types.ts +++ b/packages/core/src/measurements/types.ts @@ -29,6 +29,7 @@ export interface MetricsData { * @public */ export interface Metrics extends MetricsData { + namedParams: ParamsType params: Record> measurements: Record } @@ -59,7 +60,7 @@ export interface MeasureContext { name: string, params: ParamsType, op: (ctx: MeasureContext) => T | Promise, - fullParams?: FullParamsType + fullParams?: FullParamsType | (() => FullParamsType) ) => Promise withLog: ( @@ -73,7 +74,7 @@ export interface MeasureContext { parent?: MeasureContext - measure: (name: string, value: number) => void + measure: (name: string, value: number, override?: boolean) => void // Capture error error: (message: string, obj?: Record) => void diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index c567ddcc16..2fe36d2e10 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -365,11 +365,15 @@ export class RateLimiter { this.rate = rate } + notify: (() => void)[] = [] + async exec = any>(op: (args?: B) => Promise, args?: B): Promise { const processingId = this.idCounter++ - while (this.processingQueue.size > this.rate) { - await Promise.race(this.processingQueue.values()) + while (this.processingQueue.size >= this.rate) { + await new Promise((resolve) => { + this.notify.push(resolve) + }) } try { const p = op(args) @@ -377,6 +381,10 @@ export class 
RateLimiter { return await p } finally { this.processingQueue.delete(processingId) + const n = this.notify.shift() + if (n !== undefined) { + n() + } } } @@ -384,10 +392,7 @@ export class RateLimiter { if (this.processingQueue.size < this.rate) { void this.exec(op, args) } else { - while (this.processingQueue.size > this.rate) { - await Promise.race(this.processingQueue.values()) - } - void this.exec(op, args) + await this.exec(op, args) } } diff --git a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte index 4e4d38a31b..a0ca57db40 100644 --- a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte +++ b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte @@ -56,11 +56,12 @@ ops = 0 }, 1000) const rate = new RateLimiter(commandsToSendParallel) + const client = getClient() const doOp = async () => { const st = Date.now() active++ - await getClient().createDoc(core.class.BenchmarkDoc, core.space.Configuration, { + await client.createDoc(core.class.BenchmarkDoc, core.space.Configuration, { source: genData(dataSize), request: { documents: 1, diff --git a/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte b/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte index e3e4159567..78108061ae 100644 --- a/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte +++ b/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte @@ -1,7 +1,9 @@ + + + +
+
+
+ {#each Object.entries(params) as kv} +
+ {kv[0]}: {typeof kv[1] === 'object' ? JSON.stringify(kv[1]) : kv[1]} +
+ {/each} +
+ +
+ + diff --git a/pods/account/Dockerfile b/pods/account/Dockerfile index f0d3230221..81527977b8 100644 --- a/pods/account/Dockerfile +++ b/pods/account/Dockerfile @@ -3,7 +3,7 @@ FROM node:20 WORKDIR /usr/src/app COPY bundle/bundle.js ./ -RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm +RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm RUN apt-get update RUN apt-get install libjemalloc2 diff --git a/pods/backup/Dockerfile b/pods/backup/Dockerfile index c2ba2e0a73..690eec7953 100644 --- a/pods/backup/Dockerfile +++ b/pods/backup/Dockerfile @@ -3,7 +3,7 @@ FROM node:20 WORKDIR /usr/src/app -RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm +RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm COPY bundle/bundle.js ./ EXPOSE 3000 diff --git a/pods/collaborator/Dockerfile b/pods/collaborator/Dockerfile index 95bcc8195c..101ba42731 100644 --- a/pods/collaborator/Dockerfile +++ b/pods/collaborator/Dockerfile @@ -2,7 +2,7 @@ FROM node:20 WORKDIR /usr/src/app -RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm +RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm RUN apt-get update RUN apt-get install libjemalloc2 diff --git a/pods/front/Dockerfile b/pods/front/Dockerfile index 3842ede82f..ea4a47bff9 100644 --- a/pods/front/Dockerfile +++ b/pods/front/Dockerfile @@ -3,7 +3,7 @@ FROM node:20 ENV NODE_ENV production WORKDIR /app -RUN npm install --ignore-scripts=false --verbose sharp@v0.32.6 bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm +RUN npm install --ignore-scripts=false --verbose sharp@v0.32.6 bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm RUN apt-get update RUN apt-get install libjemalloc2 diff 
--git a/pods/server/Dockerfile b/pods/server/Dockerfile index d2663510e6..c733cd4a02 100644 --- a/pods/server/Dockerfile +++ b/pods/server/Dockerfile @@ -3,7 +3,7 @@ FROM node:20 ENV NODE_ENV production WORKDIR /app -RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm +RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm RUN npm install --ignore-scripts=false --verbose uNetworking/uWebSockets.js#v20.43.0 RUN apt-get update diff --git a/server-plugins/tracker-resources/src/index.ts b/server-plugins/tracker-resources/src/index.ts index 5d896be831..6ed0f03155 100644 --- a/server-plugins/tracker-resources/src/index.ts +++ b/server-plugins/tracker-resources/src/index.ts @@ -298,12 +298,19 @@ async function doTimeReportUpdate (cud: TxCUD, tx: Tx, control: switch (cud._class) { case core.class.TxCreateDoc: { const ccud = cud as TxCreateDoc - const res = [ - control.txFactory.createTxUpdateDoc(parentTx.objectClass, parentTx.objectSpace, parentTx.objectId, { - $inc: { reportedTime: ccud.attributes.value } - }) - ] const [currentIssue] = await control.findAll(tracker.class.Issue, { _id: parentTx.objectId }, { limit: 1 }) + const res = [ + control.txFactory.createTxUpdateDoc( + parentTx.objectClass, + parentTx.objectSpace, + parentTx.objectId, + { + $inc: { reportedTime: ccud.attributes.value } + }, + false, + currentIssue.modifiedOn + ) + ] currentIssue.reportedTime += ccud.attributes.value currentIssue.remainingTime = Math.max(0, currentIssue.estimation - currentIssue.reportedTime) updateIssueParentEstimations(currentIssue, res, control, currentIssue.parents, currentIssue.parents) @@ -325,9 +332,16 @@ async function doTimeReportUpdate (cud: TxCUD, tx: Tx, control: const [currentIssue] = await control.findAll(tracker.class.Issue, { _id: parentTx.objectId }, { limit: 1 }) if (doc !== undefined) { res.push( - 
control.txFactory.createTxUpdateDoc(parentTx.objectClass, parentTx.objectSpace, parentTx.objectId, { - $inc: { reportedTime: upd.operations.value - doc.value } - }) + control.txFactory.createTxUpdateDoc( + parentTx.objectClass, + parentTx.objectSpace, + parentTx.objectId, + { + $inc: { reportedTime: upd.operations.value - doc.value } + }, + false, + currentIssue.modifiedOn + ) ) currentIssue.reportedTime -= doc.value currentIssue.reportedTime += upd.operations.value @@ -350,13 +364,19 @@ async function doTimeReportUpdate (cud: TxCUD, tx: Tx, control: ).map(TxProcessor.extractTx) const doc: TimeSpendReport | undefined = TxProcessor.buildDoc2Doc(logTxes) if (doc !== undefined) { - const res = [ - control.txFactory.createTxUpdateDoc(parentTx.objectClass, parentTx.objectSpace, parentTx.objectId, { - $inc: { reportedTime: -1 * doc.value } - }) - ] - const [currentIssue] = await control.findAll(tracker.class.Issue, { _id: parentTx.objectId }, { limit: 1 }) + const res = [ + control.txFactory.createTxUpdateDoc( + parentTx.objectClass, + parentTx.objectSpace, + parentTx.objectId, + { + $inc: { reportedTime: -1 * doc.value } + }, + false, + currentIssue.modifiedOn + ) + ] currentIssue.reportedTime -= doc.value currentIssue.remainingTime = Math.max(0, currentIssue.estimation - currentIssue.reportedTime) updateIssueParentEstimations(currentIssue, res, control, currentIssue.parents, currentIssue.parents) diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index bb52cd634b..43496ae8fd 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -19,7 +19,7 @@ import { type DocumentQuery, type DocumentUpdate, type Domain, - type FieldIndex, + type FieldIndexConfig, type FindOptions, type FindResult, type Hierarchy, @@ -37,7 +37,7 @@ import { type StorageAdapter } from './storage' export interface DomainHelperOperations { create: (domain: Domain) => Promise exists: (domain: Domain) => boolean - createIndex: (domain: Domain, value: string | 
FieldIndex, options?: { name: string }) => Promise + createIndex: (domain: Domain, value: string | FieldIndexConfig, options?: { name: string }) => Promise dropIndex: (domain: Domain, name: string) => Promise listIndexes: (domain: Domain) => Promise<{ name: string }[]> hasDocuments: (domain: Domain, count: number) => Promise @@ -94,7 +94,7 @@ export interface DbAdapter { helper?: () => DomainHelperOperations createIndexes: (domain: Domain, config: Pick, 'indexes'>) => Promise - removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise + removeOldIndex: (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]) => Promise close: () => Promise findAll: ( diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 61ebcd92ff..e6be20baf1 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -335,25 +335,35 @@ export class FullTextIndexPipeline implements FullTextPipeline { if (!this.indexesCreated) { this.indexesCreated = true // We need to be sure we have individual indexes per stage. - const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/] + const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/, /emb-v.*/] + + const deletePattern: RegExp[] = [] + const keepPattern: RegExp[] = [] for (const st of this.stages) { if (this.cancelling) { return } const regexp = oldStagesRegex.find((r) => r.test(st.stageId)) if (regexp !== undefined) { - await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, regexp, new RegExp(st.stageId)) + deletePattern.push(regexp) + keepPattern.push(new RegExp(st.stageId)) + } + } + if (deletePattern.length > 0) { + await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, deletePattern, keepPattern) + } + + for (const st of this.stages) { + if (this.cancelling) { + return } await this.storage.createIndexes(DOMAIN_DOC_INDEX_STATE, { indexes: [ { - ['stages.' + st.stageId]: 1 - }, - { - _class: 1, - _id: 1, - ['stages.' 
+ st.stageId]: 1, - removed: 1 + keys: { + ['stages.' + st.stageId]: 1 + }, + sparse: true } ] }) @@ -459,23 +469,21 @@ export class FullTextIndexPipeline implements FullTextPipeline { .filter((it) => it[1] > 3) .map((it) => it[0]) + const q: DocumentQuery = { + [`stages.${st.stageId}`]: { $ne: st.stageValue }, + removed: false + } + if (toSkip.length > 0) { + q._id = { $nin: toSkip } + } let result = await ctx.with( 'get-to-index', {}, async (ctx) => - await this.storage.findAll( - ctx, - core.class.DocIndexState, - { - [`stages.${st.stageId}`]: { $ne: st.stageValue }, - _id: { $nin: toSkip }, - removed: false - }, - { - sort: { modifiedOn: SortingOrder.Descending }, - limit: globalIndexer.processingSize - } - ) + await this.storage.findAll(ctx, core.class.DocIndexState, q, { + sort: { modifiedOn: SortingOrder.Descending }, + limit: globalIndexer.processingSize + }) ) const toRemove: DocIndexState[] = [] // Check and remove missing class documents. diff --git a/server/core/src/mem.ts b/server/core/src/mem.ts index 49487ce666..3d053675f3 100644 --- a/server/core/src/mem.ts +++ b/server/core/src/mem.ts @@ -50,7 +50,7 @@ export class DummyDbAdapter implements DbAdapter { async init (): Promise {} async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} - async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise {} + async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise {} async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { return [] diff --git a/server/core/src/server/domainHelper.ts b/server/core/src/server/domainHelper.ts index 919caf25ff..64f3c01f3c 100644 --- a/server/core/src/server/domainHelper.ts +++ b/server/core/src/server/domainHelper.ts @@ -3,7 +3,7 @@ import type { Doc, Domain, DomainIndexConfiguration, - FieldIndex, + FieldIndexConfig, Hierarchy, MeasureContext, ModelDb, @@ -14,7 +14,7 @@ import { deepEqual } from 'fast-equals' import type { DomainHelper, 
DomainHelperOperations } from '../adapter' export class DomainIndexHelperImpl implements DomainHelper { - domains = new Map>>() + domains = new Map>>() domainConfigurations: DomainIndexConfiguration[] = [] constructor ( readonly ctx: MeasureContext, @@ -33,7 +33,7 @@ export class DomainIndexHelperImpl implements DomainHelper { ctx.error('failed to find domain index configuration', { err }) } - this.domains = new Map>>() + this.domains = new Map>>() // Find all domains and indexed fields inside for (const c of classes) { try { @@ -42,14 +42,15 @@ export class DomainIndexHelperImpl implements DomainHelper { continue } const attrs = hierarchy.getAllAttributes(c._id) - const domainAttrs = this.domains.get(domain) ?? new Set>() + const domainAttrs = this.domains.get(domain) ?? new Set>() for (const a of attrs.values()) { - if (a.index !== undefined && (a.index === IndexKind.Indexed || a.index === IndexKind.IndexedDsc)) { - if (a.index === IndexKind.Indexed) { - domainAttrs.add(a.name) - } else { - domainAttrs.add({ [a.name]: IndexOrder.Descending }) - } + if (a.index !== undefined && a.index !== IndexKind.FullText) { + domainAttrs.add({ + keys: { + [a.name]: a.index === IndexKind.Indexed ? IndexOrder.Ascending : IndexOrder.Descending + }, + sparse: true // Default to sparse indexes + }) } } @@ -57,7 +58,11 @@ export class DomainIndexHelperImpl implements DomainHelper { if (hierarchy.hasMixin(c, core.mixin.IndexConfiguration)) { const config = hierarchy.as(c, core.mixin.IndexConfiguration) for (const attr of config.indexes) { - domainAttrs.add(attr) + if (typeof attr === 'string') { + domainAttrs.add({ keys: { [attr]: IndexOrder.Ascending }, sparse: true }) + } else { + domainAttrs.add(attr) + } } } @@ -97,7 +102,7 @@ export class DomainIndexHelperImpl implements DomainHelper { // Do not need to create, since not force and no documents. 
return false } - const bb: (string | FieldIndex)[] = [] + const bb: (string | FieldIndexConfig)[] = [] const added = new Set() try { @@ -107,18 +112,26 @@ export class DomainIndexHelperImpl implements DomainHelper { if (has50Documents) { for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) { try { - const name = - typeof vv === 'string' - ? `${vv}_1` - : Object.entries(vv) - .map(([key, val]) => `${key}_${val}`) - .join('_') + let name: string + if (typeof vv === 'string') { + name = `${vv}_sp_1` + } else { + let pfix = '' + if (vv.filter !== undefined) { + pfix += '_fi' + } else if (vv.sparse === true) { + pfix += '_sp' + } + name = Object.entries(vv.keys) + .map(([key, val]) => `${key + pfix}_${val}`) + .join('_') + } // Check if index is disabled or not const isDisabled = cfg?.disabled?.some((it) => { const _it = typeof it === 'string' ? { [it]: 1 } : it - const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv + const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv.keys return deepEqual(_it, _vv) }) ?? 
false if (isDisabled) { diff --git a/server/elastic/src/backup.ts b/server/elastic/src/backup.ts index ccc80f8a81..eadbe947a2 100644 --- a/server/elastic/src/backup.ts +++ b/server/elastic/src/backup.ts @@ -75,7 +75,7 @@ class ElasticDataAdapter implements DbAdapter { } async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} - async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise {} + async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise {} async close (): Promise { await this.client.close() diff --git a/server/middleware/src/spaceSecurity.ts b/server/middleware/src/spaceSecurity.ts index c9fe137cad..1494f11eb4 100644 --- a/server/middleware/src/spaceSecurity.ts +++ b/server/middleware/src/spaceSecurity.ts @@ -453,12 +453,15 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar const map = new Set>() const field = this.getKey(domain) while (true) { + const nin = Array.from(map.values()) const spaces = await this.storage.findAll( ctx, core.class.Doc, - { - [field]: { $nin: Array.from(map.values()) } - }, + nin.length > 0 + ? 
{ + [field]: { $nin: nin } + } + : {}, { projection: { [field]: 1 }, limit: 1000, diff --git a/server/mongo/src/rawAdapter.ts b/server/mongo/src/rawAdapter.ts index b3e5e9adb2..3a315f71bd 100644 --- a/server/mongo/src/rawAdapter.ts +++ b/server/mongo/src/rawAdapter.ts @@ -164,7 +164,10 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter { } } } - }) + }), + { + ordered: false + } ) }) } catch (err: any) { diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index ca0edfef47..bf01af5cc7 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -16,12 +16,15 @@ import core, { DOMAIN_MODEL, DOMAIN_TX, + RateLimiter, SortingOrder, TxProcessor, cutObjectArray, escapeLikeForRegexp, + groupByArray, isOperator, toFindResult, + withContext, type AttachedDoc, type Class, type Doc, @@ -32,6 +35,7 @@ import core, { type EnumOf, type FindOptions, type FindResult, + type FullParamsType, type Hierarchy, type IndexingConfiguration, type Lookup, @@ -80,8 +84,8 @@ import { } from 'mongodb' import { DBCollectionHelper, getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils' -function translateDoc (doc: Doc): Document { - return { ...doc, '%hash%': null } +function translateDoc (doc: Doc): Doc { + return { ...doc, '%hash%': null } as any } function isLookupQuery (query: DocumentQuery): boolean { @@ -121,7 +125,11 @@ export interface DbAdapterOptions { abstract class MongoAdapterBase implements DbAdapter { _db: DBCollectionHelper + findRateLimit = new RateLimiter(parseInt(process.env.FIND_RLIMIT ?? '10')) + rateLimit = new RateLimiter(parseInt(process.env.TX_RLIMIT ?? 
'1')) + constructor ( + readonly globalCtx: MeasureContext, protected readonly db: Db, protected readonly hierarchy: Hierarchy, protected readonly modelDb: ModelDb, @@ -142,22 +150,29 @@ abstract class MongoAdapterBase implements DbAdapter { } async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise { - for (const vv of config.indexes) { + for (const value of config.indexes) { try { - await this.collection(domain).createIndex(vv) + if (typeof value === 'string') { + await this.collection(domain).createIndex(value, { sparse: true }) + } else { + await this.collection(domain).createIndex(value.keys, { sparse: value.sparse ?? true }) + } } catch (err: any) { - console.error('failed to create index', domain, vv, err) + console.error('failed to create index', domain, value, err) } } } - async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise { + async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise { try { const existingIndexes = await this.collection(domain).indexes() for (const existingIndex of existingIndexes) { if (existingIndex.name !== undefined) { const name: string = existingIndex.name - if (deletePattern.test(name) && !keepPattern.test(name)) { + if ( + deletePattern.some((it) => it.test(name)) && + (existingIndex.sparse !== true || !keepPattern.some((it) => it.test(name))) + ) { await this.collection(domain).dropIndex(name) } } @@ -604,6 +619,45 @@ abstract class MongoAdapterBase implements DbAdapter { return false } + findOps: number = 0 + txOps: number = 0 + opIndex: number = 0 + + async collectOps( + ctx: MeasureContext, + domain: Domain | undefined, + operation: 'find' | 'tx', + op: (ctx: MeasureContext) => Promise, + fullParam: FullParamsType + ): Promise { + const id = `${++this.opIndex}` + + if (operation === 'find') { + this.findOps++ + } else { + this.txOps++ + } + + const result = await ctx.with( + operation, + { domain }, + async (ctx) => await op(ctx), + () 
=> ({ + ...fullParam, + id, + findOps: this.findOps, + txOps: this.txOps + }) + ) + if (operation === 'find') { + this.findOps-- + } else { + this.txOps-- + } + return result + } + + @withContext('find-all') async findAll( ctx: MeasureContext, _class: Ref>, @@ -612,58 +666,78 @@ abstract class MongoAdapterBase implements DbAdapter { domain?: Domain // Allow to find for Doc's in specified domain only. } ): Promise> { - if (options != null && (options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options))) { - return await ctx.with('pipeline', {}, async (ctx) => await this.findWithPipeline(ctx, _class, query, options), { - _class, - query, - options - }) - } - const domain = options?.domain ?? this.hierarchy.getDomain(_class) - const coll = this.collection(domain) - const mongoQuery = this.translateQuery(_class, query) + return await this.findRateLimit.exec(async () => { + return await this.collectOps( + this.globalCtx, + this.hierarchy.findDomain(_class), + 'find', + async (ctx) => { + if ( + options != null && + (options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options)) + ) { + return await ctx.with( + 'pipeline', + {}, + async (ctx) => await this.findWithPipeline(ctx, _class, query, options), + { + _class, + query, + options + } + ) + } + const domain = options?.domain ?? 
this.hierarchy.getDomain(_class) + const coll = this.collection(domain) + const mongoQuery = this.translateQuery(_class, query) - let cursor = coll.find(mongoQuery, { - checkKeys: false + let cursor = coll.find(mongoQuery) + + if (options?.projection !== undefined) { + const projection = this.calcProjection(options, _class) + if (projection != null) { + cursor = cursor.project(projection) + } + } + let total: number = -1 + if (options != null) { + if (options.sort !== undefined) { + const sort = this.collectSort(options, _class) + if (sort !== undefined) { + cursor = cursor.sort(sort) + } + } + if (options.limit !== undefined || typeof query._id === 'string') { + if (options.total === true) { + total = await coll.countDocuments(mongoQuery) + } + cursor = cursor.limit(options.limit ?? 1) + } + } + + // Error in case of timeout + try { + const res: T[] = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), { + mongoQuery, + options, + domain + }) + if (options?.total === true && options?.limit === undefined) { + total = res.length + } + return toFindResult(this.stripHash(res), total) + } catch (e) { + console.error('error during executing cursor in findAll', _class, cutObjectArray(query), options, e) + throw e + } + }, + { + _class, + query, + options + } + ) }) - - if (options?.projection !== undefined) { - const projection = this.calcProjection(options, _class) - if (projection != null) { - cursor = cursor.project(projection) - } - } - let total: number = -1 - if (options != null) { - if (options.sort !== undefined) { - const sort = this.collectSort(options, _class) - if (sort !== undefined) { - cursor = cursor.sort(sort) - } - } - if (options.limit !== undefined || typeof query._id === 'string') { - if (options.total === true) { - total = await coll.countDocuments(mongoQuery) - } - cursor = cursor.limit(options.limit ?? 
1) - } - } - - // Error in case of timeout - try { - const res: T[] = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), { - mongoQuery, - options, - domain - }) - if (options?.total === true && options?.limit === undefined) { - total = res.length - } - return toFindResult(this.stripHash(res), total) - } catch (e) { - console.error('error during executing cursor in findAll', _class, cutObjectArray(query), options, e) - throw e - } } private collectSort( @@ -875,7 +949,10 @@ abstract class MongoAdapterBase implements DbAdapter { } } } - }) + }), + { + ordered: false + } ) }) } catch (err: any) { @@ -896,10 +973,15 @@ abstract class MongoAdapterBase implements DbAdapter { } } -interface DomainOperation { - raw: () => Promise - domain: Domain - bulk?: AnyBulkWriteOperation[] +interface OperationBulk { + add: Doc[] + update: Map, Partial> + + bulkOperations: AnyBulkWriteOperation[] + + findUpdate: Set> + + raw: (() => Promise)[] } class MongoAdapter extends MongoAdapterBase { @@ -907,100 +989,138 @@ class MongoAdapter extends MongoAdapterBase { await this._db.init() } - getOperations (tx: Tx): DomainOperation | undefined { + updateBulk (bulk: OperationBulk, tx: Tx): void { switch (tx._class) { case core.class.TxCreateDoc: - return this.txCreateDoc(tx as TxCreateDoc) + this.txCreateDoc(bulk, tx as TxCreateDoc) + break case core.class.TxCollectionCUD: - return this.txCollectionCUD(tx as TxCollectionCUD) + this.txCollectionCUD(bulk, tx as TxCollectionCUD) + break case core.class.TxUpdateDoc: - return this.txUpdateDoc(tx as TxUpdateDoc) + this.txUpdateDoc(bulk, tx as TxUpdateDoc) + break case core.class.TxRemoveDoc: - return this.txRemoveDoc(tx as TxRemoveDoc) + this.txRemoveDoc(bulk, tx as TxRemoveDoc) + break case core.class.TxMixin: - return this.txMixin(tx as TxMixin) + this.txMixin(bulk, tx as TxMixin) + break case core.class.TxApplyIf: return undefined + default: + console.error('Unknown/Unsupported operation:', tx._class, tx) + break } - - 
console.error('Unknown/Unsupported operation:', tx._class, tx) } + @withContext('tx') async tx (ctx: MeasureContext, ...txes: Tx[]): Promise { const result: TxResult[] = [] - const bulkOperations: DomainOperation[] = [] - - let lastDomain: Domain | undefined - - const bulkExecute = async (): Promise => { - if (lastDomain === undefined || bulkOperations.length === 0) { - return + const h = this.hierarchy + const byDomain = groupByArray(txes, (it) => { + if (TxProcessor.isExtendsCUD(it._class)) { + return h.findDomain((it as TxCUD).objectClass) } - const ops = bulkOperations.reduce((ops, op) => ops.concat(...(op.bulk ?? [])), []) - try { - await this.db.collection(lastDomain).bulkWrite(ops) - } catch (err: any) { - console.trace(err) - throw err - } - bulkOperations.splice(0, bulkOperations.length) - lastDomain = undefined - } + return undefined + }) - if (txes.length > 1) { - for (const tx of txes) { - const dop: DomainOperation | undefined = this.getOperations(tx) - if (dop === undefined) { - continue - } - if (dop.bulk === undefined) { - // Execute previous bulk and capture result. 
- await ctx.with( - 'bulkExecute', - {}, - async () => { - await bulkExecute() - }, - { txes: cutObjectArray(tx) } - ) - try { - result.push(await dop.raw()) - } catch (err: any) { - console.error(err) + for (const [domain, txs] of byDomain) { + if (domain === undefined) { + continue + } + const domainBulk: OperationBulk = { + add: [], + update: new Map(), + bulkOperations: [], + findUpdate: new Set(), + raw: [] + } + for (const t of txs) { + this.updateBulk(domainBulk, t) + } + if ( + domainBulk.add.length === 0 && + domainBulk.update.size === 0 && + domainBulk.bulkOperations.length === 0 && + domainBulk.findUpdate.size === 0 && + domainBulk.raw.length === 0 + ) { + continue + } + await this.rateLimit.exec(async () => { + await this.collectOps( + this.globalCtx, + domain, + 'tx', + async (ctx) => { + const coll = this.db.collection(domain) + + // Minor optimizations + // Add Remove optimization + + if (domainBulk.add.length > 0) { + await ctx.with('insertMany', {}, async () => { + await coll.insertMany(domainBulk.add, { ordered: false }) + }) + } + if (domainBulk.update.size > 0) { + // Extract similar update to update many if possible + // TODO: + await ctx.with('updateMany-bulk', {}, async () => { + await coll.bulkWrite( + Array.from(domainBulk.update.entries()).map((it) => ({ + updateOne: { + filter: { _id: it[0] }, + update: { + $set: it[1] + } + } + })), + { + ordered: false + } + ) + }) + } + if (domainBulk.bulkOperations.length > 0) { + await ctx.with('bulkWrite', {}, async () => { + await coll.bulkWrite(domainBulk.bulkOperations, { + ordered: false + }) + }) + } + if (domainBulk.findUpdate.size > 0) { + await ctx.with('find-result', {}, async () => { + const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray() + result.push(...docs) + }) + } + + if (domainBulk.raw.length > 0) { + await ctx.with('raw', {}, async () => { + for (const r of domainBulk.raw) { + result.push({ object: await r() }) + } + }) + } + }, + { + domain,
add: domainBulk.add.length, + update: domainBulk.update.size, + bulk: domainBulk.bulkOperations.length, + find: domainBulk.findUpdate.size, + raw: domainBulk.raw.length } - continue - } - if (lastDomain === undefined) { - lastDomain = dop.domain - } - if (lastDomain !== dop.domain) { - // If we have domain switch, let's execute previous bulk and start new one. - await ctx.with( - 'bulkExecute', - {}, - async () => { - await bulkExecute() - }, - { operations: cutObjectArray(bulkOperations) } - ) - lastDomain = dop.domain - } - bulkOperations.push(dop) - } - await ctx.with('bulkExecute', {}, async () => { - await bulkExecute() + ) }) - } else { - const r = await this.getOperations(txes[0])?.raw() - if (r !== undefined) { - result.push(r) - } } return result } - protected txCollectionCUD (tx: TxCollectionCUD): DomainOperation { + protected txCollectionCUD (bulk: OperationBulk, tx: TxCollectionCUD): void { // We need update only create transactions to contain attached, attachedToClass. if (tx.tx._class === core.class.TxCreateDoc) { const createTx = tx.tx as TxCreateDoc @@ -1013,24 +1133,18 @@ class MongoAdapter extends MongoAdapterBase { collection: tx.collection } } - return this.txCreateDoc(d) + this.txCreateDoc(bulk, d) + return } // We could cast since we know collection cud is supported. 
- return this.getOperations(tx.tx) as DomainOperation + this.updateBulk(bulk, tx.tx) } - protected txRemoveDoc (tx: TxRemoveDoc): DomainOperation { - const domain = this.hierarchy.getDomain(tx.objectClass) - return { - raw: async () => await this.collection(domain).deleteOne({ _id: tx.objectId }), - domain, - bulk: [{ deleteOne: { filter: { _id: tx.objectId } } }] - } + protected txRemoveDoc (bulk: OperationBulk, tx: TxRemoveDoc): void { + bulk.bulkOperations.push({ deleteOne: { filter: { _id: tx.objectId } } }) } - protected txMixin (tx: TxMixin): DomainOperation { - const domain = this.hierarchy.getDomain(tx.objectClass) - + protected txMixin (bulk: OperationBulk, tx: TxMixin): void { const filter = { _id: tx.objectId } const modifyOp = { modifiedBy: tx.modifiedBy, @@ -1051,38 +1165,29 @@ class MongoAdapter extends MongoAdapterBase { } } ] - return { - raw: async () => await this.collection(domain).bulkWrite(ops), - domain, - bulk: ops - } + bulk.bulkOperations.push(...ops) + return } const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } } - return { - raw: async () => await this.collection(domain).updateOne(filter, update), - domain, - bulk: [ - { - updateOne: { - filter, - update - } - } - ] - } - } - const update = { $set: { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } } - return { - raw: async () => await this.collection(domain).updateOne(filter, update), - domain, - bulk: [ - { - updateOne: { - filter, - update - } + + bulk.bulkOperations.push({ + updateOne: { + filter, + update } - ] + }) + return + } + const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } + + let upd = bulk.update.get(tx.objectId) + if (upd === undefined) { + upd = {} + bulk.update.set(tx.objectId, upd) + } + + for (const [k, v] of Object.entries(update)) { + ;(upd as any)[k] = v } } @@ -1106,23 +1211,12 @@ class MongoAdapter extends MongoAdapterBase { return attrs } - protected txCreateDoc (tx: 
TxCreateDoc): DomainOperation { + protected txCreateDoc (bulk: OperationBulk, tx: TxCreateDoc): void { const doc = TxProcessor.createDoc2Doc(tx) - const domain = this.hierarchy.getDomain(doc._class) - const tdoc = translateDoc(doc) - return { - raw: async () => await this.collection(domain).insertOne(tdoc), - domain, - bulk: [ - { - insertOne: { document: tdoc } - } - ] - } + bulk.add.push(translateDoc(doc)) } - protected txUpdateDoc (tx: TxUpdateDoc): DomainOperation { - const domain = this.hierarchy.getDomain(tx.objectClass) + protected txUpdateDoc (bulk: OperationBulk, tx: TxUpdateDoc): void { if (isOperator(tx.operations)) { const operator = Object.keys(tx.operations)[0] if (operator === '$move') { @@ -1163,11 +1257,7 @@ class MongoAdapter extends MongoAdapterBase { } } ] - return { - raw: async () => await this.collection(domain).bulkWrite(ops), - domain, - bulk: ops - } + bulk.bulkOperations.push(...ops) } else if (operator === '$update') { const keyval = (tx.operations as any).$update const arr = Object.keys(keyval)[0] @@ -1200,15 +1290,13 @@ class MongoAdapter extends MongoAdapterBase { } } ] - return { - raw: async () => await this.collection(domain).bulkWrite(ops), - domain, - bulk: ops - } + bulk.bulkOperations.push(...ops) } else { + const domain = this.hierarchy.getDomain(tx.objectClass) + if (tx.retrieve === true) { - const raw = async (): Promise => { - const result = await this.collection(domain).findOneAndUpdate( + bulk.raw.push(async () => { + const res = await this.collection(domain).findOneAndUpdate( { _id: tx.objectId }, { ...tx.operations, @@ -1220,76 +1308,72 @@ class MongoAdapter extends MongoAdapterBase { } as unknown as UpdateFilter, { returnDocument: 'after', includeResultMetadata: true } ) - return { object: result.value } - } - return { - raw, - domain, - bulk: undefined - } + return res.value as TxResult + }) } else { - const filter = { _id: tx.objectId } - const update = { - ...tx.operations, - $set: { - modifiedBy: tx.modifiedBy, - 
modifiedOn: tx.modifiedOn, - '%hash%': null + bulk.bulkOperations.push({ + updateOne: { + filter: { _id: tx.objectId }, + update: { + ...tx.operations, + $set: { + modifiedBy: tx.modifiedBy, + modifiedOn: tx.modifiedOn, + '%hash%': null + } + } } - } - return { - raw: async () => await this.collection(domain).updateOne(filter, update), - domain, - bulk: [{ updateOne: { filter, update } }] - } + }) } } } else { - const filter = { _id: tx.objectId } - const update = { - $set: { - ...tx.operations, - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn, - '%hash%': null - } + let upd = bulk.update.get(tx.objectId) + if (upd === undefined) { + upd = {} + bulk.update.set(tx.objectId, upd) } - const raw = - tx.retrieve === true - ? async (): Promise => { - const result = await this.db - .collection(domain) - .findOneAndUpdate(filter, update, { returnDocument: 'after', includeResultMetadata: true }) - return { object: result.value } - } - : async () => await this.collection(domain).updateOne(filter, update) - // Disable bulk for operators - return { - raw, - domain, - bulk: [{ updateOne: { filter, update } }] + for (const [k, v] of Object.entries({ + ...tx.operations, + modifiedBy: tx.modifiedBy, + modifiedOn: tx.modifiedOn, + '%hash%': null + })) { + ;(upd as any)[k] = v + } + + if (tx.retrieve === true) { + bulk.findUpdate.add(tx.objectId) } } } } class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { - txColl: Collection | undefined + txColl: Collection | undefined async init (): Promise { await this._db.init(DOMAIN_TX) } + @withContext('tx') override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { if (tx.length === 0) { return [] } - await ctx.with('insertMany', {}, async () => await this.txCollection().insertMany(tx.map((it) => translateDoc(it)))) + await this.collectOps( + this.globalCtx, + DOMAIN_TX, + 'tx', + async () => { + await this.txCollection().insertMany(tx.map((it) => translateDoc(it))) + }, + { tx: tx.length } + ) return [] } - 
private txCollection (): Collection { + private txCollection (): Collection { if (this.txColl !== undefined) { return this.txColl } @@ -1297,6 +1381,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { return this.txColl } + @withContext('get-model') async getModel (ctx: MeasureContext): Promise { const cursor = await ctx.with('find', {}, async () => this.db.collection(DOMAIN_TX).find( @@ -1461,7 +1546,7 @@ export async function createMongoAdapter ( const client = getMongoClient(url) const db = getWorkspaceDB(await client.getClient(), workspaceId) - return new MongoAdapter(db, hierarchy, modelDb, client, options) + return new MongoAdapter(ctx.newChild('mongoDb', {}), db, hierarchy, modelDb, client, options) } /** @@ -1476,5 +1561,6 @@ export async function createMongoTxAdapter ( ): Promise { const client = getMongoClient(url) const db = getWorkspaceDB(await client.getClient(), workspaceId) - return new MongoTxAdapter(db, hierarchy, modelDb, client) + + return new MongoTxAdapter(ctx.newChild('mongoDbTx', {}), db, hierarchy, modelDb, client) } diff --git a/server/mongo/src/utils.ts b/server/mongo/src/utils.ts index 6bb9ba7bfe..428fcf91bd 100644 --- a/server/mongo/src/utils.ts +++ b/server/mongo/src/utils.ts @@ -13,7 +13,14 @@ // limitations under the License. 
// -import { toWorkspaceString, type Doc, type Domain, type FieldIndex, type WorkspaceId } from '@hcengineering/core' +import { + generateId, + toWorkspaceString, + type Doc, + type Domain, + type FieldIndexConfig, + type WorkspaceId +} from '@hcengineering/core' import { PlatformError, unknownStatus } from '@hcengineering/platform' import { type DomainHelperOperations } from '@hcengineering/server-core' import { MongoClient, type Collection, type Db, type Document, type MongoClientOptions } from 'mongodb' @@ -27,10 +34,15 @@ process.on('exit', () => { }) }) +const clientRefs = new Map() + /** * @public */ export async function shutdown (): Promise { + for (const it of Array.from(clientRefs.values())) { + console.error((it as any).stack) + } for (const c of connections.values()) { c.close(true) } @@ -78,9 +90,12 @@ class MongoClientReferenceImpl { this.count++ } } - export class ClientRef implements MongoClientReference { - constructor (readonly client: MongoClientReferenceImpl) {} + id = generateId() + stack = new Error().stack + constructor (readonly client: MongoClientReferenceImpl) { + clientRefs.set(this.id, this) + } closed = false async getClient (): Promise { @@ -94,6 +109,7 @@ export class ClientRef implements MongoClientReference { close (): void { // Do not allow double close of mongo connection client if (!this.closed) { + clientRefs.delete(this.id) this.closed = true this.client.close() } @@ -106,13 +122,14 @@ export class ClientRef implements MongoClientReference { */ export function getMongoClient (uri: string, options?: MongoClientOptions): MongoClientReference { const extraOptions = JSON.parse(process.env.MONGO_OPTIONS ?? '{}') - const key = `${uri}${process.env.MONGO_OPTIONS}_${JSON.stringify(options)}` + const key = `${uri}${process.env.MONGO_OPTIONS ?? '{}'}_${JSON.stringify(options ?? 
{})}` let existing = connections.get(key) // If not created or closed if (existing === undefined) { existing = new MongoClientReferenceImpl( MongoClient.connect(uri, { + appName: 'transactor', ...options, enableUtf8Validation: false, ...extraOptions @@ -184,8 +201,20 @@ export class DBCollectionHelper implements DomainHelperOperations { return this.collections.has(domain) } - async createIndex (domain: Domain, value: string | FieldIndex, options?: { name: string }): Promise { - await this.collection(domain).createIndex(value, options) + async createIndex (domain: Domain, value: string | FieldIndexConfig, options?: { name: string }): Promise { + if (typeof value === 'string') { + await this.collection(domain).createIndex(value, options) + } else { + if (value.filter !== undefined) { + await this.collection(domain).createIndex(value.keys, { + ...options, + sparse: false, + partialFilterExpression: value.filter + }) + } else { + await this.collection(domain).createIndex(value.keys, { ...options, sparse: value.sparse ?? 
true }) + } + } } async dropIndex (domain: Domain, name: string): Promise { diff --git a/server/server-storage/src/blobStorage.ts b/server/server-storage/src/blobStorage.ts index 70f714a07b..ba62756e48 100644 --- a/server/server-storage/src/blobStorage.ts +++ b/server/server-storage/src/blobStorage.ts @@ -58,7 +58,7 @@ class StorageBlobAdapter implements DbAdapter { } async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} - async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise {} + async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise {} async close (): Promise { await this.blobAdapter.close() diff --git a/server/server/src/apm.ts b/server/server/src/apm.ts index 983b52c087..a2a915bf07 100644 --- a/server/server/src/apm.ts +++ b/server/server/src/apm.ts @@ -1,4 +1,4 @@ -import { MeasureContext, MeasureLogger, ParamType, ParamsType } from '@hcengineering/core' +import { MeasureContext, MeasureLogger, ParamType, ParamsType, type FullParamsType } from '@hcengineering/core' import apm, { Agent, Span, Transaction } from 'elastic-apm-node' /** @@ -71,7 +71,7 @@ export class APMMeasureContext implements MeasureContext { name: string, params: ParamsType, op: (ctx: MeasureContext) => T | Promise, - fullParams?: ParamsType + fullParams?: FullParamsType | (() => FullParamsType) ): Promise { const c = this.newChild(name, params) try { diff --git a/server/server/src/starter.ts b/server/server/src/starter.ts index 0faabb2051..8d87611c7f 100644 --- a/server/server/src/starter.ts +++ b/server/server/src/starter.ts @@ -18,7 +18,7 @@ export interface ServerEnv { export function serverConfigFromEnv (): ServerEnv { const serverPort = parseInt(process.env.SERVER_PORT ?? '3333') - const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'false') === 'true' + const enableCompression = (process.env.ENABLE_COMPRESSION ?? 
'true') === 'true' const url = process.env.MONGO_URL if (url === undefined) { diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index bf42df070a..91d06e9bf5 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -18,6 +18,7 @@ import core, { BackupClient, Branding, Client as CoreClient, + coreId, DOMAIN_BENCHMARK, DOMAIN_MIGRATION, DOMAIN_MODEL, @@ -37,7 +38,7 @@ import core, { type Doc, type TxCUD } from '@hcengineering/core' -import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model' +import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model' import { createMongoTxAdapter, DBCollectionHelper, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { AggregatorStorageAdapter, @@ -180,7 +181,8 @@ export async function updateModel ( // Create update indexes await createUpdateIndexes( ctx, - connection, + connection.getHierarchy(), + connection.getModel(), db, logger, async (value) => { @@ -236,13 +238,6 @@ export async function initializeWorkspace ( } } -export function getStorageAdapter (): StorageAdapter { - const { mongodbUri } = prepareTools([]) - - const storageConfig: StorageConfiguration = storageConfigFromEnv() - return buildStorageFromConfig(storageConfig, mongodbUri) -} - /** * @public */ @@ -368,6 +363,27 @@ export async function upgradeModel ( await progress(20 + ((100 / migrateOperations.length) * i * 20) / 100) i++ } + + await tryMigrate(migrateClient, coreId, [ + { + state: '#sparse', + func: async () => { + ctx.info('Migrate to sparse indexes') + // Create update indexes + await createUpdateIndexes( + ctx, + hierarchy, + modelDb, + db, + logger, + async (value) => { + await progress(90 + (Math.min(value, 100) / 100) * 10) + }, + workspaceId + ) + } + } + ]) }) logger.log('Apply upgrade operations', { workspaceId: workspaceId.name }) @@ -400,7 +416,7 @@ export async function upgradeModel ( await op[1].upgrade(migrateState, 
getUpgradeClient, logger) }) logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name }) - await progress(60 + ((100 / migrateOperations.length) * i * 40) / 100) + await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100) i++ } }) @@ -460,33 +476,37 @@ async function fetchModelFromMongo ( const txAdapter = await createMongoTxAdapter(ctx, hierarchy, mongodbUri, workspaceId, modelDb) - model = model ?? (await ctx.with('get-model', {}, async (ctx) => await txAdapter.getModel(ctx))) + try { + model = model ?? (await ctx.with('get-model', {}, async (ctx) => await txAdapter.getModel(ctx))) - await ctx.with('build local model', {}, async () => { - for (const tx of model ?? []) { - try { - hierarchy.tx(tx) - } catch (err: any) {} - } - modelDb.addTxes(ctx, model as Tx[], false) - }) - await txAdapter.close() + await ctx.with('build local model', {}, async () => { + for (const tx of model ?? []) { + try { + hierarchy.tx(tx) + } catch (err: any) {} + } + modelDb.addTxes(ctx, model as Tx[], false) + }) + } finally { + await txAdapter.close() + } return { hierarchy, modelDb, model } } async function createUpdateIndexes ( ctx: MeasureContext, - connection: CoreClient, + hierarchy: Hierarchy, + model: ModelDb, db: Db, logger: ModelLogger, progress: (value: number) => Promise, workspaceId: WorkspaceId ): Promise { - const domainHelper = new DomainIndexHelperImpl(ctx, connection.getHierarchy(), connection.getModel(), workspaceId) + const domainHelper = new DomainIndexHelperImpl(ctx, hierarchy, model, workspaceId) const dbHelper = new DBCollectionHelper(db) await dbHelper.init() let completed = 0 - const allDomains = connection.getHierarchy().domains() + const allDomains = hierarchy.domains() for (const domain of allDomains) { if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT || domain === DOMAIN_BENCHMARK) { continue diff --git a/tests/sanity/tests/playwright.config.ts b/tests/sanity/tests/playwright.config.ts index 
ffd9f15e2d..1f977da65e 100644 --- a/tests/sanity/tests/playwright.config.ts +++ b/tests/sanity/tests/playwright.config.ts @@ -24,11 +24,14 @@ const config: PlaywrightTestConfig = { snapshots: true, screenshots: true, sources: true + }, + contextOptions: { + reducedMotion: 'reduce' } } } ], - retries: 1, + retries: 2, timeout: 60000, maxFailures, expect: {