UBERF-4319: Performance changes (#4474)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-01-30 18:07:34 +07:00 committed by GitHub
parent d02e88737d
commit e6a35d2a03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 775 additions and 436 deletions

View File

@ -34,7 +34,8 @@ import core, {
TxResult, TxResult,
SearchQuery, SearchQuery,
SearchOptions, SearchOptions,
SearchResult SearchResult,
MeasureDoneOperation
} from '@hcengineering/core' } from '@hcengineering/core'
import { createInMemoryTxAdapter } from '@hcengineering/dev-storage' import { createInMemoryTxAdapter } from '@hcengineering/dev-storage'
import devmodel from '@hcengineering/devmodel' import devmodel from '@hcengineering/devmodel'
@ -104,6 +105,10 @@ class ServerStorageWrapper implements ClientConnection {
async upload (domain: Domain, docs: Doc[]): Promise<void> {} async upload (domain: Domain, docs: Doc[]): Promise<void> {}
async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {} async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
async measure (operationName: string): Promise<MeasureDoneOperation> {
return async () => ({ time: 0, serverTime: 0 })
}
} }
async function createNullFullTextAdapter (): Promise<FullTextAdapter> { async function createNullFullTextAdapter (): Promise<FullTextAdapter> {
@ -152,7 +157,8 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
defaultContentAdapter: 'default', defaultContentAdapter: 'default',
workspace: getWorkspaceId('') workspace: getWorkspaceId('')
} }
const serverStorage = await createServerStorage(conf, { const ctx = new MeasureMetricsContext('client', {})
const serverStorage = await createServerStorage(ctx, conf, {
upgrade: false upgrade: false
}) })
setMetadata(devmodel.metadata.DevModel, serverStorage) setMetadata(devmodel.metadata.DevModel, serverStorage)

View File

