From e1c9523d4dcb894378ce127ee3f763dd56efd3e8 Mon Sep 17 00:00:00 2001 From: Alexey Zinoviev Date: Wed, 24 Apr 2024 19:39:14 +0400 Subject: [PATCH] uberf-6708: composite elastic doc key (#5457) Signed-off-by: Alexey Zinoviev --- pods/server/src/__start.ts | 1 + server/core/src/indexer/types.ts | 2 +- server/core/src/plugin.ts | 3 +- server/elastic/src/adapter.ts | 72 ++++++++++++++++++++++---------- server/elastic/src/backup.ts | 48 ++++++++++++++------- 5 files changed, 86 insertions(+), 40 deletions(-) diff --git a/pods/server/src/__start.ts b/pods/server/src/__start.ts index 295cd2f2e2..b6c4ff2685 100644 --- a/pods/server/src/__start.ts +++ b/pods/server/src/__start.ts @@ -40,6 +40,7 @@ setMetadata(serverNotification.metadata.PushPrivateKey, config.pushPrivateKey) setMetadata(serverNotification.metadata.PushSubject, config.pushSubject) setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst) setMetadata(serverCore.metadata.ElasticIndexName, config.elasticIndexName) +setMetadata(serverCore.metadata.ElasticIndexVersion, 'v1') // eslint-disable-next-line @typescript-eslint/no-floating-promises console.log( diff --git a/server/core/src/indexer/types.ts b/server/core/src/indexer/types.ts index 16e2a993af..80394a9f5d 100644 --- a/server/core/src/indexer/types.ts +++ b/server/core/src/indexer/types.ts @@ -107,4 +107,4 @@ export const fieldStateId = 'fld-v13b' /** * @public */ -export const fullTextPushStageId = 'fts-v14' +export const fullTextPushStageId = 'fts-v15' diff --git a/server/core/src/plugin.ts b/server/core/src/plugin.ts index f1ed4c3da1..64d60c4868 100644 --- a/server/core/src/plugin.ts +++ b/server/core/src/plugin.ts @@ -43,7 +43,8 @@ const serverCore = plugin(serverCoreId, { FrontUrl: '' as Metadata, UploadURL: '' as Metadata, CursorMaxTimeMS: '' as Metadata, - ElasticIndexName: '' as Metadata + ElasticIndexName: '' as Metadata, + ElasticIndexVersion: '' as Metadata } }) diff --git a/server/elastic/src/adapter.ts b/server/elastic/src/adapter.ts index 663e1279e7..9cdccbb235 100644 --- a/server/elastic/src/adapter.ts +++ b/server/elastic/src/adapter.ts @@ -18,6 +18,7 @@ import { Class, Doc, DocumentQuery, + FullTextData, IndexingConfiguration, MeasureContext, Ref, @@ -46,29 +47,54 @@ function getIndexName (): string { return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' } +function getIndexVersion (): string { + return getMetadata(serverCore.metadata.ElasticIndexVersion) ?? 'v1' +} + class ElasticAdapter implements FullTextAdapter { + private readonly workspaceString: string + private readonly getFulltextDocId: (doc: Ref) => Ref + private readonly getDocId: (fulltext: Ref) => Ref + private readonly indexName: string + constructor ( private readonly client: Client, - private readonly workspaceId: WorkspaceId, - private readonly indexName: string, + readonly workspaceId: WorkspaceId, + private readonly indexBaseName: string, + readonly indexVersion: string, private readonly _metrics: MeasureContext - ) {} + ) { + this.indexName = `${indexBaseName}_${indexVersion}` + this.workspaceString = toWorkspaceString(workspaceId) + this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref + this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref + } async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} async initMapping (field?: { key: string, dims: number }): Promise> { // const current = await this.client.indices.getMapping({}) // console.log('Mappings', current) - // const mappings = current.body[toWorkspaceString(this.workspaceId)] + // const mappings = current.body[this.workspaceString] const indexName = this.indexName const result: Record = {} try { + const existingBaseIndices = await this.client.indices.get({ + index: [this.indexBaseName, `${this.indexBaseName}_*`] + }) + const existingOldVersionIndices = Object.keys(existingBaseIndices.body).filter((name) => name !== indexName) + if (existingOldVersionIndices.length > 0) { + await this.client.indices.delete({ + index: existingOldVersionIndices + }) + } + const existsOldIndex = await this.client.indices.exists({ - index: toWorkspaceString(this.workspaceId) + index: this.workspaceString }) if (existsOldIndex.body) { await this.client.indices.delete({ - index: toWorkspaceString(this.workspaceId) + index: this.workspaceString }) } const existsIndex = await this.client.indices.exists({ @@ -200,7 +226,7 @@ class ElasticAdapter implements FullTextAdapter { }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ] @@ -305,7 +331,7 @@ class ElasticAdapter implements FullTextAdapter { }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ], @@ -413,7 +439,7 @@ class ElasticAdapter implements FullTextAdapter { ], must: { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } }, filter: [ @@ -450,20 +476,21 @@ class ElasticAdapter implements FullTextAdapter { async index (doc: IndexedDoc): Promise { const wsDoc = { - workspaceId: toWorkspaceString(this.workspaceId), + workspaceId: this.workspaceString, ...doc } + const fulltextId = this.getFulltextDocId(doc.id) if (doc.data === undefined) { await this.client.index({ index: this.indexName, - id: doc.id, + id: fulltextId, type: '_doc', body: wsDoc }) } else { await this.client.index({ index: this.indexName, - id: doc.id, + id: fulltextId, type: '_doc', pipeline: 'attachment', body: wsDoc @@ -475,7 +502,7 @@ class ElasticAdapter implements FullTextAdapter { async update (id: Ref, update: Record): Promise { await this.client.update({ index: this.indexName, - id, + id: this.getFulltextDocId(id), body: { doc: update } @@ -490,8 +517,8 @@ class ElasticAdapter implements FullTextAdapter { const part = parts.splice(0, 1000) const operations = part.flatMap((doc) => { - const wsDoc = { workspaceId: toWorkspaceString(this.workspaceId), ...doc } - return [{ index: { _index: this.indexName, _id: doc.id } }, { ...wsDoc, type: '_doc' }] + const wsDoc = { workspaceId: this.workspaceString, ...doc } + return [{ index: { _index: this.indexName, _id: this.getFulltextDocId(doc.id) } }, { ...wsDoc, type: '_doc' }] }) const response = await this.client.bulk({ refresh: true, body: operations }) @@ -526,13 +553,13 @@ class ElasticAdapter implements FullTextAdapter { must: [ { terms: { - _id: part, + _id: part.map(this.getFulltextDocId), boost: 1.0 } }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ] @@ -562,13 +589,13 @@ class ElasticAdapter implements FullTextAdapter { must: [ { terms: { - _id: docs, + _id: docs.map(this.getFulltextDocId), boost: 1.0 } }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ] @@ -577,7 +604,7 @@ class ElasticAdapter implements FullTextAdapter { size: docs.length } }) - return Array.from(resp.body.hits.hits.map((hit: any) => ({ ...hit._source, id: hit._id }))) + return Array.from(resp.body.hits.hits.map((hit: any) => ({ ...hit._source, id: this.getDocId(hit._id) }))) } } @@ -592,7 +619,8 @@ export async function createElasticAdapter ( const client = new Client({ node: url }) - const indexName = getIndexName() + const indexBaseName = getIndexName() + const indexVersion = getIndexVersion() - return new ElasticAdapter(client, workspaceId, indexName, metrics) + return new ElasticAdapter(client, workspaceId, indexBaseName, indexVersion, metrics) } diff --git a/server/elastic/src/backup.ts b/server/elastic/src/backup.ts index 259991f7f1..04643663c2 100644 --- a/server/elastic/src/backup.ts +++ b/server/elastic/src/backup.ts @@ -43,12 +43,27 @@ function getIndexName (): string { return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' } +function getIndexVersion (): string { + return getMetadata(serverCore.metadata.ElasticIndexVersion) ?? 'v1' +} + class ElasticDataAdapter implements DbAdapter { + private readonly workspaceString: string + private readonly getFulltextDocId: (doc: Ref) => Ref + private readonly getDocId: (fulltext: Ref) => Ref + private readonly indexName: string + constructor ( readonly workspaceId: WorkspaceId, - readonly client: Client, - readonly indexName: string - ) {} + private readonly client: Client, + readonly indexBaseName: string, + readonly indexVersion: string + ) { + this.indexName = `${indexBaseName}_${indexVersion}` + this.workspaceString = toWorkspaceString(workspaceId) + this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref + this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref + } async findAll( ctx: MeasureContext, @@ -94,7 +109,7 @@ class ElasticDataAdapter implements DbAdapter { bool: { must: { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } } @@ -142,7 +157,7 @@ class ElasticDataAdapter implements DbAdapter { hash.update(json) const digest = hash.digest('base64') const result = { - id: item._id, + id: this.getDocId(item._id as Ref), hash: digest, size: json.length } @@ -183,13 +198,13 @@ class ElasticDataAdapter implements DbAdapter { must: [ { terms: { - _id: part, + _id: part.map(this.getFulltextDocId), boost: 1.0 } }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ] @@ -202,7 +217,7 @@ class ElasticDataAdapter implements DbAdapter { for (const item of buffer) { const dta: FullTextData = { - _id: item._id as Ref, + _id: this.getDocId(item._id) as Ref, // Export without workspace portion of ID _class: core.class.FulltextData, space: 'fulltext-blob' as Ref, modifiedOn: item.data.modifiedOn, @@ -229,13 +244,13 @@ class ElasticDataAdapter implements DbAdapter { must: [ { terms: { - _id: Array.from(part.map((it) => it._id)), + _id: part.map((it) => this.getFulltextDocId(it._id)), boost: 1.0 } }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ] @@ -251,10 +266,10 @@ class ElasticDataAdapter implements DbAdapter { } const operations = part.flatMap((doc) => [ - { index: { _index: this.indexName, _id: doc._id } }, + { index: { _index: this.indexName, _id: this.getFulltextDocId(doc._id) } }, { ...(doc as FullTextData).data, - workspaceId: toWorkspaceString(this.workspaceId) + workspaceId: this.workspaceString } ]) @@ -279,13 +294,13 @@ class ElasticDataAdapter implements DbAdapter { must: [ { terms: { - _id: part, + _id: part.map(this.getFulltextDocId), boost: 1.0 } }, { match: { - workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } + workspaceId: { query: this.workspaceString, operator: 'and' } } } ] @@ -312,6 +327,7 @@ export async function createElasticBackupDataAdapter ( const client = new Client({ node: url }) - const indexName = getIndexName() - return new ElasticDataAdapter(workspaceId, client, indexName) + const indexBaseName = getIndexName() + const indexVersion = getIndexVersion() + return new ElasticDataAdapter(workspaceId, client, indexBaseName, indexVersion) }