Initial rest RPC (#8076)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-02-24 10:26:10 +07:00 committed by Andrey Sobolev
parent 0d76e68516
commit cd90f8bce6
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
13 changed files with 727 additions and 57 deletions

View File

@ -39,7 +39,8 @@
"ts-node": "^10.8.0",
"@types/node": "~20.11.16",
"@types/jest": "^29.5.5",
"@types/ws": "^8.5.11"
"@types/ws": "^8.5.11",
"@types/snappyjs": "^0.7.1"
},
"dependencies": {
"@hcengineering/core": "^0.6.32",
@ -48,7 +49,8 @@
"@hcengineering/collaborator-client": "^0.6.4",
"@hcengineering/account-client": "^0.6.0",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/text": "^0.6.5"
"@hcengineering/text": "^0.6.5",
"snappyjs": "^0.7.0"
},
"repository": "https://github.com/hcengineering/platform",
"publishConfig": {

View File

@ -17,3 +17,4 @@ export * from './client'
export * from './markup/types'
export * from './socket'
export * from './types'
export * from './rest'

View File

@ -0,0 +1,130 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
type Account,
type Class,
type Doc,
type DocumentQuery,
type FindOptions,
type FindResult,
type Ref,
type Storage,
type Tx,
type TxResult,
type WithLookup,
concatLink
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { uncompress } from 'snappyjs'
export interface RestClient extends Storage {
getAccount: () => Promise<Account>
findOne: <T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<WithLookup<T> | undefined>
}
export async function createRestClient (endpoint: string, workspaceId: string, token: string): Promise<RestClient> {
return new RestClientImpl(endpoint, workspaceId, token)
}
class RestClientImpl implements RestClient {
constructor (
readonly endpoint: string,
readonly workspace: string,
readonly token: string
) {}
async findAll<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const params = new URLSearchParams()
params.append('class', _class)
if (query !== undefined && Object.keys(query).length > 0) {
params.append('query', JSON.stringify(query))
}
if (options !== undefined && Object.keys(options).length > 0) {
params.append('options', JSON.stringify(options))
}
const response = await fetch(concatLink(this.endpoint, `/api/v1/find-all/${this.workspace}?${params.toString()}`), {
method: 'GET',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + this.token,
'accept-encoding': 'snappy, gzip'
},
keepalive: true
})
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
}
const encoding = response.headers.get('content-encoding')
if (encoding === 'snappy') {
const buffer = await response.arrayBuffer()
const decompressed = uncompress(buffer)
const decoder = new TextDecoder()
const jsonString = decoder.decode(decompressed)
return JSON.parse(jsonString) as FindResult<T>
}
return (await response.json()) as FindResult<T>
}
async getAccount (): Promise<Account> {
const response = await fetch(concatLink(this.endpoint, `/api/v1/account/${this.workspace}`), {
method: 'GET',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + this.token
},
keepalive: true
})
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
}
return (await response.json()) as Account
}
async findOne<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<WithLookup<T> | undefined> {
return (await this.findAll(_class, query, { ...options, limit: 1 })).shift()
}
async tx (tx: Tx): Promise<TxResult> {
const response = await fetch(concatLink(this.endpoint, `/api/v1/tx/${this.workspace}`), {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + this.token
},
keepalive: true,
body: JSON.stringify(tx)
})
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
}
return (await response.json()) as TxResult
}
}

View File

