diff --git a/dev/client-resources/src/index.ts b/dev/client-resources/src/index.ts index 129179c994..5c293decbd 100644 --- a/dev/client-resources/src/index.ts +++ b/dev/client-resources/src/index.ts @@ -29,7 +29,11 @@ export default async () => { client = await createClient(connect) for (const op of migrateOperations) { console.log('Migrate', op[0]) - await op[1].upgrade(client) + await op[1].upgrade(client, { + log (...data) { + console.log(...data) + } + }) } } // Check if we had dev hook for client. diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 2504387fc6..94690f9f0e 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -29,7 +29,8 @@ import { replacePassword, setAccountAdmin, setRole, - upgradeWorkspace + upgradeWorkspace, + WorkspaceInfoOnly } from '@hcengineering/account' import { setMetadata } from '@hcengineering/platform' import { @@ -40,17 +41,18 @@ import { restore } from '@hcengineering/server-backup' import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token' -import toolPlugin from '@hcengineering/server-tool' +import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool' import { program } from 'commander' import { Db, MongoClient } from 'mongodb' import { clearTelegramHistory } from './telegram' import { diffWorkspace } from './workspace' -import { Data, getWorkspaceId, Tx, Version } from '@hcengineering/core' +import { Data, getWorkspaceId, RateLimitter, Tx, Version } from '@hcengineering/core' import { MinioService } from '@hcengineering/minio' import { MigrateOperation } from '@hcengineering/model' import { openAIConfigDefaults } from '@hcengineering/openai' +import path from 'path' import { benchmark } from './benchmark' import { cleanArchivedSpaces, @@ -232,19 +234,47 @@ export function devTool ( program .command('upgrade') .description('upgrade') - .option('-p|--parallel', 'Parallel upgrade', false) - .action(async (cmd: { parallel: boolean }) => { + .option('-p|--parallel ', 'Parallel upgrade', '0') + .option('-l|--logs ', 'Default logs folder', './logs') + .option('-r|--retry ', 'Number of apply retries', '0') + .option('-f|--force [force]', 'Force update', false) + .action(async (cmd: { parallel: string, logs: string, retry: string, force: boolean }) => { const { mongodbUri, version, txes, migrateOperations } = prepareTools() return await withDatabase(mongodbUri, async (db) => { const workspaces = await listWorkspaces(db, productId) - if (cmd.parallel) { - await Promise.all( - workspaces.map((ws) => upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace)) - ) - } else { + const withError: string[] = [] + + async function _upgradeWorkspace (ws: WorkspaceInfoOnly): Promise { + const t = Date.now() + const logger = new FileModelLogger(path.join(cmd.logs, `${ws.workspace}.log`)) + console.log('---UPGRADING----', ws.workspace, logger.file) + try { + await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace, logger, cmd.force) + console.log('---UPGRADING-DONE----', ws.workspace, Date.now() - t) + } catch (err: any) { + withError.push(ws.workspace) + logger.log('error', JSON.stringify(err)) + console.log('---UPGRADING-FAILED----', ws.workspace, Date.now() - t) + } finally { + logger.close() + } + } + if (cmd.parallel !== '0') { + const parallel = parseInt(cmd.parallel) ?? 1 + const rateLimit = new RateLimitter(() => ({ rate: parallel })) + console.log('parallel upgrade', parallel, cmd.parallel) for (const ws of workspaces) { - console.log('---UPGRADING----', ws.workspace) - await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace) + await rateLimit.exec(() => { + return _upgradeWorkspace(ws) + }) + } + } else { + console.log('UPGRADE write logs at:', cmd.logs) + for (const ws of workspaces) { + await _upgradeWorkspace(ws) + } + if (withError.length > 0) { + console.log('Failed workspaces', withError) } } }) diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index 2d88402cfc..be85c2341f 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -376,7 +376,7 @@ export interface MigrationState extends Doc { /** * @public */ -export function versionToString (version: Version): string { +export function versionToString (version: Version | Data): string { return `${version?.major}.${version?.minor}.${version?.patch}` } diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 2e2a582c3f..d042881ca2 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -358,11 +358,13 @@ async function loadModel ( return modelResponse } - console.log( - 'find' + (modelResponse.full ? 'full model' : 'model diff'), - modelResponse.transactions.length, - Date.now() - t - ) + if (typeof window !== 'undefined') { + console.log( + 'find' + (modelResponse.full ? 'full model' : 'model diff'), + modelResponse.transactions.length, + Date.now() - t + ) + } await buildModel(modelResponse, allowedPlugins, configs, hierarchy, model) return modelResponse diff --git a/packages/model/src/migration.ts b/packages/model/src/migration.ts index 9720f3be9d..983e039f93 100644 --- a/packages/model/src/migration.ts +++ b/packages/model/src/migration.ts @@ -17,6 +17,7 @@ import core, { TxOperations, Data } from '@hcengineering/core' +import { ModelLogger } from './utils' /** * @public @@ -95,9 +96,9 @@ export type MigrationUpgradeClient = Client */ export interface MigrateOperation { // Perform low level migration - migrate: (client: MigrationClient) => Promise + migrate: (client: MigrationClient, logger: ModelLogger) => Promise // Perform high level upgrade operations. - upgrade: (client: MigrationUpgradeClient) => Promise + upgrade: (client: MigrationUpgradeClient, logger: ModelLogger) => Promise } /** diff --git a/packages/model/src/utils.ts b/packages/model/src/utils.ts index e6fb2d8c5a..9e0478fc88 100644 --- a/packages/model/src/utils.ts +++ b/packages/model/src/utils.ts @@ -51,3 +51,19 @@ export async function createOrUpdate ( await client.createDoc(_class, space, data, _id) } } + +/** + * @public + */ +export interface ModelLogger { + log: (...data: any[]) => void +} + +/** + * @public + */ +export const consoleModelLogger: ModelLogger = { + log (...data: any[]): void { + console.log(...data) + } +} diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 18d0474d4a..6ccf02f90b 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -78,7 +78,6 @@ class Connection implements ClientConnection { private readonly onUnauthorized?: () => void, readonly onConnect?: (event: ClientConnectEvent) => Promise ) { - console.log('connection created') this.interval = setInterval(() => { // eslint-disable-next-line @typescript-eslint/no-floating-promises @@ -171,7 +170,6 @@ class Connection implements ClientConnection { typeof sessionStorage !== 'undefined' ? sessionStorage.getItem('session.id.' + this.url) ?? undefined : undefined - console.log('find sessionId', this.sessionId) this.sessionId = this.sessionId ?? generateId() if (typeof sessionStorage !== 'undefined') { sessionStorage.setItem('session.id.' + this.url, this.sessionId) @@ -203,7 +201,6 @@ class Connection implements ClientConnection { v.reconnect?.() } resolve(websocket) - console.log('reconnect info', (resp as HelloResponse).reconnect) void this.onConnect?.( (resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected @@ -289,7 +286,6 @@ class Connection implements ClientConnection { websocket.onopen = () => { const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false - console.log('connection opened...', socketId, useBinary, useCompression) clearTimeout(dialTimer) const helloRequest: HelloRequest = { method: 'hello', diff --git a/plugins/client-resources/src/index.ts b/plugins/client-resources/src/index.ts index a27eaaf780..709469bc73 100644 --- a/plugins/client-resources/src/index.ts +++ b/plugins/client-resources/src/index.ts @@ -50,7 +50,7 @@ export default async () => { let client = createClient( (handler: TxHandler) => { const url = new URL(`/${token}`, endpoint) - console.log('connecting to', url.href) + const upgradeHandler: TxHandler = (tx) => { if (tx?._class === core.class.TxWorkspaceEvent) { const event = tx as TxWorkspaceEvent diff --git a/server/account/src/index.ts b/server/account/src/index.ts index bee90359ba..3c1f819858 100644 --- a/server/account/src/index.ts +++ b/server/account/src/index.ts @@ -32,9 +32,10 @@ import core, { Tx, TxOperations, Version, + versionToString, WorkspaceId } from '@hcengineering/core' -import { MigrateOperation } from '@hcengineering/model' +import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model' import platform, { getMetadata, Metadata, @@ -118,6 +119,7 @@ export interface Workspace { accounts: ObjectId[] productId: string disabled?: boolean + version?: Data } /** @@ -609,7 +611,9 @@ export async function upgradeWorkspace ( migrationOperation: [string, MigrateOperation][], productId: string, db: Db, - workspace: string + workspace: string, + logger: ModelLogger = consoleModelLogger, + forceUpdate: boolean = true ): Promise { const ws = await getWorkspace(db, productId, workspace) if (ws === null) { @@ -620,14 +624,26 @@ export async function upgradeWorkspace ( throw new PlatformError(new Status(Severity.ERROR, platform.status.ProductIdMismatch, { productId })) } } + const versionStr = versionToString(version) + + const currentVersion = await db.collection(WORKSPACE_COLLECTION).findOne({ workspace }) + console.log( + `${forceUpdate ? 'force-' : ''}upgrade from "${ + currentVersion?.version !== undefined ? versionToString(currentVersion.version) : '' + }" to "${versionStr}"` + ) + + if (currentVersion?.version !== undefined && !forceUpdate && versionStr === versionToString(currentVersion.version)) { + return versionStr + } await db.collection(WORKSPACE_COLLECTION).updateOne( { workspace }, { $set: { version } } ) - await upgradeModel(getTransactor(), getWorkspaceId(workspace, productId), txes, migrationOperation) - return `${version.major}.${version.minor}.${version.patch}` + await upgradeModel(getTransactor(), getWorkspaceId(workspace, productId), txes, migrationOperation, logger) + return versionStr } /** diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 8f631ff54d..0881b4ff82 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -29,17 +29,37 @@ import core, { WorkspaceId } from '@hcengineering/core' import { MinioService } from '@hcengineering/minio' -import { MigrateOperation } from '@hcengineering/model' +import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model' import { getWorkspaceDB } from '@hcengineering/mongo' import { Db, Document, MongoClient } from 'mongodb' import { connect } from './connect' import toolPlugin from './plugin' import { MigrateClientImpl } from './upgrade' +import fs from 'fs' +import path from 'path' + export * from './connect' export * from './plugin' export { toolPlugin as default } +export class FileModelLogger implements ModelLogger { + handle: fs.WriteStream + constructor (readonly file: string) { + fs.mkdirSync(path.dirname(this.file), { recursive: true }) + + this.handle = fs.createWriteStream(this.file, { flags: 'a' }) + } + + log (...data: any[]): void { + this.handle.write(data.map((it: any) => JSON.stringify(it)).join(' ') + '\n') + } + + close (): void { + this.handle.close() + } +} + /** * @public */ @@ -93,7 +113,8 @@ export async function initModel ( transactorUrl: string, workspaceId: WorkspaceId, rawTxes: Tx[], - migrateOperations: [string, MigrateOperation][] + migrateOperations: [string, MigrateOperation][], + logger: ModelLogger = consoleModelLogger ): Promise { const { mongodbUri, minio, txes } = prepareTools(rawTxes) if (txes.some((tx) => tx.objectSpace !== core.space.Model)) { @@ -105,33 +126,33 @@ export async function initModel ( await client.connect() const db = getWorkspaceDB(client, workspaceId) - console.log('dropping database...') + logger.log('dropping database...') await db.dropDatabase() - console.log('creating model...') + logger.log('creating model...') const model = txes const result = await db.collection(DOMAIN_TX).insertMany(model as Document[]) - console.log(`${result.insertedCount} model transactions inserted.`) + logger.log(`${result.insertedCount} model transactions inserted.`) - console.log('creating data...') + logger.log('creating data...') const connection = (await connect(transactorUrl, workspaceId, undefined, { model: 'upgrade' })) as unknown as CoreClient & BackupClient try { for (const op of migrateOperations) { - console.log('Migrage', op[0]) - await op[1].upgrade(connection) + logger.log('Migrage', op[0]) + await op[1].upgrade(connection, logger) } } catch (e) { - console.log(e) + logger.log(e) } finally { await connection.close() } // Create update indexes - await createUpdateIndexes(connection, db) + await createUpdateIndexes(connection, db, logger) - console.log('create minio bucket') + logger.log('create minio bucket') if (!(await minio.exists(workspaceId))) { await minio.make(workspaceId) } @@ -147,7 +168,8 @@ export async function upgradeModel ( transactorUrl: string, workspaceId: WorkspaceId, rawTxes: Tx[], - migrateOperations: [string, MigrateOperation][] + migrateOperations: [string, MigrateOperation][], + logger: ModelLogger = consoleModelLogger ): Promise { const { mongodbUri, txes } = prepareTools(rawTxes) @@ -160,19 +182,19 @@ export async function upgradeModel ( await client.connect() const db = getWorkspaceDB(client, workspaceId) - console.log(`${workspaceId.name}: removing model...`) + logger.log(`${workspaceId.name}: removing model...`) // we're preserving accounts (created by core.account.System). const result = await db.collection(DOMAIN_TX).deleteMany({ objectSpace: core.space.Model, modifiedBy: core.account.System, objectClass: { $nin: [contact.class.PersonAccount, 'contact:class:EmployeeAccount'] } }) - console.log(`${workspaceId.name}: ${result.deletedCount} transactions deleted.`) + logger.log(`${workspaceId.name}: ${result.deletedCount} transactions deleted.`) - console.log(`${workspaceId.name}: creating model...`) + logger.log(`${workspaceId.name}: creating model...`) const model = txes const insert = await db.collection(DOMAIN_TX).insertMany(model as Document[]) - console.log(`${workspaceId.name}: ${insert.insertedCount} model transactions inserted.`) + logger.log(`${workspaceId.name}: ${insert.insertedCount} model transactions inserted.`) const hierarchy = new Hierarchy() const modelDb = new ModelDb(hierarchy) @@ -189,20 +211,20 @@ export async function upgradeModel ( const migrateClient = new MigrateClientImpl(db, hierarchy, modelDb) for (const op of migrateOperations) { - console.log(`${workspaceId.name}: migrate:`, op[0]) - await op[1].migrate(migrateClient) + logger.log(`${workspaceId.name}: migrate:`, op[0]) + await op[1].migrate(migrateClient, logger) } - console.log(`${workspaceId.name}: Apply upgrade operations`) + logger.log(`${workspaceId.name}: Apply upgrade operations`) const connection = await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' }) // Create update indexes - await createUpdateIndexes(connection, db) + await createUpdateIndexes(connection, db, logger) for (const op of migrateOperations) { - console.log(`${workspaceId.name}: upgrade:`, op[0]) - await op[1].upgrade(connection) + logger.log(`${workspaceId.name}: upgrade:`, op[0]) + await op[1].upgrade(connection, logger) } await connection.close() @@ -211,7 +233,7 @@ export async function upgradeModel ( } } -async function createUpdateIndexes (connection: CoreClient, db: Db): Promise { +async function createUpdateIndexes (connection: CoreClient, db: Db, logger: ModelLogger): Promise { const classes = await connection.findAll(core.class.Class, {}) const hierarchy = connection.getHierarchy() @@ -252,12 +274,12 @@ async function createUpdateIndexes (connection: CoreClient, db: Db): Promise 0) { - console.log('created indexes', d, bb) + logger.log('created indexes', d, bb) } } } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 5954926bc2..1e34b4683a 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -141,7 +141,7 @@ class TSessionManager implements SessionManager { return this.sessionFactory(token, pipeline, this.broadcast.bind(this)) } - upgradeId: string | undefined + upgradeIdMap: Map = new Map() async addSession ( ctx: MeasureContext, @@ -163,15 +163,18 @@ class TSessionManager implements SessionManager { } let pipeline: Pipeline + const upgradeId = this.upgradeIdMap.get(token.workspace.name) if (token.extra?.model === 'upgrade') { - if (this.upgradeId !== undefined && sessionId !== this.upgradeId) { + if (upgradeId !== undefined && sessionId !== upgradeId) { ws.close() throw new Error('Another Upgrade in progress....') } - this.upgradeId = sessionId + if (sessionId !== undefined) { + this.upgradeIdMap.set(token.workspace.name, sessionId) + } pipeline = await this.createUpgradeSession(token, sessionId, ctx, wsString, workspace, pipelineFactory, ws) } else { - if (workspace.upgrade && sessionId !== this.upgradeId) { + if (workspace.upgrade && sessionId !== upgradeId) { ws.close() throw new Error('Upgrade in progress....') } @@ -316,8 +319,9 @@ class TSessionManager implements SessionManager { } const sessionRef = this.sessions.get(ws.id) if (sessionRef !== undefined) { - if (this.upgradeId === sessionRef.session.sessionId) { - this.upgradeId = undefined + const upgradeId = this.upgradeIdMap.get(workspaceId.name) + if (upgradeId === sessionRef.session.sessionId) { + this.upgradeIdMap.delete(workspaceId.name) } this.sessions.delete(ws.id) workspace.sessions.delete(sessionRef.session.sessionId) @@ -422,12 +426,14 @@ class TSessionManager implements SessionManager { if (this.workspaces.get(wsid)?.id === wsUID) { this.workspaces.delete(wsid) + this.upgradeIdMap.delete(workspaceId.name) } if (LOGGING_ENABLED) { console.timeLog(workspaceId.name, 'Closed workspace', wsUID) } } catch (err: any) { this.workspaces.delete(wsid) + this.upgradeIdMap.delete(workspaceId.name) if (LOGGING_ENABLED) { console.error(workspaceId.name, err) }