UBERF-8895: Workspace UUIDs in PG/CR data tables (#7471)

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
Alexey Zinoviev 2024-12-19 20:29:43 +04:00 committed by GitHub
parent 63464cbc54
commit c92c13f864
17 changed files with 210 additions and 23 deletions

View File

@@ -30,7 +30,7 @@ import { type DBDoc } from '@hcengineering/postgres/types/utils'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { type MongoClient } from 'mongodb'
import { type MongoClient, UUID } from 'mongodb'
import type postgres from 'postgres'
export async function moveFromMongoToPG (
@@ -93,7 +93,8 @@ async function moveWorkspace (
continue
}
const cursor = collection.find()
const current = await pgClient`SELECT _id FROM ${pgClient(domain)} WHERE "workspaceId" = ${ws.workspace}`
const current =
await pgClient`SELECT _id FROM ${pgClient(domain)} WHERE "workspaceId" = ${ws.uuid ?? ws.workspace}`
const currentIds = new Set(current.map((r) => r._id))
console.log('move domain', domain)
const docs: Doc[] = []
@@ -275,3 +276,94 @@ export async function moveAccountDbFromMongoToPG (
ctx.info('Assignments made', { count: assignmentsToInsert.length })
}
export async function generateUuidMissingWorkspaces (
ctx: MeasureMetricsContext,
db: AccountDB,
dryRun = false
): Promise<void> {
const workspaces = await listWorkspacesPure(db)
let updated = 0
for (const ws of workspaces) {
if (ws.uuid !== undefined) continue
const uuid = new UUID().toJSON()
if (!dryRun) {
await db.workspace.updateOne({ _id: ws._id }, { uuid })
}
updated++
}
ctx.info('Assigned uuids to workspaces', { updated, total: workspaces.length })
}
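
For reference, new UUID().toJSON() from the mongodb driver serializes to the canonical hyphenated form, so the stored uuid is a plain string. A minimal sketch (value illustrative):

import { UUID } from 'mongodb'

// toJSON() returns the hyphenated hex representation, so the workspace
// record's uuid field is stored as an ordinary string such as
// '3f2c6f0a-9a3b-4c8e-b1d2-7e5a1c0d9f4e'.
const uuid: string = new UUID().toJSON()
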
export async function updateDataWorkspaceIdToUuid (
ctx: MeasureMetricsContext,
accountDb: AccountDB,
dbUrl: string | undefined,
dryRun = false
): Promise<void> {
if (dbUrl === undefined) {
throw new Error('dbUrl is required')
}
const pg = getDBClient(dbUrl)
try {
const pgClient = await pg.getClient()
// Generate uuids for all workspaces or verify they exist
await generateUuidMissingWorkspaces(ctx, accountDb, dryRun)
const workspaces = await listWorkspacesPure(accountDb)
const noUuidWss = workspaces.filter((ws) => ws.uuid === undefined)
if (noUuidWss.length > 0) {
ctx.error('Workspace uuid is required but not defined', { workspaces: noUuidWss.map((it) => it.workspace) })
throw new Error('workspace uuid is required but not defined')
}
const res = await pgClient`select t.table_name from information_schema.columns as c
join information_schema.tables as t on
c.table_catalog = t.table_catalog and
c.table_schema = t.table_schema and
c.table_name = t.table_name
where t.table_type = 'BASE TABLE' and t.table_schema = 'public' and c.column_name = 'workspaceId' and c.data_type <> 'uuid'`
const tables: string[] = res.map((r) => r.table_name)
ctx.info('Tables to be updated: ', { tables })
for (const table of tables) {
ctx.info('Altering table workspaceId type to uuid', { table })
if (!dryRun) {
await retryTxn(pgClient, async (client) => {
await client`ALTER TABLE ${client(table)} RENAME COLUMN "workspaceId" TO "workspaceIdOld"`
await client`ALTER TABLE ${client(table)} ADD COLUMN "workspaceId" UUID`
})
await retryTxn(pgClient, async (client) => {
for (const ws of workspaces) {
const uuid = ws.uuid
if (uuid === undefined) {
ctx.error('Workspace uuid is required but not defined', { workspace: ws.workspace })
throw new Error('workspace uuid is required but not defined')
}
await client`UPDATE ${client(table)} SET "workspaceId" = ${uuid} WHERE "workspaceIdOld" = ${ws.workspace}`
}
})
await retryTxn(pgClient, async (client) => {
await client`ALTER TABLE ${client(table)} ALTER COLUMN "workspaceId" SET NOT NULL`
})
await retryTxn(pgClient, async (client) => {
await client`ALTER TABLE ${client(table)} DROP CONSTRAINT ${client(`${table}_pkey`)}`
await client`ALTER TABLE ${client(table)} ADD CONSTRAINT ${client(`${table}_pkey`)} PRIMARY KEY ("workspaceId", _id)`
})
}
}
ctx.info('Done updating workspaceId to uuid')
} finally {
pg.close()
}
}
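
Spelled out as the SQL it effectively issues per table (table name and values hypothetical; each group runs inside a retryTxn above), the rewrite is roughly:

ALTER TABLE task RENAME COLUMN "workspaceId" TO "workspaceIdOld";
ALTER TABLE task ADD COLUMN "workspaceId" UUID;
-- one UPDATE per workspace, mapping the legacy text id to its uuid
UPDATE task SET "workspaceId" = '3f2c6f0a-9a3b-4c8e-b1d2-7e5a1c0d9f4e' WHERE "workspaceIdOld" = 'my-workspace';
ALTER TABLE task ALTER COLUMN "workspaceId" SET NOT NULL;
ALTER TABLE task DROP CONSTRAINT task_pkey;
ALTER TABLE task ADD CONSTRAINT task_pkey PRIMARY KEY ("workspaceId", _id);

Note that the legacy column is renamed to workspaceIdOld rather than dropped, which keeps the old text ids available for verification or rollback.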

View File

@@ -134,7 +134,13 @@ import {
restoreRecruitingTaskTypes
} from './clean'
import { changeConfiguration } from './configuration'
import { moveAccountDbFromMongoToPG, moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db'
import {
generateUuidMissingWorkspaces,
updateDataWorkspaceIdToUuid,
moveAccountDbFromMongoToPG,
moveFromMongoToPG,
moveWorkspaceFromMongoToPG
} from './db'
import { restoreControlledDocContentMongo, restoreWikiContentMongo } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
@@ -1993,6 +1999,29 @@ export function devTool (
})
})
program
.command('generate-uuid-workspaces')
.description('generate uuids for all workspaces that are missing one')
.option('-d, --dryrun', 'Dry run', false)
.action(async (cmd: { dryrun: boolean }) => {
await withAccountDatabase(async (db) => {
console.log('generate uuids for all workspaces that are missing one')
await generateUuidMissingWorkspaces(toolCtx, db, cmd.dryrun)
})
})
program
.command('update-data-wsid-to-uuid')
.description('updates workspaceId in pg/cr to uuid')
.option('-d, --dryrun', 'Dry run', false)
.action(async (cmd: { dryrun: boolean }) => {
await withAccountDatabase(async (db) => {
console.log('updates workspaceId in pg/cr to uuid')
const { dbUrl } = prepareTools()
await updateDataWorkspaceIdToUuid(toolCtx, db, dbUrl, cmd.dryrun)
})
})
extendProgram?.(program)
program.parse(process.argv)
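
Usage sketch (the dev-tool invocation prefix is omitted here; flags as registered above): run each command with --dryrun first to preview, then without it to apply.

generate-uuid-workspaces --dryrun
update-data-wsid-to-uuid --dryrun

Note that update-data-wsid-to-uuid calls generateUuidMissingWorkspaces itself before touching any tables, so missing uuids are backfilled either way; the standalone command exists for running the backfill on its own.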

View File

@@ -732,6 +732,8 @@ export interface BackupStatus {
export interface BaseWorkspaceInfo {
workspace: string // A unique workspace name; also used for database names
uuid?: string // A uuid for the workspace, already used for CockroachDB data
disabled?: boolean
version?: Data<Version>
branding?: string

View File

@@ -125,6 +125,7 @@ export function toFindResult<T extends Doc> (docs: T[], total?: number, lookupMa
*/
export interface WorkspaceId {
name: string
uuid?: string
}
/**

View File

@@ -264,6 +264,7 @@ export async function startIndexer (
opt.model,
{
...workspace,
uuid: workspaceInfo.uuid,
workspaceName: workspaceInfo.workspaceName ?? workspaceInfo.workspace,
workspaceUrl: workspaceInfo.workspaceUrl ?? workspaceInfo.workspace
},

View File

@@ -22,7 +22,12 @@ describe.skip('test-backup-find', () => {
it('check create/load/clean', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
// We should set up a DB with documents and try to back them up.
const wsUrl = { name: 'testdb-backup-test', workspaceName: 'test', workspaceUrl: 'test' }
const wsUrl = {
name: 'testdb-backup-test',
uuid: 'testdb-backup-uuid',
workspaceName: 'test',
workspaceUrl: 'test'
}
const storageConfig = storageConfigFromEnv(STORAGE_CONFIG)
const storageAdapter = buildStorageFromConfig(storageConfig)
@@ -67,7 +72,12 @@ describe.skip('test-backup-find', () => {
it('check traverse', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
// We should set up a DB with documents and try to back them up.
const wsUrl = { name: 'testdb-backup-test', workspaceName: 'test', workspaceUrl: 'test' }
const wsUrl = {
name: 'testdb-backup-test',
uuid: 'testdb-backup-uuid',
workspaceName: 'test',
workspaceUrl: 'test'
}
const storageConfig = storageConfigFromEnv(STORAGE_CONFIG)
const storageAdapter = buildStorageFromConfig(storageConfig)
const pipeline = await getServerPipeline(toolCtx, model, dbURL, wsUrl, storageAdapter, {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { ObjectId as MongoObjectId } from 'mongodb'
import { ObjectId as MongoObjectId, UUID } from 'mongodb'
import type { Collection, CreateIndexesOptions, Db, Filter, OptionalUnlessRequiredId, Sort } from 'mongodb'
import type { Data, Version } from '@hcengineering/core'
@@ -178,6 +178,14 @@ export class WorkspaceMongoDbCollection extends MongoDbCollection<Workspace> imp
super('workspace', db)
}
async insertOne<K extends keyof Workspace>(data: Partial<Workspace>, idKey?: K): Promise<any> {
if (data.uuid === undefined) {
data.uuid = new UUID().toJSON()
}
return await super.insertOne(data, idKey)
}
async countWorkspacesInRegion (region: string, upToVersion?: Data<Version>, visitedSince?: number): Promise<number> {
const regionQuery = region === '' ? { $or: [{ region: { $exists: false } }, { region: '' }] } : { region }
const query: Filter<Workspace>['$and'] = [
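
A minimal sketch of what the override buys (setup and types elided): a workspace inserted without an explicit uuid still ends up with one.

const workspaces = new WorkspaceMongoDbCollection(db) // db: mongodb Db
// No uuid supplied by the caller...
await workspaces.insertOne({ workspace: 'my-workspace' })
// ...so insertOne fills in new UUID().toJSON() before delegating to the
// base class, and every newly created workspace row carries a uuid.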

View File

@@ -189,6 +189,7 @@ class BackupWorker {
)
const wsUrl: WorkspaceIdWithUrl = {
name: ws.workspace,
uuid: ws.uuid,
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''
}
@@ -360,6 +361,7 @@ export async function doRestoreWorkspace (
const storage = await createStorageBackupStorage(ctx, backupAdapter, getWorkspaceId(bucketName), ws.workspace)
const wsUrl: WorkspaceIdWithUrl = {
name: ws.workspace,
uuid: ws.uuid,
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''
}

View File

@@ -598,6 +598,7 @@ export interface Workspace {
workspaceId: WorkspaceId
workspaceName: string
workspaceUuid?: string
branding: Branding | null
}

View File

@@ -42,6 +42,7 @@ createTaskModel(txes)
describe('postgres operations', () => {
const baseDbUri: string = process.env.DB_URL ?? 'postgresql://postgres:example@localhost:5433'
let dbId: string = 'pg_testdb_' + generateId()
let dbUuid: string = crypto.randomUUID()
let dbUri: string = baseDbUri + '/' + dbId
const clientRef: PostgresClientReference = getDBClient(baseDbUri)
let hierarchy: Hierarchy
@@ -58,6 +59,7 @@ describe('postgres operations', () => {
beforeEach(async () => {
try {
dbId = 'pg_testdb_' + generateId()
dbUuid = crypto.randomUUID()
dbUri = baseDbUri + '/' + dbId
const client = await clientRef.getClient()
await client`CREATE DATABASE ${client(dbId)}`
@@ -88,7 +90,16 @@ describe('postgres operations', () => {
}
const mctx = new MeasureMetricsContext('', {})
const txStorage = await createPostgresTxAdapter(mctx, hierarchy, dbUri, getWorkspaceId(dbId), model)
const txStorage = await createPostgresTxAdapter(
mctx,
hierarchy,
dbUri,
{
...getWorkspaceId(dbId),
uuid: dbUuid
},
model
)
// Put all transactions to Tx
for (const t of txes) {
@@ -98,7 +109,16 @@ describe('postgres operations', () => {
await txStorage.close()
const ctx = new MeasureMetricsContext('client', {})
const serverStorage = await createPostgresAdapter(ctx, hierarchy, dbUri, getWorkspaceId(dbId), model)
const serverStorage = await createPostgresAdapter(
ctx,
hierarchy,
dbUri,
{
...getWorkspaceId(dbId),
uuid: dbUuid
},
model
)
await serverStorage.init?.()
client = await createClient(async (handler) => {
const st: ClientConnection = {

View File

@@ -33,7 +33,7 @@ export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdap
for (const [domain] of Object.entries(domainSchemas)) {
await ctx.with('delete-workspace-domain', {}, async () => {
await retryTxn(connection, async (client) => {
await client`delete from ${connection(domain)} where "workspaceId" = '${connection(workspace.name)}'`
await client`delete from ${connection(domain)} where "workspaceId" = '${connection(workspace.uuid ?? workspace.name)}'`
})
})
}

View File

@@ -316,17 +316,22 @@ class ConnectionMgr {
abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
protected readonly workspaceId: WorkspaceId
mgr: ConnectionMgr
constructor (
protected readonly client: postgres.Sql,
protected readonly refClient: PostgresClientReference,
protected readonly workspaceId: WorkspaceId,
protected readonly enrichedWorkspaceId: WorkspaceId,
protected readonly hierarchy: Hierarchy,
protected readonly modelDb: ModelDb,
readonly mgrId: string
) {
// Switch to using the uuid already, ahead of the new accounts and workspaces
this.workspaceId = {
name: enrichedWorkspaceId.uuid ?? enrichedWorkspaceId.name
}
this._helper = new DBCollectionHelper(this.client, this.workspaceId)
this.mgr = new ConnectionMgr(client, mgrId)
}
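
The net effect of the uuid ?? name fallback (values illustrative): a workspace that already carries a uuid is keyed by it, while one without falls back to the legacy text name, so deployments that have not run the migration yet keep working unchanged.

// { name: 'ws-acme', uuid: '3f2c6f0a-9a3b-4c8e-b1d2-7e5a1c0d9f4e' }
//   -> this.workspaceId.name === '3f2c6f0a-9a3b-4c8e-b1d2-7e5a1c0d9f4e'
// { name: 'ws-acme' }
//   -> this.workspaceId.name === 'ws-acme'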

View File

@@ -142,7 +142,7 @@ async function createTable (client: postgres.Sql, domain: string): Promise<void>
}
const columns = fields.join(', ')
const res = await client.unsafe(`CREATE TABLE IF NOT EXISTS ${domain} (
"workspaceId" text NOT NULL,
"workspaceId" uuid NOT NULL,
${columns},
data JSONB NOT NULL,
PRIMARY KEY("workspaceId", _id)
@@ -404,10 +404,16 @@ export function isOwner (account: Account): boolean {
}
export class DBCollectionHelper implements DomainHelperOperations {
protected readonly workspaceId: WorkspaceId
constructor (
protected readonly client: postgres.Sql,
protected readonly workspaceId: WorkspaceId
) {}
protected readonly enrichedWorkspaceId: WorkspaceId
) {
this.workspaceId = {
name: enrichedWorkspaceId.uuid ?? enrichedWorkspaceId.name
}
}
async dropIndex (domain: Domain, name: string): Promise<void> {}

View File

@@ -427,6 +427,7 @@ class TSessionManager implements SessionManager {
token,
workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId,
workspaceName,
workspaceInfo.uuid,
branding
)
}
@@ -456,7 +457,8 @@ class TSessionManager implements SessionManager {
pipelineFactory,
ws,
workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId,
workspaceName
workspaceName,
workspaceInfo.uuid
)
}
} else {
@@ -543,7 +545,8 @@ class TSessionManager implements SessionManager {
pipelineFactory: PipelineFactory,
ws: ConnectionSocket,
workspaceUrl: string,
workspaceName: string
workspaceName: string,
workspaceUuid?: string
): Promise<Pipeline> {
if (LOGGING_ENABLED) {
ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) })
@@ -565,7 +568,7 @@ class TSessionManager implements SessionManager {
// Re-create pipeline.
workspace.pipeline = pipelineFactory(
ctx,
{ ...token.workspace, workspaceUrl, workspaceName },
{ ...token.workspace, workspaceUrl, workspaceName, uuid: workspaceUuid },
true,
(ctx, tx, targets, exclude) => {
this.broadcastAll(workspace, tx, targets, exclude)
@@ -654,6 +657,7 @@ class TSessionManager implements SessionManager {
token: Token,
workspaceUrl: string,
workspaceName: string,
workspaceUuid: string | undefined,
branding: Branding | null
): Workspace {
const upgrade = token.extra?.model === 'upgrade'
@@ -664,7 +668,7 @@ class TSessionManager implements SessionManager {
id: generateId(),
pipeline: pipelineFactory(
pipelineCtx,
{ ...token.workspace, workspaceUrl, workspaceName },
{ ...token.workspace, uuid: workspaceUuid, workspaceUrl, workspaceName },
upgrade,
(ctx, tx, targets, exclude) => {
this.broadcastAll(workspace, tx, targets, exclude)
@@ -676,6 +680,7 @@ class TSessionManager implements SessionManager {
upgrade,
workspaceId: token.workspace,
workspaceName,
workspaceUuid,
branding,
workspaceInitCompleted: false,
tickHash: this.tickCounter % ticksPerSecond,

View File

@@ -1,5 +1,4 @@
import core, {
getWorkspaceId,
Hierarchy,
ModelDb,
systemAccountEmail,
@@ -67,11 +66,15 @@ export async function createWorkspace (
try {
const wsUrl: WorkspaceIdWithUrl = {
name: workspaceInfo.workspace,
uuid: workspaceInfo.uuid,
workspaceName: workspaceInfo.workspaceName ?? '',
workspaceUrl: workspaceInfo.workspaceUrl ?? ''
}
const wsId = getWorkspaceId(workspaceInfo.workspace)
const wsId = {
name: workspaceInfo.workspace,
uuid: workspaceInfo.uuid
}
await handleWsEvent?.('create-started', version, 10)
@@ -190,6 +193,7 @@ export async function upgradeWorkspace (
dbUrl,
{
name: ws.workspace,
uuid: ws.uuid,
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''
},

View File

@@ -1,7 +1,7 @@
services:
  account:
    environment:
      - DB_URL=postgresql://postgres:example@postgres:5432
  # account:
  #   environment:
  #     - DB_URL=postgresql://postgres:example@postgres:5432
  transactor:
    environment:
      - DB_URL=postgresql://postgres:example@postgres:5432

View File

@@ -6,7 +6,8 @@ export MINIO_SECRET_KEY=minioadmin
export MINIO_ENDPOINT=localhost:9002
export ACCOUNTS_URL=http://localhost:3003
export TRANSACTOR_URL=ws://localhost:3334
export ACCOUNT_DB_URL=postgresql://postgres:example@localhost:5433
# export ACCOUNT_DB_URL=postgresql://postgres:example@localhost:5433
export ACCOUNT_DB_URL=mongodb://localhost:27018
export MONGO_URL=mongodb://localhost:27018
export ELASTIC_URL=http://localhost:9201
export SERVER_SECRET=secret