From c42923c2b18bb224042b496b106ef084894cd11c Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 6 Aug 2024 21:36:03 +0700 Subject: [PATCH] Fix Indexer use of $ne (#6264) Signed-off-by: Andrey Sobolev --- .vscode/launch.json | 4 +- dev/tool/src/clean.ts | 2 +- models/core/src/core.ts | 14 +- models/core/src/index.ts | 12 +- models/core/src/migration.ts | 9 +- packages/core/src/classes.ts | 12 +- packages/core/src/component.ts | 2 - packages/core/src/measurements/context.ts | 14 + packages/core/src/measurements/types.ts | 7 + pods/account/package.json | 2 +- pods/backup/package.json | 2 +- pods/collaborator/package.json | 2 +- pods/server/package.json | 2 +- server-plugins/collaboration/src/fulltext.ts | 5 +- server-plugins/openai/src/openai.ts | 29 +- server/core/src/adapter.ts | 1 + server/core/src/fulltext.ts | 11 +- server/core/src/indexer/content.ts | 2 - server/core/src/indexer/field.ts | 54 +- server/core/src/indexer/fulltextPush.ts | 180 +++++-- server/core/src/indexer/indexer.ts | 473 ++++++++---------- server/core/src/indexer/summary.ts | 50 +- server/core/src/indexer/types.ts | 15 +- server/core/src/indexer/utils.ts | 61 +-- server/core/src/server/storage.ts | 2 - server/mongo/src/storage.ts | 7 +- server/server/src/apm.ts | 20 +- server/tool/src/index.ts | 4 +- server/translate/src/retranslate.ts | 42 +- server/ws/src/server_http.ts | 5 - server/ws/src/utils.ts | 3 - tests/sanity/tests/chat/chat.spec.ts | 21 +- .../custom-attributes.spec.ts | 5 +- tests/sanity/tests/inbox/inbox.spec.ts | 5 +- tests/sanity/tests/planning/plan.spec.ts | 5 +- 35 files changed, 485 insertions(+), 599 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index d28b8c8952..a0d4b2742b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -60,6 +60,7 @@ "runtimeArgs": ["--nolazy", "-r", "ts-node/register"], "runtimeVersion": "20", "showAsyncStacks": true, + "outputCapture": "std", "sourceMaps": true, "cwd": "${workspaceRoot}/pods/server", "protocol": "inspector" @@ -75,6 +76,7 @@ "TRANSACTOR_URL": "ws://localhost:3333", "ACCOUNT_PORT": "3000", "FRONT_URL": "http://localhost:8080", + "outputCapture": "std", "SES_URL": "http://localhost:8091", "MINIO_ACCESS_KEY": "minioadmin", "MINIO_SECRET_KEY": "minioadmin", @@ -186,7 +188,7 @@ "name": "Debug tool upgrade", "type": "node", "request": "launch", - "args": ["src/__start.ts", "upgrade"], + "args": ["src/__start.ts", "upgrade-workspace", "platform"], "env": { "SERVER_SECRET": "secret", "MINIO_ACCESS_KEY": "minioadmin", diff --git a/dev/tool/src/clean.ts b/dev/tool/src/clean.ts index a7d6c51c0d..20cca7c2b5 100644 --- a/dev/tool/src/clean.ts +++ b/dev/tool/src/clean.ts @@ -1242,7 +1242,7 @@ async function updateId ( stages: {}, removed: false }) - await txop.update(docIndexState, { removed: true }) + await txop.update(docIndexState, { removed: true, needIndex: true }) } if (domain !== DOMAIN_MODEL) { diff --git a/models/core/src/core.ts b/models/core/src/core.ts index 2f3bc0a777..5ddbb9cd77 100644 --- a/models/core/src/core.ts +++ b/models/core/src/core.ts @@ -38,7 +38,6 @@ import { type EnumOf, type FieldIndexConfig, type FullTextSearchContext, - type IndexStageState, type IndexingConfiguration, type Interface, type MigrationState, @@ -329,27 +328,24 @@ export class TDocIndexState extends TDoc implements DocIndexState { attributes!: Record @Prop(TypeBoolean(), getEmbeddedLabel('Removed')) - // @Index(IndexKind.Indexed) @Hidden() removed!: boolean + @Prop(TypeBoolean(), getEmbeddedLabel('NeedIndexing')) + @Hidden() + needIndex!: boolean + // States for different stages @Prop(TypeRecord(), getEmbeddedLabel('Stages')) // @Index(IndexKind.Indexed) @Hidden() - stages!: Record + stages!: Record @Prop(TypeString(), getEmbeddedLabel('Generation')) @Hidden() generationId?: string } -@Model(core.class.IndexStageState, core.class.Doc, DOMAIN_DOC_INDEX_STATE) -export class TIndexStageState extends TDoc implements IndexStageState { - stageId!: string - attributes!: Record -} - @MMixin(core.mixin.FullTextSearchContext, core.class.Class) export class TFullTextSearchContext extends TClass implements FullTextSearchContext {} diff --git a/models/core/src/index.ts b/models/core/src/index.ts index e8b21d7c19..b37e77d1ac 100644 --- a/models/core/src/index.ts +++ b/models/core/src/index.ts @@ -49,7 +49,6 @@ import { TEnumOf, TFullTextSearchContext, TIndexConfiguration, - TIndexStageState, TInterface, TMigrationState, TMixin, @@ -94,9 +93,9 @@ import { TUserStatus } from './transient' import { TTx, TTxApplyIf, - TTxCUD, TTxCollectionCUD, TTxCreateDoc, + TTxCUD, TTxMixin, TTxRemoveDoc, TTxUpdateDoc, @@ -164,7 +163,6 @@ export function createModel (builder: Builder): void { TTypeAny, TTypeRelatedDocument, TDocIndexState, - TIndexStageState, TFullTextSearchContext, TConfiguration, TConfigurationElement, @@ -284,8 +282,9 @@ export function createModel (builder: Builder): void { builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, { domain: DOMAIN_DOC_INDEX_STATE, indexes: [ - { keys: { removed: 1 }, filter: { removed: true } }, - { keys: { _class: 1 }, filter: { _class: core.class.DocIndexState } } + { + keys: { needIndex: 1 } + } ], disabled: [ { attachedToClass: 1 }, @@ -297,8 +296,7 @@ export function createModel (builder: Builder): void { { createdBy: 1 }, { createdBy: -1 }, { createdOn: -1 } - ], - skip: ['stages.'] + ] }) builder.mixin(core.class.Space, core.class.Class, core.mixin.FullTextSearchContext, { diff --git a/models/core/src/migration.ts b/models/core/src/migration.ts index 28fe968978..0844c55c33 100644 --- a/models/core/src/migration.ts +++ b/models/core/src/migration.ts @@ -155,7 +155,8 @@ export const coreOperation: MigrateOperation = { { objectClass: { $nin: allIndexed } }, { $set: { - removed: true + removed: true, + needIndex: true } } ) @@ -182,6 +183,12 @@ export const coreOperation: MigrateOperation = { { state: 'old-statuses-transactions', func: migrateStatusTransactions + }, + { + state: 'add-need-index', + func: async (client: MigrationClient) => { + await client.update(DOMAIN_DOC_INDEX_STATE, {}, { $set: { needIndex: true } }) + } } ]) }, diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index df6802b66a..27e8c0c912 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -510,7 +510,9 @@ export interface DocIndexState extends Doc { generationId?: string // States for stages - stages: Record + stages: Record + + needIndex: boolean removed: boolean @@ -522,14 +524,6 @@ export interface DocIndexState extends Doc { shortSummary?: string | null } -/** - * @public - */ -export interface IndexStageState extends Doc { - stageId: string - attributes: Record -} - /** * @public * diff --git a/packages/core/src/component.ts b/packages/core/src/component.ts index 2af9823260..436d77f8c6 100644 --- a/packages/core/src/component.ts +++ b/packages/core/src/component.ts @@ -32,7 +32,6 @@ import type { EnumOf, FullTextSearchContext, Hyperlink, - IndexStageState, IndexingConfiguration, Interface, Markup, @@ -135,7 +134,6 @@ export default plugin(coreId, { UserStatus: '' as Ref>, TypeRelatedDocument: '' as Ref>>, DocIndexState: '' as Ref>, - IndexStageState: '' as Ref>, DomainIndexConfiguration: '' as Ref>, Configuration: '' as Ref>, diff --git a/packages/core/src/measurements/context.ts b/packages/core/src/measurements/context.ts index d88054cf68..5bcb4af6ca 100644 --- a/packages/core/src/measurements/context.ts +++ b/packages/core/src/measurements/context.ts @@ -116,6 +116,20 @@ export class MeasureMetricsContext implements MeasureContext { } } + withSync( + name: string, + params: ParamsType, + op: (ctx: MeasureContext) => T, + fullParams?: ParamsType | (() => FullParamsType) + ): T { + const c = this.newChild(name, params, fullParams, this.logger) + try { + return op(c) + } finally { + c.end() + } + } + async withLog( name: string, params: ParamsType, diff --git a/packages/core/src/measurements/types.ts b/packages/core/src/measurements/types.ts index fbe77be055..e056ba1009 100644 --- a/packages/core/src/measurements/types.ts +++ b/packages/core/src/measurements/types.ts @@ -63,6 +63,13 @@ export interface MeasureContext { fullParams?: FullParamsType | (() => FullParamsType) ) => Promise + withSync: ( + name: string, + params: ParamsType, + op: (ctx: MeasureContext) => T, + fullParams?: FullParamsType | (() => FullParamsType) + ) => T + withLog: ( name: string, params: ParamsType, diff --git a/pods/account/package.json b/pods/account/package.json index 3d3e661abe..5ae1ba8424 100644 --- a/pods/account/package.json +++ b/pods/account/package.json @@ -14,7 +14,7 @@ "_phase:bundle": "rushx bundle", "_phase:docker-build": "rushx docker:build", "_phase:docker-staging": "rushx docker:staging", - "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --minify --platform=node > bundle/bundle.js", + "bundle": "mkdir -p bundle && esbuild src/__start.ts --sourcemap=inline --external:*.node --external:snappy --bundle --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --minify --platform=node > bundle/bundle.js", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/account", "docker:tbuild": "docker build -t hardcoreeng/account . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/account", "docker:abuild": "docker build -t hardcoreeng/account . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/account", diff --git a/pods/backup/package.json b/pods/backup/package.json index 86c36c2c6f..13f45b9e88 100644 --- a/pods/backup/package.json +++ b/pods/backup/package.json @@ -14,7 +14,7 @@ "_phase:bundle": "rushx bundle", "_phase:docker-build": "rushx docker:build", "_phase:docker-staging": "rushx docker:staging", - "bundle": "mkdir -p bundle && esbuild src/index.ts --bundle --sourcemap=inline --minify --platform=node > bundle/bundle.js", + "bundle": "mkdir -p bundle && esbuild src/index.ts --bundle --sourcemap=inline --platform=node --external:*.node --external:bufferutil --external:snappy > bundle/bundle.js", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/backup", "docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/backup staging", "docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/backup", diff --git a/pods/collaborator/package.json b/pods/collaborator/package.json index feebcac7c0..31898da99f 100644 --- a/pods/collaborator/package.json +++ b/pods/collaborator/package.json @@ -13,7 +13,7 @@ "_phase:bundle": "rushx bundle", "_phase:docker-build": "rushx docker:build", "_phase:docker-staging": "rushx docker:staging", - "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --platform=node --keep-names > bundle/bundle.js", + "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --platform=node --sourcemap=inline --keep-names --external:*.node --external:bufferutil --external:snappy > bundle/bundle.js", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/collaborator", "docker:tbuild": "docker build -t hardcoreeng/collaborator . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/collaborator", "docker:abuild": "docker build -t hardcoreeng/collaborator . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/collaborator", diff --git a/pods/server/package.json b/pods/server/package.json index 609152ce69..72020dedef 100644 --- a/pods/server/package.json +++ b/pods/server/package.json @@ -15,7 +15,7 @@ "_phase:bundle": "rushx bundle", "_phase:docker-build": "rushx docker:build", "_phase:docker-staging": "rushx docker:staging", - "bundle": "mkdir -p bundle && esbuild src/__start.ts --minify --bundle --keep-names --platform=node --external:*.node --external:bufferutil --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) --outfile=bundle/bundle.js --log-level=error --sourcemap=external", + "bundle": "mkdir -p bundle && esbuild src/__start.ts --sourcemap=inline --bundle --keep-names --platform=node --external:*.node --external:bufferutil --external:snappy --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) --outfile=bundle/bundle.js --log-level=error --sourcemap=external", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/transactor", "docker:tbuild": "docker build -t hardcoreeng/transactor . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor", "docker:abuild": "docker build -t hardcoreeng/transactor . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor", diff --git a/server-plugins/collaboration/src/fulltext.ts b/server-plugins/collaboration/src/fulltext.ts index 9560271f9c..7a9bcf9d00 100644 --- a/server-plugins/collaboration/src/fulltext.ts +++ b/server-plugins/collaboration/src/fulltext.ts @@ -35,6 +35,7 @@ import { FullTextPipelineStage, IndexedDoc, StorageAdapter, + collabStageId, contentStageId, docKey, docUpdKey, @@ -46,7 +47,7 @@ import { */ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage { require = [] - stageId = contentStageId + stageId = collabStageId extra = ['content', 'base64'] @@ -59,8 +60,6 @@ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage textLimit = 100 * 1024 - stageValue: boolean | string = true - constructor ( readonly storageAdapter: StorageAdapter | undefined, readonly workspace: WorkspaceId, diff --git a/server-plugins/openai/src/openai.ts b/server-plugins/openai/src/openai.ts index 6b111f8141..31026630bf 100644 --- a/server-plugins/openai/src/openai.ts +++ b/server-plugins/openai/src/openai.ts @@ -20,7 +20,6 @@ import core, { DocumentQuery, DocumentUpdate, docUpdKey, - IndexStageState, MeasureContext, Ref, WorkspaceId @@ -37,7 +36,6 @@ import { FullTextPipelineStage, IndexedDoc, isIndexingRequired, - loadIndexStageStage, RateLimiter } from '@hcengineering/server-core' import got from 'got' @@ -92,12 +90,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage { rate = 5 - stageValue: boolean | string = true - limiter = new RateLimiter(this.rate) - - indexState?: IndexStageState - async update (doc: DocIndexState, update: DocumentUpdate): Promise {} constructor ( @@ -156,22 +149,6 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage { console.error(err) this.enabled = false } - - ;[this.stageValue, this.indexState] = await loadIndexStageStage( - ctx, - storage, - this.indexState, - this.stageId, - 'config', - { - enabled: this.enabled, - endpoint: this.endpoint, - field: this.field, - mode: this.model, - copyToState: this.copyToState, - stripNewLines: true - } - ) } async getEmbedding (text: string): Promise { @@ -299,7 +276,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage { // No need to index this class, mark embeddings as empty ones. if (!needIndex) { - await pipeline.update(doc._id, this.stageValue, {}) + await pipeline.update(doc._id, true, {}) return } @@ -359,13 +336,13 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage { console.error(err) } - await pipeline.update(doc._id, this.stageValue, update) + await pipeline.update(doc._id, true, update) } async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise { // will be handled by field processor for (const doc of docs) { - await pipeline.update(doc._id, this.stageValue, {}) + await pipeline.update(doc._id, true, {}) } } } diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index 578cecd513..6f11b7854f 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -112,6 +112,7 @@ export interface DbAdapter { query: DocumentQuery, options?: ServerFindOptions ) => Promise> + tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise find: (ctx: MeasureContext, domain: Domain, recheck?: boolean) => StorageIterator diff --git a/server/core/src/fulltext.ts b/server/core/src/fulltext.ts index 9e18f1c55f..b770983662 100644 --- a/server/core/src/fulltext.ts +++ b/server/core/src/fulltext.ts @@ -14,9 +14,11 @@ // limitations under the License. // +import { Analytics } from '@hcengineering/analytics' import core, { type AttachedDoc, type Class, + type Collection, type Doc, type DocIndexState, type DocumentQuery, @@ -47,7 +49,6 @@ import { createStateDoc } from './indexer/utils' import { getScoringConfig, mapSearchResultDoc } from './mapper' import { type StorageAdapter } from './storage' import type { FullTextAdapter, IndexedDoc, ServerStorage, WithFind } from './types' -import { Analytics } from '@hcengineering/analytics' /** * @public @@ -102,7 +103,8 @@ export class FullTextIndex implements WithFind { attachedTo, attachedToClass, space: tx.objectSpace, - removed: false + removed: false, + needIndex: true }) stDocs.set(cud.objectId as Ref, { create: stDoc, updated: false, removed: false }) } else { @@ -143,7 +145,7 @@ export class FullTextIndex implements WithFind { const ids: Set> = new Set>() const baseClass = this.hierarchy.getBaseClass(_class) - let classes = this.hierarchy.getDescendants(baseClass) + let classes = this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it)) const attrs = this.hierarchy.getAllAttributes(_class) @@ -173,7 +175,8 @@ export class FullTextIndex implements WithFind { } if (attr.type._class === core.class.Collection) { // we need attached documents to be in classes - const dsc = this.hierarchy.getDescendants(attr.attributeOf) + const coll = attr.type as Collection + const dsc = this.hierarchy.getDescendants(coll.of).filter((it) => !this.hierarchy.isMixin(it)) classes = classes.concat(dsc) } } diff --git a/server/core/src/indexer/content.ts b/server/core/src/indexer/content.ts index 079a4892c0..f93f1208eb 100644 --- a/server/core/src/indexer/content.ts +++ b/server/core/src/indexer/content.ts @@ -56,8 +56,6 @@ export class ContentRetrievalStage implements FullTextPipelineStage { textLimit = 100 * 1024 - stageValue: boolean | string = true - constructor ( readonly storageAdapter: StorageAdapter | undefined, readonly workspace: WorkspaceId, diff --git a/server/core/src/indexer/field.ts b/server/core/src/indexer/field.ts index ff3949ec57..d7e9536196 100644 --- a/server/core/src/indexer/field.ts +++ b/server/core/src/indexer/field.ts @@ -22,7 +22,6 @@ import core, { type DocIndexState, type DocumentQuery, type DocumentUpdate, - type IndexStageState, type MeasureContext, type Ref } from '@hcengineering/core' @@ -36,15 +35,7 @@ import { type FullTextPipeline, type FullTextPipelineStage } from './types' -import { - collectPropagate, - docKey, - docUpdKey, - getContent, - getCustomAttrKeys, - isFullTextAttribute, - loadIndexStageStage -} from './utils' +import { collectPropagate, docKey, docUpdKey, getContent, getCustomAttrKeys, isFullTextAttribute } from './utils' import { Analytics } from '@hcengineering/analytics' /** @@ -61,46 +52,9 @@ export class IndexedFieldStage implements FullTextPipelineStage { updateFields: DocUpdateHandler[] = [] enabled = true - - stageValue: boolean | string = true - - indexState?: IndexStageState - constructor (private readonly dbStorage: ServerStorage) {} - async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise { - const indexablePropogate = ( - await pipeline.model.findAll(core.class.Class, { - [core.mixin.FullTextSearchContext]: { $exists: true } - }) - ) - .map((it) => pipeline.hierarchy.as(it, core.mixin.FullTextSearchContext)) - .filter((it) => it.propagate != null || it.parentPropagate) - .map((it) => - JSON.stringify({ - id: it._id, - propogate: it.propagate, - parentPropgate: it.parentPropagate - }) - ) - - const forceIndexing = ( - await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext + '.forceIndex']: true }) - ).map((it) => it._id) - - indexablePropogate.sort() - ;[this.stageValue, this.indexState] = await loadIndexStageStage( - ctx, - storage, - this.indexState, - this.stageId, - 'config', - { - classes: indexablePropogate, - forceIndex: forceIndexing - } - ) - } + async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise {} async search ( _classes: Ref>[], @@ -234,7 +188,7 @@ export class IndexedFieldStage implements FullTextPipelineStage { } } - await pipeline.update(docState._id, this.stageValue, docUpdate) + await pipeline.update(docState._id, true, docUpdate) } catch (err: any) { Analytics.handleError(err) continue @@ -272,7 +226,7 @@ export class IndexedFieldStage implements FullTextPipelineStage { await pipeline.update(attachedTo, false, parentDocUpdate) } } - await pipeline.update(doc._id, this.stageValue, {}) + await pipeline.update(doc._id, true, {}) } } } diff --git a/server/core/src/indexer/fulltextPush.ts b/server/core/src/indexer/fulltextPush.ts index 683fcdf58b..6b70480c3c 100644 --- a/server/core/src/indexer/fulltextPush.ts +++ b/server/core/src/indexer/fulltextPush.ts @@ -13,37 +13,40 @@ // limitations under the License. // +import { Analytics } from '@hcengineering/analytics' import core, { + type AnyAttribute, type ArrOf, + type Branding, type Class, type Doc, type DocIndexState, type DocumentQuery, type DocumentUpdate, extractDocKey, + getFullTextContext, type Hierarchy, isFullTextAttribute, type MeasureContext, + RateLimiter, type Ref, - type WorkspaceId, - getFullTextContext, - type Branding + toIdMap, + type WorkspaceId } from '@hcengineering/core' import { jsonToText, markupToJSON } from '@hcengineering/text' import { type DbAdapter } from '../adapter' import { updateDocWithPresenter } from '../mapper' import { type FullTextAdapter, type IndexedDoc, type ServerStorage } from '../types' -import { summaryStageId } from './summary' import { contentStageId, type DocUpdateHandler, fieldStateId, type FullTextPipeline, type FullTextPipelineStage, - fullTextPushStageId + fullTextPushStageId, + summaryStageId } from './types' import { collectPropagate, collectPropagateClasses, docKey, isCustomAttr } from './utils' -import { Analytics } from '@hcengineering/analytics' /** * @public @@ -62,8 +65,6 @@ export class FullTextPushStage implements FullTextPipelineStage { field_enabled = '_use' - stageValue: boolean | string = true - constructor ( private readonly dbStorage: ServerStorage, readonly fulltextAdapter: FullTextAdapter, @@ -103,10 +104,16 @@ export class FullTextPushStage implements FullTextPipelineStage { return { docs: [], pass: true } } + allAttrs = new WeakMap>, Map>() + async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, ctx: MeasureContext): Promise { const bulk: IndexedDoc[] = [] const part = [...toIndex] + + const parentsMap = new Map, DocIndexState>() + + const pushQueue = new RateLimiter(5) while (part.length > 0) { const toIndexPart = part.splice(0, 50) @@ -126,14 +133,31 @@ export class FullTextPushStage implements FullTextPipelineStage { }) ) + // spaces + const spaceDocs = toIdMap( + await ctx.with( + 'find-spaces', + {}, + async (ctx) => + await this.dbStorage.findAll(ctx, core.class.DocIndexState, { + _id: { + $in: toIndexPart.map( + (doc) => + (doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref + ) + } + }) + ) + ) + for (const doc of toIndexPart) { if (pipeline.cancelling) { return } const elasticDoc = createElasticDoc(doc) try { - await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(doc.attributes, elasticDoc, undefined, undefined, pipeline.hierarchy) + ctx.withSync('updateDoc2Elastic', {}, (ctx) => { + updateDoc2Elastic(this.allAttrs, ctx, doc.attributes, elasticDoc, undefined, undefined, pipeline.hierarchy) }) // Include all child attributes @@ -142,8 +166,17 @@ export class FullTextPushStage implements FullTextPipelineStage { for (const c of childDocs) { const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass) if (fctx.parentPropagate ?? true) { - await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true) + ctx.withSync('updateDoc2Elastic', {}, (ctx) => { + updateDoc2Elastic( + this.allAttrs, + ctx, + c.attributes, + elasticDoc, + c._id, + undefined, + pipeline.hierarchy, + true + ) }) } } @@ -153,18 +186,34 @@ export class FullTextPushStage implements FullTextPipelineStage { const propagate: Ref>[] = collectPropagate(pipeline, doc.attachedToClass) if (propagate.some((it) => pipeline.hierarchy.isDerived(doc.objectClass, it))) { // We need to include all parent content into this one. - ;[parentDoc] = await ctx.with( - 'find-parent', - {}, - async (ctx) => - await this.dbStorage.findAll(ctx, core.class.DocIndexState, { - _id: doc.attachedTo as Ref - }) - ) + parentDoc = + parentsMap.get(doc.attachedTo as Ref) ?? + (await ctx.with('find-parent', {}, async (ctx) => + ( + await this.dbStorage.findAll( + ctx, + core.class.DocIndexState, + { + _id: doc.attachedTo as Ref + }, + { limit: 1 } + ) + ).shift() + )) if (parentDoc !== undefined) { + parentsMap.set(parentDoc._id, parentDoc) const ppdoc = parentDoc - await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id, undefined, pipeline.hierarchy, true) + ctx.withSync('updateDoc2Elastic', {}, (ctx) => { + updateDoc2Elastic( + this.allAttrs, + ctx, + ppdoc.attributes, + elasticDoc, + ppdoc._id, + undefined, + pipeline.hierarchy, + true + ) }) const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass) @@ -175,21 +224,25 @@ export class FullTextPushStage implements FullTextPipelineStage { { attachedTo: parentDoc._id, objectClass: { $in: collectClasses } } ) for (const c of collections) { - await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true) + ctx.withSync('updateDoc2Elastic', {}, (ctx) => { + updateDoc2Elastic( + this.allAttrs, + ctx, + c.attributes, + elasticDoc, + c._id, + undefined, + pipeline.hierarchy, + true + ) }) } } } } } - const [spaceDoc] = await ctx.with( - 'find-space', - {}, - async (ctx) => - await this.dbStorage.findAll(ctx, core.class.DocIndexState, { - _id: (doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref - }) + const spaceDoc = spaceDocs.get( + (doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref ) await updateDocWithPresenter(pipeline.hierarchy, doc, elasticDoc, { parentDoc, spaceDoc }, this.branding) @@ -210,15 +263,37 @@ export class FullTextPushStage implements FullTextPipelineStage { } } // Perform bulk update to elastic - try { - await this.fulltextAdapter.updateMany(bulk) - for (const doc of toIndex) { - await pipeline.update(doc._id, true, {}) + + void pushQueue.add(async () => { + try { + try { + await ctx.with('push-elastic', {}, async () => { + await this.fulltextAdapter.updateMany(bulk) + }) + } catch (err: any) { + Analytics.handleError(err) + // Try to push one by one + await ctx.with('push-elastic-by-one', {}, async () => { + for (const d of bulk) { + try { + await this.fulltextAdapter.update(d.id, d) + } catch (err2: any) { + Analytics.handleError(err2) + } + } + }) + } + if (!pipeline.cancelling) { + for (const doc of toIndexPart) { + await pipeline.update(doc._id, true, {}) + } + } + } catch (err: any) { + Analytics.handleError(err) } - } catch (err: any) { - Analytics.handleError(err) - } + }) } + await pushQueue.waitProcessing() } async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise { @@ -246,6 +321,8 @@ export function createElasticDoc (upd: DocIndexState): IndexedDoc { return doc } function updateDoc2Elastic ( + allAttrs: WeakMap>, Map>, + ctx: MeasureContext, attributes: Record, doc: IndexedDoc, docIdOverride?: Ref, @@ -264,10 +341,25 @@ function updateDoc2Elastic ( let vv: any = v if (vv != null && extra.includes('base64')) { - vv = Buffer.from(v, 'base64').toString() + ctx.withSync('buffer-from', {}, () => { + vv = Buffer.from(v, 'base64').toString() + }) } try { - const attribute = hierarchy?.getAttribute(_class ?? doc._class[0], attr) + let attrs = allAttrs.get(_class ?? doc._class[0]) + + if (attrs === undefined) { + attrs = new Map() + if (attrs !== undefined) { + allAttrs.set(_class ?? doc._class[0], attrs) + } + } + + const attribute = attrs?.get(attr) ?? hierarchy?.findAttribute(_class ?? doc._class[0], attr) + if (attribute !== undefined) { + attrs.set(attr, attribute) + allAttrs.set(_class ?? doc._class[0], attrs) + } if (attribute !== undefined && vv != null) { if ( isFullTextAttribute(attribute) || @@ -283,7 +375,9 @@ function updateDoc2Elastic ( attribute.type._class === core.class.TypeMarkup || attribute.type._class === core.class.TypeCollaborativeMarkup ) { - vvv = jsonToText(markupToJSON(vv)) + ctx.withSync('markup-to-json-text', {}, () => { + vvv = jsonToText(markupToJSON(vv)) + }) } if (!(doc.fulltextSummary ?? '').includes(vvv)) { doc.fulltextSummary = (doc.fulltextSummary ?? '') + vvv + '\n' @@ -321,8 +415,8 @@ function updateDoc2Elastic ( const spaceKey = docKey('space', { _class: core.class.Doc }) if (doc[spaceKey] !== undefined) { - const existsingSpace = Array.isArray(doc.space) ? doc.space : [doc.space] + const existingSpaces = Array.isArray(doc.space) ? doc.space : [doc.space] const newSpaces = Array.isArray(doc[spaceKey]) ? doc[spaceKey] : [doc[spaceKey]] - doc.space = [...existsingSpace, ...newSpaces].filter((it, idx, arr) => arr.indexOf(it) === idx) + doc.space = [...existingSpaces, ...newSpaces].filter((it, idx, arr) => arr.indexOf(it) === idx) } } diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 40889eb95f..0fb0146a5b 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -25,14 +25,12 @@ import core, { type MeasureContext, type ModelDb, type Ref, - SortingOrder, TxFactory, type WorkspaceId, _getOperator, docKey, groupByArray, - setObjectValue, - toFindResult + setObjectValue } from '@hcengineering/core' import { type DbAdapter } from '../adapter' import { RateLimiter } from '../limitter' @@ -74,9 +72,6 @@ export class FullTextIndexPipeline implements FullTextPipeline { indexing: Promise | undefined - // Temporary skipped items - skipped = new Map, number>() - indexId = indexCounter++ updateTriggerTimer: any @@ -99,7 +94,6 @@ export class FullTextIndexPipeline implements FullTextPipeline { async cancel (): Promise { this.cancelling = true clearTimeout(this.updateBroadcast) - clearTimeout(this.skippedReiterationTimeout) clearInterval(this.updateTriggerTimer) // We need to upload all bulk changes. await this.processUpload(this.metrics) @@ -114,7 +108,8 @@ export class FullTextIndexPipeline implements FullTextPipeline { await this.storage.tx( this.metrics, ops.createTxUpdateDoc(doc._class, doc.space, doc._id, { - removed: true + removed: true, + needIndex: true }) ) } @@ -153,7 +148,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } - updateDoc (doc: DocIndexState, tx: DocumentUpdate, updateDate: boolean): DocIndexState { + updateDoc (doc: DocIndexState, tx: DocumentUpdate, finish: boolean): DocIndexState { for (const key in tx) { if (key.startsWith('$')) { const operator = _getOperator(key) @@ -168,7 +163,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { doc.space = doc.attributes[spaceKey] } - if (updateDate) { + if (finish) { doc.modifiedBy = core.account.System doc.modifiedOn = Date.now() } @@ -213,6 +208,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { // We need to clear only first state, to prevent multiple index operations to happen. ;(upd as any)['stages.' + this.stages[0].stageId] = false + upd.needIndex = true this.updateOps.set(u[0], upd) } } @@ -225,7 +221,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { // Update are commulative async update ( docId: Ref, - mark: boolean | string, + mark: boolean, update: DocumentUpdate, flush?: boolean ): Promise { @@ -233,7 +229,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { if (udoc !== undefined) { await this.stageUpdate(udoc, update) - udoc = this.updateDoc(udoc, update, mark !== false) + udoc = this.updateDoc(udoc, update, mark) this.toIndex.set(docId, udoc) } @@ -241,7 +237,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { udoc = this.extraIndex.get(docId) if (udoc !== undefined) { await this.stageUpdate(udoc, update) - udoc = this.updateDoc(udoc, update, mark !== false) + udoc = this.updateDoc(udoc, update, mark) this.extraIndex.set(docId, udoc) } } @@ -268,6 +264,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { // Filter unsupported stages udoc.stages = update.stages + const stg = Object.values(udoc.stages) + if (!stg.includes(false) && stg.length === this.stages.length) { + // Check if all marks are true, we need to clear needIndex. + udoc.needIndex = false + } + if (Object.keys(update).length > 0) { this.currentStages[stageId] = (this.currentStages[stageId] ?? 0) + 1 this.stageChanged++ @@ -284,14 +286,48 @@ export class FullTextIndexPipeline implements FullTextPipeline { await this.flush(flush ?? false) } + // Update are commulative + async updateNeedIndex (docId: Ref, value: boolean, flush?: boolean): Promise { + const update = { needIndex: value } + let udoc = this.toIndex.get(docId) + if (udoc !== undefined) { + await this.stageUpdate(udoc, update) + + udoc = this.updateDoc(udoc, update, true) + this.toIndex.set(docId, udoc) + } + + if (udoc === undefined) { + udoc = this.extraIndex.get(docId) + if (udoc !== undefined) { + await this.stageUpdate(udoc, update) + udoc = this.updateDoc(udoc, update, true) + this.extraIndex.set(docId, udoc) + } + } + + if (udoc === undefined) { + // Some updated, document, let's load it. + udoc = (await this.storage.load(this.metrics, DOMAIN_DOC_INDEX_STATE, [docId])).shift() as DocIndexState + } + + const current = this.pending.get(docId) + if (current === undefined) { + this.pending.set(docId, update) + } else { + this.pending.set(docId, { ...current, ...update }) + } + + await this.flush(flush ?? false) + } + triggerCounts = 0 triggerIndexing = (): void => {} - skippedReiterationTimeout: any currentStages: Record = {} - private filterCurrentStages (udoc: DocIndexState): Record { - const result: Record = {} + private filterCurrentStages (udoc: DocIndexState): Record { + const result: Record = {} for (const [k, v] of Object.entries(udoc.stages ?? {})) { if (this.currentStages[k] !== undefined) { result[k] = v @@ -327,70 +363,9 @@ export class FullTextIndexPipeline implements FullTextPipeline { broadcastClasses = new Set>>() updateBroadcast: any = undefined - indexesCreated = false - async doIndexing (): Promise { // Check model is upgraded to support indexer. - if (!this.indexesCreated) { - this.indexesCreated = true - // We need to be sure we have individual indexes per stage. - const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/, /emb-v.*/] - - const deletePattern: RegExp[] = [] - const keepPattern: RegExp[] = [] - for (const st of this.stages) { - if (this.cancelling) { - return - } - const regexp = oldStagesRegex.find((r) => r.test(st.stageId)) - if (regexp !== undefined) { - deletePattern.push(regexp) - keepPattern.push(new RegExp(st.stageId)) - } - } - const helper = this.storage.helper() - if (deletePattern.length > 0) { - try { - const existingIndexes = await helper.listIndexes(DOMAIN_DOC_INDEX_STATE) - for (const existingIndex of existingIndexes) { - if (existingIndex.name !== undefined) { - const name: string = existingIndex.name - if (deletePattern.some((it) => it.test(name)) && !keepPattern.some((it) => it.test(name))) { - await helper.dropIndex(DOMAIN_DOC_INDEX_STATE, name) - } - } - } - } catch (err: any) { - console.error(err) - } - } - - for (const st of this.stages) { - if (this.cancelling) { - return - } - await this.storage.helper().createIndex( - DOMAIN_DOC_INDEX_STATE, - { - keys: { - ['stages.' + st.stageId]: 1 - } - }, - { name: 'stages.' + st.stageId + '_1' } - ) - } - } - - try { - this.hierarchy.getClass(core.class.DocIndexState) - } catch (err: any) { - this.metrics.warn('Models is not upgraded to support indexer', { - indexId: this.indexId, - workspace: this.workspace.name - }) - return - } await this.metrics.with('init-states', {}, async () => { await this.initStates() }) @@ -403,10 +378,6 @@ export class FullTextIndexPipeline implements FullTextPipeline { await this.initializeStages() }) - await this.metrics.with('process-remove', { workspace: this.workspace.name }, async () => { - await this.processRemove() - }) - const _classes = await this.metrics.with( 'processIndex', { workspace: this.workspace.name }, @@ -442,14 +413,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { this.triggerIndexing = () => { this.triggerCounts++ resolve(null) - clearTimeout(this.skippedReiterationTimeout) } - this.skippedReiterationTimeout = setTimeout(() => { - // Force skipped reiteration, just decrease by -1 - for (const [s, v] of Array.from(this.skipped.entries())) { - this.skipped.set(s, v - 1) - } - }, 60000) }) } } @@ -458,199 +422,174 @@ export class FullTextIndexPipeline implements FullTextPipeline { } private async processIndex (ctx: MeasureContext): Promise>[]> { - let idx = 0 const _classUpdate = new Set>>() - for (const st of this.stages) { - if (this.cancelling) { - return [] - } - idx++ - await rateLimiter.exec(async () => { - while (true) { - try { - if (this.cancelling) { - return Array.from(_classUpdate.values()) - } - if (!st.enabled) { - break - } - await ctx.with('flush', {}, async () => { - await this.flush(true) - }) - const toSkip = Array.from(this.skipped.entries()) - .filter((it) => it[1] > 3) - .map((it) => it[0]) - - const q: DocumentQuery = { - [`stages.${st.stageId}`]: { $ne: st.stageValue }, - removed: false - } - if (toSkip.length > 0) { - q._id = { $nin: toSkip } - } - let result = await ctx.with( - 'get-to-index', - {}, - async (ctx) => - await this.storage.findAll(ctx, core.class.DocIndexState, q, { - sort: { modifiedOn: SortingOrder.Descending }, - limit: globalIndexer.processingSize, - skipClass: true, - skipSpace: true - }) - ) - const toRemove: DocIndexState[] = [] - // Check and remove missing class documents. - result = toFindResult( - result.filter((doc) => { - const _class = this.model.findObject(doc.objectClass) - if (_class === undefined) { - // no _class present, remove doc - toRemove.push(doc) - return false - } - return true - }), - result.total - ) - - if (toRemove.length > 0) { - try { - await this.storage.clean( - this.metrics, - DOMAIN_DOC_INDEX_STATE, - toRemove.map((it) => it._id) - ) - } catch (err: any) { - Analytics.handleError(err) - // QuotaExceededError, ignore - } - } - - if (result.length > 0) { - this.metrics.info('Full text: Indexing', { - indexId: this.indexId, - stageId: st.stageId, - workspace: this.workspace.name, - ...this.currentStages - }) - } else { - // Nothing to index, check on next cycle. - break - } - - this.toIndex = new Map(result.map((it) => [it._id, it])) - - this.extraIndex.clear() - this.stageChanged = 0 - // Find documents matching query - const toIndex = this.matchStates(st) - - if (toIndex.length > 0) { - // Do Indexing - this.currentStage = st - - await ctx.with('collect-' + st.stageId, {}, async (ctx) => { - await st.collect(toIndex, this, ctx) - }) - if (this.cancelling) { - break - } - - toIndex.forEach((it) => _classUpdate.add(it.objectClass)) - - // go with next stages if they accept it - for (const nst of this.stages.slice(idx)) { - const toIndex2 = this.matchStates(nst) - if (toIndex2.length > 0) { - this.currentStage = nst - await ctx.with('collect-' + nst.stageId, {}, async (ctx) => { - await nst.collect(toIndex2, this, ctx) - }) - } - if (this.cancelling) { - break - } - } - } else { - break - } - - // Check items with not updated state. - for (const d of toIndex) { - if (d.stages?.[st.stageId] === false) { - this.skipped.set(d._id, (this.skipped.get(d._id) ?? 0) + 1) - } else { - this.skipped.delete(d._id) - } - } - } catch (err: any) { - Analytics.handleError(err) - this.metrics.error('error during index', { error: err }) + await rateLimiter.exec(async () => { + while (true) { + try { + if (this.cancelling) { + return Array.from(_classUpdate.values()) } + await ctx.with('flush', {}, async () => { + await this.flush(true) + }) + + let result: DocIndexState[] | undefined = await ctx.with('get-indexable', {}, async () => { + const q: DocumentQuery = { + needIndex: true + } + return await this.storage.findAll(ctx, core.class.DocIndexState, q, { + limit: globalIndexer.processingSize, + skipClass: true, + skipSpace: true + }) + }) + if (result === undefined) { + // No more results + break + } + + await this.processRemove(result) + result = result.filter((it) => !it.removed) + const toRemove: DocIndexState[] = [] + // Check and remove missing class documents. + result = result.filter((doc) => { + const _class = this.model.findObject(doc.objectClass) + if (_class === undefined) { + // no _class present, remove doc + toRemove.push(doc) + return false + } + return true + }) + + if (toRemove.length > 0) { + try { + await this.storage.clean( + this.metrics, + DOMAIN_DOC_INDEX_STATE, + toRemove.map((it) => it._id) + ) + } catch (err: any) { + Analytics.handleError(err) + // QuotaExceededError, ignore + } + } + + if (result.length > 0) { + this.metrics.info('Full text: Indexing', { + indexId: this.indexId, + workspace: this.workspace.name, + ...this.currentStages + }) + } else { + // Nothing to index, check on next cycle. + break + } + const retry: DocIndexState[] = [] + + await this.processStages(result, ctx, _classUpdate) + + // Force clear needIndex, it will be re trigger if some propogate will happen next. + if (!this.cancelling) { + for (const u of result) { + const stg = Object.values(u.stages) + if (!stg.includes(false) && stg.length === this.stages.length) { + // Check if all marks are true, we need to clear needIndex. + u.needIndex = false + await this.updateNeedIndex(u._id, false) + } else { + // Mark as retry on + retry.push(u) + } + } + } + if (retry.length > 0) { + await this.processStages(retry, ctx, _classUpdate) + if (!this.cancelling) { + for (const u of retry) { + // Since retry is happen, it shoudl be marked already. + u.needIndex = false + await this.updateNeedIndex(u._id, false) + } + } + } + } catch (err: any) { + Analytics.handleError(err) + this.metrics.error('error during index', { error: err }) } - }) - } + } + }) return Array.from(_classUpdate.values()) } - private async processRemove (): Promise { - let total = 0 - while (true) { - const result = await this.storage.findAll( - this.metrics, - core.class.DocIndexState, - { - removed: true - }, - { - limit: 1000, - projection: { - _id: 1, - stages: 1, - objectClass: 1 - }, - skipSpace: true, - skipClass: true - } - ) - - this.toIndex = new Map(result.map((it) => [it._id, it])) - + private async processStages ( + result: DocIndexState[], + ctx: MeasureContext, + _classUpdate: Set>> + ): Promise { + this.toIndex = new Map(result.map((it) => [it._id, it])) + for (const st of this.stages) { this.extraIndex.clear() + this.stageChanged = 0 + // Find documents matching query + const toIndex = this.matchStates(st) - const toIndex = Array.from(this.toIndex.values()) - const toRemoveIds = [] - for (const st of this.stages) { - if (toIndex.length > 0) { - // Do Indexing - this.currentStage = st - await st.remove(toIndex, this) - } else { + if (toIndex.length > 0) { + // Do Indexing + this.currentStage = st + + await ctx.with('collect-' + st.stageId, {}, async (ctx) => { + await st.collect(toIndex, this, ctx) + }) + if (this.cancelling) { break } - } - // If all stages are complete, remove document - const allStageIds = this.stages.map((it) => it.stageId) - for (const doc of toIndex) { - if (allStageIds.every((it) => doc.stages[it])) { - toRemoveIds.push(doc._id) - } - } - await this.flush(true) - if (toRemoveIds.length > 0) { - await this.storage.clean(this.metrics, DOMAIN_DOC_INDEX_STATE, toRemoveIds) - total += toRemoveIds.length - this.metrics.info('indexer', { - _classes: Array.from(groupByArray(toIndex, (it) => it.objectClass).keys()), - total, - count: toRemoveIds.length - }) + toIndex.forEach((it) => _classUpdate.add(it.objectClass)) + } else { + continue + } + } + } + + private async processRemove (docs: DocIndexState[]): Promise { + let total = 0 + this.toIndex = new Map(docs.map((it) => [it._id, it])) + + this.extraIndex.clear() + + const toIndex = Array.from(this.toIndex.values()).filter((it) => it.removed) + if (toIndex.length === 0) { + return + } + const toRemoveIds = [] + for (const st of this.stages) { + if (toIndex.length > 0) { + // Do Indexing + this.currentStage = st + await st.remove(toIndex, this) } else { break } } + // If all stages are complete, remove document + const allStageIds = this.stages.map((it) => it.stageId) + for (const doc of toIndex) { + if (allStageIds.every((it) => doc.stages[it])) { + toRemoveIds.push(doc._id) + } + } + + await this.flush(true) + if (toRemoveIds.length > 0) { + await this.storage.clean(this.metrics, DOMAIN_DOC_INDEX_STATE, toRemoveIds) + total += toRemoveIds.length + this.metrics.info('indexer', { + _classes: Array.from(groupByArray(toIndex, (it) => it.objectClass).keys()), + total, + count: toRemoveIds.length + }) + } } private async initStates (): Promise { @@ -665,7 +604,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { const require = [...st.require].filter((it) => this.stages.find((q) => q.stageId === it && q.enabled)) for (const o of this.toIndex.values()) { // We need to contain all state values - if (require.every((it) => o.stages?.[it])) { + if (require.every((it) => o.stages?.[it]) && !(o.stages?.[st.stageId] ?? false)) { toIndex.push(o) } } diff --git a/server/core/src/indexer/summary.ts b/server/core/src/indexer/summary.ts index ac70196ff5..77693ed988 100644 --- a/server/core/src/indexer/summary.ts +++ b/server/core/src/indexer/summary.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import { Analytics } from '@hcengineering/analytics' import core, { type AnyAttribute, type Class, @@ -21,12 +22,11 @@ import core, { type DocumentQuery, type DocumentUpdate, extractDocKey, + getFullTextContext, type Hierarchy, - type IndexStageState, isFullTextAttribute, type MeasureContext, - type Ref, - getFullTextContext + type Ref } from '@hcengineering/core' import { translate } from '@hcengineering/platform' import { jsonToText, markupToJSON } from '@hcengineering/text' @@ -37,15 +37,10 @@ import { type DocUpdateHandler, fieldStateId, type FullTextPipeline, - type FullTextPipelineStage + type FullTextPipelineStage, + summaryStageId } from './types' -import { collectPropagate, collectPropagateClasses, isCustomAttr, loadIndexStageStage } from './utils' -import { Analytics } from '@hcengineering/analytics' - -/** - * @public - */ -export const summaryStageId = 'sum-v5' +import { collectPropagate, collectPropagateClasses, isCustomAttr } from './utils' /** * @public @@ -65,35 +60,12 @@ export class FullSummaryStage implements FullTextPipelineStage { fieldFilter: ((attr: AnyAttribute, value: string) => boolean)[] = [] - stageValue: boolean | string = true - - indexState?: IndexStageState - // Summary should be not a bigger what 1mb of data. summaryLimit = 1024 * 1024 constructor (private readonly dbStorage: ServerStorage) {} - async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise { - const indexable = ( - await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext]: { $exists: true } }) - ) - .map((it) => pipeline.hierarchy.as(it, core.mixin.FullTextSearchContext)) - .filter((it) => it.fullTextSummary) - .map((it) => it._id + (it.propagateClasses ?? []).join('|')) - indexable.sort() - ;[this.stageValue, this.indexState] = await loadIndexStageStage( - ctx, - storage, - this.indexState, - this.stageId, - 'config', - { - classes: indexable, - matchExtra: this.matchExtra - } - ) - } + async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise {} async search ( _classes: Ref>[], @@ -107,7 +79,7 @@ export class FullSummaryStage implements FullTextPipelineStage { async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise { const part = [...toIndex] while (part.length > 0) { - const toIndexPart = part.splice(0, 1000) + const toIndexPart = part.splice(0, 100) const kids = toIndexPart.map((it) => it._id) const allChildDocs = await metrics.with( @@ -128,7 +100,7 @@ export class FullSummaryStage implements FullTextPipelineStage { // No need to index this class, mark embeddings as empty ones. if (!needIndex) { - await pipeline.update(doc._id, this.stageValue, {}) + await pipeline.update(doc._id, true, {}) continue } @@ -201,7 +173,7 @@ export class FullSummaryStage implements FullTextPipelineStage { update.fullSummary = embeddingText - await pipeline.update(doc._id, this.stageValue, update) + await pipeline.update(doc._id, true, update) } } } @@ -209,7 +181,7 @@ export class FullSummaryStage implements FullTextPipelineStage { async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise { // will be handled by field processor for (const doc of docs) { - await pipeline.update(doc._id, this.stageValue, {}) + await pipeline.update(doc._id, true, {}) } } } diff --git a/server/core/src/indexer/types.ts b/server/core/src/indexer/types.ts index 3ce2f3c952..e1afcba2a6 100644 --- a/server/core/src/indexer/types.ts +++ b/server/core/src/indexer/types.ts @@ -35,7 +35,7 @@ export interface FullTextPipeline { model: ModelDb update: ( docId: Ref, - mark: boolean | string, + mark: boolean, update: DocumentUpdate, flush?: boolean ) => Promise @@ -75,9 +75,6 @@ export interface FullTextPipelineStage { updateFields: DocUpdateHandler[] enabled: boolean - - stageValue: boolean | string - initialize: (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline) => Promise // Collect all changes related to bulk of document states @@ -109,6 +106,16 @@ export const fieldStateId = 'fld-v15' */ export const fullTextPushStageId = 'fts-v17' +/** + * @public + */ +export const summaryStageId = 'sum-v5' + +/** + * @public + */ +export const collabStageId = 'collab-v1' + /** * @public */ diff --git a/server/core/src/indexer/utils.ts b/server/core/src/indexer/utils.ts index 4fbc8a687b..c315787084 100644 --- a/server/core/src/indexer/utils.ts +++ b/server/core/src/indexer/utils.ts @@ -20,18 +20,12 @@ import core, { type Doc, type DocIndexState, type FullTextSearchContext, - generateId, getFullTextContext, type Hierarchy, - type IndexStageState, - type MeasureContext, type Obj, type Ref, - type Space, - TxFactory + type Space } from '@hcengineering/core' -import { deepEqual } from 'fast-equals' -import { type DbAdapter } from '../adapter' import plugin from '../plugin' import { type FullTextPipeline } from './types' @@ -81,59 +75,6 @@ export function createStateDoc ( } } -/** - * @public - */ -export async function loadIndexStageStage ( - ctx: MeasureContext, - storage: DbAdapter, - state: IndexStageState | undefined, - stageId: string, - field: string, - newValue: any -): Promise<[boolean | string, IndexStageState]> { - if (state === undefined) { - ;[state] = await storage.findAll(ctx, core.class.IndexStageState, { stageId }, { limit: 1 }) - } - const attributes: Record = state?.attributes ?? {} - - let result: boolean | string | undefined = attributes?.index !== undefined ? `${attributes?.index as number}` : true - - if (!deepEqual(attributes[field], newValue)) { - // Not match, - const newIndex = ((attributes.index as number) ?? 0) + 1 - result = `${newIndex}` - - const ops = new TxFactory(core.account.System, true) - const data = { - stageId, - attributes: { - [field]: newValue, - index: newIndex - } - } - if (state === undefined) { - const id: Ref = generateId() - await storage.tx(ctx, ops.createTxCreateDoc(core.class.IndexStageState, plugin.space.DocIndexState, data, id)) - state = { - ...data, - _class: core.class.IndexStageState, - _id: id, - space: plugin.space.DocIndexState, - modifiedBy: core.account.System, - modifiedOn: Date.now() - } - } else { - await storage.tx( - ctx, - ops.createTxUpdateDoc(core.class.IndexStageState, plugin.space.DocIndexState, state._id, data) - ) - state = { ...state, ...data, modifiedOn: Date.now() } - } - } - return [result, state] -} - /** * @public */ diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index 9e320e291c..12a43e2b46 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -98,7 +98,6 @@ export class TServerStorage implements ServerStorage { branding: Branding | null domainInfo = new Map() - statsCtx: MeasureContext emptyAdapter = new DummyDbAdapter() @@ -125,7 +124,6 @@ export class TServerStorage implements ServerStorage { this.branding = options.branding this.setModel(model) - this.statsCtx = metrics.newChild('stats-' + this.workspaceId.name, {}) } async initDomainInfo (): Promise { diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 89b96d1f0e..3fa7f6ec37 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -731,7 +731,7 @@ abstract class MongoAdapterBase implements DbAdapter { const coll = this.collection(domain) const mongoQuery = this.translateQuery(_class, query, options) - if (options?.limit === 1) { + if (options?.limit === 1 || typeof query._id === 'string') { // Skip sort/projection/etc. return await ctx.with( 'find-one', @@ -748,11 +748,11 @@ abstract class MongoAdapterBase implements DbAdapter { const doc = await coll.findOne(mongoQuery, findOptions) let total = -1 - if (options.total === true) { + if (options?.total === true) { total = await coll.countDocuments(mongoQuery) } if (doc != null) { - return toFindResult([doc as unknown as T], total) + return toFindResult([this.stripHash(doc as unknown as T) as T], total) } return toFindResult([], total) }, @@ -783,7 +783,6 @@ abstract class MongoAdapterBase implements DbAdapter { cursor = cursor.limit(options.limit ?? 1) } } - // Error in case of timeout try { let res: T[] = [] diff --git a/server/server/src/apm.ts b/server/server/src/apm.ts index a2a915bf07..41407c404c 100644 --- a/server/server/src/apm.ts +++ b/server/server/src/apm.ts @@ -79,11 +79,29 @@ export class APMMeasureContext implements MeasureContext { if (value instanceof Promise) { value = await value } - c.end() return value } catch (err: any) { c.error(err) throw err + } finally { + c.end() + } + } + + withSync( + name: string, + params: ParamsType, + op: (ctx: MeasureContext) => T, + fullParams?: FullParamsType | (() => FullParamsType) + ): T { + const c = this.newChild(name, params) + try { + return op(c) + } catch (err: any) { + c.error(err) + throw err + } finally { + c.end() } } diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index e6e3c503eb..139968640d 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -348,7 +348,7 @@ export async function upgradeModel ( ) const upgradeIndexes = async (): Promise => { - ctx.info('Migrate to sparse indexes') + ctx.info('Migrate indexes') // Create update indexes await createUpdateIndexes( ctx, @@ -385,7 +385,7 @@ export async function upgradeModel ( await tryMigrate(migrateClient, coreId, [ { - state: 'indexes-v2', + state: 'indexes-v3', func: upgradeIndexes } ]) diff --git a/server/translate/src/retranslate.ts b/server/translate/src/retranslate.ts index f661b0a60d..e94cfebdcc 100644 --- a/server/translate/src/retranslate.ts +++ b/server/translate/src/retranslate.ts @@ -19,7 +19,6 @@ import { DocIndexState, DocumentQuery, DocumentUpdate, - IndexStageState, MeasureContext, Ref, WorkspaceId @@ -33,12 +32,10 @@ import { extractDocKey, fieldStateId, FullTextPipeline, - IndexedDoc, - loadIndexStageStage + IndexedDoc } from '@hcengineering/server-core' import got from 'got' -import translatePlugin from './plugin' import { translateStateId, TranslationStage } from './types' /** @@ -59,40 +56,9 @@ export class LibRetranslateStage implements TranslationStage { token: string = '' endpoint: string = '' - stageValue: boolean | string = true - - indexState?: IndexStageState - constructor (readonly workspaceId: WorkspaceId) {} - async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise { - // Just do nothing - try { - const config = await storage.findAll(ctx, translatePlugin.class.TranslateConfiguration, {}) - if (config.length > 0) { - this.enabled = config[0].enabled - this.token = config[0].token - this.endpoint = config[0].endpoint - } else { - this.enabled = false - } - } catch (err: any) { - console.error(err) - this.enabled = false - } - - ;[this.stageValue, this.indexState] = await loadIndexStageStage( - ctx, - storage, - this.indexState, - this.stageId, - 'config', - { - enabled: this.enabled, - endpoint: this.endpoint - } - ) - } + async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise {} async search ( _classes: Ref>[], @@ -247,13 +213,13 @@ export class LibRetranslateStage implements TranslationStage { return } - await pipeline.update(doc._id, this.stageValue, update, true) + await pipeline.update(doc._id, true, update, true) } async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise { // will be handled by field processor for (const doc of docs) { - await pipeline.update(doc._id, this.stageValue, {}) + await pipeline.update(doc._id, true, {}) } } } diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index a3e150b78e..c022aac77b 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -431,18 +431,13 @@ function createWebsocketClientSocket ( }, data: () => data, send: async (ctx: MeasureContext, msg, binary, compression) => { - const sst = Date.now() const smsg = rpcHandler.serialize(msg, binary) - ctx.measure('serialize', Date.now() - sst) ctx.measure('send-data', smsg.length) const st = Date.now() if (ws.readyState !== ws.OPEN || cs.isClosed) { return } - if (ws.bufferedAmount > 16 * 1024) { - ctx.measure('send-bufferAmmount', 1) - } ws.send(smsg, { binary: true, compress: compression }, (err) => { if (err != null) { if (!`${err.message}`.includes('WebSocket is not open')) { diff --git a/server/ws/src/utils.ts b/server/ws/src/utils.ts index 54d26ff7a1..e0ddb2744f 100644 --- a/server/ws/src/utils.ts +++ b/server/ws/src/utils.ts @@ -40,11 +40,8 @@ export function processRequest ( buff: any, handleRequest: HandleRequestFunction ): void { - const st = Date.now() try { const request = cs.readRequest(buff, session.binaryMode) - const ed = Date.now() - context.measure('deserialize', ed - st) handleRequest(context, session, cs, request, workspaceId) } catch (err: any) { if (((err.message as string) ?? '').includes('Data read, but end of buffer not reached')) { diff --git a/tests/sanity/tests/chat/chat.spec.ts b/tests/sanity/tests/chat/chat.spec.ts index 58abaf09a3..19eb25affd 100644 --- a/tests/sanity/tests/chat/chat.spec.ts +++ b/tests/sanity/tests/chat/chat.spec.ts @@ -1,13 +1,14 @@ -import { test, expect } from '@playwright/test' -import { PlatformURI, generateTestData } from '../utils' -import { LeftSideMenuPage } from '../model/left-side-menu-page' -import { ChunterPage } from '../model/chunter-page' -import { ChannelPage } from '../model/channel-page' -import { ApiEndpoint } from '../API/Api' -import { LoginPage } from '../model/login-page' -import { SignUpData } from '../model/common-types' import { faker } from '@faker-js/faker' +import { expect, test } from '@playwright/test' +import { ApiEndpoint } from '../API/Api' +import { ChannelPage } from '../model/channel-page' +import { ChunterPage } from '../model/chunter-page' +import { SignUpData } from '../model/common-types' +import { LeftSideMenuPage } from '../model/left-side-menu-page' +import { LoginPage } from '../model/login-page' +import { SelectWorkspacePage } from '../model/select-workspace-page' import { SignInJoinPage } from '../model/signin-page' +import { PlatformURI, generateTestData } from '../utils' test.describe('channel tests', () => { let leftSideMenuPage: LeftSideMenuPage @@ -36,7 +37,9 @@ test.describe('channel tests', () => { await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234') await (await page.goto(`${PlatformURI}`))?.finished() await loginPage.login(data.userName, '1234') - await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() + const swp = new SelectWorkspacePage(page) + await swp.selectWorkspace(data.workspaceName) + // await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() }) test('create new private channel and check if the messages stays on it', async ({ browser, page }) => { diff --git a/tests/sanity/tests/custom-atributes/custom-attributes.spec.ts b/tests/sanity/tests/custom-atributes/custom-attributes.spec.ts index 0b591306ee..36f2530bb0 100644 --- a/tests/sanity/tests/custom-atributes/custom-attributes.spec.ts +++ b/tests/sanity/tests/custom-atributes/custom-attributes.spec.ts @@ -12,6 +12,7 @@ import { NewCompany } from '../model/recruiting/types' import { NavigationMenuPage } from '../model/recruiting/navigation-menu-page' import { CompaniesPage } from '../model/recruiting/companies-page' import { CompanyDetailsPage } from '../model/recruiting/company-details-page' +import { SelectWorkspacePage } from '../model/select-workspace-page' test.describe('Custom attributes tests', () => { let leftSideMenuPage: LeftSideMenuPage @@ -42,7 +43,9 @@ test.describe('Custom attributes tests', () => { await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234') await (await page.goto(`${PlatformURI}`))?.finished() await loginPage.login(data.userName, '1234') - await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() + const swp = new SelectWorkspacePage(page) + await swp.selectWorkspace(data.workspaceName) + // await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() }) test('Check if all custom attributes exists', async ({ browser, page }) => { diff --git a/tests/sanity/tests/inbox/inbox.spec.ts b/tests/sanity/tests/inbox/inbox.spec.ts index 8e7128a396..d20d2d4fea 100644 --- a/tests/sanity/tests/inbox/inbox.spec.ts +++ b/tests/sanity/tests/inbox/inbox.spec.ts @@ -12,6 +12,7 @@ import { SignInJoinPage } from '../model/signin-page' import { ChannelPage } from '../model/channel-page' import { UserProfilePage } from '../model/profile/user-profile-page' import { MenuItems, NotificationsPage } from '../model/profile/notifications-page' +import { SelectWorkspacePage } from '../model/select-workspace-page' test.describe('Inbox tests', () => { let leftSideMenuPage: LeftSideMenuPage @@ -40,7 +41,9 @@ test.describe('Inbox tests', () => { await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234') await (await page.goto(`${PlatformURI}`))?.finished() await loginPage.login(data.userName, '1234') - await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() + const swp = new SelectWorkspacePage(page) + await swp.selectWorkspace(data.workspaceName) + // await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() }) test('User is able to create a task, assign a himself and see it inside the inbox', async ({ page }) => { diff --git a/tests/sanity/tests/planning/plan.spec.ts b/tests/sanity/tests/planning/plan.spec.ts index 06d8df2009..1dfd5ca6d0 100644 --- a/tests/sanity/tests/planning/plan.spec.ts +++ b/tests/sanity/tests/planning/plan.spec.ts @@ -11,6 +11,7 @@ import { ApiEndpoint } from '../API/Api' import { LoginPage } from '../model/login-page' import { SignInJoinPage } from '../model/signin-page' import { TeamPage } from '../model/team-page' +import { SelectWorkspacePage } from '../model/select-workspace-page' test.use({ storageState: PlatformSetting @@ -192,7 +193,9 @@ test.describe('Planning ToDo tests', () => { await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234') await (await page.goto(`${PlatformURI}`))?.finished() await loginPage.login(data.userName, '1234') - await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() + const swp = new SelectWorkspacePage(page) + await swp.selectWorkspace(data.workspaceName) + // await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished() await leftSideMenuPage.clickPlanner() const planningNavigationMenuPage = new PlanningNavigationMenuPage(page)