UBERF-6150: Improve backup logic (#5041)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Authored by Andrey Sobolev on 2024-03-23 00:47:29 +07:00, committed by GitHub
parent 3d2b60ac99
commit 18e8dcab57
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
7 changed files with 102 additions and 42 deletions

View File

@@ -400,13 +400,15 @@ export function devTool (
     .command('backup <dirName> <workspace>')
     .description('dump workspace transactions and minio resources')
     .option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
-    .action(async (dirName: string, workspace: string, cmd: { skip: string }) => {
+    .option('-f, --force', 'Force backup', false)
+    .action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => {
       const storage = await createFileBackupStorage(dirName)
       await backup(
         transactorUrl,
         getWorkspaceId(workspace, productId),
         storage,
-        (cmd.skip ?? '').split(';').map((it) => it.trim())
+        (cmd.skip ?? '').split(';').map((it) => it.trim()),
+        cmd.force
       )
     })
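The new `-f, --force` option is a plain commander boolean flag that defaults to `false` and is forwarded to `backup` as its final argument. A minimal standalone sketch of the same pattern (not the project's code; only `commander` is assumed):

import { program } from 'commander'

program
  .command('backup <dirName> <workspace>')
  .option('-f, --force', 'Force backup', false)
  .action(async (dirName: string, workspace: string, cmd: { force: boolean }) => {
    // cmd.force is true only when -f/--force was passed on the command line
    console.log(dirName, workspace, cmd.force)
  })

program.parse(process.argv)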

View File

@@ -614,9 +614,9 @@ export async function listWorkspaces (db: Db, productId: string): Promise<Worksp
  * @public
  */
 export async function listWorkspacesRaw (db: Db, productId: string): Promise<Workspace[]> {
-  return (await db.collection<Workspace>(WORKSPACE_COLLECTION).find(withProductId(productId, {})).toArray()).map(
-    (it) => ({ ...it, productId })
-  )
+  return (await db.collection<Workspace>(WORKSPACE_COLLECTION).find(withProductId(productId, {})).toArray())
+    .map((it) => ({ ...it, productId }))
+    .filter((it) => it.disabled !== true)
 }
 
 /**
@@ -967,7 +967,7 @@ export async function getWorkspaceInfo (
   db: Db,
   productId: string,
   token: string,
-  updateLatsVisit: boolean = false
+  _updateLastVisit: boolean = false
 ): Promise<ClientWorkspaceInfo> {
   const { email, workspace, extra } = decodeToken(token)
   const guest = extra?.guest === 'true'
@@ -1002,7 +1002,7 @@ export async function getWorkspaceInfo (
   if (ws == null) {
     throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
   }
-  if (updateLatsVisit && isAccount(account)) {
+  if (_updateLastVisit && isAccount(account)) {
     await updateLastVisit(db, ws, account)
   }
   return mapToClientWorkspace(ws)
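`listWorkspacesRaw` now filters out disabled workspaces. Note that `disabled !== true` deliberately keeps documents where the flag is `false` or missing entirely. The predicate in isolation, with a hypothetical `Workspace` shape:

interface Workspace {
  workspace: string
  disabled?: boolean
}

function activeWorkspaces (all: Workspace[]): Workspace[] {
  // Keeps { disabled: false } as well as documents with no `disabled` field.
  return all.filter((it) => it.disabled !== true)
}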

View File

@@ -327,7 +327,8 @@ export async function backup (
   transactorUrl: string,
   workspaceId: WorkspaceId,
   storage: BackupStorage,
-  skipDomains: string[] = []
+  skipDomains: string[] = [],
+  force: boolean = false
 ): Promise<void> {
   const connection = (await connect(transactorUrl, workspaceId, undefined, {
     mode: 'backup'
@@ -368,7 +369,7 @@ export async function backup (
       { limit: 1, sort: { modifiedOn: SortingOrder.Descending } }
     )
     if (lastTx !== undefined) {
-      if (lastTx._id === backupInfo.lastTxId) {
+      if (lastTx._id === backupInfo.lastTxId && !force) {
         console.log('No transaction changes. Skipping backup.')
         return
       } else {
@@ -787,12 +788,14 @@ export async function restore (
           const d = blobs.get(bname)
           blobs.delete(bname)
           ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? ''
+          ;(doc as any)['%hash%'] = changeset.get(doc._id)
           void sendChunk(doc, bf.length).finally(() => {
             requiredDocs.delete(doc._id)
             next()
           })
         }
       } else {
+        ;(doc as any)['%hash%'] = changeset.get(doc._id)
         void sendChunk(doc, bf.length).finally(() => {
           requiredDocs.delete(doc._id)
           next()
View File

@@ -295,6 +295,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   async initializeStages (): Promise<void> {
     for (const st of this.stages) {
+      if (this.cancelling) {
+        return
+      }
       await st.initialize(this.metrics, this.storage, this)
     }
   }
@@ -312,6 +315,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
     // We need to be sure we have individual indexes per stage.
     const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/]
     for (const st of this.stages) {
+      if (this.cancelling) {
+        return
+      }
       const regexp = oldStagesRegex.find((r) => r.test(st.stageId))
       if (regexp !== undefined) {
         await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, regexp, new RegExp(st.stageId))
@@ -404,6 +410,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
     let idx = 0
     const _classUpdate = new Set<Ref<Class<Doc>>>()
     for (const st of this.stages) {
+      if (this.cancelling) {
+        return []
+      }
       idx++
       await rateLimiter.exec(async () => {
         while (true) {
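All three loops gain the same guard: `this.cancelling` is checked before each stage, so a pipeline shutdown is observed between stages instead of after the whole loop finishes. The generic shape of the pattern as a standalone sketch:

async function runStages (
  stages: Array<() => Promise<void>>,
  isCancelled: () => boolean
): Promise<void> {
  for (const stage of stages) {
    // Bail out between stages; an in-flight stage still runs to completion.
    if (isCancelled()) {
      return
    }
    await stage()
  }
}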

View File

@@ -20,6 +20,7 @@ import core, {
   TxProcessor,
   cutObjectArray,
   escapeLikeForRegexp,
+  generateId,
   getTypeOf,
   isOperator,
   toFindResult,
@@ -70,7 +71,6 @@ import {
   type Sort,
   type UpdateFilter
 } from 'mongodb'
-import { createHash } from 'node:crypto'
 import { getMongoClient, getWorkspaceDB } from './utils'
 
 function translateDoc (doc: Doc): Document {
@@ -699,6 +699,8 @@ abstract class MongoAdapterBase implements DbAdapter {
     const coll = this.db.collection<Doc>(domain)
     const iterator = coll.find({}, {})
 
+    const hashID = generateId() // We just need a different value
+
     const bulkUpdate = new Map<Ref<Doc>, string>()
     const flush = async (flush = false): Promise<void> => {
       if (bulkUpdate.size > 1000 || flush) {
@@ -728,11 +730,8 @@ abstract class MongoAdapterBase implements DbAdapter {
       }
       const pos = (digest ?? '').indexOf('|')
       if (digest == null || digest === '' || pos === -1) {
-        const doc = JSON.stringify(d)
-        const hash = createHash('sha256')
-        hash.update(doc)
-        const size = doc.length
-        digest = hash.digest('base64')
+        const size = this.calcSize(d)
+        digest = hashID // we just need some random value
         bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
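The `%hash%` field packs two values, a digest and a hex-encoded size, joined by `|`; documents whose field lacks the separator (`pos === -1`) are re-stamped. The commit replaces the sha256 digest with a per-run `generateId()` marker, so the digest no longer identifies content, only the pass that wrote it. A sketch of reading the packed value back (the helper name is hypothetical):

function parseHashField (value: string): { digest: string, size: number } | undefined {
  const pos = value.indexOf('|')
  if (pos === -1) {
    return undefined // legacy value without a size suffix
  }
  return {
    digest: value.slice(0, pos),
    size: parseInt(value.slice(pos + 1), 16) // stored as hex, see size.toString(16)
  }
}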
@@ -757,6 +756,9 @@ abstract class MongoAdapterBase implements DbAdapter {
     }
   }
 
+  /**
+   * Return some estimation for object size
+   */
   calcSize (obj: any): number {
     if (typeof obj === 'undefined') {
       return 0
@@ -769,17 +771,41 @@ abstract class MongoAdapterBase implements DbAdapter {
       // include prototype properties
       const value = obj[key]
       const type = getTypeOf(value)
-      if (type === 'Array') {
-        result += this.calcSize(value)
-      } else if (type === 'Object') {
-        result += this.calcSize(value)
-      } else if (type === 'Date') {
-        result += new Date(value.getTime()).toString().length
-      }
-      if (type === 'string') {
-        result += (value as string).length
-      } else {
-        result += JSON.stringify(value).length
+      result += key.length
+      switch (type) {
+        case 'Array':
+          result += 4 + this.calcSize(value)
+          break
+        case 'Object':
+          result += this.calcSize(value)
+          break
+        case 'Date':
+          result += 24 // Some value
+          break
+        case 'string':
+          result += (value as string).length
+          break
+        case 'number':
+          result += 8
+          break
+        case 'boolean':
+          result += 1
+          break
+        case 'symbol':
+          result += (value as symbol).toString().length
+          break
+        case 'bigint':
+          result += (value as bigint).toString().length
+          break
+        case 'undefined':
+          result += 1
+          break
+        case 'null':
+          result += 1
+          break
+        default:
+          result += value.toString().length
       }
     }
     return result
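`calcSize` trades hashing for a cheap recursive estimate; the per-type costs are deliberately rough (every number counts as 8, every date as 24). A trimmed-down standalone version of the same idea, with a worked example:

function estimateSize (value: unknown): number {
  if (value == null) return 1
  if (typeof value === 'string') return value.length
  if (typeof value === 'number') return 8
  if (typeof value === 'boolean') return 1
  if (value instanceof Date) return 24
  if (Array.isArray(value)) {
    return 4 + value.reduce<number>((sum, v) => sum + estimateSize(v), 0)
  }
  const obj = value as Record<string, unknown>
  let result = 0
  for (const key of Object.keys(obj)) {
    result += key.length + estimateSize(obj[key])
  }
  return result
}

// estimateSize({ title: 'hello', count: 5, done: true })
//   === 5 ('title') + 5 ('hello') + 5 ('count') + 8 + 4 ('done') + 1 === 28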
@@ -802,13 +828,21 @@ abstract class MongoAdapterBase implements DbAdapter {
     while (ops.length > 0) {
       const part = ops.splice(0, 500)
       await coll.bulkWrite(
-        part.map((it) => ({
-          replaceOne: {
-            filter: { _id: it._id },
-            replacement: { ...it, '%hash%': null },
-            upsert: true
+        part.map((it) => {
+          const digest: string | null = (it as any)['%hash%']
+          if ('%hash%' in it) {
+            delete it['%hash%']
           }
-        }))
+          const size = this.calcSize(it)
+          return {
+            replaceOne: {
+              filter: { _id: it._id },
+              replacement: { ...it, '%hash%': digest == null ? null : `${digest}|${size.toString(16)}` },
+              upsert: true
+            }
+          }
+        })
       )
     }
   }
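`upload` keeps the batching convention used elsewhere in the adapter: `ops` is consumed destructively in chunks of 500 so a single `bulkWrite` never carries the whole payload. The pattern in isolation (a sketch, not the adapter's API):

async function inBatches<T> (
  ops: T[],
  run: (part: T[]) => Promise<void>,
  batchSize: number = 500
): Promise<void> {
  while (ops.length > 0) {
    // splice consumes the array from the front, so the loop terminates.
    await run(ops.splice(0, batchSize))
  }
}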

View File

@@ -221,7 +221,11 @@ export async function upgradeModel (
   logger.log(`${workspaceId.name}: Apply upgrade operations`)
 
-  const connection = await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' })
+  const connection = await connect(transactorUrl, workspaceId, undefined, {
+    mode: 'backup',
+    model: 'upgrade',
+    admin: 'true'
+  })
 
   // Create update indexes
   await createUpdateIndexes(connection, db, logger)
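The extra `admin: 'true'` option presumably ends up in the connection token's `extra` payload, which is exactly what the session-manager change in the next file checks before rejecting a session. A minimal sketch of such a check (the `Token` shape here is an assumption):

interface Token {
  email: string
  workspace: { name: string }
  extra?: Record<string, string>
}

function isAdminToken (token: Token): boolean {
  return token.extra?.admin === 'true'
}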

View File

@@ -212,17 +212,13 @@ class TSessionManager implements SessionManager {
     return await baseCtx.with('📲 add-session', {}, async (ctx) => {
       const wsString = toWorkspaceString(token.workspace, '@')
 
-      const workspaceInfo =
-        accountsUrl !== ''
-          ? await this.getWorkspaceInfo(accountsUrl, rawToken)
-          : {
-              workspace: token.workspace.name,
-              workspaceUrl: token.workspace.name,
-              workspaceName: token.workspace.name
-            }
-      if (workspaceInfo === undefined) {
+      let workspaceInfo =
+        accountsUrl !== '' ? await this.getWorkspaceInfo(accountsUrl, rawToken) : this.wsFromToken(token)
+      if (workspaceInfo === undefined && token.extra?.admin !== 'true') {
         // No access to workspace for token.
         return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) }
+      } else {
+        workspaceInfo = this.wsFromToken(token)
       }
 
       let workspace = this.workspaces.get(wsString)
@@ -296,6 +292,18 @@ class TSessionManager implements SessionManager {
     })
   }
 
+  private wsFromToken (token: Token): {
+    workspace: string
+    workspaceUrl?: string | null
+    workspaceName?: string
+  } {
+    return {
+      workspace: token.workspace.name,
+      workspaceUrl: token.workspace.name,
+      workspaceName: token.workspace.name
+    }
+  }
+
   private async createUpgradeSession (
     token: Token,
     sessionId: string | undefined,