From 867990ab837e3818c818c437ed89f0e83aa38406 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 12 Nov 2024 11:27:37 +0700 Subject: [PATCH] Fix indexer start (#7157) Signed-off-by: Andrey Sobolev --- pods/fulltext/package.json | 1 + pods/fulltext/src/server.ts | 1 + server/core/src/types.ts | 49 +------------------- server/elastic/src/adapter.ts | 30 +++++++++++++ server/indexer/src/indexer/indexer.ts | 64 +++++++++++++++------------ 5 files changed, 69 insertions(+), 76 deletions(-) diff --git a/pods/fulltext/package.json b/pods/fulltext/package.json index ed47d1c9e0..956091dc0c 100644 --- a/pods/fulltext/package.json +++ b/pods/fulltext/package.json @@ -18,6 +18,7 @@ "bundle": "mkdir -p bundle && rushx get-model && esbuild src/index.ts --keep-names --bundle --platform=node --external:*.node --outfile=bundle/bundle.js --log-level=error --sourcemap=external", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/fulltext", "docker:tbuild": "docker build -t hardcoreeng/fulltext . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext", + "docker:abuild": "docker build -t hardcoreeng/fulltext . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext", "docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/fulltext staging", "docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/fulltext", "format": "format src", diff --git a/pods/fulltext/src/server.ts b/pods/fulltext/src/server.ts index 79f1b190fd..97587f6bbe 100644 --- a/pods/fulltext/src/server.ts +++ b/pods/fulltext/src/server.ts @@ -244,6 +244,7 @@ export async function startIndexer ( ctx.error('Workspace not available for token') return } + ctx.warn('indexer created', { workspace: workspace.name }) idx = WorkspaceIndexer.create( ctx, opt.model, diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 3549ad6ee1..5c0ccff2d8 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -323,6 +323,8 @@ export interface FullTextAdapter { index: (ctx: MeasureContext, workspace: WorkspaceId, doc: IndexedDoc) => Promise update: (ctx: MeasureContext, workspace: WorkspaceId, id: Ref, update: Record) => Promise remove: (ctx: MeasureContext, workspace: WorkspaceId, id: Ref[]) => Promise + + clean: (ctx: MeasureContext, workspace: WorkspaceId) => Promise updateMany: (ctx: MeasureContext, workspace: WorkspaceId, docs: IndexedDoc[]) => Promise load: (ctx: MeasureContext, workspace: WorkspaceId, docs: Ref[]) => Promise searchString: ( @@ -347,53 +349,6 @@ export interface FullTextAdapter { initMapping: (ctx: MeasureContext, field?: { key: string, dims: number }) => Promise } -/** - * @public - */ -export class DummyFullTextAdapter implements FullTextAdapter { - async initMapping (ctx: MeasureContext): Promise { - return true - } - - async index (ctx: MeasureContext, workspace: WorkspaceId, doc: IndexedDoc): Promise { - return {} - } - - async load (ctx: MeasureContext, workspace: WorkspaceId, docs: Ref[]): Promise { - return [] - } - - async update ( - ctx: MeasureContext, - workspace: WorkspaceId, - id: Ref, - update: Record - ): Promise { - return {} - } - - async updateMany (ctx: MeasureContext, workspace: WorkspaceId, docs: IndexedDoc[]): Promise { - return [] - } - - async searchString ( - ctx: MeasureContext, - workspace: WorkspaceId, - query: SearchQuery, - options: SearchOptions - ): Promise { - return { docs: [] } - } - - async search (ctx: MeasureContext, workspace: WorkspaceId, query: any): Promise { - return [] - } - - async remove (ctx: MeasureContext, workspace: WorkspaceId, id: Ref[]): Promise {} - - async close (): Promise {} -} - /** * @public */ diff --git a/server/elastic/src/adapter.ts b/server/elastic/src/adapter.ts index c263f7ac34..c49fba0871 100644 --- a/server/elastic/src/adapter.ts +++ b/server/elastic/src/adapter.ts @@ -457,6 +457,36 @@ class ElasticAdapter implements FullTextAdapter { } } + async clean (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { + try { + await this.client.deleteByQuery( + { + type: '_doc', + index: this.indexName, + body: { + query: { + bool: { + must: [ + { + match: { + workspaceId: { query: toWorkspaceString(workspaceId), operator: 'and' } + } + } + ] + } + } + } + }, + undefined + ) + } catch (e: any) { + if (e instanceof esErr.ResponseError && e.meta.statusCode === 404) { + return + } + throw e + } + } + async load (ctx: MeasureContext, workspaceId: WorkspaceId, docs: Ref[]): Promise { const resp = await this.client.search({ index: this.indexName, diff --git a/server/indexer/src/indexer/indexer.ts b/server/indexer/src/indexer/indexer.ts index 63bc00787d..34ecb2c100 100644 --- a/server/indexer/src/indexer/indexer.ts +++ b/server/indexer/src/indexer/indexer.ts @@ -176,34 +176,11 @@ export class FullTextIndexPipeline implements FullTextPipeline { @withContext('verify-workspace') async verifyWorkspace (ctx: MeasureContext, indexing: () => void): Promise { - const fullReindex = 'full-text-indexer-v2' - const docStructure = 'full-text-structure-v2' - const indexes = 'verify-indexes-v2' // We need to apply migrations if required. const migrations = await this.storage.findAll(ctx, core.class.MigrationState, { plugin: coreId }) - if (migrations.find((it) => it.state === indexes) === undefined) { - ctx.warn('Rebuild DB index', { workspace: this.workspace.name }) - // Clean all existing docs, they will be re-created on verify stage - await this.checkIndexes() - - await this.addMigration(ctx, indexes) - } - - if (migrations.find((it) => it.state === fullReindex) === undefined) { - ctx.warn('rebuilding index to v2', { workspace: this.workspace.name }) - // Clean all existing docs, they will be re-created on verify stage - await this.storage.rawUpdate( - DOMAIN_DOC_INDEX_STATE, - {}, - { needIndex: true, attributes: null, stages: null } - ) - - await this.addMigration(ctx, fullReindex) - } - // Verify class integrity if required const allClasses = this.hierarchy.getDescendants(core.class.Doc) @@ -229,26 +206,55 @@ export class FullTextIndexPipeline implements FullTextPipeline { }) ) + const indexes = 'verify-indexes-v2' + if (migrations.find((it) => it.state === indexes) === undefined) { + ctx.warn('Rebuild DB index', { workspace: this.workspace.name }) + // Clean all existing docs, they will be re-created on verify stage + await this.checkIndexes() + + await this.addMigration(ctx, indexes) + ctx.warn('Rebuild DB index complete', { workspace: this.workspace.name }) + } + + const fullReindex = 'full-text-indexer-v4' + if (migrations.find((it) => it.state === fullReindex) === undefined) { + ctx.warn('rebuilding index to v4', { workspace: this.workspace.name }) + // Clean all existing docs, they will be re-created on verify stage + await this.storage.rawDeleteMany(DOMAIN_DOC_INDEX_STATE, {}) + await this.fulltextAdapter.clean(ctx, this.workspace) + ctx.warn('rebuilding index to v3 complete', { workspace: this.workspace.name }) + + await this.addMigration(ctx, fullReindex) + } + + const docStructure = 'full-text-structure-v4' if (migrations.find((it) => it.state === docStructure) === undefined) { ctx.warn('verify document structure', { version: docStructure, workspace: this.workspace.name }) for (const [domain, classes] of this.byDomain.entries()) { await ctx.with('verify-domain', { domain }, async () => { // Iterate over all domain documents and add appropriate entries - const iterator = await this.storage.traverse( + const allDocs = await this.storage.rawFindAll( domain, { _class: { $in: classes } }, { projection: { _class: 1, _id: 1 } } ) try { + let processed = 0 while (true) { indexing() - const docs = await iterator.next(1000) - if (docs == null || docs.length === 0) { + const docs = allDocs.splice(0, 1000) + if (docs.length === 0) { break } const states = toIdMap( - await this.storage.rawFindAll(DOMAIN_DOC_INDEX_STATE, { _id: { $in: docs.map((it) => it._id) } }) + await this.storage.rawFindAll( + DOMAIN_DOC_INDEX_STATE, + { _id: { $in: docs.map((it) => it._id) } }, + { + projection: { _id: 1 } + } + ) ) // Find missing documents const missingDocs = docs @@ -258,11 +264,11 @@ export class FullTextIndexPipeline implements FullTextPipeline { if (missingDocs.length > 0) { await this.storage.upload(ctx, DOMAIN_DOC_INDEX_STATE, missingDocs) } + processed += docs.length + ctx.info('processed', { processed, allDocs: allDocs.length, domain }) } } catch (err: any) { ctx.error('failed to restore index state', { err }) - } finally { - await iterator.close() } }) }