UBERF-6893: Move index build into workspace usage. (#5586)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-05-14 13:40:14 +07:00 committed by GitHub
parent 44ac774d1e
commit 4e1169a21e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 529 additions and 225 deletions

View File

@ -1,7 +1,6 @@
import { dropWorkspace, setWorkspaceDisabled, type Workspace } from '@hcengineering/account'
import core, { AccountRole, type MeasureContext, MeasureMetricsContext, SortingOrder } from '@hcengineering/core'
import { dropWorkspaceFull, setWorkspaceDisabled, type Workspace } from '@hcengineering/account'
import core, { AccountRole, MeasureMetricsContext, SortingOrder, type MeasureContext } from '@hcengineering/core'
import contact from '@hcengineering/model-contact'
import { getWorkspaceDB } from '@hcengineering/mongo'
import { type StorageAdapter } from '@hcengineering/server-core'
import { connect } from '@hcengineering/server-tool'
import { type Db, type MongoClient } from 'mongodb'
@ -66,18 +65,14 @@ export async function checkOrphanWorkspaces (
await setWorkspaceDisabled(db, ws._id, true)
}
if (cmd.remove) {
await dropWorkspace(new MeasureMetricsContext('tool', {}), db, productId, ws.workspace)
const workspaceDb = getWorkspaceDB(client, { name: ws.workspace, productId })
await workspaceDb.dropDatabase()
if (storageAdapter !== undefined && hasBucket) {
const docs = await storageAdapter.list(ctx, wspace)
await storageAdapter.remove(
ctx,
wspace,
docs.map((it) => it._id)
)
await storageAdapter.delete(ctx, wspace)
}
await dropWorkspaceFull(
new MeasureMetricsContext('tool', {}),
db,
client,
productId,
ws.workspace,
storageAdapter
)
}
}
}

View File

@ -22,9 +22,11 @@ import {
createWorkspace,
dropAccount,
dropWorkspace,
dropWorkspaceFull,
getAccount,
getWorkspaceById,
listAccounts,
listWorkspacesByAccount,
listWorkspacesPure,
listWorkspacesRaw,
replacePassword,
@ -79,10 +81,10 @@ import {
} from './clean'
import { checkOrphanWorkspaces } from './cleanOrphan'
import { changeConfiguration } from './configuration'
import { fixJsonMarkup } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { openAIConfig } from './openai'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { fixJsonMarkup } from './markup'
const colorConstants = {
colorRed: '\u001b[31m',
@ -395,15 +397,65 @@ export function devTool (
program
.command('drop-workspace <name>')
.description('drop workspace')
.action(async (workspace, cmd) => {
const { mongodbUri } = prepareTools()
await withDatabase(mongodbUri, async (db) => {
.option('--full [full]', 'Force remove all data', false)
.action(async (workspace, cmd: { full: boolean }) => {
const { mongodbUri, storageAdapter } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
const ws = await getWorkspaceById(db, productId, workspace)
if (ws === null) {
console.log('no workspace exists')
return
}
await dropWorkspace(toolCtx, db, productId, workspace)
if (cmd.full) {
await dropWorkspaceFull(toolCtx, db, client, productId, workspace, storageAdapter)
} else {
await dropWorkspace(toolCtx, db, productId, workspace)
}
})
})
program
.command('drop-workspace-by-email <email>')
.description('drop workspace')
.option('--full [full]', 'Force remove all data', false)
.action(async (email, cmd: { full: boolean }) => {
const { mongodbUri, storageAdapter } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
for (const workspace of await listWorkspacesByAccount(db, productId, email)) {
if (cmd.full) {
await dropWorkspaceFull(toolCtx, db, client, productId, workspace.workspace, storageAdapter)
} else {
await dropWorkspace(toolCtx, db, productId, workspace.workspace)
}
}
})
})
program
.command('list-workspace-by-email <email>')
.description('drop workspace')
.option('--full [full]', 'Force remove all data', false)
.action(async (email, cmd: { full: boolean }) => {
const { mongodbUri } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
for (const workspace of await listWorkspacesByAccount(db, productId, email)) {
console.log(workspace.workspace, workspace.workspaceUrl, workspace.workspaceName)
}
})
})
program
.command('drop-workspace-last-visit')
.description('drop old workspaces')
.action(async (cmd: any) => {
const { mongodbUri, storageAdapter } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
const workspacesJSON = await listWorkspacesPure(db, productId)
for (const ws of workspacesJSON) {
const lastVisit = Math.floor((Date.now() - ws.lastVisit) / 1000 / 3600 / 24)
if (lastVisit > 30) {
await dropWorkspaceFull(toolCtx, db, client, productId, ws.workspace, storageAdapter)
}
}
})
})

View File

