UBERF-8877: Fix indexer concurrency (#7416)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-10 17:04:43 +07:00 committed by GitHub
parent cb9720f340
commit 9421887f01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 6 additions and 5 deletions

1
.vscode/launch.json vendored
View File

@ -85,7 +85,6 @@
"SERVER_SECRET": "secret", "SERVER_SECRET": "secret",
"REKONI_URL": "http://localhost:4004", "REKONI_URL": "http://localhost:4004",
"MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json", "MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json",
"REGION":"pg",
"ELASTIC_INDEX_NAME": "local_storage_index", "ELASTIC_INDEX_NAME": "local_storage_index",
"STATS_URL":"http://host.docker.internal:4900", "STATS_URL":"http://host.docker.internal:4900",
"ACCOUNTS_URL": "http://localhost:3000", "ACCOUNTS_URL": "http://localhost:3000",

View File

@ -364,7 +364,8 @@ export class LiveQuery implements WithTx, Client {
total: 0, total: 0,
options: options as FindOptions<Doc>, options: options as FindOptions<Doc>,
callbacks: new Map(), callbacks: new Map(),
refresh: reduceCalls(() => this.doRefresh(q)) refresh: reduceCalls(() => this.doRefresh(q)),
refreshId: 0
} }
if (callback !== undefined) { if (callback !== undefined) {
q.callbacks.set(callback.callbackId, callback.callback as unknown as Callback) q.callbacks.set(callback.callbackId, callback.callback as unknown as Callback)
@ -787,8 +788,9 @@ export class LiveQuery implements WithTx, Client {
} }
private async doRefresh (q: Query): Promise<void> { private async doRefresh (q: Query): Promise<void> {
const qid = ++q.refreshId
const res = await this.client.findAll(q._class, q.query, q.options) const res = await this.client.findAll(q._class, q.query, q.options)
if (!deepEqual(res, q.result) || (res.total !== q.total && q.options?.total === true)) { if (q.refreshId === qid && (!deepEqual(res, q.result) || (res.total !== q.total && q.options?.total === true))) {
q.result = new ResultArray(res, this.getHierarchy()) q.result = new ResultArray(res, this.getHierarchy())
q.total = res.total q.total = res.total
await this.callback(q) await this.callback(q)

View File

@ -12,6 +12,6 @@ export interface Query {
options?: FindOptions<Doc> options?: FindOptions<Doc>
total: number total: number
callbacks: Map<string, Callback> callbacks: Map<string, Callback>
refresh: () => Promise<void> refresh: () => Promise<void>
refreshId: number
} }

View File

@ -402,7 +402,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
): Promise<{ classUpdate: Ref<Class<Doc>>[], processed: number }> { ): Promise<{ classUpdate: Ref<Class<Doc>>[], processed: number }> {
const _classUpdate = new Set<Ref<Class<Doc>>>() const _classUpdate = new Set<Ref<Class<Doc>>>()
let processed = 0 let processed = 0
await rateLimiter.add(async () => { await rateLimiter.exec(async () => {
let st = Date.now() let st = Date.now()
let groupBy = await this.storage.groupBy(ctx, DOMAIN_DOC_INDEX_STATE, 'objectClass', { needIndex: true }) let groupBy = await this.storage.groupBy(ctx, DOMAIN_DOC_INDEX_STATE, 'objectClass', { needIndex: true })