UBERF-9694: Queue processing improvements (#8418)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-04-02 00:15:06 +07:00 committed by GitHub
parent 19ea34f260
commit 08434ccb3c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1240 additions and 407 deletions

View File

@ -200,7 +200,7 @@ jobs:
cd ./tests
./prepare-tests.sh
- name: Testing...
run: node common/scripts/install-run-rush.js test
run: node common/scripts/install-run-rush.js test --verbose
env:
DB_URL: 'postgresql://root@localhost:26258/defaultdb?sslmode=disable'
ELASTIC_URL: 'http://localhost:9201'

16
.vscode/launch.json vendored
View File

@ -152,20 +152,20 @@
"args": ["src/index.ts"],
"env": {
// "PORT": "4700",// For mongo
"PORT": "4702", // for cockroach
"FULLTEXT_DB_URL": "http://localhost:9200",
// "DB_URL": "mongodb://localhost:27017",
"PORT": "4710", // for cockroach
"FULLTEXT_DB_URL": "http://localhost:9201",
"DB_URL": "mongodb://localhost:27018",
// "DB_URL": "postgresql://postgres:example@localhost:5432",
"DB_URL": "postgresql://root@huly.local:26257/defaultdb?sslmode=disable",
// "DB_URL": "postgresql://root@huly.local:26257/defaultdb?sslmode=disable",
"STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"SERVER_SECRET": "secret",
"REKONI_URL": "http://localhost:4004",
"MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json",
"ELASTIC_INDEX_NAME": "local_storage_index",
"REGION": "cockroach",
"STATS_URL": "http://huly.local:4900",
"ACCOUNTS_URL": "http://localhost:3000",
"QUEUE_CONFIG": "localhost:19092"
"REGION": "",
"STATS_URL": "http://huly.local:4901",
"ACCOUNTS_URL": "http://localhost:3003",
"QUEUE_CONFIG": "localhost:19093;-staging"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"runtimeVersion": "20",

View File

@ -61,7 +61,6 @@
"@hcengineering/client-resources": "^0.6.27",
"@hcengineering/core": "^0.6.32",
"dotenv": "~16.0.0",
"@hcengineering/backup-service": "^0.6.0",
"@hcengineering/analytics": "^0.6.0",
"@hcengineering/analytics-service": "^0.6.0",
"koa": "^2.15.4",

View File

@ -0,0 +1,223 @@
/* eslint-disable @typescript-eslint/unbound-method */
import core, {
generateId,
MeasureMetricsContext,
TxOperations,
type Doc,
type MeasureContext,
type PersonUuid,
type Ref,
type Tx,
type WorkspaceDataId,
type WorkspaceInfoWithStatus,
type WorkspaceUuid
} from '@hcengineering/core'
import { WorkspaceManager } from '../manager'
import { createPlatformQueue, parseQueueConfig } from '@hcengineering/kafka'
import {
createDummyStorageAdapter,
QueueTopic,
workspaceEvents,
wrapPipeline,
type FulltextListener,
type IndexedDoc,
type QueueWorkspaceMessage
} from '@hcengineering/server-core'
import { decodeToken, generateToken } from '@hcengineering/server-token'
import { randomUUID } from 'crypto'
import { createDoc, test, type TestDocument } from './minmodel'
import { dbConfig, dbUrl, elasticIndexName, model, prepare, preparePipeline } from './utils'
prepare()
jest.setTimeout(500000)
class TestWorkspaceManager extends WorkspaceManager {
public async getWorkspaceInfo (token?: string): Promise<WorkspaceInfoWithStatus | undefined> {
const decodedToken = decodeToken(token ?? '')
return {
uuid: decodedToken.workspace,
url: decodedToken.workspace,
region: 'test',
name: 'test',
dataId: decodedToken.workspace as unknown as WorkspaceDataId,
mode: 'active',
processingProgress: 0,
backupInfo: {
dataSize: 0,
blobsSize: 0,
backupSize: 0,
lastBackup: 0,
backups: 0
},
versionMajor: 0,
versionMinor: 0,
versionPatch: 0,
lastVisit: 0,
createdOn: 0,
createdBy: decodedToken.account
}
}
async getTransactorAPIEndpoint (token: string): Promise<string | undefined> {
return undefined
}
}
/**
 * Self-contained test fixture: provisions an isolated Kafka queue (topics
 * suffixed with a unique id per instance) plus a TestWorkspaceManager whose
 * fulltext events are forwarded to the swappable `fulltextListener` hook.
 */
class TestQueue {
  genId = generateId()
  config = parseQueueConfig('localhost:19093;-testing-' + this.genId, 'fulltext-test-' + this.genId, '')
  // Replaced by tests to observe indexing/clean events.
  fulltextListener: FulltextListener | undefined
  queue = createPlatformQueue(this.config)
  mgr!: TestWorkspaceManager

  constructor (readonly ctx: MeasureContext) {}

  /** Create the queue topics and start the workspace manager + indexer. */
  async start (): Promise<void> {
    await this.queue.createTopics(1)
    this.mgr = new TestWorkspaceManager(this.ctx, model, {
      queue: this.queue,
      accountsUrl: 'http://localhost:3003',
      elasticIndexName,
      serverSecret: 'secret',
      dbURL: dbUrl,
      config: dbConfig,
      externalStorage: createDummyStorageAdapter(),
      // Delegate through the instance field so tests can swap the listener
      // after start() has been called.
      listener: {
        onIndexing: async (doc: IndexedDoc) => {
          return await this.fulltextListener?.onIndexing?.(doc)
        },
        onClean: async (doc: Ref<Doc>[]) => {
          return await this.fulltextListener?.onClean?.(doc)
        }
      }
    })
    await this.mgr.startIndexer()
  }

  /** Shut down the manager (deleting its topics) and the queue itself. */
  async close (): Promise<void> {
    await this.mgr.shutdown(true)
    await this.queue.shutdown()
  }

  /**
   * Run `op` and wait until a document whose fulltext summary contains
   * `pattern` is observed by the indexer, or fail after `timeoutMs`.
   */
  async expectIndexingDoc (pattern: string, op: () => Promise<void>, timeoutMs: number = 10000): Promise<void> {
    const waitPromise = new Promise<void>((resolve, reject) => {
      const to = setTimeout(() => {
        reject(new Error(`Timeout waiting for document with pattern "${pattern}" to be indexed`))
      }, timeoutMs)
      this.fulltextListener = {
        onIndexing: async (doc) => {
          // Fix: `?? ''` already yields a string, so the extra optional
          // chaining on the result was redundant.
          if ((doc.fulltextSummary ?? '').includes(pattern)) {
            clearTimeout(to)
            resolve()
          }
        }
      }
    })
    await op()
    await waitPromise
  }
}
// Integration suite: requires the local CockroachDB / Elastic / Kafka
// endpoints configured in utils.ts (dbUrl, fullTextDbURL, queue on :19093).
describe('full-text-indexing', () => {
  const toolCtx = new MeasureMetricsContext('tool', {})

  // Publish a raw Tx to the Tx topic and expect the indexer to pick it up.
  it('check-file-indexing', async () => {
    const queue = new TestQueue(toolCtx)
    await queue.start()
    try {
      const txProducer = queue.queue.createProducer<Tx>(toolCtx, QueueTopic.Tx)
      const personId = randomUUID().toString() as PersonUuid
      const wsId: WorkspaceUuid = randomUUID().toString() as WorkspaceUuid
      const token = generateToken(personId, wsId)
      // create=true forces indexer creation for the fresh workspace.
      const indexer = await queue.mgr.getIndexer(toolCtx, wsId, token, true)
      expect(indexer).toBeDefined()
      const dataId = generateId()
      await queue.expectIndexingDoc(dataId, async () => {
        await txProducer.send(wsId, [
          createDoc(test.class.TestDocument, {
            title: 'first doc',
            description: dataId
          })
        ])
      })
    } finally {
      await queue.close()
    }
  })

  // Drive the same flow through a full server pipeline: create then update.
  it('check-full-pipeline', async () => {
    const queue = new TestQueue(toolCtx)
    await queue.start()
    const { pipeline, wsIds } = await preparePipeline(toolCtx, queue.queue)
    try {
      const pipelineClient = wrapPipeline(toolCtx, pipeline, wsIds, true)
      const dataId = generateId()
      const ops = new TxOperations(pipelineClient, core.account.System)
      // NOTE(review): assigned inside the callback before being read below —
      // relies on expectIndexingDoc awaiting `op` before returning.
      let id: Ref<TestDocument>
      await queue.expectIndexingDoc(dataId, async () => {
        id = await ops.createDoc(test.class.TestDocument, core.space.Workspace, {
          title: 'first doc',
          description: dataId
        })
      })
      const newData = generateId()
      // The update should re-trigger indexing with the new description.
      await queue.expectIndexingDoc(newData, async () => {
        await ops.updateDoc<TestDocument>(test.class.TestDocument, core.space.Workspace, id, {
          description: newData
        })
      })
    } finally {
      await queue.close()
      await pipeline.close()
    }
  })

  // Bulk-create 1000 docs without broadcast (so nothing is indexed eagerly),
  // then request a full reindex and count the indexing callbacks.
  it('test-reindex', async () => {
    const queue = new TestQueue(toolCtx)
    await queue.start()
    const { pipeline, wsIds } = await preparePipeline(toolCtx, queue.queue, false) // Do not use broadcast
    const wsProcessor = queue.queue.createProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
    try {
      const pipelineClient = wrapPipeline(toolCtx, pipeline, wsIds)
      const dataId = generateId()
      const ops = new TxOperations(pipelineClient, core.account.System)
      for (let i = 0; i < 1000; i++) {
        await ops.createDoc(test.class.TestDocument, core.space.Workspace, {
          title: 'first doc:' + i,
          description: dataId + i
        })
      }
      let indexOps = 0
      const reindexAllP = new Promise<void>((resolve) => {
        queue.fulltextListener = {
          onIndexing: async (doc) => {
            indexOps++
            if (indexOps === 1000) {
              resolve()
            }
          }
        }
      })
      await wsProcessor.send(wsIds.uuid, [workspaceEvents.fullReindex()])
      // Wait for reindex
      await reindexAllP
    } finally {
      await wsProcessor.close()
      await queue.close()
      await pipeline.close()
    }
  })
})

View File

@ -0,0 +1,196 @@
//
// Copyright © 2020 Anticrm Platform Contributors.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import core, {
type AccountUuid,
type Arr,
type AttachedDoc,
type Attribute,
type Class,
ClassifierKind,
type Data,
type Doc,
type Domain,
IndexKind,
type Mixin,
type Obj,
type PersonId,
type Ref,
type Space,
type Tx,
type TxCreateDoc,
TxFactory
} from '@hcengineering/core'
import type { IntlString, Plugin } from '@hcengineering/platform'
import { plugin } from '@hcengineering/platform'
import buildModel from '@hcengineering/model-all'
const txFactory = new TxFactory(core.account.System)
/** Wrap a class definition into a model-space TxCreateDoc transaction. */
function createClass (classRef: Ref<Class<Obj>>, data: Data<Class<Obj>>): TxCreateDoc<Doc> {
  return txFactory.createTxCreateDoc(core.class.Class, core.space.Model, data, classRef)
}
/**
 * Build a TxCreateDoc for an attribute attached to the given class.
 * The attribute id is derived deterministically from class ref + name.
 */
function createAttribute (
  classRef: Ref<Class<Obj>>,
  attr: Omit<Data<Attribute<any>>, 'attributeOf'>
): TxCreateDoc<Doc> {
  const attrId = (classRef + attr.name) as Ref<Attribute<any>>
  const data = { ...attr, attributeOf: classRef }
  return txFactory.createTxCreateDoc(core.class.Attribute, core.space.Model, data, attrId)
}
/**
 * @public
 * Create a TxCreateDoc for a document in the model space, optionally
 * overriding the author via `modifiedBy`.
 */
export function createDoc<T extends Doc> (
  _class: Ref<Class<T>>,
  attributes: Data<T>,
  id?: Ref<T>,
  modifiedBy?: PersonId
): TxCreateDoc<Doc> {
  const tx = txFactory.createTxCreateDoc(_class, core.space.Model, attributes, id)
  if (modifiedBy !== undefined) {
    tx.modifiedBy = modifiedBy
  }
  return tx
}
/**
 * @public
 * Mixin used by tests; carries an array of strings.
 */
export interface TestMixin extends Doc {
  arr: Arr<string>
}
/**
 * @public
 * Attached comment document used to exercise attached-doc collections.
 */
export interface AttachedComment extends AttachedDoc {
  message: string
}
// Space subtype with an extra project-name field.
export interface TestProject extends Space {
  prjName: string
}
// Document with two string fields that genMinModel marks as full-text indexed.
export interface TestDocument extends Doc {
  title: string
  description: string
}
/**
 * @public
 * Plugin refs for the test model classes declared above.
 */
export const test = plugin('test' as Plugin, {
  mixin: {
    TestMixin: '' as Ref<Mixin<TestMixin>>
  },
  class: {
    TestComment: '' as Ref<Class<AttachedComment>>,
    TestProject: '' as Ref<Class<TestProject>>,
    TestDocument: '' as Ref<Class<TestDocument>>
  }
})
// Storage domain used by the test classes below.
const DOMAIN_TEST: Domain = 'test' as Domain
/**
 * @public
 * Generate minimal model for testing purposes.
 * @returns the bundled base model transactions plus the test classes,
 * full-text-indexed attributes and two sample spaces.
 */
export function genMinModel (): Tx[] {
  const txes = buildModel().getTxes()
  // Fill Tx'es with basic model classes.
  txes.push(
    createClass(test.mixin.TestMixin, {
      label: 'TestMixin' as IntlString,
      extends: core.class.Doc,
      kind: ClassifierKind.MIXIN
    })
  )
  txes.push(
    createClass(test.class.TestProject, {
      label: 'TestProject' as IntlString,
      extends: core.class.Space,
      kind: ClassifierKind.CLASS,
      domain: DOMAIN_TEST
    })
  )
  txes.push(
    createClass(test.class.TestComment, {
      label: 'TestComment' as IntlString,
      extends: core.class.AttachedDoc,
      kind: ClassifierKind.CLASS,
      domain: DOMAIN_TEST
    })
  )
  // TestDocument: both string attributes are marked for full-text indexing,
  // which is what the fulltext suites rely on.
  txes.push(
    ...[
      createClass(test.class.TestDocument, {
        label: 'TestDocument' as IntlString,
        extends: core.class.Doc,
        kind: ClassifierKind.CLASS,
        domain: DOMAIN_TEST
      }),
      createAttribute(test.class.TestDocument, {
        name: 'title',
        type: core.class.TypeString,
        index: IndexKind.FullText
      }),
      createAttribute(test.class.TestDocument, {
        name: 'description',
        type: core.class.TypeString,
        index: IndexKind.FullText
      })
    ]
  )
  const u1 = 'User1' as AccountUuid
  const u2 = 'User2' as AccountUuid
  // TODO: fixme!
  txes.push(
    // createDoc(core.class.Account, { email: 'user1@site.com', role: AccountRole.User }, u1),
    // createDoc(core.class.Account, { email: 'user2@site.com', role: AccountRole.User }, u2),
    createDoc(core.class.Space, {
      name: 'Sp1',
      description: '',
      private: false,
      members: [u1, u2],
      archived: false
    })
  )
  txes.push(
    createDoc(core.class.Space, {
      name: 'Sp2',
      description: '',
      private: false,
      members: [u1],
      archived: false
    })
  )
  return txes
}

View File

@ -0,0 +1,143 @@
/* eslint-disable @typescript-eslint/unbound-method */
import {
Hierarchy,
ModelDb,
systemAccountUuid,
type MeasureMetricsContext,
type WorkspaceIds,
type WorkspaceUuid
} from '@hcengineering/core'
import {
BroadcastMiddleware,
DBAdapterInitMiddleware,
DBAdapterMiddleware,
DomainFindMiddleware,
DomainTxMiddleware,
FullTextMiddleware,
LowLevelMiddleware,
ModelMiddleware,
QueryJoinMiddleware,
QueueMiddleware,
TxMiddleware
} from '@hcengineering/middleware'
import {
createDummyStorageAdapter,
createPipeline,
type MiddlewareCreator,
type Pipeline,
type PipelineContext,
type PlatformQueue
} from '@hcengineering/server-core'
import {
getConfig,
registerAdapterFactory,
registerDestroyFactory,
registerServerPlugins,
registerStringLoaders,
registerTxAdapterFactory,
setAdapterSecurity
} from '@hcengineering/server-pipeline'
import serverToken, { generateToken } from '@hcengineering/server-token'
import { randomUUID } from 'crypto'
/* eslint-disable @typescript-eslint/unbound-method */
import { setMetadata } from '@hcengineering/platform'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
} from '@hcengineering/postgres'
import serverClientPlugin from '@hcengineering/server-client'
import serverCore from '@hcengineering/server-core'
import { createElasticAdapter } from '@hcengineering/elastic'
import type { FulltextDBConfiguration } from '@hcengineering/server-indexer'
import { genMinModel } from './minmodel'
// Minimal test model shared by the suites in this folder.
export const model = genMinModel()

