diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 416600c367..453af2c123 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -5537,7 +5537,7 @@ packages: version: 0.0.0 '@rush-temp/tool@file:projects/tool.tgz': - resolution: {integrity: sha512-97pzx1qZJlMkJmWU1JEKOII/g4WM7evf3NZuj+9uHAvqjtpfHJDZ7QYzA4P7tMz+DVmCNjqUxWQyCSvHz5MqJA==, tarball: file:projects/tool.tgz} + resolution: {integrity: sha512-M5PhT+eqpWqh6vTLuYmcQhjIYmRyoYNGc7pFOmozt4RgKb5s0V5AwfqIkPr23DiBZm7Q9aYrscp1NxXx/RObOg==, tarball: file:projects/tool.tgz} version: 0.0.0 '@rush-temp/tracker-assets@file:projects/tracker-assets.tgz': @@ -27335,6 +27335,7 @@ snapshots: '@rush-temp/tool@file:projects/tool.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(gcp-metadata@5.3.0(encoding@0.1.13))(snappy@7.2.2)(socks@2.8.3)': dependencies: '@elastic/elasticsearch': 7.17.14 + '@faker-js/faker': 8.4.1 '@types/jest': 29.5.12 '@types/mime-types': 2.1.4 '@types/minio': 7.0.18 diff --git a/dev/tool/package.json b/dev/tool/package.json index 59edf08fe0..a074b52bb7 100644 --- a/dev/tool/package.json +++ b/dev/tool/package.json @@ -175,6 +175,8 @@ "utf-8-validate": "^6.0.4", "msgpackr": "^1.11.2", "msgpackr-extract": "^3.0.3", - "@hcengineering/kafka": "^0.6.0" + "@hcengineering/kafka": "^0.6.0", + "@hcengineering/api-client": "^0.6.0", + "@faker-js/faker": "^8.4.1" } } diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 106ac18b9a..7e0fc98c69 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -34,7 +34,7 @@ import { createStorageBackupStorage, restore } from '@hcengineering/server-backup' -import serverClientPlugin, { getAccountClient } from '@hcengineering/server-client' +import serverClientPlugin, { getAccountClient, getTransactorEndpoint } from '@hcengineering/server-client' import { registerAdapterFactory, registerDestroyFactory, @@ -47,6 +47,7 @@ import { import serverToken, { generateToken } from '@hcengineering/server-token' import { createWorkspace, upgradeWorkspace } from '@hcengineering/workspace-service' +import { faker } from '@faker-js/faker' import { getPlatformQueue } from '@hcengineering/kafka' import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import { program, type Command } from 'commander' @@ -56,7 +57,9 @@ import { AccountRole, MeasureMetricsContext, metricsToString, + SocialIdType, systemAccountUuid, + type AccountUuid, type Data, type Doc, type PersonId, @@ -93,17 +96,19 @@ import { getAccountDBUrl, getKvsUrl, getMongoDBUrl } from './__start' // import { fillGithubUsers, fixAccountEmails, renameAccount } from './account' import { changeConfiguration } from './configuration' -import { performGithubAccountMigrations } from './github' -import { - migrateCreatedModifiedBy, - ensureGlobalPersonsForLocalAccounts, - moveAccountDbFromMongoToPG, - migrateMergedAccounts, - filterMergedAccountsInMembers -} from './db' -import { getToolToken, getWorkspace, getWorkspaceTransactorEndpoint } from './utils' -import { performGmailAccountMigrations } from './gmail' import { performCalendarAccountMigrations } from './calendar' +import { + ensureGlobalPersonsForLocalAccounts, + filterMergedAccountsInMembers, + migrateCreatedModifiedBy, + migrateMergedAccounts, + moveAccountDbFromMongoToPG +} from './db' +import { performGithubAccountMigrations } from './github' +import { performGmailAccountMigrations } from './gmail' +import { getToolToken, getWorkspace, getWorkspaceTransactorEndpoint } from './utils' + +import { createRestClient } from '@hcengineering/api-client' const colorConstants = { colorRed: '\u001b[31m', @@ -1659,13 +1664,39 @@ export function devTool ( // }) // }) - // program - // .command('generate-token ') - // .description('generate token') - // .option('--admin', 'Generate token with admin access', false) - // .action(async (name: string, workspace: string, opt: { admin: boolean }) => { - // console.log(generateToken(name, getWorkspaceId(workspace), { ...(opt.admin ? { admin: 'true' } : {}) })) - // }) + program + .command('generate-token ') + .description('generate token') + .option('--admin', 'Generate token with admin access', false) + .action(async (name: string, workspace: string, opt: { admin: boolean }) => { + await withAccountDatabase(async (db) => { + const account = await db.socialId.findOne({ key: name }) + console.log( + generateToken(account?.personUuid ?? (name as AccountUuid), workspace as WorkspaceUuid, { + ...(opt.admin ? { admin: 'true' } : {}) + }) + ) + }) + }) + + program + .command('generate-persons ') + .description('generate a random persons into workspace') + .option('--admin', 'Generate token with admin access', false) + .option('--count ', 'Number of persons to generate', '1000') + .action(async (workspace: string, opt: { admin: boolean, count: string }) => { + const count = parseInt(opt.count) + const token = generateToken(systemAccountUuid, workspace as WorkspaceUuid, { + ...(opt.admin ? { admin: 'true' } : {}) + }) + const endpoint = await getTransactorEndpoint(token, 'external') + const client = createRestClient(endpoint, workspace, token) + for (let i = 0; i < count; i++) { + const email = `${faker.internet.email()}` + await client.ensurePerson(SocialIdType.EMAIL, email, faker.person.firstName(), faker.person.lastName()) + } + }) + // program // .command('decode-token ') // .description('decode token') diff --git a/packages/api-client/src/rest/rest.ts b/packages/api-client/src/rest/rest.ts index b545f73a71..075fa1c8dc 100644 --- a/packages/api-client/src/rest/rest.ts +++ b/packages/api-client/src/rest/rest.ts @@ -36,12 +36,12 @@ import { type TxResult, type WithLookup } from '@hcengineering/core' -import { PlatformError, unknownError } from '@hcengineering/platform' +import { PlatformError, type Status, unknownError } from '@hcengineering/platform' -import type { RestClient } from './types' import { AuthOptions } from '../types' -import { extractJson, withRetry } from './utils' import { getWorkspaceToken } from '../utils' +import type { RestClient } from './types' +import { extractJson, withRetry } from './utils' export function createRestClient (endpoint: string, workspaceId: string, token: string): RestClient { return new RestClientImpl(endpoint, workspaceId, token) @@ -62,6 +62,7 @@ export class RestClientImpl implements RestClient { endpoint: string slowDownTimer = 0 + currentRateLimit: { remaining: number, limit: number } = { remaining: 1000, limit: 1000 } remaining: number = 1000 limit: number = 1000 @@ -103,12 +104,13 @@ export class RestClientImpl implements RestClient { params.append('options', JSON.stringify(options)) } const requestUrl = concatLink(this.endpoint, `/api/v1/find-all/${this.workspace}?${params.toString()}`) - const result = await withRetry(async () => { + const result = await withRetry & { error?: Status }>(async () => { const response = await fetch(requestUrl, this.requestInit()) if (!response.ok) { await this.checkRateLimits(response) throw new PlatformError(unknownError(response.statusText)) } + this.updateRateLimit(response) return await extractJson>(response) }, isRLE) @@ -122,9 +124,9 @@ export class RestClientImpl implements RestClient { if (d.$lookup !== undefined) { for (const [k, v] of Object.entries(d.$lookup)) { if (!Array.isArray(v)) { - d.$lookup[k] = result.lookupMap[v as any] + ;(d as any).$lookup[k] = result.lookupMap[v] } else { - d.$lookup[k] = v.map((it) => result.lookupMap?.[it]) + ;(d as any).$lookup[k] = v.map((it) => result.lookupMap?.[it]) } } } @@ -140,8 +142,8 @@ export class RestClientImpl implements RestClient { } for (const [k, v] of Object.entries(query)) { if (typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { - if (doc[k] == null) { - doc[k] = v + if ((doc as any)[k] == null) { + ;(doc as any)[k] = v } } } @@ -150,20 +152,42 @@ export class RestClientImpl implements RestClient { return result } + private async checkRate (): Promise { + if (this.currentRateLimit.remaining < this.currentRateLimit.limit / 3) { + if (this.slowDownTimer < 50) { + this.slowDownTimer += 50 + } + this.slowDownTimer++ + } else if (this.slowDownTimer > 0) { + this.slowDownTimer-- + } + if (this.slowDownTimer > 0) { + // We need to wait a bit to avoid ban. + await new Promise((resolve) => setTimeout(resolve, this.slowDownTimer)) + } + } + + private updateRateLimit (response: Response): void { + const rateLimitLimit: number = parseInt(response.headers.get('X-RateLimit-Limit') ?? '100') + const remaining: number = parseInt(response.headers.get('X-RateLimit-Remaining') ?? '100') + this.currentRateLimit = { remaining, limit: rateLimitLimit } + } + private async checkRateLimits (response: Response): Promise { if (response.status === 429) { // Extract rate limit information from headers const retryAfter = response.headers.get('Retry-After') + const retryAfterMS = response.headers.get('Retry-After-ms') const rateLimitReset = response.headers.get('X-RateLimit-Reset') - // const rateLimitLimit: string | null = response.headers.get('X-RateLimit-Limit') + + this.updateRateLimit(response) const waitTime = - retryAfter != null - ? parseInt(retryAfter) + (retryAfterMS != null ? parseInt(retryAfterMS) : undefined) ?? + (retryAfter != null + ? parseInt(retryAfter) * 1000 : rateLimitReset != null ? new Date(parseInt(rateLimitReset)).getTime() - Date.now() - : 1000 // Default to 1 seconds if no headers are provided - - console.warn(`Rate limit exceeded. Waiting ${Math.round((10 * waitTime) / 1000) / 10} seconds before retrying...`) + : 1000) // Default to 1 seconds if no headers are provided await new Promise((resolve) => setTimeout(resolve, waitTime)) throw new Error(rateLimitError) } @@ -171,28 +195,47 @@ export class RestClientImpl implements RestClient { async getAccount (): Promise { const requestUrl = concatLink(this.endpoint, `/api/v1/account/${this.workspace}`) - const response = await fetch(requestUrl, this.requestInit()) - if (!response.ok) { - throw new PlatformError(unknownError(response.statusText)) + await this.checkRate() + const result = await withRetry(async () => { + const response = await fetch(requestUrl, this.requestInit()) + if (!response.ok) { + await this.checkRateLimits(response) + throw new PlatformError(unknownError(response.statusText)) + } + this.updateRateLimit(response) + return await extractJson(response) + }) + if (result.error !== undefined) { + throw new PlatformError(result.error) } - return await extractJson(response) + return result } async getModel (): Promise<{ hierarchy: Hierarchy, model: ModelDb }> { const requestUrl = concatLink(this.endpoint, `/api/v1/load-model/${this.workspace}`) - const response = await fetch(requestUrl, this.requestInit()) - if (!response.ok) { - throw new PlatformError(unknownError(response.statusText)) + await this.checkRate() + const result = await withRetry<{ hierarchy: Hierarchy, model: ModelDb, error?: Status }>(async () => { + const response = await fetch(requestUrl, this.requestInit()) + if (!response.ok) { + await this.checkRateLimits(response) + throw new PlatformError(unknownError(response.statusText)) + } + this.updateRateLimit(response) + + const modelResponse: Tx[] = await extractJson(response) + + const hierarchy = new Hierarchy() + const model = new ModelDb(hierarchy) + + const ctx = new MeasureMetricsContext('loadModel', {}) + buildModel(ctx, modelResponse, (txes: Tx[]) => txes, hierarchy, model) + + return { hierarchy, model } + }, isRLE) + if (result.error !== undefined) { + throw new PlatformError(result.error) } - const modelResponse: Tx[] = await extractJson(response) - - const hierarchy = new Hierarchy() - const model = new ModelDb(hierarchy) - - const ctx = new MeasureMetricsContext('loadModel', {}) - buildModel(ctx, modelResponse, (txes: Tx[]) => txes, hierarchy, model) - - return { hierarchy, model } + return result } async findOne( @@ -205,7 +248,8 @@ export class RestClientImpl implements RestClient { async tx (tx: Tx): Promise { const requestUrl = concatLink(this.endpoint, `/api/v1/tx/${this.workspace}`) - const result = await withRetry(async () => { + await this.checkRate() + const result = await withRetry(async () => { const response = await fetch(requestUrl, { method: 'POST', headers: this.jsonHeaders(), @@ -216,6 +260,7 @@ export class RestClientImpl implements RestClient { await this.checkRateLimits(response) throw new PlatformError(unknownError(response.statusText)) } + this.updateRateLimit(response) return await extractJson(response) }, isRLE) if (result.error !== undefined) { @@ -225,27 +270,35 @@ export class RestClientImpl implements RestClient { } async searchFulltext (query: SearchQuery, options: SearchOptions): Promise { - const params = new URLSearchParams() - params.append('query', query.query) - if (query.classes != null && Object.keys(query.classes).length > 0) { - params.append('classes', JSON.stringify(query.classes)) - } - if (query.spaces != null && Object.keys(query.spaces).length > 0) { - params.append('spaces', JSON.stringify(query.spaces)) - } - if (options.limit != null) { - params.append('limit', `${options.limit}`) - } - const requestUrl = concatLink(this.endpoint, `/api/v1/search-fulltext/${this.workspace}?${params.toString()}`) - const response = await fetch(requestUrl, { - method: 'GET', - headers: this.jsonHeaders(), - keepalive: true + const result = await withRetry(async () => { + const params = new URLSearchParams() + params.append('query', query.query) + if (query.classes != null && Object.keys(query.classes).length > 0) { + params.append('classes', JSON.stringify(query.classes)) + } + if (query.spaces != null && Object.keys(query.spaces).length > 0) { + params.append('spaces', JSON.stringify(query.spaces)) + } + if (options.limit != null) { + params.append('limit', `${options.limit}`) + } + const requestUrl = concatLink(this.endpoint, `/api/v1/search-fulltext/${this.workspace}?${params.toString()}`) + const response = await fetch(requestUrl, { + method: 'GET', + headers: this.jsonHeaders(), + keepalive: true + }) + if (!response.ok) { + await this.checkRateLimits(response) + throw new PlatformError(unknownError(response.statusText)) + } + this.updateRateLimit(response) + return await extractJson(response) }) - if (!response.ok) { - throw new PlatformError(unknownError(response.statusText)) + if (result.error !== undefined) { + throw new PlatformError(result.error) } - return await extractJson(response) + return result } async ensurePerson ( @@ -255,20 +308,29 @@ export class RestClientImpl implements RestClient { lastName: string ): Promise<{ uuid: PersonUuid, socialId: PersonId, localPerson: string }> { const requestUrl = concatLink(this.endpoint, `/api/v1/ensure-person/${this.workspace}`) - const response = await fetch(requestUrl, { - method: 'POST', - headers: this.jsonHeaders(), - keepalive: true, - body: JSON.stringify({ - socialType, - socialValue, - firstName, - lastName + await this.checkRate() + const result = await withRetry(async () => { + const response = await fetch(requestUrl, { + method: 'POST', + headers: this.jsonHeaders(), + keepalive: true, + body: JSON.stringify({ + socialType, + socialValue, + firstName, + lastName + }) }) - }) - if (!response.ok) { - throw new PlatformError(unknownError(response.statusText)) + if (!response.ok) { + await this.checkRateLimits(response) + throw new PlatformError(unknownError(response.statusText)) + } + this.updateRateLimit(response) + return await extractJson<{ uuid: PersonUuid, socialId: PersonId, localPerson: string }>(response) + }, isRLE) + if (result.error !== undefined) { + throw new PlatformError(result.error) } - return await extractJson<{ uuid: PersonUuid, socialId: PersonId, localPerson: string }>(response) + return result } } diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index e2d04df9d9..ed86878b86 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -310,6 +310,9 @@ class Connection implements ClientConnection { ) this.currentRateLimit = resp.rateLimit if (this.currentRateLimit.remaining < this.currentRateLimit.limit / 3) { + if (this.slowDownTimer < 50) { + this.slowDownTimer += 50 + } this.slowDownTimer++ } else if (this.slowDownTimer > 0) { this.slowDownTimer-- diff --git a/pods/server/src/rpc.ts b/pods/server/src/rpc.ts index 39c51465c7..9cbfed63db 100644 --- a/pods/server/src/rpc.ts +++ b/pods/server/src/rpc.ts @@ -65,8 +65,8 @@ function rateLimitToHeaders (rateLimit?: RateLimitInfo): OutgoingHttpHeaders { } const { remaining, limit, reset, retryAfter } = rateLimit return { - 'Retry-After': `${Math.max(retryAfter ?? 0, 1)}`, - 'Retry-After-ms': `${retryAfter ?? 0}`, + 'Retry-After': `${Math.max(Math.round((retryAfter ?? 0) / 1000), 1)}`, + 'Retry-After-ms': `${retryAfter ?? 1000}`, 'X-RateLimit-Limit': `${limit}`, 'X-RateLimit-Remaining': `${remaining}`, 'X-RateLimit-Reset': `${reset}` @@ -188,8 +188,8 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur ...keepAliveOptions, 'Content-Type': 'application/json', 'Cache-Control': 'no-cache', - 'Retry-After': `${Math.max((retryAfter ?? 0) / 1000, 1)}`, - 'Retry-After-ms': `${retryAfter ?? 0}`, + 'Retry-After': `${Math.max(Math.round((retryAfter ?? 0) / 1000), 1)}`, + 'Retry-After-ms': `${retryAfter ?? 1000}`, 'X-RateLimit-Limit': `${limit}`, 'X-RateLimit-Remaining': `${remaining}`, 'X-RateLimit-Reset': `${reset}` diff --git a/server-plugins/activity-resources/src/references.ts b/server-plugins/activity-resources/src/references.ts index d7a89c774c..adbf6b31fa 100644 --- a/server-plugins/activity-resources/src/references.ts +++ b/server-plugins/activity-resources/src/references.ts @@ -277,7 +277,7 @@ async function getCreateReferencesTxes ( refs.push(...attrReferences) } else if (attr.type._class === core.class.TypeCollaborativeDoc) { const blobId = (createdDoc as any)[attr.name] as Ref - if (blobId != null) { + if (blobId != null && blobId !== '') { try { const buffer = await storage.read(ctx, control.workspace, blobId) const markup = Buffer.concat(buffer as any).toString() diff --git a/server/rpc/src/sliding.ts b/server/rpc/src/sliding.ts index 6deb39e47d..15ffbfa275 100644 --- a/server/rpc/src/sliding.ts +++ b/server/rpc/src/sliding.ts @@ -44,12 +44,11 @@ export class SlidingWindowRateLimitter { rateLimit.requests.push(now + (rateLimit.rejectedRequests > this.rateLimitMax * 2 ? this.rateLimitWindow * 5 : 0)) } - if (rateLimit.requests.length > this.rateLimitMax) { + if (rateLimit.requests.length >= this.rateLimitMax) { rateLimit.rejectedRequests++ // Find when the oldest request will exit the window - const someRequest = Math.round(Math.random() * rateLimit.requests.length) - const nextAvailableTime = rateLimit.requests[someRequest] + this.rateLimitWindow + const nextAvailableTime = rateLimit.requests[0] + this.rateLimitWindow return { remaining: 0, diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 3bf8f51d1f..d1d28710fb 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -1211,7 +1211,7 @@ export class TSessionManager implements SessionManager { requestCtx: MeasureContext, service: S, ws: ConnectionSocket, - operation: (ctx: ClientSessionCtx) => Promise + operation: (ctx: ClientSessionCtx, rateLimit: RateLimitInfo | undefined) => Promise ): Promise { const rateLimitStatus = this.limitter.checkRateLimit(service.getUser()) // If remaining is 0, rate limit is exceeded @@ -1252,7 +1252,7 @@ export class TSessionManager implements SessionManager { ws, rateLimitStatus ) - await operation(uctx) + await operation(uctx, rateLimitStatus) }) } catch (err: any) { Analytics.handleError(err)