Fix workspace creation with index enabled (#7013)

This commit is contained in:
Andrey Sobolev 2024-10-22 21:42:40 +07:00 committed by GitHub
parent 5e234a0646
commit afcc25beca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 103 additions and 128 deletions

6
.vscode/launch.json vendored
View File

@ -118,9 +118,9 @@
"args": ["src/__start.ts"],
"env": {
"MONGO_URL": "mongodb://localhost:27017",
// "DB_URL": "mongodb://localhost:27017",
"REGION": "pg",
"DB_URL": "postgresql://postgres:example@localhost:5432",
"DB_URL": "mongodb://localhost:27017",
"REGION": "",
// "DB_URL": "postgresql://postgres:example@localhost:5432",
"SERVER_SECRET": "secret",
"TRANSACTOR_URL": "ws://localhost:3333",
"ACCOUNTS_URL": "http://localhost:3000",

View File

@ -115,10 +115,9 @@ services:
- MODEL_ENABLED=*
- ACCOUNTS_URL=http://host.docker.internal:3000
- BRANDING_PATH=/var/cfg/branding.json
- NOTIFY_INBOX_ONLY=true
# - PARALLEL=2
# - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
# - INIT_WORKSPACE=onboarding
- INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
- INIT_WORKSPACE=test
restart: unless-stopped
workspacepg:
image: hardcoreeng/workspace
@ -142,7 +141,7 @@ services:
- ACCOUNTS_URL=http://host.docker.internal:3000
- BRANDING_PATH=/var/cfg/branding.json
# - PARALLEL=2
# - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
- INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
# - INIT_WORKSPACE=onboarding
restart: unless-stopped
collaborator:

View File

@ -72,8 +72,6 @@ async function processFixJsonMarkupFor (
db: Db,
storageAdapter: StorageAdapter
): Promise<void> {
console.log('processing', domain, _class)
const collection = db.collection<Doc>(domain)
const docs = await collection.find({ _class }).toArray()
for (const doc of docs) {
@ -119,8 +117,6 @@ async function processFixJsonMarkupFor (
}
}
}
console.log('...processed', docs.length)
}
export async function migrateMarkup (
@ -151,12 +147,9 @@ export async function migrateMarkup (
const collection = workspaceDb.collection(domain)
const filter = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
const count = await collection.countDocuments(filter)
const iterator = collection.find<Doc>(filter)
try {
console.log('processing', _class, '->', count)
await processMigrateMarkupFor(ctx, hierarchy, storageAdapter, workspaceId, attributes, iterator, concurrency)
} finally {
await iterator.close()

View File

@ -67,7 +67,6 @@ async function processMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<DocUpdateMessage>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
@ -104,9 +103,6 @@ async function processMigrateMarkupFor (
if (ops.length > 0) {
await client.bulk(DOMAIN_ACTIVITY, ops)
}
processed += docs.length
console.log('...processed', processed)
}
}

View File

@ -53,7 +53,6 @@ async function processMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<Doc>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
@ -88,9 +87,6 @@ async function processMigrateMarkupFor (
if (operations.length > 0) {
await client.bulk(domain, operations)
}
processed += docs.length
console.log('...processed', processed)
}
}
@ -122,7 +118,6 @@ async function processFixMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<Doc>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
@ -164,9 +159,6 @@ async function processFixMigrateMarkupFor (
if (operations.length > 0) {
await client.bulk(domain, operations)
}
processed += docs.length
console.log('...processed', processed)
}
}

View File

@ -71,8 +71,11 @@ export class FullTextIndex implements WithFind {
}
async close (): Promise<void> {
this.indexer.triggerIndexing()
if (!this.upgrade) {
await this.indexer.cancel()
} else {
await this.indexer.processUpload(this.indexer.metrics)
}
}

View File

@ -351,7 +351,9 @@ class TSessionManager implements SessionManager {
version: this.modelVersion,
workspaceVersion: versionToString(workspaceInfo.version),
workspace: workspaceInfo.workspaceId,
workspaceUrl: workspaceInfo.workspaceUrl
workspaceUrl: workspaceInfo.workspaceUrl,
email: token.email,
extra: JSON.stringify(token.extra ?? {})
})
// Version mismatch, return upgrading.
return { upgrade: true, upgradeInfo: workspaceInfo.upgrade }