/**
 * Build a server pipeline over a fresh random workspace for tests.
 * @param useBroadcast when false, the broadcast middleware is omitted so the
 * queue is not triggered by transactions.
 */
export async function preparePipeline (
  toolCtx: MeasureMetricsContext,
  queue: PlatformQueue,
  useBroadcast: boolean = true // If not passed will not do broadcast so queue will not be triggered.
): Promise<{ pipeline: Pipeline, wsIds: WorkspaceIds }> {
  const wsId: WorkspaceUuid = randomUUID().toString() as WorkspaceUuid
  const wsIds: WorkspaceIds = {
    uuid: wsId,
    url: wsId
  }
  const storage = createDummyStorageAdapter()
  const conf = getConfig(toolCtx, dbUrl, toolCtx, {
    externalStorage: storage,
    disableTriggers: true,
    usePassedCtx: true
  })
  // Middleware order is significant — keep in sync with the production setup.
  const middlewares: MiddlewareCreator[] = [
    TxMiddleware.create, // Store tx into transaction domain
    FullTextMiddleware.create('', generateToken(systemAccountUuid, wsIds.uuid)),
    LowLevelMiddleware.create,
    QueryJoinMiddleware.create,
    DomainFindMiddleware.create,
    DomainTxMiddleware.create,
    QueueMiddleware.create(queue),
    DBAdapterInitMiddleware.create,
    ModelMiddleware.create(model),
    DBAdapterMiddleware.create(conf), // Configure DB adapters
    ...(useBroadcast ? [BroadcastMiddleware.create((ctx, tx) => {})] : [])
  ]
  const hierarchy = new Hierarchy()
  const modelDb = new ModelDb(hierarchy)
  const context: PipelineContext = {
    workspace: wsIds,
    branding: null,
    modelDb,
    hierarchy,
    storageAdapter: storage,
    contextVars: {},
    communicationApi: null
  }
  const pipeline = await createPipeline(toolCtx, middlewares, context)
  return { pipeline, wsIds }
}
// Local test endpoints — must match the services started for integration tests.
export const fullTextDbURL = 'http://localhost:9201'
export const dbUrl = 'postgresql://root@localhost:26258/defaultdb?sslmode=disable'
export const elasticIndexName = 'testing'

