diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 190941e996..5dbd248880 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -1008,21 +1008,7 @@ export class TSessionManager implements SessionManager { closeS(s[1].session, s[1].socket) }) - const closePipeline = async (): Promise => { - try { - await this.ctx.with('close-pipeline', {}, async () => { - await (await workspace.pipeline).close() - }) - } catch (err: any) { - Analytics.handleError(err) - this.ctx.error('close-pipeline-error', { error: err }) - } - } - await this.ctx.with('closing', {}, async () => { - const to = timeoutPromise(120000) - await Promise.race([closePipeline(), to.promise]) - to.cancelHandle() - }) + await closeWorkspace(this.ctx, workspace) if (LOGGING_ENABLED) { this.ctx.warn('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName }) } @@ -1059,13 +1045,7 @@ export class TSessionManager implements SessionManager { } try { if (workspace.sessions.size === 0) { - const pl = await workspace.pipeline - let to = timeoutPromise(60000) - await Promise.race([pl, to.promise]) - to.cancelHandle() - to = timeoutPromise(60000) - await Promise.race([pl.close(), to]) - to.cancelHandle() + await closeWorkspace(this.ctx, workspace) if (this.workspaces.get(wsUuid)?.id === wsUID) { this.workspaces.delete(wsUuid) @@ -1463,3 +1443,33 @@ export function startSessionManager ( sessionManager: sessions } } + +async function closeWorkspace (ctx: MeasureContext, workspace: Workspace): Promise { + const closePipeline = async (): Promise => { + try { + await ctx.with('close-pipeline', {}, async () => { + await (await workspace.pipeline).close() + }) + } catch (err: any) { + Analytics.handleError(err) + ctx.error('close-pipeline-error', { error: err }) + } + } + + const closeCommunicationApi = async (): Promise => { + try { + await ctx.with('close-communication-api', {}, async () => { + await (await workspace.communicationApi).close() + }) + } catch (err: any) { + Analytics.handleError(err) + ctx.error('close-pipeline-error', { error: err }) + } + } + await ctx.with('closing', {}, async () => { + const to = timeoutPromise(120000) + const closePromises = [closePipeline(), closeCommunicationApi()] + await Promise.race([Promise.all(closePromises), to.promise]) + to.cancelHandle() + }) +}