UBERF-8425: Fix account upgrade deadlocks (#9163)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / uitest-workspaces (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

* uberf-8425: fix account upgrade deadlocks
Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>

* uberf-8425: fix unit tests
Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2025-06-03 21:07:18 +04:00 committed by GitHub
parent c20116c631
commit 14310dfa6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 80 additions and 31 deletions

View File

@ -662,8 +662,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $1)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`.replace(/\s+/g, ' ')
LIMIT 1`.replace(/\s+/g, ' ')
)
expect(mockClient.unsafe.mock.calls[0][1]).toEqual([NOW - processingTimeoutMs])
})
@ -721,8 +720,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $5)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')
@ -793,8 +791,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $5)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')
@ -883,8 +880,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $5)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')
@ -936,8 +932,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $1)
AND region = $2
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')

View File

@ -565,13 +565,17 @@ describe('account utils', () => {
test('should handle PlatformError', async () => {
const errorStatus = new Status(Severity.ERROR, 'test-error' as any, {})
const mockMethod = jest.fn().mockRejectedValue(new PlatformError(errorStatus))
Object.defineProperty(mockMethod, 'name', { value: 'mockAccMethod' })
const wrappedMethod = wrap(mockMethod)
const request = { id: 'req1', params: [] }
const result = await wrappedMethod(mockCtx, mockDb, mockBranding, request, 'token')
expect(result).toEqual({ error: errorStatus })
expect(mockCtx.error).toHaveBeenCalledWith('error', { status: errorStatus })
expect(mockCtx.error).toHaveBeenCalledWith('Error while processing account method', {
status: errorStatus,
method: 'mockAccMethod'
})
})
test('should handle TokenError', async () => {
@ -589,15 +593,17 @@ describe('account utils', () => {
test('should handle internal server error', async () => {
const error = new Error('unexpected error')
const mockMethod = jest.fn().mockRejectedValue(error)
Object.defineProperty(mockMethod, 'name', { value: 'mockAccMethod' })
const wrappedMethod = wrap(mockMethod)
const request = { id: 'req1', params: [] }
const result = await wrappedMethod(mockCtx, mockDb, mockBranding, request, 'token')
expect(result.error.code).toBe(platform.status.InternalServerError)
expect(mockCtx.error).toHaveBeenCalledWith('error', {
expect(mockCtx.error).toHaveBeenCalledWith('Error while processing account method', {
status: expect.any(Status),
err: error
origErr: error,
method: 'mockAccMethod'
})
})

View File

@ -131,6 +131,10 @@ implements DbCollection<T> {
}
}
/**
 * Reports whether at least one document matches the given query.
 *
 * Delegates to `findOne` and treats any non-null result as a match.
 *
 * @param query - filter to look up
 * @returns `true` when `findOne` yields something other than `null`
 */
async exists (query: Query<T>): Promise<boolean> {
  const match = await this.findOne(query)
  return match !== null
}
/**
 * Returns all documents matching the query as an array.
 *
 * The query is first passed through `getFilteredQuery`, then handed to
 * `findCursor` together with the optional sort and limit.
 *
 * @param query - filter to apply
 * @param sort - optional sort specification forwarded to `findCursor`
 * @param limit - optional result cap forwarded to `findCursor`
 * @returns the cursor contents collected with `toArray()`
 */
async find (query: Query<T>, sort?: Sort<T>, limit?: number): Promise<T[]> {
  const filtered = getFilteredQuery(query)
  const cursor = this.findCursor(filtered, sort, limit)
  return await cursor.toArray()
}
@ -315,6 +319,10 @@ export class WorkspaceStatusMongoDbCollection implements DbCollection<WorkspaceS
return res
}
/**
 * Reports whether a workspace status matching the query exists.
 *
 * The status query is translated with `toWsQuery` and the check itself is
 * delegated to the underlying workspace collection.
 *
 * @param query - workspace status filter
 * @returns the result of `wsCollection.exists` for the translated query
 */
async exists (query: Query<WorkspaceStatus>): Promise<boolean> {
  const wsQuery = this.toWsQuery(query)
  return await this.wsCollection.exists(wsQuery)
}
async find (query: Query<WorkspaceStatus>, sort?: Sort<WorkspaceStatus>, limit?: number): Promise<WorkspaceStatus[]> {
return (await this.wsCollection.find(this.toWsQuery(query), this.toWsSort(sort), limit)).map((ws) => ({
...ws.status,

View File

@ -28,7 +28,8 @@ export function getMigrations (ns: string): [string, string][] {
getV8Migration(ns),
getV9Migration(ns),
getV10Migration1(ns),
getV10Migration2(ns)
getV10Migration2(ns),
getV11Migration(ns)
]
}
@ -362,3 +363,18 @@ function getV10Migration2 (ns: string): [string, string] {
`
]
}
/**
 * V11 migration: creates the `_pending_workspace_lock` table and seeds its only row.
 *
 * The primary key plus the `single_row` CHECK constraint restrict the table to a
 * single row with `id = 1`. Both statements are idempotent (`IF NOT EXISTS`,
 * `ON CONFLICT DO NOTHING`), so re-running the script is harmless.
 * NOTE(review): presumably this row is taken with `SELECT ... FOR UPDATE` to serialize
 * pending-workspace selection — confirm against the account DB implementation.
 *
 * @param ns - schema/namespace prefix interpolated into the table name
 * @returns a `[migrationName, sqlScript]` tuple
 */
function getV11Migration (ns: string): [string, string] {
  return [
    // The key must describe this migration. The former value
    // ('account_db_v10_add_migrated_to_person') named a v10 person change and, if
    // migrations are tracked by name, would risk this script being skipped — verify
    // against the v10 migration names.
    'account_db_v11_add_pending_workspace_lock',
    `
    CREATE TABLE IF NOT EXISTS ${ns}._pending_workspace_lock (
      id INT8 DEFAULT 1 PRIMARY KEY,
      CONSTRAINT single_row CHECK (id = 1)
    );
    INSERT INTO ${ns}._pending_workspace_lock (id) VALUES (1)
    ON CONFLICT (id) DO NOTHING;
    `
  ]
}

View File

@ -258,6 +258,15 @@ implements DbCollection<T> {
}
}
/**
 * Reports whether at least one row of this collection's table matches the query.
 *
 * Builds a `SELECT EXISTS (SELECT 1 ...)` statement from the table name and the
 * WHERE clause produced by `buildWhereClause`, and runs it through `unsafe`.
 *
 * @param query - filter converted into the WHERE clause and its bound values
 * @param client - optional SQL client forwarded to `unsafe`
 * @returns `true` only when the first returned row has `exists` strictly equal to `true`
 */
async exists (query: Query<T>, client?: Sql): Promise<boolean> {
  const [where, params] = this.buildWhereClause(query)
  const table = this.getTableName()
  const rows = await this.unsafe(`SELECT EXISTS (SELECT 1 FROM ${table} ${where})`, params, client)
  const first = rows[0]
  return first?.exists === true
}
async find (query: Query<T>, sort?: Sort<T>, limit?: number, client?: Sql): Promise<T[]> {
const sqlChunks: string[] = [this.buildSelectClause()]
const [whereClause, whereValues] = this.buildWhereClause(query)
@ -483,6 +492,7 @@ export class PostgresAccountDB implements AccountDB {
}
readonly wsMembersName = 'workspace_members'
readonly pendingWorkspaceLockName = '_pending_workspace_lock'
person: PostgresDbCollection<Person, 'uuid'>
account: AccountPostgresDbCollection
@ -546,6 +556,10 @@ export class PostgresAccountDB implements AccountDB {
return `${this.ns}.${this.wsMembersName}`
}
/**
 * Builds the qualified name of the pending-workspace lock table.
 *
 * @returns `ns` and `pendingWorkspaceLockName` joined with a dot
 */
getPendingWorkspaceLockTableName (): string {
  const { ns, pendingWorkspaceLockName } = this
  return `${ns}.${pendingWorkspaceLockName}`
}
async init (): Promise<void> {
await this._init()
@ -943,10 +957,9 @@ export class PostgresAccountDB implements AccountDB {
sqlChunks.push(`WHERE ${whereChunks.join(' AND ')}`)
sqlChunks.push('ORDER BY s.last_visit DESC')
sqlChunks.push('LIMIT 1')
// Note: SKIP LOCKED is supported starting from Postgres 9.5 and CockroachDB v22.2.1
sqlChunks.push('FOR UPDATE SKIP LOCKED')
return await this.withRetry(async (rTx) => {
await rTx`SELECT 1 FROM ${this.client(this.getPendingWorkspaceLockTableName())} WHERE id = 1 FOR UPDATE;`
// We must have all the conditions in the DB query and we cannot filter anything in the code
// because of possible concurrency between account services.
const res: any = await rTx.unsafe(sqlChunks.join(' '), values)

View File

@ -57,7 +57,6 @@ import {
getRolePower,
getSocialIdByKey,
getWorkspaceById,
getWorkspaceInfoWithStatusById,
getWorkspacesInfoWithStatusByIds,
verifyAllowedServices,
wrap,
@ -289,8 +288,8 @@ export async function updateWorkspaceInfo (
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
}
const workspace = await getWorkspaceInfoWithStatusById(db, workspaceUuid)
if (workspace === null) {
const wsExists = await db.workspace.exists({ uuid: workspaceUuid })
if (!wsExists) {
throw new PlatformError(new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspaceUuid }))
}
progress = Math.round(progress)
@ -298,17 +297,24 @@ export async function updateWorkspaceInfo (
const ts = Date.now()
const update: Partial<WorkspaceStatus> = {}
const wsUpdate: Partial<Workspace> = {}
const query: Query<WorkspaceStatus> = { workspaceUuid: workspace.uuid }
const query: Query<WorkspaceStatus> = { workspaceUuid }
// Only read status for certain events because it is not needed for others
// and it interferes with status updates when concurrency is high
let wsStatus: WorkspaceStatus | null = null
if (['create-started', 'upgrade-started', 'migrate-clean-done'].includes(event)) {
wsStatus = await db.workspaceStatus.findOne({ workspaceUuid })
}
switch (event) {
case 'create-started':
update.mode = 'creating'
if (workspace.status.mode !== 'creating') {
if (wsStatus != null && wsStatus.mode !== 'creating') {
update.processingAttempts = 0
}
update.processingProgress = progress
break
case 'upgrade-started':
if (workspace.status.mode !== 'upgrading') {
if (wsStatus != null && wsStatus.mode !== 'upgrading') {
update.processingAttempts = 0
}
update.mode = 'upgrading'
@ -350,7 +356,7 @@ export async function updateWorkspaceInfo (
update.processingProgress = progress
break
case 'migrate-clean-done':
wsUpdate.region = workspace.status.targetRegion ?? ''
wsUpdate.region = wsStatus?.targetRegion ?? ''
update.mode = 'pending-restore'
update.processingProgress = progress
update.lastProcessingTime = Date.now() - processingTimeoutMs // To not wait for next step
@ -400,7 +406,7 @@ export async function updateWorkspaceInfo (
})
if (Object.keys(wsUpdate).length !== 0) {
await db.workspace.updateOne({ uuid: workspace.uuid }, wsUpdate)
await db.workspace.updateOne({ uuid: workspaceUuid }, wsUpdate)
}
}

View File

@ -222,6 +222,7 @@ export interface AccountDB {
}
export interface DbCollection<T> {
exists: (query: Query<T>) => Promise<boolean>
find: (query: Query<T>, sort?: Sort<T>, limit?: number) => Promise<T[]>
findOne: (query: Query<T>) => Promise<T | null>
insertOne: (data: Partial<T>) => Promise<any>

View File

@ -143,7 +143,7 @@ export function wrap (
): Promise<any> {
return await accountMethod(ctx, db, branding, token, { ...request.params }, meta)
.then((result) => ({ id: request.id, result }))
.catch((err) => {
.catch((err: Error) => {
const status =
err instanceof PlatformError
? err.status
@ -158,9 +158,9 @@ export function wrap (
if (status.code === platform.status.InternalServerError) {
Analytics.handleError(err)
ctx.error('error', { status, err })
ctx.error('Error while processing account method', { method: accountMethod.name, status, origErr: err })
} else {
ctx.error('error', { status })
ctx.error('Error while processing account method', { method: accountMethod.name, status })
}
return {

View File

@ -188,7 +188,7 @@ export class WorkspaceWorker {
try {
return await accountClient.getPendingWorkspace(this.region, this.version, this.operation)
} catch (err) {
ctx.error('Error getting pending workspace:', { err })
ctx.error('Error getting pending workspace:', { origErr: err })
}
})
if (workspace == null) {
@ -202,7 +202,7 @@ export class WorkspaceWorker {
await this.doWorkspaceOperation(opContext, workspace, opt)
} catch (err: any) {
Analytics.handleError(err)
opContext.error('error', { err })
opContext.error('Error while performing workspace operation', { origErr: err })
}
})
// sleep for a little bit to avoid bombarding the account service, also add jitter to avoid simultaneous requests from multiple workspace services

View File

@ -61,7 +61,9 @@ export async function createWorkspace (
}
const createPingHandle = setInterval(() => {
void handleWsEvent?.('ping', version, 0)
handleWsEvent?.('ping', version, 0).catch((err: any) => {
ctx.error('Error while updating progress', { origErr: err })
})
}, 5000)
try {
@ -309,7 +311,9 @@ export async function upgradeWorkspaceWith (
let progress = 0
const updateProgressHandle = setInterval(() => {
void handleWsEvent?.('progress', version, progress)
handleWsEvent?.('progress', version, progress).catch((err: any) => {
ctx.error('Error while updating progress', { origErr: err })
})
}, 5000)
try {