/**
 * One-time process-level setup: secrets, service endpoints and the postgres
 * adapter factories used by the pipelines under test.
 */
export function prepare (): void {
  setDBExtraOptions({
    prepare: true // We override defaults
  })
  setMetadata(serverToken.metadata.Secret, 'secret')
  setMetadata(serverCore.metadata.ElasticIndexName, elasticIndexName)
  setMetadata(serverClientPlugin.metadata.Endpoint, 'http://localhost:3003')
  registerTxAdapterFactory('postgresql', createPostgresTxAdapter, true)
  registerAdapterFactory('postgresql', createPostgresAdapter, true)
  registerDestroyFactory('postgresql', createPostgreeDestroyAdapter, true)
  setAdapterSecurity('postgresql', true)
  registerServerPlugins()
  registerStringLoaders()
}
// Fulltext configuration: a real Elastic adapter plus a stub content adapter
// that returns empty text for any content type (no Rekoni service needed).
export const dbConfig: FulltextDBConfiguration = {
  fulltextAdapter: {
    factory: createElasticAdapter,
    url: fullTextDbURL
  },
  contentAdapters: {
    Rekoni: {
      factory: async (url) => ({
        content: async () => ''
      }),
      contentType: '*',
      url: ''
    }
  },
  defaultContentAdapter: 'Rekoni'
}

View File

@ -0,0 +1,245 @@
/* eslint-disable @typescript-eslint/unbound-method */
import type { Doc, MeasureContext, Space, Tx, TxCUD, WorkspaceInfoWithStatus, WorkspaceUuid } from '@hcengineering/core'
import { Hierarchy, systemAccountUuid } from '@hcengineering/core'
import { getAccountClient, getTransactorEndpoint } from '@hcengineering/server-client'
import {
createContentAdapter,
QueueTopic,
QueueWorkspaceEvent,
workspaceEvents,
type ConsumerControl,
type ConsumerHandle,
type ConsumerMessage,
type ContentTextAdapter,
type FullTextAdapter,
type FulltextListener,
type PlatformQueue,
type PlatformQueueProducer,
type QueueWorkspaceMessage,
type QueueWorkspaceReindexMessage,
type StorageAdapter
} from '@hcengineering/server-core'
import { type FulltextDBConfiguration } from '@hcengineering/server-indexer'
import { generateToken } from '@hcengineering/server-token'
import { WorkspaceIndexer } from './workspace'
// Idle indexers are evicted after this period (5 minutes).
const closeTimeout = 5 * 60 * 1000

/**
 * Owns per-workspace fulltext indexers and the queue consumers/producers that
 * feed them: Tx messages are routed to the owning workspace's indexer, and
 * workspace lifecycle events trigger reindex/cleanup.
 */
export class WorkspaceManager {
  // Value may be a pending Promise while an indexer is still being created.
  indexers = new Map<string, WorkspaceIndexer | Promise<WorkspaceIndexer>>()
  sysHierarchy = new Hierarchy()

  workspaceConsumer?: ConsumerHandle
  txConsumer?: ConsumerHandle

  constructor (
    readonly ctx: MeasureContext,
    readonly model: Tx[],
    private readonly opt: {
      queue: PlatformQueue
      dbURL: string
      config: FulltextDBConfiguration
      externalStorage: StorageAdapter
      elasticIndexName: string
      serverSecret: string
      accountsUrl: string
      listener?: FulltextListener
    }
  ) {
    // Build the system hierarchy from the model once; shared by all workspaces.
    for (const tx of model) {
      this.sysHierarchy.tx(tx)
    }
    this.workspaceProducer = this.opt.queue.createProducer<QueueWorkspaceMessage>(this.ctx, QueueTopic.Workspace)
  }

  shutdownInterval: any
  // Initialized in startIndexer(); later methods assume it has been called.
  contentAdapter!: ContentTextAdapter
  fulltextAdapter!: FullTextAdapter
  workspaceProducer!: PlatformQueueProducer<QueueWorkspaceMessage>
  txInformer: any

  /**
   * Create the content/fulltext adapters, start the idle-indexer eviction
   * timer and subscribe to the Workspace and Tx queue topics.
   */
  async startIndexer (): Promise<void> {
    this.contentAdapter = await this.ctx.with('create content adapter', {}, (ctx) =>
      createContentAdapter(this.opt.config.contentAdapters, this.opt.config.defaultContentAdapter)
    )
    this.fulltextAdapter = await this.opt.config.fulltextAdapter.factory(this.opt.config.fulltextAdapter.url)
    await this.fulltextAdapter.initMapping(this.ctx)
    this.shutdownInterval = setInterval(() => {
      for (const [k, v] of [...this.indexers.entries()]) {
        if (v instanceof Promise) {
          continue
        }
        if (Date.now() - v.lastUpdate > closeTimeout) {
          this.indexers.delete(k)
          void v.close()
        }
      }
    }, closeTimeout) // Every 5 minutes we should close unused indexes.
    this.workspaceConsumer = this.opt.queue.createConsumer<QueueWorkspaceMessage>(
      this.ctx,
      QueueTopic.Workspace,
      this.opt.queue.getClientId(),
      async (msg, control) => {
        await this.processWorkspaceEvent(msg, control)
      }
    )
    let txMessages: number = 0
    this.txConsumer = this.opt.queue.createConsumer<TxCUD<Doc>>(
      this.ctx,
      QueueTopic.Tx,
      this.opt.queue.getClientId(),
      async (msg, control) => {
        // Debounced throughput log: report accumulated tx count after 5s of quiet.
        clearTimeout(this.txInformer)
        this.txInformer = setTimeout(() => {
          this.ctx.info('tx message', { count: txMessages })
          txMessages = 0
        }, 5000)
        txMessages += msg.length
        await this.processDocuments(msg, control)
      }
    )
  }

  // Route each consumed Tx batch to its workspace's indexer (created on demand).
  private async processDocuments (msg: ConsumerMessage<TxCUD<Doc<Space>>>[], control: ConsumerControl): Promise<void> {
    for (const m of msg) {
      const ws = m.id as WorkspaceUuid
      const indexer = await this.getIndexer(this.ctx, ws, generateToken(systemAccountUuid, ws), true)
      await indexer?.fulltext.processDocuments(this.ctx, m.value, control)
    }
  }

