mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-14 04:08:19 +00:00
Revert rate limiter changes (#6237)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
29b082fc7b
commit
7c1a1619e8
@ -13,9 +13,8 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { getEmbeddedLabel, IntlString, PlatformError, unknownError } from '@hcengineering/platform'
|
||||
import { getEmbeddedLabel, IntlString } from '@hcengineering/platform'
|
||||
import { deepEqual } from 'fast-equals'
|
||||
import { DOMAIN_BENCHMARK } from './benchmark'
|
||||
import {
|
||||
Account,
|
||||
AccountRole,
|
||||
@ -47,6 +46,7 @@ import { TxOperations } from './operations'
|
||||
import { isPredicate } from './predicate'
|
||||
import { DocumentQuery, FindResult } from './storage'
|
||||
import { DOMAIN_TX } from './tx'
|
||||
import { DOMAIN_BENCHMARK } from './benchmark'
|
||||
|
||||
function toHex (value: number, chars: number): string {
|
||||
const result = value.toString(16)
|
||||
@ -355,6 +355,7 @@ export class DocManager<T extends Doc> implements IDocManager<T> {
|
||||
|
||||
export class RateLimiter {
|
||||
idCounter: number = 0
|
||||
processingQueue = new Map<number, Promise<void>>()
|
||||
last: number = 0
|
||||
rate: number
|
||||
|
||||
@ -365,21 +366,21 @@ export class RateLimiter {
|
||||
}
|
||||
|
||||
notify: (() => void)[] = []
|
||||
finished: boolean = false
|
||||
|
||||
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
|
||||
if (this.finished) {
|
||||
throw new PlatformError(unknownError('No Possible to add/exec on finished queue'))
|
||||
}
|
||||
while (this.notify.length >= this.rate) {
|
||||
const processingId = this.idCounter++
|
||||
|
||||
while (this.processingQueue.size >= this.rate) {
|
||||
await new Promise<void>((resolve) => {
|
||||
this.notify.push(resolve)
|
||||
})
|
||||
}
|
||||
try {
|
||||
const p = op(args)
|
||||
this.processingQueue.set(processingId, p as Promise<void>)
|
||||
return await p
|
||||
} finally {
|
||||
this.processingQueue.delete(processingId)
|
||||
const n = this.notify.shift()
|
||||
if (n !== undefined) {
|
||||
n()
|
||||
@ -388,7 +389,7 @@ export class RateLimiter {
|
||||
}
|
||||
|
||||
async add<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<void> {
|
||||
if (this.notify.length < this.rate) {
|
||||
if (this.processingQueue.size < this.rate) {
|
||||
void this.exec(op, args)
|
||||
} else {
|
||||
await this.exec(op, args)
|
||||
@ -396,12 +397,7 @@ export class RateLimiter {
|
||||
}
|
||||
|
||||
async waitProcessing (): Promise<void> {
|
||||
this.finished = true
|
||||
while (this.notify.length > 0) {
|
||||
await new Promise<void>((resolve) => {
|
||||
this.notify.push(resolve)
|
||||
})
|
||||
}
|
||||
await Promise.all(this.processingQueue.values())
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user