@ -234,6 +234,28 @@ function toString (name: string, m: Metrics, offset: number, length: number): st
return r
}
function toJson (m: Metrics): any {
const obj: any = {
$total: m.value,
$ops: m.operations
}
if (m.operations > 1) {
obj.avg = Math.round((m.value / (m.operations > 0 ? m.operations : 1)) * 100) / 100
}
if (Object.keys(m.params).length > 0) {
obj.params = m.params
}
for (const [k, v] of Object.entries(m.measurements ?? {})) {
obj[
`${k} ${v.value} ${v.operations} ${
v.operations > 1 ? Math.round((v.value / (v.operations > 0 ? m.operations : 1)) * 100) / 100 : ''
}`
] = toJson(v)
}
return obj
}
/**
* @public
*/
@ -241,6 +263,10 @@ export function metricsToString (metrics: Metrics, name = 'System', length: numb
return toString(name, metricsAggregate(metrics, 50, true), 0, length)
}
export function metricsToJson (metrics: Metrics): any {
return toJson(metricsAggregate(metrics))
}
function printMetricsParamsRows (
params: Record<string, Record<string, MetricsData>>,
offset: number

View File

@ -3,8 +3,7 @@
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { getClient, isAdminUser, uiContext } from '@hcengineering/presentation'
import { Button, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui'
import EditBox from '@hcengineering/ui/src/components/EditBox.svelte'
import { Button, EditBox, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui'
import MetricsInfo from './statistics/MetricsInfo.svelte'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
@ -25,8 +24,6 @@
let avgTime = 0
let rps = 0
let active = 0
let opss = 0
@ -44,7 +41,7 @@
profiling = data?.profiling ?? false
})
.catch((err) => {
console.error(err)
console.error(err, time)
})
}
let data: any
@ -65,13 +62,8 @@
avgTime = 0
maxTime = 0
let count = commandsToSend
let ops = 0
avgTime = 0
opss = 0
const int = setInterval(() => {
rps = ops
ops = 0
}, 1000)
const rate = new RateLimiter(commandsToSendParallel)
const client = getClient()
@ -96,7 +88,6 @@
} else {
avgTime = ed - st
}
ops++
opss++
count--
}
@ -112,7 +103,6 @@
}
await rate.waitProcessing()
running = false
clearInterval(int)
}
async function downloadProfile (): Promise<void> {
@ -132,7 +122,7 @@
document.body.appendChild(link)
link.click()
document.body.removeChild(link)
void fetchStats(0)
await fetchStats(0)
}
let metrics: Metrics | undefined

View File

