Fix Indexer use of $ne (#6264)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-08-06 21:36:03 +07:00 committed by GitHub
parent 1f1b8f51d6
commit c42923c2b1
35 changed files with 485 additions and 599 deletions
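
The core change behind this commit: the indexer used to select pending documents per stage with a Mongo `$ne` query, which cannot be answered from an index range and required one index per stage. It now keeps a single boolean `needIndex` flag on `DocIndexState` and scans on an indexed equality match. A condensed sketch of the two query shapes, taken from the pipeline hunks below:

```ts
// Before (one pass per stage): $ne means "everything except one value",
// so Mongo falls back to wide scans of DOMAIN_DOC_INDEX_STATE.
const oldQuery: DocumentQuery<DocIndexState> = {
  [`stages.${st.stageId}`]: { $ne: st.stageValue },
  removed: false
}

// After: a single indexed boolean, set whenever a document (re)enters
// the pipeline and cleared once every stage mark is true.
const newQuery: DocumentQuery<DocIndexState> = { needIndex: true }
```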

.vscode/launch.json

@ -60,6 +60,7 @@
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"runtimeVersion": "20",
"showAsyncStacks": true,
"outputCapture": "std",
"sourceMaps": true,
"cwd": "${workspaceRoot}/pods/server",
"protocol": "inspector"
@ -75,6 +76,7 @@
"TRANSACTOR_URL": "ws://localhost:3333",
"ACCOUNT_PORT": "3000",
"FRONT_URL": "http://localhost:8080",
"outputCapture": "std",
"SES_URL": "http://localhost:8091",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
@ -186,7 +188,7 @@
"name": "Debug tool upgrade",
"type": "node",
"request": "launch",
"args": ["src/__start.ts", "upgrade"],
"args": ["src/__start.ts", "upgrade-workspace", "platform"],
"env": {
"SERVER_SECRET": "secret",
"MINIO_ACCESS_KEY": "minioadmin",


@ -1242,7 +1242,7 @@ async function updateId (
stages: {},
removed: false
})
await txop.update(docIndexState, { removed: true })
await txop.update(docIndexState, { removed: true, needIndex: true })
}
if (domain !== DOMAIN_MODEL) {


@ -38,7 +38,6 @@ import {
type EnumOf,
type FieldIndexConfig,
type FullTextSearchContext,
type IndexStageState,
type IndexingConfiguration,
type Interface,
type MigrationState,
@ -329,27 +328,24 @@ export class TDocIndexState extends TDoc implements DocIndexState {
attributes!: Record<string, any>
@Prop(TypeBoolean(), getEmbeddedLabel('Removed'))
// @Index(IndexKind.Indexed)
@Hidden()
removed!: boolean
@Prop(TypeBoolean(), getEmbeddedLabel('NeedIndexing'))
@Hidden()
needIndex!: boolean
// States for different stages
@Prop(TypeRecord(), getEmbeddedLabel('Stages'))
// @Index(IndexKind.Indexed)
@Hidden()
stages!: Record<string, boolean | string>
stages!: Record<string, boolean>
@Prop(TypeString(), getEmbeddedLabel('Generation'))
@Hidden()
generationId?: string
}
@Model(core.class.IndexStageState, core.class.Doc, DOMAIN_DOC_INDEX_STATE)
export class TIndexStageState extends TDoc implements IndexStageState {
stageId!: string
attributes!: Record<string, any>
}
@MMixin(core.mixin.FullTextSearchContext, core.class.Class)
export class TFullTextSearchContext extends TClass implements FullTextSearchContext {}


@ -49,7 +49,6 @@ import {
TEnumOf,
TFullTextSearchContext,
TIndexConfiguration,
TIndexStageState,
TInterface,
TMigrationState,
TMixin,
@ -94,9 +93,9 @@ import { TUserStatus } from './transient'
import {
TTx,
TTxApplyIf,
TTxCUD,
TTxCollectionCUD,
TTxCreateDoc,
TTxCUD,
TTxMixin,
TTxRemoveDoc,
TTxUpdateDoc,
@ -164,7 +163,6 @@ export function createModel (builder: Builder): void {
TTypeAny,
TTypeRelatedDocument,
TDocIndexState,
TIndexStageState,
TFullTextSearchContext,
TConfiguration,
TConfigurationElement,
@ -284,8 +282,9 @@ export function createModel (builder: Builder): void {
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_DOC_INDEX_STATE,
indexes: [
{ keys: { removed: 1 }, filter: { removed: true } },
{ keys: { _class: 1 }, filter: { _class: core.class.DocIndexState } }
{
keys: { needIndex: 1 }
}
],
disabled: [
{ attachedToClass: 1 },
@ -297,8 +296,7 @@ export function createModel (builder: Builder): void {
{ createdBy: 1 },
{ createdBy: -1 },
{ createdOn: -1 }
],
skip: ['stages.']
]
})
builder.mixin(core.class.Space, core.class.Class, core.mixin.FullTextSearchContext, {


@ -155,7 +155,8 @@ export const coreOperation: MigrateOperation = {
{ objectClass: { $nin: allIndexed } },
{
$set: {
removed: true
removed: true,
needIndex: true
}
}
)
@ -182,6 +183,12 @@ export const coreOperation: MigrateOperation = {
{
state: 'old-statuses-transactions',
func: migrateStatusTransactions
},
{
state: 'add-need-index',
func: async (client: MigrationClient) => {
await client.update(DOMAIN_DOC_INDEX_STATE, {}, { $set: { needIndex: true } })
}
}
])
},

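The `add-need-index` migration above backfills the flag for existing workspaces so every document gets re-examined once. Assuming `DOMAIN_DOC_INDEX_STATE` is backed by a plain MongoDB collection named `doc-index-state` (an assumption for illustration), a standalone equivalent would be:

```ts
import { MongoClient } from 'mongodb'

// Hypothetical standalone equivalent of the migration step above.
async function backfillNeedIndex (url: string, dbName: string): Promise<void> {
  const client = await MongoClient.connect(url)
  try {
    await client
      .db(dbName)
      .collection('doc-index-state') // assumed collection name
      .updateMany({}, { $set: { needIndex: true } })
  } finally {
    await client.close()
  }
}
```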

@ -510,7 +510,9 @@ export interface DocIndexState extends Doc {
generationId?: string
// States for stages
stages: Record<string, boolean | string>
stages: Record<string, boolean>
needIndex: boolean
removed: boolean
@ -522,14 +524,6 @@ export interface DocIndexState extends Doc {
shortSummary?: string | null
}
/**
* @public
*/
export interface IndexStageState extends Doc {
stageId: string
attributes: Record<string, any>
}
/**
* @public
*


@ -32,7 +32,6 @@ import type {
EnumOf,
FullTextSearchContext,
Hyperlink,
IndexStageState,
IndexingConfiguration,
Interface,
Markup,
@ -135,7 +134,6 @@ export default plugin(coreId, {
UserStatus: '' as Ref<Class<UserStatus>>,
TypeRelatedDocument: '' as Ref<Class<Type<RelatedDocument>>>,
DocIndexState: '' as Ref<Class<DocIndexState>>,
IndexStageState: '' as Ref<Class<IndexStageState>>,
DomainIndexConfiguration: '' as Ref<Class<DomainIndexConfiguration>>,
Configuration: '' as Ref<Class<Configuration>>,


@ -116,6 +116,20 @@ export class MeasureMetricsContext implements MeasureContext {
}
}
withSync<T>(
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T,
fullParams?: ParamsType | (() => FullParamsType)
): T {
const c = this.newChild(name, params, fullParams, this.logger)
try {
return op(c)
} finally {
c.end()
}
}
async withLog<T>(
name: string,
params: ParamsType,

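`withSync` mirrors the async `with` wrapper for synchronous, CPU-bound work: it opens a child measurement context, runs the operation, and ends the context in a `finally`, without allocating a Promise per call. A minimal usage sketch, modeled on how the push stage uses it further below:

```ts
// Measure a synchronous markup conversion; ctx is any MeasureContext.
const plainText = ctx.withSync('markup-to-json-text', {}, () =>
  jsonToText(markupToJSON(markup))
)
```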

@ -63,6 +63,13 @@ export interface MeasureContext {
fullParams?: FullParamsType | (() => FullParamsType)
) => Promise<T>
withSync: <T>(
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T,
fullParams?: FullParamsType | (() => FullParamsType)
) => T
withLog: <T>(
name: string,
params: ParamsType,


@ -14,7 +14,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --minify --platform=node > bundle/bundle.js",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --sourcemap=inline --external:*.node --external:snappy --bundle --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --minify --platform=node > bundle/bundle.js",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/account",
"docker:tbuild": "docker build -t hardcoreeng/account . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/account",
"docker:abuild": "docker build -t hardcoreeng/account . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/account",


@ -14,7 +14,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/index.ts --bundle --sourcemap=inline --minify --platform=node > bundle/bundle.js",
"bundle": "mkdir -p bundle && esbuild src/index.ts --bundle --sourcemap=inline --platform=node --external:*.node --external:bufferutil --external:snappy > bundle/bundle.js",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/backup",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/backup staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/backup",


@ -13,7 +13,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --platform=node --keep-names > bundle/bundle.js",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --platform=node --sourcemap=inline --keep-names --external:*.node --external:bufferutil --external:snappy > bundle/bundle.js",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/collaborator",
"docker:tbuild": "docker build -t hardcoreeng/collaborator . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/collaborator",
"docker:abuild": "docker build -t hardcoreeng/collaborator . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/collaborator",


@ -15,7 +15,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --minify --bundle --keep-names --platform=node --external:*.node --external:bufferutil --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) --outfile=bundle/bundle.js --log-level=error --sourcemap=external",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --sourcemap=inline --bundle --keep-names --platform=node --external:*.node --external:bufferutil --external:snappy --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) --outfile=bundle/bundle.js --log-level=error --sourcemap=external",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/transactor",
"docker:tbuild": "docker build -t hardcoreeng/transactor . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor",
"docker:abuild": "docker build -t hardcoreeng/transactor . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/transactor",

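All four bundle scripts gain `--external` flags for native addons (`*.node`, `snappy`, `bufferutil`): esbuild cannot bundle native modules, so they must stay in `node_modules` and be resolved at runtime. A rough JS-API sketch of the transactor's CLI invocation above (the `define` values are omitted):

```ts
import { build } from 'esbuild'

// Sketch of the same flags through esbuild's JS API.
await build({
  entryPoints: ['src/__start.ts'],
  bundle: true,
  keepNames: true,
  platform: 'node',
  sourcemap: 'inline',
  // Native addons cannot be bundled; load them from node_modules instead.
  external: ['*.node', 'bufferutil', 'snappy', 'utf-8-validate'],
  outfile: 'bundle/bundle.js',
  logLevel: 'error'
})
```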

@ -35,6 +35,7 @@ import {
FullTextPipelineStage,
IndexedDoc,
StorageAdapter,
collabStageId,
contentStageId,
docKey,
docUpdKey,
@ -46,7 +47,7 @@ import {
*/
export class CollaborativeContentRetrievalStage implements FullTextPipelineStage {
require = []
stageId = contentStageId
stageId = collabStageId
extra = ['content', 'base64']
@ -59,8 +60,6 @@ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage
textLimit = 100 * 1024
stageValue: boolean | string = true
constructor (
readonly storageAdapter: StorageAdapter | undefined,
readonly workspace: WorkspaceId,


@ -20,7 +20,6 @@ import core, {
DocumentQuery,
DocumentUpdate,
docUpdKey,
IndexStageState,
MeasureContext,
Ref,
WorkspaceId
@ -37,7 +36,6 @@ import {
FullTextPipelineStage,
IndexedDoc,
isIndexingRequired,
loadIndexStageStage,
RateLimiter
} from '@hcengineering/server-core'
import got from 'got'
@ -92,12 +90,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
rate = 5
stageValue: boolean | string = true
limiter = new RateLimiter(this.rate)
indexState?: IndexStageState
async update (doc: DocIndexState, update: DocumentUpdate<DocIndexState>): Promise<void> {}
constructor (
@ -156,22 +149,6 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
console.error(err)
this.enabled = false
}
;[this.stageValue, this.indexState] = await loadIndexStageStage(
ctx,
storage,
this.indexState,
this.stageId,
'config',
{
enabled: this.enabled,
endpoint: this.endpoint,
field: this.field,
mode: this.model,
copyToState: this.copyToState,
stripNewLines: true
}
)
}
async getEmbedding (text: string): Promise<OpenAIEmbeddingResponse> {
@ -299,7 +276,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
// No need to index this class, mark embeddings as empty ones.
if (!needIndex) {
await pipeline.update(doc._id, this.stageValue, {})
await pipeline.update(doc._id, true, {})
return
}
@ -359,13 +336,13 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
console.error(err)
}
await pipeline.update(doc._id, this.stageValue, update)
await pipeline.update(doc._id, true, update)
}
async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
// will be handled by field processor
for (const doc of docs) {
await pipeline.update(doc._id, this.stageValue, {})
await pipeline.update(doc._id, true, {})
}
}
}


@ -112,6 +112,7 @@ export interface DbAdapter {
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
) => Promise<FindResult<T>>
tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise<TxResult[]>
find: (ctx: MeasureContext, domain: Domain, recheck?: boolean) => StorageIterator


@ -14,9 +14,11 @@
// limitations under the License.
//
import { Analytics } from '@hcengineering/analytics'
import core, {
type AttachedDoc,
type Class,
type Collection,
type Doc,
type DocIndexState,
type DocumentQuery,
@ -47,7 +49,6 @@ import { createStateDoc } from './indexer/utils'
import { getScoringConfig, mapSearchResultDoc } from './mapper'
import { type StorageAdapter } from './storage'
import type { FullTextAdapter, IndexedDoc, ServerStorage, WithFind } from './types'
import { Analytics } from '@hcengineering/analytics'
/**
* @public
@ -102,7 +103,8 @@ export class FullTextIndex implements WithFind {
attachedTo,
attachedToClass,
space: tx.objectSpace,
removed: false
removed: false,
needIndex: true
})
stDocs.set(cud.objectId as Ref<DocIndexState>, { create: stDoc, updated: false, removed: false })
} else {
@ -143,7 +145,7 @@ export class FullTextIndex implements WithFind {
const ids: Set<Ref<Doc>> = new Set<Ref<Doc>>()
const baseClass = this.hierarchy.getBaseClass(_class)
let classes = this.hierarchy.getDescendants(baseClass)
let classes = this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it))
const attrs = this.hierarchy.getAllAttributes(_class)
@ -173,7 +175,8 @@ export class FullTextIndex implements WithFind {
}
if (attr.type._class === core.class.Collection) {
// we need attached documents to be in classes
const dsc = this.hierarchy.getDescendants(attr.attributeOf)
const coll = attr.type as Collection<AttachedDoc>
const dsc = this.hierarchy.getDescendants(coll.of).filter((it) => !this.hierarchy.isMixin(it))
classes = classes.concat(dsc)
}
}


@ -56,8 +56,6 @@ export class ContentRetrievalStage implements FullTextPipelineStage {
textLimit = 100 * 1024
stageValue: boolean | string = true
constructor (
readonly storageAdapter: StorageAdapter | undefined,
readonly workspace: WorkspaceId,


@ -22,7 +22,6 @@ import core, {
type DocIndexState,
type DocumentQuery,
type DocumentUpdate,
type IndexStageState,
type MeasureContext,
type Ref
} from '@hcengineering/core'
@ -36,15 +35,7 @@ import {
type FullTextPipeline,
type FullTextPipelineStage
} from './types'
import {
collectPropagate,
docKey,
docUpdKey,
getContent,
getCustomAttrKeys,
isFullTextAttribute,
loadIndexStageStage
} from './utils'
import { collectPropagate, docKey, docUpdKey, getContent, getCustomAttrKeys, isFullTextAttribute } from './utils'
import { Analytics } from '@hcengineering/analytics'
/**
@ -61,46 +52,9 @@ export class IndexedFieldStage implements FullTextPipelineStage {
updateFields: DocUpdateHandler[] = []
enabled = true
stageValue: boolean | string = true
indexState?: IndexStageState
constructor (private readonly dbStorage: ServerStorage) {}
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {
const indexablePropogate = (
await pipeline.model.findAll(core.class.Class, {
[core.mixin.FullTextSearchContext]: { $exists: true }
})
)
.map((it) => pipeline.hierarchy.as(it, core.mixin.FullTextSearchContext))
.filter((it) => it.propagate != null || it.parentPropagate)
.map((it) =>
JSON.stringify({
id: it._id,
propogate: it.propagate,
parentPropgate: it.parentPropagate
})
)
const forceIndexing = (
await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext + '.forceIndex']: true })
).map((it) => it._id)
indexablePropogate.sort()
;[this.stageValue, this.indexState] = await loadIndexStageStage(
ctx,
storage,
this.indexState,
this.stageId,
'config',
{
classes: indexablePropogate,
forceIndex: forceIndexing
}
)
}
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {}
async search (
_classes: Ref<Class<Doc>>[],
@ -234,7 +188,7 @@ export class IndexedFieldStage implements FullTextPipelineStage {
}
}
await pipeline.update(docState._id, this.stageValue, docUpdate)
await pipeline.update(docState._id, true, docUpdate)
} catch (err: any) {
Analytics.handleError(err)
continue
@ -272,7 +226,7 @@ export class IndexedFieldStage implements FullTextPipelineStage {
await pipeline.update(attachedTo, false, parentDocUpdate)
}
}
await pipeline.update(doc._id, this.stageValue, {})
await pipeline.update(doc._id, true, {})
}
}
}


@ -13,37 +13,40 @@
// limitations under the License.
//
import { Analytics } from '@hcengineering/analytics'
import core, {
type AnyAttribute,
type ArrOf,
type Branding,
type Class,
type Doc,
type DocIndexState,
type DocumentQuery,
type DocumentUpdate,
extractDocKey,
getFullTextContext,
type Hierarchy,
isFullTextAttribute,
type MeasureContext,
RateLimiter,
type Ref,
type WorkspaceId,
getFullTextContext,
type Branding
toIdMap,
type WorkspaceId
} from '@hcengineering/core'
import { jsonToText, markupToJSON } from '@hcengineering/text'
import { type DbAdapter } from '../adapter'
import { updateDocWithPresenter } from '../mapper'
import { type FullTextAdapter, type IndexedDoc, type ServerStorage } from '../types'
import { summaryStageId } from './summary'
import {
contentStageId,
type DocUpdateHandler,
fieldStateId,
type FullTextPipeline,
type FullTextPipelineStage,
fullTextPushStageId
fullTextPushStageId,
summaryStageId
} from './types'
import { collectPropagate, collectPropagateClasses, docKey, isCustomAttr } from './utils'
import { Analytics } from '@hcengineering/analytics'
/**
* @public
@ -62,8 +65,6 @@ export class FullTextPushStage implements FullTextPipelineStage {
field_enabled = '_use'
stageValue: boolean | string = true
constructor (
private readonly dbStorage: ServerStorage,
readonly fulltextAdapter: FullTextAdapter,
@ -103,10 +104,16 @@ export class FullTextPushStage implements FullTextPipelineStage {
return { docs: [], pass: true }
}
allAttrs = new WeakMap<Ref<Class<Doc>>, Map<string, AnyAttribute>>()
async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, ctx: MeasureContext): Promise<void> {
const bulk: IndexedDoc[] = []
const part = [...toIndex]
const parentsMap = new Map<Ref<DocIndexState>, DocIndexState>()
const pushQueue = new RateLimiter(5)
while (part.length > 0) {
const toIndexPart = part.splice(0, 50)
@ -126,14 +133,31 @@ export class FullTextPushStage implements FullTextPipelineStage {
})
)
// spaces
const spaceDocs = toIdMap(
await ctx.with(
'find-spaces',
{},
async (ctx) =>
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
_id: {
$in: toIndexPart.map(
(doc) =>
(doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref<DocIndexState>
)
}
})
)
)
for (const doc of toIndexPart) {
if (pipeline.cancelling) {
return
}
const elasticDoc = createElasticDoc(doc)
try {
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(doc.attributes, elasticDoc, undefined, undefined, pipeline.hierarchy)
ctx.withSync('updateDoc2Elastic', {}, (ctx) => {
updateDoc2Elastic(this.allAttrs, ctx, doc.attributes, elasticDoc, undefined, undefined, pipeline.hierarchy)
})
// Include all child attributes
@ -142,8 +166,17 @@ export class FullTextPushStage implements FullTextPipelineStage {
for (const c of childDocs) {
const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
if (fctx.parentPropagate ?? true) {
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true)
ctx.withSync('updateDoc2Elastic', {}, (ctx) => {
updateDoc2Elastic(
this.allAttrs,
ctx,
c.attributes,
elasticDoc,
c._id,
undefined,
pipeline.hierarchy,
true
)
})
}
}
@ -153,18 +186,34 @@ export class FullTextPushStage implements FullTextPipelineStage {
const propagate: Ref<Class<Doc>>[] = collectPropagate(pipeline, doc.attachedToClass)
if (propagate.some((it) => pipeline.hierarchy.isDerived(doc.objectClass, it))) {
// We need to include all parent content into this one.
;[parentDoc] = await ctx.with(
'find-parent',
{},
async (ctx) =>
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
_id: doc.attachedTo as Ref<DocIndexState>
})
)
parentDoc =
parentsMap.get(doc.attachedTo as Ref<DocIndexState>) ??
(await ctx.with('find-parent', {}, async (ctx) =>
(
await this.dbStorage.findAll(
ctx,
core.class.DocIndexState,
{
_id: doc.attachedTo as Ref<DocIndexState>
},
{ limit: 1 }
)
).shift()
))
if (parentDoc !== undefined) {
parentsMap.set(parentDoc._id, parentDoc)
const ppdoc = parentDoc
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id, undefined, pipeline.hierarchy, true)
ctx.withSync('updateDoc2Elastic', {}, (ctx) => {
updateDoc2Elastic(
this.allAttrs,
ctx,
ppdoc.attributes,
elasticDoc,
ppdoc._id,
undefined,
pipeline.hierarchy,
true
)
})
const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass)
@ -175,21 +224,25 @@ export class FullTextPushStage implements FullTextPipelineStage {
{ attachedTo: parentDoc._id, objectClass: { $in: collectClasses } }
)
for (const c of collections) {
await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true)
ctx.withSync('updateDoc2Elastic', {}, (ctx) => {
updateDoc2Elastic(
this.allAttrs,
ctx,
c.attributes,
elasticDoc,
c._id,
undefined,
pipeline.hierarchy,
true
)
})
}
}
}
}
}
const [spaceDoc] = await ctx.with(
'find-space',
{},
async (ctx) =>
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
_id: (doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref<DocIndexState>
})
const spaceDoc = spaceDocs.get(
(doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref<DocIndexState>
)
await updateDocWithPresenter(pipeline.hierarchy, doc, elasticDoc, { parentDoc, spaceDoc }, this.branding)
@ -210,15 +263,37 @@ export class FullTextPushStage implements FullTextPipelineStage {
}
}
// Perform bulk update to elastic
try {
await this.fulltextAdapter.updateMany(bulk)
for (const doc of toIndex) {
await pipeline.update(doc._id, true, {})
void pushQueue.add(async () => {
try {
try {
await ctx.with('push-elastic', {}, async () => {
await this.fulltextAdapter.updateMany(bulk)
})
} catch (err: any) {
Analytics.handleError(err)
// Try to push one by one
await ctx.with('push-elastic-by-one', {}, async () => {
for (const d of bulk) {
try {
await this.fulltextAdapter.update(d.id, d)
} catch (err2: any) {
Analytics.handleError(err2)
}
}
})
}
if (!pipeline.cancelling) {
for (const doc of toIndexPart) {
await pipeline.update(doc._id, true, {})
}
}
} catch (err: any) {
Analytics.handleError(err)
}
} catch (err: any) {
Analytics.handleError(err)
}
})
}
await pushQueue.waitProcessing()
}
async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
@ -246,6 +321,8 @@ export function createElasticDoc (upd: DocIndexState): IndexedDoc {
return doc
}
function updateDoc2Elastic (
allAttrs: WeakMap<Ref<Class<Doc>>, Map<string, AnyAttribute>>,
ctx: MeasureContext,
attributes: Record<string, any>,
doc: IndexedDoc,
docIdOverride?: Ref<DocIndexState>,
@ -264,10 +341,25 @@ function updateDoc2Elastic (
let vv: any = v
if (vv != null && extra.includes('base64')) {
vv = Buffer.from(v, 'base64').toString()
ctx.withSync('buffer-from', {}, () => {
vv = Buffer.from(v, 'base64').toString()
})
}
try {
const attribute = hierarchy?.getAttribute(_class ?? doc._class[0], attr)
let attrs = allAttrs.get(_class ?? doc._class[0])
if (attrs === undefined) {
attrs = new Map()
if (attrs !== undefined) {
allAttrs.set(_class ?? doc._class[0], attrs)
}
}
const attribute = attrs?.get(attr) ?? hierarchy?.findAttribute(_class ?? doc._class[0], attr)
if (attribute !== undefined) {
attrs.set(attr, attribute)
allAttrs.set(_class ?? doc._class[0], attrs)
}
if (attribute !== undefined && vv != null) {
if (
isFullTextAttribute(attribute) ||
@ -283,7 +375,9 @@ function updateDoc2Elastic (
attribute.type._class === core.class.TypeMarkup ||
attribute.type._class === core.class.TypeCollaborativeMarkup
) {
vvv = jsonToText(markupToJSON(vv))
ctx.withSync('markup-to-json-text', {}, () => {
vvv = jsonToText(markupToJSON(vv))
})
}
if (!(doc.fulltextSummary ?? '').includes(vvv)) {
doc.fulltextSummary = (doc.fulltextSummary ?? '') + vvv + '\n'
@ -321,8 +415,8 @@ function updateDoc2Elastic (
const spaceKey = docKey('space', { _class: core.class.Doc })
if (doc[spaceKey] !== undefined) {
const existsingSpace = Array.isArray(doc.space) ? doc.space : [doc.space]
const existingSpaces = Array.isArray(doc.space) ? doc.space : [doc.space]
const newSpaces = Array.isArray(doc[spaceKey]) ? doc[spaceKey] : [doc[spaceKey]]
doc.space = [...existsingSpace, ...newSpaces].filter((it, idx, arr) => arr.indexOf(it) === idx)
doc.space = [...existingSpaces, ...newSpaces].filter((it, idx, arr) => arr.indexOf(it) === idx)
}
}

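Several hot-path costs in the push stage are addressed above: parent and space documents are now fetched in batches and cached (`parentsMap`, `spaceDocs`), Elastic writes go through a `RateLimiter(5)` push queue with a one-by-one retry fallback, and attribute resolution in `updateDoc2Elastic` is memoized through `allAttrs`, using the non-throwing `findAttribute` instead of `getAttribute`. A standalone sketch of that memo, written with a plain `Map` since `Ref` ids are branded strings:

```ts
// Sketch of the attribute memo used by updateDoc2Elastic: resolve each
// (class, attribute) pair once, then serve repeats from the cache.
const attrCache = new Map<Ref<Class<Doc>>, Map<string, AnyAttribute>>()

function cachedAttribute (
  hierarchy: Hierarchy,
  _class: Ref<Class<Doc>>,
  attr: string
): AnyAttribute | undefined {
  let attrs = attrCache.get(_class)
  if (attrs === undefined) {
    attrs = new Map()
    attrCache.set(_class, attrs)
  }
  let attribute = attrs.get(attr)
  if (attribute === undefined) {
    // findAttribute returns undefined instead of throwing like getAttribute.
    attribute = hierarchy.findAttribute(_class, attr)
    if (attribute !== undefined) {
      attrs.set(attr, attribute)
    }
  }
  return attribute
}
```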

@ -25,14 +25,12 @@ import core, {
type MeasureContext,
type ModelDb,
type Ref,
SortingOrder,
TxFactory,
type WorkspaceId,
_getOperator,
docKey,
groupByArray,
setObjectValue,
toFindResult
setObjectValue
} from '@hcengineering/core'
import { type DbAdapter } from '../adapter'
import { RateLimiter } from '../limitter'
@ -74,9 +72,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
indexing: Promise<void> | undefined
// Temporary skipped items
skipped = new Map<Ref<DocIndexState>, number>()
indexId = indexCounter++
updateTriggerTimer: any
@ -99,7 +94,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
async cancel (): Promise<void> {
this.cancelling = true
clearTimeout(this.updateBroadcast)
clearTimeout(this.skippedReiterationTimeout)
clearInterval(this.updateTriggerTimer)
// We need to upload all bulk changes.
await this.processUpload(this.metrics)
@ -114,7 +108,8 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await this.storage.tx(
this.metrics,
ops.createTxUpdateDoc(doc._class, doc.space, doc._id, {
removed: true
removed: true,
needIndex: true
})
)
}
@ -153,7 +148,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}
updateDoc (doc: DocIndexState, tx: DocumentUpdate<DocIndexState>, updateDate: boolean): DocIndexState {
updateDoc (doc: DocIndexState, tx: DocumentUpdate<DocIndexState>, finish: boolean): DocIndexState {
for (const key in tx) {
if (key.startsWith('$')) {
const operator = _getOperator(key)
@ -168,7 +163,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
doc.space = doc.attributes[spaceKey]
}
if (updateDate) {
if (finish) {
doc.modifiedBy = core.account.System
doc.modifiedOn = Date.now()
}
@ -213,6 +208,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
// We need to clear only the first stage state, to prevent multiple index operations from happening.
;(upd as any)['stages.' + this.stages[0].stageId] = false
upd.needIndex = true
this.updateOps.set(u[0], upd)
}
}
@ -225,7 +221,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
// Updates are cumulative
async update (
docId: Ref<DocIndexState>,
mark: boolean | string,
mark: boolean,
update: DocumentUpdate<DocIndexState>,
flush?: boolean
): Promise<void> {
@ -233,7 +229,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (udoc !== undefined) {
await this.stageUpdate(udoc, update)
udoc = this.updateDoc(udoc, update, mark !== false)
udoc = this.updateDoc(udoc, update, mark)
this.toIndex.set(docId, udoc)
}
@ -241,7 +237,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
udoc = this.extraIndex.get(docId)
if (udoc !== undefined) {
await this.stageUpdate(udoc, update)
udoc = this.updateDoc(udoc, update, mark !== false)
udoc = this.updateDoc(udoc, update, mark)
this.extraIndex.set(docId, udoc)
}
}
@ -268,6 +264,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
// Filter unsupported stages
udoc.stages = update.stages
const stg = Object.values(udoc.stages)
if (!stg.includes(false) && stg.length === this.stages.length) {
// All marks are true, so clear needIndex.
udoc.needIndex = false
}
if (Object.keys(update).length > 0) {
this.currentStages[stageId] = (this.currentStages[stageId] ?? 0) + 1
this.stageChanged++
@ -284,14 +286,48 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await this.flush(flush ?? false)
}
// Updates are cumulative
async updateNeedIndex (docId: Ref<DocIndexState>, value: boolean, flush?: boolean): Promise<void> {
const update = { needIndex: value }
let udoc = this.toIndex.get(docId)
if (udoc !== undefined) {
await this.stageUpdate(udoc, update)
udoc = this.updateDoc(udoc, update, true)
this.toIndex.set(docId, udoc)
}
if (udoc === undefined) {
udoc = this.extraIndex.get(docId)
if (udoc !== undefined) {
await this.stageUpdate(udoc, update)
udoc = this.updateDoc(udoc, update, true)
this.extraIndex.set(docId, udoc)
}
}
if (udoc === undefined) {
// Document was updated elsewhere; let's load it.
udoc = (await this.storage.load(this.metrics, DOMAIN_DOC_INDEX_STATE, [docId])).shift() as DocIndexState
}
const current = this.pending.get(docId)
if (current === undefined) {
this.pending.set(docId, update)
} else {
this.pending.set(docId, { ...current, ...update })
}
await this.flush(flush ?? false)
}
triggerCounts = 0
triggerIndexing = (): void => {}
skippedReiterationTimeout: any
currentStages: Record<string, number> = {}
private filterCurrentStages (udoc: DocIndexState): Record<string, string | boolean> {
const result: Record<string, string | boolean> = {}
private filterCurrentStages (udoc: DocIndexState): Record<string, boolean> {
const result: Record<string, boolean> = {}
for (const [k, v] of Object.entries(udoc.stages ?? {})) {
if (this.currentStages[k] !== undefined) {
result[k] = v
@ -327,70 +363,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
broadcastClasses = new Set<Ref<Class<Doc>>>()
updateBroadcast: any = undefined
indexesCreated = false
async doIndexing (): Promise<void> {
// Check model is upgraded to support indexer.
if (!this.indexesCreated) {
this.indexesCreated = true
// We need to be sure we have individual indexes per stage.
const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/, /emb-v.*/]
const deletePattern: RegExp[] = []
const keepPattern: RegExp[] = []
for (const st of this.stages) {
if (this.cancelling) {
return
}
const regexp = oldStagesRegex.find((r) => r.test(st.stageId))
if (regexp !== undefined) {
deletePattern.push(regexp)
keepPattern.push(new RegExp(st.stageId))
}
}
const helper = this.storage.helper()
if (deletePattern.length > 0) {
try {
const existingIndexes = await helper.listIndexes(DOMAIN_DOC_INDEX_STATE)
for (const existingIndex of existingIndexes) {
if (existingIndex.name !== undefined) {
const name: string = existingIndex.name
if (deletePattern.some((it) => it.test(name)) && !keepPattern.some((it) => it.test(name))) {
await helper.dropIndex(DOMAIN_DOC_INDEX_STATE, name)
}
}
}
} catch (err: any) {
console.error(err)
}
}
for (const st of this.stages) {
if (this.cancelling) {
return
}
await this.storage.helper().createIndex(
DOMAIN_DOC_INDEX_STATE,
{
keys: {
['stages.' + st.stageId]: 1
}
},
{ name: 'stages.' + st.stageId + '_1' }
)
}
}
try {
this.hierarchy.getClass(core.class.DocIndexState)
} catch (err: any) {
this.metrics.warn('Models is not upgraded to support indexer', {
indexId: this.indexId,
workspace: this.workspace.name
})
return
}
await this.metrics.with('init-states', {}, async () => {
await this.initStates()
})
@ -403,10 +378,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await this.initializeStages()
})
await this.metrics.with('process-remove', { workspace: this.workspace.name }, async () => {
await this.processRemove()
})
const _classes = await this.metrics.with(
'processIndex',
{ workspace: this.workspace.name },
@ -442,14 +413,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
this.triggerIndexing = () => {
this.triggerCounts++
resolve(null)
clearTimeout(this.skippedReiterationTimeout)
}
this.skippedReiterationTimeout = setTimeout(() => {
// Force skipped reiteration, just decrease by -1
for (const [s, v] of Array.from(this.skipped.entries())) {
this.skipped.set(s, v - 1)
}
}, 60000)
})
}
}
@ -458,199 +422,174 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
private async processIndex (ctx: MeasureContext): Promise<Ref<Class<Doc>>[]> {
let idx = 0
const _classUpdate = new Set<Ref<Class<Doc>>>()
for (const st of this.stages) {
if (this.cancelling) {
return []
}
idx++
await rateLimiter.exec(async () => {
while (true) {
try {
if (this.cancelling) {
return Array.from(_classUpdate.values())
}
if (!st.enabled) {
break
}
await ctx.with('flush', {}, async () => {
await this.flush(true)
})
const toSkip = Array.from(this.skipped.entries())
.filter((it) => it[1] > 3)
.map((it) => it[0])
const q: DocumentQuery<DocIndexState> = {
[`stages.${st.stageId}`]: { $ne: st.stageValue },
removed: false
}
if (toSkip.length > 0) {
q._id = { $nin: toSkip }
}
let result = await ctx.with(
'get-to-index',
{},
async (ctx) =>
await this.storage.findAll(ctx, core.class.DocIndexState, q, {
sort: { modifiedOn: SortingOrder.Descending },
limit: globalIndexer.processingSize,
skipClass: true,
skipSpace: true
})
)
const toRemove: DocIndexState[] = []
// Check and remove missing class documents.
result = toFindResult(
result.filter((doc) => {
const _class = this.model.findObject(doc.objectClass)
if (_class === undefined) {
// no _class present, remove doc
toRemove.push(doc)
return false
}
return true
}),
result.total
)
if (toRemove.length > 0) {
try {
await this.storage.clean(
this.metrics,
DOMAIN_DOC_INDEX_STATE,
toRemove.map((it) => it._id)
)
} catch (err: any) {
Analytics.handleError(err)
// QuotaExceededError, ignore
}
}
if (result.length > 0) {
this.metrics.info('Full text: Indexing', {
indexId: this.indexId,
stageId: st.stageId,
workspace: this.workspace.name,
...this.currentStages
})
} else {
// Nothing to index, check on next cycle.
break
}
this.toIndex = new Map(result.map((it) => [it._id, it]))
this.extraIndex.clear()
this.stageChanged = 0
// Find documents matching query
const toIndex = this.matchStates(st)
if (toIndex.length > 0) {
// Do Indexing
this.currentStage = st
await ctx.with('collect-' + st.stageId, {}, async (ctx) => {
await st.collect(toIndex, this, ctx)
})
if (this.cancelling) {
break
}
toIndex.forEach((it) => _classUpdate.add(it.objectClass))
// go with next stages if they accept it
for (const nst of this.stages.slice(idx)) {
const toIndex2 = this.matchStates(nst)
if (toIndex2.length > 0) {
this.currentStage = nst
await ctx.with('collect-' + nst.stageId, {}, async (ctx) => {
await nst.collect(toIndex2, this, ctx)
})
}
if (this.cancelling) {
break
}
}
} else {
break
}
// Check items with not updated state.
for (const d of toIndex) {
if (d.stages?.[st.stageId] === false) {
this.skipped.set(d._id, (this.skipped.get(d._id) ?? 0) + 1)
} else {
this.skipped.delete(d._id)
}
}
} catch (err: any) {
Analytics.handleError(err)
this.metrics.error('error during index', { error: err })
await rateLimiter.exec(async () => {
while (true) {
try {
if (this.cancelling) {
return Array.from(_classUpdate.values())
}
await ctx.with('flush', {}, async () => {
await this.flush(true)
})
let result: DocIndexState[] | undefined = await ctx.with('get-indexable', {}, async () => {
const q: DocumentQuery<DocIndexState> = {
needIndex: true
}
return await this.storage.findAll(ctx, core.class.DocIndexState, q, {
limit: globalIndexer.processingSize,
skipClass: true,
skipSpace: true
})
})
if (result === undefined) {
// No more results
break
}
await this.processRemove(result)
result = result.filter((it) => !it.removed)
const toRemove: DocIndexState[] = []
// Check and remove missing class documents.
result = result.filter((doc) => {
const _class = this.model.findObject(doc.objectClass)
if (_class === undefined) {
// no _class present, remove doc
toRemove.push(doc)
return false
}
return true
})
if (toRemove.length > 0) {
try {
await this.storage.clean(
this.metrics,
DOMAIN_DOC_INDEX_STATE,
toRemove.map((it) => it._id)
)
} catch (err: any) {
Analytics.handleError(err)
// QuotaExceededError, ignore
}
}
if (result.length > 0) {
this.metrics.info('Full text: Indexing', {
indexId: this.indexId,
workspace: this.workspace.name,
...this.currentStages
})
} else {
// Nothing to index, check on next cycle.
break
}
const retry: DocIndexState[] = []
await this.processStages(result, ctx, _classUpdate)
// Force clear needIndex; it will be re-triggered if a propagate happens next.
if (!this.cancelling) {
for (const u of result) {
const stg = Object.values(u.stages)
if (!stg.includes(false) && stg.length === this.stages.length) {
// All marks are true, so clear needIndex.
u.needIndex = false
await this.updateNeedIndex(u._id, false)
} else {
// Mark as retry on
retry.push(u)
}
}
}
if (retry.length > 0) {
await this.processStages(retry, ctx, _classUpdate)
if (!this.cancelling) {
for (const u of retry) {
// Since a retry happened, it should be marked already.
u.needIndex = false
await this.updateNeedIndex(u._id, false)
}
}
}
} catch (err: any) {
Analytics.handleError(err)
this.metrics.error('error during index', { error: err })
}
})
}
}
})
return Array.from(_classUpdate.values())
}
private async processRemove (): Promise<void> {
let total = 0
while (true) {
const result = await this.storage.findAll(
this.metrics,
core.class.DocIndexState,
{
removed: true
},
{
limit: 1000,
projection: {
_id: 1,
stages: 1,
objectClass: 1
},
skipSpace: true,
skipClass: true
}
)
this.toIndex = new Map(result.map((it) => [it._id, it]))
private async processStages (
result: DocIndexState[],
ctx: MeasureContext,
_classUpdate: Set<Ref<Class<Doc>>>
): Promise<void> {
this.toIndex = new Map(result.map((it) => [it._id, it]))
for (const st of this.stages) {
this.extraIndex.clear()
this.stageChanged = 0
// Find documents matching query
const toIndex = this.matchStates(st)
const toIndex = Array.from(this.toIndex.values())
const toRemoveIds = []
for (const st of this.stages) {
if (toIndex.length > 0) {
// Do Indexing
this.currentStage = st
await st.remove(toIndex, this)
} else {
if (toIndex.length > 0) {
// Do Indexing
this.currentStage = st
await ctx.with('collect-' + st.stageId, {}, async (ctx) => {
await st.collect(toIndex, this, ctx)
})
if (this.cancelling) {
break
}
}
// If all stages are complete, remove document
const allStageIds = this.stages.map((it) => it.stageId)
for (const doc of toIndex) {
if (allStageIds.every((it) => doc.stages[it])) {
toRemoveIds.push(doc._id)
}
}
await this.flush(true)
if (toRemoveIds.length > 0) {
await this.storage.clean(this.metrics, DOMAIN_DOC_INDEX_STATE, toRemoveIds)
total += toRemoveIds.length
this.metrics.info('indexer', {
_classes: Array.from(groupByArray(toIndex, (it) => it.objectClass).keys()),
total,
count: toRemoveIds.length
})
toIndex.forEach((it) => _classUpdate.add(it.objectClass))
} else {
continue
}
}
}
private async processRemove (docs: DocIndexState[]): Promise<void> {
let total = 0
this.toIndex = new Map(docs.map((it) => [it._id, it]))
this.extraIndex.clear()
const toIndex = Array.from(this.toIndex.values()).filter((it) => it.removed)
if (toIndex.length === 0) {
return
}
const toRemoveIds = []
for (const st of this.stages) {
if (toIndex.length > 0) {
// Do Indexing
this.currentStage = st
await st.remove(toIndex, this)
} else {
break
}
}
// If all stages are complete, remove document
const allStageIds = this.stages.map((it) => it.stageId)
for (const doc of toIndex) {
if (allStageIds.every((it) => doc.stages[it])) {
toRemoveIds.push(doc._id)
}
}
await this.flush(true)
if (toRemoveIds.length > 0) {
await this.storage.clean(this.metrics, DOMAIN_DOC_INDEX_STATE, toRemoveIds)
total += toRemoveIds.length
this.metrics.info('indexer', {
_classes: Array.from(groupByArray(toIndex, (it) => it.objectClass).keys()),
total,
count: toRemoveIds.length
})
}
}
private async initStates (): Promise<void> {
@ -665,7 +604,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
const require = [...st.require].filter((it) => this.stages.find((q) => q.stageId === it && q.enabled))
for (const o of this.toIndex.values()) {
// We need to contain all state values
if (require.every((it) => o.stages?.[it])) {
if (require.every((it) => o.stages?.[it]) && !(o.stages?.[st.stageId] ?? false)) {
toIndex.push(o)
}
}

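Condensed, the rewritten `processIndex` above replaces the per-stage outer loop with a single scan over `needIndex: true`: removals are handled first, documents with missing classes are cleaned out, every enabled stage runs over the batch, and the flag is cleared only once all stage marks are true (with one retry pass otherwise). A schematic of one iteration, simplified from the code above:

```ts
// Schematic only; error handling, flushes and skip tracking are elided.
while (!this.cancelling) {
  const batch = await this.storage.findAll(ctx, core.class.DocIndexState,
    { needIndex: true },
    { limit: globalIndexer.processingSize, skipClass: true, skipSpace: true })
  if (batch.length === 0) break              // nothing left; wait for a trigger

  await this.processRemove(batch)            // run stage.remove for removed docs
  const live = batch.filter((it) => !it.removed)

  await this.processStages(live, ctx, _classUpdate) // run every enabled stage
  for (const doc of live) {
    const marks = Object.values(doc.stages)
    if (!marks.includes(false) && marks.length === this.stages.length) {
      await this.updateNeedIndex(doc._id, false)    // fully indexed
    }
  }
}
```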

@ -13,6 +13,7 @@
// limitations under the License.
//
import { Analytics } from '@hcengineering/analytics'
import core, {
type AnyAttribute,
type Class,
@ -21,12 +22,11 @@ import core, {
type DocumentQuery,
type DocumentUpdate,
extractDocKey,
getFullTextContext,
type Hierarchy,
type IndexStageState,
isFullTextAttribute,
type MeasureContext,
type Ref,
getFullTextContext
type Ref
} from '@hcengineering/core'
import { translate } from '@hcengineering/platform'
import { jsonToText, markupToJSON } from '@hcengineering/text'
@ -37,15 +37,10 @@ import {
type DocUpdateHandler,
fieldStateId,
type FullTextPipeline,
type FullTextPipelineStage
type FullTextPipelineStage,
summaryStageId
} from './types'
import { collectPropagate, collectPropagateClasses, isCustomAttr, loadIndexStageStage } from './utils'
import { Analytics } from '@hcengineering/analytics'
/**
* @public
*/
export const summaryStageId = 'sum-v5'
import { collectPropagate, collectPropagateClasses, isCustomAttr } from './utils'
/**
* @public
@ -65,35 +60,12 @@ export class FullSummaryStage implements FullTextPipelineStage {
fieldFilter: ((attr: AnyAttribute, value: string) => boolean)[] = []
stageValue: boolean | string = true
indexState?: IndexStageState
// Summary should be no bigger than 1mb of data.
summaryLimit = 1024 * 1024
constructor (private readonly dbStorage: ServerStorage) {}
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {
const indexable = (
await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext]: { $exists: true } })
)
.map((it) => pipeline.hierarchy.as(it, core.mixin.FullTextSearchContext))
.filter((it) => it.fullTextSummary)
.map((it) => it._id + (it.propagateClasses ?? []).join('|'))
indexable.sort()
;[this.stageValue, this.indexState] = await loadIndexStageStage(
ctx,
storage,
this.indexState,
this.stageId,
'config',
{
classes: indexable,
matchExtra: this.matchExtra
}
)
}
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {}
async search (
_classes: Ref<Class<Doc>>[],
@ -107,7 +79,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
const part = [...toIndex]
while (part.length > 0) {
const toIndexPart = part.splice(0, 1000)
const toIndexPart = part.splice(0, 100)
const kids = toIndexPart.map((it) => it._id)
const allChildDocs = await metrics.with(
@ -128,7 +100,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
// No need to index this class, mark embeddings as empty ones.
if (!needIndex) {
await pipeline.update(doc._id, this.stageValue, {})
await pipeline.update(doc._id, true, {})
continue
}
@ -201,7 +173,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
update.fullSummary = embeddingText
await pipeline.update(doc._id, this.stageValue, update)
await pipeline.update(doc._id, true, update)
}
}
}
@ -209,7 +181,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
// will be handled by field processor
for (const doc of docs) {
await pipeline.update(doc._id, this.stageValue, {})
await pipeline.update(doc._id, true, {})
}
}
}


@ -35,7 +35,7 @@ export interface FullTextPipeline {
model: ModelDb
update: (
docId: Ref<DocIndexState>,
mark: boolean | string,
mark: boolean,
update: DocumentUpdate<DocIndexState>,
flush?: boolean
) => Promise<void>
@ -75,9 +75,6 @@ export interface FullTextPipelineStage {
updateFields: DocUpdateHandler[]
enabled: boolean
stageValue: boolean | string
initialize: (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline) => Promise<void>
// Collect all changes related to bulk of document states
@ -109,6 +106,16 @@ export const fieldStateId = 'fld-v15'
*/
export const fullTextPushStageId = 'fts-v17'
/**
* @public
*/
export const summaryStageId = 'sum-v5'
/**
* @public
*/
export const collabStageId = 'collab-v1'
/**
* @public
*/


@ -20,18 +20,12 @@ import core, {
type Doc,
type DocIndexState,
type FullTextSearchContext,
generateId,
getFullTextContext,
type Hierarchy,
type IndexStageState,
type MeasureContext,
type Obj,
type Ref,
type Space,
TxFactory
type Space
} from '@hcengineering/core'
import { deepEqual } from 'fast-equals'
import { type DbAdapter } from '../adapter'
import plugin from '../plugin'
import { type FullTextPipeline } from './types'
@ -81,59 +75,6 @@ export function createStateDoc (
}
}
/**
* @public
*/
export async function loadIndexStageStage (
ctx: MeasureContext,
storage: DbAdapter,
state: IndexStageState | undefined,
stageId: string,
field: string,
newValue: any
): Promise<[boolean | string, IndexStageState]> {
if (state === undefined) {
;[state] = await storage.findAll(ctx, core.class.IndexStageState, { stageId }, { limit: 1 })
}
const attributes: Record<string, any> = state?.attributes ?? {}
let result: boolean | string | undefined = attributes?.index !== undefined ? `${attributes?.index as number}` : true
if (!deepEqual(attributes[field], newValue)) {
// Not match,
const newIndex = ((attributes.index as number) ?? 0) + 1
result = `${newIndex}`
const ops = new TxFactory(core.account.System, true)
const data = {
stageId,
attributes: {
[field]: newValue,
index: newIndex
}
}
if (state === undefined) {
const id: Ref<IndexStageState> = generateId()
await storage.tx(ctx, ops.createTxCreateDoc(core.class.IndexStageState, plugin.space.DocIndexState, data, id))
state = {
...data,
_class: core.class.IndexStageState,
_id: id,
space: plugin.space.DocIndexState,
modifiedBy: core.account.System,
modifiedOn: Date.now()
}
} else {
await storage.tx(
ctx,
ops.createTxUpdateDoc(core.class.IndexStageState, plugin.space.DocIndexState, state._id, data)
)
state = { ...state, ...data, modifiedOn: Date.now() }
}
}
return [result, state]
}
/**
* @public
*/

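With `loadIndexStageStage` and `IndexStageState` gone, a stage mark is a plain boolean rather than a config-hash string, so a document's indexing progress is just a boolean map keyed by the versioned stage ids declared in the types hunk above. Illustrative values only:

```ts
// Illustrative stage map for one DocIndexState.
const stages: Record<string, boolean> = {
  'fld-v15': true,   // fieldStateId: field extraction done
  'sum-v5': true,    // summaryStageId: summary built
  'collab-v1': true, // collabStageId: collaborative content retrieved
  'fts-v17': false   // fullTextPushStageId: Elastic push still pending
}
```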

@ -98,7 +98,6 @@ export class TServerStorage implements ServerStorage {
branding: Branding | null
domainInfo = new Map<Domain, DomainInfo>()
statsCtx: MeasureContext
emptyAdapter = new DummyDbAdapter()
@ -125,7 +124,6 @@ export class TServerStorage implements ServerStorage {
this.branding = options.branding
this.setModel(model)
this.statsCtx = metrics.newChild('stats-' + this.workspaceId.name, {})
}
async initDomainInfo (): Promise<void> {


@ -731,7 +731,7 @@ abstract class MongoAdapterBase implements DbAdapter {
const coll = this.collection(domain)
const mongoQuery = this.translateQuery(_class, query, options)
if (options?.limit === 1) {
if (options?.limit === 1 || typeof query._id === 'string') {
// Skip sort/projection/etc.
return await ctx.with(
'find-one',
@ -748,11 +748,11 @@ abstract class MongoAdapterBase implements DbAdapter {
const doc = await coll.findOne(mongoQuery, findOptions)
let total = -1
if (options.total === true) {
if (options?.total === true) {
total = await coll.countDocuments(mongoQuery)
}
if (doc != null) {
return toFindResult([doc as unknown as T], total)
return toFindResult([this.stripHash<T>(doc as unknown as T) as T], total)
}
return toFindResult([], total)
},
@ -783,7 +783,6 @@ abstract class MongoAdapterBase implements DbAdapter {
cursor = cursor.limit(options.limit ?? 1)
}
}
// Error in case of timeout
try {
let res: T[] = []

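The Mongo adapter change above widens the findOne fast path: any query whose `_id` is a plain string (a point lookup) now skips cursor, sort, and projection handling, and the single result is passed through `stripHash` like the cursor path already was. Schematically:

```ts
// Schematic of the widened fast path (simplified from the hunk above).
if (options?.limit === 1 || typeof query._id === 'string') {
  const doc = await coll.findOne(mongoQuery, findOptions)
  const total = options?.total === true ? await coll.countDocuments(mongoQuery) : -1
  return doc != null
    ? toFindResult([this.stripHash<T>(doc as unknown as T) as T], total)
    : toFindResult([], total)
}
```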

@ -79,11 +79,29 @@ export class APMMeasureContext implements MeasureContext {
if (value instanceof Promise) {
value = await value
}
c.end()
return value
} catch (err: any) {
c.error(err)
throw err
} finally {
c.end()
}
}
withSync<T>(
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T,
fullParams?: FullParamsType | (() => FullParamsType)
): T {
const c = this.newChild(name, params)
try {
return op(c)
} catch (err: any) {
c.error(err)
throw err
} finally {
c.end()
}
}


@ -348,7 +348,7 @@ export async function upgradeModel (
)
const upgradeIndexes = async (): Promise<void> => {
ctx.info('Migrate to sparse indexes')
ctx.info('Migrate indexes')
// Create update indexes
await createUpdateIndexes(
ctx,
@ -385,7 +385,7 @@ export async function upgradeModel (
await tryMigrate(migrateClient, coreId, [
{
state: 'indexes-v2',
state: 'indexes-v3',
func: upgradeIndexes
}
])


@ -19,7 +19,6 @@ import {
DocIndexState,
DocumentQuery,
DocumentUpdate,
IndexStageState,
MeasureContext,
Ref,
WorkspaceId
@ -33,12 +32,10 @@ import {
extractDocKey,
fieldStateId,
FullTextPipeline,
IndexedDoc,
loadIndexStageStage
IndexedDoc
} from '@hcengineering/server-core'
import got from 'got'
import translatePlugin from './plugin'
import { translateStateId, TranslationStage } from './types'
/**
@ -59,40 +56,9 @@ export class LibRetranslateStage implements TranslationStage {
token: string = ''
endpoint: string = ''
stageValue: boolean | string = true
indexState?: IndexStageState
constructor (readonly workspaceId: WorkspaceId) {}
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {
// Just do nothing
try {
const config = await storage.findAll(ctx, translatePlugin.class.TranslateConfiguration, {})
if (config.length > 0) {
this.enabled = config[0].enabled
this.token = config[0].token
this.endpoint = config[0].endpoint
} else {
this.enabled = false
}
} catch (err: any) {
console.error(err)
this.enabled = false
}
;[this.stageValue, this.indexState] = await loadIndexStageStage(
ctx,
storage,
this.indexState,
this.stageId,
'config',
{
enabled: this.enabled,
endpoint: this.endpoint
}
)
}
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {}
async search (
_classes: Ref<Class<Doc>>[],
@ -247,13 +213,13 @@ export class LibRetranslateStage implements TranslationStage {
return
}
await pipeline.update(doc._id, this.stageValue, update, true)
await pipeline.update(doc._id, true, update, true)
}
async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
// will be handled by field processor
for (const doc of docs) {
await pipeline.update(doc._id, this.stageValue, {})
await pipeline.update(doc._id, true, {})
}
}
}


@ -431,18 +431,13 @@ function createWebsocketClientSocket (
},
data: () => data,
send: async (ctx: MeasureContext, msg, binary, compression) => {
const sst = Date.now()
const smsg = rpcHandler.serialize(msg, binary)
ctx.measure('serialize', Date.now() - sst)
ctx.measure('send-data', smsg.length)
const st = Date.now()
if (ws.readyState !== ws.OPEN || cs.isClosed) {
return
}
if (ws.bufferedAmount > 16 * 1024) {
ctx.measure('send-bufferAmmount', 1)
}
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
if (!`${err.message}`.includes('WebSocket is not open')) {


@ -40,11 +40,8 @@ export function processRequest (
buff: any,
handleRequest: HandleRequestFunction
): void {
const st = Date.now()
try {
const request = cs.readRequest(buff, session.binaryMode)
const ed = Date.now()
context.measure('deserialize', ed - st)
handleRequest(context, session, cs, request, workspaceId)
} catch (err: any) {
if (((err.message as string) ?? '').includes('Data read, but end of buffer not reached')) {


@ -1,13 +1,14 @@
import { test, expect } from '@playwright/test'
import { PlatformURI, generateTestData } from '../utils'
import { LeftSideMenuPage } from '../model/left-side-menu-page'
import { ChunterPage } from '../model/chunter-page'
import { ChannelPage } from '../model/channel-page'
import { ApiEndpoint } from '../API/Api'
import { LoginPage } from '../model/login-page'
import { SignUpData } from '../model/common-types'
import { faker } from '@faker-js/faker'
import { expect, test } from '@playwright/test'
import { ApiEndpoint } from '../API/Api'
import { ChannelPage } from '../model/channel-page'
import { ChunterPage } from '../model/chunter-page'
import { SignUpData } from '../model/common-types'
import { LeftSideMenuPage } from '../model/left-side-menu-page'
import { LoginPage } from '../model/login-page'
import { SelectWorkspacePage } from '../model/select-workspace-page'
import { SignInJoinPage } from '../model/signin-page'
import { PlatformURI, generateTestData } from '../utils'
test.describe('channel tests', () => {
let leftSideMenuPage: LeftSideMenuPage
@ -36,7 +37,9 @@ test.describe('channel tests', () => {
await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234')
await (await page.goto(`${PlatformURI}`))?.finished()
await loginPage.login(data.userName, '1234')
await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
const swp = new SelectWorkspacePage(page)
await swp.selectWorkspace(data.workspaceName)
// await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
})
test('create new private channel and check if the messages stays on it', async ({ browser, page }) => {

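The same setup fix is applied to all four Playwright suites in this commit: instead of navigating straight to `/workbench/<workspace>` right after login (the now commented-out line), the tests go through the `SelectWorkspacePage` page object, presumably to avoid racing the post-login redirect. The shared pattern:

```ts
// Shared beforeEach pattern across the updated suites.
await (await page.goto(`${PlatformURI}`))?.finished()
await loginPage.login(data.userName, '1234')
const swp = new SelectWorkspacePage(page)
await swp.selectWorkspace(data.workspaceName)
```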

@ -12,6 +12,7 @@ import { NewCompany } from '../model/recruiting/types'
import { NavigationMenuPage } from '../model/recruiting/navigation-menu-page'
import { CompaniesPage } from '../model/recruiting/companies-page'
import { CompanyDetailsPage } from '../model/recruiting/company-details-page'
import { SelectWorkspacePage } from '../model/select-workspace-page'
test.describe('Custom attributes tests', () => {
let leftSideMenuPage: LeftSideMenuPage
@ -42,7 +43,9 @@ test.describe('Custom attributes tests', () => {
await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234')
await (await page.goto(`${PlatformURI}`))?.finished()
await loginPage.login(data.userName, '1234')
await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
const swp = new SelectWorkspacePage(page)
await swp.selectWorkspace(data.workspaceName)
// await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
})
test('Check if all custom attributes exists', async ({ browser, page }) => {


@ -12,6 +12,7 @@ import { SignInJoinPage } from '../model/signin-page'
import { ChannelPage } from '../model/channel-page'
import { UserProfilePage } from '../model/profile/user-profile-page'
import { MenuItems, NotificationsPage } from '../model/profile/notifications-page'
import { SelectWorkspacePage } from '../model/select-workspace-page'
test.describe('Inbox tests', () => {
let leftSideMenuPage: LeftSideMenuPage
@ -40,7 +41,9 @@ test.describe('Inbox tests', () => {
await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234')
await (await page.goto(`${PlatformURI}`))?.finished()
await loginPage.login(data.userName, '1234')
await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
const swp = new SelectWorkspacePage(page)
await swp.selectWorkspace(data.workspaceName)
// await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
})
test('User is able to create a task, assign a himself and see it inside the inbox', async ({ page }) => {


@ -11,6 +11,7 @@ import { ApiEndpoint } from '../API/Api'
import { LoginPage } from '../model/login-page'
import { SignInJoinPage } from '../model/signin-page'
import { TeamPage } from '../model/team-page'
import { SelectWorkspacePage } from '../model/select-workspace-page'
test.use({
storageState: PlatformSetting
@ -192,7 +193,9 @@ test.describe('Planning ToDo tests', () => {
await api.createWorkspaceWithLogin(data.workspaceName, data.userName, '1234')
await (await page.goto(`${PlatformURI}`))?.finished()
await loginPage.login(data.userName, '1234')
await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
const swp = new SelectWorkspacePage(page)
await swp.selectWorkspace(data.workspaceName)
// await (await page.goto(`${PlatformURI}/workbench/${data.workspaceName}`))?.finished()
await leftSideMenuPage.clickPlanner()
const planningNavigationMenuPage = new PlanningNavigationMenuPage(page)