From f5f94f63665b37ae56c4f8e5f10f6b48570b5b1d Mon Sep 17 00:00:00 2001
From: Andrey Sobolev
Date: Wed, 5 Mar 2025 09:02:14 +0700
Subject: [PATCH] UBERF-9550: Add backup with verify (#8137)

Signed-off-by: Andrey Sobolev
---
 .vscode/launch.json                     |   7 +-
 dev/tool/src/index.ts                   |  67 ++++
 server/backup-service/src/index.ts      |   5 +-
 server/backup/src/backup.ts             | 504 ++++++++++++++++--------
 server/backup/src/service.ts            |   8 +-
 server/workspace-service/src/service.ts |  12 +-
 6 files changed, 432 insertions(+), 171 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index ead314dae7..8cf20dae2d 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -397,15 +397,14 @@
       "name": "Debug backup tool",
       "type": "node",
       "request": "launch",
-      "args": ["src/__start.ts", "backup", "../../../hardware/dump/githubcr", "w-haiodo-githubcr-67403799-de2a46aa46-beb3b2"],
+      "args": ["src/__start.ts", "backup", "../../../hardware/dump/felix", "felix", "--full"],
       "env": {
         "MINIO_ACCESS_KEY": "minioadmin",
         "MINIO_SECRET_KEY": "minioadmin",
         "MINIO_ENDPOINT": "localhost",
         "SERVER_SECRET": "secret",
-        "MONGO_URL": "mongodb://localhost:27017",
-        "ACCOUNTS_URL": "http://localhost:3000",
-        "TELEGRAM_DATABASE": "telegram-service"
+        "MONGO_URL": "mongodb://localhost:27018",
+        "ACCOUNTS_URL": "http://localhost:3003"
       },
       "smartStep": true,
       "sourceMapRenames": true,
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index 9bd8b9c4db..8d26f284dc 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -907,6 +907,7 @@ export function devTool (
     .description('dump workspace transactions and minio resources')
     .option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
     .option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
+    .option('--full', 'Full recheck', false)
     .option(
       '-ct, --contentTypes <contentTypes>',
       'A list of ; separated content types for blobs to skip download if size >= limit',
@@ -926,6 +927,7 @@ export function devTool (
         include: string
         blobLimit: string
         contentTypes: string
+        full: boolean
       }
     ) => {
       const storage = await createFileBackupStorage(dirName)
@@ -981,6 +983,71 @@ export function devTool (
   //     await checkBackupIntegrity(toolCtx, storage)
   //   })
+  program
+    .command('backup-check-all')
+    .description('Check backup integrity')
+    .option('-r|--region [region]', 'Region to check', '')
+    .option('-w|--workspace [workspace]', 'Force check of selected workspace', '')
+    .option('-s|--skip [skip]', 'A comma separated list of workspaces to skip', '')
+    .option('-d|--dry [dry]', 'Dry run', false)
+    .action(async (cmd: { timeout: string, workspace: string, region: string, dry: boolean, skip: string }) => {
+      const bucketName = process.env.BUCKET_NAME
+      if (bucketName === '' || bucketName == null) {
+        console.error('please provide bucket name env')
+        process.exit(1)
+      }
+
+      const skipWorkspaces = new Set(cmd.skip.split(',').map((it) => it.trim()))
+
+      const token = generateToken(systemAccountEmail, getWorkspaceId(''))
+      const workspaces = (await listAccountWorkspaces(token, cmd.region))
+        .sort((a, b) => {
+          const bsize = b.backupInfo?.backupSize ?? 0
+          const asize = a.backupInfo?.backupSize ?? 0
+          return bsize - asize
+        })
+        .filter((it) => (cmd.workspace === '' || cmd.workspace === it.workspace) && !skipWorkspaces.has(it.workspace))
+
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      for (const ws of workspaces) {
+        const lastVisitDays = Math.floor((Date.now() - ws.lastVisit) / 1000 / 3600 / 24)
+
+        toolCtx.warn('--- checking workspace backup', {
+          url: ws.workspaceUrl,
+          id: ws.workspace,
+          lastVisitDays,
+          backupSize: ws.backupInfo?.blobsSize ?? 0,
+          mode: ws.mode
+        })
+        if (cmd.dry) {
+          continue
+        }
+        try {
+          const st = Date.now()
+
+          try {
+            const storage = await createStorageBackupStorage(
+              toolCtx,
+              storageAdapter,
+              getWorkspaceId(bucketName),
+              ws.workspace
+            )
+            await checkBackupIntegrity(toolCtx, storage)
+          } catch (err: any) {
+            toolCtx.error('failed to check backup', { err })
+          }
+          const ed = Date.now()
+          toolCtx.warn('--- check complete', {
+            time: ed - st
+          })
+        } catch (err: any) {
+          toolCtx.error('failed to check workspace backup', { workspace: ws.workspace })
+        }
+      }
+      await storageAdapter.close()
+    })
+
   program
     .command('backup-restore <dirName> <workspace> [date]')
     .option('-m, --merge', 'Enable merge of remote and backup content.', false)
diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts
index 2a0efc83a4..83431e466a 100644
--- a/server/backup-service/src/index.ts
+++ b/server/backup-service/src/index.ts
@@ -94,7 +94,7 @@ export async function backupWorkspace (
   region: string,
   downloadLimit: number,
   contextVars: Record<string, any>,
-
+  fullCheck: boolean = false,
   onFinish?: (backupStorage: StorageAdapter, workspaceStorage: StorageAdapter) => Promise<void>
 ): Promise<boolean> {
   const config = _config()
@@ -129,7 +129,8 @@ export async function backupWorkspace (
     region,
     downloadLimit,
     [],
-    contextVars
+    contextVars,
+    fullCheck
   )
   if (result && onFinish !== undefined) {
     await onFinish(storageAdapter, workspaceStorageAdapter)
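
Illustrative note (an editorial sketch, not part of the patch): the verifyDigest/updateDigest logic in the next file parses the backup snapshot index format, a gzipped text blob laid out as an added-count line followed by that many "id;hash" pairs, then an updated-count line with pairs, then a removed-count line with plain ids. A minimal standalone parser for that layout, with all names here chosen only for illustration, could look roughly like this:

    import { gunzipSync } from 'zlib'

    interface SnapshotIndex {
      added: Map<string, string>
      updated: Map<string, string>
      removed: string[]
    }

    // Parse a *.snp.gz payload that has already been read into a Buffer.
    export function parseSnapshotIndex (compressed: Buffer): SnapshotIndex {
      const lines = gunzipSync(compressed).toString().split('\n')
      const readPairs = (count: number): Map<string, string> => {
        const result = new Map<string, string>()
        for (const line of lines.splice(0, count)) {
          const [id, hash] = line.split(';')
          result.set(id, hash)
        }
        return result
      }
      const added = readPairs(parseInt(lines.shift() ?? '0'))
      const updated = readPairs(parseInt(lines.shift() ?? '0'))
      const removed = lines.splice(0, parseInt(lines.shift() ?? '0'))
      return { added, updated, removed }
    }

With that layout in mind, updateDigest below keeps only the entries whose ids still appear in validDocs and rewrites the snapshot file whenever anything had to be dropped.
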
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 747e69e34d..e80071c773 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -312,71 +312,80 @@ async function verifyDigest (
       d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
     }
-    // if (d?.snapshot !== undefined) {
-    // Will not check old format
-    // }
-    const digestToRemove = new Set<string>()
-    for (const snapshot of d?.snapshots ?? []) {
-      try {
-        ctx.info('checking', { snapshot })
-        const changes: Snapshot = {
-          added: new Map(),
-          removed: [],
-          updated: new Map()
-        }
-        let lmodified = false
-        try {
-          const dataBlob = gunzipSync(new Uint8Array(await storage.loadFile(snapshot)))
-            .toString()
-            .split('\n')
-          const addedCount = parseInt(dataBlob.shift() ?? '0')
-          const added = dataBlob.splice(0, addedCount)
-          for (const it of added) {
-            const [k, v] = it.split(';')
-            if (validDocs.has(k as any)) {
-              changes.added.set(k as Ref<Doc>, v)
-            } else {
-              lmodified = true
-            }
-          }
-
-          const updatedCount = parseInt(dataBlob.shift() ?? '0')
-          const updated = dataBlob.splice(0, updatedCount)
-          for (const it of updated) {
-            const [k, v] = it.split(';')
-            if (validDocs.has(k as any)) {
-              changes.updated.set(k as Ref<Doc>, v)
-            } else {
-              lmodified = true
-            }
-          }
-
-          const removedCount = parseInt(dataBlob.shift() ?? '0')
-          const removed = dataBlob.splice(0, removedCount)
-          changes.removed = removed as Ref<Doc>[]
-        } catch (err: any) {
-          ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
-          digestToRemove.add(snapshot)
-          modified = true
-        }
-
-        if (lmodified) {
-          modified = true
-          // Store changes without missing files
-          await writeChanges(storage, snapshot, changes)
-        }
-      } catch (err: any) {
-        digestToRemove.add(snapshot)
-        ctx.error('digest is broken, will do full backup for', { domain, err: err.message, snapshot })
-        modified = true
-      }
-    }
-    d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
+    modified = await updateDigest(d, ctx, storage, validDocs, modified, domain)
   }
   ctx.end()
   return modified
 }
+async function updateDigest (
+  d: DomainData,
+  ctx: MeasureContext,
+  storage: BackupStorage,
+  validDocs: Set<Ref<Doc>>,
+  modified: boolean,
+  domain: Domain
+): Promise<boolean> {
+  const digestToRemove = new Set<string>()
+  for (const snapshot of d?.snapshots ?? []) {
+    try {
+      ctx.info('checking', { snapshot })
+      const changes: Snapshot = {
+        added: new Map(),
+        removed: [],
+        updated: new Map()
+      }
+      let lmodified = false
+      try {
+        const dataBlob = gunzipSync(new Uint8Array(await storage.loadFile(snapshot)))
+          .toString()
+          .split('\n')
+        const addedCount = parseInt(dataBlob.shift() ?? '0')
+        const added = dataBlob.splice(0, addedCount)
+        for (const it of added) {
+          const [k, v] = it.split(';')
+          if (validDocs.has(k as any)) {
+            changes.added.set(k as Ref<Doc>, v)
+          } else {
+            lmodified = true
+          }
+        }
+
+        const updatedCount = parseInt(dataBlob.shift() ?? '0')
+        const updated = dataBlob.splice(0, updatedCount)
+        for (const it of updated) {
+          const [k, v] = it.split(';')
+          if (validDocs.has(k as any)) {
+            changes.updated.set(k as Ref<Doc>, v)
+          } else {
+            lmodified = true
+          }
+        }
+
+        const removedCount = parseInt(dataBlob.shift() ?? '0')
+        const removed = dataBlob.splice(0, removedCount)
+        changes.removed = removed as Ref<Doc>[]
+      } catch (err: any) {
+        ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
+        digestToRemove.add(snapshot)
+        modified = true
+      }
+
+      if (lmodified) {
+        modified = true
+        // Store changes without missing files
+        await writeChanges(storage, snapshot, changes)
+      }
+    } catch (err: any) {
+      digestToRemove.add(snapshot)
+      ctx.error('digest is broken, will do full backup for', { domain, err: err.message, snapshot })
+      modified = true
+    }
+  }
+  d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
+  return modified
+}
+
 async function write (chunk: any, stream: Writable): Promise<void> {
   let needDrain = false
   await new Promise((resolve, reject) => {
@@ -683,6 +692,7 @@ export async function backup (
     isCanceled?: () => boolean
     progress?: (progress: number) => Promise<void>
     token?: string
+    fullVerify?: boolean
   } = {
     force: false,
     timeout: 0,
@@ -758,11 +768,13 @@ export async function backup (
     backupInfo.domainHashes = {}
   }
 
+  const fullCheck = options.fullVerify === true
+
   let lastTx: Tx | undefined
 
   let lastTxChecked = false
   // Skip backup if there is no transaction changes.
-  if (options.getLastTx !== undefined) {
+  if (options.getLastTx !== undefined && !fullCheck) {
     lastTx = await options.getLastTx()
     if (lastTx !== undefined) {
       if (lastTx._id === backupInfo.lastTxId && !options.force) {
@@ -788,7 +800,7 @@ export async function backup (
       ? await options.getConnection()
      : ((await createClient(transactorUrl, token, undefined, options.connectTimeout)) as CoreClient & BackupClient)
 
-  if (!lastTxChecked) {
+  if (!lastTxChecked && !fullCheck) {
     lastTx = await connection.findOne(
       core.class.Tx,
       { objectSpace: { $ne: core.space.Model } },
@@ -871,7 +883,8 @@ export async function backup (
     ctx: MeasureContext,
     domain: Domain,
     digest: Map<Ref<Doc>, string>,
-    changes: Snapshot
+    changes: Snapshot,
+    same: Map<Ref<Doc>, string>
   ): Promise<{ changed: number, needRetrieveChunks: Ref<Doc>[][] }> {
     let idx: number | undefined
     let processed = 0
@@ -924,7 +937,11 @@ export async function backup (
           const serverDocHash = doTrimHash(hash) as string
           const currentHash = doTrimHash(digest.get(id as Ref<Doc>) ?? oldHash.get(id as Ref<Doc>))
           if (currentHash !== undefined) {
+            const oldD = digest.get(id as Ref<Doc>)
             if (digest.delete(id as Ref<Doc>)) {
+              if (oldD !== undefined) {
+                same.set(id as Ref<Doc>, oldD)
+              }
               oldHash.set(id as Ref<Doc>, currentHash)
             }
             if (currentHash !== serverDocHash) {
@@ -1015,12 +1032,13 @@ export async function backup (
       }
 
       const dHash = await connection.getDomainHash(domain)
-      if (backupInfo.domainHashes[domain] === dHash) {
+      if (backupInfo.domainHashes[domain] === dHash && !fullCheck) {
        ctx.info('no changes in domain', { domain })
         return
       }
       // Cumulative digest
       const digest = await ctx.with('load-digest', {}, (ctx) => loadDigest(ctx, storage, backupInfo.snapshots, domain))
+      const same = new Map<Ref<Doc>, string>()
 
       let _pack: Pack | undefined
       let _packClose = async (): Promise<void> => {}
@@ -1030,11 +1048,69 @@ export async function backup (
         await progress(0)
       }
       let { changed, needRetrieveChunks } = await ctx.with('load-chunks', { domain }, (ctx) =>
-        loadChangesFromServer(ctx, domain, digest, changes)
+        loadChangesFromServer(ctx, domain, digest, changes, same)
       )
       processedChanges.removed = Array.from(digest.keys())
       digest.clear()
 
+      if (options.fullVerify === true && domain !== DOMAIN_BLOB && same.size > 0) {
+        // We need to verify existing documents are correct
+        const rsnapshots = Array.from(backupInfo.snapshots).reverse()
+        // We need to load all documents locally and from server and compare
+        for (const s of rsnapshots) {
+          const d = s.domains[domain]
+          if (d == null) {
+            continue
+          }
+          const { docs, modified } = await verifyDocsFromSnapshot(ctx, domain, d, s, storage, same)
+          if (modified) {
+            changed++
+          }
+          const batchSize = 200
+          let needRetrieve: Ref<Doc>[] = []
+          for (let i = 0; i < docs.length; i += batchSize) {
+            const part = docs.slice(i, i + batchSize)
+            const serverDocs = await connection.loadDocs(
+              domain,
+              part.map((it) => it._id)
+            )
+            const smap = toIdMap(serverDocs)
+            for (const localDoc of part) {
+              if (TxProcessor.isExtendsCUD(localDoc._class)) {
+                const tx = localDoc as TxCUD<Doc>
+                if (tx.objectSpace == null) {
+                  tx.objectSpace = core.space.Workspace
+                }
+              }
+              const serverDoc = smap.get(localDoc._id)
+              if (serverDoc === undefined) {
+                // We do not have a doc on server already, ignore it.
+              } else {
+                const { '%hash%': _h1, ...dData } = localDoc as any
+                const { '%hash%': _h2, ...sData } = serverDoc as any
+
+                const dsame = deepEqual(dData, sData)
+                if (!dsame) {
+                  needRetrieve.push(localDoc._id)
+                  changes.updated.set(localDoc._id, same.get(localDoc._id) ??
'') + // Docs are not same + if (needRetrieve.length > 200) { + needRetrieveChunks.push(needRetrieve) + needRetrieve = [] + } + } + } + } + } + if (needRetrieve.length > 0) { + needRetrieveChunks.push(needRetrieve) + needRetrieve = [] + } + } + } else { + same.clear() + } + if (progress !== undefined) { await progress(10) } @@ -1281,19 +1357,21 @@ export async function backup ( changed++ } - if (changed > 0 || backupInfo.domainHashes[domain] !== dHash) { + if (changed > 0 || (domain !== DOMAIN_BLOB && backupInfo.domainHashes[domain] !== dHash)) { // Store domain hash, to be used on next time. backupInfo.domainHashes[domain] = dHash - snapshot.domains[domain] = domainInfo domainInfo.added += processedChanges.added.size domainInfo.updated += processedChanges.updated.size domainInfo.removed += processedChanges.removed.length + if (domainInfo.added + domainInfo.updated + domainInfo.removed > 0) { + snapshot.domains[domain] = domainInfo - const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) - snapshotIndex++ - domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] - await writeChanges(storage, snapshotFile, processedChanges) + const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) + snapshotIndex++ + domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] + await writeChanges(storage, snapshotFile, processedChanges) + } processedChanges.added.clear() processedChanges.removed = [] @@ -1955,95 +2033,111 @@ export async function restore ( if (docsToAdd.size === 0) { break } - ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId }) + ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId.name }) + try { + const readStream = await storage.load(sf) + const ex = extract() - const readStream = await storage.load(sf) - const ex = extract() - - ex.on('entry', (headers, stream, next) => { - const name = headers.name ?? '' - processed++ - // We found blob data - if (requiredDocs.has(name as Ref)) { - const chunks: Buffer[] = [] - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks as any) - const d = blobs.get(name) - if (d === undefined) { - blobs.set(name, { doc: undefined, buffer: bf }) - next() - } else { - blobs.delete(name) - const blob = d?.doc as Blob - ;(blob as any)['%hash%'] = changeset.get(blob._id) - let sz = blob.size - if (Number.isNaN(sz) || sz !== bf.length) { - sz = bf.length - } - void sendBlob(blob, bf, next) - } - }) - } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { - const chunks: Buffer[] = [] - const bname = name.substring(0, name.length - 5) - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks as any) - let doc: Doc - try { - doc = JSON.parse(bf.toString()) as Doc - } catch (err) { - ctx.warn('failed to parse blob metadata', { name, workspace: workspaceId, err }) - next() - return - } - - if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') { - const data = migradeBlobData(doc as Blob, changeset.get(doc._id) as string) - const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined) + ex.on('entry', (headers, stream, next) => { + const name = headers.name ?? 
'' + processed++ + // We found blob data + if (requiredDocs.has(name as Ref)) { + const chunks: Buffer[] = [] + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks as any) + const d = blobs.get(name) if (d === undefined) { - blobs.set(bname, { doc, buffer: undefined }) + blobs.set(name, { doc: undefined, buffer: bf }) next() } else { - blobs.delete(bname) - const blob = doc as Blob - void sendBlob(blob, d instanceof Buffer ? d : (d.buffer as Buffer), next) + blobs.delete(name) + const blob = d?.doc as Blob + ;(blob as any)['%hash%'] = changeset.get(blob._id) + let sz = blob.size + if (Number.isNaN(sz) || sz !== bf.length) { + sz = bf.length + } + void sendBlob(blob, bf, next).catch((err) => { + ctx.error('failed to send blob', { err }) + }) } - } else { - ;(doc as any)['%hash%'] = changeset.get(doc._id) - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) + }) + } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { + const chunks: Buffer[] = [] + const bname = name.substring(0, name.length - 5) + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks as any) + let doc: Doc + try { + doc = JSON.parse(bf.toString()) as Doc + } catch (err) { + ctx.warn('failed to parse blob metadata', { name, workspace: workspaceId.name, err }) next() - }) - } - }) - } else { - next() - } - stream.resume() // just auto drain the stream - }) + return + } - await blobUploader.waitProcessing() - - const endPromise = new Promise((resolve) => { - ex.on('finish', () => { - resolve(null) + if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') { + const data = migradeBlobData(doc as Blob, changeset.get(doc._id) as string) + const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined) + if (d === undefined) { + blobs.set(bname, { doc, buffer: undefined }) + next() + } else { + blobs.delete(bname) + const blob = doc as Blob + const buff = d instanceof Buffer ? d : d.buffer + if (buff != null) { + void sendBlob(blob, d instanceof Buffer ? 
d : (d.buffer as Buffer), next).catch((err) => { + ctx.error('failed to send blob', { err }) + }) + } else { + next() + } + } + } else { + ;(doc as any)['%hash%'] = changeset.get(doc._id) + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + }) + } else { + next() + } + stream.resume() // just auto drain the stream }) - }) - const unzip = createGunzip({ level: defaultLevel }) - readStream.on('end', () => { - readStream.destroy() - }) - readStream.pipe(unzip) - unzip.pipe(ex) + await blobUploader.waitProcessing() - await endPromise + const unzip = createGunzip({ level: defaultLevel }) + + const endPromise = new Promise((resolve, reject) => { + ex.on('finish', () => { + resolve(null) + }) + + readStream.on('end', () => { + readStream.destroy() + }) + readStream.pipe(unzip).on('error', (err) => { + readStream.destroy() + reject(err) + }) + unzip.pipe(ex) + }) + + await endPromise + } catch (err: any) { + ctx.error('failed to processing', { storageFile: sf, processed, workspace: workspaceId.name }) + } } } } @@ -2146,6 +2240,103 @@ export async function restore ( return true } +async function verifyDocsFromSnapshot ( + ctx: MeasureContext, + domain: Domain, + d: DomainData, + s: BackupSnapshot, + storage: BackupStorage, + digest: Map, string> +): Promise<{ docs: Doc[], modified: boolean }> { + const result: Doc[] = [] + const storageToRemove = new Set() + const validDocs = new Set>() + if (digest.size > 0) { + const sDigest = await loadDigest(ctx, storage, [s], domain) + const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => digest.has(it))) + + if (requiredDocs.size > 0) { + ctx.info('updating', { domain, requiredDocs: requiredDocs.size }) + // We have required documents here. + for (const sf of d.storage ?? []) { + if (digest.size === 0) { + break + } + try { + const readStream = await storage.load(sf) + const ex = extract() + + ex.on('entry', (headers, stream, next) => { + const name = headers.name ?? '' + // We found blob data + if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { + const chunks: Buffer[] = [] + const bname = name.substring(0, name.length - 5) + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks as any) + let doc: Doc + try { + doc = JSON.parse(bf.toString()) as Doc + } catch (err) { + next() + return + } + + if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') { + // Skip blob + validDocs.add(bname as Ref) + } else { + ;(doc as any)['%hash%'] = digest.get(doc._id) + digest.delete(bname as Ref) + result.push(doc) + validDocs.add(bname as Ref) + next() + } + }) + } else { + next() + } + stream.resume() // just auto drain the stream + }) + + const unzip = createGunzip({ level: defaultLevel }) + + const endPromise = new Promise((resolve, reject) => { + ex.on('finish', () => { + resolve(null) + }) + + readStream.on('end', () => { + readStream.destroy() + }) + readStream.pipe(unzip).on('error', (err) => { + readStream.destroy() + storageToRemove.add(sf) + reject(err) + }) + unzip.pipe(ex) + }) + + await endPromise + } catch (err: any) { + storageToRemove.add(sf) + ctx.error('failed to processing', { storageFile: sf }) + } + } + } + } + let modified = false + if (storageToRemove.size > 0) { + d.storage = (d.storage ?? 
[]).filter((it) => !storageToRemove.has(it))
+    modified = true
+  }
+  modified = await updateDigest(d, ctx, storage, validDocs, modified, domain)
+  return { docs: result, modified }
+}
+
 /**
  * Compacting backup into just one snapshot.
  * @public
@@ -2429,15 +2620,18 @@ export async function compactBackup (
       }
 
       if (changed > 0) {
-        snapshot.domains[domain] = domainInfo
         domainInfo.added += processedChanges.added.size
         domainInfo.updated += processedChanges.updated.size
         domainInfo.removed += processedChanges.removed.length
 
-        const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
-        snapshotIndex++
-        domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
-        await writeChanges(storage, snapshotFile, processedChanges)
+        if (domainInfo.added + domainInfo.updated + domainInfo.removed > 0) {
+          snapshot.domains[domain] = domainInfo
+
+          const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
+          snapshotIndex++
+          domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
+          await writeChanges(storage, snapshotFile, processedChanges)
+        }
 
         processedChanges.added.clear()
         processedChanges.removed = []
diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts
index 2025d50ad3..7c3e826371 100644
--- a/server/backup/src/service.ts
+++ b/server/backup/src/service.ts
@@ -74,7 +74,8 @@ class BackupWorker {
     ) => DbConfiguration,
     readonly region: string,
     readonly contextVars: Record<string, any>,
-    readonly skipDomains: string[] = []
+    readonly skipDomains: string[] = [],
+    readonly fullCheck: boolean = false
   ) {}
 
   canceled = false
@@ -285,6 +286,7 @@ class BackupWorker {
           connectTimeout: 5 * 60 * 1000, // 5 minutes to,
           blobDownloadLimit: this.downloadLimit,
           skipBlobContentTypes: [],
+          fullVerify: this.fullCheck,
           storageAdapter: this.workspaceStorageAdapter,
           getLastTx: async (): Promise<Tx | undefined> => {
             const config = this.getConfig(ctx, wsIds, null, this.workspaceStorageAdapter)
@@ -408,6 +410,7 @@ export async function doBackupWorkspace (
   downloadLimit: number,
   skipDomains: string[],
   contextVars: Record<string, any>,
+  fullCheck: boolean = false,
   notify?: (progress: number) => Promise<void>
 ): Promise<boolean> {
   const backupWorker = new BackupWorker(
     getConfig,
     region,
     contextVars,
-    skipDomains
+    skipDomains,
+    fullCheck
   )
   backupWorker.downloadLimit = downloadLimit
   const result = await backupWorker.doBackup(ctx, workspace, notify)
diff --git a/server/workspace-service/src/service.ts b/server/workspace-service/src/service.ts
index b47fb0ac86..4974a0233e 100644
--- a/server/workspace-service/src/service.ts
+++ b/server/workspace-service/src/service.ts
@@ -479,7 +479,7 @@ export class WorkspaceWorker {
           await sendEvent('archiving-backup-started', 0)
           await this.sendTransactorMaitenance(token, workspace.uuid)
-          if (await this.doBackup(ctx, workspace, opt, true)) {
+          if (await this.doBackup(ctx, workspace, opt)) {
            await sendEvent('archiving-backup-done', 100)
           }
           break
@@ -516,7 +516,7 @@ export class WorkspaceWorker {
        case 'migration-backup':
          await sendEvent('migrate-backup-started', 0)
          await this.sendTransactorMaitenance(token, workspace.uuid)
-         if (await this.doBackup(ctx, workspace, opt, false)) {
+         if (await this.doBackup(ctx, workspace, opt)) {
           await sendEvent('migrate-backup-done', 100)
          }
          break
@@ -551,12 +551,7 @@ export class WorkspaceWorker {
     }
   }
 
-  private async doBackup (
-    ctx: MeasureContext,
-    workspace: WorkspaceInfoWithStatus,
-    opt: WorkspaceOptions,
-    archive: boolean
-  ): Promise<boolean> {
+  private async doBackup (ctx: MeasureContext, workspace: WorkspaceInfoWithStatus, opt: WorkspaceOptions): Promise<boolean> {
     if (opt.backup === undefined) {
       return false
     }
@@ -624,6 +619,7 @@ export class WorkspaceWorker {
         50000,
         ['blob'],
         sharedPipelineContextVars,
+        true,
         (_p: number) => {
           if (progress !== Math.round(_p)) {
             progress = Math.round(_p)
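
Illustrative note (an editorial sketch, not from the patch itself): the heart of the new --full / fullVerify path is the per-batch comparison in backup(): each document recovered from the local snapshot is compared against the server copy with the volatile '%hash%' field stripped from both sides, and mismatches are queued for re-download. A condensed standalone version of that check is below; the names and the equality helper are assumptions standing in for the deepEqual helper the patch relies on.

    // Sketch of the fullVerify comparison applied to one batch of documents.
    interface VerifyDoc { _id: string, [key: string]: any }

    // Structural equality via JSON with sorted keys: a stand-in for the
    // deep-equality helper used by backup.ts.
    function sameDoc (a: object, b: object): boolean {
      const normalize = (v: any): any =>
        Array.isArray(v)
          ? v.map(normalize)
          : v !== null && typeof v === 'object'
            ? Object.fromEntries(
                Object.entries(v)
                  .sort(([x], [y]) => x.localeCompare(y))
                  .map(([k, val]) => [k, normalize(val)])
              )
            : v
      return JSON.stringify(normalize(a)) === JSON.stringify(normalize(b))
    }

    // Returns ids whose backed-up copy no longer matches the server copy and must be re-downloaded.
    export function findStaleDocs (local: VerifyDoc[], server: Map<string, VerifyDoc>): string[] {
      const stale: string[] = []
      for (const doc of local) {
        const serverDoc = server.get(doc._id)
        if (serverDoc === undefined) continue // absent on server: ignored, as in the patch
        const { '%hash%': _h1, ...localData } = doc
        const { '%hash%': _h2, ...serverData } = serverDoc
        if (!sameDoc(localData, serverData)) stale.push(doc._id)
      }
      return stale
    }

When stale ids are found, backup() records them in changes.updated and pushes them into needRetrieveChunks, so the regular download path refreshes them; this is what lets a full verify repair a drifted backup without discarding it and starting over.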