mirror of
https://github.com/hcengineering/platform.git
synced 2025-06-02 13:52:40 +00:00
UBERF-8425: more adjustments for migration scripts and tools (#9099)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / uitest-workspaces (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / uitest-workspaces (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions
* uberf-8425: more adjustments for migration scripts and tools
  Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
* uberf-8425: fix formatting
  Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
parent
dfca0e6dd0
commit
bf39eedca8
@ -32,7 +32,8 @@ import {
|
||||
getDBClient,
|
||||
getDocFieldsByDomains,
|
||||
retryTxn,
|
||||
translateDomain
|
||||
translateDomain,
|
||||
setDBExtraOptions
|
||||
} from '@hcengineering/postgres'
|
||||
import { type DBDoc } from '@hcengineering/postgres/types/utils'
|
||||
import { getTransactorEndpoint } from '@hcengineering/server-client'
|
||||
@ -484,117 +485,212 @@ export async function moveAccountDbFromMongoToPG (
|
||||
ctx.info('Account database migration completed')
|
||||
}
|
||||
|
||||
export async function migrateCreatedModifiedBy (ctx: MeasureMetricsContext, dbUrl: string): Promise<void> {
|
||||
export async function migrateCreatedModifiedBy (
|
||||
ctx: MeasureMetricsContext,
|
||||
dbUrl: string,
|
||||
domains?: string[],
|
||||
maxLifetimeSec?: number,
|
||||
batchSize?: number
|
||||
): Promise<void> {
|
||||
if (!dbUrl.startsWith('postgresql')) {
|
||||
throw new Error('Only CockroachDB is supported')
|
||||
}
|
||||
|
||||
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
|
||||
const pgClient = await pg.getClient()
|
||||
try {
|
||||
ctx.info('Creating account to person id mapping table...')
|
||||
// Create schema
|
||||
await pgClient`CREATE SCHEMA IF NOT EXISTS temp_data`
|
||||
const MAX_RECONNECTS = 30
|
||||
|
||||
// Create mapping table
|
||||
await pgClient`
|
||||
CREATE TABLE IF NOT EXISTS temp_data.account_personid_mapping (
|
||||
old_account_id text,
|
||||
new_person_id text,
|
||||
INDEX idx_account_mapping_old_id (old_account_id)
|
||||
)
|
||||
`
|
||||
if (maxLifetimeSec !== undefined) {
|
||||
setDBExtraOptions({ max_lifetime: maxLifetimeSec })
|
||||
}
|
||||
|
||||
// Populate mapping table
|
||||
await pgClient`
|
||||
INSERT INTO temp_data.account_personid_mapping
|
||||
WITH account_data AS (
|
||||
SELECT
|
||||
tx."objectId" as old_account_id,
|
||||
CASE
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'github:%' THEN lower(tx.data->'attributes'->>'email')
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'openid:%' THEN 'oidc:' || lower(substring(tx.data->'attributes'->>'email' from 8))
|
||||
ELSE 'email:' || lower(tx.data->'attributes'->>'email')
|
||||
END as social_key
|
||||
FROM model_tx tx
|
||||
WHERE tx."_class" = 'core:class:TxCreateDoc'
|
||||
AND tx.data->>'objectClass' = 'contact:class:PersonAccount'
|
||||
AND tx.data->'attributes'->>'email' IS NOT NULL
|
||||
)
|
||||
SELECT
|
||||
ad.old_account_id,
|
||||
si."_id" as new_person_id
|
||||
FROM account_data ad
|
||||
JOIN global_account.social_id si ON si."key" = ad.social_key
|
||||
WHERE ad.old_account_id NOT IN ('core:account:System', 'core:account:ConfigUser')
|
||||
`
|
||||
let progressMade = false
|
||||
let connectsCount = 0
|
||||
let done = false
|
||||
|
||||
// Get list of tables to process
|
||||
const tables = await pgClient`
|
||||
SELECT table_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND column_name IN ('createdBy', 'modifiedBy')
|
||||
GROUP BY table_name
|
||||
`
|
||||
|
||||
// Process each table
|
||||
for (const table of tables) {
|
||||
const tableName = table.table_name
|
||||
ctx.info(`Processing table: ${tableName}`)
|
||||
|
||||
// Get counts for logging
|
||||
const [createdByCount] = await pgClient`
|
||||
SELECT COUNT(*)
|
||||
FROM ${pgClient(tableName)}
|
||||
WHERE "createdBy" IN (SELECT old_account_id FROM temp_data.account_personid_mapping)
|
||||
`
|
||||
const [modifiedByCount] = await pgClient`
|
||||
SELECT COUNT(*)
|
||||
FROM ${pgClient(tableName)}
|
||||
WHERE "modifiedBy" IN (SELECT old_account_id FROM temp_data.account_personid_mapping)
|
||||
`
|
||||
|
||||
ctx.info(
|
||||
`Table ${tableName}: ${createdByCount.count} createdBy and ${modifiedByCount.count} modifiedBy records need updating`
|
||||
)
|
||||
|
||||
// Update createdBy
|
||||
if (createdByCount.count > 0) {
|
||||
ctx.info(`Updating createdBy for ${tableName}...`)
|
||||
const startTime = Date.now()
|
||||
await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "createdBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."createdBy" = m.old_account_id
|
||||
`
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(createdByCount.count / duration)
|
||||
ctx.info(`Updated createdBy for ${tableName}: ${createdByCount.count} rows in ${duration}s (${rate} rows/sec)`)
|
||||
}
|
||||
|
||||
// Update modifiedBy
|
||||
if (modifiedByCount.count > 0) {
|
||||
ctx.info(`Updating modifiedBy for ${tableName}...`)
|
||||
const startTime = Date.now()
|
||||
await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "modifiedBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."modifiedBy" = m.old_account_id
|
||||
`
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(modifiedByCount.count / duration)
|
||||
ctx.info(
|
||||
`Updated modifiedBy for ${tableName}: ${modifiedByCount.count} rows in ${duration}s (${rate} rows/sec)`
|
||||
)
|
||||
}
|
||||
while (!done && (connectsCount === 0 || progressMade) && connectsCount < MAX_RECONNECTS) {
|
||||
if (connectsCount > 0) {
|
||||
ctx.info('Reconnecting...')
|
||||
}
|
||||
progressMade = false
|
||||
connectsCount++
|
||||
|
||||
ctx.info('Migration completed successfully')
|
||||
} finally {
|
||||
pg.close()
|
||||
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
|
||||
const pgClient = await pg.getClient()
|
||||
|
||||
try {
|
||||
ctx.info('Creating/ensuring account to person id mapping table...')
|
||||
// Create schema
|
||||
await pgClient`CREATE SCHEMA IF NOT EXISTS temp_data`
|
||||
|
||||
// Create mapping table
|
||||
await pgClient`
|
||||
CREATE TABLE IF NOT EXISTS temp_data.account_personid_mapping (
|
||||
old_account_id text NOT NULL PRIMARY KEY,
|
||||
new_person_id text
|
||||
)
|
||||
`
|
||||
|
||||
// Populate mapping table
|
||||
await pgClient`
|
||||
INSERT INTO temp_data.account_personid_mapping
|
||||
WITH account_data AS (
|
||||
SELECT
|
||||
tx."objectId" as old_account_id,
|
||||
CASE
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'github:%' THEN lower(tx.data->'attributes'->>'email')
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'openid:%' THEN 'oidc:' || lower(substring(tx.data->'attributes'->>'email' from 8))
|
||||
ELSE 'email:' || lower(tx.data->'attributes'->>'email')
|
||||
END as social_key
|
||||
FROM model_tx tx
|
||||
WHERE tx."_class" = 'core:class:TxCreateDoc'
|
||||
AND tx.data->>'objectClass' = 'contact:class:PersonAccount'
|
||||
AND tx.data->'attributes'->>'email' IS NOT NULL
|
||||
)
|
||||
SELECT
|
||||
ad.old_account_id,
|
||||
si."_id" as new_person_id
|
||||
FROM account_data ad
|
||||
JOIN global_account.social_id si ON si."key" = ad.social_key
|
||||
WHERE ad.old_account_id NOT IN ('core:account:System', 'core:account:ConfigUser')
|
||||
ON CONFLICT (old_account_id) DO NOTHING
|
||||
`
|
||||
|
||||
// Get list of tables to process
|
||||
const tables = await pgClient`
|
||||
SELECT table_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND column_name IN ('createdBy', 'modifiedBy')
|
||||
GROUP BY table_name
|
||||
`
|
||||
const filteredTables =
|
||||
domains == null || domains.length === 0 ? tables : tables.filter((t) => domains.includes(t.table_name))
|
||||
ctx.info(`Found ${filteredTables.length} tables to process`, { domains: filteredTables.map((t) => t.table_name) })
|
||||
|
||||
// Process each table
|
||||
for (const table of filteredTables) {
|
||||
const tableName = table.table_name
|
||||
ctx.info(`Processing table: ${tableName}`)
|
||||
|
||||
// Get counts for logging
|
||||
const [createdByCount] = await pgClient`
|
||||
SELECT COUNT(*)
|
||||
FROM ${pgClient(tableName)} t
|
||||
JOIN temp_data.account_personid_mapping m ON t."createdBy" = m.old_account_id
|
||||
`
|
||||
|
||||
const [modifiedByCount] = await pgClient`
|
||||
SELECT COUNT(*)
|
||||
FROM ${pgClient(tableName)} t
|
||||
JOIN temp_data.account_personid_mapping m ON t."modifiedBy" = m.old_account_id
|
||||
`
|
||||
|
||||
ctx.info(
|
||||
`Table ${tableName}: ${createdByCount.count} createdBy and ${modifiedByCount.count} modifiedBy records need updating`
|
||||
)
|
||||
|
||||
if (createdByCount.count > 0) {
|
||||
ctx.info(`Updating createdBy for ${tableName}...`)
|
||||
const startTime = Date.now()
|
||||
|
||||
if (batchSize == null || batchSize > createdByCount.count) {
|
||||
ctx.info(`Processing the whole table ${tableName}...`)
|
||||
await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "createdBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."createdBy" = m.old_account_id
|
||||
`
|
||||
progressMade = true
|
||||
} else {
|
||||
ctx.info(`Processing the table ${tableName} in batches of ${batchSize}...`)
|
||||
let processed = 0
|
||||
while (true) {
|
||||
const res = await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "createdBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."createdBy" = m.old_account_id
|
||||
LIMIT ${batchSize}
|
||||
`
|
||||
progressMade = true
|
||||
if (res.count === 0) {
|
||||
break
|
||||
}
|
||||
processed += res.count
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(processed / duration)
|
||||
ctx.info(`Processing createdBy for ${tableName}: ${processed} rows in ${duration}s (${rate} rows/sec)`)
|
||||
}
|
||||
}
|
||||
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(createdByCount.count / duration)
|
||||
ctx.info(
|
||||
`Updated createdBy for ${tableName}: ${createdByCount.count} rows in ${duration}s (${rate} rows/sec)`
|
||||
)
|
||||
}
|
||||
|
||||
if (modifiedByCount.count > 0) {
|
||||
ctx.info(`Updating modifiedBy for ${tableName}...`)
|
||||
const startTime = Date.now()
|
||||
|
||||
if (batchSize == null || batchSize > modifiedByCount.count) {
|
||||
ctx.info(`Processing the whole table ${tableName}...`)
|
||||
await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "modifiedBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."modifiedBy" = m.old_account_id
|
||||
`
|
||||
progressMade = true
|
||||
} else {
|
||||
ctx.info(`Processing the table ${tableName} in batches of ${batchSize}...`)
|
||||
let processed = 0
|
||||
while (true) {
|
||||
const res = await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "modifiedBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."modifiedBy" = m.old_account_id
|
||||
LIMIT ${batchSize}
|
||||
`
|
||||
progressMade = true
|
||||
if (res.count === 0) {
|
||||
break
|
||||
}
|
||||
processed += res.count
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(processed / duration)
|
||||
ctx.info(`Processing modifiedBy for ${tableName}: ${processed} rows in ${duration}s (${rate} rows/sec)`)
|
||||
}
|
||||
}
|
||||
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(modifiedByCount.count / duration)
|
||||
ctx.info(
|
||||
`Updated modifiedBy for ${tableName}: ${modifiedByCount.count} rows in ${duration}s (${rate} rows/sec)`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
done = true
|
||||
ctx.info('Migration of created/modified completed successfully')
|
||||
} catch (err: any) {
|
||||
if (err.code !== 'CONNECTION_CLOSED') {
|
||||
throw err
|
||||
}
|
||||
ctx.info('Connection closed...')
|
||||
if (connectsCount === MAX_RECONNECTS) {
|
||||
ctx.error('Failed to migrate created/modified by', { err })
|
||||
throw err
|
||||
}
|
||||
} finally {
|
||||
pg.close()
|
||||
}
|
||||
}
|
||||
|
||||
if (!done) {
|
||||
ctx.error('Failed to migrate created/modified by')
|
||||
}
|
||||
}
|
||||
|
||||
@ -717,7 +813,7 @@ export async function migrateMergedAccounts (
|
||||
// 1. Take the first social id with the existing account
|
||||
// 2. Merge all other accounts into the first one
|
||||
// 3. Create social ids for the first account which haven't had their own accounts
|
||||
const toAdd: Array<SocialKey> = []
|
||||
const toAdd = new Set<SocialKey>()
|
||||
const toMergePersons = new Set<PersonUuid>()
|
||||
const toMergeAccounts = new Set<AccountUuid>()
|
||||
for (const socialKey of socialKeys) {
|
||||
@ -730,7 +826,7 @@ export async function migrateMergedAccounts (
|
||||
})) as AccountUuid
|
||||
|
||||
if (personUuid == null) {
|
||||
toAdd.push(socialIdKey)
|
||||
toAdd.add(socialIdKey)
|
||||
// Means not attached to any account yet, simply add the social id to the primary account
|
||||
} else if (accountUuid == null) {
|
||||
toMergePersons.add(personUuid)
|
||||
@ -765,7 +861,7 @@ export async function migrateMergedAccounts (
|
||||
})
|
||||
}
|
||||
|
||||
for (const addTarget of toAdd) {
|
||||
for (const addTarget of Array.from(toAdd)) {
|
||||
await addSocialIdToPerson(ctx, accountDb, null, token, {
|
||||
person: primaryAccount,
|
||||
...addTarget,
|
||||
|
@ -2316,11 +2316,19 @@ export function devTool (
|
||||
}, dbUrl)
|
||||
})
|
||||
|
||||
program.command('migrate-created-modified-by').action(async () => {
|
||||
const { dbUrl } = prepareTools()
|
||||
program
|
||||
.command('migrate-created-modified-by')
|
||||
.option('--domains <domains>', 'Domains to migrate(comma-separated)')
|
||||
.option('--lifetime <lifetime>', 'Max lifetime for the connection in seconds')
|
||||
.option('--batch <batch>', 'Batch size')
|
||||
.action(async (cmd: { domains?: string, lifetime?: string, batch?: string }) => {
|
||||
const { dbUrl } = prepareTools()
|
||||
const domains = cmd.domains?.split(',').map((d) => d.trim())
|
||||
const maxLifetime = cmd.lifetime != null ? parseInt(cmd.lifetime) : undefined
|
||||
const batchSize = cmd.batch != null ? parseInt(cmd.batch) : undefined
|
||||
|
||||
await migrateCreatedModifiedBy(toolCtx, dbUrl)
|
||||
})
|
||||
await migrateCreatedModifiedBy(toolCtx, dbUrl, domains, maxLifetime, batchSize)
|
||||
})
|
||||
|
||||
program.command('ensure-global-persons-for-local-accounts').action(async () => {
|
||||
const { dbUrl } = prepareTools()
|
||||
|
@ -270,11 +270,10 @@ async function migrateWorkspace (
|
||||
return
|
||||
}
|
||||
|
||||
const createdBy =
|
||||
workspace.createdBy !== undefined ? accountsEmailToUuid[workspace.createdBy] : ('N/A' as AccountUuid)
|
||||
let createdBy = workspace.createdBy !== undefined ? accountsEmailToUuid[workspace.createdBy] : ('N/A' as AccountUuid)
|
||||
if (createdBy === undefined) {
|
||||
console.log('No account found for workspace', workspace.workspace, 'created by', workspace.createdBy)
|
||||
return
|
||||
createdBy = 'N/A' as AccountUuid
|
||||
}
|
||||
|
||||
const existingWorkspace = await accountDB.workspace.findOne({ url: workspace.workspaceUrl })
|
||||
|
Loading…
Reference in New Issue
Block a user