  // Handle workspace lifecycle events: full reindex, deletion/cleanup, and
  // per-domain reindex requests.
  private async processWorkspaceEvent (
    msg: ConsumerMessage<QueueWorkspaceMessage>[],
    control: ConsumerControl
  ): Promise<void> {
    for (const m of msg) {
      this.ctx.info('workspace message', { message: m })
      const ws = m.id as WorkspaceUuid
      for (const mm of m.value) {
        if (
          mm.type === QueueWorkspaceEvent.Created ||
          mm.type === QueueWorkspaceEvent.Restored ||
          mm.type === QueueWorkspaceEvent.FullReindex
        ) {
          // Drop existing index data, then fan out per-domain reindex messages
          // back onto the Workspace topic.
          const indexer = await this.getIndexer(this.ctx, ws, generateToken(systemAccountUuid, ws), true)
          if (indexer !== undefined) {
            await indexer.dropWorkspace() // TODO: Add heartbeat
            const classes = await indexer.getIndexClassess()
            await this.workspaceProducer.send(
              ws,
              classes.map((it) => workspaceEvents.reindex(it.domain, it.classes))
            )
          }
        } else if (
          mm.type === QueueWorkspaceEvent.Deleted ||
          mm.type === QueueWorkspaceEvent.Archived ||
          mm.type === QueueWorkspaceEvent.ClearIndex
        ) {
          const token = generateToken(systemAccountUuid, ws)
          const workspaceInfo = await this.getWorkspaceInfo(token)
          if (workspaceInfo !== undefined) {
            // Clean both the legacy dataId-addressed and uuid-addressed entries.
            if (workspaceInfo.dataId != null) {
              await this.fulltextAdapter.clean(this.ctx, workspaceInfo.dataId as unknown as WorkspaceUuid)
            }
            await this.fulltextAdapter.clean(this.ctx, workspaceInfo.uuid)
          }
        } else if (mm.type === QueueWorkspaceEvent.Reindex) {
          const indexer = await this.getIndexer(this.ctx, ws, generateToken(systemAccountUuid, ws), true)
          const mmd = mm as QueueWorkspaceReindexMessage
          await indexer?.reindex(this.ctx, mmd.domain, mmd.classes, control)
        }
      }
    }
  }

  // Overridable in tests (see TestWorkspaceManager).
  public async getWorkspaceInfo (token?: string): Promise<WorkspaceInfoWithStatus | undefined> {
    const accountClient = getAccountClient(token)
    return await accountClient.getWorkspaceInfo(false)
  }

  async getTransactorAPIEndpoint (token: string): Promise<string | undefined> {
    // Transactor endpoints are advertised as websocket URLs; convert to http(s).
    return (await getTransactorEndpoint(token, 'internal')).replace('wss://', 'https://').replace('ws://', 'http://')
  }

  /**
   * Get (or, when `create` is true, lazily create) the indexer for a workspace.
   * Returns undefined if the workspace cannot be resolved from the token.
   */
  async getIndexer (
    ctx: MeasureContext,
    workspace: WorkspaceUuid,
    token: string | undefined,
    create: boolean = false
  ): Promise<WorkspaceIndexer | undefined> {
    let idx = this.indexers.get(workspace)
    if (idx === undefined && create) {
      const workspaceInfo = await this.getWorkspaceInfo(token)
      if (workspaceInfo === undefined) {
        ctx.error('Workspace not available for token')
        return
      }
      ctx.warn('indexer created', { workspace })
      // Store the promise immediately so concurrent callers share one creation.
      idx = WorkspaceIndexer.create(
        ctx,
        this.model,
        {
          uuid: workspace,
          dataId: workspaceInfo.dataId,
          url: workspaceInfo.url
        },
        this.opt.dbURL,
        this.opt.externalStorage,
        this.fulltextAdapter,
        this.contentAdapter,
        (token) => this.getTransactorAPIEndpoint(token),
        this.opt.listener
      )
      this.indexers.set(workspace, idx)
    }
    if (idx instanceof Promise) {
      // Cache the resolved indexer for subsequent lookups.
      idx = await idx
      this.indexers.set(workspace, idx)
    }
    return idx
  }

  /**
   * Stop timers, close consumers/producer and all indexers.
   * @param deleteTopics also remove the queue topics (used by tests).
   */
  async shutdown (deleteTopics: boolean = false): Promise<void> {
    clearInterval(this.shutdownInterval)
    clearTimeout(this.txInformer)
    await this.txConsumer?.close()
    await this.workspaceConsumer?.close()
    await this.workspaceProducer.close()
    for (const v of this.indexers.values()) {
      if (v instanceof Promise) {
        const d = await v
        await d.close()
      } else {
        await v.close()
      }
    }
    if (deleteTopics) {
      await this.opt.queue.deleteTopics()
    }
    await this.fulltextAdapter.close()
  }

  // Remove and close a single workspace's indexer (awaiting pending creation).
  async closeWorkspace (workspace: WorkspaceUuid): Promise<void> {
    let idx = this.indexers.get(workspace)
    this.indexers.delete(workspace)
    if (idx !== undefined && idx instanceof Promise) {
      idx = await idx
    }
    await idx?.close()
  }
}

View File

