//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//

import { TimeRateLimiter } from '../utils'

/**
 * Unit tests for TimeRateLimiter.
 *
 * All tests run on jest fake timers: time is moved with `jest.advanceTimersByTime`
 * and pending promise continuations are flushed with `await Promise.resolve()`.
 */
describe('TimeRateLimiter', () => {
  // Install fake timers once per test and always hand the real ones back,
  // so a failing test cannot leave mocked timers behind.
  beforeEach(() => {
    jest.useFakeTimers()
  })

  afterEach(() => {
    jest.useRealTimers()
  })

  it('should limit rate of executions', async () => {
    const limiter = new TimeRateLimiter(2, 1000) // 2 executions per second
    const mockFn = jest.fn().mockResolvedValue('result')
    const operations: Promise<any>[] = []

    // Try to execute 4 operations
    for (let i = 0; i < 4; i++) {
      operations.push(limiter.exec(mockFn))
    }

    // First 2 should execute immediately
    expect(mockFn).toHaveBeenCalledTimes(2)

    // Advance time by 1 second
    jest.advanceTimersByTime(1001)
    await Promise.resolve()

    // Next 2 should execute after time advance
    expect(mockFn).toHaveBeenCalledTimes(4)

    await Promise.all(operations)
  })

  it('should cleanup old executions', async () => {
    const limiter = new TimeRateLimiter(2, 1000)
    const mockFn = jest.fn().mockResolvedValue('result')

    // Execute first operation
    await limiter.exec(mockFn)
    expect(mockFn).toHaveBeenCalledTimes(1)
    expect(limiter.executions.length).toBe(1)

    // Advance time past period
    jest.advanceTimersByTime(1001)

    // Execute another operation
    await limiter.exec(mockFn)
    expect(mockFn).toHaveBeenCalledTimes(2)
    expect(limiter.executions.length).toBe(1) // Old execution should be cleaned up
  })

  it('should handle concurrent operations', async () => {
    const limiter = new TimeRateLimiter(2, 1000)
    const mockFn = jest.fn().mockImplementation(async () => {
      await new Promise((resolve) => setTimeout(resolve, 450))
      return 'result'
    })

    const operations = Promise.all([limiter.exec(mockFn), limiter.exec(mockFn), limiter.exec(mockFn)])

    // Only two operations fit into the window; the third one is parked.
    expect(mockFn).toHaveBeenCalledTimes(2)
    expect(limiter.processingQueue.size).toBe(2)

    jest.advanceTimersByTime(500)
    await Promise.resolve()
    await Promise.resolve()
    jest.advanceTimersByTime(1000)
    await Promise.resolve()

    jest.advanceTimersByTime(2001)
    await Promise.resolve()
    await Promise.resolve()

    expect(limiter.processingQueue.size).toBe(0)

    expect(mockFn).toHaveBeenCalledTimes(3)

    await operations
  })

  it('should wait for processing to complete', async () => {
    const limiter = new TimeRateLimiter(2, 1000)
    const mockFn = jest.fn().mockImplementation(async () => {
      await new Promise((resolve) => setTimeout(resolve, 500))
      return 'result'
    })

    const operation = limiter.exec(mockFn)
    const waitPromise = limiter.waitProcessing()

    expect(limiter.processingQueue.size).toBe(1)

    jest.advanceTimersByTime(1001)
    await Promise.resolve()
    await Promise.resolve()
    await Promise.resolve()

    await waitPromise
    await operation
    expect(limiter.processingQueue.size).toBe(0)
  })
})
/**
 * Limits how many asynchronous operations may be started within a sliding time window.
 *
 * At most `rate` operations are admitted per `period` milliseconds. An operation that
 * is still running keeps its slot even after the window has elapsed, so the number of
 * in-flight operations never exceeds `rate` either.
 *
 * @public
 */
export class TimeRateLimiter {
  // Source of unique keys for `processingQueue`.
  idCounter: number = 0
  // Operations that have been started and have not settled yet, keyed by their id.
  processingQueue = new Map<number, Promise<void>>()
  // Not used by the methods of this class; kept so the public shape stays unchanged.
  last: number = 0
  rate: number
  period: number
  // One record per admitted operation: its start time and whether it is still running.
  executions: { time: number, running: boolean }[] = []

  // Not used by the methods of this class; kept so the public shape stays unchanged.
  queue: (() => Promise<void>)[] = []
  // Resolvers of callers currently parked inside `waitProcessing`.
  notify: (() => void)[] = []

