From cd0a687e470497fb7392477d8064eb651d3f3a15 Mon Sep 17 00:00:00 2001 From: Alexey Zinoviev Date: Thu, 1 May 2025 07:06:53 +0400 Subject: [PATCH] Qfix: restore collab json cr tool (#8794) --- dev/tool/src/index.ts | 109 +++++++++++++++++++++++++++-------------- dev/tool/src/logger.ts | 65 ++++++++++++++++++++++++ dev/tool/src/markup.ts | 74 ++++++++++++++++++---------- 3 files changed, 185 insertions(+), 63 deletions(-) create mode 100644 dev/tool/src/logger.ts diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index d5caf0b533..8073370263 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -87,6 +87,7 @@ import core, { generateId, getWorkspaceId, isActiveMode, + type LowLevelStorage, MeasureMetricsContext, metricsToString, RateLimiter, @@ -121,7 +122,12 @@ import { shutdownPostgres } from '@hcengineering/postgres' import { CONFIG_KIND as S3_CONFIG_KIND, S3Service, type S3Config } from '@hcengineering/s3' -import type { PipelineFactory, StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core' +import { + createDummyStorageAdapter, + type PipelineFactory, + type StorageAdapter, + type StorageAdapterEx +} from '@hcengineering/server-core' import { deepEqual } from 'fast-equals' import { createWriteStream, readFileSync } from 'fs' import { getAccountDBUrl, getMongoDBUrl } from './__start' @@ -156,9 +162,10 @@ import { updateDataWorkspaceIdToUuid } from './db' import { reindexWorkspace } from './fulltext' -import { restoreControlledDocContentMongo, restoreMarkupRefsMongo, restoreWikiContentMongo } from './markup' +import { restoreControlledDocContentMongo, restoreMarkupRefs, restoreWikiContentMongo } from './markup' import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { copyToDatalake, moveFiles, showLostFiles } from './storage' +import { SimpleFileLogger } from './logger' const colorConstants = { colorRed: '\u001b[31m', @@ -1537,56 +1544,86 @@ export function devTool ( }) program - .command('restore-markup-ref-mongo') + .command('restore-markup-ref') .description('restore markup document content refs') - .option('-w, --workspace ', 'Selected workspace only', '') - .option('-f, --force', 'Force update', false) - .action(async (cmd: { workspace: string, force: boolean }) => { - const { txes, version } = prepareTools() - + .option('-w, --workspace ', 'Selected "workspaces" only, comma separated', '') + .option('-m, --migrated', 'Migrated only', false) + .option('-d, --dryrun', 'Dry run', false) + .option('-f, --force', 'Force update (skip version check)', false) + .action(async (cmd: { workspace: string, migrated: boolean, dryrun: boolean, force: boolean }) => { + let workspaces: Workspace[] = [] + const targetWorkspaces = cmd.workspace.split(',') + const { txes, version, dbUrl } = prepareTools() const { hierarchy } = await buildModel(toolCtx, txes) - let workspaces: Workspace[] = [] await withAccountDatabase(async (db) => { workspaces = await listWorkspacesPure(db) workspaces = workspaces .filter((p) => isActiveMode(p.mode)) - .filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace) + .filter((p) => cmd.workspace === '' || targetWorkspaces.includes(p.workspace)) + .filter( + (p) => + !cmd.migrated || + (p.region === 'europe' && p.targetRegion === 'europe' && p.message === 'restore-done done') + ) .sort((a, b) => b.lastVisit - a.lastVisit) }) - console.log('found workspaces', workspaces.length) + toolCtx.info('found workspaces', { count: workspaces.length }) await withStorage(async (storageAdapter) => { - const mongodbUri = getMongoDBUrl() - const client = getMongoClient(mongodbUri) - const _client = await client.getClient() + const count = workspaces.length + let index = 0 + for (const workspace of workspaces) { + index++ - try { - const count = workspaces.length - let index = 0 - for (const workspace of workspaces) { - index++ + const ctx = new MeasureMetricsContext( + workspace.workspace, + {}, + {}, + undefined, + new SimpleFileLogger(workspace.workspace) + ) - toolCtx.info('processing workspace', { - workspace: workspace.workspace, - version: workspace.version, - index, - count - }) + ctx.info('processing workspace', { + workspace: workspace.workspace, + version: workspace.version, + index, + count + }) - if (!cmd.force && (workspace.version === undefined || !deepEqual(workspace.version, version))) { - console.log(`upgrade to ${versionToString(version)} is required`) - continue - } - - const workspaceId = getWorkspaceId(workspace.workspace) - const wsDb = getWorkspaceMongoDB(_client, { name: workspace.workspace }) - - await restoreMarkupRefsMongo(toolCtx, wsDb, workspaceId, hierarchy, storageAdapter) + if (!cmd.force && (workspace.version === undefined || !deepEqual(workspace.version, version))) { + console.log(`Upgrade to ${versionToString(version)} is required or run with --force flag`) + continue + } + + const workspaceId = getWorkspaceId(workspace.workspace) + const backupPipeline = createBackupPipeline(ctx, dbUrl, txes, { + externalStorage: createDummyStorageAdapter() + }) + + const pipeline = await backupPipeline( + ctx, + { + name: workspace.workspace, + workspaceName: workspace.workspaceName ?? '', + workspaceUrl: '', + uuid: workspace.uuid ?? '' + }, + false, + () => {}, + null + ) + + try { + const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage + + await restoreMarkupRefs(ctx, lowLevel, workspaceId, hierarchy, storageAdapter, cmd.dryrun) + } catch (err: any) { + ctx.error('failed to restore markup refs', { err }) + } finally { + await pipeline.close() } - } finally { - client.close() } }) }) diff --git a/dev/tool/src/logger.ts b/dev/tool/src/logger.ts new file mode 100644 index 0000000000..c38e9e6b63 --- /dev/null +++ b/dev/tool/src/logger.ts @@ -0,0 +1,65 @@ +import { type MeasureLogger, type ParamsType } from '@hcengineering/core' +import { createWriteStream, mkdirSync } from 'fs' +import { join } from 'path' + +export class SimpleFileLogger implements MeasureLogger { + private readonly stream: NodeJS.WritableStream + private readonly children: SimpleFileLogger[] = [] + + constructor ( + private readonly name: string, + private readonly logDir: string = 'logs' + ) { + // Ensure log directory exists + mkdirSync(logDir, { recursive: true }) + + // Create write stream for logging + const logPath = join(logDir, `${name}.log`) + this.stream = createWriteStream(logPath, { flags: 'a' }) + } + + private writeLog (level: string, message: string, obj?: Record): void { + const timestamp = new Date().toISOString() + const logEntry = { + timestamp, + level, + message, + ...obj + } + this.stream.write(JSON.stringify(logEntry) + '\n') + } + + info (message: string, obj?: Record): void { + this.writeLog('info', message, obj) + } + + error (message: string, obj?: Record): void { + this.writeLog('error', message, obj) + } + + warn (message: string, obj?: Record): void { + this.writeLog('warn', message, obj) + } + + logOperation (operation: string, time: number, params: ParamsType): void { + this.writeLog('operation', operation, { time, ...params }) + } + + childLogger (name: string, params: Record): MeasureLogger { + const child = new SimpleFileLogger(name, join(this.logDir, name)) + this.children.push(child) + return child + } + + async close (): Promise { + // Close all child loggers + await Promise.all(this.children.map((child) => child.close())) + + // Close current stream + await new Promise((resolve, reject) => { + this.stream.end(() => { + resolve() + }) + }) + } +} diff --git a/dev/tool/src/markup.ts b/dev/tool/src/markup.ts index 776be29e79..734f91f41b 100644 --- a/dev/tool/src/markup.ts +++ b/dev/tool/src/markup.ts @@ -30,6 +30,7 @@ import core, { type TxUpdateDoc, type WorkspaceId, DOMAIN_TX, + type LowLevelStorage, SortingOrder, makeCollabId, makeCollabYdocId, @@ -299,14 +300,17 @@ export async function restoreControlledDocContentForDoc ( return true } -export async function restoreMarkupRefsMongo ( +export async function restoreMarkupRefs ( ctx: MeasureContext, - db: Db, + lowLevelStorage: LowLevelStorage, workspaceId: WorkspaceId, hierarchy: Hierarchy, - storageAdapter: StorageAdapter + storageAdapter: StorageAdapter, + dryRun: boolean ): Promise { const classes = hierarchy.getDescendants(core.class.Doc) + let updatedCount = 0 + const curHash = Date.now().toString(16) // Current hash value for (const _class of classes) { const domain = hierarchy.findDomain(_class) if (domain === undefined) continue @@ -319,44 +323,60 @@ export async function restoreMarkupRefsMongo ( if (attributes.length === 0) continue if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue - ctx.info('processing', { _class, attributes: attributes.map((p) => p.name) }) + ctx.info('processing', { _class, domain, attributes: attributes.map((p) => p.name) }) - const collection = db.collection(domain) - const iterator = collection.find({ _class }) + const iterator = await lowLevelStorage.traverse(domain, { _class }) try { while (true) { - const doc = await iterator.next() - if (doc === null) { + // next's count param doesn't work for CR. Always returns all rows regardless of count param. + const docs = await iterator.next(20) + if (docs === null) { break } + for (const doc of docs) { + if (doc === null) continue - for (const attribute of attributes) { - const isMixin = hierarchy.isMixin(attribute.attributeOf) + for (const attribute of attributes) { + const isMixin = hierarchy.isMixin(attribute.attributeOf) - const attributeName = isMixin ? `${attribute.attributeOf}.${attribute.name}` : attribute.name + const attributeName = isMixin ? `${attribute.attributeOf}.${attribute.name}` : attribute.name - const value = isMixin - ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string) - : ((doc as any)[attribute.name] as string) + const value = isMixin + ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string) + : ((doc as any)[attribute.name] as string) - if (typeof value === 'string') { - continue + if (value != null && value !== '') { + continue + } + + const collabId = makeCollabId(doc._class, doc._id, attribute.name) + const ydocId = makeCollabYdocId(collabId) + + try { + // Note: read operation throws if not found so it won't update docs for which + // there's no blob. + const buffer = await storageAdapter.read(ctx, workspaceId, ydocId) + if (!dryRun) { + const ydoc = yDocFromBuffer(Buffer.concat(buffer as any)) + + const jsonId = await saveCollabJson(ctx, storageAdapter, workspaceId, collabId, ydoc) + await lowLevelStorage.rawUpdate(domain, { _id: doc._id }, { + [attributeName]: jsonId, + '%hash%': curHash + } as any) + } + + ctx.info('Restored empty markup ref', { _class, attributeName, collabId, ydocId }) + updatedCount++ + } catch (err: any) { + ctx.error('Failed to restore markup ref', { _class, attributeName, collabId, ydocId, err }) + } } - - const collabId = makeCollabId(doc._class, doc._id, attribute.name) - const ydocId = makeCollabYdocId(collabId) - - try { - const buffer = await storageAdapter.read(ctx, workspaceId, ydocId) - const ydoc = yDocFromBuffer(Buffer.concat(buffer as any)) - - const jsonId = await saveCollabJson(ctx, storageAdapter, workspaceId, collabId, ydoc) - await collection.updateOne({ _id: doc._id }, { $set: { [attributeName]: jsonId } }) - } catch {} } } } finally { await iterator.close() } } + ctx.info('Restored markup refs', { updatedCount }) }