UBERF-9099: Rate limits (#7629)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-01-10 22:27:08 +07:00 committed by GitHub
parent c9416ed852
commit e5306b2afd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 359 additions and 129 deletions

View File

@ -0,0 +1,118 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { TimeRateLimiter } from '../utils'
describe('TimeRateLimiter', () => {
it('should limit rate of executions', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000) // 2 executions per second
const mockFn = jest.fn().mockResolvedValue('result')
const operations: Promise<string>[] = []
// Try to execute 4 operations
for (let i = 0; i < 4; i++) {
operations.push(limiter.exec(mockFn))
}
// First 2 should execute immediately
expect(mockFn).toHaveBeenCalledTimes(2)
// Advance time by 1 second
jest.advanceTimersByTime(1001)
await Promise.resolve()
// Next 2 should execute after time advance
expect(mockFn).toHaveBeenCalledTimes(4)
await Promise.all(operations)
})
it('should cleanup old executions', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000)
const mockFn = jest.fn().mockResolvedValue('result')
// Execute first operation
await limiter.exec(mockFn)
expect(mockFn).toHaveBeenCalledTimes(1)
expect(limiter.executions.length).toBe(1)
// Advance time past period
jest.advanceTimersByTime(1001)
// Execute another operation
await limiter.exec(mockFn)
expect(mockFn).toHaveBeenCalledTimes(2)
expect(limiter.executions.length).toBe(1) // Old execution should be cleaned up
})
it('should handle concurrent operations', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000)
const mockFn = jest.fn().mockImplementation(async () => {
console.log('start#')
await new Promise((resolve) => setTimeout(resolve, 450))
console.log('finished#')
return 'result'
})
const operations = Promise.all([limiter.exec(mockFn), limiter.exec(mockFn), limiter.exec(mockFn)])
expect(mockFn).toHaveBeenCalledTimes(2)
expect(limiter.processingQueue.size).toBe(2)
jest.advanceTimersByTime(500)
await Promise.resolve()
await Promise.resolve()
jest.advanceTimersByTime(1000)
await Promise.resolve()
jest.advanceTimersByTime(2001)
await Promise.resolve()
await Promise.resolve()
expect(limiter.processingQueue.size).toBe(0)
expect(mockFn).toHaveBeenCalledTimes(3)
await operations
})
it('should wait for processing to complete', async () => {
jest.useFakeTimers()
const limiter = new TimeRateLimiter(2, 1000)
const mockFn = jest.fn().mockImplementation(async () => {
await new Promise((resolve) => setTimeout(resolve, 500))
return 'result'
})
const operation = limiter.exec(mockFn)
const waitPromise = limiter.waitProcessing().then(() => {
console.log('wait complete')
})
expect(limiter.processingQueue.size).toBe(1)
jest.advanceTimersByTime(1001)
await Promise.resolve()
await Promise.resolve()
await Promise.resolve()
await waitPromise
await operation
expect(limiter.processingQueue.size).toBe(0)
})
})

View File

@ -839,3 +839,68 @@ export function pluginFilterTx (
systemTx = systemTx.filter((t) => !totalExcluded.has(t._id))
return systemTx
}
/**
* @public
*/
export class TimeRateLimiter {
idCounter: number = 0
processingQueue = new Map<number, Promise<void>>()
last: number = 0
rate: number
period: number
executions: { time: number, running: boolean }[] = []
queue: (() => Promise<void>)[] = []
notify: (() => void)[] = []
constructor (rate: number, period: number = 1000) {
this.rate = rate
this.period = period
}
private cleanupExecutions (): void {
const now = Date.now()
this.executions = this.executions.filter((time) => time.running || now - time.time < this.period)
}
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
const processingId = this.idCounter++
while (this.processingQueue.size >= this.rate || this.executions.length >= this.rate) {
this.cleanupExecutions()
if (this.executions.length < this.rate) {
break
}
await new Promise<void>((resolve) => {
setTimeout(resolve, this.period / this.rate)
})
}
const v = { time: Date.now(), running: true }
try {
this.executions.push(v)
const p = op(args)
this.processingQueue.set(processingId, p as Promise<void>)
return await p
} finally {
v.running = false
this.processingQueue.delete(processingId)
this.cleanupExecutions()
const n = this.notify.shift()
if (n !== undefined) {
n()
}
}
}
async waitProcessing (): Promise<void> {
while (this.processingQueue.size > 0) {
console.log('wait', this.processingQueue.size)
await new Promise<void>((resolve) => {
this.notify.push(resolve)
})
}
}
}

