UBERF-4526: Elastic bulk error on re-indexing (#4155)

Signed-off-by: Maxim Karmatskikh <mkarmatskih@gmail.com>
This commit is contained in:
Maksim Karmatskikh 2023-12-07 08:25:12 +01:00 committed by GitHub
parent dabf59c379
commit 3df1f1d6df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 70 additions and 6 deletions

View File

@ -37,7 +37,8 @@ import {
getFullTextIndexableAttributes, getFullTextIndexableAttributes,
getFullTextContext, getFullTextContext,
isFullTextAttribute, isFullTextAttribute,
loadIndexStageStage loadIndexStageStage,
getCustomAttrKeys
} from './utils' } from './utils'
/** /**
@ -146,7 +147,15 @@ export class IndexedFieldStage implements FullTextPipelineStage {
} }
} }
const customAttrValues: any = []
for (const [, v] of Object.entries(content)) { for (const [, v] of Object.entries(content)) {
if (v.attr.isCustom === true && v.value !== '' && v.value !== undefined) {
// No need to put localized text as attributes. We do not use it at all
// Just put all content for custom attribute inside one custom field
customAttrValues.push({ label: v.attr.label, value: v.value })
continue
}
// Check for content changes and collect update // Check for content changes and collect update
const dKey = docKey(v.attr.name, { _class: v.attr.attributeOf }) const dKey = docKey(v.attr.name, { _class: v.attr.attributeOf })
const dUKey = docUpdKey(v.attr.name, { _class: v.attr.attributeOf }) const dUKey = docUpdKey(v.attr.name, { _class: v.attr.attributeOf })
@ -159,6 +168,16 @@ export class IndexedFieldStage implements FullTextPipelineStage {
} }
} }
} }
const { customAttrKey, customAttrUKey } = getCustomAttrKeys()
if (
(docState.attributes[customAttrKey] !== undefined || customAttrValues.length > 0) &&
!deepEqual(docState.attributes[customAttrKey], customAttrValues)
) {
changes++
;(docUpdate as any)[customAttrUKey] = customAttrValues
}
if (docState.attachedTo != null && changes > 0) { if (docState.attachedTo != null && changes > 0) {
const ctx = getFullTextContext(pipeline.hierarchy, objClass) const ctx = getFullTextContext(pipeline.hierarchy, objClass)
if (ctx.parentPropagate ?? true) { if (ctx.parentPropagate ?? true) {

View File

@ -40,7 +40,14 @@ import {
FullTextPipelineStage, FullTextPipelineStage,
fullTextPushStageId fullTextPushStageId
} from './types' } from './types'
import { collectPropagate, collectPropagateClasses, docKey, getFullTextContext, IndexKeyOptions } from './utils' import {
collectPropagate,
collectPropagateClasses,
docKey,
getFullTextContext,
IndexKeyOptions,
isCustomAttr
} from './utils'
import { updateDocWithPresenter } from '../mapper' import { updateDocWithPresenter } from '../mapper'
/** /**
@ -283,7 +290,7 @@ function updateDoc2Elastic (
docId = docIdOverride ?? docId docId = docIdOverride ?? docId
if (docId === undefined) { if (docId === undefined) {
if (typeof vv !== 'object') { if (typeof vv !== 'object' || isCustomAttr(k)) {
doc[k] = vv doc[k] = vv
} }
continue continue

View File

@ -33,7 +33,13 @@ import { translate } from '@hcengineering/platform'
import { convert } from 'html-to-text' import { convert } from 'html-to-text'
import { IndexedDoc } from '../types' import { IndexedDoc } from '../types'
import { contentStageId, DocUpdateHandler, fieldStateId, FullTextPipeline, FullTextPipelineStage } from './types' import { contentStageId, DocUpdateHandler, fieldStateId, FullTextPipeline, FullTextPipelineStage } from './types'
import { collectPropagate, collectPropagateClasses, getFullTextContext, loadIndexStageStage } from './utils' import {
collectPropagate,
collectPropagateClasses,
getFullTextContext,
loadIndexStageStage,
isCustomAttr
} from './utils'
/** /**
* @public * @public
@ -240,6 +246,16 @@ export async function extractIndexedValues (
continue continue
} }
if (isCustomAttr(attr)) {
const str = v
.map((pair: { label: string, value: string }) => {
return `${pair.label} is ${pair.value}`
})
.join(' ')
const cl = doc.objectClass
attributes[cl] = { ...attributes[cl], [k]: str }
}
if (_class === undefined) { if (_class === undefined) {
// Skip all helper fields. // Skip all helper fields.
continue continue

View File

@ -102,7 +102,7 @@ export const contentStageId = 'cnt-v2b'
/** /**
* @public * @public
*/ */
export const fieldStateId = 'fld-v9' export const fieldStateId = 'fld-v10'
/** /**
* @public * @public

View File

@ -308,3 +308,20 @@ export function collectPropagateClasses (pipeline: FullTextPipeline, objectClass
return Array.from(propagate.values()) return Array.from(propagate.values())
} }
const CUSTOM_ATTR_KEY = 'customAttributes'
const CUSTOM_ATTR_UPDATE_KEY = 'attributes.customAttributes'
/**
* @public
*/
export function getCustomAttrKeys (): { customAttrKey: string, customAttrUKey: string } {
return { customAttrKey: CUSTOM_ATTR_KEY, customAttrUKey: CUSTOM_ATTR_UPDATE_KEY }
}
/**
* @public
*/
export function isCustomAttr (attr: string): boolean {
return attr === CUSTOM_ATTR_KEY
}

View File

@ -416,7 +416,12 @@ class ElasticAdapter implements FullTextAdapter {
const errorIds = new Set(errors.map((it: any) => it.index._id)) const errorIds = new Set(errors.map((it: any) => it.index._id))
const erroDocs = docs.filter((it) => errorIds.has(it.id)) const erroDocs = docs.filter((it) => errorIds.has(it.id))
// Collect only errors // Collect only errors
const errs = Array.from(errors.map((it: any) => it.index.error.reason as string)).join('\n') const errs = Array.from(
errors.map((it: any) => {
return `${it.index.error.reason}: ${it.index.error.caused_by?.reason}`
})
).join('\n')
console.error(`Failed to process bulk request: ${errs} ${JSON.stringify(erroDocs)}`) console.error(`Failed to process bulk request: ${errs} ${JSON.stringify(erroDocs)}`)
} }
} }