mirror of
https://github.com/hcengineering/platform.git
synced 2025-05-11 18:01:59 +00:00
Qfix: restore collab json cr tool (#8794)
This commit is contained in:
parent
85fc90e26c
commit
cd0a687e47
@ -87,6 +87,7 @@ import core, {
|
||||
generateId,
|
||||
getWorkspaceId,
|
||||
isActiveMode,
|
||||
type LowLevelStorage,
|
||||
MeasureMetricsContext,
|
||||
metricsToString,
|
||||
RateLimiter,
|
||||
@ -121,7 +122,12 @@ import {
|
||||
shutdownPostgres
|
||||
} from '@hcengineering/postgres'
|
||||
import { CONFIG_KIND as S3_CONFIG_KIND, S3Service, type S3Config } from '@hcengineering/s3'
|
||||
import type { PipelineFactory, StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
|
||||
import {
|
||||
createDummyStorageAdapter,
|
||||
type PipelineFactory,
|
||||
type StorageAdapter,
|
||||
type StorageAdapterEx
|
||||
} from '@hcengineering/server-core'
|
||||
import { deepEqual } from 'fast-equals'
|
||||
import { createWriteStream, readFileSync } from 'fs'
|
||||
import { getAccountDBUrl, getMongoDBUrl } from './__start'
|
||||
@ -156,9 +162,10 @@ import {
|
||||
updateDataWorkspaceIdToUuid
|
||||
} from './db'
|
||||
import { reindexWorkspace } from './fulltext'
|
||||
import { restoreControlledDocContentMongo, restoreMarkupRefsMongo, restoreWikiContentMongo } from './markup'
|
||||
import { restoreControlledDocContentMongo, restoreMarkupRefs, restoreWikiContentMongo } from './markup'
|
||||
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
|
||||
import { copyToDatalake, moveFiles, showLostFiles } from './storage'
|
||||
import { SimpleFileLogger } from './logger'
|
||||
|
||||
const colorConstants = {
|
||||
colorRed: '\u001b[31m',
|
||||
@ -1537,56 +1544,86 @@ export function devTool (
|
||||
})
|
||||
|
||||
program
|
||||
.command('restore-markup-ref-mongo')
|
||||
.command('restore-markup-ref')
|
||||
.description('restore markup document content refs')
|
||||
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
|
||||
.option('-f, --force', 'Force update', false)
|
||||
.action(async (cmd: { workspace: string, force: boolean }) => {
|
||||
const { txes, version } = prepareTools()
|
||||
|
||||
.option('-w, --workspace <workspace>', 'Selected "workspaces" only, comma separated', '')
|
||||
.option('-m, --migrated', 'Migrated only', false)
|
||||
.option('-d, --dryrun', 'Dry run', false)
|
||||
.option('-f, --force', 'Force update (skip version check)', false)
|
||||
.action(async (cmd: { workspace: string, migrated: boolean, dryrun: boolean, force: boolean }) => {
|
||||
let workspaces: Workspace[] = []
|
||||
const targetWorkspaces = cmd.workspace.split(',')
|
||||
const { txes, version, dbUrl } = prepareTools()
|
||||
const { hierarchy } = await buildModel(toolCtx, txes)
|
||||
|
||||
let workspaces: Workspace[] = []
|
||||
await withAccountDatabase(async (db) => {
|
||||
workspaces = await listWorkspacesPure(db)
|
||||
workspaces = workspaces
|
||||
.filter((p) => isActiveMode(p.mode))
|
||||
.filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace)
|
||||
.filter((p) => cmd.workspace === '' || targetWorkspaces.includes(p.workspace))
|
||||
.filter(
|
||||
(p) =>
|
||||
!cmd.migrated ||
|
||||
(p.region === 'europe' && p.targetRegion === 'europe' && p.message === 'restore-done done')
|
||||
)
|
||||
.sort((a, b) => b.lastVisit - a.lastVisit)
|
||||
})
|
||||
|
||||
console.log('found workspaces', workspaces.length)
|
||||
toolCtx.info('found workspaces', { count: workspaces.length })
|
||||
|
||||
await withStorage(async (storageAdapter) => {
|
||||
const mongodbUri = getMongoDBUrl()
|
||||
const client = getMongoClient(mongodbUri)
|
||||
const _client = await client.getClient()
|
||||
const count = workspaces.length
|
||||
let index = 0
|
||||
for (const workspace of workspaces) {
|
||||
index++
|
||||
|
||||
try {
|
||||
const count = workspaces.length
|
||||
let index = 0
|
||||
for (const workspace of workspaces) {
|
||||
index++
|
||||
const ctx = new MeasureMetricsContext(
|
||||
workspace.workspace,
|
||||
{},
|
||||
{},
|
||||
undefined,
|
||||
new SimpleFileLogger(workspace.workspace)
|
||||
)
|
||||
|
||||
toolCtx.info('processing workspace', {
|
||||
workspace: workspace.workspace,
|
||||
version: workspace.version,
|
||||
index,
|
||||
count
|
||||
})
|
||||
ctx.info('processing workspace', {
|
||||
workspace: workspace.workspace,
|
||||
version: workspace.version,
|
||||
index,
|
||||
count
|
||||
})
|
||||
|
||||
if (!cmd.force && (workspace.version === undefined || !deepEqual(workspace.version, version))) {
|
||||
console.log(`upgrade to ${versionToString(version)} is required`)
|
||||
continue
|
||||
}
|
||||
|
||||
const workspaceId = getWorkspaceId(workspace.workspace)
|
||||
const wsDb = getWorkspaceMongoDB(_client, { name: workspace.workspace })
|
||||
|
||||
await restoreMarkupRefsMongo(toolCtx, wsDb, workspaceId, hierarchy, storageAdapter)
|
||||
if (!cmd.force && (workspace.version === undefined || !deepEqual(workspace.version, version))) {
|
||||
console.log(`Upgrade to ${versionToString(version)} is required or run with --force flag`)
|
||||
continue
|
||||
}
|
||||
|
||||
const workspaceId = getWorkspaceId(workspace.workspace)
|
||||
const backupPipeline = createBackupPipeline(ctx, dbUrl, txes, {
|
||||
externalStorage: createDummyStorageAdapter()
|
||||
})
|
||||
|
||||
const pipeline = await backupPipeline(
|
||||
ctx,
|
||||
{
|
||||
name: workspace.workspace,
|
||||
workspaceName: workspace.workspaceName ?? '',
|
||||
workspaceUrl: '',
|
||||
uuid: workspace.uuid ?? ''
|
||||
},
|
||||
false,
|
||||
() => {},
|
||||
null
|
||||
)
|
||||
|
||||
try {
|
||||
const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage
|
||||
|
||||
await restoreMarkupRefs(ctx, lowLevel, workspaceId, hierarchy, storageAdapter, cmd.dryrun)
|
||||
} catch (err: any) {
|
||||
ctx.error('failed to restore markup refs', { err })
|
||||
} finally {
|
||||
await pipeline.close()
|
||||
}
|
||||
} finally {
|
||||
client.close()
|
||||
}
|
||||
})
|
||||
})
|
||||
|
65
dev/tool/src/logger.ts
Normal file
65
dev/tool/src/logger.ts
Normal file
@ -0,0 +1,65 @@
|
||||
import { type MeasureLogger, type ParamsType } from '@hcengineering/core'
|
||||
import { createWriteStream, mkdirSync } from 'fs'
|
||||
import { join } from 'path'
|
||||
|
||||
export class SimpleFileLogger implements MeasureLogger {
|
||||
private readonly stream: NodeJS.WritableStream
|
||||
private readonly children: SimpleFileLogger[] = []
|
||||
|
||||
constructor (
|
||||
private readonly name: string,
|
||||
private readonly logDir: string = 'logs'
|
||||
) {
|
||||
// Ensure log directory exists
|
||||
mkdirSync(logDir, { recursive: true })
|
||||
|
||||
// Create write stream for logging
|
||||
const logPath = join(logDir, `${name}.log`)
|
||||
this.stream = createWriteStream(logPath, { flags: 'a' })
|
||||
}
|
||||
|
||||
private writeLog (level: string, message: string, obj?: Record<string, any>): void {
|
||||
const timestamp = new Date().toISOString()
|
||||
const logEntry = {
|
||||
timestamp,
|
||||
level,
|
||||
message,
|
||||
...obj
|
||||
}
|
||||
this.stream.write(JSON.stringify(logEntry) + '\n')
|
||||
}
|
||||
|
||||
info (message: string, obj?: Record<string, any>): void {
|
||||
this.writeLog('info', message, obj)
|
||||
}
|
||||
|
||||
error (message: string, obj?: Record<string, any>): void {
|
||||
this.writeLog('error', message, obj)
|
||||
}
|
||||
|
||||
warn (message: string, obj?: Record<string, any>): void {
|
||||
this.writeLog('warn', message, obj)
|
||||
}
|
||||
|
||||
logOperation (operation: string, time: number, params: ParamsType): void {
|
||||
this.writeLog('operation', operation, { time, ...params })
|
||||
}
|
||||
|
||||
childLogger (name: string, params: Record<string, any>): MeasureLogger {
|
||||
const child = new SimpleFileLogger(name, join(this.logDir, name))
|
||||
this.children.push(child)
|
||||
return child
|
||||
}
|
||||
|
||||
async close (): Promise<void> {
|
||||
// Close all child loggers
|
||||
await Promise.all(this.children.map((child) => child.close()))
|
||||
|
||||
// Close current stream
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
this.stream.end(() => {
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
@ -30,6 +30,7 @@ import core, {
|
||||
type TxUpdateDoc,
|
||||
type WorkspaceId,
|
||||
DOMAIN_TX,
|
||||
type LowLevelStorage,
|
||||
SortingOrder,
|
||||
makeCollabId,
|
||||
makeCollabYdocId,
|
||||
@ -299,14 +300,17 @@ export async function restoreControlledDocContentForDoc (
|
||||
return true
|
||||
}
|
||||
|
||||
export async function restoreMarkupRefsMongo (
|
||||
export async function restoreMarkupRefs (
|
||||
ctx: MeasureContext,
|
||||
db: Db,
|
||||
lowLevelStorage: LowLevelStorage,
|
||||
workspaceId: WorkspaceId,
|
||||
hierarchy: Hierarchy,
|
||||
storageAdapter: StorageAdapter
|
||||
storageAdapter: StorageAdapter,
|
||||
dryRun: boolean
|
||||
): Promise<void> {
|
||||
const classes = hierarchy.getDescendants(core.class.Doc)
|
||||
let updatedCount = 0
|
||||
const curHash = Date.now().toString(16) // Current hash value
|
||||
for (const _class of classes) {
|
||||
const domain = hierarchy.findDomain(_class)
|
||||
if (domain === undefined) continue
|
||||
@ -319,44 +323,60 @@ export async function restoreMarkupRefsMongo (
|
||||
if (attributes.length === 0) continue
|
||||
if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
|
||||
|
||||
ctx.info('processing', { _class, attributes: attributes.map((p) => p.name) })
|
||||
ctx.info('processing', { _class, domain, attributes: attributes.map((p) => p.name) })
|
||||
|
||||
const collection = db.collection<Doc>(domain)
|
||||
const iterator = collection.find({ _class })
|
||||
const iterator = await lowLevelStorage.traverse(domain, { _class })
|
||||
try {
|
||||
while (true) {
|
||||
const doc = await iterator.next()
|
||||
if (doc === null) {
|
||||
// next's count param doesn't work for CR. Always returns all rows regardless of count param.
|
||||
const docs = await iterator.next(20)
|
||||
if (docs === null) {
|
||||
break
|
||||
}
|
||||
for (const doc of docs) {
|
||||
if (doc === null) continue
|
||||
|
||||
for (const attribute of attributes) {
|
||||
const isMixin = hierarchy.isMixin(attribute.attributeOf)
|
||||
for (const attribute of attributes) {
|
||||
const isMixin = hierarchy.isMixin(attribute.attributeOf)
|
||||
|
||||
const attributeName = isMixin ? `${attribute.attributeOf}.${attribute.name}` : attribute.name
|
||||
const attributeName = isMixin ? `${attribute.attributeOf}.${attribute.name}` : attribute.name
|
||||
|
||||
const value = isMixin
|
||||
? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
|
||||
: ((doc as any)[attribute.name] as string)
|
||||
const value = isMixin
|
||||
? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
|
||||
: ((doc as any)[attribute.name] as string)
|
||||
|
||||
if (typeof value === 'string') {
|
||||
continue
|
||||
if (value != null && value !== '') {
|
||||
continue
|
||||
}
|
||||
|
||||
const collabId = makeCollabId(doc._class, doc._id, attribute.name)
|
||||
const ydocId = makeCollabYdocId(collabId)
|
||||
|
||||
try {
|
||||
// Note: read operation throws if not found so it won't update docs for which
|
||||
// there's no blob.
|
||||
const buffer = await storageAdapter.read(ctx, workspaceId, ydocId)
|
||||
if (!dryRun) {
|
||||
const ydoc = yDocFromBuffer(Buffer.concat(buffer as any))
|
||||
|
||||
const jsonId = await saveCollabJson(ctx, storageAdapter, workspaceId, collabId, ydoc)
|
||||
await lowLevelStorage.rawUpdate(domain, { _id: doc._id }, {
|
||||
[attributeName]: jsonId,
|
||||
'%hash%': curHash
|
||||
} as any)
|
||||
}
|
||||
|
||||
ctx.info('Restored empty markup ref', { _class, attributeName, collabId, ydocId })
|
||||
updatedCount++
|
||||
} catch (err: any) {
|
||||
ctx.error('Failed to restore markup ref', { _class, attributeName, collabId, ydocId, err })
|
||||
}
|
||||
}
|
||||
|
||||
const collabId = makeCollabId(doc._class, doc._id, attribute.name)
|
||||
const ydocId = makeCollabYdocId(collabId)
|
||||
|
||||
try {
|
||||
const buffer = await storageAdapter.read(ctx, workspaceId, ydocId)
|
||||
const ydoc = yDocFromBuffer(Buffer.concat(buffer as any))
|
||||
|
||||
const jsonId = await saveCollabJson(ctx, storageAdapter, workspaceId, collabId, ydoc)
|
||||
await collection.updateOne({ _id: doc._id }, { $set: { [attributeName]: jsonId } })
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await iterator.close()
|
||||
}
|
||||
}
|
||||
ctx.info('Restored markup refs', { updatedCount })
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user