Fix indexer start (#7157)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-11-12 11:27:37 +07:00 committed by GitHub
parent 94f8b9f846
commit 867990ab83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 69 additions and 76 deletions

View File

@ -18,6 +18,7 @@
"bundle": "mkdir -p bundle && rushx get-model && esbuild src/index.ts --keep-names --bundle --platform=node --external:*.node --outfile=bundle/bundle.js --log-level=error --sourcemap=external",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/fulltext",
"docker:tbuild": "docker build -t hardcoreeng/fulltext . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext",
"docker:abuild": "docker build -t hardcoreeng/fulltext . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/fulltext",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/fulltext staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/fulltext",
"format": "format src",

View File

@ -244,6 +244,7 @@ export async function startIndexer (
ctx.error('Workspace not available for token')
return
}
ctx.warn('indexer created', { workspace: workspace.name })
idx = WorkspaceIndexer.create(
ctx,
opt.model,

View File

@ -323,6 +323,8 @@ export interface FullTextAdapter {
index: (ctx: MeasureContext, workspace: WorkspaceId, doc: IndexedDoc) => Promise<TxResult>
update: (ctx: MeasureContext, workspace: WorkspaceId, id: Ref<Doc>, update: Record<string, any>) => Promise<TxResult>
remove: (ctx: MeasureContext, workspace: WorkspaceId, id: Ref<Doc>[]) => Promise<void>
clean: (ctx: MeasureContext, workspace: WorkspaceId) => Promise<void>
updateMany: (ctx: MeasureContext, workspace: WorkspaceId, docs: IndexedDoc[]) => Promise<TxResult[]>
load: (ctx: MeasureContext, workspace: WorkspaceId, docs: Ref<Doc>[]) => Promise<IndexedDoc[]>
searchString: (
@ -347,53 +349,6 @@ export interface FullTextAdapter {
initMapping: (ctx: MeasureContext, field?: { key: string, dims: number }) => Promise<boolean>
}
/**
* No-op implementation of {@link FullTextAdapter}.
*
* Every method resolves immediately with a constant "empty" value
* (`true`, `{}`, `[]`, `{ docs: [] }` or nothing); none of them reads or
* stores any data.
* @public
*/
export class DummyFullTextAdapter implements FullTextAdapter {
// Always resolves to true; no mapping is created.
async initMapping (ctx: MeasureContext): Promise<boolean> {
return true
}
// Ignores the document and resolves to an empty result object.
async index (ctx: MeasureContext, workspace: WorkspaceId, doc: IndexedDoc): Promise<TxResult> {
return {}
}
// Nothing is ever stored, so the result is always an empty list.
async load (ctx: MeasureContext, workspace: WorkspaceId, docs: Ref<Doc>[]): Promise<IndexedDoc[]> {
return []
}
// Ignores the update and resolves to an empty result object.
async update (
ctx: MeasureContext,
workspace: WorkspaceId,
id: Ref<Doc>,
update: Record<string, any>
): Promise<TxResult> {
return {}
}
// Ignores the documents; resolves to an empty array regardless of how many were passed.
async updateMany (ctx: MeasureContext, workspace: WorkspaceId, docs: IndexedDoc[]): Promise<TxResult[]> {
return []
}
// Always resolves to a result with an empty `docs` array.
async searchString (
ctx: MeasureContext,
workspace: WorkspaceId,
query: SearchQuery,
options: SearchOptions
): Promise<SearchStringResult> {
return { docs: [] }
}
// Always resolves to an empty list, whatever the query.
async search (ctx: MeasureContext, workspace: WorkspaceId, query: any): Promise<IndexedDoc[]> {
return []
}
// Intentionally empty: there is nothing to remove.
async remove (ctx: MeasureContext, workspace: WorkspaceId, id: Ref<Doc>[]): Promise<void> {}
// Intentionally empty: the body holds no resources to release.
async close (): Promise<void> {}
}
/**
* @public
*/

View File

@ -457,6 +457,36 @@ class ElasticAdapter implements FullTextAdapter {
}
}
/**
 * Issues a delete-by-query against `this.indexName` for all documents whose
 * `workspaceId` field matches the given workspace.
 *
 * A response error with status code 404 is ignored; any other error is
 * rethrown to the caller.
 *
 * @param ctx - measurement context (not used by this method)
 * @param workspaceId - workspace whose indexed documents should be deleted
 */
async clean (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
  try {
    // Restrict the delete to documents that belong to this workspace.
    const workspaceFilter = {
      match: {
        workspaceId: { query: toWorkspaceString(workspaceId), operator: 'and' }
      }
    }
    await this.client.deleteByQuery(
      {
        type: '_doc',
        index: this.indexName,
        body: { query: { bool: { must: [workspaceFilter] } } }
      },
      undefined
    )
  } catch (e: any) {
    // Only a 404 response from the client is tolerated; everything else propagates.
    const notFound = e instanceof esErr.ResponseError && e.meta.statusCode === 404
    if (!notFound) {
      throw e
    }
  }
}
async load (ctx: MeasureContext, workspaceId: WorkspaceId, docs: Ref<Doc>[]): Promise<IndexedDoc[]> {
const resp = await this.client.search({
index: this.indexName,

View File

@ -176,34 +176,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
@withContext('verify-workspace')
async verifyWorkspace (ctx: MeasureContext, indexing: () => void): Promise<void> {
const fullReindex = 'full-text-indexer-v2'
const docStructure = 'full-text-structure-v2'
const indexes = 'verify-indexes-v2'
// We need to apply migrations if required.
const migrations = await this.storage.findAll<MigrationState>(ctx, core.class.MigrationState, {
plugin: coreId
})
if (migrations.find((it) => it.state === indexes) === undefined) {
ctx.warn('Rebuild DB index', { workspace: this.workspace.name })
// Clean all existing docs, they will be re-created on verify stage
await this.checkIndexes()
await this.addMigration(ctx, indexes)
}
if (migrations.find((it) => it.state === fullReindex) === undefined) {
ctx.warn('rebuilding index to v2', { workspace: this.workspace.name })
// Clean all existing docs, they will be re-created on verify stage
await this.storage.rawUpdate<DocIndexState & { attributes: any, stages: any }>(
DOMAIN_DOC_INDEX_STATE,
{},
{ needIndex: true, attributes: null, stages: null }
)
await this.addMigration(ctx, fullReindex)
}
// Verify class integrity if required
const allClasses = this.hierarchy.getDescendants(core.class.Doc)
@ -229,26 +206,55 @@ export class FullTextIndexPipeline implements FullTextPipeline {
})
)
const indexes = 'verify-indexes-v2'
if (migrations.find((it) => it.state === indexes) === undefined) {
ctx.warn('Rebuild DB index', { workspace: this.workspace.name })
// Clean all existing docs, they will be re-created on verify stage
await this.checkIndexes()
await this.addMigration(ctx, indexes)
ctx.warn('Rebuild DB index complete', { workspace: this.workspace.name })
}
const fullReindex = 'full-text-indexer-v4'
if (migrations.find((it) => it.state === fullReindex) === undefined) {
ctx.warn('rebuilding index to v4', { workspace: this.workspace.name })
// Clean all existing docs, they will be re-created on verify stage
await this.storage.rawDeleteMany<DocIndexState>(DOMAIN_DOC_INDEX_STATE, {})
await this.fulltextAdapter.clean(ctx, this.workspace)
ctx.warn('rebuilding index to v3 complete', { workspace: this.workspace.name })
await this.addMigration(ctx, fullReindex)
}
const docStructure = 'full-text-structure-v4'
if (migrations.find((it) => it.state === docStructure) === undefined) {
ctx.warn('verify document structure', { version: docStructure, workspace: this.workspace.name })
for (const [domain, classes] of this.byDomain.entries()) {
await ctx.with('verify-domain', { domain }, async () => {
// Iterate over all domain documents and add appropriate entries
const iterator = await this.storage.traverse(
const allDocs = await this.storage.rawFindAll(
domain,
{ _class: { $in: classes } },
{ projection: { _class: 1, _id: 1 } }
)
try {
let processed = 0
while (true) {
indexing()
const docs = await iterator.next(1000)
if (docs == null || docs.length === 0) {
const docs = allDocs.splice(0, 1000)
if (docs.length === 0) {
break
}
const states = toIdMap(
await this.storage.rawFindAll(DOMAIN_DOC_INDEX_STATE, { _id: { $in: docs.map((it) => it._id) } })
await this.storage.rawFindAll(
DOMAIN_DOC_INDEX_STATE,
{ _id: { $in: docs.map((it) => it._id) } },
{
projection: { _id: 1 }
}
)
)
// Find missing documents
const missingDocs = docs
@ -258,11 +264,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (missingDocs.length > 0) {
await this.storage.upload(ctx, DOMAIN_DOC_INDEX_STATE, missingDocs)
}
processed += docs.length
ctx.info('processed', { processed, allDocs: allDocs.length, domain })
}
} catch (err: any) {
ctx.error('failed to restore index state', { err })
} finally {
await iterator.close()
}
})
}