UBERF-9550: Fix backup verification memory usage (#8138)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2025-03-05 12:24:36 +07:00 committed by GitHub
parent fdd0505845
commit cc0cb34ba6
3 changed files with 190 additions and 122 deletions


@@ -665,7 +665,7 @@ export function devTool (
cmd.region,
5000, // 5 gigabytes per blob
sharedPipelineContextVars,
true,
true, // Do full check
async (storage, workspaceStorage) => {
if (cmd.remove) {
await updateArchiveInfo(toolCtx, db, ws.workspace, true)
@@ -781,12 +781,14 @@ export function devTool (
.option('--region [region]', 'Force backup of selected workspace', '')
.option('--full', 'Full recheck', false)
.option('-w|--workspace [workspace]', 'Force backup of selected workspace', '')
.action(async (cmd: { workspace: string, region: string, full: boolean }) => {
.option('-s|--skip [skip]', 'A comma-separated list of workspaces to skip', '')
.action(async (cmd: { workspace: string, region: string, full: boolean, skip: string }) => {
const { txes } = prepareTools()
const skipped = new Set(cmd.skip.split(',').map((it) => it.trim()))
await withAccountDatabase(async (db) => {
const workspaces = (await listWorkspacesPure(db))
.sort((a, b) => a.lastVisit - b.lastVisit)
.filter((it) => cmd.workspace === '' || cmd.workspace === it.workspace)
.filter((it) => (cmd.workspace === '' || cmd.workspace === it.workspace) && !skipped.has(it.workspace))
let processed = 0
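The new `--skip` option is split on commas into a Set and applied alongside the existing workspace filter. A minimal sketch of the resulting selection logic; `WorkspaceLike` and `selectWorkspaces` are illustrative names, not the tool's real types:

```ts
// Sketch only: types and helper name are hypothetical, the real wiring is in the diff above.
interface WorkspaceLike {
  workspace: string
  lastVisit: number
}

function selectWorkspaces (all: WorkspaceLike[], only: string, skip: string): WorkspaceLike[] {
  const skipped = new Set(
    skip
      .split(',')
      .map((it) => it.trim())
      .filter((it) => it.length > 0) // extra guard here: ignore the empty entry an empty option value produces
  )
  return all
    .sort((a, b) => a.lastVisit - b.lastVisit) // least recently visited first
    .filter((it) => (only === '' || only === it.workspace) && !skipped.has(it.workspace))
}
```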
@@ -819,7 +821,11 @@ export function devTool (
processed++
}
} catch (err: any) {
toolCtx.error('Failed to backup workspace', { workspace: ws.workspace })
toolCtx.error('Failed to backup workspace', {
workspace: ws.workspace,
err: err.message,
errStack: err.stack
})
}
}
console.log('Processed workspaces', processed)


@@ -204,10 +204,11 @@ async function verifyDigest (
storage: BackupStorage,
snapshots: BackupSnapshot[],
domain: Domain
): Promise<boolean> {
): Promise<{ modified: boolean, modifiedFiles: string[] }> {
ctx = ctx.newChild('verify digest', { domain, count: snapshots.length })
ctx.info('verify-digest', { domain, count: snapshots.length })
let modified = false
const modifiedFiles: string[] = []
for (const s of snapshots) {
const d = s.domains[domain]
if (d === undefined) {
@@ -246,35 +247,13 @@ async function verifyDigest (
blobs.set(bname, { doc, buffer: undefined })
} else {
blobs.delete(bname)
validDocs.add(bname as Ref<Doc>)
}
} else {
validDocs.add(bname as Ref<Doc>)
}
validDocs.add(bname as Ref<Doc>)
next()
})
} else {
const chunks: Buffer[] = []
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks as any)
const d = blobs.get(name)
if (d === undefined) {
blobs.set(name, { doc: undefined, buffer: bf })
} else {
blobs.delete(name)
const doc = d?.doc as Blob
let sz = doc.size
if (Number.isNaN(sz) || sz !== bf.length) {
sz = bf.length
}
validDocs.add(name as Ref<Doc>)
}
next()
})
}
stream.resume() // just auto drain the stream
})
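The deleted branch above is where verification memory went: every blob entry in the tar was concatenated into one Buffer just to compare its size. The replacement only records the document id as valid and drains the stream. A simplified, self-contained sketch of that pattern, assuming the `tar-stream` package and a hypothetical snapshot file name:

```ts
import { createReadStream } from 'fs'
import { createGunzip } from 'zlib'
import { extract } from 'tar-stream'

const validDocs = new Set<string>()
const ex = extract()
ex.on('entry', (headers, stream, next) => {
  const name = headers.name ?? ''
  // Only the entry name is needed for digest verification, so the payload is
  // never accumulated in memory.
  if (name.endsWith('.json')) {
    validDocs.add(name.slice(0, -'.json'.length))
  }
  stream.on('end', next)
  stream.resume() // discard data as it arrives instead of buffering it
})
createReadStream('snapshot-chunk.tar.gz').pipe(createGunzip()).pipe(ex)
```

`stream.resume()` puts the entry stream into flowing mode with no 'data' listener attached, so chunks are dropped as soon as they arrive.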
@@ -310,12 +289,17 @@ async function verifyDigest (
if (storageToRemove.size > 0) {
modified = true
d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
modifiedFiles.push(...Array.from(storageToRemove))
for (const sf of storageToRemove) {
await storage.delete(sf)
}
modified = await updateDigest(d, ctx, storage, validDocs, modified, domain)
}
let mfiles: string[] = []
;({ modified, modifiedFiles: mfiles } = await updateDigest(d, ctx, storage, validDocs, modified, domain))
modifiedFiles.push(...mfiles)
}
ctx.end()
return modified
return { modified, modifiedFiles }
}
async function updateDigest (
@@ -325,8 +309,9 @@ async function updateDigest (
validDocs: Set<Ref<Doc>>,
modified: boolean,
domain: Domain
): Promise<boolean> {
): Promise<{ modified: boolean, modifiedFiles: string[] }> {
const digestToRemove = new Set<string>()
const modifiedFiles: string[] = []
for (const snapshot of d?.snapshots ?? []) {
try {
ctx.info('checking', { snapshot })
@@ -365,6 +350,11 @@ async function updateDigest (
const removedCount = parseInt(dataBlob.shift() ?? '0')
const removed = dataBlob.splice(0, removedCount)
changes.removed = removed as Ref<Doc>[]
if (addedCount === 0 && removedCount === 0 && updatedCount === 0) {
// Empty digest, need to clean
digestToRemove.add(snapshot)
lmodified = true
}
} catch (err: any) {
ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
digestToRemove.add(snapshot)
@@ -373,17 +363,22 @@ async function updateDigest (
if (lmodified) {
modified = true
if (digestToRemove.has(snapshot)) {
await storage.delete(snapshot) // No need for digest, let's remove it
} else {
// Store changes without missing files
await writeChanges(storage, snapshot, changes)
}
}
} catch (err: any) {
digestToRemove.add(snapshot)
modifiedFiles.push(snapshot)
ctx.error('digest is broken, will do full backup for', { domain, err: err.message, snapshot })
modified = true
}
}
d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
return modified
return { modified, modifiedFiles }
}
async function write (chunk: any, stream: Writable): Promise<void> {
@@ -837,6 +832,8 @@ export async function backup (
backupInfo.lastTxId = '' // Clear until full backup will be complete
const recheckSizes: string[] = []
const snapshot: BackupSnapshot = {
date: Date.now(),
domains: {}
@@ -1053,20 +1050,22 @@ export async function backup (
if (d == null) {
continue
}
const { docs, modified } = await verifyDocsFromSnapshot(ctx, domain, d, s, storage, same)
if (modified) {
changed++
}
const batchSize = 200
let needRetrieve: Ref<Doc>[] = []
for (let i = 0; i < docs.length; i += batchSize) {
const part = docs.slice(i, i + batchSize)
const batchSize = 200
const { modified, modifiedFiles } = await verifyDocsFromSnapshot(
ctx,
domain,
d,
s,
storage,
same,
async (docs) => {
const serverDocs = await connection.loadDocs(
domain,
part.map((it) => it._id)
docs.map((it) => it._id)
)
const smap = toIdMap(serverDocs)
for (const localDoc of part) {
for (const localDoc of docs) {
if (TxProcessor.isExtendsCUD(localDoc._class)) {
const tx = localDoc as TxCUD<Doc>
if (tx.objectSpace == null) {
@@ -1092,12 +1091,24 @@ export async function backup (
}
}
}
},
batchSize
)
if (modified) {
changed++
recheckSizes.push(...modifiedFiles)
}
if (needRetrieve.length > 0) {
needRetrieveChunks.push(needRetrieve)
needRetrieve = []
}
}
// We need to retrieve all documents still in 'same' that were not matched
const sameArray: Ref<Doc>[] = Array.from(same.keys())
while (sameArray.length > 0) {
const docs = sameArray.splice(0, 200)
needRetrieveChunks.push(docs)
}
} else {
same.clear()
}
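Before this change, `verifyDocsFromSnapshot` returned every parsed document, so a whole domain snapshot could be resident before the caller sliced it into batches of 200. It now receives the batch size and a `verify` callback, so only one batch of local documents plus the matching `loadDocs` result is held at a time. A generic sketch of this producer/consumer batching shape; `processInBatches` and its parameters are illustrative, not project APIs:

```ts
// Illustrative only: the producer emits items one by one, the consumer sees them
// in bounded batches, so at most one batch is alive at any moment.
async function processInBatches<T> (
  produce: (emit: (item: T) => Promise<void>) => Promise<void>,
  consume: (batch: T[]) => Promise<void>,
  batchSize = 200
): Promise<void> {
  let batch: T[] = []
  await produce(async (item) => {
    batch.push(item)
    if (batch.length >= batchSize) {
      await consume(batch)
      batch = [] // release the previous batch before more items are produced
    }
  })
  if (batch.length > 0) {
    await consume(batch) // flush the tail
  }
}
```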
@@ -1135,8 +1146,10 @@ export async function backup (
docs = await ctx.with('<<<< load-docs', {}, async () => await connection.loadDocs(domain, needRetrieve))
lastSize = docs.reduce((p, it) => p + estimateDocSize(it), 0)
if (docs.length !== needRetrieve.length) {
const nr = new Set(docs.map((it) => it._id))
ctx.error('failed to retrieve all documents', { missing: needRetrieve.filter((it) => !nr.has(it)) })
ctx.error('failed to retrieve all documents', {
docsLen: docs.length,
needRetrieve: needRetrieve.length
})
}
ops++
} catch (err: any) {
@@ -1407,6 +1420,40 @@ export async function backup (
}
result.result = true
if (!canceled()) {
backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
await rebuildSizeInfo(storage, recheckSizes, ctx, result, backupInfo, infoFile)
return result
} catch (err: any) {
ctx.error('backup error', { err, workspace: workspaceId.name })
return result
} finally {
await rm(tmpRoot, { recursive: true })
if (printEnd) {
ctx.info('end backup', { workspace: workspaceId.name, totalTime: Date.now() - st })
}
if (options.getConnection === undefined && connection !== undefined) {
await connection.close()
}
ctx.end()
if (options.timeout !== -1) {
clearInterval(timer)
}
}
}
async function rebuildSizeInfo (
storage: BackupStorage,
recheckSizes: string[],
ctx: MeasureContext<any>,
result: BackupResult,
backupInfo: BackupInfo,
infoFile: string
): Promise<void> {
const sizeFile = 'backup.size.gz'
let sizeInfo: Record<string, number> = {}
@@ -1416,9 +1463,9 @@ export async function backup (
}
let processed = 0
if (!canceled()) {
backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
for (const file of recheckSizes) {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete sizeInfo[file]
}
const addFileSize = async (file: string | undefined | null): Promise<void> => {
@@ -1451,24 +1498,6 @@ export async function backup (
await addFileSize(infoFile)
await storage.writeFile(sizeFile, gzipSync(JSON.stringify(sizeInfo, undefined, 2), { level: defaultLevel }))
return result
} catch (err: any) {
ctx.error('backup error', { err, workspace: workspaceId.name })
return result
} finally {
await rm(tmpRoot, { recursive: true })
if (printEnd) {
ctx.info('end backup', { workspace: workspaceId.name, totalTime: Date.now() - st })
}
if (options.getConnection === undefined && connection !== undefined) {
await connection.close()
}
ctx.end()
if (options.timeout !== -1) {
clearInterval(timer)
}
}
}
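The size bookkeeping that previously sat at the end of `backup()` is extracted into `rebuildSizeInfo()` so that `checkBackupIntegrity()` can reuse it, and any files rewritten during verification (`recheckSizes`) get their cached sizes dropped and re-measured. A rough sketch of that cache refresh under assumed storage helpers; `readFile`, `writeFile` and `statSize` stand in for the real `BackupStorage` API:

```ts
import { gzipSync, gunzipSync } from 'zlib'

// Sketch only: helper signatures are assumptions, not the project's storage interface.
async function refreshSizeCache (
  readFile: (name: string) => Promise<Buffer>,
  writeFile: (name: string, data: Buffer) => Promise<void>,
  statSize: (name: string) => Promise<number>,
  recheckSizes: string[],
  allFiles: string[]
): Promise<Record<string, number>> {
  const sizeFile = 'backup.size.gz'
  let sizeInfo: Record<string, number> = {}
  try {
    sizeInfo = JSON.parse(gunzipSync(await readFile(sizeFile)).toString())
  } catch {
    // no cache yet: every size will be re-measured below
  }
  for (const file of recheckSizes) {
    // files rewritten or deleted during verification may have changed size
    // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
    delete sizeInfo[file]
  }
  for (const file of allFiles) {
    if (sizeInfo[file] === undefined) {
      sizeInfo[file] = await statSize(file)
    }
  }
  await writeFile(sizeFile, gzipSync(JSON.stringify(sizeInfo, undefined, 2)))
  return sizeInfo
}
```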
/**
@@ -2236,11 +2265,14 @@ async function verifyDocsFromSnapshot (
d: DomainData,
s: BackupSnapshot,
storage: BackupStorage,
digest: Map<Ref<Doc>, string>
): Promise<{ docs: Doc[], modified: boolean }> {
const result: Doc[] = []
digest: Map<Ref<Doc>, string>,
verify: (docs: Doc[]) => Promise<void>,
chunkSize: number
): Promise<{ modified: boolean, modifiedFiles: string[] }> {
let result: Doc[] = []
const storageToRemove = new Set<string>()
const validDocs = new Set<Ref<Doc>>()
const modifiedFiles: string[] = []
if (digest.size > 0) {
const sDigest = await loadDigest(ctx, storage, [s], domain)
const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => digest.has(it)))
@@ -2259,7 +2291,8 @@ async function verifyDocsFromSnapshot (
ex.on('entry', (headers, stream, next) => {
const name = headers.name ?? ''
// We found blob data
if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
const rdoc = name.substring(0, name.length - 5) as Ref<Doc>
if (name.endsWith('.json') && requiredDocs.has(rdoc)) {
const chunks: Buffer[] = []
const bname = name.substring(0, name.length - 5)
stream.on('data', (chunk) => {
@@ -2279,11 +2312,19 @@ async function verifyDocsFromSnapshot (
// Skip blob
validDocs.add(bname as Ref<Doc>)
} else {
;(doc as any)['%hash%'] = digest.get(doc._id)
digest.delete(bname as Ref<Doc>)
;(doc as any)['%hash%'] = digest.get(rdoc)
digest.delete(rdoc)
result.push(doc)
validDocs.add(bname as Ref<Doc>)
if (result.length > chunkSize) {
void verify(result).then(() => {
result = []
next()
})
} else {
next()
}
}
})
} else {
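The entry handler now flushes its buffer through `verify` once it exceeds `chunkSize`, and it deliberately does not call `next()` until that verification resolves, so tar-stream stops producing entries while a batch is in flight. A condensed, self-contained sketch of the pattern; `verifyingExtract` is a made-up wrapper and `verify` stands in for the caller-supplied callback:

```ts
import { extract } from 'tar-stream'

function verifyingExtract (verify: (batch: string[]) => Promise<void>, chunkSize = 200) {
  const ex = extract()
  let batch: string[] = []
  ex.on('entry', (headers, stream, next) => {
    const chunks: Buffer[] = []
    stream.on('data', (chunk: Buffer) => chunks.push(chunk))
    stream.on('end', () => {
      // entries are collected as raw strings here for brevity
      batch.push(Buffer.concat(chunks).toString())
      if (batch.length > chunkSize) {
        // Hold off asking for the next tar entry until this batch is verified,
        // so only roughly chunkSize parsed entries are resident at once.
        void verify(batch).then(() => {
          batch = []
          next()
        })
      } else {
        next()
      }
    })
  })
  return ex
}
```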
@@ -2311,6 +2352,9 @@ async function verifyDocsFromSnapshot (
})
await endPromise
if (result.length > 0) {
await verify(result)
}
} catch (err: any) {
storageToRemove.add(sf)
ctx.error('failed to processing', { storageFile: sf })
@@ -2320,11 +2364,17 @@ async function verifyDocsFromSnapshot (
}
let modified = false
if (storageToRemove.size > 0) {
modifiedFiles.push(...Array.from(storageToRemove))
d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
for (const sf of storageToRemove) {
await storage.delete(sf)
}
modified = true
}
modified = await updateDigest(d, ctx, storage, validDocs, modified, domain)
return { docs: result, modified }
let smodifiedFiles: string[] = []
;({ modified, modifiedFiles: smodifiedFiles } = await updateDigest(d, ctx, storage, validDocs, modified, domain))
modifiedFiles.push(...smodifiedFiles)
return { modified, modifiedFiles }
}
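A small syntax note on the assignment above: destructuring into already-declared variables must be wrapped in parentheses, and the leading semicolon keeps automatic semicolon insertion from gluing the expression onto the previous line. In isolation (values are made up):

```ts
let modified = false
let modifiedFiles: string[] = []
// Parentheses make this an expression rather than a block; the leading ';'
// prevents it from being parsed as a call or index on the previous statement.
;({ modified, modifiedFiles } = { modified: true, modifiedFiles: ['snapshot-1.gz'] })
```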
/**
@@ -2676,7 +2726,7 @@ function migradeBlobData (blob: Blob, etag: string): string {
* @public
*/
export async function checkBackupIntegrity (ctx: MeasureContext, storage: BackupStorage): Promise<void> {
console.log('starting backup compaction')
console.log('check backup integrity')
try {
let backupInfo: BackupInfo
@@ -2695,6 +2745,8 @@ export async function checkBackupIntegrity (ctx: MeasureContext, storage: Backup
return
}
const recheckSizes: string[] = []
const domains: Domain[] = []
for (const sn of backupInfo.snapshots) {
for (const d of Object.keys(sn.domains)) {
@@ -2707,13 +2759,23 @@ export async function checkBackupIntegrity (ctx: MeasureContext, storage: Backup
for (const domain of domains) {
console.log('checking domain...', domain)
if (await verifyDigest(ctx, storage, backupInfo.snapshots, domain)) {
const { modified: mm, modifiedFiles } = await verifyDigest(ctx, storage, backupInfo.snapshots, domain)
if (mm) {
recheckSizes.push(...modifiedFiles)
modified = true
}
}
if (modified) {
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
const bresult: BackupResult = {
backupSize: 0,
blobsSize: 0,
dataSize: 0,
result: true
}
await rebuildSizeInfo(storage, recheckSizes, ctx, bresult, backupInfo, infoFile)
} catch (err: any) {
console.error(err)
} finally {


@@ -583,7 +583,7 @@ export class WorkspaceWorker {
50000,
['blob'],
sharedPipelineContextVars,
true,
true, // Do a full check
(_p: number) => {
if (progress !== Math.round(_p)) {
progress = Math.round(_p)