UBERF-8615: Backup/restore fixes (#7258)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-04 14:10:36 +07:00 committed by GitHub
parent f69d5ede0e
commit 26e93fefdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 118 additions and 42 deletions

View File

@ -122,7 +122,11 @@ const close = (): void => {
}
process.on('unhandledRejection', (reason, promise) => {
console.log('Unhandled Rejection at:', promise, 'reason:', reason)
metricsContext.error('Unhandled Rejection at:', { origin, promise })
})
global.process.on('uncaughtException', (error, origin) => {
metricsContext.error('Uncaught Exception at:', { origin, error })
})
process.on('SIGINT', close)

View File

@ -32,11 +32,13 @@ import core, {
Ref,
SortingOrder,
systemAccountEmail,
TxProcessor,
WorkspaceId,
type BackupStatus,
type Blob,
type DocIndexState,
type Tx
type Tx,
type TxCUD
} from '@hcengineering/core'
import { BlobClient, createClient } from '@hcengineering/server-client'
import { type StorageAdapter } from '@hcengineering/server-core'
@ -1768,8 +1770,20 @@ export async function restore (
d.space = core.space.Workspace
;(d as any)['%hash%'] = ''
}
if (TxProcessor.isExtendsCUD(d._class)) {
const tx = d as TxCUD<Doc>
if (tx.objectSpace == null) {
tx.objectSpace = core.space.Workspace
;(tx as any)['%hash%'] = ''
}
}
}
try {
await connection.upload(c, docs)
} catch (err: any) {
ctx.error('error during upload', { err, docs: JSON.stringify(docs) })
}
await connection.upload(c, docs)
docs.length = 0
sendSize = 0
}

View File

@ -241,3 +241,27 @@ export function getDocFieldsByDomains (domain: string): string[] {
const schema = domainSchemas[translateDomain(domain)] ?? defaultSchema
return Object.keys(schema)
}
export interface SchemaAndFields {
schema: Schema
fields: string[]
domainFields: Set<string>
}
function createSchemaFields (schema: Schema): SchemaAndFields {
const fields = Object.keys(schema)
const domainFields = new Set(Object.keys(schema))
return { schema, fields, domainFields }
}
const defaultSchemaFields: SchemaAndFields = createSchemaFields(defaultSchema)
const domainSchemaFields = new Map<string, SchemaAndFields>()
for (const [domain, _schema] of Object.entries(domainSchemas)) {
domainSchemaFields.set(domain, createSchemaFields(_schema))
}
export function getSchemaAndFields (domain: string): SchemaAndFields {
return domainSchemaFields.get(translateDomain(domain)) ?? defaultSchemaFields
}

View File

