From 18e8dcab5782ce159c2754f3bcd794c8143ed1d5 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Sat, 23 Mar 2024 00:47:29 +0700 Subject: [PATCH] UBERF-6150: Improve backup logic (#5041) Signed-off-by: Andrey Sobolev --- dev/tool/src/index.ts | 6 ++- server/account/src/index.ts | 10 ++-- server/backup/src/index.ts | 7 ++- server/core/src/indexer/indexer.ts | 9 ++++ server/mongo/src/storage.ts | 80 +++++++++++++++++++++--------- server/tool/src/index.ts | 6 ++- server/ws/src/server.ts | 26 ++++++---- 7 files changed, 102 insertions(+), 42 deletions(-) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 884474d0d8..86c3b46bf3 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -400,13 +400,15 @@ export function devTool ( .command('backup ') .description('dump workspace transactions and minio resources') .option('-s, --skip ', 'A list of ; separated domain names to skip during backup', '') - .action(async (dirName: string, workspace: string, cmd: { skip: string }) => { + .option('-f, --force', 'Force backup', false) + .action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => { const storage = await createFileBackupStorage(dirName) await backup( transactorUrl, getWorkspaceId(workspace, productId), storage, - (cmd.skip ?? '').split(';').map((it) => it.trim()) + (cmd.skip ?? '').split(';').map((it) => it.trim()), + cmd.force ) }) diff --git a/server/account/src/index.ts b/server/account/src/index.ts index 2d30c68a27..ab4b387a45 100644 --- a/server/account/src/index.ts +++ b/server/account/src/index.ts @@ -614,9 +614,9 @@ export async function listWorkspaces (db: Db, productId: string): Promise { - return (await db.collection(WORKSPACE_COLLECTION).find(withProductId(productId, {})).toArray()).map( - (it) => ({ ...it, productId }) - ) + return (await db.collection(WORKSPACE_COLLECTION).find(withProductId(productId, {})).toArray()) + .map((it) => ({ ...it, productId })) + .filter((it) => it.disabled !== true) } /** @@ -967,7 +967,7 @@ export async function getWorkspaceInfo ( db: Db, productId: string, token: string, - updateLatsVisit: boolean = false + _updateLastVisit: boolean = false ): Promise { const { email, workspace, extra } = decodeToken(token) const guest = extra?.guest === 'true' @@ -1002,7 +1002,7 @@ export async function getWorkspaceInfo ( if (ws == null) { throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {})) } - if (updateLatsVisit && isAccount(account)) { + if (_updateLastVisit && isAccount(account)) { await updateLastVisit(db, ws, account) } return mapToClientWorkspace(ws) diff --git a/server/backup/src/index.ts b/server/backup/src/index.ts index 1d846a3ab1..d375802c5d 100644 --- a/server/backup/src/index.ts +++ b/server/backup/src/index.ts @@ -327,7 +327,8 @@ export async function backup ( transactorUrl: string, workspaceId: WorkspaceId, storage: BackupStorage, - skipDomains: string[] = [] + skipDomains: string[] = [], + force: boolean = false ): Promise { const connection = (await connect(transactorUrl, workspaceId, undefined, { mode: 'backup' @@ -368,7 +369,7 @@ export async function backup ( { limit: 1, sort: { modifiedOn: SortingOrder.Descending } } ) if (lastTx !== undefined) { - if (lastTx._id === backupInfo.lastTxId) { + if (lastTx._id === backupInfo.lastTxId && !force) { console.log('No transaction changes. Skipping backup.') return } else { @@ -787,12 +788,14 @@ export async function restore ( const d = blobs.get(bname) blobs.delete(bname) ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? '' + ;(doc as any)['%hash%'] = changeset.get(doc._id) void sendChunk(doc, bf.length).finally(() => { requiredDocs.delete(doc._id) next() }) } } else { + ;(doc as any)['%hash%'] = changeset.get(doc._id) void sendChunk(doc, bf.length).finally(() => { requiredDocs.delete(doc._id) next() diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 0902789141..5f6bac3284 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -295,6 +295,9 @@ export class FullTextIndexPipeline implements FullTextPipeline { async initializeStages (): Promise { for (const st of this.stages) { + if (this.cancelling) { + return + } await st.initialize(this.metrics, this.storage, this) } } @@ -312,6 +315,9 @@ export class FullTextIndexPipeline implements FullTextPipeline { // We need to be sure we have individual indexes per stage. const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/] for (const st of this.stages) { + if (this.cancelling) { + return + } const regexp = oldStagesRegex.find((r) => r.test(st.stageId)) if (regexp !== undefined) { await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, regexp, new RegExp(st.stageId)) @@ -404,6 +410,9 @@ export class FullTextIndexPipeline implements FullTextPipeline { let idx = 0 const _classUpdate = new Set>>() for (const st of this.stages) { + if (this.cancelling) { + return [] + } idx++ await rateLimiter.exec(async () => { while (true) { diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index e71e163c65..4a8370871d 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -20,6 +20,7 @@ import core, { TxProcessor, cutObjectArray, escapeLikeForRegexp, + generateId, getTypeOf, isOperator, toFindResult, @@ -70,7 +71,6 @@ import { type Sort, type UpdateFilter } from 'mongodb' -import { createHash } from 'node:crypto' import { getMongoClient, getWorkspaceDB } from './utils' function translateDoc (doc: Doc): Document { @@ -699,6 +699,8 @@ abstract class MongoAdapterBase implements DbAdapter { const coll = this.db.collection(domain) const iterator = coll.find({}, {}) + const hashID = generateId() // We just need a different value + const bulkUpdate = new Map, string>() const flush = async (flush = false): Promise => { if (bulkUpdate.size > 1000 || flush) { @@ -728,11 +730,8 @@ abstract class MongoAdapterBase implements DbAdapter { } const pos = (digest ?? '').indexOf('|') if (digest == null || digest === '' || pos === -1) { - const doc = JSON.stringify(d) - const hash = createHash('sha256') - hash.update(doc) - const size = doc.length - digest = hash.digest('base64') + const size = this.calcSize(d) + digest = hashID // we just need some random value bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`) @@ -757,6 +756,9 @@ abstract class MongoAdapterBase implements DbAdapter { } } + /** + * Return some estimation for object size + */ calcSize (obj: any): number { if (typeof obj === 'undefined') { return 0 @@ -769,17 +771,41 @@ abstract class MongoAdapterBase implements DbAdapter { // include prototype properties const value = obj[key] const type = getTypeOf(value) - if (type === 'Array') { - result += this.calcSize(value) - } else if (type === 'Object') { - result += this.calcSize(value) - } else if (type === 'Date') { - result += new Date(value.getTime()).toString().length - } - if (type === 'string') { - result += (value as string).length - } else { - result += JSON.stringify(value).length + result += key.length + + switch (type) { + case 'Array': + result += 4 + this.calcSize(value) + break + case 'Object': + result += this.calcSize(value) + break + case 'Date': + result += 24 // Some value + break + case 'string': + result += (value as string).length + break + case 'number': + result += 8 + break + case 'boolean': + result += 1 + break + case 'symbol': + result += (value as symbol).toString().length + break + case 'bigint': + result += (value as bigint).toString().length + break + case 'undefined': + result += 1 + break + case 'null': + result += 1 + break + default: + result += value.toString().length } } return result @@ -802,13 +828,21 @@ abstract class MongoAdapterBase implements DbAdapter { while (ops.length > 0) { const part = ops.splice(0, 500) await coll.bulkWrite( - part.map((it) => ({ - replaceOne: { - filter: { _id: it._id }, - replacement: { ...it, '%hash%': null }, - upsert: true + part.map((it) => { + const digest: string | null = (it as any)['%hash%'] + if ('%hash%' in it) { + delete it['%hash%'] } - })) + const size = this.calcSize(it) + + return { + replaceOne: { + filter: { _id: it._id }, + replacement: { ...it, '%hash%': digest == null ? null : `${digest}|${size.toString(16)}` }, + upsert: true + } + } + }) ) } } diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 9b8f84d0b8..ac12902124 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -221,7 +221,11 @@ export async function upgradeModel ( logger.log(`${workspaceId.name}: Apply upgrade operations`) - const connection = await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' }) + const connection = await connect(transactorUrl, workspaceId, undefined, { + mode: 'backup', + model: 'upgrade', + admin: 'true' + }) // Create update indexes await createUpdateIndexes(connection, db, logger) diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 9f3b4df841..434811e620 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -212,17 +212,13 @@ class TSessionManager implements SessionManager { return await baseCtx.with('📲 add-session', {}, async (ctx) => { const wsString = toWorkspaceString(token.workspace, '@') - const workspaceInfo = - accountsUrl !== '' - ? await this.getWorkspaceInfo(accountsUrl, rawToken) - : { - workspace: token.workspace.name, - workspaceUrl: token.workspace.name, - workspaceName: token.workspace.name - } - if (workspaceInfo === undefined) { + let workspaceInfo = + accountsUrl !== '' ? await this.getWorkspaceInfo(accountsUrl, rawToken) : this.wsFromToken(token) + if (workspaceInfo === undefined && token.extra?.admin !== 'true') { // No access to workspace for token. return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) } + } else { + workspaceInfo = this.wsFromToken(token) } let workspace = this.workspaces.get(wsString) @@ -296,6 +292,18 @@ class TSessionManager implements SessionManager { }) } + private wsFromToken (token: Token): { + workspace: string + workspaceUrl?: string | null + workspaceName?: string + } { + return { + workspace: token.workspace.name, + workspaceUrl: token.workspace.name, + workspaceName: token.workspace.name + } + } + private async createUpgradeSession ( token: Token, sessionId: string | undefined,