diff --git a/dev/tool/src/cleanOrphan.ts b/dev/tool/src/cleanOrphan.ts index 8ea4378ed9..44b1146329 100644 --- a/dev/tool/src/cleanOrphan.ts +++ b/dev/tool/src/cleanOrphan.ts @@ -1,7 +1,6 @@ -import { dropWorkspace, setWorkspaceDisabled, type Workspace } from '@hcengineering/account' -import core, { AccountRole, type MeasureContext, MeasureMetricsContext, SortingOrder } from '@hcengineering/core' +import { dropWorkspaceFull, setWorkspaceDisabled, type Workspace } from '@hcengineering/account' +import core, { AccountRole, MeasureMetricsContext, SortingOrder, type MeasureContext } from '@hcengineering/core' import contact from '@hcengineering/model-contact' -import { getWorkspaceDB } from '@hcengineering/mongo' import { type StorageAdapter } from '@hcengineering/server-core' import { connect } from '@hcengineering/server-tool' import { type Db, type MongoClient } from 'mongodb' @@ -66,18 +65,14 @@ export async function checkOrphanWorkspaces ( await setWorkspaceDisabled(db, ws._id, true) } if (cmd.remove) { - await dropWorkspace(new MeasureMetricsContext('tool', {}), db, productId, ws.workspace) - const workspaceDb = getWorkspaceDB(client, { name: ws.workspace, productId }) - await workspaceDb.dropDatabase() - if (storageAdapter !== undefined && hasBucket) { - const docs = await storageAdapter.list(ctx, wspace) - await storageAdapter.remove( - ctx, - wspace, - docs.map((it) => it._id) - ) - await storageAdapter.delete(ctx, wspace) - } + await dropWorkspaceFull( + new MeasureMetricsContext('tool', {}), + db, + client, + productId, + ws.workspace, + storageAdapter + ) } } } diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 82ff64c61a..b926625e7b 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -22,9 +22,11 @@ import { createWorkspace, dropAccount, dropWorkspace, + dropWorkspaceFull, getAccount, getWorkspaceById, listAccounts, + listWorkspacesByAccount, listWorkspacesPure, listWorkspacesRaw, replacePassword, @@ -79,10 +81,10 @@ import { } from './clean' import { checkOrphanWorkspaces } from './cleanOrphan' import { changeConfiguration } from './configuration' +import { fixJsonMarkup } from './markup' import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { openAIConfig } from './openai' import { fixAccountEmails, renameAccount } from './renameAccount' -import { fixJsonMarkup } from './markup' const colorConstants = { colorRed: '\u001b[31m', @@ -395,15 +397,65 @@ export function devTool ( program .command('drop-workspace ') .description('drop workspace') - .action(async (workspace, cmd) => { - const { mongodbUri } = prepareTools() - await withDatabase(mongodbUri, async (db) => { + .option('--full [full]', 'Force remove all data', false) + .action(async (workspace, cmd: { full: boolean }) => { + const { mongodbUri, storageAdapter } = prepareTools() + await withDatabase(mongodbUri, async (db, client) => { const ws = await getWorkspaceById(db, productId, workspace) if (ws === null) { console.log('no workspace exists') return } - await dropWorkspace(toolCtx, db, productId, workspace) + if (cmd.full) { + await dropWorkspaceFull(toolCtx, db, client, productId, workspace, storageAdapter) + } else { + await dropWorkspace(toolCtx, db, productId, workspace) + } + }) + }) + + program + .command('drop-workspace-by-email ') + .description('drop workspace') + .option('--full [full]', 'Force remove all data', false) + .action(async (email, cmd: { full: boolean }) => { + const { mongodbUri, storageAdapter } = prepareTools() + await withDatabase(mongodbUri, async (db, client) => { + for (const workspace of await listWorkspacesByAccount(db, productId, email)) { + if (cmd.full) { + await dropWorkspaceFull(toolCtx, db, client, productId, workspace.workspace, storageAdapter) + } else { + await dropWorkspace(toolCtx, db, productId, workspace.workspace) + } + } + }) + }) + program + .command('list-workspace-by-email ') + .description('drop workspace') + .option('--full [full]', 'Force remove all data', false) + .action(async (email, cmd: { full: boolean }) => { + const { mongodbUri } = prepareTools() + await withDatabase(mongodbUri, async (db, client) => { + for (const workspace of await listWorkspacesByAccount(db, productId, email)) { + console.log(workspace.workspace, workspace.workspaceUrl, workspace.workspaceName) + } + }) + }) + + program + .command('drop-workspace-last-visit') + .description('drop old workspaces') + .action(async (cmd: any) => { + const { mongodbUri, storageAdapter } = prepareTools() + await withDatabase(mongodbUri, async (db, client) => { + const workspacesJSON = await listWorkspacesPure(db, productId) + for (const ws of workspacesJSON) { + const lastVisit = Math.floor((Date.now() - ws.lastVisit) / 1000 / 3600 / 24) + if (lastVisit > 30) { + await dropWorkspaceFull(toolCtx, db, client, productId, ws.workspace, storageAdapter) + } + } }) }) diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index 8a3e867182..7147f02cb1 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -50,8 +50,9 @@ import { cloneWorkspace } from '@hcengineering/server-backup' import { decodeToken, generateToken } from '@hcengineering/server-token' import toolPlugin, { connect, initModel, upgradeModel } from '@hcengineering/server-tool' import { pbkdf2Sync, randomBytes } from 'crypto' -import { Binary, Db, Filter, ObjectId } from 'mongodb' +import { Binary, Db, Filter, ObjectId, type MongoClient } from 'mongodb' import fetch from 'node-fetch' +import type { StorageAdapter } from '../../core/types' import { accountPlugin } from './plugin' const WORKSPACE_COLLECTION = 'workspace' @@ -689,6 +690,22 @@ export async function listWorkspaces ( .map(trimWorkspaceInfo) } +/** + * @public + */ +export async function listWorkspacesByAccount (db: Db, productId: string, email: string): Promise { + const account = await getAccount(db, email) + return ( + await db + .collection(WORKSPACE_COLLECTION) + .find(withProductId(productId, { _id: { $in: account?.workspaces } })) + .toArray() + ) + .map((it) => ({ ...it, productId })) + .filter((it) => it.disabled !== true) + .map(trimWorkspaceInfo) +} + /** * @public */ @@ -1727,7 +1744,7 @@ export async function dropWorkspace ( db: Db, productId: string, workspaceId: string -): Promise { +): Promise { const ws = await getWorkspaceById(db, productId, workspaceId) if (ws === null) { throw new PlatformError(new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspaceId })) @@ -1738,6 +1755,35 @@ export async function dropWorkspace ( .updateMany({ _id: { $in: ws.accounts ?? [] } }, { $pull: { workspaces: ws._id } }) ctx.info('Workspace dropped', { workspace: ws.workspace }) + return ws +} + +/** + * @public + */ +export async function dropWorkspaceFull ( + ctx: MeasureContext, + db: Db, + client: MongoClient, + productId: string, + workspaceId: string, + storageAdapter?: StorageAdapter +): Promise { + const ws = await dropWorkspace(ctx, db, productId, workspaceId) + const workspaceDb = client.db(ws.workspace) + await workspaceDb.dropDatabase() + const wspace = getWorkspaceId(workspaceId, productId) + const hasBucket = await storageAdapter?.exists(ctx, wspace) + if (storageAdapter !== undefined && hasBucket === true) { + const docs = await storageAdapter.list(ctx, wspace) + await storageAdapter.remove( + ctx, + wspace, + docs.map((it) => it._id) + ) + await storageAdapter.delete(ctx, wspace) + } + ctx.info('Workspace fully dropped', { workspace: ws.workspace }) } /** diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index 3495156f8e..f93d50eb21 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -19,6 +19,7 @@ import { type DocumentQuery, type DocumentUpdate, type Domain, + type FieldIndex, type FindOptions, type FindResult, type Hierarchy, @@ -33,6 +34,24 @@ import { } from '@hcengineering/core' import { type StorageAdapter } from './storage' +export interface DomainHelperOperations { + create: (domain: Domain) => Promise + exists: (domain: Domain) => boolean + createIndex: (domain: Domain, value: string | FieldIndex, options?: { name: string }) => Promise + dropIndex: (domain: Domain, name: string) => Promise + listIndexes: (domain: Domain) => Promise<{ name: string }[]> + hasDocuments: (domain: Domain, count: number) => Promise +} + +export interface DomainHelper { + checkDomain: ( + ctx: MeasureContext, + domain: Domain, + forceCreate: boolean, + operations: DomainHelperOperations + ) => Promise +} + /** * @public */ @@ -51,6 +70,9 @@ export interface RawDBAdapter { * @public */ export interface DbAdapter { + init?: () => Promise + + helper?: () => DomainHelperOperations createIndexes: (domain: Domain, config: Pick, 'indexes'>) => Promise removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index bce7aad0ed..bf1d8937b2 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -93,6 +93,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { async cancel (): Promise { this.cancelling = true + clearTimeout(this.updateBroadcast) clearTimeout(this.skippedReiterationTimeout) this.triggerIndexing() await this.indexing diff --git a/server/core/src/mem.ts b/server/core/src/mem.ts index c5a3a103d5..49487ce666 100644 --- a/server/core/src/mem.ts +++ b/server/core/src/mem.ts @@ -47,6 +47,8 @@ export class DummyDbAdapter implements DbAdapter { return toFindResult([]) } + async init (): Promise {} + async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise {} diff --git a/server/core/src/server/domainHelper.ts b/server/core/src/server/domainHelper.ts new file mode 100644 index 0000000000..6c6822e3d2 --- /dev/null +++ b/server/core/src/server/domainHelper.ts @@ -0,0 +1,169 @@ +import type { + Doc, + Domain, + DomainIndexConfiguration, + FieldIndex, + Hierarchy, + MeasureContext, + ModelDb +} from '@hcengineering/core' +import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core' +import { deepEqual } from 'fast-equals' +import type { DomainHelper, DomainHelperOperations } from '../adapter' + +export class DomainIndexHelperImpl implements DomainHelper { + domains = new Map>>() + domainConfigurations: DomainIndexConfiguration[] = [] + constructor ( + readonly hierarchy: Hierarchy, + readonly model: ModelDb + ) { + const classes = model.findAllSync(core.class.Class, {}) + + this.domainConfigurations = + model.findAllSync(core.class.DomainIndexConfiguration, {}) ?? [] + + this.domains = new Map>>() + // Find all domains and indexed fields inside + for (const c of classes) { + try { + const domain = hierarchy.findDomain(c._id) + if (domain === undefined || domain === DOMAIN_MODEL) { + continue + } + const attrs = hierarchy.getAllAttributes(c._id) + const domainAttrs = this.domains.get(domain) ?? new Set>() + for (const a of attrs.values()) { + if (a.index !== undefined && (a.index === IndexKind.Indexed || a.index === IndexKind.IndexedDsc)) { + if (a.index === IndexKind.Indexed) { + domainAttrs.add(a.name) + } else { + domainAttrs.add({ [a.name]: IndexOrder.Descending }) + } + } + } + + // Handle extra configurations + if (hierarchy.hasMixin(c, core.mixin.IndexConfiguration)) { + const config = hierarchy.as(c, core.mixin.IndexConfiguration) + for (const attr of config.indexes) { + domainAttrs.add(attr) + } + } + + this.domains.set(domain, domainAttrs) + } catch (err: any) { + // Ignore, since we have classes without domain. + } + } + } + + /** + * return false if and only if domain underline structures are not required. + */ + async checkDomain ( + ctx: MeasureContext, + domain: Domain, + forceCreate: boolean, + operations: DomainHelperOperations + ): Promise { + const domainInfo = this.domains.get(domain) + const cfg = this.domainConfigurations.find((it) => it.domain === domain) + + let exists = operations.exists(domain) + const hasDocuments = exists && (await operations.hasDocuments(domain, 1)) + // Drop collection if it exists and should not exists or doesn't have documents. + if (exists && (cfg?.disableCollection === true || (!hasDocuments && !forceCreate))) { + // We do not need this collection + return false + } + + if (forceCreate && !exists) { + await operations.create(domain) + console.log('collection will be created', domain) + exists = true + } + if (!exists) { + // Do not need to create, since not force and no documents. + return false + } + const bb: (string | FieldIndex)[] = [] + const added = new Set() + + try { + const has50Documents = await operations.hasDocuments(domain, 50) + const allIndexes = (await operations.listIndexes(domain)).filter((it) => it.name !== '_id_') + console.log('check indexes', domain, has50Documents) + if (has50Documents) { + for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) { + try { + const name = + typeof vv === 'string' + ? `${vv}_1` + : Object.entries(vv) + .map(([key, val]) => `${key}_${val}`) + .join('_') + + // Check if index is disabled or not + const isDisabled = + cfg?.disabled?.some((it) => { + const _it = typeof it === 'string' ? { [it]: 1 } : it + const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv + return deepEqual(_it, _vv) + }) ?? false + if (isDisabled) { + // skip index since it is disabled + continue + } + if (added.has(name)) { + // Index already added + continue + } + added.add(name) + + const existingOne = allIndexes.findIndex((it) => it.name === name) + if (existingOne !== -1) { + allIndexes.splice(existingOne, 1) + } + const exists = existingOne !== -1 + // Check if index exists + if (!exists) { + if (!isDisabled) { + // Check if not disabled + bb.push(vv) + await operations.createIndex(domain, vv, { + name + }) + } + } + } catch (err: any) { + ctx.error('error: failed to create index', { domain, vv, err }) + } + } + } + if (allIndexes.length > 0) { + for (const c of allIndexes) { + try { + if (cfg?.skip !== undefined) { + if (Array.from(cfg.skip ?? []).some((it) => c.name.includes(it))) { + continue + } + } + ctx.info('drop indexe', { domain, name: c.name, has50Documents }) + await operations.dropIndex(domain, c.name) + } catch (err) { + console.error('error: failed to drop index', { c, err }) + } + } + } + } catch (err: any) { + console.error(err) + } + + if (bb.length > 0) { + ctx.info('created indexes', { domain, bb }) + } + + return true + } +} diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts index 1e6269b02b..da4651d58a 100644 --- a/server/core/src/server/index.ts +++ b/server/core/src/server/index.ts @@ -15,20 +15,20 @@ // import core, { - type Class, DOMAIN_DOC_INDEX_STATE, DOMAIN_TX, - type Doc, Hierarchy, + ModelDb, + WorkspaceEvent, + generateId, + type Class, + type Doc, type IndexingUpdateEvent, type MeasureContext, - ModelDb, type Ref, type ServerStorage, type TxWorkspaceEvent, - WorkspaceEvent, - type WorkspaceId, - generateId + type WorkspaceId } from '@hcengineering/core' import { type DbAdapter, type TxAdapter } from '../adapter' import { type DbConfiguration } from '../configuration' @@ -39,6 +39,7 @@ import { createServiceAdaptersManager } from '../service' import { type StorageAdapter } from '../storage' import { Triggers } from '../triggers' import { type ServerStorageOptions } from '../types' +import { DomainIndexHelperImpl } from './domainHelper' import { TServerStorage } from './storage' /** @@ -66,6 +67,11 @@ export async function createServerStorage ( } }) + await ctx.with('init-adapters', {}, async (ctx) => { + for (const adapter of adapters.values()) { + await adapter.init?.() + } + }) const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter const model = await ctx.with('get model', {}, async (ctx) => { @@ -157,6 +163,9 @@ export async function createServerStorage ( options.upgrade ?? false ) } + + const domainHelper = new DomainIndexHelperImpl(hierarchy, modelDb) + return new TServerStorage( conf.domains, conf.defaultAdapter, @@ -171,7 +180,8 @@ export async function createServerStorage ( indexFactory, options, metrics, - model + model, + domainHelper ) } @@ -198,3 +208,4 @@ export function createNullStorageFactory (): StorageAdapter { } export { AggregatorStorageAdapter, buildStorage } from './aggregator' +export { DomainIndexHelperImpl } from './domainHelper' diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index 359100c66d..712b58fb43 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -59,8 +59,9 @@ import core, { import { getResource, type Metadata } from '@hcengineering/platform' import { LiveQuery as LQ } from '@hcengineering/query' import crypto from 'node:crypto' -import { type DbAdapter } from '../adapter' +import { type DbAdapter, type DomainHelper } from '../adapter' import { type FullTextIndex } from '../fulltext' +import { DummyDbAdapter } from '../mem' import serverCore from '../plugin' import { type ServiceAdaptersManager } from '../service' import { type StorageAdapter } from '../storage' @@ -86,6 +87,17 @@ export class TServerStorage implements ServerStorage { liveQuery: LQ + domainInfo = new Map< + Domain, + { + exists: boolean + checkPromise: Promise + lastCheck: number + } + >() + + emptyAdapter = new DummyDbAdapter() + constructor ( private readonly _domains: Record, private readonly defaultAdapter: string, @@ -99,8 +111,9 @@ export class TServerStorage implements ServerStorage { private readonly workspace: WorkspaceIdWithUrl, readonly indexFactory: (storage: ServerStorage) => FullTextIndex, readonly options: ServerStorageOptions, - metrics: MeasureContext, - readonly model: Tx[] + readonly metrics: MeasureContext, + readonly model: Tx[], + readonly domainHelper: DomainHelper ) { this.liveQuery = new LQ(this.newCastClient(hierarchy, modelDb, metrics)) this.hierarchy = hierarchy @@ -147,6 +160,13 @@ export class TServerStorage implements ServerStorage { async close (): Promise { await this.fulltext.close() + for (const [domain, info] of this.domainInfo.entries()) { + if (info.checkPromise !== undefined) { + console.log('wait for check domain', domain) + // We need to be sure we wait for check to be complete + await info.checkPromise + } + } for (const o of this.adapters.values()) { await o.close() } @@ -154,12 +174,34 @@ export class TServerStorage implements ServerStorage { await this.serviceAdaptersManager.close() } - private getAdapter (domain: Domain): DbAdapter { + private getAdapter (domain: Domain, requireExists: boolean): DbAdapter { const name = this._domains[domain] ?? this.defaultAdapter const adapter = this.adapters.get(name) if (adapter === undefined) { throw new Error('adapter not provided: ' + name) } + + const helper = adapter.helper?.() + if (helper !== undefined) { + let info = this.domainInfo.get(domain) + if (info == null || Date.now() - info.lastCheck > 5 * 60 * 1000) { + // Re-check every 5 minutes + const exists = helper.exists(domain) + // We will create necessary indexes if required, and not touch collection if not required. + info = { + exists, + lastCheck: Date.now(), + checkPromise: this.domainHelper.checkDomain(this.metrics, domain, requireExists, helper) + } + this.domainInfo.set(domain, info) + } + if (!info.exists && !requireExists) { + return this.emptyAdapter + } + // If we require it exists, it will be exists + info.exists = true + } + return adapter } @@ -171,7 +213,7 @@ export class TServerStorage implements ServerStorage { if (part.length > 0) { // Find all deleted documents - const adapter = this.getAdapter(lastDomain as Domain) + const adapter = this.getAdapter(lastDomain as Domain, true) const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId) if (toDelete.length > 0) { @@ -397,7 +439,7 @@ export class TServerStorage implements ServerStorage { p + '-find-all', { _class: clazz }, (ctx) => { - return this.getAdapter(domain).findAll(ctx, clazz, query, options) + return this.getAdapter(domain, false).findAll(ctx, clazz, query, options) }, { clazz, query, options } ) @@ -860,7 +902,7 @@ export class TServerStorage implements ServerStorage { await this.triggers.tx(tx) await this.modelDb.tx(tx) } - await ctx.with('domain-tx', {}, async (ctx) => await this.getAdapter(DOMAIN_TX).tx(ctx.ctx, ...txToStore)) + await ctx.with('domain-tx', {}, async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore)) result.push(...(await ctx.with('apply', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess)))) // invoke triggers and store derived objects @@ -891,18 +933,18 @@ export class TServerStorage implements ServerStorage { } find (ctx: MeasureContext, domain: Domain): StorageIterator { - return this.getAdapter(domain).find(ctx, domain) + return this.getAdapter(domain, false).find(ctx, domain) } async load (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { - return await this.getAdapter(domain).load(ctx, domain, docs) + return await this.getAdapter(domain, false).load(ctx, domain, docs) } async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise { - await this.getAdapter(domain).upload(ctx, domain, docs) + await this.getAdapter(domain, true).upload(ctx, domain, docs) } async clean (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { - await this.getAdapter(domain).clean(ctx, domain, docs) + await this.getAdapter(domain, true).clean(ctx, domain, docs) } } diff --git a/server/mongo/src/__tests__/minmodel.ts b/server/mongo/src/__tests__/minmodel.ts index 6f7f58e73f..2580f4d375 100644 --- a/server/mongo/src/__tests__/minmodel.ts +++ b/server/mongo/src/__tests__/minmodel.ts @@ -220,5 +220,14 @@ export function genMinModel (): TxCUD[] { members: [u1] }) ) + + txes.push( + createClass(core.class.DomainIndexConfiguration, { + label: 'DomainIndexConfiguration' as IntlString, + extends: core.class.Doc, + kind: ClassifierKind.CLASS, + domain: DOMAIN_MODEL + }) + ) return txes } diff --git a/server/mongo/src/__tests__/storage.test.ts b/server/mongo/src/__tests__/storage.test.ts index 00e26955f6..f011df1af0 100644 --- a/server/mongo/src/__tests__/storage.test.ts +++ b/server/mongo/src/__tests__/storage.test.ts @@ -35,7 +35,8 @@ import core, { type WorkspaceId, type SessionOperationContext, type ParamsType, - type FullParamsType + type FullParamsType, + type ServerStorage } from '@hcengineering/core' import { type ContentTextAdapter, @@ -89,6 +90,7 @@ describe('mongo operations', () => { let model: ModelDb let client: Client let operations: TxOperations + let serverStorage: ServerStorage beforeAll(async () => { mongoClient = getMongoClient(mongodbUri) @@ -106,6 +108,7 @@ describe('mongo operations', () => { try { await (await mongoClient.getClient()).db(dbId).dropDatabase() } catch (eee) {} + await serverStorage.close() }) async function initDb (): Promise { @@ -172,7 +175,7 @@ describe('mongo operations', () => { storageFactory: () => createNullStorageFactory() } const ctx = new MeasureMetricsContext('client', {}) - const serverStorage = await createServerStorage(ctx, conf, { + serverStorage = await createServerStorage(ctx, conf, { upgrade: false, broadcast: () => {} }) diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 8ef338f3e1..fc33fda369 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -57,7 +57,13 @@ import core, { type WithLookup, type WorkspaceId } from '@hcengineering/core' -import { estimateDocSize, updateHashForDoc, type DbAdapter, type TxAdapter } from '@hcengineering/server-core' +import { + estimateDocSize, + updateHashForDoc, + type DbAdapter, + type DomainHelperOperations, + type TxAdapter +} from '@hcengineering/server-core' import { calculateObjectSize } from 'bson' import { createHash } from 'crypto' import { @@ -70,7 +76,7 @@ import { type Sort, type UpdateFilter } from 'mongodb' -import { getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils' +import { DBCollectionHelper, getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils' function translateDoc (doc: Doc): Document { return { ...doc, '%hash%': null } @@ -107,19 +113,31 @@ export async function toArray (cursor: AbstractCursor): Promise { } abstract class MongoAdapterBase implements DbAdapter { + _db: DBCollectionHelper + constructor ( protected readonly db: Db, protected readonly hierarchy: Hierarchy, protected readonly modelDb: ModelDb, protected readonly client: MongoClientReference - ) {} + ) { + this._db = new DBCollectionHelper(db) + } - async init (): Promise {} + abstract init (): Promise + + collection(domain: Domain): Collection { + return this._db.collection(domain) + } + + helper (): DomainHelperOperations { + return this._db + } async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise { for (const vv of config.indexes) { try { - await this.db.collection(domain).createIndex(vv) + await this.collection(domain).createIndex(vv) } catch (err: any) { console.error('failed to create index', domain, vv, err) } @@ -128,12 +146,11 @@ abstract class MongoAdapterBase implements DbAdapter { async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise { try { - const existingIndexes = await this.db.collection(domain).indexes() + const existingIndexes = await this.collection(domain).indexes() for (const existingIndex of existingIndexes) { const name: string = existingIndex.name if (deletePattern.test(name) && !keepPattern.test(name)) { - console.log('removing old index', name, keepPattern) - await this.db.collection(domain).dropIndex(existingIndex.name) + await this.collection(domain).dropIndex(existingIndex.name) } } } catch (err: any) { @@ -465,17 +482,17 @@ abstract class MongoAdapterBase implements DbAdapter { // const domain = this.hierarchy.getDomain(clazz) const domain = options?.domain ?? this.hierarchy.getDomain(clazz) - const cursor = this.db.collection(domain).aggregate(pipeline, { + const cursor = this.collection(domain).aggregate>(pipeline, { checkKeys: false, enableUtf8Validation: false }) let result: WithLookup[] = [] let total = options?.total === true ? 0 : -1 try { - result = (await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), { + result = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), { domain, pipeline - })) as any[] + }) } catch (e) { console.error('error during executing cursor in findWithPipeline', clazz, cutObjectArray(query), options, e) throw e @@ -488,7 +505,7 @@ abstract class MongoAdapterBase implements DbAdapter { } if (options?.total === true) { totalPipeline.push({ $count: 'total' }) - const totalCursor = this.db.collection(domain).aggregate(totalPipeline, { + const totalCursor = this.collection(domain).aggregate(totalPipeline, { checkKeys: false }) const arr = await toArray(totalCursor) @@ -590,7 +607,7 @@ abstract class MongoAdapterBase implements DbAdapter { return await ctx.with('pipeline', {}, async (ctx) => await this.findWithPipeline(ctx, _class, query, options)) } const domain = options?.domain ?? this.hierarchy.getDomain(_class) - const coll = this.db.collection(domain) + const coll = this.collection(domain) const mongoQuery = this.translateQuery(_class, query) let cursor = coll.find(mongoQuery, { @@ -801,7 +818,7 @@ abstract class MongoAdapterBase implements DbAdapter { async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise { await ctx.with('upload', { domain }, async () => { - const coll = this.db.collection(domain) + const coll = this.collection(domain) await uploadDocuments(ctx, docs, coll) }) @@ -809,7 +826,7 @@ abstract class MongoAdapterBase implements DbAdapter { async update (ctx: MeasureContext, domain: Domain, operations: Map, DocumentUpdate>): Promise { await ctx.with('update', { domain }, async () => { - const coll = this.db.collection(domain) + const coll = this.collection(domain) // remove old and insert new ones const ops = Array.from(operations.entries()) @@ -866,6 +883,10 @@ interface DomainOperation { } class MongoAdapter extends MongoAdapterBase { + async init (): Promise { + await this._db.init() + } + getOperations (tx: Tx): DomainOperation | undefined { switch (tx._class) { case core.class.TxCreateDoc: @@ -981,7 +1002,7 @@ class MongoAdapter extends MongoAdapterBase { protected txRemoveDoc (tx: TxRemoveDoc): DomainOperation { const domain = this.hierarchy.getDomain(tx.objectClass) return { - raw: () => this.db.collection(domain).deleteOne({ _id: tx.objectId }), + raw: async () => await this.collection(domain).deleteOne({ _id: tx.objectId }), domain, bulk: [{ deleteOne: { filter: { _id: tx.objectId } } }] } @@ -1011,14 +1032,14 @@ class MongoAdapter extends MongoAdapterBase { } ] return { - raw: async () => await this.db.collection(domain).bulkWrite(ops), + raw: async () => await this.collection(domain).bulkWrite(ops), domain, bulk: ops } } const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } } return { - raw: async () => await this.db.collection(domain).updateOne(filter, update), + raw: async () => await this.collection(domain).updateOne(filter, update), domain, bulk: [ { @@ -1032,7 +1053,7 @@ class MongoAdapter extends MongoAdapterBase { } const update = { $set: { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } } return { - raw: async () => await this.db.collection(domain).updateOne(filter, update), + raw: async () => await this.collection(domain).updateOne(filter, update), domain, bulk: [ { @@ -1070,7 +1091,7 @@ class MongoAdapter extends MongoAdapterBase { const domain = this.hierarchy.getDomain(doc._class) const tdoc = translateDoc(doc) return { - raw: async () => await this.db.collection(domain).insertOne(tdoc), + raw: async () => await this.collection(domain).insertOne(tdoc), domain, bulk: [ { @@ -1123,7 +1144,7 @@ class MongoAdapter extends MongoAdapterBase { } ] return { - raw: async () => await this.db.collection(domain).bulkWrite(ops), + raw: async () => await this.collection(domain).bulkWrite(ops), domain, bulk: ops } @@ -1160,14 +1181,14 @@ class MongoAdapter extends MongoAdapterBase { } ] return { - raw: async () => await this.db.collection(domain).bulkWrite(ops), + raw: async () => await this.collection(domain).bulkWrite(ops), domain, bulk: ops } } else { if (tx.retrieve === true) { const raw = async (): Promise => { - const result = await this.db.collection(domain).findOneAndUpdate( + const result = await this.collection(domain).findOneAndUpdate( { _id: tx.objectId }, { ...tx.operations, @@ -1197,7 +1218,7 @@ class MongoAdapter extends MongoAdapterBase { } } return { - raw: async () => await this.db.collection(domain).updateOne(filter, update), + raw: async () => await this.collection(domain).updateOne(filter, update), domain, bulk: [{ updateOne: { filter, update } }] } @@ -1221,7 +1242,7 @@ class MongoAdapter extends MongoAdapterBase { .findOneAndUpdate(filter, update, { returnDocument: 'after', includeResultMetadata: true }) return { object: result.value } } - : async () => await this.db.collection(domain).updateOne(filter, update) + : async () => await this.collection(domain).updateOne(filter, update) // Disable bulk for operators return { @@ -1236,6 +1257,10 @@ class MongoAdapter extends MongoAdapterBase { class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { txColl: Collection | undefined + async init (): Promise { + await this._db.init(DOMAIN_TX) + } + override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { if (tx.length === 0) { return [] diff --git a/server/mongo/src/utils.ts b/server/mongo/src/utils.ts index 262860f80c..6bb9ba7bfe 100644 --- a/server/mongo/src/utils.ts +++ b/server/mongo/src/utils.ts @@ -13,9 +13,10 @@ // limitations under the License. // -import { toWorkspaceString, type WorkspaceId } from '@hcengineering/core' +import { toWorkspaceString, type Doc, type Domain, type FieldIndex, type WorkspaceId } from '@hcengineering/core' import { PlatformError, unknownStatus } from '@hcengineering/platform' -import { type Db, MongoClient, type MongoClientOptions } from 'mongodb' +import { type DomainHelperOperations } from '@hcengineering/server-core' +import { MongoClient, type Collection, type Db, type Document, type MongoClientOptions } from 'mongodb' const connections = new Map() @@ -135,3 +136,67 @@ export function getMongoClient (uri: string, options?: MongoClientOptions): Mong export function getWorkspaceDB (client: MongoClient, workspaceId: WorkspaceId): Db { return client.db(toWorkspaceString(workspaceId)) } + +export class DBCollectionHelper implements DomainHelperOperations { + collections = new Map>() + constructor (readonly db: Db) {} + + async init (domain?: Domain): Promise { + if (domain === undefined) { + // Init existing collecfions + for (const c of (await this.db.listCollections({}, { nameOnly: true }).toArray()).map((it) => it.name)) { + this.collections.set(c, this.db.collection(c)) + } + } else { + this.collections.set(domain, this.db.collection(domain)) + } + } + + collection(domain: Domain): Collection { + let info = this.collections.get(domain) + if (info === undefined) { + info = this.db.collection(domain as string) + this.collections.set(domain, info) + } + return info + } + + async create (domain: Domain): Promise { + if (this.collections.get(domain) === undefined) { + const coll = this.db.collection(domain as string) + this.collections.set(domain, coll) + + while (true) { + const exists = await this.db.listCollections({ name: domain }).next() + if (exists === undefined) { + console.log('check connection to be created', domain) + await new Promise((resolve) => { + setTimeout(resolve) + }) + } else { + break + } + } + } + } + + exists (domain: Domain): boolean { + return this.collections.has(domain) + } + + async createIndex (domain: Domain, value: string | FieldIndex, options?: { name: string }): Promise { + await this.collection(domain).createIndex(value, options) + } + + async dropIndex (domain: Domain, name: string): Promise { + await this.collection(domain).dropIndex(name) + } + + async listIndexes (domain: Domain): Promise<{ name: string }[]> { + return await this.collection(domain).listIndexes().toArray() + } + + async hasDocuments (domain: Domain, count: number): Promise { + return (await this.collection(domain).countDocuments({}, { limit: count })) >= count + } +} diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 27ac472583..55681de78e 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -17,16 +17,12 @@ import contact from '@hcengineering/contact' import core, { BackupClient, Client as CoreClient, - Doc, - Domain, DOMAIN_MIGRATION, DOMAIN_MODEL, + DOMAIN_TRANSIENT, DOMAIN_TX, - FieldIndex, groupByArray, Hierarchy, - IndexKind, - IndexOrder, MeasureContext, MigrationState, ModelDb, @@ -34,15 +30,14 @@ import core, { WorkspaceId } from '@hcengineering/core' import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model' -import { createMongoTxAdapter, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' +import { createMongoTxAdapter, DBCollectionHelper, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server' -import { StorageAdapter, StorageConfiguration } from '@hcengineering/server-core' +import { DomainIndexHelperImpl, StorageAdapter, StorageConfiguration } from '@hcengineering/server-core' import { Db, Document } from 'mongodb' import { connect } from './connect' import toolPlugin from './plugin' import { MigrateClientImpl } from './upgrade' -import { deepEqual } from 'fast-equals' import fs from 'fs' import path from 'path' @@ -387,162 +382,27 @@ async function createUpdateIndexes ( logger: ModelLogger, progress: (value: number) => Promise ): Promise { - const classes = await ctx.with('find-classes', {}, async () => await connection.findAll(core.class.Class, {})) - - const domainConfigurations = await ctx.with( - 'find-domain-configs', - {}, - async () => await connection.findAll(core.class.DomainIndexConfiguration, {}) - ) - - const hierarchy = connection.getHierarchy() - const domains = new Map>>() - // Find all domains and indexed fields inside - for (const c of classes) { - try { - const domain = hierarchy.findDomain(c._id) - if (domain === undefined || domain === DOMAIN_MODEL) { - continue - } - const attrs = hierarchy.getAllAttributes(c._id) - const domainAttrs = domains.get(domain) ?? new Set>() - for (const a of attrs.values()) { - if (a.index !== undefined && (a.index === IndexKind.Indexed || a.index === IndexKind.IndexedDsc)) { - if (a.index === IndexKind.Indexed) { - domainAttrs.add(a.name) - } else { - domainAttrs.add({ [a.name]: IndexOrder.Descending }) - } - } - } - - // Handle extra configurations - if (hierarchy.hasMixin(c, core.mixin.IndexConfiguration)) { - const config = hierarchy.as(c, core.mixin.IndexConfiguration) - for (const attr of config.indexes) { - domainAttrs.add(attr) - } - } - - domains.set(domain, domainAttrs) - } catch (err: any) { - // Ignore, since we have classes without domain. - } - } - - const collections = await ctx.with( - 'list-collections', - {}, - async () => await db.listCollections({}, { nameOnly: true }).toArray() - ) + const domainHelper = new DomainIndexHelperImpl(connection.getHierarchy(), connection.getModel()) + const dbHelper = new DBCollectionHelper(db) + await dbHelper.init() let completed = 0 - const allDomains = Array.from(domains.entries()) - for (const [d, v] of allDomains) { - const cfg = domainConfigurations.find((it) => it.domain === d) - - const collInfo = collections.find((it) => it.name === d) - - if (cfg?.disableCollection === true && collInfo != null) { - try { - await db.dropCollection(d) - } catch (err) { - logger.error('error: failed to delete collection', { d, err }) - } + const allDomains = connection.getHierarchy().domains() + for (const domain of allDomains) { + if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT) { continue } - - if (collInfo == null) { - await ctx.with('create-collection', { d }, async () => await db.createCollection(d)) - } - const collection = db.collection(d) - const bb: (string | FieldIndex)[] = [] - const added = new Set() - - const allIndexes = (await collection.listIndexes().toArray()).filter((it) => it.name !== '_id_') - - for (const vv of [...v.values(), ...(cfg?.indexes ?? [])]) { + const result = await domainHelper.checkDomain(ctx, domain, false, dbHelper) + if (!result && dbHelper.exists(domain)) { try { - const name = - typeof vv === 'string' - ? `${vv}_1` - : Object.entries(vv) - .map(([key, val]) => `${key}_${val}`) - .join('_') - - // Check if index is disabled or not - const isDisabled = - cfg?.disabled?.some((it) => { - const _it = typeof it === 'string' ? { [it]: 1 } : it - const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv - return deepEqual(_it, _vv) - }) ?? false - if (isDisabled) { - // skip index since it is disabled - continue + logger.log('dropping domain', { domain }) + if ((await db.collection(domain).countDocuments({})) === 0) { + await db.dropCollection(domain) } - if (added.has(name)) { - // Index already added - continue - } - added.add(name) - - const existingOne = allIndexes.findIndex((it) => it.name === name) - if (existingOne !== -1) { - allIndexes.splice(existingOne, 1) - } - const exists = existingOne !== -1 - // Check if index exists - if (!exists) { - if (!isDisabled) { - // Check if not disabled - bb.push(vv) - await collection.createIndex(vv, { - background: true, - name - }) - } - } - } catch (err: any) { - logger.error('error: failed to create index', { d, vv, err }) + } catch (err) { + logger.error('error: failed to delete collection', { domain, err }) } } - if (allIndexes.length > 0) { - for (const c of allIndexes) { - try { - if (cfg?.skip !== undefined) { - if (Array.from(cfg.skip ?? []).some((it) => c.name.includes(it))) { - continue - } - } - logger.log('drop unused indexes', { name: c.name }) - await collection.dropIndex(c.name) - } catch (err) { - console.error('error: failed to drop index', { c, err }) - } - } - } - - if (bb.length > 0) { - logger.log('created indexes', { d, bb }) - } - - const pos = collections.findIndex((it) => it.name === d) - if (pos !== -1) { - collections.splice(pos, 1) - } - completed++ await progress((100 / allDomains.length) * completed) } - if (collections.length > 0) { - // We could drop unused collections. - for (const c of collections) { - try { - logger.log('drop unused collection', { name: c.name }) - await db.dropCollection(c.name) - } catch (err) { - console.error('error: failed to drop collection', { c, err }) - } - } - } } diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index e93699c19b..1e39e4a888 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -119,8 +119,10 @@ export class ClientSession implements Session { await this._pipeline.tx(context, createTx) const acc = TxProcessor.createDoc2Doc(createTx) await ctx.sendResponse(acc) + return } else { await ctx.sendResponse(systemAccount[0]) + return } } await ctx.sendResponse(account[0])