TSK-1170: Fix transactions retrieval to speedup of workspace open (#2976)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-04-14 00:39:20 +07:00 committed by GitHub
parent 2a11846d6e
commit 396b55ff43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 28 deletions

View File

@ -129,11 +129,15 @@ class TServerStorage implements ServerStorage {
} }
async close (): Promise<void> { async close (): Promise<void> {
console.timeLog(this.workspace.name, 'closing')
await this.fulltext.close() await this.fulltext.close()
console.timeLog(this.workspace.name, 'closing triggers')
await this.triggerProcessor.cancel() await this.triggerProcessor.cancel()
console.timeLog(this.workspace.name, 'closing adapters')
for (const o of this.adapters.values()) { for (const o of this.adapters.values()) {
await o.close() await o.close()
} }
console.timeLog(this.workspace.name, 'closing fulltext')
await this.fulltextAdapter.close() await this.fulltextAdapter.close()
} }
@ -770,11 +774,14 @@ export async function createServerStorage (
const triggers = new Triggers() const triggers = new Triggers()
const adapters = new Map<string, DbAdapter>() const adapters = new Map<string, DbAdapter>()
const modelDb = new ModelDb(hierarchy) const modelDb = new ModelDb(hierarchy)
console.timeLog(conf.workspace.name, 'create server storage')
const storageAdapter = conf.storageFactory?.() const storageAdapter = conf.storageFactory?.()
for (const key in conf.adapters) { for (const key in conf.adapters) {
const adapterConf = conf.adapters[key] const adapterConf = conf.adapters[key]
adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)) adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter))
console.timeLog(conf.workspace.name, 'adapter', key)
} }
const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter
@ -782,8 +789,9 @@ export async function createServerStorage (
console.log('no txadapter found') console.log('no txadapter found')
} }
console.timeLog(conf.workspace.name, 'begin get model')
const model = await txAdapter.getModel() const model = await txAdapter.getModel()
console.timeLog(conf.workspace.name, 'get model')
for (const tx of model) { for (const tx of model) {
try { try {
hierarchy.tx(tx) hierarchy.tx(tx)
@ -792,6 +800,7 @@ export async function createServerStorage (
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
} }
} }
console.timeLog(conf.workspace.name, 'finish hierarchy')
for (const tx of model) { for (const tx of model) {
try { try {
@ -800,9 +809,11 @@ export async function createServerStorage (
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
} }
} }
console.timeLog(conf.workspace.name, 'finish local model')
for (const [, adapter] of adapters) { for (const [adn, adapter] of adapters) {
await adapter.init(model) await adapter.init(model)
console.timeLog(conf.workspace.name, 'finish init adapter', adn)
} }
const fulltextAdapter = await conf.fulltextAdapter.factory( const fulltextAdapter = await conf.fulltextAdapter.factory(
@ -810,6 +821,7 @@ export async function createServerStorage (
conf.workspace, conf.workspace,
conf.metrics.newChild('fulltext', {}) conf.metrics.newChild('fulltext', {})
) )
console.timeLog(conf.workspace.name, 'finish fulltext adapter')
const metrics = conf.metrics.newChild('server-storage', {}) const metrics = conf.metrics.newChild('server-storage', {})
@ -819,6 +831,8 @@ export async function createServerStorage (
metrics.newChild('content', {}) metrics.newChild('content', {})
) )
console.timeLog(conf.workspace.name, 'finish content adapter')
const defaultAdapter = adapters.get(conf.defaultAdapter) const defaultAdapter = adapters.get(conf.defaultAdapter)
if (defaultAdapter === undefined) { if (defaultAdapter === undefined) {
throw new Error(`No Adapter for ${DOMAIN_DOC_INDEX_STATE}`) throw new Error(`No Adapter for ${DOMAIN_DOC_INDEX_STATE}`)
@ -829,7 +843,7 @@ export async function createServerStorage (
throw new Error('No storage adapter') throw new Error('No storage adapter')
} }
const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter) const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter)
console.timeLog(conf.workspace.name, 'finish index pipeline stages')
const indexer = new FullTextIndexPipeline( const indexer = new FullTextIndexPipeline(
defaultAdapter, defaultAdapter,
stages, stages,
@ -854,6 +868,7 @@ export async function createServerStorage (
options.broadcast?.([tx]) options.broadcast?.([tx])
} }
) )
console.timeLog(conf.workspace.name, 'finish create indexer')
return new FullTextIndex( return new FullTextIndex(
hierarchy, hierarchy,
fulltextAdapter, fulltextAdapter,

View File

@ -985,7 +985,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
const model = await this.db const model = await this.db
.collection(DOMAIN_TX) .collection(DOMAIN_TX)
.find<Tx>({ objectSpace: core.space.Model }) .find<Tx>({ objectSpace: core.space.Model })
.sort({ _id: 1 }) .sort({ _id: 1, modifiedOn: 1 })
.toArray() .toArray()
// We need to put all core.account.System transactions first // We need to put all core.account.System transactions first
const systemTx: Tx[] = [] const systemTx: Tx[] = []
@ -1004,7 +1004,6 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
model.forEach((tx) => model.forEach((tx) =>
(tx.modifiedBy === core.account.System && !isEmployeeAccount(tx) ? systemTx : userTx).push(tx) (tx.modifiedBy === core.account.System && !isEmployeeAccount(tx) ? systemTx : userTx).push(tx)
) )
return systemTx.concat(userTx) return systemTx.concat(userTx)
} }
} }
@ -1095,6 +1094,7 @@ export async function createMongoAdapter (
): Promise<DbAdapter> { ): Promise<DbAdapter> {
const client = await getMongoClient(url) const client = await getMongoClient(url)
const db = getWorkspaceDB(client, workspaceId) const db = getWorkspaceDB(client, workspaceId)
return new MongoAdapter(db, hierarchy, modelDb, client) return new MongoAdapter(db, hierarchy, modelDb, client)
} }