@ -50,8 +50,9 @@ import { cloneWorkspace } from '@hcengineering/server-backup'
import { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin, { connect, initModel, upgradeModel } from '@hcengineering/server-tool'
import { pbkdf2Sync, randomBytes } from 'crypto'
import { Binary, Db, Filter, ObjectId } from 'mongodb'
import { Binary, Db, Filter, ObjectId, type MongoClient } from 'mongodb'
import fetch from 'node-fetch'
import type { StorageAdapter } from '../../core/types'
import { accountPlugin } from './plugin'
const WORKSPACE_COLLECTION = 'workspace'
@ -689,6 +690,22 @@ export async function listWorkspaces (
.map(trimWorkspaceInfo)
}
/**
* @public
*/
export async function listWorkspacesByAccount (db: Db, productId: string, email: string): Promise<WorkspaceInfo[]> {
  const account = await getAccount(db, email)
  if (account == null) {
    // No such account: nothing to list. This also avoids sending
    // `{ $in: undefined }` to MongoDB, which is rejected because $in
    // requires an array.
    return []
  }
  // Collect all enabled workspaces the account is a member of,
  // trimmed to the public WorkspaceInfo shape.
  return (
    await db
      .collection<Workspace>(WORKSPACE_COLLECTION)
      .find(withProductId(productId, { _id: { $in: account.workspaces } }))
      .toArray()
  )
    .map((it) => ({ ...it, productId }))
    .filter((it) => it.disabled !== true)
    .map(trimWorkspaceInfo)
}
/**
* @public
*/
@ -1727,7 +1744,7 @@ export async function dropWorkspace (
db: Db,
productId: string,
workspaceId: string
): Promise<void> {
): Promise<Workspace> {
const ws = await getWorkspaceById(db, productId, workspaceId)
if (ws === null) {
throw new PlatformError(new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspaceId }))
@ -1738,6 +1755,35 @@ export async function dropWorkspace (
.updateMany({ _id: { $in: ws.accounts ?? [] } }, { $pull: { workspaces: ws._id } })
ctx.info('Workspace dropped', { workspace: ws.workspace })
return ws
}
/**
* @public
*/
/**
 * Drop a workspace together with all of its data: the account-side record
 * (via dropWorkspace), the workspace Mongo database, and the storage bucket
 * contents (when a storage adapter is provided and the bucket exists).
 *
 * @param ctx - measurement/logging context
 * @param db - account database (workspace registry)
 * @param client - Mongo client used to drop the workspace database
 * @param productId - product the workspace belongs to
 * @param workspaceId - workspace identifier to drop
 * @param storageAdapter - optional blob storage to purge
 */
export async function dropWorkspaceFull (
  ctx: MeasureContext,
  db: Db,
  client: MongoClient,
  productId: string,
  workspaceId: string,
  storageAdapter?: StorageAdapter
): Promise<void> {
  // dropWorkspace removes the registry record and throws if the workspace is unknown.
  const ws = await dropWorkspace(ctx, db, productId, workspaceId)
  // NOTE(review): the database name here is the bare workspace name, while the
  // code this replaced used getWorkspaceDB(client, { name, productId }) and the
  // storage id below is productId-qualified via getWorkspaceId. Confirm the
  // workspace database name does not need the productId suffix.
  const workspaceDb = client.db(ws.workspace)
  await workspaceDb.dropDatabase()
  const wspace = getWorkspaceId(workspaceId, productId)
  const hasBucket = await storageAdapter?.exists(ctx, wspace)
  if (storageAdapter !== undefined && hasBucket === true) {
    // Remove every stored blob first, then the bucket itself.
    const docs = await storageAdapter.list(ctx, wspace)
    await storageAdapter.remove(
      ctx,
      wspace,
      docs.map((it) => it._id)
    )
    await storageAdapter.delete(ctx, wspace)
  }
  ctx.info('Workspace fully dropped', { workspace: ws.workspace })
}
/**

View File

@ -19,6 +19,7 @@ import {
type DocumentQuery,
type DocumentUpdate,
type Domain,
type FieldIndex,
type FindOptions,
type FindResult,
type Hierarchy,
@ -33,6 +34,24 @@ import {
} from '@hcengineering/core'
import { type StorageAdapter } from './storage'
/**
 * Low-level, adapter-provided operations a DomainHelper uses to inspect and
 * maintain a domain's underlying storage (collection/table and its indexes).
 */
export interface DomainHelperOperations {
  // Create the underlying collection for the domain.
  create: (domain: Domain) => Promise<void>
  // Synchronous existence check (adapters may answer from a cache).
  exists: (domain: Domain) => boolean
  createIndex: (domain: Domain, value: string | FieldIndex<Doc>, options?: { name: string }) => Promise<void>
  dropIndex: (domain: Domain, name: string) => Promise<void>
  listIndexes: (domain: Domain) => Promise<{ name: string }[]>
  // True when the domain holds at least `count` documents.
  hasDocuments: (domain: Domain, count: number) => Promise<boolean>
}
/**
 * Reconciles a single domain against the model: decides whether its storage is
 * required and keeps its indexes in sync, using the supplied operations.
 */
export interface DomainHelper {
  // Returns false if and only if the domain's underlying structures are not required.
  checkDomain: (
    ctx: MeasureContext,
    domain: Domain,
    forceCreate: boolean,
    operations: DomainHelperOperations
  ) => Promise<boolean>
}
/**
* @public
*/
@ -51,6 +70,9 @@ export interface RawDBAdapter {
* @public
*/
export interface DbAdapter {
init?: () => Promise<void>
helper?: () => DomainHelperOperations
createIndexes: (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>) => Promise<void>
removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise<void>

View File

@ -93,6 +93,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
async cancel (): Promise<void> {
this.cancelling = true
clearTimeout(this.updateBroadcast)
clearTimeout(this.skippedReiterationTimeout)
this.triggerIndexing()
await this.indexing

View File

@ -47,6 +47,8 @@ export class DummyDbAdapter implements DbAdapter {
return toFindResult([])
}
async init (): Promise<void> {}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}

View File

@ -0,0 +1,169 @@
import type {
Doc,
Domain,
DomainIndexConfiguration,
FieldIndex,
Hierarchy,
MeasureContext,
ModelDb
} from '@hcengineering/core'
import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core'
import { deepEqual } from 'fast-equals'
import type { DomainHelper, DomainHelperOperations } from '../adapter'
/**
 * Derives, from the model, the set of indexes every domain should have and
 * reconciles an actual database domain against it: creates missing indexes,
 * drops stale ones, and decides whether the underlying collection is needed
 * at all.
 */
export class DomainIndexHelperImpl implements DomainHelper {
  // domain -> set of index descriptors: a plain field name means an ascending
  // single-field index, a FieldIndex object is used as-is.
  domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
  domainConfigurations: DomainIndexConfiguration[] = []
  constructor (
    readonly hierarchy: Hierarchy,
    readonly model: ModelDb
  ) {
    const classes = model.findAllSync(core.class.Class, {})
    this.domainConfigurations =
      model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []

    this.domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
    // Find all domains and the indexed attributes contributing to them.
    for (const c of classes) {
      try {
        const domain = hierarchy.findDomain(c._id)
        if (domain === undefined || domain === DOMAIN_MODEL) {
          continue
        }
        const attrs = hierarchy.getAllAttributes(c._id)
        const domainAttrs = this.domains.get(domain) ?? new Set<string | FieldIndex<Doc>>()
        for (const a of attrs.values()) {
          if (a.index !== undefined && (a.index === IndexKind.Indexed || a.index === IndexKind.IndexedDsc)) {
            if (a.index === IndexKind.Indexed) {
              domainAttrs.add(a.name)
            } else {
              domainAttrs.add({ [a.name]: IndexOrder.Descending })
            }
          }
        }

        // Handle extra configurations attached via the IndexConfiguration mixin.
        if (hierarchy.hasMixin(c, core.mixin.IndexConfiguration)) {
          const config = hierarchy.as(c, core.mixin.IndexConfiguration)
          for (const attr of config.indexes) {
            domainAttrs.add(attr)
          }
        }

        this.domains.set(domain, domainAttrs)
      } catch (err: any) {
        // Ignore, since we have classes without domain.
      }
    }
  }

  /**
   * Reconcile indexes of a domain.
   *
   * @returns false if and only if the domain's underlying structures are not
   * required (collection explicitly disabled, or empty and creation not forced).
   */
  async checkDomain (
    ctx: MeasureContext,
    domain: Domain,
    forceCreate: boolean,
    operations: DomainHelperOperations
  ): Promise<boolean> {
    const domainInfo = this.domains.get(domain)
    const cfg = this.domainConfigurations.find((it) => it.domain === domain)

    let exists = operations.exists(domain)
    const hasDocuments = exists && (await operations.hasDocuments(domain, 1))
    // The collection is not needed when it is explicitly disabled, or when it
    // is empty and we are not forced to create it.
    if (exists && (cfg?.disableCollection === true || (!hasDocuments && !forceCreate))) {
      return false
    }
    if (forceCreate && !exists) {
      await operations.create(domain)
      ctx.info('collection will be created', { domain })
      exists = true
    }
    if (!exists) {
      // Do not need to create, since not forced and no documents.
      return false
    }
    const created: (string | FieldIndex<Doc>)[] = []
    const added = new Set<string>()
    try {
      // Only maintain indexes on domains with a meaningful amount of documents.
      const has50Documents = await operations.hasDocuments(domain, 50)
      const allIndexes = (await operations.listIndexes(domain)).filter((it) => it.name !== '_id_')
      ctx.info('check indexes', { domain, has50Documents })
      if (has50Documents) {
        for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) {
          try {
            // Mongo-style index name: `field_1`, or `k1_d1_k2_d2...` for compound keys.
            const name =
              typeof vv === 'string'
                ? `${vv}_1`
                : Object.entries(vv)
                  .map(([key, val]) => `${key}_${val}`)
                  .join('_')

            // Skip indexes explicitly disabled in the domain configuration.
            const isDisabled =
              cfg?.disabled?.some((it) => {
                const _it = typeof it === 'string' ? { [it]: 1 } : it
                const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv
                return deepEqual(_it, _vv)
              }) ?? false
            if (isDisabled) {
              continue
            }
            if (added.has(name)) {
              // Same index name produced twice; process it once.
              continue
            }
            added.add(name)

            const existingIdx = allIndexes.findIndex((it) => it.name === name)
            if (existingIdx !== -1) {
              // Index already present: keep it. Whatever remains in
              // allIndexes afterwards is considered stale.
              allIndexes.splice(existingIdx, 1)
            } else {
              created.push(vv)
              await operations.createIndex(domain, vv, {
                name
              })
            }
          } catch (err: any) {
            ctx.error('error: failed to create index', { domain, vv, err })
          }
        }
      }
      // Drop every remaining (stale) index unless it matches a skip pattern.
      if (allIndexes.length > 0) {
        for (const c of allIndexes) {
          try {
            if (cfg?.skip !== undefined) {
              if (Array.from(cfg.skip ?? []).some((it) => c.name.includes(it))) {
                continue
              }
            }
            ctx.info('drop index', { domain, name: c.name, has50Documents })
            await operations.dropIndex(domain, c.name)
          } catch (err) {
            ctx.error('error: failed to drop index', { domain, name: c.name, err })
          }
        }
      }
    } catch (err: any) {
      ctx.error('error: failed to check domain', { domain, err })
    }
    if (created.length > 0) {
      ctx.info('created indexes', { domain, created })
    }
    return true
  }
}

View File

@ -15,20 +15,20 @@
//
import core, {
type Class,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_TX,
type Doc,
Hierarchy,
ModelDb,
WorkspaceEvent,
generateId,
type Class,
type Doc,
type IndexingUpdateEvent,
type MeasureContext,
ModelDb,
type Ref,
type ServerStorage,
type TxWorkspaceEvent,
WorkspaceEvent,
type WorkspaceId,
generateId
type WorkspaceId
} from '@hcengineering/core'
import { type DbAdapter, type TxAdapter } from '../adapter'
import { type DbConfiguration } from '../configuration'
@ -39,6 +39,7 @@ import { createServiceAdaptersManager } from '../service'
import { type StorageAdapter } from '../storage'
import { Triggers } from '../triggers'
import { type ServerStorageOptions } from '../types'
import { DomainIndexHelperImpl } from './domainHelper'
import { TServerStorage } from './storage'
/**
@ -66,6 +67,11 @@ export async function createServerStorage (
}
})
await ctx.with('init-adapters', {}, async (ctx) => {
for (const adapter of adapters.values()) {
await adapter.init?.()
}
})
const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter
const model = await ctx.with('get model', {}, async (ctx) => {
@ -157,6 +163,9 @@ export async function createServerStorage (
options.upgrade ?? false
)
}
const domainHelper = new DomainIndexHelperImpl(hierarchy, modelDb)
return new TServerStorage(
conf.domains,
conf.defaultAdapter,
@ -171,7 +180,8 @@ export async function createServerStorage (
indexFactory,
options,
metrics,
model
model,
domainHelper
)
}
@ -198,3 +208,4 @@ export function createNullStorageFactory (): StorageAdapter {
}
export { AggregatorStorageAdapter, buildStorage } from './aggregator'
export { DomainIndexHelperImpl } from './domainHelper'

View File

@ -59,8 +59,9 @@ import core, {
import { getResource, type Metadata } from '@hcengineering/platform'
import { LiveQuery as LQ } from '@hcengineering/query'
import crypto from 'node:crypto'
import { type DbAdapter } from '../adapter'
import { type DbAdapter, type DomainHelper } from '../adapter'
import { type FullTextIndex } from '../fulltext'
import { DummyDbAdapter } from '../mem'
import serverCore from '../plugin'
import { type ServiceAdaptersManager } from '../service'
import { type StorageAdapter } from '../storage'
@ -86,6 +87,17 @@ export class TServerStorage implements ServerStorage {
liveQuery: LQ
domainInfo = new Map<
Domain,
{
exists: boolean
checkPromise: Promise<boolean>
lastCheck: number
}
>()
emptyAdapter = new DummyDbAdapter()
constructor (
private readonly _domains: Record<string, string>,
private readonly defaultAdapter: string,
@ -99,8 +111,9 @@ export class TServerStorage implements ServerStorage {
private readonly workspace: WorkspaceIdWithUrl,
readonly indexFactory: (storage: ServerStorage) => FullTextIndex,
readonly options: ServerStorageOptions,
metrics: MeasureContext,
readonly model: Tx[]
readonly metrics: MeasureContext,
readonly model: Tx[],
readonly domainHelper: DomainHelper
) {
this.liveQuery = new LQ(this.newCastClient(hierarchy, modelDb, metrics))
this.hierarchy = hierarchy
@ -147,6 +160,13 @@ export class TServerStorage implements ServerStorage {
async close (): Promise<void> {
await this.fulltext.close()
for (const [domain, info] of this.domainInfo.entries()) {
if (info.checkPromise !== undefined) {
console.log('wait for check domain', domain)
// We need to be sure we wait for check to be complete
await info.checkPromise
}
}
for (const o of this.adapters.values()) {
await o.close()
}
@ -154,12 +174,34 @@ export class TServerStorage implements ServerStorage {
await this.serviceAdaptersManager.close()
}
private getAdapter (domain: Domain): DbAdapter {
// Resolve the DbAdapter serving a domain, lazily verifying/creating the
// domain's underlying storage. When the domain does not exist yet and the
// caller does not require it (read paths), a no-op DummyDbAdapter is returned
// instead so reads do not touch/create the collection.
private getAdapter (domain: Domain, requireExists: boolean): DbAdapter {
  const name = this._domains[domain] ?? this.defaultAdapter
  const adapter = this.adapters.get(name)
  if (adapter === undefined) {
    throw new Error('adapter not provided: ' + name)
  }

  const helper = adapter.helper?.()
  if (helper !== undefined) {
    let info = this.domainInfo.get(domain)
    if (info == null || Date.now() - info.lastCheck > 5 * 60 * 1000) {
      // Re-check every 5 minutes
      const exists = helper.exists(domain)
      // We will create necessary indexes if required, and not touch collection if not required.
      // checkDomain runs in the background; close() awaits checkPromise so the
      // check completes before shutdown.
      info = {
        exists,
        lastCheck: Date.now(),
        checkPromise: this.domainHelper.checkDomain(this.metrics, domain, requireExists, helper)
      }
      this.domainInfo.set(domain, info)
    }
    if (!info.exists && !requireExists) {
      return this.emptyAdapter
    }
    // If we require it exists, it will be exists
    info.exists = true
  }
  return adapter
}
@ -171,7 +213,7 @@ export class TServerStorage implements ServerStorage {
if (part.length > 0) {
// Find all deleted documents
const adapter = this.getAdapter(lastDomain as Domain)
const adapter = this.getAdapter(lastDomain as Domain, true)
const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId)
if (toDelete.length > 0) {
@ -397,7 +439,7 @@ export class TServerStorage implements ServerStorage {
p + '-find-all',
{ _class: clazz },
(ctx) => {
return this.getAdapter(domain).findAll(ctx, clazz, query, options)
return this.getAdapter(domain, false).findAll(ctx, clazz, query, options)
},
{ clazz, query, options }
)
@ -860,7 +902,7 @@ export class TServerStorage implements ServerStorage {
await this.triggers.tx(tx)
await this.modelDb.tx(tx)
}
await ctx.with('domain-tx', {}, async (ctx) => await this.getAdapter(DOMAIN_TX).tx(ctx.ctx, ...txToStore))
await ctx.with('domain-tx', {}, async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore))
result.push(...(await ctx.with('apply', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))))
// invoke triggers and store derived objects
@ -891,18 +933,18 @@ export class TServerStorage implements ServerStorage {
}
find (ctx: MeasureContext, domain: Domain): StorageIterator {
return this.getAdapter(domain).find(ctx, domain)
return this.getAdapter(domain, false).find(ctx, domain)
}
async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this.getAdapter(domain).load(ctx, domain, docs)
return await this.getAdapter(domain, false).load(ctx, domain, docs)
}
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
await this.getAdapter(domain).upload(ctx, domain, docs)
await this.getAdapter(domain, true).upload(ctx, domain, docs)
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
await this.getAdapter(domain).clean(ctx, domain, docs)
await this.getAdapter(domain, true).clean(ctx, domain, docs)
}
}

View File

@ -220,5 +220,14 @@ export function genMinModel (): TxCUD<Doc>[] {
members: [u1]
})
)
txes.push(
createClass(core.class.DomainIndexConfiguration, {
label: 'DomainIndexConfiguration' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.CLASS,
domain: DOMAIN_MODEL
})
)
return txes
}

View File

@ -35,7 +35,8 @@ import core, {
type WorkspaceId,
type SessionOperationContext,
type ParamsType,
type FullParamsType
type FullParamsType,
type ServerStorage
} from '@hcengineering/core'
import {
type ContentTextAdapter,
@ -89,6 +90,7 @@ describe('mongo operations', () => {
let model: ModelDb
let client: Client
let operations: TxOperations
let serverStorage: ServerStorage
beforeAll(async () => {
mongoClient = getMongoClient(mongodbUri)
@ -106,6 +108,7 @@ describe('mongo operations', () => {
try {
await (await mongoClient.getClient()).db(dbId).dropDatabase()
} catch (eee) {}
await serverStorage.close()
})
async function initDb (): Promise<void> {
@ -172,7 +175,7 @@ describe('mongo operations', () => {
storageFactory: () => createNullStorageFactory()
}
const ctx = new MeasureMetricsContext('client', {})
const serverStorage = await createServerStorage(ctx, conf, {
serverStorage = await createServerStorage(ctx, conf, {
upgrade: false,
broadcast: () => {}
})

View File

@ -57,7 +57,13 @@ import core, {
type WithLookup,
type WorkspaceId
} from '@hcengineering/core'
import { estimateDocSize, updateHashForDoc, type DbAdapter, type TxAdapter } from '@hcengineering/server-core'
import {
estimateDocSize,
updateHashForDoc,
type DbAdapter,
type DomainHelperOperations,
type TxAdapter
} from '@hcengineering/server-core'
import { calculateObjectSize } from 'bson'
import { createHash } from 'crypto'
import {
@ -70,7 +76,7 @@ import {
type Sort,
type UpdateFilter
} from 'mongodb'
import { getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils'
import { DBCollectionHelper, getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils'
function translateDoc (doc: Doc): Document {
return { ...doc, '%hash%': null }
@ -107,19 +113,31 @@ export async function toArray<T> (cursor: AbstractCursor<T>): Promise<T[]> {
}
abstract class MongoAdapterBase implements DbAdapter {
_db: DBCollectionHelper
constructor (
protected readonly db: Db,
protected readonly hierarchy: Hierarchy,
protected readonly modelDb: ModelDb,
protected readonly client: MongoClientReference
) {}
) {
this._db = new DBCollectionHelper(db)
}
async init (): Promise<void> {}
abstract init (): Promise<void>
collection<TSchema extends Document = Document>(domain: Domain): Collection<TSchema> {
return this._db.collection(domain)
}
helper (): DomainHelperOperations {
return this._db
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {
for (const vv of config.indexes) {
try {
await this.db.collection(domain).createIndex(vv)
await this.collection(domain).createIndex(vv)
} catch (err: any) {
console.error('failed to create index', domain, vv, err)
}
@ -128,12 +146,11 @@ abstract class MongoAdapterBase implements DbAdapter {
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {
try {
const existingIndexes = await this.db.collection(domain).indexes()
const existingIndexes = await this.collection(domain).indexes()
for (const existingIndex of existingIndexes) {
const name: string = existingIndex.name
if (deletePattern.test(name) && !keepPattern.test(name)) {
console.log('removing old index', name, keepPattern)
await this.db.collection(domain).dropIndex(existingIndex.name)
await this.collection(domain).dropIndex(existingIndex.name)
}
}
} catch (err: any) {
@ -465,17 +482,17 @@ abstract class MongoAdapterBase implements DbAdapter {
// const domain = this.hierarchy.getDomain(clazz)
const domain = options?.domain ?? this.hierarchy.getDomain(clazz)
const cursor = this.db.collection(domain).aggregate(pipeline, {
const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline, {
checkKeys: false,
enableUtf8Validation: false
})
let result: WithLookup<T>[] = []
let total = options?.total === true ? 0 : -1
try {
result = (await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), {
result = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), {
domain,
pipeline
})) as any[]
})
} catch (e) {
console.error('error during executing cursor in findWithPipeline', clazz, cutObjectArray(query), options, e)
throw e
@ -488,7 +505,7 @@ abstract class MongoAdapterBase implements DbAdapter {
}
if (options?.total === true) {
totalPipeline.push({ $count: 'total' })
const totalCursor = this.db.collection(domain).aggregate(totalPipeline, {
const totalCursor = this.collection(domain).aggregate(totalPipeline, {
checkKeys: false
})
const arr = await toArray(totalCursor)
@ -590,7 +607,7 @@ abstract class MongoAdapterBase implements DbAdapter {
return await ctx.with('pipeline', {}, async (ctx) => await this.findWithPipeline(ctx, _class, query, options))
}
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
const coll = this.db.collection(domain)
const coll = this.collection(domain)
const mongoQuery = this.translateQuery(_class, query)
let cursor = coll.find<T>(mongoQuery, {
@ -801,7 +818,7 @@ abstract class MongoAdapterBase implements DbAdapter {
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
await ctx.with('upload', { domain }, async () => {
const coll = this.db.collection(domain)
const coll = this.collection(domain)
await uploadDocuments(ctx, docs, coll)
})
@ -809,7 +826,7 @@ abstract class MongoAdapterBase implements DbAdapter {
async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
await ctx.with('update', { domain }, async () => {
const coll = this.db.collection(domain)
const coll = this.collection(domain)
// remove old and insert new ones
const ops = Array.from(operations.entries())
@ -866,6 +883,10 @@ interface DomainOperation {
}
class MongoAdapter extends MongoAdapterBase {
async init (): Promise<void> {
  // Warm the collection cache with every collection already present in the db.
  await this._db.init()
}
getOperations (tx: Tx): DomainOperation | undefined {
switch (tx._class) {
case core.class.TxCreateDoc:
@ -981,7 +1002,7 @@ class MongoAdapter extends MongoAdapterBase {
protected txRemoveDoc (tx: TxRemoveDoc<Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass)
return {
raw: () => this.db.collection(domain).deleteOne({ _id: tx.objectId }),
raw: async () => await this.collection(domain).deleteOne({ _id: tx.objectId }),
domain,
bulk: [{ deleteOne: { filter: { _id: tx.objectId } } }]
}
@ -1011,14 +1032,14 @@ class MongoAdapter extends MongoAdapterBase {
}
]
return {
raw: async () => await this.db.collection(domain).bulkWrite(ops),
raw: async () => await this.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
}
const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } }
return {
raw: async () => await this.db.collection(domain).updateOne(filter, update),
raw: async () => await this.collection(domain).updateOne(filter, update),
domain,
bulk: [
{
@ -1032,7 +1053,7 @@ class MongoAdapter extends MongoAdapterBase {
}
const update = { $set: { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } }
return {
raw: async () => await this.db.collection(domain).updateOne(filter, update),
raw: async () => await this.collection(domain).updateOne(filter, update),
domain,
bulk: [
{
@ -1070,7 +1091,7 @@ class MongoAdapter extends MongoAdapterBase {
const domain = this.hierarchy.getDomain(doc._class)
const tdoc = translateDoc(doc)
return {
raw: async () => await this.db.collection(domain).insertOne(tdoc),
raw: async () => await this.collection(domain).insertOne(tdoc),
domain,
bulk: [
{
@ -1123,7 +1144,7 @@ class MongoAdapter extends MongoAdapterBase {
}
]
return {
raw: async () => await this.db.collection(domain).bulkWrite(ops),
raw: async () => await this.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
@ -1160,14 +1181,14 @@ class MongoAdapter extends MongoAdapterBase {
}
]
return {
raw: async () => await this.db.collection(domain).bulkWrite(ops),
raw: async () => await this.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
} else {
if (tx.retrieve === true) {
const raw = async (): Promise<TxResult> => {
const result = await this.db.collection(domain).findOneAndUpdate(
const result = await this.collection(domain).findOneAndUpdate(
{ _id: tx.objectId },
{
...tx.operations,
@ -1197,7 +1218,7 @@ class MongoAdapter extends MongoAdapterBase {
}
}
return {
raw: async () => await this.db.collection(domain).updateOne(filter, update),
raw: async () => await this.collection(domain).updateOne(filter, update),
domain,
bulk: [{ updateOne: { filter, update } }]
}
@ -1221,7 +1242,7 @@ class MongoAdapter extends MongoAdapterBase {
.findOneAndUpdate(filter, update, { returnDocument: 'after', includeResultMetadata: true })
return { object: result.value }
}
: async () => await this.db.collection(domain).updateOne(filter, update)
: async () => await this.collection(domain).updateOne(filter, update)
// Disable bulk for operators
return {
@ -1236,6 +1257,10 @@ class MongoAdapter extends MongoAdapterBase {
class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
txColl: Collection | undefined
async init (): Promise<void> {
  // The tx adapter only works with the transaction domain collection.
  await this._db.init(DOMAIN_TX)
}
override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []

View File

@ -13,9 +13,10 @@
// limitations under the License.
//
import { toWorkspaceString, type WorkspaceId } from '@hcengineering/core'
import { toWorkspaceString, type Doc, type Domain, type FieldIndex, type WorkspaceId } from '@hcengineering/core'
import { PlatformError, unknownStatus } from '@hcengineering/platform'
import { type Db, MongoClient, type MongoClientOptions } from 'mongodb'
import { type DomainHelperOperations } from '@hcengineering/server-core'
import { MongoClient, type Collection, type Db, type Document, type MongoClientOptions } from 'mongodb'
const connections = new Map<string, MongoClientReferenceImpl>()
@ -135,3 +136,67 @@ export function getMongoClient (uri: string, options?: MongoClientOptions): Mong
export function getWorkspaceDB (client: MongoClient, workspaceId: WorkspaceId): Db {
return client.db(toWorkspaceString(workspaceId))
}
/**
 * Mongo implementation of DomainHelperOperations: caches per-domain Collection
 * handles and exposes collection/index maintenance primitives.
 */
export class DBCollectionHelper implements DomainHelperOperations {
  // domain name -> cached Collection handle; doubles as the `exists` registry.
  collections = new Map<string, Collection<any>>()
  constructor (readonly db: Db) {}

  // Populate the cache: with every collection present in the database when no
  // domain is given, otherwise with just the requested domain.
  async init (domain?: Domain): Promise<void> {
    if (domain === undefined) {
      // Init existing collections
      for (const c of (await this.db.listCollections({}, { nameOnly: true }).toArray()).map((it) => it.name)) {
        this.collections.set(c, this.db.collection(c))
      }
    } else {
      this.collections.set(domain, this.db.collection(domain))
    }
  }

  // Return (and cache) the Collection handle for a domain.
  collection<TSchema extends Document = Document>(domain: Domain): Collection<TSchema> {
    let info = this.collections.get(domain)
    if (info === undefined) {
      info = this.db.collection(domain as string)
      this.collections.set(domain, info)
    }
    return info
  }

  async create (domain: Domain): Promise<void> {
    if (this.collections.get(domain) === undefined) {
      // NOTE(review): db.collection() only creates a client-side handle; the
      // server-side collection is created lazily on first write/index. Confirm
      // that is the intended contract for `create`.
      const coll = this.db.collection(domain as string)
      this.collections.set(domain, coll)

      while (true) {
        const exists = await this.db.listCollections({ name: domain }).next()
        // NOTE(review): the Node driver's cursor.next() resolves to null (not
        // undefined) when there is no match, so this branch never triggers and
        // the loop exits immediately even when the collection does not yet
        // exist — verify whether `exists == null` was intended.
        if (exists === undefined) {
          console.log('check connection to be created', domain)
          await new Promise<void>((resolve) => {
            setTimeout(resolve)
          })
        } else {
          break
        }
      }
    }
  }

  // Only reflects collections seen via init()/collection()/create(); it does
  // not query the server.
  exists (domain: Domain): boolean {
    return this.collections.has(domain)
  }

  async createIndex (domain: Domain, value: string | FieldIndex<Doc>, options?: { name: string }): Promise<void> {
    await this.collection(domain).createIndex(value, options)
  }

  async dropIndex (domain: Domain, name: string): Promise<void> {
    await this.collection(domain).dropIndex(name)
  }

  async listIndexes (domain: Domain): Promise<{ name: string }[]> {
    return await this.collection(domain).listIndexes().toArray()
  }

  // True when the domain holds at least `count` documents (bounded count).
  async hasDocuments (domain: Domain, count: number): Promise<boolean> {
    return (await this.collection(domain).countDocuments({}, { limit: count })) >= count
  }
}

View File

@ -17,16 +17,12 @@ import contact from '@hcengineering/contact'
import core, {
BackupClient,
Client as CoreClient,
Doc,
Domain,
DOMAIN_MIGRATION,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
DOMAIN_TX,
FieldIndex,
groupByArray,
Hierarchy,
IndexKind,
IndexOrder,
MeasureContext,
MigrationState,
ModelDb,
@ -34,15 +30,14 @@ import core, {
WorkspaceId
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model'
import { createMongoTxAdapter, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import { createMongoTxAdapter, DBCollectionHelper, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server'
import { StorageAdapter, StorageConfiguration } from '@hcengineering/server-core'
import { DomainIndexHelperImpl, StorageAdapter, StorageConfiguration } from '@hcengineering/server-core'
import { Db, Document } from 'mongodb'
import { connect } from './connect'
import toolPlugin from './plugin'
import { MigrateClientImpl } from './upgrade'
import { deepEqual } from 'fast-equals'
import fs from 'fs'
import path from 'path'
@ -387,162 +382,27 @@ async function createUpdateIndexes (
logger: ModelLogger,
progress: (value: number) => Promise<void>
): Promise<void> {
const classes = await ctx.with('find-classes', {}, async () => await connection.findAll(core.class.Class, {}))
const domainConfigurations = await ctx.with(
'find-domain-configs',
{},
async () => await connection.findAll(core.class.DomainIndexConfiguration, {})
)
const hierarchy = connection.getHierarchy()
const domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
// Find all domains and indexed fields inside
for (const c of classes) {
try {
const domain = hierarchy.findDomain(c._id)
if (domain === undefined || domain === DOMAIN_MODEL) {
continue
}
const attrs = hierarchy.getAllAttributes(c._id)
const domainAttrs = domains.get(domain) ?? new Set<string | FieldIndex<Doc>>()
for (const a of attrs.values()) {
if (a.index !== undefined && (a.index === IndexKind.Indexed || a.index === IndexKind.IndexedDsc)) {
if (a.index === IndexKind.Indexed) {
domainAttrs.add(a.name)
} else {
domainAttrs.add({ [a.name]: IndexOrder.Descending })
}
}
}
// Handle extra configurations
if (hierarchy.hasMixin(c, core.mixin.IndexConfiguration)) {
const config = hierarchy.as(c, core.mixin.IndexConfiguration)
for (const attr of config.indexes) {
domainAttrs.add(attr)
}
}
domains.set(domain, domainAttrs)
} catch (err: any) {
// Ignore, since we have classes without domain.
}
}
const collections = await ctx.with(
'list-collections',
{},
async () => await db.listCollections({}, { nameOnly: true }).toArray()
)
const domainHelper = new DomainIndexHelperImpl(connection.getHierarchy(), connection.getModel())
const dbHelper = new DBCollectionHelper(db)
await dbHelper.init()
let completed = 0
const allDomains = Array.from(domains.entries())
for (const [d, v] of allDomains) {
const cfg = domainConfigurations.find((it) => it.domain === d)
const collInfo = collections.find((it) => it.name === d)
if (cfg?.disableCollection === true && collInfo != null) {
try {
await db.dropCollection(d)
} catch (err) {
logger.error('error: failed to delete collection', { d, err })
}
const allDomains = connection.getHierarchy().domains()
for (const domain of allDomains) {
if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT) {
continue
}
if (collInfo == null) {
await ctx.with('create-collection', { d }, async () => await db.createCollection(d))
}
const collection = db.collection(d)
const bb: (string | FieldIndex<Doc>)[] = []
const added = new Set<string>()
const allIndexes = (await collection.listIndexes().toArray()).filter((it) => it.name !== '_id_')
for (const vv of [...v.values(), ...(cfg?.indexes ?? [])]) {
const result = await domainHelper.checkDomain(ctx, domain, false, dbHelper)
if (!result && dbHelper.exists(domain)) {
try {
const name =
typeof vv === 'string'
? `${vv}_1`
: Object.entries(vv)
.map(([key, val]) => `${key}_${val}`)
.join('_')
// Check if index is disabled or not
const isDisabled =
cfg?.disabled?.some((it) => {
const _it = typeof it === 'string' ? { [it]: 1 } : it
const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv
return deepEqual(_it, _vv)
}) ?? false
if (isDisabled) {
// skip index since it is disabled
continue
logger.log('dropping domain', { domain })
if ((await db.collection(domain).countDocuments({})) === 0) {
await db.dropCollection(domain)
}
if (added.has(name)) {
// Index already added
continue
}
added.add(name)
const existingOne = allIndexes.findIndex((it) => it.name === name)
if (existingOne !== -1) {
allIndexes.splice(existingOne, 1)
}
const exists = existingOne !== -1
// Check if index exists
if (!exists) {
if (!isDisabled) {
// Check if not disabled
bb.push(vv)
await collection.createIndex(vv, {
background: true,
name
})
}
}
} catch (err: any) {
logger.error('error: failed to create index', { d, vv, err })
} catch (err) {
logger.error('error: failed to delete collection', { domain, err })
}
}
if (allIndexes.length > 0) {
for (const c of allIndexes) {
try {
if (cfg?.skip !== undefined) {
if (Array.from(cfg.skip ?? []).some((it) => c.name.includes(it))) {
continue
}
}
logger.log('drop unused indexes', { name: c.name })
await collection.dropIndex(c.name)
} catch (err) {
console.error('error: failed to drop index', { c, err })
}
}
}
if (bb.length > 0) {
logger.log('created indexes', { d, bb })
}
const pos = collections.findIndex((it) => it.name === d)
if (pos !== -1) {
collections.splice(pos, 1)
}
completed++
await progress((100 / allDomains.length) * completed)
}
if (collections.length > 0) {
// We could drop unused collections.
for (const c of collections) {
try {
logger.log('drop unused collection', { name: c.name })
await db.dropCollection(c.name)
} catch (err) {
console.error('error: failed to drop collection', { c, err })
}
}
}
}

View File

@ -119,8 +119,10 @@ export class ClientSession implements Session {
await this._pipeline.tx(context, createTx)
const acc = TxProcessor.createDoc2Doc(createTx)
await ctx.sendResponse(acc)
return
} else {
await ctx.sendResponse(systemAccount[0])
return
}
}
await ctx.sendResponse(account[0])