View File

@ -19,7 +19,7 @@ export async function createPlatformClient (
workspace: string,
timeout: number,
reconnect?: (event: ClientConnectEvent, data: any) => Promise<void>
): Promise<Client> {
): Promise<{ client: Client, endpoint: string }> {
setMetadata(client.metadata.ClientSocketFactory, (url) => {
return new WebSocket(url, {
headers: {
@ -45,5 +45,5 @@ export async function createPlatformClient (
onConnect: reconnect
})
return connection
return { client: connection, endpoint }
}

View File

@ -29,6 +29,9 @@ interface Config {
BrandingPath: string
WorkspaceInactivityInterval: number // Interval in days to stop workspace synchronization if not visited
// Limits
RateLimit: number
}
const envMap: { [key in keyof Config]: string } = {
@ -55,7 +58,11 @@ const envMap: { [key in keyof Config]: string } = {
SentryDSN: 'SENTRY_DSN',
BrandingPath: 'BRANDING_PATH',
WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL'
WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL',
// Limits
RateLimit: 'RATE_LIMIT' // Operations per second for one transactor
}
const required: Array<keyof Config> = [
@ -101,7 +108,8 @@ const config: Config = (() => {
SentryDSN: process.env[envMap.SentryDSN],
BrandingPath: process.env[envMap.BrandingPath] ?? '',
WorkspaceInactivityInterval: parseInt(process.env[envMap.WorkspaceInactivityInterval] ?? '5') // In days
WorkspaceInactivityInterval: parseInt(process.env[envMap.WorkspaceInactivityInterval] ?? '5'), // In days
RateLimit: parseInt(process.env[envMap.RateLimit] ?? '25')
}
const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])

View File

@ -15,6 +15,7 @@ import core, {
RateLimiter,
Ref,
systemAccountEmail,
TimeRateLimiter,
TxOperations
} from '@hcengineering/core'
import github, { GithubAuthentication, makeQuery, type GithubIntegration } from '@hcengineering/github'
@ -73,6 +74,8 @@ export class PlatformWorker {
userManager!: UserManager
rateLimits = new Map<string, TimeRateLimiter>()
private constructor (
readonly ctx: MeasureContext,
readonly app: App,
@ -83,6 +86,15 @@ export class PlatformWorker {
registerLoaders()
}
getRateLimiter (endpoint: string): TimeRateLimiter {
let limiter = this.rateLimits.get(endpoint)
if (limiter === undefined) {
limiter = new TimeRateLimiter(config.RateLimit)
this.rateLimits.set(endpoint, limiter)
}
return limiter
}
public async initStorage (): Promise<void> {
this.mongoRef = getMongoClient(config.MongoURL)
const mongoClient = await this.mongoRef.getClient()
@ -230,7 +242,7 @@ export class PlatformWorker {
} else {
let client: Client | undefined
try {
client = await createPlatformClient(oldWorkspace, 30000)
;({ client } = await createPlatformClient(oldWorkspace, 30000))
await this.removeInstallationFromWorkspace(oldWorker, installationId)
await client.close()
} catch (err: any) {
@ -386,7 +398,7 @@ export class PlatformWorker {
platformClient = this.clients.get(payload.workspace)?.client
if (platformClient === undefined) {
shouldClose = true
platformClient = await createPlatformClient(payload.workspace, 30000)
;({ client: platformClient } = await createPlatformClient(payload.workspace, 30000))
}
const client = new TxOperations(platformClient, payload.accountId)

View File

@ -355,6 +355,10 @@ export class CommentSyncManager implements DocSyncManager {
}
}
isHulyLinkComment (message: string): boolean {
return message.includes('<p>Connected to') && message.includes('Huly&reg;')
}
private async createComment (
info: DocSyncInfo,
messageData: MessageData,
@ -367,6 +371,11 @@ export class CommentSyncManager implements DocSyncManager {
...messageData,
attachments: 0
}
// Check if it is Connected message.
if ((comment as any).performed_via_github_app !== undefined && this.isHulyLinkComment(comment.body)) {
// No need to create comment on platform.
return
}
await this.client.addCollection(
chunter.class.ChatMessage,
info.space,

View File

@ -35,7 +35,8 @@ import core, {
reduceCalls,
toIdMap,
type Blob,
type MigrationState
type MigrationState,
type TimeRateLimiter
} from '@hcengineering/core'
import github, {
DocSyncInfo,
@ -336,6 +337,7 @@ export class GithubWorker implements IntegrationManager {
private constructor (
readonly ctx: MeasureContext,
readonly limiter: TimeRateLimiter,
readonly platform: PlatformWorker,
readonly installations: Map<number, InstallationRecord>,
readonly client: Client,
@ -1152,23 +1154,26 @@ export class GithubWorker implements IntegrationManager {
const _projects = toIdMap(projects)
const _repositories = repositories.map((it) => it._id)
const docs = await this.ctx.with(
'find-doc-sync-info',
{},
(ctx) =>
this._client.findAll<DocSyncInfo>(
github.class.DocSyncInfo,
{
needSync: { $ne: githubSyncVersion },
externalVersion: { $in: [githubExternalSyncVersion, '#'] },
space: { $in: Array.from(_projects.keys()) },
repository: { $in: [null, ..._repositories] }
},
{
limit: 50
}
),
{ _projects, _repositories }
const docs = await this.limiter.exec(
async () =>
await this.ctx.with(
'find-doc-sync-info',
{},
(ctx) =>
this._client.findAll<DocSyncInfo>(
github.class.DocSyncInfo,
{
needSync: { $ne: githubSyncVersion },
externalVersion: { $in: [githubExternalSyncVersion, '#'] },
space: { $in: Array.from(_projects.keys()) },
repository: { $in: [null, ..._repositories] }
},
{
limit: 50
}
),
{ _projects, _repositories }
)
)
//
@ -1289,98 +1294,100 @@ export class GithubWorker implements IntegrationManager {
})
for (const info of orderedSyncInfo) {
try {
const existing = externalDocs.find((it) => it._id === info._id)
const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper
if (mapper === undefined) {
this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name })
await derivedClient.update<DocSyncInfo>(info, {
needSync: githubSyncVersion
})
continue
}
const repo = await this.getRepositoryById(info.repository)
if (repo !== undefined && !repo.enabled) {
continue
}
let parent =
info.parent !== undefined
? parents.find((it) => it.url.toLowerCase() === info.parent?.toLowerCase())
: undefined
if (
parent === undefined &&
existing !== undefined &&
this.client.getHierarchy().isDerived(existing._class, core.class.AttachedDoc)
) {
// Find with attached parent
parent = attachedParents.find((it) => it._id === (existing as AttachedDoc).attachedTo)
}
if (existing !== undefined && existing.space !== info.space) {
// document is moved to non github project, so for github it like delete.
const targetProject = await this.client.findOne(github.mixin.GithubProject, {
_id: existing.space as Ref<GithubProject>
})
if (await mapper.handleDelete(existing, info, derivedClient, false, parent)) {
const h = this._client.getHierarchy()
await derivedClient.remove(info)
if (h.hasMixin(existing, github.mixin.GithubIssue)) {
const mixinData = this._client.getHierarchy().as(existing, github.mixin.GithubIssue)
await this._client.update<GithubIssue>(
mixinData,
{
url: '',
githubNumber: 0,
repository: '' as Ref<GithubIntegrationRepository>
},
false,
Date.now(),
existing.modifiedBy
)
}
continue
}
if (targetProject !== undefined) {
// We need to sync into new project.
await this.limiter.exec(async () => {
try {
const existing = externalDocs.find((it) => it._id === info._id)
const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper
if (mapper === undefined) {
this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name })
await derivedClient.update<DocSyncInfo>(info, {
external: null,
current: null,
url: '',
needSync: '',
externalVersion: '',
githubNumber: 0,
repository: null
needSync: githubSyncVersion
})
return
}
}
if (info.deleted === true) {
if (await mapper.handleDelete(existing, info, derivedClient, true)) {
await derivedClient.remove(info)
const repo = await this.getRepositoryById(info.repository)
if (repo !== undefined && !repo.enabled) {
return
}
continue
}
const docUpdate = await this.ctx.withLog(
'sync doc',
{},
(ctx) => mapper.sync(existing, info, parent, derivedClient),
{ url: info.url.toLowerCase(), workspace: this.workspace.name }
)
if (docUpdate !== undefined) {
await derivedClient.update(info, docUpdate)
let parent =
info.parent !== undefined
? parents.find((it) => it.url.toLowerCase() === info.parent?.toLowerCase())
: undefined
if (
parent === undefined &&
existing !== undefined &&
this.client.getHierarchy().isDerived(existing._class, core.class.AttachedDoc)
) {
// Find with attached parent
parent = attachedParents.find((it) => it._id === (existing as AttachedDoc).attachedTo)
}
if (existing !== undefined && existing.space !== info.space) {
// document is moved to non github project, so for github it like delete.
const targetProject = await this.client.findOne(github.mixin.GithubProject, {
_id: existing.space as Ref<GithubProject>
})
if (await mapper.handleDelete(existing, info, derivedClient, false, parent)) {
const h = this._client.getHierarchy()
await derivedClient.remove(info)
if (h.hasMixin(existing, github.mixin.GithubIssue)) {
const mixinData = this._client.getHierarchy().as(existing, github.mixin.GithubIssue)
await this._client.update<GithubIssue>(
mixinData,
{
url: '',
githubNumber: 0,
repository: '' as Ref<GithubIntegrationRepository>
},
false,
Date.now(),
existing.modifiedBy
)
}
return
}
if (targetProject !== undefined) {
// We need to sync into new project.
await derivedClient.update<DocSyncInfo>(info, {
external: null,
current: null,
url: '',
needSync: '',
externalVersion: '',
githubNumber: 0,
repository: null
})
}
}
if (info.deleted === true) {
if (await mapper.handleDelete(existing, info, derivedClient, true)) {
await derivedClient.remove(info)
}
return
}
const docUpdate = await this.ctx.withLog(
'sync doc',
{},
(ctx) => mapper.sync(existing, info, parent, derivedClient),
{ url: info.url.toLowerCase(), workspace: this.workspace.name }
)
if (docUpdate !== undefined) {
await derivedClient.update(info, docUpdate)
}
} catch (err: any) {
Analytics.handleError(err)
this.ctx.error('failed to sync doc', { _id: info._id, objectClass: info.objectClass, error: err })
// Mark to stop processing of document, before restart.
await derivedClient.update<DocSyncInfo>(info, {
error: errorToObj(err),
needSync: githubSyncVersion,
externalVersion: githubExternalSyncVersion
})
}
} catch (err: any) {
Analytics.handleError(err)
this.ctx.error('failed to sync doc', { _id: info._id, objectClass: info.objectClass, error: err })
// Mark to stop processing of document, before restart.
await derivedClient.update<DocSyncInfo>(info, {
error: errorToObj(err),
needSync: githubSyncVersion,
externalVersion: githubExternalSyncVersion
})
}
})
}
}
@ -1564,15 +1571,17 @@ export class GithubWorker implements IntegrationManager {
): Promise<GithubWorker | undefined> {
ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName })
let client: Client | undefined
let endpoint: string | undefined
try {
client = await createPlatformClient(workspace.name, 30000, async (event: ClientConnectEvent) => {
;({ client, endpoint } = await createPlatformClient(workspace.name, 30000, async (event: ClientConnectEvent) => {
reconnect(workspace.name, event)
})
}))
await GithubWorker.checkIntegrations(client, installations)
const worker = new GithubWorker(
ctx,
platformWorker.getRateLimiter(endpoint ?? ''),
platformWorker,
installations,
client,

View File

@ -16,6 +16,10 @@ export function createModel (builder: Builder): void {
trigger: serverGithub.trigger.OnProjectChanges,
isAsync: true
})
builder.createDoc(serverCore.class.Trigger, core.space.Model, {
trigger: serverGithub.trigger.OnGithubBroadcast,
isAsync: false
})
// We should skip activity github mixin stuff.
builder.createDoc(time.class.TodoAutomationHelper, core.space.Model, {

View File

@ -24,10 +24,28 @@ import { TriggerControl } from '@hcengineering/server-core'
import time, { ToDo } from '@hcengineering/time'
import tracker from '@hcengineering/tracker'
/**
* @public
*/
export async function OnGithubBroadcast (txes: Tx[], control: TriggerControl): Promise<Tx[]> {
// Enhance broadcast to send DocSyncInfo change only to system account.
control.ctx.contextData.broadcast.targets.github = (it) => {
if (TxProcessor.isExtendsCUD(it._class)) {
if ((it as TxCUD<Doc>).objectClass === github.class.DocSyncInfo) {
return [systemAccountEmail]
}
}
}
return []
}
/**
* @public
*/
export async function OnProjectChanges (txes: Tx[], control: TriggerControl): Promise<Tx[]> {
// Enhance broadcast to send DocSyncInfo change only to system account.
await OnGithubBroadcast(txes, control)
const result: Tx[] = []
const cache = new Map<string, any>()
@ -83,7 +101,8 @@ export async function OnProjectChanges (txes: Tx[], control: TriggerControl): Pr
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export default async () => ({
trigger: {
OnProjectChanges
OnProjectChanges,
OnGithubBroadcast
},
functions: {
TodoDoneTester
@ -206,14 +225,6 @@ function updateSyncDoc (
data
)
)
control.ctx.contextData.broadcast.targets.github = (it) => {
if (control.hierarchy.isDerived(it._class, core.class.TxCUD)) {
if ((it as TxCUD<Doc>).objectClass === github.class.DocSyncInfo) {
return [systemAccountEmail]
}
}
}
}
function createSyncDoc (
@ -249,11 +260,4 @@ function createSyncDoc (
cud.objectId as Ref<DocSyncInfo>
)
)
control.ctx.contextData.broadcast.targets.github = (it) => {
if (control.hierarchy.isDerived(it._class, core.class.TxCUD)) {
if ((it as TxCUD<Doc>).objectClass === github.class.DocSyncInfo) {
return [systemAccountEmail]
}
}
}
}

View File

@ -20,7 +20,8 @@ export const serverGithubId = 'server-github' as Plugin
*/
export default plugin(serverGithubId, {
trigger: {
OnProjectChanges: '' as Resource<TriggerFunc>
OnProjectChanges: '' as Resource<TriggerFunc>,
OnGithubBroadcast: '' as Resource<TriggerFunc>
},
functions: {
TodoDoneTester: '' as Resource<TodoDoneTester>