View File

@ -80,7 +80,7 @@ class SessionManager {
} }
if (token.extra?.model === 'upgrade') { if (token.extra?.model === 'upgrade') {
console.log('reloading workspace', JSON.stringify(token)) if (LOGGING_ENABLED) console.log('reloading workspace', JSON.stringify(token))
this.upgradeId = sessionId this.upgradeId = sessionId
// If upgrade client is used. // If upgrade client is used.
// Drop all existing clients // Drop all existing clients
@ -156,7 +156,8 @@ class SessionManager {
sessions: [], sessions: [],
upgrade upgrade
} }
console.log('Creating Workspace:', workspace.id) if (LOGGING_ENABLED) console.time(token.workspace.name)
if (LOGGING_ENABLED) console.timeLog(token.workspace.name, 'Creating Workspace:', workspace.id)
this.workspaces.set(toWorkspaceString(token.workspace), workspace) this.workspaces.set(toWorkspaceString(token.workspace), workspace)
return workspace return workspace
} }
@ -201,7 +202,7 @@ class SessionManager {
const wsid = toWorkspaceString(workspaceId) const wsid = toWorkspaceString(workspaceId)
const workspace = this.workspaces.get(wsid) const workspace = this.workspaces.get(wsid)
if (workspace === undefined) { if (workspace === undefined) {
console.error(new Error('internal: cannot find sessions')) if (LOGGING_ENABLED) console.error(new Error('internal: cannot find sessions'))
return return
} }
const index = workspace.sessions.findIndex((p) => p[1] === ws) const index = workspace.sessions.findIndex((p) => p[1] === ws)
@ -215,8 +216,8 @@ class SessionManager {
await this.setStatus(ctx, session[0], false) await this.setStatus(ctx, session[0], false)
} }
if (workspace.sessions.length === 0) { if (workspace.sessions.length === 0) {
const workspaceId = workspace.id const wsUID = workspace.id
if (LOGGING_ENABLED) console.log('no sessions for workspace', wsid, workspaceId) if (LOGGING_ENABLED) console.log('no sessions for workspace', wsid, wsUID)
const waitAndClose = async (workspace: Workspace): Promise<void> => { const waitAndClose = async (workspace: Workspace): Promise<void> => {
try { try {
@ -224,13 +225,13 @@ class SessionManager {
await Promise.race([pl, timeoutPromise(60000)]) await Promise.race([pl, timeoutPromise(60000)])
await Promise.race([pl.close(), timeoutPromise(60000)]) await Promise.race([pl.close(), timeoutPromise(60000)])
if (this.workspaces.get(wsid)?.id === workspaceId) { if (this.workspaces.get(wsid)?.id === wsUID) {
this.workspaces.delete(wsid) this.workspaces.delete(wsid)
} }
console.log('Closed workspace', workspaceId) if (LOGGING_ENABLED) console.timeLog(workspaceId.name, 'Closed workspace', wsUID)
} catch (err: any) { } catch (err: any) {
this.workspaces.delete(wsid) this.workspaces.delete(wsid)
console.error(err) if (LOGGING_ENABLED) console.error(err)
} }
} }
workspace.closing = waitAndClose(workspace) workspace.closing = waitAndClose(workspace)
@ -246,7 +247,7 @@ class SessionManager {
code: number, code: number,
reason: 'upgrade' | 'shutdown' reason: 'upgrade' | 'shutdown'
): Promise<void> { ): Promise<void> {
console.log(`closing workspace ${wsId} - ${workspace.id}, code: ${code}, reason: ${reason}`) if (LOGGING_ENABLED) console.timeLog(wsId, `closing workspace ${workspace.id}, code: ${code}, reason: ${reason}`)
const sessions = Array.from(workspace.sessions) const sessions = Array.from(workspace.sessions)
workspace.sessions = [] workspace.sessions = []
@ -275,18 +276,21 @@ class SessionManager {
await this.setStatus(ctx, s, false) await this.setStatus(ctx, s, false)
} }
console.log(workspace.id, 'Clients disconnected. Closing Workspace...') if (LOGGING_ENABLED) console.timeLog(wsId, workspace.id, 'Clients disconnected. Closing Workspace...')
await Promise.all(sessions.map((s) => closeS(s[0], s[1]))) await Promise.all(sessions.map((s) => closeS(s[0], s[1])))
const closePipeline = async (): Promise<void> => { const closePipeline = async (): Promise<void> => {
try { try {
if (LOGGING_ENABLED) console.timeLog(wsId, 'closing pipeline')
await (await workspace.pipeline).close() await (await workspace.pipeline).close()
if (LOGGING_ENABLED) console.timeLog(wsId, 'closing pipeline done')
} catch (err: any) { } catch (err: any) {
console.error(err) console.error(err)
} }
} }
await Promise.race([closePipeline(), timeoutPromise(15000)]) await Promise.race([closePipeline(), timeoutPromise(15000)])
console.log(workspace.id, 'Workspace closed...') if (LOGGING_ENABLED) console.timeLog(wsId, 'Workspace closed...')
console.timeEnd(wsId)
} }
async closeWorkspaces (ctx: MeasureContext): Promise<void> { async closeWorkspaces (ctx: MeasureContext): Promise<void> {
@ -324,7 +328,7 @@ async function handleRequest<S extends Session> (
): Promise<void> { ): Promise<void> {
const request = readRequest(msg) const request = readRequest(msg)
if (request.id === -1 && request.method === 'hello') { if (request.id === -1 && request.method === 'hello') {
console.log('hello happen', service.getUser()) if (LOGGING_ENABLED) console.timeLog(workspace, 'hello happen', service.getUser())
ws.send(serialize({ id: -1, result: 'hello' })) ws.send(serialize({ id: -1, result: 'hello' }))
return return
} }
@ -342,11 +346,13 @@ async function handleRequest<S extends Session> (
const st = Date.now() const st = Date.now()
timeout = setTimeout(() => { timeout = setTimeout(() => {
console.log('long request found', workspace, service.getUser(), request, params) if (LOGGING_ENABLED) console.timeLog(workspace, 'long request found', service.getUser(), request, params)
}, 4000) }, 4000)
hangTimeout = setTimeout(() => { hangTimeout = setTimeout(() => {
console.log('request hang found, 30sec', workspace, service.getUser(), request, params) if (LOGGING_ENABLED) {
console.timeLog(workspace, 'request hang found, 30sec', workspace, service.getUser(), request, params)
}
}, 30000) }, 30000)
const result = await f.apply(service, params) const result = await f.apply(service, params)
@ -355,8 +361,9 @@ async function handleRequest<S extends Session> (
const resp: Response<any> = { id: request.id, result } const resp: Response<any> = { id: request.id, result }
const diff = Date.now() - st const diff = Date.now() - st
if (diff > 5000) { if (diff > 5000 && LOGGING_ENABLED) {
console.log( console.timeLog(
timeout,
'very long request found', 'very long request found',
workspace, workspace,
service.getUser(), service.getUser(),
@ -368,7 +375,7 @@ async function handleRequest<S extends Session> (
} }
ws.send(serialize(resp)) ws.send(serialize(resp))
} catch (err: any) { } catch (err: any) {
console.error(err) if (LOGGING_ENABLED) console.error(err)
clearTimeout(timeout) clearTimeout(timeout)
clearTimeout(hangTimeout) clearTimeout(hangTimeout)
const resp: Response<any> = { const resp: Response<any> = {
@ -393,7 +400,7 @@ export function start (
productId: string, productId: string,
host?: string host?: string
): () => Promise<void> { ): () => Promise<void> {
console.log(`starting server on port ${port} ...`) if (LOGGING_ENABLED) console.log(`starting server on port ${port} ...`)
const sessions = new SessionManager(sessionFactory) const sessions = new SessionManager(sessionFactory)
@ -438,12 +445,14 @@ export function start (
} }
// remove session after 1seconds, give a time to reconnect. // remove session after 1seconds, give a time to reconnect.
if (code === 1000) { if (code === 1000) {
console.log(`client "${token.email}" closed normally`) if (LOGGING_ENABLED) console.log(`client "${token.email}" closed normally`)
void sessions.close(ctx, ws, token.workspace, code, reason.toString()) void sessions.close(ctx, ws, token.workspace, code, reason.toString())
} else { } else {
console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString()) if (LOGGING_ENABLED) {
console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString())
}
session.closeTimeout = setTimeout(() => { session.closeTimeout = setTimeout(() => {
console.log(`client "${token.email}" force closed`) if (LOGGING_ENABLED) console.log(`client "${token.email}" force closed`)
void sessions.close(ctx, ws, token.workspace, code, reason.toString()) void sessions.close(ctx, ws, token.workspace, code, reason.toString())
}, 10000) }, 10000)
} }
@ -463,7 +472,7 @@ export function start (
try { try {
const payload = decodeToken(token ?? '') const payload = decodeToken(token ?? '')
const sessionId = url.searchParams.get('sessionId') const sessionId = url.searchParams.get('sessionId')
console.log('client connected with payload', payload, sessionId) if (LOGGING_ENABLED) console.log('client connected with payload', payload, sessionId)
if (payload.workspace.productId !== productId) { if (payload.workspace.productId !== productId) {
throw new Error('Invalid workspace product') throw new Error('Invalid workspace product')
@ -471,7 +480,7 @@ export function start (
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId)) wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId))
} catch (err) { } catch (err) {
console.error('invalid token', err) if (LOGGING_ENABLED) console.error('invalid token', err)
wss.handleUpgrade(request, socket, head, (ws) => { wss.handleUpgrade(request, socket, head, (ws) => {
const resp: Response<any> = { const resp: Response<any> = {
id: -1, id: -1,