@ -23,6 +23,7 @@ import core, {
Doc, Doc,
DocumentUpdate, DocumentUpdate,
MeasureMetricsContext, MeasureMetricsContext,
Metrics,
Ref, Ref,
TxOperations, TxOperations,
WorkspaceId, WorkspaceId,
@ -170,13 +171,32 @@ export async function benchmark (
let running = false let running = false
function extract (metrics: Metrics, ...path: string[]): Metrics | null {
let m = metrics
for (const p of path) {
let found = false
for (const [k, v] of Object.entries(m.measurements)) {
if (k.includes(p)) {
m = v
found = true
break
}
}
if (!found) {
return null
}
}
return m
}
let timer: any let timer: any
if (isMainThread) { if (isMainThread) {
timer = setInterval(() => { timer = setInterval(() => {
const st = Date.now() const st = Date.now()
try { try {
void fetch(transactorUrl.replace('ws:/', 'http:/') + '/' + token) const fetchUrl = transactorUrl.replace('ws:/', 'http:/') + '/api/v1/statistics?token=' + token
void fetch(fetchUrl)
.then((res) => { .then((res) => {
void res void res
.json() .json()
@ -184,15 +204,17 @@ export async function benchmark (
memUsed = json.statistics.memoryUsed memUsed = json.statistics.memoryUsed
memTotal = json.statistics.memoryTotal memTotal = json.statistics.memoryTotal
cpu = json.statistics.cpuUsage cpu = json.statistics.cpuUsage
const r = operations = 0
json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.call?.measurements?.[ requestTime = 0
'find-all' transfer = 0
] for (const w of workspaceId) {
operations = r?.operations ?? 0 const r = extract(json.metrics as Metrics, w.name, 'client', 'handleRequest', 'process', 'find-all')
requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1) operations += r?.operations ?? 0
transfer = requestTime += (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.['#send-data']
?.value ?? 0 const tr = extract(json.metrics as Metrics, w.name, 'client', 'handleRequest', '#send-data')
transfer += tr?.value ?? 0
}
}) })
.catch((err) => { .catch((err) => {
console.log(err) console.log(err)

View File

@ -562,7 +562,7 @@ export function devTool (
program program
.command('benchmark') .command('benchmark')
.description('clean archived spaces') .description('benchmark')
.option('--from <from>', 'Min client count', '10') .option('--from <from>', 'Min client count', '10')
.option('--steps <steps>', 'Step with client count', '10') .option('--steps <steps>', 'Step with client count', '10')
.option('--sleep <sleep>', 'Random Delay max between operations', '0') .option('--sleep <sleep>', 'Random Delay max between operations', '0')

View File

@ -13,30 +13,31 @@
// limitations under the License. // limitations under the License.
// //
import activity, { type DocUpdateMessage } from '@hcengineering/activity'
import core, { import core, {
MeasureMetricsContext,
SortingOrder,
TxFactory,
TxProcessor,
toFindResult,
toIdMap,
type AttachedDoc, type AttachedDoc,
type Class, type Class,
type Doc, type Doc,
type Ref, type Ref,
type TxCUD, type TxCUD,
type TxCollectionCUD, type TxCollectionCUD,
TxProcessor,
toIdMap,
SortingOrder,
TxFactory,
toFindResult,
type TxCreateDoc type TxCreateDoc
} from '@hcengineering/core' } from '@hcengineering/core'
import activity, { type DocUpdateMessage } from '@hcengineering/activity'
import { tryMigrate, type MigrateOperation, type MigrationClient, type MigrationIterator } from '@hcengineering/model' import { tryMigrate, type MigrateOperation, type MigrationClient, type MigrationIterator } from '@hcengineering/model'
import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
import { import {
type ActivityControl,
type DocObjectCache,
getAllObjectTransactions, getAllObjectTransactions,
serverActivityId serverActivityId,
type ActivityControl,
type DocObjectCache
} from '@hcengineering/server-activity' } from '@hcengineering/server-activity'
import { generateDocUpdateMessages } from '@hcengineering/server-activity-resources' import { generateDocUpdateMessages } from '@hcengineering/server-activity-resources'
import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
function getActivityControl (client: MigrationClient): ActivityControl { function getActivityControl (client: MigrationClient): ActivityControl {
const txFactory = new TxFactory(core.account.System, false) const txFactory = new TxFactory(core.account.System, false)
@ -66,7 +67,14 @@ async function generateDocUpdateMessageByTx (
return return
} }
const createCollectionCUDTxes = await generateDocUpdateMessages(tx, control, undefined, undefined, objectCache) const createCollectionCUDTxes = await generateDocUpdateMessages(
new MeasureMetricsContext('migration', {}),
tx,
control,
undefined,
undefined,
objectCache
)
for (const collectionTx of createCollectionCUDTxes) { for (const collectionTx of createCollectionCUDTxes) {
const createTx = collectionTx.tx as TxCreateDoc<DocUpdateMessage> const createTx = collectionTx.tx as TxCreateDoc<DocUpdateMessage>

View File

@ -119,7 +119,10 @@ describe('client', () => {
upload: async (domain: Domain, docs: Doc[]) => {}, upload: async (domain: Domain, docs: Doc[]) => {},
clean: async (domain: Domain, docs: Ref<Doc>[]) => {}, clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
loadModel: async (last: Timestamp) => txes, loadModel: async (last: Timestamp) => txes,
getAccount: async () => null as unknown as Account getAccount: async () => null as unknown as Account,
measure: async () => {
return async () => ({ time: 0, serverTime: 0 })
}
} }
} }
const spyCreate = jest.spyOn(TxProcessor, 'createDoc2Doc') const spyCreate = jest.spyOn(TxProcessor, 'createDoc2Doc')

View File

@ -71,6 +71,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
upload: async (domain: Domain, docs: Doc[]) => {}, upload: async (domain: Domain, docs: Doc[]) => {},
clean: async (domain: Domain, docs: Ref<Doc>[]) => {}, clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
loadModel: async (last: Timestamp) => txes, loadModel: async (last: Timestamp) => txes,
getAccount: async () => null as unknown as Account getAccount: async () => null as unknown as Account,
measure: async () => async () => ({ time: 0, serverTime: 0 })
} }
} }

View File

@ -19,8 +19,8 @@ import { Account, AttachedDoc, Class, DOMAIN_MODEL, Doc, Domain, PluginConfigura
import core from './component' import core from './component'
import { Hierarchy } from './hierarchy' import { Hierarchy } from './hierarchy'
import { ModelDb } from './memdb' import { ModelDb } from './memdb'
import type { DocumentQuery, FindOptions, FindResult, Storage, FulltextStorage, TxResult, WithLookup } from './storage' import type { DocumentQuery, FindOptions, FindResult, FulltextStorage, Storage, TxResult, WithLookup } from './storage'
import { SortingOrder, SearchQuery, SearchOptions, SearchResult } from './storage' import { SearchOptions, SearchQuery, SearchResult, SortingOrder } from './storage'
import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx' import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx'
import { toFindResult } from './utils' import { toFindResult } from './utils'
@ -46,10 +46,17 @@ export interface Client extends Storage, FulltextStorage {
close: () => Promise<void> close: () => Promise<void>
} }
export type MeasureDoneOperation = () => Promise<{ time: number, serverTime: number }>
export interface MeasureClient extends Client {
// Will perform on server operation measure and will return a local client time and on server time
measure: (operationName: string) => Promise<MeasureDoneOperation>
}
/** /**
* @public * @public
*/ */
export interface AccountClient extends Client { export interface AccountClient extends MeasureClient {
getAccount: () => Promise<Account> getAccount: () => Promise<Account>
} }
@ -86,9 +93,11 @@ export interface ClientConnection extends Storage, FulltextStorage, BackupClient
// If hash is passed, will return LoadModelResponse // If hash is passed, will return LoadModelResponse
loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse> loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse>
getAccount: () => Promise<Account> getAccount: () => Promise<Account>
measure: (operationName: string) => Promise<MeasureDoneOperation>
} }
class ClientImpl implements AccountClient, BackupClient { class ClientImpl implements AccountClient, BackupClient, MeasureClient {
notify?: (tx: Tx) => void notify?: (tx: Tx) => void
hierarchy!: Hierarchy hierarchy!: Hierarchy
model!: ModelDb model!: ModelDb
@ -151,6 +160,10 @@ class ClientImpl implements AccountClient, BackupClient {
return result return result
} }
async measure (operationName: string): Promise<MeasureDoneOperation> {
return await this.conn.measure(operationName)
}
async updateFromRemote (tx: Tx): Promise<void> { async updateFromRemote (tx: Tx): Promise<void> {
if (tx.objectSpace === core.space.Model) { if (tx.objectSpace === core.space.Model) {
this.hierarchy.tx(tx) this.hierarchy.tx(tx)
@ -402,14 +415,14 @@ async function buildModel (
try { try {
hierarchy.tx(tx) hierarchy.tx(tx)
} catch (err: any) { } catch (err: any) {
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
} }
} }
for (const tx of txes) { for (const tx of txes) {
try { try {
await model.tx(tx) await model.tx(tx)
} catch (err: any) { } catch (err: any) {
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
} }
} }
} }

View File

@ -13,13 +13,18 @@ export class MeasureMetricsContext implements MeasureContext {
metrics: Metrics metrics: Metrics
private readonly done: (value?: number) => void private readonly done: (value?: number) => void
constructor (name: string, params: Record<string, ParamType>, metrics: Metrics = newMetrics()) { constructor (
name: string,
params: Record<string, ParamType>,
metrics: Metrics = newMetrics(),
logger?: MeasureLogger
) {
this.name = name this.name = name
this.params = params this.params = params
this.metrics = metrics this.metrics = metrics
this.done = measure(metrics, params) this.done = measure(metrics, params)
this.logger = { this.logger = logger ?? {
info: (msg, args) => { info: (msg, args) => {
console.info(msg, ...args) console.info(msg, ...args)
}, },
@ -34,8 +39,8 @@ export class MeasureMetricsContext implements MeasureContext {
c.done(value) c.done(value)
} }
newChild (name: string, params: Record<string, ParamType>): MeasureContext { newChild (name: string, params: Record<string, ParamType>, logger?: MeasureLogger): MeasureContext {
return new MeasureMetricsContext(name, params, childMetrics(this.metrics, [name])) return new MeasureMetricsContext(name, params, childMetrics(this.metrics, [name]), logger)
} }
async with<T>( async with<T>(
@ -52,13 +57,17 @@ export class MeasureMetricsContext implements MeasureContext {
c.end() c.end()
return value return value
} catch (err: any) { } catch (err: any) {
await c.error(err) await c.error('Error during:' + name, err)
throw err throw err
} }
} }
async error (err: Error | string): Promise<void> { async error (message: string, ...args: any[]): Promise<void> {
console.error(err) this.logger.error(message, args)
}
async info (message: string, ...args: any[]): Promise<void> {
this.logger.info(message, args)
} }
end (): void { end (): void {

View File

@ -31,7 +31,7 @@ export interface MeasureLogger {
*/ */
export interface MeasureContext { export interface MeasureContext {
// Create a child metrics context // Create a child metrics context
newChild: (name: string, params: Record<string, ParamType>) => MeasureContext newChild: (name: string, params: Record<string, ParamType>, logger?: MeasureLogger) => MeasureContext
with: <T>(name: string, params: Record<string, ParamType>, op: (ctx: MeasureContext) => T | Promise<T>) => Promise<T> with: <T>(name: string, params: Record<string, ParamType>, op: (ctx: MeasureContext) => T | Promise<T>) => Promise<T>
@ -40,7 +40,8 @@ export interface MeasureContext {
measure: (name: string, value: number) => void measure: (name: string, value: number) => void
// Capture error // Capture error
error: (err: Error | string | any) => Promise<void> error: (message: string, ...args: any[]) => Promise<void>
info: (message: string, ...args: any[]) => Promise<void>
// Mark current context as complete // Mark current context as complete
// If no value is passed, time difference will be used. // If no value is passed, time difference will be used.

View File

@ -14,7 +14,9 @@ import {
toFindResult, toFindResult,
type SearchQuery, type SearchQuery,
type SearchOptions, type SearchOptions,
type SearchResult type SearchResult,
type MeasureClient,
type MeasureDoneOperation
} from '@hcengineering/core' } from '@hcengineering/core'
import { type Resource } from '@hcengineering/platform' import { type Resource } from '@hcengineering/platform'
@ -62,7 +64,7 @@ export type PresentationMiddlewareCreator = (client: Client, next?: Presentation
/** /**
* @public * @public
*/ */
export interface PresentationPipeline extends Client, Exclude<PresentationMiddleware, 'next'> { export interface PresentationPipeline extends MeasureClient, Exclude<PresentationMiddleware, 'next'> {
close: () => Promise<void> close: () => Promise<void>
} }
@ -72,7 +74,7 @@ export interface PresentationPipeline extends Client, Exclude<PresentationMiddle
export class PresentationPipelineImpl implements PresentationPipeline { export class PresentationPipelineImpl implements PresentationPipeline {
private head: PresentationMiddleware | undefined private head: PresentationMiddleware | undefined
private constructor (readonly client: Client) {} private constructor (readonly client: MeasureClient) {}
getHierarchy (): Hierarchy { getHierarchy (): Hierarchy {
return this.client.getHierarchy() return this.client.getHierarchy()
@ -86,7 +88,11 @@ export class PresentationPipelineImpl implements PresentationPipeline {
await this.head?.notifyTx(tx) await this.head?.notifyTx(tx)
} }
static create (client: Client, constructors: PresentationMiddlewareCreator[]): PresentationPipeline { async measure (operationName: string): Promise<MeasureDoneOperation> {
return await this.client.measure(operationName)
}
static create (client: MeasureClient, constructors: PresentationMiddlewareCreator[]): PresentationPipeline {
const pipeline = new PresentationPipelineImpl(client) const pipeline = new PresentationPipelineImpl(client)
pipeline.head = pipeline.buildChain(constructors) pipeline.head = pipeline.buildChain(constructors)
return pipeline return pipeline

View File

@ -16,7 +16,6 @@
import core, { import core, {
TxOperations, TxOperations,
type TypeAny,
getCurrentAccount, getCurrentAccount,
type AnyAttribute, type AnyAttribute,
type ArrOf, type ArrOf,
@ -29,6 +28,8 @@ import core, {
type FindOptions, type FindOptions,
type FindResult, type FindResult,
type Hierarchy, type Hierarchy,
type MeasureClient,
type MeasureDoneOperation,
type Mixin, type Mixin,
type Obj, type Obj,
type Ref, type Ref,
@ -38,6 +39,7 @@ import core, {
type SearchResult, type SearchResult,
type Tx, type Tx,
type TxResult, type TxResult,
type TypeAny,
type WithLookup type WithLookup
} from '@hcengineering/core' } from '@hcengineering/core'
import { getMetadata, getResource } from '@hcengineering/platform' import { getMetadata, getResource } from '@hcengineering/platform'
@ -51,7 +53,7 @@ import { OptimizeQueryMiddleware, PresentationPipelineImpl, type PresentationPip
import plugin from './plugin' import plugin from './plugin'
let liveQuery: LQ let liveQuery: LQ
let client: TxOperations let client: TxOperations & MeasureClient
let pipeline: PresentationPipeline let pipeline: PresentationPipeline
const txListeners: Array<(tx: Tx) => void> = [] const txListeners: Array<(tx: Tx) => void> = []
@ -73,14 +75,35 @@ export function removeTxListener (l: (tx: Tx) => void): void {
} }
} }
class UIClient extends TxOperations implements Client { class UIClient extends TxOperations implements Client, MeasureClient {
constructor ( constructor (
client: Client, client: MeasureClient,
private readonly liveQuery: Client private readonly liveQuery: Client
) { ) {
super(client, getCurrentAccount()._id) super(client, getCurrentAccount()._id)
} }
afterMeasure: Tx[] = []
measureOp?: MeasureDoneOperation
async doNotify (tx: Tx): Promise<void> {
if (this.measureOp !== undefined) {
this.afterMeasure.push(tx)
} else {
try {
await pipeline.notifyTx(tx)
await liveQuery.tx(tx)
txListeners.forEach((it) => {
it(tx)
})
} catch (err: any) {
console.log(err)
}
}
}
override async findAll<T extends Doc>( override async findAll<T extends Doc>(
_class: Ref<Class<T>>, _class: Ref<Class<T>>,
query: DocumentQuery<T>, query: DocumentQuery<T>,
@ -104,19 +127,38 @@ class UIClient extends TxOperations implements Client {
async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> { async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
return await this.client.searchFulltext(query, options) return await this.client.searchFulltext(query, options)
} }
async measure (operationName: string): Promise<MeasureDoneOperation> {
// return await (this.client as MeasureClient).measure(operationName)
const mop = await (this.client as MeasureClient).measure(operationName)
this.measureOp = mop
return async () => {
const result = await mop()
this.measureOp = undefined
if (this.afterMeasure.length > 0) {
const txes = this.afterMeasure
console.log('after measture', txes)
this.afterMeasure = []
for (const tx of txes) {
await this.doNotify(tx)
}
}
return result
}
}
} }
/** /**
* @public * @public
*/ */
export function getClient (): TxOperations { export function getClient (): TxOperations & MeasureClient {
return client return client
} }
/** /**
* @public * @public
*/ */
export async function setClient (_client: Client): Promise<void> { export async function setClient (_client: MeasureClient): Promise<void> {
if (liveQuery !== undefined) { if (liveQuery !== undefined) {
await liveQuery.close() await liveQuery.close()
} }
@ -131,20 +173,11 @@ export async function setClient (_client: Client): Promise<void> {
const needRefresh = liveQuery !== undefined const needRefresh = liveQuery !== undefined
liveQuery = new LQ(pipeline) liveQuery = new LQ(pipeline)
client = new UIClient(pipeline, liveQuery) const uiClient = new UIClient(pipeline, liveQuery)
client = uiClient
_client.notify = (tx: Tx) => { _client.notify = (tx: Tx) => {
pipeline.notifyTx(tx).catch((err) => { void uiClient.doNotify(tx)
console.log(err)
})
liveQuery.tx(tx).catch((err) => {
console.log(err)
})
txListeners.forEach((it) => {
it(tx)
})
} }
if (needRefresh || globalQueries.length > 0) { if (needRefresh || globalQueries.length > 0) {
await refreshClient() await refreshClient()

View File

@ -95,6 +95,7 @@ FulltextStorage & {
searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => { searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
return { docs: [] } return { docs: [] }
} },
measure: async () => async () => ({ time: 0, serverTime: 0 })
} }
} }

View File

@ -25,6 +25,7 @@
export let bordered: boolean = false export let bordered: boolean = false
export let expandable = true export let expandable = true
export let contentColor = false export let contentColor = false
export let showChevron = true
</script> </script>
<div class="flex-col"> <div class="flex-col">
@ -38,7 +39,7 @@
if (expandable) expanded = !expanded if (expandable) expanded = !expanded
}} }}
> >
<Chevron {expanded} marginRight={'.5rem'} /> <Chevron {expanded} marginRight={'.5rem'} fill={!showChevron ? 'transparent' : undefined} />
{#if icon} {#if icon}
<div class="min-w-4 mr-2"> <div class="min-w-4 mr-2">
<Icon {icon} size={'small'} /> <Icon {icon} size={'small'} />

View File

@ -38,7 +38,8 @@ import core, {
generateId, generateId,
SearchQuery, SearchQuery,
SearchOptions, SearchOptions,
SearchResult SearchResult,
MeasureDoneOperation
} from '@hcengineering/core' } from '@hcengineering/core'
import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform' import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform'
@ -376,6 +377,27 @@ class Connection implements ClientConnection {
return await promise.promise return await promise.promise
} }
async measure (operationName: string): Promise<MeasureDoneOperation> {
const dateNow = Date.now()
// Send measure-start
const mid = await this.sendRequest({
method: 'measure',
params: [operationName]
})
return async () => {
const serverTime: number = await this.sendRequest({
method: 'measure-done',
params: [operationName, mid]
})
return {
time: Date.now() - dateNow,
serverTime
}
}
}
async loadModel (last: Timestamp, hash?: string): Promise<Tx[] | LoadModelResponse> { async loadModel (last: Timestamp, hash?: string): Promise<Tx[] | LoadModelResponse> {
return await this.sendRequest({ method: 'loadModel', params: [last, hash] }) return await this.sendRequest({ method: 'loadModel', params: [last, hash] })
} }

View File

@ -16,9 +16,6 @@
import type { AccountClient, ClientConnectEvent } from '@hcengineering/core' import type { AccountClient, ClientConnectEvent } from '@hcengineering/core'
import type { Plugin, Resource } from '@hcengineering/platform' import type { Plugin, Resource } from '@hcengineering/platform'
import { Metadata, plugin } from '@hcengineering/platform' import { Metadata, plugin } from '@hcengineering/platform'
// import type { LiveQuery } from '@hcengineering/query'
// export type Connection = Client & LiveQuery & TxOperations
/** /**
* @public * @public

View File

@ -30,7 +30,8 @@ import core, {
type WithLookup, type WithLookup,
type SearchQuery, type SearchQuery,
type SearchOptions, type SearchOptions,
type SearchResult type SearchResult,
type MeasureDoneOperation
} from '@hcengineering/core' } from '@hcengineering/core'
import { devModelId } from '@hcengineering/devmodel' import { devModelId } from '@hcengineering/devmodel'
import { Builder } from '@hcengineering/model' import { Builder } from '@hcengineering/model'
@ -68,6 +69,10 @@ class ModelClient implements AccountClient {
} }
} }
async measure (operationName: string): Promise<MeasureDoneOperation> {
return await this.client.measure(operationName)
}
notify?: (tx: Tx) => void notify?: (tx: Tx) => void
getHierarchy (): Hierarchy { getHierarchy (): Hierarchy {

View File

@ -352,97 +352,105 @@
return return
} }
const operations = client.apply(_id) // TODO: We need a measure client and mark all operations with it as measure under one root,
// to prevent other operations to infer our measurement.
const doneOp = await getClient().measure('tracker.createIssue')
const lastOne = await client.findOne<Issue>(tracker.class.Issue, {}, { sort: { rank: SortingOrder.Descending } }) try {
const incResult = await client.updateDoc( const operations = client.apply(_id)
tracker.class.Project,
core.space.Space,
_space,
{
$inc: { sequence: 1 }
},
true
)
const value: DocData<Issue> = { const lastOne = await client.findOne<Issue>(tracker.class.Issue, {}, { sort: { rank: SortingOrder.Descending } })
title: getTitle(object.title), const incResult = await client.updateDoc(
description: object.description, tracker.class.Project,
assignee: object.assignee, core.space.Space,
component: object.component, _space,
milestone: object.milestone, {
number: (incResult as any).object.sequence, $inc: { sequence: 1 }
status: object.status, },
priority: object.priority, true
rank: calcRank(lastOne, undefined), )
comments: 0,
subIssues: 0,
dueDate: object.dueDate,
parents:
parentIssue != null
? [
{ parentId: parentIssue._id, parentTitle: parentIssue.title, space: parentIssue.space },
...parentIssue.parents
]
: [],
reportedTime: 0,
remainingTime: 0,
estimation: object.estimation,
reports: 0,
relations: relatedTo !== undefined ? [{ _id: relatedTo._id, _class: relatedTo._class }] : [],
childInfo: [],
kind
}
await docCreateManager.commit(operations, _id, _space, value) const value: DocData<Issue> = {
title: getTitle(object.title),
description: object.description,
assignee: object.assignee,
component: object.component,
milestone: object.milestone,
number: (incResult as any).object.sequence,
status: object.status,
priority: object.priority,
rank: calcRank(lastOne, undefined),
comments: 0,
subIssues: 0,
dueDate: object.dueDate,
parents:
parentIssue != null
? [
{ parentId: parentIssue._id, parentTitle: parentIssue.title, space: parentIssue.space },
...parentIssue.parents
]
: [],
reportedTime: 0,
remainingTime: 0,
estimation: object.estimation,
reports: 0,
relations: relatedTo !== undefined ? [{ _id: relatedTo._id, _class: relatedTo._class }] : [],
childInfo: [],
kind
}
await operations.addCollection( await docCreateManager.commit(operations, _id, _space, value)
tracker.class.Issue,
_space,
parentIssue?._id ?? tracker.ids.NoParent,
parentIssue?._class ?? tracker.class.Issue,
'subIssues',
value,
_id
)
for (const label of object.labels) {
await operations.addCollection(label._class, label.space, _id, tracker.class.Issue, 'labels', {
title: label.title,
color: label.color,
tag: label.tag
})
}
if (relatedTo !== undefined) { await operations.addCollection(
const doc = await client.findOne(tracker.class.Issue, { _id }) tracker.class.Issue,
if (doc !== undefined) { _space,
if (client.getHierarchy().isDerived(relatedTo._class, tracker.class.Issue)) { parentIssue?._id ?? tracker.ids.NoParent,
await updateIssueRelation(operations, relatedTo as Issue, doc, 'relations', '$push') parentIssue?._class ?? tracker.class.Issue,
} else { 'subIssues',
const update = await getResource(chunter.backreference.Update) value,
await update(doc, 'relations', [relatedTo], tracker.string.AddedReference) _id
)
for (const label of object.labels) {
await operations.addCollection(label._class, label.space, _id, tracker.class.Issue, 'labels', {
title: label.title,
color: label.color,
tag: label.tag
})
}
if (relatedTo !== undefined) {
const doc = await client.findOne(tracker.class.Issue, { _id })
if (doc !== undefined) {
if (client.getHierarchy().isDerived(relatedTo._class, tracker.class.Issue)) {
await updateIssueRelation(operations, relatedTo as Issue, doc, 'relations', '$push')
} else {
const update = await getResource(chunter.backreference.Update)
await update(doc, 'relations', [relatedTo], tracker.string.AddedReference)
}
} }
} }
await operations.commit()
await descriptionBox.createAttachments(_id)
addNotification(
await translate(tracker.string.IssueCreated, {}, $themeStore.language),
getTitle(object.title),
IssueNotification,
{
issueId: _id,
subTitlePostfix: (await translate(tracker.string.CreatedOne, {}, $themeStore.language)).toLowerCase(),
issueUrl: currentProject != null && generateIssueShortLink(getIssueId(currentProject, value as Issue))
}
)
console.log('createIssue measure', await doneOp())
draftController.remove()
descriptionBox?.removeDraft(false)
isAssigneeTouched = false
} catch (err: any) {
console.error(err)
await doneOp() // Complete in case of error
} }
await operations.commit()
await descriptionBox.createAttachments(_id)
addNotification(
await translate(tracker.string.IssueCreated, {}, $themeStore.language),
getTitle(object.title),
IssueNotification,
{
issueId: _id,
subTitlePostfix: (await translate(tracker.string.CreatedOne, {}, $themeStore.language)).toLowerCase(),
issueUrl: currentProject != null && generateIssueShortLink(getIssueId(currentProject, value as Issue))
}
)
draftController.remove()
descriptionBox?.removeDraft(false)
isAssigneeTouched = false
} }
async function setParentIssue (): Promise<void> { async function setParentIssue (): Promise<void> {

View File

@ -157,7 +157,9 @@
async function handleSelection (evt: Event, selection: number): Promise<void> { async function handleSelection (evt: Event, selection: number): Promise<void> {
const item = items[selection] const item = items[selection]
if (item == null) {
return
}
if (item.item !== undefined) { if (item.item !== undefined) {
const doc = item.item.doc const doc = item.item.doc
void client.findOne(doc._class, { _id: doc._id }).then((value) => { void client.findOne(doc._class, { _id: doc._id }).then((value) => {

View File

@ -1,6 +1,6 @@
<script lang="ts"> <script lang="ts">
import contact, { PersonAccount } from '@hcengineering/contact' import contact, { PersonAccount } from '@hcengineering/contact'
import { metricsToRows } from '@hcengineering/core' import { Metrics } from '@hcengineering/core'
import login from '@hcengineering/login' import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform' import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { createQuery } from '@hcengineering/presentation' import presentation, { createQuery } from '@hcengineering/presentation'
@ -9,10 +9,8 @@
IconArrowRight, IconArrowRight,
Loading, Loading,
Panel, Panel,
Scroller,
TabItem, TabItem,
TabList, TabList,
closePopup,
fetchMetadataLocalStorage, fetchMetadataLocalStorage,
ticker ticker
} from '@hcengineering/ui' } from '@hcengineering/ui'
@ -20,6 +18,7 @@
import Expandable from '@hcengineering/ui/src/components/Expandable.svelte' import Expandable from '@hcengineering/ui/src/components/Expandable.svelte'
import { ObjectPresenter } from '@hcengineering/view-resources' import { ObjectPresenter } from '@hcengineering/view-resources'
import { onDestroy } from 'svelte' import { onDestroy } from 'svelte'
import MetricsInfo from './statistics/MetricsInfo.svelte'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? '' const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
const token: string = getMetadata(presentation.metadata.Token) ?? '' const token: string = getMetadata(presentation.metadata.Token) ?? ''
@ -33,12 +32,9 @@
let admin = false let admin = false
onDestroy( onDestroy(
ticker.subscribe(() => { ticker.subscribe(() => {
fetch(endpoint + `/api/v1/statistics?token=${token}`, {}).then(async (json) => { void fetch(endpoint + `/api/v1/statistics?token=${token}`, {}).then(async (json) => {
data = await json.json() data = await json.json()
admin = data?.admin ?? false admin = data?.admin ?? false
if (!admin) {
closePopup()
}
}) })
}) })
) )
@ -86,15 +82,34 @@
} }
employees = emp employees = emp
}) })
const toNum = (value: any) => value as number
let warningTimeout = 15 let warningTimeout = 15
$: metricsData = data?.metrics as Metrics | undefined
$: totalStats = Array.from(Object.entries(activeSessions).values()).reduce(
(cur, it) => {
const totalFind = it[1].reduce((it, itm) => itm.current.find + it, 0)
const totalTx = it[1].reduce((it, itm) => itm.current.tx + it, 0)
return {
find: cur.find + totalFind,
tx: cur.tx + totalTx
}
},
{ find: 0, tx: 0 }
)
</script> </script>
<Panel on:close isFullSize useMaxWidth={true}> <Panel on:close isFullSize useMaxWidth={true}>
<svelte:fragment slot="header"> <svelte:fragment slot="header">
{#if data} {#if data}
Mem: {data.statistics.memoryUsed} / {data.statistics.memoryTotal} CPU: {data.statistics.cpuUsage} <div class="flex-col">
<span>
Mem: {data.statistics.memoryUsed} / {data.statistics.memoryTotal} CPU: {data.statistics.cpuUsage}
</span>
<span>
TotalFind: {totalStats.find} / Total Tx: {totalStats.tx}
</span>
</div>
{/if} {/if}
</svelte:fragment> </svelte:fragment>
<svelte:fragment slot="title"> <svelte:fragment slot="title">
@ -118,7 +133,7 @@
icon={IconArrowRight} icon={IconArrowRight}
label={getEmbeddedLabel('Set maintenance warning')} label={getEmbeddedLabel('Set maintenance warning')}
on:click={() => { on:click={() => {
fetch(endpoint + `/api/v1/manage?token=${token}&operation=maintenance&timeout=${warningTimeout}`, { void fetch(endpoint + `/api/v1/manage?token=${token}&operation=maintenance&timeout=${warningTimeout}`, {
method: 'PUT' method: 'PUT'
}) })
}} }}
@ -136,7 +151,7 @@
icon={IconArrowRight} icon={IconArrowRight}
label={getEmbeddedLabel('Reboot server')} label={getEmbeddedLabel('Reboot server')}
on:click={() => { on:click={() => {
fetch(endpoint + `/api/v1/manage?token=${token}&operation=reboot`, { void fetch(endpoint + `/api/v1/manage?token=${token}&operation=reboot`, {
method: 'PUT' method: 'PUT'
}) })
}} }}
@ -151,92 +166,74 @@
{@const totalTx = act[1].reduce((it, itm) => itm.current.tx + it, 0)} {@const totalTx = act[1].reduce((it, itm) => itm.current.tx + it, 0)}
{@const employeeGroups = Array.from(new Set(act[1].map((it) => it.userId)))} {@const employeeGroups = Array.from(new Set(act[1].map((it) => it.userId)))}
<span class="flex-col"> <span class="flex-col">
<div class="fs-title"> <Expandable contentColor expanded={false} expandable={true} bordered>
Workspace: {act[0]}: {act[1].length} current 5 mins => {totalFind}/{totalTx} <svelte:fragment slot="title">
</div> <div class="fs-title">
Workspace: {act[0]}: {act[1].length} current 5 mins => {totalFind}/{totalTx}
<div class="flex-col">
{#each employeeGroups as employeeId}
{@const employee = employees.get(employeeId)}
{@const connections = act[1].filter((it) => it.userId === employeeId)}
{@const find = connections.reduce((it, itm) => itm.current.find + it, 0)}
{@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)}
<div class="p-1 flex-col">
<Expandable>
<svelte:fragment slot="title">
<div class="flex-row-center p-1">
{#if employee}
<ObjectPresenter
_class={contact.mixin.Employee}
objectId={employee.person}
props={{ shouldShowAvatar: true }}
/>
{:else}
{employeeId}
{/if}
: {connections.length}
<div class="ml-4">
<div class="ml-1">{find}/{txes}</div>
</div>
</div>
</svelte:fragment>
{#each connections as user, i}
<div class="flex-row-center ml-10">
#{i}
{user.userId}
<div class="p-1">
Total: {user.total.find}/{user.total.tx}
</div>
<div class="p-1">
Previous 5 mins: {user.mins5.find}/{user.mins5.tx}
</div>
<div class="p-1">
Current 5 mins: {user.current.find}/{user.current.tx}
</div>
</div>
<div class="p-1 flex-col ml-10">
{#each Object.entries(user.data ?? {}) as [k, v]}
<div class="p-1">
{k}: {JSON.stringify(v)}
</div>
{/each}
</div>
{/each}
</Expandable>
</div> </div>
{/each} </svelte:fragment>
</div> <div class="flex-col">
{#each employeeGroups as employeeId}
{@const employee = employees.get(employeeId)}
{@const connections = act[1].filter((it) => it.userId === employeeId)}
{@const find = connections.reduce((it, itm) => itm.current.find + it, 0)}
{@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)}
<div class="p-1 flex-col ml-4">
<Expandable>
<svelte:fragment slot="title">
<div class="flex-row-center p-1">
{#if employee}
<ObjectPresenter
_class={contact.mixin.Employee}
objectId={employee.person}
props={{ shouldShowAvatar: true, disabled: true }}
/>
{:else}
{employeeId}
{/if}
: {connections.length}
<div class="ml-4">
<div class="ml-1">{find}/{txes}</div>
</div>
</div>
</svelte:fragment>
{#each connections as user, i}
<div class="flex-row-center ml-10">
#{i}
{user.userId}
<div class="p-1">
Total: {user.total.find}/{user.total.tx}
</div>
<div class="p-1">
Previous 5 mins: {user.mins5.find}/{user.mins5.tx}
</div>
<div class="p-1">
Current 5 mins: {user.current.find}/{user.current.tx}
</div>
</div>
<div class="p-1 flex-col ml-10">
{#each Object.entries(user.data ?? {}) as [k, v]}
<div class="p-1">
{k}: {JSON.stringify(v)}
</div>
{/each}
</div>
{/each}
</Expandable>
</div>
{/each}
</div>
</Expandable>
</span> </span>
{/each} {/each}
</div> </div>
{:else if selectedTab === 'statistics'} {:else if selectedTab === 'statistics'}
<Scroller> <div class="flex-column p-3 h-full" style:overflow="auto">
<table class="antiTable" class:highlightRows={true}> {#if metricsData !== undefined}
<thead class="scroller-thead"> <MetricsInfo metrics={metricsData} />
<tr> {/if}
<th><div class="p-1">Name</div> </th> </div>
<th>Average</th>
<th>Total</th>
<th>Ops</th>
</tr>
</thead>
<tbody>
{#each metricsToRows(data.metrics, 'System') as row}
<tr class="antiTable-body__row">
<td>
<span style={`padding-left: ${toNum(row[0]) + 0.5}rem;`}>
{row[1]}
</span>
</td>
<td>{row[2]}</td>
<td>{row[3]}</td>
<td>{row[4]}</td>
</tr>
{/each}
</tbody>
</table>
</Scroller>
{/if} {/if}
{:else} {:else}
<Loading /> <Loading />

View File

@ -0,0 +1,83 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import { Expandable } from '@hcengineering/ui'
import { FixedColumn } from '@hcengineering/view-resources'
export let metrics: Metrics
export let level = 0
export let name: string = 'System'
$: haschilds = Object.keys(metrics.measurements).length > 0 || Object.keys(metrics.params).length > 0
function showAvg (name: string, time: number, ops: number): string {
if (name.startsWith('#')) {
return `➿ ${time}`
}
if (ops === 0) {
return `⏱️ ${time}`
}
return `${Math.floor((time / ops) * 100) / 100}`
}
</script>
<Expandable
expanded={level === 0}
expandable={level !== 0 && haschilds}
bordered
showChevron={haschilds && level !== 0}
contentColor
>
<svelte:fragment slot="title">
<div class="flex-row-center flex-between flex-grow ml-2">
{name}
</div>
</svelte:fragment>
<svelte:fragment slot="tools">
<FixedColumn key="row">
<div class="flex-row-center flex-between">
<FixedColumn key="ops">
<span class="p-1">
{metrics.operations}
</span>
</FixedColumn>
<FixedColumn key="time">
<span class="p-1">
{showAvg(name, metrics.value, metrics.operations)}
</span>
</FixedColumn>
<FixedColumn key="time-full">
<span class="p-1">
{metrics.value}
</span>
</FixedColumn>
</div>
</FixedColumn>
</svelte:fragment>
{#each Object.entries(metrics.measurements) as [k, v], i}
<div style:margin-left={`${level * 0.5}rem`}>
<svelte:self metrics={v} name="{i}. {k}" level={level + 1} />
</div>
{/each}
{#each Object.entries(metrics.params) as [k, v], i}
<div style:margin-left={`${level * 0.5}rem`}>
{#each Object.entries(v).toSorted((a, b) => b[1].value / (b[1].operations + 1) - a[1].value / (a[1].operations + 1)) as [kk, vv]}
<Expandable expandable={false} bordered showChevron={false} contentColor>
<svelte:fragment slot="title">
<div class="flex-row-center flex-between flex-grow">
# {k} = {kk}
</div>
</svelte:fragment>
<svelte:fragment slot="tools">
<FixedColumn key="row">
<div class="flex-row-center flex-between">
<FixedColumn key="ops">{vv.operations}</FixedColumn>
<FixedColumn key="time">{showAvg(kk, vv.value, vv.operations)}</FixedColumn>
<FixedColumn key="time-full">{vv.value}</FixedColumn>
</div>
</FixedColumn>
</svelte:fragment>
</Expandable>
{/each}
</div>
{/each}
</Expandable>

View File

@ -218,7 +218,7 @@ export function start (
QueryJoinMiddleware.create // Should be last one QueryJoinMiddleware.create // Should be last one
] ]
const metrics = getMetricsContext().newChild('indexing', {}) const metrics = getMetricsContext()
function createIndexStages ( function createIndexStages (
fullText: MeasureContext, fullText: MeasureContext,
workspace: WorkspaceId, workspace: WorkspaceId,
@ -270,6 +270,7 @@ export function start (
} }
const pipelineFactory: PipelineFactory = (ctx, workspace, upgrade, broadcast) => { const pipelineFactory: PipelineFactory = (ctx, workspace, upgrade, broadcast) => {
const wsMetrics = metrics.newChild('🧲 ' + workspace.name, {})
const conf: DbConfiguration = { const conf: DbConfiguration = {
domains: { domains: {
[DOMAIN_TX]: 'MongoTx', [DOMAIN_TX]: 'MongoTx',
@ -278,7 +279,7 @@ export function start (
[DOMAIN_FULLTEXT_BLOB]: 'FullTextBlob', [DOMAIN_FULLTEXT_BLOB]: 'FullTextBlob',
[DOMAIN_MODEL]: 'Null' [DOMAIN_MODEL]: 'Null'
}, },
metrics, metrics: wsMetrics,
defaultAdapter: 'Mongo', defaultAdapter: 'Mongo',
adapters: { adapters: {
MongoTx: { MongoTx: {
@ -310,7 +311,14 @@ export function start (
factory: createElasticAdapter, factory: createElasticAdapter,
url: opt.fullTextUrl, url: opt.fullTextUrl,
stages: (adapter, storage, storageAdapter, contentAdapter) => stages: (adapter, storage, storageAdapter, contentAdapter) =>
createIndexStages(metrics.newChild('stages', {}), workspace, adapter, storage, storageAdapter, contentAdapter) createIndexStages(
wsMetrics.newChild('stages', {}),
workspace,
adapter,
storage,
storageAdapter,
contentAdapter
)
}, },
contentAdapters: { contentAdapters: {
Rekoni: { Rekoni: {

View File

@ -13,22 +13,23 @@
// limitations under the License. // limitations under the License.
// //
import activity, { ActivityMessage, DocUpdateMessage, Reaction } from '@hcengineering/activity'
import core, { import core, {
Account, Account,
AttachedDoc, AttachedDoc,
Data, Data,
Doc, Doc,
matchQuery, MeasureContext,
Ref, Ref,
Tx, Tx,
TxCUD,
TxCollectionCUD, TxCollectionCUD,
TxCreateDoc, TxCreateDoc,
TxCUD, TxProcessor,
TxProcessor matchQuery
} from '@hcengineering/core' } from '@hcengineering/core'
import { ActivityControl, DocObjectCache } from '@hcengineering/server-activity' import { ActivityControl, DocObjectCache } from '@hcengineering/server-activity'
import type { TriggerControl } from '@hcengineering/server-core' import type { TriggerControl } from '@hcengineering/server-core'
import activity, { ActivityMessage, DocUpdateMessage, Reaction } from '@hcengineering/activity'
import { import {
createCollabDocInfo, createCollabDocInfo,
createCollaboratorNotifications, createCollaboratorNotifications,
@ -87,6 +88,7 @@ export async function createReactionNotifications (
const messageTx = ( const messageTx = (
await pushDocUpdateMessages( await pushDocUpdateMessages(
control.ctx,
control, control,
res as TxCollectionCUD<Doc, DocUpdateMessage>[], res as TxCollectionCUD<Doc, DocUpdateMessage>[],
parentMessage, parentMessage,
@ -136,6 +138,7 @@ function getDocUpdateMessageTx (
} }
async function pushDocUpdateMessages ( async function pushDocUpdateMessages (
ctx: MeasureContext,
control: ActivityControl, control: ActivityControl,
res: TxCollectionCUD<Doc, DocUpdateMessage>[], res: TxCollectionCUD<Doc, DocUpdateMessage>[],
object: Doc | undefined, object: Doc | undefined,
@ -194,6 +197,7 @@ async function pushDocUpdateMessages (
} }
export async function generateDocUpdateMessages ( export async function generateDocUpdateMessages (
ctx: MeasureContext,
tx: TxCUD<Doc>, tx: TxCUD<Doc>,
control: ActivityControl, control: ActivityControl,
res: TxCollectionCUD<Doc, DocUpdateMessage>[] = [], res: TxCollectionCUD<Doc, DocUpdateMessage>[] = [],
@ -241,7 +245,11 @@ export async function generateDocUpdateMessages (
switch (tx._class) { switch (tx._class) {
case core.class.TxCreateDoc: { case core.class.TxCreateDoc: {
const doc = TxProcessor.createDoc2Doc(tx as TxCreateDoc<Doc>) const doc = TxProcessor.createDoc2Doc(tx as TxCreateDoc<Doc>)
return await pushDocUpdateMessages(control, res, doc, originTx ?? tx, undefined, objectCache) return await ctx.with(
'pushDocUpdateMessages',
{},
async (ctx) => await pushDocUpdateMessages(ctx, control, res, doc, originTx ?? tx, undefined, objectCache)
)
} }
case core.class.TxMixin: case core.class.TxMixin:
case core.class.TxUpdateDoc: { case core.class.TxUpdateDoc: {
@ -249,17 +257,29 @@ export async function generateDocUpdateMessages (
if (doc === undefined) { if (doc === undefined) {
doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0] doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0]
} }
return await pushDocUpdateMessages(control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache) return await ctx.with(
'pushDocUpdateMessages',
{},
async (ctx) =>
await pushDocUpdateMessages(ctx, control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache)
)
} }
case core.class.TxCollectionCUD: { case core.class.TxCollectionCUD: {
const actualTx = TxProcessor.extractTx(tx) as TxCUD<Doc> const actualTx = TxProcessor.extractTx(tx) as TxCUD<Doc>
res = await generateDocUpdateMessages(actualTx, control, res, tx, objectCache) res = await generateDocUpdateMessages(ctx, actualTx, control, res, tx, objectCache)
if ([core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc].includes(actualTx._class)) { if ([core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc].includes(actualTx._class)) {
let doc = objectCache?.docs?.get(tx.objectId) let doc = objectCache?.docs?.get(tx.objectId)
if (doc === undefined) { if (doc === undefined) {
doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0] doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0]
} }
return await pushDocUpdateMessages(control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache) if (doc !== undefined) {
return await ctx.with(
'pushDocUpdateMessages',
{},
async (ctx) =>
await pushDocUpdateMessages(ctx, control, res, doc ?? undefined, originTx ?? tx, undefined, objectCache)
)
}
} }
return res return res
} }
@ -273,10 +293,18 @@ async function ActivityMessagesHandler (tx: TxCUD<Doc>, control: TriggerControl)
return [] return []
} }
const txes = await generateDocUpdateMessages(tx, control) const txes = await control.ctx.with(
'generateDocUpdateMessages',
{},
async (ctx) => await generateDocUpdateMessages(ctx, tx, control)
)
const messages = txes.map((messageTx) => TxProcessor.createDoc2Doc(messageTx.tx as TxCreateDoc<DocUpdateMessage>)) const messages = txes.map((messageTx) => TxProcessor.createDoc2Doc(messageTx.tx as TxCreateDoc<DocUpdateMessage>))
const notificationTxes = await createCollaboratorNotifications(tx, control, messages) const notificationTxes = await control.ctx.with(
'createNotificationTxes',
{},
async (ctx) => await createCollaboratorNotifications(ctx, tx, control, messages)
)
return [...txes, ...notificationTxes] return [...txes, ...notificationTxes]
} }

View File

@ -28,6 +28,7 @@ import core, {
Doc, Doc,
DocumentUpdate, DocumentUpdate,
Hierarchy, Hierarchy,
MeasureContext,
MixinUpdate, MixinUpdate,
Ref, Ref,
RefTo, RefTo,
@ -746,7 +747,7 @@ async function collectionCollabDoc (
activityMessages: ActivityMessage[] activityMessages: ActivityMessage[]
): Promise<Tx[]> { ): Promise<Tx[]> {
const actualTx = TxProcessor.extractTx(tx) as TxCUD<Doc> const actualTx = TxProcessor.extractTx(tx) as TxCUD<Doc>
let res = await createCollaboratorNotifications(actualTx, control, activityMessages, tx) let res = await createCollaboratorNotifications(control.ctx, actualTx, control, activityMessages, tx)
if (![core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc].includes(actualTx._class)) { if (![core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc].includes(actualTx._class)) {
return res return res
@ -959,6 +960,7 @@ export async function OnAttributeUpdate (tx: Tx, control: TriggerControl): Promi
} }
export async function createCollaboratorNotifications ( export async function createCollaboratorNotifications (
ctx: MeasureContext,
tx: TxCUD<Doc>, tx: TxCUD<Doc>,
control: TriggerControl, control: TriggerControl,
activityMessages: ActivityMessage[], activityMessages: ActivityMessage[],
@ -992,7 +994,7 @@ async function OnChatMessageCreate (tx: TxCollectionCUD<Doc, ChatMessage>, contr
const createTx = TxProcessor.extractTx(tx) as TxCreateDoc<ChatMessage> const createTx = TxProcessor.extractTx(tx) as TxCreateDoc<ChatMessage>
const message = (await control.findAll(chunter.class.ChatMessage, { _id: createTx.objectId }))[0] const message = (await control.findAll(chunter.class.ChatMessage, { _id: createTx.objectId }))[0]
return await createCollaboratorNotifications(tx, control, [message]) return await createCollaboratorNotifications(control.ctx, tx, control, [message])
} }
/** /**

View File

@ -47,14 +47,23 @@ export async function createPipeline (
let broadcastHook: HandledBroadcastFunc = (): Tx[] => { let broadcastHook: HandledBroadcastFunc = (): Tx[] => {
return [] return []
} }
const storage = await createServerStorage(conf, { const storage = await ctx.with(
upgrade, 'create-server-storage',
broadcast: (tx: Tx[], targets?: string[]) => { {},
const sendTx = broadcastHook?.(tx, targets) ?? tx async (ctx) =>
broadcast(sendTx, targets) await createServerStorage(ctx, conf, {
} upgrade,
}) broadcast: (tx: Tx[], targets?: string[]) => {
const pipeline = PipelineImpl.create(ctx, storage, constructors, broadcast) const sendTx = broadcastHook?.(tx, targets) ?? tx
broadcast(sendTx, targets)
}
})
)
const pipeline = ctx.with(
'create pipeline',
{},
async (ctx) => await PipelineImpl.create(ctx, storage, constructors, broadcast)
)
const pipelineResult = await pipeline const pipelineResult = await pipeline
broadcastHook = (tx, targets) => { broadcastHook = (tx, targets) => {
return pipelineResult.handleBroadcast(tx, targets) return pipelineResult.handleBroadcast(tx, targets)
@ -92,7 +101,7 @@ class PipelineImpl implements Pipeline {
let current: Middleware | undefined let current: Middleware | undefined
for (let index = constructors.length - 1; index >= 0; index--) { for (let index = constructors.length - 1; index >= 0; index--) {
const element = constructors[index] const element = constructors[index]
current = await element(ctx, broadcast, this.storage, current) current = await ctx.with('build chain', {}, async (ctx) => await element(ctx, broadcast, this.storage, current))
} }
return current return current
} }

View File

@ -20,16 +20,15 @@ import core, {
Class, Class,
ClassifierKind, ClassifierKind,
Collection, Collection,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_MODEL,
DOMAIN_TX,
Doc, Doc,
DocumentQuery, DocumentQuery,
DocumentUpdate, DocumentUpdate,
Domain, Domain,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_MODEL,
DOMAIN_TX,
FindOptions, FindOptions,
FindResult, FindResult,
generateId,
Hierarchy, Hierarchy,
IndexingUpdateEvent, IndexingUpdateEvent,
LoadModelResponse, LoadModelResponse,
@ -37,13 +36,16 @@ import core, {
Mixin, Mixin,
ModelDb, ModelDb,
Ref, Ref,
SearchOptions,
SearchQuery,
SearchResult,
ServerStorage, ServerStorage,
StorageIterator, StorageIterator,
Timestamp, Timestamp,
Tx, Tx,
TxApplyIf, TxApplyIf,
TxCollectionCUD,
TxCUD, TxCUD,
TxCollectionCUD,
TxFactory, TxFactory,
TxProcessor, TxProcessor,
TxRemoveDoc, TxRemoveDoc,
@ -52,9 +54,7 @@ import core, {
TxWorkspaceEvent, TxWorkspaceEvent,
WorkspaceEvent, WorkspaceEvent,
WorkspaceId, WorkspaceId,
SearchQuery, generateId
SearchOptions,
SearchResult
} from '@hcengineering/core' } from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio' import { MinioService } from '@hcengineering/minio'
import { getResource } from '@hcengineering/platform' import { getResource } from '@hcengineering/platform'
@ -74,7 +74,6 @@ import type {
ObjectDDParticipant, ObjectDDParticipant,
TriggerControl TriggerControl
} from './types' } from './types'
import { createFindAll } from './utils'
/** /**
* @public * @public
@ -165,12 +164,16 @@ class TServerStorage implements ServerStorage {
const adapter = this.getAdapter(lastDomain as Domain) const adapter = this.getAdapter(lastDomain as Domain)
const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId) const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId)
const toDeleteDocs = await adapter.load(lastDomain as Domain, toDelete) const toDeleteDocs = await ctx.with(
'adapter-load',
{ domain: lastDomain },
async () => await adapter.load(lastDomain as Domain, toDelete)
)
for (const ddoc of toDeleteDocs) { for (const ddoc of toDeleteDocs) {
removedDocs.set(ddoc._id, ddoc) removedDocs.set(ddoc._id, ddoc)
} }
const r = await adapter.tx(...part) const r = await ctx.with('adapter-tx', {}, async () => await adapter.tx(...part))
if (Array.isArray(r)) { if (Array.isArray(r)) {
result.push(...r) result.push(...r)
} else { } else {
@ -370,13 +373,13 @@ class TServerStorage implements ServerStorage {
query: DocumentQuery<T>, query: DocumentQuery<T>,
options?: FindOptions<T> options?: FindOptions<T>
): Promise<FindResult<T>> { ): Promise<FindResult<T>> {
return await ctx.with('find-all', {}, (ctx) => { const domain = this.hierarchy.getDomain(clazz)
const domain = this.hierarchy.getDomain(clazz) if (query?.$search !== undefined) {
if (query?.$search !== undefined) { return await ctx.with('client-fulltext-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options))
return ctx.with('full-text-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options)) }
} return await ctx.with('client-find-all', { _class: clazz }, () =>
return ctx.with('db-find-all', { d: domain }, () => this.getAdapter(domain).findAll(clazz, query, options)) this.getAdapter(domain).findAll(clazz, query, options)
}) )
} }
async searchFulltext (ctx: MeasureContext, query: SearchQuery, options: SearchOptions): Promise<SearchResult> { async searchFulltext (ctx: MeasureContext, query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
@ -550,13 +553,13 @@ class TServerStorage implements ServerStorage {
): Promise<FindResult<T>> => ): Promise<FindResult<T>> =>
findAll(mctx, clazz, query, options) findAll(mctx, clazz, query, options)
const removed = await ctx.with('process-remove', {}, () => this.processRemove(ctx, txes, findAll, removedMap)) const removed = await ctx.with('process-remove', {}, (ctx) => this.processRemove(ctx, txes, findAll, removedMap))
const collections = await ctx.with('process-collection', {}, () => const collections = await ctx.with('process-collection', {}, (ctx) =>
this.processCollection(ctx, txes, findAll, removedMap) this.processCollection(ctx, txes, findAll, removedMap)
) )
const moves = await ctx.with('process-move', {}, () => this.processMove(ctx, txes, findAll)) const moves = await ctx.with('process-move', {}, (ctx) => this.processMove(ctx, txes, findAll))
const triggerControl: Omit<TriggerControl, 'txFactory'> = { const triggerControl: Omit<TriggerControl, 'txFactory' | 'ctx'> = {
removedMap, removedMap,
workspace: this.workspace, workspace: this.workspace,
fx: triggerFx.fx, fx: triggerFx.fx,
@ -572,15 +575,25 @@ class TServerStorage implements ServerStorage {
triggerFx.fx(() => f(adapter, this.workspace)) triggerFx.fx(() => f(adapter, this.workspace))
}, },
findAll: fAll(ctx), findAll: fAll(ctx),
findAllCtx: findAll,
modelDb: this.modelDb, modelDb: this.modelDb,
hierarchy: this.hierarchy, hierarchy: this.hierarchy,
apply: async (tx, broadcast) => { apply: async (tx, broadcast) => {
return await this.apply(ctx, tx, broadcast) return await this.apply(ctx, tx, broadcast)
},
applyCtx: async (ctx, tx, broadcast) => {
return await this.apply(ctx, tx, broadcast)
} }
} }
const triggers = await ctx.with('process-triggers', {}, async (ctx) => { const triggers = await ctx.with('process-triggers', {}, async (ctx) => {
const result: Tx[] = [] const result: Tx[] = []
result.push(...(await this.triggers.apply(ctx, txes, triggerControl))) result.push(
...(await this.triggers.apply(ctx, txes, {
...triggerControl,
ctx,
findAll: fAll(ctx)
}))
)
return result return result
}) })
@ -685,7 +698,18 @@ class TServerStorage implements ServerStorage {
async processTxes (ctx: MeasureContext, txes: Tx[]): Promise<[TxResult, Tx[]]> { async processTxes (ctx: MeasureContext, txes: Tx[]): Promise<[TxResult, Tx[]]> {
// store tx // store tx
const _findAll = createFindAll(this) const _findAll: ServerStorage['findAll'] = async <T extends Doc>(
ctx: MeasureContext,
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> => {
const domain = this.hierarchy.getDomain(clazz)
if (query?.$search !== undefined) {
return await ctx.with('full-text-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options))
}
return await ctx.with('find-all', { _class: clazz }, () => this.getAdapter(domain).findAll(clazz, query, options))
}
const txToStore: Tx[] = [] const txToStore: Tx[] = []
const modelTx: Tx[] = [] const modelTx: Tx[] = []
const applyTxes: Tx[] = [] const applyTxes: Tx[] = []
@ -748,7 +772,7 @@ class TServerStorage implements ServerStorage {
} }
async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> { async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> {
return await this.processTxes(ctx, [tx]) return await ctx.with('client-tx', { _class: tx._class }, async (ctx) => await this.processTxes(ctx, [tx]))
} }
find (domain: Domain): StorageIterator { find (domain: Domain): StorageIterator {
@ -801,6 +825,7 @@ export interface ServerStorageOptions {
* @public * @public
*/ */
export async function createServerStorage ( export async function createServerStorage (
ctx: MeasureContext,
conf: DbConfiguration, conf: DbConfiguration,
options: ServerStorageOptions options: ServerStorageOptions
): Promise<ServerStorage> { ): Promise<ServerStorage> {
@ -809,63 +834,65 @@ export async function createServerStorage (
const adapters = new Map<string, DbAdapter>() const adapters = new Map<string, DbAdapter>()
const modelDb = new ModelDb(hierarchy) const modelDb = new ModelDb(hierarchy)
console.timeLog(conf.workspace.name, 'create server storage')
const storageAdapter = conf.storageFactory?.() const storageAdapter = conf.storageFactory?.()
for (const key in conf.adapters) { for (const key in conf.adapters) {
const adapterConf = conf.adapters[key] const adapterConf = conf.adapters[key]
adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)) adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter))
console.timeLog(conf.workspace.name, 'adapter', key)
} }
const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter
if (txAdapter === undefined) {
console.log('no txadapter found')
}
console.timeLog(conf.workspace.name, 'begin get model') const model = await ctx.with('get model', {}, async (ctx) => {
const model = await txAdapter.getModel() const model = await txAdapter.getModel()
console.timeLog(conf.workspace.name, 'get model') for (const tx of model) {
for (const tx of model) { try {
try { hierarchy.tx(tx)
hierarchy.tx(tx) await triggers.tx(tx)
await triggers.tx(tx) } catch (err: any) {
} catch (err: any) { console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) }
} }
} for (const tx of model) {
console.timeLog(conf.workspace.name, 'finish hierarchy') try {
await modelDb.tx(tx)
for (const tx of model) { } catch (err: any) {
try { console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
await modelDb.tx(tx) }
} catch (err: any) {
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
} }
} return model
console.timeLog(conf.workspace.name, 'finish local model') })
for (const [adn, adapter] of adapters) { for (const [adn, adapter] of adapters) {
await adapter.init(model) await ctx.with('init-adapter', { name: adn }, async (ctx) => {
console.timeLog(conf.workspace.name, 'finish init adapter', adn) await adapter.init(model)
})
} }
const fulltextAdapter = await conf.fulltextAdapter.factory( const fulltextAdapter = await ctx.with(
conf.fulltextAdapter.url, 'create full text adapter',
conf.workspace, {},
conf.metrics.newChild('fulltext', {}) async (ctx) =>
await conf.fulltextAdapter.factory(
conf.fulltextAdapter.url,
conf.workspace,
conf.metrics.newChild('🗒️ fulltext', {})
)
) )
console.timeLog(conf.workspace.name, 'finish fulltext adapter')
const metrics = conf.metrics.newChild('server-storage', {}) const metrics = conf.metrics.newChild('📔 server-storage', {})
const contentAdapter = await createContentAdapter( const contentAdapter = await ctx.with(
conf.contentAdapters, 'create content adapter',
conf.defaultContentAdapter, {},
conf.workspace, async (ctx) =>
metrics.newChild('content', {}) await createContentAdapter(
conf.contentAdapters,
conf.defaultContentAdapter,
conf.workspace,
metrics.newChild('content', {})
)
) )
console.timeLog(conf.workspace.name, 'finish content adapter')
const defaultAdapter = adapters.get(conf.defaultAdapter) const defaultAdapter = adapters.get(conf.defaultAdapter)
if (defaultAdapter === undefined) { if (defaultAdapter === undefined) {
@ -877,7 +904,6 @@ export async function createServerStorage (
throw new Error('No storage adapter') throw new Error('No storage adapter')
} }
const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter) const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter)
console.timeLog(conf.workspace.name, 'finish index pipeline stages')
const indexer = new FullTextIndexPipeline( const indexer = new FullTextIndexPipeline(
defaultAdapter, defaultAdapter,
@ -903,7 +929,6 @@ export async function createServerStorage (
options.broadcast?.([tx]) options.broadcast?.([tx])
} }
) )
console.timeLog(conf.workspace.name, 'finish create indexer')
return new FullTextIndex( return new FullTextIndex(
hierarchy, hierarchy,
fulltextAdapter, fulltextAdapter,

View File

@ -70,7 +70,17 @@ export class Triggers {
if (matches.length > 0) { if (matches.length > 0) {
await ctx.with(resource, {}, async (ctx) => { await ctx.with(resource, {}, async (ctx) => {
for (const tx of matches) { for (const tx of matches) {
result.push(...(await trigger(tx, { ...ctrl, txFactory: new TxFactory(tx.modifiedBy, true) }))) result.push(
...(await trigger(tx, {
...ctrl,
ctx,
txFactory: new TxFactory(tx.modifiedBy, true),
findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx, clazz, query, options),
apply: async (tx, broadcast) => {
return await ctrl.applyCtx(ctx, tx, broadcast)
}
}))
)
} }
}) })
} }

View File

@ -113,13 +113,23 @@ export interface Pipeline extends LowLevelStorage {
* @public * @public
*/ */
export interface TriggerControl { export interface TriggerControl {
ctx: MeasureContext
workspace: WorkspaceId workspace: WorkspaceId
txFactory: TxFactory txFactory: TxFactory
findAll: Storage['findAll'] findAll: Storage['findAll']
findAllCtx: <T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
hierarchy: Hierarchy hierarchy: Hierarchy
modelDb: ModelDb modelDb: ModelDb
removedMap: Map<Ref<Doc>, Doc> removedMap: Map<Ref<Doc>, Doc>
// // An object cache,
// getCachedObject: <T extends Doc>(_class: Ref<Class<T>>, _id: Ref<T>) => Promise<T | undefined>
fulltextFx: (f: (adapter: FullTextAdapter) => Promise<void>) => void fulltextFx: (f: (adapter: FullTextAdapter) => Promise<void>) => void
// Since we don't have other storages let's consider adapter is MinioClient // Since we don't have other storages let's consider adapter is MinioClient
// Later can be replaced with generic one with bucket encapsulated inside. // Later can be replaced with generic one with bucket encapsulated inside.
@ -128,6 +138,7 @@ export interface TriggerControl {
// Bulk operations in case trigger require some // Bulk operations in case trigger require some
apply: (tx: Tx[], broadcast: boolean) => Promise<TxResult> apply: (tx: Tx[], broadcast: boolean) => Promise<TxResult>
applyCtx: (ctx: MeasureContext, tx: Tx[], broadcast: boolean) => Promise<TxResult>
} }
/** /**

View File

@ -1,24 +0,0 @@
import {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
MeasureContext,
Ref,
ServerStorage
} from '@hcengineering/core'
/**
* @public
*/
export function createFindAll (storage: ServerStorage): ServerStorage['findAll'] {
return async <T extends Doc>(
ctx: MeasureContext,
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> => {
return await storage.findAll(ctx, clazz, query, options)
}
}

View File

@ -78,7 +78,9 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
next?: Middleware next?: Middleware
): Promise<SpaceSecurityMiddleware> { ): Promise<SpaceSecurityMiddleware> {
const res = new SpaceSecurityMiddleware(broadcast, storage, next) const res = new SpaceSecurityMiddleware(broadcast, storage, next)
await res.init(ctx) await ctx.with('space chain', {}, async (ctx) => {
await res.init(ctx)
})
return res return res
} }

View File

@ -160,8 +160,8 @@ describe('mongo operations', () => {
workspace: getWorkspaceId(dbId, ''), workspace: getWorkspaceId(dbId, ''),
storageFactory: () => createNullStorageFactory() storageFactory: () => createNullStorageFactory()
} }
const serverStorage = await createServerStorage(conf, { upgrade: false })
const ctx = new MeasureMetricsContext('client', {}) const ctx = new MeasureMetricsContext('client', {})
const serverStorage = await createServerStorage(ctx, conf, { upgrade: false })
client = await createClient(async (handler) => { client = await createClient(async (handler) => {
const st: ClientConnection = { const st: ClientConnection = {
findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options), findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options),
@ -174,7 +174,8 @@ describe('mongo operations', () => {
upload: async (domain: Domain, docs: Doc[]) => {}, upload: async (domain: Domain, docs: Doc[]) => {},
clean: async (domain: Domain, docs: Ref<Doc>[]) => {}, clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
loadModel: async () => txes, loadModel: async () => txes,
getAccount: async () => ({}) as any getAccount: async () => ({}) as any,
measure: async () => async () => ({ time: 0, serverTime: 0 })
} }
return st return st
}) })

View File

@ -80,14 +80,20 @@ export class APMMeasureContext implements MeasureContext {
} }
} }
async error (err: any): Promise<void> { async error (message: string, ...args: any[]): Promise<void> {
this.logger.error(message, args)
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
this.agent.captureError(err, () => { this.agent.captureError({ message, params: args }, () => {
resolve() resolve()
}) })
}) })
} }
async info (message: string, ...args: any[]): Promise<void> {
this.logger.info(message, args)
}
end (): void { end (): void {
this.transaction?.end() this.transaction?.end()
} }

View File

@ -4,6 +4,7 @@ import { writeFile } from 'fs/promises'
const apmUrl = process.env.APM_SERVER_URL const apmUrl = process.env.APM_SERVER_URL
const metricsFile = process.env.METRICS_FILE const metricsFile = process.env.METRICS_FILE
// const logsRoot = process.env.LOGS_ROOT
const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true' const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true'
const METRICS_UPDATE_INTERVAL = !metricsConsole ? 1000 : 60000 const METRICS_UPDATE_INTERVAL = !metricsConsole ? 1000 : 60000

View File

@ -135,7 +135,7 @@ export async function initModel (
const result = await db.collection(DOMAIN_TX).insertMany(model as Document[]) const result = await db.collection(DOMAIN_TX).insertMany(model as Document[])
logger.log(`${result.insertedCount} model transactions inserted.`) logger.log(`${result.insertedCount} model transactions inserted.`)
logger.log('creating data...') logger.log('creating data...', transactorUrl)
const connection = (await connect(transactorUrl, workspaceId, undefined, { const connection = (await connect(transactorUrl, workspaceId, undefined, {
model: 'upgrade' model: 'upgrade'
})) as unknown as CoreClient & BackupClient })) as unknown as CoreClient & BackupClient

View File

@ -56,6 +56,7 @@ export class ClientSession implements Session {
total: StatisticsElement = { find: 0, tx: 0 } total: StatisticsElement = { find: 0, tx: 0 }
current: StatisticsElement = { find: 0, tx: 0 } current: StatisticsElement = { find: 0, tx: 0 }
mins5: StatisticsElement = { find: 0, tx: 0 } mins5: StatisticsElement = { find: 0, tx: 0 }
measures: { id: string, message: string, time: 0 }[] = []
constructor ( constructor (
protected readonly broadcast: BroadcastCall, protected readonly broadcast: BroadcastCall,

View File

@ -26,7 +26,7 @@ import core, {
type WorkspaceId type WorkspaceId
} from '@hcengineering/core' } from '@hcengineering/core'
import { unknownError } from '@hcengineering/platform' import { unknownError } from '@hcengineering/platform'
import { readRequest, type HelloRequest, type HelloResponse, type Response } from '@hcengineering/rpc' import { readRequest, type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
import type { Pipeline, SessionContext } from '@hcengineering/server-core' import type { Pipeline, SessionContext } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token' import { type Token } from '@hcengineering/server-token'
// import WebSocket, { RawData } from 'ws' // import WebSocket, { RawData } from 'ws'
@ -155,14 +155,14 @@ class TSessionManager implements SessionManager {
} }
async addSession ( async addSession (
ctx: MeasureContext, baseCtx: MeasureContext,
ws: ConnectionSocket, ws: ConnectionSocket,
token: Token, token: Token,
pipelineFactory: PipelineFactory, pipelineFactory: PipelineFactory,
productId: string, productId: string,
sessionId?: string sessionId?: string
): Promise<Session> { ): Promise<{ session: Session, context: MeasureContext } | { upgrade: true }> {
return await ctx.with('add-session', {}, async (ctx) => { return await baseCtx.with('📲 add-session', {}, async (ctx) => {
const wsString = toWorkspaceString(token.workspace, '@') const wsString = toWorkspaceString(token.workspace, '@')
let workspace = this.workspaces.get(wsString) let workspace = this.workspaces.get(wsString)
@ -170,22 +170,29 @@ class TSessionManager implements SessionManager {
workspace = this.workspaces.get(wsString) workspace = this.workspaces.get(wsString)
if (workspace === undefined) { if (workspace === undefined) {
workspace = this.createWorkspace(ctx, pipelineFactory, token) workspace = this.createWorkspace(baseCtx, pipelineFactory, token)
} }
let pipeline: Pipeline let pipeline: Pipeline
if (token.extra?.model === 'upgrade') { if (token.extra?.model === 'upgrade') {
if (workspace.upgrade) { if (workspace.upgrade) {
pipeline = await ctx.with('pipeline', {}, async () => await (workspace as Workspace).pipeline) pipeline = await ctx.with(
'💤 wait ' + token.workspace.name,
{},
async () => await (workspace as Workspace).pipeline
)
} else { } else {
pipeline = await this.createUpgradeSession(token, sessionId, ctx, wsString, workspace, pipelineFactory, ws) pipeline = await this.createUpgradeSession(token, sessionId, ctx, wsString, workspace, pipelineFactory, ws)
} }
} else { } else {
if (workspace.upgrade) { if (workspace.upgrade) {
ws.close() return { upgrade: true }
throw new Error('Upgrade in progress....')
} }
pipeline = await ctx.with('pipeline', {}, async () => await (workspace as Workspace).pipeline) pipeline = await ctx.with(
'💤 wait ' + token.workspace.name,
{},
async () => await (workspace as Workspace).pipeline
)
} }
const session = this.createSession(token, pipeline) const session = this.createSession(token, pipeline)
@ -204,7 +211,7 @@ class TSessionManager implements SessionManager {
session.useCompression session.useCompression
) )
} }
return session return { session, context: workspace.context }
}) })
} }
@ -222,7 +229,7 @@ class TSessionManager implements SessionManager {
} }
// If upgrade client is used. // If upgrade client is used.
// Drop all existing clients // Drop all existing clients
await this.closeAll(ctx, wsString, workspace, 0, 'upgrade') await this.closeAll(wsString, workspace, 0, 'upgrade')
// Wipe workspace and update values. // Wipe workspace and update values.
if (!workspace.upgrade) { if (!workspace.upgrade) {
// This is previous workspace, intended to be closed. // This is previous workspace, intended to be closed.
@ -238,10 +245,10 @@ class TSessionManager implements SessionManager {
} }
broadcastAll (workspace: Workspace, tx: Tx[], targets?: string[]): void { broadcastAll (workspace: Workspace, tx: Tx[], targets?: string[]): void {
if (workspace?.upgrade ?? false) { if (workspace.upgrade) {
return return
} }
const ctx = this.ctx.newChild('broadcast-all', {}) const ctx = this.ctx.newChild('📬 broadcast-all', {})
const sessions = [...workspace.sessions.values()] const sessions = [...workspace.sessions.values()]
function send (): void { function send (): void {
for (const session of sessions.splice(0, 1)) { for (const session of sessions.splice(0, 1)) {
@ -266,9 +273,11 @@ class TSessionManager implements SessionManager {
private createWorkspace (ctx: MeasureContext, pipelineFactory: PipelineFactory, token: Token): Workspace { private createWorkspace (ctx: MeasureContext, pipelineFactory: PipelineFactory, token: Token): Workspace {
const upgrade = token.extra?.model === 'upgrade' const upgrade = token.extra?.model === 'upgrade'
const context = ctx.newChild('🧲 ' + token.workspace.name, {})
const workspace: Workspace = { const workspace: Workspace = {
context,
id: generateId(), id: generateId(),
pipeline: pipelineFactory(ctx, token.workspace, upgrade, (tx, targets) => { pipeline: pipelineFactory(context, token.workspace, upgrade, (tx, targets) => {
this.broadcastAll(workspace, tx, targets) this.broadcastAll(workspace, tx, targets)
}), }),
sessions: new Map(), sessions: new Map(),
@ -309,13 +318,7 @@ class TSessionManager implements SessionManager {
} catch {} } catch {}
} }
async close ( async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise<void> {
ctx: MeasureContext,
ws: ConnectionSocket,
workspaceId: WorkspaceId,
code: number,
reason: string
): Promise<void> {
// if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`) // if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`)
const wsid = toWorkspaceString(workspaceId) const wsid = toWorkspaceString(workspaceId)
const workspace = this.workspaces.get(wsid) const workspace = this.workspaces.get(wsid)
@ -340,7 +343,7 @@ class TSessionManager implements SessionManager {
const user = sessionRef.session.getUser() const user = sessionRef.session.getUser()
const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user)
if (another === -1) { if (another === -1) {
await this.setStatus(ctx, sessionRef.session, false) await this.setStatus(workspace.context, sessionRef.session, false)
} }
if (!workspace.upgrade) { if (!workspace.upgrade) {
// Wait some time for new client to appear before closing workspace. // Wait some time for new client to appear before closing workspace.
@ -355,13 +358,7 @@ class TSessionManager implements SessionManager {
} }
} }
async closeAll ( async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise<void> {
ctx: MeasureContext,
wsId: string,
workspace: Workspace,
code: number,
reason: 'upgrade' | 'shutdown'
): Promise<void> {
if (LOGGING_ENABLED) console.timeLog(wsId, `closing workspace ${workspace.id}, code: ${code}, reason: ${reason}`) if (LOGGING_ENABLED) console.timeLog(wsId, `closing workspace ${workspace.id}, code: ${code}, reason: ${reason}`)
const sessions = Array.from(workspace.sessions) const sessions = Array.from(workspace.sessions)
@ -371,19 +368,10 @@ class TSessionManager implements SessionManager {
s.workspaceClosed = true s.workspaceClosed = true
if (reason === 'upgrade') { if (reason === 'upgrade') {
// Override message handler, to wait for upgrading response from clients. // Override message handler, to wait for upgrading response from clients.
await webSocket.send( await this.sendUpgrade(workspace.context, webSocket, s.binaryResponseMode)
ctx,
{
result: {
_class: core.class.TxModelUpgrade
}
},
s.binaryResponseMode,
false
)
} }
webSocket.close() webSocket.close()
await this.setStatus(ctx, s, false) await this.setStatus(workspace.context, s, false)
} }
if (LOGGING_ENABLED) console.timeLog(wsId, workspace.id, 'Clients disconnected. Closing Workspace...') if (LOGGING_ENABLED) console.timeLog(wsId, workspace.id, 'Clients disconnected. Closing Workspace...')
@ -403,12 +391,25 @@ class TSessionManager implements SessionManager {
console.timeEnd(wsId) console.timeEnd(wsId)
} }
private async sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): Promise<void> {
await webSocket.send(
ctx,
{
result: {
_class: core.class.TxModelUpgrade
}
},
binary,
false
)
}
async closeWorkspaces (ctx: MeasureContext): Promise<void> { async closeWorkspaces (ctx: MeasureContext): Promise<void> {
if (this.checkInterval !== undefined) { if (this.checkInterval !== undefined) {
clearInterval(this.checkInterval) clearInterval(this.checkInterval)
} }
for (const w of this.workspaces) { for (const w of this.workspaces) {
await this.closeAll(ctx, w[0], w[1], 1, 'shutdown') await this.closeAll(w[0], w[1], 1, 'shutdown')
} }
} }
@ -432,6 +433,7 @@ class TSessionManager implements SessionManager {
if (this.workspaces.get(wsid)?.id === wsUID) { if (this.workspaces.get(wsid)?.id === wsUID) {
this.workspaces.delete(wsid) this.workspaces.delete(wsid)
} }
workspace.context.end()
if (LOGGING_ENABLED) { if (LOGGING_ENABLED) {
console.timeLog(workspaceId.name, 'Closed workspace', wsUID) console.timeLog(workspaceId.name, 'Closed workspace', wsUID)
} }
@ -459,7 +461,7 @@ class TSessionManager implements SessionManager {
if (LOGGING_ENABLED) console.log(workspaceId.name, `server broadcasting to ${workspace.sessions.size} clients...`) if (LOGGING_ENABLED) console.log(workspaceId.name, `server broadcasting to ${workspace.sessions.size} clients...`)
const sessions = [...workspace.sessions.values()] const sessions = [...workspace.sessions.values()]
const ctx = this.ctx.newChild('broadcast', {}) const ctx = this.ctx.newChild('📭 broadcast', {})
function send (): void { function send (): void {
for (const sessionRef of sessions.splice(0, 1)) { for (const sessionRef of sessions.splice(0, 1)) {
if (sessionRef.session.sessionId !== from?.sessionId) { if (sessionRef.session.sessionId !== from?.sessionId) {
@ -496,7 +498,7 @@ class TSessionManager implements SessionManager {
msg: any, msg: any,
workspace: string workspace: string
): Promise<void> { ): Promise<void> {
const userCtx = requestCtx.newChild('client', { workspace }) as SessionContext const userCtx = requestCtx.newChild('📞 client', {}) as SessionContext
userCtx.sessionId = service.sessionInstanceId ?? '' userCtx.sessionId = service.sessionInstanceId ?? ''
// Calculate total number of clients // Calculate total number of clients
@ -504,8 +506,8 @@ class TSessionManager implements SessionManager {
const st = Date.now() const st = Date.now()
try { try {
await userCtx.with('handleRequest', {}, async (ctx) => { await userCtx.with('🧭 handleRequest', {}, async (ctx) => {
const request = await ctx.with('read', {}, async () => readRequest(msg, false)) const request = await ctx.with('📥 read', {}, async () => readRequest(msg, false))
if (request.id === -1 && request.method === 'hello') { if (request.id === -1 && request.method === 'hello') {
const hello = request as HelloRequest const hello = request as HelloRequest
service.binaryResponseMode = hello.binary ?? false service.binaryResponseMode = hello.binary ?? false
@ -536,6 +538,10 @@ class TSessionManager implements SessionManager {
await ws.send(ctx, helloResponse, false, false) await ws.send(ctx, helloResponse, false, false)
return return
} }
if (request.method === 'measure' || request.method === 'measure-done') {
await this.handleMeasure<S>(service, request, ctx, ws)
return
}
service.requests.set(reqId, { service.requests.set(reqId, {
id: reqId, id: reqId,
params: request, params: request,
@ -545,10 +551,15 @@ class TSessionManager implements SessionManager {
ws.close() ws.close()
return return
} }
const f = (service as any)[request.method] const f = (service as any)[request.method]
try { try {
const params = [...request.params] const params = [...request.params]
const result = await ctx.with('call', {}, async (callTx) => f.apply(service, [callTx, ...params]))
const result =
service.measureCtx?.ctx !== undefined
? await f.apply(service, [service.measureCtx?.ctx, ...params])
: await ctx.with('🧨 process', {}, async (callTx) => f.apply(service, [callTx, ...params]))
const resp: Response<any> = { id: request.id, result } const resp: Response<any> = { id: request.id, result }
@ -575,6 +586,43 @@ class TSessionManager implements SessionManager {
service.requests.delete(reqId) service.requests.delete(reqId)
} }
} }
private async handleMeasure<S extends Session>(
service: S,
request: Request<any[]>,
ctx: MeasureContext,
ws: ConnectionSocket
): Promise<void> {
let serverTime = 0
if (request.method === 'measure') {
service.measureCtx = { ctx: ctx.newChild('📶 ' + request.params[0], {}), time: Date.now() }
} else {
if (service.measureCtx !== undefined) {
serverTime = Date.now() - service.measureCtx.time
service.measureCtx.ctx.end(serverTime)
}
}
try {
const resp: Response<any> = { id: request.id, result: request.method === 'measure' ? 'started' : serverTime }
await handleSend(
ctx,
ws,
resp,
this.sessions.size < 100 ? 10000 : 1001,
service.binaryResponseMode,
service.useCompression
)
} catch (err: any) {
if (LOGGING_ENABLED) console.error(err)
const resp: Response<any> = {
id: request.id,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
}
await ws.send(ctx, resp, service.binaryResponseMode, service.useCompression)
}
}
} }
async function handleSend ( async function handleSend (

View File

@ -169,11 +169,11 @@ export function startHttpServer (
if (ws.readyState !== ws.OPEN) { if (ws.readyState !== ws.OPEN) {
return return
} }
const smsg = await ctx.with('serialize', {}, async () => serialize(msg, binary)) const smsg = await ctx.with('📦 serialize', {}, async () => serialize(msg, binary))
ctx.measure('send-data', smsg.length) ctx.measure('send-data', smsg.length)
await ctx.with('socket-send', {}, async (ctx) => { await ctx.with('📤 socket-send', {}, async (ctx) => {
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
ws.send(smsg, { binary, compress: compression }, (err) => { ws.send(smsg, { binary, compress: compression }, (err) => {
if (err != null) { if (err != null) {
@ -191,6 +191,10 @@ export function startHttpServer (
buffer?.push(msg) buffer?.push(msg)
}) })
const session = await sessions.addSession(ctx, cs, token, pipelineFactory, productId, sessionId) const session = await sessions.addSession(ctx, cs, token, pipelineFactory, productId, sessionId)
if ('upgrade' in session) {
cs.close()
return
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises // eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', (msg: RawData) => { ws.on('message', (msg: RawData) => {
let buff: any | undefined let buff: any | undefined
@ -200,22 +204,21 @@ export function startHttpServer (
buff = Buffer.concat(msg).toString() buff = Buffer.concat(msg).toString()
} }
if (buff !== undefined) { if (buff !== undefined) {
void handleRequest(ctx, session, cs, buff, token.workspace.name) void handleRequest(session.context, session.session, cs, buff, token.workspace.name)
} }
}) })
// eslint-disable-next-line @typescript-eslint/no-misused-promises // eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', (code: number, reason: Buffer) => { ws.on('close', (code: number, reason: Buffer) => {
if (session.workspaceClosed ?? false) { if (session.session.workspaceClosed ?? false) {
return return
} }
// remove session after 1seconds, give a time to reconnect. // remove session after 1seconds, give a time to reconnect.
// if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" closed ${code === 1000 ? 'normally' : 'abnormally'}`) void sessions.close(cs, token.workspace, code, reason.toString())
void sessions.close(ctx, cs, token.workspace, code, reason.toString())
}) })
const b = buffer const b = buffer
buffer = undefined buffer = undefined
for (const msg of b) { for (const msg of b) {
await handleRequest(ctx, session, cs, msg, token.workspace.name) await handleRequest(session.context, session.session, cs, msg, token.workspace.name)
} }
}) })
@ -226,7 +229,6 @@ export function startHttpServer (
try { try {
const payload = decodeToken(token ?? '') const payload = decodeToken(token ?? '')
const sessionId = url.searchParams.get('sessionId') const sessionId = url.searchParams.get('sessionId')
// if (LOGGING_ENABLED) console.log(payload.workspace.name, 'client connected with payload', payload, sessionId)
if (payload.workspace.productId !== productId) { if (payload.workspace.productId !== productId) {
throw new Error('Invalid workspace product') throw new Error('Invalid workspace product')

View File

@ -59,6 +59,8 @@ export interface Session {
total: StatisticsElement total: StatisticsElement
current: StatisticsElement current: StatisticsElement
mins5: StatisticsElement mins5: StatisticsElement
measureCtx?: { ctx: MeasureContext, time: number }
} }
/** /**
@ -107,6 +109,7 @@ export function disableLogging (): void {
* @public * @public
*/ */
export interface Workspace { export interface Workspace {
context: MeasureContext
id: string id: string
pipeline: Promise<Pipeline> pipeline: Promise<Pipeline>
sessions: Map<string, { session: Session, socket: ConnectionSocket }> sessions: Map<string, { session: Session, socket: ConnectionSocket }>
@ -130,25 +133,13 @@ export interface SessionManager {
pipelineFactory: PipelineFactory, pipelineFactory: PipelineFactory,
productId: string, productId: string,
sessionId?: string sessionId?: string
) => Promise<Session> ) => Promise<{ session: Session, context: MeasureContext } | { upgrade: true }>
broadcastAll: (workspace: Workspace, tx: Tx[], targets?: string[]) => void broadcastAll: (workspace: Workspace, tx: Tx[], targets?: string[]) => void
close: ( close: (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string) => Promise<void>
ctx: MeasureContext,
ws: ConnectionSocket,
workspaceId: WorkspaceId,
code: number,
reason: string
) => Promise<void>
closeAll: ( closeAll: (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown') => Promise<void>
ctx: MeasureContext,
wsId: string,
workspace: Workspace,
code: number,
reason: 'upgrade' | 'shutdown'
) => Promise<void>
closeWorkspaces: (ctx: MeasureContext) => Promise<void> closeWorkspaces: (ctx: MeasureContext) => Promise<void>