mirror of
https://github.com/hcengineering/platform.git
synced 2025-05-11 09:51:53 +00:00
qfix:move check tool (#8751)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
b0cd5238f3
commit
121acb591e
@ -1,21 +1,26 @@
|
||||
import {
|
||||
type AccountDB,
|
||||
listAccounts,
|
||||
listWorkspacesPure,
|
||||
listInvites,
|
||||
updateWorkspace,
|
||||
type Workspace,
|
||||
type ObjectId,
|
||||
getAccount,
|
||||
getWorkspaceById
|
||||
getWorkspaceById,
|
||||
listAccounts,
|
||||
listInvites,
|
||||
listWorkspacesPure,
|
||||
type ObjectId,
|
||||
updateWorkspace,
|
||||
type Workspace
|
||||
} from '@hcengineering/account'
|
||||
import {
|
||||
type BackupClient,
|
||||
type Client,
|
||||
type ClientWorkspaceInfo,
|
||||
type Doc,
|
||||
type Domain,
|
||||
getDiffUpdate,
|
||||
getWorkspaceId,
|
||||
type LowLevelStorage,
|
||||
MeasureMetricsContext,
|
||||
systemAccountEmail,
|
||||
type Doc
|
||||
type Tx
|
||||
} from '@hcengineering/core'
|
||||
import { getMongoClient, getWorkspaceMongoDB } from '@hcengineering/mongo'
|
||||
import {
|
||||
@ -28,7 +33,8 @@ import {
|
||||
} from '@hcengineering/postgres'
|
||||
import { type DBDoc } from '@hcengineering/postgres/types/utils'
|
||||
import { getTransactorEndpoint } from '@hcengineering/server-client'
|
||||
import { sharedPipelineContextVars } from '@hcengineering/server-pipeline'
|
||||
import { createDummyStorageAdapter } from '@hcengineering/server-core'
|
||||
import { createBackupPipeline, sharedPipelineContextVars } from '@hcengineering/server-pipeline'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { connect } from '@hcengineering/server-tool'
|
||||
import { type MongoClient, UUID } from 'mongodb'
|
||||
@ -63,6 +69,27 @@ export async function moveFromMongoToPG (
|
||||
client.close()
|
||||
}
|
||||
|
||||
export async function checkFromMongoToPG (
|
||||
mongoUrl: string,
|
||||
dbUrl: string | undefined,
|
||||
txes: Tx[],
|
||||
workspace: ClientWorkspaceInfo
|
||||
): Promise<void> {
|
||||
if (dbUrl === undefined) {
|
||||
throw new Error('dbUrl is required')
|
||||
}
|
||||
const client = getMongoClient(mongoUrl)
|
||||
const mongo = await client.getClient()
|
||||
|
||||
try {
|
||||
await checkMoveMissingWorkspace(mongo, dbUrl, txes, workspace)
|
||||
} catch (err) {
|
||||
console.log('Error when move workspace', workspace.workspaceName ?? workspace.workspace, err)
|
||||
throw err
|
||||
}
|
||||
client.close()
|
||||
}
|
||||
|
||||
async function moveWorkspace (
|
||||
accountDb: AccountDB,
|
||||
mongo: MongoClient,
|
||||
@ -155,6 +182,116 @@ async function moveWorkspace (
|
||||
}
|
||||
}
|
||||
|
||||
async function checkMoveMissingWorkspace (
|
||||
mongo: MongoClient,
|
||||
dbUrl: string,
|
||||
txes: Tx[],
|
||||
ws: ClientWorkspaceInfo
|
||||
): Promise<void> {
|
||||
try {
|
||||
const ctx = new MeasureMetricsContext('', {})
|
||||
console.log('move workspace', ws.workspaceName ?? ws.workspace)
|
||||
const wsId = getWorkspaceId(ws.workspaceId)
|
||||
const mongoDB = getWorkspaceMongoDB(mongo, wsId)
|
||||
const collections = await mongoDB.collections()
|
||||
|
||||
const backupPipeline = createBackupPipeline(ctx, dbUrl, txes, {
|
||||
externalStorage: createDummyStorageAdapter()
|
||||
})
|
||||
|
||||
const pipeline = await backupPipeline(
|
||||
ctx,
|
||||
{
|
||||
name: ws.workspace,
|
||||
workspaceName: ws.workspaceName ?? '',
|
||||
workspaceUrl: '',
|
||||
uuid: ws.uuid ?? ''
|
||||
},
|
||||
false,
|
||||
() => {},
|
||||
null
|
||||
)
|
||||
|
||||
const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage
|
||||
|
||||
try {
|
||||
for (const collection of collections) {
|
||||
if (
|
||||
collection.collectionName === 'tx' ||
|
||||
collection.collectionName === 'blob' ||
|
||||
collection.collectionName === 'doc-index-state'
|
||||
) {
|
||||
continue
|
||||
}
|
||||
console.log('checking domain', collection.collectionName)
|
||||
const cursor = collection.find()
|
||||
|
||||
const docs: Doc[] = []
|
||||
while (true) {
|
||||
while (docs.length < 5000) {
|
||||
const doc = (await cursor.next()) as Doc | null
|
||||
if (doc === null) break
|
||||
docs.push(doc)
|
||||
}
|
||||
if (docs.length === 0) break
|
||||
while (docs.length > 0) {
|
||||
const part = docs.splice(0, 100)
|
||||
|
||||
const pgDocs = await lowLevel.load(
|
||||
ctx,
|
||||
collection.collectionName as Domain,
|
||||
part.map((it) => it._id)
|
||||
)
|
||||
|
||||
for (const p of part) {
|
||||
const pgP = pgDocs.find((it) => it._id === p._id)
|
||||
if (pgP === undefined) {
|
||||
console.log('missing document', p._class, p._id)
|
||||
continue
|
||||
}
|
||||
const { '%hash%': _, ...rest } = p as any
|
||||
const diff = getDiffUpdate(pgP, rest)
|
||||
if (
|
||||
rest._class === 'notification:class:ActivityInboxNotification' ||
|
||||
rest._class === 'notification:class:CommonInboxNotification' ||
|
||||
rest._class === 'notification:class:MentionInboxNotification'
|
||||
) {
|
||||
delete diff.isViewed
|
||||
}
|
||||
if (rest._class === 'notification:class:DocNotifyContext') {
|
||||
delete diff.lastViewedTimestamp
|
||||
}
|
||||
if (Object.keys(diff).length > 0) {
|
||||
console.log(
|
||||
'Documents mismatch',
|
||||
p._class,
|
||||
p._id,
|
||||
'keys',
|
||||
Object.keys(diff),
|
||||
'mongo',
|
||||
Object.keys(diff).map((it) => rest[it]),
|
||||
'PG',
|
||||
Object.keys(diff).map((it) => (pgP as any)[it])
|
||||
)
|
||||
if (rest.modifiedOn === pgP?.modifiedOn) {
|
||||
console.log('Upload update')
|
||||
// Same modifiedOn, but we have modification, we need to apply it.
|
||||
await lowLevel.upload(ctx, collection.collectionName as Domain, [rest])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
await pipeline.close()
|
||||
}
|
||||
} catch (err) {
|
||||
console.log('Error when move workspace', ws.workspaceName ?? ws.workspace, err)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
export async function moveWorkspaceFromMongoToPG (
|
||||
accountDb: AccountDB,
|
||||
mongoUrl: string,
|
||||
|
@ -55,6 +55,7 @@ import serverClientPlugin, {
|
||||
BlobClient,
|
||||
createClient,
|
||||
getTransactorEndpoint,
|
||||
getWorkspaceInfo,
|
||||
listAccountWorkspaces,
|
||||
updateBackupInfo
|
||||
} from '@hcengineering/server-client'
|
||||
@ -91,6 +92,7 @@ import core, {
|
||||
RateLimiter,
|
||||
systemAccountEmail,
|
||||
versionToString,
|
||||
type ClientWorkspaceInfo,
|
||||
type Data,
|
||||
type Doc,
|
||||
type Ref,
|
||||
@ -146,6 +148,7 @@ import {
|
||||
} from './clean'
|
||||
import { changeConfiguration } from './configuration'
|
||||
import {
|
||||
checkFromMongoToPG,
|
||||
generateUuidMissingWorkspaces,
|
||||
moveAccountDbFromMongoToPG,
|
||||
moveFromMongoToPG,
|
||||
@ -2156,6 +2159,17 @@ export function devTool (
|
||||
})
|
||||
})
|
||||
|
||||
program.command('check-move-mongo-to-pg <workspace>').action(async (workspaceId: string) => {
|
||||
const { dbUrl, txes } = prepareTools()
|
||||
const mongodbUri = getMongoDBUrl()
|
||||
const sysToken = generateToken(systemAccountEmail, getWorkspaceId(workspaceId))
|
||||
const info = await getWorkspaceInfo(sysToken, false)
|
||||
if (info == null) {
|
||||
return
|
||||
}
|
||||
await checkFromMongoToPG(mongodbUri, dbUrl, txes, info as ClientWorkspaceInfo)
|
||||
})
|
||||
|
||||
program
|
||||
.command('move-workspace-to-pg <workspace> <region>')
|
||||
.option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
|
||||
|
Loading…
Reference in New Issue
Block a user