From a81f44f74e03e14b7161809a22246ef6a92e367b Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Mon, 2 Sep 2024 18:30:50 +0700
Subject: [PATCH] Add move/copy flag to file moving tool (#6459)

Signed-off-by: Alexander Onnikov
---
 dev/tool/src/index.ts   | 24 +++++++++++++++++-------
 dev/tool/src/storage.ts | 35 ++++++++++++++++-------------------
 2 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index 3a13e78bb2..e8ec0c284f 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -990,11 +990,18 @@ export function devTool (
   program
     .command('move-files')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
+    .option('-m, --move <move>', 'When set to true, the files will be moved, otherwise copied', 'false')
     .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
-    .action(async (cmd: { workspace: string, blobLimit: string, concurrency: string }) => {
+    .action(async (cmd: { workspace: string, move: string, blobLimit: string, concurrency: string }) => {
+      const params = {
+        blobSizeLimitMb: parseInt(cmd.blobLimit),
+        concurrency: parseInt(cmd.concurrency),
+        move: cmd.move === 'true'
+      }
+
       const { mongodbUri } = prepareTools()
-      await withDatabase(mongodbUri, async (db, client) => {
+      await withDatabase(mongodbUri, async (db) => {
         await withStorage(mongodbUri, async (adapter) => {
           try {
             const exAdapter = adapter as StorageAdapterEx
@@ -1004,17 +1011,20 @@ export function devTool (
 
             console.log('moving files to storage provider', exAdapter.defaultAdapter)
 
+            let index = 1
             const workspaces = await listWorkspacesPure(db)
+            workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
+
             for (const workspace of workspaces) {
               if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
                 continue
               }
 
-              const wsId = getWorkspaceId(workspace.workspace)
-              await moveFiles(toolCtx, wsId, exAdapter, {
-                blobSizeLimitMb: parseInt(cmd.blobLimit),
-                concurrency: parseInt(cmd.concurrency)
-              })
+              console.log('start', workspace, index, '/', workspaces.length)
+              await moveFiles(toolCtx, getWorkspaceId(workspace.workspace), exAdapter, params)
+              console.log('done', workspace)
+
+              index += 1
             }
           } catch (err: any) {
             console.error(err)
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index 27f7e23792..8393d5feec 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -20,6 +20,7 @@ import { PassThrough } from 'stream'
 export interface MoveFilesParams {
   blobSizeLimitMb: number
   concurrency: number
+  move: boolean
 }
 
 export async function moveFiles (
@@ -30,36 +31,33 @@ export async function moveFiles (
 ): Promise<void> {
   if (exAdapter.adapters === undefined) return
 
-  console.log('start', workspaceId.name)
+  const target = exAdapter.adapters.get(exAdapter.defaultAdapter)
+  if (target === undefined) return
 
   // We assume that the adapter moves all new files to the default adapter
-  const target = exAdapter.defaultAdapter
-  await exAdapter.adapters.get(target)?.make(ctx, workspaceId)
+  await target.make(ctx, workspaceId)
 
   for (const [name, adapter] of exAdapter.adapters.entries()) {
-    if (name === target) continue
+    if (name === exAdapter.defaultAdapter) continue
 
     console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)
 
     // we attempt retry the whole process in case of failure
     // files that were already moved will be skipped
     await retryOnFailure(ctx, 5, async () => {
-      await processAdapter(ctx, exAdapter, adapter, workspaceId, params)
+      await processAdapter(ctx, exAdapter, adapter, target, workspaceId, params)
     })
   }
-
-  console.log('...done', workspaceId.name)
 }
 
 async function processAdapter (
   ctx: MeasureContext,
   exAdapter: StorageAdapterEx,
-  adapter: StorageAdapter,
+  source: StorageAdapter,
+  target: StorageAdapter,
   workspaceId: WorkspaceId,
   params: MoveFilesParams
 ): Promise<void> {
-  const target = exAdapter.defaultAdapter
-
   let time = Date.now()
   let processedCnt = 0
   let processedBytes = 0
@@ -70,21 +68,20 @@ async function processAdapter (
 
   const rateLimiter = new RateLimiter(params.concurrency)
 
-  const iterator = await adapter.listStream(ctx, workspaceId)
+  const iterator = await source.listStream(ctx, workspaceId)
   try {
     while (true) {
       const data = await iterator.next()
       if (data === undefined) break
 
-      const blob =
-        (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await adapter.stat(ctx, workspaceId, data._id))
+      const blob = (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id))
 
       if (blob === undefined) {
        console.error('blob not found', data._id)
         continue
       }
 
-      if (blob.provider !== target) {
+      if (blob.provider !== exAdapter.defaultAdapter) {
         if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) {
           await rateLimiter.exec(async () => {
             try {
@@ -92,7 +89,7 @@ async function processAdapter (
                 ctx,
                 5,
                 async () => {
-                  await processFile(ctx, exAdapter, adapter, workspaceId, blob)
+                  await processFile(ctx, source, params.move ? exAdapter : target, workspaceId, blob)
                 },
                 50
               )
@@ -143,18 +140,18 @@ async function processAdapter (
 
 async function processFile (
   ctx: MeasureContext,
-  exAdapter: StorageAdapterEx,
-  adapter: StorageAdapter,
+  source: Pick<StorageAdapter, 'get'>,
+  target: Pick<StorageAdapter, 'put'>,
   workspaceId: WorkspaceId,
   blob: Blob
 ): Promise<void> {
-  const readable = await adapter.get(ctx, workspaceId, blob._id)
+  const readable = await source.get(ctx, workspaceId, blob._id)
   try {
     readable.on('end', () => {
       readable.destroy()
     })
     const stream = readable.pipe(new PassThrough())
-    await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
+    await target.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
   } finally {
     readable.destroy()
   }
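
Note on the semantics of the new flag: processFile now writes to
params.move ? exAdapter : target. In copy mode (the default, since
-m/--move defaults to 'false') the blob is written directly to the
default storage adapter and its provider record is left untouched, so
the file simply exists in both storages afterwards. In move mode the
put goes through the aggregate exAdapter which, per the existing
comment in moveFiles, is assumed to route new files to the default
adapter and update the blob's provider, so the check
blob.provider !== exAdapter.defaultAdapter skips already-moved files
on a retry. The narrowed parameter types Pick<StorageAdapter, 'get'>
and Pick<StorageAdapter, 'put'> make the dispatch explicit: processFile
needs nothing more than a readable source and a writable target. From
the command line the flag is passed as, for example,
move-files --move true -w <workspace>; how the dev tool binary itself
is launched is outside the scope of this patch.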
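The retry helper is referenced but not defined in this diff: it is
called as retryOnFailure(ctx, 5, op) for the whole adapter pass and as
retryOnFailure(ctx, 5, op, 50) per file, which implies a signature of
(ctx, attempts, op, delayMs?). The following is a minimal sketch of a
compatible helper, offered as an assumption for illustration rather
than the repo's actual implementation; the ctx parameter is accepted
only to mirror the call sites:

    import type { MeasureContext } from '@hcengineering/core'

    // Sketch only: runs op up to `attempts` times, waiting delayMs
    // between attempts, and rethrows the last error on exhaustion.
    async function retryOnFailure<T> (
      _ctx: MeasureContext, // matches the call sites; unused in this sketch
      attempts: number,
      op: () => Promise<T>,
      delayMs: number = 0
    ): Promise<T> {
      let lastError: unknown
      while (attempts > 0) {
        attempts--
        try {
          return await op()
        } catch (err) {
          lastError = err
          if (attempts > 0 && delayMs > 0) {
            await new Promise((resolve) => setTimeout(resolve, delayMs))
          }
        }
      }
      throw lastError
    }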