feat: markup migration tool (#6398)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2024-08-27 14:11:30 +07:00 committed by GitHub
parent d7820206c0
commit 0f56185730
GPG Key ID: B5690EEEBB952194
4 changed files with 252 additions and 53 deletions

View File

@@ -90,7 +90,7 @@ import {
   restoreRecruitingTaskTypes
 } from './clean'
 import { changeConfiguration } from './configuration'
-import { fixJsonMarkup } from './markup'
+import { fixJsonMarkup, migrateMarkup } from './markup'
 import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
 import { fixAccountEmails, renameAccount } from './renameAccount'
 import { moveFiles } from './storage'
@@ -1290,6 +1290,34 @@ export function devTool (
       })
     })
 
+  program
+    .command('migrate-markup')
+    .description('migrates collaborative markup to storage')
+    .option('-w, --workspace <workspace>', 'Selected workspace only', '')
+    .option('-c, --concurrency <concurrency>', 'Number of documents being processed concurrently', '10')
+    .action(async (cmd: { workspace: string, concurrency: string }) => {
+      const { mongodbUri } = prepareTools()
+      await withDatabase(mongodbUri, async (db, client) => {
+        await withStorage(mongodbUri, async (adapter) => {
+          const workspaces = await listWorkspacesPure(db)
+          for (const workspace of workspaces) {
+            if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
+              continue
+            }
+
+            const wsId = getWorkspaceId(workspace.workspace)
+            const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsId), 'external')
+
+            console.log('processing workspace', workspace.workspace)
+
+            await migrateMarkup(toolCtx, adapter, wsId, client, endpoint, parseInt(cmd.concurrency))
+
+            console.log('...done', workspace.workspace)
+          }
+        })
+      })
+    })
+
   program
     .command('remove-duplicates-ids <workspaces>')
     .description('remove duplicates ids for futue migration')

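For reference, a typical invocation of the new command could look like this (the tool entry point wrapper is an assumption; the options are the ones registered above):

    migrate-markup --workspace my-workspace --concurrency 20

When --workspace is omitted, the command iterates over every workspace returned by listWorkspacesPure.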
View File

@@ -1,19 +1,23 @@
+import { saveCollaborativeDoc } from '@hcengineering/collaboration'
 import core, {
   type AnyAttribute,
   type Class,
   type Client as CoreClient,
   type Doc,
   type Domain,
+  type Hierarchy,
   type MeasureContext,
   type Ref,
   type WorkspaceId,
+  RateLimiter,
+  collaborativeDocParse,
   makeCollaborativeDoc
 } from '@hcengineering/core'
 import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
 import { type StorageAdapter } from '@hcengineering/server-core'
 import { connect } from '@hcengineering/server-tool'
-import { jsonToText } from '@hcengineering/text'
-import { type Db } from 'mongodb'
+import { jsonToText, markupToYDoc } from '@hcengineering/text'
+import { type Db, type FindCursor, type MongoClient } from 'mongodb'
 
 export async function fixJsonMarkup (
   ctx: MeasureContext,
@@ -110,3 +114,105 @@ async function processFixJsonMarkupFor (
 
   console.log('...processed', docs.length)
 }
+
+export async function migrateMarkup (
+  ctx: MeasureContext,
+  storageAdapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  client: MongoClient,
+  transactorUrl: string,
+  concurrency: number
+): Promise<void> {
+  const connection = (await connect(transactorUrl, workspaceId, undefined, {
+    mode: 'backup'
+  })) as unknown as CoreClient
+
+  const hierarchy = connection.getHierarchy()
+
+  const workspaceDb = client.db(workspaceId.name)
+
+  try {
+    const classes = hierarchy.getDescendants(core.class.Doc)
+    for (const _class of classes) {
+      const domain = hierarchy.findDomain(_class)
+      if (domain === undefined) continue
+
+      const allAttributes = hierarchy.getAllAttributes(_class)
+      const attributes = Array.from(allAttributes.values()).filter((attribute) => {
+        return hierarchy.isDerived(attribute.type._class, 'core:class:TypeCollaborativeMarkup' as Ref<Class<Doc>>)
+      })
+
+      if (attributes.length === 0) continue
+      if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
+
+      const collection = workspaceDb.collection(domain)
+
+      const filter = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
+
+      const count = await collection.countDocuments(filter)
+      const iterator = collection.find<Doc>(filter)
+
+      try {
+        console.log('processing', _class, '->', count)
+        await processMigrateMarkupFor(ctx, hierarchy, storageAdapter, workspaceId, attributes, iterator, concurrency)
+      } finally {
+        await iterator.close()
+      }
+    }
+  } finally {
+    await connection.close()
+  }
+}
+
+async function processMigrateMarkupFor (
+  ctx: MeasureContext,
+  hierarchy: Hierarchy,
+  storageAdapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  attributes: AnyAttribute[],
+  iterator: FindCursor<Doc>,
+  concurrency: number
+): Promise<void> {
+  const rateLimiter = new RateLimiter(concurrency)
+
+  let processed = 0
+
+  while (true) {
+    const doc = await iterator.next()
+    if (doc === null) break
+
+    const timestamp = Date.now()
+    const revisionId = `${timestamp}`
+
+    await rateLimiter.exec(async () => {
+      for (const attribute of attributes) {
+        const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
+        const { documentId } = collaborativeDocParse(collaborativeDoc)
+
+        const value = hierarchy.isMixin(attribute.attributeOf)
+          ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
+          : ((doc as any)[attribute.name] as string)
+
+        if (value != null && value.startsWith('{')) {
+          const blob = await storageAdapter.stat(ctx, workspaceId, documentId)
+          // only for documents not in storage
+          if (blob === undefined) {
+            const ydoc = markupToYDoc(value, attribute.name)
+            await saveCollaborativeDoc(storageAdapter, workspaceId, collaborativeDoc, ydoc, ctx)
+          }
+        }
+      }
+    })
+
+    processed += 1
+
+    if (processed % 100 === 0) {
+      await rateLimiter.waitProcessing()
+      console.log('...processing', processed)
+    }
+  }
+
+  await rateLimiter.waitProcessing()
+
+  console.log('processed', processed)
+}

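The filter built in migrateMarkup above reflects how mixin data is persisted: mixin attributes live under the mixin id as a nested key on the document, so mixin documents are selected by the presence of that key rather than by _class. A minimal TypeScript sketch of the two query shapes (the ids in the example are illustrative, not taken from this commit):

    // build the Mongo filter used to select candidate documents
    function buildClassFilter (classId: string, isMixin: boolean): Record<string, unknown> {
      // mixin attributes are stored under the mixin id as a nested object,
      // so mixin documents are matched by key presence, not by _class
      return isMixin ? { [classId]: { $exists: true } } : { _class: classId }
    }

    // e.g. buildClassFilter('recruit:mixin:Candidate', true)
    //   -> { 'recruit:mixin:Candidate': { $exists: true } }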
View File

@@ -14,22 +14,22 @@
 //
 import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
-import { type StorageAdapterEx } from '@hcengineering/server-core'
+import { type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
 import { PassThrough } from 'stream'
 
+export interface MoveFilesParams {
+  blobSizeLimitMb: number
+  concurrency: number
+}
+
 export async function moveFiles (
   ctx: MeasureContext,
   workspaceId: WorkspaceId,
   exAdapter: StorageAdapterEx,
-  params: {
-    blobSizeLimitMb: number
-    concurrency: number
-  }
+  params: MoveFilesParams
 ): Promise<void> {
   if (exAdapter.adapters === undefined) return
 
-  let count = 0
   console.log('start', workspaceId.name)
 
   // We assume that the adapter moves all new files to the default adapter
@@ -38,74 +38,125 @@ export async function moveFiles (
   for (const [name, adapter] of exAdapter.adapters.entries()) {
     if (name === target) continue
 
     console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)
 
-    let time = Date.now()
-
-    const rateLimiter = new RateLimiter(params.concurrency)
-
-    const iterator = await adapter.listStream(ctx, workspaceId)
-
-    while (true) {
-      const data = await iterator.next()
-      if (data === undefined) break
-
-      const blob = await exAdapter.stat(ctx, workspaceId, data._id)
-      if (blob === undefined) continue
-      if (blob.provider === target) continue
-
-      if (blob.size > params.blobSizeLimitMb * 1024 * 1024) {
-        console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
-        continue
-      }
-
-      await rateLimiter.exec(async () => {
-        try {
-          await retryOnFailure(
-            ctx,
-            5,
-            async () => {
-              await moveFile(ctx, exAdapter, workspaceId, blob)
-            },
-            50
-          )
-        } catch (err) {
-          console.error('failed to process blob', name, data._id, err)
-        }
-      })
-
-      count += 1
-      if (count % 100 === 0) {
-        await rateLimiter.waitProcessing()
-        const duration = Date.now() - time
-        time = Date.now()
-        console.log('...moved: ', count, Math.round(duration / 1000))
-      }
-    }
-
-    await rateLimiter.waitProcessing()
-
-    await iterator.close()
+    // we attempt retry the whole process in case of failure
+    // files that were already moved will be skipped
+    await retryOnFailure(ctx, 5, async () => {
+      await processAdapter(ctx, exAdapter, adapter, workspaceId, params)
+    })
   }
 
-  console.log('...done', workspaceId.name, count)
+  console.log('...done', workspaceId.name)
 }
 
+async function processAdapter (
+  ctx: MeasureContext,
+  exAdapter: StorageAdapterEx,
+  adapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  params: MoveFilesParams
+): Promise<void> {
+  const target = exAdapter.defaultAdapter
+
+  let time = Date.now()
+  let processedCnt = 0
+  let processedBytes = 0
+  let skippedCnt = 0
+  let movedCnt = 0
+  let movedBytes = 0
+  let batchBytes = 0
+
+  const rateLimiter = new RateLimiter(params.concurrency)
+
+  const iterator = await adapter.listStream(ctx, workspaceId)
+  try {
+    while (true) {
+      const data = await iterator.next()
+      if (data === undefined) break
+
+      const blob =
+        (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await adapter.stat(ctx, workspaceId, data._id))
+
+      if (blob === undefined) {
+        console.error('blob not found', data._id)
+        continue
+      }
+
+      if (blob.provider !== target) {
+        if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) {
+          await rateLimiter.exec(async () => {
+            try {
+              await retryOnFailure(
+                ctx,
+                5,
+                async () => {
+                  await processFile(ctx, exAdapter, adapter, workspaceId, blob)
+                },
+                50
+              )
+              movedCnt += 1
+              movedBytes += blob.size
+              batchBytes += blob.size
+            } catch (err) {
+              console.error('failed to process blob', data._id, err)
+            }
+          })
+        } else {
+          skippedCnt += 1
+          console.log('skipping large blob', data._id, Math.round(blob.size / 1024 / 1024))
+        }
+      }
+
+      processedCnt += 1
+      processedBytes += blob.size
+
+      if (processedCnt % 100 === 0) {
+        await rateLimiter.waitProcessing()
+        const duration = Date.now() - time
+        console.log(
+          '...processed',
+          processedCnt,
+          Math.round(processedBytes / 1024 / 1024) + 'MB',
+          'moved',
+          movedCnt,
+          Math.round(movedBytes / 1024 / 1024) + 'MB',
+          '+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
+          'skipped',
+          skippedCnt,
+          Math.round(duration / 1000) + 's'
+        )
+        batchBytes = 0
+        time = Date.now()
+      }
+    }
+
+    await rateLimiter.waitProcessing()
+  } finally {
+    await iterator.close()
+  }
+}
 
-async function moveFile (
+async function processFile (
   ctx: MeasureContext,
   exAdapter: StorageAdapterEx,
+  adapter: StorageAdapter,
   workspaceId: WorkspaceId,
   blob: Blob
 ): Promise<void> {
-  const readable = await exAdapter.get(ctx, workspaceId, blob._id)
+  const readable = await adapter.get(ctx, workspaceId, blob._id)
   try {
     readable.on('end', () => {
       readable.destroy()
     })
     const stream = readable.pipe(new PassThrough())
     await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
-  } catch (err) {
+  } finally {
     readable.destroy()
-    throw err
   }
 }
@@ -115,18 +166,19 @@ async function retryOnFailure<T> (
   op: () => Promise<T>,
   delay: number = 0
 ): Promise<T> {
-  let error: any
+  let lastError: any
   while (retries > 0) {
     retries--
     try {
       return await op()
     } catch (err: any) {
-      error = err
+      console.error(err)
+      lastError = err
       ctx.error('error', { err, retries })
       if (retries !== 0 && delay > 0) {
         await new Promise((resolve) => setTimeout(resolve, delay))
       }
     }
   }
-  throw error
+  throw lastError
 }

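The move loop combines two patterns: a RateLimiter bounds how many blobs are processed concurrently, while retryOnFailure wraps both the whole adapter pass and each individual file move, so a retried pass cheaply skips blobs that already reached the target provider. A condensed sketch of that combination, assuming the RateLimiter semantics implied by its use in this commit (exec schedules work up to the limit, waitProcessing awaits all in-flight tasks); the helper below is hypothetical:

    import { RateLimiter } from '@hcengineering/core'

    // hypothetical helper: bounded concurrency with per-item retry
    async function processWithRetry<T> (
      items: T[],
      concurrency: number,
      op: (item: T) => Promise<void>
    ): Promise<void> {
      const limiter = new RateLimiter(concurrency)
      for (const item of items) {
        await limiter.exec(async () => {
          let retries = 5
          while (true) {
            try {
              await op(item)
              break
            } catch (err) {
              if (--retries === 0) throw err
              // brief pause between attempts, mirroring the 50 ms delay above
              await new Promise((resolve) => setTimeout(resolve, 50))
            }
          }
        })
      }
      await limiter.waitProcessing()
    }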
View File

@@ -165,16 +165,20 @@ async function migrateCollaborativeContentToStorage (client: MigrationClient): P
     const domain = hierarchy.findDomain(_class)
     if (domain === undefined) continue
 
-    const attributes = hierarchy.getAllAttributes(_class)
-    const filtered = Array.from(attributes.values()).filter((attribute) => {
+    const allAttributes = hierarchy.getAllAttributes(_class)
+    const attributes = Array.from(allAttributes.values()).filter((attribute) => {
       return hierarchy.isDerived(attribute.type._class, core.class.TypeCollaborativeDoc)
     })
-    if (filtered.length === 0) continue
 
-    const iterator = await client.traverse(domain, { _class })
+    if (attributes.length === 0) continue
+    if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
+
+    const query = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
+
+    const iterator = await client.traverse(domain, query)
     try {
       console.log('processing', _class)
-      await processMigrateContentFor(ctx, domain, filtered, client, storageAdapter, iterator)
+      await processMigrateContentFor(ctx, domain, attributes, client, storageAdapter, iterator)
     } finally {
       await iterator.close()
     }
@@ -189,6 +193,8 @@ async function processMigrateContentFor (
   storageAdapter: StorageAdapter,
   iterator: MigrationIterator<Doc>
 ): Promise<void> {
+  const hierarchy = client.hierarchy
+
   const rateLimiter = new RateLimiter(10)
 
   let processed = 0
@@ -211,7 +217,14 @@ async function processMigrateContentFor (
       for (const attribute of attributes) {
         const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
 
-        const value = (doc as any)[attribute.name] as string
+        const value = hierarchy.isMixin(attribute.attributeOf)
+          ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
+          : ((doc as any)[attribute.name] as string)
+
+        const attributeName = hierarchy.isMixin(attribute.attributeOf)
+          ? `${attribute.attributeOf}.${attribute.name}`
+          : attribute.name
+
         if (value != null && value.startsWith('{')) {
           const { documentId } = collaborativeDocParse(collaborativeDoc)
           const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId)
@@ -221,9 +234,9 @@ async function processMigrateContentFor (
             await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
           }
 
-          update[attribute.name] = collaborativeDoc
-        } else if (value == null) {
-          update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
+          update[attributeName] = collaborativeDoc
+        } else if (value == null || value === '') {
+          update[attributeName] = collaborativeDoc
         }
       }
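Note the attributeName used for the update above: when an attribute belongs to a mixin, the update key is the dotted path mixinId.attribute, so the migrated reference lands inside the mixin's nested object rather than on the document root. A small sketch of the key selection (ids illustrative):

    // choose the Mongo update key for a migrated collaborative attribute
    function updateKey (attributeOf: string, name: string, isMixin: boolean): string {
      // the dotted path writes into the mixin's nested object
      return isMixin ? `${attributeOf}.${name}` : name
    }

    // e.g. updateKey('recruit:mixin:Candidate', 'description', true)
    //   -> 'recruit:mixin:Candidate.description'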