UBERF-11398: Fixing rate limits

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-05-30 22:41:43 +07:00
parent 96c6e4f6f4
commit b6d7ff3c43
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
9 changed files with 191 additions and 93 deletions

View File

@ -5537,7 +5537,7 @@ packages:
version: 0.0.0
'@rush-temp/tool@file:projects/tool.tgz':
resolution: {integrity: sha512-97pzx1qZJlMkJmWU1JEKOII/g4WM7evf3NZuj+9uHAvqjtpfHJDZ7QYzA4P7tMz+DVmCNjqUxWQyCSvHz5MqJA==, tarball: file:projects/tool.tgz}
resolution: {integrity: sha512-M5PhT+eqpWqh6vTLuYmcQhjIYmRyoYNGc7pFOmozt4RgKb5s0V5AwfqIkPr23DiBZm7Q9aYrscp1NxXx/RObOg==, tarball: file:projects/tool.tgz}
version: 0.0.0
'@rush-temp/tracker-assets@file:projects/tracker-assets.tgz':
@ -27335,6 +27335,7 @@ snapshots:
'@rush-temp/tool@file:projects/tool.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(gcp-metadata@5.3.0(encoding@0.1.13))(snappy@7.2.2)(socks@2.8.3)':
dependencies:
'@elastic/elasticsearch': 7.17.14
'@faker-js/faker': 8.4.1
'@types/jest': 29.5.12
'@types/mime-types': 2.1.4
'@types/minio': 7.0.18

View File

@ -175,6 +175,8 @@
"utf-8-validate": "^6.0.4",
"msgpackr": "^1.11.2",
"msgpackr-extract": "^3.0.3",
"@hcengineering/kafka": "^0.6.0"
"@hcengineering/kafka": "^0.6.0",
"@hcengineering/api-client": "^0.6.0",
"@faker-js/faker": "^8.4.1"
}
}

View File

