From e6a35d2a035e4be6f2fb9598e6c8fc52b2becea9 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 30 Jan 2024 18:07:34 +0700 Subject: [PATCH] UBERF-4319: Performance changes (#4474) Signed-off-by: Andrey Sobolev --- dev/client-resources/src/connection.ts | 10 +- dev/tool/src/benchmark.ts | 42 +++- dev/tool/src/index.ts | 2 +- models/server-activity/src/migration.ts | 30 +-- packages/core/src/__tests__/client.test.ts | 5 +- packages/core/src/__tests__/connection.ts | 3 +- packages/core/src/client.ts | 25 ++- packages/core/src/measurements/context.ts | 23 ++- packages/core/src/measurements/types.ts | 5 +- packages/presentation/src/pipeline.ts | 14 +- packages/presentation/src/utils.ts | 69 +++++-- packages/query/src/__tests__/connection.ts | 3 +- packages/ui/src/components/Expandable.svelte | 3 +- plugins/client-resources/src/connection.ts | 24 ++- plugins/client/src/index.ts | 3 - plugins/devmodel-resources/src/index.ts | 7 +- .../src/components/CreateIssue.svelte | 174 +++++++++-------- .../src/components/ActionsPopup.svelte | 4 +- .../src/components/ServerManager.svelte | 183 +++++++++--------- .../components/statistics/MetricsInfo.svelte | 83 ++++++++ pods/server/src/server.ts | 14 +- .../activity-resources/src/index.ts | 48 ++++- .../notification-resources/src/index.ts | 6 +- server/core/src/pipeline.ts | 27 ++- server/core/src/storage.ts | 155 ++++++++------- server/core/src/triggers.ts | 12 +- server/core/src/types.ts | 11 ++ server/core/src/utils.ts | 24 --- server/middleware/src/spaceSecurity.ts | 4 +- server/mongo/src/__tests__/storage.test.ts | 5 +- server/server/src/apm.ts | 10 +- server/server/src/metrics.ts | 1 + server/tool/src/index.ts | 2 +- server/ws/src/client.ts | 1 + server/ws/src/server.ts | 140 +++++++++----- server/ws/src/server_http.ts | 18 +- server/ws/src/types.ts | 21 +- 37 files changed, 775 insertions(+), 436 deletions(-) create mode 100644 plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte delete mode 100644 server/core/src/utils.ts diff --git a/dev/client-resources/src/connection.ts b/dev/client-resources/src/connection.ts index a07509e87c..be4b8a6fa0 100644 --- a/dev/client-resources/src/connection.ts +++ b/dev/client-resources/src/connection.ts @@ -34,7 +34,8 @@ import core, { TxResult, SearchQuery, SearchOptions, - SearchResult + SearchResult, + MeasureDoneOperation } from '@hcengineering/core' import { createInMemoryTxAdapter } from '@hcengineering/dev-storage' import devmodel from '@hcengineering/devmodel' @@ -104,6 +105,10 @@ class ServerStorageWrapper implements ClientConnection { async upload (domain: Domain, docs: Doc[]): Promise {} async clean (domain: Domain, docs: Ref[]): Promise {} + + async measure (operationName: string): Promise { + return async () => ({ time: 0, serverTime: 0 }) + } } async function createNullFullTextAdapter (): Promise { @@ -152,7 +157,8 @@ export async function connect (handler: (tx: Tx) => void): Promise { const st = Date.now() try { - void fetch(transactorUrl.replace('ws:/', 'http:/') + '/' + token) + const fetchUrl = transactorUrl.replace('ws:/', 'http:/') + '/api/v1/statistics?token=' + token + void fetch(fetchUrl) .then((res) => { void res .json() @@ -184,15 +204,17 @@ export async function benchmark ( memUsed = json.statistics.memoryUsed memTotal = json.statistics.memoryTotal cpu = json.statistics.cpuUsage - const r = - json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.call?.measurements?.[ - 'find-all' - ] - operations = r?.operations ?? 0 - requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1) - transfer = - json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.['#send-data'] - ?.value ?? 0 + operations = 0 + requestTime = 0 + transfer = 0 + for (const w of workspaceId) { + const r = extract(json.metrics as Metrics, w.name, 'client', 'handleRequest', 'process', 'find-all') + operations += r?.operations ?? 0 + requestTime += (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1) + + const tr = extract(json.metrics as Metrics, w.name, 'client', 'handleRequest', '#send-data') + transfer += tr?.value ?? 0 + } }) .catch((err) => { console.log(err) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 25ee149d0d..56fda5fdcc 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -562,7 +562,7 @@ export function devTool ( program .command('benchmark') - .description('clean archived spaces') + .description('benchmark') .option('--from ', 'Min client count', '10') .option('--steps ', 'Step with client count', '10') .option('--sleep ', 'Random Delay max between operations', '0') diff --git a/models/server-activity/src/migration.ts b/models/server-activity/src/migration.ts index f89dad5a00..cde26e27a4 100644 --- a/models/server-activity/src/migration.ts +++ b/models/server-activity/src/migration.ts @@ -13,30 +13,31 @@ // limitations under the License. // +import activity, { type DocUpdateMessage } from '@hcengineering/activity' import core, { + MeasureMetricsContext, + SortingOrder, + TxFactory, + TxProcessor, + toFindResult, + toIdMap, type AttachedDoc, type Class, type Doc, type Ref, type TxCUD, type TxCollectionCUD, - TxProcessor, - toIdMap, - SortingOrder, - TxFactory, - toFindResult, type TxCreateDoc } from '@hcengineering/core' -import activity, { type DocUpdateMessage } from '@hcengineering/activity' import { tryMigrate, type MigrateOperation, type MigrationClient, type MigrationIterator } from '@hcengineering/model' +import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity' import { - type ActivityControl, - type DocObjectCache, getAllObjectTransactions, - serverActivityId + serverActivityId, + type ActivityControl, + type DocObjectCache } from '@hcengineering/server-activity' import { generateDocUpdateMessages } from '@hcengineering/server-activity-resources' -import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity' function getActivityControl (client: MigrationClient): ActivityControl { const txFactory = new TxFactory(core.account.System, false) @@ -66,7 +67,14 @@ async function generateDocUpdateMessageByTx ( return } - const createCollectionCUDTxes = await generateDocUpdateMessages(tx, control, undefined, undefined, objectCache) + const createCollectionCUDTxes = await generateDocUpdateMessages( + new MeasureMetricsContext('migration', {}), + tx, + control, + undefined, + undefined, + objectCache + ) for (const collectionTx of createCollectionCUDTxes) { const createTx = collectionTx.tx as TxCreateDoc diff --git a/packages/core/src/__tests__/client.test.ts b/packages/core/src/__tests__/client.test.ts index aa8b363c88..8b47bcfa75 100644 --- a/packages/core/src/__tests__/client.test.ts +++ b/packages/core/src/__tests__/client.test.ts @@ -119,7 +119,10 @@ describe('client', () => { upload: async (domain: Domain, docs: Doc[]) => {}, clean: async (domain: Domain, docs: Ref[]) => {}, loadModel: async (last: Timestamp) => txes, - getAccount: async () => null as unknown as Account + getAccount: async () => null as unknown as Account, + measure: async () => { + return async () => ({ time: 0, serverTime: 0 }) + } } } const spyCreate = jest.spyOn(TxProcessor, 'createDoc2Doc') diff --git a/packages/core/src/__tests__/connection.ts b/packages/core/src/__tests__/connection.ts index 64d6f41c6a..69cf501eab 100644 --- a/packages/core/src/__tests__/connection.ts +++ b/packages/core/src/__tests__/connection.ts @@ -71,6 +71,7 @@ export async function connect (handler: (tx: Tx) => void): Promise {}, clean: async (domain: Domain, docs: Ref[]) => {}, loadModel: async (last: Timestamp) => txes, - getAccount: async () => null as unknown as Account + getAccount: async () => null as unknown as Account, + measure: async () => async () => ({ time: 0, serverTime: 0 }) } } diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index ae59c66c1a..eaed3821ee 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -19,8 +19,8 @@ import { Account, AttachedDoc, Class, DOMAIN_MODEL, Doc, Domain, PluginConfigura import core from './component' import { Hierarchy } from './hierarchy' import { ModelDb } from './memdb' -import type { DocumentQuery, FindOptions, FindResult, Storage, FulltextStorage, TxResult, WithLookup } from './storage' -import { SortingOrder, SearchQuery, SearchOptions, SearchResult } from './storage' +import type { DocumentQuery, FindOptions, FindResult, FulltextStorage, Storage, TxResult, WithLookup } from './storage' +import { SearchOptions, SearchQuery, SearchResult, SortingOrder } from './storage' import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx' import { toFindResult } from './utils' @@ -46,10 +46,17 @@ export interface Client extends Storage, FulltextStorage { close: () => Promise } +export type MeasureDoneOperation = () => Promise<{ time: number, serverTime: number }> + +export interface MeasureClient extends Client { + // Will perform on server operation measure and will return a local client time and on server time + measure: (operationName: string) => Promise +} + /** * @public */ -export interface AccountClient extends Client { +export interface AccountClient extends MeasureClient { getAccount: () => Promise } @@ -86,9 +93,11 @@ export interface ClientConnection extends Storage, FulltextStorage, BackupClient // If hash is passed, will return LoadModelResponse loadModel: (last: Timestamp, hash?: string) => Promise getAccount: () => Promise + + measure: (operationName: string) => Promise } -class ClientImpl implements AccountClient, BackupClient { +class ClientImpl implements AccountClient, BackupClient, MeasureClient { notify?: (tx: Tx) => void hierarchy!: Hierarchy model!: ModelDb @@ -151,6 +160,10 @@ class ClientImpl implements AccountClient, BackupClient { return result } + async measure (operationName: string): Promise { + return await this.conn.measure(operationName) + } + async updateFromRemote (tx: Tx): Promise { if (tx.objectSpace === core.space.Model) { this.hierarchy.tx(tx) @@ -402,14 +415,14 @@ async function buildModel ( try { hierarchy.tx(tx) } catch (err: any) { - console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) + console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message) } } for (const tx of txes) { try { await model.tx(tx) } catch (err: any) { - console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) + console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message) } } } diff --git a/packages/core/src/measurements/context.ts b/packages/core/src/measurements/context.ts index 1d3a04031c..ca42ebd198 100644 --- a/packages/core/src/measurements/context.ts +++ b/packages/core/src/measurements/context.ts @@ -13,13 +13,18 @@ export class MeasureMetricsContext implements MeasureContext { metrics: Metrics private readonly done: (value?: number) => void - constructor (name: string, params: Record, metrics: Metrics = newMetrics()) { + constructor ( + name: string, + params: Record, + metrics: Metrics = newMetrics(), + logger?: MeasureLogger + ) { this.name = name this.params = params this.metrics = metrics this.done = measure(metrics, params) - this.logger = { + this.logger = logger ?? { info: (msg, args) => { console.info(msg, ...args) }, @@ -34,8 +39,8 @@ export class MeasureMetricsContext implements MeasureContext { c.done(value) } - newChild (name: string, params: Record): MeasureContext { - return new MeasureMetricsContext(name, params, childMetrics(this.metrics, [name])) + newChild (name: string, params: Record, logger?: MeasureLogger): MeasureContext { + return new MeasureMetricsContext(name, params, childMetrics(this.metrics, [name]), logger) } async with( @@ -52,13 +57,17 @@ export class MeasureMetricsContext implements MeasureContext { c.end() return value } catch (err: any) { - await c.error(err) + await c.error('Error during:' + name, err) throw err } } - async error (err: Error | string): Promise { - console.error(err) + async error (message: string, ...args: any[]): Promise { + this.logger.error(message, args) + } + + async info (message: string, ...args: any[]): Promise { + this.logger.info(message, args) } end (): void { diff --git a/packages/core/src/measurements/types.ts b/packages/core/src/measurements/types.ts index b65a4cf6bd..c870c65d71 100644 --- a/packages/core/src/measurements/types.ts +++ b/packages/core/src/measurements/types.ts @@ -31,7 +31,7 @@ export interface MeasureLogger { */ export interface MeasureContext { // Create a child metrics context - newChild: (name: string, params: Record) => MeasureContext + newChild: (name: string, params: Record, logger?: MeasureLogger) => MeasureContext with: (name: string, params: Record, op: (ctx: MeasureContext) => T | Promise) => Promise @@ -40,7 +40,8 @@ export interface MeasureContext { measure: (name: string, value: number) => void // Capture error - error: (err: Error | string | any) => Promise + error: (message: string, ...args: any[]) => Promise + info: (message: string, ...args: any[]) => Promise // Mark current context as complete // If no value is passed, time difference will be used. diff --git a/packages/presentation/src/pipeline.ts b/packages/presentation/src/pipeline.ts index 0f52c8c53d..2530f926d7 100644 --- a/packages/presentation/src/pipeline.ts +++ b/packages/presentation/src/pipeline.ts @@ -14,7 +14,9 @@ import { toFindResult, type SearchQuery, type SearchOptions, - type SearchResult + type SearchResult, + type MeasureClient, + type MeasureDoneOperation } from '@hcengineering/core' import { type Resource } from '@hcengineering/platform' @@ -62,7 +64,7 @@ export type PresentationMiddlewareCreator = (client: Client, next?: Presentation /** * @public */ -export interface PresentationPipeline extends Client, Exclude { +export interface PresentationPipeline extends MeasureClient, Exclude { close: () => Promise } @@ -72,7 +74,7 @@ export interface PresentationPipeline extends Client, Exclude { + return await this.client.measure(operationName) + } + + static create (client: MeasureClient, constructors: PresentationMiddlewareCreator[]): PresentationPipeline { const pipeline = new PresentationPipelineImpl(client) pipeline.head = pipeline.buildChain(constructors) return pipeline diff --git a/packages/presentation/src/utils.ts b/packages/presentation/src/utils.ts index 0b477d2cf2..ae0f1bedf7 100644 --- a/packages/presentation/src/utils.ts +++ b/packages/presentation/src/utils.ts @@ -16,7 +16,6 @@ import core, { TxOperations, - type TypeAny, getCurrentAccount, type AnyAttribute, type ArrOf, @@ -29,6 +28,8 @@ import core, { type FindOptions, type FindResult, type Hierarchy, + type MeasureClient, + type MeasureDoneOperation, type Mixin, type Obj, type Ref, @@ -38,6 +39,7 @@ import core, { type SearchResult, type Tx, type TxResult, + type TypeAny, type WithLookup } from '@hcengineering/core' import { getMetadata, getResource } from '@hcengineering/platform' @@ -51,7 +53,7 @@ import { OptimizeQueryMiddleware, PresentationPipelineImpl, type PresentationPip import plugin from './plugin' let liveQuery: LQ -let client: TxOperations +let client: TxOperations & MeasureClient let pipeline: PresentationPipeline const txListeners: Array<(tx: Tx) => void> = [] @@ -73,14 +75,35 @@ export function removeTxListener (l: (tx: Tx) => void): void { } } -class UIClient extends TxOperations implements Client { +class UIClient extends TxOperations implements Client, MeasureClient { constructor ( - client: Client, + client: MeasureClient, private readonly liveQuery: Client ) { super(client, getCurrentAccount()._id) } + afterMeasure: Tx[] = [] + measureOp?: MeasureDoneOperation + + async doNotify (tx: Tx): Promise { + if (this.measureOp !== undefined) { + this.afterMeasure.push(tx) + } else { + try { + await pipeline.notifyTx(tx) + + await liveQuery.tx(tx) + + txListeners.forEach((it) => { + it(tx) + }) + } catch (err: any) { + console.log(err) + } + } + } + override async findAll( _class: Ref>, query: DocumentQuery, @@ -104,19 +127,38 @@ class UIClient extends TxOperations implements Client { async searchFulltext (query: SearchQuery, options: SearchOptions): Promise { return await this.client.searchFulltext(query, options) } + + async measure (operationName: string): Promise { + // return await (this.client as MeasureClient).measure(operationName) + const mop = await (this.client as MeasureClient).measure(operationName) + this.measureOp = mop + return async () => { + const result = await mop() + this.measureOp = undefined + if (this.afterMeasure.length > 0) { + const txes = this.afterMeasure + console.log('after measture', txes) + this.afterMeasure = [] + for (const tx of txes) { + await this.doNotify(tx) + } + } + return result + } + } } /** * @public */ -export function getClient (): TxOperations { +export function getClient (): TxOperations & MeasureClient { return client } /** * @public */ -export async function setClient (_client: Client): Promise { +export async function setClient (_client: MeasureClient): Promise { if (liveQuery !== undefined) { await liveQuery.close() } @@ -131,20 +173,11 @@ export async function setClient (_client: Client): Promise { const needRefresh = liveQuery !== undefined liveQuery = new LQ(pipeline) - client = new UIClient(pipeline, liveQuery) + const uiClient = new UIClient(pipeline, liveQuery) + client = uiClient _client.notify = (tx: Tx) => { - pipeline.notifyTx(tx).catch((err) => { - console.log(err) - }) - - liveQuery.tx(tx).catch((err) => { - console.log(err) - }) - - txListeners.forEach((it) => { - it(tx) - }) + void uiClient.doNotify(tx) } if (needRefresh || globalQueries.length > 0) { await refreshClient() diff --git a/packages/query/src/__tests__/connection.ts b/packages/query/src/__tests__/connection.ts index 838faa1543..d065475ce7 100644 --- a/packages/query/src/__tests__/connection.ts +++ b/packages/query/src/__tests__/connection.ts @@ -95,6 +95,7 @@ FulltextStorage & { searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise => { return { docs: [] } - } + }, + measure: async () => async () => ({ time: 0, serverTime: 0 }) } } diff --git a/packages/ui/src/components/Expandable.svelte b/packages/ui/src/components/Expandable.svelte index 54802dc333..d6ea835a5f 100644 --- a/packages/ui/src/components/Expandable.svelte +++ b/packages/ui/src/components/Expandable.svelte @@ -25,6 +25,7 @@ export let bordered: boolean = false export let expandable = true export let contentColor = false + export let showChevron = true
@@ -38,7 +39,7 @@ if (expandable) expanded = !expanded }} > - + {#if icon}
diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 0a61777260..72d87311ab 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -38,7 +38,8 @@ import core, { generateId, SearchQuery, SearchOptions, - SearchResult + SearchResult, + MeasureDoneOperation } from '@hcengineering/core' import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform' @@ -376,6 +377,27 @@ class Connection implements ClientConnection { return await promise.promise } + async measure (operationName: string): Promise { + const dateNow = Date.now() + + // Send measure-start + const mid = await this.sendRequest({ + method: 'measure', + params: [operationName] + }) + return async () => { + const serverTime: number = await this.sendRequest({ + method: 'measure-done', + params: [operationName, mid] + }) + + return { + time: Date.now() - dateNow, + serverTime + } + } + } + async loadModel (last: Timestamp, hash?: string): Promise { return await this.sendRequest({ method: 'loadModel', params: [last, hash] }) } diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts index 5619512d9a..aa079af9bd 100644 --- a/plugins/client/src/index.ts +++ b/plugins/client/src/index.ts @@ -16,9 +16,6 @@ import type { AccountClient, ClientConnectEvent } from '@hcengineering/core' import type { Plugin, Resource } from '@hcengineering/platform' import { Metadata, plugin } from '@hcengineering/platform' -// import type { LiveQuery } from '@hcengineering/query' - -// export type Connection = Client & LiveQuery & TxOperations /** * @public diff --git a/plugins/devmodel-resources/src/index.ts b/plugins/devmodel-resources/src/index.ts index 4c92a0772f..4f1aa58467 100644 --- a/plugins/devmodel-resources/src/index.ts +++ b/plugins/devmodel-resources/src/index.ts @@ -30,7 +30,8 @@ import core, { type WithLookup, type SearchQuery, type SearchOptions, - type SearchResult + type SearchResult, + type MeasureDoneOperation } from '@hcengineering/core' import { devModelId } from '@hcengineering/devmodel' import { Builder } from '@hcengineering/model' @@ -68,6 +69,10 @@ class ModelClient implements AccountClient { } } + async measure (operationName: string): Promise { + return await this.client.measure(operationName) + } + notify?: (tx: Tx) => void getHierarchy (): Hierarchy { diff --git a/plugins/tracker-resources/src/components/CreateIssue.svelte b/plugins/tracker-resources/src/components/CreateIssue.svelte index 086a61f1d5..7a0089ce0a 100644 --- a/plugins/tracker-resources/src/components/CreateIssue.svelte +++ b/plugins/tracker-resources/src/components/CreateIssue.svelte @@ -352,97 +352,105 @@ return } - const operations = client.apply(_id) + // TODO: We need a measure client and mark all operations with it as measure under one root, + // to prevent other operations to infer our measurement. + const doneOp = await getClient().measure('tracker.createIssue') - const lastOne = await client.findOne(tracker.class.Issue, {}, { sort: { rank: SortingOrder.Descending } }) - const incResult = await client.updateDoc( - tracker.class.Project, - core.space.Space, - _space, - { - $inc: { sequence: 1 } - }, - true - ) + try { + const operations = client.apply(_id) - const value: DocData = { - title: getTitle(object.title), - description: object.description, - assignee: object.assignee, - component: object.component, - milestone: object.milestone, - number: (incResult as any).object.sequence, - status: object.status, - priority: object.priority, - rank: calcRank(lastOne, undefined), - comments: 0, - subIssues: 0, - dueDate: object.dueDate, - parents: - parentIssue != null - ? [ - { parentId: parentIssue._id, parentTitle: parentIssue.title, space: parentIssue.space }, - ...parentIssue.parents - ] - : [], - reportedTime: 0, - remainingTime: 0, - estimation: object.estimation, - reports: 0, - relations: relatedTo !== undefined ? [{ _id: relatedTo._id, _class: relatedTo._class }] : [], - childInfo: [], - kind - } + const lastOne = await client.findOne(tracker.class.Issue, {}, { sort: { rank: SortingOrder.Descending } }) + const incResult = await client.updateDoc( + tracker.class.Project, + core.space.Space, + _space, + { + $inc: { sequence: 1 } + }, + true + ) - await docCreateManager.commit(operations, _id, _space, value) + const value: DocData = { + title: getTitle(object.title), + description: object.description, + assignee: object.assignee, + component: object.component, + milestone: object.milestone, + number: (incResult as any).object.sequence, + status: object.status, + priority: object.priority, + rank: calcRank(lastOne, undefined), + comments: 0, + subIssues: 0, + dueDate: object.dueDate, + parents: + parentIssue != null + ? [ + { parentId: parentIssue._id, parentTitle: parentIssue.title, space: parentIssue.space }, + ...parentIssue.parents + ] + : [], + reportedTime: 0, + remainingTime: 0, + estimation: object.estimation, + reports: 0, + relations: relatedTo !== undefined ? [{ _id: relatedTo._id, _class: relatedTo._class }] : [], + childInfo: [], + kind + } - await operations.addCollection( - tracker.class.Issue, - _space, - parentIssue?._id ?? tracker.ids.NoParent, - parentIssue?._class ?? tracker.class.Issue, - 'subIssues', - value, - _id - ) - for (const label of object.labels) { - await operations.addCollection(label._class, label.space, _id, tracker.class.Issue, 'labels', { - title: label.title, - color: label.color, - tag: label.tag - }) - } + await docCreateManager.commit(operations, _id, _space, value) - if (relatedTo !== undefined) { - const doc = await client.findOne(tracker.class.Issue, { _id }) - if (doc !== undefined) { - if (client.getHierarchy().isDerived(relatedTo._class, tracker.class.Issue)) { - await updateIssueRelation(operations, relatedTo as Issue, doc, 'relations', '$push') - } else { - const update = await getResource(chunter.backreference.Update) - await update(doc, 'relations', [relatedTo], tracker.string.AddedReference) + await operations.addCollection( + tracker.class.Issue, + _space, + parentIssue?._id ?? tracker.ids.NoParent, + parentIssue?._class ?? tracker.class.Issue, + 'subIssues', + value, + _id + ) + for (const label of object.labels) { + await operations.addCollection(label._class, label.space, _id, tracker.class.Issue, 'labels', { + title: label.title, + color: label.color, + tag: label.tag + }) + } + + if (relatedTo !== undefined) { + const doc = await client.findOne(tracker.class.Issue, { _id }) + if (doc !== undefined) { + if (client.getHierarchy().isDerived(relatedTo._class, tracker.class.Issue)) { + await updateIssueRelation(operations, relatedTo as Issue, doc, 'relations', '$push') + } else { + const update = await getResource(chunter.backreference.Update) + await update(doc, 'relations', [relatedTo], tracker.string.AddedReference) + } } } + + await operations.commit() + await descriptionBox.createAttachments(_id) + addNotification( + await translate(tracker.string.IssueCreated, {}, $themeStore.language), + getTitle(object.title), + IssueNotification, + { + issueId: _id, + subTitlePostfix: (await translate(tracker.string.CreatedOne, {}, $themeStore.language)).toLowerCase(), + issueUrl: currentProject != null && generateIssueShortLink(getIssueId(currentProject, value as Issue)) + } + ) + console.log('createIssue measure', await doneOp()) + + draftController.remove() + descriptionBox?.removeDraft(false) + isAssigneeTouched = false + } catch (err: any) { + console.error(err) + await doneOp() // Complete in case of error } - - await operations.commit() - - await descriptionBox.createAttachments(_id) - - addNotification( - await translate(tracker.string.IssueCreated, {}, $themeStore.language), - getTitle(object.title), - IssueNotification, - { - issueId: _id, - subTitlePostfix: (await translate(tracker.string.CreatedOne, {}, $themeStore.language)).toLowerCase(), - issueUrl: currentProject != null && generateIssueShortLink(getIssueId(currentProject, value as Issue)) - } - ) - - draftController.remove() - descriptionBox?.removeDraft(false) - isAssigneeTouched = false } async function setParentIssue (): Promise { diff --git a/plugins/view-resources/src/components/ActionsPopup.svelte b/plugins/view-resources/src/components/ActionsPopup.svelte index 0212286dde..4bb77f7b2e 100644 --- a/plugins/view-resources/src/components/ActionsPopup.svelte +++ b/plugins/view-resources/src/components/ActionsPopup.svelte @@ -157,7 +157,9 @@ async function handleSelection (evt: Event, selection: number): Promise { const item = items[selection] - + if (item == null) { + return + } if (item.item !== undefined) { const doc = item.item.doc void client.findOne(doc._class, { _id: doc._id }).then((value) => { diff --git a/plugins/workbench-resources/src/components/ServerManager.svelte b/plugins/workbench-resources/src/components/ServerManager.svelte index 3516714bce..432f35b385 100644 --- a/plugins/workbench-resources/src/components/ServerManager.svelte +++ b/plugins/workbench-resources/src/components/ServerManager.svelte @@ -1,6 +1,6 @@ {#if data} - Mem: {data.statistics.memoryUsed} / {data.statistics.memoryTotal} CPU: {data.statistics.cpuUsage} +
+ + Mem: {data.statistics.memoryUsed} / {data.statistics.memoryTotal} CPU: {data.statistics.cpuUsage} + + + TotalFind: {totalStats.find} / Total Tx: {totalStats.tx} + +
{/if}
@@ -118,7 +133,7 @@ icon={IconArrowRight} label={getEmbeddedLabel('Set maintenance warning')} on:click={() => { - fetch(endpoint + `/api/v1/manage?token=${token}&operation=maintenance&timeout=${warningTimeout}`, { + void fetch(endpoint + `/api/v1/manage?token=${token}&operation=maintenance&timeout=${warningTimeout}`, { method: 'PUT' }) }} @@ -136,7 +151,7 @@ icon={IconArrowRight} label={getEmbeddedLabel('Reboot server')} on:click={() => { - fetch(endpoint + `/api/v1/manage?token=${token}&operation=reboot`, { + void fetch(endpoint + `/api/v1/manage?token=${token}&operation=reboot`, { method: 'PUT' }) }} @@ -151,92 +166,74 @@ {@const totalTx = act[1].reduce((it, itm) => itm.current.tx + it, 0)} {@const employeeGroups = Array.from(new Set(act[1].map((it) => it.userId)))} -
- Workspace: {act[0]}: {act[1].length} current 5 mins => {totalFind}/{totalTx} -
- -
- {#each employeeGroups as employeeId} - {@const employee = employees.get(employeeId)} - {@const connections = act[1].filter((it) => it.userId === employeeId)} - - {@const find = connections.reduce((it, itm) => itm.current.find + it, 0)} - {@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)} -
- - -
- {#if employee} - - {:else} - {employeeId} - {/if} - : {connections.length} -
-
{find}/{txes}
-
-
-
- {#each connections as user, i} -
- #{i} - {user.userId} -
- Total: {user.total.find}/{user.total.tx} -
-
- Previous 5 mins: {user.mins5.find}/{user.mins5.tx} -
-
- Current 5 mins: {user.current.find}/{user.current.tx} -
-
-
- {#each Object.entries(user.data ?? {}) as [k, v]} -
- {k}: {JSON.stringify(v)} -
- {/each} -
- {/each} -
+ + +
+ Workspace: {act[0]}: {act[1].length} current 5 mins => {totalFind}/{totalTx}
- {/each} -
+ +
+ {#each employeeGroups as employeeId} + {@const employee = employees.get(employeeId)} + {@const connections = act[1].filter((it) => it.userId === employeeId)} + + {@const find = connections.reduce((it, itm) => itm.current.find + it, 0)} + {@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)} +
+ + +
+ {#if employee} + + {:else} + {employeeId} + {/if} + : {connections.length} +
+
{find}/{txes}
+
+
+
+ {#each connections as user, i} +
+ #{i} + {user.userId} +
+ Total: {user.total.find}/{user.total.tx} +
+
+ Previous 5 mins: {user.mins5.find}/{user.mins5.tx} +
+
+ Current 5 mins: {user.current.find}/{user.current.tx} +
+
+
+ {#each Object.entries(user.data ?? {}) as [k, v]} +
+ {k}: {JSON.stringify(v)} +
+ {/each} +
+ {/each} +
+
+ {/each} +
+ {/each}
{:else if selectedTab === 'statistics'} - - - - - - - - - - - - {#each metricsToRows(data.metrics, 'System') as row} - - - - - - - {/each} - -
Name
AverageTotalOps
- - {row[1]} - - {row[2]}{row[3]}{row[4]}
-
+
+ {#if metricsData !== undefined} + + {/if} +
{/if} {:else} diff --git a/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte b/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte new file mode 100644 index 0000000000..aacc404d1c --- /dev/null +++ b/plugins/workbench-resources/src/components/statistics/MetricsInfo.svelte @@ -0,0 +1,83 @@ + + + + +
+ {name} +
+
+ + +
+ + + {metrics.operations} + + + + + {showAvg(name, metrics.value, metrics.operations)} + + + + + {metrics.value} + + +
+
+
+ {#each Object.entries(metrics.measurements) as [k, v], i} +
+ +
+ {/each} + {#each Object.entries(metrics.params) as [k, v], i} +
+ {#each Object.entries(v).toSorted((a, b) => b[1].value / (b[1].operations + 1) - a[1].value / (a[1].operations + 1)) as [kk, vv]} + + +
+ # {k} = {kk} +
+
+ + +
+ {vv.operations} + {showAvg(kk, vv.value, vv.operations)} + {vv.value} +
+
+
+
+ {/each} +
+ {/each} +
diff --git a/pods/server/src/server.ts b/pods/server/src/server.ts index b4cea8ee2d..76f701245c 100644 --- a/pods/server/src/server.ts +++ b/pods/server/src/server.ts @@ -218,7 +218,7 @@ export function start ( QueryJoinMiddleware.create // Should be last one ] - const metrics = getMetricsContext().newChild('indexing', {}) + const metrics = getMetricsContext() function createIndexStages ( fullText: MeasureContext, workspace: WorkspaceId, @@ -270,6 +270,7 @@ export function start ( } const pipelineFactory: PipelineFactory = (ctx, workspace, upgrade, broadcast) => { + const wsMetrics = metrics.newChild('๐Ÿงฒ ' + workspace.name, {}) const conf: DbConfiguration = { domains: { [DOMAIN_TX]: 'MongoTx', @@ -278,7 +279,7 @@ export function start ( [DOMAIN_FULLTEXT_BLOB]: 'FullTextBlob', [DOMAIN_MODEL]: 'Null' }, - metrics, + metrics: wsMetrics, defaultAdapter: 'Mongo', adapters: { MongoTx: { @@ -310,7 +311,14 @@ export function start ( factory: createElasticAdapter, url: opt.fullTextUrl, stages: (adapter, storage, storageAdapter, contentAdapter) => - createIndexStages(metrics.newChild('stages', {}), workspace, adapter, storage, storageAdapter, contentAdapter) + createIndexStages( + wsMetrics.newChild('stages', {}), + workspace, + adapter, + storage, + storageAdapter, + contentAdapter + ) }, contentAdapters: { Rekoni: { diff --git a/server-plugins/activity-resources/src/index.ts b/server-plugins/activity-resources/src/index.ts index d8c73de9d2..2ad9172cb4 100644 --- a/server-plugins/activity-resources/src/index.ts +++ b/server-plugins/activity-resources/src/index.ts @@ -13,22 +13,23 @@ // limitations under the License. // +import activity, { ActivityMessage, DocUpdateMessage, Reaction } from '@hcengineering/activity' import core, { Account, AttachedDoc, Data, Doc, - matchQuery, + MeasureContext, Ref, Tx, + TxCUD, TxCollectionCUD, TxCreateDoc, - TxCUD, - TxProcessor + TxProcessor, + matchQuery } from '@hcengineering/core' import { ActivityControl, DocObjectCache } from '@hcengineering/server-activity' import type { TriggerControl } from '@hcengineering/server-core' -import activity, { ActivityMessage, DocUpdateMessage, Reaction } from '@hcengineering/activity' import { createCollabDocInfo, createCollaboratorNotifications, @@ -87,6 +88,7 @@ export async function createReactionNotifications ( const messageTx = ( await pushDocUpdateMessages( + control.ctx, control, res as TxCollectionCUD[], parentMessage, @@ -136,6 +138,7 @@ function getDocUpdateMessageTx ( } async function pushDocUpdateMessages ( + ctx: MeasureContext, control: ActivityControl, res: TxCollectionCUD[], object: Doc | undefined, @@ -194,6 +197,7 @@ async function pushDocUpdateMessages ( } export async function generateDocUpdateMessages ( + ctx: MeasureContext, tx: TxCUD, control: ActivityControl, res: TxCollectionCUD[] = [], @@ -241,7 +245,11 @@ export async function generateDocUpdateMessages ( switch (tx._class) { case core.class.TxCreateDoc: { const doc = TxProcessor.createDoc2Doc(tx as TxCreateDoc) - return await pushDocUpdateMessages(control, res, doc, originTx ?? tx, undefined, objectCache) + return await ctx.with( + 'pushDocUpdateMessages', + {}, + async (ctx) => await pushDocUpdateMessages(ctx, control, res, doc, originTx ?? tx, undefined, objectCache) + ) } case core.class.TxMixin: case core.class.TxUpdateDoc: { @@ -249,17 +257,29 @@ export async function generateDocUpdateMessages ( if (doc === undefined) { doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0] } - return await pushDocUpdateMessages(control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache) + return await ctx.with( + 'pushDocUpdateMessages', + {}, + async (ctx) => + await pushDocUpdateMessages(ctx, control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache) + ) } case core.class.TxCollectionCUD: { const actualTx = TxProcessor.extractTx(tx) as TxCUD - res = await generateDocUpdateMessages(actualTx, control, res, tx, objectCache) + res = await generateDocUpdateMessages(ctx, actualTx, control, res, tx, objectCache) if ([core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc].includes(actualTx._class)) { let doc = objectCache?.docs?.get(tx.objectId) if (doc === undefined) { doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0] } - return await pushDocUpdateMessages(control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache) + if (doc !== undefined) { + return await ctx.with( + 'pushDocUpdateMessages', + {}, + async (ctx) => + await pushDocUpdateMessages(ctx, control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache) + ) + } } return res } @@ -273,10 +293,18 @@ async function ActivityMessagesHandler (tx: TxCUD, control: TriggerControl) return [] } - const txes = await generateDocUpdateMessages(tx, control) + const txes = await control.ctx.with( + 'generateDocUpdateMessages', + {}, + async (ctx) => await generateDocUpdateMessages(ctx, tx, control) + ) const messages = txes.map((messageTx) => TxProcessor.createDoc2Doc(messageTx.tx as TxCreateDoc)) - const notificationTxes = await createCollaboratorNotifications(tx, control, messages) + const notificationTxes = await control.ctx.with( + 'createNotificationTxes', + {}, + async (ctx) => await createCollaboratorNotifications(ctx, tx, control, messages) + ) return [...txes, ...notificationTxes] } diff --git a/server-plugins/notification-resources/src/index.ts b/server-plugins/notification-resources/src/index.ts index 0e92b28fd9..cf9ddb8c23 100644 --- a/server-plugins/notification-resources/src/index.ts +++ b/server-plugins/notification-resources/src/index.ts @@ -28,6 +28,7 @@ import core, { Doc, DocumentUpdate, Hierarchy, + MeasureContext, MixinUpdate, Ref, RefTo, @@ -746,7 +747,7 @@ async function collectionCollabDoc ( activityMessages: ActivityMessage[] ): Promise { const actualTx = TxProcessor.extractTx(tx) as TxCUD - let res = await createCollaboratorNotifications(actualTx, control, activityMessages, tx) + let res = await createCollaboratorNotifications(control.ctx, actualTx, control, activityMessages, tx) if (![core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc].includes(actualTx._class)) { return res @@ -959,6 +960,7 @@ export async function OnAttributeUpdate (tx: Tx, control: TriggerControl): Promi } export async function createCollaboratorNotifications ( + ctx: MeasureContext, tx: TxCUD, control: TriggerControl, activityMessages: ActivityMessage[], @@ -992,7 +994,7 @@ async function OnChatMessageCreate (tx: TxCollectionCUD, contr const createTx = TxProcessor.extractTx(tx) as TxCreateDoc const message = (await control.findAll(chunter.class.ChatMessage, { _id: createTx.objectId }))[0] - return await createCollaboratorNotifications(tx, control, [message]) + return await createCollaboratorNotifications(control.ctx, tx, control, [message]) } /** diff --git a/server/core/src/pipeline.ts b/server/core/src/pipeline.ts index 9afd89c5c1..275406d0ec 100644 --- a/server/core/src/pipeline.ts +++ b/server/core/src/pipeline.ts @@ -47,14 +47,23 @@ export async function createPipeline ( let broadcastHook: HandledBroadcastFunc = (): Tx[] => { return [] } - const storage = await createServerStorage(conf, { - upgrade, - broadcast: (tx: Tx[], targets?: string[]) => { - const sendTx = broadcastHook?.(tx, targets) ?? tx - broadcast(sendTx, targets) - } - }) - const pipeline = PipelineImpl.create(ctx, storage, constructors, broadcast) + const storage = await ctx.with( + 'create-server-storage', + {}, + async (ctx) => + await createServerStorage(ctx, conf, { + upgrade, + broadcast: (tx: Tx[], targets?: string[]) => { + const sendTx = broadcastHook?.(tx, targets) ?? tx + broadcast(sendTx, targets) + } + }) + ) + const pipeline = ctx.with( + 'create pipeline', + {}, + async (ctx) => await PipelineImpl.create(ctx, storage, constructors, broadcast) + ) const pipelineResult = await pipeline broadcastHook = (tx, targets) => { return pipelineResult.handleBroadcast(tx, targets) @@ -92,7 +101,7 @@ class PipelineImpl implements Pipeline { let current: Middleware | undefined for (let index = constructors.length - 1; index >= 0; index--) { const element = constructors[index] - current = await element(ctx, broadcast, this.storage, current) + current = await ctx.with('build chain', {}, async (ctx) => await element(ctx, broadcast, this.storage, current)) } return current } diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts index a41422284e..729106f4d3 100644 --- a/server/core/src/storage.ts +++ b/server/core/src/storage.ts @@ -20,16 +20,15 @@ import core, { Class, ClassifierKind, Collection, + DOMAIN_DOC_INDEX_STATE, + DOMAIN_MODEL, + DOMAIN_TX, Doc, DocumentQuery, DocumentUpdate, Domain, - DOMAIN_DOC_INDEX_STATE, - DOMAIN_MODEL, - DOMAIN_TX, FindOptions, FindResult, - generateId, Hierarchy, IndexingUpdateEvent, LoadModelResponse, @@ -37,13 +36,16 @@ import core, { Mixin, ModelDb, Ref, + SearchOptions, + SearchQuery, + SearchResult, ServerStorage, StorageIterator, Timestamp, Tx, TxApplyIf, - TxCollectionCUD, TxCUD, + TxCollectionCUD, TxFactory, TxProcessor, TxRemoveDoc, @@ -52,9 +54,7 @@ import core, { TxWorkspaceEvent, WorkspaceEvent, WorkspaceId, - SearchQuery, - SearchOptions, - SearchResult + generateId } from '@hcengineering/core' import { MinioService } from '@hcengineering/minio' import { getResource } from '@hcengineering/platform' @@ -74,7 +74,6 @@ import type { ObjectDDParticipant, TriggerControl } from './types' -import { createFindAll } from './utils' /** * @public @@ -165,12 +164,16 @@ class TServerStorage implements ServerStorage { const adapter = this.getAdapter(lastDomain as Domain) const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId) - const toDeleteDocs = await adapter.load(lastDomain as Domain, toDelete) + const toDeleteDocs = await ctx.with( + 'adapter-load', + { domain: lastDomain }, + async () => await adapter.load(lastDomain as Domain, toDelete) + ) for (const ddoc of toDeleteDocs) { removedDocs.set(ddoc._id, ddoc) } - const r = await adapter.tx(...part) + const r = await ctx.with('adapter-tx', {}, async () => await adapter.tx(...part)) if (Array.isArray(r)) { result.push(...r) } else { @@ -370,13 +373,13 @@ class TServerStorage implements ServerStorage { query: DocumentQuery, options?: FindOptions ): Promise> { - return await ctx.with('find-all', {}, (ctx) => { - const domain = this.hierarchy.getDomain(clazz) - if (query?.$search !== undefined) { - return ctx.with('full-text-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options)) - } - return ctx.with('db-find-all', { d: domain }, () => this.getAdapter(domain).findAll(clazz, query, options)) - }) + const domain = this.hierarchy.getDomain(clazz) + if (query?.$search !== undefined) { + return await ctx.with('client-fulltext-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options)) + } + return await ctx.with('client-find-all', { _class: clazz }, () => + this.getAdapter(domain).findAll(clazz, query, options) + ) } async searchFulltext (ctx: MeasureContext, query: SearchQuery, options: SearchOptions): Promise { @@ -550,13 +553,13 @@ class TServerStorage implements ServerStorage { ): Promise> => findAll(mctx, clazz, query, options) - const removed = await ctx.with('process-remove', {}, () => this.processRemove(ctx, txes, findAll, removedMap)) - const collections = await ctx.with('process-collection', {}, () => + const removed = await ctx.with('process-remove', {}, (ctx) => this.processRemove(ctx, txes, findAll, removedMap)) + const collections = await ctx.with('process-collection', {}, (ctx) => this.processCollection(ctx, txes, findAll, removedMap) ) - const moves = await ctx.with('process-move', {}, () => this.processMove(ctx, txes, findAll)) + const moves = await ctx.with('process-move', {}, (ctx) => this.processMove(ctx, txes, findAll)) - const triggerControl: Omit = { + const triggerControl: Omit = { removedMap, workspace: this.workspace, fx: triggerFx.fx, @@ -572,15 +575,25 @@ class TServerStorage implements ServerStorage { triggerFx.fx(() => f(adapter, this.workspace)) }, findAll: fAll(ctx), + findAllCtx: findAll, modelDb: this.modelDb, hierarchy: this.hierarchy, apply: async (tx, broadcast) => { return await this.apply(ctx, tx, broadcast) + }, + applyCtx: async (ctx, tx, broadcast) => { + return await this.apply(ctx, tx, broadcast) } } const triggers = await ctx.with('process-triggers', {}, async (ctx) => { const result: Tx[] = [] - result.push(...(await this.triggers.apply(ctx, txes, triggerControl))) + result.push( + ...(await this.triggers.apply(ctx, txes, { + ...triggerControl, + ctx, + findAll: fAll(ctx) + })) + ) return result }) @@ -685,7 +698,18 @@ class TServerStorage implements ServerStorage { async processTxes (ctx: MeasureContext, txes: Tx[]): Promise<[TxResult, Tx[]]> { // store tx - const _findAll = createFindAll(this) + const _findAll: ServerStorage['findAll'] = async ( + ctx: MeasureContext, + clazz: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise> => { + const domain = this.hierarchy.getDomain(clazz) + if (query?.$search !== undefined) { + return await ctx.with('full-text-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options)) + } + return await ctx.with('find-all', { _class: clazz }, () => this.getAdapter(domain).findAll(clazz, query, options)) + } const txToStore: Tx[] = [] const modelTx: Tx[] = [] const applyTxes: Tx[] = [] @@ -748,7 +772,7 @@ class TServerStorage implements ServerStorage { } async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> { - return await this.processTxes(ctx, [tx]) + return await ctx.with('client-tx', { _class: tx._class }, async (ctx) => await this.processTxes(ctx, [tx])) } find (domain: Domain): StorageIterator { @@ -801,6 +825,7 @@ export interface ServerStorageOptions { * @public */ export async function createServerStorage ( + ctx: MeasureContext, conf: DbConfiguration, options: ServerStorageOptions ): Promise { @@ -809,63 +834,65 @@ export async function createServerStorage ( const adapters = new Map() const modelDb = new ModelDb(hierarchy) - console.timeLog(conf.workspace.name, 'create server storage') const storageAdapter = conf.storageFactory?.() for (const key in conf.adapters) { const adapterConf = conf.adapters[key] adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)) - console.timeLog(conf.workspace.name, 'adapter', key) } const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter - if (txAdapter === undefined) { - console.log('no txadapter found') - } - console.timeLog(conf.workspace.name, 'begin get model') - const model = await txAdapter.getModel() - console.timeLog(conf.workspace.name, 'get model') - for (const tx of model) { - try { - hierarchy.tx(tx) - await triggers.tx(tx) - } catch (err: any) { - console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) + const model = await ctx.with('get model', {}, async (ctx) => { + const model = await txAdapter.getModel() + for (const tx of model) { + try { + hierarchy.tx(tx) + await triggers.tx(tx) + } catch (err: any) { + console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) + } } - } - console.timeLog(conf.workspace.name, 'finish hierarchy') - - for (const tx of model) { - try { - await modelDb.tx(tx) - } catch (err: any) { - console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) + for (const tx of model) { + try { + await modelDb.tx(tx) + } catch (err: any) { + console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) + } } - } - console.timeLog(conf.workspace.name, 'finish local model') + return model + }) for (const [adn, adapter] of adapters) { - await adapter.init(model) - console.timeLog(conf.workspace.name, 'finish init adapter', adn) + await ctx.with('init-adapter', { name: adn }, async (ctx) => { + await adapter.init(model) + }) } - const fulltextAdapter = await conf.fulltextAdapter.factory( - conf.fulltextAdapter.url, - conf.workspace, - conf.metrics.newChild('fulltext', {}) + const fulltextAdapter = await ctx.with( + 'create full text adapter', + {}, + async (ctx) => + await conf.fulltextAdapter.factory( + conf.fulltextAdapter.url, + conf.workspace, + conf.metrics.newChild('๐Ÿ—’๏ธ fulltext', {}) + ) ) - console.timeLog(conf.workspace.name, 'finish fulltext adapter') - const metrics = conf.metrics.newChild('server-storage', {}) + const metrics = conf.metrics.newChild('๐Ÿ“” server-storage', {}) - const contentAdapter = await createContentAdapter( - conf.contentAdapters, - conf.defaultContentAdapter, - conf.workspace, - metrics.newChild('content', {}) + const contentAdapter = await ctx.with( + 'create content adapter', + {}, + async (ctx) => + await createContentAdapter( + conf.contentAdapters, + conf.defaultContentAdapter, + conf.workspace, + metrics.newChild('content', {}) + ) ) - console.timeLog(conf.workspace.name, 'finish content adapter') const defaultAdapter = adapters.get(conf.defaultAdapter) if (defaultAdapter === undefined) { @@ -877,7 +904,6 @@ export async function createServerStorage ( throw new Error('No storage adapter') } const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter) - console.timeLog(conf.workspace.name, 'finish index pipeline stages') const indexer = new FullTextIndexPipeline( defaultAdapter, @@ -903,7 +929,6 @@ export async function createServerStorage ( options.broadcast?.([tx]) } ) - console.timeLog(conf.workspace.name, 'finish create indexer') return new FullTextIndex( hierarchy, fulltextAdapter, diff --git a/server/core/src/triggers.ts b/server/core/src/triggers.ts index e269c0f46a..04e643bdea 100644 --- a/server/core/src/triggers.ts +++ b/server/core/src/triggers.ts @@ -70,7 +70,17 @@ export class Triggers { if (matches.length > 0) { await ctx.with(resource, {}, async (ctx) => { for (const tx of matches) { - result.push(...(await trigger(tx, { ...ctrl, txFactory: new TxFactory(tx.modifiedBy, true) }))) + result.push( + ...(await trigger(tx, { + ...ctrl, + ctx, + txFactory: new TxFactory(tx.modifiedBy, true), + findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx, clazz, query, options), + apply: async (tx, broadcast) => { + return await ctrl.applyCtx(ctx, tx, broadcast) + } + })) + ) } }) } diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 05cc8cf5df..770684eda5 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -113,13 +113,23 @@ export interface Pipeline extends LowLevelStorage { * @public */ export interface TriggerControl { + ctx: MeasureContext workspace: WorkspaceId txFactory: TxFactory findAll: Storage['findAll'] + findAllCtx: ( + ctx: MeasureContext, + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ) => Promise> hierarchy: Hierarchy modelDb: ModelDb removedMap: Map, Doc> + // // An object cache, + // getCachedObject: (_class: Ref>, _id: Ref) => Promise + fulltextFx: (f: (adapter: FullTextAdapter) => Promise) => void // Since we don't have other storages let's consider adapter is MinioClient // Later can be replaced with generic one with bucket encapsulated inside. @@ -128,6 +138,7 @@ export interface TriggerControl { // Bulk operations in case trigger require some apply: (tx: Tx[], broadcast: boolean) => Promise + applyCtx: (ctx: MeasureContext, tx: Tx[], broadcast: boolean) => Promise } /** diff --git a/server/core/src/utils.ts b/server/core/src/utils.ts deleted file mode 100644 index 8977cf5b0a..0000000000 --- a/server/core/src/utils.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { - Class, - Doc, - DocumentQuery, - FindOptions, - FindResult, - MeasureContext, - Ref, - ServerStorage -} from '@hcengineering/core' - -/** - * @public - */ -export function createFindAll (storage: ServerStorage): ServerStorage['findAll'] { - return async ( - ctx: MeasureContext, - clazz: Ref>, - query: DocumentQuery, - options?: FindOptions - ): Promise> => { - return await storage.findAll(ctx, clazz, query, options) - } -} diff --git a/server/middleware/src/spaceSecurity.ts b/server/middleware/src/spaceSecurity.ts index ec330d1e1e..269c9c3331 100644 --- a/server/middleware/src/spaceSecurity.ts +++ b/server/middleware/src/spaceSecurity.ts @@ -78,7 +78,9 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar next?: Middleware ): Promise { const res = new SpaceSecurityMiddleware(broadcast, storage, next) - await res.init(ctx) + await ctx.with('space chain', {}, async (ctx) => { + await res.init(ctx) + }) return res } diff --git a/server/mongo/src/__tests__/storage.test.ts b/server/mongo/src/__tests__/storage.test.ts index 67caae17de..97c5bd09c8 100644 --- a/server/mongo/src/__tests__/storage.test.ts +++ b/server/mongo/src/__tests__/storage.test.ts @@ -160,8 +160,8 @@ describe('mongo operations', () => { workspace: getWorkspaceId(dbId, ''), storageFactory: () => createNullStorageFactory() } - const serverStorage = await createServerStorage(conf, { upgrade: false }) const ctx = new MeasureMetricsContext('client', {}) + const serverStorage = await createServerStorage(ctx, conf, { upgrade: false }) client = await createClient(async (handler) => { const st: ClientConnection = { findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options), @@ -174,7 +174,8 @@ describe('mongo operations', () => { upload: async (domain: Domain, docs: Doc[]) => {}, clean: async (domain: Domain, docs: Ref[]) => {}, loadModel: async () => txes, - getAccount: async () => ({}) as any + getAccount: async () => ({}) as any, + measure: async () => async () => ({ time: 0, serverTime: 0 }) } return st }) diff --git a/server/server/src/apm.ts b/server/server/src/apm.ts index 2b40119805..b232e19fab 100644 --- a/server/server/src/apm.ts +++ b/server/server/src/apm.ts @@ -80,14 +80,20 @@ export class APMMeasureContext implements MeasureContext { } } - async error (err: any): Promise { + async error (message: string, ...args: any[]): Promise { + this.logger.error(message, args) + await new Promise((resolve) => { - this.agent.captureError(err, () => { + this.agent.captureError({ message, params: args }, () => { resolve() }) }) } + async info (message: string, ...args: any[]): Promise { + this.logger.info(message, args) + } + end (): void { this.transaction?.end() } diff --git a/server/server/src/metrics.ts b/server/server/src/metrics.ts index 488dfc4009..16dc49c5a9 100644 --- a/server/server/src/metrics.ts +++ b/server/server/src/metrics.ts @@ -4,6 +4,7 @@ import { writeFile } from 'fs/promises' const apmUrl = process.env.APM_SERVER_URL const metricsFile = process.env.METRICS_FILE +// const logsRoot = process.env.LOGS_ROOT const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true' const METRICS_UPDATE_INTERVAL = !metricsConsole ? 1000 : 60000 diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 0ea39e4732..105b203da9 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -135,7 +135,7 @@ export async function initModel ( const result = await db.collection(DOMAIN_TX).insertMany(model as Document[]) logger.log(`${result.insertedCount} model transactions inserted.`) - logger.log('creating data...') + logger.log('creating data...', transactorUrl) const connection = (await connect(transactorUrl, workspaceId, undefined, { model: 'upgrade' })) as unknown as CoreClient & BackupClient diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index ce146f1e98..b73e3f1630 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -56,6 +56,7 @@ export class ClientSession implements Session { total: StatisticsElement = { find: 0, tx: 0 } current: StatisticsElement = { find: 0, tx: 0 } mins5: StatisticsElement = { find: 0, tx: 0 } + measures: { id: string, message: string, time: 0 }[] = [] constructor ( protected readonly broadcast: BroadcastCall, diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index ec436dafa4..ae4d88163e 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -26,7 +26,7 @@ import core, { type WorkspaceId } from '@hcengineering/core' import { unknownError } from '@hcengineering/platform' -import { readRequest, type HelloRequest, type HelloResponse, type Response } from '@hcengineering/rpc' +import { readRequest, type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' import type { Pipeline, SessionContext } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' // import WebSocket, { RawData } from 'ws' @@ -155,14 +155,14 @@ class TSessionManager implements SessionManager { } async addSession ( - ctx: MeasureContext, + baseCtx: MeasureContext, ws: ConnectionSocket, token: Token, pipelineFactory: PipelineFactory, productId: string, sessionId?: string - ): Promise { - return await ctx.with('add-session', {}, async (ctx) => { + ): Promise<{ session: Session, context: MeasureContext } | { upgrade: true }> { + return await baseCtx.with('๐Ÿ“ฒ add-session', {}, async (ctx) => { const wsString = toWorkspaceString(token.workspace, '@') let workspace = this.workspaces.get(wsString) @@ -170,22 +170,29 @@ class TSessionManager implements SessionManager { workspace = this.workspaces.get(wsString) if (workspace === undefined) { - workspace = this.createWorkspace(ctx, pipelineFactory, token) + workspace = this.createWorkspace(baseCtx, pipelineFactory, token) } let pipeline: Pipeline if (token.extra?.model === 'upgrade') { if (workspace.upgrade) { - pipeline = await ctx.with('pipeline', {}, async () => await (workspace as Workspace).pipeline) + pipeline = await ctx.with( + '๐Ÿ’ค wait ' + token.workspace.name, + {}, + async () => await (workspace as Workspace).pipeline + ) } else { pipeline = await this.createUpgradeSession(token, sessionId, ctx, wsString, workspace, pipelineFactory, ws) } } else { if (workspace.upgrade) { - ws.close() - throw new Error('Upgrade in progress....') + return { upgrade: true } } - pipeline = await ctx.with('pipeline', {}, async () => await (workspace as Workspace).pipeline) + pipeline = await ctx.with( + '๐Ÿ’ค wait ' + token.workspace.name, + {}, + async () => await (workspace as Workspace).pipeline + ) } const session = this.createSession(token, pipeline) @@ -204,7 +211,7 @@ class TSessionManager implements SessionManager { session.useCompression ) } - return session + return { session, context: workspace.context } }) } @@ -222,7 +229,7 @@ class TSessionManager implements SessionManager { } // If upgrade client is used. // Drop all existing clients - await this.closeAll(ctx, wsString, workspace, 0, 'upgrade') + await this.closeAll(wsString, workspace, 0, 'upgrade') // Wipe workspace and update values. if (!workspace.upgrade) { // This is previous workspace, intended to be closed. @@ -238,10 +245,10 @@ class TSessionManager implements SessionManager { } broadcastAll (workspace: Workspace, tx: Tx[], targets?: string[]): void { - if (workspace?.upgrade ?? false) { + if (workspace.upgrade) { return } - const ctx = this.ctx.newChild('broadcast-all', {}) + const ctx = this.ctx.newChild('๐Ÿ“ฌ broadcast-all', {}) const sessions = [...workspace.sessions.values()] function send (): void { for (const session of sessions.splice(0, 1)) { @@ -266,9 +273,11 @@ class TSessionManager implements SessionManager { private createWorkspace (ctx: MeasureContext, pipelineFactory: PipelineFactory, token: Token): Workspace { const upgrade = token.extra?.model === 'upgrade' + const context = ctx.newChild('๐Ÿงฒ ' + token.workspace.name, {}) const workspace: Workspace = { + context, id: generateId(), - pipeline: pipelineFactory(ctx, token.workspace, upgrade, (tx, targets) => { + pipeline: pipelineFactory(context, token.workspace, upgrade, (tx, targets) => { this.broadcastAll(workspace, tx, targets) }), sessions: new Map(), @@ -309,13 +318,7 @@ class TSessionManager implements SessionManager { } catch {} } - async close ( - ctx: MeasureContext, - ws: ConnectionSocket, - workspaceId: WorkspaceId, - code: number, - reason: string - ): Promise { + async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise { // if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`) const wsid = toWorkspaceString(workspaceId) const workspace = this.workspaces.get(wsid) @@ -340,7 +343,7 @@ class TSessionManager implements SessionManager { const user = sessionRef.session.getUser() const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) if (another === -1) { - await this.setStatus(ctx, sessionRef.session, false) + await this.setStatus(workspace.context, sessionRef.session, false) } if (!workspace.upgrade) { // Wait some time for new client to appear before closing workspace. @@ -355,13 +358,7 @@ class TSessionManager implements SessionManager { } } - async closeAll ( - ctx: MeasureContext, - wsId: string, - workspace: Workspace, - code: number, - reason: 'upgrade' | 'shutdown' - ): Promise { + async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise { if (LOGGING_ENABLED) console.timeLog(wsId, `closing workspace ${workspace.id}, code: ${code}, reason: ${reason}`) const sessions = Array.from(workspace.sessions) @@ -371,19 +368,10 @@ class TSessionManager implements SessionManager { s.workspaceClosed = true if (reason === 'upgrade') { // Override message handler, to wait for upgrading response from clients. - await webSocket.send( - ctx, - { - result: { - _class: core.class.TxModelUpgrade - } - }, - s.binaryResponseMode, - false - ) + await this.sendUpgrade(workspace.context, webSocket, s.binaryResponseMode) } webSocket.close() - await this.setStatus(ctx, s, false) + await this.setStatus(workspace.context, s, false) } if (LOGGING_ENABLED) console.timeLog(wsId, workspace.id, 'Clients disconnected. Closing Workspace...') @@ -403,12 +391,25 @@ class TSessionManager implements SessionManager { console.timeEnd(wsId) } + private async sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): Promise { + await webSocket.send( + ctx, + { + result: { + _class: core.class.TxModelUpgrade + } + }, + binary, + false + ) + } + async closeWorkspaces (ctx: MeasureContext): Promise { if (this.checkInterval !== undefined) { clearInterval(this.checkInterval) } for (const w of this.workspaces) { - await this.closeAll(ctx, w[0], w[1], 1, 'shutdown') + await this.closeAll(w[0], w[1], 1, 'shutdown') } } @@ -432,6 +433,7 @@ class TSessionManager implements SessionManager { if (this.workspaces.get(wsid)?.id === wsUID) { this.workspaces.delete(wsid) } + workspace.context.end() if (LOGGING_ENABLED) { console.timeLog(workspaceId.name, 'Closed workspace', wsUID) } @@ -459,7 +461,7 @@ class TSessionManager implements SessionManager { if (LOGGING_ENABLED) console.log(workspaceId.name, `server broadcasting to ${workspace.sessions.size} clients...`) const sessions = [...workspace.sessions.values()] - const ctx = this.ctx.newChild('broadcast', {}) + const ctx = this.ctx.newChild('๐Ÿ“ญ broadcast', {}) function send (): void { for (const sessionRef of sessions.splice(0, 1)) { if (sessionRef.session.sessionId !== from?.sessionId) { @@ -496,7 +498,7 @@ class TSessionManager implements SessionManager { msg: any, workspace: string ): Promise { - const userCtx = requestCtx.newChild('client', { workspace }) as SessionContext + const userCtx = requestCtx.newChild('๐Ÿ“ž client', {}) as SessionContext userCtx.sessionId = service.sessionInstanceId ?? '' // Calculate total number of clients @@ -504,8 +506,8 @@ class TSessionManager implements SessionManager { const st = Date.now() try { - await userCtx.with('handleRequest', {}, async (ctx) => { - const request = await ctx.with('read', {}, async () => readRequest(msg, false)) + await userCtx.with('๐Ÿงญ handleRequest', {}, async (ctx) => { + const request = await ctx.with('๐Ÿ“ฅ read', {}, async () => readRequest(msg, false)) if (request.id === -1 && request.method === 'hello') { const hello = request as HelloRequest service.binaryResponseMode = hello.binary ?? false @@ -536,6 +538,10 @@ class TSessionManager implements SessionManager { await ws.send(ctx, helloResponse, false, false) return } + if (request.method === 'measure' || request.method === 'measure-done') { + await this.handleMeasure(service, request, ctx, ws) + return + } service.requests.set(reqId, { id: reqId, params: request, @@ -545,10 +551,15 @@ class TSessionManager implements SessionManager { ws.close() return } + const f = (service as any)[request.method] try { const params = [...request.params] - const result = await ctx.with('call', {}, async (callTx) => f.apply(service, [callTx, ...params])) + + const result = + service.measureCtx?.ctx !== undefined + ? await f.apply(service, [service.measureCtx?.ctx, ...params]) + : await ctx.with('๐Ÿงจ process', {}, async (callTx) => f.apply(service, [callTx, ...params])) const resp: Response = { id: request.id, result } @@ -575,6 +586,43 @@ class TSessionManager implements SessionManager { service.requests.delete(reqId) } } + + private async handleMeasure( + service: S, + request: Request, + ctx: MeasureContext, + ws: ConnectionSocket + ): Promise { + let serverTime = 0 + if (request.method === 'measure') { + service.measureCtx = { ctx: ctx.newChild('๐Ÿ“ถ ' + request.params[0], {}), time: Date.now() } + } else { + if (service.measureCtx !== undefined) { + serverTime = Date.now() - service.measureCtx.time + service.measureCtx.ctx.end(serverTime) + } + } + try { + const resp: Response = { id: request.id, result: request.method === 'measure' ? 'started' : serverTime } + + await handleSend( + ctx, + ws, + resp, + this.sessions.size < 100 ? 10000 : 1001, + service.binaryResponseMode, + service.useCompression + ) + } catch (err: any) { + if (LOGGING_ENABLED) console.error(err) + const resp: Response = { + id: request.id, + error: unknownError(err), + result: JSON.parse(JSON.stringify(err?.stack)) + } + await ws.send(ctx, resp, service.binaryResponseMode, service.useCompression) + } + } } async function handleSend ( diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 1dde35e9cd..8df3fc95c7 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -169,11 +169,11 @@ export function startHttpServer ( if (ws.readyState !== ws.OPEN) { return } - const smsg = await ctx.with('serialize', {}, async () => serialize(msg, binary)) + const smsg = await ctx.with('๐Ÿ“ฆ serialize', {}, async () => serialize(msg, binary)) ctx.measure('send-data', smsg.length) - await ctx.with('socket-send', {}, async (ctx) => { + await ctx.with('๐Ÿ“ค socket-send', {}, async (ctx) => { await new Promise((resolve, reject) => { ws.send(smsg, { binary, compress: compression }, (err) => { if (err != null) { @@ -191,6 +191,10 @@ export function startHttpServer ( buffer?.push(msg) }) const session = await sessions.addSession(ctx, cs, token, pipelineFactory, productId, sessionId) + if ('upgrade' in session) { + cs.close() + return + } // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('message', (msg: RawData) => { let buff: any | undefined @@ -200,22 +204,21 @@ export function startHttpServer ( buff = Buffer.concat(msg).toString() } if (buff !== undefined) { - void handleRequest(ctx, session, cs, buff, token.workspace.name) + void handleRequest(session.context, session.session, cs, buff, token.workspace.name) } }) // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('close', (code: number, reason: Buffer) => { - if (session.workspaceClosed ?? false) { + if (session.session.workspaceClosed ?? false) { return } // remove session after 1seconds, give a time to reconnect. - // if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" closed ${code === 1000 ? 'normally' : 'abnormally'}`) - void sessions.close(ctx, cs, token.workspace, code, reason.toString()) + void sessions.close(cs, token.workspace, code, reason.toString()) }) const b = buffer buffer = undefined for (const msg of b) { - await handleRequest(ctx, session, cs, msg, token.workspace.name) + await handleRequest(session.context, session.session, cs, msg, token.workspace.name) } }) @@ -226,7 +229,6 @@ export function startHttpServer ( try { const payload = decodeToken(token ?? '') const sessionId = url.searchParams.get('sessionId') - // if (LOGGING_ENABLED) console.log(payload.workspace.name, 'client connected with payload', payload, sessionId) if (payload.workspace.productId !== productId) { throw new Error('Invalid workspace product') diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index f747d0d5e0..4e138cf5d9 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -59,6 +59,8 @@ export interface Session { total: StatisticsElement current: StatisticsElement mins5: StatisticsElement + + measureCtx?: { ctx: MeasureContext, time: number } } /** @@ -107,6 +109,7 @@ export function disableLogging (): void { * @public */ export interface Workspace { + context: MeasureContext id: string pipeline: Promise sessions: Map @@ -130,25 +133,13 @@ export interface SessionManager { pipelineFactory: PipelineFactory, productId: string, sessionId?: string - ) => Promise + ) => Promise<{ session: Session, context: MeasureContext } | { upgrade: true }> broadcastAll: (workspace: Workspace, tx: Tx[], targets?: string[]) => void - close: ( - ctx: MeasureContext, - ws: ConnectionSocket, - workspaceId: WorkspaceId, - code: number, - reason: string - ) => Promise + close: (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string) => Promise - closeAll: ( - ctx: MeasureContext, - wsId: string, - workspace: Workspace, - code: number, - reason: 'upgrade' | 'shutdown' - ) => Promise + closeAll: (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown') => Promise closeWorkspaces: (ctx: MeasureContext) => Promise