uberf-6708: composite elastic doc key (#5457)

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2024-04-24 19:39:14 +04:00 committed by GitHub
parent 70e6420aa7
commit e1c9523d4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 86 additions and 40 deletions

View File

@ -40,6 +40,7 @@ setMetadata(serverNotification.metadata.PushPrivateKey, config.pushPrivateKey)
setMetadata(serverNotification.metadata.PushSubject, config.pushSubject) setMetadata(serverNotification.metadata.PushSubject, config.pushSubject)
setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst) setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst)
setMetadata(serverCore.metadata.ElasticIndexName, config.elasticIndexName) setMetadata(serverCore.metadata.ElasticIndexName, config.elasticIndexName)
setMetadata(serverCore.metadata.ElasticIndexVersion, 'v1')
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
console.log( console.log(

View File

@ -107,4 +107,4 @@ export const fieldStateId = 'fld-v13b'
/** /**
* @public * @public
*/ */
export const fullTextPushStageId = 'fts-v14' export const fullTextPushStageId = 'fts-v15'

View File

@ -43,7 +43,8 @@ const serverCore = plugin(serverCoreId, {
FrontUrl: '' as Metadata<string>, FrontUrl: '' as Metadata<string>,
UploadURL: '' as Metadata<string>, UploadURL: '' as Metadata<string>,
CursorMaxTimeMS: '' as Metadata<string>, CursorMaxTimeMS: '' as Metadata<string>,
ElasticIndexName: '' as Metadata<string> ElasticIndexName: '' as Metadata<string>,
ElasticIndexVersion: '' as Metadata<string>
} }
}) })

View File

