mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-13 11:50:56 +00:00
UBERF-8185: Fix duplicate hierarchy clases (#6660)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
f998dc6faa
commit
1e0edd4eb5
@ -80,8 +80,11 @@
|
||||
<div class="flex-column p-3 h-full" style:overflow="auto">
|
||||
{#each Object.entries(activeSessions) as act}
|
||||
{@const wsInstance = $workspacesStore.find((it) => it.workspaceId === act[0])}
|
||||
{@const totalFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)}
|
||||
{@const totalTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)}
|
||||
{@const totalFind = act[1].sessions.reduce((it, itm) => itm.total.find + it, 0)}
|
||||
{@const totalTx = act[1].sessions.reduce((it, itm) => itm.total.tx + it, 0)}
|
||||
|
||||
{@const currentFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)}
|
||||
{@const currentTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)}
|
||||
{@const employeeGroups = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
|
||||
(it) => systemAccountEmail !== it || !realUsers
|
||||
)}
|
||||
@ -94,7 +97,8 @@
|
||||
<svelte:fragment slot="title">
|
||||
<div class="flex flex-row-center flex-between flex-grow p-1">
|
||||
<div class="fs-title" class:greyed={realGroup.length === 0}>
|
||||
Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx}
|
||||
Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {currentFind}/{currentTx},
|
||||
total => {totalFind}/{totalTx}
|
||||
{#if act[1].upgrading}
|
||||
(Upgrading)
|
||||
{/if}
|
||||
@ -138,7 +142,7 @@
|
||||
{/if}
|
||||
: {connections.length}
|
||||
<div class="ml-4">
|
||||
<div class="ml-1">{find}/{txes}</div>
|
||||
<div class="ml-1">{find} rx/{txes} tx</div>
|
||||
</div>
|
||||
</div>
|
||||
</svelte:fragment>
|
||||
@ -147,13 +151,13 @@
|
||||
#{i}
|
||||
{user.userId}
|
||||
<div class="p-1">
|
||||
Total: {user.total.find}/{user.total.tx}
|
||||
Total: {user.total.find} rx/{user.total.tx} tx
|
||||
</div>
|
||||
<div class="p-1">
|
||||
Previous 5 mins: {user.mins5.find}/{user.mins5.tx}
|
||||
Previous 5 mins: {user.mins5.find} rx/{user.mins5.tx} tx
|
||||
</div>
|
||||
<div class="p-1">
|
||||
Current 5 mins: {user.current.find}/{user.current.tx}
|
||||
Current 5 mins: {user.current.find} tx/{user.current.tx} tx
|
||||
</div>
|
||||
</div>
|
||||
<div class="p-1 flex-col ml-10">
|
||||
|
@ -1812,7 +1812,7 @@ export async function setRole (
|
||||
})
|
||||
}
|
||||
} finally {
|
||||
if (client === undefined) {
|
||||
if (client == null) {
|
||||
await connection.close()
|
||||
}
|
||||
}
|
||||
@ -2049,7 +2049,7 @@ async function createPersonAccount (
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (client === undefined) {
|
||||
if (client == null) {
|
||||
await connection.close()
|
||||
}
|
||||
}
|
||||
|
@ -14,11 +14,12 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { type Domain, type MeasureContext } from '@hcengineering/core'
|
||||
import { Analytics } from '@hcengineering/analytics'
|
||||
import { DOMAIN_TX, type Domain, type MeasureContext } from '@hcengineering/core'
|
||||
import { type DbAdapter, type DomainHelper } from './adapter'
|
||||
import type { DbConfiguration } from './configuration'
|
||||
import { DummyDbAdapter } from './mem'
|
||||
import type { DBAdapterManager, PipelineContext } from './types'
|
||||
import { Analytics } from '@hcengineering/analytics'
|
||||
|
||||
interface DomainInfo {
|
||||
exists: boolean
|
||||
@ -35,7 +36,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
|
||||
constructor (
|
||||
private readonly metrics: MeasureContext,
|
||||
|
||||
private readonly _domains: Record<string, string>,
|
||||
readonly conf: DbConfiguration,
|
||||
private readonly context: PipelineContext,
|
||||
private readonly defaultAdapter: DbAdapter,
|
||||
private readonly adapters: Map<string, DbAdapter>
|
||||
@ -86,8 +87,36 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
|
||||
}
|
||||
}
|
||||
|
||||
async initAdapters (ctx: MeasureContext): Promise<void> {
|
||||
await ctx.with('init-adapters', {}, async (ctx) => {
|
||||
for (const [key, adapter] of this.adapters) {
|
||||
// already initialized
|
||||
if (key !== this.conf.domains[DOMAIN_TX] && adapter.init !== undefined) {
|
||||
let excludeDomains: string[] | undefined
|
||||
let domains: string[] | undefined
|
||||
if (this.conf.defaultAdapter === key) {
|
||||
excludeDomains = []
|
||||
for (const domain in this.conf.domains) {
|
||||
if (this.conf.domains[domain] !== key) {
|
||||
excludeDomains.push(domain)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
domains = []
|
||||
for (const domain in this.conf.domains) {
|
||||
if (this.conf.domains[domain] === key) {
|
||||
domains.push(domain)
|
||||
}
|
||||
}
|
||||
}
|
||||
await adapter.init(domains, excludeDomains)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private async updateInfo (d: Domain, adapterDomains: Map<DbAdapter, Set<Domain>>, info: DomainInfo): Promise<void> {
|
||||
const name = this._domains[d] ?? '#default'
|
||||
const name = this.conf.domains[d] ?? '#default'
|
||||
const adapter = this.adapters.get(name) ?? this.defaultAdapter
|
||||
if (adapter !== undefined) {
|
||||
const h = adapter.helper?.()
|
||||
@ -129,7 +158,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
|
||||
}
|
||||
|
||||
public getAdapter (domain: Domain, requireExists: boolean): DbAdapter {
|
||||
const name = this._domains[domain] ?? '#default'
|
||||
const name = this.conf.domains[domain] ?? '#default'
|
||||
const adapter = this.adapters.get(name) ?? this.defaultAdapter
|
||||
if (adapter === undefined) {
|
||||
throw new Error('adapter not provided: ' + name)
|
||||
|
@ -134,6 +134,8 @@ export interface DBAdapterManager {
|
||||
close: () => Promise<void>
|
||||
|
||||
registerHelper: (helper: DomainHelper) => Promise<void>
|
||||
|
||||
initAdapters: (ctx: MeasureContext) => Promise<void>
|
||||
}
|
||||
|
||||
export interface PipelineContext {
|
||||
|
@ -77,41 +77,6 @@ export class DBAdapterMiddleware extends BaseMiddleware implements Middleware {
|
||||
}
|
||||
}
|
||||
await txAdapter.init?.(txAdapterDomains)
|
||||
const model = await txAdapter.getModel(ctx)
|
||||
|
||||
for (const tx of model) {
|
||||
try {
|
||||
this.context.hierarchy.tx(tx)
|
||||
} catch (err: any) {
|
||||
ctx.warn('failed to apply model transaction, skipping', { tx: JSON.stringify(tx), err })
|
||||
}
|
||||
}
|
||||
|
||||
await ctx.with('init-adapters', {}, async (ctx) => {
|
||||
for (const [key, adapter] of adapters) {
|
||||
// already initialized
|
||||
if (key !== this.conf.domains[DOMAIN_TX] && adapter.init !== undefined) {
|
||||
let excludeDomains: string[] | undefined
|
||||
let domains: string[] | undefined
|
||||
if (this.conf.defaultAdapter === key) {
|
||||
excludeDomains = []
|
||||
for (const domain in this.conf.domains) {
|
||||
if (this.conf.domains[domain] !== key) {
|
||||
excludeDomains.push(domain)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
domains = []
|
||||
for (const domain in this.conf.domains) {
|
||||
if (this.conf.domains[domain] === key) {
|
||||
domains.push(domain)
|
||||
}
|
||||
}
|
||||
}
|
||||
await adapter.init(domains, excludeDomains)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const metrics = this.conf.metrics.newChild('📔 server-storage', {})
|
||||
|
||||
@ -127,7 +92,7 @@ export class DBAdapterMiddleware extends BaseMiddleware implements Middleware {
|
||||
|
||||
// We need to init all next, since we will use model
|
||||
|
||||
const adapterManager = new DbAdapterManagerImpl(metrics, this.conf.domains, this.context, defaultAdapter, adapters)
|
||||
const adapterManager = new DbAdapterManagerImpl(metrics, this.conf, this.context, defaultAdapter, adapters)
|
||||
this.context.adapterManager = adapterManager
|
||||
}
|
||||
|
||||
|
@ -20,12 +20,13 @@ import { BaseMiddleware, DomainIndexHelperImpl } from '@hcengineering/server-cor
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export class DBAdapterHelperMiddleware extends BaseMiddleware implements Middleware {
|
||||
export class DBAdapterInitMiddleware extends BaseMiddleware implements Middleware {
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
context: PipelineContext,
|
||||
next?: Middleware
|
||||
): Promise<Middleware | undefined> {
|
||||
await context.adapterManager?.initAdapters?.(ctx)
|
||||
const domainHelper = new DomainIndexHelperImpl(ctx, context.hierarchy, context.modelDb, context.workspace)
|
||||
await context.adapterManager?.registerHelper?.(domainHelper)
|
||||
return undefined
|
||||
|
@ -312,7 +312,9 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
}
|
||||
const baseClass = this.hierarchy.getBaseClass(clazz)
|
||||
if (baseClass !== core.class.Doc) {
|
||||
const classes = this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it))
|
||||
const classes = Array.from(
|
||||
new Set(this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it)))
|
||||
)
|
||||
|
||||
// Only replace if not specified
|
||||
if (translatedBase._class === undefined) {
|
||||
@ -334,7 +336,9 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
descendants = descendants.filter((c) => !excludedClassesIds.has(c))
|
||||
}
|
||||
|
||||
const desc = descendants.filter((it: any) => !this.hierarchy.isMixin(it as Ref<Class<Doc>>))
|
||||
const desc = Array.from(
|
||||
new Set(descendants.filter((it: any) => !this.hierarchy.isMixin(it as Ref<Class<Doc>>)))
|
||||
)
|
||||
translatedBase._class = desc.length === 1 ? desc[0] : { $in: desc }
|
||||
}
|
||||
|
||||
@ -348,6 +352,9 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
delete translatedBase._class
|
||||
}
|
||||
}
|
||||
if (translatedBase._class?.$in != null && Array.isArray(translatedBase._class.$in)) {
|
||||
translatedBase._class.$in = Array.from(new Set(translatedBase._class.$in))
|
||||
}
|
||||
if (translatedBase._class?.$in?.length === 1 && translatedBase._class?.$nin === undefined) {
|
||||
translatedBase._class = translatedBase._class.$in[0]
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ import {
|
||||
BroadcastMiddleware,
|
||||
ConfigurationMiddleware,
|
||||
ContextNameMiddleware,
|
||||
DBAdapterHelperMiddleware,
|
||||
DBAdapterInitMiddleware,
|
||||
DBAdapterMiddleware,
|
||||
DomainFindMiddleware,
|
||||
DomainTxMiddleware,
|
||||
@ -135,7 +135,7 @@ export function createServerPipeline (
|
||||
LiveQueryMiddleware.create,
|
||||
DomainFindMiddleware.create,
|
||||
DomainTxMiddleware.create,
|
||||
DBAdapterHelperMiddleware.create,
|
||||
DBAdapterInitMiddleware.create,
|
||||
ModelMiddleware.create,
|
||||
DBAdapterMiddleware.create(conf), // Configure DB adapters
|
||||
BroadcastMiddleware.create(broadcast)
|
||||
|
@ -48,6 +48,9 @@ import {
|
||||
} from './types'
|
||||
import { sendResponse } from './utils'
|
||||
|
||||
const ticksPerSecond = 20
|
||||
const workspaceSoftShutdownTicks = 3 * ticksPerSecond
|
||||
|
||||
interface WorkspaceLoginInfo extends Omit<BaseWorkspaceInfo, 'workspace'> {
|
||||
upgrade?: {
|
||||
toProcess: number
|
||||
@ -75,7 +78,7 @@ function timeoutPromise (time: number): { promise: Promise<void>, cancelHandle:
|
||||
*/
|
||||
export interface Timeouts {
|
||||
// Timeout preferences
|
||||
pingTimeout: number // Default 1 second
|
||||
pingTimeout: number // Default 10 second
|
||||
reconnectTimeout: number // Default 3 seconds
|
||||
}
|
||||
|
||||
@ -113,8 +116,8 @@ class TSessionManager implements SessionManager {
|
||||
}
|
||||
) {
|
||||
this.checkInterval = setInterval(() => {
|
||||
this.handleInterval()
|
||||
}, timeouts.pingTimeout)
|
||||
this.handleTick()
|
||||
}, 1000 / ticksPerSecond)
|
||||
}
|
||||
|
||||
scheduleMaintenance (timeMinutes: number): void {
|
||||
@ -168,47 +171,58 @@ class TSessionManager implements SessionManager {
|
||||
|
||||
ticks = 0
|
||||
|
||||
handleInterval (): void {
|
||||
handleTick (): void {
|
||||
for (const [wsId, workspace] of this.workspaces.entries()) {
|
||||
for (const s of workspace.sessions) {
|
||||
if (this.ticks % (5 * 60) === 0) {
|
||||
if (this.ticks % (5 * 60 * ticksPerSecond) === 0) {
|
||||
s[1].session.mins5.find = s[1].session.current.find
|
||||
s[1].session.mins5.tx = s[1].session.current.tx
|
||||
|
||||
s[1].session.current = { find: 0, tx: 0 }
|
||||
}
|
||||
const now = Date.now()
|
||||
const diff = now - s[1].session.lastRequest
|
||||
const lastRequestDiff = now - s[1].session.lastRequest
|
||||
|
||||
let timeout = 60000
|
||||
if (s[1].session.getUser() === systemAccountEmail) {
|
||||
timeout = timeout * 10
|
||||
}
|
||||
|
||||
if (diff > timeout && this.ticks % 10 === 0) {
|
||||
this.ctx.warn('session hang, closing...', { wsId, user: s[1].session.getUser() })
|
||||
const isCurrentUserTick = this.ticks % ticksPerSecond === s[1].tickHash
|
||||
|
||||
// Force close workspace if only one client and it hang.
|
||||
void this.close(this.ctx, s[1].socket, wsId)
|
||||
continue
|
||||
}
|
||||
if (diff > 20000 && diff < 60000 && this.ticks % 10 === 0) {
|
||||
s[1].socket.send(workspace.context, { result: 'ping' }, s[1].session.binaryMode, s[1].session.useCompression)
|
||||
}
|
||||
if (isCurrentUserTick) {
|
||||
if (lastRequestDiff > timeout) {
|
||||
this.ctx.warn('session hang, closing...', { wsId, user: s[1].session.getUser() })
|
||||
|
||||
for (const r of s[1].session.requests.values()) {
|
||||
if (now - r.start > 30000) {
|
||||
this.ctx.warn('request hang found, 30sec', {
|
||||
wsId,
|
||||
user: s[1].session.getUser(),
|
||||
...r.params
|
||||
})
|
||||
// Force close workspace if only one client and it hang.
|
||||
void this.close(this.ctx, s[1].socket, wsId)
|
||||
continue
|
||||
}
|
||||
if (lastRequestDiff + (1 / 10) * lastRequestDiff > this.timeouts.pingTimeout) {
|
||||
// We need to check state and close socket if it broken
|
||||
if (s[1].socket.checkState()) {
|
||||
s[1].socket.send(
|
||||
workspace.context,
|
||||
{ result: 'ping' },
|
||||
s[1].session.binaryMode,
|
||||
s[1].session.useCompression
|
||||
)
|
||||
}
|
||||
}
|
||||
for (const r of s[1].session.requests.values()) {
|
||||
if (now - r.start > 30000) {
|
||||
this.ctx.warn('request hang found, 30sec', {
|
||||
wsId,
|
||||
user: s[1].session.getUser(),
|
||||
...r.params
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait some time for new client to appear before closing workspace.
|
||||
if (workspace.sessions.size === 0 && workspace.closing === undefined) {
|
||||
if (workspace.sessions.size === 0 && workspace.closing === undefined && workspace.workspaceInitCompleted) {
|
||||
workspace.softShutdown--
|
||||
if (workspace.softShutdown <= 0) {
|
||||
this.ctx.warn('closing workspace, no users', {
|
||||
@ -219,7 +233,7 @@ class TSessionManager implements SessionManager {
|
||||
workspace.closing = this.performWorkspaceCloseCheck(workspace, workspace.workspaceId, wsId)
|
||||
}
|
||||
} else {
|
||||
workspace.softShutdown = 3
|
||||
workspace.softShutdown = workspaceSoftShutdownTicks
|
||||
}
|
||||
|
||||
if (this.clientErrors !== this.oldClientErrors) {
|
||||
@ -270,6 +284,8 @@ class TSessionManager implements SessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
sessionCounter = 0
|
||||
|
||||
@withContext('📲 add-session')
|
||||
async addSession (
|
||||
ctx: MeasureContext,
|
||||
@ -423,7 +439,13 @@ class TSessionManager implements SessionManager {
|
||||
session.sessionInstanceId = generateId()
|
||||
this.sessions.set(ws.id, { session, socket: ws })
|
||||
// We need to delete previous session with Id if found.
|
||||
workspace.sessions.set(session.sessionId, { session, socket: ws })
|
||||
this.sessionCounter++
|
||||
workspace.sessions.set(session.sessionId, { session, socket: ws, tickHash: this.sessionCounter % ticksPerSecond })
|
||||
|
||||
// Mark workspace as init completed and we had at least one client.
|
||||
if (!workspace.workspaceInitCompleted) {
|
||||
workspace.workspaceInitCompleted = true
|
||||
}
|
||||
|
||||
// We do not need to wait for set-status, just return session to client
|
||||
const _workspace = workspace
|
||||
@ -593,11 +615,12 @@ class TSessionManager implements SessionManager {
|
||||
branding
|
||||
),
|
||||
sessions: new Map(),
|
||||
softShutdown: 3,
|
||||
softShutdown: workspaceSoftShutdownTicks,
|
||||
upgrade,
|
||||
workspaceId: token.workspace,
|
||||
workspaceName,
|
||||
branding
|
||||
branding,
|
||||
workspaceInitCompleted: false
|
||||
}
|
||||
this.workspaces.set(toWorkspaceString(token.workspace), workspace)
|
||||
return workspace
|
||||
|
@ -93,6 +93,8 @@ export interface ConnectionSocket {
|
||||
data: () => Record<string, any>
|
||||
|
||||
readRequest: (buffer: Buffer, binary: boolean) => Request<any>
|
||||
|
||||
checkState: () => boolean
|
||||
}
|
||||
|
||||
/**
|
||||
@ -114,11 +116,12 @@ export interface Workspace {
|
||||
context: MeasureContext
|
||||
id: string
|
||||
pipeline: Promise<Pipeline>
|
||||
sessions: Map<string, { session: Session, socket: ConnectionSocket }>
|
||||
sessions: Map<string, { session: Session, socket: ConnectionSocket, tickHash: number }>
|
||||
upgrade: boolean
|
||||
|
||||
closing?: Promise<void>
|
||||
softShutdown: number
|
||||
workspaceInitCompleted: boolean
|
||||
|
||||
workspaceId: WorkspaceId
|
||||
workspaceName: string
|
||||
|
@ -310,7 +310,9 @@ export function startHttpServer (
|
||||
}
|
||||
: false,
|
||||
skipUTF8Validation: true,
|
||||
maxPayload: 250 * 1024 * 1024
|
||||
maxPayload: 250 * 1024 * 1024,
|
||||
backlog: 1000,
|
||||
clientTracking: false // We do not need to track clients inside clients.
|
||||
})
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
const handleConnection = async (
|
||||
@ -492,6 +494,14 @@ function createWebsocketClientSocket (
|
||||
close: () => {
|
||||
cs.isClosed = true
|
||||
ws.close()
|
||||
ws.terminate()
|
||||
},
|
||||
checkState: () => {
|
||||
if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
|
||||
ws.terminate()
|
||||
return false
|
||||
}
|
||||
return true
|
||||
},
|
||||
readRequest: (buffer: Buffer, binary: boolean) => {
|
||||
return rpcHandler.readRequest(buffer, binary)
|
||||
|
Loading…
Reference in New Issue
Block a user