UBERF-9550: Add backup with verify (#8137)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
parent 3116af99ba
commit f5f94f6366

Changed file: .vscode/launch.json (vendored), 7 lines changed
@@ -397,15 +397,14 @@
      "name": "Debug backup tool",
      "type": "node",
      "request": "launch",
      "args": ["src/__start.ts", "backup", "../../../hardware/dump/githubcr", "w-haiodo-githubcr-67403799-de2a46aa46-beb3b2"],
      "args": ["src/__start.ts", "backup", "../../../hardware/dump/felix", "felix", "--full"],
      "env": {
        "MINIO_ACCESS_KEY": "minioadmin",
        "MINIO_SECRET_KEY": "minioadmin",
        "MINIO_ENDPOINT": "localhost",
        "SERVER_SECRET": "secret",
        "MONGO_URL": "mongodb://localhost:27017",
        "ACCOUNTS_URL": "http://localhost:3000",
        "TELEGRAM_DATABASE": "telegram-service"
        "MONGO_URL": "mongodb://localhost:27018",
        "ACCOUNTS_URL": "http://localhost:3003"
      },
      "smartStep": true,
      "sourceMapRenames": true,
@@ -907,6 +907,7 @@ export function devTool (
    .description('dump workspace transactions and minio resources')
    .option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
    .option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
    .option('--full', 'Full recheck', false)
    .option(
      '-ct, --contentTypes <contentTypes>',
      'A list of ; separated content types for blobs to skip download if size >= limit',

@@ -926,6 +927,7 @@ export function devTool (
        include: string
        blobLimit: string
        contentTypes: string
        full: boolean
      }
    ) => {
      const storage = await createFileBackupStorage(dirName)

@@ -981,6 +983,71 @@ export function devTool (
  //     await checkBackupIntegrity(toolCtx, storage)
  //   })

  program
    .command('backup-check-all')
    .description('Check backup integrity')
    .option('-r|--region [region]', 'Region to check', '')
    .option('-w|--workspace [workspace]', 'Check only the selected workspace', '')
    .option('-s|--skip [skip]', 'A comma separated list of workspaces to skip', '')
    .option('-d|--dry [dry]', 'Dry run', false)
    .action(async (cmd: { timeout: string, workspace: string, region: string, dry: boolean, skip: string }) => {
      const bucketName = process.env.BUCKET_NAME
      if (bucketName === '' || bucketName == null) {
        console.error('please provide bucket name env')
        process.exit(1)
      }

      const skipWorkspaces = new Set(cmd.skip.split(',').map((it) => it.trim()))

      const token = generateToken(systemAccountEmail, getWorkspaceId(''))
      const workspaces = (await listAccountWorkspaces(token, cmd.region))
        .sort((a, b) => {
          const bsize = b.backupInfo?.backupSize ?? 0
          const asize = a.backupInfo?.backupSize ?? 0
          return bsize - asize
        })
        .filter((it) => (cmd.workspace === '' || cmd.workspace === it.workspace) && !skipWorkspaces.has(it.workspace))

      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
      for (const ws of workspaces) {
        const lastVisitDays = Math.floor((Date.now() - ws.lastVisit) / 1000 / 3600 / 24)

        toolCtx.warn('--- checking workspace backup', {
          url: ws.workspaceUrl,
          id: ws.workspace,
          lastVisitDays,
          backupSize: ws.backupInfo?.blobsSize ?? 0,
          mode: ws.mode
        })
        if (cmd.dry) {
          continue
        }
        try {
          const st = Date.now()

          try {
            const storage = await createStorageBackupStorage(
              toolCtx,
              storageAdapter,
              getWorkspaceId(bucketName),
              ws.workspace
            )
            await checkBackupIntegrity(toolCtx, storage)
          } catch (err: any) {
            toolCtx.error('failed to check backup', { err })
          }
          const ed = Date.now()
          toolCtx.warn('--- check complete', {
            time: ed - st
          })
        } catch (err: any) {
          toolCtx.error('failed to check backup of workspace', { workspace: ws.workspace })
        }
      }
      await storageAdapter.close()
    })

  program
    .command('backup-restore <dirName> <workspace> [date]')
    .option('-m, --merge', 'Enable merge of remote and backup content.', false)
@@ -94,7 +94,7 @@ export async function backupWorkspace (
  region: string,
  downloadLimit: number,
  contextVars: Record<string, any>,
  fullCheck: boolean = false,
  onFinish?: (backupStorage: StorageAdapter, workspaceStorage: StorageAdapter) => Promise<void>
): Promise<boolean> {
  const config = _config()

@@ -129,7 +129,8 @@ export async function backupWorkspace (
    region,
    downloadLimit,
    [],
    contextVars
    contextVars,
    fullCheck
  )
  if (result && onFinish !== undefined) {
    await onFinish(storageAdapter, workspaceStorageAdapter)
@@ -312,71 +312,80 @@ async function verifyDigest (
      d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
    }

    // if (d?.snapshot !== undefined) {
    //   Will not check old format
    // }
    const digestToRemove = new Set<string>()
    for (const snapshot of d?.snapshots ?? []) {
      try {
        ctx.info('checking', { snapshot })
        const changes: Snapshot = {
          added: new Map(),
          removed: [],
          updated: new Map()
        }
        let lmodified = false
        try {
          const dataBlob = gunzipSync(new Uint8Array(await storage.loadFile(snapshot)))
            .toString()
            .split('\n')
          const addedCount = parseInt(dataBlob.shift() ?? '0')
          const added = dataBlob.splice(0, addedCount)
          for (const it of added) {
            const [k, v] = it.split(';')
            if (validDocs.has(k as any)) {
              changes.added.set(k as Ref<Doc>, v)
            } else {
              lmodified = true
            }
          }

          const updatedCount = parseInt(dataBlob.shift() ?? '0')
          const updated = dataBlob.splice(0, updatedCount)
          for (const it of updated) {
            const [k, v] = it.split(';')
            if (validDocs.has(k as any)) {
              changes.updated.set(k as Ref<Doc>, v)
            } else {
              lmodified = true
            }
          }

          const removedCount = parseInt(dataBlob.shift() ?? '0')
          const removed = dataBlob.splice(0, removedCount)
          changes.removed = removed as Ref<Doc>[]
        } catch (err: any) {
          ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
          digestToRemove.add(snapshot)
          modified = true
        }

        if (lmodified) {
          modified = true
          // Store changes without missing files
          await writeChanges(storage, snapshot, changes)
        }
      } catch (err: any) {
        digestToRemove.add(snapshot)
        ctx.error('digest is broken, will do full backup for', { domain, err: err.message, snapshot })
        modified = true
      }
    }
    d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
    modified = await updateDigest(d, ctx, storage, validDocs, modified, domain)
  }
  ctx.end()
  return modified
}

async function updateDigest (
  d: DomainData,
  ctx: MeasureContext<any>,
  storage: BackupStorage,
  validDocs: Set<Ref<Doc>>,
  modified: boolean,
  domain: Domain
): Promise<boolean> {
  const digestToRemove = new Set<string>()
  for (const snapshot of d?.snapshots ?? []) {
    try {
      ctx.info('checking', { snapshot })
      const changes: Snapshot = {
        added: new Map(),
        removed: [],
        updated: new Map()
      }
      let lmodified = false
      try {
        const dataBlob = gunzipSync(new Uint8Array(await storage.loadFile(snapshot)))
          .toString()
          .split('\n')
        const addedCount = parseInt(dataBlob.shift() ?? '0')
        const added = dataBlob.splice(0, addedCount)
        for (const it of added) {
          const [k, v] = it.split(';')
          if (validDocs.has(k as any)) {
            changes.added.set(k as Ref<Doc>, v)
          } else {
            lmodified = true
          }
        }

        const updatedCount = parseInt(dataBlob.shift() ?? '0')
        const updated = dataBlob.splice(0, updatedCount)
        for (const it of updated) {
          const [k, v] = it.split(';')
          if (validDocs.has(k as any)) {
            changes.updated.set(k as Ref<Doc>, v)
          } else {
            lmodified = true
          }
        }

        const removedCount = parseInt(dataBlob.shift() ?? '0')
        const removed = dataBlob.splice(0, removedCount)
        changes.removed = removed as Ref<Doc>[]
      } catch (err: any) {
        ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
        digestToRemove.add(snapshot)
        modified = true
      }

      if (lmodified) {
        modified = true
        // Store changes without missing files
        await writeChanges(storage, snapshot, changes)
      }
    } catch (err: any) {
      digestToRemove.add(snapshot)
      ctx.error('digest is broken, will do full backup for', { domain, err: err.message, snapshot })
      modified = true
    }
  }
  d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
  return modified
}

async function write (chunk: any, stream: Writable): Promise<void> {
  let needDrain = false
  await new Promise((resolve, reject) => {
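For readers unfamiliar with the snapshot files that verifyDigest and updateDigest walk: each *.snp.gz entry is a gzipped, newline-separated block of counts and "id;hash" pairs, as can be inferred from the parsing above. The following is a minimal, self-contained sketch of that parse; the SnapshotChanges shape and the parseSnapshotFile name are illustrative and not part of the platform API.

import { gunzipSync } from 'zlib'

interface SnapshotChanges {
  added: Map<string, string> // doc id -> hash
  updated: Map<string, string> // doc id -> hash
  removed: string[] // doc ids
}

// Parse an already-loaded .snp.gz buffer. Layout assumed from the code above:
// <addedCount>, addedCount lines of "<id>;<hash>", <updatedCount>, updated lines,
// <removedCount>, removed ids.
function parseSnapshotFile (raw: Buffer): SnapshotChanges {
  const lines = gunzipSync(new Uint8Array(raw)).toString().split('\n')

  const readPairs = (): Map<string, string> => {
    const count = parseInt(lines.shift() ?? '0')
    const block = new Map<string, string>()
    for (const it of lines.splice(0, count)) {
      const [k, v] = it.split(';')
      block.set(k, v)
    }
    return block
  }

  const added = readPairs()
  const updated = readPairs()
  const removedCount = parseInt(lines.shift() ?? '0')
  const removed = lines.splice(0, removedCount)
  return { added, updated, removed }
}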
@@ -683,6 +692,7 @@ export async function backup (
    isCanceled?: () => boolean
    progress?: (progress: number) => Promise<void>
    token?: string
    fullVerify?: boolean
  } = {
    force: false,
    timeout: 0,

@@ -758,11 +768,13 @@ export async function backup (
    backupInfo.domainHashes = {}
  }

  const fullCheck = options.fullVerify === true

  let lastTx: Tx | undefined

  let lastTxChecked = false
  // Skip backup if there are no transaction changes.
  if (options.getLastTx !== undefined) {
  if (options.getLastTx !== undefined && !fullCheck) {
    lastTx = await options.getLastTx()
    if (lastTx !== undefined) {
      if (lastTx._id === backupInfo.lastTxId && !options.force) {

@@ -788,7 +800,7 @@ export async function backup (
      ? await options.getConnection()
      : ((await createClient(transactorUrl, token, undefined, options.connectTimeout)) as CoreClient & BackupClient)

  if (!lastTxChecked) {
  if (!lastTxChecked && !fullCheck) {
    lastTx = await connection.findOne(
      core.class.Tx,
      { objectSpace: { $ne: core.space.Model } },
@@ -871,7 +883,8 @@ export async function backup (
    ctx: MeasureContext,
    domain: Domain,
    digest: Map<Ref<Doc>, string>,
    changes: Snapshot
    changes: Snapshot,
    same: Map<Ref<Doc>, string>
  ): Promise<{ changed: number, needRetrieveChunks: Ref<Doc>[][] }> {
    let idx: number | undefined
    let processed = 0

@@ -924,7 +937,11 @@ export async function backup (
        const serverDocHash = doTrimHash(hash) as string
        const currentHash = doTrimHash(digest.get(id as Ref<Doc>) ?? oldHash.get(id as Ref<Doc>))
        if (currentHash !== undefined) {
          const oldD = digest.get(id as Ref<Doc>)
          if (digest.delete(id as Ref<Doc>)) {
            if (oldD !== undefined) {
              same.set(id as Ref<Doc>, oldD)
            }
            oldHash.set(id as Ref<Doc>, currentHash)
          }
          if (currentHash !== serverDocHash) {
@@ -1015,12 +1032,13 @@ export async function backup (
    }

    const dHash = await connection.getDomainHash(domain)
    if (backupInfo.domainHashes[domain] === dHash) {
    if (backupInfo.domainHashes[domain] === dHash && !fullCheck) {
      ctx.info('no changes in domain', { domain })
      return
    }
    // Cumulative digest
    const digest = await ctx.with('load-digest', {}, (ctx) => loadDigest(ctx, storage, backupInfo.snapshots, domain))
    const same = new Map<Ref<Doc>, string>()

    let _pack: Pack | undefined
    let _packClose = async (): Promise<void> => {}
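Taken together, the hunks above show what --full actually changes: with fullVerify enabled, the incremental shortcuts (the last-processed-transaction check and the per-domain hash check) are bypassed so every domain is re-examined. A reduced sketch of that gating; the shouldSkipDomain helper is illustrative and not part of the real code.

// fullCheck mirrors `const fullCheck = options.fullVerify === true` above.
function shouldSkipDomain (
  fullCheck: boolean,
  storedDomainHash: string | undefined,
  serverDomainHash: string
): boolean {
  // Without --full, an unchanged domain hash lets the backup skip the domain entirely.
  return !fullCheck && storedDomainHash !== undefined && storedDomainHash === serverDomainHash
}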
@@ -1030,11 +1048,69 @@ export async function backup (
      await progress(0)
    }
    let { changed, needRetrieveChunks } = await ctx.with('load-chunks', { domain }, (ctx) =>
      loadChangesFromServer(ctx, domain, digest, changes)
      loadChangesFromServer(ctx, domain, digest, changes, same)
    )
    processedChanges.removed = Array.from(digest.keys())
    digest.clear()

    if (options.fullVerify === true && domain !== DOMAIN_BLOB && same.size > 0) {
      // We need to verify existing documents are correct
      const rsnapshots = Array.from(backupInfo.snapshots).reverse()
      // We need to load all documents locally and from server and compare
      for (const s of rsnapshots) {
        const d = s.domains[domain]
        if (d == null) {
          continue
        }
        const { docs, modified } = await verifyDocsFromSnapshot(ctx, domain, d, s, storage, same)
        if (modified) {
          changed++
        }
        const batchSize = 200
        let needRetrieve: Ref<Doc>[] = []
        for (let i = 0; i < docs.length; i += batchSize) {
          const part = docs.slice(i, i + batchSize)
          const serverDocs = await connection.loadDocs(
            domain,
            part.map((it) => it._id)
          )
          const smap = toIdMap(serverDocs)
          for (const localDoc of part) {
            if (TxProcessor.isExtendsCUD(localDoc._class)) {
              const tx = localDoc as TxCUD<Doc>
              if (tx.objectSpace == null) {
                tx.objectSpace = core.space.Workspace
              }
            }
            const serverDoc = smap.get(localDoc._id)
            if (serverDoc === undefined) {
              // We do not have this doc on the server yet, ignore it.
            } else {
              const { '%hash%': _h1, ...dData } = localDoc as any
              const { '%hash%': _h2, ...sData } = serverDoc as any

              const dsame = deepEqual(dData, sData)
              if (!dsame) {
                needRetrieve.push(localDoc._id)
                changes.updated.set(localDoc._id, same.get(localDoc._id) ?? '')
                // Docs are not the same
                if (needRetrieve.length > 200) {
                  needRetrieveChunks.push(needRetrieve)
                  needRetrieve = []
                }
              }
            }
          }
        }
        if (needRetrieve.length > 0) {
          needRetrieveChunks.push(needRetrieve)
          needRetrieve = []
        }
      }
    } else {
      same.clear()
    }

    if (progress !== undefined) {
      await progress(10)
    }
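The core of the new full-verify pass is the comparison above: strip the '%hash%' bookkeeping field from both the document restored from the backup and the server copy, then deep-compare the rest. A condensed sketch, assuming deepEqual is the structural comparison from the fast-equals package (the import is not shown in this hunk, so treat that as an assumption).

import { deepEqual } from 'fast-equals'

// Returns true when the backup copy and the server copy agree on everything
// except the '%hash%' bookkeeping field.
function sameExceptHash (localDoc: Record<string, any>, serverDoc: Record<string, any>): boolean {
  const { '%hash%': _localHash, ...localData } = localDoc
  const { '%hash%': _serverHash, ...serverData } = serverDoc
  return deepEqual(localData, serverData)
}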
@@ -1281,19 +1357,21 @@ export async function backup (
      changed++
    }

    if (changed > 0 || backupInfo.domainHashes[domain] !== dHash) {
    if (changed > 0 || (domain !== DOMAIN_BLOB && backupInfo.domainHashes[domain] !== dHash)) {
      // Store domain hash, to be used on next time.
      backupInfo.domainHashes[domain] = dHash

      snapshot.domains[domain] = domainInfo
      domainInfo.added += processedChanges.added.size
      domainInfo.updated += processedChanges.updated.size
      domainInfo.removed += processedChanges.removed.length
      if (domainInfo.added + domainInfo.updated + domainInfo.removed > 0) {
        snapshot.domains[domain] = domainInfo

      const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
      snapshotIndex++
      domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
      await writeChanges(storage, snapshotFile, processedChanges)
        const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
        snapshotIndex++
        domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
        await writeChanges(storage, snapshotFile, processedChanges)
      }

      processedChanges.added.clear()
      processedChanges.removed = []
@@ -1955,95 +2033,111 @@ export async function restore (
        if (docsToAdd.size === 0) {
          break
        }
        ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId })
        ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId.name })
        try {
          const readStream = await storage.load(sf)
          const ex = extract()

          const readStream = await storage.load(sf)
          const ex = extract()

          ex.on('entry', (headers, stream, next) => {
            const name = headers.name ?? ''
            processed++
            // We found blob data
            if (requiredDocs.has(name as Ref<Doc>)) {
              const chunks: Buffer[] = []
              stream.on('data', (chunk) => {
                chunks.push(chunk)
              })
              stream.on('end', () => {
                const bf = Buffer.concat(chunks as any)
                const d = blobs.get(name)
                if (d === undefined) {
                  blobs.set(name, { doc: undefined, buffer: bf })
                  next()
                } else {
                  blobs.delete(name)
                  const blob = d?.doc as Blob
                  ;(blob as any)['%hash%'] = changeset.get(blob._id)
                  let sz = blob.size
                  if (Number.isNaN(sz) || sz !== bf.length) {
                    sz = bf.length
                  }
                  void sendBlob(blob, bf, next)
                }
              })
            } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
              const chunks: Buffer[] = []
              const bname = name.substring(0, name.length - 5)
              stream.on('data', (chunk) => {
                chunks.push(chunk)
              })
              stream.on('end', () => {
                const bf = Buffer.concat(chunks as any)
                let doc: Doc
                try {
                  doc = JSON.parse(bf.toString()) as Doc
                } catch (err) {
                  ctx.warn('failed to parse blob metadata', { name, workspace: workspaceId, err })
                  next()
                  return
                }

                if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
                  const data = migradeBlobData(doc as Blob, changeset.get(doc._id) as string)
                  const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
          ex.on('entry', (headers, stream, next) => {
            const name = headers.name ?? ''
            processed++
            // We found blob data
            if (requiredDocs.has(name as Ref<Doc>)) {
              const chunks: Buffer[] = []
              stream.on('data', (chunk) => {
                chunks.push(chunk)
              })
              stream.on('end', () => {
                const bf = Buffer.concat(chunks as any)
                const d = blobs.get(name)
                if (d === undefined) {
                  blobs.set(bname, { doc, buffer: undefined })
                  blobs.set(name, { doc: undefined, buffer: bf })
                  next()
                } else {
                  blobs.delete(bname)
                  const blob = doc as Blob
                  void sendBlob(blob, d instanceof Buffer ? d : (d.buffer as Buffer), next)
                  blobs.delete(name)
                  const blob = d?.doc as Blob
                  ;(blob as any)['%hash%'] = changeset.get(blob._id)
                  let sz = blob.size
                  if (Number.isNaN(sz) || sz !== bf.length) {
                    sz = bf.length
                  }
                  void sendBlob(blob, bf, next).catch((err) => {
                    ctx.error('failed to send blob', { err })
                  })
                }
              } else {
                ;(doc as any)['%hash%'] = changeset.get(doc._id)
                void sendChunk(doc, bf.length).finally(() => {
                  requiredDocs.delete(doc._id)
                })
            } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
              const chunks: Buffer[] = []
              const bname = name.substring(0, name.length - 5)
              stream.on('data', (chunk) => {
                chunks.push(chunk)
              })
              stream.on('end', () => {
                const bf = Buffer.concat(chunks as any)
                let doc: Doc
                try {
                  doc = JSON.parse(bf.toString()) as Doc
                } catch (err) {
                  ctx.warn('failed to parse blob metadata', { name, workspace: workspaceId.name, err })
                  next()
                })
              }
            })
          } else {
            next()
          }
          stream.resume() // just auto drain the stream
        })
        return
      }

      await blobUploader.waitProcessing()

      const endPromise = new Promise((resolve) => {
        ex.on('finish', () => {
          resolve(null)
                if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
                  const data = migradeBlobData(doc as Blob, changeset.get(doc._id) as string)
                  const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
                  if (d === undefined) {
                    blobs.set(bname, { doc, buffer: undefined })
                    next()
                  } else {
                    blobs.delete(bname)
                    const blob = doc as Blob
                    const buff = d instanceof Buffer ? d : d.buffer
                    if (buff != null) {
                      void sendBlob(blob, d instanceof Buffer ? d : (d.buffer as Buffer), next).catch((err) => {
                        ctx.error('failed to send blob', { err })
                      })
                    } else {
                      next()
                    }
                  }
                } else {
                  ;(doc as any)['%hash%'] = changeset.get(doc._id)
                  void sendChunk(doc, bf.length).finally(() => {
                    requiredDocs.delete(doc._id)
                    next()
                  })
                }
              })
            } else {
              next()
            }
            stream.resume() // just auto drain the stream
          })
          })
          const unzip = createGunzip({ level: defaultLevel })

          readStream.on('end', () => {
            readStream.destroy()
          })
          readStream.pipe(unzip)
          unzip.pipe(ex)
          await blobUploader.waitProcessing()

          await endPromise
          const unzip = createGunzip({ level: defaultLevel })

          const endPromise = new Promise((resolve, reject) => {
            ex.on('finish', () => {
              resolve(null)
            })

            readStream.on('end', () => {
              readStream.destroy()
            })
            readStream.pipe(unzip).on('error', (err) => {
              readStream.destroy()
              reject(err)
            })
            unzip.pipe(ex)
          })

          await endPromise
        } catch (err: any) {
          ctx.error('failed to process', { storageFile: sf, processed, workspace: workspaceId.name })
        }
      }
    }
  }
@@ -2146,6 +2240,103 @@ export async function restore (
  return true
}

async function verifyDocsFromSnapshot (
  ctx: MeasureContext,
  domain: Domain,
  d: DomainData,
  s: BackupSnapshot,
  storage: BackupStorage,
  digest: Map<Ref<Doc>, string>
): Promise<{ docs: Doc[], modified: boolean }> {
  const result: Doc[] = []
  const storageToRemove = new Set<string>()
  const validDocs = new Set<Ref<Doc>>()
  if (digest.size > 0) {
    const sDigest = await loadDigest(ctx, storage, [s], domain)
    const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => digest.has(it)))

    if (requiredDocs.size > 0) {
      ctx.info('updating', { domain, requiredDocs: requiredDocs.size })
      // We have required documents here.
      for (const sf of d.storage ?? []) {
        if (digest.size === 0) {
          break
        }
        try {
          const readStream = await storage.load(sf)
          const ex = extract()

          ex.on('entry', (headers, stream, next) => {
            const name = headers.name ?? ''
            // We found blob data
            if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
              const chunks: Buffer[] = []
              const bname = name.substring(0, name.length - 5)
              stream.on('data', (chunk) => {
                chunks.push(chunk)
              })
              stream.on('end', () => {
                const bf = Buffer.concat(chunks as any)
                let doc: Doc
                try {
                  doc = JSON.parse(bf.toString()) as Doc
                } catch (err) {
                  next()
                  return
                }

                if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
                  // Skip blob
                  validDocs.add(bname as Ref<Doc>)
                } else {
                  ;(doc as any)['%hash%'] = digest.get(doc._id)
                  digest.delete(bname as Ref<Doc>)
                  result.push(doc)
                  validDocs.add(bname as Ref<Doc>)
                  next()
                }
              })
            } else {
              next()
            }
            stream.resume() // just auto drain the stream
          })

          const unzip = createGunzip({ level: defaultLevel })

          const endPromise = new Promise((resolve, reject) => {
            ex.on('finish', () => {
              resolve(null)
            })

            readStream.on('end', () => {
              readStream.destroy()
            })
            readStream.pipe(unzip).on('error', (err) => {
              readStream.destroy()
              storageToRemove.add(sf)
              reject(err)
            })
            unzip.pipe(ex)
          })

          await endPromise
        } catch (err: any) {
          storageToRemove.add(sf)
          ctx.error('failed to process', { storageFile: sf })
        }
      }
    }
  }
  let modified = false
  if (storageToRemove.size > 0) {
    d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
    modified = true
  }
  modified = await updateDigest(d, ctx, storage, validDocs, modified, domain)
  return { docs: result, modified }
}

/**
 * Compacting backup into just one snapshot.
 * @public

@@ -2429,15 +2620,18 @@ export async function compactBackup (
    }

    if (changed > 0) {
      snapshot.domains[domain] = domainInfo
      domainInfo.added += processedChanges.added.size
      domainInfo.updated += processedChanges.updated.size
      domainInfo.removed += processedChanges.removed.length

      const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
      snapshotIndex++
      domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
      await writeChanges(storage, snapshotFile, processedChanges)
      if (domainInfo.added + domainInfo.updated + domainInfo.removed > 0) {
        snapshot.domains[domain] = domainInfo

        const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
        snapshotIndex++
        domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
        await writeChanges(storage, snapshotFile, processedChanges)
      }

      processedChanges.added.clear()
      processedChanges.removed = []
@@ -74,7 +74,8 @@ class BackupWorker {
    ) => DbConfiguration,
    readonly region: string,
    readonly contextVars: Record<string, any>,
    readonly skipDomains: string[] = []
    readonly skipDomains: string[] = [],
    readonly fullCheck: boolean = false
  ) {}

  canceled = false

@@ -285,6 +286,7 @@ class BackupWorker {
        connectTimeout: 5 * 60 * 1000, // 5 minutes to,
        blobDownloadLimit: this.downloadLimit,
        skipBlobContentTypes: [],
        fullVerify: this.fullCheck,
        storageAdapter: this.workspaceStorageAdapter,
        getLastTx: async (): Promise<Tx | undefined> => {
          const config = this.getConfig(ctx, wsIds, null, this.workspaceStorageAdapter)

@@ -408,6 +410,7 @@ export async function doBackupWorkspace (
  downloadLimit: number,
  skipDomains: string[],
  contextVars: Record<string, any>,
  fullCheck: boolean = false,
  notify?: (progress: number) => Promise<void>
): Promise<boolean> {
  const backupWorker = new BackupWorker(

@@ -418,7 +421,8 @@ export async function doBackupWorkspace (
    getConfig,
    region,
    contextVars,
    skipDomains
    skipDomains,
    fullCheck
  )
  backupWorker.downloadLimit = downloadLimit
  const result = await backupWorker.doBackup(ctx, workspace, notify)
@@ -479,7 +479,7 @@ export class WorkspaceWorker {
        await sendEvent('archiving-backup-started', 0)

        await this.sendTransactorMaitenance(token, workspace.uuid)
        if (await this.doBackup(ctx, workspace, opt, true)) {
        if (await this.doBackup(ctx, workspace, opt)) {
          await sendEvent('archiving-backup-done', 100)
        }
        break

@@ -516,7 +516,7 @@ export class WorkspaceWorker {
      case 'migration-backup':
        await sendEvent('migrate-backup-started', 0)
        await this.sendTransactorMaitenance(token, workspace.uuid)
        if (await this.doBackup(ctx, workspace, opt, false)) {
        if (await this.doBackup(ctx, workspace, opt)) {
          await sendEvent('migrate-backup-done', 100)
        }
        break

@@ -551,12 +551,7 @@ export class WorkspaceWorker {
    }
  }

  private async doBackup (
    ctx: MeasureContext,
    workspace: WorkspaceInfoWithStatus,
    opt: WorkspaceOptions,
    archive: boolean
  ): Promise<boolean> {
  private async doBackup (ctx: MeasureContext, workspace: WorkspaceInfoWithStatus, opt: WorkspaceOptions): Promise<boolean> {
    if (opt.backup === undefined) {
      return false
    }

@@ -624,6 +619,7 @@ export class WorkspaceWorker {
      50000,
      ['blob'],
      sharedPipelineContextVars,
      true,
      (_p: number) => {
        if (progress !== Math.round(_p)) {
          progress = Math.round(_p)