UBERF-8277: Fix blobs backup (#6730)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
parent 9000e8c0bb
commit 28fffa883f
@@ -40,6 +40,7 @@ import {
   backup,
   backupFind,
   backupList,
+  backupSize,
   compactBackup,
   createFileBackupStorage,
   createStorageBackupStorage,
@@ -59,7 +60,7 @@ import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool'
 import { createWorkspace, upgradeWorkspace } from '@hcengineering/workspace-service'
 import path from 'path'

-import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
+import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
 import { program, type Command } from 'commander'
 import { type Db, type MongoClient } from 'mongodb'
 import { clearTelegramHistory } from './telegram'
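The widened import above is what the rewritten commands below rely on: instead of a Mongo-backed tool storage, each command now builds its own adapter straight from the STORAGE environment variable. A minimal sketch of that pattern, assuming STORAGE holds a config string in the format storageConfigFromEnv expects:

// Sketch only; mirrors the pattern used by the commands in this commit.
import { createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'

const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
// The first configured backend (e.g. S3/MinIO) becomes the backup storage adapter.
const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
// ... use the adapter, then release its resources:
await storageAdapter.close()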
@@ -86,6 +87,7 @@ import core, {
 import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model'
 import contact from '@hcengineering/model-contact'
 import { getMongoClient, getWorkspaceDB, shutdown } from '@hcengineering/mongo'
+import { backupDownload } from '@hcengineering/server-backup/src/backup'
 import type { StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
 import { deepEqual } from 'fast-equals'
 import { createWriteStream, readFileSync } from 'fs'
@@ -924,56 +926,79 @@ export function devTool (
     .command('backup-compact-s3 <bucketName> <dirName>')
     .description('Compact a given backup to just one snapshot')
     .option('-f, --force', 'Force compact.', false)
-    .action(async (bucketName: string, dirName: string, cmd: { force: boolean }) => {
-      const { mongodbUri } = prepareTools()
-      await withStorage(mongodbUri, async (adapter) => {
-        const storage = await createStorageBackupStorage(toolCtx, adapter, getWorkspaceId(bucketName), dirName)
+    .action(async (bucketName: string, dirName: string, cmd: { force: boolean, print: boolean }) => {
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      try {
+        const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
         await compactBackup(toolCtx, storage, cmd.force)
-      })
+      } catch (err: any) {
+        toolCtx.error('failed to size backup', { err })
+      }
+      await storageAdapter.close()
     })

-  program
-    .command('backup-compact-s3-all <bucketName>')
-    .description('Compact a given backup to just one snapshot')
-    .option('-f, --force', 'Force compact.', false)
-    .action(async (bucketName: string, dirName: string, cmd: { force: boolean }) => {
-      const { mongodbUri } = prepareTools()
-      await withDatabase(mongodbUri, async (db) => {
-        const { mongodbUri } = prepareTools()
-        await withStorage(mongodbUri, async (adapter) => {
-          const storage = await createStorageBackupStorage(toolCtx, adapter, getWorkspaceId(bucketName), dirName)
-          const workspaces = await listWorkspacesPure(db)
-
-          for (const w of workspaces) {
-            console.log(`clearing ${w.workspace} history:`)
-            await compactBackup(toolCtx, storage, cmd.force)
-          }
-        })
-      })
-    })
   program
     .command('backup-s3-restore <bucketName> <dirName> <workspace> [date]')
     .description('dump workspace transactions and minio resources')
     .action(async (bucketName: string, dirName: string, workspace: string, date, cmd) => {
-      const { mongodbUri } = prepareTools()
-      await withStorage(mongodbUri, async (adapter) => {
-        const storage = await createStorageBackupStorage(toolCtx, adapter, getWorkspaceId(bucketName), dirName)
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      try {
+        const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
         const wsid = getWorkspaceId(workspace)
         const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
         await restore(toolCtx, endpoint, wsid, storage, {
           date: parseInt(date ?? '-1')
         })
-      })
+      } catch (err: any) {
+        toolCtx.error('failed to size backup', { err })
+      }
+      await storageAdapter.close()
     })
   program
     .command('backup-s3-list <bucketName> <dirName>')
     .description('list snaphost ids for backup')
     .action(async (bucketName: string, dirName: string, cmd) => {
-      const { mongodbUri } = prepareTools()
-      await withStorage(mongodbUri, async (adapter) => {
-        const storage = await createStorageBackupStorage(toolCtx, adapter, getWorkspaceId(bucketName), dirName)
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      try {
+        const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
         await backupList(storage)
-      })
+      } catch (err: any) {
+        toolCtx.error('failed to size backup', { err })
+      }
+      await storageAdapter.close()
     })
+
+  program
+    .command('backup-s3-size <bucketName> <dirName>')
+    .description('list snaphost ids for backup')
+    .action(async (bucketName: string, dirName: string, cmd) => {
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      try {
+        const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
+        await backupSize(storage)
+      } catch (err: any) {
+        toolCtx.error('failed to size backup', { err })
+      }
+      await storageAdapter.close()
+    })
+
+  program
+    .command('backup-s3-download <bucketName> <dirName> <storeIn>')
+    .description('Download a full backup from s3 to local dir')
+    .action(async (bucketName: string, dirName: string, storeIn: string, cmd) => {
+      const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
+      const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+      try {
+        const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
+        await backupDownload(storage, storeIn)
+      } catch (err: any) {
+        toolCtx.error('failed to size backup', { err })
+      }
+      await storageAdapter.close()
+    })
+
   program
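Every rewritten command above now follows one lifecycle: parse STORAGE, build a single adapter, open the backup storage, run one operation inside try/catch, and close the adapter afterwards (note the catch message 'failed to size backup' is reused verbatim across all of them). A hedged sketch of that shared shape as a hypothetical helper, not part of the commit:

// Hypothetical helper, not in the commit: the adapter lifecycle shared by the commands above.
async function withBackupStorage (
  bucketName: string,
  dirName: string,
  op: (storage: BackupStorage) => Promise<void>
): Promise<void> {
  const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
  const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
  try {
    const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
    await op(storage)
  } catch (err: any) {
    toolCtx.error('backup operation failed', { err })
  }
  // The adapter is closed even when the operation failed.
  await storageAdapter.close()
}

// e.g. the backup-s3-size action body would then reduce to:
//   await withBackupStorage(bucketName, dirName, async (storage) => { await backupSize(storage) })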
@@ -34,6 +34,7 @@ import core, {
   systemAccountEmail,
   TxCollectionCUD,
   WorkspaceId,
+  type BackupStatus,
   type Blob,
   type DocIndexState,
   type Tx
@@ -42,6 +43,8 @@ import { BlobClient, createClient } from '@hcengineering/server-client'
 import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
 import { generateToken } from '@hcengineering/server-token'
 import { connect } from '@hcengineering/server-tool'
+import { createWriteStream, existsSync, mkdirSync, statSync } from 'node:fs'
+import { dirname } from 'node:path'
 import { PassThrough } from 'node:stream'
 import { createGzip } from 'node:zlib'
 import { join } from 'path'
@@ -49,7 +52,6 @@ import { Writable } from 'stream'
 import { extract, Pack, pack } from 'tar-stream'
 import { createGunzip, gunzipSync, gzipSync } from 'zlib'
 import { BackupStorage } from './storage'
-import type { BackupStatus } from '@hcengineering/core/src/classes'
 export * from './storage'

 const dataBlobSize = 50 * 1024 * 1024
@@ -1113,6 +1115,100 @@ export async function backupList (storage: BackupStorage): Promise<void> {
   }
 }

+/**
+ * @public
+ */
+export async function backupSize (storage: BackupStorage): Promise<void> {
+  const infoFile = 'backup.json.gz'
+
+  if (!(await storage.exists(infoFile))) {
+    throw new Error(`${infoFile} should present to restore`)
+  }
+  let size = 0
+
+  const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
+  console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
+  const addFileSize = async (file: string | undefined | null): Promise<void> => {
+    if (file != null && (await storage.exists(file))) {
+      const fileSize = await storage.stat(file)
+      console.log(file, fileSize)
+      size += fileSize
+    }
+  }
+
+  // Let's calculate data size for backup
+  for (const sn of backupInfo.snapshots) {
+    for (const [, d] of Object.entries(sn.domains)) {
+      await addFileSize(d.snapshot)
+      for (const snp of d.snapshots ?? []) {
+        await addFileSize(snp)
+      }
+      for (const snp of d.storage ?? []) {
+        await addFileSize(snp)
+      }
+    }
+  }
+  await addFileSize(infoFile)
+
+  console.log('Backup size', size / (1024 * 1024), 'Mb')
+}
+
+/**
+ * @public
+ */
+export async function backupDownload (storage: BackupStorage, storeIn: string): Promise<void> {
+  const infoFile = 'backup.json.gz'
+
+  if (!(await storage.exists(infoFile))) {
+    throw new Error(`${infoFile} should present to restore`)
+  }
+  let size = 0
+
+  const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
+  console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
+  const addFileSize = async (file: string | undefined | null): Promise<void> => {
+    if (file != null && (await storage.exists(file))) {
+      const fileSize = await storage.stat(file)
+      const target = join(storeIn, file)
+      const dir = dirname(target)
+      if (!existsSync(dir)) {
+        mkdirSync(dir, { recursive: true })
+      }
+      if (!existsSync(target) || fileSize !== statSync(target).size) {
+        console.log('downloading', file, fileSize)
+        const readStream = await storage.load(file)
+        const outp = createWriteStream(target)
+
+        readStream.pipe(outp)
+        await new Promise<void>((resolve) => {
+          readStream.on('end', () => {
+            readStream.destroy()
+            outp.close()
+            resolve()
+          })
+        })
+      }
+      size += fileSize
+    }
+  }
+
+  // Let's calculate data size for backup
+  for (const sn of backupInfo.snapshots) {
+    for (const [, d] of Object.entries(sn.domains)) {
+      await addFileSize(d.snapshot)
+      for (const snp of d.snapshots ?? []) {
+        await addFileSize(snp)
+      }
+      for (const snp of d.storage ?? []) {
+        await addFileSize(snp)
+      }
+    }
+  }
+  await addFileSize(infoFile)
+
+  console.log('Backup size', size / (1024 * 1024), 'Mb')
+}
+
 /**
  * @public
  */
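Both new functions read backup.json.gz and walk every snapshot's snapshot/snapshots/storage file lists. backupSize only sums storage.stat results, while backupDownload mirrors each file into storeIn and skips files whose local size already matches, so re-runs are effectively incremental. A short usage sketch, assuming a BackupStorage was created as in the tool commands above:

// Usage sketch; ctx, storageAdapter and the names here are assumptions, not from the commit.
const storage = await createStorageBackupStorage(ctx, storageAdapter, getWorkspaceId('backups'), 'my-workspace')
await backupSize(storage)                     // prints each referenced file and the total in Mb
await backupDownload(storage, '/tmp/backup')  // mirrors the backup files into /tmp/backup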
@@ -124,21 +124,23 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterEx {
     let iterator: BlobStorageIterator | undefined
     return {
       next: async () => {
-        if (iterator === undefined && adapters.length > 0) {
-          iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
-        }
-        if (iterator === undefined) {
-          return []
-        }
-        const docInfos = await iterator.next()
-        if (docInfos.length > 0) {
-          // We need to check if our stored version is fine
-          return docInfos
-        } else {
-          // We need to take next adapter
-          await iterator.close()
-          iterator = undefined
-          return []
+        while (true) {
+          if (iterator === undefined && adapters.length > 0) {
+            iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
+          }
+          if (iterator === undefined) {
+            return []
+          }
+          const docInfos = await iterator.next()
+          if (docInfos.length > 0) {
+            // We need to check if our stored version is fine
+            return docInfos
+          } else {
+            // We need to take next adapter
+            await iterator.close()
+            iterator = undefined
+            continue
+          }
         }
       },
       close: async () => {
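This hunk is the heart of the blobs-backup fix: previously next() returned an empty batch the moment one underlying adapter ran dry, which callers naturally treat as end-of-stream, so blobs held by later adapters were silently skipped. Wrapped in while (true), the iterator now rolls over to the next adapter and only yields an empty batch once every adapter is exhausted. A consumer sketch under that contract (names follow the code above; the loop body is hypothetical):

// Draining the aggregated blob listing after the fix:
const iterator = await adapter.listStream(ctx, workspaceId)
while (true) {
  const docInfos = await iterator.next()
  if (docInfos.length === 0) break // with the fix, this only happens once all adapters are drained
  for (const blob of docInfos) {
    console.log(blob._id) // hypothetical consumer: back up / verify the blob here
  }
}
await iterator.close()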