Fix move tool v2 (#6852)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-10-09 22:44:41 +07:00 committed by GitHub
parent ab3df02270
commit e2d1b544d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 27 additions and 14 deletions

View File

@ -32,7 +32,7 @@ export async function syncFiles (
): Promise<void> { ): Promise<void> {
if (exAdapter.adapters === undefined) return if (exAdapter.adapters === undefined) return
for (const [name, adapter] of exAdapter.adapters.entries()) { for (const [name, adapter] of [...exAdapter.adapters.entries()].reverse()) {
await adapter.make(ctx, workspaceId) await adapter.make(ctx, workspaceId)
await retryOnFailure(ctx, 5, async () => { await retryOnFailure(ctx, 5, async () => {
@ -47,7 +47,12 @@ export async function syncFiles (
for (const data of dataBulk) { for (const data of dataBulk) {
const blob = await exAdapter.stat(ctx, workspaceId, data._id) const blob = await exAdapter.stat(ctx, workspaceId, data._id)
if (blob !== undefined) continue if (blob !== undefined) {
if (blob.provider !== name && name === exAdapter.defaultAdapter) {
await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, exAdapter.defaultAdapter)
}
continue
}
await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, name) await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, name)
@ -167,6 +172,15 @@ async function processAdapter (
let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id) let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id)
if (targetBlob !== undefined) { if (targetBlob !== undefined) {
console.log('Target blob already exists', targetBlob._id) console.log('Target blob already exists', targetBlob._id)
const aggrBlob = await exAdapter.stat(ctx, workspaceId, data._id)
if (aggrBlob === undefined || aggrBlob?.provider !== targetBlob.provider) {
targetBlob = await exAdapter.syncBlobFromStorage(ctx, workspaceId, targetBlob._id, exAdapter.defaultAdapter)
}
if (targetBlob.size === data.size) {
// We could safely delete source blob
toRemove.push(data._id)
}
} }
if (targetBlob === undefined) { if (targetBlob === undefined) {
@ -176,32 +190,28 @@ async function processAdapter (
console.error('blob not found', data._id) console.error('blob not found', data._id)
continue continue
} }
await rateLimiter.exec(async () => { targetBlob = await rateLimiter.exec(async () => {
try { try {
await retryOnFailure( const result = await retryOnFailure(
ctx, ctx,
5, 5,
async () => { async () => {
await processFile(ctx, source, target, workspaceId, sourceBlob) await processFile(ctx, source, target, workspaceId, sourceBlob)
// We need to sync and update aggregator table for now. // We need to sync and update aggregator table for now.
targetBlob = await exAdapter.syncBlobFromStorage( return await exAdapter.syncBlobFromStorage(ctx, workspaceId, sourceBlob._id, exAdapter.defaultAdapter)
ctx,
workspaceId,
sourceBlob._id,
exAdapter.defaultAdapter
)
}, },
50 50
) )
movedCnt += 1 movedCnt += 1
movedBytes += sourceBlob.size movedBytes += sourceBlob.size
batchBytes += sourceBlob.size batchBytes += sourceBlob.size
return result
} catch (err) { } catch (err) {
console.error('failed to process blob', data._id, err) console.error('failed to process blob', data._id, err)
} }
}) })
if (targetBlob !== undefined && 'size' in targetBlob && (targetBlob as Blob).size === sourceBlob.size) { if (targetBlob !== undefined && targetBlob.size === sourceBlob.size) {
// We could safely delete source blob // We could safely delete source blob
toRemove.push(sourceBlob._id) toRemove.push(sourceBlob._id)
} }
@ -233,7 +243,10 @@ async function processAdapter (
await rateLimiter.waitProcessing() await rateLimiter.waitProcessing()
if (toRemove.length > 0 && params.move) { if (toRemove.length > 0 && params.move) {
await source.remove(ctx, workspaceId, toRemove) while (toRemove.length > 0) {
const part = toRemove.splice(0, 500)
await source.remove(ctx, workspaceId, part)
}
} }
} finally { } finally {
await iterator.close() await iterator.close()

View File

@ -62,13 +62,13 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
} }
} }
const provider = this.adapters.get(current?.provider ?? this.defaultAdapter) const provider = this.adapters.get(providerId ?? current?.provider ?? this.defaultAdapter)
if (provider === undefined) { if (provider === undefined) {
throw new NoSuchKeyError('No such provider found') throw new NoSuchKeyError('No such provider found')
} }
const stat = await provider.stat(ctx, workspaceId, objectName) const stat = await provider.stat(ctx, workspaceId, objectName)
if (stat !== undefined) { if (stat !== undefined) {
stat.provider = current?.provider ?? this.defaultAdapter stat.provider = providerId ?? current?.provider ?? this.defaultAdapter
if (current !== undefined) { if (current !== undefined) {
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, [current._id]) await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, [current._id])
} }