UBERF-9559: Make CR accounts migrations concurrency safe (#8821)
Some checks are pending
CI / uitest (push) Waiting to run
CI / dist-build (push) Blocked by required conditions
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / uitest-workspaces (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions

* uberf-9559: make cr accounts migrations concurrency safe
Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>

* uberf-9559: fix test
Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2025-05-03 20:30:39 +04:00 committed by GitHub
parent 0eac3cde01
commit c42a94f697
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 129 additions and 10 deletions

View File

@ -474,10 +474,13 @@ describe('PostgresAccountDB', () => {
expect(mockClient.begin).toHaveBeenCalled()
expect(mockClient).toHaveBeenCalledWith(
'global_account' // First call with schema name
'global_account' // Verify schema name
)
expect(mockClient.mock.calls[3][0].map((s: string) => s.replace(/\s+/g, ' ')).join('')).toBe(
' INSERT INTO ._account_applied_migrations (identifier, ddl, last_processed_at) VALUES (, , NOW()) ON CONFLICT (identifier) DO NOTHING '
)
expect(mockClient).toHaveBeenCalledWith(
['INSERT INTO ', '._account_applied_migrations (identifier, ddl) VALUES (', ', ', ') ON CONFLICT DO NOTHING'],
expect.anything(),
expect.anything(),
'test_migration',
'CREATE TABLE test'

View File

@ -488,15 +488,102 @@ export class PostgresAccountDB implements AccountDB {
}
async migrate (name: string, ddl: string): Promise<void> {
await this.client.begin(async (client) => {
const res =
await client`INSERT INTO ${this.client(this.ns)}._account_applied_migrations (identifier, ddl) VALUES (${name}, ${ddl}) ON CONFLICT DO NOTHING`
const staleTimeoutMs = 30000
const retryIntervalMs = 5000
let migrationComplete = false
let updateInterval: NodeJS.Timeout | null = null
let executed = false
if (res.count === 1) {
console.log(`Applying migration: ${name}`)
await client.unsafe(ddl)
const executeMigration = async (client: Sql): Promise<void> => {
updateInterval = setInterval(() => {
this.client`
UPDATE ${this.client(this.ns)}._account_applied_migrations
SET last_processed_at = NOW()
WHERE identifier = ${name} AND applied_at IS NULL
`.catch((err) => {
console.error(`Failed to update last_processed_at for migration ${name}:`, err)
})
}, 5000)
await client.unsafe(ddl)
executed = true
}
try {
while (!migrationComplete) {
try {
executed = false
await this.client.begin(async (client) => {
// Only locks if row exists and is not already locked
const existing = await client`
SELECT identifier, applied_at, last_processed_at
FROM ${this.client(this.ns)}._account_applied_migrations
WHERE identifier = ${name}
FOR UPDATE NOWAIT
`
if (existing.length > 0) {
if (existing[0].applied_at !== null) {
// Already completed
migrationComplete = true
} else if (
existing[0].last_processed_at === null ||
Date.now() - new Date(existing[0].last_processed_at).getTime() > staleTimeoutMs
) {
// Take over the stale migration
await client`
UPDATE ${this.client(this.ns)}._account_applied_migrations
SET last_processed_at = NOW()
WHERE identifier = ${name}
`
await executeMigration(client)
}
} else {
const res = await client`
INSERT INTO ${this.client(this.ns)}._account_applied_migrations
(identifier, ddl, last_processed_at)
VALUES (${name}, ${ddl}, NOW())
ON CONFLICT (identifier) DO NOTHING
`
if (res.count === 1) {
// Successfully inserted
await executeMigration(client)
}
// If insert failed (count === 0), another worker got it first, we'll retry the loop
}
})
if (executed) {
await this.client`
UPDATE ${this.client(this.ns)}._account_applied_migrations
SET applied_at = NOW()
WHERE identifier = ${name}
`
migrationComplete = true
}
} catch (err: any) {
if (['55P03', '40001'].includes(err.code)) {
// newLockNotAvailableError, WriteTooOldError
} else {
console.error(`Error in migration ${name}: ${err.code} - ${err.message}`)
}
if (updateInterval !== null) {
clearInterval(updateInterval)
}
}
if (!migrationComplete) {
await new Promise((resolve) => setTimeout(resolve, retryIntervalMs))
}
}
})
} finally {
if (updateInterval !== null) {
clearInterval(updateInterval)
}
}
}
async _init (): Promise<void> {
@ -507,10 +594,39 @@ export class PostgresAccountDB implements AccountDB {
CREATE TABLE IF NOT EXISTS ${this.ns}._account_applied_migrations (
identifier VARCHAR(255) NOT NULL PRIMARY KEY
, ddl TEXT NOT NULL
, applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
, applied_at TIMESTAMP WITH TIME ZONE
, last_processed_at TIMESTAMP WITH TIME ZONE
);
ALTER TABLE ${this.ns}._account_applied_migrations
ADD COLUMN IF NOT EXISTS last_processed_at TIMESTAMP WITH TIME ZONE;
`
)
const constraintsExist = await this.client`
SELECT 1
FROM information_schema.columns
WHERE table_schema = ${this.ns}
AND table_name = '_account_applied_migrations'
AND column_name = 'applied_at'
AND (column_default IS NOT NULL OR is_nullable = 'NO')
`
if (constraintsExist.length > 0) {
try {
await this.client.unsafe(
`
ALTER TABLE ${this.ns}._account_applied_migrations
ALTER COLUMN applied_at DROP DEFAULT;
ALTER TABLE ${this.ns}._account_applied_migrations
ALTER COLUMN applied_at DROP NOT NULL;
`
)
} catch (err) {
// Ignore errors since they likely mean constraints were already removed by another concurrent migration
}
}
}
async createWorkspace (data: WorkspaceData, status: WorkspaceStatusData): Promise<WorkspaceUuid> {