diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 075f0f535e..2fe36d2e10 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -13,9 +13,8 @@ // limitations under the License. // -import { getEmbeddedLabel, IntlString, PlatformError, unknownError } from '@hcengineering/platform' +import { getEmbeddedLabel, IntlString } from '@hcengineering/platform' import { deepEqual } from 'fast-equals' -import { DOMAIN_BENCHMARK } from './benchmark' import { Account, AccountRole, @@ -47,6 +46,7 @@ import { TxOperations } from './operations' import { isPredicate } from './predicate' import { DocumentQuery, FindResult } from './storage' import { DOMAIN_TX } from './tx' +import { DOMAIN_BENCHMARK } from './benchmark' function toHex (value: number, chars: number): string { const result = value.toString(16) @@ -355,6 +355,7 @@ export class DocManager implements IDocManager { export class RateLimiter { idCounter: number = 0 + processingQueue = new Map>() last: number = 0 rate: number @@ -365,21 +366,21 @@ export class RateLimiter { } notify: (() => void)[] = [] - finished: boolean = false async exec = any>(op: (args?: B) => Promise, args?: B): Promise { - if (this.finished) { - throw new PlatformError(unknownError('No Possible to add/exec on finished queue')) - } - while (this.notify.length >= this.rate) { + const processingId = this.idCounter++ + + while (this.processingQueue.size >= this.rate) { await new Promise((resolve) => { this.notify.push(resolve) }) } try { const p = op(args) + this.processingQueue.set(processingId, p as Promise) return await p } finally { + this.processingQueue.delete(processingId) const n = this.notify.shift() if (n !== undefined) { n() @@ -388,7 +389,7 @@ export class RateLimiter { } async add = any>(op: (args?: B) => Promise, args?: B): Promise { - if (this.notify.length < this.rate) { + if (this.processingQueue.size < this.rate) { void this.exec(op, args) } else { await this.exec(op, args) @@ -396,12 +397,7 @@ export class RateLimiter { } async waitProcessing (): Promise { - this.finished = true - while (this.notify.length > 0) { - await new Promise((resolve) => { - this.notify.push(resolve) - }) - } + await Promise.all(this.processingQueue.values()) } }