diff --git a/server/account/src/collections/postgres/postgres.ts b/server/account/src/collections/postgres/postgres.ts index b5967740d6..ae629c1cee 100644 --- a/server/account/src/collections/postgres/postgres.ts +++ b/server/account/src/collections/postgres/postgres.ts @@ -723,6 +723,8 @@ export class PostgresAccountDB implements AccountDB { return ( err.code === '40001' || // Retry transaction err.code === '55P03' || // Lock not available + err.code === 'CONNECTION_CLOSED' || // This error is thrown if the connection was closed without an error. + err.code === 'CONNECTION_DESTROYED' || // This error is thrown for any queries that were pending when the timeout to sql.end({ timeout: X }) was reached. msg.includes('RETRY_SERIALIZABLE') ) } diff --git a/server/account/src/utils.ts b/server/account/src/utils.ts index 8dc6639862..0c8e2070b8 100644 --- a/server/account/src/utils.ts +++ b/server/account/src/utils.ts @@ -65,7 +65,11 @@ import { isAdminEmail } from './admin' export const GUEST_ACCOUNT = 'b6996120-416f-49cd-841e-e4a5d2e49c9b' -export async function getAccountDB (uri: string, dbNs?: string): Promise<[AccountDB, () => void]> { +export async function getAccountDB ( + uri: string, + dbNs?: string, + appName: string = 'account' +): Promise<[AccountDB, () => void]> { const isMongo = uri.startsWith('mongodb://') if (isMongo) { @@ -84,7 +88,7 @@ export async function getAccountDB (uri: string, dbNs?: string): Promise<[Accoun } else { setDBExtraOptions({ connection: { - application_name: 'account' + application_name: appName } }) const client = getDBClient(sharedPipelineContextVars, uri) diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index fc91417406..bbc538134c 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -223,7 +223,7 @@ class ConnectionMgr { await client.execute('ROLLBACK;') console.error({ message: 'failed to process tx', error: err.message, cause: err }) - if (err.code !== '40001' || tries === maxTries) { + if (!['40001', 'CONNECTION_CLOSED'].includes(err.code) || tries === maxTries) { return err } else { console.log('Transaction failed. Retrying.') @@ -267,7 +267,7 @@ class ConnectionMgr { return { result: await fn(client) } } catch (err: any) { console.error({ message: 'failed to process sql', error: err.message, cause: err }) - if (err.code !== '40001' || tries === maxTries) { + if (!['40001', 'CONNECTION_CLOSED'].includes(err.code) || tries === maxTries) { return err } else { console.log('Read Transaction failed. Retrying.') diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 3a9e00fea2..44c427c853 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -323,7 +323,7 @@ export async function upgradeModel ( modelDb, pipeline, async (value) => { - await progress(90 + (Math.min(value, 100) / 100) * 10) + await progress(10 + (Math.min(value, 100) / 100) * 10) }, wsIds.uuid ) diff --git a/server/workspace-service/src/service.ts b/server/workspace-service/src/service.ts index 9dff8b7def..43f5f5a547 100644 --- a/server/workspace-service/src/service.ts +++ b/server/workspace-service/src/service.ts @@ -158,7 +158,7 @@ export class WorkspaceWorker { ) break } catch (err: any) { - ctx.error('error', { err }) + ctx.error('error during handshake', { err }) } } @@ -187,6 +187,7 @@ export class WorkspaceWorker { } }) if (workspace == null) { + // no workspaces available, sleep before another attempt await this.doSleep(ctx, opt) } else { void this.exec(async () => { @@ -196,9 +197,11 @@ export class WorkspaceWorker { await this.doWorkspaceOperation(opContext, workspace, opt) } catch (err: any) { Analytics.handleError(err) - ctx.error('error', { err }) + opContext.error('error', { err }) } }) + // sleep for a little bit to avoid bombarding the account service, also add jitter to avoid simultaneous requests from multiple workspace services + await new Promise((resolve) => setTimeout(resolve, Math.random() * 400 + 200)) } } } @@ -286,7 +289,7 @@ export class WorkspaceWorker { await this.workspaceQueue.send(ws.uuid, [workspaceEvents.created()]) } catch (err: any) { - await opt.errorHandler(ws, err) + void opt.errorHandler(ws, err) logger.log('error', err) @@ -385,7 +388,7 @@ export class WorkspaceWorker { }) await this.workspaceQueue.send(ws.uuid, [workspaceEvents.upgraded()]) } catch (err: any) { - await opt.errorHandler(ws, err) + void opt.errorHandler(ws, err) logger.log('error', err) @@ -720,8 +723,10 @@ export class WorkspaceWorker { resolve() this.wakeup = this.defaultWakeup } - // sleep for 5 seconds for the next operation, or until a wakeup event - const sleepHandle = setTimeout(wakeup, opt.waitTimeout) + // sleep for N (5 by default) seconds for the next operation, or until a wakeup event + // add jitter to avoid simultaneous requests from multiple workspace services + const maxJitter = opt.waitTimeout * 0.2 + const sleepHandle = setTimeout(wakeup, opt.waitTimeout + Math.random() * maxJitter) this.wakeup = () => { clearTimeout(sleepHandle) diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts index 2194254d38..05eb64da22 100644 --- a/server/workspace-service/src/ws-operations.ts +++ b/server/workspace-service/src/ws-operations.ts @@ -168,7 +168,8 @@ export async function createWorkspace ( await handleWsEvent?.('create-done', version, 100, '') } catch (err: any) { - await handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`) + void handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`) + throw err } finally { await pipeline.close() await storageAdapter.close() @@ -350,7 +351,7 @@ export async function upgradeWorkspaceWith ( await handleWsEvent?.('upgrade-done', version, 100, '') } catch (err: any) { ctx.error('upgrade-failed', { message: err.message }) - await handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`) + void handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`) throw err } finally { clearInterval(updateProgressHandle)