From aedad834f1c51ae096ce7725aed633036aef8212 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Thu, 24 Apr 2025 17:40:25 +0700 Subject: [PATCH] Basic rate limits (#8539) Signed-off-by: Andrey Sobolev --- dev/docker-compose.yaml | 6 +- packages/api-client/src/rest/rest.ts | 62 ++++++++-- packages/api-client/src/rest/utils.ts | 10 +- plugins/client-resources/src/connection.ts | 52 ++++++++- pods/server/src/rpc.ts | 102 +++++++++++----- qms-tests/docker-compose.yaml | 26 +++-- server/core/src/types.ts | 9 +- server/rpc/src/index.ts | 1 + server/rpc/src/rpc.ts | 11 ++ server/rpc/src/sliding.ts | 77 ++++++++++++ server/rpc/src/test/rateLimit.spec.ts | 129 +++++++++++++++++++++ server/server/src/sessionManager.ts | 73 ++++++++++-- tests/docker-compose.yaml | 2 + ws-tests/docker-compose.yaml | 4 + 14 files changed, 488 insertions(+), 76 deletions(-) create mode 100644 server/rpc/src/sliding.ts create mode 100644 server/rpc/src/test/rateLimit.spec.ts diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 8f0ea9c252..6b699547e5 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -349,6 +349,8 @@ services: - AI_BOT_URL=http://huly.local:4010 - MSG2FILE_URL=http://huly.local:9087 - COMMUNICATION_TIME_LOGGING_ENABLED=true + - RATE_LIMIT_MAX=250 # 250 requests per 30 seconds + - RATE_LIMIT_WINDOW=30000 restart: unless-stopped rekoni: image: hardcoreeng/rekoni-service @@ -472,8 +474,8 @@ services: - AVATAR_PATH=./avatar.png - AVATAR_CONTENT_TYPE=.png - STATS_URL=http://huly.local:4900 -# - LOVE_ENDPOINT=http://huly.local:8096 -# - OPENAI_API_KEY=token + # - LOVE_ENDPOINT=http://huly.local:8096 + # - OPENAI_API_KEY=token msg2file: image: hardcoreeng/msg2file ports: diff --git a/packages/api-client/src/rest/rest.ts b/packages/api-client/src/rest/rest.ts index cd9431132c..0da9b80f79 100644 --- a/packages/api-client/src/rest/rest.ts +++ b/packages/api-client/src/rest/rest.ts @@ -42,8 +42,19 @@ export function createRestClient (endpoint: string, workspaceId: string, token: return new RestClientImpl(endpoint, workspaceId, token) } +const rateLimitError = 'rate-limit' + +function isRLE (err: any): boolean { + return err.message === rateLimitError +} + export class RestClientImpl implements RestClient { endpoint: string + + slowDownTimer = 0 + + remaining: number = 1000 + limit: number = 1000 constructor ( endpoint: string, readonly workspace: string, @@ -85,10 +96,15 @@ export class RestClientImpl implements RestClient { const result = await withRetry(async () => { const response = await fetch(requestUrl, this.requestInit()) if (!response.ok) { + await this.checkRateLimits(response) throw new PlatformError(unknownError(response.statusText)) } return await extractJson>(response) - }) + }, isRLE) + + if (result.error !== undefined) { + throw new PlatformError(result.error) + } if (result.lookupMap !== undefined) { // We need to extract lookup map to document lookups @@ -124,6 +140,25 @@ export class RestClientImpl implements RestClient { return result } + private async checkRateLimits (response: Response): Promise { + if (response.status === 429) { + // Extract rate limit information from headers + const retryAfter = response.headers.get('Retry-After') + const rateLimitReset = response.headers.get('X-RateLimit-Reset') + // const rateLimitLimit: string | null = response.headers.get('X-RateLimit-Limit') + const waitTime = + retryAfter != null + ? parseInt(retryAfter) + : rateLimitReset != null + ? 
new Date(parseInt(rateLimitReset)).getTime() - Date.now()
+            : 1000 // Default to 1 second if no headers are provided
+
+      console.warn(`Rate limit exceeded. Waiting ${Math.round((10 * waitTime) / 1000) / 10} seconds before retrying...`)
+      await new Promise((resolve) => setTimeout(resolve, waitTime))
+      throw new Error(rateLimitError)
+    }
+  }
+
   async getAccount (): Promise<Account> {
     const requestUrl = concatLink(this.endpoint, `/api/v1/account/${this.workspace}`)
     const response = await fetch(requestUrl, this.requestInit())
@@ -160,16 +195,23 @@ export class RestClientImpl implements RestClient
 
   async tx (tx: Tx): Promise<TxResult> {
     const requestUrl = concatLink(this.endpoint, `/api/v1/tx/${this.workspace}`)
-    const response = await fetch(requestUrl, {
-      method: 'POST',
-      headers: this.jsonHeaders(),
-      keepalive: true,
-      body: JSON.stringify(tx)
-    })
-    if (!response.ok) {
-      throw new PlatformError(unknownError(response.statusText))
+    const result = await withRetry(async () => {
+      const response = await fetch(requestUrl, {
+        method: 'POST',
+        headers: this.jsonHeaders(),
+        keepalive: true,
+        body: JSON.stringify(tx)
+      })
+      if (!response.ok) {
+        await this.checkRateLimits(response)
+        throw new PlatformError(unknownError(response.statusText))
+      }
+      return await extractJson<TxResult>(response)
+    }, isRLE)
+    if (result.error !== undefined) {
+      throw new PlatformError(result.error)
     }
-    return await extractJson(response)
+    return result
   }
 
   async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
diff --git a/packages/api-client/src/rest/utils.ts b/packages/api-client/src/rest/utils.ts
index d6b87cbc58..8b098ac7e5 100644
--- a/packages/api-client/src/rest/utils.ts
+++ b/packages/api-client/src/rest/utils.ts
@@ -1,6 +1,6 @@
 import { uncompress } from 'snappyjs'
 
-export async function withRetry<T> (fn: () => Promise<T>): Promise<T> {
+export async function withRetry<T> (fn: () => Promise<T>, ignoreAttemptCheck?: (err: any) => boolean): Promise<T> {
   const maxRetries = 3
   let lastError: any
 
@@ -8,9 +8,13 @@
     try {
       return await fn()
     } catch (err: any) {
-      lastError = err
+      if (ignoreAttemptCheck !== undefined && ignoreAttemptCheck(err)) {
+        // Rate limited: do not count this attempt against the retry limit
+        attempt--
+      } else {
+        lastError = err
+      }
       if (attempt === maxRetries - 1) {
-        console.error('Failed to execute query', err)
         throw lastError
       }
       await new Promise((resolve) => setTimeout(resolve, Math.pow(2, attempt) * 100))
diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts
index e0475d6988..c1e0aadf84 100644
--- a/plugins/client-resources/src/connection.ts
+++ b/plugins/client-resources/src/connection.ts
@@ -61,7 +61,7 @@ import platform, {
   UNAUTHORIZED
 } from '@hcengineering/platform'
 import { uncompress } from 'snappyjs'
-import { HelloRequest, HelloResponse, ReqId, type Response, RPCHandler } from '@hcengineering/rpc'
+import { HelloRequest, HelloResponse, ReqId, type Response, RPCHandler, type RateLimitInfo } from '@hcengineering/rpc'
 import { EventResult } from '@hcengineering/communication-sdk-types'
 import {
   FindLabelsParams,
@@ -87,9 +87,14 @@ class RequestPromise {
   resolve!: (value?: any) => void
   reject!: (reason?: any) => void
   reconnect?: () => void
+
+  // Required to properly handle rate limits
+  sendData: () => void = () => {}
+
   constructor (
     readonly method: string,
     readonly params: any[],
+    readonly handleResult?: (result: any) => Promise<void>
   ) {
     this.promise = new Promise((resolve, reject) => {
@@ -285,11 +290,30 @@ class Connection implements ClientConnection {
     }
   }
 
+  currentRateLimit: RateLimitInfo | undefined
+  slowDownTimer = 0
+
   handleMsg (socketId: number, resp: Response<any>): void {
     if (this.closed) {
       return
     }
 
+    if (resp.rateLimit !== undefined) {
+      console.log(
+        'Rate limits:',
+        resp.rateLimit.remaining,
+        resp.rateLimit.limit,
+        resp.rateLimit.reset,
+        resp.rateLimit.retryAfter
+      )
+      this.currentRateLimit = resp.rateLimit
+      if (this.currentRateLimit.remaining < this.currentRateLimit.limit / 3) {
+        this.slowDownTimer++
+      } else if (this.slowDownTimer > 0) {
+        this.slowDownTimer--
+      }
+    }
+
     if (resp.error !== undefined) {
       if (resp.error?.code === UNAUTHORIZED.code || resp.terminate === true) {
         Analytics.handleError(new PlatformError(resp.error))
@@ -308,6 +332,20 @@ class Connection implements ClientConnection {
 
       if (resp.id !== undefined) {
         const promise = this.requests.get(resp.id)
+
+        // Support rate limits
+        if (resp.rateLimit !== undefined) {
+          const { remaining, retryAfter } = resp.rateLimit
+          if (remaining === 0) {
+            console.log('Rate limit exceeded:', resp.rateLimit)
+            void new Promise((resolve) => setTimeout(resolve, retryAfter ?? 1)).then(() => {
+              // Retry after a while, once the rate limit allows more calls.
+              promise?.sendData()
+            })
+            return
+          }
+        }
+
         if (promise !== undefined) {
           promise.reject(new PlatformError(resp.error))
         }
@@ -382,6 +420,7 @@ class Connection implements ClientConnection {
     }
     if (resp.id !== undefined) {
       const promise = this.requests.get(resp.id)
+
       if (promise === undefined) {
         console.error(
           new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.user}`),
@@ -680,6 +719,11 @@ class Connection implements ClientConnection {
       throw new PlatformError(new Status(Severity.ERROR, platform.status.ConnectionClosed, {}))
     }
 
+    if (this.slowDownTimer > 0) {
+      // We need to wait a bit to avoid a ban.
+      await new Promise((resolve) => setTimeout(resolve, this.slowDownTimer))
+    }
+
     if (data.once === true) {
       // Check if has same request already then skip
       const dparams = JSON.stringify(data.params)
@@ -702,7 +746,7 @@ class Connection implements ClientConnection {
     if (data.method !== pingConst) {
       this.requests.set(id, promise)
     }
-    const sendData = (): void => {
+    promise.sendData = (): void => {
      if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
        promise.startTime = Date.now()
 
@@ -730,13 +774,13 @@ class Connection implements ClientConnection {
        setTimeout(async () => {
          // In case we don't have response yet.
          if (this.requests.has(id) && ((await data.retry?.()) ??
true)) { - sendData() + promise.sendData() } }, 50) } } ctx.withSync('send-data', {}, () => { - sendData() + promise.sendData() }) void ctx .with('broadcast-event', {}, () => broadcastEvent(client.event.NetworkRequests, this.requests.size)) diff --git a/pods/server/src/rpc.ts b/pods/server/src/rpc.ts index 13fae1e3e2..83793c32b0 100644 --- a/pods/server/src/rpc.ts +++ b/pods/server/src/rpc.ts @@ -12,7 +12,7 @@ import core, { import type { ClientSessionCtx, ConnectionSocket, Session, SessionManager } from '@hcengineering/server-core' import { decodeToken } from '@hcengineering/server-token' -import { rpcJSONReplacer } from '@hcengineering/rpc' +import { rpcJSONReplacer, type RateLimitInfo } from '@hcengineering/rpc' import { createHash } from 'crypto' import { type Express, type Response as ExpressResponse, type Request } from 'express' import type { OutgoingHttpHeaders } from 'http2' @@ -20,6 +20,8 @@ import { compress } from 'snappy' import { promisify } from 'util' import { gzip } from 'zlib' import { retrieveJson } from './utils' + +import { unknownError } from '@hcengineering/platform' interface RPCClientInfo { client: ConnectionSocket session: Session @@ -28,16 +30,34 @@ interface RPCClientInfo { const gzipAsync = promisify(gzip) +const keepAliveOptions = { + 'keep-alive': 'timeout=5, max=1000', + Connection: 'keep-alive' +} + const sendError = (res: ExpressResponse, code: number, data: any): void => { res.writeHead(code, { + ...keepAliveOptions, 'Content-Type': 'application/json', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - 'keep-alive': 'timeout=5, max=1000' + 'Cache-Control': 'no-cache' }) res.end(JSON.stringify(data)) } +function rateLimitToHeaders (rateLimit?: RateLimitInfo): OutgoingHttpHeaders { + if (rateLimit === undefined) { + return {} + } + const { remaining, limit, reset, retryAfter } = rateLimit + return { + 'Retry-After': `${Math.max(retryAfter ?? 0, 1)}`, + 'Retry-After-ms': `${retryAfter ?? 0}`, + 'X-RateLimit-Limit': `${limit}`, + 'X-RateLimit-Remaining': `${remaining}`, + 'X-RateLimit-Reset': `${reset}` + } +} + async function sendJson ( req: Request, res: ExpressResponse, @@ -50,10 +70,9 @@ async function sendJson ( const etag = createHash('sha256').update(body).digest('hex') const headers: OutgoingHttpHeaders = { ...(extraHeaders ?? 
{}), + ...keepAliveOptions, 'Content-Type': 'application/json', 'Cache-Control': 'no-cache', - connection: 'keep-alive', - 'keep-alive': 'timeout=5, max=1000', ETag: etag } @@ -97,7 +116,7 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur async function withSession ( req: Request, res: ExpressResponse, - operation: (ctx: ClientSessionCtx, session: Session) => Promise + operation: (ctx: ClientSessionCtx, session: Session, rateLimit?: RateLimitInfo) => Promise ): Promise { try { if (req.params.workspaceId === undefined || req.params.workspaceId === '') { @@ -136,62 +155,87 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur } const rpc = transactorRpc - await sessions.handleRPC(ctx, rpc.session, rpc.client, async (ctx) => { - await operation(ctx, rpc.session) + const rateLimit = await sessions.handleRPC(ctx, rpc.session, rpc.client, async (ctx, rateLimit) => { + await operation(ctx, rpc.session, rateLimit) }) + if (rateLimit !== undefined) { + const { remaining, limit, reset, retryAfter } = rateLimit + const retryHeaders: OutgoingHttpHeaders = { + ...keepAliveOptions, + 'Content-Type': 'application/json', + 'Cache-Control': 'no-cache', + 'Retry-After': `${Math.max((retryAfter ?? 0) / 1000, 1)}`, + 'Retry-After-ms': `${retryAfter ?? 0}`, + 'X-RateLimit-Limit': `${limit}`, + 'X-RateLimit-Remaining': `${remaining}`, + 'X-RateLimit-Reset': `${reset}` + } + res.writeHead(429, retryHeaders) + res.end( + JSON.stringify({ + id: -1, + error: unknownError('Rate limit') + }) + ) + } } catch (err: any) { sendError(res, 500, { message: 'Failed to execute operation', error: err.message, stack: err.stack }) } } app.get('/api/v1/ping/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { await session.ping(ctx) - await sendJson(req, res, { - pong: true, - lastTx: ctx.pipeline.context.lastTx, - lastHash: ctx.pipeline.context.lastHash - }) + await sendJson( + req, + res, + { + pong: true, + lastTx: ctx.pipeline.context.lastTx, + lastHash: ctx.pipeline.context.lastHash + }, + rateLimitToHeaders(rateLimit) + ) }) }) app.get('/api/v1/find-all/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { const _class = req.query.class as Ref> const query = req.query.query !== undefined ? JSON.parse(req.query.query as string) : {} const options = req.query.options !== undefined ? JSON.parse(req.query.options as string) : {} const result = await session.findAllRaw(ctx, _class, query, options) - await sendJson(req, res, result) + await sendJson(req, res, result, rateLimitToHeaders(rateLimit)) }) }) app.post('/api/v1/find-all/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { const { _class, query, options }: any = (await retrieveJson(req)) ?? {} const result = await session.findAllRaw(ctx, _class, query, options) - await sendJson(req, res, result) + await sendJson(req, res, result, rateLimitToHeaders(rateLimit)) }) }) app.post('/api/v1/tx/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { const tx: any = (await retrieveJson(req)) ?? 
{} const result = await session.txRaw(ctx, tx) - await sendJson(req, res, result.result) + await sendJson(req, res, result.result, rateLimitToHeaders(rateLimit)) }) }) app.get('/api/v1/account/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { const result = session.getRawAccount() - await sendJson(req, res, result) + await sendJson(req, res, result, rateLimitToHeaders(rateLimit)) }) }) app.get('/api/v1/load-model/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { const lastModelTx = parseInt((req.query.lastModelTx as string) ?? '0') const lastHash = req.query.lastHash as string const result = await session.loadModelRaw(ctx, lastModelTx, lastHash) @@ -214,12 +258,12 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur allowedClasess.some((cl) => h.isDerived((it as TxCUD).objectClass, cl)) ) - await sendJson(req, res, filtered) + await sendJson(req, res, filtered, rateLimitToHeaders(rateLimit)) }) }) app.get('/api/v1/search-fulltext/:workspaceId', (req, res) => { - void withSession(req, res, async (ctx, session) => { + void withSession(req, res, async (ctx, session, rateLimit) => { const query: SearchQuery = { query: req.query.query as string, classes: req.query.classes !== undefined ? JSON.parse(req.query.classes as string) : undefined, @@ -229,7 +273,7 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur limit: req.query.limit !== undefined ? parseInt(req.query.limit as string) : undefined } const result = await session.searchFulltextRaw(ctx, query, options) - await sendJson(req, res, result) + await sendJson(req, res, result, rateLimitToHeaders(rateLimit)) }) }) @@ -260,9 +304,9 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur // To use in non-js (rust) clients that can't link to @hcengineering/core app.get('/api/v1/generate-id/:workspaceId', (req, res) => { - void withSession(req, res, async () => { + void withSession(req, res, async (ctx, session, rateLimit) => { const result = { id: generateId() } - await sendJson(req, res, result) + await sendJson(req, res, result, rateLimitToHeaders(rateLimit)) }) }) } diff --git a/qms-tests/docker-compose.yaml b/qms-tests/docker-compose.yaml index 69ab03b914..2dff7d730f 100644 --- a/qms-tests/docker-compose.yaml +++ b/qms-tests/docker-compose.yaml @@ -1,9 +1,9 @@ -version: "3" +version: '3' services: mongodb: image: 'mongo:7-jammy' extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' command: mongod --port 27018 environment: - PUID=1000 @@ -90,7 +90,7 @@ services: account: image: hardcoreeng/account extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' pull_policy: never links: - mongodb @@ -112,7 +112,7 @@ services: workspace: image: hardcoreeng/workspace extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' links: - mongodb - minio @@ -132,7 +132,7 @@ services: front: image: hardcoreeng/front extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' pull_policy: never links: - account @@ -163,7 +163,7 @@ services: transactor: image: hardcoreeng/transactor extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' pull_policy: never links: - mongodb @@ -189,10 +189,12 @@ services: - LAST_NAME_FIRST=true - FULLTEXT_URL=http://fulltext:4710 - 
BRANDING_PATH=/var/cfg/branding-test.json + - RATE_LIMIT_MAX=25000 + - RATE_LIMIT_WINDOW=1000 collaborator: image: hardcoreeng/collaborator extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' links: - mongodb - minio @@ -206,7 +208,7 @@ services: - UPLOAD_URL=/files - MONGO_URL=mongodb://mongodb:27018 - STORAGE_CONFIG=${STORAGE_CONFIG} - restart: unless-stopped + restart: unless-stopped rekoni: image: hardcoreeng/rekoni-service restart: on-failure @@ -215,7 +217,7 @@ services: print: image: hardcoreeng/print extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' restart: unless-stopped ports: - 4003:4005 @@ -232,7 +234,7 @@ services: sign: image: hardcoreeng/sign extra_hosts: - - "huly.local:host-gateway" + - 'huly.local:host-gateway' restart: unless-stopped ports: - 4008:4006 @@ -248,7 +250,7 @@ services: - ACCOUNTS_URL=http://account:3003 - MINIO_SECRET_KEY=minioadmin - CERTIFICATE_PATH=/var/cfg/certificate.p12 - - SERVICE_ID=sign-service + - SERVICE_ID=sign-service - BRANDING_PATH=/var/cfg/branding.json deploy: resources: @@ -272,4 +274,4 @@ services: - ELASTIC_INDEX_NAME=local_storage_index - STORAGE_CONFIG=${STORAGE_CONFIG} - REKONI_URL=http://rekoni:4007 - - ACCOUNTS_URL=http://account:3003 + - ACCOUNTS_URL=http://account:3003 diff --git a/server/core/src/types.ts b/server/core/src/types.ts index d40405084a..cb08d6f845 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -57,7 +57,7 @@ import { } from '@hcengineering/core' import type { Asset, Resource } from '@hcengineering/platform' import type { LiveQuery } from '@hcengineering/query' -import type { ReqId, Request, Response } from '@hcengineering/rpc' +import type { RateLimitInfo, ReqId, Request, Response } from '@hcengineering/rpc' import type { Token } from '@hcengineering/server-token' import { type Readable } from 'stream' @@ -720,8 +720,8 @@ export interface SessionManager { requestCtx: MeasureContext, service: S, ws: ConnectionSocket, - operation: (ctx: ClientSessionCtx) => Promise - ) => Promise + operation: (ctx: ClientSessionCtx, rateLimit?: RateLimitInfo) => Promise + ) => Promise createOpContext: ( ctx: MeasureContext, @@ -730,7 +730,8 @@ export interface SessionManager { communicationApi: CommunicationApi, requestId: Request['id'], service: Session, - ws: ConnectionSocket + ws: ConnectionSocket, + rateLimit?: RateLimitInfo ) => ClientSessionCtx getStatistics: () => WorkspaceStatistics[] diff --git a/server/rpc/src/index.ts b/server/rpc/src/index.ts index f51ab1aa30..17c0812118 100644 --- a/server/rpc/src/index.ts +++ b/server/rpc/src/index.ts @@ -15,3 +15,4 @@ // export * from './rpc' +export * from './sliding' diff --git a/server/rpc/src/rpc.ts b/server/rpc/src/rpc.ts index 8745cfee74..010bfcd0ff 100644 --- a/server/rpc/src/rpc.ts +++ b/server/rpc/src/rpc.ts @@ -75,6 +75,15 @@ export function rpcJSONReceiver (key: string, value: any): any { return value } +export interface RateLimitInfo { + remaining: number + limit: number + + current: number // in milliseconds + reset: number // in milliseconds + retryAfter?: number // in milliseconds +} + /** * Response object define a server response on transaction request. * Also used to inform other clients about operations being performed by server. 
@@ -86,6 +95,8 @@ export interface Response {
   id?: ReqId
   error?: Status
   terminate?: boolean
+
+  rateLimit?: RateLimitInfo
   chunk?: {
     index: number
     final: boolean
diff --git a/server/rpc/src/sliding.ts b/server/rpc/src/sliding.ts
new file mode 100644
index 0000000000..4917765a9b
--- /dev/null
+++ b/server/rpc/src/sliding.ts
@@ -0,0 +1,77 @@
+import type { RateLimitInfo } from './rpc'
+
+export class SlidingWindowRateLimitter {
+  private readonly rateLimits = new Map<
+    string,
+    {
+      requests: number[]
+      rejectedRequests: number // Counter for rejected requests
+      resetTime: number
+    }
+  >()
+
+  constructor (
+    readonly rateLimitMax: number,
+    readonly rateLimitWindow: number,
+    readonly now: () => number = Date.now
+  ) {
+    this.rateLimitMax = rateLimitMax
+    this.rateLimitWindow = rateLimitWindow
+  }
+
+  public checkRateLimit (groupId: string): RateLimitInfo {
+    const now = this.now()
+    const windowStart = now - this.rateLimitWindow
+
+    let rateLimit = this.rateLimits.get(groupId)
+    if (rateLimit == null) {
+      rateLimit = { requests: [], resetTime: now + this.rateLimitWindow, rejectedRequests: 0 }
+      this.rateLimits.set(groupId, rateLimit)
+    }
+
+    // Remove requests outside the current window
+    rateLimit.requests = rateLimit.requests.filter((time) => time > windowStart)
+
+    // Reset rejected requests counter when window changes
+    if (rateLimit.requests.length === 0) {
+      rateLimit.rejectedRequests = 0
+    }
+
+    // Update reset time
+    rateLimit.resetTime = now + this.rateLimitWindow
+
+    rateLimit.requests.push(now + (rateLimit.rejectedRequests > this.rateLimitMax * 2 ? this.rateLimitWindow * 5 : 0))
+
+    if (rateLimit.requests.length > this.rateLimitMax) {
+      rateLimit.rejectedRequests++
+
+      if (rateLimit.requests.length > this.rateLimitMax * 2) {
+        // Keep only last requests
+        rateLimit.requests.splice(0, rateLimit.requests.length - this.rateLimitMax)
+      }
+      // Pick a random request still in the window to estimate when a slot frees up
+      const someRequest = Math.floor(Math.random() * rateLimit.requests.length)
+      const nextAvailableTime = rateLimit.requests[someRequest] + this.rateLimitWindow
+
+      return {
+        remaining: 0,
+        limit: this.rateLimitMax,
+        current: rateLimit.requests.length,
+        reset: rateLimit.resetTime,
+        retryAfter: Math.max(1, nextAvailableTime - now + 1)
+      }
+    }
+
+    return {
+      remaining: this.rateLimitMax - rateLimit.requests.length,
+      current: rateLimit.requests.length,
+      limit: this.rateLimitMax,
+      reset: rateLimit.resetTime
+    }
+  }
+
+  // Add a reset method for testing purposes
+  public reset (): void {
+    this.rateLimits.clear()
+  }
+}
diff --git a/server/rpc/src/test/rateLimit.spec.ts b/server/rpc/src/test/rateLimit.spec.ts
new file mode 100644
index 0000000000..a22e91abf3
--- /dev/null
+++ b/server/rpc/src/test/rateLimit.spec.ts
@@ -0,0 +1,129 @@
+import { SlidingWindowRateLimitter } from '../sliding'
+
+describe('SlidingWindowRateLimitter', () => {
+  let clock = 100000
+
+  beforeEach(() => {
+    // Reset the fake clock used by the limiter
+    clock = 100000
+  })
+
+  afterEach(() => {
+    jest.restoreAllMocks()
+  })
+
+  it('should allow requests within the limit', () => {
+    const limiter = new SlidingWindowRateLimitter(5, 60000, () => clock)
+
+    for (let i = 0; i < 5; i++) {
+      const result = limiter.checkRateLimit('user1')
+      expect(result.remaining).toBe(5 - i - 1)
+      expect(result.limit).toBe(5)
+    }
+
+    // The next request should hit the limit
+    const result = limiter.checkRateLimit('user1')
+    expect(result.remaining).toBe(0)
+    expect(result.retryAfter).toBeDefined()
+  })
+
+  it('should reject requests beyond the limit', () => {
+    const limiter = new SlidingWindowRateLimitter(3, 60000, () => clock)
+
+    // Use up the limit
+    limiter.checkRateLimit('user1')
+    limiter.checkRateLimit('user1')
+    limiter.checkRateLimit('user1')
+
+    // This should be limited
+    const result = limiter.checkRateLimit('user1')
+    expect(result.remaining).toBe(0)
+    expect(result.retryAfter).toBeDefined()
+  })
+
+  it('should allow new requests as the window slides', () => {
+    const limiter = new SlidingWindowRateLimitter(2, 10000, () => clock)
+
+    // Use up the limit
+    limiter.checkRateLimit('user1')
+    limiter.checkRateLimit('user1')
+
+    // This should be limited
+    expect(limiter.checkRateLimit('user1').remaining).toBe(0)
+
+    // Move time forward by 5 seconds (half the window)
+    clock += 5 * 1000 // 5 seconds
+
+    // Both earlier requests are still inside the window,
+    // so this request is rejected as well
+    const result = limiter.checkRateLimit('user1')
+    expect(result.remaining).toBe(0) // Now at limit again
+
+    // Move time forward by full window
+    clock += 11 * 1000 // 11 seconds
+
+    // All previous requests should be outside the window
+    const newResult = limiter.checkRateLimit('user1')
+    expect(newResult.remaining).toBe(1) // One request used, one remaining
+    expect(limiter.checkRateLimit('user1').remaining).toBe(0) // Now at limit again
+  })
+
+  it('should handle different identifiers separately', () => {
+    const limiter = new SlidingWindowRateLimitter(2, 60000, () => clock)
+
+    limiter.checkRateLimit('user1')
+    limiter.checkRateLimit('user1')
+
+    // User1 should be at limit
+    expect(limiter.checkRateLimit('user1').remaining).toBe(0)
+
+    // Different user should have separate limit
+    expect(limiter.checkRateLimit('user2').remaining).toBe(1)
+    expect(limiter.checkRateLimit('user2').remaining).toBe(0)
+
+    // Both users should be at their limits
+    expect(limiter.checkRateLimit('user1').remaining).toBe(0)
+    expect(limiter.checkRateLimit('user2').remaining).toBe(0)
+  })
+
+  it('should handle sliding window correctly', () => {
+    const limiter = new SlidingWindowRateLimitter(10, 60000, () => clock)
+
+    // Use up half the capacity
+    for (let i = 0; i < 5; i++) {
+      limiter.checkRateLimit('user1')
+    }
+
+    // Move halfway through the window
+    clock += 30 * 1000 + 1 // 30 seconds
+
+    // Make some more requests
+    for (let i = 0; i < 7; i++) {
+      const result = limiter.checkRateLimit('user1')
+      if (i < 5) {
+        expect(result.remaining).toBeGreaterThanOrEqual(0)
+      } else {
+        expect(result.remaining).toBe(0)
+        expect(result.retryAfter).toBeDefined()
+        break
+      }
+    }
+  })
+
+  it('check for ban', () => {
+    const limiter = new SlidingWindowRateLimitter(10, 10000, () => clock)
+
+    for (let i = 0; i < 50; i++) {
+      limiter.checkRateLimit('user1')
+    }
+
+    const r1 = limiter.checkRateLimit('user1')
+    expect(r1.remaining).toBe(0)
+    // Pass all window time.
+ clock += 10000 + + const r2 = limiter.checkRateLimit('user1') + expect(r2.remaining).toBe(0) + expect(r2.retryAfter).toBeDefined() + }) +}) diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index a151af7b6b..3c047d1891 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -51,7 +51,14 @@ import core, { type WorkspaceUuid } from '@hcengineering/core' import { unknownError, type Status } from '@hcengineering/platform' -import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' +import { + SlidingWindowRateLimitter, + type HelloRequest, + type HelloResponse, + type RateLimitInfo, + type Request, + type Response +} from '@hcengineering/rpc' import { CommunicationApiFactory, LOGGING_ENABLED, @@ -111,6 +118,9 @@ export class TSessionManager implements SessionManager { workspaceProducer: PlatformQueueProducer usersProducer: PlatformQueueProducer + + now: number = Date.now() + constructor ( readonly ctx: MeasureContext, readonly timeouts: Timeouts, @@ -823,7 +833,7 @@ export class TSessionManager implements SessionManager { user: sessionRef.session.getSocialIds().find((it) => it.type !== SocialIdType.HULY)?.value, binary: sessionRef.session.binaryMode, compression: sessionRef.session.useCompression, - totalTime: Date.now() - sessionRef.session.createTime, + totalTime: this.now - sessionRef.session.createTime, workspaceUsers: workspace?.sessions?.size, totalUsers: this.sessions.size }) @@ -1006,7 +1016,8 @@ export class TSessionManager implements SessionManager { communicationApi: CommunicationApi, requestId: Request['id'], service: Session, - ws: ConnectionSocket + ws: ConnectionSocket, + rateLimit: RateLimitInfo | undefined ): ClientSessionCtx { const st = platformNow() return { @@ -1019,8 +1030,9 @@ export class TSessionManager implements SessionManager { id: reqId, result: msg, time: platformNowDiff(st), - bfst: Date.now(), - queue: service.requests.size + bfst: this.now, + queue: service.requests.size, + rateLimit }), sendPong: () => { ws.sendPong() @@ -1032,7 +1044,8 @@ export class TSessionManager implements SessionManager { result: msg, error, time: platformNowDiff(st), - bfst: Date.now(), + rateLimit, + bfst: this.now, queue: service.requests.size }) } @@ -1044,7 +1057,6 @@ export class TSessionManager implements SessionManager { if (ws === undefined) { return new Map() } - const res = new Map() for (const s of [...Array.from(ws.sessions.values()).map((it) => it.session), ...extra]) { const sessionAccount = s.getUser() @@ -1059,6 +1071,12 @@ export class TSessionManager implements SessionManager { return res } + limitter = new SlidingWindowRateLimitter( + parseInt(process.env.RATE_LIMIT_MAX ?? '250'), + parseInt(process.env.RATE_LIMIT_WINDOW ?? 
'30000'), + () => Date.now() + ) + handleRequest( requestCtx: MeasureContext, service: S, @@ -1067,15 +1085,30 @@ export class TSessionManager implements SessionManager { workspaceId: WorkspaceUuid ): Promise { const userCtx = requestCtx.newChild('๐Ÿ“ž client', {}) + const rateLimit = this.limitter.checkRateLimit(service.getUser()) + // If remaining is 0, rate limit is exceeded + if (rateLimit?.remaining === 0) { + void ws.send( + userCtx, + { + id: request.id, + rateLimit, + error: unknownError('Rate limit') + }, + service.binaryMode, + service.useCompression + ) + return Promise.resolve() + } // Calculate total number of clients const reqId = generateId() - const st = platformNow() + const st = Date.now() return userCtx .with('๐Ÿงญ handleRequest', {}, async (ctx) => { if (request.time != null) { - const delta = platformNow() - request.time + const delta = Date.now() - request.time requestCtx.measure('msg-receive-delta', delta) } const workspace = this.workspaces.get(workspaceId) @@ -1134,7 +1167,7 @@ export class TSessionManager implements SessionManager { await workspace.with(async (pipeline, communicationApi) => { await ctx.with('๐Ÿงจ process', {}, (callTx) => f.apply(service, [ - this.createOpContext(callTx, userCtx, pipeline, communicationApi, request.id, service, ws), + this.createOpContext(callTx, userCtx, pipeline, communicationApi, request.id, service, ws, rateLimit), ...params ]) ) @@ -1167,7 +1200,13 @@ export class TSessionManager implements SessionManager { service: S, ws: ConnectionSocket, operation: (ctx: ClientSessionCtx) => Promise - ): Promise { + ): Promise { + const rateLimitStatus = this.limitter.checkRateLimit(service.getUser()) + // If remaining is 0, rate limit is exceeded + if (rateLimitStatus?.remaining === 0) { + return Promise.resolve(rateLimitStatus) + } + const userCtx = requestCtx.newChild('๐Ÿ“ž client', {}) // Calculate total number of clients @@ -1189,7 +1228,16 @@ export class TSessionManager implements SessionManager { try { await workspace.with(async (pipeline, communicationApi) => { - const uctx = this.createOpContext(ctx, userCtx, pipeline, communicationApi, reqId, service, ws) + const uctx = this.createOpContext( + ctx, + userCtx, + pipeline, + communicationApi, + reqId, + service, + ws, + rateLimitStatus + ) await operation(uctx) }) } catch (err: any) { @@ -1209,6 +1257,7 @@ export class TSessionManager implements SessionManager { ) throw err } + return undefined }) .finally(() => { userCtx.end() diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 3b42697a62..6a2214ede1 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -212,6 +212,8 @@ services: - FULLTEXT_URL=http://fulltext:4710 - STATS_URL=http://stats:4901 - ENABLE_COMPRESSION=true + - RATE_LIMIT_MAX=25000 + - RATE_LIMIT_WINDOW=1000 collaborator: image: hardcoreeng/collaborator links: diff --git a/ws-tests/docker-compose.yaml b/ws-tests/docker-compose.yaml index ccb8ead964..2d968faad8 100644 --- a/ws-tests/docker-compose.yaml +++ b/ws-tests/docker-compose.yaml @@ -295,6 +295,8 @@ services: - LAST_NAME_FIRST=true - BRANDING_PATH=/var/cfg/branding.json - AI_BOT_URL=http://huly.local:4011 + - RATE_LIMIT_MAX=25000 + - RATE_LIMIT_WINDOW=1000 transactor-europe: image: hardcoreeng/transactor extra_hosts: @@ -326,6 +328,8 @@ services: - LAST_NAME_FIRST=true - BRANDING_PATH=/var/cfg/branding.json - AI_BOT_URL=http://huly.local:4011 + - RATE_LIMIT_MAX=25000 + - RATE_LIMIT_WINDOW=1000 restart: unless-stopped rekoni: image: hardcoreeng/rekoni-service