diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index e470b9a64e..821bcf97f9 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -15,7 +15,6 @@
 //
 import accountPlugin, {
-  type AccountDB,
   assignWorkspace,
   confirmEmail,
   createAcc,
@@ -34,6 +33,7 @@ import accountPlugin, {
   setAccountAdmin,
   setRole,
   updateWorkspace,
+  type AccountDB,
   type Workspace
 } from '@hcengineering/account'
 import { setMetadata } from '@hcengineering/platform'
@@ -41,13 +41,21 @@ import {
   backup,
   backupFind,
   backupList,
+  backupRemoveLast,
   backupSize,
+  checkBackupIntegrity,
   compactBackup,
   createFileBackupStorage,
   createStorageBackupStorage,
   restore
 } from '@hcengineering/server-backup'
-import serverClientPlugin, { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client'
+import serverClientPlugin, {
+  BlobClient,
+  createClient,
+  getTransactorEndpoint,
+  listAccountWorkspaces,
+  updateBackupInfo
+} from '@hcengineering/server-client'
 import { getServerPipeline, registerServerPlugins, registerStringLoaders } from '@hcengineering/server-pipeline'
 import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
 import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool'
@@ -65,6 +73,7 @@ import core, {
   getWorkspaceId,
   MeasureMetricsContext,
   metricsToString,
+  RateLimiter,
   systemAccountEmail,
   versionToString,
   type Data,
@@ -77,8 +86,8 @@ import core, {
 } from '@hcengineering/core'
 import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model'
 import contact from '@hcengineering/model-contact'
-import { backupDownload } from '@hcengineering/server-backup/src/backup'
 import { getMongoClient, getWorkspaceMongoDB, shutdown } from '@hcengineering/mongo'
+import { backupDownload } from '@hcengineering/server-backup/src/backup'
 import type { StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
 import { deepEqual } from 'fast-equals'
@@ -104,7 +113,7 @@ import {
   restoreRecruitingTaskTypes
 } from './clean'
 import { changeConfiguration } from './configuration'
-import { moveFromMongoToPG, moveWorkspaceFromMongoToPG, moveAccountDbFromMongoToPG } from './db'
+import { moveAccountDbFromMongoToPG, moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db'
 import { fixJsonMarkup, migrateMarkup, restoreLostMarkup } from './markup'
 import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
 import { fixAccountEmails, renameAccount } from './renameAccount'
@@ -814,6 +823,13 @@ export function devTool (
       const storage = await createFileBackupStorage(dirName)
       await compactBackup(toolCtx, storage, cmd.force)
     })
+  program
+    .command('backup-check <dirName>')
+    .description('Check the integrity of a given backup')
+    .action(async (dirName: string, cmd: any) => {
+      const storage = await createFileBackupStorage(dirName)
+      await checkBackupIntegrity(toolCtx, storage)
+    })
 
   program
     .command('backup-restore <dirName> <workspace> [date]')
@@ -864,6 +880,61 @@ export function devTool (
         await backup(toolCtx, endpoint, wsid, storage)
       })
     })
+  program
+    .command('backup-s3-clean <bucketName> <days>')
+    .description('Remove backup snapshots newer than the given number of days for workspaces in the bucket')
+    .action(async (bucketName: string, days: string, cmd) => {
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+
+      const daysInterval = Date.now() - parseInt(days) * 24 * 60 * 60 * 1000
+      try {
+        const token = generateToken(systemAccountEmail, { name: 'any' })
+        const workspaces = (await listAccountWorkspaces(token)).filter((it) => {
+          const lastBackup = it.backupInfo?.lastBackup ?? 0
+          if (lastBackup > daysInterval) {
+            // No backup required, interval not elapsed
+            return true
+          }
+
+          if (it.lastVisit == null) {
+            return false
+          }
+
+          return false
+        })
+        workspaces.sort((a, b) => {
+          return (b.backupInfo?.backupSize ?? 0) - (a.backupInfo?.backupSize ?? 0)
+        })
+
+        for (const ws of workspaces) {
+          const storage = await createStorageBackupStorage(
+            toolCtx,
+            storageAdapter,
+            getWorkspaceId(bucketName),
+            ws.workspace
+          )
+          await backupRemoveLast(storage, daysInterval)
+          await updateBackupInfo(generateToken(systemAccountEmail, { name: 'any' }), {
+            backups: ws.backupInfo?.backups ?? 0,
+            backupSize: ws.backupInfo?.backupSize ?? 0,
+            blobsSize: ws.backupInfo?.blobsSize ?? 0,
+            dataSize: ws.backupInfo?.dataSize ?? 0,
+            lastBackup: daysInterval
+          })
+        }
+      } finally {
+        await storageAdapter.close()
+      }
+    })
+  program
+    .command('backup-clean <dirName> <days>')
+    .description('Remove backup snapshots newer than the given number of days from a local backup')
+    .action(async (dirName: string, days: string, cmd) => {
+      const daysInterval = Date.now() - parseInt(days) * 24 * 60 * 60 * 1000
+      const storage = await createFileBackupStorage(dirName)
+      await backupRemoveLast(storage, daysInterval)
+    })
 
   program
     .command('backup-s3-compact <bucketName> <dirName>')
@@ -880,6 +951,20 @@ export function devTool (
       }
       await storageAdapter.close()
     })
+  program
+    .command('backup-s3-check <bucketName> <dirName>')
+    .description('Check the integrity of a backup stored in the bucket')
+    .action(async (bucketName: string, dirName: string, cmd: any) => {
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      try {
+        const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
+        await checkBackupIntegrity(toolCtx, storage)
+      } catch (err: any) {
+        toolCtx.error('failed to check backup', { err })
+      }
+      await storageAdapter.close()
+    })
 
   program
     .command('backup-s3-restore <bucketName> <dirName> <workspace> [date]')
@@ -1100,7 +1185,7 @@ export function devTool (
     .command('move-files')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
     .option('-m, --move <move>', 'When set to true, the files will be moved, otherwise copied', 'false')
-    .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
+    .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '999999')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
     .option('--disabled', 'Include disabled workspaces', false)
     .action(
@@ -1125,6 +1210,7 @@ export function devTool (
           const workspaces = await listWorkspacesPure(db)
           workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
 
+          const rateLimit = new RateLimiter(10)
           for (const workspace of workspaces) {
             if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
               continue
@@ -1134,12 +1220,14 @@ export function devTool (
               continue
             }
 
-            console.log('start', workspace.workspace, index, '/', workspaces.length)
-            await moveFiles(toolCtx, getWorkspaceId(workspace.workspace), exAdapter, params)
-            console.log('done', workspace.workspace)
-
-            index += 1
+            await rateLimit.exec(async () => {
+              console.log('start', workspace.workspace, index, '/', workspaces.length)
+              await moveFiles(toolCtx, getWorkspaceId(workspace.workspace), exAdapter, params)
+              console.log('done', workspace.workspace)
+              index += 1
+            })
           }
+          await rateLimit.waitProcessing()
         } catch (err: any) {
           console.error(err)
         }
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index 3a76b25ea2..a1ec1a7719 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -144,6 +144,23 @@ async function processAdapter (
   let movedBytes = 0
   let batchBytes = 0
+  function printStats (): void {
+    const duration = Date.now() - time
+    console.log(
+      '...processed',
+      processedCnt,
+      Math.round(processedBytes / 1024 / 1024) + 'MB',
+      'moved',
+      movedCnt,
+      Math.round(movedBytes / 1024 / 1024) + 'MB',
+      '+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
+      Math.round(duration / 1000) + 's'
+    )
+
+    batchBytes = 0
+    time = Date.now()
+  }
+
 
   const rateLimiter = new RateLimiter(params.concurrency)
 
   const iterator = await source.listStream(ctx, workspaceId)
@@ -152,15 +169,7 @@ async function processAdapter (
 
   const targetBlobs = new Map<Ref<Blob>, ListBlobResult>()
 
-  while (true) {
-    const part = await targetIterator.next()
-    for (const p of part) {
-      targetBlobs.set(p._id, p)
-    }
-    if (part.length === 0) {
-      break
-    }
-  }
+  let targetFilled = false
 
   const toRemove: string[] = []
   try {
@@ -168,6 +177,20 @@ async function processAdapter (
       const dataBulk = await iterator.next()
       if (dataBulk.length === 0) break
 
+      if (!targetFilled) {
+        // Only fill target if have something to move.
+        targetFilled = true
+        while (true) {
+          const part = await targetIterator.next()
+          for (const p of part) {
+            targetBlobs.set(p._id, p)
+          }
+          if (part.length === 0) {
+            break
+          }
+        }
+      }
+
      for (const data of dataBulk) {
        let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id)
        if (targetBlob !== undefined) {
@@ -219,22 +242,7 @@ async function processAdapter (
 
        if (processedCnt % 100 === 0) {
          await rateLimiter.waitProcessing()
-
-          const duration = Date.now() - time
-
-          console.log(
-            '...processed',
-            processedCnt,
-            Math.round(processedBytes / 1024 / 1024) + 'MB',
-            'moved',
-            movedCnt,
-            Math.round(movedBytes / 1024 / 1024) + 'MB',
-            '+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
-            Math.round(duration / 1000) + 's'
-          )
-
-          batchBytes = 0
-          time = Date.now()
+          printStats()
        }
      }
    }
@@ -246,6 +254,7 @@ async function processAdapter (
        await source.remove(ctx, workspaceId, part)
      }
    }
+    printStats()
  } finally {
    await iterator.close()
  }
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 27e71b7581..1aef026c39 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -189,6 +189,190 @@ async function loadDigest (
   ctx.end()
   return result
 }
+async function verifyDigest (
+  ctx: MeasureContext,
+  storage: BackupStorage,
+  snapshots: BackupSnapshot[],
+  domain: Domain
+): Promise<boolean> {
+  ctx = ctx.newChild('verify digest', { domain, count: snapshots.length })
+  ctx.info('verify-digest', { domain, count: snapshots.length })
+  let modified = false
+  for (const s of snapshots) {
+    const d = s.domains[domain]
+    if (d === undefined) {
+      continue
+    }
+
+    const storageToRemove = new Set<string>()
+    // We need to verify storage has all necessary resources
+    ctx.info('checking', { domain })
+    // We have required documents here.
+    const validDocs = new Set<Ref<Doc>>()
+
+    for (const sf of d.storage ?? []) {
+      const blobs = new Map<string, { doc: Doc | undefined, buffer: Buffer | undefined }>()
+      try {
+        ctx.info('checking storage', { sf })
+        const readStream = await storage.load(sf)
+        const ex = extract()
+
+        ex.on('entry', (headers, stream, next) => {
+          const name = headers.name ?? ''
+          // We found blob data
+          if (name.endsWith('.json')) {
+            const chunks: Buffer[] = []
+            const bname = name.substring(0, name.length - 5)
+            stream.on('data', (chunk) => {
+              chunks.push(chunk)
+            })
+            stream.on('end', () => {
+              const bf = Buffer.concat(chunks as any)
+              const doc = JSON.parse(bf.toString()) as Doc
+              if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
+                const data = migradeBlobData(doc as Blob, '')
+                const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
+                if (d === undefined) {
+                  blobs.set(bname, { doc, buffer: undefined })
+                } else {
+                  blobs.delete(bname)
+                  const blob = doc as Blob
+
+                  if (blob.size === bf.length) {
+                    validDocs.add(name as Ref<Doc>)
+                  }
+                }
+              } else {
+                validDocs.add(name as Ref<Doc>)
+              }
+              next()
+            })
+          } else {
+            const chunks: Buffer[] = []
+            stream.on('data', (chunk) => {
+              chunks.push(chunk)
+            })
+            stream.on('end', () => {
+              const bf = Buffer.concat(chunks as any)
+              const d = blobs.get(name)
+              if (d === undefined) {
+                blobs.set(name, { doc: undefined, buffer: bf })
+              } else {
+                blobs.delete(name)
+                const doc = d?.doc as Blob
+                let sz = doc.size
+                if (Number.isNaN(sz) || sz !== bf.length) {
+                  sz = bf.length
+                }
+
+                // If blob size matches the document size, mark it as valid
+                if (sz === bf.length) {
+                  validDocs.add(name as Ref<Doc>)
+                }
+              }
+              next()
+            })
+          }
+          stream.resume() // just auto drain the stream
+        })
+
+        const unzip = createGunzip({ level: defaultLevel })
+        const endPromise = new Promise((resolve) => {
+          ex.on('finish', () => {
+            resolve(null)
+          })
+          unzip.on('error', (err) => {
+            ctx.error('error during reading of', { sf, err })
+            modified = true
+            storageToRemove.add(sf)
+            resolve(null)
+          })
+        })
+
+        readStream.on('end', () => {
+          readStream.destroy()
+        })
+        readStream.pipe(unzip)
+        unzip.pipe(ex)
+
+        await endPromise
+      } catch (err: any) {
+        ctx.error('error during reading of', { sf, err })
+        // In case of an invalid archive, we need to
+        // remove the broken storage file
+        modified = true
+        storageToRemove.add(sf)
+      }
+    }
+    if (storageToRemove.size > 0) {
+      modified = true
+      d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
+    }
+
+    // if (d?.snapshot !== undefined) {
+    // Will not check old format
+    // }
+    const digestToRemove = new Set<string>()
+    for (const snapshot of d?.snapshots ?? []) {
+      try {
+        ctx.info('checking', { snapshot })
+        const changes: Snapshot = {
+          added: new Map(),
+          removed: [],
+          updated: new Map()
+        }
+        let lmodified = false
+        try {
+          const dataBlob = gunzipSync(await storage.loadFile(snapshot))
+            .toString()
+            .split('\n')
+          const addedCount = parseInt(dataBlob.shift() ?? '0')
+          const added = dataBlob.splice(0, addedCount)
+          for (const it of added) {
+            const [k, v] = it.split(';')
+            if (validDocs.has(k as any)) {
+              changes.added.set(k as Ref<Doc>, v)
+            } else {
+              lmodified = true
+            }
+          }
+
+          const updatedCount = parseInt(dataBlob.shift() ?? '0')
+          const updated = dataBlob.splice(0, updatedCount)
+          for (const it of updated) {
+            const [k, v] = it.split(';')
+            if (validDocs.has(k as any)) {
+              changes.updated.set(k as Ref<Doc>, v)
+            } else {
+              lmodified = true
+            }
+          }
+
+          const removedCount = parseInt(dataBlob.shift() ?? '0')
+          const removed = dataBlob.splice(0, removedCount)
+          changes.removed = removed as Ref<Doc>[]
+        } catch (err: any) {
+          ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
+          digestToRemove.add(snapshot)
+          modified = true
+        }
+
+        if (lmodified) {
+          modified = true
+          // Store changes without missing files
+          await writeChanges(storage, snapshot, changes)
+        }
+      } catch (err: any) {
+        digestToRemove.add(snapshot)
+        ctx.error('digest is broken, will do full backup for', { domain })
+        modified = true
+      }
+    }
+    d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
+  }
+  ctx.end()
+  return modified
+}
 
 async function write (chunk: any, stream: Writable): Promise<void> {
   let needDrain = false
@@ -982,10 +1166,12 @@ export async function backup (
     storageZip.pipe(sizePass)
 
     _packClose = async () => {
-      _pack?.finalize()
-      storageZip.destroy()
-      _pack?.destroy()
-      tempFile.destroy()
+      await new Promise<void>((resolve) => {
+        tempFile.on('close', () => {
+          resolve()
+        })
+        _pack?.finalize()
+      })
 
       // We need to upload file to storage
       ctx.info('Upload pack file', { storageFile, size: sz, workspace: workspaceId.name })
@@ -1241,6 +1427,26 @@ export async function backupList (storage: BackupStorage): Promise<void> {
   }
 }
 
+/**
+ * @public
+ */
+export async function backupRemoveLast (storage: BackupStorage, date: number): Promise<void> {
+  const infoFile = 'backup.json.gz'
+
+  if (!(await storage.exists(infoFile))) {
+    throw new Error(`${infoFile} should be present to restore`)
+  }
+  const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
+  console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
+  const old = backupInfo.snapshots.length
+  backupInfo.snapshots = backupInfo.snapshots.filter((it) => it.date < date)
+  if (old !== backupInfo.snapshots.length) {
+    console.log('removed snapshots:', old - backupInfo.snapshots.length)
+
+    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
+  }
+}
+
 /**
  * @public
  */
@@ -1980,7 +2186,7 @@ export async function compactBackup (
            chunks.push(chunk)
          })
          stream.on('end', () => {
-            const bf = Buffer.concat(chunks)
+            const bf = Buffer.concat(chunks as any)
            const d = blobs.get(name)
            if (d === undefined) {
              blobs.set(name, { doc: undefined, buffer: bf })
@@ -2031,12 +2237,16 @@ export async function compactBackup (
          stream.resume() // just auto drain the stream
        })
 
+        const unzip = createGunzip({ level: defaultLevel })
        const endPromise = new Promise((resolve) => {
          ex.on('finish', () => {
            resolve(null)
          })
+          unzip.on('error', (err) => {
+            ctx.error('error during processing', { snapshot, err })
+            resolve(null)
+          })
        })
-        const unzip = createGunzip({ level: defaultLevel })
 
        readStream.on('end', () => {
          readStream.destroy()
@@ -2111,3 +2321,53 @@ function migradeBlobData (blob: Blob, etag: string): string {
   }
   return ''
 }
+
+/**
+ * Check backup integrity; when resources are missing, the digest files are updated so the next backup will re-upload the missing parts.
+ * @public
+ */
+export async function checkBackupIntegrity (ctx: MeasureContext, storage: BackupStorage): Promise<void> {
+  console.log('starting backup integrity check')
+  try {
+    let backupInfo: BackupInfo
+
+    // Version 0.6.2, format of digest file is changed to
+
+    const infoFile = 'backup.json.gz'
+
+    if (await storage.exists(infoFile)) {
+      backupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
+    } else {
+      console.log('No backup found')
+      return
+    }
+    if (backupInfo.version !== '0.6.2') {
+      console.log('Invalid backup version')
+      return
+    }
+
+    const domains: Domain[] = []
+    for (const sn of backupInfo.snapshots) {
+      for (const d of Object.keys(sn.domains)) {
+        if (!domains.includes(d as Domain)) {
+          domains.push(d as Domain)
+        }
+      }
+    }
+    let modified = false
+
+    for (const domain of domains) {
+      console.log('checking domain...', domain)
+      if (await verifyDigest(ctx, storage, backupInfo.snapshots, domain)) {
+        modified = true
+      }
+    }
+    if (modified) {
+      await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
+    }
+  } catch (err: any) {
+    console.error(err)
+  } finally {
+    console.log('end of integrity check')
+  }
+}
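
For reference, the two new server-backup exports can also be driven outside the dev tool CLI. The sketch below is illustrative only: the maintainBackup name, the 'backups/my-workspace' directory and the 7-day cutoff are assumptions, while createFileBackupStorage, backupRemoveLast, checkBackupIntegrity and MeasureMetricsContext are the same APIs the commands above use.

import { MeasureMetricsContext } from '@hcengineering/core'
import {
  backupRemoveLast,
  checkBackupIntegrity,
  createFileBackupStorage
} from '@hcengineering/server-backup'

async function maintainBackup (dirName: string): Promise<void> {
  const ctx = new MeasureMetricsContext('backup-maintenance', {})
  const storage = await createFileBackupStorage(dirName)

  // Drop snapshots taken during the last 7 days: backupRemoveLast keeps only
  // snapshots whose date is older than the cutoff, as the backup-clean command does.
  const cutoff = Date.now() - 7 * 24 * 60 * 60 * 1000
  await backupRemoveLast(storage, cutoff)

  // Verify the remaining snapshots and drop broken storage files from the digests,
  // as the backup-check command does.
  await checkBackupIntegrity(ctx, storage)
}

void maintainBackup('backups/my-workspace')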