  /**
   * @param rate - maximum number of operations admitted per period
   * @param period - window length in milliseconds
   */
  constructor (rate: number, period: number = 1000) {
    this.rate = rate
    this.period = period
  }

  /**
   * Drops execution records that have finished and are older than the period.
   * Records of operations that are still running are always kept.
   */
  private cleanupExecutions (): void {
    const now = Date.now()
    this.executions = this.executions.filter((it) => it.running || now - it.time < this.period)
  }

  /**
   * Runs `op` as soon as a slot is available and resolves with its result.
   *
   * @param op - operation to execute; it receives `args`
   * @param args - optional argument forwarded to `op`
   * @returns whatever `op` resolves to; a rejection of `op` is propagated to the caller
   */
  async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
    const processingId = this.idCounter++

    while (this.processingQueue.size >= this.rate || this.executions.length >= this.rate) {
      this.cleanupExecutions()
      if (this.executions.length < this.rate) {
        break
      }
      // No free slot yet: poll again after one slot's share of the period.
      await new Promise<void>((resolve) => {
        setTimeout(resolve, this.period / this.rate)
      })
    }

    const record = { time: Date.now(), running: true }
    try {
      this.executions.push(record)
      const p = op(args)
      this.processingQueue.set(processingId, p as Promise<any>)
      return await p
    } finally {
      record.running = false
      this.processingQueue.delete(processingId)
      this.cleanupExecutions()
      // Wake every parked waiter, not just the first one: each of them re-checks the
      // queue size itself. Waking a single waiter would leave the remaining ones
      // blocked forever once the last operation has completed.
      for (const wake of this.notify.splice(0)) {
        wake()
      }
    }
  }

  /**
   * Resolves once no started operation is pending any more.
   * Safe to call from several places concurrently.
   */
  async waitProcessing (): Promise<void> {
    while (this.processingQueue.size > 0) {
      await new Promise<void>((resolve) => {
        this.notify.push(resolve)
      })
    }
  }
}
setMetadata(client.metadata.ClientSocketFactory, (url) => { return new WebSocket(url, { headers: { @@ -45,5 +45,5 @@ export async function createPlatformClient ( onConnect: reconnect }) - return connection + return { client: connection, endpoint } } diff --git a/services/github/pod-github/src/config.ts b/services/github/pod-github/src/config.ts index 3be2097fc3..4dbf37ad69 100644 --- a/services/github/pod-github/src/config.ts +++ b/services/github/pod-github/src/config.ts @@ -29,6 +29,9 @@ interface Config { BrandingPath: string WorkspaceInactivityInterval: number // Interval in days to stop workspace synchronization if not visited + + // Limits + RateLimit: number } const envMap: { [key in keyof Config]: string } = { @@ -55,7 +58,11 @@ const envMap: { [key in keyof Config]: string } = { SentryDSN: 'SENTRY_DSN', BrandingPath: 'BRANDING_PATH', - WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL' + WorkspaceInactivityInterval: 'WORKSPACE_INACTIVITY_INTERVAL', + + // Limits + + RateLimit: 'RATE_LIMIT' // Operations per second for one transactor } const required: Array = [ @@ -101,7 +108,8 @@ const config: Config = (() => { SentryDSN: process.env[envMap.SentryDSN], BrandingPath: process.env[envMap.BrandingPath] ?? '', - WorkspaceInactivityInterval: parseInt(process.env[envMap.WorkspaceInactivityInterval] ?? '5') // In days + WorkspaceInactivityInterval: parseInt(process.env[envMap.WorkspaceInactivityInterval] ?? '5'), // In days + RateLimit: parseInt(process.env[envMap.RateLimit] ?? 
'25') } const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key]) diff --git a/services/github/pod-github/src/platform.ts b/services/github/pod-github/src/platform.ts index b80dd8de88..d10e7319db 100644 --- a/services/github/pod-github/src/platform.ts +++ b/services/github/pod-github/src/platform.ts @@ -15,6 +15,7 @@ import core, { RateLimiter, Ref, systemAccountEmail, + TimeRateLimiter, TxOperations } from '@hcengineering/core' import github, { GithubAuthentication, makeQuery, type GithubIntegration } from '@hcengineering/github' @@ -73,6 +74,8 @@ export class PlatformWorker { userManager!: UserManager + rateLimits = new Map() + private constructor ( readonly ctx: MeasureContext, readonly app: App, @@ -83,6 +86,15 @@ export class PlatformWorker { registerLoaders() } + /** Returns the TimeRateLimiter cached in `rateLimits` for `endpoint`; on the first request for an endpoint a limiter is created with `config.RateLimit` (default period) and stored, so later calls with the same endpoint get the same instance. */ getRateLimiter (endpoint: string): TimeRateLimiter { + let limiter = this.rateLimits.get(endpoint) + if (limiter === undefined) { + limiter = new TimeRateLimiter(config.RateLimit) + this.rateLimits.set(endpoint, limiter) + } + return limiter + } + public async initStorage (): Promise { this.mongoRef = getMongoClient(config.MongoURL) const mongoClient = await this.mongoRef.getClient() @@ -230,7 +242,7 @@ export class PlatformWorker { } else { let client: Client | undefined try { - client = await createPlatformClient(oldWorkspace, 30000) + ;({ client } = await createPlatformClient(oldWorkspace, 30000)) await this.removeInstallationFromWorkspace(oldWorker, installationId) await client.close() } catch (err: any) { @@ -386,7 +398,7 @@ export class PlatformWorker { platformClient = this.clients.get(payload.workspace)?.client if (platformClient === undefined) { shouldClose = true - platformClient = await createPlatformClient(payload.workspace, 30000) + ;({ client: platformClient } = await createPlatformClient(payload.workspace, 30000)) } const client = new TxOperations(platformClient, payload.accountId) diff --git a/services/github/pod-github/src/sync/comments.ts
b/services/github/pod-github/src/sync/comments.ts index 3492c30599..c9e5357924 100644 --- a/services/github/pod-github/src/sync/comments.ts +++ b/services/github/pod-github/src/sync/comments.ts @@ -355,6 +355,10 @@ export class CommentSyncManager implements DocSyncManager { } } + isHulyLinkComment (message: string): boolean { + return message.includes('

Connected to') && message.includes('Huly®') + } + private async createComment ( info: DocSyncInfo, messageData: MessageData, @@ -367,6 +371,11 @@ export class CommentSyncManager implements DocSyncManager { ...messageData, attachments: 0 } + // Check if it is Connected message. + if ((comment as any).performed_via_github_app !== undefined && this.isHulyLinkComment(comment.body)) { + // No need to create comment on platform. + return + } await this.client.addCollection( chunter.class.ChatMessage, info.space, diff --git a/services/github/pod-github/src/worker.ts b/services/github/pod-github/src/worker.ts index 5e25b64b61..a6f5c58048 100644 --- a/services/github/pod-github/src/worker.ts +++ b/services/github/pod-github/src/worker.ts @@ -35,7 +35,8 @@ import core, { reduceCalls, toIdMap, type Blob, - type MigrationState + type MigrationState, + type TimeRateLimiter } from '@hcengineering/core' import github, { DocSyncInfo, @@ -336,6 +337,7 @@ export class GithubWorker implements IntegrationManager { private constructor ( readonly ctx: MeasureContext, + readonly limiter: TimeRateLimiter, readonly platform: PlatformWorker, readonly installations: Map, readonly client: Client, @@ -1152,23 +1154,26 @@ export class GithubWorker implements IntegrationManager { const _projects = toIdMap(projects) const _repositories = repositories.map((it) => it._id) - const docs = await this.ctx.with( - 'find-doc-sync-info', - {}, - (ctx) => - this._client.findAll( - github.class.DocSyncInfo, - { - needSync: { $ne: githubSyncVersion }, - externalVersion: { $in: [githubExternalSyncVersion, '#'] }, - space: { $in: Array.from(_projects.keys()) }, - repository: { $in: [null, ..._repositories] } - }, - { - limit: 50 - } - ), - { _projects, _repositories } + const docs = await this.limiter.exec( + async () => + await this.ctx.with( + 'find-doc-sync-info', + {}, + (ctx) => + this._client.findAll( + github.class.DocSyncInfo, + { + needSync: { $ne: githubSyncVersion }, + externalVersion: { $in: 
[githubExternalSyncVersion, '#'] }, + space: { $in: Array.from(_projects.keys()) }, + repository: { $in: [null, ..._repositories] } + }, + { + limit: 50 + } + ), + { _projects, _repositories } + ) ) // @@ -1289,98 +1294,100 @@ export class GithubWorker implements IntegrationManager { }) for (const info of orderedSyncInfo) { - try { - const existing = externalDocs.find((it) => it._id === info._id) - const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper - if (mapper === undefined) { - this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name }) - await derivedClient.update(info, { - needSync: githubSyncVersion - }) - continue - } - const repo = await this.getRepositoryById(info.repository) - if (repo !== undefined && !repo.enabled) { - continue - } - - let parent = - info.parent !== undefined - ? parents.find((it) => it.url.toLowerCase() === info.parent?.toLowerCase()) - : undefined - if ( - parent === undefined && - existing !== undefined && - this.client.getHierarchy().isDerived(existing._class, core.class.AttachedDoc) - ) { - // Find with attached parent - parent = attachedParents.find((it) => it._id === (existing as AttachedDoc).attachedTo) - } - - if (existing !== undefined && existing.space !== info.space) { - // document is moved to non github project, so for github it like delete. 
- const targetProject = await this.client.findOne(github.mixin.GithubProject, { - _id: existing.space as Ref - }) - if (await mapper.handleDelete(existing, info, derivedClient, false, parent)) { - const h = this._client.getHierarchy() - await derivedClient.remove(info) - if (h.hasMixin(existing, github.mixin.GithubIssue)) { - const mixinData = this._client.getHierarchy().as(existing, github.mixin.GithubIssue) - await this._client.update( - mixinData, - { - url: '', - githubNumber: 0, - repository: '' as Ref - }, - false, - Date.now(), - existing.modifiedBy - ) - } - continue - } - if (targetProject !== undefined) { - // We need to sync into new project. + await this.limiter.exec(async () => { + try { + const existing = externalDocs.find((it) => it._id === info._id) + const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper + if (mapper === undefined) { + this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name }) await derivedClient.update(info, { - external: null, - current: null, - url: '', - needSync: '', - externalVersion: '', - githubNumber: 0, - repository: null + needSync: githubSyncVersion }) + return } - } - - if (info.deleted === true) { - if (await mapper.handleDelete(existing, info, derivedClient, true)) { - await derivedClient.remove(info) + const repo = await this.getRepositoryById(info.repository) + if (repo !== undefined && !repo.enabled) { + return } - continue - } - const docUpdate = await this.ctx.withLog( - 'sync doc', - {}, - (ctx) => mapper.sync(existing, info, parent, derivedClient), - { url: info.url.toLowerCase(), workspace: this.workspace.name } - ) - if (docUpdate !== undefined) { - await derivedClient.update(info, docUpdate) + let parent = + info.parent !== undefined + ? 
parents.find((it) => it.url.toLowerCase() === info.parent?.toLowerCase()) + : undefined + if ( + parent === undefined && + existing !== undefined && + this.client.getHierarchy().isDerived(existing._class, core.class.AttachedDoc) + ) { + // Find with attached parent + parent = attachedParents.find((it) => it._id === (existing as AttachedDoc).attachedTo) + } + + if (existing !== undefined && existing.space !== info.space) { + // document is moved to non github project, so for github it like delete. + const targetProject = await this.client.findOne(github.mixin.GithubProject, { + _id: existing.space as Ref + }) + if (await mapper.handleDelete(existing, info, derivedClient, false, parent)) { + const h = this._client.getHierarchy() + await derivedClient.remove(info) + if (h.hasMixin(existing, github.mixin.GithubIssue)) { + const mixinData = this._client.getHierarchy().as(existing, github.mixin.GithubIssue) + await this._client.update( + mixinData, + { + url: '', + githubNumber: 0, + repository: '' as Ref + }, + false, + Date.now(), + existing.modifiedBy + ) + } + return + } + if (targetProject !== undefined) { + // We need to sync into new project. + await derivedClient.update(info, { + external: null, + current: null, + url: '', + needSync: '', + externalVersion: '', + githubNumber: 0, + repository: null + }) + } + } + + if (info.deleted === true) { + if (await mapper.handleDelete(existing, info, derivedClient, true)) { + await derivedClient.remove(info) + } + return + } + + const docUpdate = await this.ctx.withLog( + 'sync doc', + {}, + (ctx) => mapper.sync(existing, info, parent, derivedClient), + { url: info.url.toLowerCase(), workspace: this.workspace.name } + ) + if (docUpdate !== undefined) { + await derivedClient.update(info, docUpdate) + } + } catch (err: any) { + Analytics.handleError(err) + this.ctx.error('failed to sync doc', { _id: info._id, objectClass: info.objectClass, error: err }) + // Mark to stop processing of document, before restart. 
+ await derivedClient.update(info, { + error: errorToObj(err), + needSync: githubSyncVersion, + externalVersion: githubExternalSyncVersion + }) } - } catch (err: any) { - Analytics.handleError(err) - this.ctx.error('failed to sync doc', { _id: info._id, objectClass: info.objectClass, error: err }) - // Mark to stop processing of document, before restart. - await derivedClient.update(info, { - error: errorToObj(err), - needSync: githubSyncVersion, - externalVersion: githubExternalSyncVersion - }) - } + }) } } @@ -1564,15 +1571,17 @@ export class GithubWorker implements IntegrationManager { ): Promise { ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName }) let client: Client | undefined + let endpoint: string | undefined try { - client = await createPlatformClient(workspace.name, 30000, async (event: ClientConnectEvent) => { + ;({ client, endpoint } = await createPlatformClient(workspace.name, 30000, async (event: ClientConnectEvent) => { reconnect(workspace.name, event) - }) + })) await GithubWorker.checkIntegrations(client, installations) const worker = new GithubWorker( ctx, + platformWorker.getRateLimiter(endpoint ?? ''), platformWorker, installations, client, diff --git a/services/github/server-github-model/src/index.ts b/services/github/server-github-model/src/index.ts index c1d6c06dbe..843c8a7158 100644 --- a/services/github/server-github-model/src/index.ts +++ b/services/github/server-github-model/src/index.ts @@ -16,6 +16,10 @@ export function createModel (builder: Builder): void { trigger: serverGithub.trigger.OnProjectChanges, isAsync: true }) + builder.createDoc(serverCore.class.Trigger, core.space.Model, { + trigger: serverGithub.trigger.OnGithubBroadcast, + isAsync: false + }) // We should skip activity github mixin stuff. 
builder.createDoc(time.class.TodoAutomationHelper, core.space.Model, { diff --git a/services/github/server-github-resources/src/index.ts b/services/github/server-github-resources/src/index.ts index 8e49a880d4..cee1a2f07a 100644 --- a/services/github/server-github-resources/src/index.ts +++ b/services/github/server-github-resources/src/index.ts @@ -24,10 +24,28 @@ import { TriggerControl } from '@hcengineering/server-core' import time, { ToDo } from '@hcengineering/time' import tracker from '@hcengineering/tracker' +/** + * Trigger that installs `control.ctx.contextData.broadcast.targets.github`: a callback returning `[systemAccountEmail]` for transactions accepted by `TxProcessor.isExtendsCUD` whose `objectClass` is `github.class.DocSyncInfo`, and nothing (undefined) for any other transaction. The `txes` argument is not inspected and the trigger itself always resolves to an empty array. + * @public + */ +export async function OnGithubBroadcast (txes: Tx[], control: TriggerControl): Promise { + // Enhance broadcast to send DocSyncInfo change only to system account. + control.ctx.contextData.broadcast.targets.github = (it) => { + if (TxProcessor.isExtendsCUD(it._class)) { + if ((it as TxCUD).objectClass === github.class.DocSyncInfo) { + return [systemAccountEmail] + } + } + } + return [] +} + /** + * @public + */ export async function OnProjectChanges (txes: Tx[], control: TriggerControl): Promise { + // Enhance broadcast to send DocSyncInfo change only to system account.
+ await OnGithubBroadcast(txes, control) + const result: Tx[] = [] const cache = new Map() @@ -83,7 +101,8 @@ export async function OnProjectChanges (txes: Tx[], control: TriggerControl): Pr // eslint-disable-next-line @typescript-eslint/explicit-function-return-type export default async () => ({ trigger: { - OnProjectChanges + OnProjectChanges, + OnGithubBroadcast }, functions: { TodoDoneTester @@ -206,14 +225,6 @@ function updateSyncDoc ( data ) ) - - control.ctx.contextData.broadcast.targets.github = (it) => { - if (control.hierarchy.isDerived(it._class, core.class.TxCUD)) { - if ((it as TxCUD).objectClass === github.class.DocSyncInfo) { - return [systemAccountEmail] - } - } - } } function createSyncDoc ( @@ -249,11 +260,4 @@ function createSyncDoc ( cud.objectId as Ref ) ) - control.ctx.contextData.broadcast.targets.github = (it) => { - if (control.hierarchy.isDerived(it._class, core.class.TxCUD)) { - if ((it as TxCUD).objectClass === github.class.DocSyncInfo) { - return [systemAccountEmail] - } - } - } } diff --git a/services/github/server-github/src/index.ts b/services/github/server-github/src/index.ts index 17c44599d6..95fc808418 100644 --- a/services/github/server-github/src/index.ts +++ b/services/github/server-github/src/index.ts @@ -20,7 +20,8 @@ export const serverGithubId = 'server-github' as Plugin */ export default plugin(serverGithubId, { trigger: { - OnProjectChanges: '' as Resource + OnProjectChanges: '' as Resource, + OnGithubBroadcast: '' as Resource }, functions: { TodoDoneTester: '' as Resource