Fix move tool (#6825)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-10-07 15:35:01 +07:00 committed by GitHub
parent 304beeae31
commit db9af92f1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 50 additions and 33 deletions

View File

@ -1110,7 +1110,6 @@ export function devTool (
.action(
async (cmd: { workspace: string, move: string, blobLimit: string, concurrency: string, disabled: boolean }) => {
const params = {
blobSizeLimitMb: parseInt(cmd.blobLimit),
concurrency: parseInt(cmd.concurrency),
move: cmd.move === 'true'
}

View File

@ -21,7 +21,6 @@ import { type Db } from 'mongodb'
import { PassThrough } from 'stream'
export interface MoveFilesParams {
blobSizeLimitMb: number
concurrency: number
move: boolean
}
@ -86,7 +85,7 @@ export async function moveFiles (
for (const [name, adapter] of exAdapter.adapters.entries()) {
if (name === exAdapter.defaultAdapter) continue
console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)
console.log('moving from', name, 'limit', 'concurrency', params.concurrency)
// we attempt retry the whole process in case of failure
// files that were already moved will be skipped
@ -129,10 +128,13 @@ async function processAdapter (
workspaceId: WorkspaceId,
params: MoveFilesParams
): Promise<void> {
if (source === target) {
// Just in case
return
}
let time = Date.now()
let processedCnt = 0
let processedBytes = 0
let skippedCnt = 0
let movedCnt = 0
let movedBytes = 0
let batchBytes = 0
@ -140,47 +142,62 @@ async function processAdapter (
const rateLimiter = new RateLimiter(params.concurrency)
const iterator = await source.listStream(ctx, workspaceId)
const toRemove: string[] = []
try {
while (true) {
const dataBulk = await iterator.next()
if (dataBulk.length === 0) break
for (const data of dataBulk) {
const blob =
(await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id))
let targetBlob = await target.stat(ctx, workspaceId, data._id)
const sourceBlob = await source.stat(ctx, workspaceId, data._id)
if (blob === undefined) {
if (sourceBlob === undefined) {
console.error('blob not found', data._id)
continue
}
if (targetBlob !== undefined) {
console.log('Target blob already exists', targetBlob._id, targetBlob.contentType)
}
if (blob.provider !== exAdapter.defaultAdapter) {
if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) {
await rateLimiter.exec(async () => {
try {
await retryOnFailure(
ctx,
5,
async () => {
await processFile(ctx, source, params.move ? exAdapter : target, workspaceId, blob)
},
50
)
movedCnt += 1
movedBytes += blob.size
batchBytes += blob.size
} catch (err) {
console.error('failed to process blob', data._id, err)
}
})
} else {
skippedCnt += 1
console.log('skipping large blob', data._id, Math.round(blob.size / 1024 / 1024))
}
if (targetBlob === undefined) {
await rateLimiter.exec(async () => {
try {
await retryOnFailure(
ctx,
5,
async () => {
await processFile(ctx, source, target, workspaceId, sourceBlob)
// We need to sync and update aggregator table for now.
await exAdapter.syncBlobFromStorage(ctx, workspaceId, sourceBlob._id, exAdapter.defaultAdapter)
},
50
)
movedCnt += 1
movedBytes += sourceBlob.size
batchBytes += sourceBlob.size
} catch (err) {
console.error('failed to process blob', data._id, err)
}
})
}
if (targetBlob === undefined) {
targetBlob = await target.stat(ctx, workspaceId, data._id)
}
if (
targetBlob !== undefined &&
targetBlob.size === sourceBlob.size &&
targetBlob.contentType === sourceBlob.contentType
) {
// We could safely delete source blob
toRemove.push(sourceBlob._id)
}
processedCnt += 1
processedBytes += blob.size
processedBytes += sourceBlob.size
if (processedCnt % 100 === 0) {
await rateLimiter.waitProcessing()
@ -195,8 +212,6 @@ async function processAdapter (
movedCnt,
Math.round(movedBytes / 1024 / 1024) + 'MB',
'+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
'skipped',
skippedCnt,
Math.round(duration / 1000) + 's'
)
@ -207,6 +222,9 @@ async function processAdapter (
}
await rateLimiter.waitProcessing()
if (toRemove.length > 0 && params.move) {
await source.remove(ctx, workspaceId, toRemove)
}
} finally {
await iterator.close()
}