diff --git a/dev/tool/src/benchmark.ts b/dev/tool/src/benchmark.ts index 2afbc52709..32594fa8c8 100644 --- a/dev/tool/src/benchmark.ts +++ b/dev/tool/src/benchmark.ts @@ -92,7 +92,10 @@ export async function benchmark ( os.cpus().forEach(() => { /* Spawn a new thread running this source file */ - const worker = new Worker(__filename) + console.error('__filename', __filename) + const worker = new Worker(__filename, { + argv: ['benchmarkWorker'] + }) workers.push(worker) worker.on('message', (data: Msg) => { if (data === undefined) { @@ -319,89 +322,91 @@ function randNum (value = 2): number { return Math.round(Math.random() * value) % value } -if (!isMainThread) { - parentPort?.on('message', (msg: StartMessage) => { - console.log('starting worker', msg.workId) - void perform(msg) - }) -} - -async function perform (msg: StartMessage): Promise { - let connection: Client | undefined - try { - setMetadata(client.metadata.UseBinaryProtocol, msg.binary) - setMetadata(client.metadata.UseProtocolCompression, msg.compression) - console.log('connecting to', msg.workspaceId) - - connection = await connect(msg.transactorUrl, msg.workspaceId, undefined) - const opt = new TxOperations(connection, (core.account.System + '_benchmark') as Ref) - parentPort?.postMessage({ - type: 'operate', - workId: msg.workId +export function benchmarkWorker (): void { + if (!isMainThread) { + parentPort?.on('message', (msg: StartMessage) => { + console.log('starting worker', msg.workId) + void perform(msg) }) + } - const h = connection.getHierarchy() - const allClasses = await connection.getModel().findAll(core.class.Class, {}) - const classes = allClasses.filter((it) => it.kind === ClassifierKind.CLASS && h.findDomain(it._id) !== undefined) - while (msg.options.readRequests + msg.options.modelRequests > 0) { - if (msg.options.modelRequests > 0) { - await connection?.findAll(core.class.Tx, {}, { sort: { _id: -1 } }) - msg.options.modelRequests-- - } - let doc: Doc | undefined - if (msg.options.readRequests > 0) { - const cl = classes[randNum(classes.length - 1)] - if (cl !== undefined) { - const docs = await connection?.findAll( - cl._id, - {}, - { - sort: { _id: -1 }, - limit: msg.options.limit.min + randNum(msg.options.limit.rand) - } - ) - if (docs.length > 0) { - doc = docs[randNum(docs.length - 1)] - } - msg.options.readRequests-- + async function perform (msg: StartMessage): Promise { + let connection: Client | undefined + try { + setMetadata(client.metadata.UseBinaryProtocol, msg.binary) + setMetadata(client.metadata.UseProtocolCompression, msg.compression) + console.log('connecting to', msg.workspaceId) + + connection = await connect(msg.transactorUrl, msg.workspaceId, undefined) + const opt = new TxOperations(connection, (core.account.System + '_benchmark') as Ref) + parentPort?.postMessage({ + type: 'operate', + workId: msg.workId + }) + + const h = connection.getHierarchy() + const allClasses = await connection.getModel().findAll(core.class.Class, {}) + const classes = allClasses.filter((it) => it.kind === ClassifierKind.CLASS && h.findDomain(it._id) !== undefined) + while (msg.options.readRequests + msg.options.modelRequests > 0) { + if (msg.options.modelRequests > 0) { + await connection?.findAll(core.class.Tx, {}, { sort: { _id: -1 } }) + msg.options.modelRequests-- } - if (msg.options.write && doc !== undefined) { - const attrs = connection.getHierarchy().getAllAttributes(doc._class) - const upd: DocumentUpdate = {} - for (const [key, value] of attrs.entries()) { - if (value.type._class === 
core.class.TypeString || value.type._class === core.class.TypeBoolean) { - if ( - key !== '_id' && - key !== '_class' && - key !== 'space' && - key !== 'attachedTo' && - key !== 'attachedToClass' - ) { - const v = (doc as any)[key] - if (v != null) { - ;(upd as any)[key] = v + let doc: Doc | undefined + if (msg.options.readRequests > 0) { + const cl = classes[randNum(classes.length - 1)] + if (cl !== undefined) { + const docs = await connection?.findAll( + cl._id, + {}, + { + sort: { _id: -1 }, + limit: msg.options.limit.min + randNum(msg.options.limit.rand) + } + ) + if (docs.length > 0) { + doc = docs[randNum(docs.length - 1)] + } + msg.options.readRequests-- + } + if (msg.options.write && doc !== undefined) { + const attrs = connection.getHierarchy().getAllAttributes(doc._class) + const upd: DocumentUpdate = {} + for (const [key, value] of attrs.entries()) { + if (value.type._class === core.class.TypeString || value.type._class === core.class.TypeBoolean) { + if ( + key !== '_id' && + key !== '_class' && + key !== 'space' && + key !== 'attachedTo' && + key !== 'attachedToClass' + ) { + const v = (doc as any)[key] + if (v != null) { + ;(upd as any)[key] = v + } } } } - } - if (Object.keys(upd).length > 0) { - await opt.update(doc, upd) + if (Object.keys(upd).length > 0) { + await opt.update(doc, upd) + } } } + if (msg.options.sleep > 0) { + await new Promise((resolve) => setTimeout(resolve, randNum(msg.options.sleep))) + } } - if (msg.options.sleep > 0) { - await new Promise((resolve) => setTimeout(resolve, randNum(msg.options.sleep))) - } + // + // console.log(`${msg.idd} perform complete`) + } catch (err: any) { + console.error(msg.workspaceId, err) + } finally { + await connection?.close() } - // - // console.log(`${msg.idd} perform complete`) - } catch (err: any) { - console.error(msg.workspaceId, err) - } finally { - await connection?.close() + parentPort?.postMessage({ + type: 'complete', + workId: msg.workId + }) } - parentPort?.postMessage({ - type: 'complete', - workId: msg.workId - }) } diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index d0eaeb3c34..c121b5d124 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -66,7 +66,7 @@ import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { openAIConfigDefaults } from '@hcengineering/openai' import { type StorageAdapter } from '@hcengineering/server-core' import { deepEqual } from 'fast-equals' -import { benchmark } from './benchmark' +import { benchmark, benchmarkWorker } from './benchmark' import { cleanArchivedSpaces, cleanRemovedTransactions, @@ -332,25 +332,36 @@ export function devTool ( .option('-p|--parallel ', 'Parallel upgrade', '0') .option('-l|--logs ', 'Default logs folder', './logs') .option('-r|--retry ', 'Number of apply retries', '0') + .option('-i|--ignore [ignore]', 'Ignore workspaces', '') .option( '-c|--console', 'Display all information into console(default will create logs folder with {workspace}.log files', false ) .option('-f|--force [force]', 'Force update', false) - .action(async (cmd: { parallel: string, logs: string, retry: string, force: boolean, console: boolean }) => { - const { mongodbUri, version, txes, migrateOperations } = prepareTools() - await withDatabase(mongodbUri, async (db, client) => { - const worker = new UpgradeWorker(db, client, version, txes, migrateOperations, productId) - await worker.upgradeAll(toolCtx, { - errorHandler: async (ws, err) => {}, - force: cmd.force, - console: cmd.console, - logs: cmd.logs, - parallel: parseInt(cmd.parallel 
?? '1')
+    .action(
+      async (cmd: {
+        parallel: string
+        logs: string
+        retry: string
+        force: boolean
+        console: boolean
+        ignore: string
+      }) => {
+        const { mongodbUri, version, txes, migrateOperations } = prepareTools()
+        await withDatabase(mongodbUri, async (db, client) => {
+          const worker = new UpgradeWorker(db, client, version, txes, migrateOperations, productId)
+          await worker.upgradeAll(toolCtx, {
+            errorHandler: async (ws, err) => {},
+            force: cmd.force,
+            console: cmd.console,
+            logs: cmd.logs,
+            parallel: parseInt(cmd.parallel ?? '1'),
+            ignore: cmd.ignore
+          })
         })
-      })
-    })
+      }
+    )
 
   program
     .command('remove-unused-workspaces')
@@ -826,6 +837,13 @@ export function devTool (
         )
       }
     )
+  program
+    .command('benchmarkWorker')
+    .description('benchmarkWorker')
+    .action(async (cmd: any) => {
+      console.log(JSON.stringify(cmd))
+      benchmarkWorker()
+    })
 
   program
     .command('fix-skills ')
diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts
index e18e8df1f1..3b1ce83dc6 100644
--- a/packages/core/src/client.ts
+++ b/packages/core/src/client.ts
@@ -522,6 +522,7 @@ function pluginFilterTx (
 ): Tx[] {
   const stx = toIdMap(systemTx)
   const totalExcluded = new Set<Ref<Doc>>()
+  let msg = ''
   for (const a of excludedPlugins) {
     for (const c of configs.values()) {
       if (a.pluginId === c.pluginId) {
@@ -543,10 +544,11 @@
             totalExcluded.add(id as Ref<Doc>)
           }
         }
-        console.log('exclude plugin', c.pluginId, c.transactions.length)
+        msg += ` ${c.pluginId}:${c.transactions.length}`
       }
     }
   }
+  console.log('exclude plugin', msg)
   systemTx = systemTx.filter((t) => !totalExcluded.has(t._id))
   return systemTx
 }
diff --git a/packages/core/src/measurements/context.ts b/packages/core/src/measurements/context.ts
index edbd82384b..93414a7cae 100644
--- a/packages/core/src/measurements/context.ts
+++ b/packages/core/src/measurements/context.ts
@@ -18,7 +18,8 @@ export class MeasureMetricsContext implements MeasureContext {
     params: ParamsType,
     fullParams: FullParamsType = {},
     metrics: Metrics = newMetrics(),
-    logger?: MeasureLogger
+    logger?: MeasureLogger,
+    readonly parent?: MeasureContext
   ) {
     this.name = name
     this.params = params
@@ -43,13 +44,16 @@ export class MeasureMetricsContext implements MeasureContext {
       error: (msg, args) => {
         console.error(msg, ...Object.entries(args ?? {}).map((it) => `${it[0]}=${JSON.stringify(replacer(it[1]))}`))
       },
+      warn: (msg, args) => {
+        console.warn(msg, ...Object.entries(args ?? {}).map((it) => `${it[0]}=${JSON.stringify(replacer(it[1]))}`))
+      },
       close: async () => {},
       logOperation: (operation, time, params) => {}
     }
   }
 
   measure (name: string, value: number): void {
-    const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger)
+    const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger, this)
     c.done(value)
   }
 
@@ -59,7 +63,8 @@ export class MeasureMetricsContext implements MeasureContext {
       params,
       fullParams ?? {},
       childMetrics(this.metrics, [name]),
-      logger ?? this.logger
+      logger ?? this.logger,
+      this
     )
   }
 
@@ -78,7 +83,7 @@
       c.end()
       return value
     } catch (err: any) {
-      await c.error('Error during:' + name, { err })
+      c.error('Error during:' + name, { err })
       throw err
     }
   }
@@ -95,15 +100,37 @@
     return r
   }
 
-  async error (message: string, args?: Record<string, any>): Promise<void> {
+  error (message: string, args?: Record<string, any>): void {
     this.logger.error(message, { ...this.params, ...args })
   }
 
-  async info (message: string, args?: Record<string, any>): Promise<void> {
+  info (message: string, args?: Record<string, any>): void {
     this.logger.info(message, { ...this.params, ...args })
   }
 
+  warn (message: string, args?: Record<string, any>): void {
+    this.logger.warn(message, { ...this.params, ...args })
+  }
+
   end (): void {
     this.done()
   }
 }
+
+/**
+ * Allow to use decorator for context enabled functions
+ */
+export function withContext (name: string, params: ParamsType = {}): any {
+  return (target: any, propertyKey: string, descriptor: PropertyDescriptor): PropertyDescriptor => {
+    const originalMethod = descriptor.value
+    descriptor.value = async function (...args: any[]): Promise<any> {
+      const ctx = args[0] as MeasureContext
+      return await ctx.with(
+        name,
+        params,
+        async (ctx) => await (originalMethod.apply(this, [ctx, ...args.slice(1)]) as Promise<any>)
+      )
+    }
+    return descriptor
+  }
+}
diff --git a/packages/core/src/measurements/types.ts b/packages/core/src/measurements/types.ts
index 29757705eb..e07fa2d1d2 100644
--- a/packages/core/src/measurements/types.ts
+++ b/packages/core/src/measurements/types.ts
@@ -40,6 +40,8 @@ export interface MeasureLogger {
   info: (message: string, obj?: Record<string, any>) => void
   error: (message: string, obj?: Record<string, any>) => void
 
+  warn: (message: string, obj?: Record<string, any>) => void
+
   logOperation: (operation: string, time: number, params: ParamsType) => void
 
   childLogger?: (name: string, params: Record<string, any>) => MeasureLogger
@@ -69,11 +71,14 @@ export interface MeasureContext {
 
   logger: MeasureLogger
 
+  parent?: MeasureContext
+
   measure: (name: string, value: number) => void
 
   // Capture error
-  error: (message: string, obj?: Record<string, any>) => Promise<void>
-  info: (message: string, obj?: Record<string, any>) => Promise<void>
+  error: (message: string, obj?: Record<string, any>) => void
+  info: (message: string, obj?: Record<string, any>) => void
+  warn: (message: string, obj?: Record<string, any>) => void
 
   // Mark current context as complete
   // If no value is passed, time difference will be used.
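
Note: the withContext decorator introduced above wraps a context-enabled method in ctx.with(name, params, ...), treating the method's first argument as the MeasureContext. A minimal usage sketch follows; the ModelLoader class and loadModel method are invented for illustration, and it assumes withContext and MeasureMetricsContext are re-exported from @hcengineering/core and that the project builds with experimentalDecorators enabled.

import { MeasureMetricsContext, withContext, type MeasureContext } from '@hcengineering/core'

class ModelLoader {
  // The decorator replaces the method with a wrapper that opens a child
  // context named 'load-model', passes it in as the first argument and
  // records the call duration in the metrics tree.
  @withContext('load-model')
  async loadModel (ctx: MeasureContext, workspace: string): Promise<number> {
    ctx.info('loading model', { workspace }) // info/warn/error are synchronous after this change
    if (workspace === '') {
      ctx.warn('empty workspace name', {})
    }
    return 42
  }
}

const root = new MeasureMetricsContext('example', {})
void new ModelLoader().loadModel(root, 'ws1')
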
diff --git a/packages/core/src/memdb.ts b/packages/core/src/memdb.ts index 6fb907bfdc..bf6d184b00 100644 --- a/packages/core/src/memdb.ts +++ b/packages/core/src/memdb.ts @@ -299,7 +299,7 @@ export class ModelDb extends MemDb { if (doc !== undefined) { TxProcessor.updateDoc2Doc(doc, cud) } else { - void ctx.error('no document found, failed to apply model transaction, skipping', { + ctx.error('no document found, failed to apply model transaction, skipping', { _id: tx._id, _class: tx._class, objectId: cud.objectId @@ -311,7 +311,7 @@ export class ModelDb extends MemDb { try { this.delDoc((tx as TxRemoveDoc).objectId) } catch (err: any) { - void ctx.error('no document found, failed to apply model transaction, skipping', { + ctx.error('no document found, failed to apply model transaction, skipping', { _id: tx._id, _class: tx._class, objectId: (tx as TxRemoveDoc).objectId @@ -324,7 +324,7 @@ export class ModelDb extends MemDb { if (obj !== undefined) { TxProcessor.updateMixin4Doc(obj, mix) } else { - void ctx.error('no document found, failed to apply model transaction, skipping', { + ctx.error('no document found, failed to apply model transaction, skipping', { _id: tx._id, _class: tx._class, objectId: mix.objectId diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 5cb4e2cc66..beaa68a92f 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -50,9 +50,11 @@ import { HelloRequest, HelloResponse, ReqId, readResponse, serialize } from '@hc const SECOND = 1000 const pingTimeout = 10 * SECOND const hangTimeout = 5 * 60 * SECOND -const dialTimeout = 60 * SECOND +const dialTimeout = 30 * SECOND class RequestPromise { + startTime: number = Date.now() + handleTime?: (diff: number, result: any, serverTime: number, queue: number) => void readonly promise: Promise resolve!: (value?: any) => void reject!: (reason?: any) => void @@ -72,7 +74,7 @@ class RequestPromise { } class Connection implements ClientConnection { - private websocket: ClientSocket | Promise | null = null + private websocket: ClientSocket | null = null private readonly requests = new Map() private lastId = 0 private interval: number | undefined @@ -93,42 +95,43 @@ class Connection implements ClientConnection { readonly onConnect?: (event: ClientConnectEvent, data?: any) => Promise ) {} - private schedulePing (): void { + private schedulePing (socketId: number): void { clearInterval(this.interval) this.pingResponse = Date.now() - this.interval = setInterval(() => { + const wsocket = this.websocket + const interval = setInterval(() => { + if (wsocket !== this.websocket) { + clearInterval(interval) + return + } if (!this.upgrading && this.pingResponse !== 0 && Date.now() - this.pingResponse > hangTimeout) { // No ping response from server. - const s = this.websocket - if (!(s instanceof Promise)) { - console.log( - 'no ping response from server. Closing socket.', - this.workspace, - this.email, - s, - (s as any)?.readyState - ) - // Trying to close connection and re-establish it. - s?.close(1000) - } else { - console.log('no ping response from server. Closing socket.', this.workspace, this.email, s) - void s.then((s) => { - s.close(1000) - }) + if (this.websocket !== null) { + console.log('no ping response from server. 
Closing socket.', socketId, this.workspace, this.email) + clearInterval(this.interval) + this.websocket.close(1000) + return } - this.websocket = null } if (!this.closed) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - void this.sendRequest({ method: 'ping', params: [] }).then((result) => { - this.pingResponse = Date.now() + void this.sendRequest({ + method: 'ping', + params: [], + once: true, + handleResult: async (result) => { + if (this.websocket === wsocket) { + this.pingResponse = Date.now() + } + } }) } else { clearInterval(this.interval) } }, pingTimeout) + this.interval = interval } async close (): Promise { @@ -147,257 +150,284 @@ class Connection implements ClientConnection { } delay = 0 - pending: Promise | undefined + onConnectHandlers: (() => void)[] = [] - private async waitOpenConnection (): Promise { - while (true) { - try { - const socket = await this.pending - if (socket != null && socket.readyState === ClientSocketReadyState.OPEN) { - return socket - } - this.pending = this.openConnection() - await this.pending - this.delay = 0 - return await this.pending - } catch (err: any) { - if (this.closed) { - throw new Error('connection closed + ' + this.workspace + ' user: ' + this.email) - } - this.pending = undefined - if (!this.upgrading) { - console.log('connection: failed to connect', `requests: ${this.lastId}`, this.workspace, this.email) - } else { - console.log('connection: workspace during upgrade', `requests: ${this.lastId}`, this.workspace, this.email) - } - if (err?.code === UNAUTHORIZED.code) { - Analytics.handleError(err) - this.onUnauthorized?.() - throw err - } - await new Promise((resolve) => { - setTimeout(() => { - if (!this.upgrading) { - console.log(`delay ${this.delay} second`, this.workspace, this.email) - } - resolve(null) - if (this.delay < 5) { - this.delay++ - } - }, this.delay * SECOND) - }) - } + private waitOpenConnection (): Promise | undefined { + if (this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN) { + return undefined } + + return new Promise((resolve) => { + this.onConnectHandlers.push(() => { + resolve() + }) + // Websocket is null for first time + this.scheduleOpen(false) + }) } sockets = 0 incomingTimer: any - private openConnection (): Promise { - return new Promise((resolve, reject) => { - // Use defined factory or browser default one. - const clientSocketFactory = - getMetadata(client.metadata.ClientSocketFactory) ?? - ((url: string) => { - const s = new WebSocket(url) - s.binaryType = 'arraybuffer' - return s as ClientSocket - }) + openAction: any - if (this.sessionId === undefined) { - // Find local session id in session storage. - this.sessionId = - typeof sessionStorage !== 'undefined' - ? sessionStorage.getItem('session.id.' + this.url) ?? undefined - : undefined - this.sessionId = this.sessionId ?? generateId() - if (typeof sessionStorage !== 'undefined') { - sessionStorage.setItem('session.id.' 
+ this.url, this.sessionId) - } + scheduleOpen (force: boolean): void { + if (force) { + if (this.websocket !== null) { + this.websocket.close() + this.websocket = null } - const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`) - const opened = false - const socketId = this.sockets++ - let binaryResponse = false - - const dialTimer = setTimeout(() => { - if (!opened) { - websocket.close() - reject(new PlatformError(unknownError('timeout'))) - } - }, dialTimeout) - - websocket.onmessage = (event: MessageEvent) => { - const resp = readResponse(event.data, binaryResponse) - if (resp.id === -1) { - if (resp.result?.state === 'upgrading') { - void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats) - this.upgrading = true - return - } - if (resp.result === 'hello') { - if (this.upgrading) { - // We need to call upgrade since connection is upgraded - this.onUpgrade?.() - } - - console.log('connection established', this.workspace, this.email) - - this.upgrading = false - if ((resp as HelloResponse).alreadyConnected === true) { - this.sessionId = generateId() - if (typeof sessionStorage !== 'undefined') { - sessionStorage.setItem('session.id.' + this.url, this.sessionId) - } - reject(new Error('alreadyConnected')) - } - if ((resp as HelloResponse).binary) { - binaryResponse = true - } - if (resp.error !== undefined) { - reject(resp.error) - return - } - for (const [, v] of this.requests.entries()) { - v.reconnect?.() - } - resolve(websocket) - - void this.onConnect?.( - (resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected - ) - return - } else { - Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`)) - } - return - } - if (resp.result === 'ping') { - void this.sendRequest({ method: 'ping', params: [] }) - return - } - if (resp.id !== undefined) { - const promise = this.requests.get(resp.id) - if (promise === undefined) { - throw new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`) - } - - if (resp.chunk !== undefined) { - promise.chunks = [ - ...(promise.chunks ?? []), - { - index: resp.chunk.index, - data: resp.result as [] - } - ] - // console.log(socketId, 'chunk', promise.chunks.length, resp.chunk.total) - if (resp.chunk.final) { - promise.chunks.sort((a, b) => a.index - b.index) - let result: any[] = [] - for (const c of promise.chunks) { - result = result.concat(c.data) - } - resp.result = result - resp.chunk = undefined - } else { - // Not all chunks are available yet. - return - } - } - - const request = this.requests.get(resp.id) - this.requests.delete(resp.id) - if (resp.error !== undefined) { - console.log( - 'ERROR', - 'request:', - request?.method, - 'response-id:', - resp.id, - 'error: ', - resp.error, - 'result: ', - resp.result, - this.workspace, - this.email - ) - promise.reject(new PlatformError(resp.error)) - } else { - if (request?.handleResult !== undefined) { - void request.handleResult(resp.result).then(() => { - promise.resolve(resp.result) - }) - } else { - promise.resolve(resp.result) - } - } - void broadcastEvent(client.event.NetworkRequests, this.requests.size) + clearTimeout(this.openAction) + this.openAction = undefined + } + if (!this.closed && this.openAction === undefined) { + if (this.websocket === null) { + const socketId = ++this.sockets + // Re create socket in case of error, if not closed + if (this.delay === 0) { + this.openConnection(socketId) } else { - const txArr = Array.isArray(resp.result) ? 
(resp.result as Tx[]) : [resp.result as Tx] + this.openAction = setTimeout(() => { + this.openAction = undefined + this.openConnection(socketId) + }, this.delay * 1000) + } + } + } + } - for (const tx of txArr) { - if ( - (tx?._class === core.class.TxWorkspaceEvent && - (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) || - tx?._class === core.class.TxModelUpgrade - ) { - console.log('Processing upgrade', this.workspace, this.email) - websocket.send( - serialize( - { - method: '#upgrading', - params: [], - id: -1 - }, - false - ) - ) - this.onUpgrade?.() - return - } + private openConnection (socketId: number): void { + // Use defined factory or browser default one. + const clientSocketFactory = + getMetadata(client.metadata.ClientSocketFactory) ?? + ((url: string) => { + const s = new WebSocket(url) + s.binaryType = 'arraybuffer' + return s as ClientSocket + }) + + if (this.sessionId === undefined) { + // Find local session id in session storage. + this.sessionId = + typeof sessionStorage !== 'undefined' + ? sessionStorage.getItem('session.id.' + this.url) ?? undefined + : undefined + this.sessionId = this.sessionId ?? generateId() + if (typeof sessionStorage !== 'undefined') { + sessionStorage.setItem('session.id.' + this.url, this.sessionId) + } + } + if (socketId !== this.sockets) { + return + } + const wsocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`) + + if (socketId !== this.sockets) { + wsocket.close() + return + } + this.websocket = wsocket + const opened = false + let binaryResponse = false + + const dialTimer = setTimeout(() => { + if (!opened && !this.closed) { + this.scheduleOpen(true) + } + }, dialTimeout) + + wsocket.onmessage = (event: MessageEvent) => { + if (this.websocket !== wsocket) { + return + } + const resp = readResponse(event.data, binaryResponse) + + if (resp.error !== undefined) { + if (resp.error?.code === UNAUTHORIZED.code) { + Analytics.handleError(new PlatformError(resp.error)) + this.closed = true + this.websocket.close() + this.onUnauthorized?.() + } + console.error(resp.error) + return + } + + if (resp.id === -1) { + this.delay = 0 + if (resp.result?.state === 'upgrading') { + void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats) + this.upgrading = true + this.delay = 3 + return + } + if (resp.result === 'hello') { + if (this.upgrading) { + // We need to call upgrade since connection is upgraded + this.onUpgrade?.() } - this.handler(...txArr) - clearTimeout(this.incomingTimer) - void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1) + this.upgrading = false + if ((resp as HelloResponse).alreadyConnected === true) { + this.sessionId = generateId() + if (typeof sessionStorage !== 'undefined') { + sessionStorage.setItem('session.id.' + this.url, this.sessionId) + } + console.log('Connection: alreadyConnected, reconnect with new Id') + clearTimeout(dialTimeout) + this.scheduleOpen(true) + return + } + if ((resp as HelloResponse).binary) { + binaryResponse = true + } + // Notify all waiting connection listeners + const handlers = this.onConnectHandlers.splice(0, this.onConnectHandlers.length) + for (const h of handlers) { + h() + } - this.incomingTimer = setTimeout(() => { - void broadcastEvent(client.event.NetworkRequests, this.requests.size) - }, 500) + for (const [, v] of this.requests.entries()) { + v.reconnect?.() + } + + void this.onConnect?.( + (resp as HelloResponse).reconnect === true ? 
ClientConnectEvent.Reconnected : ClientConnectEvent.Connected + ) + this.schedulePing(socketId) + return + } else { + Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`)) } + return } - websocket.onclose = (ev) => { - // console.log('client websocket closed', socketId, ev?.reason) - - if (!(this.websocket instanceof Promise)) { - this.websocket = null + if (resp.result === 'ping') { + void this.sendRequest({ method: 'ping', params: [] }) + return + } + if (resp.id !== undefined) { + const promise = this.requests.get(resp.id) + if (promise === undefined) { + throw new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`) } - void broadcastEvent(client.event.NetworkRequests, -1) - reject(new Error('websocket error')) + + if (resp.chunk !== undefined) { + promise.chunks = [ + ...(promise.chunks ?? []), + { + index: resp.chunk.index, + data: resp.result as [] + } + ] + // console.log(socketId, 'chunk', promise.chunks.length, resp.chunk.total) + if (resp.chunk.final) { + promise.chunks.sort((a, b) => a.index - b.index) + let result: any[] = [] + for (const c of promise.chunks) { + result = result.concat(c.data) + } + resp.result = result + resp.chunk = undefined + } else { + // Not all chunks are available yet. + return + } + } + + const request = this.requests.get(resp.id) + promise.handleTime?.(Date.now() - promise.startTime, resp.result, resp.time ?? 0, resp.queue ?? 0) + this.requests.delete(resp.id) + if (resp.error !== undefined) { + console.log( + 'ERROR', + 'request:', + request?.method, + 'response-id:', + resp.id, + 'error: ', + resp.error, + 'result: ', + resp.result, + this.workspace, + this.email + ) + promise.reject(new PlatformError(resp.error)) + } else { + if (request?.handleResult !== undefined) { + void request.handleResult(resp.result).then(() => { + promise.resolve(resp.result) + }) + } else { + promise.resolve(resp.result) + } + } + void broadcastEvent(client.event.NetworkRequests, this.requests.size) + } else { + const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx] + + for (const tx of txArr) { + if ( + (tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) || + tx?._class === core.class.TxModelUpgrade + ) { + console.log('Processing upgrade', this.workspace, this.email) + this.onUpgrade?.() + return + } + } + this.handler(...txArr) + + clearTimeout(this.incomingTimer) + void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1) + + this.incomingTimer = setTimeout(() => { + void broadcastEvent(client.event.NetworkRequests, this.requests.size) + }, 500) } - websocket.onopen = () => { - const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true - const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? 
false + } + wsocket.onclose = (ev) => { + clearTimeout(dialTimer) + if (this.websocket !== wsocket) { + wsocket.close() clearTimeout(dialTimer) - const helloRequest: HelloRequest = { - method: 'hello', - params: [], - id: -1, - binary: useBinary, - compression: useCompression, - broadcast: true - } - websocket.send(serialize(helloRequest, false)) - // Ok we connected, let's schedule ping - this.schedulePing() + return } - websocket.onerror = (event: any) => { - console.error('client websocket error:', socketId, event, this.workspace, this.email) - void broadcastEvent(client.event.NetworkRequests, -1) - reject(new Error(`websocket error:${socketId}`)) + // console.log('client websocket closed', socketId, ev?.reason) + void broadcastEvent(client.event.NetworkRequests, -1) + this.scheduleOpen(true) + } + wsocket.onopen = () => { + if (this.websocket !== wsocket) { + return } - }) + const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true + const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false + clearTimeout(dialTimer) + const helloRequest: HelloRequest = { + method: 'hello', + params: [], + id: -1, + binary: useBinary, + compression: useCompression, + broadcast: true + } + this.websocket?.send(serialize(helloRequest, false)) + } + + wsocket.onerror = (event: any) => { + clearTimeout(dialTimer) + if (this.websocket !== wsocket) { + return + } + if (this.delay < 3) { + this.delay += 1 + } + if (opened) { + console.error('client websocket error:', socketId, this.url, this.workspace, this.email) + } + void broadcastEvent(client.event.NetworkRequests, -1) + } } private async sendRequest (data: { @@ -406,43 +436,56 @@ class Connection implements ClientConnection { // If not defined, on reconnect with timeout, will retry automatically. retry?: () => Promise handleResult?: (result: any) => Promise + once?: boolean // Require handleResult to retrieve result + measure?: (time: number, result: any, serverTime: number, queue: number) => void }): Promise { if (this.closed) { throw new PlatformError(unknownError('connection closed')) } + if (data.once === true) { + // Check if has same request already then skip + for (const [, v] of this.requests) { + if (v.method === data.method && JSON.stringify(v.params) === JSON.stringify(data.params)) { + // We have same unanswered, do not add one more. + return + } + } + } + const id = this.lastId++ const promise = new RequestPromise(data.method, data.params, data.handleResult) + promise.handleTime = data.measure - const sendData = async (): Promise => { - if (this.websocket instanceof Promise) { - this.websocket = await this.websocket - } - if (this.websocket === null) { - this.websocket = this.waitOpenConnection() - this.websocket = await this.websocket - } - this.requests.set(id, promise) - this.websocket.send( - serialize( - { - method: data.method, - params: data.params, - id - }, - false + const w = this.waitOpenConnection() + if (w instanceof Promise) { + await w + } + this.requests.set(id, promise) + const sendData = (): void => { + if (this.websocket?.readyState === ClientSocketReadyState.OPEN) { + promise.startTime = Date.now() + this.websocket?.send( + serialize( + { + method: data.method, + params: data.params, + id + }, + false + ) ) - ) + } } promise.reconnect = () => { setTimeout(async () => { // In case we don't have response yet. if (this.requests.has(id) && ((await data.retry?.()) ?? 
true)) { - await sendData() + sendData() } }, 500) } - await sendData() + sendData() void broadcastEvent(client.event.NetworkRequests, this.requests.size) return await promise.promise } @@ -480,11 +523,15 @@ class Connection implements ClientConnection { query: DocumentQuery, options?: FindOptions ): Promise> { - const st = Date.now() - const result = await this.sendRequest({ method: 'findAll', params: [_class, query, options] }) - if (Date.now() - st > 1000) { - console.error('measure slow findAll', Date.now() - st, _class, query, options, result) - } + const result = await this.sendRequest({ + method: 'findAll', + params: [_class, query, options], + measure: (time, result, serverTime, queue) => { + if (typeof window !== 'undefined' && time > 1000) { + console.error('measure slow findAll', time, serverTime, queue, _class, query, options, result) + } + } + }) return result } diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts index 64dbbacd7d..011a89e96a 100644 --- a/plugins/client/src/index.ts +++ b/plugins/client/src/index.ts @@ -46,6 +46,8 @@ export interface ClientSocket { close: (code?: number) => void readyState: ClientSocketReadyState + + bufferedAmount?: number } /** diff --git a/plugins/login-resources/src/index.ts b/plugins/login-resources/src/index.ts index 6c2895cef6..de0e3ed8a9 100644 --- a/plugins/login-resources/src/index.ts +++ b/plugins/login-resources/src/index.ts @@ -24,7 +24,8 @@ import { selectWorkspace, sendInvite, getEnpoint, - fetchWorkspace + fetchWorkspace, + createMissingEmployee } from './utils' /*! * Anticrm Platform™ Login Plugin @@ -43,6 +44,7 @@ export default async () => ({ ChangePassword: changePassword, SelectWorkspace: selectWorkspace, FetchWorkspace: fetchWorkspace, + CreateEmployee: createMissingEmployee, GetWorkspaces: getWorkspaces, SendInvite: sendInvite, GetEndpoint: getEnpoint diff --git a/plugins/login-resources/src/utils.ts b/plugins/login-resources/src/utils.ts index 58cf69700a..07ff0537fe 100644 --- a/plugins/login-resources/src/utils.ts +++ b/plugins/login-resources/src/utils.ts @@ -396,6 +396,53 @@ export async function fetchWorkspace (workspace: string): Promise<[Status, Works return [unknownError(err), undefined] } } +export async function createMissingEmployee (workspace: string): Promise<[Status]> { + const accountsUrl = getMetadata(login.metadata.AccountsUrl) + + if (accountsUrl === undefined) { + throw new Error('accounts url not specified') + } + + const overrideToken = getMetadata(login.metadata.OverrideLoginToken) + if (overrideToken !== undefined) { + const endpoint = getMetadata(login.metadata.OverrideEndpoint) + if (endpoint !== undefined) { + return [OK] + } + } + + const token = getMetadata(presentation.metadata.Token) + if (token === undefined) { + return [unknownStatus('Please login')] + } + + const request = { + method: 'createMissingEmployee', + params: [token] + } + + try { + const response = await fetch(accountsUrl, { + method: 'POST', + headers: { + Authorization: 'Bearer ' + token, + 'Content-Type': 'application/json' + }, + body: JSON.stringify(request) + }) + const result = await response.json() + if (result.error == null) { + Analytics.handleEvent('Create missing employee') + Analytics.setTag('workspace', workspace) + } else { + await handleStatusError('Fetch workspace error', result.error) + } + return [result.error ?? 
OK] + } catch (err: any) { + Analytics.handleError(err) + return [unknownError(err)] + } +} export function setLoginInfo (loginInfo: WorkspaceLoginInfo): void { const tokens: Record = fetchMetadataLocalStorage(login.metadata.LoginTokens) ?? {} diff --git a/plugins/login/src/index.ts b/plugins/login/src/index.ts index 72c7dd8894..6055ddd73f 100644 --- a/plugins/login/src/index.ts +++ b/plugins/login/src/index.ts @@ -82,6 +82,7 @@ export default plugin(loginId, { ChangePassword: '' as Resource<(oldPassword: string, password: string) => Promise>, SelectWorkspace: '' as Resource<(workspace: string) => Promise<[Status, WorkspaceLoginInfo | undefined]>>, FetchWorkspace: '' as Resource<(workspace: string) => Promise<[Status, WorkspaceLoginInfo | undefined]>>, + CreateEmployee: '' as Resource<(workspace: string) => Promise<[Status]>>, GetWorkspaces: '' as Resource<() => Promise>, GetEndpoint: '' as Resource<() => Promise> } diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts index 0103739fd4..90a5126321 100644 --- a/plugins/workbench-resources/src/connect.ts +++ b/plugins/workbench-resources/src/connect.ts @@ -7,6 +7,7 @@ import core, { metricsToString, setCurrentAccount, versionToString, + type Account, type AccountClient, type Client, type Version @@ -216,7 +217,10 @@ export async function connect (title: string): Promise { _client = newClient console.log('logging in as', email) - const me = await ctx.with('get-account', {}, async () => await newClient.getAccount()) + let me: Account | undefined = await ctx.with('get-account', {}, async () => await newClient.getAccount()) + if (me === undefined) { + me = await createEmployee(ctx, ws, me, newClient) + } if (me !== undefined) { Analytics.setUser(me.email) Analytics.setTag('workspace', ws) @@ -224,6 +228,7 @@ export async function connect (title: string): Promise { setCurrentAccount(me) } else { console.error('WARNING: no employee account found.') + clearMetadata(ws) navigate({ path: [loginId], @@ -303,6 +308,28 @@ export async function connect (title: string): Promise { return newClient } +async function createEmployee ( + ctx: MeasureMetricsContext, + ws: string, + me: Account, + newClient: AccountClient +): Promise { + const createEmployee = await getResource(login.function.CreateEmployee) + await ctx.with('create-missing-employee', {}, async () => { + await createEmployee(ws) + }) + for (let i = 0; i < 5; i++) { + me = await ctx.with('get-account', {}, async () => await newClient.getAccount()) + if (me !== undefined) { + break + } + await new Promise((resolve) => { + setTimeout(resolve, 100) + }) + } + return me +} + function clearMetadata (ws: string): void { const tokens = fetchMetadataLocalStorage(login.metadata.LoginTokens) if (tokens !== null) { diff --git a/pods/authProviders/src/github.ts b/pods/authProviders/src/github.ts index 8db90f6a4c..a456fb44e4 100644 --- a/pods/authProviders/src/github.ts +++ b/pods/authProviders/src/github.ts @@ -64,7 +64,7 @@ export function registerGithub ( // Successful authentication, redirect to your application ctx.redirect(concatLink(frontUrl, '/login/auth')) } catch (err: any) { - await measureCtx.error('failed to auth', err) + measureCtx.error('failed to auth', err) } } await next() diff --git a/pods/authProviders/src/google.ts b/pods/authProviders/src/google.ts index 297722122f..78d42276e9 100644 --- a/pods/authProviders/src/google.ts +++ b/pods/authProviders/src/google.ts @@ -62,7 +62,7 @@ export function registerGoogle ( // Successful authentication, 
redirect to your application ctx.redirect(concatLink(frontUrl, '/login/auth')) } catch (err: any) { - await measureCtx.error('failed to auth', err) + measureCtx.error('failed to auth', err) } } await next() diff --git a/server/account-service/src/index.ts b/server/account-service/src/index.ts index cff8e423f8..5a7f576516 100644 --- a/server/account-service/src/index.ts +++ b/server/account-service/src/index.ts @@ -97,7 +97,7 @@ export function serveAccount ( class MyStream { write (text: string): void { - void measureCtx.info(text) + measureCtx.info(text) } } @@ -181,12 +181,11 @@ export function serveAccount ( } process.on('uncaughtException', (e) => { - void measureCtx.error('uncaughtException', { error: e }) + measureCtx.error('uncaughtException', { error: e }) }) process.on('unhandledRejection', (reason, promise) => { - console.error('Unhandled Rejection at:', promise, 'reason:', reason) - void measureCtx.error('Unhandled Rejection at:', { reason, promise }) + measureCtx.error('Unhandled Rejection at:', { reason, promise }) }) process.on('SIGINT', close) process.on('SIGTERM', close) diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index 982855706b..d931aa2f80 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -251,7 +251,7 @@ async function getAccountInfoByToken ( email = decodeToken(token)?.email } catch (err: any) { Analytics.handleError(err) - await ctx.error('Invalid token', { token }) + ctx.error('Invalid token', { token }) throw new PlatformError(new Status(Severity.ERROR, platform.status.Unauthorized, {})) } const account = await getAccount(db, email) @@ -292,11 +292,11 @@ export async function login ( confirmed: info.confirmed ?? true, token: generateToken(email, getWorkspaceId('', productId), getExtra(info)) } - await ctx.info('login success', { email, productId }) + ctx.info('login success', { email, productId }) return result } catch (err: any) { Analytics.handleError(err) - await ctx.error('login failed', { email, productId, _email, err }) + ctx.error('login failed', { email, productId, _email, err }) throw err } } @@ -349,7 +349,7 @@ export async function selectWorkspace ( if (workspaceInfo !== null) { if (workspaceInfo.disabled === true && workspaceInfo.creating !== true) { - await ctx.error('workspace disabled', { workspaceUrl, email }) + ctx.error('workspace disabled', { workspaceUrl, email }) throw new PlatformError( new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspaceUrl }) ) @@ -371,7 +371,7 @@ export async function selectWorkspace ( } } } - await ctx.error('workspace error', { workspaceUrl, email }) + ctx.error('workspace error', { workspaceUrl, email }) throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {})) } @@ -387,17 +387,17 @@ export async function getInvite (db: Db, inviteId: ObjectId): Promise { if (invite === null || invite.limit === 0) { - void ctx.error('invite', { email, state: 'no invite or limit exceed' }) + ctx.error('invite', { email, state: 'no invite or limit exceed' }) Analytics.handleError(new Error(`no invite or invite limit exceed ${email}`)) throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {})) } if (invite.exp < Date.now()) { - void ctx.error('invite', { email, state: 'link expired' }) + ctx.error('invite', { email, state: 'link expired' }) Analytics.handleError(new Error(`invite link expired ${invite._id.toString()} ${email}`)) throw new PlatformError(new Status(Severity.ERROR, 
platform.status.ExpiredLink, {})) } if (invite.emailMask != null && invite.emailMask.trim().length > 0 && !new RegExp(invite.emailMask).test(email)) { - void ctx.error('invite', { email, state: 'mask to match', mask: invite.emailMask }) + ctx.error('invite', { email, state: 'mask to match', mask: invite.emailMask }) Analytics.handleError(new Error(`invite link mask failed ${invite._id.toString()} ${email} ${invite.emailMask}`)) throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {})) } @@ -426,7 +426,7 @@ export async function join ( const email = cleanEmail(_email) const invite = await getInvite(db, inviteId) const workspace = await checkInvite(ctx, invite, email) - await ctx.info(`join attempt:${email}, ${workspace.name}`) + ctx.info(`join attempt:${email}, ${workspace.name}`) const ws = await assignWorkspace(ctx, db, productId, email, workspace.name) const token = (await login(ctx, db, productId, email, password)).token @@ -462,7 +462,7 @@ export async function confirm (ctx: MeasureContext, db: Db, productId: string, t const decode = decodeToken(token) const _email = decode.extra?.confirm if (_email === undefined) { - await ctx.error('confirm email invalid', { token: decode }) + ctx.error('confirm email invalid', { token: decode }) throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountNotFound, { account: _email })) } const email = cleanEmail(_email) @@ -473,7 +473,7 @@ export async function confirm (ctx: MeasureContext, db: Db, productId: string, t email, token: generateToken(email, getWorkspaceId('', productId), getExtra(account)) } - await ctx.info('confirm success', { email, productId }) + ctx.info('confirm success', { email, productId }) return result } @@ -576,7 +576,7 @@ export async function createAcc ( const systemEmails = [systemAccountEmail] if (systemEmails.includes(email)) { - await ctx.error('system email used for account', { email }) + ctx.error('system email used for account', { email }) throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountAlreadyExists, { account: email })) } @@ -607,11 +607,11 @@ export async function createAcc ( if (sesURL !== undefined && sesURL !== '') { await sendConfirmation(productId, newAccount) } else { - await ctx.info('Please provide email service url to enable email confirmations.') + ctx.info('Please provide email service url to enable email confirmations.') await confirmEmail(db, email) } } - await ctx.info('account created', { account: email }) + ctx.info('account created', { account: email }) return newAccount } @@ -865,10 +865,10 @@ export async function createWorkspace ( const childLogger = ctx.newChild('createWorkspace', { workspace: workspaceInfo.workspace }) const ctxModellogger: ModelLogger = { log: (msg, data) => { - void childLogger.info(msg, data) + childLogger.info(msg, data) }, error: (msg, data) => { - void childLogger.error(msg, data) + childLogger.error(msg, data) } } let model: Tx[] = [] @@ -961,7 +961,7 @@ export async function upgradeWorkspace ( if (ws?.version !== undefined && !forceUpdate && versionStr === versionToString(ws.version)) { return versionStr } - await ctx.info('upgrading', { + ctx.info('upgrading', { force: forceUpdate, currentVersion: ws?.version !== undefined ? 
versionToString(ws.version) : '', toVersion: versionStr, @@ -995,7 +995,7 @@ export const createUserWorkspace = async (ctx: MeasureContext, db: Db, productId: string, token: string, workspaceName: string): Promise => { const { email } = decodeToken(token) - await ctx.info('Creating workspace', { workspaceName, email }) + ctx.info('Creating workspace', { workspaceName, email }) const info = await getAccount(db, email) @@ -1041,7 +1041,7 @@ export const createUserWorkspace = try { await assignWorkspace(ctx, db, productId, email, workspace.workspace, shouldUpdateAccount, client) await setRole(email, workspace.workspace, productId, AccountRole.Owner, client) - await ctx.info('Creating server side done', { workspaceName, email }) + ctx.info('Creating server side done', { workspaceName, email }) } catch (err: any) { Analytics.handleError(err) } finally { @@ -1051,7 +1051,7 @@ export const createUserWorkspace = ) if (err != null) { - await ctx.error('failed to create workspace', { err, workspaceName, email }) + ctx.error('failed to create workspace', { err, workspaceName, email }) // We need to drop workspace, to prevent wrong data usage. await db.collection(WORKSPACE_COLLECTION).updateOne( @@ -1083,7 +1083,7 @@ export const createUserWorkspace = productId, workspace: workspaceInfo.workspaceUrl } - await ctx.info('Creating user side done', { workspaceName, email }) + ctx.info('Creating user side done', { workspaceName, email }) return result } @@ -1102,12 +1102,12 @@ export async function getInviteLink ( const { workspace, email } = decodeToken(token) const wsPromise = await getWorkspaceById(db, productId, workspace.name) if (wsPromise === null) { - await ctx.error('workspace not found', { workspace, email }) + ctx.error('workspace not found', { workspace, email }) throw new PlatformError( new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspace.name }) ) } - await ctx.info('Getting invite link', { workspace: workspace.name, emailMask, limit }) + ctx.info('Getting invite link', { workspace: workspace.name, emailMask, limit }) const result = await db.collection(INVITE_COLLECTION).insertOne({ workspace, exp: Date.now() + exp, @@ -1149,7 +1149,7 @@ export async function getUserWorkspaces ( const { email } = decodeToken(token) const account = await getAccount(db, email) if (account === null) { - await ctx.error('account not found', { email }) + ctx.error('account not found', { email }) return [] } return ( @@ -1272,6 +1272,26 @@ export async function setRole ( } } +/** + * @public + */ +export async function createMissingEmployee ( + ctx: MeasureContext, + db: Db, + productId: string, + token: string +): Promise { + const { email } = decodeToken(token) + const wsInfo = await getWorkspaceInfo(ctx, db, productId, token) + const account = await getAccount(db, email) + + if (account === null) { + throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountNotFound, { account: email })) + } + + await createPersonAccount(account, productId, wsInfo.workspaceId, true) +} + /** * @public */ @@ -1289,7 +1309,7 @@ export async function assignWorkspace ( const initWS = getMetadata(toolPlugin.metadata.InitWorkspace) if (initWS !== undefined && initWS === workspaceId) { Analytics.handleError(new Error(`assign-workspace failed ${email} ${workspaceId}`)) - await ctx.error('assign-workspace failed', { email, workspaceId, reason: 'initWs === workspaceId' }) + ctx.error('assign-workspace failed', { email, workspaceId, reason: 'initWs === workspaceId' }) throw new 
PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {})) } const workspaceInfo = await getWorkspaceAndAccount(ctx, db, productId, email, workspaceId) @@ -1308,7 +1328,7 @@ export async function assignWorkspace ( // Add account into workspace. await assignWorkspaceRaw(db, workspaceInfo) - await ctx.info('assign-workspace success', { email, workspaceId }) + ctx.info('assign-workspace success', { email, workspaceId }) return workspaceInfo.workspace } @@ -1482,7 +1502,7 @@ export async function changePassword ( const hash = hashWithSalt(password, salt) await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: account._id }, { $set: { salt, hash } }) - await ctx.info('change-password success', { email }) + ctx.info('change-password success', { email }) } /** @@ -1490,7 +1510,7 @@ export async function changePassword ( */ export async function changeEmail (ctx: MeasureContext, db: Db, account: Account, newEmail: string): Promise { await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: account._id }, { $set: { email: newEmail } }) - await ctx.info('change-email success', { email: newEmail }) + ctx.info('change-email success', { email: newEmail }) } /** @@ -1516,7 +1536,7 @@ export async function requestPassword (ctx: MeasureContext, db: Db, productId: s const account = await getAccount(db, email) if (account === null) { - await ctx.info('account not found', { email }) + ctx.info('account not found', { email }) throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountNotFound, { account: email })) } @@ -1556,7 +1576,7 @@ export async function requestPassword (ctx: MeasureContext, db: Db, productId: s to }) }) - await ctx.info('recovery email sent', { email, accountEmail: account.email }) + ctx.info('recovery email sent', { email, accountEmail: account.email }) } /** @@ -1609,7 +1629,7 @@ export async function removeWorkspace ( // Add account a workspace await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: account._id }, { $pull: { workspaces: workspace._id } }) - await ctx.info('Workspace removed', { email, workspace }) + ctx.info('Workspace removed', { email, workspace }) } /** @@ -1627,7 +1647,7 @@ export async function checkJoin ( const workspace = await checkInvite(ctx, invite, email) const ws = await getWorkspaceById(db, productId, workspace.name) if (ws === null) { - await ctx.error('workspace not found', { name: workspace.name, email, inviteId }) + ctx.error('workspace not found', { name: workspace.name, email, inviteId }) throw new PlatformError( new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspace.name }) ) @@ -1653,7 +1673,7 @@ export async function dropWorkspace ( .collection(ACCOUNT_COLLECTION) .updateMany({ _id: { $in: ws.accounts ?? 
[] } }, { $pull: { workspaces: ws._id } }) - await ctx.info('Workspace dropped', { workspace: ws.workspace }) + ctx.info('Workspace dropped', { workspace: ws.workspace }) } /** @@ -1680,7 +1700,7 @@ export async function dropAccount (ctx: MeasureContext, db: Db, productId: strin await db .collection(WORKSPACE_COLLECTION) .updateMany({ _id: { $in: account.workspaces } }, { $pull: { accounts: account._id } }) - await ctx.info('Account Dropped', { email, account }) + ctx.info('Account Dropped', { email, account }) } /** @@ -1718,7 +1738,7 @@ export async function leaveWorkspace ( .collection(ACCOUNT_COLLECTION) .updateOne({ _id: account._id }, { $pull: { workspaces: workspace._id } }) } - await ctx.info('Account removed from workspace', { email, workspace }) + ctx.info('Account removed from workspace', { email, workspace }) } /** @@ -1781,7 +1801,7 @@ export async function sendInvite ( to }) }) - await ctx.info('Invite sent', { email, workspace, link }) + ctx.info('Invite sent', { email, workspace, link }) } async function deactivatePersonAccount ( @@ -1803,7 +1823,7 @@ async function deactivatePersonAccount ( active: false }) } - await ctx.info('account deactivated', { email, workspace }) + ctx.info('account deactivated', { email, workspace }) } } finally { await connection.close() @@ -1835,9 +1855,9 @@ function wrap ( : new Status(Severity.ERROR, platform.status.InternalServerError, {}) if (status.code === platform.status.InternalServerError) { Analytics.handleError(err) - void ctx.error('error', { status, err }) + ctx.error('error', { status, err }) } else { - void ctx.error('error', { status }) + ctx.error('error', { status }) } return { error: status @@ -1970,7 +1990,8 @@ export function getMethods ( restorePassword: wrap(restorePassword), sendInvite: wrap(sendInvite), confirm: wrap(confirm), - getAccountInfoByToken: wrap(getAccountInfoByToken) + getAccountInfoByToken: wrap(getAccountInfoByToken), + createMissingEmployee: wrap(createMissingEmployee) // updateAccount: wrap(updateAccount) } } diff --git a/server/account/src/service.ts b/server/account/src/service.ts index c7d49ceb11..de69a265bc 100644 --- a/server/account/src/service.ts +++ b/server/account/src/service.ts @@ -28,6 +28,8 @@ export interface UpgradeOptions { console: boolean logs: string parallel: number + + ignore?: string } export class UpgradeWorker { @@ -61,17 +63,17 @@ export class UpgradeWorker { } private async _upgradeWorkspace (ctx: MeasureContext, ws: WorkspaceInfo, opt: UpgradeOptions): Promise { - if (ws.disabled === true) { + if (ws.disabled === true || (opt.ignore ?? 
'').includes(ws.workspace)) { return } const t = Date.now() const ctxModelLogger: ModelLogger = { log (msg: string, data: any): void { - void ctx.info(msg, data) + ctx.info(msg, data) }, error (msg: string, data: any): void { - void ctx.error(msg, data) + ctx.error(msg, data) } } @@ -79,7 +81,7 @@ export class UpgradeWorker { const avgTime = (Date.now() - this.st) / (this.total - this.toProcess + 1) this.eta = Math.floor(avgTime * this.toProcess) - await ctx.info('----------------------------------------------------------\n---UPGRADING----', { + ctx.info('----------------------------------------------------------\n---UPGRADING----', { pending: this.toProcess, eta: this.eta, workspace: ws.workspace @@ -97,7 +99,7 @@ export class UpgradeWorker { logger, opt.force ) - await ctx.info('---done---------', { + ctx.info('---done---------', { pending: this.toProcess, time: Date.now() - t, workspace: ws.workspace, @@ -109,10 +111,10 @@ export class UpgradeWorker { logger.log('error', err) if (!opt.console) { - await ctx.error('error', err) + ctx.error('error', err) } - await ctx.info('---failed---------', { + ctx.info('---failed---------', { pending: this.toProcess, time: Date.now() - t, workspace: ws.workspace @@ -149,7 +151,7 @@ export class UpgradeWorker { if (opt.parallel !== 0) { const parallel = opt.parallel const rateLimit = new RateLimiter(parallel) - await ctx.info('parallel upgrade', { parallel }) + ctx.info('parallel upgrade', { parallel }) await Promise.all( workspaces.map((it) => rateLimit.add(async () => { @@ -159,14 +161,14 @@ export class UpgradeWorker { }) ) ) - await ctx.info('Upgrade done') + ctx.info('Upgrade done') } else { - await ctx.info('UPGRADE write logs at:', { logs: opt.logs }) + ctx.info('UPGRADE write logs at:', { logs: opt.logs }) for (const ws of workspaces) { await this._upgradeWorkspace(ctx, ws, opt) } if (withError.length > 0) { - await ctx.info('Failed workspaces', withError) + ctx.info('Failed workspaces', withError) } } } diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts index 0f56280947..0112a90b4d 100644 --- a/server/backup-service/src/index.ts +++ b/server/backup-service/src/index.ts @@ -50,9 +50,9 @@ export function startBackup (ctx: MeasureContext): void { process.on('SIGINT', shutdown) process.on('SIGTERM', shutdown) process.on('uncaughtException', (e) => { - void ctx.error('uncaughtException', { err: e }) + ctx.error('uncaughtException', { err: e }) }) process.on('unhandledRejection', (e) => { - void ctx.error('unhandledRejection', { err: e }) + ctx.error('unhandledRejection', { err: e }) }) } diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index 2a9ba454bf..2de715cd9f 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -147,7 +147,7 @@ async function loadDigest ( result.delete(k as Ref) } } catch (err: any) { - await ctx.error('digest is broken, will do full backup for', { domain }) + ctx.error('digest is broken, will do full backup for', { domain }) } } // Stop if stop date is matched and provided @@ -392,14 +392,14 @@ export async function backup ( mode: 'backup' })) as unknown as CoreClient & BackupClient ) - await ctx.info('starting backup', { workspace: workspaceId.name }) + ctx.info('starting backup', { workspace: workspaceId.name }) let canceled = false let timer: any if (timeout > 0) { timer = setTimeout(() => { - void ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: timeout / 1000 }) + ctx.error('Timeout during backup', { workspace: 
workspaceId.name, timeout: timeout / 1000 }) canceled = true }, timeout) } @@ -411,7 +411,7 @@ export async function backup ( .domains() .filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && !skipDomains.includes(it)) ] - await ctx.info('domains for dump', { domains: domains.length }) + ctx.info('domains for dump', { domains: domains.length }) let backupInfo: BackupInfo = { workspace: workspaceId.name, @@ -440,7 +440,7 @@ export async function backup ( ) if (lastTx !== undefined) { if (lastTx._id === backupInfo.lastTxId && !force) { - await ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name }) + ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name }) return } } @@ -481,7 +481,7 @@ export async function backup ( for (const { id, hash, size } of currentChunk.docs) { processed++ if (Date.now() - st > 2500) { - await ctx.info('processed', { + ctx.info('processed', { processed, digest: digest.size, time: Date.now() - st, @@ -522,7 +522,7 @@ export async function backup ( } } catch (err: any) { console.error(err) - await ctx.error('failed to load chunks', { error: err }) + ctx.error('failed to load chunks', { error: err }) if (idx !== undefined) { await ctx.with('loadChunk', {}, async () => { await connection.closeChunk(idx as number) @@ -577,7 +577,7 @@ export async function backup ( ) if (needRetrieveChunks.length > 0) { - await ctx.info('dumping domain...', { workspace: workspaceId.name, domain }) + ctx.info('dumping domain...', { workspace: workspaceId.name, domain }) } while (needRetrieveChunks.length > 0) { @@ -586,7 +586,7 @@ export async function backup ( } const needRetrieve = needRetrieveChunks.shift() as Ref[] - await ctx.info('Retrieve chunk', { + ctx.info('Retrieve chunk', { needRetrieve: needRetrieveChunks.reduce((v, docs) => v + docs.length, 0), toLoad: needRetrieve.length, workspace: workspaceId.name @@ -595,7 +595,7 @@ export async function backup ( try { docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve)) } catch (err: any) { - await ctx.error('error loading docs', { domain, err, workspace: workspaceId.name }) + ctx.error('error loading docs', { domain, err, workspace: workspaceId.name }) // Put back. needRetrieveChunks.push(needRetrieve) continue @@ -631,7 +631,7 @@ export async function backup ( _pack = pack() stIndex++ const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`) - await ctx.info('storing from domain', { domain, storageFile, workspace: workspaceId.name }) + ctx.info('storing from domain', { domain, storageFile, workspace: workspaceId.name }) domainInfo.storage = [...(domainInfo.storage ?? 
[]), storageFile] const dataStream = await storage.write(storageFile) const storageZip = createGzip({ level: defaultLevel }) @@ -718,9 +718,9 @@ export async function backup ( await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) } } catch (err: any) { - await ctx.error('backup error', { err, workspace: workspaceId.name }) + ctx.error('backup error', { err, workspace: workspaceId.name }) } finally { - await ctx.info('end backup', { workspace: workspaceId.name }) + ctx.info('end backup', { workspace: workspaceId.name }) await connection.close() ctx.end() if (timeout !== -1) { diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts index a3ddd7b37d..08bd981474 100644 --- a/server/backup/src/service.ts +++ b/server/backup/src/service.ts @@ -77,7 +77,7 @@ class BackupWorker { return } index++ - await ctx.info('\n\nBACKUP WORKSPACE ', { + ctx.info('\n\nBACKUP WORKSPACE ', { workspace: ws.workspace, productId: ws.productId, index, @@ -102,7 +102,7 @@ class BackupWorker { ) }) } catch (err: any) { - await ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err }) + ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err }) } } } diff --git a/server/collaboration/src/utils/collaborative-doc.ts b/server/collaboration/src/utils/collaborative-doc.ts index 5df6404a25..debe69b8f8 100644 --- a/server/collaboration/src/utils/collaborative-doc.ts +++ b/server/collaboration/src/utils/collaborative-doc.ts @@ -84,7 +84,7 @@ export async function loadCollaborativeDoc ( for (const source of sources) { const { documentId, versionId } = collaborativeDocParse(source) - await ctx.info('loading collaborative document', { source }) + ctx.info('loading collaborative document', { source }) const ydoc = await loadCollaborativeDocVersion(ctx, storageAdapter, workspace, documentId, versionId) if (ydoc !== undefined) { diff --git a/server/collaborator/src/extensions/storage.ts b/server/collaborator/src/extensions/storage.ts index e04090c971..8c55169f11 100644 --- a/server/collaborator/src/extensions/storage.ts +++ b/server/collaborator/src/extensions/storage.ts @@ -49,7 +49,7 @@ export class StorageExtension implements Extension { } async onLoadDocument ({ context, documentName }: withContext): Promise { - await this.configuration.ctx.info('load document', { documentName }) + this.configuration.ctx.info('load document', { documentName }) return await this.configuration.ctx.with('load-document', {}, async () => { return await this.loadDocument(documentName as DocumentId, context) }) @@ -58,11 +58,11 @@ export class StorageExtension implements Extension { async onStoreDocument ({ context, documentName, document }: withContext): Promise { const { ctx } = this.configuration - await ctx.info('store document', { documentName }) + ctx.info('store document', { documentName }) const collaborators = this.collaborators.get(documentName) if (collaborators === undefined || collaborators.size === 0) { - await ctx.info('no changes for document', { documentName }) + ctx.info('no changes for document', { documentName }) return } @@ -75,7 +75,7 @@ export class StorageExtension implements Extension { async onConnect ({ context, documentName, instance }: withContext): Promise { const connections = instance.documents.get(documentName)?.getConnectionsCount() ?? 
0 const params = { documentName, connectionId: context.connectionId, connections } - await this.configuration.ctx.info('connect to document', params) + this.configuration.ctx.info('connect to document', params) } async onDisconnect ({ context, documentName, document }: withContext): Promise { @@ -83,11 +83,11 @@ export class StorageExtension implements Extension { const { connectionId } = context const params = { documentName, connectionId, connections: document.getConnectionsCount() } - await ctx.info('disconnect from document', params) + ctx.info('disconnect from document', params) const collaborators = this.collaborators.get(documentName) if (collaborators === undefined || !collaborators.has(connectionId)) { - await ctx.info('no changes for document', { documentName }) + ctx.info('no changes for document', { documentName }) return } @@ -98,7 +98,7 @@ export class StorageExtension implements Extension { } async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise { - await this.configuration.ctx.info('unload document', { documentName }) + this.configuration.ctx.info('unload document', { documentName }) this.collaborators.delete(documentName) } @@ -110,7 +110,7 @@ export class StorageExtension implements Extension { return await adapter.loadDocument(ctx, documentId, context) }) } catch (err) { - await ctx.error('failed to load document content', { documentId, error: err }) + ctx.error('failed to load document content', { documentId, error: err }) return undefined } } @@ -123,7 +123,7 @@ export class StorageExtension implements Extension { await adapter.saveDocument(ctx, documentId, document, context) }) } catch (err) { - await ctx.error('failed to save document content', { documentId, error: err }) + ctx.error('failed to save document content', { documentId, error: err }) return undefined } } diff --git a/server/collaborator/src/rpc/methods/removeDocument.ts b/server/collaborator/src/rpc/methods/removeDocument.ts index f52e40d63a..b74acb9c07 100644 --- a/server/collaborator/src/rpc/methods/removeDocument.ts +++ b/server/collaborator/src/rpc/methods/removeDocument.ts @@ -46,7 +46,7 @@ export async function removeDocument ( try { await minio.remove(ctx, workspaceId, [minioDocumentId, historyDocumentId]) } catch (err) { - await ctx.error('failed to remove document', { documentId, error: err }) + ctx.error('failed to remove document', { documentId, error: err }) } return {} diff --git a/server/collaborator/src/server.ts b/server/collaborator/src/server.ts index d883a89e50..f2b73e42bb 100644 --- a/server/collaborator/src/server.ts +++ b/server/collaborator/src/server.ts @@ -51,7 +51,7 @@ export async function start ( ): Promise { const port = config.Port - await ctx.info('Starting collaborator server', { port }) + ctx.info('Starting collaborator server', { port }) const app = express() app.use(cors()) @@ -210,7 +210,7 @@ export async function start ( server.listen(port) - await ctx.info('Running collaborator server', { port }) + ctx.info('Running collaborator server', { port }) return async () => { server.close() diff --git a/server/collaborator/src/starter.ts b/server/collaborator/src/starter.ts index dfd5b20a95..f8310ec874 100644 --- a/server/collaborator/src/starter.ts +++ b/server/collaborator/src/starter.ts @@ -50,15 +50,15 @@ export async function startCollaborator (): Promise { void shutdown().then(() => { void mongoClient.close() }) - void metricsContext.info('closed') + metricsContext.info('closed') } process.on('uncaughtException', (e) => { - void 
metricsContext.error('UncaughtException', { error: e }) + metricsContext.error('UncaughtException', { error: e }) }) process.on('unhandledRejection', (reason, promise) => { - void metricsContext.error('Unhandled Rejection at:', { promise, reason }) + metricsContext.error('Unhandled Rejection at:', { promise, reason }) }) process.on('SIGINT', close) diff --git a/server/collaborator/src/storage/platform.ts b/server/collaborator/src/storage/platform.ts index d50641b400..41d0f0ff66 100644 --- a/server/collaborator/src/storage/platform.ts +++ b/server/collaborator/src/storage/platform.ts @@ -54,51 +54,51 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { try { // try to load document content try { - await ctx.info('load document content', { documentId }) + ctx.info('load document content', { documentId }) const ydoc = await this.loadDocumentFromStorage(ctx, documentId, context) if (ydoc !== undefined) { return ydoc } } catch (err) { - await ctx.error('failed to load document content', { documentId, error: err }) + ctx.error('failed to load document content', { documentId, error: err }) } // then try to load from inital content const { initialContentId } = context if (initialContentId !== undefined && initialContentId.length > 0) { try { - await ctx.info('load document initial content', { documentId, initialContentId }) + ctx.info('load document initial content', { documentId, initialContentId }) const ydoc = await this.loadDocumentFromStorage(ctx, initialContentId, context) // if document was loaded from the initial content or storage we need to save // it to ensure the next time we load it from the ydoc document if (ydoc !== undefined) { - await ctx.info('save document content', { documentId, initialContentId }) + ctx.info('save document content', { documentId, initialContentId }) await this.saveDocumentToStorage(ctx, documentId, ydoc, context) return ydoc } } catch (err) { - await ctx.error('failed to load initial document content', { documentId, initialContentId, error: err }) + ctx.error('failed to load initial document content', { documentId, initialContentId, error: err }) } } // finally try to load from the platform const { platformDocumentId } = context if (platformDocumentId !== undefined) { - await ctx.info('load document platform content', { documentId, platformDocumentId }) + ctx.info('load document platform content', { documentId, platformDocumentId }) const ydoc = await ctx.with('load-document', { storage: 'platform' }, async (ctx) => { try { return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context) } catch (err) { - await ctx.error('failed to load platform document', { documentId, platformDocumentId, error: err }) + ctx.error('failed to load platform document', { documentId, platformDocumentId, error: err }) } }) // if document was loaded from the initial content or storage we need to save // it to ensure the next time we load it from the ydoc document if (ydoc !== undefined) { - await ctx.info('save document content', { documentId, platformDocumentId }) + ctx.info('save document content', { documentId, platformDocumentId }) await this.saveDocumentToStorage(ctx, documentId, ydoc, context) return ydoc } @@ -107,29 +107,29 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { // nothing found return undefined } catch (err) { - await ctx.error('failed to load document', { documentId, error: err }) + ctx.error('failed to load document', { documentId, error: err }) } } async saveDocument (ctx: MeasureContext, documentId: 
DocumentId, document: YDoc, context: Context): Promise { let snapshot: YDocVersion | undefined try { - await ctx.info('take document snapshot', { documentId }) + ctx.info('take document snapshot', { documentId }) snapshot = await this.takeSnapshot(ctx, documentId, document, context) } catch (err) { - await ctx.error('failed to take document snapshot', { documentId, error: err }) + ctx.error('failed to take document snapshot', { documentId, error: err }) } try { - await ctx.info('save document content', { documentId }) + ctx.info('save document content', { documentId }) await this.saveDocumentToStorage(ctx, documentId, document, context) } catch (err) { - await ctx.error('failed to save document', { documentId, error: err }) + ctx.error('failed to save document', { documentId, error: err }) } const { platformDocumentId } = context if (platformDocumentId !== undefined) { - await ctx.info('save document content to platform', { documentId, platformDocumentId }) + ctx.info('save document content to platform', { documentId, platformDocumentId }) await ctx.with('save-document', { storage: 'platform' }, async (ctx) => { await this.saveDocumentToPlatform(ctx, documentId, platformDocumentId, document, snapshot, context) }) @@ -158,7 +158,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { try { return await loadCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, ctx) } catch (err) { - await ctx.error('failed to load storage document', { documentId, collaborativeDoc, error: err }) + ctx.error('failed to load storage document', { documentId, collaborativeDoc, error: err }) return undefined } }) @@ -249,7 +249,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter { const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr) if (attribute === undefined) { - await ctx.info('attribute not found', { documentName, objectClass, objectAttr }) + ctx.info('attribute not found', { documentName, objectClass, objectAttr }) return } diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 047d784313..bce7aad0ed 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -97,7 +97,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { this.triggerIndexing() await this.indexing await this.flush(true) - await this.metrics.info('Cancel indexing', { workspace: this.workspace.name, indexId: this.indexId }) + this.metrics.warn('Cancel indexing', { workspace: this.workspace.name, indexId: this.indexId }) } async markRemove (doc: DocIndexState): Promise { @@ -336,7 +336,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { try { this.hierarchy.getClass(core.class.DocIndexState) } catch (err: any) { - await this.metrics.info('Models is not upgraded to support indexer', { + this.metrics.warn('Models is not upgraded to support indexer', { indexId: this.indexId, workspace: this.workspace.name }) @@ -370,12 +370,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { _classes.forEach((it) => this.broadcastClasses.add(it)) if (this.triggerCounts > 0) { - await this.metrics.info('No wait, trigger counts', { triggerCount: this.triggerCounts }) + this.metrics.info('No wait, trigger counts', { triggerCount: this.triggerCounts }) } if (this.toIndex.size === 0 && this.stageChanged === 0 && this.triggerCounts === 0) { if (this.toIndex.size === 0) { - await this.metrics.info('Indexing complete', { indexId: this.indexId, workspace: this.workspace.name }) + 
this.metrics.warn('Indexing complete', { indexId: this.indexId, workspace: this.workspace.name }) } if (!this.cancelling) { // We need to send index update event @@ -401,7 +401,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } } - await this.metrics.info('Exit indexer', { indexId: this.indexId, workspace: this.workspace.name }) + this.metrics.warn('Exit indexer', { indexId: this.indexId, workspace: this.workspace.name }) } private async processIndex (ctx: MeasureContext): Promise>[]> { @@ -474,7 +474,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } if (result.length > 0) { - await this.metrics.info('Full text: Indexing', { + this.metrics.info('Full text: Indexing', { indexId: this.indexId, stageId: st.stageId, workspace: this.workspace.name, @@ -531,7 +531,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } } catch (err: any) { - await this.metrics.error('error during index', { error: err }) + this.metrics.error('error during index', { error: err }) } } }) @@ -585,7 +585,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { if (toRemoveIds.length > 0) { await this.storage.clean(this.metrics, DOMAIN_DOC_INDEX_STATE, toRemoveIds) total += toRemoveIds.length - await this.metrics.info('indexer', { + this.metrics.info('indexer', { _classes: Array.from(groupByArray(toIndex, (it) => it.objectClass).keys()), total, count: toRemoveIds.length diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index ab7ee7de7f..d8462e35d5 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -196,7 +196,7 @@ export class TServerStorage implements ServerStorage { const txCUD = TxProcessor.extractTx(tx) as TxCUD if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) { // Skip unsupported tx - await ctx.error('Unsupported transaction', tx) + ctx.error('Unsupported transaction', tx) continue } const domain = this.hierarchy.getDomain(txCUD.objectClass) @@ -394,7 +394,7 @@ export class TServerStorage implements ServerStorage { { clazz, query, options } ) if (Date.now() - st > 1000) { - await ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options }) + ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options }) } return result } @@ -791,7 +791,7 @@ export class TServerStorage implements ServerStorage { await fx() } } catch (err: any) { - await ctx.error('error process tx', { error: err }) + ctx.error('error process tx', { error: err }) throw err } finally { onEnds.forEach((p) => { diff --git a/server/elastic/src/adapter.ts b/server/elastic/src/adapter.ts index 9cdccbb235..e0c5e9df9e 100644 --- a/server/elastic/src/adapter.ts +++ b/server/elastic/src/adapter.ts @@ -146,7 +146,7 @@ class ElasticAdapter implements FullTextAdapter { } if (k === 'workspaceId') { if (va?.type !== 'keyword') { - await this.metrics().info('Force index-recreate, since wrong index type was used') + this.metrics().info('Force index-recreate, since wrong index type was used') await this.client.indices.delete({ index: indexName }) diff --git a/server/front/src/index.ts b/server/front/src/index.ts index af1e7ad7a5..4bd0657cbb 100644 --- a/server/front/src/index.ts +++ b/server/front/src/index.ts @@ -49,7 +49,7 @@ async function minioUpload ( { file: file.name, contentType: file.mimetype } ) - await ctx.info('minio upload', resp) + ctx.info('minio upload', resp) return id } @@ -81,7 +81,7 @@ async function getFileRange ( ): 
Promise { const stat = await ctx.with('stats', {}, async () => await client.stat(ctx, workspace, uuid)) if (stat === undefined) { - await ctx.error('No such key', { file: uuid }) + ctx.error('No such key', { file: uuid }) res.status(404).send() return } @@ -127,7 +127,7 @@ async function getFileRange ( resolve() }) dataStream.on('error', (err) => { - void ctx.error('error receive stream', { workspace: workspace.name, uuid, error: err }) + ctx.error('error receive stream', { workspace: workspace.name, uuid, error: err }) Analytics.handleError(err) res.end() reject(err) @@ -138,12 +138,12 @@ async function getFileRange ( }) } catch (err: any) { if (err?.code === 'NoSuchKey' || err?.code === 'NotFound') { - await ctx.info('No such key', { workspace: workspace.name, uuid }) + ctx.info('No such key', { workspace: workspace.name, uuid }) res.status(404).send() return } else { Analytics.handleError(err) - void ctx.error(err) + ctx.error(err) } res.status(500).send() } @@ -162,7 +162,7 @@ async function getFile ( ): Promise { const stat = await ctx.with('stat', {}, async () => await client.stat(ctx, workspace, uuid)) if (stat === undefined) { - await ctx.error('No such key', { file: req.query.file }) + ctx.error('No such key', { file: req.query.file }) res.status(404).send() return } @@ -211,12 +211,12 @@ async function getFile ( dataStream.on('error', function (err) { res.status(500).send() Analytics.handleError(err) - void ctx.error('error', { err }) + ctx.error('error', { err }) reject(err) }) }) } catch (err: any) { - await ctx.error('get-file-error', { workspace: workspace.name, err }) + ctx.error('get-file-error', { workspace: workspace.name, err }) Analytics.handleError(err) res.status(500).send() } @@ -261,7 +261,7 @@ export function start ( class MyStream { write (text: string): void { - void ctx.info(text) + ctx.info(text) } } @@ -310,7 +310,7 @@ export function start ( }) res.end(json) } catch (err: any) { - void ctx.error('statistics error', { err }) + ctx.error('statistics error', { err }) Analytics.handleError(err) res.writeHead(404, {}) res.end() @@ -348,7 +348,7 @@ export function start ( uuid = await getResizeID(ctx, size, uuid, config, payload) const stat = await config.storageAdapter.stat(ctx, payload.workspace, uuid) if (stat === undefined) { - await ctx.error('No such key', { file: req.query.file }) + ctx.error('No such key', { file: req.query.file }) res.status(404).send() return } @@ -366,11 +366,11 @@ export function start ( res.end() } catch (error: any) { if (error?.code === 'NoSuchKey' || error?.code === 'NotFound') { - await ctx.error('No such key', { file: req.query.file }) + ctx.error('No such key', { file: req.query.file }) res.status(404).send() return } else { - await ctx.error('error-handle-files', error) + ctx.error('error-handle-files', error) } res.status(500).send() } @@ -447,11 +447,11 @@ export function start ( } } catch (error: any) { if (error?.code === 'NoSuchKey' || error?.code === 'NotFound') { - await ctx.error('No such key', { file: req.query.file }) + ctx.error('No such key', { file: req.query.file }) res.status(404).send() return } else { - await ctx.error('error-handle-files', error) + ctx.error('error-handle-files', error) } res.status(500).send() } @@ -507,7 +507,7 @@ export function start ( res.status(200).send(uuid) } catch (error: any) { - await ctx.error('error-post-files', error) + ctx.error('error-post-files', error) res.status(500).send() } }, @@ -548,7 +548,7 @@ export function start ( res.status(200).send() } catch (error: any) { 
Analytics.handleError(error) - await ctx.error('failed to delete', { url: req.url }) + ctx.error('failed to delete', { url: req.url }) res.status(500).send() } } @@ -617,25 +617,25 @@ export function start ( .catch((err: any) => { if (err !== null) { Analytics.handleError(err) - void ctx.error('error', { err }) + ctx.error('error', { err }) res.status(500).send(err) } }) }) .on('error', function (err) { Analytics.handleError(err) - void ctx.error('error', { err }) + ctx.error('error', { err }) res.status(500).send(err) }) }) .on('error', (e) => { Analytics.handleError(e) - void ctx.error('error', { e }) + ctx.error('error', { e }) res.status(500).send(e) }) } catch (error: any) { Analytics.handleError(error) - void ctx.error('error', { error }) + ctx.error('error', { error }) res.status(500).send() } }) @@ -695,19 +695,19 @@ export function start ( }) .catch((err: any) => { Analytics.handleError(err) - void ctx.error('error', { err }) + ctx.error('error', { err }) res.status(500).send(err) }) }) .on('error', function (err) { Analytics.handleError(err) - void ctx.error('error', { err }) + ctx.error('error', { err }) res.status(500).send(err) }) }) } catch (error: any) { Analytics.handleError(error) - void ctx.error('error', { error }) + ctx.error('error', { error }) res.status(500).send() } }) diff --git a/server/minio/src/index.ts b/server/minio/src/index.ts index 2beb202656..259fe7476f 100644 --- a/server/minio/src/index.ts +++ b/server/minio/src/index.ts @@ -128,7 +128,7 @@ export class MinioService implements StorageAdapter { version: result.versionId ?? null } } catch (err: any) { - await ctx.error('no object found', err) + ctx.error('no object found', { error: err, objectName, workspaceId: workspaceId.name }) } } diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 6254e1cd7a..a56d04199b 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -839,7 +839,7 @@ abstract class MongoAdapterBase implements DbAdapter { ) }) } catch (err: any) { - await ctx.error('failed on bulk write', { error: err, skip }) + ctx.error('failed on bulk write', { error: err, skip }) if (skip !== 1) { ops.push(...part) skip = 1 // Let's update one by one, to loose only one failed variant. 
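The hunks above and below drop the `await`/`void` prefixes in front of `ctx.info`, `ctx.warn` and `ctx.error` because the MeasureContext logging methods become fire-and-forget and return `void` (see the APMMeasureContext changes later in this patch). A minimal sketch of the call shape this assumes — the interface and helper names here are illustrative, not part of the patch:

interface SyncLogger {
  info: (message: string, ...args: any[]) => void
  warn: (message: string, ...args: any[]) => void
  error: (message: string, ...args: any[]) => void
}

// Before this patch the call sites used `await ctx.error(...)` or `void ctx.error(...)`;
// with a synchronous logger a plain call is enough and cannot reject.
function reportBulkWriteFailure (ctx: SyncLogger, err: unknown, skip: number): void {
  ctx.error('failed on bulk write', { error: err, skip })
}
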
diff --git a/server/rpc/src/rpc.ts b/server/rpc/src/rpc.ts index 55845dd05c..8a79fed0ff 100644 --- a/server/rpc/src/rpc.ts +++ b/server/rpc/src/rpc.ts @@ -63,6 +63,8 @@ export interface Response { index: number final: boolean } + time?: number // Server time to perform operation + queue?: number } /** diff --git a/server/server/src/apm.ts b/server/server/src/apm.ts index 754a19b035..983b52c087 100644 --- a/server/server/src/apm.ts +++ b/server/server/src/apm.ts @@ -26,15 +26,16 @@ export function createAPMAgent (apmUrl: string): Agent { export class APMMeasureContext implements MeasureContext { logger: MeasureLogger private readonly transaction?: Transaction | Span - private readonly parent?: Transaction | Span + private readonly parentTx?: Transaction | Span constructor ( private readonly agent: Agent, name: string, params: Record, - parent?: Transaction | Span, - noTransaction?: boolean + parentTx?: Transaction | Span, + noTransaction?: boolean, + readonly parent?: MeasureContext ) { - this.parent = parent + this.parentTx = parentTx this.logger = { info: (msg, args) => { agent.logger.info({ message: msg, ...args }) @@ -42,14 +43,17 @@ export class APMMeasureContext implements MeasureContext { error: (msg, args) => { agent.logger.error({ message: msg, ...args }) }, + warn: (msg, args) => { + agent.logger.warn({ message: msg, ...args }) + }, logOperation (operation, time, params) {}, close: async () => {} } if (!(noTransaction ?? false)) { - if (this.parent === undefined) { + if (this.parentTx === undefined) { this.transaction = agent.startTransaction(name) ?? undefined } else { - this.transaction = agent.startSpan(name, { childOf: this.parent }) ?? undefined + this.transaction = agent.startSpan(name, { childOf: this.parentTx }) ?? undefined } for (const [k, v] of Object.entries(params)) { this.transaction?.setLabel(k, v) @@ -58,7 +62,7 @@ export class APMMeasureContext implements MeasureContext { } newChild (name: string, params: Record): MeasureContext { - return new APMMeasureContext(this.agent, name, params, this.transaction) + return new APMMeasureContext(this.agent, name, params, this.transaction, undefined, this) } measure (name: string, value: number): void {} @@ -78,7 +82,7 @@ export class APMMeasureContext implements MeasureContext { c.end() return value } catch (err: any) { - await c.error(err) + c.error(err) throw err } } @@ -95,20 +99,19 @@ export class APMMeasureContext implements MeasureContext { return r } - async error (message: string, ...args: any[]): Promise { + error (message: string, ...args: any[]): void { this.logger.error(message, args) - - await new Promise((resolve) => { - this.agent.captureError({ message, params: args }, () => { - resolve() - }) - }) + this.agent.captureError({ message, params: args }) } - async info (message: string, ...args: any[]): Promise { + info (message: string, ...args: any[]): void { this.logger.info(message, args) } + warn (message: string, ...args: any[]): void { + this.logger.warn(message, args) + } + end (): void { this.transaction?.end() } diff --git a/server/server/src/minio.ts b/server/server/src/minio.ts index 3e6786f1de..002d780527 100644 --- a/server/server/src/minio.ts +++ b/server/server/src/minio.ts @@ -91,7 +91,7 @@ class StorageBlobAdapter implements DbAdapter { for (const item of docs) { const stat = await this.client.stat(this.ctx, this.workspaceId, item) if (stat === undefined) { - await ctx.error('Could not find blob', { domain, item, allDocs: cutObjectArray(docs) }) + ctx.error('Could not find blob', { domain, item, 
allDocs: cutObjectArray(docs) }) continue } const chunks: Buffer[] = await this.client.read(this.ctx, this.workspaceId, item) diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index f1e061eaa0..7e139833ab 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -21,6 +21,7 @@ import core, { systemAccountEmail, toWorkspaceString, versionToString, + withContext, type BaseWorkspaceInfo, type MeasureContext, type Tx, @@ -160,7 +161,7 @@ class TSessionManager implements SessionManager { } if (diff > timeout && this.ticks % 10 === 0) { - void this.ctx.error('session hang, closing...', { wsId, user: s[1].session.getUser() }) + this.ctx.warn('session hang, closing...', { wsId, user: s[1].session.getUser() }) // Force close workspace if only one client and it hang. void this.close(s[1].socket, workspace.workspaceId) @@ -177,7 +178,7 @@ class TSessionManager implements SessionManager { for (const r of s[1].session.requests.values()) { if (now - r.start > 30000) { - void this.ctx.info('request hang found, 30sec', { + this.ctx.warn('request hang found, 30sec', { wsId, user: s[1].session.getUser(), ...r.params @@ -190,7 +191,7 @@ class TSessionManager implements SessionManager { if (workspace.sessions.size === 0 && workspace.closing === undefined) { workspace.softShutdown-- if (workspace.softShutdown <= 0) { - void this.ctx.info('closing workspace, no users', { + this.ctx.warn('closing workspace, no users', { workspace: workspace.workspaceId.name, wsId, upgrade: workspace.upgrade, @@ -209,7 +210,8 @@ class TSessionManager implements SessionManager { return this.sessionFactory(token, pipeline, this.broadcast.bind(this)) } - async getWorkspaceInfo (accounts: string, token: string): Promise { + @withContext('get-workspace-info') + async getWorkspaceInfo (ctx: MeasureContext, accounts: string, token: string): Promise { const userInfo = await ( await fetch(accounts, { method: 'POST', @@ -224,15 +226,16 @@ class TSessionManager implements SessionManager { }) ).json() if (userInfo.error !== undefined) { - await this.ctx.error('Error response from account service', { error: JSON.stringify(userInfo) }) + ctx.error('Error response from account service', { error: JSON.stringify(userInfo) }) throw new Error(JSON.stringify(userInfo.error)) } return { ...userInfo.result, upgrade: userInfo.upgrade } } + @withContext('📲 add-session') async addSession ( - baseCtx: MeasureContext, + ctx: MeasureContext, ws: ConnectionSocket, token: Token, rawToken: string, @@ -245,134 +248,127 @@ class TSessionManager implements SessionManager { | { upgrade: true, upgradeInfo?: WorkspaceLoginInfo['upgrade'] } | { error: any } > { - return await baseCtx.with('📲 add-session', {}, async (ctx) => { - const wsString = toWorkspaceString(token.workspace, '@') + const wsString = toWorkspaceString(token.workspace, '@') - let workspaceInfo = await ctx.with('check-token', {}, async (ctx) => - accountsUrl !== '' ? await this.getWorkspaceInfo(accountsUrl, rawToken) : this.wsFromToken(token) + let workspaceInfo = + accountsUrl !== '' ? await this.getWorkspaceInfo(ctx, accountsUrl, rawToken) : this.wsFromToken(token) + + if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) { + // No access to workspace for token. + return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) } + } + if (workspaceInfo === undefined && token.extra?.admin !== 'true') { + // No access to workspace for token. 
+ return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) } + } else if (workspaceInfo === undefined) { + workspaceInfo = this.wsFromToken(token) + } + + if ( + this.modelVersion !== '' && + workspaceInfo.version !== undefined && + this.modelVersion !== versionToString(workspaceInfo.version) && + token.extra?.model !== 'upgrade' && + token.extra?.mode !== 'backup' + ) { + ctx.warn('model version mismatch', { + version: this.modelVersion, + workspaceVersion: versionToString(workspaceInfo.version) + }) + // Version mismatch, return upgrading. + return { upgrade: true, upgradeInfo: workspaceInfo.upgrade } + } + + let workspace = this.workspaces.get(wsString) + if (workspace?.closing !== undefined) { + await workspace?.closing + } + workspace = this.workspaces.get(wsString) + if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) { + const helloResponse: HelloResponse = { + id: -1, + result: 'hello', + binary: false, + reconnect: false, + alreadyConnected: true + } + await ws.send(ctx, helloResponse, false, false) + return { error: new Error('Session already exists') } + } + const workspaceName = workspaceInfo.workspaceName ?? workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId + + if (workspace === undefined) { + ctx.warn('open workspace', { + email: token.email, + workspace: workspaceInfo.workspaceId, + wsUrl: workspaceInfo.workspaceUrl, + ...token.extra + }) + workspace = this.createWorkspace( + ctx.parent ?? ctx, + pipelineFactory, + token, + workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId, + workspaceName ) - if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) { - // No access to workspace for token. - return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) } - } - if (workspaceInfo === undefined && token.extra?.admin !== 'true') { - // No access to workspace for token. - return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) } - } else if (workspaceInfo === undefined) { - workspaceInfo = this.wsFromToken(token) - } + } - if ( - this.modelVersion !== '' && - workspaceInfo.version !== undefined && - this.modelVersion !== versionToString(workspaceInfo.version) && - token.extra?.model !== 'upgrade' && - token.extra?.mode !== 'backup' - ) { - await ctx.info('model version mismatch', { - version: this.modelVersion, - workspaceVersion: versionToString(workspaceInfo.version) - }) - // Version mismatch, return upgrading. - return { upgrade: true, upgradeInfo: workspaceInfo.upgrade } - } - - let workspace = this.workspaces.get(wsString) - if (workspace?.closing !== undefined) { - await workspace?.closing - } - workspace = this.workspaces.get(wsString) - if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) { - const helloResponse: HelloResponse = { - id: -1, - result: 'hello', - binary: false, - reconnect: false, - alreadyConnected: true - } - await ws.send(ctx, helloResponse, false, false) - return { error: new Error('Session already exists') } - } - const workspaceName = workspaceInfo.workspaceName ?? workspaceInfo.workspaceUrl ?? 
workspaceInfo.workspaceId - - if (workspace === undefined) { - await ctx.info('open workspace', { + let pipeline: Pipeline + if (token.extra?.model === 'upgrade') { + if (workspace.upgrade) { + ctx.warn('reconnect workspace in upgrade', { email: token.email, workspace: workspaceInfo.workspaceId, - wsUrl: workspaceInfo.workspaceUrl, - ...token.extra + wsUrl: workspaceInfo.workspaceUrl }) - workspace = this.createWorkspace( - baseCtx, - pipelineFactory, + pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) + } else { + ctx.warn('reconnect workspace in upgrade switch', { + email: token.email, + workspace: workspaceInfo.workspaceId, + wsUrl: workspaceInfo.workspaceUrl + }) + // We need to wait in case previous upgeade connection is already closing. + pipeline = await this.switchToUpgradeSession( token, + sessionId, + ctx.parent ?? ctx, + wsString, + workspace, + pipelineFactory, + ws, workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId, workspaceName ) } - - let pipeline: Pipeline - if (token.extra?.model === 'upgrade') { - if (workspace.upgrade) { - await ctx.info('reconnect workspace in upgrade', { - email: token.email, - workspace: workspaceInfo.workspaceId, - wsUrl: workspaceInfo.workspaceUrl - }) - pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) - } else { - await ctx.info('reconnect workspace in upgrade switch', { - email: token.email, - workspace: workspaceInfo.workspaceId, - wsUrl: workspaceInfo.workspaceUrl - }) - // We need to wait in case previous upgeade connection is already closing. - pipeline = await this.switchToUpgradeSession( - token, - sessionId, - ctx, - wsString, - workspace, - pipelineFactory, - ws, - workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId, - workspaceName - ) - } - } else { - if (workspace.upgrade) { - await ctx.info('connect during upgrade', { - email: token.email, - workspace: workspace.workspaceId.name, - sessionUsers: Array.from(workspace.sessions.values()).map((it) => it.session.getUser()), - sessionData: Array.from(workspace.sessions.values()).map((it) => it.socket.data()) - }) - return { upgrade: true } - } - pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) + } else { + if (workspace.upgrade) { + ctx.warn('connect during upgrade', { + email: token.email, + workspace: workspace.workspaceId.name, + sessionUsers: Array.from(workspace.sessions.values()).map((it) => it.session.getUser()), + sessionData: Array.from(workspace.sessions.values()).map((it) => it.socket.data()) + }) + return { upgrade: true } } + pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) + } - const session = this.createSession(token, pipeline) + const session = this.createSession(token, pipeline) - session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId() - session.sessionInstanceId = generateId() - this.sessions.set(ws.id, { session, socket: ws }) - // We need to delete previous session with Id if found. - workspace.sessions.set(session.sessionId, { session, socket: ws }) + session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId() + session.sessionInstanceId = generateId() + this.sessions.set(ws.id, { session, socket: ws }) + // We need to delete previous session with Id if found. 
+ workspace.sessions.set(session.sessionId, { session, socket: ws }) - // We do not need to wait for set-status, just return session to client - void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true)) + // We do not need to wait for set-status, just return session to client + void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true)) - if (this.timeMinutes > 0) { - void ws.send( - ctx, - { result: this.createMaintenanceWarning() }, - session.binaryResponseMode, - session.useCompression - ) - } - return { session, context: workspace.context, workspaceId: wsString } - }) + if (this.timeMinutes > 0) { + void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryResponseMode, session.useCompression) + } + return { session, context: workspace.context, workspaceId: wsString } } private wsFromToken (token: Token): WorkspaceLoginInfo { @@ -402,7 +398,7 @@ class TSessionManager implements SessionManager { workspaceName: string ): Promise { if (LOGGING_ENABLED) { - await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) }) + ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) }) } // Mark as upgrade, to prevent any new clients to connect during close @@ -450,7 +446,7 @@ class TSessionManager implements SessionManager { ) } catch (err: any) { Analytics.handleError(err) - void ctx.error('error during send', { error: err }) + ctx.error('error during send', { error: err }) } } } @@ -539,13 +535,13 @@ class TSessionManager implements SessionManager { async close (ws: ConnectionSocket, workspaceId: WorkspaceId): Promise { const wsid = toWorkspaceString(workspaceId) const workspace = this.workspaces.get(wsid) - if (workspace === undefined) { - return - } + const sessionRef = this.sessions.get(ws.id) if (sessionRef !== undefined) { this.sessions.delete(ws.id) - workspace.sessions.delete(sessionRef.session.sessionId) + if (workspace !== undefined) { + workspace.sessions.delete(sessionRef.session.sessionId) + } this.reconnectIds.add(sessionRef.session.sessionId) setTimeout(() => { @@ -557,9 +553,11 @@ class TSessionManager implements SessionManager { // Ignore if closed } const user = sessionRef.session.getUser() - const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) - if (another === -1 && !workspace.upgrade) { - await this.trySetStatus(workspace.context, sessionRef.session, false) + if (workspace !== undefined) { + const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) + if (another === -1 && !workspace.upgrade) { + await this.trySetStatus(workspace.context, sessionRef.session, false) + } } } } @@ -582,7 +580,7 @@ class TSessionManager implements SessionManager { ignoreSocket?: ConnectionSocket ): Promise { if (LOGGING_ENABLED) { - await this.ctx.info('closing workspace', { + this.ctx.warn('closing workspace', { workspace: workspace.id, wsName: workspace.workspaceName, code, @@ -604,7 +602,7 @@ class TSessionManager implements SessionManager { } if (LOGGING_ENABLED) { - await this.ctx.info('Clients disconnected. Closing Workspace...', { + this.ctx.warn('Clients disconnected. 
Closing Workspace...', { wsId, workspace: workspace.id, wsName: workspace.workspaceName @@ -621,14 +619,14 @@ class TSessionManager implements SessionManager { }) } catch (err: any) { Analytics.handleError(err) - await this.ctx.error('close-pipeline-error', { error: err }) + this.ctx.error('close-pipeline-error', { error: err }) } } await this.ctx.with('closing', {}, async () => { await Promise.race([closePipeline(), timeoutPromise(120000)]) }) if (LOGGING_ENABLED) { - await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName }) + this.ctx.warn('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName }) } } @@ -663,7 +661,7 @@ class TSessionManager implements SessionManager { const logParams = { wsid, workspace: workspace.id, wsName: workspaceId.name } if (workspace.sessions.size === 0) { if (LOGGING_ENABLED) { - await this.ctx.info('no sessions for workspace', logParams) + this.ctx.warn('no sessions for workspace', logParams) } try { if (workspace.sessions.size === 0) { @@ -676,19 +674,19 @@ class TSessionManager implements SessionManager { } workspace.context.end() if (LOGGING_ENABLED) { - await this.ctx.info('Closed workspace', logParams) + this.ctx.warn('Closed workspace', logParams) } } } catch (err: any) { Analytics.handleError(err) this.workspaces.delete(wsid) if (LOGGING_ENABLED) { - await this.ctx.error('failed', { ...logParams, error: err }) + this.ctx.error('failed', { ...logParams, error: err }) } } } else { if (LOGGING_ENABLED) { - await this.ctx.info('few sessions for workspace, close skipped', { + this.ctx.info('few sessions for workspace, close skipped', { ...logParams, sessions: workspace.sessions.size }) @@ -699,7 +697,7 @@ class TSessionManager implements SessionManager { broadcast (from: Session | null, workspaceId: WorkspaceId, resp: Response, target?: string[]): void { const workspace = this.workspaces.get(toWorkspaceString(workspaceId)) if (workspace === undefined) { - void this.ctx.error('internal: cannot find sessions', { + this.ctx.error('internal: cannot find sessions', { workspaceId: workspaceId.name, target, userId: from?.getUser() ?? '$unknown' @@ -710,7 +708,7 @@ class TSessionManager implements SessionManager { return } if (LOGGING_ENABLED) { - void this.ctx.info('server broadcasting to clients...', { + this.ctx.info('server broadcasting to clients...', { workspace: workspaceId.name, count: workspace.sessions.size }) @@ -792,7 +790,7 @@ class TSessionManager implements SessionManager { service.useBroadcast = hello.broadcast ?? false if (LOGGING_ENABLED) { - await ctx.info('hello happen', { + ctx.info('hello happen', { workspace, user: service.getUser(), binary: service.binaryResponseMode, @@ -834,7 +832,12 @@ class TSessionManager implements SessionManager { ? 
await f.apply(service, [service.measureCtx?.ctx, ...params]) : await ctx.with('🧨 process', {}, async (callTx) => f.apply(service, [callTx, ...params])) - const resp: Response = { id: request.id, result } + const resp: Response = { + id: request.id, + result, + time: Date.now() - st, + queue: service.requests.size + } await handleSend( ctx, @@ -847,7 +850,7 @@ class TSessionManager implements SessionManager { } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { - await this.ctx.error('error handle request', { error: err, request }) + this.ctx.error('error handle request', { error: err, request }) } const resp: Response = { id: request.id, @@ -892,7 +895,7 @@ class TSessionManager implements SessionManager { } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { - await ctx.error('error handle measure', { error: err, request }) + ctx.error('error handle measure', { error: err, request }) } const resp: Response = { id: request.id, diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index e947eb6fa9..b523576a1c 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -49,7 +49,7 @@ export function startHttpServer ( accountsUrl: string ): () => Promise { if (LOGGING_ENABLED) { - void ctx.info('starting server on', { port, productId, enableCompression, accountsUrl }) + ctx.info('starting server on', { port, productId, enableCompression, accountsUrl }) } const app = express() @@ -157,15 +157,18 @@ export function startHttpServer ( ? { zlibDeflateOptions: { // See zlib defaults. - chunkSize: 16 * 1024, - level: 6 + chunkSize: 10 * 1024, + memLevel: 7, + level: 3 }, zlibInflateOptions: { - chunkSize: 16 * 1024, - level: 6 + chunkSize: 10 * 1024, + level: 3 }, - threshold: 1024, // Size (in bytes) below which messages, should not be compressed if context takeover is disabled. - concurrencyLimit: 100 + // Below options specified as default values. + concurrencyLimit: 20, // Limits zlib concurrency for perf. + threshold: 1024 // Size (in bytes) below which messages + // should not be compressed if context takeover is disabled. 
} : false, skipUTF8Validation: true @@ -231,7 +234,7 @@ export function startHttpServer ( ) if ('upgrade' in session || 'error' in session) { if ('error' in session) { - void ctx.error('error', { error: session.error?.message, stack: session.error?.stack }) + ctx.error('error', { error: session.error?.message, stack: session.error?.stack }) } await cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } }, false, false) cs.close() @@ -252,7 +255,7 @@ export function startHttpServer ( } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { - void ctx.error('message error', err) + ctx.error('message error', err) } } }) @@ -282,7 +285,7 @@ export function startHttpServer ( if (payload.workspace.productId !== productId) { if (LOGGING_ENABLED) { - void ctx.error('invalid product', { required: payload.workspace.productId, productId }) + ctx.error('invalid product', { required: payload.workspace.productId, productId }) } throw new Error('Invalid workspace product') } @@ -291,7 +294,7 @@ export function startHttpServer ( } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { - void ctx.error('invalid token', err) + ctx.error('invalid token', err) } wss.handleUpgrade(request, socket, head, (ws) => { const resp: Response = { @@ -311,7 +314,7 @@ export function startHttpServer ( }) httpServer.on('error', (err) => { if (LOGGING_ENABLED) { - void ctx.error('server error', err) + ctx.error('server error', err) } })
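
The final hunk retunes the permessage-deflate parameters: 10 KiB zlib chunks, compression level 3, memLevel 7, a concurrency cap of 20 and the existing 1 KiB threshold. Roughly, this is how the `ws` library consumes such options when the server is constructed — the values below mirror the hunk, while the `noServer` wiring is an assumption for illustration:

import { WebSocketServer } from 'ws'

const wss = new WebSocketServer({
  noServer: true, // sockets are attached via wss.handleUpgrade elsewhere in server_http.ts
  perMessageDeflate: {
    zlibDeflateOptions: { chunkSize: 10 * 1024, memLevel: 7, level: 3 },
    zlibInflateOptions: { chunkSize: 10 * 1024, level: 3 },
    concurrencyLimit: 20, // bound concurrent zlib work to keep CPU/memory in check
    threshold: 1024 // skip compression for payloads under 1 KiB
  },
  skipUTF8Validation: true
})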