From 396b55ff43ed8aeae7d1abcedaf4fad56b0b688e Mon Sep 17 00:00:00 2001 From: Andrey Sobolev <haiodo@users.noreply.github.com> Date: Fri, 14 Apr 2023 00:39:20 +0700 Subject: [PATCH] TSK-1170: Fix transactions retrieval to speedup of workspace open (#2976) Signed-off-by: Andrey Sobolev <haiodo@gmail.com> --- server/core/src/storage.ts | 21 ++++++++++++-- server/mongo/src/storage.ts | 4 +-- server/ws/src/server.ts | 55 +++++++++++++++++++++---------------- 3 files changed, 52 insertions(+), 28 deletions(-) diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts index 875d77bb6d..69856b3901 100644 --- a/server/core/src/storage.ts +++ b/server/core/src/storage.ts @@ -129,11 +129,15 @@ class TServerStorage implements ServerStorage { } async close (): Promise<void> { + console.timeLog(this.workspace.name, 'closing') await this.fulltext.close() + console.timeLog(this.workspace.name, 'closing triggers') await this.triggerProcessor.cancel() + console.timeLog(this.workspace.name, 'closing adapters') for (const o of this.adapters.values()) { await o.close() } + console.timeLog(this.workspace.name, 'closing fulltext') await this.fulltextAdapter.close() } @@ -770,11 +774,14 @@ export async function createServerStorage ( const triggers = new Triggers() const adapters = new Map<string, DbAdapter>() const modelDb = new ModelDb(hierarchy) + + console.timeLog(conf.workspace.name, 'create server storage') const storageAdapter = conf.storageFactory?.() for (const key in conf.adapters) { const adapterConf = conf.adapters[key] adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)) + console.timeLog(conf.workspace.name, 'adapter', key) } const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter @@ -782,8 +789,9 @@ export async function createServerStorage ( console.log('no txadapter found') } + console.timeLog(conf.workspace.name, 'begin get model') const model = await txAdapter.getModel() - + console.timeLog(conf.workspace.name, 'get model') for (const tx of model) { try { hierarchy.tx(tx) @@ -792,6 +800,7 @@ export async function createServerStorage ( console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) } } + console.timeLog(conf.workspace.name, 'finish hierarchy') for (const tx of model) { try { @@ -800,9 +809,11 @@ export async function createServerStorage ( console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) } } + console.timeLog(conf.workspace.name, 'finish local model') - for (const [, adapter] of adapters) { + for (const [adn, adapter] of adapters) { await adapter.init(model) + console.timeLog(conf.workspace.name, 'finish init adapter', adn) } const fulltextAdapter = await conf.fulltextAdapter.factory( @@ -810,6 +821,7 @@ export async function createServerStorage ( conf.workspace, conf.metrics.newChild('fulltext', {}) ) + console.timeLog(conf.workspace.name, 'finish fulltext adapter') const metrics = conf.metrics.newChild('server-storage', {}) @@ -819,6 +831,8 @@ export async function createServerStorage ( metrics.newChild('content', {}) ) + console.timeLog(conf.workspace.name, 'finish content adapter') + const defaultAdapter = adapters.get(conf.defaultAdapter) if (defaultAdapter === undefined) { throw new Error(`No Adapter for ${DOMAIN_DOC_INDEX_STATE}`) @@ -829,7 +843,7 @@ export async function createServerStorage ( throw new Error('No storage adapter') } const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter) - + console.timeLog(conf.workspace.name, 'finish index pipeline stages') const indexer = new FullTextIndexPipeline( defaultAdapter, stages, @@ -854,6 +868,7 @@ export async function createServerStorage ( options.broadcast?.([tx]) } ) + console.timeLog(conf.workspace.name, 'finish create indexer') return new FullTextIndex( hierarchy, fulltextAdapter, diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index e8bf2f40e8..1275eb8bdd 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -985,7 +985,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { const model = await this.db .collection(DOMAIN_TX) .find<Tx>({ objectSpace: core.space.Model }) - .sort({ _id: 1 }) + .sort({ _id: 1, modifiedOn: 1 }) .toArray() // We need to put all core.account.System transactions first const systemTx: Tx[] = [] @@ -1004,7 +1004,6 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { model.forEach((tx) => (tx.modifiedBy === core.account.System && !isEmployeeAccount(tx) ? systemTx : userTx).push(tx) ) - return systemTx.concat(userTx) } } @@ -1095,6 +1094,7 @@ export async function createMongoAdapter ( ): Promise<DbAdapter> { const client = await getMongoClient(url) const db = getWorkspaceDB(client, workspaceId) + return new MongoAdapter(db, hierarchy, modelDb, client) } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 6588470b71..9435f6e841 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -80,7 +80,7 @@ class SessionManager { } if (token.extra?.model === 'upgrade') { - console.log('reloading workspace', JSON.stringify(token)) + if (LOGGING_ENABLED) console.log('reloading workspace', JSON.stringify(token)) this.upgradeId = sessionId // If upgrade client is used. // Drop all existing clients @@ -156,7 +156,8 @@ class SessionManager { sessions: [], upgrade } - console.log('Creating Workspace:', workspace.id) + if (LOGGING_ENABLED) console.time(token.workspace.name) + if (LOGGING_ENABLED) console.timeLog(token.workspace.name, 'Creating Workspace:', workspace.id) this.workspaces.set(toWorkspaceString(token.workspace), workspace) return workspace } @@ -201,7 +202,7 @@ class SessionManager { const wsid = toWorkspaceString(workspaceId) const workspace = this.workspaces.get(wsid) if (workspace === undefined) { - console.error(new Error('internal: cannot find sessions')) + if (LOGGING_ENABLED) console.error(new Error('internal: cannot find sessions')) return } const index = workspace.sessions.findIndex((p) => p[1] === ws) @@ -215,8 +216,8 @@ class SessionManager { await this.setStatus(ctx, session[0], false) } if (workspace.sessions.length === 0) { - const workspaceId = workspace.id - if (LOGGING_ENABLED) console.log('no sessions for workspace', wsid, workspaceId) + const wsUID = workspace.id + if (LOGGING_ENABLED) console.log('no sessions for workspace', wsid, wsUID) const waitAndClose = async (workspace: Workspace): Promise<void> => { try { @@ -224,13 +225,13 @@ class SessionManager { await Promise.race([pl, timeoutPromise(60000)]) await Promise.race([pl.close(), timeoutPromise(60000)]) - if (this.workspaces.get(wsid)?.id === workspaceId) { + if (this.workspaces.get(wsid)?.id === wsUID) { this.workspaces.delete(wsid) } - console.log('Closed workspace', workspaceId) + if (LOGGING_ENABLED) console.timeLog(workspaceId.name, 'Closed workspace', wsUID) } catch (err: any) { this.workspaces.delete(wsid) - console.error(err) + if (LOGGING_ENABLED) console.error(err) } } workspace.closing = waitAndClose(workspace) @@ -246,7 +247,7 @@ class SessionManager { code: number, reason: 'upgrade' | 'shutdown' ): Promise<void> { - console.log(`closing workspace ${wsId} - ${workspace.id}, code: ${code}, reason: ${reason}`) + if (LOGGING_ENABLED) console.timeLog(wsId, `closing workspace ${workspace.id}, code: ${code}, reason: ${reason}`) const sessions = Array.from(workspace.sessions) workspace.sessions = [] @@ -275,18 +276,21 @@ class SessionManager { await this.setStatus(ctx, s, false) } - console.log(workspace.id, 'Clients disconnected. Closing Workspace...') + if (LOGGING_ENABLED) console.timeLog(wsId, workspace.id, 'Clients disconnected. Closing Workspace...') await Promise.all(sessions.map((s) => closeS(s[0], s[1]))) const closePipeline = async (): Promise<void> => { try { + if (LOGGING_ENABLED) console.timeLog(wsId, 'closing pipeline') await (await workspace.pipeline).close() + if (LOGGING_ENABLED) console.timeLog(wsId, 'closing pipeline done') } catch (err: any) { console.error(err) } } await Promise.race([closePipeline(), timeoutPromise(15000)]) - console.log(workspace.id, 'Workspace closed...') + if (LOGGING_ENABLED) console.timeLog(wsId, 'Workspace closed...') + console.timeEnd(wsId) } async closeWorkspaces (ctx: MeasureContext): Promise<void> { @@ -324,7 +328,7 @@ async function handleRequest<S extends Session> ( ): Promise<void> { const request = readRequest(msg) if (request.id === -1 && request.method === 'hello') { - console.log('hello happen', service.getUser()) + if (LOGGING_ENABLED) console.timeLog(workspace, 'hello happen', service.getUser()) ws.send(serialize({ id: -1, result: 'hello' })) return } @@ -342,11 +346,13 @@ async function handleRequest<S extends Session> ( const st = Date.now() timeout = setTimeout(() => { - console.log('long request found', workspace, service.getUser(), request, params) + if (LOGGING_ENABLED) console.timeLog(workspace, 'long request found', service.getUser(), request, params) }, 4000) hangTimeout = setTimeout(() => { - console.log('request hang found, 30sec', workspace, service.getUser(), request, params) + if (LOGGING_ENABLED) { + console.timeLog(workspace, 'request hang found, 30sec', workspace, service.getUser(), request, params) + } }, 30000) const result = await f.apply(service, params) @@ -355,8 +361,9 @@ async function handleRequest<S extends Session> ( const resp: Response<any> = { id: request.id, result } const diff = Date.now() - st - if (diff > 5000) { - console.log( + if (diff > 5000 && LOGGING_ENABLED) { + console.timeLog( + timeout, 'very long request found', workspace, service.getUser(), @@ -368,7 +375,7 @@ async function handleRequest<S extends Session> ( } ws.send(serialize(resp)) } catch (err: any) { - console.error(err) + if (LOGGING_ENABLED) console.error(err) clearTimeout(timeout) clearTimeout(hangTimeout) const resp: Response<any> = { @@ -393,7 +400,7 @@ export function start ( productId: string, host?: string ): () => Promise<void> { - console.log(`starting server on port ${port} ...`) + if (LOGGING_ENABLED) console.log(`starting server on port ${port} ...`) const sessions = new SessionManager(sessionFactory) @@ -438,12 +445,14 @@ export function start ( } // remove session after 1seconds, give a time to reconnect. if (code === 1000) { - console.log(`client "${token.email}" closed normally`) + if (LOGGING_ENABLED) console.log(`client "${token.email}" closed normally`) void sessions.close(ctx, ws, token.workspace, code, reason.toString()) } else { - console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString()) + if (LOGGING_ENABLED) { + console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString()) + } session.closeTimeout = setTimeout(() => { - console.log(`client "${token.email}" force closed`) + if (LOGGING_ENABLED) console.log(`client "${token.email}" force closed`) void sessions.close(ctx, ws, token.workspace, code, reason.toString()) }, 10000) } @@ -463,7 +472,7 @@ export function start ( try { const payload = decodeToken(token ?? '') const sessionId = url.searchParams.get('sessionId') - console.log('client connected with payload', payload, sessionId) + if (LOGGING_ENABLED) console.log('client connected with payload', payload, sessionId) if (payload.workspace.productId !== productId) { throw new Error('Invalid workspace product') @@ -471,7 +480,7 @@ export function start ( wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId)) } catch (err) { - console.error('invalid token', err) + if (LOGGING_ENABLED) console.error('invalid token', err) wss.handleUpgrade(request, socket, head, (ws) => { const resp: Response<any> = { id: -1,