From 8ac05556546d3898a1f25544aad67266735a571d Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Thu, 26 Sep 2024 22:01:53 +0700 Subject: [PATCH] UBERF-8285: Fix backup service lastVisit check (#6738) Signed-off-by: Andrey Sobolev --- server/backup/src/backup.ts | 8 ++++++- server/backup/src/service.ts | 24 +++++++++++++++------ server/client/src/blob.ts | 1 + server/core/src/server/aggregator.ts | 32 +++++++++++++++++++++++----- 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index 0b87d1764f..2daa2fdb32 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -682,7 +682,7 @@ export async function backup ( let downloaded = 0 const printDownloaded = (msg: string, size?: number | null): void => { - if (size == null || Number.isNaN(size)) { + if (size == null || Number.isNaN(size) || !Number.isInteger(size)) { return } ops++ @@ -710,6 +710,12 @@ export async function backup ( let changed: number = 0 const needRetrieveChunks: Ref[][] = [] // Load all digest from collection. + ctx.info('processed', { + processed, + digest: digest.size, + time: Date.now() - st, + workspace: workspaceId.name + }) while (true) { try { const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck)) diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts index 407a3950b2..9650305e8a 100644 --- a/server/backup/src/service.ts +++ b/server/backup/src/service.ts @@ -115,27 +115,37 @@ class BackupWorker { ctx: MeasureContext ): Promise<{ failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }> { const workspacesIgnore = new Set(this.config.SkipWorkspaces.split(';')) + ctx.info('skipped workspaces', { workspacesIgnore }) + let skipped = 0 const workspaces = (await listAccountWorkspaces(this.config.Token)).filter((it) => { const lastBackup = it.backupInfo?.lastBackup ?? 0 if ((Date.now() - lastBackup) / 1000 < this.config.Interval) { // No backup required, interval not elapsed - ctx.info('Skip backup', { workspace: it.workspace, lastBackup: Math.round((Date.now() - lastBackup) / 1000) }) + skipped++ + return false + } + + if (it.lastVisit == null) { + skipped++ return false } const lastVisitSec = Math.floor((Date.now() - it.lastVisit) / 1000) if (lastVisitSec > this.config.Interval) { // No backup required, interval not elapsed - ctx.info('Skip backup, since not visited since last check', { - workspace: it.workspace, - days: Math.floor(lastVisitSec / 3600 / 24), - seconds: lastVisitSec - }) + skipped++ return false } return !workspacesIgnore.has(it.workspace) }) workspaces.sort((a, b) => b.lastVisit - a.lastVisit) + + ctx.info('Preparing for BACKUP', { + total: workspaces.length, + skipped, + workspaces: workspaces.map((it) => it.workspace) + }) + return await this.doBackup(ctx, workspaces) } @@ -230,7 +240,7 @@ class BackupWorker { dataSize: Math.round((result.dataSize * 100) / (1024 * 1024)) / 100, blobsSize: Math.round((result.blobsSize * 100) / (1024 * 1024)) / 100 } - rootCtx.warn('\n\nBACKUP STATS ', { + rootCtx.warn('BACKUP STATS', { workspace: ws.workspace, index, ...backupInfo, diff --git a/server/client/src/blob.ts b/server/client/src/blob.ts index febeda76bb..afaa6d89c4 100644 --- a/server/client/src/blob.ts +++ b/server/client/src/blob.ts @@ -98,6 +98,7 @@ export class BlobClient { chunks.push(chunk) }) readable.on('end', () => { + readable.destroy() resolve() }) }) diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts index 030b8ea2b2..bde7f775e0 100644 --- a/server/core/src/server/aggregator.ts +++ b/server/core/src/server/aggregator.ts @@ -77,6 +77,16 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise {} + doTrimHash (s: string | undefined): string { + if (s == null) { + return '' + } + if (s.startsWith('"') && s.endsWith('"')) { + return s.slice(1, s.length - 1) + } + return s + } + async doSyncDocs (ctx: MeasureContext, workspaceId: WorkspaceId, docs: ListBlobResult[]): Promise { const existingBlobs = toIdMap( await this.dbAdapter.find(ctx, workspaceId, DOMAIN_BLOB, { _id: { $in: docs.map((it) => it._id) } }) @@ -84,10 +94,17 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE const toUpdate: Blob[] = [] for (const d of docs) { const blobInfo = existingBlobs.get(d._id) - if (blobInfo === undefined || blobInfo.etag !== d.etag || blobInfo.size !== d.size) { - const stat = await this.stat(ctx, workspaceId, d._id) + if ( + blobInfo === undefined || + this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) || + blobInfo.size !== d.size + ) { + const stat = await this.adapters.get(d.provider)?.stat(ctx, workspaceId, d._id) if (stat !== undefined) { + stat.provider = d.provider toUpdate.push(stat) + } else { + ctx.error('blob not found for sync', { provider: d.provider, id: d._id, workspace: workspaceId.name }) } } } @@ -120,19 +137,24 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator { - const adapters = Array.from(this.adapters.values()) + const adapters = Array.from(this.adapters.entries()) + let provider: [string, StorageAdapter] | undefined let iterator: BlobStorageIterator | undefined return { next: async () => { while (true) { if (iterator === undefined && adapters.length > 0) { - iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId) + provider = adapters.shift() as [string, StorageAdapter] + iterator = await provider[1].listStream(ctx, workspaceId) } if (iterator === undefined) { return [] } const docInfos = await iterator.next() if (docInfos.length > 0) { + for (const d of docInfos) { + d.provider = provider?.[0] as string + } // We need to check if our stored version is fine return docInfos } else { @@ -323,7 +345,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size) - if (size === undefined || size === 0) { + if (size === undefined || size === 0 || !Number.isInteger(size)) { const docStats = await adapter.stat(ctx, workspaceId, objectName) if (docStats !== undefined) { if (contentType !== docStats.contentType) {