diff --git a/models/core/src/core.ts b/models/core/src/core.ts index 69e5b3dbf2..d03c01d3c7 100644 --- a/models/core/src/core.ts +++ b/models/core/src/core.ts @@ -318,6 +318,11 @@ export class TDocIndexState extends TDoc implements DocIndexState { @Index(IndexKind.Indexed) @Hidden() stages!: Record + + @Prop(TypeString(), getEmbeddedLabel('Generation')) + @Index(IndexKind.Indexed) + @Hidden() + generationId?: string } @Model(core.class.IndexStageState, core.class.Doc, DOMAIN_DOC_INDEX_STATE) diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index 20e3a234e3..b3a7e92700 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -432,6 +432,8 @@ export interface DocIndexState extends Doc { attachedTo?: Ref attachedToClass?: Ref> + generationId?: string + // States for stages stages: Record diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 13bd45f492..78107310b8 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -17,6 +17,7 @@ import core, { AttachedDoc, Class, DOMAIN_DOC_INDEX_STATE, + DOMAIN_FULLTEXT_BLOB, Doc, DocIndexState, DocumentQuery, @@ -32,7 +33,8 @@ import core, { setObjectValue, toFindResult, versionToString, - docKey + docKey, + generateId } from '@hcengineering/core' import { DbAdapter } from '../adapter' import { RateLimitter } from '../limitter' @@ -600,44 +602,83 @@ export class FullTextIndexPipeline implements FullTextPipeline { console.log(this.workspace.name, 'checking index', c) - // All saved state documents - const states = ( - await this.storage.findAll(core.class.DocIndexState, { objectClass: c }, { projection: { _id: 1 } }) - ).map((it) => it._id) + const generationId = generateId() + + let lastId = '' while (true) { if (this.cancelling) { return } + let newDocs: DocIndexState[] = [] + let updates = new Map, DocumentUpdate>() + try { - newDocs = ( - await dbStorage.findAll( - this.metrics, - c, - { _class: c, _id: { $nin: states } }, - { limit: 500, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any } + const docs = await dbStorage.findAll( + this.metrics, + c, + { _class: c, _id: { $gt: lastId as any } }, + { + limit: 10000, + sort: { _id: 1 }, + projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any + } + ) + + if (docs.length === 0) { + // All updated for this class + break + } + + lastId = docs[docs.length - 1]._id + + const states = ( + await this.storage.findAll( + core.class.DocIndexState, + { + objectClass: c, + _id: { + $gte: docs[0]._id as any, + $lte: docs[docs.length - 1]._id as any + } + }, + { projection: { _id: 1 } } ) - ).map((it) => { - return createStateDoc(it._id, c, { - stages: {}, - attributes: {}, - removed: false, - space: it.space, - attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined, - attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined + ).map((it) => it._id) + const statesSet = new Set(states) + + // create missing index states + newDocs = docs + .filter((it) => !statesSet.has(it._id as Ref)) + .map((it) => { + return createStateDoc(it._id, c, { + generationId, + stages: {}, + attributes: {}, + removed: false, + space: it.space, + attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined, + attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined + }) + }) + + // update generationId for existing index states + updates = new Map() + docs + .filter((it) => statesSet.has(it._id as Ref)) + .forEach((it) => { + updates.set(it._id as Ref, { generationId }) }) - }) } catch (e) { console.error(e) break } - states.push(...newDocs.map((it) => it._id)) - - if (newDocs.length === 0) { - // All updated for this class - break + try { + await this.storage.update(DOMAIN_DOC_INDEX_STATE, updates) + } catch (err: any) { + console.error(err) } try { @@ -646,11 +687,20 @@ export class FullTextIndexPipeline implements FullTextPipeline { console.error(err) } } - const statesSet = new Set(states) - const docIds = (await dbStorage.findAll(this.metrics, c, { _class: c }, { projection: { _id: 1 } })) - .filter((it) => !statesSet.has(it._id as Ref)) - .map((it) => it._id) - await this.storage.clean(DOMAIN_DOC_INDEX_STATE, docIds) + + // remove index states for documents that do not exist + const toRemove = ( + await this.storage.findAll( + core.class.DocIndexState, + { objectClass: c, generationId: { $ne: generationId } }, + { projection: { _id: 1 } } + ) + ).map((it) => it._id) + + if (toRemove.length > 0) { + await this.storage.clean(DOMAIN_DOC_INDEX_STATE, toRemove) + await this.storage.clean(DOMAIN_FULLTEXT_BLOB, toRemove) + } } // Clean for non existing classes