From 26e93fefdb1bbdecb24bba6d85092c92fa9b9cf3 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev <haiodo@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:10:36 +0700 Subject: [PATCH] UBERF-8615: Backup/restore fixes (#7258) Signed-off-by: Andrey Sobolev <haiodo@gmail.com> --- pods/server/src/__start.ts | 6 ++- server/backup/src/backup.ts | 18 +++++++- server/postgres/src/schemas.ts | 24 ++++++++++ server/postgres/src/storage.ts | 71 ++++++++++++++++------------- server/postgres/src/utils.ts | 38 ++++++++++++--- server/server/src/sessionManager.ts | 3 +- 6 files changed, 118 insertions(+), 42 deletions(-) diff --git a/pods/server/src/__start.ts b/pods/server/src/__start.ts index c52052600f..f89bac6965 100644 --- a/pods/server/src/__start.ts +++ b/pods/server/src/__start.ts @@ -122,7 +122,11 @@ const close = (): void => { } process.on('unhandledRejection', (reason, promise) => { - console.log('Unhandled Rejection at:', promise, 'reason:', reason) + metricsContext.error('Unhandled Rejection at:', { origin, promise }) +}) + +global.process.on('uncaughtException', (error, origin) => { + metricsContext.error('Uncaught Exception at:', { origin, error }) }) process.on('SIGINT', close) diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index ed1e7d323a..302962e046 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -32,11 +32,13 @@ import core, { Ref, SortingOrder, systemAccountEmail, + TxProcessor, WorkspaceId, type BackupStatus, type Blob, type DocIndexState, - type Tx + type Tx, + type TxCUD } from '@hcengineering/core' import { BlobClient, createClient } from '@hcengineering/server-client' import { type StorageAdapter } from '@hcengineering/server-core' @@ -1768,8 +1770,20 @@ export async function restore ( d.space = core.space.Workspace ;(d as any)['%hash%'] = '' } + + if (TxProcessor.isExtendsCUD(d._class)) { + const tx = d as TxCUD<Doc> + if (tx.objectSpace == null) { + tx.objectSpace = core.space.Workspace + ;(tx as any)['%hash%'] = '' + } + } + } + try { + await connection.upload(c, docs) + } catch (err: any) { + ctx.error('error during upload', { err, docs: JSON.stringify(docs) }) } - await connection.upload(c, docs) docs.length = 0 sendSize = 0 } diff --git a/server/postgres/src/schemas.ts b/server/postgres/src/schemas.ts index 253de0bfca..ffdd50d90b 100644 --- a/server/postgres/src/schemas.ts +++ b/server/postgres/src/schemas.ts @@ -241,3 +241,27 @@ export function getDocFieldsByDomains (domain: string): string[] { const schema = domainSchemas[translateDomain(domain)] ?? defaultSchema return Object.keys(schema) } + +export interface SchemaAndFields { + schema: Schema + + fields: string[] + domainFields: Set<string> +} + +function createSchemaFields (schema: Schema): SchemaAndFields { + const fields = Object.keys(schema) + const domainFields = new Set(Object.keys(schema)) + return { schema, fields, domainFields } +} + +const defaultSchemaFields: SchemaAndFields = createSchemaFields(defaultSchema) + +const domainSchemaFields = new Map<string, SchemaAndFields>() +for (const [domain, _schema] of Object.entries(domainSchemas)) { + domainSchemaFields.set(domain, createSchemaFields(_schema)) +} + +export function getSchemaAndFields (domain: string): SchemaAndFields { + return domainSchemaFields.get(translateDomain(domain)) ?? defaultSchemaFields +} diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index ccce30a4f6..5d56814511 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -65,7 +65,14 @@ import { type TxAdapter } from '@hcengineering/server-core' import type postgres from 'postgres' -import { getDocFieldsByDomains, getSchema, type Schema, translateDomain } from './schemas' +import { + getDocFieldsByDomains, + getSchema, + getSchemaAndFields, + type Schema, + type SchemaAndFields, + translateDomain +} from './schemas' import { type ValueType } from './types' import { convertDoc, @@ -442,22 +449,21 @@ abstract class PostgresAdapterBase implements DbAdapter { if ((operations as any)['%hash%'] === undefined) { ;(operations as any)['%hash%'] = null } - const domainFields = new Set(getDocFieldsByDomains(domain)) + const schemaFields = getSchemaAndFields(domain) if (isOps) { await this.mgr.write(undefined, async (client) => { const res = await client.unsafe(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`) - const schema = getSchema(domain) - const docs = res.map((p) => parseDoc(p as any, schema)) + const docs = res.map((p) => parseDoc(p as any, schemaFields.schema)) for (const doc of docs) { if (doc === undefined) continue const prevAttachedTo = (doc as any).attachedTo TxProcessor.applyUpdate(doc, operations) ;(doc as any)['%hash%'] = null - const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields) + const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields) const params: any[] = [doc._id, this.workspaceId.name] let paramsIndex = params.length + 1 const updates: string[] = [] - const { extractedFields, remainingData } = parseUpdate(operations, domainFields) + const { extractedFields, remainingData } = parseUpdate(operations, schemaFields) const newAttachedTo = (doc as any).attachedTo if (Object.keys(extractedFields).length > 0) { for (const key in extractedFields) { @@ -482,7 +488,7 @@ abstract class PostgresAdapterBase implements DbAdapter { } }) } else { - await this.rawUpdateDoc(domain, query, operations, domainFields) + await this.rawUpdateDoc(domain, query, operations, schemaFields) } } @@ -490,13 +496,13 @@ abstract class PostgresAdapterBase implements DbAdapter { domain: Domain, query: DocumentQuery<T>, operations: DocumentUpdate<T>, - domainFields: Set<string> + schemaFields: SchemaAndFields ): Promise<void> { const translatedQuery = this.buildRawQuery(domain, query) const updates: string[] = [] const params: any[] = [] let paramsIndex = params.length + 1 - const { extractedFields, remainingData } = parseUpdate(operations, domainFields) + const { extractedFields, remainingData } = parseUpdate(operations, schemaFields) const { space, attachedTo, ...ops } = operations as any for (const key in extractedFields) { updates.push(`"${key}" = $${paramsIndex++}`) @@ -1397,8 +1403,8 @@ abstract class PostgresAdapterBase implements DbAdapter { upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> { return ctx.with('upload', { domain }, async (ctx) => { - const fields = getDocFieldsByDomains(domain) - const filedsWithData = [...fields, 'data'] + const schemaFields = getSchemaAndFields(domain) + const filedsWithData = [...schemaFields.fields, 'data'] const insertFields: string[] = [] const onConflict: string[] = [] for (const field of filedsWithData) { @@ -1409,7 +1415,6 @@ abstract class PostgresAdapterBase implements DbAdapter { const onConflictStr = onConflict.join(', ') try { - const domainFields = new Set(getDocFieldsByDomains(domain)) const toUpload = [...docs] const tdomain = translateDomain(domain) while (toUpload.length > 0) { @@ -1427,11 +1432,11 @@ abstract class PostgresAdapterBase implements DbAdapter { } const size = digest != null ? estimateDocSize(doc) : 0 ;(doc as any)['%hash%'] = digest == null ? null : `${digest}|${size.toString(16)}` - const d = convertDoc(domain, doc, this.workspaceId.name, domainFields) + const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields) values.push(d.workspaceId) variables.push(`$${index++}`) - for (const field of fields) { + for (const field of schemaFields.fields) { values.push(d[field]) variables.push(`$${index++}`) } @@ -1504,7 +1509,7 @@ abstract class PostgresAdapterBase implements DbAdapter { const schema = getSchema(domain) const docs = res.map((p) => parseDoc(p, schema)) const map = new Map(docs.map((d) => [d._id, d])) - const domainFields = new Set(getDocFieldsByDomains(domain)) + const schemaFields = getSchemaAndFields(domain) for (const [_id, ops] of operations) { const doc = map.get(_id) if (doc === undefined) continue @@ -1513,10 +1518,10 @@ abstract class PostgresAdapterBase implements DbAdapter { ;(op as any)['%hash%'] = null } TxProcessor.applyUpdate(doc, op) - const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields) + const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields) const columns: string[] = [] - const { extractedFields, remainingData } = parseUpdate(op, domainFields) + const { extractedFields, remainingData } = parseUpdate(op, schemaFields) for (const key in extractedFields) { columns.push(key) } @@ -1539,20 +1544,18 @@ abstract class PostgresAdapterBase implements DbAdapter { @withContext('insert') async insert (ctx: MeasureContext, domain: string, docs: Doc[]): Promise<TxResult> { - const schema = getSchema(domain) - const fields = Object.keys(schema) - const filedsWithData = [...fields, 'data'] + const schemaFields = getSchemaAndFields(domain) + const filedsWithData = [...schemaFields.fields, 'data'] const columns: string[] = ['workspaceId'] for (const field of filedsWithData) { columns.push(field) } - const domainFields = new Set(fields) while (docs.length > 0) { const part = docs.splice(0, 500) const values: DBDoc[] = [] for (let i = 0; i < part.length; i++) { const doc = part[i] - const d = convertDoc(domain, doc, this.workspaceId.name, domainFields) + const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields) values.push(d) } await this.mgr.write(ctx.id, async (client) => { @@ -1608,7 +1611,7 @@ class PostgresAdapter extends PostgresAdapterBase { } } - private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>, domainFields: Set<string>): Promise<TxResult> { + private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>, schemaFields: SchemaAndFields): Promise<TxResult> { await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async (ctx) => { await this.mgr.write(ctx.id, async (client) => { const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) @@ -1616,8 +1619,8 @@ class PostgresAdapter extends PostgresAdapterBase { TxProcessor.updateMixin4Doc(doc, tx) ;(doc as any)['%hash%'] = null const domain = this.hierarchy.getDomain(tx.objectClass) - const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields) - const { extractedFields } = parseUpdate(tx.attributes as Partial<Doc>, domainFields) + const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields) + const { extractedFields } = parseUpdate(tx.attributes as Partial<Doc>, schemaFields) const columns = new Set<string>() for (const key in extractedFields) { columns.add(key) @@ -1625,6 +1628,7 @@ class PostgresAdapter extends PostgresAdapterBase { columns.add('modifiedBy') columns.add('modifiedOn') columns.add('data') + columns.add('%hash%') await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, Array.from(columns))} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}` }) }) @@ -1655,7 +1659,7 @@ class PostgresAdapter extends PostgresAdapterBase { this.process(ops, tx) } - const domainFields = new Set(getDocFieldsByDomains(domain)) + const domainFields = getSchemaAndFields(domain) if (ops.add.length > 0) { const res = await this.insert(ctx, domain, ops.add) if (Object.keys(res).length > 0) { @@ -1694,7 +1698,7 @@ class PostgresAdapter extends PostgresAdapterBase { ctx: MeasureContext, domain: Domain, txes: TxUpdateDoc<Doc>[], - domainFields: Set<string> + schemaFields: SchemaAndFields ): Promise<TxResult[]> { const byOperator = groupByArray(txes, (it) => isOperator(it.operations)) @@ -1715,15 +1719,18 @@ class PostgresAdapter extends PostgresAdapterBase { ops.modifiedOn = tx.modifiedOn TxProcessor.applyUpdate(doc, ops) ;(doc as any)['%hash%'] = null - const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields) + const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields) const columns: string[] = [] - const { extractedFields, remainingData } = parseUpdate(ops, domainFields) + const { extractedFields, remainingData } = parseUpdate(ops, schemaFields) for (const key in extractedFields) { columns.push(key) } if (Object.keys(remainingData).length > 0) { columns.push('data') } + if (!columns.includes('%hash%')) { + columns.push('%hash%') + } await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, columns)} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}` }) if (tx.retrieve === true && doc !== undefined) { @@ -1734,7 +1741,7 @@ class PostgresAdapter extends PostgresAdapterBase { ) } if ((withoutOperator ?? [])?.length > 0) { - result.push(...(await this.updateDoc(ctx, domain, withoutOperator ?? [], domainFields))) + result.push(...(await this.updateDoc(ctx, domain, withoutOperator ?? [], schemaFields))) } return result } @@ -1743,7 +1750,7 @@ class PostgresAdapter extends PostgresAdapterBase { ctx: MeasureContext, domain: Domain, txes: TxUpdateDoc<T>[], - domainFields: Set<string> + schemaFields: SchemaAndFields ): Promise<TxResult[]> { return ctx.with('update jsonb_set', {}, async (_ctx) => { const operations: { @@ -1760,7 +1767,7 @@ class PostgresAdapter extends PostgresAdapterBase { const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2', '"%hash%" = $3'] const params: any[] = [tx.modifiedBy, tx.modifiedOn, null] let paramsIndex = params.length - const { extractedFields, remainingData } = parseUpdate(tx.operations, domainFields) + const { extractedFields, remainingData } = parseUpdate(tx.operations, schemaFields) const { space, attachedTo, ...ops } = tx.operations as any for (const key in extractedFields) { fields.push(key) diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index 60e821de15..1d1e9cc70f 100644 --- a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -36,7 +36,9 @@ import { getDocFieldsByDomains, getIndex, getSchema, + getSchemaAndFields, type Schema, + type SchemaAndFields, translateDomain } from './schemas' @@ -264,7 +266,7 @@ export function convertDoc<T extends Doc> ( domain: string, doc: T, workspaceId: string, - domainFields?: Set<string> + schemaAndFields?: SchemaAndFields ): DBDoc { const extractedFields: Doc & Record<string, any> = { _id: doc._id, @@ -280,19 +282,43 @@ export function convertDoc<T extends Doc> ( const extractedFieldsKeys = new Set(Object.keys(extractedFields)) - domainFields = domainFields ?? new Set(getDocFieldsByDomains(domain)) + schemaAndFields = schemaAndFields ?? getSchemaAndFields(domain) for (const key in doc) { if (extractedFieldsKeys.has(key)) { continue } - if (domainFields.has(key)) { + if (schemaAndFields.domainFields.has(key)) { extractedFields[key] = doc[key] } else { remainingData[key] = doc[key] } } + // Check if some fields are missing + for (const [key, _type] of Object.entries(schemaAndFields.schema)) { + if (!(key in doc)) { + // We missing required field, and we need to add a dummy value for it. + if (_type.notNull) { + // Null value is not allowed + switch (_type.type) { + case 'bigint': + extractedFields[key] = 0 + break + case 'bool': + extractedFields[key] = false + break + case 'text': + extractedFields[key] = '' + break + case 'text[]': + extractedFields[key] = [] + break + } + } + } + } + const res: any = { ...extractedFields, workspaceId, @@ -328,7 +354,7 @@ export function inferType (val: any): string { export function parseUpdate<T extends Doc> ( ops: DocumentUpdate<T> | MixinUpdate<Doc, T>, - fields: Set<string> + schemaFields: SchemaAndFields ): { extractedFields: Partial<T> remainingData: Partial<T> @@ -340,14 +366,14 @@ export function parseUpdate<T extends Doc> ( const val = (ops as any)[key] if (key.startsWith('$')) { for (const k in val) { - if (fields.has(k)) { + if (schemaFields.domainFields.has(k)) { ;(extractedFields as any)[k] = val[key] } else { ;(remainingData as any)[k] = val[key] } } } else { - if (fields.has(key)) { + if (schemaFields.domainFields.has(key)) { ;(extractedFields as any)[key] = val } else { ;(remainingData as any)[key] = val diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 01120bbdda..3daf92f423 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -17,6 +17,7 @@ import { Analytics } from '@hcengineering/analytics' import core, { TxFactory, WorkspaceEvent, + cutObjectArray, generateId, isWorkspaceCreating, systemAccountEmail, @@ -249,7 +250,7 @@ class TSessionManager implements SessionManager { sec, wsId, user: s[1].session.getUser(), - ...r.params + ...cutObjectArray(r.params) }) } }