Fix mongo indexes (#6122)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-07-24 17:27:09 +07:00 committed by GitHub
parent cfe99b6bcf
commit c3a5b88b6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 797 additions and 440 deletions

View File

@ -46,7 +46,7 @@ import {
createStorageBackupStorage,
restore
} from '@hcengineering/server-backup'
import serverClientPlugin, { BlobClient, getTransactorEndpoint } from '@hcengineering/server-client'
import serverClientPlugin, { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
@ -244,13 +244,29 @@ export function devTool (
const { mongodbUri } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
console.log(`assigning user ${email} to ${workspace}...`)
const workspaceInfo = await getWorkspaceById(db, productId, workspace)
if (workspaceInfo === null) {
throw new Error(`workspace ${workspace} not found`)
}
console.log('assigning to workspace', workspaceInfo)
try {
await assignWorkspace(toolCtx, db, productId, null, email, workspaceInfo.workspace, AccountRole.User)
const workspaceInfo = await getWorkspaceById(db, productId, workspace)
if (workspaceInfo === null) {
throw new Error(`workspace ${workspace} not found`)
}
const token = generateToken(systemAccountEmail, { name: workspaceInfo.workspace, productId })
const endpoint = await getTransactorEndpoint(token, 'external')
console.log('assigning to workspace', workspaceInfo, endpoint)
const client = await createClient(endpoint, token)
console.log('assigning to workspace connected', workspaceInfo, endpoint)
await assignWorkspace(
toolCtx,
db,
productId,
null,
email,
workspaceInfo.workspace,
AccountRole.User,
undefined,
undefined,
client
)
await client.close()
} catch (err: any) {
console.error(err)
}
@ -328,7 +344,16 @@ export function devTool (
const { mongodbUri } = prepareTools()
console.log(`set user ${email} role for ${workspace}...`)
await withDatabase(mongodbUri, async (db) => {
await setRole(toolCtx, db, email, workspace, productId, role)
const workspaceInfo = await getWorkspaceById(db, productId, workspace)
if (workspaceInfo === null) {
throw new Error(`workspace ${workspace} not found`)
}
console.log('assigning to workspace', workspaceInfo)
const token = generateToken(systemAccountEmail, { name: workspaceInfo.workspace, productId })
const endpoint = await getTransactorEndpoint(token, 'external')
const client = await createClient(endpoint, token)
await setRole(toolCtx, db, email, workspace, productId, role, client)
await client.close()
})
})

View File

@ -362,10 +362,7 @@ export function createModel (builder: Builder): void {
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_ACTIVITY,
indexes: [
{ attachedTo: 1, createdOn: 1 },
{ attachedTo: 1, createdOn: -1 }
],
indexes: [{ keys: { attachedTo: 1, createdOn: 1 } }, { keys: { attachedTo: 1, createdOn: -1 } }],
disabled: [
{ modifiedOn: 1 },
{ createdOn: -1 },

View File

@ -36,7 +36,7 @@ import {
type DomainIndexConfiguration,
type Enum,
type EnumOf,
type FieldIndex,
type FieldIndexConfig,
type FullTextSearchContext,
type IndexStageState,
type IndexingConfiguration,
@ -134,7 +134,7 @@ export class TAttachedDoc extends TDoc implements AttachedDoc {
export class TBlob extends TDoc implements Blob {
@Prop(TypeString(), core.string.Blob)
@ReadOnly()
@Index(IndexKind.Indexed)
// @Index(IndexKind.Indexed)
provider!: string
@Prop(TypeString(), core.string.BlobContentType)
@ -340,7 +340,6 @@ export class TDocIndexState extends TDoc implements DocIndexState {
stages!: Record<string, boolean | string>
@Prop(TypeString(), getEmbeddedLabel('Generation'))
@Index(IndexKind.Indexed)
@Hidden()
generationId?: string
}
@ -371,7 +370,7 @@ export class TConfiguration extends TDoc implements Configuration {
@MMixin(core.mixin.IndexConfiguration, core.class.Class)
export class TIndexConfiguration<T extends Doc = Doc> extends TClass implements IndexingConfiguration<T> {
indexes!: FieldIndex<T>[]
indexes!: (string | FieldIndexConfig<T>)[]
searchDisabled!: boolean
}

View File

@ -194,26 +194,32 @@ export function createModel (builder: Builder): void {
core.class.Class,
core.mixin.IndexConfiguration,
{
indexes: [
'tx.objectId',
'tx.operations.attachedTo',
'space',
{
objectSpace: 1,
_id: 1,
modifiedOn: 1
},
{
objectSpace: 1,
modifiedBy: 1,
objectClass: 1
}
]
indexes: ['tx.objectId', 'tx.operations.attachedTo']
}
)
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_TX,
disabled: [{ space: 1 }, { objectClass: 1 }, { createdBy: 1 }, { createdBy: -1 }, { createdOn: -1 }]
disabled: [
{ space: 1 },
{ objectClass: 1 },
{ createdBy: 1 },
{ createdBy: -1 },
{ createdOn: -1 },
{ modifiedBy: 1 },
{ objectSpace: 1 }
],
indexes: [
{
keys: {
objectSpace: 1,
_id: 1,
modifiedOn: 1
},
filter: {
objectSpace: core.space.Model
}
}
]
})
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
@ -299,20 +305,13 @@ export function createModel (builder: Builder): void {
{
indexes: [
{
_class: 1,
stages: 1,
_id: 1,
modifiedOn: 1
},
{
_class: 1,
_id: 1,
modifiedOn: 1
},
{
_class: 1,
_id: 1,
objectClass: 1
keys: {
_class: 1,
stages: 1,
_id: 1,
modifiedOn: 1
},
sparse: true
}
]
}

View File

@ -46,7 +46,7 @@ import { TDoc } from './core'
@Model(core.class.Tx, core.class.Doc, DOMAIN_TX)
export class TTx extends TDoc implements Tx {
@Prop(TypeRef(core.class.Space), core.string.Space)
@Index(IndexKind.Indexed)
// @Index(IndexKind.Indexed)
@Hidden()
objectSpace!: Ref<Space>
}
@ -62,7 +62,7 @@ export class TTxCUD<T extends Doc> extends TTx implements TxCUD<T> {
objectId!: Ref<T>
@Prop(TypeRef(core.class.Class), core.string.ClassLabel)
@Index(IndexKind.Indexed)
// @Index(IndexKind.Indexed)
@Hidden()
objectClass!: Ref<Class<T>>
}

View File

@ -619,9 +619,10 @@ export function createModel (builder: Builder): void {
},
presenter: notification.component.ReactionNotificationPresenter
})
builder.createDoc(core.class.DomainIndexConfiguration, core.space.Model, {
domain: DOMAIN_NOTIFICATION,
indexes: [{ user: 1, archived: 1 }],
indexes: [{ keys: { user: 1, archived: 1 } }],
disabled: [{ modifiedOn: 1 }, { modifiedBy: 1 }, { createdBy: 1 }, { isViewed: 1 }, { hidden: 1 }]
})

View File

@ -15,6 +15,7 @@
//
import type { Asset, IntlString, Plugin } from '@hcengineering/platform'
import type { DocumentQuery } from './storage'
/**
* @public
@ -122,6 +123,7 @@ export enum IndexKind {
* Also mean to include into Elastic search.
*/
Indexed,
// Same as indexed but for descending
IndexedDsc
}
@ -623,6 +625,12 @@ export type FieldIndex<T extends Doc> = {
[P in keyof T]?: IndexOrder
} & Record<string, IndexOrder>
export interface FieldIndexConfig<T extends Doc> {
sparse?: boolean
filter?: Omit<DocumentQuery<T>, '$search'>
keys: FieldIndex<T> | string
}
/**
* @public
*
@ -630,7 +638,7 @@ export type FieldIndex<T extends Doc> = {
*/
export interface IndexingConfiguration<T extends Doc> extends Class<Doc> {
// Define a list of extra index definitions.
indexes: (FieldIndex<T> | string)[]
indexes: (string | FieldIndexConfig<T>)[]
searchDisabled?: boolean
}
@ -643,7 +651,7 @@ export interface DomainIndexConfiguration extends Doc {
disabled?: (FieldIndex<Doc> | string)[]
// Additional indexes we could like to enabled
indexes?: (FieldIndex<Doc> | string)[]
indexes?: (FieldIndexConfig<Doc> | string)[]
skip?: string[]
}

View File

@ -11,12 +11,12 @@ export class MeasureMetricsContext implements MeasureContext {
private readonly params: ParamsType
logger: MeasureLogger
metrics: Metrics
private readonly done: (value?: number) => void
private readonly done: (value?: number, override?: boolean) => void
constructor (
name: string,
params: ParamsType,
fullParams: FullParamsType = {},
fullParams: FullParamsType | (() => FullParamsType) = {},
metrics: Metrics = newMetrics(),
logger?: MeasureLogger,
readonly parent?: MeasureContext,
@ -25,8 +25,21 @@ export class MeasureMetricsContext implements MeasureContext {
this.name = name
this.params = params
this.metrics = metrics
this.metrics.namedParams = this.metrics.namedParams ?? {}
for (const [k, v] of Object.entries(params)) {
if (this.metrics.namedParams[k] !== v) {
this.metrics.namedParams[k] = v
} else {
this.metrics.namedParams[k] = '*'
}
}
this.done = measure(metrics, params, fullParams, (spend) => {
this.logger.logOperation(this.name, spend, { ...params, ...fullParams, ...(this.logParams ?? {}) })
this.logger.logOperation(this.name, spend, {
...params,
...(typeof fullParams === 'function' ? fullParams() : fullParams),
...fullParams,
...(this.logParams ?? {})
})
})
const errorPrinter = ({ message, stack, ...rest }: Error): object => ({
@ -63,12 +76,17 @@ export class MeasureMetricsContext implements MeasureContext {
}
}
measure (name: string, value: number): void {
measure (name: string, value: number, override?: boolean): void {
const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger, this)
c.done(value)
c.done(value, override)
}
newChild (name: string, params: ParamsType, fullParams?: FullParamsType, logger?: MeasureLogger): MeasureContext {
newChild (
name: string,
params: ParamsType,
fullParams?: FullParamsType | (() => FullParamsType),
logger?: MeasureLogger
): MeasureContext {
return new MeasureMetricsContext(
name,
params,
@ -84,7 +102,7 @@ export class MeasureMetricsContext implements MeasureContext {
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T | Promise<T>,
fullParams?: ParamsType
fullParams?: ParamsType | (() => FullParamsType)
): Promise<T> {
const c = this.newChild(name, params, fullParams, this.logger)
try {

View File

@ -18,7 +18,8 @@ export function newMetrics (): Metrics {
operations: 0,
value: 0,
measurements: {},
params: {}
params: {},
namedParams: {}
}
}
@ -27,18 +28,32 @@ function getUpdatedTopResult (
time: number,
params: FullParamsType
): Metrics['topResult'] {
if (current === undefined || current.length < 3 || current.some((it) => it.value < time)) {
const result = [
...(current ?? []),
{
value: time,
params: cutObjectArray(params)
}
]
result.sort((a, b) => b.value - a.value)
return result.slice(0, 3)
if (time === 0) {
return current
}
const result: Metrics['topResult'] = current ?? []
const newValue = {
value: time,
params: cutObjectArray(params)
}
if (result.length > 6) {
if (result[0].value < newValue.value) {
result[0] = newValue
return result
}
if (result[result.length - 1].value > newValue.value) {
result[result.length - 1] = newValue
return result
}
// Shift the middle
return [result[0], newValue, ...result.slice(1, 3), result[5]]
} else {
result.push(newValue)
return result
}
return current
}
/**
@ -48,12 +63,14 @@ function getUpdatedTopResult (
export function measure (
metrics: Metrics,
params: ParamsType,
fullParams: FullParamsType = {},
fullParams: FullParamsType | (() => FullParamsType) = {},
endOp?: (spend: number) => void
): () => void {
const st = Date.now()
return (value?: number) => {
return (value?: number, override?: boolean) => {
const ed = Date.now()
const fParams = typeof fullParams === 'function' ? fullParams() : fullParams
// Update params if required
for (const [k, v] of Object.entries(params)) {
let params = metrics.params[k]
@ -70,16 +87,24 @@ export function measure (
}
params[vKey] = param
}
param.value += value ?? ed - st
param.operations++
if (override === true) {
metrics.operations = value ?? ed - st
} else {
param.value += value ?? ed - st
param.operations++
}
param.topResult = getUpdatedTopResult(param.topResult, ed - st, fullParams)
param.topResult = getUpdatedTopResult(param.topResult, ed - st, fParams)
}
// Update leaf data
metrics.value += value ?? ed - st
metrics.operations++
if (override === true) {
metrics.operations = value ?? ed - st
} else {
metrics.value += value ?? ed - st
metrics.operations++
}
metrics.topResult = getUpdatedTopResult(metrics.topResult, ed - st, fullParams)
metrics.topResult = getUpdatedTopResult(metrics.topResult, ed - st, fParams)
endOp?.(ed - st)
}
}
@ -136,7 +161,8 @@ export function metricsAggregate (m: Metrics, limit: number = -1): Metrics {
measurements: ms,
params: m.params,
value: sumVal,
topResult: m.topResult
topResult: m.topResult,
namedParams: m.namedParams
}
}

View File

@ -29,6 +29,7 @@ export interface MetricsData {
* @public
*/
export interface Metrics extends MetricsData {
namedParams: ParamsType
params: Record<string, Record<string, MetricsData>>
measurements: Record<string, Metrics>
}
@ -59,7 +60,7 @@ export interface MeasureContext {
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T | Promise<T>,
fullParams?: FullParamsType
fullParams?: FullParamsType | (() => FullParamsType)
) => Promise<T>
withLog: <T>(
@ -73,7 +74,7 @@ export interface MeasureContext {
parent?: MeasureContext
measure: (name: string, value: number) => void
measure: (name: string, value: number, override?: boolean) => void
// Capture error
error: (message: string, obj?: Record<string, any>) => void

View File

@ -365,11 +365,15 @@ export class RateLimiter {
this.rate = rate
}
notify: (() => void)[] = []
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
const processingId = this.idCounter++
while (this.processingQueue.size > this.rate) {
await Promise.race(this.processingQueue.values())
while (this.processingQueue.size >= this.rate) {
await new Promise<void>((resolve) => {
this.notify.push(resolve)
})
}
try {
const p = op(args)
@ -377,6 +381,10 @@ export class RateLimiter {
return await p
} finally {
this.processingQueue.delete(processingId)
const n = this.notify.shift()
if (n !== undefined) {
n()
}
}
}
@ -384,10 +392,7 @@ export class RateLimiter {
if (this.processingQueue.size < this.rate) {
void this.exec(op, args)
} else {
while (this.processingQueue.size > this.rate) {
await Promise.race(this.processingQueue.values())
}
void this.exec(op, args)
await this.exec(op, args)
}
}

View File

@ -56,11 +56,12 @@
ops = 0
}, 1000)
const rate = new RateLimiter(commandsToSendParallel)
const client = getClient()
const doOp = async () => {
const st = Date.now()
active++
await getClient().createDoc(core.class.BenchmarkDoc, core.space.Configuration, {
await client.createDoc(core.class.BenchmarkDoc, core.space.Configuration, {
source: genData(dataSize),
request: {
documents: 1,

View File

@ -1,7 +1,9 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import { Expandable } from '@hcengineering/ui'
import { getEmbeddedLabel } from '@hcengineering/platform'
import { Button, Expandable, showPopup } from '@hcengineering/ui'
import { FixedColumn } from '@hcengineering/view-resources'
import Params from './Params.svelte'
export let metrics: Metrics
export let level = 0
@ -28,8 +30,18 @@
contentColor
>
<svelte:fragment slot="title">
{@const params = JSON.stringify(metrics.namedParams ?? {})}
<div class="flex-row-center flex-between flex-grow ml-2">
{name}
{#if params !== '{}'}
<Button
label={getEmbeddedLabel('*')}
on:click={() => {
showPopup(Params, { params: metrics.namedParams ?? {} })
}}
kind={'ghost'}
/>
{/if}
</div>
</svelte:fragment>
<svelte:fragment slot="tools">

View File

@ -0,0 +1,80 @@
<!--
// Copyright © 2020 Anticrm Platform Contributors.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<script lang="ts">
import presentation from '@hcengineering/presentation'
import { Button, FocusHandler, createFocusManager } from '@hcengineering/ui'
import { createEventDispatcher } from 'svelte'
export let params: Record<string, any>
const dispatch = createEventDispatcher()
const manager = createFocusManager()
</script>
<FocusHandler {manager} />
<div class="msgbox-container">
<div class="overflow-label fs-title mb-4"></div>
<div class="message no-word-wrap" style:overflow={'auto'}>
{#each Object.entries(params) as kv}
<div class="flex-row-center">
{kv[0]}: {typeof kv[1] === 'object' ? JSON.stringify(kv[1]) : kv[1]}
</div>
{/each}
</div>
<div class="footer">
<Button
focus
focusIndex={1}
label={presentation.string.Ok}
size={'large'}
kind={'primary'}
on:click={() => {
dispatch('close', true)
}}
/>
</div>
</div>
<style lang="scss">
.msgbox-container {
display: flex;
flex-direction: column;
padding: 2rem 1.75rem 1.75rem;
width: 30rem;
max-width: 40rem;
background: var(--theme-popup-color);
border-radius: 0.5rem;
user-select: none;
box-shadow: var(--theme-popup-shadow);
.message {
margin-bottom: 1.75rem;
color: var(--theme-content-color);
}
.footer {
flex-shrink: 0;
display: grid;
grid-auto-flow: column;
direction: rtl;
justify-content: flex-start;
align-items: center;
column-gap: 0.5rem;
// mask-image: linear-gradient(90deg, rgba(0, 0, 0, 0) 1.25rem, rgba(0, 0, 0, 1) 2.5rem);
// overflow: hidden;
}
}
</style>

View File

@ -3,7 +3,7 @@ FROM node:20
WORKDIR /usr/src/app
COPY bundle/bundle.js ./
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm
RUN apt-get update
RUN apt-get install libjemalloc2

View File

@ -3,7 +3,7 @@ FROM node:20
WORKDIR /usr/src/app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm
COPY bundle/bundle.js ./
EXPOSE 3000

View File

@ -2,7 +2,7 @@
FROM node:20
WORKDIR /usr/src/app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm
RUN apt-get update
RUN apt-get install libjemalloc2

View File

@ -3,7 +3,7 @@ FROM node:20
ENV NODE_ENV production
WORKDIR /app
RUN npm install --ignore-scripts=false --verbose sharp@v0.32.6 bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm
RUN npm install --ignore-scripts=false --verbose sharp@v0.32.6 bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm
RUN apt-get update
RUN apt-get install libjemalloc2

View File

@ -3,7 +3,7 @@ FROM node:20
ENV NODE_ENV production
WORKDIR /app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm
RUN npm install --ignore-scripts=false --verbose uNetworking/uWebSockets.js#v20.43.0
RUN apt-get update

View File

@ -298,12 +298,19 @@ async function doTimeReportUpdate (cud: TxCUD<TimeSpendReport>, tx: Tx, control:
switch (cud._class) {
case core.class.TxCreateDoc: {
const ccud = cud as TxCreateDoc<TimeSpendReport>
const res = [
control.txFactory.createTxUpdateDoc<Issue>(parentTx.objectClass, parentTx.objectSpace, parentTx.objectId, {
$inc: { reportedTime: ccud.attributes.value }
})
]
const [currentIssue] = await control.findAll(tracker.class.Issue, { _id: parentTx.objectId }, { limit: 1 })
const res = [
control.txFactory.createTxUpdateDoc<Issue>(
parentTx.objectClass,
parentTx.objectSpace,
parentTx.objectId,
{
$inc: { reportedTime: ccud.attributes.value }
},
false,
currentIssue.modifiedOn
)
]
currentIssue.reportedTime += ccud.attributes.value
currentIssue.remainingTime = Math.max(0, currentIssue.estimation - currentIssue.reportedTime)
updateIssueParentEstimations(currentIssue, res, control, currentIssue.parents, currentIssue.parents)
@ -325,9 +332,16 @@ async function doTimeReportUpdate (cud: TxCUD<TimeSpendReport>, tx: Tx, control:
const [currentIssue] = await control.findAll(tracker.class.Issue, { _id: parentTx.objectId }, { limit: 1 })
if (doc !== undefined) {
res.push(
control.txFactory.createTxUpdateDoc<Issue>(parentTx.objectClass, parentTx.objectSpace, parentTx.objectId, {
$inc: { reportedTime: upd.operations.value - doc.value }
})
control.txFactory.createTxUpdateDoc<Issue>(
parentTx.objectClass,
parentTx.objectSpace,
parentTx.objectId,
{
$inc: { reportedTime: upd.operations.value - doc.value }
},
false,
currentIssue.modifiedOn
)
)
currentIssue.reportedTime -= doc.value
currentIssue.reportedTime += upd.operations.value
@ -350,13 +364,19 @@ async function doTimeReportUpdate (cud: TxCUD<TimeSpendReport>, tx: Tx, control:
).map(TxProcessor.extractTx)
const doc: TimeSpendReport | undefined = TxProcessor.buildDoc2Doc(logTxes)
if (doc !== undefined) {
const res = [
control.txFactory.createTxUpdateDoc<Issue>(parentTx.objectClass, parentTx.objectSpace, parentTx.objectId, {
$inc: { reportedTime: -1 * doc.value }
})
]
const [currentIssue] = await control.findAll(tracker.class.Issue, { _id: parentTx.objectId }, { limit: 1 })
const res = [
control.txFactory.createTxUpdateDoc<Issue>(
parentTx.objectClass,
parentTx.objectSpace,
parentTx.objectId,
{
$inc: { reportedTime: -1 * doc.value }
},
false,
currentIssue.modifiedOn
)
]
currentIssue.reportedTime -= doc.value
currentIssue.remainingTime = Math.max(0, currentIssue.estimation - currentIssue.reportedTime)
updateIssueParentEstimations(currentIssue, res, control, currentIssue.parents, currentIssue.parents)

View File

@ -19,7 +19,7 @@ import {
type DocumentQuery,
type DocumentUpdate,
type Domain,
type FieldIndex,
type FieldIndexConfig,
type FindOptions,
type FindResult,
type Hierarchy,
@ -37,7 +37,7 @@ import { type StorageAdapter } from './storage'
export interface DomainHelperOperations {
create: (domain: Domain) => Promise<void>
exists: (domain: Domain) => boolean
createIndex: (domain: Domain, value: string | FieldIndex<Doc>, options?: { name: string }) => Promise<void>
createIndex: (domain: Domain, value: string | FieldIndexConfig<Doc>, options?: { name: string }) => Promise<void>
dropIndex: (domain: Domain, name: string) => Promise<void>
listIndexes: (domain: Domain) => Promise<{ name: string }[]>
hasDocuments: (domain: Domain, count: number) => Promise<boolean>
@ -94,7 +94,7 @@ export interface DbAdapter {
helper?: () => DomainHelperOperations
createIndexes: (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>) => Promise<void>
removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise<void>
removeOldIndex: (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]) => Promise<void>
close: () => Promise<void>
findAll: <T extends Doc>(

View File

@ -335,25 +335,35 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (!this.indexesCreated) {
this.indexesCreated = true
// We need to be sure we have individual indexes per stage.
const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/]
const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/, /emb-v.*/]
const deletePattern: RegExp[] = []
const keepPattern: RegExp[] = []
for (const st of this.stages) {
if (this.cancelling) {
return
}
const regexp = oldStagesRegex.find((r) => r.test(st.stageId))
if (regexp !== undefined) {
await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, regexp, new RegExp(st.stageId))
deletePattern.push(regexp)
keepPattern.push(new RegExp(st.stageId))
}
}
if (deletePattern.length > 0) {
await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, deletePattern, keepPattern)
}
for (const st of this.stages) {
if (this.cancelling) {
return
}
await this.storage.createIndexes(DOMAIN_DOC_INDEX_STATE, {
indexes: [
{
['stages.' + st.stageId]: 1
},
{
_class: 1,
_id: 1,
['stages.' + st.stageId]: 1,
removed: 1
keys: {
['stages.' + st.stageId]: 1
},
sparse: true
}
]
})
@ -459,23 +469,21 @@ export class FullTextIndexPipeline implements FullTextPipeline {
.filter((it) => it[1] > 3)
.map((it) => it[0])
const q: DocumentQuery<DocIndexState> = {
[`stages.${st.stageId}`]: { $ne: st.stageValue },
removed: false
}
if (toSkip.length > 0) {
q._id = { $nin: toSkip }
}
let result = await ctx.with(
'get-to-index',
{},
async (ctx) =>
await this.storage.findAll(
ctx,
core.class.DocIndexState,
{
[`stages.${st.stageId}`]: { $ne: st.stageValue },
_id: { $nin: toSkip },
removed: false
},
{
sort: { modifiedOn: SortingOrder.Descending },
limit: globalIndexer.processingSize
}
)
await this.storage.findAll(ctx, core.class.DocIndexState, q, {
sort: { modifiedOn: SortingOrder.Descending },
limit: globalIndexer.processingSize
})
)
const toRemove: DocIndexState[] = []
// Check and remove missing class documents.

View File

@ -50,7 +50,7 @@ export class DummyDbAdapter implements DbAdapter {
async init (): Promise<void> {}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {}
async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
return []

View File

@ -3,7 +3,7 @@ import type {
Doc,
Domain,
DomainIndexConfiguration,
FieldIndex,
FieldIndexConfig,
Hierarchy,
MeasureContext,
ModelDb,
@ -14,7 +14,7 @@ import { deepEqual } from 'fast-equals'
import type { DomainHelper, DomainHelperOperations } from '../adapter'
export class DomainIndexHelperImpl implements DomainHelper {
domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
domains = new Map<Domain, Set<FieldIndexConfig<Doc>>>()
domainConfigurations: DomainIndexConfiguration[] = []
constructor (
readonly ctx: MeasureContext,
@ -33,7 +33,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
ctx.error('failed to find domain index configuration', { err })
}
this.domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
this.domains = new Map<Domain, Set<FieldIndexConfig<Doc>>>()
// Find all domains and indexed fields inside
for (const c of classes) {
try {
@ -42,14 +42,15 @@ export class DomainIndexHelperImpl implements DomainHelper {
continue
}
const attrs = hierarchy.getAllAttributes(c._id)
const domainAttrs = this.domains.get(domain) ?? new Set<string | FieldIndex<Doc>>()
const domainAttrs = this.domains.get(domain) ?? new Set<FieldIndexConfig<Doc>>()
for (const a of attrs.values()) {
if (a.index !== undefined && (a.index === IndexKind.Indexed || a.index === IndexKind.IndexedDsc)) {
if (a.index === IndexKind.Indexed) {
domainAttrs.add(a.name)
} else {
domainAttrs.add({ [a.name]: IndexOrder.Descending })
}
if (a.index !== undefined && a.index !== IndexKind.FullText) {
domainAttrs.add({
keys: {
[a.name]: a.index === IndexKind.Indexed ? IndexOrder.Ascending : IndexOrder.Descending
},
sparse: true // Default to sparse indexes
})
}
}
@ -57,7 +58,11 @@ export class DomainIndexHelperImpl implements DomainHelper {
if (hierarchy.hasMixin(c, core.mixin.IndexConfiguration)) {
const config = hierarchy.as(c, core.mixin.IndexConfiguration)
for (const attr of config.indexes) {
domainAttrs.add(attr)
if (typeof attr === 'string') {
domainAttrs.add({ keys: { [attr]: IndexOrder.Ascending }, sparse: true })
} else {
domainAttrs.add(attr)
}
}
}
@ -97,7 +102,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
// Do not need to create, since not force and no documents.
return false
}
const bb: (string | FieldIndex<Doc>)[] = []
const bb: (string | FieldIndexConfig<Doc>)[] = []
const added = new Set<string>()
try {
@ -107,18 +112,26 @@ export class DomainIndexHelperImpl implements DomainHelper {
if (has50Documents) {
for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) {
try {
const name =
typeof vv === 'string'
? `${vv}_1`
: Object.entries(vv)
.map(([key, val]) => `${key}_${val}`)
.join('_')
let name: string
if (typeof vv === 'string') {
name = `${vv}_sp_1`
} else {
let pfix = ''
if (vv.filter !== undefined) {
pfix += '_fi'
} else if (vv.sparse === true) {
pfix += '_sp'
}
name = Object.entries(vv.keys)
.map(([key, val]) => `${key + pfix}_${val}`)
.join('_')
}
// Check if index is disabled or not
const isDisabled =
cfg?.disabled?.some((it) => {
const _it = typeof it === 'string' ? { [it]: 1 } : it
const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv
const _vv = typeof vv === 'string' ? { [vv]: 1 } : vv.keys
return deepEqual(_it, _vv)
}) ?? false
if (isDisabled) {

View File

@ -75,7 +75,7 @@ class ElasticDataAdapter implements DbAdapter {
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {}
async close (): Promise<void> {
await this.client.close()

View File

@ -453,12 +453,15 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
const map = new Set<Ref<Space>>()
const field = this.getKey(domain)
while (true) {
const nin = Array.from(map.values())
const spaces = await this.storage.findAll(
ctx,
core.class.Doc,
{
[field]: { $nin: Array.from(map.values()) }
},
nin.length > 0
? {
[field]: { $nin: nin }
}
: {},
{
projection: { [field]: 1 },
limit: 1000,

View File

@ -164,7 +164,10 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
}
}
}
})
}),
{
ordered: false
}
)
})
} catch (err: any) {

View File

@ -16,12 +16,15 @@
import core, {
DOMAIN_MODEL,
DOMAIN_TX,
RateLimiter,
SortingOrder,
TxProcessor,
cutObjectArray,
escapeLikeForRegexp,
groupByArray,
isOperator,
toFindResult,
withContext,
type AttachedDoc,
type Class,
type Doc,
@ -32,6 +35,7 @@ import core, {
type EnumOf,
type FindOptions,
type FindResult,
type FullParamsType,
type Hierarchy,
type IndexingConfiguration,
type Lookup,
@ -80,8 +84,8 @@ import {
} from 'mongodb'
import { DBCollectionHelper, getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils'
function translateDoc (doc: Doc): Document {
return { ...doc, '%hash%': null }
function translateDoc (doc: Doc): Doc {
return { ...doc, '%hash%': null } as any
}
function isLookupQuery<T extends Doc> (query: DocumentQuery<T>): boolean {
@ -121,7 +125,11 @@ export interface DbAdapterOptions {
abstract class MongoAdapterBase implements DbAdapter {
_db: DBCollectionHelper
findRateLimit = new RateLimiter(parseInt(process.env.FIND_RLIMIT ?? '10'))
rateLimit = new RateLimiter(parseInt(process.env.TX_RLIMIT ?? '1'))
constructor (
readonly globalCtx: MeasureContext,
protected readonly db: Db,
protected readonly hierarchy: Hierarchy,
protected readonly modelDb: ModelDb,
@ -142,22 +150,29 @@ abstract class MongoAdapterBase implements DbAdapter {
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {
for (const vv of config.indexes) {
for (const value of config.indexes) {
try {
await this.collection(domain).createIndex(vv)
if (typeof value === 'string') {
await this.collection(domain).createIndex(value, { sparse: true })
} else {
await this.collection(domain).createIndex(value.keys, { sparse: value.sparse ?? true })
}
} catch (err: any) {
console.error('failed to create index', domain, vv, err)
console.error('failed to create index', domain, value, err)
}
}
}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {
try {
const existingIndexes = await this.collection(domain).indexes()
for (const existingIndex of existingIndexes) {
if (existingIndex.name !== undefined) {
const name: string = existingIndex.name
if (deletePattern.test(name) && !keepPattern.test(name)) {
if (
deletePattern.some((it) => it.test(name)) &&
(existingIndex.sparse !== true || !keepPattern.some((it) => it.test(name)))
) {
await this.collection(domain).dropIndex(name)
}
}
@ -604,6 +619,45 @@ abstract class MongoAdapterBase implements DbAdapter {
return false
}
findOps: number = 0
txOps: number = 0
opIndex: number = 0
async collectOps<T>(
ctx: MeasureContext,
domain: Domain | undefined,
operation: 'find' | 'tx',
op: (ctx: MeasureContext) => Promise<T>,
fullParam: FullParamsType
): Promise<T> {
const id = `${++this.opIndex}`
if (operation === 'find') {
this.findOps++
} else {
this.txOps++
}
const result = await ctx.with(
operation,
{ domain },
async (ctx) => await op(ctx),
() => ({
...fullParam,
id,
findOps: this.findOps,
txOps: this.txOps
})
)
if (operation === 'find') {
this.findOps--
} else {
this.txOps--
}
return result
}
@withContext('find-all')
async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
@ -612,58 +666,78 @@ abstract class MongoAdapterBase implements DbAdapter {
domain?: Domain // Allow to find for Doc's in specified domain only.
}
): Promise<FindResult<T>> {
if (options != null && (options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options))) {
return await ctx.with('pipeline', {}, async (ctx) => await this.findWithPipeline(ctx, _class, query, options), {
_class,
query,
options
})
}
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
const coll = this.collection(domain)
const mongoQuery = this.translateQuery(_class, query)
return await this.findRateLimit.exec(async () => {
return await this.collectOps(
this.globalCtx,
this.hierarchy.findDomain(_class),
'find',
async (ctx) => {
if (
options != null &&
(options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options))
) {
return await ctx.with(
'pipeline',
{},
async (ctx) => await this.findWithPipeline(ctx, _class, query, options),
{
_class,
query,
options
}
)
}
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
const coll = this.collection(domain)
const mongoQuery = this.translateQuery(_class, query)
let cursor = coll.find<T>(mongoQuery, {
checkKeys: false
let cursor = coll.find<T>(mongoQuery)
if (options?.projection !== undefined) {
const projection = this.calcProjection<T>(options, _class)
if (projection != null) {
cursor = cursor.project(projection)
}
}
let total: number = -1
if (options != null) {
if (options.sort !== undefined) {
const sort = this.collectSort<T>(options, _class)
if (sort !== undefined) {
cursor = cursor.sort(sort)
}
}
if (options.limit !== undefined || typeof query._id === 'string') {
if (options.total === true) {
total = await coll.countDocuments(mongoQuery)
}
cursor = cursor.limit(options.limit ?? 1)
}
}
// Error in case of timeout
try {
const res: T[] = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), {
mongoQuery,
options,
domain
})
if (options?.total === true && options?.limit === undefined) {
total = res.length
}
return toFindResult(this.stripHash(res), total)
} catch (e) {
console.error('error during executing cursor in findAll', _class, cutObjectArray(query), options, e)
throw e
}
},
{
_class,
query,
options
}
)
})
if (options?.projection !== undefined) {
const projection = this.calcProjection<T>(options, _class)
if (projection != null) {
cursor = cursor.project(projection)
}
}
let total: number = -1
if (options != null) {
if (options.sort !== undefined) {
const sort = this.collectSort<T>(options, _class)
if (sort !== undefined) {
cursor = cursor.sort(sort)
}
}
if (options.limit !== undefined || typeof query._id === 'string') {
if (options.total === true) {
total = await coll.countDocuments(mongoQuery)
}
cursor = cursor.limit(options.limit ?? 1)
}
}
// Error in case of timeout
try {
const res: T[] = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), {
mongoQuery,
options,
domain
})
if (options?.total === true && options?.limit === undefined) {
total = res.length
}
return toFindResult(this.stripHash(res), total)
} catch (e) {
console.error('error during executing cursor in findAll', _class, cutObjectArray(query), options, e)
throw e
}
}
private collectSort<T extends Doc>(
@ -875,7 +949,10 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
}
})
}),
{
ordered: false
}
)
})
} catch (err: any) {
@ -896,10 +973,15 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
interface DomainOperation {
raw: () => Promise<TxResult>
domain: Domain
bulk?: AnyBulkWriteOperation[]
interface OperationBulk {
add: Doc[]
update: Map<Ref<Doc>, Partial<Doc>>
bulkOperations: AnyBulkWriteOperation<Doc>[]
findUpdate: Set<Ref<Doc>>
raw: (() => Promise<TxResult>)[]
}
class MongoAdapter extends MongoAdapterBase {
@ -907,100 +989,138 @@ class MongoAdapter extends MongoAdapterBase {
await this._db.init()
}
getOperations (tx: Tx): DomainOperation | undefined {
updateBulk (bulk: OperationBulk, tx: Tx): void {
switch (tx._class) {
case core.class.TxCreateDoc:
return this.txCreateDoc(tx as TxCreateDoc<Doc>)
this.txCreateDoc(bulk, tx as TxCreateDoc<Doc>)
break
case core.class.TxCollectionCUD:
return this.txCollectionCUD(tx as TxCollectionCUD<Doc, AttachedDoc>)
this.txCollectionCUD(bulk, tx as TxCollectionCUD<Doc, AttachedDoc>)
break
case core.class.TxUpdateDoc:
return this.txUpdateDoc(tx as TxUpdateDoc<Doc>)
this.txUpdateDoc(bulk, tx as TxUpdateDoc<Doc>)
break
case core.class.TxRemoveDoc:
return this.txRemoveDoc(tx as TxRemoveDoc<Doc>)
this.txRemoveDoc(bulk, tx as TxRemoveDoc<Doc>)
break
case core.class.TxMixin:
return this.txMixin(tx as TxMixin<Doc, Doc>)
this.txMixin(bulk, tx as TxMixin<Doc, Doc>)
break
case core.class.TxApplyIf:
return undefined
default:
console.error('Unknown/Unsupported operation:', tx._class, tx)
break
}
console.error('Unknown/Unsupported operation:', tx._class, tx)
}
@withContext('tx')
async tx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult[]> {
const result: TxResult[] = []
const bulkOperations: DomainOperation[] = []
let lastDomain: Domain | undefined
const bulkExecute = async (): Promise<void> => {
if (lastDomain === undefined || bulkOperations.length === 0) {
return
const h = this.hierarchy
const byDomain = groupByArray(txes, (it) => {
if (TxProcessor.isExtendsCUD(it._class)) {
return h.findDomain((it as TxCUD<Doc>).objectClass)
}
const ops = bulkOperations.reduce<AnyBulkWriteOperation[]>((ops, op) => ops.concat(...(op.bulk ?? [])), [])
try {
await this.db.collection(lastDomain).bulkWrite(ops)
} catch (err: any) {
console.trace(err)
throw err
}
bulkOperations.splice(0, bulkOperations.length)
lastDomain = undefined
}
return undefined
})
if (txes.length > 1) {
for (const tx of txes) {
const dop: DomainOperation | undefined = this.getOperations(tx)
if (dop === undefined) {
continue
}
if (dop.bulk === undefined) {
// Execute previous bulk and capture result.
await ctx.with(
'bulkExecute',
{},
async () => {
await bulkExecute()
},
{ txes: cutObjectArray(tx) }
)
try {
result.push(await dop.raw())
} catch (err: any) {
console.error(err)
for (const [domain, txs] of byDomain) {
if (domain === undefined) {
continue
}
const domainBulk: OperationBulk = {
add: [],
update: new Map(),
bulkOperations: [],
findUpdate: new Set(),
raw: []
}
for (const t of txs) {
this.updateBulk(domainBulk, t)
}
if (
domainBulk.add.length === 0 &&
domainBulk.update.size === 0 &&
domainBulk.bulkOperations.length === 0 &&
domainBulk.findUpdate.size === 0 &&
domainBulk.raw.length === 0
) {
continue
}
await this.rateLimit.exec(async () => {
await this.collectOps(
this.globalCtx,
domain,
'tx',
async (ctx) => {
const coll = this.db.collection<Doc>(domain)
// Minir optimizations
// Add Remove optimization
if (domainBulk.add.length > 0) {
await ctx.with('insertMany', {}, async () => {
await coll.insertMany(domainBulk.add, { ordered: false })
})
}
if (domainBulk.update.size > 0) {
// Extract similar update to update many if possible
// TODO:
await ctx.with('updateMany-bulk', {}, async () => {
await coll.bulkWrite(
Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
})),
{
ordered: false
}
)
})
}
if (domainBulk.bulkOperations.length > 0) {
await ctx.with('bulkWrite', {}, async () => {
await coll.bulkWrite(domainBulk.bulkOperations, {
ordered: false
})
})
}
if (domainBulk.findUpdate.size > 0) {
await ctx.with('find-result', {}, async () => {
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
result.push(...docs)
})
}
if (domainBulk.raw.length > 0) {
await ctx.with('raw', {}, async () => {
for (const r of domainBulk.raw) {
result.push({ object: await r() })
}
})
}
},
{
domain,
add: domainBulk.add.length,
update: domainBulk.update.size,
bulk: domainBulk.bulkOperations.length,
find: domainBulk.findUpdate.size,
raw: domainBulk.raw.length
}
continue
}
if (lastDomain === undefined) {
lastDomain = dop.domain
}
if (lastDomain !== dop.domain) {
// If we have domain switch, let's execute previous bulk and start new one.
await ctx.with(
'bulkExecute',
{},
async () => {
await bulkExecute()
},
{ operations: cutObjectArray(bulkOperations) }
)
lastDomain = dop.domain
}
bulkOperations.push(dop)
}
await ctx.with('bulkExecute', {}, async () => {
await bulkExecute()
)
})
} else {
const r = await this.getOperations(txes[0])?.raw()
if (r !== undefined) {
result.push(r)
}
}
return result
}
protected txCollectionCUD (tx: TxCollectionCUD<Doc, AttachedDoc>): DomainOperation {
protected txCollectionCUD (bulk: OperationBulk, tx: TxCollectionCUD<Doc, AttachedDoc>): void {
// We need update only create transactions to contain attached, attachedToClass.
if (tx.tx._class === core.class.TxCreateDoc) {
const createTx = tx.tx as TxCreateDoc<AttachedDoc>
@ -1013,24 +1133,18 @@ class MongoAdapter extends MongoAdapterBase {
collection: tx.collection
}
}
return this.txCreateDoc(d)
this.txCreateDoc(bulk, d)
return
}
// We could cast since we know collection cud is supported.
return this.getOperations(tx.tx) as DomainOperation
this.updateBulk(bulk, tx.tx)
}
protected txRemoveDoc (tx: TxRemoveDoc<Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass)
return {
raw: async () => await this.collection(domain).deleteOne({ _id: tx.objectId }),
domain,
bulk: [{ deleteOne: { filter: { _id: tx.objectId } } }]
}
protected txRemoveDoc (bulk: OperationBulk, tx: TxRemoveDoc<Doc>): void {
bulk.bulkOperations.push({ deleteOne: { filter: { _id: tx.objectId } } })
}
protected txMixin (tx: TxMixin<Doc, Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass)
protected txMixin (bulk: OperationBulk, tx: TxMixin<Doc, Doc>): void {
const filter = { _id: tx.objectId }
const modifyOp = {
modifiedBy: tx.modifiedBy,
@ -1051,38 +1165,29 @@ class MongoAdapter extends MongoAdapterBase {
}
}
]
return {
raw: async () => await this.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
bulk.bulkOperations.push(...ops)
return
}
const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } }
return {
raw: async () => await this.collection(domain).updateOne(filter, update),
domain,
bulk: [
{
updateOne: {
filter,
update
}
}
]
}
}
const update = { $set: { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp } }
return {
raw: async () => await this.collection(domain).updateOne(filter, update),
domain,
bulk: [
{
updateOne: {
filter,
update
}
bulk.bulkOperations.push({
updateOne: {
filter,
update
}
]
})
return
}
const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), ...modifyOp }
let upd = bulk.update.get(tx.objectId)
if (upd === undefined) {
upd = {}
bulk.update.set(tx.objectId, upd)
}
for (const [k, v] of Object.entries(update)) {
;(upd as any)[k] = v
}
}
@ -1106,23 +1211,12 @@ class MongoAdapter extends MongoAdapterBase {
return attrs
}
protected txCreateDoc (tx: TxCreateDoc<Doc>): DomainOperation {
protected txCreateDoc (bulk: OperationBulk, tx: TxCreateDoc<Doc>): void {
const doc = TxProcessor.createDoc2Doc(tx)
const domain = this.hierarchy.getDomain(doc._class)
const tdoc = translateDoc(doc)
return {
raw: async () => await this.collection(domain).insertOne(tdoc),
domain,
bulk: [
{
insertOne: { document: tdoc }
}
]
}
bulk.add.push(translateDoc(doc))
}
protected txUpdateDoc (tx: TxUpdateDoc<Doc>): DomainOperation {
const domain = this.hierarchy.getDomain(tx.objectClass)
protected txUpdateDoc (bulk: OperationBulk, tx: TxUpdateDoc<Doc>): void {
if (isOperator(tx.operations)) {
const operator = Object.keys(tx.operations)[0]
if (operator === '$move') {
@ -1163,11 +1257,7 @@ class MongoAdapter extends MongoAdapterBase {
}
}
]
return {
raw: async () => await this.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
bulk.bulkOperations.push(...ops)
} else if (operator === '$update') {
const keyval = (tx.operations as any).$update
const arr = Object.keys(keyval)[0]
@ -1200,15 +1290,13 @@ class MongoAdapter extends MongoAdapterBase {
}
}
]
return {
raw: async () => await this.collection(domain).bulkWrite(ops),
domain,
bulk: ops
}
bulk.bulkOperations.push(...ops)
} else {
const domain = this.hierarchy.getDomain(tx.objectClass)
if (tx.retrieve === true) {
const raw = async (): Promise<TxResult> => {
const result = await this.collection(domain).findOneAndUpdate(
bulk.raw.push(async () => {
const res = await this.collection(domain).findOneAndUpdate(
{ _id: tx.objectId },
{
...tx.operations,
@ -1220,76 +1308,72 @@ class MongoAdapter extends MongoAdapterBase {
} as unknown as UpdateFilter<Document>,
{ returnDocument: 'after', includeResultMetadata: true }
)
return { object: result.value }
}
return {
raw,
domain,
bulk: undefined
}
return res.value as TxResult
})
} else {
const filter = { _id: tx.objectId }
const update = {
...tx.operations,
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
bulk.bulkOperations.push({
updateOne: {
filter: { _id: tx.objectId },
update: {
...tx.operations,
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
}
}
}
}
return {
raw: async () => await this.collection(domain).updateOne(filter, update),
domain,
bulk: [{ updateOne: { filter, update } }]
}
})
}
}
} else {
const filter = { _id: tx.objectId }
const update = {
$set: {
...tx.operations,
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
}
let upd = bulk.update.get(tx.objectId)
if (upd === undefined) {
upd = {}
bulk.update.set(tx.objectId, upd)
}
const raw =
tx.retrieve === true
? async (): Promise<TxResult> => {
const result = await this.db
.collection(domain)
.findOneAndUpdate(filter, update, { returnDocument: 'after', includeResultMetadata: true })
return { object: result.value }
}
: async () => await this.collection(domain).updateOne(filter, update)
// Disable bulk for operators
return {
raw,
domain,
bulk: [{ updateOne: { filter, update } }]
for (const [k, v] of Object.entries({
...tx.operations,
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
})) {
;(upd as any)[k] = v
}
if (tx.retrieve === true) {
bulk.findUpdate.add(tx.objectId)
}
}
}
}
class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
txColl: Collection | undefined
txColl: Collection<Doc> | undefined
async init (): Promise<void> {
await this._db.init(DOMAIN_TX)
}
@withContext('tx')
override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []
}
await ctx.with('insertMany', {}, async () => await this.txCollection().insertMany(tx.map((it) => translateDoc(it))))
await this.collectOps(
this.globalCtx,
DOMAIN_TX,
'tx',
async () => {
await this.txCollection().insertMany(tx.map((it) => translateDoc(it)))
},
{ tx: tx.length }
)
return []
}
private txCollection (): Collection {
private txCollection (): Collection<Doc> {
if (this.txColl !== undefined) {
return this.txColl
}
@ -1297,6 +1381,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
return this.txColl
}
@withContext('get-model')
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const cursor = await ctx.with('find', {}, async () =>
this.db.collection<Tx>(DOMAIN_TX).find(
@ -1461,7 +1546,7 @@ export async function createMongoAdapter (
const client = getMongoClient(url)
const db = getWorkspaceDB(await client.getClient(), workspaceId)
return new MongoAdapter(db, hierarchy, modelDb, client, options)
return new MongoAdapter(ctx.newChild('mongoDb', {}), db, hierarchy, modelDb, client, options)
}
/**
@ -1476,5 +1561,6 @@ export async function createMongoTxAdapter (
): Promise<TxAdapter> {
const client = getMongoClient(url)
const db = getWorkspaceDB(await client.getClient(), workspaceId)
return new MongoTxAdapter(db, hierarchy, modelDb, client)
return new MongoTxAdapter(ctx.newChild('mongoDbTx', {}), db, hierarchy, modelDb, client)
}

View File

@ -13,7 +13,14 @@
// limitations under the License.
//
import { toWorkspaceString, type Doc, type Domain, type FieldIndex, type WorkspaceId } from '@hcengineering/core'
import {
generateId,
toWorkspaceString,
type Doc,
type Domain,
type FieldIndexConfig,
type WorkspaceId
} from '@hcengineering/core'
import { PlatformError, unknownStatus } from '@hcengineering/platform'
import { type DomainHelperOperations } from '@hcengineering/server-core'
import { MongoClient, type Collection, type Db, type Document, type MongoClientOptions } from 'mongodb'
@ -27,10 +34,15 @@ process.on('exit', () => {
})
})
const clientRefs = new Map<string, ClientRef>()
/**
* @public
*/
export async function shutdown (): Promise<void> {
for (const it of Array.from(clientRefs.values())) {
console.error((it as any).stack)
}
for (const c of connections.values()) {
c.close(true)
}
@ -78,9 +90,12 @@ class MongoClientReferenceImpl {
this.count++
}
}
export class ClientRef implements MongoClientReference {
constructor (readonly client: MongoClientReferenceImpl) {}
id = generateId()
stack = new Error().stack
constructor (readonly client: MongoClientReferenceImpl) {
clientRefs.set(this.id, this)
}
closed = false
async getClient (): Promise<MongoClient> {
@ -94,6 +109,7 @@ export class ClientRef implements MongoClientReference {
close (): void {
// Do not allow double close of mongo connection client
if (!this.closed) {
clientRefs.delete(this.id)
this.closed = true
this.client.close()
}
@ -106,13 +122,14 @@ export class ClientRef implements MongoClientReference {
*/
export function getMongoClient (uri: string, options?: MongoClientOptions): MongoClientReference {
const extraOptions = JSON.parse(process.env.MONGO_OPTIONS ?? '{}')
const key = `${uri}${process.env.MONGO_OPTIONS}_${JSON.stringify(options)}`
const key = `${uri}${process.env.MONGO_OPTIONS ?? '{}'}_${JSON.stringify(options ?? {})}`
let existing = connections.get(key)
// If not created or closed
if (existing === undefined) {
existing = new MongoClientReferenceImpl(
MongoClient.connect(uri, {
appName: 'transactor',
...options,
enableUtf8Validation: false,
...extraOptions
@ -184,8 +201,20 @@ export class DBCollectionHelper implements DomainHelperOperations {
return this.collections.has(domain)
}
async createIndex (domain: Domain, value: string | FieldIndex<Doc>, options?: { name: string }): Promise<void> {
await this.collection(domain).createIndex(value, options)
async createIndex (domain: Domain, value: string | FieldIndexConfig<Doc>, options?: { name: string }): Promise<void> {
if (typeof value === 'string') {
await this.collection(domain).createIndex(value, options)
} else {
if (value.filter !== undefined) {
await this.collection(domain).createIndex(value.keys, {
...options,
sparse: false,
partialFilterExpression: value.filter
})
} else {
await this.collection(domain).createIndex(value.keys, { ...options, sparse: value.sparse ?? true })
}
}
}
async dropIndex (domain: Domain, name: string): Promise<void> {

View File

@ -58,7 +58,7 @@ class StorageBlobAdapter implements DbAdapter {
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {}
async close (): Promise<void> {
await this.blobAdapter.close()

View File

@ -1,4 +1,4 @@
import { MeasureContext, MeasureLogger, ParamType, ParamsType } from '@hcengineering/core'
import { MeasureContext, MeasureLogger, ParamType, ParamsType, type FullParamsType } from '@hcengineering/core'
import apm, { Agent, Span, Transaction } from 'elastic-apm-node'
/**
@ -71,7 +71,7 @@ export class APMMeasureContext implements MeasureContext {
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T | Promise<T>,
fullParams?: ParamsType
fullParams?: FullParamsType | (() => FullParamsType)
): Promise<T> {
const c = this.newChild(name, params)
try {

View File

@ -18,7 +18,7 @@ export interface ServerEnv {
export function serverConfigFromEnv (): ServerEnv {
const serverPort = parseInt(process.env.SERVER_PORT ?? '3333')
const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'false') === 'true'
const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true'
const url = process.env.MONGO_URL
if (url === undefined) {

View File

@ -18,6 +18,7 @@ import core, {
BackupClient,
Branding,
Client as CoreClient,
coreId,
DOMAIN_BENCHMARK,
DOMAIN_MIGRATION,
DOMAIN_MODEL,
@ -37,7 +38,7 @@ import core, {
type Doc,
type TxCUD
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model'
import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model'
import { createMongoTxAdapter, DBCollectionHelper, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import {
AggregatorStorageAdapter,
@ -180,7 +181,8 @@ export async function updateModel (
// Create update indexes
await createUpdateIndexes(
ctx,
connection,
connection.getHierarchy(),
connection.getModel(),
db,
logger,
async (value) => {
@ -236,13 +238,6 @@ export async function initializeWorkspace (
}
}
export function getStorageAdapter (): StorageAdapter {
const { mongodbUri } = prepareTools([])
const storageConfig: StorageConfiguration = storageConfigFromEnv()
return buildStorageFromConfig(storageConfig, mongodbUri)
}
/**
* @public
*/
@ -368,6 +363,27 @@ export async function upgradeModel (
await progress(20 + ((100 / migrateOperations.length) * i * 20) / 100)
i++
}
await tryMigrate(migrateClient, coreId, [
{
state: '#sparse',
func: async () => {
ctx.info('Migrate to sparse indexes')
// Create update indexes
await createUpdateIndexes(
ctx,
hierarchy,
modelDb,
db,
logger,
async (value) => {
await progress(90 + (Math.min(value, 100) / 100) * 10)
},
workspaceId
)
}
}
])
})
logger.log('Apply upgrade operations', { workspaceId: workspaceId.name })
@ -400,7 +416,7 @@ export async function upgradeModel (
await op[1].upgrade(migrateState, getUpgradeClient, logger)
})
logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name })
await progress(60 + ((100 / migrateOperations.length) * i * 40) / 100)
await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100)
i++
}
})
@ -460,33 +476,37 @@ async function fetchModelFromMongo (
const txAdapter = await createMongoTxAdapter(ctx, hierarchy, mongodbUri, workspaceId, modelDb)
model = model ?? (await ctx.with('get-model', {}, async (ctx) => await txAdapter.getModel(ctx)))
try {
model = model ?? (await ctx.with('get-model', {}, async (ctx) => await txAdapter.getModel(ctx)))
await ctx.with('build local model', {}, async () => {
for (const tx of model ?? []) {
try {
hierarchy.tx(tx)
} catch (err: any) {}
}
modelDb.addTxes(ctx, model as Tx[], false)
})
await txAdapter.close()
await ctx.with('build local model', {}, async () => {
for (const tx of model ?? []) {
try {
hierarchy.tx(tx)
} catch (err: any) {}
}
modelDb.addTxes(ctx, model as Tx[], false)
})
} finally {
await txAdapter.close()
}
return { hierarchy, modelDb, model }
}
async function createUpdateIndexes (
ctx: MeasureContext,
connection: CoreClient,
hierarchy: Hierarchy,
model: ModelDb,
db: Db,
logger: ModelLogger,
progress: (value: number) => Promise<void>,
workspaceId: WorkspaceId
): Promise<void> {
const domainHelper = new DomainIndexHelperImpl(ctx, connection.getHierarchy(), connection.getModel(), workspaceId)
const domainHelper = new DomainIndexHelperImpl(ctx, hierarchy, model, workspaceId)
const dbHelper = new DBCollectionHelper(db)
await dbHelper.init()
let completed = 0
const allDomains = connection.getHierarchy().domains()
const allDomains = hierarchy.domains()
for (const domain of allDomains) {
if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT || domain === DOMAIN_BENCHMARK) {
continue

View File

@ -24,11 +24,14 @@ const config: PlaywrightTestConfig = {
snapshots: true,
screenshots: true,
sources: true
},
contextOptions: {
reducedMotion: 'reduce'
}
}
}
],
retries: 1,
retries: 2,
timeout: 60000,
maxFailures,
expect: {