diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 0f87adf1c3..f7aebcb3d3 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -991,7 +991,8 @@ export function devTool ( .command('move-files') .option('-w, --workspace ', 'Selected workspace only', '') .option('-bl, --blobLimit ', 'A blob size limit in megabytes (default 50mb)', '50') - .action(async (cmd: { workspace: string, blobLimit: string }) => { + .option('-c, --concurrency ', 'Number of files being processed concurrently', '10') + .action(async (cmd: { workspace: string, blobLimit: string, concurrency: string }) => { const { mongodbUri } = prepareTools() await withDatabase(mongodbUri, async (db, client) => { await withStorage(mongodbUri, async (adapter) => { @@ -1010,7 +1011,10 @@ export function devTool ( } const wsId = getWorkspaceId(workspace.workspace) - await moveFiles(toolCtx, wsId, exAdapter, parseInt(cmd.blobLimit)) + await moveFiles(toolCtx, wsId, exAdapter, { + blobSizeLimitMb: parseInt(cmd.blobLimit), + concurrency: parseInt(cmd.concurrency) + }) } } catch (err: any) { console.error(err) diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts index f091549a37..47387298de 100644 --- a/dev/tool/src/storage.ts +++ b/dev/tool/src/storage.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core' +import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core' import { type StorageAdapterEx } from '@hcengineering/server-core' import { PassThrough } from 'stream' @@ -21,7 +21,10 @@ export async function moveFiles ( ctx: MeasureContext, workspaceId: WorkspaceId, exAdapter: StorageAdapterEx, - blobSizeLimitMb: number + params: { + blobSizeLimitMb: number + concurrency: number + } ): Promise { if (exAdapter.adapters === undefined) return @@ -35,7 +38,11 @@ export async function moveFiles ( for (const [name, adapter] of exAdapter.adapters.entries()) { if (name === target) continue - console.log('moving from', name) + console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency) + + let time = Date.now() + + const rateLimiter = new RateLimiter(params.concurrency) const iterator = await adapter.listStream(ctx, workspaceId) while (true) { @@ -46,29 +53,37 @@ export async function moveFiles ( if (blob === undefined) continue if (blob.provider === target) continue - if (blob.size > blobSizeLimitMb * 1024 * 1024) { + if (blob.size > params.blobSizeLimitMb * 1024 * 1024) { console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024)) continue } - try { - await retryOnFailure( - ctx, - 5, - async () => { - await moveFile(ctx, exAdapter, workspaceId, blob) - }, - 50 - ) - } catch (err) { - console.error('failed to process blob', name, data._id, err) - } + await rateLimiter.exec(async () => { + try { + await retryOnFailure( + ctx, + 5, + async () => { + await moveFile(ctx, exAdapter, workspaceId, blob) + }, + 50 + ) + } catch (err) { + console.error('failed to process blob', name, data._id, err) + } + }) count += 1 if (count % 100 === 0) { - console.log('...moved: ', count) + await rateLimiter.waitProcessing() + const duration = Date.now() - time + time = Date.now() + console.log('...moved: ', count, Math.round(duration / 1000)) } } + + await rateLimiter.waitProcessing() + await iterator.close() } diff --git a/models/core/src/migration.ts b/models/core/src/migration.ts index 769dab9a69..6efe17ca4c 100644 --- a/models/core/src/migration.ts +++ b/models/core/src/migration.ts @@ -13,13 +13,14 @@ // limitations under the License. // -import { saveCollaborativeDoc, takeCollaborativeDocSnapshot } from '@hcengineering/collaboration' +import { saveCollaborativeDoc } from '@hcengineering/collaboration' import core, { DOMAIN_BLOB, DOMAIN_DOC_INDEX_STATE, DOMAIN_STATUS, DOMAIN_TX, MeasureMetricsContext, + RateLimiter, collaborativeDocParse, coreId, generateId, @@ -188,7 +189,10 @@ async function processMigrateContentFor ( storageAdapter: StorageAdapter, iterator: MigrationIterator ): Promise { + const rateLimiter = new RateLimiter(10) + let processed = 0 + while (true) { const docs = await iterator.next(1000) if (docs === null || docs.length === 0) { @@ -201,45 +205,36 @@ async function processMigrateContentFor ( const operations: { filter: MigrationDocumentQuery, update: MigrateUpdate }[] = [] for (const doc of docs) { - const update: MigrateUpdate = {} + await rateLimiter.exec(async () => { + const update: MigrateUpdate = {} - for (const attribute of attributes) { - const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId) + for (const attribute of attributes) { + const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId) - const value = (doc as any)[attribute.name] as string - if (value != null && value.startsWith('{')) { - const { documentId } = collaborativeDocParse(collaborativeDoc) - const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId) - // only for documents not in storage - if (blob === undefined) { - const ydoc = markupToYDoc(value, attribute.name) - await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx) - await takeCollaborativeDocSnapshot( - storageAdapter, - client.workspaceId, - collaborativeDoc, - ydoc, - { - versionId: revisionId, - name: 'Migration to storage', - createdBy: core.account.System, - createdOn: Date.now() - }, - ctx - ) + const value = (doc as any)[attribute.name] as string + if (value != null && value.startsWith('{')) { + const { documentId } = collaborativeDocParse(collaborativeDoc) + const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId) + // only for documents not in storage + if (blob === undefined) { + const ydoc = markupToYDoc(value, attribute.name) + await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx) + } + + update[attribute.name] = collaborativeDoc + } else if (value == null) { + update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId) } - - update[attribute.name] = collaborativeDoc - } else if (value == null) { - update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId) } - } - if (Object.keys(update).length > 0) { - operations.push({ filter: { _id: doc._id }, update }) - } + if (Object.keys(update).length > 0) { + operations.push({ filter: { _id: doc._id }, update }) + } + }) } + await rateLimiter.waitProcessing() + if (operations.length > 0) { await client.bulk(domain, operations) }