UBERF-7520: Use Bulk for index query updates (#6012)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-07-08 14:03:35 +07:00 committed by GitHub
parent 8d471bed1b
commit e07f01bf94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 94 additions and 46 deletions

View File

@ -79,6 +79,10 @@ export class FullTextIndexPipeline implements FullTextPipeline {
indexId = indexCounter++
updateTriggerTimer: any
updateOps = new Map<Ref<DocIndexState>, DocumentUpdate<DocIndexState>>()
uploadOps: DocIndexState[] = []
constructor (
private readonly storage: DbAdapter,
private readonly stages: FullTextPipelineStage[],
@ -96,6 +100,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
this.cancelling = true
clearTimeout(this.updateBroadcast)
clearTimeout(this.skippedReiterationTimeout)
clearInterval(this.updateTriggerTimer)
// We need to upload all bulk changes.
await this.processUpload(this.metrics)
this.triggerIndexing()
await this.indexing
await this.flush(true)
@ -168,6 +175,26 @@ export class FullTextIndexPipeline implements FullTextPipeline {
return doc
}
async processUpload (ctx: MeasureContext): Promise<void> {
const ops = this.updateOps
this.updateOps = new Map()
const toUpload = this.uploadOps
this.uploadOps = []
if (toUpload.length > 0) {
await ctx.with('upload', {}, async () => {
await this.storage.upload(this.metrics, DOMAIN_DOC_INDEX_STATE, toUpload)
})
}
if (ops.size > 0) {
await ctx.with('update', {}, async () => {
await this.storage.update(this.metrics, DOMAIN_DOC_INDEX_STATE, ops)
})
}
if (toUpload.length > 0 || ops.size > 0) {
this.triggerIndexing()
}
}
async queue (
ctx: MeasureContext,
updates: Map<Ref<DocIndexState>, { create?: DocIndexState, updated: boolean, removed: boolean }>
@ -175,27 +202,20 @@ export class FullTextIndexPipeline implements FullTextPipeline {
const entries = Array.from(updates.entries())
const uploads = entries.filter((it) => it[1].create !== undefined).map((it) => it[1].create) as DocIndexState[]
if (uploads.length > 0) {
await ctx.with('upload', {}, async () => {
await this.storage.upload(this.metrics, DOMAIN_DOC_INDEX_STATE, uploads)
})
this.uploadOps.push(...uploads)
}
const onlyUpdates = entries.filter((it) => it[1].create === undefined)
if (onlyUpdates.length > 0) {
const ops = new Map<Ref<DocIndexState>, DocumentUpdate<DocIndexState>>()
for (const u of onlyUpdates) {
const upd: DocumentUpdate<DocIndexState> = { removed: u[1].removed }
// We need to clear only first state, to prevent multiple index operations to happen.
;(upd as any)['stages.' + this.stages[0].stageId] = false
ops.set(u[0], upd)
this.updateOps.set(u[0], upd)
}
await ctx.with('upload', {}, async () => {
await this.storage.update(this.metrics, DOMAIN_DOC_INDEX_STATE, ops)
})
}
this.triggerIndexing()
}
add (doc: DocIndexState): void {
@ -288,6 +308,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
async startIndexing (): Promise<void> {
this.indexing = this.doIndexing()
clearTimeout(this.updateTriggerTimer)
this.updateTriggerTimer = setInterval(() => {
void this.processUpload(this.metrics)
}, 250)
}
async initializeStages (): Promise<void> {

View File

@ -41,40 +41,44 @@ export class BackupClientSession extends ClientSession implements BackupSession
async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
this.lastRequest = Date.now()
await _ctx.ctx.with('load-chunk', { domain }, async (ctx) => {
idx = idx ?? this.idIndex++
let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
if (chunk !== undefined) {
chunk.index++
if (chunk.finished === undefined) {
return {
idx,
docs: [],
finished: true
try {
idx = idx ?? this.idIndex++
let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
if (chunk !== undefined) {
chunk.index++
if (chunk.finished === undefined) {
return {
idx,
docs: [],
finished: true
}
}
} else {
chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
} else {
chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
let size = 0
const docs: DocInfo[] = []
let size = 0
const docs: DocInfo[] = []
while (size < chunkSize) {
const doc = await chunk.iterator.next(ctx)
if (doc === undefined) {
chunk.finished = true
break
while (size < chunkSize) {
const doc = await chunk.iterator.next(ctx)
if (doc === undefined) {
chunk.finished = true
break
}
size += estimateDocSize(doc)
docs.push(doc)
}
size += estimateDocSize(doc)
docs.push(doc)
await _ctx.sendResponse({
idx,
docs,
finished: chunk.finished
})
} catch (err: any) {
await _ctx.sendResponse({ error: err.message })
}
await _ctx.sendResponse({
idx,
docs,
finished: chunk.finished
})
})
}
@ -92,18 +96,33 @@ export class BackupClientSession extends ClientSession implements BackupSession
async loadDocs (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
this.lastRequest = Date.now()
await ctx.sendResponse(await this._pipeline.storage.load(ctx.ctx, domain, docs))
try {
const result = await this._pipeline.storage.load(ctx.ctx, domain, docs)
await ctx.sendResponse(result)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
}
}
async upload (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]): Promise<void> {
this.lastRequest = Date.now()
await this._pipeline.storage.upload(ctx.ctx, domain, docs)
try {
await this._pipeline.storage.upload(ctx.ctx, domain, docs)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
return
}
await ctx.sendResponse({})
}
async clean (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
this.lastRequest = Date.now()
await this._pipeline.storage.clean(ctx.ctx, domain, docs)
try {
await this._pipeline.storage.clean(ctx.ctx, domain, docs)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
return
}
await ctx.sendResponse({})
}
}

View File

@ -61,7 +61,6 @@ export class IssuesPage extends CommonTrackerPage {
textPopupAddAttachmentsFile = (): Locator => this.page.locator('div.popup-tooltip div.item div.name')
buttonCollapsedCategories = (): Locator => this.page.locator('div.categoryHeader.collapsed')
pupupTagsPopup = (): Locator => this.page.locator('.popup#TagsPopup')
issupeByName = (issueName: string): Locator => this.page.locator('a', { hasText: issueName })
issueNotExist = (issueName: string): Locator => this.page.locator('tr', { hasText: issueName })
filterRowExists = (issueName: string): Locator => this.page.locator('div.row span', { hasText: issueName })
issueListGrid = (): Locator => this.page.locator('div.listGrid')
@ -491,7 +490,7 @@ export class IssuesPage extends CommonTrackerPage {
}
async openIssueByName (issueName: string): Promise<void> {
await this.issupeByName(issueName).click()
await this.issueByName(issueName).click()
}
async checkIssueNotExist (issueName: string): Promise<void> {
@ -519,7 +518,7 @@ export class IssuesPage extends CommonTrackerPage {
}
async doActionOnIssue (issueName: string, action: string): Promise<void> {
await this.issupeByName(issueName).click({ button: 'right' })
await this.issueByName(issueName).click({ button: 'right' })
await this.selectFromDropdown(this.page, action)
}
@ -543,8 +542,10 @@ export class IssuesPage extends CommonTrackerPage {
await this.issueAnchorById(issueId).click()
}
async checkIssuesCount (issueName: string, count: number): Promise<void> {
await expect(this.issueAnchorByName(issueName)).toHaveCount(count)
async checkIssuesCount (issueName: string, count: number, timeout?: number): Promise<void> {
await expect(this.issueAnchorByName(issueName)).toHaveCount(count, {
timeout: timeout !== undefined ? timeout * 1000 : undefined
})
}
async selectTemplate (templateName: string): Promise<void> {

View File

@ -52,10 +52,13 @@ test.describe('Tracker duplicate issue tests', () => {
await trackerNavigationMenuPage.openTemplateForProject('Default')
await trackerNavigationMenuPage.openIssuesForProject('Default')
await issuesPage.searchIssueByName(secondIssue.title)
await issuesPage.checkIssuesCount(secondIssue.title, 2, 30)
const secondIssueId = await issuesPage.getIssueId(secondIssue.title, 0)
expect(firstIssueId).not.toEqual(secondIssueId)
await issuesPage.checkIssuesCount(firstIssue.title, 2)
await issuesPage.checkIssuesCount(firstIssue.title, 2, 30)
await test.step('Update the first issue title', async () => {
const newIssueTitle = `Duplicate Update issue-${generateId()}`