View File

@ -14,9 +14,7 @@
//
import core, {
BackupClient,
Branding,
Client as CoreClient,
coreId,
DOMAIN_BENCHMARK,
DOMAIN_MIGRATION,
@ -34,13 +32,13 @@ import core, {
TxOperations,
WorkspaceId,
WorkspaceIdWithUrl,
type Client,
type Doc,
type Ref,
type WithLookup
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model'
import { DomainIndexHelperImpl, Pipeline, StorageAdapter, type DbAdapter } from '@hcengineering/server-core'
import { connect } from './connect'
import { InitScript, WorkspaceInitializer } from './initializer'
import toolPlugin from './plugin'
import { MigrateClientImpl } from './upgrade'
@ -165,23 +163,15 @@ export async function updateModel (
try {
let i = 0
for (const op of migrateOperations) {
logger.log('Migrate', { name: op[0] })
const st = Date.now()
await op[1].upgrade(migrateState, async () => connection as any, logger)
const tdelta = Date.now() - st
if (tdelta > 0) {
logger.log('Create', { name: op[0], time: tdelta })
}
i++
await progress((((100 / migrateOperations.length) * i) / 100) * 30)
await progress((((100 / migrateOperations.length) * i) / 100) * 100)
}
// Create update indexes
await createUpdateIndexes(
ctx,
connection.getHierarchy(),
connection.getModel(),
pipeline,
async (value) => {
await progress(30 + (Math.min(value, 100) / 100) * 70)
},
workspaceId
)
await progress(100)
} catch (e: any) {
logger.error('error', { error: e })
@ -203,6 +193,7 @@ export async function initializeWorkspace (
): Promise<void> {
const initWS = branding?.initWorkspace ?? getMetadata(toolPlugin.metadata.InitWorkspace)
const scriptUrl = getMetadata(toolPlugin.metadata.InitScriptURL)
ctx.info('Init script details', { scriptUrl, initWS })
if (initWS === undefined || scriptUrl === undefined) return
try {
// `https://raw.githubusercontent.com/hcengineering/init/main/script.yaml`
@ -237,11 +228,12 @@ export async function upgradeModel (
workspaceId: WorkspaceIdWithUrl,
txes: Tx[],
pipeline: Pipeline,
connection: Client,
storageAdapter: StorageAdapter,
migrateOperations: [string, MigrateOperation][],
logger: ModelLogger = consoleModelLogger,
progress: (value: number) => Promise<void>,
forceIndexes: boolean = false
updateIndexes: 'perform' | 'skip' | 'disable' = 'skip'
): Promise<Tx[]> {
if (txes.some((tx) => tx.objectSpace !== core.space.Model)) {
throw Error('Model txes must target only core.space.Model')
@ -308,87 +300,69 @@ export async function upgradeModel (
workspaceId
)
}
if (forceIndexes) {
if (updateIndexes === 'perform') {
await upgradeIndexes()
}
await ctx.with('migrate', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
try {
const t = Date.now()
await ctx.with(op[0], {}, async () => {
await op[1].migrate(migrateClient, logger)
})
const tdelta = Date.now() - t
if (tdelta > 0) {
logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t })
}
} catch (err: any) {
logger.error(`error during migrate: ${op[0]} ${err.message}`, err)
throw err
}
logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t })
await progress(20 + ((100 / migrateOperations.length) * i * 20) / 100)
i++
}
await tryMigrate(migrateClient, coreId, [
{
state: 'indexes-v5',
func: upgradeIndexes
}
])
if (updateIndexes === 'skip') {
await tryMigrate(migrateClient, coreId, [
{
state: 'indexes-v5',
func: upgradeIndexes
}
])
}
})
logger.log('Apply upgrade operations', { workspaceId: workspaceId.name })
let connection: (CoreClient & BackupClient) | undefined
const getUpgradeClient = async (): Promise<CoreClient & BackupClient> =>
await ctx.with('connect-platform', {}, async (ctx) => {
if (connection !== undefined) {
return connection
}
connection = (await connect(
transactorUrl,
workspaceId,
undefined,
{
mode: 'backup',
model: 'upgrade',
admin: 'true'
},
model
)) as CoreClient & BackupClient
return connection
})
try {
await ctx.with('upgrade', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
await ctx.with(op[0], {}, () => op[1].upgrade(migrateState, getUpgradeClient, logger))
logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name })
await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100)
i++
}
})
if (connection === undefined) {
// We need to send reboot for workspace
ctx.info('send force close', { workspace: workspaceId.name, transactorUrl })
const serverEndpoint = transactorUrl.replaceAll('wss://', 'https://').replace('ws://', 'http://')
const token = generateToken(systemAccountEmail, workspaceId, { admin: 'true' })
try {
await fetch(
serverEndpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${toWorkspaceString(workspaceId)}`,
{
method: 'PUT'
}
)
} catch (err: any) {
// Ignore error if transactor is not yet ready
await ctx.with('upgrade', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
await ctx.with(op[0], {}, () => op[1].upgrade(migrateState, async () => connection, logger))
const tdelta = Date.now() - t
if (tdelta > 0) {
logger.log('upgrade:', { operation: op[0], time: tdelta, workspaceId: workspaceId.name })
}
await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100)
i++
}
} finally {
await connection?.sendForceClose()
await connection?.close()
})
// We need to send reboot for workspace
ctx.info('send force close', { workspace: workspaceId.name, transactorUrl })
const serverEndpoint = transactorUrl.replaceAll('wss://', 'https://').replace('ws://', 'http://')
const token = generateToken(systemAccountEmail, workspaceId, { admin: 'true' })
try {
await fetch(
serverEndpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${toWorkspaceString(workspaceId)}`,
{
method: 'PUT'
}
)
} catch (err: any) {
// Ignore error if transactor is not yet ready
}
return model
}
@ -407,7 +381,13 @@ async function prepareMigrationClient (
const migrateClient = new MigrateClientImpl(pipeline, hierarchy, model, logger, storageAdapter, workspaceId)
const states = await migrateClient.find<MigrationState>(DOMAIN_MIGRATION, { _class: core.class.MigrationState })
const sts = Array.from(groupByArray(states, (it) => it.plugin).entries())
const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))]))
const _toSet = (vals: WithLookup<MigrationState>[]): Set<string> => {
return new Set(vals.map((q) => q.state))
}
const migrateState = new Map<string, Set<string>>(sts.map((it) => [it[0], _toSet(it[1])]))
// const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))]))
migrateClient.migrateState = migrateState
return { migrateClient, migrateState }

