UBERF-9693: Allow to reindex from migration (#8345)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-03-25 23:09:40 +07:00 committed by GitHub
parent d40e6fe2cf
commit 163f6785c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 124 additions and 73 deletions

View File

@ -1 +1 @@
"0.7.42"
"0.7.48"

View File

@ -20,8 +20,8 @@
"docker:abuild": "docker build -t hardcoreeng/tool . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool",
"run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret FULLTEXT_URL=http://localhost:4700 ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) QUEUE_CONFIG='localhost:9092' node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js",
"run-local-cr": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret FULLTEXT_URL=http://localhost:4702 ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3332 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=postgresql://root@huly.local:26257/defaultdb?sslmode=disable DB_URL=postgresql://root@huly.local:26257/defaultdb?sslmode=disable TELEGRAM_DATABASE=telegram-service REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 $TOOL_OPT ./bundle/bundle.js",
"run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret FULLTEXT_URL=http://localhost:4700 ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) QUEUE_CONFIG='localhost:19092' node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js",
"run-local-cr": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret FULLTEXT_URL=http://localhost:4702 ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3332 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=postgresql://root@huly.local:26257/defaultdb?sslmode=disable DB_URL=postgresql://root@huly.local:26257/defaultdb?sslmode=disable TELEGRAM_DATABASE=telegram-service REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) QUEUE_CONFIG='localhost:19092' node --expose-gc --max-old-space-size=18000 $TOOL_OPT ./bundle/bundle.js",
"run-local-brk": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost ACCOUNT_DB_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --inspect-brk --enable-source-maps --max-old-space-size=18000 ./bundle/bundle.js",
"run": "rush bundle --to @hcengineering/tool >/dev/null && cross-env node --max-old-space-size=8000 ./bundle/bundle.js",
"upgrade": "rushx run-local upgrade-workspace -- $1",

View File

@ -350,6 +350,11 @@ export function devTool (
const coreWsInfo = flattenStatus(wsInfo)
const accountClient = getAccountClient(getToolToken())
const wsProducer = getPlatformQueue('tool', cmd.region).createProducer<QueueWorkspaceMessage>(
toolCtx,
QueueTopic.Workspace
)
await createWorkspace(
measureCtx,
version,
@ -358,6 +363,7 @@ export function devTool (
txes,
migrateOperations,
accountClient,
wsProducer,
undefined,
true
)
@ -368,10 +374,6 @@ export function devTool (
progress: 100
})
const wsProducer = getPlatformQueue('tool', cmd.region).createProducer<QueueWorkspaceMessage>(
toolCtx,
QueueTopic.Workspace
)
await wsProducer.send(res.workspaceUuid, [workspaceEvents.created()])
await wsProducer.close()
@ -431,7 +433,10 @@ export function devTool (
const coreWsInfo = flattenStatus(wsInfo)
const measureCtx = new MeasureMetricsContext('upgrade-workspace', {})
const accountClient = getAccountClient(getToolToken(wsInfo.uuid))
const wsProducer = getPlatformQueue('tool', info.region).createProducer<QueueWorkspaceMessage>(
toolCtx,
QueueTopic.Workspace
)
await upgradeWorkspace(
measureCtx,
version,
@ -440,6 +445,7 @@ export function devTool (
accountClient,
coreWsInfo,
consoleModelLogger,
wsProducer,
async () => {},
cmd.force,
cmd.indexes,
@ -454,10 +460,7 @@ export function devTool (
})
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60))
const wsProducer = getPlatformQueue('tool', info.region).createProducer<QueueWorkspaceMessage>(
toolCtx,
QueueTopic.Workspace
)
await wsProducer.send(info.uuid, [workspaceEvents.upgraded()])
await wsProducer.close()
console.log('upgrade-workspace done')

View File

@ -519,9 +519,7 @@ export function createModel (builder: Builder): void {
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: card.class.Card,
fullTextSummary: true,
forceIndex: true,
childProcessingAllowed: true,
propagate: []
forceIndex: true
})
builder.createDoc(

View File

@ -616,8 +616,7 @@ export function createModel (builder: Builder): void {
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: documents.class.Document,
fullTextSummary: true,
childProcessingAllowed: true
fullTextSummary: true
})
builder.mixin(documents.class.Document, core.class.Class, view.mixin.ClassFilters, {
@ -1045,9 +1044,7 @@ export function defineSearch (builder: Builder): void {
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: documents.class.DocumentMeta,
fullTextSummary: true,
childProcessingAllowed: true,
propagate: []
fullTextSummary: true
})
builder.createDoc(

View File

@ -260,8 +260,7 @@ export function createModel (builder: Builder): void {
})
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: core.class.Space,
childProcessingAllowed: false
toClass: core.class.Space
})
definePermissions(builder)