@ -4,41 +4,19 @@ import type {
Class,
Doc,
DocumentQuery,
Domain,
IndexingUpdateEvent,
MeasureContext,
Ref,
SearchOptions,
SearchQuery,
Tx,
TxCUD,
TxWorkspaceEvent,
WorkspaceIds,
WorkspaceUuid
Tx
} from '@hcengineering/core'
import core, {
generateId,
Hierarchy,
ModelDb,
systemAccountUuid,
TxProcessor,
WorkspaceEvent
} from '@hcengineering/core'
import {
ContextNameMiddleware,
DBAdapterInitMiddleware,
DBAdapterMiddleware,
DomainFindMiddleware,
LowLevelMiddleware,
ModelMiddleware
} from '@hcengineering/middleware'
import {
createMongoAdapter,
createMongoDestroyAdapter,
createMongoTxAdapter,
shutdownMongo
} from '@hcengineering/mongo'
import { PlatformError, setMetadata, unknownError } from '@hcengineering/platform'
import { setMetadata } from '@hcengineering/platform'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
@ -46,27 +24,10 @@ import {
setDBExtraOptions,
shutdownPostgres
} from '@hcengineering/postgres'
import serverClientPlugin, { getAccountClient, getTransactorEndpoint } from '@hcengineering/server-client'
import serverCore, {
createContentAdapter,
createPipeline,
QueueTopic,
QueueWorkspaceEvent,
workspaceEvents,
type ConsumerControl,
type ContentTextAdapter,
type FullTextAdapter,
type MiddlewareCreator,
type Pipeline,
type PipelineContext,
type PlatformQueue,
type QueueWorkspaceMessage,
type QueueWorkspaceReindexMessage,
type StorageAdapter
} from '@hcengineering/server-core'
import { FullTextIndexPipeline, searchFulltext, type FulltextDBConfiguration } from '@hcengineering/server-indexer'
import serverClientPlugin from '@hcengineering/server-client'
import serverCore, { workspaceEvents, type PlatformQueue, type StorageAdapter } from '@hcengineering/server-core'
import { searchFulltext, type FulltextDBConfiguration } from '@hcengineering/server-indexer'
import {
getConfig,
registerAdapterFactory,
registerDestroyFactory,
registerServerPlugins,
@ -75,148 +36,13 @@ import {
setAdapterSecurity,
sharedPipelineContextVars
} from '@hcengineering/server-pipeline'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import cors from '@koa/cors'
import Koa from 'koa'
import bodyParser from 'koa-bodyparser'
import Router from 'koa-router'
function fulltextModelFilter (h: Hierarchy, model: Tx[]): Tx[] {
const allowedClasess: Ref<Class<Doc>>[] = [
core.class.Class,
core.class.Attribute,
core.class.Mixin,
core.class.Type,
core.class.Status,
core.class.Permission,
core.class.Space,
core.class.Tx,
core.class.FullTextSearchContext
]
return model.filter(
(it) =>
TxProcessor.isExtendsCUD(it._class) &&
allowedClasess.some((cl) => h.isDerived((it as TxCUD<Doc>).objectClass, cl))
)
}
class WorkspaceIndexer {
fulltext!: FullTextIndexPipeline
pipeline!: Pipeline
lastUpdate: number = Date.now()
constructor (readonly fulltextAdapter: FullTextAdapter) {}
static async create (
ctx: MeasureContext,
model: Tx[],
workspace: WorkspaceIds,
dbURL: string,
externalStorage: StorageAdapter,
ftadapter: FullTextAdapter,
contentAdapter: ContentTextAdapter
): Promise<WorkspaceIndexer> {
const result = new WorkspaceIndexer(ftadapter)
const dbConf = getConfig(ctx, dbURL, ctx, {
disableTriggers: true,
externalStorage
})
const middlewares: MiddlewareCreator[] = [
LowLevelMiddleware.create,
ContextNameMiddleware.create,
DomainFindMiddleware.create,
DBAdapterInitMiddleware.create,
ModelMiddleware.create(model, fulltextModelFilter), // TODO: Add filtration of only class structure and FullTextSearchContext
DBAdapterMiddleware.create(dbConf)
]
const hierarchy = new Hierarchy()
const modelDb = new ModelDb(hierarchy)
const context: PipelineContext = {
workspace,
branding: null,
modelDb,
hierarchy,
storageAdapter: externalStorage,
contextVars: {},
// TODO: Communication API ??
communicationApi: null
}
result.pipeline = await createPipeline(ctx, middlewares, context)
const defaultAdapter = result.pipeline.context.adapterManager?.getDefaultAdapter()
if (defaultAdapter === undefined) {
throw new PlatformError(unknownError('Default adapter should be set'))
}
const token = generateToken(systemAccountUuid, workspace.uuid)
const transactorEndpoint = (await getTransactorEndpoint(token, 'internal'))
.replace('wss://', 'https://')
.replace('ws://', 'http://')
result.fulltext = new FullTextIndexPipeline(
ftadapter,
defaultAdapter,
hierarchy,
workspace,
ctx,
modelDb,
externalStorage,
contentAdapter,
(ctx: MeasureContext, classes: Ref<Class<Doc>>[]) => {
ctx.info('broadcast indexing update', { classes, workspace })
const evt: IndexingUpdateEvent = {
_class: classes
}
const tx: TxWorkspaceEvent = {
_class: core.class.TxWorkspaceEvent,
_id: generateId(),
event: WorkspaceEvent.IndexingUpdate,
modifiedBy: core.account.System,
modifiedOn: Date.now(),
objectSpace: core.space.DerivedTx,
space: core.space.DerivedTx,
params: evt
}
// Send tx to pipeline
// TODO: Fix me
void fetch(transactorEndpoint + `/api/v1/broadcast?workspace=${workspace.uuid}`, {
method: 'PUT',
keepalive: true,
headers: {
Authorization: `Bearer ${token}`
},
body: JSON.stringify(tx)
})
}
)
return result
}
async reindex (
ctx: MeasureContext,
domain: Domain,
classes: Ref<Class<Doc>>[],
control?: ConsumerControl
): Promise<void> {
await this.fulltext.reindex(ctx, domain, classes, control)
}
async dropWorkspace (): Promise<void> {
await this.fulltext.dropWorkspace()
}
async getIndexClassess (): Promise<{ domain: Domain, classes: Ref<Class<Doc>>[] }[]> {
return await this.fulltext.getIndexClassess()
}
async close (): Promise<void> {
await this.pipeline.close()
}
}
import { WorkspaceManager } from './manager'
interface IndexDocuments {
token: string
@ -267,8 +93,6 @@ export async function startIndexer (
accountsUrl: string
}
): Promise<() => void> {
const closeTimeout = 5 * 60 * 1000
const usePrepare = (process.env.DB_PREPARE ?? 'true') === 'true'
setDBExtraOptions({
@ -291,145 +115,11 @@ export async function startIndexer (
registerServerPlugins()
registerStringLoaders()
const sysHierarchy = new Hierarchy()
for (const tx of opt.model) {
sysHierarchy.tx(tx)
}
const app = new Koa()
const router = new Router()
const indexers = new Map<string, WorkspaceIndexer | Promise<WorkspaceIndexer>>()
const contentAdapter = await ctx.with('create content adapter', {}, (ctx) =>
createContentAdapter(opt.config.contentAdapters, opt.config.defaultContentAdapter)
)
const fulltextAdapter = await opt.config.fulltextAdapter.factory(opt.config.fulltextAdapter.url)
await fulltextAdapter.initMapping(ctx)
const shutdownInterval = setInterval(() => {
for (const [k, v] of [...indexers.entries()]) {
if (v instanceof Promise) {
continue
}
if (Date.now() - v.lastUpdate > closeTimeout) {
indexers.delete(k)
void v.close()
}
}
}, closeTimeout) // Every 5 minutes we should close unused indexes.
const workspaceProducer = opt.queue.createProducer<QueueWorkspaceMessage>(ctx, QueueTopic.Workspace)
const workspaceConsumer = opt.queue.createConsumer<QueueWorkspaceMessage>(
ctx,
QueueTopic.Workspace,
opt.queue.getClientId(),
async (msg, control) => {
for (const m of msg) {
ctx.info('workspace message', { message: m })
const ws = m.id as WorkspaceUuid
for (const mm of m.value) {
if (
mm.type === QueueWorkspaceEvent.Created ||
mm.type === QueueWorkspaceEvent.Restored ||
mm.type === QueueWorkspaceEvent.FullReindex
) {
const indexer = await getIndexer(ctx, ws, generateToken(systemAccountUuid, ws), true)
if (indexer !== undefined) {
await indexer.dropWorkspace() // TODO: Add heartbeat
const classes = await indexer.getIndexClassess()
await workspaceProducer.send(
ws,
classes.map((it) => workspaceEvents.reindex(it.domain, it.classes))
)
}
} else if (
mm.type === QueueWorkspaceEvent.Deleted ||
mm.type === QueueWorkspaceEvent.Archived ||
mm.type === QueueWorkspaceEvent.ClearIndex
) {
const token = generateToken(systemAccountUuid, ws)
const accountClient = getAccountClient(token)
const workspaceInfo = await accountClient.getWorkspaceInfo(false)
if (workspaceInfo !== undefined) {
if (workspaceInfo.dataId != null) {
await fulltextAdapter.clean(ctx, workspaceInfo.dataId as unknown as WorkspaceUuid)
}
await fulltextAdapter.clean(ctx, workspaceInfo.uuid)
}
} else if (mm.type === QueueWorkspaceEvent.Reindex) {
const indexer = await getIndexer(ctx, ws, generateToken(systemAccountUuid, ws), true)
const mmd = mm as QueueWorkspaceReindexMessage
await indexer?.reindex(ctx, mmd.domain, mmd.classes, control)
}
}
}
}
)
let txInformer: any
let txMessages: number = 0
const txConsumer = opt.queue.createConsumer<TxCUD<Doc>>(
ctx,
QueueTopic.Tx,
opt.queue.getClientId(),
async (msg, control) => {
clearTimeout(txInformer)
txInformer = setTimeout(() => {
ctx.info('tx message', { count: txMessages })
txMessages = 0
}, 5000)
txMessages += msg.length
for (const m of msg) {
const ws = m.id as WorkspaceUuid
const indexer = await getIndexer(ctx, ws, generateToken(systemAccountUuid, ws), true)
await indexer?.fulltext.processDocuments(ctx, m.value, control)
}
}
)
  /**
   * Returns the WorkspaceIndexer for a workspace from the `indexers` cache,
   * optionally creating it on demand.
   *
   * @param ctx - measurement/logging context
   * @param workspace - workspace to index
   * @param token - auth token used to resolve workspace info; presumably a
   *   system token when called from queue handlers — TODO confirm
   * @param create - when true, build and cache a new indexer on cache miss
   * @returns the indexer, or undefined when not cached and `create` is false,
   *   or when the workspace cannot be resolved from the token
   */
  async function getIndexer (
    ctx: MeasureContext,
    workspace: WorkspaceUuid,
    token: string | undefined,
    create: boolean = false
  ): Promise<WorkspaceIndexer | undefined> {
    let idx = indexers.get(workspace)
    if (idx === undefined && create) {
      const accountClient = getAccountClient(token)
      const workspaceInfo = await accountClient.getWorkspaceInfo(false)
      if (workspaceInfo === undefined) {
        ctx.error('Workspace not available for token')
        return
      }
      ctx.warn('indexer created', { workspace })
      idx = WorkspaceIndexer.create(
        ctx,
        opt.model,
        {
          uuid: workspace,
          dataId: workspaceInfo.dataId,
          url: workspaceInfo.url
        },
        opt.dbURL,
        opt.externalStorage,
        fulltextAdapter,
        contentAdapter
      )
      // Cache the pending Promise immediately so concurrent callers for the
      // same workspace share one creation instead of racing.
      indexers.set(workspace, idx)
    }
    if (idx instanceof Promise) {
      // Replace the cached Promise with the resolved instance for later callers.
      idx = await idx
      indexers.set(workspace, idx)
    }
    return idx
  }
const manager = new WorkspaceManager(ctx, opt.model, { ...opt })
await manager.startIndexer()
app.use(
cors({
credentials: true
@ -446,7 +136,7 @@ export async function startIndexer (
ctx.info('search', { classes: request._classes, query: request.query, workspace: decoded.workspace })
await ctx.with('search', {}, async (ctx) => {
const docs = await ctx.with('search', { workspace: decoded.workspace }, (ctx) =>
fulltextAdapter.search(ctx, decoded.workspace, request._classes, request.query, request.fullTextLimit)
manager.fulltextAdapter.search(ctx, decoded.workspace, request._classes, request.query, request.fullTextLimit)
)
req.body = docs
})
@ -466,7 +156,14 @@ export async function startIndexer (
ctx.info('fulltext-search', { ...request.query, workspace: decoded.workspace })
await ctx.with('full-text-search', {}, async (ctx) => {
const result = await ctx.with('searchFulltext', {}, (ctx) =>
searchFulltext(ctx, decoded.workspace, sysHierarchy, fulltextAdapter, request.query, request.options)
searchFulltext(
ctx,
decoded.workspace,
manager.sysHierarchy,
manager.fulltextAdapter,
request.query,
request.options
)
)
req.body = result
})
@ -486,15 +183,7 @@ export async function startIndexer (
req.body = {}
ctx.info('close', { workspace: decoded.workspace })
const idx = indexers.get(decoded.workspace)
indexers.delete(decoded.workspace)
if (idx !== undefined && idx instanceof Promise) {
void idx.then((res) => {
void res.close()
})
} else if (idx !== undefined) {
void idx.close()
}
await manager.closeWorkspace(decoded.workspace)
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
@ -509,7 +198,7 @@ export async function startIndexer (
const token = request.token ?? req.headers.authorization?.split(' ')[1]
const decoded = decodeToken(token) // Just to be safe
const indexer = await getIndexer(ctx, decoded.workspace, token)
const indexer = await manager.getIndexer(ctx, decoded.workspace, token)
if (indexer !== undefined) {
indexer.lastUpdate = Date.now()
// TODO: Fixme
@ -532,13 +221,13 @@ export async function startIndexer (
req.body = {}
ctx.info('reindex', { workspace: decoded.workspace })
const indexer = await getIndexer(ctx, decoded.workspace, token, true)
const indexer = await manager.getIndexer(ctx, decoded.workspace, token, true)
if (indexer !== undefined) {
indexer.lastUpdate = Date.now()
if (request?.onlyDrop ?? false) {
await workspaceProducer.send(decoded.workspace, [workspaceEvents.clearIndex()])
await manager.workspaceProducer.send(decoded.workspace, [workspaceEvents.clearIndex()])
} else {
await workspaceProducer.send(decoded.workspace, [workspaceEvents.fullReindex()])
await manager.workspaceProducer.send(decoded.workspace, [workspaceEvents.fullReindex()])
}
}
} catch (err: any) {
@ -556,10 +245,8 @@ export async function startIndexer (
})
const close = (): void => {
void txConsumer.close()
void workspaceConsumer.close()
void workspaceProducer.close()
clearInterval(shutdownInterval)
void manager.shutdown()
void opt.queue.shutdown()
server.close()
}

View File

@ -0,0 +1,29 @@
/* eslint-disable @typescript-eslint/unbound-method */
import core, {
type Class,
type Doc,
type Hierarchy,
type Ref,
type Tx,
type TxCUD,
TxProcessor
} from '@hcengineering/core'
/**
 * Filters model transactions down to the subset relevant for fulltext indexing:
 * CUD transactions whose object class derives from one of the structural
 * classes below (class/attribute metadata, spaces, txes, search contexts).
 */
export function fulltextModelFilter (h: Hierarchy, model: Tx[]): Tx[] {
  const allowedClasses: Ref<Class<Doc>>[] = [
    core.class.Class,
    core.class.Attribute,
    core.class.Mixin,
    core.class.Type,
    core.class.Status,
    core.class.Permission,
    core.class.Space,
    core.class.Tx,
    core.class.FullTextSearchContext
  ]
  const isRelevant = (tx: Tx): boolean => {
    if (!TxProcessor.isExtendsCUD(tx._class)) {
      return false
    }
    const objectClass = (tx as TxCUD<Doc>).objectClass
    return allowedClasses.some((cl) => h.isDerived(objectClass, cl))
  }
  return model.filter(isRelevant)
}

View File

@ -0,0 +1,165 @@
/* eslint-disable @typescript-eslint/unbound-method */
import core, {
type Class,
type Doc,
type Domain,
generateId,
Hierarchy,
type IndexingUpdateEvent,
type MeasureContext,
ModelDb,
type Ref,
systemAccountUuid,
type Tx,
type TxWorkspaceEvent,
WorkspaceEvent,
type WorkspaceIds
} from '@hcengineering/core'
import {
ContextNameMiddleware,
DBAdapterInitMiddleware,
DBAdapterMiddleware,
DomainFindMiddleware,
LowLevelMiddleware,
ModelMiddleware
} from '@hcengineering/middleware'
import { PlatformError, unknownError } from '@hcengineering/platform'
import {
type ConsumerControl,
type ContentTextAdapter,
createPipeline,
type FullTextAdapter,
type FulltextListener,
type MiddlewareCreator,
type Pipeline,
type PipelineContext,
type StorageAdapter
} from '@hcengineering/server-core'
import { FullTextIndexPipeline } from '@hcengineering/server-indexer'
import { getConfig } from '@hcengineering/server-pipeline'
import { generateToken } from '@hcengineering/server-token'
import { fulltextModelFilter } from './utils'
/**
 * Fulltext indexing facade for a single workspace.
 *
 * Owns a minimal server pipeline (model + DB adapter with triggers disabled)
 * and a FullTextIndexPipeline that performs the actual document indexing.
 * Instances are built via the async `create` factory; the constructor alone
 * leaves `fulltext`/`pipeline` unassigned.
 */
export class WorkspaceIndexer {
  // Assigned in create(); '!' because the constructor does not set them.
  fulltext!: FullTextIndexPipeline
  pipeline!: Pipeline
  // Last time this indexer was used; presumably read by the owning manager to
  // evict idle indexers — TODO confirm against caller.
  lastUpdate: number = Date.now()
  constructor (readonly fulltextAdapter: FullTextAdapter) {}
  /**
   * Builds a fully wired indexer for the given workspace.
   *
   * @param model - model transactions; narrowed via fulltextModelFilter
   * @param dbURL - database URL used to construct the pipeline's DB adapter
   * @param endpointProvider - resolves a transactor endpoint for the system
   *   token; when it returns undefined, indexing-update broadcasts are skipped
   * @param listener - optional hooks invoked on index/clean events
   * @throws PlatformError when the pipeline has no default adapter
   */
  static async create (
    ctx: MeasureContext,
    model: Tx[],
    workspace: WorkspaceIds,
    dbURL: string,
    externalStorage: StorageAdapter,
    ftadapter: FullTextAdapter,
    contentAdapter: ContentTextAdapter,
    endpointProvider: (token: string) => Promise<string | undefined>,
    listener?: FulltextListener
  ): Promise<WorkspaceIndexer> {
    const result = new WorkspaceIndexer(ftadapter)
    // Triggers are disabled: this pipeline only reads documents for indexing.
    const dbConf = getConfig(ctx, dbURL, ctx, {
      disableTriggers: true,
      externalStorage
    })
    const middlewares: MiddlewareCreator[] = [
      LowLevelMiddleware.create,
      ContextNameMiddleware.create,
      DomainFindMiddleware.create,
      DBAdapterInitMiddleware.create,
      ModelMiddleware.create(model, fulltextModelFilter), // TODO: Add filtration of only class structure and FullTextSearchContext
      DBAdapterMiddleware.create(dbConf)
    ]
    const hierarchy = new Hierarchy()
    const modelDb = new ModelDb(hierarchy)
    const context: PipelineContext = {
      workspace,
      branding: null,
      modelDb,
      hierarchy,
      storageAdapter: externalStorage,
      contextVars: {},
      // TODO: Communication API ??
      communicationApi: null
    }
    result.pipeline = await createPipeline(ctx, middlewares, context)
    const defaultAdapter = result.pipeline.context.adapterManager?.getDefaultAdapter()
    if (defaultAdapter === undefined) {
      throw new PlatformError(unknownError('Default adapter should be set'))
    }
    const token = generateToken(systemAccountUuid, workspace.uuid)
    const transactorEndpoint = await endpointProvider(token)
    result.fulltext = new FullTextIndexPipeline(
      ftadapter,
      defaultAdapter,
      hierarchy,
      workspace,
      ctx,
      modelDb,
      externalStorage,
      contentAdapter,
      // Broadcast callback: notifies clients that index data for `classes`
      // has been updated, by sending a TxWorkspaceEvent to the transactor.
      (ctx: MeasureContext, classes: Ref<Class<Doc>>[]) => {
        ctx.info('broadcast indexing update', { classes, workspace })
        const evt: IndexingUpdateEvent = {
          _class: classes
        }
        const tx: TxWorkspaceEvent = {
          _class: core.class.TxWorkspaceEvent,
          _id: generateId(),
          event: WorkspaceEvent.IndexingUpdate,
          modifiedBy: core.account.System,
          modifiedOn: Date.now(),
          objectSpace: core.space.DerivedTx,
          space: core.space.DerivedTx,
          params: evt
        }
        // Send tx to pipeline; fire-and-forget — failures are logged only.
        if (transactorEndpoint !== undefined) {
          void fetch(transactorEndpoint + `/api/v1/broadcast?workspace=${workspace.uuid}`, {
            method: 'PUT',
            keepalive: true,
            headers: {
              Authorization: `Bearer ${token}`
            },
            body: JSON.stringify(tx)
          }).catch((err) => {
            ctx.error('failed to send broadcast', { err })
          })
        }
      },
      listener
    )
    return result
  }
  /**
   * Reindexes the given classes of a domain; `control` lets the queue consumer
   * heartbeat/pause during long runs.
   */
  async reindex (
    ctx: MeasureContext,
    domain: Domain,
    classes: Ref<Class<Doc>>[],
    control?: ConsumerControl
  ): Promise<void> {
    await this.fulltext.reindex(ctx, domain, classes, control)
  }
  // Drops all fulltext state for this workspace.
  async dropWorkspace (): Promise<void> {
    await this.fulltext.dropWorkspace()
  }
  // Lists indexable classes grouped by domain.
  // NOTE: 'Classess' spelling matches the FullTextIndexPipeline API.
  async getIndexClassess (): Promise<{ domain: Domain, classes: Ref<Class<Doc>>[] }[]> {
    return await this.fulltext.getIndexClassess()
  }
  // Cancels pending indexing work, then closes the pipeline.
  async close (): Promise<void> {
    this.fulltext.cancel()
    await this.pipeline.close()
  }
}

View File

@ -135,6 +135,7 @@ getUsers = () => {
const close = (): void => {
console.trace('Exiting from server')
console.log('Shutdown request accepted')
void queue.shutdown()
void shutdown().then(() => {
process.exit(0)
})

View File

@ -22,7 +22,7 @@ class DummyQueueProducer<T> implements PlatformQueueProducer<T> {
* A dummy implementation of PlatformQueue for testing and development
*/
export class DummyQueue implements PlatformQueue {
createProducer<T>(ctx: MeasureContext, topic: QueueTopic): PlatformQueueProducer<T> {
createProducer<T>(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer<T> {
return new DummyQueueProducer<T>()
}
@ -30,9 +30,11 @@ export class DummyQueue implements PlatformQueue {
return 'dummy'
}
async shutdown (): Promise<void> {}
createConsumer<T>(
ctx: MeasureContext,
topic: QueueTopic,
topic: QueueTopic | string,
groupId: string,
onMessage: (
msg: { id: WorkspaceUuid | string, value: T }[],
@ -59,6 +61,8 @@ export class DummyQueue implements PlatformQueue {
async createTopics (tx: number): Promise<void> {
await Promise.resolve()
}
async deleteTopics (topics?: (QueueTopic | string)[]): Promise<void> {}
}
/**

View File

@ -30,7 +30,7 @@ export interface ConsumerControl {
}
export interface PlatformQueue {
createProducer: <T>(ctx: MeasureContext, topic: QueueTopic) => PlatformQueueProducer<T>
createProducer: <T>(ctx: MeasureContext, topic: QueueTopic | string) => PlatformQueueProducer<T>
/**
* Create a consumer for a topic.
@ -38,7 +38,7 @@ export interface PlatformQueue {
*/
createConsumer: <T>(
ctx: MeasureContext,
topic: QueueTopic,
topic: QueueTopic | string,
groupId: string,
onMessage: (msg: ConsumerMessage<T>[], queue: ConsumerControl) => Promise<void>,
options?: {
@ -47,7 +47,13 @@ export interface PlatformQueue {
) => ConsumerHandle
createTopics: (tx: number) => Promise<void>
// If not passed, will delete all topics from the QueueTopic enum
deleteTopics: (topics?: (QueueTopic | string)[]) => Promise<void>
getClientId: () => string
// Will close all producers and consumers
shutdown: () => Promise<void>
}
/**

View File

@ -352,6 +352,11 @@ export interface SearchStringResult {
total?: number
}
export interface FulltextListener {
onIndexing?: (doc: IndexedDoc) => Promise<void>
onClean?: (doc: Ref<Doc>[]) => Promise<void>
}
/**
* @public
*/

View File

@ -3,8 +3,9 @@ import core, {
WorkspaceEvent,
generateId,
getTypeOf,
type WorkspaceIds,
systemAccount,
type Account,
type AccountUuid,
type BackupClient,
type Branding,
type BrandingMap,
@ -20,15 +21,14 @@ import core, {
type FindResult,
type MeasureContext,
type ModelDb,
type PersonId,
type Ref,
type SearchResult,
type SessionData,
type Tx,
type TxResult,
type TxWorkspaceEvent,
type PersonId,
systemAccount,
type AccountUuid
type WorkspaceIds
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { createHash, type Hash } from 'crypto'
@ -232,7 +232,12 @@ export function loadBrandingMap (brandingPath?: string): BrandingMap {
return brandings
}
export function wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsIds: WorkspaceIds): Client & BackupClient {
export function wrapPipeline (
ctx: MeasureContext,
pipeline: Pipeline,
wsIds: WorkspaceIds,
doBroadcast: boolean = false
): Client & BackupClient {
const contextData = new SessionDataImpl(
systemAccount,
'pipeline',
@ -267,7 +272,13 @@ export function wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsIds: Wo
upload: (domain, docs) => backupOps.upload(ctx, domain, docs),
searchFulltext: async (query, options) => ({ docs: [], total: 0 }),
sendForceClose: async () => {},
tx: (tx) => pipeline.tx(ctx, [tx]),
tx: async (tx) => {
const result = await pipeline.tx(ctx, [tx])
if (doBroadcast) {
await pipeline.handleBroadcast(ctx)
}
return result
},
notify: (...tx) => {}
}
}

View File

@ -21,7 +21,6 @@ import core, {
type AttachedDoc,
type Blob,
type Class,
DOMAIN_MIGRATION,
DOMAIN_MODEL,
type Doc,
type Domain,
@ -29,7 +28,6 @@ import core, {
type Hierarchy,
type IdMap,
type MeasureContext,
type MigrationState,
type ModelDb,
type Ref,
type Space,
@ -37,9 +35,7 @@ import core, {
TxProcessor,
type WorkspaceIds,
type WorkspaceUuid,
coreId,
docKey,
generateId,
getFullTextIndexableAttributes,
groupByArray,
isClassIndexable,
@ -56,6 +52,7 @@ import type {
ContentTextAdapter,
DbAdapter,
FullTextAdapter,
FulltextListener,
IndexedDoc,
StorageAdapter
} from '@hcengineering/server-core'
@ -179,7 +176,8 @@ export class FullTextIndexPipeline implements FullTextPipeline {
readonly model: ModelDb,
readonly storageAdapter: StorageAdapter,
readonly contentAdapter: ContentTextAdapter,
readonly broadcastUpdate: (ctx: MeasureContext, classes: Ref<Class<Doc>>[]) => void
readonly broadcastUpdate: (ctx: MeasureContext, classes: Ref<Class<Doc>>[]) => void,
readonly listener?: FulltextListener
) {
this.contexts = new Map(model.findAllSync(core.class.FullTextSearchContext, {}).map((it) => [it.toClass, it]))
}
@ -277,27 +275,21 @@ export class FullTextIndexPipeline implements FullTextPipeline {
broadcastClasses = new Set<Ref<Class<Doc>>>()
broadcasts: number = 0
private async addMigration (ctx: MeasureContext, state: string): Promise<void> {
const mstate: MigrationState = {
_class: core.class.MigrationState,
_id: generateId(),
modifiedBy: core.account.System,
modifiedOn: Date.now(),
space: core.space.Configuration,
plugin: coreId,
state
}
await this.storage.upload(ctx, DOMAIN_MIGRATION, [mstate])
  // Stops background indexing: loops observe `cancelling`, and any pending
  // broadcast timer is cleared so it cannot fire after cancellation.
  cancel (): void {
    this.cancelling = true
    clearTimeout(this.broadCastTimeout)
  }
broadCastTimeout: any
scheduleBroadcast (): void {
if (!this.cancelling) {
// We need to send index update event
if (this.broadcasts === 0) {
this.broadcasts++
setTimeout(() => {
this.broadCastTimeout = setTimeout(() => {
this.broadcasts = 0
if (this.broadcastClasses.size > 0) {
if (this.broadcastClasses.size > 0 && !this.cancelling) {
const toSend = Array.from(this.broadcastClasses.values())
this.broadcastClasses.clear()
this.broadcastUpdate(this.metrics, toSend)
@ -454,6 +446,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
indexedDoc.id = doc._id
indexedDoc.space = doc.space
if (this.listener?.onIndexing !== undefined) {
await this.listener.onIndexing(indexedDoc)
}
await pushQueue.push(indexedDoc)
} catch (err: any) {
ctx.error('failed to process document', {
@ -508,11 +503,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
for (const _cl of new Set(toRemove.values().map((it) => it._class))) {
this.broadcastClasses.add(_cl)
}
await this.fulltextAdapter.remove(
ctx,
this.workspace.uuid,
toRemove.map((it) => it._id)
)
const ids = toRemove.map((it) => it._id)
if (this.listener?.onClean !== undefined) {
await this.listener.onClean(ids)
}
await this.fulltextAdapter.remove(ctx, this.workspace.uuid, ids)
}
} catch (err: any) {
Analytics.handleError(err)

View File

@ -0,0 +1,66 @@
import { generateId, MeasureMetricsContext } from '@hcengineering/core'
import { createPlatformQueue, parseQueueConfig } from '..'
jest.setTimeout(120000)
const testCtx = new MeasureMetricsContext('test', {})
describe('queue', () => {
  it('check-queue', async () => {
    const genId = generateId()
    // Unique postfix and group id per run so parallel runs do not share topics.
    const queue = createPlatformQueue(parseQueueConfig('localhost:19093;-queue_testing-' + genId, 'test-' + genId, ''))
    const docsCount = 100
    try {
      let msgCount = 0
      // Resolves once the consumer has observed every produced message,
      // rejects if they do not all arrive within the deadline.
      const p1 = new Promise<void>((resolve, reject) => {
        const to = setTimeout(() => {
          reject(new Error(`Timeout waiting for messages:${msgCount}`))
        }, 100000)
        queue.createConsumer<string>(testCtx, 'qtest', genId, async (msg) => {
          msgCount += msg.length
          console.log('msgCount', msgCount)
          if (msgCount === docsCount) {
            clearTimeout(to)
            resolve()
          }
        })
      })
      const producer = queue.createProducer<string>(testCtx, 'qtest')
      for (let i = 0; i < docsCount; i++) {
        await producer.send(genId, ['msg' + i])
      }
      // Do not swallow failures: a timeout or consumer error must fail the test.
      await p1
    } finally {
      await queue.shutdown()
      // Clean up the topic this test actually used ('qtest', not 'test').
      await queue.deleteTopics(['qtest'])
    }
  })

  it('check-processing-errors', async () => {
    const genId = generateId()
    const queue = createPlatformQueue(parseQueueConfig('localhost:19093;-queue_testing-' + genId, 'test-' + genId, ''))
    try {
      let counter = 2
      // The consumer throws on the first delivery; the queue must redeliver
      // until processing succeeds, at which point the promise resolves.
      const p = new Promise<void>((resolve) => {
        queue.createConsumer<string>(testCtx, 'test', genId, async (msg) => {
          counter--
          if (counter > 0) {
            throw new Error('Processing Error')
          }
          resolve()
        })
      })
      const producer = queue.createProducer<string>(testCtx, 'test')
      await producer.send(genId, ['msg'])
      await p
    } finally {
      await queue.shutdown()
      await queue.deleteTopics(['test'])
    }
  })
})

View File

@ -48,7 +48,7 @@ export function parseQueueConfig (config: string, serviceId: string, region: str
}
}
function getKafkaTopicId (topic: QueueTopic, config: QueueConfig): string {
function getKafkaTopicId (topic: QueueTopic | string, config: QueueConfig): string {
if (config.region !== '') {
return `${config.region}.${topic}${config.postfix ?? ''}`
}
@ -56,6 +56,8 @@ function getKafkaTopicId (topic: QueueTopic, config: QueueConfig): string {
}
class PlatformQueueImpl implements PlatformQueue {
consumers: ConsumerHandle[] = []
producers: PlatformQueueProducerImpl[] = []
constructor (
private readonly kafka: Kafka,
readonly config: QueueConfig
@ -65,33 +67,84 @@ class PlatformQueueImpl implements PlatformQueue {
return this.config.clientId
}
createProducer<T>(ctx: MeasureContext, topic: QueueTopic): PlatformQueueProducer<T> {
return new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this)
async shutdown (): Promise<void> {
for (const p of this.producers) {
try {
await p.close()
} catch (err: any) {
console.error('failed to close producer', err)
}
}
for (const c of this.consumers) {
try {
await c.close()
} catch (err: any) {
console.error('failed to close consumer', err)
}
}
}
createProducer<T>(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer<T> {
const result = new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this)
this.producers.push(result)
return result
}
createConsumer<T>(
ctx: MeasureContext,
topic: QueueTopic,
topic: QueueTopic | string,
groupId: string,
onMessage: (msg: ConsumerMessage<T>[], queue: ConsumerControl) => Promise<void>,
options?: {
fromBegining?: boolean
}
): ConsumerHandle {
return new PlatformQueueConsumerImpl(ctx, this.kafka, this.config, topic, groupId, onMessage, options)
const result = new PlatformQueueConsumerImpl(ctx, this.kafka, this.config, topic, groupId, onMessage, options)
this.consumers.push(result)
return result
}
async checkCreateTopic (topic: QueueTopic, topics: Set<string>, numPartitions?: number): Promise<void> {
const kTopic = getKafkaTopicId(topic, this.config)
if (!topics.has(kTopic)) {
try {
await this.kafka.admin().createTopics({ topics: [{ topic: kTopic, numPartitions: numPartitions ?? 1 }] })
} catch (err: any) {
console.error('Failed to create topic', kTopic, err)
}
}
}
async createTopics (tx: number): Promise<void> {
const topics = new Set(await this.kafka.admin({}).listTopics())
if (!topics.has(QueueTopic.Tx)) {
await this.kafka.admin().createTopics({
topics: [
{
topic: getKafkaTopicId(QueueTopic.Tx, this.config),
numPartitions: tx
}
]
})
await this.checkCreateTopic(QueueTopic.Tx, topics, tx)
await this.checkCreateTopic(QueueTopic.Fulltext, topics, 1)
await this.checkCreateTopic(QueueTopic.Workspace, topics, 1)
await this.checkCreateTopic(QueueTopic.Users, topics, 1)
}
async checkDeleteTopic (topic: QueueTopic | string, topics: Set<string>): Promise<void> {
const kTopic = getKafkaTopicId(topic, this.config)
if (topics.has(kTopic)) {
try {
await this.kafka.admin().deleteTopics({ topics: [kTopic] })
} catch (err: any) {
console.error('Failed to delete topic', kTopic, err)
}
}
}
async deleteTopics (topics?: (QueueTopic | string)[]): Promise<void> {
const existing = new Set(await this.kafka.admin({}).listTopics())
if (topics !== undefined) {
for (const t of topics) {
await this.checkDeleteTopic(t, existing)
}
} else {
await this.checkDeleteTopic(QueueTopic.Tx, existing)
await this.checkDeleteTopic(QueueTopic.Fulltext, existing)
await this.checkDeleteTopic(QueueTopic.Workspace, existing)
await this.checkDeleteTopic(QueueTopic.Users, existing)
}
}
}
@ -144,7 +197,7 @@ class PlatformQueueConsumerImpl implements ConsumerHandle {
readonly ctx: MeasureContext,
readonly kafka: Kafka,
readonly config: QueueConfig,
private readonly topic: QueueTopic,
private readonly topic: QueueTopic | string,
groupId: string,
private readonly onMessage: (msg: ConsumerMessage<any>[], queue: ConsumerControl) => Promise<void>,
private readonly options?: {
@ -164,18 +217,22 @@ class PlatformQueueConsumerImpl implements ConsumerHandle {
await this.cc.run({
eachMessage: async ({ topic, message, pause, heartbeat }) => {
await this.onMessage(
[
{
id: message.key?.toString() ?? '',
value: [JSON.parse(message.value?.toString() ?? '{}')]
const msgKey = message.key?.toString() ?? ''
const msgData = JSON.parse(message.value?.toString() ?? '{}')
let to = 1
while (true) {
try {
await this.onMessage([{ id: msgKey, value: [msgData] }], { heartbeat, pause })
break
} catch (err: any) {
this.ctx.error('failed to process message', { err, msgKey, msgData })
await heartbeat()
await new Promise((resolve) => setTimeout(resolve, to * 1000))
if (to < 10) {
to++
}
],
{
heartbeat,
pause
}
)
}
}
// , // TODO: Finish testing
// eachBatch: async ({ batch, pause, heartbeat, resolveOffset }) => {

View File

@ -159,6 +159,7 @@ export function serveWorkspaceAccount (
const close = (): void => {
canceled = true
void wsProducer.close()
void queue.shutdown()
onClose?.()
}

View File

@ -2,7 +2,7 @@
docker compose -p sanity kill
docker compose -p sanity down --volumes
docker compose -f docker-compose.yaml -p sanity up elastic mongodb cockroach -d --force-recreate --renew-anon-volumes
docker compose -f docker-compose.yaml -p sanity up elastic mongodb cockroach redpanda redpanda_console -d --force-recreate --renew-anon-volumes
docker_exit=$?
if [ ${docker_exit} -eq 0 ]; then
echo "Container started successfully"