mirror of
https://github.com/hcengineering/platform.git
synced 2025-05-11 09:51:53 +00:00
UBERF-9756: Speed up CR account migrations (#8573)
* uberf-9756: speed up CR account migrations Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com> * uberf-9756: fix formatting Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
parent
be6bd0d944
commit
d0b1227d2e
@ -1,19 +1,13 @@
|
||||
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||
import {
|
||||
type AccountDB,
|
||||
type MongoAccountDB,
|
||||
type Workspace,
|
||||
getAccount,
|
||||
getWorkspaceById,
|
||||
getWorkspaces
|
||||
} from '@hcengineering/account'
|
||||
import { type AccountDB, type MongoAccountDB, type Workspace, ensurePerson } from '@hcengineering/account'
|
||||
import { getFirstName, getLastName } from '@hcengineering/contact'
|
||||
import {
|
||||
systemAccountUuid,
|
||||
type BackupClient,
|
||||
type Client,
|
||||
type Doc,
|
||||
MeasureMetricsContext,
|
||||
type WorkspaceUuid
|
||||
SocialIdType
|
||||
} from '@hcengineering/core'
|
||||
import { getMongoClient, getWorkspaceMongoDB } from '@hcengineering/mongo'
|
||||
import {
|
||||
@ -29,8 +23,9 @@ import { getTransactorEndpoint } from '@hcengineering/server-client'
|
||||
import { sharedPipelineContextVars } from '@hcengineering/server-pipeline'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { connect } from '@hcengineering/server-tool'
|
||||
import { type MongoClient, UUID } from 'mongodb'
|
||||
import { type MongoClient } from 'mongodb'
|
||||
import type postgres from 'postgres'
|
||||
import { getToolToken } from './utils'
|
||||
|
||||
export async function moveFromMongoToPG (
|
||||
accountDb: AccountDB,
|
||||
@ -376,3 +371,247 @@ export async function moveAccountDbFromMongoToPG (
|
||||
|
||||
ctx.info('Account database migration completed')
|
||||
}
|
||||
|
||||
export async function migrateCreatedModifiedBy (ctx: MeasureMetricsContext, dbUrl: string): Promise<void> {
|
||||
if (!dbUrl.startsWith('postgresql')) {
|
||||
throw new Error('Only CockroachDB is supported')
|
||||
}
|
||||
|
||||
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
|
||||
const pgClient = await pg.getClient()
|
||||
try {
|
||||
ctx.info('Creating account to person id mapping table...')
|
||||
// Create schema
|
||||
await pgClient`CREATE SCHEMA IF NOT EXISTS temp_data`
|
||||
|
||||
// Create mapping table
|
||||
await pgClient`
|
||||
CREATE TABLE IF NOT EXISTS temp_data.account_personid_mapping (
|
||||
old_account_id text,
|
||||
new_person_id text,
|
||||
INDEX idx_account_mapping_old_id (old_account_id)
|
||||
)
|
||||
`
|
||||
|
||||
// Populate mapping table
|
||||
await pgClient`
|
||||
INSERT INTO temp_data.account_personid_mapping
|
||||
WITH account_data AS (
|
||||
SELECT
|
||||
tx."objectId" as old_account_id,
|
||||
CASE
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'github:%' THEN lower(tx.data->'attributes'->>'email')
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'openid:%' THEN 'oidc:' || lower(substring(tx.data->'attributes'->>'email' from 8))
|
||||
ELSE 'email:' || lower(tx.data->'attributes'->>'email')
|
||||
END as social_key
|
||||
FROM model_tx tx
|
||||
WHERE tx."_class" = 'core:class:TxCreateDoc'
|
||||
AND tx.data->>'objectClass' = 'contact:class:PersonAccount'
|
||||
AND tx.data->'attributes'->>'email' IS NOT NULL
|
||||
)
|
||||
SELECT
|
||||
ad.old_account_id,
|
||||
si."_id" as new_person_id
|
||||
FROM account_data ad
|
||||
JOIN global_account.social_id si ON si."key" = ad.social_key
|
||||
WHERE ad.old_account_id NOT IN ('core:account:System', 'core:account:ConfigUser')
|
||||
`
|
||||
|
||||
// Get list of tables to process
|
||||
const tables = await pgClient`
|
||||
SELECT table_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = 'public'
|
||||
AND column_name IN ('createdBy', 'modifiedBy')
|
||||
GROUP BY table_name
|
||||
`
|
||||
|
||||
// Process each table
|
||||
for (const table of tables) {
|
||||
const tableName = table.table_name
|
||||
ctx.info(`Processing table: ${tableName}`)
|
||||
|
||||
// Get counts for logging
|
||||
const [createdByCount] = await pgClient`
|
||||
SELECT COUNT(*)
|
||||
FROM ${pgClient(tableName)}
|
||||
WHERE "createdBy" IN (SELECT old_account_id FROM temp_data.account_personid_mapping)
|
||||
`
|
||||
const [modifiedByCount] = await pgClient`
|
||||
SELECT COUNT(*)
|
||||
FROM ${pgClient(tableName)}
|
||||
WHERE "modifiedBy" IN (SELECT old_account_id FROM temp_data.account_personid_mapping)
|
||||
`
|
||||
|
||||
ctx.info(
|
||||
`Table ${tableName}: ${createdByCount.count} createdBy and ${modifiedByCount.count} modifiedBy records need updating`
|
||||
)
|
||||
|
||||
// Update createdBy
|
||||
if (createdByCount.count > 0) {
|
||||
ctx.info(`Updating createdBy for ${tableName}...`)
|
||||
const startTime = Date.now()
|
||||
await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "createdBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."createdBy" = m.old_account_id
|
||||
`
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(createdByCount.count / duration)
|
||||
ctx.info(`Updated createdBy for ${tableName}: ${createdByCount.count} rows in ${duration}s (${rate} rows/sec)`)
|
||||
}
|
||||
|
||||
// Update modifiedBy
|
||||
if (modifiedByCount.count > 0) {
|
||||
ctx.info(`Updating modifiedBy for ${tableName}...`)
|
||||
const startTime = Date.now()
|
||||
await pgClient`
|
||||
UPDATE ${pgClient(tableName)}
|
||||
SET "modifiedBy" = m.new_person_id::text
|
||||
FROM temp_data.account_personid_mapping m
|
||||
WHERE ${pgClient(tableName)}."modifiedBy" = m.old_account_id
|
||||
`
|
||||
const duration = (Date.now() - startTime) / 1000
|
||||
const rate = Math.round(modifiedByCount.count / duration)
|
||||
ctx.info(
|
||||
`Updated modifiedBy for ${tableName}: ${modifiedByCount.count} rows in ${duration}s (${rate} rows/sec)`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
ctx.info('Migration completed successfully')
|
||||
} finally {
|
||||
pg.close()
|
||||
}
|
||||
}
|
||||
|
||||
export async function ensureGlobalPersonsForLocalAccounts (
|
||||
ctx: MeasureMetricsContext,
|
||||
dbUrl: string,
|
||||
accountDb: AccountDB
|
||||
): Promise<void> {
|
||||
ctx.info('Ensuring global persons for local accounts... ', {})
|
||||
|
||||
if (!dbUrl.startsWith('postgresql')) {
|
||||
throw new Error('Only CockroachDB is supported')
|
||||
}
|
||||
|
||||
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
|
||||
const pgClient = await pg.getClient()
|
||||
const token = getToolToken()
|
||||
|
||||
try {
|
||||
ctx.info('Creating account to social key mapping table...')
|
||||
// Create schema
|
||||
await pgClient`CREATE SCHEMA IF NOT EXISTS temp_data`
|
||||
|
||||
// Create mapping table
|
||||
await pgClient`
|
||||
CREATE TABLE IF NOT EXISTS temp_data.account_socialkey_mapping (
|
||||
workspace_id text,
|
||||
old_account_id text,
|
||||
new_social_key text,
|
||||
person_ref text,
|
||||
person_name text,
|
||||
INDEX idx_account_mapping_old_id (workspace_id, old_account_id)
|
||||
)
|
||||
`
|
||||
|
||||
const [res] = await pgClient`SELECT COUNT(*) FROM temp_data.account_socialkey_mapping`
|
||||
|
||||
if (res.count === '0') {
|
||||
// Populate mapping table
|
||||
await pgClient`
|
||||
INSERT INTO temp_data.account_socialkey_mapping
|
||||
WITH person_refs AS (
|
||||
SELECT
|
||||
tx."workspaceId" as workspace_id,
|
||||
tx."objectId" as account_id,
|
||||
CASE
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'github:%' THEN lower(tx.data->'attributes'->>'email')
|
||||
WHEN tx.data->'attributes'->>'email' LIKE 'openid:%' THEN 'oidc:' || lower(substring(tx.data->'attributes'->>'email' from 8))
|
||||
ELSE 'email:' || lower(tx.data->'attributes'->>'email')
|
||||
END as new_social_key,
|
||||
COALESCE(
|
||||
-- Try to get person from most recent update
|
||||
(
|
||||
SELECT (tx2.data->'operations'->>'person')::text
|
||||
FROM model_tx tx2
|
||||
WHERE tx2."objectId" = tx."objectId"
|
||||
AND tx2."workspaceId" = tx."workspaceId"
|
||||
AND tx2.data->>'objectClass' = 'contact:class:PersonAccount'
|
||||
AND tx2.data->'operations'->>'person' IS NOT NULL
|
||||
ORDER BY tx2."createdOn" DESC
|
||||
LIMIT 1
|
||||
),
|
||||
-- If no updates, get from create transaction
|
||||
(tx.data->'attributes'->>'person')::text
|
||||
) as person_ref
|
||||
FROM model_tx tx
|
||||
WHERE tx."_class" = 'core:class:TxCreateDoc'
|
||||
AND tx.data->>'objectClass' = 'contact:class:PersonAccount'
|
||||
AND tx.data->'attributes'->>'email' IS NOT NULL
|
||||
AND tx.data->'attributes'->>'email' != ''
|
||||
AND tx."objectId" NOT IN ('core:account:System', 'core:account:ConfigUser')
|
||||
)
|
||||
SELECT
|
||||
p.workspace_id,
|
||||
p.account_id as old_account_id,
|
||||
p.new_social_key,
|
||||
p.person_ref,
|
||||
c.data->>'name' as person_name
|
||||
FROM person_refs p
|
||||
LEFT JOIN public.contact c ON c."_id" = p.person_ref
|
||||
`
|
||||
}
|
||||
|
||||
let count = 0
|
||||
let failed = 0
|
||||
const accountToSocialKey = await pgClient`SELECT * FROM temp_data.account_socialkey_mapping`
|
||||
for (const row of accountToSocialKey) {
|
||||
const newSocialKey = row.new_social_key
|
||||
const personName = row.person_name ?? ''
|
||||
|
||||
const keyParts = newSocialKey.split(':')
|
||||
if (keyParts.length !== 2) {
|
||||
ctx.error('Invalid social key', row)
|
||||
continue
|
||||
}
|
||||
|
||||
const keyType = keyParts[0]
|
||||
const keyValue = keyParts[1]
|
||||
|
||||
if (!Object.values(SocialIdType).includes(keyType)) {
|
||||
ctx.error('Invalid social key type', row)
|
||||
continue
|
||||
}
|
||||
|
||||
const firstName = getFirstName(personName)
|
||||
const lastName = getLastName(personName)
|
||||
const effectiveFirstName = firstName === '' ? keyValue : firstName
|
||||
|
||||
try {
|
||||
await ensurePerson(ctx, accountDb, null, token, {
|
||||
socialType: keyType as SocialIdType,
|
||||
socialValue: keyValue,
|
||||
firstName: effectiveFirstName,
|
||||
lastName
|
||||
})
|
||||
count++
|
||||
} catch (err: any) {
|
||||
ctx.error('Failed to ensure person', {
|
||||
socialType: keyType as SocialIdType,
|
||||
socialValue: keyValue,
|
||||
firstName: effectiveFirstName,
|
||||
lastName
|
||||
})
|
||||
failed++
|
||||
}
|
||||
}
|
||||
|
||||
ctx.info(`Successfully ensured ${count} people with failed count ${failed}`)
|
||||
} finally {
|
||||
pg.close()
|
||||
}
|
||||
}
|
||||
|
@ -93,8 +93,8 @@ import { getAccountDBUrl, getMongoDBUrl } from './__start'
|
||||
// import { fillGithubUsers, fixAccountEmails, renameAccount } from './account'
|
||||
import { changeConfiguration } from './configuration'
|
||||
|
||||
import { moveAccountDbFromMongoToPG } from './db'
|
||||
import { performGithubAccountMigrations } from './github'
|
||||
import { migrateCreatedModifiedBy, ensureGlobalPersonsForLocalAccounts, moveAccountDbFromMongoToPG } from './db'
|
||||
import { getToolToken, getWorkspace, getWorkspaceTransactorEndpoint } from './utils'
|
||||
|
||||
const colorConstants = {
|
||||
@ -2284,6 +2284,20 @@ export function devTool (
|
||||
}, dbUrl)
|
||||
})
|
||||
|
||||
program.command('migrate-created-modified-by').action(async () => {
|
||||
const { dbUrl } = prepareTools()
|
||||
|
||||
await migrateCreatedModifiedBy(toolCtx, dbUrl)
|
||||
})
|
||||
|
||||
program.command('ensure-global-persons-for-local-accounts').action(async () => {
|
||||
const { dbUrl } = prepareTools()
|
||||
|
||||
await withAccountDatabase(async (accDb) => {
|
||||
await ensureGlobalPersonsForLocalAccounts(toolCtx, dbUrl, accDb)
|
||||
}, dbUrl)
|
||||
})
|
||||
|
||||
// program
|
||||
// .command('perfomance')
|
||||
// .option('-p, --parallel', '', false)
|
||||
|
Loading…
Reference in New Issue
Block a user