diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index ee0fca0054..83a2dc862c 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -111,6 +111,7 @@ services: - ACCOUNTS_URL=http://host.docker.internal:3000 - BRANDING_PATH=/var/cfg/branding.json - NOTIFY_INBOX_ONLY=true + # - PARALLEL=2 # - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml # - INIT_WORKSPACE=onboarding restart: unless-stopped diff --git a/server/workspace-service/src/service.ts b/server/workspace-service/src/service.ts index 1ce0a6c3c3..dcad505b67 100644 --- a/server/workspace-service/src/service.ts +++ b/server/workspace-service/src/service.ts @@ -14,7 +14,6 @@ // import { type BrandingMap, - RateLimiter, systemAccountEmail, type BaseWorkspaceInfo, type Data, @@ -43,7 +42,9 @@ export interface WorkspaceOptions { } export class WorkspaceWorker { - rateLimit: RateLimiter + runningTasks: number = 0 + resolveBusy: (() => void) | null = null + constructor ( readonly version: Data, readonly txes: Tx[], @@ -52,8 +53,20 @@ export class WorkspaceWorker { readonly limit: number, readonly operation: 'create' | 'upgrade' | 'all', readonly brandings: BrandingMap - ) { - this.rateLimit = new RateLimiter(limit) + ) {} + + hasAvailableThread (): boolean { + return this.runningTasks < this.limit + } + + async waitForAvailableThread (): Promise { + if (this.hasAvailableThread()) { + return + } + + await new Promise((resolve) => { + this.resolveBusy = resolve + }) } // Note: not gonna use it for now @@ -69,6 +82,8 @@ export class WorkspaceWorker { await workerHandshake(token, this.region, this.version, this.operation) while (true) { + await this.waitForAvailableThread() + const workspace = await ctx.with('get-pending-workspace', {}, async (ctx) => { try { return await getPendingWorkspace(token, this.region, this.version, this.operation) @@ -79,13 +94,33 @@ export class WorkspaceWorker { if (workspace === undefined) { await this.doSleep(ctx, opt) } else { - await this.rateLimit.exec(async () => { - await this.doWorkspaceOperation(ctx, workspace, opt) + void this.exec(async () => { + await this.doWorkspaceOperation( + ctx.newChild('workspaceOperation', { + workspace: workspace.workspace, + workspaceName: workspace.workspaceName + }), + workspace, + opt + ) }) } } } + async exec (op: () => Promise): Promise { + this.runningTasks++ + + await op().finally(() => { + this.runningTasks-- + + if (this.resolveBusy !== null) { + this.resolveBusy() + this.resolveBusy = null + } + }) + } + private async _createWorkspace (ctx: MeasureContext, ws: BaseWorkspaceInfo, opt: WorkspaceOptions): Promise { const t = Date.now() diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts index 4c1e9de6c7..ce1f5856e0 100644 --- a/server/workspace-service/src/ws-operations.ts +++ b/server/workspace-service/src/ws-operations.ts @@ -197,7 +197,9 @@ export async function createWorkspace ( await handleWsEvent?.('progress', version, 20 + Math.round((Math.min(value, 100) / 100) * 10)) }) + ctx.info('Starting init script if any') await initializeWorkspace(ctx, branding, wsUrl, storageAdapter, client, ctxModellogger, async (value) => { + ctx.info('Init script progress', { value }) await handleWsEvent?.('progress', version, 30 + Math.round((Math.min(value, 100) / 100) * 70)) })