Low level bulk operations (#2592)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-02-07 11:46:28 +07:00 committed by GitHub
parent 59c53a09f2
commit 9fef46b059
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 684 additions and 405 deletions

View File

@ -28,7 +28,8 @@ export default async () => {
if (client === undefined) { if (client === undefined) {
client = await createClient(connect) client = await createClient(connect)
for (const op of migrateOperations) { for (const op of migrateOperations) {
await op.upgrade(client) console.log('Migrate', op[0])
await op[1].upgrade(client)
} }
} }
// Check if we had dev hook for client. // Check if we had dev hook for client.

View File

@ -44,8 +44,18 @@ class InMemoryTxAdapter extends DummyDbAdapter implements TxAdapter {
return await this.txdb.findAll(_class, query, options) return await this.txdb.findAll(_class, query, options)
} }
tx (tx: Tx): Promise<TxResult> { async tx (...tx: Tx[]): Promise<TxResult> {
return this.txdb.tx(tx) const r: TxResult[] = []
for (const t of tx) {
r.push(await this.txdb.tx(t))
}
if (r.length === 1) {
return r[0]
}
if (r.length === 0) {
return {}
}
return r
} }
async init (model: Tx[]): Promise<void> { async init (model: Tx[]): Promise<void> {

View File

@ -63,7 +63,7 @@ function prepareTools (): {
minio: MinioService minio: MinioService
txes: Tx[] txes: Tx[]
version: Data<Version> version: Data<Version>
migrateOperations: MigrateOperation[] migrateOperations: [string, MigrateOperation][]
} { } {
return { ...prepareToolsRaw(builder.getTxes()), version, migrateOperations } return { ...prepareToolsRaw(builder.getTxes()), version, migrateOperations }
} }

View File

@ -61,7 +61,7 @@ export function devTool (
minio: MinioService minio: MinioService
txes: Tx[] txes: Tx[]
version: Data<Version> version: Data<Version>
migrateOperations: MigrateOperation[] migrateOperations: [string, MigrateOperation][]
}, },
productId: string productId: string
): void { ): void {

View File

@ -105,7 +105,7 @@ export async function restoreWorkspace (
elasticUrl: string, elasticUrl: string,
transactorUrl: string, transactorUrl: string,
rawTxes: Tx[], rawTxes: Tx[],
migrateOperations: MigrateOperation[] migrateOperations: [string, MigrateOperation][]
): Promise<void> { ): Promise<void> {
console.log('Restoring workspace', mongoUrl, workspaceId, fileName) console.log('Restoring workspace', mongoUrl, workspaceId, fileName)
const client = new MongoClient(mongoUrl) const client = new MongoClient(mongoUrl)

View File

@ -38,27 +38,27 @@ import { hrOperation } from '@hcengineering/model-hr'
import { documentOperation } from '@hcengineering/model-document' import { documentOperation } from '@hcengineering/model-document'
import { bitrixOperation } from '@hcengineering/model-bitrix' import { bitrixOperation } from '@hcengineering/model-bitrix'
export const migrateOperations: MigrateOperation[] = [ export const migrateOperations: [string, MigrateOperation][] = [
coreOperation, ['core', coreOperation],
chunterOperation, ['chunter', chunterOperation],
demoOperation, ['demo', demoOperation],
gmailOperation, ['gmail', gmailOperation],
templatesOperation, ['templates', templatesOperation],
telegramOperation, ['telegram', telegramOperation],
taskOperation, ['task', taskOperation],
attachmentOperation, ['attachment', attachmentOperation],
automationOperation, ['', automationOperation],
leadOperation, ['lead', leadOperation],
recruitOperation, ['recruit', recruitOperation],
viewOperation, ['view', viewOperation],
contactOperation, ['contact', contactOperation],
tagsOperation, ['tags', tagsOperation],
notificationOperation, ['notification', notificationOperation],
settingOperation, ['setting', settingOperation],
trackerOperation, ['tracker', trackerOperation],
boardOperation, ['board', boardOperation],
hrOperation, ['hr', hrOperation],
documentOperation, ['document', documentOperation],
bitrixOperation, ['bitrix', bitrixOperation],
inventoryOperation ['inventiry', inventoryOperation]
] ]

View File

@ -44,36 +44,47 @@ async function createSpace (tx: TxOperations): Promise<void> {
} }
async function setCreate (client: MigrationClient): Promise<void> { async function setCreate (client: MigrationClient): Promise<void> {
const docs = await client.find<Contact>(DOMAIN_CONTACT, { while (true) {
_class: { $in: [contact.class.Contact, contact.class.Organization, contact.class.Person, contact.class.Employee] }, const docs = await client.find<Contact>(
createOn: { $exists: false } DOMAIN_CONTACT,
}) {
for (const doc of docs) { _class: {
const tx = ( $in: [contact.class.Contact, contact.class.Organization, contact.class.Person, contact.class.Employee]
await client.find<TxCreateDoc<Contact>>(DOMAIN_TX, {
objectId: doc._id,
_class: core.class.TxCreateDoc
})
)[0]
if (tx !== undefined) {
await client.update(
DOMAIN_CONTACT,
{
_id: doc._id
}, },
{ createOn: { $exists: false }
createOn: tx.modifiedOn },
} { limit: 500 }
) )
await client.update( if (docs.length === 0) {
DOMAIN_TX, break
{ }
_id: tx._id console.log('processing createOn migration', docs.length)
}, const creates = await client.find<TxCreateDoc<Contact>>(DOMAIN_TX, {
{ objectId: { $in: docs.map((it) => it._id) },
'attributes.createOn': tx.modifiedOn _class: core.class.TxCreateDoc
} })
) for (const doc of docs) {
const tx = creates.find((it) => it.objectId === doc._id)
if (tx !== undefined) {
await client.update(
DOMAIN_CONTACT,
{
_id: doc._id
},
{
createOn: tx.modifiedOn
}
)
await client.update(
DOMAIN_TX,
{
_id: tx._id
},
{
'attributes.createOn': tx.modifiedOn
}
)
}
} }
} }
} }

View File

@ -264,30 +264,44 @@ export const DOMAIN_TX = 'tx' as Domain
* @public * @public
*/ */
export interface WithTx { export interface WithTx {
tx: (tx: Tx) => Promise<TxResult> tx: (...txs: Tx[]) => Promise<TxResult>
} }
/** /**
* @public * @public
*/ */
export abstract class TxProcessor implements WithTx { export abstract class TxProcessor implements WithTx {
async tx (tx: Tx): Promise<TxResult> { async tx (...txes: Tx[]): Promise<TxResult> {
switch (tx._class) { const result: TxResult[] = []
case core.class.TxCreateDoc: for (const tx of txes) {
return await this.txCreateDoc(tx as TxCreateDoc<Doc>) switch (tx._class) {
case core.class.TxCollectionCUD: case core.class.TxCreateDoc:
return await this.txCollectionCUD(tx as TxCollectionCUD<Doc, AttachedDoc>) result.push(await this.txCreateDoc(tx as TxCreateDoc<Doc>))
case core.class.TxUpdateDoc: break
return await this.txUpdateDoc(tx as TxUpdateDoc<Doc>) case core.class.TxCollectionCUD:
case core.class.TxRemoveDoc: result.push(await this.txCollectionCUD(tx as TxCollectionCUD<Doc, AttachedDoc>))
return await this.txRemoveDoc(tx as TxRemoveDoc<Doc>) break
case core.class.TxMixin: case core.class.TxUpdateDoc:
return await this.txMixin(tx as TxMixin<Doc, Doc>) result.push(await this.txUpdateDoc(tx as TxUpdateDoc<Doc>))
case core.class.TxApplyIf: break
// Apply if processed on server case core.class.TxRemoveDoc:
return await Promise.resolve({}) result.push(await this.txRemoveDoc(tx as TxRemoveDoc<Doc>))
break
case core.class.TxMixin:
result.push(await this.txMixin(tx as TxMixin<Doc, Doc>))
break
case core.class.TxApplyIf:
// Apply if processed on server
return await Promise.resolve({})
}
} }
throw new Error('TxProcessor: unhandled transaction class: ' + tx._class) if (result.length === 0) {
return {}
}
if (result.length === 1) {
return result[0]
}
return result
} }
static createDoc2Doc<T extends Doc>(tx: TxCreateDoc<T>): T { static createDoc2Doc<T extends Doc>(tx: TxCreateDoc<T>): T {

View File

@ -70,7 +70,9 @@
$: if (_id && _class) { $: if (_id && _class) {
query.query(_class, { _id }, (result) => { query.query(_class, { _id }, (result) => {
object = result[0] object = result[0]
realObjectClass = object._class if (object != null) {
realObjectClass = object._class
}
}) })
} else { } else {
query.unsubscribe() query.unsubscribe()

View File

@ -14,41 +14,18 @@
// limitations under the License. // limitations under the License.
// //
import type { Doc, Ref, Tx, TxCollectionCUD, TxCreateDoc, TxRemoveDoc } from '@hcengineering/core'
import type { TriggerControl } from '@hcengineering/server-core'
import type { Attachment } from '@hcengineering/attachment' import type { Attachment } from '@hcengineering/attachment'
import attachment from '@hcengineering/attachment' import attachment from '@hcengineering/attachment'
import type { Doc, Ref, Tx, TxRemoveDoc } from '@hcengineering/core'
import core, { TxProcessor } from '@hcengineering/core' import core, { TxProcessor } from '@hcengineering/core'
import type { TriggerControl } from '@hcengineering/server-core'
const findCreateTx = async (
id: Ref<Attachment>,
findAll: TriggerControl['findAll']
): Promise<TxCreateDoc<Attachment> | undefined> => {
const createTx = (await findAll<TxCreateDoc<Attachment>>(core.class.TxCreateDoc, { objectId: id }))[0]
if (createTx !== undefined) {
return createTx
}
const colTx = (
await findAll<TxCollectionCUD<Doc, Attachment>>(core.class.TxCollectionCUD, {
'tx._class': core.class.TxCreateDoc,
'tx.objectClass': attachment.class.Attachment,
'tx.objectId': id
})
)[0]
if (colTx === undefined) return
return colTx.tx as TxCreateDoc<Attachment>
}
/** /**
* @public * @public
*/ */
export async function OnAttachmentDelete ( export async function OnAttachmentDelete (
tx: Tx, tx: Tx,
{ findAll, hierarchy, fulltextFx, storageFx }: TriggerControl { findAll, hierarchy, fulltextFx, storageFx, removedMap }: TriggerControl
): Promise<Tx[]> { ): Promise<Tx[]> {
const actualTx = TxProcessor.extractTx(tx) const actualTx = TxProcessor.extractTx(tx)
if (actualTx._class !== core.class.TxRemoveDoc) { if (actualTx._class !== core.class.TxRemoveDoc) {
@ -61,14 +38,12 @@ export async function OnAttachmentDelete (
return [] return []
} }
const createTx = await findCreateTx(rmTx.objectId, findAll) // Obtain document being deleted.
const attach = removedMap.get(rmTx.objectId) as Attachment
if (createTx === undefined) { if (attach === undefined) {
return [] return []
} }
const attach = TxProcessor.createDoc2Doc(createTx)
fulltextFx(async (adapter) => { fulltextFx(async (adapter) => {
await adapter.remove([attach.file as Ref<Doc>]) await adapter.remove([attach.file as Ref<Doc>])
}) })

View File

@ -15,7 +15,7 @@
// //
import contact, { Contact, contactId, formatName, Organization, Person } from '@hcengineering/contact' import contact, { Contact, contactId, formatName, Organization, Person } from '@hcengineering/contact'
import core, { concatLink, Doc, Tx, TxCreateDoc, TxRemoveDoc, TxUpdateDoc } from '@hcengineering/core' import core, { concatLink, Doc, Tx, TxRemoveDoc } from '@hcengineering/core'
import login from '@hcengineering/login' import login from '@hcengineering/login'
import { getMetadata } from '@hcengineering/platform' import { getMetadata } from '@hcengineering/platform'
import type { TriggerControl } from '@hcengineering/server-core' import type { TriggerControl } from '@hcengineering/server-core'
@ -25,7 +25,10 @@ import { workbenchId } from '@hcengineering/workbench'
/** /**
* @public * @public
*/ */
export async function OnContactDelete (tx: Tx, { findAll, hierarchy, storageFx }: TriggerControl): Promise<Tx[]> { export async function OnContactDelete (
tx: Tx,
{ findAll, hierarchy, storageFx, removedMap }: TriggerControl
): Promise<Tx[]> {
if (tx._class !== core.class.TxRemoveDoc) { if (tx._class !== core.class.TxRemoveDoc) {
return [] return []
} }
@ -36,15 +39,12 @@ export async function OnContactDelete (tx: Tx, { findAll, hierarchy, storageFx }
return [] return []
} }
const createTx = (await findAll<TxCreateDoc<Contact>>(core.class.TxCreateDoc, { objectId: rmTx.objectId }))[0] const removeContact = removedMap.get(rmTx.objectId) as Contact
if (createTx === undefined) { if (removeContact === undefined) {
return [] return []
} }
const updateTxes = await findAll<TxUpdateDoc<Contact>>(core.class.TxUpdateDoc, { objectId: rmTx.objectId }) const avatar: string | undefined = [removeContact.avatar].filter((x): x is string => x !== undefined).slice(-1)[0]
const avatar: string | undefined = [createTx.attributes.avatar, ...updateTxes.map((x) => x.operations.avatar)]
.filter((x): x is string => x !== undefined)
.slice(-1)[0]
if (avatar === undefined) { if (avatar === undefined) {
return [] return []

View File

@ -64,16 +64,14 @@ export async function onTagReference (tx: Tx, control: TriggerControl): Promise<
} }
if (isRemove) { if (isRemove) {
const ctx = actualTx as TxRemoveDoc<TagReference> const ctx = actualTx as TxRemoveDoc<TagReference>
const createTx = ( const doc = control.removedMap.get(ctx.objectId) as TagReference
await control.findAll(core.class.TxCollectionCUD, { 'tx.objectId': ctx.objectId }, { limit: 1 }) if (doc !== undefined) {
)[0] if (!control.removedMap.has(doc.tag)) {
if (createTx !== undefined) { const res = control.txFactory.createTxUpdateDoc(tags.class.TagElement, tags.space.Tags, doc.tag, {
const actualCreateTx = TxProcessor.extractTx(createTx) $inc: { refCount: -1 }
const doc = TxProcessor.createDoc2Doc(actualCreateTx as TxCreateDoc<TagReference>) })
const res = control.txFactory.createTxUpdateDoc(tags.class.TagElement, tags.space.Tags, doc.tag, { return [res]
$inc: { refCount: -1 } }
})
return [res]
} }
} }
return [] return []

View File

@ -393,7 +393,7 @@ export async function listAccounts (db: Db): Promise<Account[]> {
export async function createWorkspace ( export async function createWorkspace (
version: Data<Version>, version: Data<Version>,
txes: Tx[], txes: Tx[],
migrationOperation: MigrateOperation[], migrationOperation: [string, MigrateOperation][],
db: Db, db: Db,
productId: string, productId: string,
workspace: string, workspace: string,
@ -421,7 +421,7 @@ export async function createWorkspace (
export async function upgradeWorkspace ( export async function upgradeWorkspace (
version: Data<Version>, version: Data<Version>,
txes: Tx[], txes: Tx[],
migrationOperation: MigrateOperation[], migrationOperation: [string, MigrateOperation][],
productId: string, productId: string,
db: Db, db: Db,
workspace: string workspace: string
@ -449,7 +449,7 @@ export async function upgradeWorkspace (
* @public * @public
*/ */
export const createUserWorkspace = export const createUserWorkspace =
(version: Data<Version>, txes: Tx[], migrationOperation: MigrateOperation[]) => (version: Data<Version>, txes: Tx[], migrationOperation: [string, MigrateOperation][]) =>
async (db: Db, productId: string, token: string, workspace: string): Promise<LoginInfo> => { async (db: Db, productId: string, token: string, workspace: string): Promise<LoginInfo> => {
const { email } = decodeToken(token) const { email } = decodeToken(token)
await createWorkspace(version, txes, migrationOperation, db, productId, workspace, '') await createWorkspace(version, txes, migrationOperation, db, productId, workspace, '')
@ -925,7 +925,7 @@ function wrap (f: (db: Db, productId: string, ...args: any[]) => Promise<any>):
export function getMethods ( export function getMethods (
version: Data<Version>, version: Data<Version>,
txes: Tx[], txes: Tx[],
migrateOperations: MigrateOperation[] migrateOperations: [string, MigrateOperation][]
): Record<string, AccountMethod> { ): Record<string, AccountMethod> {
return { return {
login: wrap(login), login: wrap(login),

View File

@ -46,7 +46,7 @@ export interface DbAdapter {
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> options?: FindOptions<T>
) => Promise<FindResult<T>> ) => Promise<FindResult<T>>
tx: (tx: Tx) => Promise<TxResult> tx: (...tx: Tx[]) => Promise<TxResult>
find: (domain: Domain) => StorageIterator find: (domain: Domain) => StorageIterator
@ -97,7 +97,7 @@ export class DummyDbAdapter implements DbAdapter {
return toFindResult([]) return toFindResult([])
} }
async tx (tx: Tx): Promise<TxResult> { async tx (...tx: Tx[]): Promise<TxResult> {
return {} return {}
} }
@ -137,8 +137,8 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter {
return await this.modeldb.findAll(_class, query, options) return await this.modeldb.findAll(_class, query, options)
} }
async tx (tx: Tx): Promise<TxResult> { async tx (...tx: Tx[]): Promise<TxResult> {
return await this.modeldb.tx(tx) return await this.modeldb.tx(...tx)
} }
async init (model: Tx[]): Promise<void> { async init (model: Tx[]): Promise<void> {

View File

@ -39,10 +39,8 @@ import core, {
Tx, Tx,
TxApplyIf, TxApplyIf,
TxCollectionCUD, TxCollectionCUD,
TxCreateDoc,
TxCUD, TxCUD,
TxFactory, TxFactory,
TxMixin,
TxProcessor, TxProcessor,
TxRemoveDoc, TxRemoveDoc,
TxResult, TxResult,
@ -64,6 +62,7 @@ import type {
FullTextAdapterFactory, FullTextAdapterFactory,
ObjectDDParticipant ObjectDDParticipant
} from './types' } from './types'
import { createCacheFindAll } from './utils'
/** /**
* @public * @public
@ -138,16 +137,50 @@ class TServerStorage implements ServerStorage {
return adapter return adapter
} }
private async routeTx (ctx: MeasureContext, tx: Tx): Promise<TxResult> { private async routeTx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult> {
if (this.hierarchy.isDerived(tx._class, core.class.TxCUD)) { let part: TxCUD<Doc>[] = []
const txCUD = tx as TxCUD<Doc> let lastDomain: Domain | undefined
const result: TxResult[] = []
const processPart = async (): Promise<void> => {
if (part.length > 0) {
const adapter = this.getAdapter(lastDomain as Domain)
const r = await adapter.tx(...part)
if (Array.isArray(r)) {
result.push(...r)
} else {
result.push(r)
}
part = []
}
}
for (const tx of txes) {
const txCUD = TxProcessor.extractTx(tx) as TxCUD<Doc>
if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) {
// Skip unsupported tx
console.error('Unsupported transacton', tx)
continue
}
const domain = this.hierarchy.getDomain(txCUD.objectClass) const domain = this.hierarchy.getDomain(txCUD.objectClass)
const adapter = this.getAdapter(domain) if (part.length > 0) {
const res = await adapter.tx(txCUD) if (lastDomain !== domain) {
return res await processPart()
} else { }
lastDomain = domain
part.push(txCUD)
} else {
lastDomain = domain
part.push(txCUD)
}
}
await processPart()
if (result.length === 1) {
return result[0]
}
if (result.length === 0) {
return [{}, false] return [{}, false]
} }
return result
} }
private async getCollectionUpdateTx<D extends Doc>( private async getCollectionUpdateTx<D extends Doc>(
@ -174,7 +207,7 @@ class TServerStorage implements ServerStorage {
} }
} }
private async updateCollection (ctx: MeasureContext, tx: Tx): Promise<Tx[]> { private async updateCollection (ctx: MeasureContext, tx: Tx, findAll: ServerStorage['findAll']): Promise<Tx[]> {
if (tx._class !== core.class.TxCollectionCUD) { if (tx._class !== core.class.TxCollectionCUD) {
return [] return []
} }
@ -195,7 +228,7 @@ class TServerStorage implements ServerStorage {
return [] return []
} }
const oldAttachedTo = (await this.findAll(ctx, _class, { _id }, { limit: 1 }))[0] const oldAttachedTo = (await findAll(ctx, _class, { _id }, { limit: 1 }))[0]
let oldTx: Tx | null = null let oldTx: Tx | null = null
if (oldAttachedTo !== undefined) { if (oldAttachedTo !== undefined) {
const attr = this.hierarchy.getAttribute(oldAttachedTo._class, colTx.collection) const attr = this.hierarchy.getAttribute(oldAttachedTo._class, colTx.collection)
@ -209,7 +242,7 @@ class TServerStorage implements ServerStorage {
const newAttachedToClass = operations.attachedToClass ?? _class const newAttachedToClass = operations.attachedToClass ?? _class
const newAttachedToCollection = operations.collection ?? colTx.collection const newAttachedToCollection = operations.collection ?? colTx.collection
const newAttachedTo = (await this.findAll(ctx, newAttachedToClass, { _id: operations.attachedTo }, { limit: 1 }))[0] const newAttachedTo = (await findAll(ctx, newAttachedToClass, { _id: operations.attachedTo }, { limit: 1 }))[0]
let newTx: Tx | null = null let newTx: Tx | null = null
const newAttr = this.hierarchy.getAttribute(newAttachedToClass, newAttachedToCollection) const newAttr = this.hierarchy.getAttribute(newAttachedToClass, newAttachedToCollection)
if (newAttachedTo !== undefined && newAttr !== undefined) { if (newAttachedTo !== undefined && newAttr !== undefined) {
@ -226,37 +259,45 @@ class TServerStorage implements ServerStorage {
return [...(oldTx !== null ? [oldTx] : []), ...(newTx !== null ? [newTx] : [])] return [...(oldTx !== null ? [oldTx] : []), ...(newTx !== null ? [newTx] : [])]
} }
private async processCollection (ctx: MeasureContext, tx: Tx): Promise<Tx[]> { private async processCollection (
if (tx._class === core.class.TxCollectionCUD) { ctx: MeasureContext,
const colTx = tx as TxCollectionCUD<Doc, AttachedDoc> txes: Tx[],
const _id = colTx.objectId findAll: ServerStorage['findAll'],
const _class = colTx.objectClass removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> {
const result: Tx[] = []
for (const tx of txes) {
if (tx._class === core.class.TxCollectionCUD) {
const colTx = tx as TxCollectionCUD<Doc, AttachedDoc>
const _id = colTx.objectId
const _class = colTx.objectClass
// Skip model operations // Skip model operations
if (this.hierarchy.getDomain(_class) === DOMAIN_MODEL) { if (this.hierarchy.getDomain(_class) === DOMAIN_MODEL) {
// We could not update increments for model classes // We could not update increments for model classes
return [] continue
} }
const isCreateTx = colTx.tx._class === core.class.TxCreateDoc const isCreateTx = colTx.tx._class === core.class.TxCreateDoc
const isDeleteTx = colTx.tx._class === core.class.TxRemoveDoc const isDeleteTx = colTx.tx._class === core.class.TxRemoveDoc
const isUpdateTx = colTx.tx._class === core.class.TxUpdateDoc const isUpdateTx = colTx.tx._class === core.class.TxUpdateDoc
if (isUpdateTx) { if (isUpdateTx) {
return await this.updateCollection(ctx, tx) result.push(...(await this.updateCollection(ctx, tx, findAll)))
} }
if (isCreateTx || isDeleteTx) { if ((isCreateTx || isDeleteTx) && !removedMap.has(_id)) {
const attachedTo = (await this.findAll(ctx, _class, { _id }, { limit: 1 }))[0] const attachedTo = (await findAll(ctx, _class, { _id }, { limit: 1 }))[0]
if (attachedTo !== undefined) { if (attachedTo !== undefined) {
return [ result.push(
await this.getCollectionUpdateTx(_id, _class, tx.modifiedBy, colTx.modifiedOn, attachedTo, { await this.getCollectionUpdateTx(_id, _class, tx.modifiedBy, colTx.modifiedOn, attachedTo, {
$inc: { [colTx.collection]: isCreateTx ? 1 : -1 } $inc: { [colTx.collection]: isCreateTx ? 1 : -1 }
}) })
] )
}
} }
} }
} }
return [] return result
} }
async findAll<T extends Doc>( async findAll<T extends Doc>(
@ -299,49 +340,95 @@ class TServerStorage implements ServerStorage {
) )
} }
private async buildRemovedDoc (ctx: MeasureContext, tx: TxRemoveDoc<Doc>): Promise<Doc | undefined> { private async buildRemovedDoc (ctx: MeasureContext, rawTxes: Tx[], findAll: ServerStorage['findAll']): Promise<Doc[]> {
const isAttached = this.hierarchy.isDerived(tx.objectClass, core.class.AttachedDoc) const removeObjectIds: Ref<Doc>[] = []
const txes = await this.findAll<TxCUD<Doc>>( const removeAttachObjectIds: Ref<AttachedDoc>[] = []
ctx,
isAttached ? core.class.TxCollectionCUD : core.class.TxCUD, const removeTxes = rawTxes
isAttached .filter((it) => this.hierarchy.isDerived(it._class, core.class.TxRemoveDoc))
? { 'tx.objectId': tx.objectId as Ref<AttachedDoc> } .map((it) => TxProcessor.extractTx(it) as TxRemoveDoc<Doc>)
: {
objectId: tx.objectId for (const rtx of removeTxes) {
}, const isAttached = this.hierarchy.isDerived(rtx.objectClass, core.class.AttachedDoc)
{ sort: { modifiedOn: 1 } } if (isAttached) {
) removeAttachObjectIds.push(rtx.objectId as Ref<AttachedDoc>)
const createTx = isAttached } else {
? txes.find((tx) => (tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class === core.class.TxCreateDoc) removeObjectIds.push(rtx.objectId)
: txes.find((tx) => tx._class === core.class.TxCreateDoc)
if (createTx === undefined) return
let doc = TxProcessor.createDoc2Doc(createTx as TxCreateDoc<Doc>)
for (let tx of txes) {
tx = TxProcessor.extractTx(tx) as TxCUD<Doc>
if (tx._class === core.class.TxUpdateDoc) {
doc = TxProcessor.updateDoc2Doc(doc, tx as TxUpdateDoc<Doc>)
} else if (tx._class === core.class.TxMixin) {
const mixinTx = tx as TxMixin<Doc, Doc>
doc = TxProcessor.updateMixin4Doc(doc, mixinTx)
} }
} }
return doc const txes =
removeObjectIds.length > 0
? await findAll<TxCUD<Doc>>(
ctx,
core.class.TxCUD,
{
objectId: { $in: removeObjectIds }
},
{ sort: { modifiedOn: 1 } }
)
: []
const result: Doc[] = []
const txesAttach =
removeAttachObjectIds.length > 0
? await findAll<TxCollectionCUD<Doc, AttachedDoc>>(
ctx,
core.class.TxCollectionCUD,
{ 'tx.objectId': { $in: removeAttachObjectIds } },
{ sort: { modifiedOn: 1 } }
)
: []
for (const rtx of removeTxes) {
const isAttached = this.hierarchy.isDerived(rtx.objectClass, core.class.AttachedDoc)
const objTxex = isAttached
? txesAttach.filter((tx) => tx.tx.objectId === rtx.objectId)
: txes.filter((it) => it.objectId === rtx.objectId)
const doc = TxProcessor.buildDoc2Doc(objTxex)
if (doc !== undefined) {
result.push(doc)
}
}
return result
} }
private async processRemove (ctx: MeasureContext, tx: Tx): Promise<Tx[]> { private async processRemove (
const actualTx = TxProcessor.extractTx(tx) ctx: MeasureContext,
if (!this.hierarchy.isDerived(actualTx._class, core.class.TxRemoveDoc)) return [] txes: Tx[],
const rtx = actualTx as TxRemoveDoc<Doc> findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> {
const result: Tx[] = [] const result: Tx[] = []
const object = await this.buildRemovedDoc(ctx, rtx)
if (object === undefined) return [] const objects = await this.buildRemovedDoc(ctx, txes, findAll)
result.push(...(await this.deleteClassCollections(ctx, object._class, rtx.objectId))) for (const obj of objects) {
const mixins = this.getMixins(object._class, object) removedMap.set(obj._id, obj)
for (const mixin of mixins) { }
result.push(...(await this.deleteClassCollections(ctx, mixin, rtx.objectId, object._class)))
for (const tx of txes) {
const actualTx = TxProcessor.extractTx(tx)
if (!this.hierarchy.isDerived(actualTx._class, core.class.TxRemoveDoc)) {
continue
}
const rtx = actualTx as TxRemoveDoc<Doc>
const object = removedMap.get(rtx.objectId)
if (object === undefined) {
continue
}
result.push(...(await this.deleteClassCollections(ctx, object._class, rtx.objectId, findAll, removedMap)))
const mixins = this.getMixins(object._class, object)
for (const mixin of mixins) {
result.push(
...(await this.deleteClassCollections(ctx, mixin, rtx.objectId, findAll, removedMap, object._class))
)
}
result.push(...(await this.deleteRelatedDocuments(ctx, object, findAll, removedMap)))
} }
result.push(...(await this.deleteRelatedDocuments(ctx, object)))
return result return result
} }
@ -349,6 +436,8 @@ class TServerStorage implements ServerStorage {
ctx: MeasureContext, ctx: MeasureContext,
_class: Ref<Class<Doc>>, _class: Ref<Class<Doc>>,
objectId: Ref<Doc>, objectId: Ref<Doc>,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>,
to?: Ref<Class<Doc>> to?: Ref<Class<Doc>>
): Promise<Tx[]> { ): Promise<Tx[]> {
const attributes = this.hierarchy.getAllAttributes(_class, to) const attributes = this.hierarchy.getAllAttributes(_class, to)
@ -356,18 +445,18 @@ class TServerStorage implements ServerStorage {
for (const attribute of attributes) { for (const attribute of attributes) {
if (this.hierarchy.isDerived(attribute[1].type._class, core.class.Collection)) { if (this.hierarchy.isDerived(attribute[1].type._class, core.class.Collection)) {
const collection = attribute[1].type as Collection<AttachedDoc> const collection = attribute[1].type as Collection<AttachedDoc>
const allAttached = await this.findAll(ctx, collection.of, { attachedTo: objectId }) const allAttached = await findAll(ctx, collection.of, { attachedTo: objectId })
for (const attached of allAttached) { for (const attached of allAttached) {
result.push(...(await this.deleteObject(ctx, attached))) result.push(...this.deleteObject(ctx, attached, removedMap))
} }
} }
} }
return result return result
} }
private async deleteObject (ctx: MeasureContext, object: Doc): Promise<Tx[]> { private deleteObject (ctx: MeasureContext, object: Doc, removedMap: Map<Ref<Doc>, Doc>): Tx[] {
const result: Tx[] = [] const result: Tx[] = []
const factory = new TxFactory(core.account.System) const factory = new TxFactory(object.modifiedBy)
if (this.hierarchy.isDerived(object._class, core.class.AttachedDoc)) { if (this.hierarchy.isDerived(object._class, core.class.AttachedDoc)) {
const adoc = object as AttachedDoc const adoc = object as AttachedDoc
const nestedTx = factory.createTxRemoveDoc(adoc._class, adoc.space, adoc._id) const nestedTx = factory.createTxRemoveDoc(adoc._class, adoc.space, adoc._id)
@ -378,14 +467,21 @@ class TServerStorage implements ServerStorage {
adoc.collection, adoc.collection,
nestedTx nestedTx
) )
removedMap.set(adoc._id, adoc)
result.push(tx) result.push(tx)
} else { } else {
result.push(factory.createTxRemoveDoc(object._class, object.space, object._id)) result.push(factory.createTxRemoveDoc(object._class, object.space, object._id))
removedMap.set(object._id, object)
} }
return result return result
} }
private async deleteRelatedDocuments (ctx: MeasureContext, object: Doc): Promise<Tx[]> { private async deleteRelatedDocuments (
ctx: MeasureContext,
object: Doc,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> {
const result: Tx[] = [] const result: Tx[] = []
const objectClass = this.hierarchy.getClass(object._class) const objectClass = this.hierarchy.getClass(object._class)
if (this.hierarchy.hasMixin(objectClass, serverCore.mixin.ObjectDDParticipant)) { if (this.hierarchy.hasMixin(objectClass, serverCore.mixin.ObjectDDParticipant)) {
@ -395,39 +491,48 @@ class TServerStorage implements ServerStorage {
) )
const collector = await getResource(removeParticipand.collectDocs) const collector = await getResource(removeParticipand.collectDocs)
const docs = await collector(object, this.hierarchy, async (_class, query, options) => { const docs = await collector(object, this.hierarchy, async (_class, query, options) => {
return await this.findAll(ctx, _class, query, options) return await findAll(ctx, _class, query, options)
}) })
for (const d of docs) { for (const d of docs) {
result.push(...(await this.deleteObject(ctx, d))) result.push(...this.deleteObject(ctx, d, removedMap))
} }
} }
return result return result
} }
private async processMove (ctx: MeasureContext, tx: Tx): Promise<Tx[]> { private async processMove (ctx: MeasureContext, txes: Tx[], findAll: ServerStorage['findAll']): Promise<Tx[]> {
const actualTx = TxProcessor.extractTx(tx)
if (!this.hierarchy.isDerived(actualTx._class, core.class.TxUpdateDoc)) return []
const rtx = actualTx as TxUpdateDoc<Doc>
if (rtx.operations.space === undefined || rtx.operations.space === rtx.objectSpace) return []
const result: Tx[] = [] const result: Tx[] = []
const factory = new TxFactory(core.account.System) for (const tx of txes) {
for (const [, attribute] of this.hierarchy.getAllAttributes(rtx.objectClass)) { const actualTx = TxProcessor.extractTx(tx)
if (!this.hierarchy.isDerived(attribute.type._class, core.class.Collection)) continue if (!this.hierarchy.isDerived(actualTx._class, core.class.TxUpdateDoc)) {
const collection = attribute.type as Collection<AttachedDoc> continue
const allAttached = await this.findAll(ctx, collection.of, { attachedTo: rtx.objectId, space: rtx.objectSpace }) }
const allTx = allAttached.map(({ _class, space, _id }) => const rtx = actualTx as TxUpdateDoc<Doc>
factory.createTxUpdateDoc(_class, space, _id, { space: rtx.operations.space }) if (rtx.operations.space === undefined || rtx.operations.space === rtx.objectSpace) {
) continue
result.push(...allTx) }
const factory = new TxFactory(tx.modifiedBy)
for (const [, attribute] of this.hierarchy.getAllAttributes(rtx.objectClass)) {
if (!this.hierarchy.isDerived(attribute.type._class, core.class.Collection)) {
continue
}
const collection = attribute.type as Collection<AttachedDoc>
const allAttached = await findAll(ctx, collection.of, { attachedTo: rtx.objectId, space: rtx.objectSpace })
const allTx = allAttached.map(({ _class, space, _id }) =>
factory.createTxUpdateDoc(_class, space, _id, { space: rtx.operations.space })
)
result.push(...allTx)
}
} }
return result return result
} }
private async proccessDerived ( private async proccessDerived (
ctx: MeasureContext, ctx: MeasureContext,
tx: Tx, txes: Tx[],
_class: Ref<Class<Tx>>, triggerFx: Effects,
triggerFx: Effects findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> { ): Promise<Tx[]> {
const fAll = const fAll =
(mctx: MeasureContext) => (mctx: MeasureContext) =>
@ -436,48 +541,62 @@ class TServerStorage implements ServerStorage {
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> options?: FindOptions<T>
): Promise<FindResult<T>> => ): Promise<FindResult<T>> =>
this.findAll(mctx, clazz, query, options) findAll(mctx, clazz, query, options)
const derived = [
...(await ctx.with('process-collection', { _class }, () => this.processCollection(ctx, tx))), const removed = await ctx.with('process-remove', {}, () => this.processRemove(ctx, txes, findAll, removedMap))
...(await ctx.with('process-remove', { _class }, () => this.processRemove(ctx, tx))), const collections = await ctx.with('process-collection', {}, () =>
...(await ctx.with('process-move', { _class }, () => this.processMove(ctx, tx))), this.processCollection(ctx, txes, findAll, removedMap)
...(await ctx.with('process-triggers', {}, (ctx) => )
this.triggers.apply(tx.modifiedBy, tx, { const moves = await ctx.with('process-move', {}, () => this.processMove(ctx, txes, findAll))
workspace: this.workspace,
fx: triggerFx.fx, const triggers = await ctx.with('process-triggers', {}, async (ctx) => {
fulltextFx: (f) => triggerFx.fx(() => f(this.fulltextAdapter)), const result: Tx[] = []
storageFx: (f) => { for (const tx of txes) {
const adapter = this.storageAdapter result.push(
if (adapter === undefined) { ...(await this.triggers.apply(tx.modifiedBy, tx, {
return removedMap,
workspace: this.workspace,
fx: triggerFx.fx,
fulltextFx: (f) => triggerFx.fx(() => f(this.fulltextAdapter)),
storageFx: (f) => {
const adapter = this.storageAdapter
if (adapter === undefined) {
return
}
triggerFx.fx(() => f(adapter, this.workspace))
},
findAll: fAll(ctx),
modelDb: this.modelDb,
hierarchy: this.hierarchy,
txFx: async (f) => {
await f(this.getAdapter(DOMAIN_TX))
} }
}))
)
}
return result
})
triggerFx.fx(() => f(adapter, this.workspace)) const derived = [...removed, ...collections, ...moves, ...triggers]
},
findAll: fAll(ctx),
modelDb: this.modelDb,
hierarchy: this.hierarchy,
txFx: async (f) => {
await f(this.getAdapter(DOMAIN_TX))
}
})
))
]
return await this.processDerivedTxes(derived, ctx, triggerFx) return await this.processDerivedTxes(derived, ctx, triggerFx, findAll, removedMap)
} }
private async processDerivedTxes (derived: Tx[], ctx: MeasureContext, triggerFx: Effects): Promise<Tx[]> { private async processDerivedTxes (
derived: Tx[],
ctx: MeasureContext,
triggerFx: Effects,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> {
derived.sort((a, b) => a.modifiedOn - b.modifiedOn) derived.sort((a, b) => a.modifiedOn - b.modifiedOn)
for (const tx of derived) { await ctx.with('derived-route-tx', {}, (ctx) => this.routeTx(ctx, ...derived))
await ctx.with('derived-route-tx', { _class: txClass(tx) }, (ctx) => this.routeTx(ctx, tx))
}
const nestedTxes: Tx[] = [] const nestedTxes: Tx[] = []
for (const tx of derived) { if (derived.length > 0) {
const _class = txClass(tx) nestedTxes.push(...(await this.proccessDerived(ctx, derived, triggerFx, findAll, removedMap)))
nestedTxes.push(...(await this.proccessDerived(ctx, tx, _class, triggerFx)))
} }
const res = [...derived, ...nestedTxes] const res = [...derived, ...nestedTxes]
@ -490,7 +609,8 @@ class TServerStorage implements ServerStorage {
*/ */
async verifyApplyIf ( async verifyApplyIf (
ctx: MeasureContext, ctx: MeasureContext,
applyIf: TxApplyIf applyIf: TxApplyIf,
findAll: ServerStorage['findAll']
): Promise<{ ): Promise<{
onEnd: () => void onEnd: () => void
passed: boolean passed: boolean
@ -510,7 +630,7 @@ class TServerStorage implements ServerStorage {
) )
let passed = true let passed = true
for (const { _class, query } of applyIf.match) { for (const { _class, query } of applyIf.match) {
const res = await this.findAll(ctx, _class, query, { limit: 1 }) const res = await findAll(ctx, _class, query, { limit: 1 })
if (res.length === 0) { if (res.length === 0) {
passed = false passed = false
break break
@ -522,6 +642,7 @@ class TServerStorage implements ServerStorage {
async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> { async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> {
// store tx // store tx
const _class = txClass(tx) const _class = txClass(tx)
const cacheFind = createCacheFindAll(this)
const objClass = txObjectClass(tx) const objClass = txObjectClass(tx)
return await ctx.with('tx', { _class, objClass }, async (ctx) => { return await ctx.with('tx', { _class, objClass }, async (ctx) => {
if (tx.space !== core.space.DerivedTx && !this.hierarchy.isDerived(tx._class, core.class.TxApplyIf)) { if (tx.space !== core.space.DerivedTx && !this.hierarchy.isDerived(tx._class, core.class.TxApplyIf)) {
@ -546,23 +667,21 @@ class TServerStorage implements ServerStorage {
const applyIf = tx as TxApplyIf const applyIf = tx as TxApplyIf
// Wait for scope promise if found // Wait for scope promise if found
let passed: boolean let passed: boolean
;({ passed, onEnd } = await this.verifyApplyIf(ctx, applyIf)) ;({ passed, onEnd } = await this.verifyApplyIf(ctx, applyIf, cacheFind))
result = passed result = passed
if (passed) { if (passed) {
// Store apply if transaction's // Store apply if transaction's
await ctx.with('domain-tx', { _class, objClass }, async () => { await ctx.with('domain-tx', { _class, objClass }, async () => {
const atx = await this.getAdapter(DOMAIN_TX) const atx = await this.getAdapter(DOMAIN_TX)
for (const ctx of applyIf.txes) { await atx.tx(...applyIf.txes)
await atx.tx(ctx)
}
}) })
derived = await this.processDerivedTxes(applyIf.txes, ctx, triggerFx) derived = await this.processDerivedTxes(applyIf.txes, ctx, triggerFx, cacheFind, new Map<Ref<Doc>, Doc>())
} }
} else { } else {
// store object // store object
result = await ctx.with('route-tx', { _class, objClass }, (ctx) => this.routeTx(ctx, tx)) result = await ctx.with('route-tx', { _class, objClass }, (ctx) => this.routeTx(ctx, tx))
// invoke triggers and store derived objects // invoke triggers and store derived objects
derived = await this.proccessDerived(ctx, tx, _class, triggerFx) derived = await this.proccessDerived(ctx, [tx], triggerFx, cacheFind, new Map<Ref<Doc>, Doc>())
} }
// index object // index object

View File

@ -105,6 +105,7 @@ export interface TriggerControl {
findAll: Storage['findAll'] findAll: Storage['findAll']
hierarchy: Hierarchy hierarchy: Hierarchy
modelDb: ModelDb modelDb: ModelDb
removedMap: Map<Ref<Doc>, Doc>
fulltextFx: (f: (adapter: FullTextAdapter) => Promise<void>) => void fulltextFx: (f: (adapter: FullTextAdapter) => Promise<void>) => void
// Since we don't have other storages let's consider adapter is MinioClient // Since we don't have other storages let's consider adapter is MinioClient

34
server/core/src/utils.ts Normal file
View File

@ -0,0 +1,34 @@
import {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
MeasureContext,
Ref,
ServerStorage
} from '@hcengineering/core'
/**
* @public
*/
export function createCacheFindAll (storage: ServerStorage): ServerStorage['findAll'] {
// We will cache all queries for same objects for all derived data checks.
const queryCache = new Map<string, FindResult<Doc>>()
return async <T extends Doc>(
ctx: MeasureContext,
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> => {
const key = JSON.stringify(clazz) + JSON.stringify(query) + JSON.stringify(options)
let cacheResult = queryCache.get(key)
if (cacheResult !== undefined) {
return cacheResult as FindResult<T>
}
cacheResult = await storage.findAll(ctx, clazz, query, options)
queryCache.set(key, cacheResult)
return cacheResult as FindResult<T>
}
}

View File

@ -47,7 +47,7 @@ class ElasticDataAdapter implements DbAdapter {
return Object.assign([], { total: 0 }) return Object.assign([], { total: 0 })
} }
async tx (tx: Tx): Promise<TxResult> { async tx (...tx: Tx[]): Promise<TxResult> {
return {} return {}
} }

View File

@ -14,6 +14,7 @@
// //
import core, { import core, {
AttachedDoc,
Class, Class,
Doc, Doc,
DocumentQuery, DocumentQuery,
@ -38,6 +39,7 @@ import core, {
StorageIterator, StorageIterator,
toFindResult, toFindResult,
Tx, Tx,
TxCollectionCUD,
TxCreateDoc, TxCreateDoc,
TxMixin, TxMixin,
TxProcessor, TxProcessor,
@ -48,7 +50,7 @@ import core, {
WorkspaceId WorkspaceId
} from '@hcengineering/core' } from '@hcengineering/core'
import type { DbAdapter, TxAdapter } from '@hcengineering/server-core' import type { DbAdapter, TxAdapter } from '@hcengineering/server-core'
import { Collection, Db, Document, Filter, MongoClient, Sort, UpdateFilter } from 'mongodb' import { AnyBulkWriteOperation, Collection, Db, Document, Filter, MongoClient, Sort, UpdateFilter } from 'mongodb'
import { createHash } from 'node:crypto' import { createHash } from 'node:crypto'
import { getMongoClient, getWorkspaceDB } from './utils' import { getMongoClient, getWorkspaceDB } from './utils'
@ -80,18 +82,20 @@ interface LookupStep {
pipeline?: any pipeline?: any
} }
abstract class MongoAdapterBase extends TxProcessor { abstract class MongoAdapterBase implements DbAdapter {
constructor ( constructor (
protected readonly db: Db, protected readonly db: Db,
protected readonly hierarchy: Hierarchy, protected readonly hierarchy: Hierarchy,
protected readonly modelDb: ModelDb, protected readonly modelDb: ModelDb,
protected readonly client: MongoClient protected readonly client: MongoClient
) { ) {}
super()
}
async init (): Promise<void> {} async init (): Promise<void> {}
async tx (...tx: Tx[]): Promise<TxResult> {
return {}
}
async close (): Promise<void> { async close (): Promise<void> {
await this.client.close() await this.client.close()
} }
@ -105,14 +109,7 @@ abstract class MongoAdapterBase extends TxProcessor {
if (value !== null && typeof value === 'object') { if (value !== null && typeof value === 'object') {
const keys = Object.keys(value) const keys = Object.keys(value)
if (keys[0] === '$like') { if (keys[0] === '$like') {
const pattern = value.$like as string translated[tkey] = translateLikeQuery(value.$like as string)
translated[tkey] = {
$regex: `^${pattern
.split('%')
.map((it) => escapeLikeForRegexp(it))
.join('.*')}$`,
$options: 'i'
}
continue continue
} }
} }
@ -130,7 +127,6 @@ abstract class MongoAdapterBase extends TxProcessor {
// Add an mixin to be exists flag // Add an mixin to be exists flag
translated[clazz] = { $exists: true } translated[clazz] = { $exists: true }
} }
// return Object.assign({}, query, { _class: { $in: classes } })
return translated return translated
} }
@ -522,9 +518,6 @@ abstract class MongoAdapterBase extends TxProcessor {
} }
})) }))
) )
// await coll.deleteMany({ _id: { $in: keys } })
// await coll.insertMany(Array.from(docMap.values()) as Document[])
} }
} }
@ -555,75 +548,173 @@ abstract class MongoAdapterBase extends TxProcessor {
} }
} }
interface DomainOperation {
raw: () => Promise<TxResult>
domain: Domain
bulk?: AnyBulkWriteOperation[]
}
class MongoAdapter extends MongoAdapterBase { class MongoAdapter extends MongoAdapterBase {
protected override async txRemoveDoc (tx: TxRemoveDoc<Doc>): Promise<TxResult> { getOperations (tx: Tx): DomainOperation | undefined {
const domain = this.hierarchy.getDomain(tx.objectClass) switch (tx._class) {
await this.db.collection(domain).deleteOne({ _id: tx.objectId }) case core.class.TxCreateDoc:
return {} return this.txCreateDoc(tx as TxCreateDoc<Doc>)
case core.class.TxCollectionCUD:
return this.txCollectionCUD(tx as TxCollectionCUD<Doc, AttachedDoc>)
case core.class.TxUpdateDoc:
return this.txUpdateDoc(tx as TxUpdateDoc<Doc>)
case core.class.TxRemoveDoc:
return this.txRemoveDoc(tx as TxRemoveDoc<Doc>)
case core.class.TxMixin:
return this.txMixin(tx as TxMixin<Doc, Doc>)
case core.class.TxApplyIf:
return undefined
}
console.error('Unknown/Unsupported operation:', tx._class, tx)
} }
protected async txMixin (tx: TxMixin<Doc, Doc>): Promise<TxResult> { async tx (...txes: Tx[]): Promise<TxResult> {
const result: TxResult[] = []
const bulkOperations: DomainOperation[] = []
let lastDomain: Domain | undefined
const bulkExecute = async (): Promise<void> => {
if (lastDomain === undefined || bulkOperations.length === 0) {
return
}
try {
await this.db
.collection(lastDomain)
.bulkWrite(bulkOperations.reduce<AnyBulkWriteOperation[]>((ops, op) => ops.concat(...(op.bulk ?? [])), []))
} catch (err: any) {
console.trace(err)
throw err
}
bulkOperations.splice(0, bulkOperations.length)
lastDomain = undefined
}
if (txes.length > 1) {
for (const tx of txes) {
const dop: DomainOperation | undefined = this.getOperations(tx)
if (dop === undefined) {
continue
}
if (dop.bulk === undefined) {
// Execute previous bulk and capture result.
await bulkExecute()
try {
result.push(await dop.raw())
} catch (err: any) {
console.error(err)
}
continue
}
if (lastDomain === undefined) {
lastDomain = dop.domain
}
bulkOperations.push(dop)
}
await bulkExecute()
} else {
return (await this.getOperations(txes[0])?.raw()) ?? {}
}
if (result.length === 0) {
return {}
}
if (result.length === 1) {
return result[0]
}
return result
}
protected txCollectionCUD (tx: TxCollectionCUD<Doc, AttachedDoc>): DomainOperation {
// We need update only create transactions to contain attached, attachedToClass.
if (tx.tx._class === core.class.TxCreateDoc) {
const createTx = tx.tx as TxCreateDoc<AttachedDoc>
const d: TxCreateDoc<AttachedDoc> = {
...createTx,
attributes: {
...createTx.attributes,
attachedTo: tx.objectId,
attachedToClass: tx.objectClass,
collection: tx.collection
}
}
return this.txCreateDoc(d)
}
// We could cast since we know collection cud is supported.
return this.getOperations(tx.tx) as DomainOperation
}
protected txRemoveDoc (tx: TxRemoveDoc<Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass)
return {
raw: () => this.db.collection(domain).deleteOne({ _id: tx.objectId }),
domain,
bulk: [{ deleteOne: { filter: { _id: tx.objectId } } }]
}
}
protected txMixin (tx: TxMixin<Doc, Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass) const domain = this.hierarchy.getDomain(tx.objectClass)
const filter = { _id: tx.objectId }
const modifyOp = {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn
}
if (isOperator(tx.attributes)) { if (isOperator(tx.attributes)) {
const operator = Object.keys(tx.attributes)[0] const operator = Object.keys(tx.attributes)[0]
if (operator === '$move') { if (operator === '$move') {
const keyval = (tx.attributes as any).$move const keyval = (tx.attributes as any).$move
const arr = tx.mixin + '.' + Object.keys(keyval)[0] const arr = tx.mixin + '.' + Object.keys(keyval)[0]
const desc = keyval[arr] const desc = keyval[arr]
const ops = [ const ops: any = [
{ updateOne: { filter, update: { $pull: { [arr]: desc.$value } } } },
{ {
updateOne: { updateOne: {
filter: { _id: tx.objectId }, filter,
update: { update: { $set: modifyOp, $push: { [arr]: { $each: [desc.$value], $position: desc.$position } } }
$pull: {
[arr]: desc.$value
}
}
}
},
{
updateOne: {
filter: { _id: tx.objectId },
update: {
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn
},
$push: {
[arr]: {
$each: [desc.$value],
$position: desc.$position
}
}
}
} }
} }
] ]
return await this.db.collection(domain).bulkWrite(ops as any) // return await this.db.collection(domain).bulkWrite(ops as any)
} else { return {
return await this.db.collection(domain).updateOne( raw: async () => await this.db.collection(domain).bulkWrite(ops),
{ _id: tx.objectId }, domain,
bulk: ops
}
}
const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } }
return {
raw: async () => await this.db.collection(domain).updateOne(filter, update),
domain,
bulk: [
{ {
...this.translateMixinAttrs(tx.mixin, tx.attributes), updateOne: {
$set: { filter,
modifiedBy: tx.modifiedBy, update
modifiedOn: tx.modifiedOn
} }
} }
) ]
} }
} else { }
return await this.db.collection(domain).updateOne( const update = { $set: { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } }
{ _id: tx.objectId }, return {
raw: async () => await this.db.collection(domain).updateOne(filter, update),
domain,
bulk: [
{ {
$set: { updateOne: {
...this.translateMixinAttrs(tx.mixin, tx.attributes), filter,
modifiedBy: tx.modifiedBy, update
modifiedOn: tx.modifiedOn
} }
} }
) ]
} }
} }
@ -647,14 +738,22 @@ class MongoAdapter extends MongoAdapterBase {
return attrs return attrs
} }
protected override async txCreateDoc (tx: TxCreateDoc<Doc>): Promise<TxResult> { protected txCreateDoc (tx: TxCreateDoc<Doc>): DomainOperation {
const doc = TxProcessor.createDoc2Doc(tx) const doc = TxProcessor.createDoc2Doc(tx)
const domain = this.hierarchy.getDomain(doc._class) const domain = this.hierarchy.getDomain(doc._class)
await this.db.collection(domain).insertOne(translateDoc(doc)) const tdoc = translateDoc(doc)
return {} return {
raw: async () => await this.db.collection(domain).insertOne(tdoc),
domain,
bulk: [
{
insertOne: { document: tdoc }
}
]
}
} }
protected override async txUpdateDoc (tx: TxUpdateDoc<Doc>): Promise<TxResult> { protected txUpdateDoc (tx: TxUpdateDoc<Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass) const domain = this.hierarchy.getDomain(tx.objectClass)
if (isOperator(tx.operations)) { if (isOperator(tx.operations)) {
const operator = Object.keys(tx.operations)[0] const operator = Object.keys(tx.operations)[0]
@ -662,7 +761,8 @@ class MongoAdapter extends MongoAdapterBase {
const keyval = (tx.operations as any).$move const keyval = (tx.operations as any).$move
const arr = Object.keys(keyval)[0] const arr = Object.keys(keyval)[0]
const desc = keyval[arr] const desc = keyval[arr]
const ops = [
const ops: any = [
{ {
updateOne: { updateOne: {
filter: { _id: tx.objectId }, filter: { _id: tx.objectId },
@ -691,7 +791,11 @@ class MongoAdapter extends MongoAdapterBase {
} }
} }
] ]
return await this.db.collection(domain).bulkWrite(ops as any) return {
raw: async () => await this.db.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
} else if (operator === '$update') { } else if (operator === '$update') {
const keyval = (tx.operations as any).$update const keyval = (tx.operations as any).$update
const arr = Object.keys(keyval)[0] const arr = Object.keys(keyval)[0]
@ -722,51 +826,66 @@ class MongoAdapter extends MongoAdapterBase {
} }
} }
] ]
return await this.db.collection(domain).bulkWrite(ops as any) return {
raw: async () => await this.db.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
} else { } else {
if (tx.retrieve === true) { if (tx.retrieve === true) {
const result = await this.db.collection(domain).findOneAndUpdate( const raw = async (): Promise<TxResult> => {
{ _id: tx.objectId }, const result = await this.db.collection(domain).findOneAndUpdate(
{ { _id: tx.objectId },
...tx.operations, {
$set: { ...tx.operations,
modifiedBy: tx.modifiedBy, $set: {
modifiedOn: tx.modifiedOn modifiedBy: tx.modifiedBy,
} modifiedOn: tx.modifiedOn
} as unknown as UpdateFilter<Document>, }
{ returnDocument: 'after' } } as unknown as UpdateFilter<Document>,
) { returnDocument: 'after' }
return { object: result.value } )
return { object: result.value }
}
return {
raw,
domain,
bulk: undefined
}
} else { } else {
return await this.db.collection(domain).updateOne( const filter = { _id: tx.objectId }
{ _id: tx.objectId }, const update = {
{ ...tx.operations,
...tx.operations, $set: {
$set: { modifiedBy: tx.modifiedBy,
modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn
modifiedOn: tx.modifiedOn
}
} }
) }
return {
raw: async () => await this.db.collection(domain).updateOne(filter, update),
domain,
bulk: [{ updateOne: { filter, update } }]
}
} }
} }
} else { } else {
if (tx.retrieve === true) { const filter = { _id: tx.objectId }
const result = await this.db.collection(domain).findOneAndUpdate( const update = { $set: { ...tx.operations, modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn } }
{ _id: tx.objectId }, const raw =
{ tx.retrieve === true
$set: { ...tx.operations, modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn } ? async (): Promise<TxResult> => {
} as unknown as UpdateFilter<Document>, const result = await this.db
{ returnDocument: 'after' } .collection(domain)
) .findOneAndUpdate(filter, update, { returnDocument: 'after' })
return { object: result.value } return { object: result.value }
} else { }
return await this.db : async () => await this.db.collection(domain).updateOne(filter, update)
.collection(domain)
.updateOne( // Disable bulk for operators
{ _id: tx.objectId }, return {
{ $set: { ...tx.operations, modifiedBy: tx.modifiedBy, modifiedOn: tx.modifiedOn } } raw,
) domain,
bulk: [{ updateOne: { filter, update } }]
} }
} }
} }
@ -774,27 +893,9 @@ class MongoAdapter extends MongoAdapterBase {
class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
txColl: Collection | undefined txColl: Collection | undefined
protected txCreateDoc (tx: TxCreateDoc<Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
protected txUpdateDoc (tx: TxUpdateDoc<Doc>): Promise<TxResult> { override async tx (...tx: Tx[]): Promise<TxResult> {
throw new Error('Method not implemented.') await this.txCollection().insertMany(tx.map((it) => translateDoc(it)))
}
protected txRemoveDoc (tx: TxRemoveDoc<Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
protected txMixin (tx: TxMixin<Doc, Doc>): Promise<TxResult> {
throw new Error('Method not implemented.')
}
override async tx (tx: Tx, user: string): Promise<TxResult>
override async tx (tx: Tx): Promise<TxResult>
override async tx (tx: Tx, user?: string): Promise<TxResult> {
await this.txCollection().insertOne(translateDoc(tx))
return {} return {}
} }
@ -822,6 +923,16 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
} }
} }
function translateLikeQuery (pattern: string): { $regex: string, $options: string } {
return {
$regex: `^${pattern
.split('%')
.map((it) => escapeLikeForRegexp(it))
.join('.*')}$`,
$options: 'i'
}
}
/** /**
* @public * @public
*/ */

View File

@ -45,7 +45,7 @@ class MinioBlobAdapter implements DbAdapter {
return Object.assign([], { total: 0 }) return Object.assign([], { total: 0 })
} }
async tx (tx: Tx): Promise<TxResult> { async tx (...tx: Tx[]): Promise<TxResult> {
return {} return {}
} }

View File

@ -89,7 +89,7 @@ export async function initModel (
transactorUrl: string, transactorUrl: string,
workspaceId: WorkspaceId, workspaceId: WorkspaceId,
rawTxes: Tx[], rawTxes: Tx[],
migrateOperations: MigrateOperation[] migrateOperations: [string, MigrateOperation][]
): Promise<void> { ): Promise<void> {
const { mongodbUri, minio, txes } = prepareTools(rawTxes) const { mongodbUri, minio, txes } = prepareTools(rawTxes)
if (txes.some((tx) => tx.objectSpace !== core.space.Model)) { if (txes.some((tx) => tx.objectSpace !== core.space.Model)) {
@ -115,7 +115,8 @@ export async function initModel (
})) as unknown as CoreClient & BackupClient })) as unknown as CoreClient & BackupClient
try { try {
for (const op of migrateOperations) { for (const op of migrateOperations) {
await op.upgrade(connection) console.log('Migrage', op[0])
await op[1].upgrade(connection)
} }
} catch (e) { } catch (e) {
console.log(e) console.log(e)
@ -142,7 +143,7 @@ export async function upgradeModel (
transactorUrl: string, transactorUrl: string,
workspaceId: WorkspaceId, workspaceId: WorkspaceId,
rawTxes: Tx[], rawTxes: Tx[],
migrateOperations: MigrateOperation[] migrateOperations: [string, MigrateOperation][]
): Promise<void> { ): Promise<void> {
const { mongodbUri, txes } = prepareTools(rawTxes) const { mongodbUri, txes } = prepareTools(rawTxes)
@ -171,7 +172,8 @@ export async function upgradeModel (
const migrateClient = new MigrateClientImpl(db) const migrateClient = new MigrateClientImpl(db)
for (const op of migrateOperations) { for (const op of migrateOperations) {
await op.migrate(migrateClient) console.log('migrate:', op[0])
await op[1].migrate(migrateClient)
} }
console.log('Apply upgrade operations') console.log('Apply upgrade operations')
@ -182,7 +184,8 @@ export async function upgradeModel (
await createUpdateIndexes(connection, db) await createUpdateIndexes(connection, db)
for (const op of migrateOperations) { for (const op of migrateOperations) {
await op.upgrade(connection) console.log('upgrade:', op[0])
await op[1].upgrade(connection)
} }
await connection.close() await connection.close()