@ -571,6 +571,16 @@ export interface Session {
) => Promise<FindResult<T>>
searchFulltext: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise<void>
tx: (ctx: ClientSessionCtx, tx: Tx) => Promise<void>
txRaw: (
ctx: ClientSessionCtx,
tx: Tx
) => Promise<{
result: TxResult
broadcastPromise: Promise<void>
asyncsPromise: Promise<void> | undefined
}>
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise<void>
getDomainHash: (ctx: ClientSessionCtx, domain: Domain) => Promise<void>
@ -707,11 +717,17 @@ export interface SessionManager {
createOpContext: (
ctx: MeasureContext,
pipeline: Pipeline,
request: Request<any>,
requestId: Request<any>['id'],
service: Session,
ws: ConnectionSocket,
workspace: WorkspaceUuid
) => ClientSessionCtx
handleRPC: <S extends Session>(
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
operation: (ctx: ClientSessionCtx) => Promise<void>
) => Promise<void>
}
/**

View File

@ -24,6 +24,8 @@ import {
type FindOptions,
type FindResult,
type MeasureContext,
type PersonId,
type PersonUuid,
type Ref,
type SearchOptions,
type SearchQuery,
@ -31,9 +33,8 @@ import {
type Timestamp,
type Tx,
type TxCUD,
type PersonId,
type WorkspaceDataId,
type PersonUuid
type TxResult,
type WorkspaceDataId
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import {
@ -164,7 +165,14 @@ export class ClientSession implements Session {
await ctx.sendResponse(ctx.requestId, await ctx.pipeline.searchFulltext(ctx.ctx, query, options))
}
async tx (ctx: ClientSessionCtx, tx: Tx): Promise<void> {
async txRaw (
ctx: ClientSessionCtx,
tx: Tx
): Promise<{
result: TxResult
broadcastPromise: Promise<void>
asyncsPromise: Promise<void> | undefined
}> {
this.lastRequest = Date.now()
this.total.tx++
this.current.tx++
@ -173,24 +181,26 @@ export class ClientSession implements Session {
let cid = 'client_' + generateId()
ctx.ctx.id = cid
let onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
let result: TxResult
try {
const result = await ctx.pipeline.tx(ctx.ctx, [tx])
result = await ctx.pipeline.tx(ctx.ctx, [tx])
} finally {
onEnd?.()
}
// Send result immideately
await ctx.sendResponse(ctx.requestId, result)
// We need to broadcast all collected transactions
await ctx.pipeline.handleBroadcast(ctx.ctx)
} finally {
onEnd?.()
}
const broadcastPromise = ctx.pipeline.handleBroadcast(ctx.ctx)
// ok we could perform async requests if any
const asyncs = (ctx.ctx.contextData as SessionData).asyncRequests ?? []
let asyncsPromise: Promise<void> | undefined
if (asyncs.length > 0) {
cid = 'client_async_' + generateId()
ctx.ctx.id = cid
onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
const handleAyncs = async (): Promise<void> => {
try {
for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) {
await r()
@ -199,6 +209,18 @@ export class ClientSession implements Session {
onEnd?.()
}
}
asyncsPromise = handleAyncs()
}
return { result, broadcastPromise, asyncsPromise }
}
async tx (ctx: ClientSessionCtx, tx: Tx): Promise<void> {
const { broadcastPromise, asyncsPromise } = await this.txRaw(ctx, tx)
await broadcastPromise
if (asyncsPromise !== undefined) {
await asyncsPromise
}
}
broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void {

View File

@ -91,7 +91,7 @@ export interface Timeouts {
reconnectTimeout: number // Default 3 seconds
}
class TSessionManager implements SessionManager {
export class TSessionManager implements SessionManager {
private readonly statusPromises = new Map<string, Promise<void>>()
readonly workspaces = new Map<WorkspaceUuid, Workspace>()
checkInterval: any
@ -981,7 +981,7 @@ class TSessionManager implements SessionManager {
createOpContext (
ctx: MeasureContext,
pipeline: Pipeline,
request: Request<any>,
requestId: Request<any>['id'],
service: Session,
ws: ConnectionSocket,
workspace: WorkspaceUuid
@ -990,7 +990,7 @@ class TSessionManager implements SessionManager {
return {
ctx,
pipeline,
requestId: request.id,
requestId,
sendResponse: (reqId, msg) =>
sendResponse(ctx, service, ws, {
id: reqId,
@ -1072,6 +1072,7 @@ class TSessionManager implements SessionManager {
return
}
if (request.id === -2 && request.method === 'forceClose') {
// TODO: we chould allow this only for admin or system accounts
let done = false
const wsRef = this.workspaces.get(workspace)
if (wsRef?.upgrade ?? false) {
@ -1106,7 +1107,7 @@ class TSessionManager implements SessionManager {
const params = [...request.params]
await ctx.with('🧨 process', {}, (callTx) =>
f.apply(service, [this.createOpContext(callTx, pipeline, request, service, ws, workspace), ...params])
f.apply(service, [this.createOpContext(callTx, pipeline, request.id, service, ws), ...params])
)
} catch (err: any) {
Analytics.handleError(err)
@ -1131,6 +1132,59 @@ class TSessionManager implements SessionManager {
})
}
handleRPC<S extends Session>(
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
operation: (ctx: ClientSessionCtx) => Promise<void>
): Promise<void> {
const userCtx = requestCtx.newChild('📞 client', {})
// Calculate total number of clients
const reqId = generateId()
const st = Date.now()
return userCtx
.with('🧭 handleRPC', {}, async (ctx) => {
if (service.workspace.closing !== undefined) {
throw new Error('Workspace is closing')
}
service.requests.set(reqId, {
id: reqId,
params: {},
start: st
})
const pipeline =
service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline
try {
const uctx = this.createOpContext(ctx, pipeline, reqId, service, ws)
await operation(uctx)
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err })
}
ws.send(
ctx,
{
id: reqId,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
},
service.binaryMode,
service.useCompression
)
}
})
.finally(() => {
userCtx.end()
service.requests.delete(reqId)
})
}
private async handleHello<S extends Session>(
request: Request<any>,
service: S,

View File

@ -53,6 +53,7 @@
"utf-8-validate": "^6.0.4",
"ws": "^8.18.0",
"body-parser": "^1.20.2",
"snappy": "^7.2.2"
"snappy": "^7.2.2",
"@hcengineering/api-client": "^0.6.0"
}
}

View File

@ -0,0 +1,225 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { generateToken } from '@hcengineering/server-token'
import { createRestClient, type RestClient } from '@hcengineering/api-client'
import core, {
generateId,
getWorkspaceId,
Hierarchy,
MeasureMetricsContext,
ModelDb,
toFindResult,
type Class,
type Doc,
type DocumentQuery,
type Domain,
type FindOptions,
type FindResult,
type MeasureContext,
type Ref,
type Space,
type Tx,
type TxCreateDoc,
type TxResult
} from '@hcengineering/core'
import { ClientSession, startSessionManager, type TSessionManager } from '@hcengineering/server'
import { createDummyStorageAdapter, type SessionManager, type WorkspaceLoginInfo } from '@hcengineering/server-core'
import { startHttpServer } from '../server_http'
import { genMinModel } from './minmodel'
describe('rest-server', () => {
async function getModelDb (): Promise<{ modelDb: ModelDb, hierarchy: Hierarchy, txes: Tx[] }> {
const txes = genMinModel()
const hierarchy = new Hierarchy()
for (const tx of txes) {
hierarchy.tx(tx)
}
const modelDb = new ModelDb(hierarchy)
for (const tx of txes) {
await modelDb.tx(tx)
}
return { modelDb, hierarchy, txes }
}
let shutdown: () => Promise<void>
let sessionManager: SessionManager
const port: number = 3330
beforeAll(async () => {
;({ shutdown, sessionManager } = startSessionManager(new MeasureMetricsContext('test', {}), {
pipelineFactory: async () => {
const { modelDb, hierarchy, txes } = await getModelDb()
return {
hierarchy,
modelDb,
context: {
workspace: {
name: 'test-ws',
workspaceName: 'test-ws',
workspaceUrl: 'test-ws'
},
hierarchy,
modelDb,
lastTx: generateId(),
lastHash: generateId(),
contextVars: {},
branding: null
},
handleBroadcast: async (ctx) => {},
findAll: async <T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> => toFindResult(await modelDb.findAll(_class, query, options)),
tx: async (ctx: MeasureContext, tx: Tx[]): Promise<[TxResult, Tx[], string[] | undefined]> => [
await modelDb.tx(...tx),
[],
undefined
],
close: async () => {},
domains: async () => hierarchy.domains(),
groupBy: async () => new Map(),
find: (ctx: MeasureContext, domain: Domain) => ({
next: async (ctx: MeasureContext) => undefined,
close: async (ctx: MeasureContext) => {}
}),
load: async (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]) => [],
upload: async (ctx: MeasureContext, domain: Domain, docs: Doc[]) => {},
clean: async (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]) => {},
searchFulltext: async (ctx, query, options) => {
return { docs: [] }
},
loadModel: async (ctx, lastModelTx, hash) => ({
full: true,
hash: generateId(),
transactions: txes
})
}
},
sessionFactory: (token, workspace) => new ClientSession(token, workspace, true),
port,
brandingMap: {},
serverFactory: startHttpServer,
accountsUrl: '',
externalStorage: createDummyStorageAdapter()
}))
jest
.spyOn(sessionManager as TSessionManager, 'getWorkspaceInfo')
.mockImplementation(async (ctx: MeasureContext, token: string): Promise<WorkspaceLoginInfo> => {
return {
workspaceId: 'test-ws',
workspaceUrl: 'test-ws',
workspaceName: 'Test Workspace',
uuid: 'test-ws',
createdBy: 'test-owner',
mode: 'active',
createdOn: Date.now(),
lastVisit: Date.now(),
disabled: false,
endpoint: `http://localhost:${port}`,
region: 'test-region',
targetRegion: 'test-region',
backupInfo: {
dataSize: 0,
blobsSize: 0,
backupSize: 0,
lastBackup: 0,
backups: 0
}
}
})
})
afterAll(async () => {
await shutdown()
})
async function connect (): Promise<RestClient> {
const token: string = generateToken('user1@site.com', getWorkspaceId('test-ws'))
return await createRestClient(`http://localhost:${port}`, 'test-ws', token)
}
it('get account', async () => {
const conn = await connect()
const account = await conn.getAccount()
expect(account.email).toBe('user1@site.com')
expect(account.role).toBe('OWNER')
expect(account._id).toBe('User1')
expect(account._class).toBe('core:class:Account')
expect(account.space).toBe('core:space:Model')
expect(account.modifiedBy).toBe('core:account:System')
expect(account.createdBy).toBe('core:account:System')
expect(typeof account.modifiedOn).toBe('number')
expect(typeof account.createdOn).toBe('number')
})
it('find spaces', async () => {
const conn = await connect()
const spaces = await conn.findAll(core.class.Space, {})
expect(spaces.length).toBe(2)
expect(spaces[0].name).toBe('Sp1')
expect(spaces[1].name).toBe('Sp2')
})
it('find avg', async () => {
const conn = await connect()
let ops = 0
let total = 0
const attempts = 1000
for (let i = 0; i < attempts; i++) {
const st = performance.now()
const spaces = await conn.findAll(core.class.Space, {})
expect(spaces.length).toBe(2)
expect(spaces[0].name).toBe('Sp1')
expect(spaces[1].name).toBe('Sp2')
const ed = performance.now()
ops++
total += ed - st
}
const avg = total / ops
// console.log('ops:', ops, 'total:', total, 'avg:', )
expect(ops).toEqual(attempts)
expect(avg).toBeLessThan(5) // 5ms max per operation
})
it('add space', async () => {
const conn = await connect()
const account = await conn.getAccount()
const tx: TxCreateDoc<Space> = {
_class: core.class.TxCreateDoc,
space: core.space.Tx,
_id: generateId(),
objectSpace: core.space.Model,
modifiedBy: account._id,
modifiedOn: Date.now(),
attributes: {
name: 'Sp3',
description: '',
private: false,
archived: false,
members: [],
autoJoin: false
},
objectClass: core.class.Space,
objectId: generateId()
}
await conn.tx(tx)
const spaces = await conn.findAll(core.class.Space, {})
expect(spaces.length).toBe(3)
})
})

182
server/ws/src/rpc.ts Normal file
View File

@ -0,0 +1,182 @@
import type { Class, Doc, MeasureContext, Ref } from '@hcengineering/core'
import type {
ClientSessionCtx,
ConnectionSocket,
PipelineFactory,
Session,
SessionManager
} from '@hcengineering/server-core'
import { decodeToken } from '@hcengineering/server-token'
import { type Express, type Response as ExpressResponse, type Request } from 'express'
import type { OutgoingHttpHeaders } from 'http2'
import { compress } from 'snappy'
import { promisify } from 'util'
import { gzip } from 'zlib'
import { retrieveJson } from './utils'
interface RPCClientInfo {
client: ConnectionSocket
session: Session
workspaceId: string
}
const gzipAsync = promisify(gzip)
const sendError = (res: ExpressResponse, code: number, data: any): void => {
res.writeHead(code, {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache',
'keep-alive': 'timeout=5, max=1000'
})
res.end(JSON.stringify(data))
}
async function sendJson (req: Request, res: ExpressResponse, result: any): Promise<void> {
const headers: OutgoingHttpHeaders = {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache',
'keep-alive': 'timeout=5, max=1000'
}
let body: any = JSON.stringify(result)
const contentEncodings: string[] =
typeof req.headers['accept-encoding'] === 'string'
? req.headers['accept-encoding'].split(',').map((it) => it.trim())
: req.headers['accept-encoding'] ?? []
for (const contentEncoding of contentEncodings) {
let done = false
switch (contentEncoding) {
case 'snappy':
headers['content-encoding'] = 'snappy'
body = await compress(body)
done = true
break
case 'gzip':
headers['content-encoding'] = 'gzip'
body = await gzipAsync(body)
done = true
break
}
if (done) {
break
}
}
res.writeHead(200, headers)
res.end(body)
}
export function registerRPC (
app: Express,
sessions: SessionManager,
ctx: MeasureContext,
pipelineFactory: PipelineFactory
): void {
const rpcSessions = new Map<string, RPCClientInfo>()
async function withSession (
req: Request,
res: ExpressResponse,
operation: (ctx: ClientSessionCtx, session: Session) => Promise<void>
): Promise<void> {
if (req.params.workspaceId === undefined || req.params.workspaceId === '') {
res.writeHead(400, {})
res.end('Missing workspace')
return
}
let token = req.headers.authorization as string
if (token === null) {
sendError(res, 401, { message: 'Missing Authorization header' })
return
}
const workspaceId = decodeURIComponent(req.params.workspaceId)
token = token.split(' ')[1]
const decodedToken = decodeToken(token)
if (workspaceId !== decodedToken.workspace.name) {
sendError(res, 401, { message: 'Invalid workspace' })
return
}
let transactorRpc = rpcSessions.get(token)
if (transactorRpc === undefined) {
const cs: ConnectionSocket = createClosingSocket(token, rpcSessions)
const s = await sessions.addSession(ctx, cs, decodedToken, token, pipelineFactory, token)
if (!('session' in s)) {
sendError(res, 401, {
message: 'Failed to create session',
mode: 'specialError' in s ? s.specialError ?? '' : 'upgrading'
})
return
}
transactorRpc = { session: s.session, client: cs, workspaceId: s.workspaceId }
rpcSessions.set(token, transactorRpc)
}
try {
const rpc = transactorRpc
await sessions.handleRPC(ctx, rpc.session, rpc.client, async (ctx) => {
await operation(ctx, rpc.session)
})
} catch (err: any) {
sendError(res, 401, { message: 'Failed to execute operation', error: err.message, stack: err.stack })
}
}
app.get('/api/v1/ping/:workspaceId', (req, res) => {
void withSession(req, res, async (ctx, session) => {
await session.ping(ctx)
await sendJson(req, res, { pong: true })
})
})
app.get('/api/v1/find-all/:workspaceId', (req, res) => {
void withSession(req, res, async (ctx, session) => {
const _class = req.query.class as Ref<Class<Doc>>
const query = req.query.query !== undefined ? JSON.parse(req.query.query as string) : {}
const options = req.query.options !== undefined ? JSON.parse(req.query.options as string) : {}
const result = await session.findAllRaw(ctx.ctx, ctx.pipeline, _class, query, options)
await sendJson(req, res, result)
})
})
app.post('/api/v1/find-all/:workspaceId', (req, res) => {
void withSession(req, res, async (ctx, session) => {
const { _class, query, options }: any = (await retrieveJson(req)) ?? {}
const result = await session.findAllRaw(ctx.ctx, ctx.pipeline, _class, query, options)
await sendJson(req, res, result)
})
})
app.post('/api/v1/tx/:workspaceId', (req, res) => {
void withSession(req, res, async (ctx, session) => {
const tx: any = (await retrieveJson(req)) ?? {}
const result = await session.txRaw(ctx, tx)
await sendJson(req, res, result.result)
})
})
app.get('/api/v1/account/:workspaceId', (req, res) => {
void withSession(req, res, async (ctx, session) => {
const result = session.getRawAccount(ctx.pipeline)
await sendJson(req, res, result)
})
})
}
function createClosingSocket (rawToken: string, rpcSessions: Map<string, RPCClientInfo>): ConnectionSocket {
return {
id: rawToken,
isClosed: false,
close: () => {
rpcSessions.delete(rawToken)
},
send: (ctx, msg, binary, compression) => {},
sendPong: () => {},
data: () => ({}),
readRequest: (buffer, binary) => ({ method: '', params: [], id: -1, time: Date.now() }),
checkState: () => true
}
}

View File

@ -59,6 +59,8 @@ import { WebSocketServer, type RawData, type WebSocket } from 'ws'
import 'bufferutil'
import { compress } from 'snappy'
import 'utf-8-validate'
import { registerRPC } from './rpc'
import { retrieveJson } from './utils'
let profiling = false
const rpcHandler = new RPCHandler()
@ -151,6 +153,7 @@ export function startHttpServer (
res.end()
}
})
app.get('/api/v1/profiling', (req, res) => {
try {
const token = req.query.token as string
@ -357,6 +360,8 @@ export function startHttpServer (
})
)
registerRPC(app, sessions, ctx, pipelineFactory)
app.put('/api/v1/broadcast', (req, res) => {
try {
const token = req.query.token as string
@ -364,26 +369,19 @@ export function startHttpServer (
const ws = sessions.workspaces.get(req.query.workspace as WorkspaceUuid)
if (ws !== undefined) {
// push the data to body
const body: Buffer[] = []
req
.on('data', (chunk) => {
body.push(chunk)
})
.on('end', () => {
// on end of data, perform necessary action
try {
const data = JSON.parse(Buffer.concat(body as any).toString())
void retrieveJson(req)
.then((data) => {
if (Array.isArray(data)) {
sessions.broadcastAll(ws, data as Tx[])
} else {
sessions.broadcastAll(ws, [data as unknown as Tx])
}
res.end()
} catch (err: any) {
})
.catch((err) => {
ctx.error('JSON parse error', { err })
res.writeHead(400, {})
res.end()
}
})
} else {
res.writeHead(404, {})

23
server/ws/src/utils.ts Normal file
View File

@ -0,0 +1,23 @@
import type { Request } from 'express'
export function retrieveJson (req: Request): Promise<any> {
const body: Uint8Array[] = []
return new Promise((resolve, reject) => {
req
.on('data', (chunk: Uint8Array) => {
body.push(chunk)
})
.on('error', (err) => {
reject(err)
})
.on('end', () => {
// on end of data, perform necessary action
try {
const data = JSON.parse(Buffer.concat(body).toString())
resolve(data)
} catch (err: any) {
reject(err)
}
})
})
}