mirror of
https://github.com/hcengineering/platform.git
synced 2025-01-23 03:49:49 +00:00
UBERF-7692: Move FindAll slow print into mongo adapter (#6152)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
d9b4d11730
commit
c919a14fa5
@ -372,7 +372,8 @@ export function devTool (
|
||||
.command('upgrade-workspace <name>')
|
||||
.description('upgrade workspace')
|
||||
.option('-f|--force [force]', 'Force update', true)
|
||||
.action(async (workspace, cmd: { force: boolean }) => {
|
||||
.option('-i|--indexes [indexes]', 'Force indexes rebuild', false)
|
||||
.action(async (workspace, cmd: { force: boolean, indexes: boolean }) => {
|
||||
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
|
||||
await withDatabase(mongodbUri, async (db) => {
|
||||
const info = await getWorkspaceById(db, productId, workspace)
|
||||
@ -391,7 +392,8 @@ export function devTool (
|
||||
db,
|
||||
info.workspaceUrl ?? info.workspace,
|
||||
consoleModelLogger,
|
||||
cmd.force
|
||||
cmd.force,
|
||||
cmd.indexes
|
||||
)
|
||||
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60), {})
|
||||
console.log('upgrade done')
|
||||
|
@ -310,8 +310,7 @@ export function createModel (builder: Builder): void {
|
||||
stages: 1,
|
||||
_id: 1,
|
||||
modifiedOn: 1
|
||||
},
|
||||
sparse: true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -28,12 +28,12 @@ import {
|
||||
type Doc,
|
||||
type DocumentQuery,
|
||||
type Domain,
|
||||
type IndexingConfiguration,
|
||||
type Markup,
|
||||
type Ref,
|
||||
type Space,
|
||||
type Timestamp,
|
||||
type Tx,
|
||||
type IndexingConfiguration
|
||||
type Tx
|
||||
} from '@hcengineering/core'
|
||||
import {
|
||||
ArrOf,
|
||||
@ -69,15 +69,15 @@ import {
|
||||
type NotificationObjectPresenter,
|
||||
type NotificationPreferencesGroup,
|
||||
type NotificationPreview,
|
||||
type NotificationProvider,
|
||||
type NotificationProviderDefaults,
|
||||
type NotificationProviderSetting,
|
||||
type NotificationStatus,
|
||||
type NotificationTemplate,
|
||||
type NotificationType,
|
||||
type PushSubscription,
|
||||
type PushSubscriptionKeys,
|
||||
type NotificationProvider,
|
||||
type NotificationProviderSetting,
|
||||
type NotificationTypeSetting,
|
||||
type NotificationProviderDefaults
|
||||
type PushSubscription,
|
||||
type PushSubscriptionKeys
|
||||
} from '@hcengineering/notification'
|
||||
import { type Asset, type IntlString } from '@hcengineering/platform'
|
||||
import setting from '@hcengineering/setting'
|
||||
@ -91,7 +91,11 @@ export { notification as default }
|
||||
|
||||
export const DOMAIN_NOTIFICATION = 'notification' as Domain
|
||||
|
||||
@Model(notification.class.BrowserNotification, core.class.Doc, DOMAIN_NOTIFICATION)
|
||||
export const DOMAIN_DOC_NOTIFY = 'notification-dnc' as Domain
|
||||
|
||||
export const DOMAIN_USER_NOTIFY = 'notification-user' as Domain
|
||||
|
||||
@Model(notification.class.BrowserNotification, core.class.Doc, DOMAIN_USER_NOTIFY)
|
||||
export class TBrowserNotification extends TDoc implements BrowserNotification {
|
||||
senderId?: Ref<Account> | undefined
|
||||
tag!: Ref<Doc<Space>>
|
||||
@ -102,7 +106,7 @@ export class TBrowserNotification extends TDoc implements BrowserNotification {
|
||||
status!: NotificationStatus
|
||||
}
|
||||
|
||||
@Model(notification.class.PushSubscription, core.class.Doc, DOMAIN_NOTIFICATION)
|
||||
@Model(notification.class.PushSubscription, core.class.Doc, DOMAIN_USER_NOTIFY)
|
||||
export class TPushSubscription extends TDoc implements PushSubscription {
|
||||
user!: Ref<Account>
|
||||
endpoint!: string
|
||||
@ -185,7 +189,7 @@ export class TNotificationContextPresenter extends TClass implements Notificatio
|
||||
labelPresenter?: AnyComponent
|
||||
}
|
||||
|
||||
@Model(notification.class.DocNotifyContext, core.class.Doc, DOMAIN_NOTIFICATION)
|
||||
@Model(notification.class.DocNotifyContext, core.class.Doc, DOMAIN_DOC_NOTIFY)
|
||||
export class TDocNotifyContext extends TDoc implements DocNotifyContext {
|
||||
@Prop(TypeRef(core.class.Account), core.string.Account)
|
||||
@Index(IndexKind.Indexed)
|
||||
@ -626,6 +630,49 @@ export function createModel (builder: Builder): void {
|
||||
disabled: [{ modifiedOn: 1 }, { modifiedBy: 1 }, { createdBy: 1 }, { isViewed: 1 }, { hidden: 1 }]
|
||||
})
|
||||
|
||||
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
|
||||
domain: DOMAIN_DOC_NOTIFY,
|
||||
indexes: [{ keys: { user: 1 } }],
|
||||
disabled: [
|
||||
{ _class: 1 },
|
||||
{ modifiedOn: 1 },
|
||||
{ modifiedBy: 1 },
|
||||
{ createdBy: 1 },
|
||||
{ isViewed: 1 },
|
||||
{ hidden: 1 },
|
||||
{ createdOn: -1 },
|
||||
{ attachedTo: 1 }
|
||||
]
|
||||
})
|
||||
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
|
||||
domain: DOMAIN_USER_NOTIFY,
|
||||
indexes: [{ keys: { user: 1 } }],
|
||||
disabled: [
|
||||
{ _class: 1 },
|
||||
{ modifiedOn: 1 },
|
||||
{ modifiedBy: 1 },
|
||||
{ createdBy: 1 },
|
||||
{ isViewed: 1 },
|
||||
{ hidden: 1 },
|
||||
{ createdOn: -1 },
|
||||
{ attachedTo: 1 }
|
||||
]
|
||||
})
|
||||
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
|
||||
domain: DOMAIN_USER_NOTIFY,
|
||||
indexes: [],
|
||||
disabled: [
|
||||
{ _class: 1 },
|
||||
{ modifiedOn: 1 },
|
||||
{ modifiedBy: 1 },
|
||||
{ createdBy: 1 },
|
||||
{ isViewed: 1 },
|
||||
{ hidden: 1 },
|
||||
{ createdOn: -1 },
|
||||
{ attachedTo: 1 }
|
||||
]
|
||||
})
|
||||
|
||||
builder.mixin<Class<DocNotifyContext>, IndexingConfiguration<DocNotifyContext>>(
|
||||
notification.class.DocNotifyContext,
|
||||
core.class.Class,
|
||||
|
@ -21,10 +21,15 @@ import {
|
||||
type MigrationClient,
|
||||
type MigrationUpgradeClient
|
||||
} from '@hcengineering/model'
|
||||
import notification, { notificationId, type DocNotifyContext } from '@hcengineering/notification'
|
||||
import notification, {
|
||||
notificationId,
|
||||
NotificationStatus,
|
||||
type BrowserNotification,
|
||||
type DocNotifyContext
|
||||
} from '@hcengineering/notification'
|
||||
import { DOMAIN_PREFERENCE } from '@hcengineering/preference'
|
||||
|
||||
import { DOMAIN_NOTIFICATION } from './index'
|
||||
import { DOMAIN_DOC_NOTIFY, DOMAIN_NOTIFICATION, DOMAIN_USER_NOTIFY } from './index'
|
||||
|
||||
export async function removeNotifications (
|
||||
client: MigrationClient,
|
||||
@ -127,8 +132,46 @@ export const notificationOperation: MigrateOperation = {
|
||||
{
|
||||
state: 'migrate-setting',
|
||||
func: migrateSettings
|
||||
},
|
||||
{
|
||||
state: 'move-doc-notify',
|
||||
func: async (client) => {
|
||||
await client.move(DOMAIN_NOTIFICATION, { _class: notification.class.DocNotifyContext }, DOMAIN_DOC_NOTIFY)
|
||||
}
|
||||
},
|
||||
{
|
||||
state: 'remove-last-view',
|
||||
func: async (client) => {
|
||||
await client.deleteMany(DOMAIN_NOTIFICATION, { _class: 'notification:class:LastView' as any })
|
||||
}
|
||||
},
|
||||
{
|
||||
state: 'remove-notification',
|
||||
func: async (client) => {
|
||||
await client.deleteMany(DOMAIN_NOTIFICATION, { _class: 'notification:class:Notification' as any })
|
||||
}
|
||||
},
|
||||
{
|
||||
state: 'remove-email-notification',
|
||||
func: async (client) => {
|
||||
await client.deleteMany(DOMAIN_NOTIFICATION, { _class: 'notification:class:EmailNotification' as any })
|
||||
}
|
||||
},
|
||||
{
|
||||
state: 'move-user',
|
||||
func: async (client) => {
|
||||
await client.move(
|
||||
DOMAIN_NOTIFICATION,
|
||||
{ _class: { $in: [notification.class.BrowserNotification, notification.class.PushSubscription] } },
|
||||
DOMAIN_USER_NOTIFY
|
||||
)
|
||||
}
|
||||
}
|
||||
])
|
||||
await client.deleteMany<BrowserNotification>(DOMAIN_USER_NOTIFY, {
|
||||
_class: notification.class.BrowserNotification,
|
||||
status: NotificationStatus.Notified
|
||||
})
|
||||
},
|
||||
async upgrade (state: Map<string, Set<string>>, client: () => Promise<MigrationUpgradeClient>): Promise<void> {}
|
||||
}
|
||||
|
@ -83,7 +83,7 @@
|
||||
|
||||
let categoryQueryOptions: Partial<FindOptions<Doc>>
|
||||
$: categoryQueryOptions = {
|
||||
...noLookupOptions(resultOptions),
|
||||
...noLookupSortingOptions(resultOptions),
|
||||
projection: {
|
||||
...resultOptions.projection,
|
||||
_id: 1,
|
||||
@ -151,8 +151,8 @@
|
||||
return newQuery
|
||||
}
|
||||
|
||||
function noLookupOptions (options: FindOptions<Doc>): FindOptions<Doc> {
|
||||
const { lookup, ...resultOptions } = options
|
||||
function noLookupSortingOptions (options: FindOptions<Doc>): FindOptions<Doc> {
|
||||
const { lookup, sort, ...resultOptions } = options
|
||||
return resultOptions
|
||||
}
|
||||
|
||||
|
@ -1188,7 +1188,8 @@ export async function upgradeWorkspace (
|
||||
db: Db,
|
||||
workspaceUrl: string,
|
||||
logger: ModelLogger = consoleModelLogger,
|
||||
forceUpdate: boolean = true
|
||||
forceUpdate: boolean = true,
|
||||
forceIndexes: boolean = false
|
||||
): Promise<string> {
|
||||
const ws = await getWorkspaceByUrl(db, productId, workspaceUrl)
|
||||
if (ws === null) {
|
||||
@ -1218,7 +1219,8 @@ export async function upgradeWorkspace (
|
||||
migrationOperation,
|
||||
logger,
|
||||
false,
|
||||
async (value) => {}
|
||||
async (value) => {},
|
||||
forceIndexes
|
||||
)
|
||||
|
||||
await db.collection(WORKSPACE_COLLECTION).updateOne(
|
||||
|
@ -33,6 +33,7 @@ import {
|
||||
type WorkspaceId
|
||||
} from '@hcengineering/core'
|
||||
import { type StorageAdapter } from './storage'
|
||||
import type { ServerFindOptions } from './types'
|
||||
|
||||
export interface DomainHelperOperations {
|
||||
create: (domain: Domain) => Promise<void>
|
||||
@ -101,9 +102,7 @@ export interface DbAdapter {
|
||||
ctx: MeasureContext,
|
||||
_class: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T> & {
|
||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||
}
|
||||
options?: ServerFindOptions<T>
|
||||
) => Promise<FindResult<T>>
|
||||
tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise<TxResult[]>
|
||||
|
||||
|
@ -362,8 +362,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
{
|
||||
keys: {
|
||||
['stages.' + st.stageId]: 1
|
||||
},
|
||||
sparse: true
|
||||
}
|
||||
}
|
||||
]
|
||||
})
|
||||
|
@ -49,7 +49,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
|
||||
keys: {
|
||||
[a.name]: a.index === IndexKind.Indexed ? IndexOrder.Ascending : IndexOrder.Descending
|
||||
},
|
||||
sparse: true // Default to sparse indexes
|
||||
sparse: false // Default to non sparse indexes
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -59,7 +59,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
|
||||
const config = hierarchy.as(c, core.mixin.IndexConfiguration)
|
||||
for (const attr of config.indexes) {
|
||||
if (typeof attr === 'string') {
|
||||
domainAttrs.add({ keys: { [attr]: IndexOrder.Ascending }, sparse: true })
|
||||
domainAttrs.add({ keys: { [attr]: IndexOrder.Ascending }, sparse: false })
|
||||
} else {
|
||||
domainAttrs.add(attr)
|
||||
}
|
||||
|
@ -22,7 +22,6 @@ import core, {
|
||||
DOMAIN_TX,
|
||||
TxFactory,
|
||||
TxProcessor,
|
||||
cutObjectArray,
|
||||
toFindResult,
|
||||
type Account,
|
||||
type AttachedDoc,
|
||||
@ -72,6 +71,7 @@ import { type Triggers } from '../triggers'
|
||||
import type {
|
||||
FullTextAdapter,
|
||||
ObjectDDParticipant,
|
||||
ServerFindOptions,
|
||||
ServerStorage,
|
||||
ServerStorageOptions,
|
||||
SessionContext,
|
||||
@ -96,7 +96,7 @@ export class TServerStorage implements ServerStorage {
|
||||
Domain,
|
||||
{
|
||||
exists: boolean
|
||||
checkPromise: Promise<boolean>
|
||||
checkPromise: Promise<boolean> | undefined
|
||||
lastCheck: number
|
||||
}
|
||||
>()
|
||||
@ -196,7 +196,17 @@ export class TServerStorage implements ServerStorage {
|
||||
const helper = adapter.helper?.()
|
||||
if (helper !== undefined) {
|
||||
let info = this.domainInfo.get(domain)
|
||||
if (info == null || Date.now() - info.lastCheck > 5 * 60 * 1000) {
|
||||
if (info == null) {
|
||||
// For first time, lets assume all is fine
|
||||
info = {
|
||||
exists: true,
|
||||
lastCheck: Date.now(),
|
||||
checkPromise: undefined
|
||||
}
|
||||
this.domainInfo.set(domain, info)
|
||||
return adapter
|
||||
}
|
||||
if (Date.now() - info.lastCheck > 5 * 60 * 1000) {
|
||||
// Re-check every 5 minutes
|
||||
const exists = helper.exists(domain)
|
||||
// We will create necessary indexes if required, and not touch collection if not required.
|
||||
@ -434,10 +444,7 @@ export class TServerStorage implements ServerStorage {
|
||||
ctx: MeasureContext,
|
||||
clazz: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T> & {
|
||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||
prefix?: string
|
||||
}
|
||||
options?: ServerFindOptions<T>
|
||||
): Promise<FindResult<T>> {
|
||||
const p = options?.prefix ?? 'client'
|
||||
const domain = options?.domain ?? this.hierarchy.getDomain(clazz)
|
||||
@ -447,7 +454,6 @@ export class TServerStorage implements ServerStorage {
|
||||
if (domain === DOMAIN_MODEL) {
|
||||
return this.modelDb.findAllSync(clazz, query, options)
|
||||
}
|
||||
const st = Date.now()
|
||||
const result = await ctx.with(
|
||||
p + '-find-all',
|
||||
{ _class: clazz },
|
||||
@ -456,9 +462,6 @@ export class TServerStorage implements ServerStorage {
|
||||
},
|
||||
{ clazz, query, options }
|
||||
)
|
||||
if (Date.now() - st > 1000) {
|
||||
ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options })
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
|
@ -48,6 +48,10 @@ import { type Readable } from 'stream'
|
||||
import { type ServiceAdaptersManager } from './service'
|
||||
import { type StorageAdapter } from './storage'
|
||||
|
||||
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
|
||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||
prefix?: string
|
||||
}
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
@ -58,10 +62,7 @@ export interface ServerStorage extends LowLevelStorage {
|
||||
ctx: MeasureContext,
|
||||
_class: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T> & {
|
||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||
prefix?: string
|
||||
}
|
||||
options?: ServerFindOptions<T>
|
||||
) => Promise<FindResult<T>>
|
||||
searchFulltext: (ctx: MeasureContext, query: SearchQuery, options: SearchOptions) => Promise<SearchResult>
|
||||
tx: (ctx: SessionOperationContext, tx: Tx) => Promise<TxResult>
|
||||
|
@ -27,6 +27,7 @@ describe('minio operations', () => {
|
||||
}
|
||||
const toolCtx = new MeasureMetricsContext('test', {})
|
||||
it('check root bucket', async () => {
|
||||
jest.setTimeout(50000)
|
||||
const minioService = new MinioService({ ...(config.storages[0] as MinioConfig), rootBucket: 'test-bucket' })
|
||||
|
||||
let existingTestBuckets = await minioService.listBuckets(toolCtx, '')
|
||||
|
@ -97,6 +97,7 @@ describe('mongo operations', () => {
|
||||
})
|
||||
|
||||
afterAll(async () => {
|
||||
mongoClient.close()
|
||||
await shutdown()
|
||||
})
|
||||
|
||||
@ -136,6 +137,8 @@ describe('mongo operations', () => {
|
||||
await txStorage.tx(mctx, t)
|
||||
}
|
||||
|
||||
await txStorage.close()
|
||||
|
||||
const conf: DbConfiguration = {
|
||||
domains: {
|
||||
[DOMAIN_TX]: 'MongoTx',
|
||||
@ -223,14 +226,19 @@ describe('mongo operations', () => {
|
||||
|
||||
it('check add', async () => {
|
||||
jest.setTimeout(500000)
|
||||
const times: number[] = []
|
||||
for (let i = 0; i < 50; i++) {
|
||||
const t = Date.now()
|
||||
await operations.createDoc(taskPlugin.class.Task, '' as Ref<Space>, {
|
||||
name: `my-task-${i}`,
|
||||
description: `${i * i}`,
|
||||
rate: 20 + i
|
||||
})
|
||||
times.push(Date.now() - t)
|
||||
}
|
||||
|
||||
console.log('createDoc times', times)
|
||||
|
||||
const r = await client.findAll<Task>(taskPlugin.class.Task, {})
|
||||
expect(r.length).toEqual(50)
|
||||
|
||||
|
@ -66,6 +66,7 @@ import {
|
||||
updateHashForDoc,
|
||||
type DbAdapter,
|
||||
type DomainHelperOperations,
|
||||
type ServerFindOptions,
|
||||
type StorageAdapter,
|
||||
type TxAdapter
|
||||
} from '@hcengineering/server-core'
|
||||
@ -75,6 +76,7 @@ import {
|
||||
type AbstractCursor,
|
||||
type AnyBulkWriteOperation,
|
||||
type Collection,
|
||||
type CreateIndexesOptions,
|
||||
type Db,
|
||||
type Document,
|
||||
type Filter,
|
||||
@ -153,9 +155,15 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
for (const value of config.indexes) {
|
||||
try {
|
||||
if (typeof value === 'string') {
|
||||
await this.collection(domain).createIndex(value, { sparse: true })
|
||||
await this.collection(domain).createIndex(value)
|
||||
} else {
|
||||
await this.collection(domain).createIndex(value.keys, { sparse: value.sparse ?? true })
|
||||
const opt: CreateIndexesOptions = {}
|
||||
if (value.filter !== undefined) {
|
||||
opt.partialFilterExpression = value.filter
|
||||
} else if (value.sparse === true) {
|
||||
opt.sparse = true
|
||||
}
|
||||
await this.collection(domain).createIndex(value.keys, opt)
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.error('failed to create index', domain, value, err)
|
||||
@ -171,7 +179,7 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
const name: string = existingIndex.name
|
||||
if (
|
||||
deletePattern.some((it) => it.test(name)) &&
|
||||
(existingIndex.sparse !== true || !keepPattern.some((it) => it.test(name)))
|
||||
(existingIndex.sparse === true || !keepPattern.some((it) => it.test(name)))
|
||||
) {
|
||||
await this.collection(domain).dropIndex(name)
|
||||
}
|
||||
@ -469,9 +477,7 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
ctx: MeasureContext,
|
||||
clazz: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T> & {
|
||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||
}
|
||||
options?: ServerFindOptions<T>
|
||||
): Promise<FindResult<T>> {
|
||||
const pipeline: any[] = []
|
||||
const match = { $match: this.translateQuery(clazz, query) }
|
||||
@ -506,10 +512,8 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
|
||||
// const domain = this.hierarchy.getDomain(clazz)
|
||||
const domain = options?.domain ?? this.hierarchy.getDomain(clazz)
|
||||
const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline, {
|
||||
checkKeys: false,
|
||||
enableUtf8Validation: false
|
||||
})
|
||||
|
||||
const cursor = this.collection(domain).aggregate<WithLookup<T>>(pipeline)
|
||||
let result: WithLookup<T>[] = []
|
||||
let total = options?.total === true ? 0 : -1
|
||||
try {
|
||||
@ -662,12 +666,12 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
ctx: MeasureContext,
|
||||
_class: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T> & {
|
||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||
}
|
||||
options?: ServerFindOptions<T>
|
||||
): Promise<FindResult<T>> {
|
||||
const stTime = Date.now()
|
||||
return await this.findRateLimit.exec(async () => {
|
||||
return await this.collectOps(
|
||||
const st = Date.now()
|
||||
const result = await this.collectOps(
|
||||
this.globalCtx,
|
||||
this.hierarchy.findDomain(_class),
|
||||
'find',
|
||||
@ -737,6 +741,17 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
options
|
||||
}
|
||||
)
|
||||
const edTime = Date.now()
|
||||
if (edTime - st > 1000 || st - stTime > 1000) {
|
||||
ctx.error('FindAll', {
|
||||
time: edTime - st,
|
||||
_class,
|
||||
query: cutObjectArray(query),
|
||||
options,
|
||||
queueTime: st - stTime
|
||||
})
|
||||
}
|
||||
return result
|
||||
})
|
||||
}
|
||||
|
||||
@ -1027,97 +1042,101 @@ class MongoAdapter extends MongoAdapterBase {
|
||||
return undefined
|
||||
})
|
||||
|
||||
for (const [domain, txs] of byDomain) {
|
||||
if (domain === undefined) {
|
||||
continue
|
||||
}
|
||||
const domainBulk: OperationBulk = {
|
||||
add: [],
|
||||
update: new Map(),
|
||||
bulkOperations: [],
|
||||
findUpdate: new Set(),
|
||||
raw: []
|
||||
}
|
||||
for (const t of txs) {
|
||||
this.updateBulk(domainBulk, t)
|
||||
}
|
||||
if (
|
||||
domainBulk.add.length === 0 &&
|
||||
domainBulk.update.size === 0 &&
|
||||
domainBulk.bulkOperations.length === 0 &&
|
||||
domainBulk.findUpdate.size === 0 &&
|
||||
domainBulk.raw.length === 0
|
||||
) {
|
||||
continue
|
||||
}
|
||||
await this.rateLimit.exec(async () => {
|
||||
await this.collectOps(
|
||||
this.globalCtx,
|
||||
domain,
|
||||
'tx',
|
||||
async (ctx) => {
|
||||
const coll = this.db.collection<Doc>(domain)
|
||||
|
||||
// Minir optimizations
|
||||
// Add Remove optimization
|
||||
|
||||
if (domainBulk.add.length > 0) {
|
||||
await ctx.with('insertMany', {}, async () => {
|
||||
await coll.insertMany(domainBulk.add, { ordered: false })
|
||||
})
|
||||
}
|
||||
if (domainBulk.update.size > 0) {
|
||||
// Extract similar update to update many if possible
|
||||
// TODO:
|
||||
await ctx.with('updateMany-bulk', {}, async () => {
|
||||
await coll.bulkWrite(
|
||||
Array.from(domainBulk.update.entries()).map((it) => ({
|
||||
updateOne: {
|
||||
filter: { _id: it[0] },
|
||||
update: {
|
||||
$set: it[1]
|
||||
}
|
||||
}
|
||||
})),
|
||||
{
|
||||
ordered: false
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
if (domainBulk.bulkOperations.length > 0) {
|
||||
await ctx.with('bulkWrite', {}, async () => {
|
||||
await coll.bulkWrite(domainBulk.bulkOperations, {
|
||||
ordered: false
|
||||
})
|
||||
})
|
||||
}
|
||||
if (domainBulk.findUpdate.size > 0) {
|
||||
await ctx.with('find-result', {}, async () => {
|
||||
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
|
||||
result.push(...docs)
|
||||
})
|
||||
}
|
||||
|
||||
if (domainBulk.raw.length > 0) {
|
||||
await ctx.with('raw', {}, async () => {
|
||||
for (const r of domainBulk.raw) {
|
||||
result.push({ object: await r() })
|
||||
}
|
||||
})
|
||||
}
|
||||
},
|
||||
{
|
||||
await this.rateLimit.exec(async () => {
|
||||
const domains: Promise<void>[] = []
|
||||
for (const [domain, txs] of byDomain) {
|
||||
if (domain === undefined) {
|
||||
continue
|
||||
}
|
||||
const domainBulk: OperationBulk = {
|
||||
add: [],
|
||||
update: new Map(),
|
||||
bulkOperations: [],
|
||||
findUpdate: new Set(),
|
||||
raw: []
|
||||
}
|
||||
for (const t of txs) {
|
||||
this.updateBulk(domainBulk, t)
|
||||
}
|
||||
if (
|
||||
domainBulk.add.length === 0 &&
|
||||
domainBulk.update.size === 0 &&
|
||||
domainBulk.bulkOperations.length === 0 &&
|
||||
domainBulk.findUpdate.size === 0 &&
|
||||
domainBulk.raw.length === 0
|
||||
) {
|
||||
continue
|
||||
}
|
||||
domains.push(
|
||||
this.collectOps(
|
||||
this.globalCtx,
|
||||
domain,
|
||||
add: domainBulk.add.length,
|
||||
update: domainBulk.update.size,
|
||||
bulk: domainBulk.bulkOperations.length,
|
||||
find: domainBulk.findUpdate.size,
|
||||
raw: domainBulk.raw.length
|
||||
}
|
||||
'tx',
|
||||
async (ctx) => {
|
||||
const coll = this.db.collection<Doc>(domain)
|
||||
|
||||
// Minir optimizations
|
||||
// Add Remove optimization
|
||||
|
||||
if (domainBulk.add.length > 0) {
|
||||
await ctx.with('insertMany', {}, async () => {
|
||||
await coll.insertMany(domainBulk.add, { ordered: false })
|
||||
})
|
||||
}
|
||||
if (domainBulk.update.size > 0) {
|
||||
// Extract similar update to update many if possible
|
||||
// TODO:
|
||||
await ctx.with('updateMany-bulk', {}, async () => {
|
||||
await coll.bulkWrite(
|
||||
Array.from(domainBulk.update.entries()).map((it) => ({
|
||||
updateOne: {
|
||||
filter: { _id: it[0] },
|
||||
update: {
|
||||
$set: it[1]
|
||||
}
|
||||
}
|
||||
})),
|
||||
{
|
||||
ordered: false
|
||||
}
|
||||
)
|
||||
})
|
||||
}
|
||||
if (domainBulk.bulkOperations.length > 0) {
|
||||
await ctx.with('bulkWrite', {}, async () => {
|
||||
await coll.bulkWrite(domainBulk.bulkOperations, {
|
||||
ordered: false
|
||||
})
|
||||
})
|
||||
}
|
||||
if (domainBulk.findUpdate.size > 0) {
|
||||
await ctx.with('find-result', {}, async () => {
|
||||
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
|
||||
result.push(...docs)
|
||||
})
|
||||
}
|
||||
|
||||
if (domainBulk.raw.length > 0) {
|
||||
await ctx.with('raw', {}, async () => {
|
||||
for (const r of domainBulk.raw) {
|
||||
result.push({ object: await r() })
|
||||
}
|
||||
})
|
||||
}
|
||||
},
|
||||
{
|
||||
domain,
|
||||
add: domainBulk.add.length,
|
||||
update: domainBulk.update.size,
|
||||
bulk: domainBulk.bulkOperations.length,
|
||||
find: domainBulk.findUpdate.size,
|
||||
raw: domainBulk.raw.length
|
||||
}
|
||||
)
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
await Promise.all(domains)
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
|
@ -131,7 +131,6 @@ export function getMongoClient (uri: string, options?: MongoClientOptions): Mong
|
||||
MongoClient.connect(uri, {
|
||||
appName: 'transactor',
|
||||
...options,
|
||||
enableUtf8Validation: false,
|
||||
...extraOptions
|
||||
}),
|
||||
() => {
|
||||
@ -208,11 +207,10 @@ export class DBCollectionHelper implements DomainHelperOperations {
|
||||
if (value.filter !== undefined) {
|
||||
await this.collection(domain).createIndex(value.keys, {
|
||||
...options,
|
||||
sparse: false,
|
||||
partialFilterExpression: value.filter
|
||||
})
|
||||
} else {
|
||||
await this.collection(domain).createIndex(value.keys, { ...options, sparse: value.sparse ?? true })
|
||||
await this.collection(domain).createIndex(value.keys, { ...options, sparse: value.sparse ?? false })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ describe('s3 operations', () => {
|
||||
}
|
||||
const toolCtx = new MeasureMetricsContext('test', {})
|
||||
it('check root bucket', async () => {
|
||||
jest.setTimeout(50000)
|
||||
const minioService = new S3Service({ ...(config.storages[0] as S3Config), rootBucket: 'haiodo-test-bucket' })
|
||||
|
||||
let existingTestBuckets = await minioService.listBuckets(toolCtx, '')
|
||||
|
@ -249,7 +249,8 @@ export async function upgradeModel (
|
||||
migrateOperations: [string, MigrateOperation][],
|
||||
logger: ModelLogger = consoleModelLogger,
|
||||
skipTxUpdate: boolean = false,
|
||||
progress: (value: number) => Promise<void>
|
||||
progress: (value: number) => Promise<void>,
|
||||
forceIndexes: boolean = false
|
||||
): Promise<Tx[]> {
|
||||
const { mongodbUri, txes } = prepareTools(rawTxes)
|
||||
|
||||
@ -347,6 +348,25 @@ export async function upgradeModel (
|
||||
workspaceId
|
||||
)
|
||||
|
||||
const upgradeIndexes = async (): Promise<void> => {
|
||||
ctx.info('Migrate to sparse indexes')
|
||||
// Create update indexes
|
||||
await createUpdateIndexes(
|
||||
ctx,
|
||||
hierarchy,
|
||||
modelDb,
|
||||
db,
|
||||
logger,
|
||||
async (value) => {
|
||||
await progress(90 + (Math.min(value, 100) / 100) * 10)
|
||||
},
|
||||
workspaceId
|
||||
)
|
||||
}
|
||||
if (forceIndexes) {
|
||||
await upgradeIndexes()
|
||||
}
|
||||
|
||||
await ctx.with('migrate', {}, async (ctx) => {
|
||||
let i = 0
|
||||
for (const op of migrateOperations) {
|
||||
@ -366,22 +386,8 @@ export async function upgradeModel (
|
||||
|
||||
await tryMigrate(migrateClient, coreId, [
|
||||
{
|
||||
state: '#sparse',
|
||||
func: async () => {
|
||||
ctx.info('Migrate to sparse indexes')
|
||||
// Create update indexes
|
||||
await createUpdateIndexes(
|
||||
ctx,
|
||||
hierarchy,
|
||||
modelDb,
|
||||
db,
|
||||
logger,
|
||||
async (value) => {
|
||||
await progress(90 + (Math.min(value, 100) / 100) * 10)
|
||||
},
|
||||
workspaceId
|
||||
)
|
||||
}
|
||||
state: 'sparse',
|
||||
func: upgradeIndexes
|
||||
}
|
||||
])
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user