View File

@ -90,6 +90,8 @@ export function serveWorkspaceAccount (
setMetadata(serverNotification.metadata.InboxOnlyNotifications, true)
let canceled = false
const worker = new WorkspaceWorker(
version,
txes,
@ -100,17 +102,22 @@ export function serveWorkspaceAccount (
brandings
)
void worker.start(measureCtx, {
errorHandler: async (ws, err) => {
Analytics.handleError(err)
void worker.start(
measureCtx,
{
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout
})
() => canceled
)
const close = (): void => {
canceled = true
onClose?.()
}

View File

@ -79,7 +79,7 @@ export class WorkspaceWorker {
wakeup: () => void = () => {}
defaultWakeup: () => void = () => {}
async start (ctx: MeasureContext, opt: WorkspaceOptions): Promise<void> {
async start (ctx: MeasureContext, opt: WorkspaceOptions, isCanceled: () => boolean): Promise<void> {
this.defaultWakeup = () => {
ctx.info("I'm busy", { version: this.version, region: this.region })
}
@ -92,7 +92,7 @@ export class WorkspaceWorker {
ctx.info('Successfully connected to the account service')
while (true) {
while (!isCanceled()) {
await this.waitForAvailableThread()
const workspace = await ctx.with('get-pending-workspace', {}, async (ctx) => {

View File

@ -131,23 +131,20 @@ export async function createWorkspace (
usePassedCtx: true
})
const txAdapter = await txFactory(ctx, hierarchy, dbUrl, wsId, modelDb, storageAdapter)
await childLogger.withLog('init-workspace', {}, async (ctx) => {
await initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => {
await handleWsEvent?.('progress', version, 10 + Math.round((Math.min(value, 100) / 100) * 10))
})
await initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => {})
})
const client = new TxOperations(wrapPipeline(ctx, pipeline, wsUrl), core.account.ConfigUser)
await updateModel(ctx, wsId, migrationOperation, client, pipeline, ctxModellogger, async (value) => {
await handleWsEvent?.('progress', version, 20 + Math.round((Math.min(value, 100) / 100) * 10))
await handleWsEvent?.('progress', version, 10 + Math.round((Math.min(value, 100) / 100) * 10))
})
ctx.info('Starting init script if any')
await initializeWorkspace(ctx, branding, wsUrl, storageAdapter, client, ctxModellogger, async (value) => {
ctx.info('Init script progress', { value })
await handleWsEvent?.('progress', version, 30 + Math.round((Math.min(value, 100) / 100) * 60))
await handleWsEvent?.('progress', version, 20 + Math.round((Math.min(value, 100) / 100) * 60))
})
await upgradeWorkspaceWith(
@ -157,14 +154,15 @@ export async function createWorkspace (
migrationOperation,
workspaceInfo,
pipeline,
client,
storageAdapter,
ctxModellogger,
async (event, version, value) => {
ctx.info('Init script progress', { event, value })
await handleWsEvent?.('progress', version, 90 + Math.round((Math.min(value, 100) / 100) * 10))
await handleWsEvent?.('progress', version, 80 + Math.round((Math.min(value, 100) / 100) * 20))
},
false,
false
'disable'
)
await handleWsEvent?.('create-done', version, 100, '')
@ -216,6 +214,12 @@ export async function upgradeWorkspace (
return
}
const wsUrl: WorkspaceIdWithUrl = {
name: ws.workspace,
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''
}
await upgradeWorkspaceWith(
ctx,
version,
@ -223,11 +227,12 @@ export async function upgradeWorkspace (
migrationOperation,
ws,
pipeline,
wrapPipeline(ctx, pipeline, wsUrl),
storageAdapter,
logger,
handleWsEvent,
forceUpdate,
forceIndexes,
forceIndexes ? 'perform' : 'skip',
external
)
} finally {
@ -246,6 +251,7 @@ export async function upgradeWorkspaceWith (
migrationOperation: [string, MigrateOperation][],
ws: BaseWorkspaceInfo,
pipeline: Pipeline,
connection: Client,
storageAdapter: StorageAdapter,
logger: ModelLogger = consoleModelLogger,
handleWsEvent?: (
@ -255,7 +261,7 @@ export async function upgradeWorkspaceWith (
message?: string
) => Promise<void>,
forceUpdate: boolean = true,
forceIndexes: boolean = false,
updateIndexes: 'perform' | 'skip' | 'disable' = 'skip',
external: boolean = false
): Promise<void> {
const versionStr = versionToString(version)
@ -310,13 +316,14 @@ export async function upgradeWorkspaceWith (
wsId,
txes,
pipeline,
connection,
storageAdapter,
migrationOperation,
logger,
async (value) => {
progress = value
},
forceIndexes
updateIndexes
)
await handleWsEvent?.('upgrade-done', version, 100, '')

View File

@ -271,7 +271,6 @@ async function processMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<DocSyncInfo>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
@ -298,9 +297,6 @@ async function processMigrateMarkupFor (
if (operations.length > 0) {
await client.bulk(DOMAIN_GITHUB, operations)
}
processed += docs.length
console.log('...processed', processed)
}
}