UBERF-9747

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-05-06 16:54:48 +07:00
parent d3a40bb48e
commit 8e9e875936
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
36 changed files with 1227 additions and 1115 deletions

3
.vscode/launch.json vendored
View File

@ -103,7 +103,8 @@
"UPLOAD_URL": "/files",
"AI_BOT_URL": "http://localhost:4010",
"STATS_URL": "http://huly.local:4900",
"QUEUE_CONFIG": "localhost:19092"
"QUEUE_CONFIG": "localhost:19092",
"RATE_LIMIT_MAX": "25000"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"runtimeVersion": "20",

View File

@ -49,9 +49,6 @@ export class TTx extends TDoc implements Tx {
objectSpace!: Ref<Space>
}
@Model(core.class.TxModelUpgrade, core.class.Tx, DOMAIN_TX)
export class TTxModelUpgrade extends TTx {}
@Model(core.class.TxCUD, core.class.Tx)
export class TTxCUD<T extends Doc> extends TTx implements TxCUD<T> {
@Prop(TypeRef(core.class.Doc), core.string.Object)

View File

@ -14,8 +14,8 @@
// limitations under the License.
//
import { IntlString, Plugin } from '@hcengineering/platform'
import { ClientConnectEvent, DocChunk } from '..'
import type { Class, Data, Doc, Domain, PluginConfiguration, Ref, Timestamp } from '../classes'
import { ClientConnectEvent, DocChunk, type WorkspaceUuid } from '..'
import type { Account, Class, Data, Doc, Domain, PluginConfiguration, Ref, Timestamp } from '../classes'
import { ClassifierKind, DOMAIN_MODEL, Space } from '../classes'
import { ClientConnection, createClient } from '../client'
import core from '../component'
@ -104,21 +104,25 @@ describe('client', () => {
}
return new (class implements ClientConnection {
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
handler?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
set onConnect (
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
handler: ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined
) {
this.handler = handler
void this.handler?.(ClientConnectEvent.Connected, '', {})
void this.handler?.(ClientConnectEvent.Connected, {}, {})
}
get onConnect ():
| ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>)
| ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>)
| undefined {
return this.handler
}
getAccount (): Promise<Account > {
throw new Error('Method not implemented.')
}
isConnected = (): boolean => true
findAll = findAll
pushHandler = (): void => {}

View File

@ -13,8 +13,8 @@
// limitations under the License.
//
import { ClientConnectEvent, DocChunk, generateId } from '..'
import type { Class, Doc, Domain, Ref, Timestamp } from '../classes'
import { ClientConnectEvent, DocChunk, generateId, type WorkspaceUuid } from '..'
import type { Account, Class, Doc, Domain, Ref, Timestamp } from '../classes'
import { ClientConnection } from '../client'
import core from '../component'
import { Hierarchy } from '../hierarchy'
@ -47,19 +47,23 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
isConnected = (): boolean => true
findAll = findAll
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
handler?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
set onConnect (
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
handler: ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined
) {
this.handler = handler
void this.handler?.(ClientConnectEvent.Connected, '', {})
void this.handler?.(ClientConnectEvent.Connected, {}, {})
}
get onConnect (): ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined {
get onConnect (): ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined {
return this.handler
}
getAccount (): Promise<Account> {
throw new Error('Method not implemented.')
}
async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
return { docs: [] }
}

View File

@ -71,7 +71,10 @@ export interface Obj {
export interface Account {
uuid: AccountUuid
role: AccountRole
roles: Record<WorkspaceUuid, AccountRole>
role: AccountRole // Role in current workspace
targetWorkspace: WorkspaceUuid // In case target workspace is used, it will be filled with personal workspace UUID in case of all workspaces mode.
primarySocialId: PersonId
socialIds: PersonId[]
fullSocialIds: SocialId[]
@ -519,7 +522,8 @@ export enum AccountRole {
Guest = 'GUEST',
User = 'USER',
Maintainer = 'MAINTAINER',
Owner = 'OWNER'
Owner = 'OWNER',
Undetermined = 'UNDETERMINED' // In case of multi workspaces mode
}
/**
@ -528,6 +532,7 @@ export enum AccountRole {
export const roleOrder: Record<AccountRole, number> = {
[AccountRole.DocGuest]: 10,
[AccountRole.Guest]: 20,
[AccountRole.Undetermined]: 25,
[AccountRole.User]: 30,
[AccountRole.Maintainer]: 40,
[AccountRole.Owner]: 50

View File

@ -15,7 +15,7 @@
import { Analytics } from '@hcengineering/analytics'
import { BackupClient, DocChunk } from './backup'
import { Class, DOMAIN_MODEL, Doc, Domain, Ref, Timestamp } from './classes'
import { Class, DOMAIN_MODEL, Doc, Domain, Ref, Timestamp, type Account } from './classes'
import core from './component'
import { Hierarchy } from './hierarchy'
import { MeasureContext, MeasureMetricsContext } from './measurements'
@ -23,7 +23,8 @@ import { ModelDb } from './memdb'
import type { DocumentQuery, FindOptions, FindResult, FulltextStorage, Storage, TxResult, WithLookup } from './storage'
import { SearchOptions, SearchQuery, SearchResult } from './storage'
import { Tx, TxCUD, WorkspaceEvent, type TxWorkspaceEvent } from './tx'
import { platformNow, platformNowDiff, toFindResult } from './utils'
import { platformNow, platformNowDiff, toFindResult, type WorkspaceUuid } from './utils'
import { deepEqual } from 'fast-equals'
/**
* @public
@ -80,13 +81,15 @@ export interface ClientConnection extends Storage, FulltextStorage, BackupClient
isConnected: () => boolean
close: () => Promise<void>
onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
onConnect?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
// If hash is passed, will return LoadModelResponse
loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse>
getLastHash?: (ctx: MeasureContext) => Promise<string | undefined>
getLastHash?: (ctx: MeasureContext) => Promise<Record<WorkspaceUuid, string | undefined>>
pushHandler: (handler: Handler) => void
getAccount: () => Promise<Account>
}
class ClientImpl implements Client, BackupClient {
@ -237,7 +240,7 @@ export async function createClient (
let hierarchy = new Hierarchy()
let model = new ModelDb(hierarchy)
let lastTx: string | undefined
let lastTx: Record<WorkspaceUuid, string | undefined> | undefined
function txHandler (...tx: Tx[]): void {
if (tx == null || tx.length === 0) {
@ -282,7 +285,7 @@ export async function createClient (
txBuffer = undefined
const oldOnConnect:
| ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>)
| ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>)
| undefined = conn.onConnect
conn.onConnect = async (event, _lastTx, data) => {
console.log('Client: onConnect', event)
@ -324,7 +327,7 @@ export async function createClient (
return
}
if (lastTx === _lastTx) {
if (deepEqual(lastTx, _lastTx)) {
// Same lastTx, no need to refresh
await oldOnConnect?.(ClientConnectEvent.Reconnected, _lastTx, data)
return
@ -362,9 +365,13 @@ async function loadModel (
hash: ''
}
if (conn.getLastHash !== undefined && (await conn.getLastHash(ctx)) === current.hash) {
// We have same model hash.
return { mode: 'same', current: current.transactions, addition: [] }
if (conn.getLastHash !== undefined) {
const lastHash = await conn.getLastHash(ctx)
const account = await conn.getAccount()
if (lastHash[account.targetWorkspace] === current.hash) {
// We have same model hash.
return { mode: 'same', current: current.transactions, addition: [] }
}
}
const lastTxTime = getLastTxTime(current.transactions)
const result = await ctx.with('connection-load-model', { hash: current.hash !== '' }, (ctx) =>

View File

@ -15,9 +15,9 @@
import type { Asset, IntlString, Metadata, Plugin, StatusCode } from '@hcengineering/platform'
import { plugin } from '@hcengineering/platform'
import type { BenchmarkDoc } from './benchmark'
import { AccountRole } from './classes'
import type {
Account,
AccountUuid,
AnyAttribute,
ArrOf,
Association,
@ -59,9 +59,9 @@ import type {
TypeAny,
TypedSpace,
UserStatus,
Version,
AccountUuid
Version
} from './classes'
import { AccountRole } from './classes'
import { Status, StatusCategory } from './status'
import type {
Tx,
@ -69,11 +69,11 @@ import type {
TxCUD,
TxCreateDoc,
TxMixin,
TxModelUpgrade,
TxRemoveDoc,
TxUpdateDoc,
TxWorkspaceEvent
} from './tx'
import type { WorkspaceUuid } from './utils'
/**
* @public
@ -88,7 +88,9 @@ export const systemAccountEmail = 'anticrm@hc.engineering'
export const systemAccountUuid = '1749089e-22e6-48de-af4e-165e18fbd2f9' as AccountUuid
export const systemAccount: Account = {
uuid: systemAccountUuid,
roles: { },
role: AccountRole.Owner,
targetWorkspace: systemAccountUuid as any as WorkspaceUuid,
primarySocialId: '' as PersonId,
socialIds: [],
fullSocialIds: []
@ -107,7 +109,6 @@ export default plugin(coreId, {
Interface: '' as Ref<Class<Interface<Doc>>>,
Attribute: '' as Ref<Class<AnyAttribute>>,
Tx: '' as Ref<Class<Tx>>,
TxModelUpgrade: '' as Ref<Class<TxModelUpgrade>>,
TxWorkspaceEvent: '' as Ref<Class<TxWorkspaceEvent>>,
TxApplyIf: '' as Ref<Class<TxApplyIf>>,
TxCUD: '' as Ref<Class<TxCUD<Doc>>>,

View File

@ -34,7 +34,7 @@ import { setObjectValue } from './objvalue'
import { _getOperator } from './operator'
import { _toDoc } from './proxy'
import type { DocumentQuery, TxResult } from './storage'
import { generateId } from './utils'
import { generateId, type WorkspaceUuid } from './utils'
/**
* @public
@ -52,7 +52,9 @@ export enum WorkspaceEvent {
SecurityChange,
MaintenanceNotification,
BulkUpdate,
LastTx
LastTx,
WorkpaceActive,
ModelUpgrade = 7
}
/**
@ -62,6 +64,7 @@ export enum WorkspaceEvent {
export interface TxWorkspaceEvent<T = any> extends Tx {
event: WorkspaceEvent
params: T
workspace?: WorkspaceUuid
}
/**
@ -78,11 +81,6 @@ export interface BulkUpdateEvent {
_class: Ref<Class<Doc>>[]
}
/**
* @public
*/
export interface TxModelUpgrade extends Tx {}
/**
* @public
*/

View File

@ -762,7 +762,10 @@ export function isOwnerOrMaintainer (): boolean {
}
export function hasAccountRole (acc: Account, targerRole: AccountRole): boolean {
return roleOrder[acc.role] >= roleOrder[targerRole]
if (acc.targetWorkspace == null) {
throw new Error('Account has no target workspace')
}
return roleOrder[acc.roles[acc.targetWorkspace]] >= roleOrder[targerRole]
}
export function getBranding (brandings: BrandingMap, key: string | undefined): Branding | null {

View File

@ -39,7 +39,9 @@ import core, {
Timestamp,
Tx,
TxDb,
TxResult
TxResult,
type Account,
type WorkspaceUuid
} from '@hcengineering/core'
import { genMinModel } from './minmodel'
@ -50,6 +52,7 @@ FulltextStorage & {
isConnected: () => boolean
loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse>
pushHandler: (handler: Handler) => void
getAccount: () => Promise<Account>
}
> {
const txes = genMinModel()
@ -150,18 +153,22 @@ FulltextStorage & {
async sendForceClose (): Promise<void> {}
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
handler?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
set onConnect (
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
handler: ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined
) {
this.handler = handler
void this.handler?.(ClientConnectEvent.Connected, '', {})
void this.handler?.(ClientConnectEvent.Connected, {}, {})
}
get onConnect (): ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined {
get onConnect (): ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined {
return this.handler
}
getAccount (): Promise<Account> {
throw new Error('Method not implemented.')
}
}
return new TestConnection(hierarchy, model, transactions)

View File

@ -22,13 +22,24 @@ import client, {
pingConst,
pongConst
} from '@hcengineering/client'
import { EventResult } from '@hcengineering/communication-sdk-types'
import {
FindLabelsParams,
FindMessagesGroupsParams,
FindMessagesParams,
FindNotificationContextParams,
FindNotificationsParams,
Label,
Message,
MessagesGroup,
NotificationContext
} from '@hcengineering/communication-types'
import core, {
Account,
Class,
ClientConnectEvent,
ClientConnection,
clone,
Handler,
Doc,
DocChunk,
DocumentQuery,
@ -36,6 +47,7 @@ import core, {
FindOptions,
FindResult,
generateId,
Handler,
LoadModelResponse,
type MeasureContext,
MeasureMetricsContext,
@ -50,6 +62,8 @@ import core, {
TxApplyIf,
TxHandler,
TxResult,
type TxWorkspaceEvent,
WorkspaceEvent,
type WorkspaceUuid
} from '@hcengineering/core'
import platform, {
@ -60,20 +74,8 @@ import platform, {
Status,
UNAUTHORIZED
} from '@hcengineering/platform'
import { HelloRequest, HelloResponse, type RateLimitInfo, ReqId, type Response, RPCHandler } from '@hcengineering/rpc'
import { uncompress } from 'snappyjs'
import { HelloRequest, HelloResponse, ReqId, type Response, RPCHandler, type RateLimitInfo } from '@hcengineering/rpc'
import { EventResult } from '@hcengineering/communication-sdk-types'
import {
FindLabelsParams,
FindMessagesGroupsParams,
FindMessagesParams,
FindNotificationContextParams,
FindNotificationsParams,
Label,
Message,
MessagesGroup,
NotificationContext
} from '@hcengineering/communication-types'
const SECOND = 1000
const pingTimeout = 10 * SECOND
@ -134,11 +136,11 @@ class Connection implements ClientConnection {
private account: Account | undefined
onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
onConnect?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
rpcHandler: RPCHandler
lastHash?: string
lastHash?: Record<WorkspaceUuid, string | undefined>
handlers: Handler[] = []
@ -179,9 +181,9 @@ class Connection implements ClientConnection {
this.handlers.push(handler)
}
async getLastHash (ctx: MeasureContext): Promise<string | undefined> {
async getLastHash (ctx: MeasureContext): Promise<Record<WorkspaceUuid, string | undefined>> {
await this.waitOpenConnection(ctx)
return this.lastHash
return this.lastHash ?? {}
}
private schedulePing (socketId: number): void {
@ -506,9 +508,13 @@ class Connection implements ClientConnection {
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]
for (const tx of txArr) {
if (tx?._class === core.class.TxModelUpgrade) {
console.log('Processing upgrade', this.workspace, this.user)
this.opt?.onUpgrade?.()
if (tx?._class === core.class.TxWorkspaceEvent) {
const event = tx as TxWorkspaceEvent
// TODO: Check
if (event.event === WorkspaceEvent.ModelUpgrade) {
console.log('Processing upgrade', this.workspace, this.user)
this.opt?.onUpgrade?.(event)
}
return
}
}

View File

@ -16,10 +16,17 @@
import clientPlugin from '@hcengineering/client'
import type { ClientFactoryOptions } from '@hcengineering/client/src'
import core, {
type Class,
Client,
type ClientConnection,
type Doc,
LoadModelResponse,
type ModelFilter,
type PersonUuid,
type PluginConfiguration,
type Ref,
Tx,
type TxCUD,
TxHandler,
TxPersistenceStore,
TxWorkspaceEvent,
@ -28,15 +35,8 @@ import core, {
concatLink,
createClient,
fillConfiguration,
pluginFilterTx,
type Class,
type ClientConnection,
type Doc,
type ModelFilter,
type PluginConfiguration,
type Ref,
type TxCUD,
platformNow
platformNow,
pluginFilterTx
} from '@hcengineering/core'
import platform, { Severity, Status, getMetadata, getPlugins, setPlatformStatus } from '@hcengineering/platform'
import { connect } from './connection'
@ -103,12 +103,12 @@ export default async () => {
const upgradeHandler: TxHandler = (...txes: Tx[]) => {
for (const tx of txes) {
if (tx?._class === core.class.TxModelUpgrade) {
opt?.onUpgrade?.()
return
}
if (tx?._class === core.class.TxWorkspaceEvent) {
const event = tx as TxWorkspaceEvent
if (event.event === WorkspaceEvent.ModelUpgrade) {
// TODO: Add a workspace here
opt?.onUpgrade?.(event)
}
if (event.event === WorkspaceEvent.MaintenanceNotification) {
void setPlatformStatus(
new Status(Severity.WARNING, platform.status.MaintenanceWarning, {

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import type { Client, ClientConnectEvent, MeasureContext, TxPersistenceStore } from '@hcengineering/core'
import type { Client, ClientConnectEvent, MeasureContext, TxPersistenceStore, TxWorkspaceEvent, WorkspaceUuid } from '@hcengineering/core'
import { type Plugin, type Resource, type Metadata, plugin } from '@hcengineering/platform'
/**
@ -60,11 +60,11 @@ export interface ClientFactoryOptions {
useProtocolCompression?: boolean
connectionTimeout?: number
onHello?: (serverVersion?: string) => boolean
onUpgrade?: () => void
onUpgrade?: (event?: TxWorkspaceEvent) => void
onUnauthorized?: () => void
onArchived?: () => void
onMigration?: () => void
onConnect?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
onConnect?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
ctx?: MeasureContext
onDialTimeout?: () => void | Promise<void>

View File

@ -186,6 +186,8 @@ export async function connect (title: string): Promise<Client | undefined> {
const me: Account = {
uuid: account,
role: workspaceLoginInfo.role,
roles: {},
targetWorkspace: workspaceLoginInfo.workspace,
primarySocialId: '' as PersonId,
socialIds: [],
fullSocialIds: []

View File

@ -343,11 +343,16 @@ export async function connect (title: string): Promise<Client | undefined> {
// TODO: should we take the function from some resource like fetchWorkspace/selectWorkspace
// to remove account client dependency?
const accountsUrl = getMetadata(login.metadata.AccountsUrl)
const socialIds: SocialId[] = await getAccountClient(accountsUrl, token).getSocialIds()
const me: Account = {
const accountVal = await newClient.getConnection?.()?.getAccount()
const socialIds: SocialId[] = accountVal?.fullSocialIds ?? await getAccountClient(accountsUrl, token).getSocialIds()
const me: Account = (await newClient.getConnection?.()?.getAccount()) ?? {
uuid: account,
role: workspaceLoginInfo.role,
roles: {},
targetWorkspace: workspaceLoginInfo.workspace,
primarySocialId: pickPrimarySocialId(socialIds)._id,
socialIds: socialIds.map((si) => si._id),
fullSocialIds: socialIds

View File

@ -1,3 +1,11 @@
import { getClient as getAccountClientRaw, type AccountClient } from '@hcengineering/account-client'
import contact, {
AvatarType,
combineName,
type Person,
type SocialIdentity,
type SocialIdentityRef
} from '@hcengineering/contact'
import core, {
buildSocialIdString,
generateId,
@ -5,26 +13,19 @@ import core, {
TxFactory,
TxProcessor,
type AttachedData,
type Data,
type Class,
type Data,
type Doc,
type MeasureContext,
type Ref,
type SearchOptions,
type SearchQuery,
type TxCUD
type TxCUD,
type WorkspaceUuid
} from '@hcengineering/core'
import type { ClientSessionCtx, ConnectionSocket, Session, SessionManager } from '@hcengineering/server-core'
import { decodeToken } from '@hcengineering/server-token'
import { rpcJSONReplacer, type RateLimitInfo } from '@hcengineering/rpc'
import contact, {
AvatarType,
combineName,
type SocialIdentity,
type Person,
type SocialIdentityRef
} from '@hcengineering/contact'
import { type AccountClient, getClient as getAccountClientRaw } from '@hcengineering/account-client'
import type { ConnectionSocket } from '@hcengineering/server-core'
import { decodeToken } from '@hcengineering/server-token'
import { createHash } from 'crypto'
import { type Express, type Response as ExpressResponse, type Request } from 'express'
@ -35,6 +36,7 @@ import { gzip } from 'zlib'
import { retrieveJson } from './utils'
import { unknownError } from '@hcengineering/platform'
import type { ClientSessionCtx, Session, SessionManager } from '@hcengineering/server'
interface RPCClientInfo {
client: ConnectionSocket
session: Session
@ -137,7 +139,8 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur
ctx: ClientSessionCtx,
session: Session,
rateLimit: RateLimitInfo | undefined,
token: string
token: string,
workspaceId: WorkspaceUuid
) => Promise<void>
): Promise<void> {
try {
@ -151,35 +154,36 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur
sendError(res, 401, { message: 'Missing Authorization header' })
return
}
const workspaceId = decodeURIComponent(req.params.workspaceId)
const workspaceId = decodeURIComponent(req.params.workspaceId) as WorkspaceUuid
token = token.split(' ')[1]
const decodedToken = decodeToken(token)
if (workspaceId !== decodedToken.workspace) {
sendError(res, 401, { message: 'Invalid workspace', workspace: decodedToken.workspace })
return
}
// if (workspaceId !== decodedToken.workspace) {
// sendError(res, 401, { message: 'Invalid workspace', workspace: decodedToken.workspace })
// return
// }
let transactorRpc = rpcSessions.get(token)
if (transactorRpc === undefined) {
const cs: ConnectionSocket = createClosingSocket(token, rpcSessions)
const s = await sessions.addSession(ctx, cs, decodedToken, token, token)
if (!('session' in s)) {
const session = await sessions.addSession(ctx, cs, decodedToken, token, token)
if (!('session' in session)) {
sendError(res, 401, {
message: 'Failed to create session',
mode: 'specialError' in s ? s.specialError ?? '' : 'upgrading'
mode: 'specialError' in session ? session.specialError ?? '' : 'upgrading'
})
return
}
transactorRpc = { session: s.session, client: cs, workspaceId: s.workspaceId }
transactorRpc = { session, client: cs, workspaceId }
rpcSessions.set(token, transactorRpc)
}
const rpc = transactorRpc
const rateLimit = await sessions.handleRPC(ctx, rpc.session, rpc.client, async (ctx, rateLimit) => {
await operation(ctx, rpc.session, rateLimit, token)
})
const rateLimit = await sessions.handleRPC(ctx, workspaceId,
rpc.session, rpc.client, async (ctx, rateLimit) => {
await operation(ctx, rpc.session, rateLimit, token, workspaceId)
})
if (rateLimit !== undefined) {
const { remaining, limit, reset, retryAfter } = rateLimit
const retryHeaders: OutgoingHttpHeaders = {
@ -206,15 +210,15 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur
}
app.get('/api/v1/ping/:workspaceId', (req, res) => {
void withSession(req, res, async (ctx, session, rateLimit) => {
void withSession(req, res, async (ctx, session, rateLimit, token, workspaceId) => {
await session.ping(ctx)
await sendJson(
req,
res,
{
pong: true,
lastTx: ctx.pipeline.context.lastTx,
lastHash: ctx.pipeline.context.lastHash
...await sessions.getLastTxHash(workspaceId)
},
rateLimitToHeaders(rateLimit)
)
@ -273,14 +277,17 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur
core.class.Space,
core.class.Tx
]
const h = ctx.pipeline.context.hierarchy
const filtered = txes.filter(
(it) =>
TxProcessor.isExtendsCUD(it._class) &&
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline) => {
const h = pipeline.context.hierarchy
const filtered = txes.filter(
(it) =>
TxProcessor.isExtendsCUD(it._class) &&
allowedClasess.some((cl) => h.isDerived((it as TxCUD<Doc>).objectClass, cl))
)
)
await sendJson(req, res, filtered, rateLimitToHeaders(rateLimit))
await sendJson(req, res, filtered, rateLimitToHeaders(rateLimit))
})
})
})

View File

@ -17,11 +17,10 @@
import { type BrandingMap, type MeasureContext, type Tx } from '@hcengineering/core'
import { buildStorageFromConfig } from '@hcengineering/server-storage'
import { startSessionManager } from '@hcengineering/server'
import { startSessionManager, type SessionManager } from '@hcengineering/server'
import {
type CommunicationApiFactory,
type PlatformQueue,
type SessionManager,
type StorageConfiguration
} from '@hcengineering/server-core'

View File

@ -28,7 +28,7 @@ import {
type WorkspaceIds,
type WorkspaceUuid
} from '@hcengineering/core'
import platform, { Severity, Status, UNAUTHORIZED, unknownStatus } from '@hcengineering/platform'
import { UNAUTHORIZED } from '@hcengineering/platform'
import { RPCHandler, type Response } from '@hcengineering/rpc'
import {
doSessionOp,
@ -38,6 +38,7 @@ import {
processRequest,
wipeStatistics,
type BlobResponse,
type SessionManager,
type WebsocketData
} from '@hcengineering/server'
import {
@ -45,7 +46,6 @@ import {
pingConst,
pongConst,
type ConnectionSocket,
type SessionManager,
type StorageAdapter
} from '@hcengineering/server-core'
import { decodeToken, type Token } from '@hcengineering/server-token'
@ -261,7 +261,7 @@ export function startHttpServer (
}
case 'force-close': {
const wsId = req.query.wsId as WorkspaceUuid
void sessions.forceClose(wsId ?? payload.workspace)
void sessions.forceCloseWorkspace(ctx, wsId ?? payload.workspace)
res.writeHead(200)
res.end()
return
@ -454,69 +454,20 @@ export function startHttpServer (
}
const cs: ConnectionSocket = createWebsocketClientSocket(ws, data)
const session = sessions.addSession(ctx, cs, token, rawToken, sessionId)
void session.catch(() => {
// Ignore err
ws.close()
})
const webSocketData: WebsocketData = {
connectionSocket: cs,
payload: token,
token: rawToken,
session: sessions.addSession(ctx, cs, token, rawToken, sessionId),
session,
url: ''
}
if (webSocketData.session instanceof Promise) {
void webSocketData.session.then((s) => {
if ('error' in s) {
if (s.specialError === 'archived') {
void cs.send(
ctx,
{
id: -1,
error: new Status(Severity.ERROR, platform.status.WorkspaceArchived, {
workspaceUuid: token.workspace
}),
terminate: s.terminate
},
false,
false
)
} else if (s.specialError === 'migration') {
void cs.send(
ctx,
{
id: -1,
error: new Status(Severity.ERROR, platform.status.WorkspaceMigration, {
workspaceUuid: token.workspace
}),
terminate: s.terminate
},
false,
false
)
} else {
void cs.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
false
)
}
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
}
if ('upgrade' in s) {
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
cs.close()
})
}
})
void webSocketData.session.catch((err) => {
ctx.error('unexpected error in websocket', { err })
})
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', (msg: RawData) => {
try {
@ -530,8 +481,7 @@ export function startHttpServer (
doSessionOp(
webSocketData,
(s, buff) => {
s.context.measure('receive-data', buff?.length ?? 0)
processRequest(s.session, cs, s.context, s.workspaceId, buff, sessions)
processRequest(ctx, s, cs, buff, sessions)
},
buff
)
@ -543,32 +493,30 @@ export function startHttpServer (
}
}
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', (code: number, reason: Buffer) => {
const handleClose = (err: Error | null): void => {
doSessionOp(
webSocketData,
(s) => {
if (!(s.session.workspaceClosed ?? false)) {
// remove session after 1seconds, give a time to reconnect.
void sessions.close(ctx, cs, token.workspace)
if (err !== null) {
ctx.error('error', { err, user: s.getUser() })
}
// remove session after 1seconds, give a time to reconnect.
void sessions.close(ctx, s).catch(err => {
ctx.error('failed to close session', { err })
})
},
Buffer.from('')
)
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', (code: number, reason: Buffer) => {
handleClose(null)
})
ws.on('error', (err) => {
doSessionOp(
webSocketData,
(s) => {
ctx.error('error', { err, user: s.session.getUser() })
if (!(s.session.workspaceClosed ?? false)) {
// remove session after 1seconds, give a time to reconnect.
void sessions.close(ctx, cs, token.workspace)
}
},
Buffer.from('')
)
handleClose(err)
})
}
wss.on('connection', handleConnection as any)

View File

@ -75,7 +75,8 @@ async function createMessages (
for (const data of result) {
void api.event(
{
account: systemAccount
// TODO: Fix me, Undetermined role is missing in communication API
account: systemAccount as any
},
{
type: MessageRequestEventType.CreateMessage,

View File

@ -410,7 +410,10 @@ async function updateCollaborators (control: TriggerControl, ctx: TxCreateDoc<Ca
if (collaborators.length === 0) continue
void communicationApi.event(
{ account: systemAccount },
{
// TODO: Fix me, Undetermined role is missing in communication API
account: systemAccount as any
},
{
type: NotificationRequestEventType.AddCollaborators,
card: tx.objectId,

View File

@ -14,19 +14,9 @@
//
import {
type ServerApi as CommunicationApi,
type RequestEvent as CommunicationEvent,
type EventResult
type ServerApi as CommunicationApi
} from '@hcengineering/communication-sdk-types'
import {
type FindMessagesGroupsParams,
type FindMessagesParams,
type Message,
type MessagesGroup
} from '@hcengineering/communication-types'
import {
type Account,
type AccountUuid,
type Branding,
type Class,
type Doc,
@ -46,7 +36,6 @@ import {
type SearchQuery,
type SearchResult,
type SessionData,
type SocialId,
type Space,
type Timestamp,
type Tx,
@ -57,14 +46,12 @@ import {
} from '@hcengineering/core'
import type { Asset, Resource } from '@hcengineering/platform'
import type { LiveQuery } from '@hcengineering/query'
import type { RateLimitInfo, ReqId, Request, Response } from '@hcengineering/rpc'
import type { Token } from '@hcengineering/server-token'
import type { Request, Response } from '@hcengineering/rpc'
import { type Readable } from 'stream'
import type { DbAdapter, DomainHelper } from './adapter'
import type { StatisticsElement, WorkspaceStatistics } from './stats'
import { type PlatformQueue, type PlatformQueueProducer, type QueueTopic } from './queue'
import { type StorageAdapter } from './storage'
import { type PlatformQueueProducer, type QueueTopic, type PlatformQueue } from './queue'
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
domain?: Domain // Allow to find for Doc's in specified domain only.
@ -540,99 +527,8 @@ export interface SessionRequest {
id: string
params: any
start: number
}
export interface ClientSessionCtx {
ctx: MeasureContext
pipeline: Pipeline
communicationApi: CommunicationApi
socialStringsToUsers: Map<PersonId, AccountUuid>
requestId: ReqId | undefined
sendResponse: (id: ReqId | undefined, msg: any) => Promise<void>
sendPong: () => void
sendError: (id: ReqId | undefined, msg: any, error: any) => Promise<void>
}
/**
* @public
*/
export interface Session {
workspace: WorkspaceIds
createTime: number
// Session restore information
sessionId: string
sessionInstanceId?: string
workspaceClosed?: boolean
requests: Map<string, SessionRequest>
binaryMode: boolean
useCompression: boolean
total: StatisticsElement
current: StatisticsElement
mins5: StatisticsElement
lastRequest: number
lastPing: number
isUpgradeClient: () => boolean
getMode: () => string
broadcast: (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]) => void
// Client methods
ping: (ctx: ClientSessionCtx) => Promise<void>
getUser: () => AccountUuid
getUserSocialIds: () => PersonId[]
getSocialIds: () => SocialId[]
loadModel: (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string) => Promise<void>
loadModelRaw: (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string) => Promise<LoadModelResponse | Tx[]>
getRawAccount: () => Account
findAll: <T extends Doc>(
ctx: ClientSessionCtx,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<void>
findAllRaw: <T extends Doc>(
ctx: ClientSessionCtx,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
searchFulltext: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise<void>
searchFulltextRaw: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise<SearchResult>
tx: (ctx: ClientSessionCtx, tx: Tx) => Promise<void>
txRaw: (
ctx: ClientSessionCtx,
tx: Tx
) => Promise<{
result: TxResult
broadcastPromise: Promise<void>
asyncsPromise: Promise<void> | undefined
}>
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise<void>
getDomainHash: (ctx: ClientSessionCtx, domain: Domain) => Promise<void>
closeChunk: (ctx: ClientSessionCtx, idx: number) => Promise<void>
loadDocs: (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]) => Promise<void>
upload: (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]) => Promise<void>
clean: (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]) => Promise<void>
includeSessionContext: (ctx: ClientSessionCtx) => void
eventRaw: (ctx: ClientSessionCtx, event: CommunicationEvent) => Promise<EventResult>
findMessagesRaw: (ctx: ClientSessionCtx, params: FindMessagesParams) => Promise<Message[]>
findMessagesGroupsRaw: (ctx: ClientSessionCtx, params: FindMessagesGroupsParams) => Promise<MessagesGroup[]>
workspaceId?: WorkspaceUuid
}
/**
@ -666,76 +562,5 @@ export function disableLogging (): void {
LOGGING_ENABLED = false
}
export interface AddSessionActive {
session: Session
context: MeasureContext
workspaceId: WorkspaceUuid
}
export type GetWorkspaceResponse =
| { upgrade: true, progress?: number }
| { error: any, terminate?: boolean, specialError?: 'archived' | 'migration' }
export type AddSessionResponse = AddSessionActive | GetWorkspaceResponse
/**
* @public
*/
export interface SessionManager {
// workspaces: Map<WorkspaceUuid, Workspace>
sessions: Map<string, { session: Session, socket: ConnectionSocket }>
addSession: (
ctx: MeasureContext,
ws: ConnectionSocket,
token: Token,
rawToken: string,
sessionId: string | undefined
) => Promise<AddSessionResponse>
broadcastAll: (workspace: WorkspaceUuid, tx: Tx[], targets?: string[]) => void
close: (ctx: MeasureContext, ws: ConnectionSocket, workspaceId: WorkspaceUuid) => Promise<void>
forceClose: (wsId: WorkspaceUuid, ignoreSocket?: ConnectionSocket) => Promise<void>
closeWorkspaces: (ctx: MeasureContext) => Promise<void>
scheduleMaintenance: (timeMinutes: number) => void
profiling?: {
start: () => void
stop: () => Promise<string | undefined>
}
handleRequest: <S extends Session>(
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
request: Request<any>,
workspace: WorkspaceUuid
) => Promise<void>
handleRPC: <S extends Session>(
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
operation: (ctx: ClientSessionCtx, rateLimit?: RateLimitInfo) => Promise<void>
) => Promise<RateLimitInfo | undefined>
createOpContext: (
ctx: MeasureContext,
sendCtx: MeasureContext,
pipeline: Pipeline,
communicationApi: CommunicationApi,
requestId: Request<any>['id'],
service: Session,
ws: ConnectionSocket,
rateLimit?: RateLimitInfo
) => ClientSessionCtx
getStatistics: () => WorkspaceStatistics[]
}
export const pingConst = 'ping'
export const pongConst = 'pong!'

View File

@ -27,7 +27,8 @@ import core, {
type Tx,
type TxResult,
type TxWorkspaceEvent,
type WorkspaceIds
type WorkspaceIds,
type WorkspaceUuid
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { createHash, type Hash } from 'crypto'
@ -284,19 +285,23 @@ export function wrapAdapterToClient (ctx: MeasureContext, storageAdapter: DbAdap
class TestClientConnection implements ClientConnection {
isConnected = (): boolean => true
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
handler?: (event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>
set onConnect (
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
handler: ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined
) {
this.handler = handler
void this.handler?.(ClientConnectEvent.Connected, '', {})
void this.handler?.(ClientConnectEvent.Connected, {}, {})
}
get onConnect (): ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined {
get onConnect (): ((event: ClientConnectEvent, lastTx: Record<WorkspaceUuid, string | undefined> | undefined, data: any) => Promise<void>) | undefined {
return this.handler
}
getAccount (): Promise<Account> {
throw new Error('Method not implemented.')
}
pushHandler (): void {}
async findAll<T extends Doc>(

View File

@ -24,7 +24,6 @@ export * from './domainFind'
export * from './domainTx'
export * from './fulltext'
export * from './liveQuery'
export * from './lookup'
export * from './lowLevel'
export * from './model'
export * from './modified'

View File

@ -1,118 +0,0 @@
//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
MeasureContext,
Ref,
clone,
toFindResult
} from '@hcengineering/core'
import { BaseMiddleware, Middleware, type PipelineContext } from '@hcengineering/server-core'
/**
* @public
*/
export class LookupMiddleware extends BaseMiddleware implements Middleware {
private constructor (context: PipelineContext, next?: Middleware) {
super(context, next)
}
static async create (
ctx: MeasureContext,
context: PipelineContext,
next: Middleware | undefined
): Promise<LookupMiddleware> {
return new LookupMiddleware(context, next)
}
override async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const result = await this.provideFindAll(ctx, _class, query, options)
// Fill lookup map to make more compact representation
if (options?.lookup !== undefined) {
const newResult: T[] = []
let counter = 0
const idClassMap: Record<string, { id: number, doc: Doc, count: number }> = {}
function mapDoc (doc: Doc): number {
const key = doc._class + '@' + doc._id
let docRef = idClassMap[key]
if (docRef === undefined) {
docRef = { id: ++counter, doc, count: -1 }
idClassMap[key] = docRef
}
docRef.count++
return docRef.id
}
for (const d of result) {
const newDoc: any = { ...d }
if (d.$lookup !== undefined) {
newDoc.$lookup = clone(d.$lookup)
newResult.push(newDoc)
for (const [k, v] of Object.entries(d.$lookup)) {
if (!Array.isArray(v)) {
newDoc.$lookup[k] = v != null ? mapDoc(v) : v
} else {
newDoc.$lookup[k] = v.map((it) => (it != null ? mapDoc(it) : it))
}
}
} else {
newResult.push(newDoc)
}
}
const lookupMap = Object.fromEntries(Array.from(Object.values(idClassMap)).map((it) => [it.id, it.doc]))
return this.cleanQuery<T>(toFindResult(newResult, result.total, lookupMap), query, lookupMap)
}
// We need to get rid of simple query parameters matched in documents
return this.cleanQuery<T>(result, query)
}
private cleanQuery<T extends Doc>(
result: FindResult<T>,
query: DocumentQuery<T>,
lookupMap?: Record<string, Doc>
): FindResult<T> {
const newResult: T[] = []
for (const doc of result) {
let _doc = doc
let cloned = false
for (const [k, v] of Object.entries(query)) {
if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {
if ((_doc as any)[k] === v) {
if (!cloned) {
_doc = { ...doc } as any
cloned = true
}
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete (_doc as any)[k]
}
}
}
newResult.push(_doc)
}
return toFindResult(newResult, result.total, lookupMap)
}
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import type { Account } from '@hcengineering/core'
import type { Account, WorkspaceUuid } from '@hcengineering/core'
import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
import { Packr } from 'msgpackr'
@ -47,8 +47,8 @@ export interface HelloResponse extends Response<any> {
binary: boolean
reconnect?: boolean
serverVersion: string
lastTx?: string
lastHash?: string // Last model hash
lastTx?: Record<WorkspaceUuid, string | undefined>
lastHash?: Record<WorkspaceUuid, string | undefined> // Last model hash
account: Account
useCompression?: boolean
}

View File

@ -25,7 +25,6 @@ import {
FullTextMiddleware,
IdentityMiddleware,
LiveQueryMiddleware,
LookupMiddleware,
LowLevelMiddleware,
MarkDerivedEntryMiddleware,
ModelMiddleware,
@ -116,7 +115,6 @@ export function createServerPipeline (
const conf = getConfig(metrics, dbUrl, wsMetrics, opt, extensions)
const middlewares: MiddlewareCreator[] = [
LookupMiddleware.create,
IdentityMiddleware.create,
ModifiedMiddleware.create,
PluginConfigurationMiddleware.create,

View File

@ -31,6 +31,7 @@ import {
import {
AccountUuid,
generateId,
toFindResult,
TxProcessor,
type Account,
type Class,
@ -60,14 +61,18 @@ import {
BackupClientOps,
createBroadcastEvent,
SessionDataImpl,
type ClientSessionCtx,
type ConnectionSocket,
type Pipeline,
type Session,
type SessionRequest,
type StatisticsElement
} from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import { mapLookup } from './lookup'
import {
type ClientSessionCtx,
type Session
} from './types'
const useReserveContext = (process.env.USE_RESERVE_CTX ?? 'true') === 'true'
@ -92,10 +97,13 @@ export class ClientSession implements Session {
ops: BackupClientOps | undefined
opsPipeline: Pipeline | undefined
isAdmin: boolean
workspaceClosed = false
constructor (
readonly ctx: MeasureContext,
protected readonly token: Token,
readonly workspace: WorkspaceIds,
readonly socket: ConnectionSocket,
readonly workspaces: { wsIds: WorkspaceIds, enabled: boolean }[],
readonly account: Account,
readonly info: LoginInfoWithWorkspaces,
readonly allowUpload: boolean
@ -133,42 +141,50 @@ export class ClientSession implements Session {
}
async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> {
try {
this.includeSessionContext(ctx)
const result = await ctx.ctx.with('load-model', {}, () => ctx.pipeline.loadModel(ctx.ctx, lastModelTx, hash))
await ctx.sendResponse(ctx.requestId, result)
} catch (err) {
await ctx.sendError(ctx.requestId, 'Failed to loadModel', unknownError(err))
ctx.ctx.error('failed to loadModel', { err })
}
// TODO: Model is from first workspace for now.
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
try {
this.includeSessionContext(ctx, pipeline)
const result = await ctx.ctx.with('load-model', {}, () => pipeline.loadModel(ctx.ctx, lastModelTx, hash))
await ctx.sendResponse(ctx.requestId, result)
} catch (err) {
await ctx.sendError(ctx.requestId, 'Failed to loadModel', unknownError(err))
ctx.ctx.error('failed to loadModel', { err })
}
})
}
async loadModelRaw (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<LoadModelResponse | Tx[]> {
this.includeSessionContext(ctx)
return await ctx.ctx.with('load-model', {}, (_ctx) => ctx.pipeline.loadModel(_ctx, lastModelTx, hash))
// TODO: Model is from first workspace for now.
const workspace = ctx.workspaces[0]
return await workspace.with(async (pipeline, communicationApi) => {
this.includeSessionContext(ctx, pipeline)
return await ctx.ctx.with('load-model', {}, (_ctx) => pipeline.loadModel(_ctx, lastModelTx, hash))
})
}
includeSessionContext (ctx: ClientSessionCtx): void {
const dataId = this.workspace.dataId ?? (this.workspace.uuid as unknown as WorkspaceDataId)
includeSessionContext (ctx: ClientSessionCtx, pipeline: Pipeline): void {
const dataId = pipeline.context.workspace.dataId ?? (pipeline.context.workspace.uuid as unknown as WorkspaceDataId)
const contextData = new SessionDataImpl(
this.account,
this.sessionId,
this.isAdmin,
undefined,
{
...this.workspace,
...pipeline.context.workspace,
dataId
},
false,
undefined,
undefined,
ctx.pipeline.context.modelDb,
pipeline.context.modelDb,
ctx.socialStringsToUsers
)
ctx.ctx.contextData = contextData
}
findAllRaw<T extends Doc>(
async findAllRaw<T extends Doc>(
ctx: ClientSessionCtx,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
@ -177,8 +193,23 @@ export class ClientSession implements Session {
this.lastRequest = Date.now()
this.total.find++
this.current.find++
this.includeSessionContext(ctx)
return ctx.pipeline.findAll(ctx.ctx, _class, query, options)
const result: FindResult<T> = toFindResult([], -1)
for (const workspace of ctx.workspaces) {
await workspace.with(async (pipeline, communicationApi) => {
this.includeSessionContext(ctx, pipeline)
const part = await pipeline.findAll(ctx.ctx, _class, query, options)
result.push(...part)
if (part.total !== -1) {
if (result.total === -1) {
result.total = 0
}
result.total += part.total
}
})
}
return result
}
async findAll<T extends Doc>(
@ -188,7 +219,8 @@ export class ClientSession implements Session {
options?: FindOptions<T>
): Promise<void> {
try {
await ctx.sendResponse(ctx.requestId, await this.findAllRaw(ctx, _class, query, options))
const result = await this.findAllRaw(ctx, _class, query, options)
await ctx.sendResponse(ctx.requestId, mapLookup<T>(query, result, options))
} catch (err) {
await ctx.sendError(ctx.requestId, 'Failed to findAll', unknownError(err))
ctx.ctx.error('failed to findAll', { err })
@ -198,8 +230,11 @@ export class ClientSession implements Session {
async searchFulltext (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions): Promise<void> {
try {
this.lastRequest = Date.now()
this.includeSessionContext(ctx)
await ctx.sendResponse(ctx.requestId, await ctx.pipeline.searchFulltext(ctx.ctx, query, options))
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
this.includeSessionContext(ctx, pipeline)
await ctx.sendResponse(ctx.requestId, await pipeline.searchFulltext(ctx.ctx, query, options))
})
} catch (err) {
await ctx.sendError(ctx.requestId, 'Failed to searchFulltext', unknownError(err))
ctx.ctx.error('failed to searchFulltext', { err })
@ -208,8 +243,11 @@ export class ClientSession implements Session {
async searchFulltextRaw (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
this.lastRequest = Date.now()
this.includeSessionContext(ctx)
return await ctx.pipeline.searchFulltext(ctx.ctx, query, options)
const workspace = ctx.workspaces[0]
return await workspace.with(async (pipeline, communicationApi) => {
this.includeSessionContext(ctx, pipeline)
return await pipeline.searchFulltext(ctx.ctx, query, options)
})
}
async txRaw (
@ -223,43 +261,46 @@ export class ClientSession implements Session {
this.lastRequest = Date.now()
this.total.tx++
this.current.tx++
this.includeSessionContext(ctx)
const workspace = ctx.workspaces[0]
return await workspace.with(async (pipeline, communicationApi) => {
this.includeSessionContext(ctx, pipeline)
let cid = 'client_' + generateId()
ctx.ctx.id = cid
let onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
let result: TxResult
try {
result = await ctx.pipeline.tx(ctx.ctx, [tx])
} finally {
onEnd?.()
}
// Send result immideately
await ctx.sendResponse(ctx.requestId, result)
// We need to broadcast all collected transactions
const broadcastPromise = ctx.pipeline.handleBroadcast(ctx.ctx)
// ok we could perform async requests if any
const asyncs = (ctx.ctx.contextData as SessionData).asyncRequests ?? []
let asyncsPromise: Promise<void> | undefined
if (asyncs.length > 0) {
cid = 'client_async_' + generateId()
let cid = 'client_' + generateId()
ctx.ctx.id = cid
onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
const handleAyncs = async (): Promise<void> => {
try {
for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) {
await r()
}
} finally {
onEnd?.()
}
let onEnd = useReserveContext ? pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
let result: TxResult
try {
result = await pipeline.tx(ctx.ctx, [tx])
} finally {
onEnd?.()
}
asyncsPromise = handleAyncs()
}
// Send result immideately
await ctx.sendResponse(ctx.requestId, result)
return { result, broadcastPromise, asyncsPromise }
// We need to broadcast all collected transactions
const broadcastPromise = pipeline.handleBroadcast(ctx.ctx)
// ok we could perform async requests if any
const asyncs = (ctx.ctx.contextData as SessionData).asyncRequests ?? []
let asyncsPromise: Promise<void> | undefined
if (asyncs.length > 0) {
cid = 'client_async_' + generateId()
ctx.ctx.id = cid
onEnd = useReserveContext ? pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
const handleAyncs = async (): Promise<void> => {
try {
for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) {
await r()
}
} finally {
onEnd?.()
}
}
asyncsPromise = handleAyncs()
}
return { result, broadcastPromise, asyncsPromise }
})
}
async tx (ctx: ClientSessionCtx, tx: Tx): Promise<void> {
@ -315,8 +356,11 @@ export class ClientSession implements Session {
async loadChunk (ctx: ClientSessionCtx, domain: Domain, idx?: number): Promise<void> {
this.lastRequest = Date.now()
try {
const result = await this.getOps(ctx.pipeline).loadChunk(ctx.ctx, domain, idx)
await ctx.sendResponse(ctx.requestId, result)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
const result = await this.getOps(pipeline).loadChunk(ctx.ctx, domain, idx)
await ctx.sendResponse(ctx.requestId, result)
})
} catch (err: any) {
await ctx.sendError(ctx.requestId, 'Failed to upload', unknownError(err))
ctx.ctx.error('failed to loadChunk', { domain, err })
@ -326,8 +370,11 @@ export class ClientSession implements Session {
async getDomainHash (ctx: ClientSessionCtx, domain: Domain): Promise<void> {
this.lastRequest = Date.now()
try {
const result = await this.getOps(ctx.pipeline).getDomainHash(ctx.ctx, domain)
await ctx.sendResponse(ctx.requestId, result)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
const result = await this.getOps(pipeline).getDomainHash(ctx.ctx, domain)
await ctx.sendResponse(ctx.requestId, result)
})
} catch (err: any) {
await ctx.sendError(ctx.requestId, 'Failed to upload', unknownError(err))
ctx.ctx.error('failed to getDomainHash', { domain, err })
@ -337,8 +384,11 @@ export class ClientSession implements Session {
async closeChunk (ctx: ClientSessionCtx, idx: number): Promise<void> {
try {
this.lastRequest = Date.now()
await this.getOps(ctx.pipeline).closeChunk(ctx.ctx, idx)
await ctx.sendResponse(ctx.requestId, {})
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
await this.getOps(pipeline).closeChunk(ctx.ctx, idx)
await ctx.sendResponse(ctx.requestId, {})
})
} catch (err: any) {
await ctx.sendError(ctx.requestId, 'Failed to closeChunk', unknownError(err))
ctx.ctx.error('failed to closeChunk', { err })
@ -348,8 +398,11 @@ export class ClientSession implements Session {
async loadDocs (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
this.lastRequest = Date.now()
try {
const result = await this.getOps(ctx.pipeline).loadDocs(ctx.ctx, domain, docs)
await ctx.sendResponse(ctx.requestId, result)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
const result = await this.getOps(pipeline).loadDocs(ctx.ctx, domain, docs)
await ctx.sendResponse(ctx.requestId, result)
})
} catch (err: any) {
await ctx.sendError(ctx.requestId, 'Failed to loadDocs', unknownError(err))
ctx.ctx.error('failed to loadDocs', { domain, err })
@ -362,7 +415,10 @@ export class ClientSession implements Session {
}
this.lastRequest = Date.now()
try {
await this.getOps(ctx.pipeline).upload(ctx.ctx, domain, docs)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
await this.getOps(pipeline).upload(ctx.ctx, domain, docs)
})
} catch (err: any) {
await ctx.sendError(ctx.requestId, 'Failed to upload', unknownError(err))
ctx.ctx.error('failed to loadDocs', { domain, err })
@ -377,7 +433,10 @@ export class ClientSession implements Session {
}
this.lastRequest = Date.now()
try {
await this.getOps(ctx.pipeline).clean(ctx.ctx, domain, docs)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
await this.getOps(pipeline).clean(ctx.ctx, domain, docs)
})
} catch (err: any) {
await ctx.sendError(ctx.requestId, 'Failed to clean', unknownError(err))
ctx.ctx.error('failed to clean', { domain, err })
@ -388,7 +447,10 @@ export class ClientSession implements Session {
async eventRaw (ctx: ClientSessionCtx, event: CommunicationEvent): Promise<EventResult> {
this.lastRequest = Date.now()
return await ctx.communicationApi.event(this.getCommunicationCtx(), event)
const workspace = ctx.workspaces[0]
return await workspace.with(async (pipeline, communicationApi) => {
return await communicationApi.event(this.getCommunicationCtx(workspace.wsId), event)
})
}
async event (ctx: ClientSessionCtx, event: CommunicationEvent): Promise<void> {
@ -398,7 +460,10 @@ export class ClientSession implements Session {
async findMessagesRaw (ctx: ClientSessionCtx, params: FindMessagesParams, queryId?: number): Promise<Message[]> {
this.lastRequest = Date.now()
return await ctx.communicationApi.findMessages(this.getCommunicationCtx(), params, queryId)
const workspace = ctx.workspaces[0]
return await workspace.with(async (pipeline, communicationApi) => {
return await communicationApi.findMessages(this.getCommunicationCtx(workspace.wsId), params, queryId)
})
}
async findMessages (ctx: ClientSessionCtx, params: FindMessagesParams, queryId?: number): Promise<void> {
@ -408,7 +473,10 @@ export class ClientSession implements Session {
async findMessagesGroupsRaw (ctx: ClientSessionCtx, params: FindMessagesGroupsParams): Promise<MessagesGroup[]> {
this.lastRequest = Date.now()
return await ctx.communicationApi.findMessagesGroups(this.getCommunicationCtx(), params)
const workspace = ctx.workspaces[0]
return await workspace.with(async (pipeline, communicationApi) => {
return await communicationApi.findMessagesGroups(this.getCommunicationCtx(workspace.wsId), params)
})
}
async findMessagesGroups (ctx: ClientSessionCtx, params: FindMessagesGroupsParams): Promise<void> {
@ -417,8 +485,11 @@ export class ClientSession implements Session {
}
async findNotifications (ctx: ClientSessionCtx, params: FindNotificationsParams): Promise<void> {
const result = await ctx.communicationApi.findNotifications(this.getCommunicationCtx(), params)
await ctx.sendResponse(ctx.requestId, result)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
const result = await communicationApi.findNotifications(this.getCommunicationCtx(workspace.wsId), params)
await ctx.sendResponse(ctx.requestId, result)
})
}
async findNotificationContexts (
@ -426,25 +497,42 @@ export class ClientSession implements Session {
params: FindNotificationContextParams,
queryId?: number
): Promise<void> {
const result = await ctx.communicationApi.findNotificationContexts(this.getCommunicationCtx(), params, queryId)
await ctx.sendResponse(ctx.requestId, result)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
const result = await communicationApi.findNotificationContexts(
this.getCommunicationCtx(workspace.wsId),
params,
queryId
)
await ctx.sendResponse(ctx.requestId, result)
})
}
async findLabels (ctx: ClientSessionCtx, params: FindLabelsParams): Promise<void> {
const result = await ctx.communicationApi.findLabels(this.getCommunicationCtx(), params)
await ctx.sendResponse(ctx.requestId, result)
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
const result = await communicationApi.findLabels(this.getCommunicationCtx(workspace.wsId), params)
await ctx.sendResponse(ctx.requestId, result)
})
}
async unsubscribeQuery (ctx: ClientSessionCtx, id: number): Promise<void> {
this.lastRequest = Date.now()
await ctx.communicationApi.unsubscribeQuery(this.getCommunicationCtx(), id)
await ctx.sendResponse(ctx.requestId, {})
const workspace = ctx.workspaces[0]
await workspace.with(async (pipeline, communicationApi) => {
await communicationApi.unsubscribeQuery(this.getCommunicationCtx(workspace.wsId), id)
await ctx.sendResponse(ctx.requestId, {})
})
}
private getCommunicationCtx (): CommunicationSession {
private getCommunicationCtx (workspaceId: WorkspaceIds): CommunicationSession {
return {
sessionId: this.sessionId,
account: this.account
account: {
...this.account,
// TODO: Fix me, Undetermined role is missing in communication API
role: (this.account.roles[workspaceId.uuid] ?? this.account.role) as any
}
}
}
}

View File

@ -20,3 +20,4 @@ export * from './sessionManager'
export * from './starter'
export * from './stats'
export * from './utils'
export * from './types'

View File

@ -0,0 +1,98 @@
//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
Doc,
DocumentQuery,
FindOptions,
FindResult,
clone,
toFindResult
} from '@hcengineering/core'
/**
* @public
*/
export function mapLookup<T extends Doc> (
query: DocumentQuery<T>,
result: FindResult<T>,
options?: FindOptions<T>
): FindResult<T> {
// Fill lookup map to make more compact representation
if (options?.lookup !== undefined) {
const newResult: T[] = []
let counter = 0
const idClassMap: Record<string, { id: number, doc: Doc, count: number }> = {}
function mapDoc (doc: Doc): number {
const key = doc._class + '@' + doc._id
let docRef = idClassMap[key]
if (docRef === undefined) {
docRef = { id: ++counter, doc, count: -1 }
idClassMap[key] = docRef
}
docRef.count++
return docRef.id
}
for (const d of result) {
const newDoc: any = { ...d }
if (d.$lookup !== undefined) {
newDoc.$lookup = clone(d.$lookup)
newResult.push(newDoc)
for (const [k, v] of Object.entries(d.$lookup)) {
if (!Array.isArray(v)) {
newDoc.$lookup[k] = v != null ? mapDoc(v) : v
} else {
newDoc.$lookup[k] = v.map((it) => (it != null ? mapDoc(it) : it))
}
}
} else {
newResult.push(newDoc)
}
}
const lookupMap = Object.fromEntries(Array.from(Object.values(idClassMap)).map((it) => [it.id, it.doc]))
return cleanQuery<T>(toFindResult(newResult, result.total, lookupMap), query, lookupMap)
}
// We need to get rid of simple query parameters matched in documents
return cleanQuery<T>(result, query)
}
function cleanQuery<T extends Doc> (
result: FindResult<T>,
query: DocumentQuery<T>,
lookupMap?: Record<string, Doc>
): FindResult<T> {
const newResult: T[] = []
for (const doc of result) {
let _doc = doc
let cloned = false
for (const [k, v] of Object.entries(query)) {
if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {
if ((_doc as any)[k] === v) {
if (!cloned) {
_doc = { ...doc } as any
cloned = true
}
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete (_doc as any)[k]
}
}
}
newResult.push(_doc)
}
return toFindResult(newResult, result.total, lookupMap)
}

File diff suppressed because it is too large Load Diff

View File

@ -5,8 +5,8 @@ import {
metricsAggregate,
type MetricsData
} from '@hcengineering/core'
import { type SessionManager } from '@hcengineering/server-core'
import os from 'node:os'
import { type SessionManager } from './types'
/**
* @public

233
server/server/src/types.ts Normal file
View File

@ -0,0 +1,233 @@
//
// Copyright © 2022, 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
type RequestEvent as CommunicationEvent,
type EventResult
} from '@hcengineering/communication-sdk-types'
import {
type FindMessagesGroupsParams,
type FindMessagesParams,
type Message,
type MessagesGroup
} from '@hcengineering/communication-types'
import {
type Account,
type AccountUuid,
type Class,
type Doc,
type DocumentQuery,
type Domain,
type FindOptions,
type FindResult,
type LoadModelResponse,
type MeasureContext,
type PersonId,
type Ref,
type SearchOptions,
type SearchQuery,
type SearchResult,
type SocialId,
type Timestamp,
type Tx,
type TxResult,
type WorkspaceIds,
type WorkspaceUuid
} from '@hcengineering/core'
import type { RateLimitInfo, ReqId, Request, Response } from '@hcengineering/rpc'
import type { Token } from '@hcengineering/server-token'
import type { StatisticsElement, WorkspaceStatistics } from '@hcengineering/server-core'
import type { Workspace } from './workspace'
/**
* @public
*/
export interface SessionRequest {
id: string
params: any
start: number
workspaceId?: WorkspaceUuid
}
export interface ClientSessionCtx {
ctx: MeasureContext
workspaces: Workspace[]
socialStringsToUsers: Map<PersonId, AccountUuid>
requestId: ReqId | undefined
sendResponse: (id: ReqId | undefined, msg: any) => Promise<void>
sendPong: () => void
sendError: (id: ReqId | undefined, msg: any, error: any) => Promise<void>
}
/**
* @public
*/
export interface Session {
workspaces: { wsIds: WorkspaceIds, enabled: boolean }[]
socket: ConnectionSocket
createTime: number
// Session restore information
sessionId: string
requests: Map<string, SessionRequest>
binaryMode: boolean
useCompression: boolean
total: StatisticsElement
current: StatisticsElement
mins5: StatisticsElement
lastRequest: number
lastPing: number
isUpgradeClient: () => boolean
getMode: () => string
broadcast: (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]) => void
// Client methods
ping: (ctx: ClientSessionCtx) => Promise<void>
getUser: () => AccountUuid
getUserSocialIds: () => PersonId[]
getSocialIds: () => SocialId[]
loadModel: (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string) => Promise<void>
loadModelRaw: (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string) => Promise<LoadModelResponse | Tx[]>
getRawAccount: () => Account
findAll: <T extends Doc>(
ctx: ClientSessionCtx,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<void>
findAllRaw: <T extends Doc>(
ctx: ClientSessionCtx,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
searchFulltext: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise<void>
searchFulltextRaw: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise<SearchResult>
tx: (ctx: ClientSessionCtx, tx: Tx) => Promise<void>
txRaw: (
ctx: ClientSessionCtx,
tx: Tx
) => Promise<{
result: TxResult
broadcastPromise: Promise<void>
asyncsPromise: Promise<void> | undefined
}>
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise<void>
getDomainHash: (ctx: ClientSessionCtx, domain: Domain) => Promise<void>
closeChunk: (ctx: ClientSessionCtx, idx: number) => Promise<void>
loadDocs: (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]) => Promise<void>
upload: (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]) => Promise<void>
clean: (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]) => Promise<void>
eventRaw: (ctx: ClientSessionCtx, event: CommunicationEvent) => Promise<EventResult>
findMessagesRaw: (ctx: ClientSessionCtx, params: FindMessagesParams) => Promise<Message[]>
findMessagesGroupsRaw: (ctx: ClientSessionCtx, params: FindMessagesGroupsParams) => Promise<MessagesGroup[]>
}
/**
* @public
*/
export interface ConnectionSocket {
id: string
isClosed: boolean
close: () => void
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => Promise<void>
sendPong: () => void
data: () => Record<string, any>
readRequest: (buffer: Buffer, binary: boolean) => Request<any>
isBackpressure: () => boolean // In bytes
backpressure: (ctx: MeasureContext) => Promise<void>
checkState: () => boolean
}
export interface SessionInfoRecord { session: Session, socket: ConnectionSocket, tickHash: number }
/**
* @public
*/
export interface SessionManager {
// workspaces: Map<WorkspaceUuid, Workspace>
sessions: Map<string, SessionInfoRecord>
addSession: (
ctx: MeasureContext,
ws: ConnectionSocket,
token: Token,
rawToken: string,
sessionId: string | undefined
) => Promise<Session>
broadcastAll: (workspace: WorkspaceUuid, tx: Tx[], targets?: string[]) => void
close: (ctx: MeasureContext, sessionRef: Session) => Promise<void>
closeWorkspaces: (ctx: MeasureContext) => Promise<void>
scheduleMaintenance: (timeMinutes: number) => void
profiling?: {
start: () => void
stop: () => Promise<string | undefined>
}
handleRequest: (
requestCtx: MeasureContext,
service: Session,
ws: ConnectionSocket,
request: Request<any>
) => Promise<void>
handleRPC: (
requestCtx: MeasureContext,
workspaceId: WorkspaceUuid,
service: Session,
ws: ConnectionSocket,
operation: (ctx: ClientSessionCtx, rateLimit?: RateLimitInfo | undefined) => Promise<void>
) => Promise<RateLimitInfo | undefined>
createOpContext: (
ctx: MeasureContext,
sendCtx: MeasureContext,
workspaces: Workspace[],
requestId: Request<any>['id'],
service: Session,
ws: ConnectionSocket,
rateLimit: RateLimitInfo | undefined
) => ClientSessionCtx
getStatistics: () => WorkspaceStatistics[]
forceCloseWorkspace: (ctx: MeasureContext, workspace: WorkspaceUuid) => Promise<void>
getLastTxHash: (workspaceId: WorkspaceUuid) => Promise<{ lastTx: string | undefined, lastHash: string | undefined }>
}

View File

@ -13,64 +13,47 @@
// limitations under the License.
//
import { WorkspaceUuid, type MeasureContext } from '@hcengineering/core'
import { type MeasureContext, type WorkspaceUuid } from '@hcengineering/core'
import type {
AddSessionActive,
AddSessionResponse,
ConnectionSocket,
Session,
SessionManager
} from '@hcengineering/server-core'
import type { ConnectionSocket } from '@hcengineering/server-core'
import { type Response } from '@hcengineering/rpc'
import type { Token } from '@hcengineering/server-token'
import type { Session, SessionManager } from './types'
import type { Workspace } from './workspace'
export interface WebsocketData {
connectionSocket?: ConnectionSocket
payload: Token
token: string
session: Promise<AddSessionResponse> | AddSessionResponse | undefined
session: Promise<Session> | Session | undefined
url: string
}
export function doSessionOp (
data: WebsocketData,
op: (session: AddSessionActive, msg: Buffer) => void,
msg: Buffer
): void {
export function doSessionOp (data: WebsocketData, op: (session: Session, msg: Buffer) => void, msg: Buffer): void {
if (data.session instanceof Promise) {
// We need to copy since we will out of protected buffer area
const msgCopy = Buffer.copyBytesFrom(new Uint8Array(msg))
void data.session
.then((_session) => {
data.session = _session
if ('session' in _session) {
op(_session, msgCopy)
}
op(data.session, msgCopy)
})
.catch((err) => {
console.error({ message: 'Failed to process session operation', err })
})
} else {
if (data.session !== undefined && 'session' in data.session) {
if (data.session !== undefined) {
op(data.session, msg)
}
}
}
export function processRequest (
session: Session,
cs: ConnectionSocket,
context: MeasureContext,
workspaceId: WorkspaceUuid,
buff: any,
sessions: SessionManager
): void {
export function processRequest (ctx: MeasureContext, session: Session, cs: ConnectionSocket, buff: any, sessions: SessionManager): void {
try {
const request = cs.readRequest(buff, session.binaryMode)
void sessions.handleRequest(context, session, cs, request, workspaceId).catch((err) => {
context.error('failed to handle request', { err, request })
void sessions.handleRequest(ctx, session, cs, request).catch((err) => {
ctx.error('failed to handle request', { err })
})
} catch (err: any) {
if (((err.message as string) ?? '').includes('Data read, but end of buffer not reached')) {
@ -89,3 +72,19 @@ export function sendResponse (
): Promise<void> {
return socket.send(ctx, resp, session.binaryMode, session.useCompression)
}
export function getLastHashInfo (workspaces: Workspace[]): {
lastTx: Record<WorkspaceUuid, string | undefined>
lastHash: Record<WorkspaceUuid, string | undefined>
} {
const lastTx: Record<WorkspaceUuid, string | undefined> = {}
for (const workspace of workspaces) {
lastTx[workspace.wsId.uuid] = workspace.getLastTx()
}
const lastHash: Record<WorkspaceUuid, string | undefined> = {}
for (const workspace of workspaces) {
lastHash[workspace.wsId.uuid] = workspace.getLastHash()
}
return { lastTx, lastHash }
}

View File

@ -15,13 +15,9 @@
import { Analytics } from '@hcengineering/analytics'
import { type ServerApi as CommunicationApi } from '@hcengineering/communication-sdk-types'
import { type Branding, type MeasureContext, type WorkspaceIds } from '@hcengineering/core'
import type { ConnectionSocket, Pipeline, Session } from '@hcengineering/server-core'
interface TickHandler {
ticks: number
operation: () => void
}
import { isMigrationMode, isRestoringMode, isWorkspaceCreating, systemAccountUuid, type Branding, type Data, type MeasureContext, type Version, type WorkspaceIds, type WorkspaceInfoWithStatus } from '@hcengineering/core'
import type { ConnectionSocket, Pipeline } from '@hcengineering/server-core'
import type { Session } from './types'
export interface PipelinePair {
pipeline: Pipeline
@ -34,18 +30,18 @@ export type WorkspacePipelineFactory = () => Promise<PipelinePair>
*/
export class Workspace {
pipeline?: PipelinePair | Promise<PipelinePair>
upgrade: boolean = false
closing?: Promise<void>
workspaceInitCompleted: boolean = false
softShutdown: number
sessions = new Map<string, { session: Session, socket: ConnectionSocket, tickHash: number }>()
tickHandlers = new Map<string, TickHandler>()
operations: number = 0
maintenance: boolean = false
lastTx: string | undefined // TODO: Do not cache for proxy case
lastHash: string | undefined // TODO: Do not cache for proxy case
constructor (
readonly context: MeasureContext,
readonly token: string, // Account workspace update token.
@ -61,6 +57,24 @@ export class Workspace {
this.softShutdown = softShutdown
}
open (): void {
const pair = this.getPipelinePair()
if (pair instanceof Promise) {
void pair.then((it) => {
this.lastHash = it.pipeline.context.lastHash
this.lastTx = it.pipeline.context.lastTx
})
}
}
getLastTx (): string | undefined {
return this.lastTx
}
getLastHash (): string | undefined {
return this.lastHash
}
private getPipelinePair (): PipelinePair | Promise<PipelinePair> {
if (this.pipeline === undefined) {
this.pipeline = this.factory()
@ -76,7 +90,10 @@ export class Workspace {
this.pipeline = pair
}
try {
return await op(pair.pipeline, pair.communicationApi)
const result = await op(pair.pipeline, pair.communicationApi)
this.lastHash = pair.pipeline.context.lastHash
this.lastTx = pair.pipeline.context.lastTx
return result
} finally {
this.operations--
}
@ -115,8 +132,16 @@ export class Workspace {
to.cancelHandle()
})
}
}
checkHasUser (): boolean {
for (const val of this.sessions.values()) {
if (val.session.getUser() !== systemAccountUuid) {
return true
}
}
return false
}
}
function timeoutPromise (time: number): { promise: Promise<void>, cancelHandle: () => void } {
let timer: any
return {

View File

@ -31,6 +31,7 @@ import contact, {
import core, {
type Account,
AccountRole,
AccountUuid,
Blob,
Class,
Client,
@ -38,6 +39,7 @@ import core, {
MeasureContext,
PersonId,
PersonUuid,
pickPrimarySocialId,
RateLimiter,
Ref,
SocialId,
@ -45,30 +47,28 @@ import core, {
Tx,
TxCUD,
TxOperations,
type WorkspaceUuid,
type WorkspaceIds,
AccountUuid,
pickPrimarySocialId
type WorkspaceUuid
} from '@hcengineering/core'
import { Room } from '@hcengineering/love'
import { WorkspaceInfoRecord } from '@hcengineering/server-ai-bot'
import fs from 'fs'
import { Tiktoken } from 'js-tiktoken'
import { WithId } from 'mongodb'
import OpenAI from 'openai'
import { Tiktoken } from 'js-tiktoken'
import { countTokens } from '@hcengineering/openai'
import { getAccountClient } from '@hcengineering/server-client'
import { StorageAdapter } from '@hcengineering/server-core'
import { jsonToMarkup, markupToText } from '@hcengineering/text'
import { markdownToMarkup } from '@hcengineering/text-markdown'
import config from '../config'
import { DbStorage } from '../storage'
import { HistoryRecord } from '../types'
import { getGlobalPerson } from '../utils/account'
import { createChatCompletionWithTools, requestSummary } from '../utils/openai'
import { connectPlatform } from '../utils/platform'
import { LoveController } from './love'
import { DbStorage } from '../storage'
import { jsonToMarkup, markupToText } from '@hcengineering/text'
import { markdownToMarkup } from '@hcengineering/text-markdown'
import { countTokens } from '@hcengineering/openai'
import { getAccountClient } from '@hcengineering/server-client'
import { getGlobalPerson } from '../utils/account'
export class WorkspaceClient {
client: Client | undefined
@ -109,6 +109,8 @@ export class WorkspaceClient {
private async ensureEmployee (client: Client): Promise<void> {
const me: Account = {
uuid: this.personUuid,
targetWorkspace: this.wsIds.uuid,
roles: { },
role: AccountRole.User,
primarySocialId: this.primarySocialId._id,
socialIds: this.socialIds.map((it) => it._id),

View File

@ -31,7 +31,8 @@ import core, {
type SocialId,
type Space,
type TxCreateDoc,
type TxOperations
type TxOperations,
type WorkspaceUuid
} from '@hcengineering/core'
import { type AccountClient, getClient as getAccountClient } from '@hcengineering/account-client'
import chunter from '@hcengineering/chunter'
@ -78,7 +79,9 @@ describe('rest-api-server', () => {
testCtx,
{
uuid: apiWorkspace1.info.account,
roles: {},
role: apiWorkspace1.info.role,
targetWorkspace: apiWorkspace1.workspaceId,
primarySocialId: pickPrimarySocialId(socialIds)._id,
socialIds: socialIds.map((si) => si._id),
fullSocialIds: socialIds
@ -92,7 +95,9 @@ describe('rest-api-server', () => {
testCtx,
{
uuid: apiWorkspace2.info.account,
roles: { },
role: apiWorkspace2.info.role,
targetWorkspace: apiWorkspace2.workspaceId,
primarySocialId: pickPrimarySocialId(socialIds)._id,
socialIds: socialIds.map((si) => si._id),
fullSocialIds: socialIds
@ -118,7 +123,7 @@ describe('rest-api-server', () => {
const account = await conn.getAccount()
expect(account.primarySocialId).toEqual(expect.any(String))
expect(account.role).toBe('USER')
// expect(account.role).toBe('USER')
// expect(account.space).toBe(core.space.Model)
// expect(account.modifiedBy).toBe(core.account.System)
// expect(account.createdBy).toBe(core.account.System)