UBERF-8425: Improved Postgres/account/workspace error handling

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2025-05-31 00:07:51 +04:00
parent 8a8c4906fa
commit 6498d90afd
No known key found for this signature in database
6 changed files with 25 additions and 13 deletions

View File

@ -723,6 +723,8 @@ export class PostgresAccountDB implements AccountDB {
return ( return (
err.code === '40001' || // Retry transaction err.code === '40001' || // Retry transaction
err.code === '55P03' || // Lock not available err.code === '55P03' || // Lock not available
err.code === 'CONNECTION_CLOSED' || // This error is thrown if the connection was closed without an error.
err.code === 'CONNECTION_DESTROYED' || // This error is thrown for any queries that were pending when the timeout to sql.end({ timeout: X }) was reached.
msg.includes('RETRY_SERIALIZABLE') msg.includes('RETRY_SERIALIZABLE')
) )
} }

View File

@ -65,7 +65,11 @@ import { isAdminEmail } from './admin'
export const GUEST_ACCOUNT = 'b6996120-416f-49cd-841e-e4a5d2e49c9b' export const GUEST_ACCOUNT = 'b6996120-416f-49cd-841e-e4a5d2e49c9b'
export async function getAccountDB (uri: string, dbNs?: string): Promise<[AccountDB, () => void]> { export async function getAccountDB (
uri: string,
dbNs?: string,
appName: string = 'account'
): Promise<[AccountDB, () => void]> {
const isMongo = uri.startsWith('mongodb://') const isMongo = uri.startsWith('mongodb://')
if (isMongo) { if (isMongo) {
@ -84,7 +88,7 @@ export async function getAccountDB (uri: string, dbNs?: string): Promise<[Accoun
} else { } else {
setDBExtraOptions({ setDBExtraOptions({
connection: { connection: {
application_name: 'account' application_name: appName
} }
}) })
const client = getDBClient(sharedPipelineContextVars, uri) const client = getDBClient(sharedPipelineContextVars, uri)

View File

@ -223,7 +223,7 @@ class ConnectionMgr {
await client.execute('ROLLBACK;') await client.execute('ROLLBACK;')
console.error({ message: 'failed to process tx', error: err.message, cause: err }) console.error({ message: 'failed to process tx', error: err.message, cause: err })
if (err.code !== '40001' || tries === maxTries) { if (!['40001', 'CONNECTION_CLOSED'].includes(err.code) || tries === maxTries) {
return err return err
} else { } else {
console.log('Transaction failed. Retrying.') console.log('Transaction failed. Retrying.')
@ -267,7 +267,7 @@ class ConnectionMgr {
return { result: await fn(client) } return { result: await fn(client) }
} catch (err: any) { } catch (err: any) {
console.error({ message: 'failed to process sql', error: err.message, cause: err }) console.error({ message: 'failed to process sql', error: err.message, cause: err })
if (err.code !== '40001' || tries === maxTries) { if (!['40001', 'CONNECTION_CLOSED'].includes(err.code) || tries === maxTries) {
return err return err
} else { } else {
console.log('Read Transaction failed. Retrying.') console.log('Read Transaction failed. Retrying.')

View File

@ -323,7 +323,7 @@ export async function upgradeModel (
modelDb, modelDb,
pipeline, pipeline,
async (value) => { async (value) => {
await progress(90 + (Math.min(value, 100) / 100) * 10) await progress(10 + (Math.min(value, 100) / 100) * 10)
}, },
wsIds.uuid wsIds.uuid
) )

View File

@ -158,7 +158,7 @@ export class WorkspaceWorker {
) )
break break
} catch (err: any) { } catch (err: any) {
ctx.error('error', { err }) ctx.error('error during handshake', { err })
} }
} }
@ -187,6 +187,7 @@ export class WorkspaceWorker {
} }
}) })
if (workspace == null) { if (workspace == null) {
// no workspaces available, sleep before another attempt
await this.doSleep(ctx, opt) await this.doSleep(ctx, opt)
} else { } else {
void this.exec(async () => { void this.exec(async () => {
@ -196,9 +197,11 @@ export class WorkspaceWorker {
await this.doWorkspaceOperation(opContext, workspace, opt) await this.doWorkspaceOperation(opContext, workspace, opt)
} catch (err: any) { } catch (err: any) {
Analytics.handleError(err) Analytics.handleError(err)
ctx.error('error', { err }) opContext.error('error', { err })
} }
}) })
// Sleep briefly to avoid bombarding the account service; the random jitter keeps multiple workspace services from sending requests simultaneously.
await new Promise((resolve) => setTimeout(resolve, Math.random() * 400 + 200))
} }
} }
} }
@ -286,7 +289,7 @@ export class WorkspaceWorker {
await this.workspaceQueue.send(ws.uuid, [workspaceEvents.created()]) await this.workspaceQueue.send(ws.uuid, [workspaceEvents.created()])
} catch (err: any) { } catch (err: any) {
await opt.errorHandler(ws, err) void opt.errorHandler(ws, err)
logger.log('error', err) logger.log('error', err)
@ -385,7 +388,7 @@ export class WorkspaceWorker {
}) })
await this.workspaceQueue.send(ws.uuid, [workspaceEvents.upgraded()]) await this.workspaceQueue.send(ws.uuid, [workspaceEvents.upgraded()])
} catch (err: any) { } catch (err: any) {
await opt.errorHandler(ws, err) void opt.errorHandler(ws, err)
logger.log('error', err) logger.log('error', err)
@ -720,8 +723,10 @@ export class WorkspaceWorker {
resolve() resolve()
this.wakeup = this.defaultWakeup this.wakeup = this.defaultWakeup
} }
// sleep for 5 seconds for the next operation, or until a wakeup event // sleep for N (5 by default) seconds for the next operation, or until a wakeup event
const sleepHandle = setTimeout(wakeup, opt.waitTimeout) // add jitter to avoid simultaneous requests from multiple workspace services
const maxJitter = opt.waitTimeout * 0.2
const sleepHandle = setTimeout(wakeup, opt.waitTimeout + Math.random() * maxJitter)
this.wakeup = () => { this.wakeup = () => {
clearTimeout(sleepHandle) clearTimeout(sleepHandle)

View File

@ -168,7 +168,8 @@ export async function createWorkspace (
await handleWsEvent?.('create-done', version, 100, '') await handleWsEvent?.('create-done', version, 100, '')
} catch (err: any) { } catch (err: any) {
await handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`) void handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`)
throw err
} finally { } finally {
await pipeline.close() await pipeline.close()
await storageAdapter.close() await storageAdapter.close()
@ -350,7 +351,7 @@ export async function upgradeWorkspaceWith (
await handleWsEvent?.('upgrade-done', version, 100, '') await handleWsEvent?.('upgrade-done', version, 100, '')
} catch (err: any) { } catch (err: any) {
ctx.error('upgrade-failed', { message: err.message }) ctx.error('upgrade-failed', { message: err.message })
await handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`) void handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`)
throw err throw err
} finally { } finally {
clearInterval(updateProgressHandle) clearInterval(updateProgressHandle)