Add move/copy flag to file moving tool (#6459)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Author: Alexander Onnikov
Date: 2024-09-02 18:30:50 +07:00 (committed via GitHub)
parent 8f6a8bf9ba
commit a81f44f74e
2 changed files with 33 additions and 26 deletions


@@ -990,11 +990,18 @@ export function devTool (
   program
     .command('move-files')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
+    .option('-m, --move <move>', 'When set to true, the files will be moved, otherwise copied', 'false')
     .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
-    .action(async (cmd: { workspace: string, blobLimit: string, concurrency: string }) => {
+    .action(async (cmd: { workspace: string, move: string, blobLimit: string, concurrency: string }) => {
+      const params = {
+        blobSizeLimitMb: parseInt(cmd.blobLimit),
+        concurrency: parseInt(cmd.concurrency),
+        move: cmd.move === 'true'
+      }
       const { mongodbUri } = prepareTools()
-      await withDatabase(mongodbUri, async (db, client) => {
+      await withDatabase(mongodbUri, async (db) => {
         await withStorage(mongodbUri, async (adapter) => {
           try {
             const exAdapter = adapter as StorageAdapterEx
@@ -1004,17 +1011,20 @@ export function devTool (
             console.log('moving files to storage provider', exAdapter.defaultAdapter)

+            let index = 1
             const workspaces = await listWorkspacesPure(db)
+            workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
             for (const workspace of workspaces) {
               if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
                 continue
               }
-              const wsId = getWorkspaceId(workspace.workspace)
-              await moveFiles(toolCtx, wsId, exAdapter, {
-                blobSizeLimitMb: parseInt(cmd.blobLimit),
-                concurrency: parseInt(cmd.concurrency)
-              })
+              console.log('start', workspace, index, '/', workspaces.length)
+              await moveFiles(toolCtx, getWorkspaceId(workspace.workspace), exAdapter, params)
+              console.log('done', workspace)
+              index += 1
             }
           } catch (err: any) {
             console.error(err)
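
For reference, a hypothetical invocation of the updated command (the `tool` entry point name is an assumption; the subcommand and flags are the ones defined in the diff above):

    # copy blobs to the default storage provider (the default, --move false)
    tool move-files --workspace my-workspace

    # move them instead, with a 100 MB blob limit and 20 concurrent transfers
    tool move-files --move true --blobLimit 100 --concurrency 20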


@@ -20,6 +20,7 @@ import { PassThrough } from 'stream'
 export interface MoveFilesParams {
   blobSizeLimitMb: number
   concurrency: number
+  move: boolean
 }

@@ -30,36 +31,33 @@ export async function moveFiles (
 ): Promise<void> {
   if (exAdapter.adapters === undefined) return

-  console.log('start', workspaceId.name)
+  const target = exAdapter.adapters.get(exAdapter.defaultAdapter)
+  if (target === undefined) return

   // We assume that the adapter moves all new files to the default adapter
-  const target = exAdapter.defaultAdapter
-  await exAdapter.adapters.get(target)?.make(ctx, workspaceId)
+  await target.make(ctx, workspaceId)

   for (const [name, adapter] of exAdapter.adapters.entries()) {
-    if (name === target) continue
+    if (name === exAdapter.defaultAdapter) continue

     console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)

     // we attempt retry the whole process in case of failure
     // files that were already moved will be skipped
     await retryOnFailure(ctx, 5, async () => {
-      await processAdapter(ctx, exAdapter, adapter, workspaceId, params)
+      await processAdapter(ctx, exAdapter, adapter, target, workspaceId, params)
     })
   }
-
-  console.log('...done', workspaceId.name)
 }

 async function processAdapter (
   ctx: MeasureContext,
   exAdapter: StorageAdapterEx,
-  adapter: StorageAdapter,
+  source: StorageAdapter,
+  target: StorageAdapter,
   workspaceId: WorkspaceId,
   params: MoveFilesParams
 ): Promise<void> {
-  const target = exAdapter.defaultAdapter
-
   let time = Date.now()
   let processedCnt = 0
   let processedBytes = 0
@@ -70,21 +68,20 @@ async function processAdapter (
   const rateLimiter = new RateLimiter(params.concurrency)

-  const iterator = await adapter.listStream(ctx, workspaceId)
+  const iterator = await source.listStream(ctx, workspaceId)

   try {
     while (true) {
       const data = await iterator.next()
       if (data === undefined) break

-      const blob =
-        (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await adapter.stat(ctx, workspaceId, data._id))
+      const blob = (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id))

       if (blob === undefined) {
         console.error('blob not found', data._id)
         continue
       }

-      if (blob.provider !== target) {
+      if (blob.provider !== exAdapter.defaultAdapter) {
         if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) {
           await rateLimiter.exec(async () => {
             try {

@@ -92,7 +89,7 @@ async function processAdapter (
               ctx,
               5,
               async () => {
-                await processFile(ctx, exAdapter, adapter, workspaceId, blob)
+                await processFile(ctx, source, params.move ? exAdapter : target, workspaceId, blob)
               },
               50
             )
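
The ternary in the hunk above is the heart of the new flag. A sketch of the intended routing, assuming the aggregate adapter's `put` also re-registers the blob under the default provider (which is what makes a write through it a "move"), while a direct write to the target adapter leaves the blob's recorded provider untouched (a "copy"):

    // Hypothetical illustration of the routing decision inside processAdapter:
    // - move:  write through exAdapter so the blob is re-registered under the
    //          default provider, superseding its old location
    // - copy:  write straight to the target adapter; the blob's recorded
    //          provider stays as-is
    const destination: Pick<StorageAdapter, 'put'> = params.move ? exAdapter : target
    await processFile(ctx, source, destination, workspaceId, blob)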
@@ -143,18 +140,18 @@ async function processAdapter (
 async function processFile (
   ctx: MeasureContext,
-  exAdapter: StorageAdapterEx,
-  adapter: StorageAdapter,
+  source: Pick<StorageAdapter, 'get'>,
+  target: Pick<StorageAdapter, 'put'>,
   workspaceId: WorkspaceId,
   blob: Blob
 ): Promise<void> {
-  const readable = await adapter.get(ctx, workspaceId, blob._id)
+  const readable = await source.get(ctx, workspaceId, blob._id)
   try {
     readable.on('end', () => {
       readable.destroy()
     })
     const stream = readable.pipe(new PassThrough())
-    await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
+    await target.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
   } finally {
     readable.destroy()
   }
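
The helper `retryOnFailure` is called above with a retry count and, at the inner call site, a 50 ms delay. A minimal sketch of a compatible implementation, assuming the signature `retryOnFailure(ctx, retries, op, delayMs?)` (the repository's actual helper may differ):

    async function retryOnFailure<T> (
      ctx: MeasureContext,
      retries: number,
      op: () => Promise<T>,
      delayMs: number = 0
    ): Promise<T> {
      let error: any
      while (retries > 0) {
        retries--
        try {
          return await op()
        } catch (err: any) {
          error = err
          // wait before the next attempt so transient storage errors can clear
          if (retries > 0 && delayMs > 0) {
            await new Promise((resolve) => setTimeout(resolve, delayMs))
          }
        }
      }
      throw error
    }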