@ -34,7 +34,7 @@ import {
createStorageBackupStorage,
restore
} from '@hcengineering/server-backup'
import serverClientPlugin, { getAccountClient } from '@hcengineering/server-client'
import serverClientPlugin, { getAccountClient, getTransactorEndpoint } from '@hcengineering/server-client'
import {
registerAdapterFactory,
registerDestroyFactory,
@ -47,6 +47,7 @@ import {
import serverToken, { generateToken } from '@hcengineering/server-token'
import { createWorkspace, upgradeWorkspace } from '@hcengineering/workspace-service'
import { faker } from '@faker-js/faker'
import { getPlatformQueue } from '@hcengineering/kafka'
import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { program, type Command } from 'commander'
@ -56,7 +57,9 @@ import {
AccountRole,
MeasureMetricsContext,
metricsToString,
SocialIdType,
systemAccountUuid,
type AccountUuid,
type Data,
type Doc,
type PersonId,
@ -93,17 +96,19 @@ import { getAccountDBUrl, getKvsUrl, getMongoDBUrl } from './__start'
// import { fillGithubUsers, fixAccountEmails, renameAccount } from './account'
import { changeConfiguration } from './configuration'
import { performGithubAccountMigrations } from './github'
import {
migrateCreatedModifiedBy,
ensureGlobalPersonsForLocalAccounts,
moveAccountDbFromMongoToPG,
migrateMergedAccounts,
filterMergedAccountsInMembers
} from './db'
import { getToolToken, getWorkspace, getWorkspaceTransactorEndpoint } from './utils'
import { performGmailAccountMigrations } from './gmail'
import { performCalendarAccountMigrations } from './calendar'
import {
ensureGlobalPersonsForLocalAccounts,
filterMergedAccountsInMembers,
migrateCreatedModifiedBy,
migrateMergedAccounts,
moveAccountDbFromMongoToPG
} from './db'
import { performGithubAccountMigrations } from './github'
import { performGmailAccountMigrations } from './gmail'
import { getToolToken, getWorkspace, getWorkspaceTransactorEndpoint } from './utils'
import { createRestClient } from '@hcengineering/api-client'
const colorConstants = {
colorRed: '\u001b[31m',
@ -1659,13 +1664,39 @@ export function devTool (
// })
// })
// program
// .command('generate-token <name> <workspace>')
// .description('generate token')
// .option('--admin', 'Generate token with admin access', false)
// .action(async (name: string, workspace: string, opt: { admin: boolean }) => {
// console.log(generateToken(name, getWorkspaceId(workspace), { ...(opt.admin ? { admin: 'true' } : {}) }))
// })
program
.command('generate-token <name> <workspace>')
.description('generate token')
.option('--admin', 'Generate token with admin access', false)
.action(async (name: string, workspace: string, opt: { admin: boolean }) => {
await withAccountDatabase(async (db) => {
const account = await db.socialId.findOne({ key: name })
console.log(
generateToken(account?.personUuid ?? (name as AccountUuid), workspace as WorkspaceUuid, {
...(opt.admin ? { admin: 'true' } : {})
})
)
})
})
program
.command('generate-persons <workspace>')
.description('generate a random persons into workspace')
.option('--admin', 'Generate token with admin access', false)
.option('--count <count>', 'Number of persons to generate', '1000')
.action(async (workspace: string, opt: { admin: boolean, count: string }) => {
const count = parseInt(opt.count)
const token = generateToken(systemAccountUuid, workspace as WorkspaceUuid, {
...(opt.admin ? { admin: 'true' } : {})
})
const endpoint = await getTransactorEndpoint(token, 'external')
const client = createRestClient(endpoint, workspace, token)
for (let i = 0; i < count; i++) {
const email = `${faker.internet.email()}`
await client.ensurePerson(SocialIdType.EMAIL, email, faker.person.firstName(), faker.person.lastName())
}
})
// program
// .command('decode-token <token>')
// .description('decode token')

View File

@ -36,12 +36,12 @@ import {
type TxResult,
type WithLookup
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { PlatformError, type Status, unknownError } from '@hcengineering/platform'
import type { RestClient } from './types'
import { AuthOptions } from '../types'
import { extractJson, withRetry } from './utils'
import { getWorkspaceToken } from '../utils'
import type { RestClient } from './types'
import { extractJson, withRetry } from './utils'
export function createRestClient (endpoint: string, workspaceId: string, token: string): RestClient {
return new RestClientImpl(endpoint, workspaceId, token)
@ -62,6 +62,7 @@ export class RestClientImpl implements RestClient {
endpoint: string
slowDownTimer = 0
currentRateLimit: { remaining: number, limit: number } = { remaining: 1000, limit: 1000 }
remaining: number = 1000
limit: number = 1000
@ -103,12 +104,13 @@ export class RestClientImpl implements RestClient {
params.append('options', JSON.stringify(options))
}
const requestUrl = concatLink(this.endpoint, `/api/v1/find-all/${this.workspace}?${params.toString()}`)
const result = await withRetry(async () => {
const result = await withRetry<FindResult<T> & { error?: Status }>(async () => {
const response = await fetch(requestUrl, this.requestInit())
if (!response.ok) {
await this.checkRateLimits(response)
throw new PlatformError(unknownError(response.statusText))
}
this.updateRateLimit(response)
return await extractJson<FindResult<T>>(response)
}, isRLE)
@ -122,9 +124,9 @@ export class RestClientImpl implements RestClient {
if (d.$lookup !== undefined) {
for (const [k, v] of Object.entries(d.$lookup)) {
if (!Array.isArray(v)) {
d.$lookup[k] = result.lookupMap[v as any]
;(d as any).$lookup[k] = result.lookupMap[v]
} else {
d.$lookup[k] = v.map((it) => result.lookupMap?.[it])
;(d as any).$lookup[k] = v.map((it) => result.lookupMap?.[it])
}
}
}
@ -140,8 +142,8 @@ export class RestClientImpl implements RestClient {
}
for (const [k, v] of Object.entries(query)) {
if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') {
if (doc[k] == null) {
doc[k] = v
if ((doc as any)[k] == null) {
;(doc as any)[k] = v
}
}
}
@ -150,20 +152,42 @@ export class RestClientImpl implements RestClient {
return result
}
private async checkRate (): Promise<void> {
if (this.currentRateLimit.remaining < this.currentRateLimit.limit / 3) {
if (this.slowDownTimer < 50) {
this.slowDownTimer += 50
}
this.slowDownTimer++
} else if (this.slowDownTimer > 0) {
this.slowDownTimer--
}
if (this.slowDownTimer > 0) {
// We need to wait a bit to avoid ban.
await new Promise((resolve) => setTimeout(resolve, this.slowDownTimer))
}
}
private updateRateLimit (response: Response): void {
const rateLimitLimit: number = parseInt(response.headers.get('X-RateLimit-Limit') ?? '100')
const remaining: number = parseInt(response.headers.get('X-RateLimit-Remaining') ?? '100')
this.currentRateLimit = { remaining, limit: rateLimitLimit }
}
private async checkRateLimits (response: Response): Promise<void> {
if (response.status === 429) {
// Extract rate limit information from headers
const retryAfter = response.headers.get('Retry-After')
const retryAfterMS = response.headers.get('Retry-After-ms')
const rateLimitReset = response.headers.get('X-RateLimit-Reset')
// const rateLimitLimit: string | null = response.headers.get('X-RateLimit-Limit')
this.updateRateLimit(response)
const waitTime =
retryAfter != null
? parseInt(retryAfter)
(retryAfterMS != null ? parseInt(retryAfterMS) : undefined) ??
(retryAfter != null
? parseInt(retryAfter) * 1000
: rateLimitReset != null
? new Date(parseInt(rateLimitReset)).getTime() - Date.now()
: 1000 // Default to 1 seconds if no headers are provided
console.warn(`Rate limit exceeded. Waiting ${Math.round((10 * waitTime) / 1000) / 10} seconds before retrying...`)
: 1000) // Default to 1 seconds if no headers are provided
await new Promise((resolve) => setTimeout(resolve, waitTime))
throw new Error(rateLimitError)
}
@ -171,28 +195,47 @@ export class RestClientImpl implements RestClient {
async getAccount (): Promise<Account> {
const requestUrl = concatLink(this.endpoint, `/api/v1/account/${this.workspace}`)
const response = await fetch(requestUrl, this.requestInit())
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
await this.checkRate()
const result = await withRetry<Account & { error?: Status }>(async () => {
const response = await fetch(requestUrl, this.requestInit())
if (!response.ok) {
await this.checkRateLimits(response)
throw new PlatformError(unknownError(response.statusText))
}
this.updateRateLimit(response)
return await extractJson<Account>(response)
})
if (result.error !== undefined) {
throw new PlatformError(result.error)
}
return await extractJson<Account>(response)
return result
}
async getModel (): Promise<{ hierarchy: Hierarchy, model: ModelDb }> {
const requestUrl = concatLink(this.endpoint, `/api/v1/load-model/${this.workspace}`)
const response = await fetch(requestUrl, this.requestInit())
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
await this.checkRate()
const result = await withRetry<{ hierarchy: Hierarchy, model: ModelDb, error?: Status }>(async () => {
const response = await fetch(requestUrl, this.requestInit())
if (!response.ok) {
await this.checkRateLimits(response)
throw new PlatformError(unknownError(response.statusText))
}
this.updateRateLimit(response)
const modelResponse: Tx[] = await extractJson<Tx[]>(response)
const hierarchy = new Hierarchy()
const model = new ModelDb(hierarchy)
const ctx = new MeasureMetricsContext('loadModel', {})
buildModel(ctx, modelResponse, (txes: Tx[]) => txes, hierarchy, model)
return { hierarchy, model }
}, isRLE)
if (result.error !== undefined) {
throw new PlatformError(result.error)
}
const modelResponse: Tx[] = await extractJson<Tx[]>(response)
const hierarchy = new Hierarchy()
const model = new ModelDb(hierarchy)
const ctx = new MeasureMetricsContext('loadModel', {})
buildModel(ctx, modelResponse, (txes: Tx[]) => txes, hierarchy, model)
return { hierarchy, model }
return result
}
async findOne<T extends Doc>(
@ -205,7 +248,8 @@ export class RestClientImpl implements RestClient {
async tx (tx: Tx): Promise<TxResult> {
const requestUrl = concatLink(this.endpoint, `/api/v1/tx/${this.workspace}`)
const result = await withRetry(async () => {
await this.checkRate()
const result = await withRetry<TxResult & { error?: Status }>(async () => {
const response = await fetch(requestUrl, {
method: 'POST',
headers: this.jsonHeaders(),
@ -216,6 +260,7 @@ export class RestClientImpl implements RestClient {
await this.checkRateLimits(response)
throw new PlatformError(unknownError(response.statusText))
}
this.updateRateLimit(response)
return await extractJson<TxResult>(response)
}, isRLE)
if (result.error !== undefined) {
@ -225,27 +270,35 @@ export class RestClientImpl implements RestClient {
}
async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
const params = new URLSearchParams()
params.append('query', query.query)
if (query.classes != null && Object.keys(query.classes).length > 0) {
params.append('classes', JSON.stringify(query.classes))
}
if (query.spaces != null && Object.keys(query.spaces).length > 0) {
params.append('spaces', JSON.stringify(query.spaces))
}
if (options.limit != null) {
params.append('limit', `${options.limit}`)
}
const requestUrl = concatLink(this.endpoint, `/api/v1/search-fulltext/${this.workspace}?${params.toString()}`)
const response = await fetch(requestUrl, {
method: 'GET',
headers: this.jsonHeaders(),
keepalive: true
const result = await withRetry<SearchResult & { error?: Status }>(async () => {
const params = new URLSearchParams()
params.append('query', query.query)
if (query.classes != null && Object.keys(query.classes).length > 0) {
params.append('classes', JSON.stringify(query.classes))
}
if (query.spaces != null && Object.keys(query.spaces).length > 0) {
params.append('spaces', JSON.stringify(query.spaces))
}
if (options.limit != null) {
params.append('limit', `${options.limit}`)
}
const requestUrl = concatLink(this.endpoint, `/api/v1/search-fulltext/${this.workspace}?${params.toString()}`)
const response = await fetch(requestUrl, {
method: 'GET',
headers: this.jsonHeaders(),
keepalive: true
})
if (!response.ok) {
await this.checkRateLimits(response)
throw new PlatformError(unknownError(response.statusText))
}
this.updateRateLimit(response)
return await extractJson<TxResult>(response)
})
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
if (result.error !== undefined) {
throw new PlatformError(result.error)
}
return await extractJson<TxResult>(response)
return result
}
async ensurePerson (
@ -255,20 +308,29 @@ export class RestClientImpl implements RestClient {
lastName: string
): Promise<{ uuid: PersonUuid, socialId: PersonId, localPerson: string }> {
const requestUrl = concatLink(this.endpoint, `/api/v1/ensure-person/${this.workspace}`)
const response = await fetch(requestUrl, {
method: 'POST',
headers: this.jsonHeaders(),
keepalive: true,
body: JSON.stringify({
socialType,
socialValue,
firstName,
lastName
await this.checkRate()
const result = await withRetry(async () => {
const response = await fetch(requestUrl, {
method: 'POST',
headers: this.jsonHeaders(),
keepalive: true,
body: JSON.stringify({
socialType,
socialValue,
firstName,
lastName
})
})
})
if (!response.ok) {
throw new PlatformError(unknownError(response.statusText))
if (!response.ok) {
await this.checkRateLimits(response)
throw new PlatformError(unknownError(response.statusText))
}
this.updateRateLimit(response)
return await extractJson<{ uuid: PersonUuid, socialId: PersonId, localPerson: string }>(response)
}, isRLE)
if (result.error !== undefined) {
throw new PlatformError(result.error)
}
return await extractJson<{ uuid: PersonUuid, socialId: PersonId, localPerson: string }>(response)
return result
}
}

View File

@ -310,6 +310,9 @@ class Connection implements ClientConnection {
)
this.currentRateLimit = resp.rateLimit
if (this.currentRateLimit.remaining < this.currentRateLimit.limit / 3) {
if (this.slowDownTimer < 50) {
this.slowDownTimer += 50
}
this.slowDownTimer++
} else if (this.slowDownTimer > 0) {
this.slowDownTimer--

View File

@ -65,8 +65,8 @@ function rateLimitToHeaders (rateLimit?: RateLimitInfo): OutgoingHttpHeaders {
}
const { remaining, limit, reset, retryAfter } = rateLimit
return {
'Retry-After': `${Math.max(retryAfter ?? 0, 1)}`,
'Retry-After-ms': `${retryAfter ?? 0}`,
'Retry-After': `${Math.max(Math.round((retryAfter ?? 0) / 1000), 1)}`,
'Retry-After-ms': `${retryAfter ?? 1000}`,
'X-RateLimit-Limit': `${limit}`,
'X-RateLimit-Remaining': `${remaining}`,
'X-RateLimit-Reset': `${reset}`
@ -188,8 +188,8 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur
...keepAliveOptions,
'Content-Type': 'application/json',
'Cache-Control': 'no-cache',
'Retry-After': `${Math.max((retryAfter ?? 0) / 1000, 1)}`,
'Retry-After-ms': `${retryAfter ?? 0}`,
'Retry-After': `${Math.max(Math.round((retryAfter ?? 0) / 1000), 1)}`,
'Retry-After-ms': `${retryAfter ?? 1000}`,
'X-RateLimit-Limit': `${limit}`,
'X-RateLimit-Remaining': `${remaining}`,
'X-RateLimit-Reset': `${reset}`

View File

@ -277,7 +277,7 @@ async function getCreateReferencesTxes (
refs.push(...attrReferences)
} else if (attr.type._class === core.class.TypeCollaborativeDoc) {
const blobId = (createdDoc as any)[attr.name] as Ref<Blob>
if (blobId != null) {
if (blobId != null && blobId !== '') {
try {
const buffer = await storage.read(ctx, control.workspace, blobId)
const markup = Buffer.concat(buffer as any).toString()

View File

@ -44,12 +44,11 @@ export class SlidingWindowRateLimitter {
rateLimit.requests.push(now + (rateLimit.rejectedRequests > this.rateLimitMax * 2 ? this.rateLimitWindow * 5 : 0))
}
if (rateLimit.requests.length > this.rateLimitMax) {
if (rateLimit.requests.length >= this.rateLimitMax) {
rateLimit.rejectedRequests++
// Find when the oldest request will exit the window
const someRequest = Math.round(Math.random() * rateLimit.requests.length)
const nextAvailableTime = rateLimit.requests[someRequest] + this.rateLimitWindow
const nextAvailableTime = rateLimit.requests[0] + this.rateLimitWindow
return {
remaining: 0,

View File

@ -1211,7 +1211,7 @@ export class TSessionManager implements SessionManager {
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
operation: (ctx: ClientSessionCtx) => Promise<void>
operation: (ctx: ClientSessionCtx, rateLimit: RateLimitInfo | undefined) => Promise<void>
): Promise<RateLimitInfo | undefined> {
const rateLimitStatus = this.limitter.checkRateLimit(service.getUser())
// If remaining is 0, rate limit is exceeded
@ -1252,7 +1252,7 @@ export class TSessionManager implements SessionManager {
ws,
rateLimitStatus
)
await operation(uctx)
await operation(uctx, rateLimitStatus)
})
} catch (err: any) {
Analytics.handleError(err)