From 9fef46b059b6026a686d1a53432bf823478ebc33 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 7 Feb 2023 11:46:28 +0700 Subject: [PATCH] Low level bulk operations (#2592) Signed-off-by: Andrey Sobolev --- dev/client-resources/src/index.ts | 3 +- dev/storage/src/storage.ts | 14 +- dev/tool/src/__start.ts | 2 +- dev/tool/src/index.ts | 2 +- dev/tool/src/workspace.ts | 2 +- models/all/src/migration.ts | 46 +- models/contact/src/migration.ts | 69 +-- packages/core/src/tx.ts | 48 ++- .../src/components/EditDoc.svelte | 4 +- .../attachment-resources/src/index.ts | 37 +- server-plugins/contact-resources/src/index.ts | 16 +- server-plugins/tags-resources/src/index.ts | 18 +- server/account/src/index.ts | 8 +- server/core/src/adapter.ts | 8 +- server/core/src/storage.ts | 395 ++++++++++++------ server/core/src/types.ts | 1 + server/core/src/utils.ts | 34 ++ server/elastic/src/backup.ts | 2 +- server/mongo/src/storage.ts | 365 ++++++++++------ server/server/src/minio.ts | 2 +- server/tool/src/index.ts | 13 +- 21 files changed, 684 insertions(+), 405 deletions(-) create mode 100644 server/core/src/utils.ts diff --git a/dev/client-resources/src/index.ts b/dev/client-resources/src/index.ts index fe9ec3e117..dda50fb3d7 100644 --- a/dev/client-resources/src/index.ts +++ b/dev/client-resources/src/index.ts @@ -28,7 +28,8 @@ export default async () => { if (client === undefined) { client = await createClient(connect) for (const op of migrateOperations) { - await op.upgrade(client) + console.log('Migrate', op[0]) + await op[1].upgrade(client) } } // Check if we had dev hook for client. diff --git a/dev/storage/src/storage.ts b/dev/storage/src/storage.ts index 1e0e047f07..ad2e135c26 100644 --- a/dev/storage/src/storage.ts +++ b/dev/storage/src/storage.ts @@ -44,8 +44,18 @@ class InMemoryTxAdapter extends DummyDbAdapter implements TxAdapter { return await this.txdb.findAll(_class, query, options) } - tx (tx: Tx): Promise { - return this.txdb.tx(tx) + async tx (...tx: Tx[]): Promise { + const r: TxResult[] = [] + for (const t of tx) { + r.push(await this.txdb.tx(t)) + } + if (r.length === 1) { + return r[0] + } + if (r.length === 0) { + return {} + } + return r } async init (model: Tx[]): Promise { diff --git a/dev/tool/src/__start.ts b/dev/tool/src/__start.ts index 2ff76b4df0..5db4b1338f 100644 --- a/dev/tool/src/__start.ts +++ b/dev/tool/src/__start.ts @@ -63,7 +63,7 @@ function prepareTools (): { minio: MinioService txes: Tx[] version: Data - migrateOperations: MigrateOperation[] + migrateOperations: [string, MigrateOperation][] } { return { ...prepareToolsRaw(builder.getTxes()), version, migrateOperations } } diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index a1b9d89397..e65df86e1c 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -61,7 +61,7 @@ export function devTool ( minio: MinioService txes: Tx[] version: Data - migrateOperations: MigrateOperation[] + migrateOperations: [string, MigrateOperation][] }, productId: string ): void { diff --git a/dev/tool/src/workspace.ts b/dev/tool/src/workspace.ts index ea408a2019..ca9fe32fc0 100644 --- a/dev/tool/src/workspace.ts +++ b/dev/tool/src/workspace.ts @@ -105,7 +105,7 @@ export async function restoreWorkspace ( elasticUrl: string, transactorUrl: string, rawTxes: Tx[], - migrateOperations: MigrateOperation[] + migrateOperations: [string, MigrateOperation][] ): Promise { console.log('Restoring workspace', mongoUrl, workspaceId, fileName) const client = new MongoClient(mongoUrl) diff --git a/models/all/src/migration.ts b/models/all/src/migration.ts index 9fab63b12a..3d1c1e7103 100644 --- a/models/all/src/migration.ts +++ b/models/all/src/migration.ts @@ -38,27 +38,27 @@ import { hrOperation } from '@hcengineering/model-hr' import { documentOperation } from '@hcengineering/model-document' import { bitrixOperation } from '@hcengineering/model-bitrix' -export const migrateOperations: MigrateOperation[] = [ - coreOperation, - chunterOperation, - demoOperation, - gmailOperation, - templatesOperation, - telegramOperation, - taskOperation, - attachmentOperation, - automationOperation, - leadOperation, - recruitOperation, - viewOperation, - contactOperation, - tagsOperation, - notificationOperation, - settingOperation, - trackerOperation, - boardOperation, - hrOperation, - documentOperation, - bitrixOperation, - inventoryOperation +export const migrateOperations: [string, MigrateOperation][] = [ + ['core', coreOperation], + ['chunter', chunterOperation], + ['demo', demoOperation], + ['gmail', gmailOperation], + ['templates', templatesOperation], + ['telegram', telegramOperation], + ['task', taskOperation], + ['attachment', attachmentOperation], + ['', automationOperation], + ['lead', leadOperation], + ['recruit', recruitOperation], + ['view', viewOperation], + ['contact', contactOperation], + ['tags', tagsOperation], + ['notification', notificationOperation], + ['setting', settingOperation], + ['tracker', trackerOperation], + ['board', boardOperation], + ['hr', hrOperation], + ['document', documentOperation], + ['bitrix', bitrixOperation], + ['inventiry', inventoryOperation] ] diff --git a/models/contact/src/migration.ts b/models/contact/src/migration.ts index d3f06f862f..112f592757 100644 --- a/models/contact/src/migration.ts +++ b/models/contact/src/migration.ts @@ -44,36 +44,47 @@ async function createSpace (tx: TxOperations): Promise { } async function setCreate (client: MigrationClient): Promise { - const docs = await client.find(DOMAIN_CONTACT, { - _class: { $in: [contact.class.Contact, contact.class.Organization, contact.class.Person, contact.class.Employee] }, - createOn: { $exists: false } - }) - for (const doc of docs) { - const tx = ( - await client.find>(DOMAIN_TX, { - objectId: doc._id, - _class: core.class.TxCreateDoc - }) - )[0] - if (tx !== undefined) { - await client.update( - DOMAIN_CONTACT, - { - _id: doc._id + while (true) { + const docs = await client.find( + DOMAIN_CONTACT, + { + _class: { + $in: [contact.class.Contact, contact.class.Organization, contact.class.Person, contact.class.Employee] }, - { - createOn: tx.modifiedOn - } - ) - await client.update( - DOMAIN_TX, - { - _id: tx._id - }, - { - 'attributes.createOn': tx.modifiedOn - } - ) + createOn: { $exists: false } + }, + { limit: 500 } + ) + if (docs.length === 0) { + break + } + console.log('processing createOn migration', docs.length) + const creates = await client.find>(DOMAIN_TX, { + objectId: { $in: docs.map((it) => it._id) }, + _class: core.class.TxCreateDoc + }) + for (const doc of docs) { + const tx = creates.find((it) => it.objectId === doc._id) + if (tx !== undefined) { + await client.update( + DOMAIN_CONTACT, + { + _id: doc._id + }, + { + createOn: tx.modifiedOn + } + ) + await client.update( + DOMAIN_TX, + { + _id: tx._id + }, + { + 'attributes.createOn': tx.modifiedOn + } + ) + } } } } diff --git a/packages/core/src/tx.ts b/packages/core/src/tx.ts index 311dd29bd1..569a4d6889 100644 --- a/packages/core/src/tx.ts +++ b/packages/core/src/tx.ts @@ -264,30 +264,44 @@ export const DOMAIN_TX = 'tx' as Domain * @public */ export interface WithTx { - tx: (tx: Tx) => Promise + tx: (...txs: Tx[]) => Promise } /** * @public */ export abstract class TxProcessor implements WithTx { - async tx (tx: Tx): Promise { - switch (tx._class) { - case core.class.TxCreateDoc: - return await this.txCreateDoc(tx as TxCreateDoc) - case core.class.TxCollectionCUD: - return await this.txCollectionCUD(tx as TxCollectionCUD) - case core.class.TxUpdateDoc: - return await this.txUpdateDoc(tx as TxUpdateDoc) - case core.class.TxRemoveDoc: - return await this.txRemoveDoc(tx as TxRemoveDoc) - case core.class.TxMixin: - return await this.txMixin(tx as TxMixin) - case core.class.TxApplyIf: - // Apply if processed on server - return await Promise.resolve({}) + async tx (...txes: Tx[]): Promise { + const result: TxResult[] = [] + for (const tx of txes) { + switch (tx._class) { + case core.class.TxCreateDoc: + result.push(await this.txCreateDoc(tx as TxCreateDoc)) + break + case core.class.TxCollectionCUD: + result.push(await this.txCollectionCUD(tx as TxCollectionCUD)) + break + case core.class.TxUpdateDoc: + result.push(await this.txUpdateDoc(tx as TxUpdateDoc)) + break + case core.class.TxRemoveDoc: + result.push(await this.txRemoveDoc(tx as TxRemoveDoc)) + break + case core.class.TxMixin: + result.push(await this.txMixin(tx as TxMixin)) + break + case core.class.TxApplyIf: + // Apply if processed on server + return await Promise.resolve({}) + } } - throw new Error('TxProcessor: unhandled transaction class: ' + tx._class) + if (result.length === 0) { + return {} + } + if (result.length === 1) { + return result[0] + } + return result } static createDoc2Doc(tx: TxCreateDoc): T { diff --git a/plugins/view-resources/src/components/EditDoc.svelte b/plugins/view-resources/src/components/EditDoc.svelte index 77bef7f2e4..8f12734d6e 100644 --- a/plugins/view-resources/src/components/EditDoc.svelte +++ b/plugins/view-resources/src/components/EditDoc.svelte @@ -70,7 +70,9 @@ $: if (_id && _class) { query.query(_class, { _id }, (result) => { object = result[0] - realObjectClass = object._class + if (object != null) { + realObjectClass = object._class + } }) } else { query.unsubscribe() diff --git a/server-plugins/attachment-resources/src/index.ts b/server-plugins/attachment-resources/src/index.ts index 37846f901c..c1493613cb 100644 --- a/server-plugins/attachment-resources/src/index.ts +++ b/server-plugins/attachment-resources/src/index.ts @@ -14,41 +14,18 @@ // limitations under the License. // -import type { Doc, Ref, Tx, TxCollectionCUD, TxCreateDoc, TxRemoveDoc } from '@hcengineering/core' -import type { TriggerControl } from '@hcengineering/server-core' import type { Attachment } from '@hcengineering/attachment' import attachment from '@hcengineering/attachment' +import type { Doc, Ref, Tx, TxRemoveDoc } from '@hcengineering/core' import core, { TxProcessor } from '@hcengineering/core' - -const findCreateTx = async ( - id: Ref, - findAll: TriggerControl['findAll'] -): Promise | undefined> => { - const createTx = (await findAll>(core.class.TxCreateDoc, { objectId: id }))[0] - - if (createTx !== undefined) { - return createTx - } - - const colTx = ( - await findAll>(core.class.TxCollectionCUD, { - 'tx._class': core.class.TxCreateDoc, - 'tx.objectClass': attachment.class.Attachment, - 'tx.objectId': id - }) - )[0] - - if (colTx === undefined) return - - return colTx.tx as TxCreateDoc -} +import type { TriggerControl } from '@hcengineering/server-core' /** * @public */ export async function OnAttachmentDelete ( tx: Tx, - { findAll, hierarchy, fulltextFx, storageFx }: TriggerControl + { findAll, hierarchy, fulltextFx, storageFx, removedMap }: TriggerControl ): Promise { const actualTx = TxProcessor.extractTx(tx) if (actualTx._class !== core.class.TxRemoveDoc) { @@ -61,14 +38,12 @@ export async function OnAttachmentDelete ( return [] } - const createTx = await findCreateTx(rmTx.objectId, findAll) + // Obtain document being deleted. + const attach = removedMap.get(rmTx.objectId) as Attachment - if (createTx === undefined) { + if (attach === undefined) { return [] } - - const attach = TxProcessor.createDoc2Doc(createTx) - fulltextFx(async (adapter) => { await adapter.remove([attach.file as Ref]) }) diff --git a/server-plugins/contact-resources/src/index.ts b/server-plugins/contact-resources/src/index.ts index 4d99da0892..f9d96bd7cd 100644 --- a/server-plugins/contact-resources/src/index.ts +++ b/server-plugins/contact-resources/src/index.ts @@ -15,7 +15,7 @@ // import contact, { Contact, contactId, formatName, Organization, Person } from '@hcengineering/contact' -import core, { concatLink, Doc, Tx, TxCreateDoc, TxRemoveDoc, TxUpdateDoc } from '@hcengineering/core' +import core, { concatLink, Doc, Tx, TxRemoveDoc } from '@hcengineering/core' import login from '@hcengineering/login' import { getMetadata } from '@hcengineering/platform' import type { TriggerControl } from '@hcengineering/server-core' @@ -25,7 +25,10 @@ import { workbenchId } from '@hcengineering/workbench' /** * @public */ -export async function OnContactDelete (tx: Tx, { findAll, hierarchy, storageFx }: TriggerControl): Promise { +export async function OnContactDelete ( + tx: Tx, + { findAll, hierarchy, storageFx, removedMap }: TriggerControl +): Promise { if (tx._class !== core.class.TxRemoveDoc) { return [] } @@ -36,15 +39,12 @@ export async function OnContactDelete (tx: Tx, { findAll, hierarchy, storageFx } return [] } - const createTx = (await findAll>(core.class.TxCreateDoc, { objectId: rmTx.objectId }))[0] - if (createTx === undefined) { + const removeContact = removedMap.get(rmTx.objectId) as Contact + if (removeContact === undefined) { return [] } - const updateTxes = await findAll>(core.class.TxUpdateDoc, { objectId: rmTx.objectId }) - const avatar: string | undefined = [createTx.attributes.avatar, ...updateTxes.map((x) => x.operations.avatar)] - .filter((x): x is string => x !== undefined) - .slice(-1)[0] + const avatar: string | undefined = [removeContact.avatar].filter((x): x is string => x !== undefined).slice(-1)[0] if (avatar === undefined) { return [] diff --git a/server-plugins/tags-resources/src/index.ts b/server-plugins/tags-resources/src/index.ts index a482f19de3..67899628cf 100644 --- a/server-plugins/tags-resources/src/index.ts +++ b/server-plugins/tags-resources/src/index.ts @@ -64,16 +64,14 @@ export async function onTagReference (tx: Tx, control: TriggerControl): Promise< } if (isRemove) { const ctx = actualTx as TxRemoveDoc - const createTx = ( - await control.findAll(core.class.TxCollectionCUD, { 'tx.objectId': ctx.objectId }, { limit: 1 }) - )[0] - if (createTx !== undefined) { - const actualCreateTx = TxProcessor.extractTx(createTx) - const doc = TxProcessor.createDoc2Doc(actualCreateTx as TxCreateDoc) - const res = control.txFactory.createTxUpdateDoc(tags.class.TagElement, tags.space.Tags, doc.tag, { - $inc: { refCount: -1 } - }) - return [res] + const doc = control.removedMap.get(ctx.objectId) as TagReference + if (doc !== undefined) { + if (!control.removedMap.has(doc.tag)) { + const res = control.txFactory.createTxUpdateDoc(tags.class.TagElement, tags.space.Tags, doc.tag, { + $inc: { refCount: -1 } + }) + return [res] + } } } return [] diff --git a/server/account/src/index.ts b/server/account/src/index.ts index 43bb7fb265..a05c1d8d09 100644 --- a/server/account/src/index.ts +++ b/server/account/src/index.ts @@ -393,7 +393,7 @@ export async function listAccounts (db: Db): Promise { export async function createWorkspace ( version: Data, txes: Tx[], - migrationOperation: MigrateOperation[], + migrationOperation: [string, MigrateOperation][], db: Db, productId: string, workspace: string, @@ -421,7 +421,7 @@ export async function createWorkspace ( export async function upgradeWorkspace ( version: Data, txes: Tx[], - migrationOperation: MigrateOperation[], + migrationOperation: [string, MigrateOperation][], productId: string, db: Db, workspace: string @@ -449,7 +449,7 @@ export async function upgradeWorkspace ( * @public */ export const createUserWorkspace = - (version: Data, txes: Tx[], migrationOperation: MigrateOperation[]) => + (version: Data, txes: Tx[], migrationOperation: [string, MigrateOperation][]) => async (db: Db, productId: string, token: string, workspace: string): Promise => { const { email } = decodeToken(token) await createWorkspace(version, txes, migrationOperation, db, productId, workspace, '') @@ -925,7 +925,7 @@ function wrap (f: (db: Db, productId: string, ...args: any[]) => Promise): export function getMethods ( version: Data, txes: Tx[], - migrateOperations: MigrateOperation[] + migrateOperations: [string, MigrateOperation][] ): Record { return { login: wrap(login), diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index baca1ed1b3..0a987cdd88 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -46,7 +46,7 @@ export interface DbAdapter { query: DocumentQuery, options?: FindOptions ) => Promise> - tx: (tx: Tx) => Promise + tx: (...tx: Tx[]) => Promise find: (domain: Domain) => StorageIterator @@ -97,7 +97,7 @@ export class DummyDbAdapter implements DbAdapter { return toFindResult([]) } - async tx (tx: Tx): Promise { + async tx (...tx: Tx[]): Promise { return {} } @@ -137,8 +137,8 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter { return await this.modeldb.findAll(_class, query, options) } - async tx (tx: Tx): Promise { - return await this.modeldb.tx(tx) + async tx (...tx: Tx[]): Promise { + return await this.modeldb.tx(...tx) } async init (model: Tx[]): Promise { diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts index 2d0398c724..fd8a7fb049 100644 --- a/server/core/src/storage.ts +++ b/server/core/src/storage.ts @@ -39,10 +39,8 @@ import core, { Tx, TxApplyIf, TxCollectionCUD, - TxCreateDoc, TxCUD, TxFactory, - TxMixin, TxProcessor, TxRemoveDoc, TxResult, @@ -64,6 +62,7 @@ import type { FullTextAdapterFactory, ObjectDDParticipant } from './types' +import { createCacheFindAll } from './utils' /** * @public @@ -138,16 +137,50 @@ class TServerStorage implements ServerStorage { return adapter } - private async routeTx (ctx: MeasureContext, tx: Tx): Promise { - if (this.hierarchy.isDerived(tx._class, core.class.TxCUD)) { - const txCUD = tx as TxCUD + private async routeTx (ctx: MeasureContext, ...txes: Tx[]): Promise { + let part: TxCUD[] = [] + let lastDomain: Domain | undefined + const result: TxResult[] = [] + const processPart = async (): Promise => { + if (part.length > 0) { + const adapter = this.getAdapter(lastDomain as Domain) + const r = await adapter.tx(...part) + if (Array.isArray(r)) { + result.push(...r) + } else { + result.push(r) + } + part = [] + } + } + for (const tx of txes) { + const txCUD = TxProcessor.extractTx(tx) as TxCUD + if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) { + // Skip unsupported tx + console.error('Unsupported transacton', tx) + continue + } const domain = this.hierarchy.getDomain(txCUD.objectClass) - const adapter = this.getAdapter(domain) - const res = await adapter.tx(txCUD) - return res - } else { + if (part.length > 0) { + if (lastDomain !== domain) { + await processPart() + } + lastDomain = domain + part.push(txCUD) + } else { + lastDomain = domain + part.push(txCUD) + } + } + await processPart() + + if (result.length === 1) { + return result[0] + } + if (result.length === 0) { return [{}, false] } + return result } private async getCollectionUpdateTx( @@ -174,7 +207,7 @@ class TServerStorage implements ServerStorage { } } - private async updateCollection (ctx: MeasureContext, tx: Tx): Promise { + private async updateCollection (ctx: MeasureContext, tx: Tx, findAll: ServerStorage['findAll']): Promise { if (tx._class !== core.class.TxCollectionCUD) { return [] } @@ -195,7 +228,7 @@ class TServerStorage implements ServerStorage { return [] } - const oldAttachedTo = (await this.findAll(ctx, _class, { _id }, { limit: 1 }))[0] + const oldAttachedTo = (await findAll(ctx, _class, { _id }, { limit: 1 }))[0] let oldTx: Tx | null = null if (oldAttachedTo !== undefined) { const attr = this.hierarchy.getAttribute(oldAttachedTo._class, colTx.collection) @@ -209,7 +242,7 @@ class TServerStorage implements ServerStorage { const newAttachedToClass = operations.attachedToClass ?? _class const newAttachedToCollection = operations.collection ?? colTx.collection - const newAttachedTo = (await this.findAll(ctx, newAttachedToClass, { _id: operations.attachedTo }, { limit: 1 }))[0] + const newAttachedTo = (await findAll(ctx, newAttachedToClass, { _id: operations.attachedTo }, { limit: 1 }))[0] let newTx: Tx | null = null const newAttr = this.hierarchy.getAttribute(newAttachedToClass, newAttachedToCollection) if (newAttachedTo !== undefined && newAttr !== undefined) { @@ -226,37 +259,45 @@ class TServerStorage implements ServerStorage { return [...(oldTx !== null ? [oldTx] : []), ...(newTx !== null ? [newTx] : [])] } - private async processCollection (ctx: MeasureContext, tx: Tx): Promise { - if (tx._class === core.class.TxCollectionCUD) { - const colTx = tx as TxCollectionCUD - const _id = colTx.objectId - const _class = colTx.objectClass + private async processCollection ( + ctx: MeasureContext, + txes: Tx[], + findAll: ServerStorage['findAll'], + removedMap: Map, Doc> + ): Promise { + const result: Tx[] = [] + for (const tx of txes) { + if (tx._class === core.class.TxCollectionCUD) { + const colTx = tx as TxCollectionCUD + const _id = colTx.objectId + const _class = colTx.objectClass - // Skip model operations - if (this.hierarchy.getDomain(_class) === DOMAIN_MODEL) { - // We could not update increments for model classes - return [] - } + // Skip model operations + if (this.hierarchy.getDomain(_class) === DOMAIN_MODEL) { + // We could not update increments for model classes + continue + } - const isCreateTx = colTx.tx._class === core.class.TxCreateDoc - const isDeleteTx = colTx.tx._class === core.class.TxRemoveDoc - const isUpdateTx = colTx.tx._class === core.class.TxUpdateDoc - if (isUpdateTx) { - return await this.updateCollection(ctx, tx) - } + const isCreateTx = colTx.tx._class === core.class.TxCreateDoc + const isDeleteTx = colTx.tx._class === core.class.TxRemoveDoc + const isUpdateTx = colTx.tx._class === core.class.TxUpdateDoc + if (isUpdateTx) { + result.push(...(await this.updateCollection(ctx, tx, findAll))) + } - if (isCreateTx || isDeleteTx) { - const attachedTo = (await this.findAll(ctx, _class, { _id }, { limit: 1 }))[0] - if (attachedTo !== undefined) { - return [ - await this.getCollectionUpdateTx(_id, _class, tx.modifiedBy, colTx.modifiedOn, attachedTo, { - $inc: { [colTx.collection]: isCreateTx ? 1 : -1 } - }) - ] + if ((isCreateTx || isDeleteTx) && !removedMap.has(_id)) { + const attachedTo = (await findAll(ctx, _class, { _id }, { limit: 1 }))[0] + if (attachedTo !== undefined) { + result.push( + await this.getCollectionUpdateTx(_id, _class, tx.modifiedBy, colTx.modifiedOn, attachedTo, { + $inc: { [colTx.collection]: isCreateTx ? 1 : -1 } + }) + ) + } } } } - return [] + return result } async findAll( @@ -299,49 +340,95 @@ class TServerStorage implements ServerStorage { ) } - private async buildRemovedDoc (ctx: MeasureContext, tx: TxRemoveDoc): Promise { - const isAttached = this.hierarchy.isDerived(tx.objectClass, core.class.AttachedDoc) - const txes = await this.findAll>( - ctx, - isAttached ? core.class.TxCollectionCUD : core.class.TxCUD, - isAttached - ? { 'tx.objectId': tx.objectId as Ref } - : { - objectId: tx.objectId - }, - { sort: { modifiedOn: 1 } } - ) - const createTx = isAttached - ? txes.find((tx) => (tx as TxCollectionCUD).tx._class === core.class.TxCreateDoc) - : txes.find((tx) => tx._class === core.class.TxCreateDoc) - if (createTx === undefined) return - let doc = TxProcessor.createDoc2Doc(createTx as TxCreateDoc) - for (let tx of txes) { - tx = TxProcessor.extractTx(tx) as TxCUD - if (tx._class === core.class.TxUpdateDoc) { - doc = TxProcessor.updateDoc2Doc(doc, tx as TxUpdateDoc) - } else if (tx._class === core.class.TxMixin) { - const mixinTx = tx as TxMixin - doc = TxProcessor.updateMixin4Doc(doc, mixinTx) + private async buildRemovedDoc (ctx: MeasureContext, rawTxes: Tx[], findAll: ServerStorage['findAll']): Promise { + const removeObjectIds: Ref[] = [] + const removeAttachObjectIds: Ref[] = [] + + const removeTxes = rawTxes + .filter((it) => this.hierarchy.isDerived(it._class, core.class.TxRemoveDoc)) + .map((it) => TxProcessor.extractTx(it) as TxRemoveDoc) + + for (const rtx of removeTxes) { + const isAttached = this.hierarchy.isDerived(rtx.objectClass, core.class.AttachedDoc) + if (isAttached) { + removeAttachObjectIds.push(rtx.objectId as Ref) + } else { + removeObjectIds.push(rtx.objectId) } } - return doc + const txes = + removeObjectIds.length > 0 + ? await findAll>( + ctx, + core.class.TxCUD, + { + objectId: { $in: removeObjectIds } + }, + { sort: { modifiedOn: 1 } } + ) + : [] + const result: Doc[] = [] + + const txesAttach = + removeAttachObjectIds.length > 0 + ? await findAll>( + ctx, + core.class.TxCollectionCUD, + { 'tx.objectId': { $in: removeAttachObjectIds } }, + { sort: { modifiedOn: 1 } } + ) + : [] + + for (const rtx of removeTxes) { + const isAttached = this.hierarchy.isDerived(rtx.objectClass, core.class.AttachedDoc) + + const objTxex = isAttached + ? txesAttach.filter((tx) => tx.tx.objectId === rtx.objectId) + : txes.filter((it) => it.objectId === rtx.objectId) + + const doc = TxProcessor.buildDoc2Doc(objTxex) + if (doc !== undefined) { + result.push(doc) + } + } + + return result } - private async processRemove (ctx: MeasureContext, tx: Tx): Promise { - const actualTx = TxProcessor.extractTx(tx) - if (!this.hierarchy.isDerived(actualTx._class, core.class.TxRemoveDoc)) return [] - const rtx = actualTx as TxRemoveDoc + private async processRemove ( + ctx: MeasureContext, + txes: Tx[], + findAll: ServerStorage['findAll'], + removedMap: Map, Doc> + ): Promise { const result: Tx[] = [] - const object = await this.buildRemovedDoc(ctx, rtx) - if (object === undefined) return [] - result.push(...(await this.deleteClassCollections(ctx, object._class, rtx.objectId))) - const mixins = this.getMixins(object._class, object) - for (const mixin of mixins) { - result.push(...(await this.deleteClassCollections(ctx, mixin, rtx.objectId, object._class))) + + const objects = await this.buildRemovedDoc(ctx, txes, findAll) + for (const obj of objects) { + removedMap.set(obj._id, obj) + } + + for (const tx of txes) { + const actualTx = TxProcessor.extractTx(tx) + if (!this.hierarchy.isDerived(actualTx._class, core.class.TxRemoveDoc)) { + continue + } + const rtx = actualTx as TxRemoveDoc + const object = removedMap.get(rtx.objectId) + if (object === undefined) { + continue + } + result.push(...(await this.deleteClassCollections(ctx, object._class, rtx.objectId, findAll, removedMap))) + const mixins = this.getMixins(object._class, object) + for (const mixin of mixins) { + result.push( + ...(await this.deleteClassCollections(ctx, mixin, rtx.objectId, findAll, removedMap, object._class)) + ) + } + + result.push(...(await this.deleteRelatedDocuments(ctx, object, findAll, removedMap))) } - result.push(...(await this.deleteRelatedDocuments(ctx, object))) return result } @@ -349,6 +436,8 @@ class TServerStorage implements ServerStorage { ctx: MeasureContext, _class: Ref>, objectId: Ref, + findAll: ServerStorage['findAll'], + removedMap: Map, Doc>, to?: Ref> ): Promise { const attributes = this.hierarchy.getAllAttributes(_class, to) @@ -356,18 +445,18 @@ class TServerStorage implements ServerStorage { for (const attribute of attributes) { if (this.hierarchy.isDerived(attribute[1].type._class, core.class.Collection)) { const collection = attribute[1].type as Collection - const allAttached = await this.findAll(ctx, collection.of, { attachedTo: objectId }) + const allAttached = await findAll(ctx, collection.of, { attachedTo: objectId }) for (const attached of allAttached) { - result.push(...(await this.deleteObject(ctx, attached))) + result.push(...this.deleteObject(ctx, attached, removedMap)) } } } return result } - private async deleteObject (ctx: MeasureContext, object: Doc): Promise { + private deleteObject (ctx: MeasureContext, object: Doc, removedMap: Map, Doc>): Tx[] { const result: Tx[] = [] - const factory = new TxFactory(core.account.System) + const factory = new TxFactory(object.modifiedBy) if (this.hierarchy.isDerived(object._class, core.class.AttachedDoc)) { const adoc = object as AttachedDoc const nestedTx = factory.createTxRemoveDoc(adoc._class, adoc.space, adoc._id) @@ -378,14 +467,21 @@ class TServerStorage implements ServerStorage { adoc.collection, nestedTx ) + removedMap.set(adoc._id, adoc) result.push(tx) } else { result.push(factory.createTxRemoveDoc(object._class, object.space, object._id)) + removedMap.set(object._id, object) } return result } - private async deleteRelatedDocuments (ctx: MeasureContext, object: Doc): Promise { + private async deleteRelatedDocuments ( + ctx: MeasureContext, + object: Doc, + findAll: ServerStorage['findAll'], + removedMap: Map, Doc> + ): Promise { const result: Tx[] = [] const objectClass = this.hierarchy.getClass(object._class) if (this.hierarchy.hasMixin(objectClass, serverCore.mixin.ObjectDDParticipant)) { @@ -395,39 +491,48 @@ class TServerStorage implements ServerStorage { ) const collector = await getResource(removeParticipand.collectDocs) const docs = await collector(object, this.hierarchy, async (_class, query, options) => { - return await this.findAll(ctx, _class, query, options) + return await findAll(ctx, _class, query, options) }) for (const d of docs) { - result.push(...(await this.deleteObject(ctx, d))) + result.push(...this.deleteObject(ctx, d, removedMap)) } } return result } - private async processMove (ctx: MeasureContext, tx: Tx): Promise { - const actualTx = TxProcessor.extractTx(tx) - if (!this.hierarchy.isDerived(actualTx._class, core.class.TxUpdateDoc)) return [] - const rtx = actualTx as TxUpdateDoc - if (rtx.operations.space === undefined || rtx.operations.space === rtx.objectSpace) return [] + private async processMove (ctx: MeasureContext, txes: Tx[], findAll: ServerStorage['findAll']): Promise { const result: Tx[] = [] - const factory = new TxFactory(core.account.System) - for (const [, attribute] of this.hierarchy.getAllAttributes(rtx.objectClass)) { - if (!this.hierarchy.isDerived(attribute.type._class, core.class.Collection)) continue - const collection = attribute.type as Collection - const allAttached = await this.findAll(ctx, collection.of, { attachedTo: rtx.objectId, space: rtx.objectSpace }) - const allTx = allAttached.map(({ _class, space, _id }) => - factory.createTxUpdateDoc(_class, space, _id, { space: rtx.operations.space }) - ) - result.push(...allTx) + for (const tx of txes) { + const actualTx = TxProcessor.extractTx(tx) + if (!this.hierarchy.isDerived(actualTx._class, core.class.TxUpdateDoc)) { + continue + } + const rtx = actualTx as TxUpdateDoc + if (rtx.operations.space === undefined || rtx.operations.space === rtx.objectSpace) { + continue + } + const factory = new TxFactory(tx.modifiedBy) + for (const [, attribute] of this.hierarchy.getAllAttributes(rtx.objectClass)) { + if (!this.hierarchy.isDerived(attribute.type._class, core.class.Collection)) { + continue + } + const collection = attribute.type as Collection + const allAttached = await findAll(ctx, collection.of, { attachedTo: rtx.objectId, space: rtx.objectSpace }) + const allTx = allAttached.map(({ _class, space, _id }) => + factory.createTxUpdateDoc(_class, space, _id, { space: rtx.operations.space }) + ) + result.push(...allTx) + } } return result } private async proccessDerived ( ctx: MeasureContext, - tx: Tx, - _class: Ref>, - triggerFx: Effects + txes: Tx[], + triggerFx: Effects, + findAll: ServerStorage['findAll'], + removedMap: Map, Doc> ): Promise { const fAll = (mctx: MeasureContext) => @@ -436,48 +541,62 @@ class TServerStorage implements ServerStorage { query: DocumentQuery, options?: FindOptions ): Promise> => - this.findAll(mctx, clazz, query, options) - const derived = [ - ...(await ctx.with('process-collection', { _class }, () => this.processCollection(ctx, tx))), - ...(await ctx.with('process-remove', { _class }, () => this.processRemove(ctx, tx))), - ...(await ctx.with('process-move', { _class }, () => this.processMove(ctx, tx))), - ...(await ctx.with('process-triggers', {}, (ctx) => - this.triggers.apply(tx.modifiedBy, tx, { - workspace: this.workspace, - fx: triggerFx.fx, - fulltextFx: (f) => triggerFx.fx(() => f(this.fulltextAdapter)), - storageFx: (f) => { - const adapter = this.storageAdapter - if (adapter === undefined) { - return + findAll(mctx, clazz, query, options) + + const removed = await ctx.with('process-remove', {}, () => this.processRemove(ctx, txes, findAll, removedMap)) + const collections = await ctx.with('process-collection', {}, () => + this.processCollection(ctx, txes, findAll, removedMap) + ) + const moves = await ctx.with('process-move', {}, () => this.processMove(ctx, txes, findAll)) + + const triggers = await ctx.with('process-triggers', {}, async (ctx) => { + const result: Tx[] = [] + for (const tx of txes) { + result.push( + ...(await this.triggers.apply(tx.modifiedBy, tx, { + removedMap, + workspace: this.workspace, + fx: triggerFx.fx, + fulltextFx: (f) => triggerFx.fx(() => f(this.fulltextAdapter)), + storageFx: (f) => { + const adapter = this.storageAdapter + if (adapter === undefined) { + return + } + + triggerFx.fx(() => f(adapter, this.workspace)) + }, + findAll: fAll(ctx), + modelDb: this.modelDb, + hierarchy: this.hierarchy, + txFx: async (f) => { + await f(this.getAdapter(DOMAIN_TX)) } + })) + ) + } + return result + }) - triggerFx.fx(() => f(adapter, this.workspace)) - }, - findAll: fAll(ctx), - modelDb: this.modelDb, - hierarchy: this.hierarchy, - txFx: async (f) => { - await f(this.getAdapter(DOMAIN_TX)) - } - }) - )) - ] + const derived = [...removed, ...collections, ...moves, ...triggers] - return await this.processDerivedTxes(derived, ctx, triggerFx) + return await this.processDerivedTxes(derived, ctx, triggerFx, findAll, removedMap) } - private async processDerivedTxes (derived: Tx[], ctx: MeasureContext, triggerFx: Effects): Promise { + private async processDerivedTxes ( + derived: Tx[], + ctx: MeasureContext, + triggerFx: Effects, + findAll: ServerStorage['findAll'], + removedMap: Map, Doc> + ): Promise { derived.sort((a, b) => a.modifiedOn - b.modifiedOn) - for (const tx of derived) { - await ctx.with('derived-route-tx', { _class: txClass(tx) }, (ctx) => this.routeTx(ctx, tx)) - } + await ctx.with('derived-route-tx', {}, (ctx) => this.routeTx(ctx, ...derived)) const nestedTxes: Tx[] = [] - for (const tx of derived) { - const _class = txClass(tx) - nestedTxes.push(...(await this.proccessDerived(ctx, tx, _class, triggerFx))) + if (derived.length > 0) { + nestedTxes.push(...(await this.proccessDerived(ctx, derived, triggerFx, findAll, removedMap))) } const res = [...derived, ...nestedTxes] @@ -490,7 +609,8 @@ class TServerStorage implements ServerStorage { */ async verifyApplyIf ( ctx: MeasureContext, - applyIf: TxApplyIf + applyIf: TxApplyIf, + findAll: ServerStorage['findAll'] ): Promise<{ onEnd: () => void passed: boolean @@ -510,7 +630,7 @@ class TServerStorage implements ServerStorage { ) let passed = true for (const { _class, query } of applyIf.match) { - const res = await this.findAll(ctx, _class, query, { limit: 1 }) + const res = await findAll(ctx, _class, query, { limit: 1 }) if (res.length === 0) { passed = false break @@ -522,6 +642,7 @@ class TServerStorage implements ServerStorage { async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> { // store tx const _class = txClass(tx) + const cacheFind = createCacheFindAll(this) const objClass = txObjectClass(tx) return await ctx.with('tx', { _class, objClass }, async (ctx) => { if (tx.space !== core.space.DerivedTx && !this.hierarchy.isDerived(tx._class, core.class.TxApplyIf)) { @@ -546,23 +667,21 @@ class TServerStorage implements ServerStorage { const applyIf = tx as TxApplyIf // Wait for scope promise if found let passed: boolean - ;({ passed, onEnd } = await this.verifyApplyIf(ctx, applyIf)) + ;({ passed, onEnd } = await this.verifyApplyIf(ctx, applyIf, cacheFind)) result = passed if (passed) { // Store apply if transaction's await ctx.with('domain-tx', { _class, objClass }, async () => { const atx = await this.getAdapter(DOMAIN_TX) - for (const ctx of applyIf.txes) { - await atx.tx(ctx) - } + await atx.tx(...applyIf.txes) }) - derived = await this.processDerivedTxes(applyIf.txes, ctx, triggerFx) + derived = await this.processDerivedTxes(applyIf.txes, ctx, triggerFx, cacheFind, new Map, Doc>()) } } else { // store object result = await ctx.with('route-tx', { _class, objClass }, (ctx) => this.routeTx(ctx, tx)) // invoke triggers and store derived objects - derived = await this.proccessDerived(ctx, tx, _class, triggerFx) + derived = await this.proccessDerived(ctx, [tx], triggerFx, cacheFind, new Map, Doc>()) } // index object diff --git a/server/core/src/types.ts b/server/core/src/types.ts index d302f607b1..29e605ca3e 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -105,6 +105,7 @@ export interface TriggerControl { findAll: Storage['findAll'] hierarchy: Hierarchy modelDb: ModelDb + removedMap: Map, Doc> fulltextFx: (f: (adapter: FullTextAdapter) => Promise) => void // Since we don't have other storages let's consider adapter is MinioClient diff --git a/server/core/src/utils.ts b/server/core/src/utils.ts new file mode 100644 index 0000000000..fe66544be9 --- /dev/null +++ b/server/core/src/utils.ts @@ -0,0 +1,34 @@ +import { + Class, + Doc, + DocumentQuery, + FindOptions, + FindResult, + MeasureContext, + Ref, + ServerStorage +} from '@hcengineering/core' + +/** + * @public + */ +export function createCacheFindAll (storage: ServerStorage): ServerStorage['findAll'] { + // We will cache all queries for same objects for all derived data checks. + const queryCache = new Map>() + + return async ( + ctx: MeasureContext, + clazz: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise> => { + const key = JSON.stringify(clazz) + JSON.stringify(query) + JSON.stringify(options) + let cacheResult = queryCache.get(key) + if (cacheResult !== undefined) { + return cacheResult as FindResult + } + cacheResult = await storage.findAll(ctx, clazz, query, options) + queryCache.set(key, cacheResult) + return cacheResult as FindResult + } +} diff --git a/server/elastic/src/backup.ts b/server/elastic/src/backup.ts index 9ab9386b9f..d8f5656bb8 100644 --- a/server/elastic/src/backup.ts +++ b/server/elastic/src/backup.ts @@ -47,7 +47,7 @@ class ElasticDataAdapter implements DbAdapter { return Object.assign([], { total: 0 }) } - async tx (tx: Tx): Promise { + async tx (...tx: Tx[]): Promise { return {} } diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index ee4e5b76c3..cea75aaf60 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -14,6 +14,7 @@ // import core, { + AttachedDoc, Class, Doc, DocumentQuery, @@ -38,6 +39,7 @@ import core, { StorageIterator, toFindResult, Tx, + TxCollectionCUD, TxCreateDoc, TxMixin, TxProcessor, @@ -48,7 +50,7 @@ import core, { WorkspaceId } from '@hcengineering/core' import type { DbAdapter, TxAdapter } from '@hcengineering/server-core' -import { Collection, Db, Document, Filter, MongoClient, Sort, UpdateFilter } from 'mongodb' +import { AnyBulkWriteOperation, Collection, Db, Document, Filter, MongoClient, Sort, UpdateFilter } from 'mongodb' import { createHash } from 'node:crypto' import { getMongoClient, getWorkspaceDB } from './utils' @@ -80,18 +82,20 @@ interface LookupStep { pipeline?: any } -abstract class MongoAdapterBase extends TxProcessor { +abstract class MongoAdapterBase implements DbAdapter { constructor ( protected readonly db: Db, protected readonly hierarchy: Hierarchy, protected readonly modelDb: ModelDb, protected readonly client: MongoClient - ) { - super() - } + ) {} async init (): Promise {} + async tx (...tx: Tx[]): Promise { + return {} + } + async close (): Promise { await this.client.close() } @@ -105,14 +109,7 @@ abstract class MongoAdapterBase extends TxProcessor { if (value !== null && typeof value === 'object') { const keys = Object.keys(value) if (keys[0] === '$like') { - const pattern = value.$like as string - translated[tkey] = { - $regex: `^${pattern - .split('%') - .map((it) => escapeLikeForRegexp(it)) - .join('.*')}$`, - $options: 'i' - } + translated[tkey] = translateLikeQuery(value.$like as string) continue } } @@ -130,7 +127,6 @@ abstract class MongoAdapterBase extends TxProcessor { // Add an mixin to be exists flag translated[clazz] = { $exists: true } } - // return Object.assign({}, query, { _class: { $in: classes } }) return translated } @@ -522,9 +518,6 @@ abstract class MongoAdapterBase extends TxProcessor { } })) ) - - // await coll.deleteMany({ _id: { $in: keys } }) - // await coll.insertMany(Array.from(docMap.values()) as Document[]) } } @@ -555,75 +548,173 @@ abstract class MongoAdapterBase extends TxProcessor { } } +interface DomainOperation { + raw: () => Promise + domain: Domain + bulk?: AnyBulkWriteOperation[] +} + class MongoAdapter extends MongoAdapterBase { - protected override async txRemoveDoc (tx: TxRemoveDoc): Promise { - const domain = this.hierarchy.getDomain(tx.objectClass) - await this.db.collection(domain).deleteOne({ _id: tx.objectId }) - return {} + getOperations (tx: Tx): DomainOperation | undefined { + switch (tx._class) { + case core.class.TxCreateDoc: + return this.txCreateDoc(tx as TxCreateDoc) + case core.class.TxCollectionCUD: + return this.txCollectionCUD(tx as TxCollectionCUD) + case core.class.TxUpdateDoc: + return this.txUpdateDoc(tx as TxUpdateDoc) + case core.class.TxRemoveDoc: + return this.txRemoveDoc(tx as TxRemoveDoc) + case core.class.TxMixin: + return this.txMixin(tx as TxMixin) + case core.class.TxApplyIf: + return undefined + } + + console.error('Unknown/Unsupported operation:', tx._class, tx) } - protected async txMixin (tx: TxMixin): Promise { + async tx (...txes: Tx[]): Promise { + const result: TxResult[] = [] + + const bulkOperations: DomainOperation[] = [] + + let lastDomain: Domain | undefined + + const bulkExecute = async (): Promise => { + if (lastDomain === undefined || bulkOperations.length === 0) { + return + } + try { + await this.db + .collection(lastDomain) + .bulkWrite(bulkOperations.reduce((ops, op) => ops.concat(...(op.bulk ?? [])), [])) + } catch (err: any) { + console.trace(err) + throw err + } + bulkOperations.splice(0, bulkOperations.length) + lastDomain = undefined + } + + if (txes.length > 1) { + for (const tx of txes) { + const dop: DomainOperation | undefined = this.getOperations(tx) + if (dop === undefined) { + continue + } + if (dop.bulk === undefined) { + // Execute previous bulk and capture result. + await bulkExecute() + try { + result.push(await dop.raw()) + } catch (err: any) { + console.error(err) + } + continue + } + if (lastDomain === undefined) { + lastDomain = dop.domain + } + bulkOperations.push(dop) + } + await bulkExecute() + } else { + return (await this.getOperations(txes[0])?.raw()) ?? {} + } + if (result.length === 0) { + return {} + } + if (result.length === 1) { + return result[0] + } + return result + } + + protected txCollectionCUD (tx: TxCollectionCUD): DomainOperation { + // We need update only create transactions to contain attached, attachedToClass. + if (tx.tx._class === core.class.TxCreateDoc) { + const createTx = tx.tx as TxCreateDoc + const d: TxCreateDoc = { + ...createTx, + attributes: { + ...createTx.attributes, + attachedTo: tx.objectId, + attachedToClass: tx.objectClass, + collection: tx.collection + } + } + return this.txCreateDoc(d) + } + // We could cast since we know collection cud is supported. + return this.getOperations(tx.tx) as DomainOperation + } + + protected txRemoveDoc (tx: TxRemoveDoc): DomainOperation { + const domain = this.hierarchy.getDomain(tx.objectClass) + return { + raw: () => this.db.collection(domain).deleteOne({ _id: tx.objectId }), + domain, + bulk: [{ deleteOne: { filter: { _id: tx.objectId } } }] + } + } + + protected txMixin (tx: TxMixin): DomainOperation { const domain = this.hierarchy.getDomain(tx.objectClass) + const filter = { _id: tx.objectId } + const modifyOp = { + modifiedBy: tx.modifiedBy, + modifiedOn: tx.modifiedOn + } if (isOperator(tx.attributes)) { const operator = Object.keys(tx.attributes)[0] if (operator === '$move') { const keyval = (tx.attributes as any).$move const arr = tx.mixin + '.' + Object.keys(keyval)[0] const desc = keyval[arr] - const ops = [ + const ops: any = [ + { updateOne: { filter, update: { $pull: { [arr]: desc.$value } } } }, { updateOne: { - filter: { _id: tx.objectId }, - update: { - $pull: { - [arr]: desc.$value - } - } - } - }, - { - updateOne: { - filter: { _id: tx.objectId }, - update: { - $set: { - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn - }, - $push: { - [arr]: { - $each: [desc.$value], - $position: desc.$position - } - } - } + filter, + update: { $set: modifyOp, $push: { [arr]: { $each: [desc.$value], $position: desc.$position } } } } } ] - return await this.db.collection(domain).bulkWrite(ops as any) - } else { - return await this.db.collection(domain).updateOne( - { _id: tx.objectId }, + // return await this.db.collection(domain).bulkWrite(ops as any) + return { + raw: async () => await this.db.collection(domain).bulkWrite(ops), + domain, + bulk: ops + } + } + const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } } + return { + raw: async () => await this.db.collection(domain).updateOne(filter, update), + domain, + bulk: [ { - ...this.translateMixinAttrs(tx.mixin, tx.attributes), - $set: { - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn + updateOne: { + filter, + update } } - ) + ] } - } else { - return await this.db.collection(domain).updateOne( - { _id: tx.objectId }, + } + const update = { $set: { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } } + return { + raw: async () => await this.db.collection(domain).updateOne(filter, update), + domain, + bulk: [ { - $set: { - ...this.translateMixinAttrs(tx.mixin, tx.attributes), - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn + updateOne: { + filter, + update } } - ) + ] } } @@ -647,14 +738,22 @@ class MongoAdapter extends MongoAdapterBase { return attrs } - protected override async txCreateDoc (tx: TxCreateDoc): Promise { + protected txCreateDoc (tx: TxCreateDoc): DomainOperation { const doc = TxProcessor.createDoc2Doc(tx) const domain = this.hierarchy.getDomain(doc._class) - await this.db.collection(domain).insertOne(translateDoc(doc)) - return {} + const tdoc = translateDoc(doc) + return { + raw: async () => await this.db.collection(domain).insertOne(tdoc), + domain, + bulk: [ + { + insertOne: { document: tdoc } + } + ] + } } - protected override async txUpdateDoc (tx: TxUpdateDoc): Promise { + protected txUpdateDoc (tx: TxUpdateDoc): DomainOperation { const domain = this.hierarchy.getDomain(tx.objectClass) if (isOperator(tx.operations)) { const operator = Object.keys(tx.operations)[0] @@ -662,7 +761,8 @@ class MongoAdapter extends MongoAdapterBase { const keyval = (tx.operations as any).$move const arr = Object.keys(keyval)[0] const desc = keyval[arr] - const ops = [ + + const ops: any = [ { updateOne: { filter: { _id: tx.objectId }, @@ -691,7 +791,11 @@ class MongoAdapter extends MongoAdapterBase { } } ] - return await this.db.collection(domain).bulkWrite(ops as any) + return { + raw: async () => await this.db.collection(domain).bulkWrite(ops), + domain, + bulk: ops + } } else if (operator === '$update') { const keyval = (tx.operations as any).$update const arr = Object.keys(keyval)[0] @@ -722,51 +826,66 @@ class MongoAdapter extends MongoAdapterBase { } } ] - return await this.db.collection(domain).bulkWrite(ops as any) + return { + raw: async () => await this.db.collection(domain).bulkWrite(ops), + domain, + bulk: ops + } } else { if (tx.retrieve === true) { - const result = await this.db.collection(domain).findOneAndUpdate( - { _id: tx.objectId }, - { - ...tx.operations, - $set: { - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn - } - } as unknown as UpdateFilter, - { returnDocument: 'after' } - ) - return { object: result.value } + const raw = async (): Promise => { + const result = await this.db.collection(domain).findOneAndUpdate( + { _id: tx.objectId }, + { + ...tx.operations, + $set: { + modifiedBy: tx.modifiedBy, + modifiedOn: tx.modifiedOn + } + } as unknown as UpdateFilter, + { returnDocument: 'after' } + ) + return { object: result.value } + } + return { + raw, + domain, + bulk: undefined + } } else { - return await this.db.collection(domain).updateOne( - { _id: tx.objectId }, - { - ...tx.operations, - $set: { - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn - } + const filter = { _id: tx.objectId } + const update = { + ...tx.operations, + $set: { + modifiedBy: tx.modifiedBy, + modifiedOn: tx.modifiedOn } - ) + } + return { + raw: async () => await this.db.collection(domain).updateOne(filter, update), + domain, + bulk: [{ updateOne: { filter, update } }] + } } } } else { - if (tx.retrieve === true) { - const result = await this.db.collection(domain).findOneAndUpdate( - { _id: tx.objectId }, - { - $set: { ...tx.operations, modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn } - } as unknown as UpdateFilter, - { returnDocument: 'after' } - ) - return { object: result.value } - } else { - return await this.db - .collection(domain) - .updateOne( - { _id: tx.objectId }, - { $set: { ...tx.operations, modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn } } - ) + const filter = { _id: tx.objectId } + const update = { $set: { ...tx.operations, modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn } } + const raw = + tx.retrieve === true + ? async (): Promise => { + const result = await this.db + .collection(domain) + .findOneAndUpdate(filter, update, { returnDocument: 'after' }) + return { object: result.value } + } + : async () => await this.db.collection(domain).updateOne(filter, update) + + // Disable bulk for operators + return { + raw, + domain, + bulk: [{ updateOne: { filter, update } }] } } } @@ -774,27 +893,9 @@ class MongoAdapter extends MongoAdapterBase { class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { txColl: Collection | undefined - protected txCreateDoc (tx: TxCreateDoc): Promise { - throw new Error('Method not implemented.') - } - protected txUpdateDoc (tx: TxUpdateDoc): Promise { - throw new Error('Method not implemented.') - } - - protected txRemoveDoc (tx: TxRemoveDoc): Promise { - throw new Error('Method not implemented.') - } - - protected txMixin (tx: TxMixin): Promise { - throw new Error('Method not implemented.') - } - - override async tx (tx: Tx, user: string): Promise - override async tx (tx: Tx): Promise - - override async tx (tx: Tx, user?: string): Promise { - await this.txCollection().insertOne(translateDoc(tx)) + override async tx (...tx: Tx[]): Promise { + await this.txCollection().insertMany(tx.map((it) => translateDoc(it))) return {} } @@ -822,6 +923,16 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { } } +function translateLikeQuery (pattern: string): { $regex: string, $options: string } { + return { + $regex: `^${pattern + .split('%') + .map((it) => escapeLikeForRegexp(it)) + .join('.*')}$`, + $options: 'i' + } +} + /** * @public */ diff --git a/server/server/src/minio.ts b/server/server/src/minio.ts index 1a232586fc..9390537d49 100644 --- a/server/server/src/minio.ts +++ b/server/server/src/minio.ts @@ -45,7 +45,7 @@ class MinioBlobAdapter implements DbAdapter { return Object.assign([], { total: 0 }) } - async tx (tx: Tx): Promise { + async tx (...tx: Tx[]): Promise { return {} } diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 687f998c0e..8581fe8152 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -89,7 +89,7 @@ export async function initModel ( transactorUrl: string, workspaceId: WorkspaceId, rawTxes: Tx[], - migrateOperations: MigrateOperation[] + migrateOperations: [string, MigrateOperation][] ): Promise { const { mongodbUri, minio, txes } = prepareTools(rawTxes) if (txes.some((tx) => tx.objectSpace !== core.space.Model)) { @@ -115,7 +115,8 @@ export async function initModel ( })) as unknown as CoreClient & BackupClient try { for (const op of migrateOperations) { - await op.upgrade(connection) + console.log('Migrage', op[0]) + await op[1].upgrade(connection) } } catch (e) { console.log(e) @@ -142,7 +143,7 @@ export async function upgradeModel ( transactorUrl: string, workspaceId: WorkspaceId, rawTxes: Tx[], - migrateOperations: MigrateOperation[] + migrateOperations: [string, MigrateOperation][] ): Promise { const { mongodbUri, txes } = prepareTools(rawTxes) @@ -171,7 +172,8 @@ export async function upgradeModel ( const migrateClient = new MigrateClientImpl(db) for (const op of migrateOperations) { - await op.migrate(migrateClient) + console.log('migrate:', op[0]) + await op[1].migrate(migrateClient) } console.log('Apply upgrade operations') @@ -182,7 +184,8 @@ export async function upgradeModel ( await createUpdateIndexes(connection, db) for (const op of migrateOperations) { - await op.upgrade(connection) + console.log('upgrade:', op[0]) + await op[1].upgrade(connection) } await connection.close()