UBERF-8425: Speed up accounts migration (#8994)

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2025-05-20 14:47:32 +04:00 committed by GitHub
parent 7f32b2b2f4
commit efb6ea9085
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 224 additions and 34 deletions

View File

@ -22,7 +22,8 @@ import {
type SocialKey,
type AccountUuid,
parseSocialIdString,
DOMAIN_SPACE
DOMAIN_SPACE,
AccountRole
} from '@hcengineering/core'
import { getMongoClient, getWorkspaceMongoDB } from '@hcengineering/mongo'
import {
@ -192,11 +193,15 @@ export async function moveAccountDbFromMongoToPG (
pgDb: AccountDB
): Promise<void> {
const mdb = mongoDb as MongoAccountDB
const BATCH_SIZE = 5000
const WS_BATCH_SIZE = 2000
ctx.info('Starting migration of persons...')
const personsCursor = mdb.person.findCursor({})
try {
let personsCount = 0
let personsBatch: any[] = []
while (await personsCursor.hasNext()) {
const person = await personsCursor.next()
if (person == null) break
@ -211,13 +216,20 @@ export async function moveAccountDbFromMongoToPG (
person.lastName = ''
}
await pgDb.person.insertOne(person)
personsCount++
if (personsCount % 100 === 0) {
personsBatch.push(person)
if (personsBatch.length >= BATCH_SIZE) {
await pgDb.person.insertMany(personsBatch)
personsCount += personsBatch.length
ctx.info(`Migrated ${personsCount} persons...`)
personsBatch = []
}
}
}
// Insert remaining batch
if (personsBatch.length > 0) {
await pgDb.person.insertMany(personsBatch)
personsCount += personsBatch.length
}
ctx.info(`Migrated ${personsCount} persons`)
} finally {
await personsCursor.close()
@ -227,6 +239,9 @@ export async function moveAccountDbFromMongoToPG (
const accountsCursor = mdb.account.findCursor({})
try {
let accountsCount = 0
let accountsBatch: any[] = []
let passwordsBatch: any[] = []
while (await accountsCursor.hasNext()) {
const account = await accountsCursor.next()
if (account == null) break
@ -238,16 +253,34 @@ export async function moveAccountDbFromMongoToPG (
delete account.hash
delete account.salt
await pgDb.account.insertOne(account)
accountsBatch.push(account)
if (hash != null && salt != null) {
await pgDb.setPassword(account.uuid, hash, salt)
passwordsBatch.push([account.uuid, hash, salt])
}
accountsCount++
if (accountsCount % 100 === 0) {
if (accountsBatch.length >= BATCH_SIZE) {
await pgDb.account.insertMany(accountsBatch)
for (const [accountUuid, hash, salt] of passwordsBatch) {
await pgDb.setPassword(accountUuid, hash, salt)
}
accountsCount += accountsBatch.length
ctx.info(`Migrated ${accountsCount} accounts...`)
accountsBatch = []
passwordsBatch = []
}
}
}
// Insert remaining batch
if (accountsBatch.length > 0) {
await pgDb.account.insertMany(accountsBatch)
accountsCount += accountsBatch.length
}
if (passwordsBatch.length > 0) {
for (const [accountUuid, hash, salt] of passwordsBatch) {
await pgDb.setPassword(accountUuid, hash, salt)
}
}
ctx.info(`Migrated ${accountsCount} accounts`)
} finally {
await accountsCursor.close()
@ -257,6 +290,7 @@ export async function moveAccountDbFromMongoToPG (
const socialIdsCursor = mdb.socialId.findCursor({})
try {
let socialIdsCount = 0
let socialIdsBatch: any[] = []
while (await socialIdsCursor.hasNext()) {
const socialId = await socialIdsCursor.next()
if (socialId == null) break
@ -267,13 +301,22 @@ export async function moveAccountDbFromMongoToPG (
delete (socialId as any).id
delete (socialId as any)._id // Types of _id are incompatible
await pgDb.socialId.insertOne(socialId)
socialIdsCount++
if (socialIdsCount % 100 === 0) {
ctx.info(`Migrated ${socialIdsCount} social IDs...`)
socialIdsBatch.push(socialId)
if (socialIdsBatch.length >= BATCH_SIZE) {
await pgDb.socialId.insertMany(socialIdsBatch)
socialIdsCount += socialIdsBatch.length
ctx.info(`Migrated ${socialIdsCount} social ids...`)
socialIdsBatch = []
}
}
}
// Insert remaining batch
if (socialIdsBatch.length > 0) {
await pgDb.socialId.insertMany(socialIdsBatch)
socialIdsCount += socialIdsBatch.length
}
ctx.info(`Migrated ${socialIdsCount} social IDs`)
} finally {
await socialIdsCursor.close()
@ -283,6 +326,7 @@ export async function moveAccountDbFromMongoToPG (
const accountEventsCursor = mdb.accountEvent.findCursor({})
try {
let eventsCount = 0
let eventsBatch: any[] = []
while (await accountEventsCursor.hasNext()) {
const accountEvent = await accountEventsCursor.next()
if (accountEvent == null) break
@ -296,13 +340,21 @@ export async function moveAccountDbFromMongoToPG (
const account = await pgDb.account.findOne({ uuid: accountEvent.accountUuid })
if (account == null) continue // Not a big deal if we don't move the event for non-existing account
await pgDb.accountEvent.insertOne(accountEvent)
eventsCount++
if (eventsCount % 100 === 0) {
eventsBatch.push(accountEvent)
if (eventsBatch.length >= BATCH_SIZE) {
await pgDb.accountEvent.insertMany(eventsBatch)
eventsCount += eventsBatch.length
ctx.info(`Migrated ${eventsCount} account events...`)
eventsBatch = []
}
}
}
// Insert remaining batch
if (eventsBatch.length > 0) {
await pgDb.accountEvent.insertMany(eventsBatch)
eventsCount += eventsBatch.length
}
ctx.info(`Migrated ${eventsCount} account events`)
} finally {
await accountEventsCursor.close()
@ -312,6 +364,9 @@ export async function moveAccountDbFromMongoToPG (
const workspacesCursor = mdb.workspace.findCursor({})
try {
let workspacesCount = 0
let workspacesBatch: any[] = []
let workspacesStatusesBatch: any[] = []
let workspacesMembersBatch: any[] = []
let membersCount = 0
while (await workspacesCursor.hasNext()) {
const workspace = await workspacesCursor.next()
@ -333,25 +388,49 @@ export async function moveAccountDbFromMongoToPG (
}
if (workspace.createdOn == null) {
delete workspace.createdOn
workspace.createdOn = Date.now()
}
await pgDb.createWorkspace(workspace, status)
workspacesCount++
workspacesBatch.push(workspace)
workspacesStatusesBatch.push(status)
const members = await mdb.getWorkspaceMembers(workspace.uuid)
for (const member of members) {
const alreadyAssigned = await pgDb.getWorkspaceRole(member.person, workspace.uuid)
if (alreadyAssigned != null) continue
await pgDb.assignWorkspace(member.person, workspace.uuid, member.role)
membersCount++
workspacesMembersBatch.push([member.person, workspace.uuid, member.role])
}
if (workspacesCount % 100 === 0) {
if (workspacesBatch.length >= WS_BATCH_SIZE) {
const workspaceUuids = await pgDb.workspace.insertMany(workspacesBatch)
workspacesCount += workspacesBatch.length
workspacesBatch = []
await pgDb.workspaceStatus.insertMany(
workspacesStatusesBatch.map((s, i) => ({ ...s, workspaceUuid: workspaceUuids[i] }))
)
workspacesStatusesBatch = []
await pgDb.batchAssignWorkspace(workspacesMembersBatch)
membersCount += workspacesMembersBatch.length
workspacesMembersBatch = []
ctx.info(`Migrated ${workspacesCount} workspaces...`)
}
}
// Insert remaining batch
if (workspacesBatch.length > 0) {
const workspaceUuids = await pgDb.workspace.insertMany(workspacesBatch)
workspacesCount += workspacesBatch.length
await pgDb.workspaceStatus.insertMany(
workspacesStatusesBatch.map((s, i) => ({ ...s, workspaceUuid: workspaceUuids[i] }))
)
await pgDb.batchAssignWorkspace(workspacesMembersBatch)
membersCount += workspacesMembersBatch.length
}
ctx.info(`Migrated ${workspacesCount} workspaces with ${membersCount} member assignments`)
} finally {
await workspacesCursor.close()
@ -360,7 +439,10 @@ export async function moveAccountDbFromMongoToPG (
ctx.info('Starting migration of invites...')
const invitesCursor = mdb.invite.findCursor({})
try {
// eslint-disable-next-line @typescript-eslint/no-loss-of-precision
const MAX_INT_8 = 9223372036854775807
let invitesCount = 0
let invitesBatch: any[] = []
while (await invitesCursor.hasNext()) {
const invite = await invitesCursor.next()
if (invite == null) break
@ -370,15 +452,30 @@ export async function moveAccountDbFromMongoToPG (
delete (invite as any).id
if (invite.expiresOn > MAX_INT_8 || typeof invite.expiresOn !== 'number') {
invite.expiresOn = -1
}
if (["USER'", 'ADMIN'].includes(invite.role as any)) {
invite.role = AccountRole.User
}
const exists = await pgDb.invite.findOne({ migratedFrom: invite.migratedFrom })
if (exists == null) {
await pgDb.invite.insertOne(invite)
invitesCount++
if (invitesCount % 100 === 0) {
invitesBatch.push(invite)
if (invitesBatch.length >= BATCH_SIZE) {
await pgDb.invite.insertMany(invitesBatch)
invitesCount += invitesBatch.length
ctx.info(`Migrated ${invitesCount} invites...`)
invitesBatch = []
}
}
}
if (invitesBatch.length > 0) {
await pgDb.invite.insertMany(invitesBatch)
invitesCount += invitesBatch.length
}
ctx.info(`Migrated ${invitesCount} invites`)
} finally {
await invitesCursor.close()

View File

@ -174,11 +174,15 @@ export function devTool (
setMetadata(serverClientPlugin.metadata.Endpoint, accountsUrl)
setMetadata(serverToken.metadata.Secret, serverSecret)
async function withAccountDatabase (f: (db: AccountDB) => Promise<any>, dbOverride?: string): Promise<void> {
async function withAccountDatabase (
f: (db: AccountDB) => Promise<any>,
dbOverride?: string,
nsOverride?: string
): Promise<void> {
const uri = dbOverride ?? getAccountDBUrl()
console.log(`connecting to database '${uri}'...`)
const ns = nsOverride ?? process.env.ACCOUNT_DB_NS
const [accountDb, closeAccountsDb] = await getAccountDB(uri)
const [accountDb, closeAccountsDb] = await getAccountDB(uri, ns)
try {
await f(accountDb)
} catch (err: any) {
@ -2299,10 +2303,16 @@ export function devTool (
throw new Error('MONGO_URL and DB_URL are the same')
}
const mongoNs = process.env.OLD_ACCOUNTS_NS
await withAccountDatabase(async (pgDb) => {
await withAccountDatabase(async (mongoDb) => {
await moveAccountDbFromMongoToPG(toolCtx, mongoDb, pgDb)
}, mongodbUri)
await withAccountDatabase(
async (mongoDb) => {
await moveAccountDbFromMongoToPG(toolCtx, mongoDb, pgDb)
},
mongodbUri,
mongoNs
)
}, dbUrl)
})

View File

@ -50,6 +50,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap
}
const oldAccsUrl = process.env.OLD_ACCOUNTS_URL ?? (dbUrl.startsWith('mongodb://') ? dbUrl : undefined)
const oldAccsNs = process.env.OLD_ACCOUNTS_NS
const transactorUri = process.env.TRANSACTOR_URL
if (transactorUri === undefined) {
@ -112,7 +113,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap
const accountsDb = getAccountDB(dbUrl, dbNs)
const migrations = accountsDb.then(async ([db]) => {
if (oldAccsUrl !== undefined) {
await migrateFromOldAccounts(oldAccsUrl, db)
await migrateFromOldAccounts(oldAccsUrl, db, oldAccsNs)
console.log('Migrations verified/done')
}
})

View File

@ -49,10 +49,14 @@ async function shouldMigrate (
}
}
export async function migrateFromOldAccounts (oldAccsUrl: string, accountDB: AccountDB): Promise<void> {
export async function migrateFromOldAccounts (
oldAccsUrl: string,
accountDB: AccountDB,
oldAccsNs?: string
): Promise<void> {
const migrationKey = 'migrate-from-old-accounts'
// Check if old accounts exist
const [oldAccountDb, closeOldDb] = await getMongoAccountDB(oldAccsUrl)
const [oldAccountDb, closeOldDb] = await getMongoAccountDB(oldAccsUrl, oldAccsNs)
let processingHandle
try {
@ -234,6 +238,7 @@ async function migrateAccount (account: OldAccount, accountDB: AccountDB): Promi
await createAccount(accountDB, personUuid, account.confirmed, false, account.createdOn)
if (account.hash != null && account.salt != null) {
// NOTE: for Mongo->CR migration use db method to update password instead
await accountDB.account.updateOne({ uuid: personUuid as AccountUuid }, { hash: account.hash, salt: account.salt })
}
} else {

View File

@ -696,7 +696,8 @@ describe('MongoAccountDB', () => {
hasNext: jest.fn().mockReturnValue(false),
close: jest.fn()
})),
updateOne: jest.fn()
updateOne: jest.fn(),
ensureIndices: jest.fn()
}
mockWorkspace = {

View File

@ -184,6 +184,10 @@ implements DbCollection<T> {
return (idKey !== undefined ? toInsert[idKey] : undefined) as K extends keyof T ? T[K] : undefined
}
/**
 * Batch insert placeholder for this collection.
 *
 * Batch inserts are not supported here: the returned promise always rejects
 * with `Error('Not implemented')`, whatever `data` contains.
 *
 * @param data - records that would be inserted (unused)
 * @throws Error always
 */
async insertMany (data: Array<Partial<T>>): Promise<K extends keyof T ? Array<T[K]> : undefined> {
  const notImplemented = new Error('Not implemented')
  throw notImplemented
}
async updateOne (query: Query<T>, ops: Operations<T>): Promise<void> {
const resOps: any = { $set: {} }
@ -346,6 +350,10 @@ export class WorkspaceStatusMongoDbCollection implements DbCollection<WorkspaceS
return data.workspaceUuid
}
/**
 * Batch insert placeholder for workspace statuses.
 *
 * Batch inserts are not supported here: the returned promise always rejects
 * with `Error('Not implemented')`, whatever `data` contains.
 *
 * @param data - workspace status records that would be inserted (unused)
 * @throws Error always
 */
async insertMany (data: Partial<WorkspaceStatus>[]): Promise<any> {
  const notImplemented = new Error('Not implemented')
  throw notImplemented
}
async updateOne (query: Query<WorkspaceStatus>, ops: Operations<WorkspaceStatus>): Promise<void> {
await this.wsCollection.updateOne(this.toWsQuery(query), this.toWsOperations(ops))
}
@ -420,6 +428,13 @@ export class MongoAccountDB implements AccountDB {
}
])
await this.socialId.ensureIndices([
{
key: { type: 1, value: 1 },
options: { unique: true, name: 'hc_account_social_id_type_value_1' }
}
])
await this.workspace.ensureIndices([
{
key: { uuid: 1 },
@ -538,6 +553,16 @@ export class MongoAccountDB implements AccountDB {
})
}
/**
 * Assigns several accounts to workspaces in one call.
 *
 * Each `[accountUuid, workspaceUuid, role]` tuple becomes one workspace-member
 * record, and all records are handed to `this.workspaceMembers.insertMany`.
 *
 * @param data - `[accountUuid, workspaceUuid, role]` tuples; an empty array is a no-op
 */
async batchAssignWorkspace (data: [AccountUuid, WorkspaceUuid, AccountRole][]): Promise<void> {
  // Nothing to assign: skip the round-trip instead of forwarding an empty batch.
  if (data.length === 0) {
    return
  }

  // NOTE(review): if `workspaceMembers` is one of the collection wrappers whose
  // `insertMany` throws 'Not implemented', this call rejects for any non-empty
  // batch — confirm which collection type backs it.
  const members = data.map(([accountId, workspaceId, role]) => ({
    workspaceUuid: workspaceId,
    accountUuid: accountId,
    role
  }))
  await this.workspaceMembers.insertMany(members)
}
async unassignWorkspace (accountId: AccountUuid, workspaceId: WorkspaceUuid): Promise<void> {
await this.workspaceMembers.deleteMany({
workspaceUuid: workspaceId,

View File

@ -292,6 +292,42 @@ implements DbCollection<T> {
return res[0][idKey]
}
/**
 * Inserts several records with a single multi-row `INSERT ... VALUES` statement.
 *
 * Record keys are converted to snake_case. The column list is the sorted union
 * of the keys found across all records; a record that lacks one of those
 * columns (or has it set to null/undefined) gets an explicit NULL for it.
 *
 * @param data - records to insert; an empty array is a no-op
 * @param client - optional SQL client to run the statement on instead of `this.client`
 * @returns the `idKey` value of every returned row, or `undefined` when the
 *   collection has no `idKey`
 */
async insertMany (data: Array<Partial<T>>, client?: Sql): Promise<K extends keyof T ? Array<T[K]> : undefined> {
  const idKey = this.idKey

  // A VALUES clause with no tuples is not valid SQL, so an empty batch is
  // answered locally without touching the database.
  if (data.length === 0) {
    return (idKey === undefined ? undefined : []) as any
  }

  const snakeData = convertKeysToSnakeCase(data)

  // Union of all keys, sorted so the column order is deterministic.
  const columns = new Set<string>()
  for (const record of snakeData) {
    for (const key of Object.keys(record)) {
      columns.add(key)
    }
  }
  const columnsList = Array.from(columns).sort()
  const width = columnsList.length

  // Flatten the records row by row; positional parameter $n maps to values[n - 1].
  // NOTE(review): missing keys are sent as explicit NULLs, so column defaults
  // are presumably not applied to them — confirm this is intended.
  const values: any[] = []
  const tuples: string[] = []
  snakeData.forEach((record: any, row: number) => {
    const refs = columnsList.map((col, offset) => {
      values.push(record[col] ?? null)
      return `$${row * width + offset + 1}`
    })
    tuples.push(`(${refs.join(', ')})`)
  })

  const sql = `
    INSERT INTO ${this.getTableName()}
    (${columnsList.map((k) => `"${k}"`).join(', ')})
    VALUES ${tuples.join(', ')}
    RETURNING *
  `

  const _client = client ?? this.client
  const res: any = await _client.unsafe(sql, values)

  if (idKey === undefined) {
    return undefined as any
  }

  return res.map((r: any) => r[idKey])
}
protected buildUpdateClause (ops: Operations<T>, lastRefIdx: number = 0): [string, any[]] {
const updateChunks: string[] = []
const values: any[] = []
@ -650,6 +686,19 @@ export class PostgresAccountDB implements AccountDB {
.client`INSERT INTO ${this.client(this.getWsMembersTableName())} (workspace_uuid, account_uuid, role) VALUES (${workspaceUuid}, ${accountUuid}, ${role})`
}
/**
 * Assigns several accounts to workspaces with a single multi-row INSERT into
 * the workspace members table.
 *
 * @param data - `[accountUuid, workspaceUuid, role]` tuples; an empty array is a no-op
 */
async batchAssignWorkspace (data: [AccountUuid, WorkspaceUuid, AccountRole][]): Promise<void> {
  // A VALUES clause with no tuples is not valid SQL. Callers may legitimately
  // pass an empty batch (e.g. workspaces without any new members), so treat it
  // as "nothing to do" instead of sending a broken statement.
  if (data.length === 0) {
    return
  }

  // Three positional parameters per tuple, in the same order as the column list below.
  const placeholders = data.map((_, i) => `($${i * 3 + 1}, $${i * 3 + 2}, $${i * 3 + 3})`).join(', ')
  const values = data.flat()

  const sql = `
    INSERT INTO ${this.getWsMembersTableName()}
    (account_uuid, workspace_uuid, role)
    VALUES ${placeholders}
  `

  await this.client.unsafe(sql, values)
}
async unassignWorkspace (accountUuid: AccountUuid, workspaceUuid: WorkspaceUuid): Promise<void> {
await this
.client`DELETE FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`

View File

@ -201,6 +201,7 @@ export interface AccountDB {
init: () => Promise<void>
createWorkspace: (data: WorkspaceData, status: WorkspaceStatusData) => Promise<WorkspaceUuid>
assignWorkspace: (accountId: AccountUuid, workspaceId: WorkspaceUuid, role: AccountRole) => Promise<void>
batchAssignWorkspace: (data: [AccountUuid, WorkspaceUuid, AccountRole][]) => Promise<void>
updateWorkspaceRole: (accountId: AccountUuid, workspaceId: WorkspaceUuid, role: AccountRole) => Promise<void>
unassignWorkspace: (accountId: AccountUuid, workspaceId: WorkspaceUuid) => Promise<void>
getWorkspaceRole: (accountId: AccountUuid, workspaceId: WorkspaceUuid) => Promise<AccountRole | null>
@ -222,6 +223,7 @@ export interface DbCollection<T> {
find: (query: Query<T>, sort?: Sort<T>, limit?: number) => Promise<T[]>
findOne: (query: Query<T>) => Promise<T | null>
insertOne: (data: Partial<T>) => Promise<any>
insertMany: (data: Partial<T>[]) => Promise<any>
updateOne: (query: Query<T>, ops: Operations<T>) => Promise<void>
deleteMany: (query: Query<T>) => Promise<void>
}