View File

@ -654,8 +654,7 @@ export function createModel (builder: Builder): void {
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: love.class.MeetingMinutes,
fullTextSummary: true,
forceIndex: true,
propagate: []
forceIndex: true
})
builder.createDoc(

View File

@ -168,6 +168,12 @@ export const loveOperation: MigrateOperation = {
{ status: MeetingStatus.Finished }
)
}
},
{
state: 'meeting-minutes-reindex-v1',
func: async (client) => {
await client.reindex(DOMAIN_MEETING_MINUTES, [love.class.MeetingMinutes])
}
}
])
},

View File

@ -16,7 +16,6 @@
import activity from '@hcengineering/activity'
import { AccountRole, SortingOrder, type Lookup, type Ref } from '@hcengineering/core'
import { type Builder } from '@hcengineering/model'
import attachment from '@hcengineering/model-attachment'
import calendar from '@hcengineering/model-calendar'
import chunter from '@hcengineering/model-chunter'
import contact from '@hcengineering/model-contact'
@ -1456,31 +1455,19 @@ export function createModel (builder: Builder): void {
// Allow to use fuzzy search for mixins
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: recruit.class.Vacancy,
fullTextSummary: true,
childProcessingAllowed: true,
propagate: []
fullTextSummary: true
})
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: recruit.mixin.Candidate,
fullTextSummary: true,
propagate: [recruit.class.Applicant],
childProcessingAllowed: true,
propagateClasses: [
tags.class.TagReference,
chunter.class.ChatMessage,
attachment.class.Attachment,
contact.class.Channel
]
fullTextSummary: true
})
// Allow to use fuzzy search for mixins
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: recruit.class.Applicant,
fullTextSummary: true,
forceIndex: true,
childProcessingAllowed: true,
propagate: []
forceIndex: true
})
builder.mixin(recruit.mixin.Candidate, core.class.Class, view.mixin.ObjectEditorFooter, {

View File

@ -181,8 +181,7 @@ export function createModel (builder: Builder): void {
)
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
toClass: telegram.class.Message,
childProcessingAllowed: true
toClass: telegram.class.Message
})
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {

View File

@ -646,13 +646,6 @@ export interface FullTextSearchContext extends Doc {
toClass: Ref<Class<Doc>>
fullTextSummary?: boolean
forceIndex?: boolean
// If defined, will propagate changes to child's with defined set of classes
propagate?: Ref<Class<Doc>>[]
// If defined, will propagate all document from child's based on class
propagateClasses?: Ref<Class<Doc>>[]
childProcessingAllowed?: boolean
}
/**

View File

@ -645,9 +645,7 @@ export function getFullTextContext (
ctx = {
toClass: objectClass,
fullTextSummary: false,
forceIndex: false,
propagate: [],
childProcessingAllowed: true
forceIndex: false
}
hierarchy.setClassifierProp(objectClass, ctxKey, ctx)
return ctx

View File

@ -116,6 +116,8 @@ export interface MigrationClient {
accountClient: AccountClient
wsIds: WorkspaceIds
reindex: (domain: Domain, classes: Ref<Class<Doc>>[]) => Promise<void>
}
/**

View File

@ -554,8 +554,9 @@ export async function startIndexer (
})
const close = (): void => {
void txConsumer.shutdown()
void workspaceConsumer.shutdown()
void txConsumer.close()
void workspaceConsumer.close()
void workspaceProducer.close()
clearInterval(shutdownInterval)
server.close()
}

View File

@ -12,6 +12,10 @@ class DummyQueueProducer<T> implements PlatformQueueProducer<T> {
async close (): Promise<void> {
await Promise.resolve()
}
getQueue (): PlatformQueue {
return new DummyQueue()
}
}
/**
@ -43,7 +47,7 @@ export class DummyQueue implements PlatformQueue {
}
): ConsumerHandle {
return {
shutdown: async (): Promise<void> => {
close: async (): Promise<void> => {
await Promise.resolve()
},
isConnected: (): boolean => {

View File

@ -15,7 +15,7 @@ export enum QueueTopic {
}
export interface ConsumerHandle {
shutdown: () => Promise<void>
close: () => Promise<void>
isConnected: () => boolean
}
@ -56,4 +56,6 @@ export interface PlatformQueue {
export interface PlatformQueueProducer<T> {
send: (id: WorkspaceUuid | string, msgs: T[]) => Promise<void>
close: () => Promise<void>
getQueue: () => PlatformQueue
}

View File

@ -66,7 +66,7 @@ class PlatformQueueImpl implements PlatformQueue {
}
createProducer<T>(ctx: MeasureContext, topic: QueueTopic): PlatformQueueProducer<T> {
return new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config))
return new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this)
}
createConsumer<T>(
@ -102,7 +102,8 @@ class PlatformQueueProducerImpl implements PlatformQueueProducer<any> {
constructor (
readonly ctx: MeasureContext,
kafka: Kafka,
private readonly topic: string
private readonly topic: string,
private readonly queue: PlatformQueue
) {
this.txProducer = kafka.producer({
allowAutoTopicCreation: true,
@ -111,6 +112,10 @@ class PlatformQueueProducerImpl implements PlatformQueueProducer<any> {
this.connected = this.ctx.with('connect-broker', {}, () => this.txProducer.connect())
}
getQueue (): PlatformQueue {
return this.queue
}
async send (id: WorkspaceUuid | string, msgs: any[]): Promise<void> {
if (this.connected !== undefined) {
await this.connected
@ -231,7 +236,7 @@ class PlatformQueueConsumerImpl implements ConsumerHandle {
return this.connected
}
shutdown (): Promise<void> {
close (): Promise<void> {
return this.cc.disconnect()
}
}

View File

@ -53,4 +53,8 @@ export class QueueMiddleware extends BaseMiddleware {
this.txProducer.send(this.context.workspace.uuid, ctx.contextData.broadcast.txes)
])
}
async close (): Promise<void> {
await this.txProducer.close()
}
}

View File

@ -986,6 +986,8 @@ export class TSessionManager implements SessionManager {
for (const w of this.workspaces) {
await this.closeAll(w[0], w[1], 1, 'shutdown')
}
await this.workspaceProducer.close()
await this.usersProducer.close()
}
private async performWorkspaceCloseCheck (workspace: Workspace, wsUuid: WorkspaceUuid): Promise<void> {

View File

@ -40,7 +40,14 @@ import core, {
type WithLookup
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate, type MigrateMode } from '@hcengineering/model'
import { DomainIndexHelperImpl, Pipeline, StorageAdapter, type DbAdapter } from '@hcengineering/server-core'
import {
DomainIndexHelperImpl,
Pipeline,
StorageAdapter,
type DbAdapter,
type PlatformQueueProducer,
type QueueWorkspaceMessage
} from '@hcengineering/server-core'
import { InitScript, WorkspaceInitializer } from './initializer'
import toolPlugin from './plugin'
import { MigrateClientImpl } from './upgrade'
@ -246,6 +253,7 @@ export async function upgradeModel (
connection: Client,
storageAdapter: StorageAdapter,
accountClient: AccountClient,
queue: PlatformQueueProducer<QueueWorkspaceMessage>,
migrateOperations: [string, MigrateOperation][],
logger: ModelLogger = consoleModelLogger,
progress: (value: number) => Promise<void>,
@ -267,7 +275,8 @@ export async function upgradeModel (
logger,
storageAdapter,
accountClient,
wsIds
wsIds,
queue
)
await progress(0)
@ -300,7 +309,8 @@ export async function upgradeModel (
logger,
storageAdapter,
accountClient,
wsIds
wsIds,
queue
)
const upgradeIndexes = async (): Promise<void> => {
@ -387,12 +397,22 @@ async function prepareMigrationClient (
logger: ModelLogger,
storageAdapter: StorageAdapter,
accountClient: AccountClient,
wsIds: WorkspaceIds
wsIds: WorkspaceIds,
queue: PlatformQueueProducer<QueueWorkspaceMessage>
): Promise<{
migrateClient: MigrateClientImpl
migrateState: Map<string, Set<string>>
}> {
const migrateClient = new MigrateClientImpl(pipeline, hierarchy, model, logger, storageAdapter, accountClient, wsIds)
const migrateClient = new MigrateClientImpl(
pipeline,
hierarchy,
model,
logger,
storageAdapter,
accountClient,
wsIds,
queue
)
const states = await migrateClient.find<MigrationState>(DOMAIN_MIGRATION, { _class: core.class.MigrationState })
const sts = Array.from(groupByArray(states, (it) => it.plugin).entries())

View File

@ -9,10 +9,17 @@ import {
MeasureMetricsContext,
ModelDb,
Ref,
WorkspaceIds
WorkspaceIds,
type Class
} from '@hcengineering/core'
import { MigrateUpdate, MigrationClient, MigrationIterator, ModelLogger } from '@hcengineering/model'
import { Pipeline, StorageAdapter } from '@hcengineering/server-core'
import {
Pipeline,
StorageAdapter,
workspaceEvents,
type PlatformQueueProducer,
type QueueWorkspaceMessage
} from '@hcengineering/server-core'
import { AccountClient } from '@hcengineering/account-client'
/**
@ -28,7 +35,8 @@ export class MigrateClientImpl implements MigrationClient {
readonly logger: ModelLogger,
readonly storageAdapter: StorageAdapter,
readonly accountClient: AccountClient,
readonly wsIds: WorkspaceIds
readonly wsIds: WorkspaceIds,
readonly queue: PlatformQueueProducer<QueueWorkspaceMessage>
) {
if (this.pipeline.context.lowLevelStorage === undefined) {
throw new Error('lowLevelStorage is not defined')
@ -112,4 +120,8 @@ export class MigrateClientImpl implements MigrationClient {
async deleteMany<T extends Doc>(domain: Domain, query: DocumentQuery<T>): Promise<void> {
await this.lowLevel.rawDeleteMany(domain, query)
}
async reindex (domain: Domain, classes: Ref<Class<Doc>>[]): Promise<void> {
await this.queue.send(this.wsIds.uuid, [workspaceEvents.reindex(domain, classes)])
}
}

View File

@ -123,8 +123,9 @@ export function serveWorkspaceAccount (
let canceled = false
const wsProducer = queue.createProducer<QueueWorkspaceMessage>(measureCtx, QueueTopic.Workspace)
const worker = new WorkspaceWorker(
queue.createProducer<QueueWorkspaceMessage>(measureCtx, QueueTopic.Workspace),
wsProducer,
version,
txes,
migrateOperations,
@ -157,6 +158,7 @@ export function serveWorkspaceAccount (
const close = (): void => {
canceled = true
void wsProducer.close()
onClose?.()
}

View File

@ -262,6 +262,7 @@ export class WorkspaceWorker {
this.txes,
this.migrationOperation,
accountClient,
this.workspaceQueue,
handleWsEventWithRetry
)
} else {
@ -365,6 +366,7 @@ export class WorkspaceWorker {
accountClient,
ws,
logger,
this.workspaceQueue,
handleWsEventWithRetry,
opt.force
)

View File

@ -1,10 +1,11 @@
import { type AccountClient } from '@hcengineering/account-client'
import core, {
Hierarchy,
ModelDb,
systemAccount,
systemAccountUuid,
TxOperations,
versionToString,
systemAccount,
type Branding,
type Client,
type Data,
@ -16,12 +17,18 @@ import core, {
} from '@hcengineering/core'
import { consoleModelLogger, type MigrateMode, type MigrateOperation, type ModelLogger } from '@hcengineering/model'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { SessionDataImpl, wrapPipeline, type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
import {
SessionDataImpl,
wrapPipeline,
type Pipeline,
type PlatformQueueProducer,
type QueueWorkspaceMessage,
type StorageAdapter
} from '@hcengineering/server-core'
import { getServerPipeline, getTxAdapterFactory, sharedPipelineContextVars } from '@hcengineering/server-pipeline'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { generateToken } from '@hcengineering/server-token'
import { initializeWorkspace, initModel, prepareTools, updateModel, upgradeModel } from '@hcengineering/server-tool'
import { type AccountClient } from '@hcengineering/account-client'
/**
* @public
@ -34,6 +41,7 @@ export async function createWorkspace (
txes: Tx[],
migrationOperation: [string, MigrateOperation][],
accountClient: AccountClient,
queue: PlatformQueueProducer<QueueWorkspaceMessage>,
handleWsEvent?: (
event: 'ping' | 'create-started' | 'progress' | 'create-done',
version: Data<Version>,
@ -74,7 +82,7 @@ export async function createWorkspace (
const storageConfig = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig)
const pipeline = await getServerPipeline(ctx, txes, dbUrl, wsIds, storageAdapter)
const pipeline = await getServerPipeline(ctx, txes, dbUrl, wsIds, storageAdapter, { queue: queue.getQueue() })
try {
const txFactory = getTxAdapterFactory(ctx, dbUrl, wsIds, null, {
@ -146,6 +154,7 @@ export async function createWorkspace (
client,
storageAdapter,
accountClient,
queue,
ctxModellogger,
async (event, version, value) => {
ctx.info('upgrade workspace', { event, value })
@ -181,6 +190,7 @@ export async function upgradeWorkspace (
accountClient: AccountClient,
ws: WorkspaceInfoWithStatus,
logger: ModelLogger = consoleModelLogger,
queue: PlatformQueueProducer<QueueWorkspaceMessage>,
handleWsEvent?: (
event: 'upgrade-started' | 'progress' | 'upgrade-done' | 'ping',
version: Data<Version>,
@ -206,7 +216,10 @@ export async function upgradeWorkspace (
url: ws.url ?? '',
dataId: ws.dataId
},
storageAdapter
storageAdapter,
{
queue: queue.getQueue()
}
)
if (pipeline === undefined || storageAdapter === undefined) {
return
@ -228,6 +241,7 @@ export async function upgradeWorkspace (
wrapPipeline(ctx, pipeline, wsUrl),
storageAdapter,
accountClient,
queue,
logger,
handleWsEvent,
forceUpdate,
@ -254,6 +268,7 @@ export async function upgradeWorkspaceWith (
connection: Client,
storageAdapter: StorageAdapter,
accountClient: AccountClient,
queue: PlatformQueueProducer<QueueWorkspaceMessage>,
logger: ModelLogger = consoleModelLogger,
handleWsEvent?: (
event: 'upgrade-started' | 'progress' | 'upgrade-done' | 'ping',
@ -322,6 +337,7 @@ export async function upgradeWorkspaceWith (
connection,
storageAdapter,
accountClient,
queue,
migrationOperation,
logger,
async (value) => {