diff --git a/dev/tool/src/db.ts b/dev/tool/src/db.ts index ba63870073..49ed8b2312 100644 --- a/dev/tool/src/db.ts +++ b/dev/tool/src/db.ts @@ -22,7 +22,8 @@ import { type SocialKey, type AccountUuid, parseSocialIdString, - DOMAIN_SPACE + DOMAIN_SPACE, + AccountRole } from '@hcengineering/core' import { getMongoClient, getWorkspaceMongoDB } from '@hcengineering/mongo' import { @@ -192,11 +193,15 @@ export async function moveAccountDbFromMongoToPG ( pgDb: AccountDB ): Promise { const mdb = mongoDb as MongoAccountDB + const BATCH_SIZE = 5000 + const WS_BATCH_SIZE = 2000 ctx.info('Starting migration of persons...') const personsCursor = mdb.person.findCursor({}) try { let personsCount = 0 + let personsBatch: any[] = [] + while (await personsCursor.hasNext()) { const person = await personsCursor.next() if (person == null) break @@ -211,13 +216,20 @@ export async function moveAccountDbFromMongoToPG ( person.lastName = '' } - await pgDb.person.insertOne(person) - personsCount++ - if (personsCount % 100 === 0) { + personsBatch.push(person) + if (personsBatch.length >= BATCH_SIZE) { + await pgDb.person.insertMany(personsBatch) + personsCount += personsBatch.length ctx.info(`Migrated ${personsCount} persons...`) + personsBatch = [] } } } + // Insert remaining batch + if (personsBatch.length > 0) { + await pgDb.person.insertMany(personsBatch) + personsCount += personsBatch.length + } ctx.info(`Migrated ${personsCount} persons`) } finally { await personsCursor.close() @@ -227,6 +239,9 @@ export async function moveAccountDbFromMongoToPG ( const accountsCursor = mdb.account.findCursor({}) try { let accountsCount = 0 + let accountsBatch: any[] = [] + let passwordsBatch: any[] = [] + while (await accountsCursor.hasNext()) { const account = await accountsCursor.next() if (account == null) break @@ -238,16 +253,34 @@ export async function moveAccountDbFromMongoToPG ( delete account.hash delete account.salt - await pgDb.account.insertOne(account) + accountsBatch.push(account) if (hash != null && salt != null) { - await pgDb.setPassword(account.uuid, hash, salt) + passwordsBatch.push([account.uuid, hash, salt]) } - accountsCount++ - if (accountsCount % 100 === 0) { + + if (accountsBatch.length >= BATCH_SIZE) { + await pgDb.account.insertMany(accountsBatch) + for (const [accountUuid, hash, salt] of passwordsBatch) { + await pgDb.setPassword(accountUuid, hash, salt) + } + + accountsCount += accountsBatch.length ctx.info(`Migrated ${accountsCount} accounts...`) + accountsBatch = [] + passwordsBatch = [] } } } + // Insert remaining batch + if (accountsBatch.length > 0) { + await pgDb.account.insertMany(accountsBatch) + accountsCount += accountsBatch.length + } + if (passwordsBatch.length > 0) { + for (const [accountUuid, hash, salt] of passwordsBatch) { + await pgDb.setPassword(accountUuid, hash, salt) + } + } ctx.info(`Migrated ${accountsCount} accounts`) } finally { await accountsCursor.close() @@ -257,6 +290,7 @@ export async function moveAccountDbFromMongoToPG ( const socialIdsCursor = mdb.socialId.findCursor({}) try { let socialIdsCount = 0 + let socialIdsBatch: any[] = [] while (await socialIdsCursor.hasNext()) { const socialId = await socialIdsCursor.next() if (socialId == null) break @@ -267,13 +301,22 @@ export async function moveAccountDbFromMongoToPG ( delete (socialId as any).id delete (socialId as any)._id // Types of _id are incompatible - await pgDb.socialId.insertOne(socialId) - socialIdsCount++ - if (socialIdsCount % 100 === 0) { - ctx.info(`Migrated ${socialIdsCount} social IDs...`) + socialIdsBatch.push(socialId) + + if (socialIdsBatch.length >= BATCH_SIZE) { + await pgDb.socialId.insertMany(socialIdsBatch) + + socialIdsCount += socialIdsBatch.length + ctx.info(`Migrated ${socialIdsCount} social ids...`) + socialIdsBatch = [] } } } + // Insert remaining batch + if (socialIdsBatch.length > 0) { + await pgDb.socialId.insertMany(socialIdsBatch) + socialIdsCount += socialIdsBatch.length + } ctx.info(`Migrated ${socialIdsCount} social IDs`) } finally { await socialIdsCursor.close() @@ -283,6 +326,7 @@ export async function moveAccountDbFromMongoToPG ( const accountEventsCursor = mdb.accountEvent.findCursor({}) try { let eventsCount = 0 + let eventsBatch: any[] = [] while (await accountEventsCursor.hasNext()) { const accountEvent = await accountEventsCursor.next() if (accountEvent == null) break @@ -296,13 +340,21 @@ export async function moveAccountDbFromMongoToPG ( const account = await pgDb.account.findOne({ uuid: accountEvent.accountUuid }) if (account == null) continue // Not a big deal if we don't move the event for non-existing account - await pgDb.accountEvent.insertOne(accountEvent) - eventsCount++ - if (eventsCount % 100 === 0) { + eventsBatch.push(accountEvent) + + if (eventsBatch.length >= BATCH_SIZE) { + await pgDb.accountEvent.insertMany(eventsBatch) + eventsCount += eventsBatch.length ctx.info(`Migrated ${eventsCount} account events...`) + eventsBatch = [] } } } + // Insert remaining batch + if (eventsBatch.length > 0) { + await pgDb.accountEvent.insertMany(eventsBatch) + eventsCount += eventsBatch.length + } ctx.info(`Migrated ${eventsCount} account events`) } finally { await accountEventsCursor.close() @@ -312,6 +364,9 @@ export async function moveAccountDbFromMongoToPG ( const workspacesCursor = mdb.workspace.findCursor({}) try { let workspacesCount = 0 + let workspacesBatch: any[] = [] + let workspacesStatusesBatch: any[] = [] + let workspacesMembersBatch: any[] = [] let membersCount = 0 while (await workspacesCursor.hasNext()) { const workspace = await workspacesCursor.next() @@ -333,25 +388,49 @@ export async function moveAccountDbFromMongoToPG ( } if (workspace.createdOn == null) { - delete workspace.createdOn + workspace.createdOn = Date.now() } - await pgDb.createWorkspace(workspace, status) - workspacesCount++ + workspacesBatch.push(workspace) + workspacesStatusesBatch.push(status) const members = await mdb.getWorkspaceMembers(workspace.uuid) for (const member of members) { const alreadyAssigned = await pgDb.getWorkspaceRole(member.person, workspace.uuid) if (alreadyAssigned != null) continue - await pgDb.assignWorkspace(member.person, workspace.uuid, member.role) - membersCount++ + workspacesMembersBatch.push([member.person, workspace.uuid, member.role]) } - if (workspacesCount % 100 === 0) { + if (workspacesBatch.length >= WS_BATCH_SIZE) { + const workspaceUuids = await pgDb.workspace.insertMany(workspacesBatch) + workspacesCount += workspacesBatch.length + workspacesBatch = [] + + await pgDb.workspaceStatus.insertMany( + workspacesStatusesBatch.map((s, i) => ({ ...s, workspaceUuid: workspaceUuids[i] })) + ) + workspacesStatusesBatch = [] + + await pgDb.batchAssignWorkspace(workspacesMembersBatch) + membersCount += workspacesMembersBatch.length + workspacesMembersBatch = [] + ctx.info(`Migrated ${workspacesCount} workspaces...`) } } + + // Insert remaining batch + if (workspacesBatch.length > 0) { + const workspaceUuids = await pgDb.workspace.insertMany(workspacesBatch) + workspacesCount += workspacesBatch.length + await pgDb.workspaceStatus.insertMany( + workspacesStatusesBatch.map((s, i) => ({ ...s, workspaceUuid: workspaceUuids[i] })) + ) + await pgDb.batchAssignWorkspace(workspacesMembersBatch) + membersCount += workspacesMembersBatch.length + } + ctx.info(`Migrated ${workspacesCount} workspaces with ${membersCount} member assignments`) } finally { await workspacesCursor.close() @@ -360,7 +439,10 @@ export async function moveAccountDbFromMongoToPG ( ctx.info('Starting migration of invites...') const invitesCursor = mdb.invite.findCursor({}) try { + // eslint-disable-next-line @typescript-eslint/no-loss-of-precision + const MAX_INT_8 = 9223372036854775807 let invitesCount = 0 + let invitesBatch: any[] = [] while (await invitesCursor.hasNext()) { const invite = await invitesCursor.next() if (invite == null) break @@ -370,15 +452,30 @@ export async function moveAccountDbFromMongoToPG ( delete (invite as any).id + if (invite.expiresOn > MAX_INT_8 || typeof invite.expiresOn !== 'number') { + invite.expiresOn = -1 + } + + if (["USER'", 'ADMIN'].includes(invite.role as any)) { + invite.role = AccountRole.User + } + const exists = await pgDb.invite.findOne({ migratedFrom: invite.migratedFrom }) if (exists == null) { - await pgDb.invite.insertOne(invite) - invitesCount++ - if (invitesCount % 100 === 0) { + invitesBatch.push(invite) + + if (invitesBatch.length >= BATCH_SIZE) { + await pgDb.invite.insertMany(invitesBatch) + invitesCount += invitesBatch.length ctx.info(`Migrated ${invitesCount} invites...`) + invitesBatch = [] } } } + if (invitesBatch.length > 0) { + await pgDb.invite.insertMany(invitesBatch) + invitesCount += invitesBatch.length + } ctx.info(`Migrated ${invitesCount} invites`) } finally { await invitesCursor.close() diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 11ead587bd..889d4cb5b5 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -174,11 +174,15 @@ export function devTool ( setMetadata(serverClientPlugin.metadata.Endpoint, accountsUrl) setMetadata(serverToken.metadata.Secret, serverSecret) - async function withAccountDatabase (f: (db: AccountDB) => Promise, dbOverride?: string): Promise { + async function withAccountDatabase ( + f: (db: AccountDB) => Promise, + dbOverride?: string, + nsOverride?: string + ): Promise { const uri = dbOverride ?? getAccountDBUrl() - console.log(`connecting to database '${uri}'...`) + const ns = nsOverride ?? process.env.ACCOUNT_DB_NS - const [accountDb, closeAccountsDb] = await getAccountDB(uri) + const [accountDb, closeAccountsDb] = await getAccountDB(uri, ns) try { await f(accountDb) } catch (err: any) { @@ -2299,10 +2303,16 @@ export function devTool ( throw new Error('MONGO_URL and DB_URL are the same') } + const mongoNs = process.env.OLD_ACCOUNTS_NS + await withAccountDatabase(async (pgDb) => { - await withAccountDatabase(async (mongoDb) => { - await moveAccountDbFromMongoToPG(toolCtx, mongoDb, pgDb) - }, mongodbUri) + await withAccountDatabase( + async (mongoDb) => { + await moveAccountDbFromMongoToPG(toolCtx, mongoDb, pgDb) + }, + mongodbUri, + mongoNs + ) }, dbUrl) }) diff --git a/server/account-service/src/index.ts b/server/account-service/src/index.ts index 83cb85da26..cb6d01c1c6 100644 --- a/server/account-service/src/index.ts +++ b/server/account-service/src/index.ts @@ -50,6 +50,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap } const oldAccsUrl = process.env.OLD_ACCOUNTS_URL ?? (dbUrl.startsWith('mongodb://') ? dbUrl : undefined) + const oldAccsNs = process.env.OLD_ACCOUNTS_NS const transactorUri = process.env.TRANSACTOR_URL if (transactorUri === undefined) { @@ -112,7 +113,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap const accountsDb = getAccountDB(dbUrl, dbNs) const migrations = accountsDb.then(async ([db]) => { if (oldAccsUrl !== undefined) { - await migrateFromOldAccounts(oldAccsUrl, db) + await migrateFromOldAccounts(oldAccsUrl, db, oldAccsNs) console.log('Migrations verified/done') } }) diff --git a/server/account-service/src/migration/migration.ts b/server/account-service/src/migration/migration.ts index e3a07416d2..ebc95d94eb 100644 --- a/server/account-service/src/migration/migration.ts +++ b/server/account-service/src/migration/migration.ts @@ -49,10 +49,14 @@ async function shouldMigrate ( } } -export async function migrateFromOldAccounts (oldAccsUrl: string, accountDB: AccountDB): Promise { +export async function migrateFromOldAccounts ( + oldAccsUrl: string, + accountDB: AccountDB, + oldAccsNs?: string +): Promise { const migrationKey = 'migrate-from-old-accounts' // Check if old accounts exist - const [oldAccountDb, closeOldDb] = await getMongoAccountDB(oldAccsUrl) + const [oldAccountDb, closeOldDb] = await getMongoAccountDB(oldAccsUrl, oldAccsNs) let processingHandle try { @@ -234,6 +238,7 @@ async function migrateAccount (account: OldAccount, accountDB: AccountDB): Promi await createAccount(accountDB, personUuid, account.confirmed, false, account.createdOn) if (account.hash != null && account.salt != null) { + // NOTE: for Mongo->CR migration use db method to update password instead await accountDB.account.updateOne({ uuid: personUuid as AccountUuid }, { hash: account.hash, salt: account.salt }) } } else { diff --git a/server/account/src/__tests__/mongo.test.ts b/server/account/src/__tests__/mongo.test.ts index d1b999ef33..118d35ce77 100644 --- a/server/account/src/__tests__/mongo.test.ts +++ b/server/account/src/__tests__/mongo.test.ts @@ -696,7 +696,8 @@ describe('MongoAccountDB', () => { hasNext: jest.fn().mockReturnValue(false), close: jest.fn() })), - updateOne: jest.fn() + updateOne: jest.fn(), + ensureIndices: jest.fn() } mockWorkspace = { diff --git a/server/account/src/collections/mongo.ts b/server/account/src/collections/mongo.ts index 82e3a0526d..b8c24a31f5 100644 --- a/server/account/src/collections/mongo.ts +++ b/server/account/src/collections/mongo.ts @@ -184,6 +184,10 @@ implements DbCollection { return (idKey !== undefined ? toInsert[idKey] : undefined) as K extends keyof T ? T[K] : undefined } + async insertMany (data: Array>): Promise : undefined> { + throw new Error('Not implemented') + } + async updateOne (query: Query, ops: Operations): Promise { const resOps: any = { $set: {} } @@ -346,6 +350,10 @@ export class WorkspaceStatusMongoDbCollection implements DbCollection[]): Promise { + throw new Error('Not implemented') + } + async updateOne (query: Query, ops: Operations): Promise { await this.wsCollection.updateOne(this.toWsQuery(query), this.toWsOperations(ops)) } @@ -420,6 +428,13 @@ export class MongoAccountDB implements AccountDB { } ]) + await this.socialId.ensureIndices([ + { + key: { type: 1, value: 1 }, + options: { unique: true, name: 'hc_account_social_id_type_value_1' } + } + ]) + await this.workspace.ensureIndices([ { key: { uuid: 1 }, @@ -538,6 +553,16 @@ export class MongoAccountDB implements AccountDB { }) } + async batchAssignWorkspace (data: [AccountUuid, WorkspaceUuid, AccountRole][]): Promise { + await this.workspaceMembers.insertMany( + data.map(([accountId, workspaceId, role]) => ({ + workspaceUuid: workspaceId, + accountUuid: accountId, + role + })) + ) + } + async unassignWorkspace (accountId: AccountUuid, workspaceId: WorkspaceUuid): Promise { await this.workspaceMembers.deleteMany({ workspaceUuid: workspaceId, diff --git a/server/account/src/collections/postgres/postgres.ts b/server/account/src/collections/postgres/postgres.ts index db0444607f..dd3d9365f6 100644 --- a/server/account/src/collections/postgres/postgres.ts +++ b/server/account/src/collections/postgres/postgres.ts @@ -292,6 +292,42 @@ implements DbCollection { return res[0][idKey] } + async insertMany (data: Array>, client?: Sql): Promise : undefined> { + const snakeData = convertKeysToSnakeCase(data) + const columns = new Set() + for (const record of snakeData) { + Object.keys(record).forEach((k) => columns.add(k)) + } + const columnsList = Array.from(columns).sort() + + const values: any[] = [] + for (const record of snakeData) { + const recordValues = columnsList.map((col) => record[col] ?? null) + values.push(...recordValues) + } + + const placeholders = snakeData + .map((_: any, i: number) => `(${columnsList.map((_, j) => `$${i * columnsList.length + j + 1}`).join(', ')})`) + .join(', ') + + const sql = ` + INSERT INTO ${this.getTableName()} + (${columnsList.map((k) => `"${k}"`).join(', ')}) + VALUES ${placeholders} + RETURNING * + ` + + const _client = client ?? this.client + const res: any = await _client.unsafe(sql, values) + const idKey = this.idKey + + if (idKey === undefined) { + return undefined as any + } + + return res.map((r: any) => r[idKey]) + } + protected buildUpdateClause (ops: Operations, lastRefIdx: number = 0): [string, any[]] { const updateChunks: string[] = [] const values: any[] = [] @@ -650,6 +686,19 @@ export class PostgresAccountDB implements AccountDB { .client`INSERT INTO ${this.client(this.getWsMembersTableName())} (workspace_uuid, account_uuid, role) VALUES (${workspaceUuid}, ${accountUuid}, ${role})` } + async batchAssignWorkspace (data: [AccountUuid, WorkspaceUuid, AccountRole][]): Promise { + const placeholders = data.map((_: any, i: number) => `($${i * 3 + 1}, $${i * 3 + 2}, $${i * 3 + 3})`).join(', ') + const values = data.flat() + + const sql = ` + INSERT INTO ${this.getWsMembersTableName()} + (account_uuid, workspace_uuid, role) + VALUES ${placeholders} + ` + + await this.client.unsafe(sql, values) + } + async unassignWorkspace (accountUuid: AccountUuid, workspaceUuid: WorkspaceUuid): Promise { await this .client`DELETE FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}` diff --git a/server/account/src/types.ts b/server/account/src/types.ts index f2b5bc0ab9..b680ece60c 100644 --- a/server/account/src/types.ts +++ b/server/account/src/types.ts @@ -201,6 +201,7 @@ export interface AccountDB { init: () => Promise createWorkspace: (data: WorkspaceData, status: WorkspaceStatusData) => Promise assignWorkspace: (accountId: AccountUuid, workspaceId: WorkspaceUuid, role: AccountRole) => Promise + batchAssignWorkspace: (data: [AccountUuid, WorkspaceUuid, AccountRole][]) => Promise updateWorkspaceRole: (accountId: AccountUuid, workspaceId: WorkspaceUuid, role: AccountRole) => Promise unassignWorkspace: (accountId: AccountUuid, workspaceId: WorkspaceUuid) => Promise getWorkspaceRole: (accountId: AccountUuid, workspaceId: WorkspaceUuid) => Promise @@ -222,6 +223,7 @@ export interface DbCollection { find: (query: Query, sort?: Sort, limit?: number) => Promise findOne: (query: Query) => Promise insertOne: (data: Partial) => Promise + insertMany: (data: Partial[]) => Promise updateOne: (query: Query, ops: Operations) => Promise deleteMany: (query: Query) => Promise }