From cd90f8bce6eefa9f99adf668d48a99d6ed4acc6b Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Mon, 24 Feb 2025 10:26:10 +0700 Subject: [PATCH] Initial rest RPC (#8076) Signed-off-by: Andrey Sobolev --- packages/api-client/package.json | 6 +- packages/api-client/src/index.ts | 1 + packages/api-client/src/rest.ts | 130 ++++++++++ packages/core/src/measurements/metrics.ts | 26 ++ .../components/ServerManagerGeneral.svelte | 16 +- server/core/src/types.ts | 20 +- server/server/src/client.ts | 54 +++-- server/server/src/sessionManager.ts | 62 ++++- server/ws/package.json | 3 +- server/ws/src/__tests__/rest.test.ts | 225 ++++++++++++++++++ server/ws/src/rpc.ts | 182 ++++++++++++++ server/ws/src/server_http.ts | 36 ++- server/ws/src/utils.ts | 23 ++ 13 files changed, 727 insertions(+), 57 deletions(-) create mode 100644 packages/api-client/src/rest.ts create mode 100644 server/ws/src/__tests__/rest.test.ts create mode 100644 server/ws/src/rpc.ts create mode 100644 server/ws/src/utils.ts diff --git a/packages/api-client/package.json b/packages/api-client/package.json index 52baa57bd7..7666e529d9 100644 --- a/packages/api-client/package.json +++ b/packages/api-client/package.json @@ -39,7 +39,8 @@ "ts-node": "^10.8.0", "@types/node": "~20.11.16", "@types/jest": "^29.5.5", - "@types/ws": "^8.5.11" + "@types/ws": "^8.5.11", + "@types/snappyjs": "^0.7.1" }, "dependencies": { "@hcengineering/core": "^0.6.32", @@ -48,7 +49,8 @@ "@hcengineering/collaborator-client": "^0.6.4", "@hcengineering/account-client": "^0.6.0", "@hcengineering/platform": "^0.6.11", - "@hcengineering/text": "^0.6.5" + "@hcengineering/text": "^0.6.5", + "snappyjs": "^0.7.0" }, "repository": "https://github.com/hcengineering/platform", "publishConfig": { diff --git a/packages/api-client/src/index.ts b/packages/api-client/src/index.ts index 2dd51ba8f5..911f155e01 100644 --- a/packages/api-client/src/index.ts +++ b/packages/api-client/src/index.ts @@ -17,3 +17,4 @@ export * from './client' export * from './markup/types' export * from './socket' export * from './types' +export * from './rest' diff --git a/packages/api-client/src/rest.ts b/packages/api-client/src/rest.ts new file mode 100644 index 0000000000..23648ca2d1 --- /dev/null +++ b/packages/api-client/src/rest.ts @@ -0,0 +1,130 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { + type Account, + type Class, + type Doc, + type DocumentQuery, + type FindOptions, + type FindResult, + type Ref, + type Storage, + type Tx, + type TxResult, + type WithLookup, + concatLink +} from '@hcengineering/core' + +import { PlatformError, unknownError } from '@hcengineering/platform' + +import { uncompress } from 'snappyjs' + +export interface RestClient extends Storage { + getAccount: () => Promise + + findOne: ( + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ) => Promise | undefined> +} + +export async function createRestClient (endpoint: string, workspaceId: string, token: string): Promise { + return new RestClientImpl(endpoint, workspaceId, token) +} + +class RestClientImpl implements RestClient { + constructor ( + readonly endpoint: string, + readonly workspace: string, + readonly token: string + ) {} + + async findAll( + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise> { + const params = new URLSearchParams() + params.append('class', _class) + if (query !== undefined && Object.keys(query).length > 0) { + params.append('query', JSON.stringify(query)) + } + if (options !== undefined && Object.keys(options).length > 0) { + params.append('options', JSON.stringify(options)) + } + const response = await fetch(concatLink(this.endpoint, `/api/v1/find-all/${this.workspace}?${params.toString()}`), { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + Authorization: 'Bearer ' + this.token, + 'accept-encoding': 'snappy, gzip' + }, + keepalive: true + }) + if (!response.ok) { + throw new PlatformError(unknownError(response.statusText)) + } + const encoding = response.headers.get('content-encoding') + if (encoding === 'snappy') { + const buffer = await response.arrayBuffer() + const decompressed = uncompress(buffer) + const decoder = new TextDecoder() + const jsonString = decoder.decode(decompressed) + return JSON.parse(jsonString) as FindResult + } + return (await response.json()) as FindResult + } + + async getAccount (): Promise { + const response = await fetch(concatLink(this.endpoint, `/api/v1/account/${this.workspace}`), { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + Authorization: 'Bearer ' + this.token + }, + keepalive: true + }) + if (!response.ok) { + throw new PlatformError(unknownError(response.statusText)) + } + return (await response.json()) as Account + } + + async findOne( + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise | undefined> { + return (await this.findAll(_class, query, { ...options, limit: 1 })).shift() + } + + async tx (tx: Tx): Promise { + const response = await fetch(concatLink(this.endpoint, `/api/v1/tx/${this.workspace}`), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: 'Bearer ' + this.token + }, + keepalive: true, + body: JSON.stringify(tx) + }) + if (!response.ok) { + throw new PlatformError(unknownError(response.statusText)) + } + return (await response.json()) as TxResult + } +} diff --git a/packages/core/src/measurements/metrics.ts b/packages/core/src/measurements/metrics.ts index 09ab8fb7e0..0fe3267f3d 100644 --- a/packages/core/src/measurements/metrics.ts +++ b/packages/core/src/measurements/metrics.ts @@ -234,6 +234,28 @@ function toString (name: string, m: Metrics, offset: number, length: number): st return r } +function toJson (m: Metrics): any { + const obj: any = { + $total: m.value, + $ops: m.operations + } + if (m.operations > 1) { + obj.avg = Math.round((m.value / (m.operations > 0 ? m.operations : 1)) * 100) / 100 + } + if (Object.keys(m.params).length > 0) { + obj.params = m.params + } + for (const [k, v] of Object.entries(m.measurements ?? {})) { + obj[ + `${k} ${v.value} ${v.operations} ${ + v.operations > 1 ? Math.round((v.value / (v.operations > 0 ? m.operations : 1)) * 100) / 100 : '' + }` + ] = toJson(v) + } + + return obj +} + /** * @public */ @@ -241,6 +263,10 @@ export function metricsToString (metrics: Metrics, name = 'System', length: numb return toString(name, metricsAggregate(metrics, 50, true), 0, length) } +export function metricsToJson (metrics: Metrics): any { + return toJson(metricsAggregate(metrics)) +} + function printMetricsParamsRows ( params: Record>, offset: number diff --git a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte index d53b5db874..4b752d0b17 100644 --- a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte +++ b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte @@ -3,8 +3,7 @@ import login from '@hcengineering/login' import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform' import presentation, { getClient, isAdminUser, uiContext } from '@hcengineering/presentation' - import { Button, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui' - import EditBox from '@hcengineering/ui/src/components/EditBox.svelte' + import { Button, EditBox, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui' import MetricsInfo from './statistics/MetricsInfo.svelte' const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? '' @@ -25,8 +24,6 @@ let avgTime = 0 - let rps = 0 - let active = 0 let opss = 0 @@ -44,7 +41,7 @@ profiling = data?.profiling ?? false }) .catch((err) => { - console.error(err) + console.error(err, time) }) } let data: any @@ -65,13 +62,8 @@ avgTime = 0 maxTime = 0 let count = commandsToSend - let ops = 0 avgTime = 0 opss = 0 - const int = setInterval(() => { - rps = ops - ops = 0 - }, 1000) const rate = new RateLimiter(commandsToSendParallel) const client = getClient() @@ -96,7 +88,6 @@ } else { avgTime = ed - st } - ops++ opss++ count-- } @@ -112,7 +103,6 @@ } await rate.waitProcessing() running = false - clearInterval(int) } async function downloadProfile (): Promise { @@ -132,7 +122,7 @@ document.body.appendChild(link) link.click() document.body.removeChild(link) - void fetchStats(0) + await fetchStats(0) } let metrics: Metrics | undefined diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 373e439fd3..4e8ceac0d2 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -571,6 +571,16 @@ export interface Session { ) => Promise> searchFulltext: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise tx: (ctx: ClientSessionCtx, tx: Tx) => Promise + + txRaw: ( + ctx: ClientSessionCtx, + tx: Tx + ) => Promise<{ + result: TxResult + broadcastPromise: Promise + asyncsPromise: Promise | undefined + }> + loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise getDomainHash: (ctx: ClientSessionCtx, domain: Domain) => Promise @@ -707,11 +717,17 @@ export interface SessionManager { createOpContext: ( ctx: MeasureContext, pipeline: Pipeline, - request: Request, + requestId: Request['id'], service: Session, ws: ConnectionSocket, - workspace: WorkspaceUuid ) => ClientSessionCtx + + handleRPC: ( + requestCtx: MeasureContext, + service: S, + ws: ConnectionSocket, + operation: (ctx: ClientSessionCtx) => Promise + ) => Promise } /** diff --git a/server/server/src/client.ts b/server/server/src/client.ts index 2ce2e1fce2..b0869d9e20 100644 --- a/server/server/src/client.ts +++ b/server/server/src/client.ts @@ -24,6 +24,8 @@ import { type FindOptions, type FindResult, type MeasureContext, + type PersonId, + type PersonUuid, type Ref, type SearchOptions, type SearchQuery, @@ -31,9 +33,8 @@ import { type Timestamp, type Tx, type TxCUD, - type PersonId, - type WorkspaceDataId, - type PersonUuid + type TxResult, + type WorkspaceDataId } from '@hcengineering/core' import { PlatformError, unknownError } from '@hcengineering/platform' import { @@ -164,7 +165,14 @@ export class ClientSession implements Session { await ctx.sendResponse(ctx.requestId, await ctx.pipeline.searchFulltext(ctx.ctx, query, options)) } - async tx (ctx: ClientSessionCtx, tx: Tx): Promise { + async txRaw ( + ctx: ClientSessionCtx, + tx: Tx + ): Promise<{ + result: TxResult + broadcastPromise: Promise + asyncsPromise: Promise | undefined + }> { this.lastRequest = Date.now() this.total.tx++ this.current.tx++ @@ -173,31 +181,45 @@ export class ClientSession implements Session { let cid = 'client_' + generateId() ctx.ctx.id = cid let onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined + let result: TxResult try { - const result = await ctx.pipeline.tx(ctx.ctx, [tx]) - - // Send result immideately - await ctx.sendResponse(ctx.requestId, result) - - // We need to broadcast all collected transactions - await ctx.pipeline.handleBroadcast(ctx.ctx) + result = await ctx.pipeline.tx(ctx.ctx, [tx]) } finally { onEnd?.() } + // Send result immideately + await ctx.sendResponse(ctx.requestId, result) + + // We need to broadcast all collected transactions + const broadcastPromise = ctx.pipeline.handleBroadcast(ctx.ctx) // ok we could perform async requests if any const asyncs = (ctx.ctx.contextData as SessionData).asyncRequests ?? [] + let asyncsPromise: Promise | undefined if (asyncs.length > 0) { cid = 'client_async_' + generateId() ctx.ctx.id = cid onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined - try { - for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) { - await r() + const handleAyncs = async (): Promise => { + try { + for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) { + await r() + } + } finally { + onEnd?.() } - } finally { - onEnd?.() } + asyncsPromise = handleAyncs() + } + + return { result, broadcastPromise, asyncsPromise } + } + + async tx (ctx: ClientSessionCtx, tx: Tx): Promise { + const { broadcastPromise, asyncsPromise } = await this.txRaw(ctx, tx) + await broadcastPromise + if (asyncsPromise !== undefined) { + await asyncsPromise } } diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index bbfe0eecf2..57ebea38be 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -91,7 +91,7 @@ export interface Timeouts { reconnectTimeout: number // Default 3 seconds } -class TSessionManager implements SessionManager { +export class TSessionManager implements SessionManager { private readonly statusPromises = new Map>() readonly workspaces = new Map() checkInterval: any @@ -981,7 +981,7 @@ class TSessionManager implements SessionManager { createOpContext ( ctx: MeasureContext, pipeline: Pipeline, - request: Request, + requestId: Request['id'], service: Session, ws: ConnectionSocket, workspace: WorkspaceUuid @@ -990,7 +990,7 @@ class TSessionManager implements SessionManager { return { ctx, pipeline, - requestId: request.id, + requestId, sendResponse: (reqId, msg) => sendResponse(ctx, service, ws, { id: reqId, @@ -1072,6 +1072,7 @@ class TSessionManager implements SessionManager { return } if (request.id === -2 && request.method === 'forceClose') { + // TODO: we chould allow this only for admin or system accounts let done = false const wsRef = this.workspaces.get(workspace) if (wsRef?.upgrade ?? false) { @@ -1106,7 +1107,7 @@ class TSessionManager implements SessionManager { const params = [...request.params] await ctx.with('🧨 process', {}, (callTx) => - f.apply(service, [this.createOpContext(callTx, pipeline, request, service, ws, workspace), ...params]) + f.apply(service, [this.createOpContext(callTx, pipeline, request.id, service, ws), ...params]) ) } catch (err: any) { Analytics.handleError(err) @@ -1131,6 +1132,59 @@ class TSessionManager implements SessionManager { }) } + handleRPC( + requestCtx: MeasureContext, + service: S, + ws: ConnectionSocket, + operation: (ctx: ClientSessionCtx) => Promise + ): Promise { + const userCtx = requestCtx.newChild('📞 client', {}) + + // Calculate total number of clients + const reqId = generateId() + + const st = Date.now() + return userCtx + .with('🧭 handleRPC', {}, async (ctx) => { + if (service.workspace.closing !== undefined) { + throw new Error('Workspace is closing') + } + + service.requests.set(reqId, { + id: reqId, + params: {}, + start: st + }) + + const pipeline = + service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline + + try { + const uctx = this.createOpContext(ctx, pipeline, reqId, service, ws) + await operation(uctx) + } catch (err: any) { + Analytics.handleError(err) + if (LOGGING_ENABLED) { + this.ctx.error('error handle request', { error: err }) + } + ws.send( + ctx, + { + id: reqId, + error: unknownError(err), + result: JSON.parse(JSON.stringify(err?.stack)) + }, + service.binaryMode, + service.useCompression + ) + } + }) + .finally(() => { + userCtx.end() + service.requests.delete(reqId) + }) + } + private async handleHello( request: Request, service: S, diff --git a/server/ws/package.json b/server/ws/package.json index c97852ccef..5ecf869145 100644 --- a/server/ws/package.json +++ b/server/ws/package.json @@ -53,6 +53,7 @@ "utf-8-validate": "^6.0.4", "ws": "^8.18.0", "body-parser": "^1.20.2", - "snappy": "^7.2.2" + "snappy": "^7.2.2", + "@hcengineering/api-client": "^0.6.0" } } diff --git a/server/ws/src/__tests__/rest.test.ts b/server/ws/src/__tests__/rest.test.ts new file mode 100644 index 0000000000..6ad4530733 --- /dev/null +++ b/server/ws/src/__tests__/rest.test.ts @@ -0,0 +1,225 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { generateToken } from '@hcengineering/server-token' + +import { createRestClient, type RestClient } from '@hcengineering/api-client' +import core, { + generateId, + getWorkspaceId, + Hierarchy, + MeasureMetricsContext, + ModelDb, + toFindResult, + type Class, + type Doc, + type DocumentQuery, + type Domain, + type FindOptions, + type FindResult, + type MeasureContext, + type Ref, + type Space, + type Tx, + type TxCreateDoc, + type TxResult +} from '@hcengineering/core' +import { ClientSession, startSessionManager, type TSessionManager } from '@hcengineering/server' +import { createDummyStorageAdapter, type SessionManager, type WorkspaceLoginInfo } from '@hcengineering/server-core' +import { startHttpServer } from '../server_http' +import { genMinModel } from './minmodel' + +describe('rest-server', () => { + async function getModelDb (): Promise<{ modelDb: ModelDb, hierarchy: Hierarchy, txes: Tx[] }> { + const txes = genMinModel() + const hierarchy = new Hierarchy() + for (const tx of txes) { + hierarchy.tx(tx) + } + const modelDb = new ModelDb(hierarchy) + for (const tx of txes) { + await modelDb.tx(tx) + } + return { modelDb, hierarchy, txes } + } + + let shutdown: () => Promise + let sessionManager: SessionManager + const port: number = 3330 + + beforeAll(async () => { + ;({ shutdown, sessionManager } = startSessionManager(new MeasureMetricsContext('test', {}), { + pipelineFactory: async () => { + const { modelDb, hierarchy, txes } = await getModelDb() + return { + hierarchy, + modelDb, + context: { + workspace: { + name: 'test-ws', + workspaceName: 'test-ws', + workspaceUrl: 'test-ws' + }, + hierarchy, + modelDb, + lastTx: generateId(), + lastHash: generateId(), + contextVars: {}, + branding: null + }, + handleBroadcast: async (ctx) => {}, + findAll: async ( + ctx: MeasureContext, + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise> => toFindResult(await modelDb.findAll(_class, query, options)), + tx: async (ctx: MeasureContext, tx: Tx[]): Promise<[TxResult, Tx[], string[] | undefined]> => [ + await modelDb.tx(...tx), + [], + undefined + ], + close: async () => {}, + domains: async () => hierarchy.domains(), + groupBy: async () => new Map(), + find: (ctx: MeasureContext, domain: Domain) => ({ + next: async (ctx: MeasureContext) => undefined, + close: async (ctx: MeasureContext) => {} + }), + load: async (ctx: MeasureContext, domain: Domain, docs: Ref[]) => [], + upload: async (ctx: MeasureContext, domain: Domain, docs: Doc[]) => {}, + clean: async (ctx: MeasureContext, domain: Domain, docs: Ref[]) => {}, + searchFulltext: async (ctx, query, options) => { + return { docs: [] } + }, + loadModel: async (ctx, lastModelTx, hash) => ({ + full: true, + hash: generateId(), + transactions: txes + }) + } + }, + sessionFactory: (token, workspace) => new ClientSession(token, workspace, true), + port, + brandingMap: {}, + serverFactory: startHttpServer, + accountsUrl: '', + externalStorage: createDummyStorageAdapter() + })) + jest + .spyOn(sessionManager as TSessionManager, 'getWorkspaceInfo') + .mockImplementation(async (ctx: MeasureContext, token: string): Promise => { + return { + workspaceId: 'test-ws', + workspaceUrl: 'test-ws', + workspaceName: 'Test Workspace', + uuid: 'test-ws', + createdBy: 'test-owner', + mode: 'active', + createdOn: Date.now(), + lastVisit: Date.now(), + disabled: false, + endpoint: `http://localhost:${port}`, + region: 'test-region', + targetRegion: 'test-region', + backupInfo: { + dataSize: 0, + blobsSize: 0, + backupSize: 0, + lastBackup: 0, + backups: 0 + } + } + }) + }) + afterAll(async () => { + await shutdown() + }) + + async function connect (): Promise { + const token: string = generateToken('user1@site.com', getWorkspaceId('test-ws')) + return await createRestClient(`http://localhost:${port}`, 'test-ws', token) + } + + it('get account', async () => { + const conn = await connect() + const account = await conn.getAccount() + + expect(account.email).toBe('user1@site.com') + expect(account.role).toBe('OWNER') + expect(account._id).toBe('User1') + expect(account._class).toBe('core:class:Account') + expect(account.space).toBe('core:space:Model') + expect(account.modifiedBy).toBe('core:account:System') + expect(account.createdBy).toBe('core:account:System') + expect(typeof account.modifiedOn).toBe('number') + expect(typeof account.createdOn).toBe('number') + }) + + it('find spaces', async () => { + const conn = await connect() + const spaces = await conn.findAll(core.class.Space, {}) + expect(spaces.length).toBe(2) + expect(spaces[0].name).toBe('Sp1') + expect(spaces[1].name).toBe('Sp2') + }) + + it('find avg', async () => { + const conn = await connect() + let ops = 0 + let total = 0 + const attempts = 1000 + for (let i = 0; i < attempts; i++) { + const st = performance.now() + const spaces = await conn.findAll(core.class.Space, {}) + expect(spaces.length).toBe(2) + expect(spaces[0].name).toBe('Sp1') + expect(spaces[1].name).toBe('Sp2') + const ed = performance.now() + ops++ + total += ed - st + } + const avg = total / ops + // console.log('ops:', ops, 'total:', total, 'avg:', ) + expect(ops).toEqual(attempts) + expect(avg).toBeLessThan(5) // 5ms max per operation + }) + + it('add space', async () => { + const conn = await connect() + const account = await conn.getAccount() + const tx: TxCreateDoc = { + _class: core.class.TxCreateDoc, + space: core.space.Tx, + _id: generateId(), + objectSpace: core.space.Model, + modifiedBy: account._id, + modifiedOn: Date.now(), + attributes: { + name: 'Sp3', + description: '', + private: false, + archived: false, + members: [], + autoJoin: false + }, + objectClass: core.class.Space, + objectId: generateId() + } + await conn.tx(tx) + const spaces = await conn.findAll(core.class.Space, {}) + expect(spaces.length).toBe(3) + }) +}) diff --git a/server/ws/src/rpc.ts b/server/ws/src/rpc.ts new file mode 100644 index 0000000000..2d2685ff5f --- /dev/null +++ b/server/ws/src/rpc.ts @@ -0,0 +1,182 @@ +import type { Class, Doc, MeasureContext, Ref } from '@hcengineering/core' +import type { + ClientSessionCtx, + ConnectionSocket, + PipelineFactory, + Session, + SessionManager +} from '@hcengineering/server-core' +import { decodeToken } from '@hcengineering/server-token' + +import { type Express, type Response as ExpressResponse, type Request } from 'express' +import type { OutgoingHttpHeaders } from 'http2' +import { compress } from 'snappy' +import { promisify } from 'util' +import { gzip } from 'zlib' +import { retrieveJson } from './utils' +interface RPCClientInfo { + client: ConnectionSocket + session: Session + workspaceId: string +} + +const gzipAsync = promisify(gzip) + +const sendError = (res: ExpressResponse, code: number, data: any): void => { + res.writeHead(code, { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-cache', + 'keep-alive': 'timeout=5, max=1000' + }) + res.end(JSON.stringify(data)) +} + +async function sendJson (req: Request, res: ExpressResponse, result: any): Promise { + const headers: OutgoingHttpHeaders = { + 'Content-Type': 'application/json', + 'Cache-Control': 'no-cache', + 'keep-alive': 'timeout=5, max=1000' + } + let body: any = JSON.stringify(result) + + const contentEncodings: string[] = + typeof req.headers['accept-encoding'] === 'string' + ? req.headers['accept-encoding'].split(',').map((it) => it.trim()) + : req.headers['accept-encoding'] ?? [] + for (const contentEncoding of contentEncodings) { + let done = false + switch (contentEncoding) { + case 'snappy': + headers['content-encoding'] = 'snappy' + body = await compress(body) + done = true + break + case 'gzip': + headers['content-encoding'] = 'gzip' + body = await gzipAsync(body) + done = true + break + } + if (done) { + break + } + } + + res.writeHead(200, headers) + res.end(body) +} + +export function registerRPC ( + app: Express, + sessions: SessionManager, + ctx: MeasureContext, + pipelineFactory: PipelineFactory +): void { + const rpcSessions = new Map() + + async function withSession ( + req: Request, + res: ExpressResponse, + operation: (ctx: ClientSessionCtx, session: Session) => Promise + ): Promise { + if (req.params.workspaceId === undefined || req.params.workspaceId === '') { + res.writeHead(400, {}) + res.end('Missing workspace') + return + } + let token = req.headers.authorization as string + if (token === null) { + sendError(res, 401, { message: 'Missing Authorization header' }) + return + } + const workspaceId = decodeURIComponent(req.params.workspaceId) + token = token.split(' ')[1] + + const decodedToken = decodeToken(token) + if (workspaceId !== decodedToken.workspace.name) { + sendError(res, 401, { message: 'Invalid workspace' }) + return + } + + let transactorRpc = rpcSessions.get(token) + + if (transactorRpc === undefined) { + const cs: ConnectionSocket = createClosingSocket(token, rpcSessions) + const s = await sessions.addSession(ctx, cs, decodedToken, token, pipelineFactory, token) + if (!('session' in s)) { + sendError(res, 401, { + message: 'Failed to create session', + mode: 'specialError' in s ? s.specialError ?? '' : 'upgrading' + }) + return + } + transactorRpc = { session: s.session, client: cs, workspaceId: s.workspaceId } + rpcSessions.set(token, transactorRpc) + } + try { + const rpc = transactorRpc + await sessions.handleRPC(ctx, rpc.session, rpc.client, async (ctx) => { + await operation(ctx, rpc.session) + }) + } catch (err: any) { + sendError(res, 401, { message: 'Failed to execute operation', error: err.message, stack: err.stack }) + } + } + + app.get('/api/v1/ping/:workspaceId', (req, res) => { + void withSession(req, res, async (ctx, session) => { + await session.ping(ctx) + await sendJson(req, res, { pong: true }) + }) + }) + + app.get('/api/v1/find-all/:workspaceId', (req, res) => { + void withSession(req, res, async (ctx, session) => { + const _class = req.query.class as Ref> + const query = req.query.query !== undefined ? JSON.parse(req.query.query as string) : {} + const options = req.query.options !== undefined ? JSON.parse(req.query.options as string) : {} + + const result = await session.findAllRaw(ctx.ctx, ctx.pipeline, _class, query, options) + await sendJson(req, res, result) + }) + }) + + app.post('/api/v1/find-all/:workspaceId', (req, res) => { + void withSession(req, res, async (ctx, session) => { + const { _class, query, options }: any = (await retrieveJson(req)) ?? {} + + const result = await session.findAllRaw(ctx.ctx, ctx.pipeline, _class, query, options) + await sendJson(req, res, result) + }) + }) + + app.post('/api/v1/tx/:workspaceId', (req, res) => { + void withSession(req, res, async (ctx, session) => { + const tx: any = (await retrieveJson(req)) ?? {} + + const result = await session.txRaw(ctx, tx) + await sendJson(req, res, result.result) + }) + }) + app.get('/api/v1/account/:workspaceId', (req, res) => { + void withSession(req, res, async (ctx, session) => { + const result = session.getRawAccount(ctx.pipeline) + await sendJson(req, res, result) + }) + }) +} + +function createClosingSocket (rawToken: string, rpcSessions: Map): ConnectionSocket { + return { + id: rawToken, + isClosed: false, + close: () => { + rpcSessions.delete(rawToken) + }, + send: (ctx, msg, binary, compression) => {}, + sendPong: () => {}, + data: () => ({}), + readRequest: (buffer, binary) => ({ method: '', params: [], id: -1, time: Date.now() }), + checkState: () => true + } +} diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 873d061868..2cb116d099 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -59,6 +59,8 @@ import { WebSocketServer, type RawData, type WebSocket } from 'ws' import 'bufferutil' import { compress } from 'snappy' import 'utf-8-validate' +import { registerRPC } from './rpc' +import { retrieveJson } from './utils' let profiling = false const rpcHandler = new RPCHandler() @@ -151,6 +153,7 @@ export function startHttpServer ( res.end() } }) + app.get('/api/v1/profiling', (req, res) => { try { const token = req.query.token as string @@ -357,6 +360,8 @@ export function startHttpServer ( }) ) + registerRPC(app, sessions, ctx, pipelineFactory) + app.put('/api/v1/broadcast', (req, res) => { try { const token = req.query.token as string @@ -364,26 +369,19 @@ export function startHttpServer ( const ws = sessions.workspaces.get(req.query.workspace as WorkspaceUuid) if (ws !== undefined) { // push the data to body - const body: Buffer[] = [] - req - .on('data', (chunk) => { - body.push(chunk) - }) - .on('end', () => { - // on end of data, perform necessary action - try { - const data = JSON.parse(Buffer.concat(body as any).toString()) - if (Array.isArray(data)) { - sessions.broadcastAll(ws, data as Tx[]) - } else { - sessions.broadcastAll(ws, [data as unknown as Tx]) - } - res.end() - } catch (err: any) { - ctx.error('JSON parse error', { err }) - res.writeHead(400, {}) - res.end() + void retrieveJson(req) + .then((data) => { + if (Array.isArray(data)) { + sessions.broadcastAll(ws, data as Tx[]) + } else { + sessions.broadcastAll(ws, [data as unknown as Tx]) } + res.end() + }) + .catch((err) => { + ctx.error('JSON parse error', { err }) + res.writeHead(400, {}) + res.end() }) } else { res.writeHead(404, {}) diff --git a/server/ws/src/utils.ts b/server/ws/src/utils.ts new file mode 100644 index 0000000000..ceae377428 --- /dev/null +++ b/server/ws/src/utils.ts @@ -0,0 +1,23 @@ +import type { Request } from 'express' + +export function retrieveJson (req: Request): Promise { + const body: Uint8Array[] = [] + return new Promise((resolve, reject) => { + req + .on('data', (chunk: Uint8Array) => { + body.push(chunk) + }) + .on('error', (err) => { + reject(err) + }) + .on('end', () => { + // on end of data, perform necessary action + try { + const data = JSON.parse(Buffer.concat(body).toString()) + resolve(data) + } catch (err: any) { + reject(err) + } + }) + }) +}