@ -18,6 +18,7 @@ import {
Class, Class,
Doc, Doc,
DocumentQuery, DocumentQuery,
FullTextData,
IndexingConfiguration, IndexingConfiguration,
MeasureContext, MeasureContext,
Ref, Ref,
@ -46,29 +47,54 @@ function getIndexName (): string {
return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index'
} }
function getIndexVersion (): string {
return getMetadata(serverCore.metadata.ElasticIndexVersion) ?? 'v1'
}
class ElasticAdapter implements FullTextAdapter { class ElasticAdapter implements FullTextAdapter {
private readonly workspaceString: string
private readonly getFulltextDocId: (doc: Ref<Doc>) => Ref<FullTextData>
private readonly getDocId: (fulltext: Ref<FullTextData>) => Ref<Doc>
private readonly indexName: string
constructor ( constructor (
private readonly client: Client, private readonly client: Client,
private readonly workspaceId: WorkspaceId, readonly workspaceId: WorkspaceId,
private readonly indexName: string, private readonly indexBaseName: string,
readonly indexVersion: string,
private readonly _metrics: MeasureContext private readonly _metrics: MeasureContext
) {} ) {
this.indexName = `${indexBaseName}_${indexVersion}`
this.workspaceString = toWorkspaceString(workspaceId)
this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref<FullTextData>
this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref<Doc>
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {} async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async initMapping (field?: { key: string, dims: number }): Promise<Record<string, number>> { async initMapping (field?: { key: string, dims: number }): Promise<Record<string, number>> {
// const current = await this.client.indices.getMapping({}) // const current = await this.client.indices.getMapping({})
// console.log('Mappings', current) // console.log('Mappings', current)
// const mappings = current.body[toWorkspaceString(this.workspaceId)] // const mappings = current.body[this.workspaceString]
const indexName = this.indexName const indexName = this.indexName
const result: Record<string, number> = {} const result: Record<string, number> = {}
try { try {
const existingBaseIndices = await this.client.indices.get({
index: [this.indexBaseName, `${this.indexBaseName}_*`]
})
const existingOldVersionIndices = Object.keys(existingBaseIndices.body).filter((name) => name !== indexName)
if (existingOldVersionIndices.length > 0) {
await this.client.indices.delete({
index: existingOldVersionIndices
})
}
const existsOldIndex = await this.client.indices.exists({ const existsOldIndex = await this.client.indices.exists({
index: toWorkspaceString(this.workspaceId) index: this.workspaceString
}) })
if (existsOldIndex.body) { if (existsOldIndex.body) {
await this.client.indices.delete({ await this.client.indices.delete({
index: toWorkspaceString(this.workspaceId) index: this.workspaceString
}) })
} }
const existsIndex = await this.client.indices.exists({ const existsIndex = await this.client.indices.exists({
@ -200,7 +226,7 @@ class ElasticAdapter implements FullTextAdapter {
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
] ]
@ -305,7 +331,7 @@ class ElasticAdapter implements FullTextAdapter {
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
], ],
@ -413,7 +439,7 @@ class ElasticAdapter implements FullTextAdapter {
], ],
must: { must: {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
}, },
filter: [ filter: [
@ -450,20 +476,21 @@ class ElasticAdapter implements FullTextAdapter {
async index (doc: IndexedDoc): Promise<TxResult> { async index (doc: IndexedDoc): Promise<TxResult> {
const wsDoc = { const wsDoc = {
workspaceId: toWorkspaceString(this.workspaceId), workspaceId: this.workspaceString,
...doc ...doc
} }
const fulltextId = this.getFulltextDocId(doc.id)
if (doc.data === undefined) { if (doc.data === undefined) {
await this.client.index({ await this.client.index({
index: this.indexName, index: this.indexName,
id: doc.id, id: fulltextId,
type: '_doc', type: '_doc',
body: wsDoc body: wsDoc
}) })
} else { } else {
await this.client.index({ await this.client.index({
index: this.indexName, index: this.indexName,
id: doc.id, id: fulltextId,
type: '_doc', type: '_doc',
pipeline: 'attachment', pipeline: 'attachment',
body: wsDoc body: wsDoc
@ -475,7 +502,7 @@ class ElasticAdapter implements FullTextAdapter {
async update (id: Ref<Doc>, update: Record<string, any>): Promise<TxResult> { async update (id: Ref<Doc>, update: Record<string, any>): Promise<TxResult> {
await this.client.update({ await this.client.update({
index: this.indexName, index: this.indexName,
id, id: this.getFulltextDocId(id),
body: { body: {
doc: update doc: update
} }
@ -490,8 +517,8 @@ class ElasticAdapter implements FullTextAdapter {
const part = parts.splice(0, 1000) const part = parts.splice(0, 1000)
const operations = part.flatMap((doc) => { const operations = part.flatMap((doc) => {
const wsDoc = { workspaceId: toWorkspaceString(this.workspaceId), ...doc } const wsDoc = { workspaceId: this.workspaceString, ...doc }
return [{ index: { _index: this.indexName, _id: doc.id } }, { ...wsDoc, type: '_doc' }] return [{ index: { _index: this.indexName, _id: this.getFulltextDocId(doc.id) } }, { ...wsDoc, type: '_doc' }]
}) })
const response = await this.client.bulk({ refresh: true, body: operations }) const response = await this.client.bulk({ refresh: true, body: operations })
@ -526,13 +553,13 @@ class ElasticAdapter implements FullTextAdapter {
must: [ must: [
{ {
terms: { terms: {
_id: part, _id: part.map(this.getFulltextDocId),
boost: 1.0 boost: 1.0
} }
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
] ]
@ -562,13 +589,13 @@ class ElasticAdapter implements FullTextAdapter {
must: [ must: [
{ {
terms: { terms: {
_id: docs, _id: docs.map(this.getFulltextDocId),
boost: 1.0 boost: 1.0
} }
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
] ]
@ -577,7 +604,7 @@ class ElasticAdapter implements FullTextAdapter {
size: docs.length size: docs.length
} }
}) })
return Array.from(resp.body.hits.hits.map((hit: any) => ({ ...hit._source, id: hit._id }))) return Array.from(resp.body.hits.hits.map((hit: any) => ({ ...hit._source, id: this.getDocId(hit._id) })))
} }
} }
@ -592,7 +619,8 @@ export async function createElasticAdapter (
const client = new Client({ const client = new Client({
node: url node: url
}) })
const indexName = getIndexName() const indexBaseName = getIndexName()
const indexVersion = getIndexVersion()
return new ElasticAdapter(client, workspaceId, indexName, metrics) return new ElasticAdapter(client, workspaceId, indexBaseName, indexVersion, metrics)
} }

View File

@ -43,12 +43,27 @@ function getIndexName (): string {
return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index'
} }
function getIndexVersion (): string {
return getMetadata(serverCore.metadata.ElasticIndexVersion) ?? 'v1'
}
class ElasticDataAdapter implements DbAdapter { class ElasticDataAdapter implements DbAdapter {
private readonly workspaceString: string
private readonly getFulltextDocId: (doc: Ref<Doc>) => Ref<FullTextData>
private readonly getDocId: (fulltext: Ref<FullTextData>) => Ref<Doc>
private readonly indexName: string
constructor ( constructor (
readonly workspaceId: WorkspaceId, readonly workspaceId: WorkspaceId,
readonly client: Client, private readonly client: Client,
readonly indexName: string readonly indexBaseName: string,
) {} readonly indexVersion: string
) {
this.indexName = `${indexBaseName}_${indexVersion}`
this.workspaceString = toWorkspaceString(workspaceId)
this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref<FullTextData>
this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref<Doc>
}
async findAll<T extends Doc>( async findAll<T extends Doc>(
ctx: MeasureContext, ctx: MeasureContext,
@ -94,7 +109,7 @@ class ElasticDataAdapter implements DbAdapter {
bool: { bool: {
must: { must: {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
} }
@ -142,7 +157,7 @@ class ElasticDataAdapter implements DbAdapter {
hash.update(json) hash.update(json)
const digest = hash.digest('base64') const digest = hash.digest('base64')
const result = { const result = {
id: item._id, id: this.getDocId(item._id as Ref<FullTextData>),
hash: digest, hash: digest,
size: json.length size: json.length
} }
@ -183,13 +198,13 @@ class ElasticDataAdapter implements DbAdapter {
must: [ must: [
{ {
terms: { terms: {
_id: part, _id: part.map(this.getFulltextDocId),
boost: 1.0 boost: 1.0
} }
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
] ]
@ -202,7 +217,7 @@ class ElasticDataAdapter implements DbAdapter {
for (const item of buffer) { for (const item of buffer) {
const dta: FullTextData = { const dta: FullTextData = {
_id: item._id as Ref<FullTextData>, _id: this.getDocId(item._id) as Ref<FullTextData>, // Export without workspace portion of ID
_class: core.class.FulltextData, _class: core.class.FulltextData,
space: 'fulltext-blob' as Ref<Space>, space: 'fulltext-blob' as Ref<Space>,
modifiedOn: item.data.modifiedOn, modifiedOn: item.data.modifiedOn,
@ -229,13 +244,13 @@ class ElasticDataAdapter implements DbAdapter {
must: [ must: [
{ {
terms: { terms: {
_id: Array.from(part.map((it) => it._id)), _id: part.map((it) => this.getFulltextDocId(it._id)),
boost: 1.0 boost: 1.0
} }
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
] ]
@ -251,10 +266,10 @@ class ElasticDataAdapter implements DbAdapter {
} }
const operations = part.flatMap((doc) => [ const operations = part.flatMap((doc) => [
{ index: { _index: this.indexName, _id: doc._id } }, { index: { _index: this.indexName, _id: this.getFulltextDocId(doc._id) } },
{ {
...(doc as FullTextData).data, ...(doc as FullTextData).data,
workspaceId: toWorkspaceString(this.workspaceId) workspaceId: this.workspaceString
} }
]) ])
@ -279,13 +294,13 @@ class ElasticDataAdapter implements DbAdapter {
must: [ must: [
{ {
terms: { terms: {
_id: part, _id: part.map(this.getFulltextDocId),
boost: 1.0 boost: 1.0
} }
}, },
{ {
match: { match: {
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' } workspaceId: { query: this.workspaceString, operator: 'and' }
} }
} }
] ]
@ -312,6 +327,7 @@ export async function createElasticBackupDataAdapter (
const client = new Client({ const client = new Client({
node: url node: url
}) })
const indexName = getIndexName() const indexBaseName = getIndexName()
return new ElasticDataAdapter(workspaceId, client, indexName) const indexVersion = getIndexVersion()
return new ElasticDataAdapter(workspaceId, client, indexBaseName, indexVersion)
} }