diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts index 394cc4729f..361bf19ac1 100644 --- a/dev/tool/src/storage.ts +++ b/dev/tool/src/storage.ts @@ -32,7 +32,7 @@ export async function syncFiles ( ): Promise { if (exAdapter.adapters === undefined) return - for (const [name, adapter] of exAdapter.adapters.entries()) { + for (const [name, adapter] of [...exAdapter.adapters.entries()].reverse()) { await adapter.make(ctx, workspaceId) await retryOnFailure(ctx, 5, async () => { @@ -47,7 +47,12 @@ export async function syncFiles ( for (const data of dataBulk) { const blob = await exAdapter.stat(ctx, workspaceId, data._id) - if (blob !== undefined) continue + if (blob !== undefined) { + if (blob.provider !== name && name === exAdapter.defaultAdapter) { + await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, exAdapter.defaultAdapter) + } + continue + } await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, name) @@ -167,6 +172,15 @@ async function processAdapter ( let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id) if (targetBlob !== undefined) { console.log('Target blob already exists', targetBlob._id) + + const aggrBlob = await exAdapter.stat(ctx, workspaceId, data._id) + if (aggrBlob === undefined || aggrBlob?.provider !== targetBlob.provider) { + targetBlob = await exAdapter.syncBlobFromStorage(ctx, workspaceId, targetBlob._id, exAdapter.defaultAdapter) + } + if (targetBlob.size === data.size) { + // We could safely delete source blob + toRemove.push(data._id) + } } if (targetBlob === undefined) { @@ -176,32 +190,28 @@ async function processAdapter ( console.error('blob not found', data._id) continue } - await rateLimiter.exec(async () => { + targetBlob = await rateLimiter.exec(async () => { try { - await retryOnFailure( + const result = await retryOnFailure( ctx, 5, async () => { await processFile(ctx, source, target, workspaceId, sourceBlob) // We need to sync and update aggregator table for now. - targetBlob = await exAdapter.syncBlobFromStorage( - ctx, - workspaceId, - sourceBlob._id, - exAdapter.defaultAdapter - ) + return await exAdapter.syncBlobFromStorage(ctx, workspaceId, sourceBlob._id, exAdapter.defaultAdapter) }, 50 ) movedCnt += 1 movedBytes += sourceBlob.size batchBytes += sourceBlob.size + return result } catch (err) { console.error('failed to process blob', data._id, err) } }) - if (targetBlob !== undefined && 'size' in targetBlob && (targetBlob as Blob).size === sourceBlob.size) { + if (targetBlob !== undefined && targetBlob.size === sourceBlob.size) { // We could safely delete source blob toRemove.push(sourceBlob._id) } @@ -233,7 +243,10 @@ async function processAdapter ( await rateLimiter.waitProcessing() if (toRemove.length > 0 && params.move) { - await source.remove(ctx, workspaceId, toRemove) + while (toRemove.length > 0) { + const part = toRemove.splice(0, 500) + await source.remove(ctx, workspaceId, part) + } } } finally { await iterator.close() diff --git a/server/server-storage/src/aggregator.ts b/server/server-storage/src/aggregator.ts index 7ad1c60b0b..db02d98b84 100644 --- a/server/server-storage/src/aggregator.ts +++ b/server/server-storage/src/aggregator.ts @@ -62,13 +62,13 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } } - const provider = this.adapters.get(current?.provider ?? this.defaultAdapter) + const provider = this.adapters.get(providerId ?? current?.provider ?? this.defaultAdapter) if (provider === undefined) { throw new NoSuchKeyError('No such provider found') } const stat = await provider.stat(ctx, workspaceId, objectName) if (stat !== undefined) { - stat.provider = current?.provider ?? this.defaultAdapter + stat.provider = providerId ?? current?.provider ?? this.defaultAdapter if (current !== undefined) { await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, [current._id]) }