@ -65,7 +65,14 @@ import {
type TxAdapter
} from '@hcengineering/server-core'
import type postgres from 'postgres'
import { getDocFieldsByDomains, getSchema, type Schema, translateDomain } from './schemas'
import {
getDocFieldsByDomains,
getSchema,
getSchemaAndFields,
type Schema,
type SchemaAndFields,
translateDomain
} from './schemas'
import { type ValueType } from './types'
import {
convertDoc,
@ -442,22 +449,21 @@ abstract class PostgresAdapterBase implements DbAdapter {
if ((operations as any)['%hash%'] === undefined) {
;(operations as any)['%hash%'] = null
}
const domainFields = new Set(getDocFieldsByDomains(domain))
const schemaFields = getSchemaAndFields(domain)
if (isOps) {
await this.mgr.write(undefined, async (client) => {
const res = await client.unsafe(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`)
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p as any, schema))
const docs = res.map((p) => parseDoc(p as any, schemaFields.schema))
for (const doc of docs) {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
;(doc as any)['%hash%'] = null
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const params: any[] = [doc._id, this.workspaceId.name]
let paramsIndex = params.length + 1
const updates: string[] = []
const { extractedFields, remainingData } = parseUpdate(operations, domainFields)
const { extractedFields, remainingData } = parseUpdate(operations, schemaFields)
const newAttachedTo = (doc as any).attachedTo
if (Object.keys(extractedFields).length > 0) {
for (const key in extractedFields) {
@ -482,7 +488,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
})
} else {
await this.rawUpdateDoc(domain, query, operations, domainFields)
await this.rawUpdateDoc(domain, query, operations, schemaFields)
}
}
@ -490,13 +496,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
domain: Domain,
query: DocumentQuery<T>,
operations: DocumentUpdate<T>,
domainFields: Set<string>
schemaFields: SchemaAndFields
): Promise<void> {
const translatedQuery = this.buildRawQuery(domain, query)
const updates: string[] = []
const params: any[] = []
let paramsIndex = params.length + 1
const { extractedFields, remainingData } = parseUpdate(operations, domainFields)
const { extractedFields, remainingData } = parseUpdate(operations, schemaFields)
const { space, attachedTo, ...ops } = operations as any
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
@ -1397,8 +1403,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
return ctx.with('upload', { domain }, async (ctx) => {
const fields = getDocFieldsByDomains(domain)
const filedsWithData = [...fields, 'data']
const schemaFields = getSchemaAndFields(domain)
const filedsWithData = [...schemaFields.fields, 'data']
const insertFields: string[] = []
const onConflict: string[] = []
for (const field of filedsWithData) {
@ -1409,7 +1415,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
const onConflictStr = onConflict.join(', ')
try {
const domainFields = new Set(getDocFieldsByDomains(domain))
const toUpload = [...docs]
const tdomain = translateDomain(domain)
while (toUpload.length > 0) {
@ -1427,11 +1432,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const size = digest != null ? estimateDocSize(doc) : 0
;(doc as any)['%hash%'] = digest == null ? null : `${digest}|${size.toString(16)}`
const d = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
for (const field of schemaFields.fields) {
values.push(d[field])
variables.push(`$${index++}`)
}
@ -1504,7 +1509,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p, schema))
const map = new Map(docs.map((d) => [d._id, d]))
const domainFields = new Set(getDocFieldsByDomains(domain))
const schemaFields = getSchemaAndFields(domain)
for (const [_id, ops] of operations) {
const doc = map.get(_id)
if (doc === undefined) continue
@ -1513,10 +1518,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
;(op as any)['%hash%'] = null
}
TxProcessor.applyUpdate(doc, op)
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(op, domainFields)
const { extractedFields, remainingData } = parseUpdate(op, schemaFields)
for (const key in extractedFields) {
columns.push(key)
}
@ -1539,20 +1544,18 @@ abstract class PostgresAdapterBase implements DbAdapter {
@withContext('insert')
async insert (ctx: MeasureContext, domain: string, docs: Doc[]): Promise<TxResult> {
const schema = getSchema(domain)
const fields = Object.keys(schema)
const filedsWithData = [...fields, 'data']
const schemaFields = getSchemaAndFields(domain)
const filedsWithData = [...schemaFields.fields, 'data']
const columns: string[] = ['workspaceId']
for (const field of filedsWithData) {
columns.push(field)
}
const domainFields = new Set(fields)
while (docs.length > 0) {
const part = docs.splice(0, 500)
const values: DBDoc[] = []
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const d = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
values.push(d)
}
await this.mgr.write(ctx.id, async (client) => {
@ -1608,7 +1611,7 @@ class PostgresAdapter extends PostgresAdapterBase {
}
}
private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>, domainFields: Set<string>): Promise<TxResult> {
private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>, schemaFields: SchemaAndFields): Promise<TxResult> {
await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async (ctx) => {
await this.mgr.write(ctx.id, async (client) => {
const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
@ -1616,8 +1619,8 @@ class PostgresAdapter extends PostgresAdapterBase {
TxProcessor.updateMixin4Doc(doc, tx)
;(doc as any)['%hash%'] = null
const domain = this.hierarchy.getDomain(tx.objectClass)
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const { extractedFields } = parseUpdate(tx.attributes as Partial<Doc>, domainFields)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const { extractedFields } = parseUpdate(tx.attributes as Partial<Doc>, schemaFields)
const columns = new Set<string>()
for (const key in extractedFields) {
columns.add(key)
@ -1625,6 +1628,7 @@ class PostgresAdapter extends PostgresAdapterBase {
columns.add('modifiedBy')
columns.add('modifiedOn')
columns.add('data')
columns.add('%hash%')
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, Array.from(columns))} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}`
})
})
@ -1655,7 +1659,7 @@ class PostgresAdapter extends PostgresAdapterBase {
this.process(ops, tx)
}
const domainFields = new Set(getDocFieldsByDomains(domain))
const domainFields = getSchemaAndFields(domain)
if (ops.add.length > 0) {
const res = await this.insert(ctx, domain, ops.add)
if (Object.keys(res).length > 0) {
@ -1694,7 +1698,7 @@ class PostgresAdapter extends PostgresAdapterBase {
ctx: MeasureContext,
domain: Domain,
txes: TxUpdateDoc<Doc>[],
domainFields: Set<string>
schemaFields: SchemaAndFields
): Promise<TxResult[]> {
const byOperator = groupByArray(txes, (it) => isOperator(it.operations))
@ -1715,15 +1719,18 @@ class PostgresAdapter extends PostgresAdapterBase {
ops.modifiedOn = tx.modifiedOn
TxProcessor.applyUpdate(doc, ops)
;(doc as any)['%hash%'] = null
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(ops, domainFields)
const { extractedFields, remainingData } = parseUpdate(ops, schemaFields)
for (const key in extractedFields) {
columns.push(key)
}
if (Object.keys(remainingData).length > 0) {
columns.push('data')
}
if (!columns.includes('%hash%')) {
columns.push('%hash%')
}
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, columns)} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}`
})
if (tx.retrieve === true && doc !== undefined) {
@ -1734,7 +1741,7 @@ class PostgresAdapter extends PostgresAdapterBase {
)
}
if ((withoutOperator ?? [])?.length > 0) {
result.push(...(await this.updateDoc(ctx, domain, withoutOperator ?? [], domainFields)))
result.push(...(await this.updateDoc(ctx, domain, withoutOperator ?? [], schemaFields)))
}
return result
}
@ -1743,7 +1750,7 @@ class PostgresAdapter extends PostgresAdapterBase {
ctx: MeasureContext,
domain: Domain,
txes: TxUpdateDoc<T>[],
domainFields: Set<string>
schemaFields: SchemaAndFields
): Promise<TxResult[]> {
return ctx.with('update jsonb_set', {}, async (_ctx) => {
const operations: {
@ -1760,7 +1767,7 @@ class PostgresAdapter extends PostgresAdapterBase {
const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2', '"%hash%" = $3']
const params: any[] = [tx.modifiedBy, tx.modifiedOn, null]
let paramsIndex = params.length
const { extractedFields, remainingData } = parseUpdate(tx.operations, domainFields)
const { extractedFields, remainingData } = parseUpdate(tx.operations, schemaFields)
const { space, attachedTo, ...ops } = tx.operations as any
for (const key in extractedFields) {
fields.push(key)

View File

@ -36,7 +36,9 @@ import {
getDocFieldsByDomains,
getIndex,
getSchema,
getSchemaAndFields,
type Schema,
type SchemaAndFields,
translateDomain
} from './schemas'
@ -264,7 +266,7 @@ export function convertDoc<T extends Doc> (
domain: string,
doc: T,
workspaceId: string,
domainFields?: Set<string>
schemaAndFields?: SchemaAndFields
): DBDoc {
const extractedFields: Doc & Record<string, any> = {
_id: doc._id,
@ -280,19 +282,43 @@ export function convertDoc<T extends Doc> (
const extractedFieldsKeys = new Set(Object.keys(extractedFields))
domainFields = domainFields ?? new Set(getDocFieldsByDomains(domain))
schemaAndFields = schemaAndFields ?? getSchemaAndFields(domain)
for (const key in doc) {
if (extractedFieldsKeys.has(key)) {
continue
}
if (domainFields.has(key)) {
if (schemaAndFields.domainFields.has(key)) {
extractedFields[key] = doc[key]
} else {
remainingData[key] = doc[key]
}
}
// Check if some fields are missing
for (const [key, _type] of Object.entries(schemaAndFields.schema)) {
if (!(key in doc)) {
// We missing required field, and we need to add a dummy value for it.
if (_type.notNull) {
// Null value is not allowed
switch (_type.type) {
case 'bigint':
extractedFields[key] = 0
break
case 'bool':
extractedFields[key] = false
break
case 'text':
extractedFields[key] = ''
break
case 'text[]':
extractedFields[key] = []
break
}
}
}
}
const res: any = {
...extractedFields,
workspaceId,
@ -328,7 +354,7 @@ export function inferType (val: any): string {
export function parseUpdate<T extends Doc> (
ops: DocumentUpdate<T> | MixinUpdate<Doc, T>,
fields: Set<string>
schemaFields: SchemaAndFields
): {
extractedFields: Partial<T>
remainingData: Partial<T>
@ -340,14 +366,14 @@ export function parseUpdate<T extends Doc> (
const val = (ops as any)[key]
if (key.startsWith('$')) {
for (const k in val) {
if (fields.has(k)) {
if (schemaFields.domainFields.has(k)) {
;(extractedFields as any)[k] = val[key]
} else {
;(remainingData as any)[k] = val[key]
}
}
} else {
if (fields.has(key)) {
if (schemaFields.domainFields.has(key)) {
;(extractedFields as any)[key] = val
} else {
;(remainingData as any)[key] = val

View File

@ -17,6 +17,7 @@ import { Analytics } from '@hcengineering/analytics'
import core, {
TxFactory,
WorkspaceEvent,
cutObjectArray,
generateId,
isWorkspaceCreating,
systemAccountEmail,
@ -249,7 +250,7 @@ class TSessionManager implements SessionManager {
sec,
wsId,
user: s[1].session.getUser(),
...r.params
...cutObjectArray(r.params)
})
}
}