diff --git a/server/account/src/__tests__/postgres.test.ts b/server/account/src/__tests__/postgres.test.ts
index 383d19298e..1512038290 100644
--- a/server/account/src/__tests__/postgres.test.ts
+++ b/server/account/src/__tests__/postgres.test.ts
@@ -474,10 +474,13 @@ describe('PostgresAccountDB', () => {
 
       expect(mockClient.begin).toHaveBeenCalled()
       expect(mockClient).toHaveBeenCalledWith(
-        'global_account' // First call with schema name
+        'global_account' // Verify schema name
+      )
+      expect(mockClient.mock.calls[3][0].map((s: string) => s.replace(/\s+/g, ' ')).join('')).toBe(
+        ' INSERT INTO ._account_applied_migrations (identifier, ddl, last_processed_at) VALUES (, , NOW()) ON CONFLICT (identifier) DO NOTHING '
       )
       expect(mockClient).toHaveBeenCalledWith(
-        ['INSERT INTO ', '._account_applied_migrations (identifier, ddl) VALUES (', ', ', ') ON CONFLICT DO NOTHING'],
+        expect.anything(),
         expect.anything(),
         'test_migration',
         'CREATE TABLE test'
diff --git a/server/account/src/collections/postgres.ts b/server/account/src/collections/postgres.ts
index b2c07d1f33..9c3bd29750 100644
--- a/server/account/src/collections/postgres.ts
+++ b/server/account/src/collections/postgres.ts
@@ -488,15 +488,136 @@ export class PostgresAccountDB implements AccountDB {
   }
 
+  /**
+   * Applies the migration `ddl` registered under `name`, coordinating with other
+   * workers through the `_account_applied_migrations` table.
+   *
+   * Each attempt opens a transaction and tries to lock the migration row with
+   * `FOR UPDATE NOWAIT`:
+   * - no row yet: insert it and, if the insert took effect, run the DDL;
+   * - row with `applied_at` set: the migration is already complete;
+   * - row whose `last_processed_at` is null or older than `staleTimeoutMs`:
+   *   take it over and run the DDL;
+   * - otherwise: do nothing in this attempt and retry later.
+   *
+   * While the DDL runs, `last_processed_at` is refreshed on a timer. Once the
+   * transaction has resolved, `applied_at` is stamped in a separate statement.
+   * Errors raised by an attempt are caught (and logged unless they are lock or
+   * serialization conflicts); the loop then sleeps `retryIntervalMs` and retries,
+   * so the returned promise resolves only once the migration is marked applied.
+   *
+   * @param name - Identifier of the migration row.
+   * @param ddl - SQL passed to `client.unsafe` when this worker runs the migration.
+   */
   async migrate (name: string, ddl: string): Promise<void> {
-    await this.client.begin(async (client) => {
-      const res =
-        await client`INSERT INTO ${this.client(this.ns)}._account_applied_migrations (identifier, ddl) VALUES (${name}, ${ddl}) ON CONFLICT DO NOTHING`
+    // A pending row whose heartbeat is older than this is treated as abandoned
+    const staleTimeoutMs = 30000
+    // Pause between attempts while the migration is not complete
+    const retryIntervalMs = 5000
+    // Period of the last_processed_at heartbeat while the DDL is running
+    const heartbeatIntervalMs = 5000
+    let migrationComplete = false
+    let updateInterval: NodeJS.Timeout | null = null
+    let executed = false
 
-      if (res.count === 1) {
-        console.log(`Applying migration: ${name}`)
-        await client.unsafe(ddl)
+    // Stops the heartbeat timer, if any, and forgets its handle
+    const stopHeartbeat = (): void => {
+      if (updateInterval !== null) {
+        clearInterval(updateInterval)
+        updateInterval = null
+      }
+    }
+
+    // Runs the DDL on the transaction's connection while a heartbeat keeps the row fresh
+    const executeMigration = async (client: Sql): Promise<void> => {
+      stopHeartbeat()
+      // The heartbeat goes through this.client, not the transaction-scoped `client`
+      updateInterval = setInterval(() => {
+        this.client`
+          UPDATE ${this.client(this.ns)}._account_applied_migrations
+          SET last_processed_at = NOW()
+          WHERE identifier = ${name} AND applied_at IS NULL
+        `.catch((err) => {
+          console.error(`Failed to update last_processed_at for migration ${name}:`, err)
+        })
+      }, heartbeatIntervalMs)
+
+      await client.unsafe(ddl)
+      executed = true
+    }
+
+    try {
+      while (!migrationComplete) {
+        try {
+          executed = false
+          await this.client.begin(async (client) => {
+            // Only locks if row exists and is not already locked
+            const existing = await client`
+              SELECT identifier, applied_at, last_processed_at
+              FROM ${this.client(this.ns)}._account_applied_migrations
+              WHERE identifier = ${name}
+              FOR UPDATE NOWAIT
+            `
+
+            if (existing.length > 0) {
+              if (existing[0].applied_at !== null) {
+                // Already completed
+                migrationComplete = true
+              } else if (
+                existing[0].last_processed_at === null ||
+                Date.now() - new Date(existing[0].last_processed_at).getTime() > staleTimeoutMs
+              ) {
+                // Take over the stale migration
+                await client`
+                  UPDATE ${this.client(this.ns)}._account_applied_migrations
+                  SET last_processed_at = NOW()
+                  WHERE identifier = ${name}
+                `
+
+                await executeMigration(client)
+              }
+            } else {
+              const res = await client`
+                INSERT INTO ${this.client(this.ns)}._account_applied_migrations
+                (identifier, ddl, last_processed_at)
+                VALUES (${name}, ${ddl}, NOW())
+                ON CONFLICT (identifier) DO NOTHING
+              `
+
+              if (res.count === 1) {
+                // Successfully inserted
+                await executeMigration(client)
+              }
+              // If insert failed (count === 0), another worker got it first, we'll retry the loop
+            }
+          })
+
+          // NOTE(review): applied_at is stamped outside the transaction; if this statement fails the row
+          // stays pending and the DDL is run again once it goes stale - confirm the DDL is safe to repeat
+          if (executed) {
+            await this.client`
+              UPDATE ${this.client(this.ns)}._account_applied_migrations
+              SET applied_at = NOW()
+              WHERE identifier = ${name}
+            `
+            migrationComplete = true
+          }
+        } catch (err: any) {
+          // 55P03: lock not available (newLockNotAvailableError); 40001: serialization failure (WriteTooOldError)
+          const isContention = ['55P03', '40001'].includes(err?.code)
+          if (!isContention) {
+            console.error(`Error in migration ${name}: ${err?.code} - ${err?.message}`)
+          }
+
+          stopHeartbeat()
+        }
+
+        if (!migrationComplete) {
+          await new Promise((resolve) => setTimeout(resolve, retryIntervalMs))
+        }
       }
-    })
+    } finally {
+      stopHeartbeat()
+    }
   }
 
   async _init (): Promise<void> {
@@ -507,10 +628,42 @@ export class PostgresAccountDB implements AccountDB {
         CREATE TABLE IF NOT EXISTS ${this.ns}._account_applied_migrations (
             identifier VARCHAR(255) NOT NULL PRIMARY KEY
           , ddl TEXT NOT NULL
-          , applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
+          , applied_at TIMESTAMP WITH TIME ZONE
+          , last_processed_at TIMESTAMP WITH TIME ZONE
         );
+
+        ALTER TABLE ${this.ns}._account_applied_migrations
+        ADD COLUMN IF NOT EXISTS last_processed_at TIMESTAMP WITH TIME ZONE;
       `
     )
+
+    // Tables created before this change declared applied_at NOT NULL DEFAULT NOW(); detect that
+    // legacy definition so the column can be relaxed to "null until the migration has been applied"
+    const constraintsExist = await this.client`
+      SELECT 1
+      FROM information_schema.columns
+      WHERE table_schema = ${this.ns}
+      AND table_name = '_account_applied_migrations'
+      AND column_name = 'applied_at'
+      AND (column_default IS NOT NULL OR is_nullable = 'NO')
+    `
+
+    if (constraintsExist.length > 0) {
+      try {
+        await this.client.unsafe(
+          `
+          ALTER TABLE ${this.ns}._account_applied_migrations
+          ALTER COLUMN applied_at DROP DEFAULT;
+
+          ALTER TABLE ${this.ns}._account_applied_migrations
+          ALTER COLUMN applied_at DROP NOT NULL;
+        `
+        )
+      } catch (err) {
+        // Best effort: the constraints were likely already removed by another concurrent migration
+        console.warn('Failed to relax applied_at constraints on _account_applied_migrations:', err)
+      }
+    }
   }
 
   async createWorkspace (data: WorkspaceData, status: WorkspaceStatusData): Promise {