UBERF-8619: Rework backup %hash% usage (#7273)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-12-06 16:30:33 +07:00 committed by GitHub
parent 2b969c74e8
commit f9e3aed4fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 401 additions and 429 deletions
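
For orientation: the commit replaces on-demand %hash% recalculation on the server (the old recheck flag) with a hash that is assigned when a document is written, and adds freshBackup/clean options to the backup pipeline. Below is a minimal TypeScript sketch of the per-document decision the reworked backup loop makes; needsBackup is an illustrative helper, not a function introduced by this commit.

// Illustrative only: condenses the per-document comparison made in backup() below.
// serverHash is the trimmed '%hash%' reported by the server for a chunk entry;
// previousHash is the trimmed hash stored in the last backup snapshot, if any.
function needsBackup (
  serverHash: string,
  previousHash: string | undefined,
  freshBackup: boolean,
  domainIsBlob: boolean
): boolean {
  if (previousHash === undefined) return true // document is not in the backup yet
  if (freshBackup && !domainIsBlob) return true // fresh backup re-downloads non-blob domains
  return previousHash !== serverHash // stored hash differs => document changed
}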

.vscode/launch.json vendored
View File

@ -300,22 +300,24 @@
"cwd": "${workspaceRoot}/dev/tool"
},
{
"name": "Debug tool upgrade PG(tests)",
"name": "Debug tool upgrade PG(Cockroach)",
"type": "node",
"request": "launch",
"args": ["src/__start.ts", "upgrade", "--force"],
"args": ["src/__start.ts", "upgrade-workspace", "w-haiodo-alex-staff-c-673ee7ab-87df5406ea-2b8b4d" ],
"env": {
"SERVER_SECRET": "secret",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost:9002",
"TRANSACTOR_URL": "ws://localhost:3334",
"ACCOUNT_DB_URL": "postgresql://postgres:example@localhost:5433",
"DB_URL": "postgresql://postgres:example@localhost:5433",
"MONGO_URL": "mongodb://localhost:27018",
"ACCOUNTS_URL": "http://localhost:3003",
"MINIO_ENDPOINT": "localhost:9000",
"TRANSACTOR_URL": "ws://localhost:3332",
"ACCOUNTS_URL": "http://localhost:3000",
"ACCOUNT_DB_URL": "mongodb://localhost:27017",
// "ACCOUNT_DB_URL": "postgresql://postgres:example@localhost:5433",
// "DB_URL": "postgresql://postgres:example@localhost:5433",
"DB_URL": "postgresql://root@host.docker.internal:26257/defaultdb?sslmode=disable",
"MONGO_URL": "mongodb://localhost:27017",
"TELEGRAM_DATABASE": "telegram-service",
"ELASTIC_URL": "http://localhost:9201",
"ELASTIC_URL": "http://localhost:9200",
"REKONI_URL": "http://localhost:4004",
"MODEL_VERSION": "0.6.287"
},

View File

@ -22208,7 +22208,7 @@ packages:
dev: false
file:projects/collaboration.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-krhgq1XiDnWKIP/HUM8VQgEzXdxLNfDf68lZgDl/Yl2tEFUu8yLYpzd1qWVMkl8N0dXyGts+DEFC7Ntns48lgA==, tarball: file:projects/collaboration.tgz}
resolution: {integrity: sha512-aJ4uMSpM7IB3wgrjVKYm4jR3IeBYSaFvYZZFjxkriMD1fAxvjr/WpKUWxQy0q2x3gZb4SoGLoiX2d2qj2/hdhA==, tarball: file:projects/collaboration.tgz}
id: file:projects/collaboration.tgz
name: '@rush-temp/collaboration'
version: 0.0.0
@ -24713,7 +24713,7 @@ packages:
dev: false
file:projects/model-document.tgz:
resolution: {integrity: sha512-tSr57oIXY1fECAB/axaDBJLSh/RVC4BXacjVHQ4wx3y+buoNngZoX9kpJsbNxEjCpW8yyhWwO1+sseyBi9RJdg==, tarball: file:projects/model-document.tgz}
resolution: {integrity: sha512-5JcKBBX19mvQXZAg2p1z/qMSYqiR7py8mtNiHLaWKQpjHhMynutkkwGsnGq36Hb3Af8VtQiVxBZEq4rtcoph1Q==, tarball: file:projects/model-document.tgz}
name: '@rush-temp/model-document'
version: 0.0.0
dependencies:
@ -28589,7 +28589,7 @@ packages:
dev: false
file:projects/server-backup.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-e0MNgA1LeSOikxK3b6X/HMuzpyQ4MjAoJtHi84X28nxD+49sEy1NctCTptyqz2qWWicAZAghKV46Qi9N9+RnEw==, tarball: file:projects/server-backup.tgz}
resolution: {integrity: sha512-uuTU9Pa0R+AlAtxZo0mStW6hAnk/Ca3B++AoFAE5WDapBBxT0mVNEoDcAd0ATDKea0fo8mbRB4blnuoQa06NbA==, tarball: file:projects/server-backup.tgz}
id: file:projects/server-backup.tgz
name: '@rush-temp/server-backup'
version: 0.0.0
@ -28604,6 +28604,7 @@ packages:
eslint-plugin-import: 2.29.1(eslint@8.56.0)
eslint-plugin-n: 15.7.0(eslint@8.56.0)
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
fast-equals: 5.0.1
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
prettier: 3.2.5
prettier-plugin-svelte: 3.2.1(prettier@3.2.5)(svelte@4.2.11)

View File

@ -42,7 +42,8 @@ async function doBackup (dirName: string, token: string, endpoint: string, works
ctx.info('do backup', { workspace, endpoint })
await backup(ctx, endpoint, wsid, storage, {
force: true,
recheck: false,
freshBackup: false,
clean: false,
skipDomains: [],
timeout: 0,
connectTimeout: 60 * 1000,

View File

@ -639,6 +639,7 @@ export function devTool (
},
cmd.region,
true,
true,
5000, // 5 gigabytes per blob
async (storage, workspaceStorage) => {
if (cmd.remove) {
@ -710,7 +711,8 @@ export function devTool (
})
},
cmd.region,
true,
false,
false,
100
)
) {
@ -930,7 +932,8 @@ export function devTool (
)
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 15mb)', '15')
.option('-f, --force', 'Force backup', false)
.option('-c, --recheck', 'Force hash recheck on server', false)
.option('-f, --fresh', 'Force fresh backup', false)
.option('-c, --clean', 'Force clean of old backup files, only with fresh backup option', false)
.option('-t, --timeout <timeout>', 'Connect timeout in seconds', '30')
.action(
async (
@ -939,7 +942,8 @@ export function devTool (
cmd: {
skip: string
force: boolean
recheck: boolean
fresh: boolean
clean: boolean
timeout: string
include: string
blobLimit: string
@ -951,7 +955,8 @@ export function devTool (
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
await backup(toolCtx, endpoint, wsid, storage, {
force: cmd.force,
recheck: cmd.recheck,
freshBackup: cmd.fresh,
clean: cmd.clean,
include: cmd.include === '*' ? undefined : new Set(cmd.include.split(';').map((it) => it.trim())),
skipDomains: (cmd.skip ?? '').split(';').map((it) => it.trim()),
timeout: 0,

View File

@ -21,9 +21,9 @@ import core, {
DOMAIN_STATUS,
DOMAIN_TX,
generateId,
makeDocCollabId,
makeCollabJsonId,
makeCollabYdocId,
makeDocCollabId,
MeasureMetricsContext,
RateLimiter,
type AnyAttribute,
@ -208,7 +208,7 @@ async function processMigrateContentFor (
const operations: { filter: MigrationDocumentQuery<Doc>, update: MigrateUpdate<Doc> }[] = []
for (const doc of docs) {
await rateLimiter.exec(async () => {
await rateLimiter.add(async () => {
const update: MigrateUpdate<Doc> = {}
for (const attribute of attributes) {
@ -305,7 +305,7 @@ async function processMigrateJsonForDomain (
const operations: { filter: MigrationDocumentQuery<Doc>, update: MigrateUpdate<Doc> }[] = []
for (const doc of docs) {
await rateLimiter.exec(async () => {
await rateLimiter.add(async () => {
const update = await processMigrateJsonForDoc(ctx, doc, attributes, client, storageAdapter)
if (Object.keys(update).length > 0) {
operations.push({ filter: { _id: doc._id }, update })
@ -385,8 +385,8 @@ async function processMigrateJsonForDoc (
const unset = update.$unset ?? {}
update.$unset = { ...unset, [attribute.name]: 1 }
} catch (err) {
ctx.warn('failed to process collaborative doc', { workspaceId, collabId, currentYdocId, err })
} catch (err: any) {
ctx.warn('failed to process collaborative doc', { workspaceId, collabId, currentYdocId, err: err.message })
}
}
@ -427,36 +427,43 @@ export const coreOperation: MigrateOperation = {
state: 'remove-collection-txes',
func: async (client) => {
let processed = 0
let last = 0
const iterator = await client.traverse<TxCUD<Doc>>(DOMAIN_TX, {
_class: 'core:class:TxCollectionCUD' as Ref<Class<Doc>>
})
while (true) {
const txes = await iterator.next(200)
if (txes === null || txes.length === 0) break
processed += txes.length
try {
await client.create(
DOMAIN_TX,
txes.map((tx) => {
const { collection, objectId, objectClass } = tx
return {
collection,
attachedTo: objectId,
attachedToClass: objectClass,
...(tx as any).tx,
objectSpace: (tx as any).tx.objectSpace ?? tx.objectClass
}
try {
while (true) {
const txes = await iterator.next(1000)
if (txes === null || txes.length === 0) break
processed += txes.length
try {
await client.create(
DOMAIN_TX,
txes.map((tx) => {
const { collection, objectId, objectClass } = tx
return {
collection,
attachedTo: objectId,
attachedToClass: objectClass,
...(tx as any).tx,
objectSpace: (tx as any).tx.objectSpace ?? tx.objectClass
}
})
)
await client.deleteMany(DOMAIN_TX, {
_id: { $in: txes.map((it) => it._id) }
})
)
await client.deleteMany(DOMAIN_TX, {
_id: { $in: txes.map((it) => it._id) }
})
} catch (err: any) {
console.error(err)
} catch (err: any) {
console.error(err)
}
if (last !== Math.round(processed / 1000)) {
last = Math.round(processed / 1000)
console.log('processed', processed)
}
}
console.log('processed', processed)
} finally {
await iterator.close()
}
await iterator.close()
}
},
{

View File

@ -120,7 +120,7 @@ describe('client', () => {
},
close: async () => {},
loadChunk: async (domain: Domain, idx?: number, recheck?: boolean) => ({
loadChunk: async (domain: Domain, idx?: number) => ({
idx: -1,
index: -1,
docs: [],

View File

@ -60,7 +60,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
},
close: async () => {},
loadChunk: async (domain: Domain, idx?: number, recheck?: boolean) => ({
loadChunk: async (domain: Domain, idx?: number) => ({
idx: -1,
index: -1,
docs: [],

View File

@ -17,7 +17,7 @@ export interface DocChunk {
* @public
*/
export interface BackupClient {
loadChunk: (domain: Domain, idx?: number, recheck?: boolean) => Promise<DocChunk>
loadChunk: (domain: Domain, idx?: number) => Promise<DocChunk>
closeChunk: (idx: number) => Promise<void>
loadDocs: (domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
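
With the recheck parameter removed from the interface, a caller walks a domain chunk by chunk and resolves documents by id, mirroring the pattern used in the backup tests further down. A minimal sketch under the assumption that these types are exported from @hcengineering/core; loadAllDocs is an illustrative name, not part of this commit.

import { type BackupClient, type Doc, type Domain, type Ref } from '@hcengineering/core'

// Illustrative helper: stream every document of a domain via the simplified API.
async function loadAllDocs (client: BackupClient, domain: Domain): Promise<Doc[]> {
  const result: Doc[] = []
  let idx: number | undefined
  while (true) {
    const chunk = await client.loadChunk(domain, idx) // no recheck flag anymore
    idx = chunk.idx
    const docs = await client.loadDocs(
      domain,
      chunk.docs.map((it) => it.id as Ref<Doc>)
    )
    result.push(...docs)
    if (chunk.finished) break
  }
  if (idx !== undefined) {
    await client.closeChunk(idx)
  }
  return result
}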

View File

@ -178,8 +178,8 @@ class ClientImpl implements AccountClient, BackupClient {
await this.conn.close()
}
async loadChunk (domain: Domain, idx?: number, recheck?: boolean): Promise<DocChunk> {
return await this.conn.loadChunk(domain, idx, recheck)
async loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return await this.conn.loadChunk(domain, idx)
}
async closeChunk (idx: number): Promise<void> {

View File

@ -25,7 +25,6 @@ import type { WorkspaceIdWithUrl } from './utils'
export interface DocInfo {
id: string
hash: string
size: number // Aprox size
}
/**
* @public
@ -68,8 +67,7 @@ export interface SessionData {
*/
export interface LowLevelStorage {
// Low level streaming API to retrieve information
// If recheck is passed, all %hash% for documents, will be re-calculated.
find: (ctx: MeasureContext, domain: Domain, recheck?: boolean) => StorageIterator
find: (ctx: MeasureContext, domain: Domain) => StorageIterator
// Load passed documents from domain
load: (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>

View File

@ -159,6 +159,7 @@ export async function tryMigrate (client: MigrationClient, plugin: string, migra
for (const migration of migrations) {
if (states.has(migration.state)) continue
try {
console.log('running migration', plugin, migration.state)
await migration.func(client)
} catch (err: any) {
console.error(err)

View File

@ -82,7 +82,7 @@ FulltextStorage & {
return {}
},
close: async () => {},
loadChunk: async (domain: Domain, idx?: number, recheck?: boolean) => ({
loadChunk: async (domain: Domain, idx?: number) => ({
idx: -1,
index: -1,
docs: [],

View File

@ -663,8 +663,8 @@ class Connection implements ClientConnection {
})
}
loadChunk (domain: Domain, idx?: number, recheck?: boolean): Promise<DocChunk> {
return this.sendRequest({ method: 'loadChunk', params: [domain, idx, recheck] })
loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return this.sendRequest({ method: 'loadChunk', params: [domain, idx] })
}
closeChunk (idx: number): Promise<void> {

View File

@ -35,7 +35,7 @@ describe.skip('test-backup-find', () => {
const docs: Doc[] = []
while (true) {
const chunk = await client.loadChunk(DOMAIN_TX, 0, true)
const chunk = await client.loadChunk(DOMAIN_TX, 0)
const part = await client.loadDocs(
DOMAIN_TX,
chunk.docs.map((doc) => doc.id as Ref<Doc>)

View File

@ -605,7 +605,7 @@ export async function createPushNotification (
const limiter = new RateLimiter(5)
for (const subscription of userSubscriptions) {
await limiter.exec(async () => {
await limiter.add(async () => {
await sendPushToSubscription(control, target, subscription, data)
})
}

View File

@ -91,7 +91,8 @@ export async function backupWorkspace (
externalStorage: StorageAdapter
) => DbConfiguration,
region: string,
recheck: boolean = false,
freshBackup: boolean = false,
clean: boolean = false,
downloadLimit: number,
onFinish?: (backupStorage: StorageAdapter, workspaceStorage: StorageAdapter) => Promise<void>
@ -126,7 +127,8 @@ export async function backupWorkspace (
return getConfig(ctx, mainDbUrl, workspace, branding, externalStorage)
},
region,
recheck,
freshBackup,
clean,
downloadLimit
)
if (result && onFinish !== undefined) {

View File

@ -51,6 +51,7 @@
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/server-core": "^0.6.1"
"@hcengineering/server-core": "^0.6.1",
"fast-equals": "^5.0.1"
}
}

View File

@ -32,6 +32,7 @@ import core, {
Ref,
SortingOrder,
systemAccountEmail,
toIdMap,
TxProcessor,
WorkspaceId,
type BackupStatus,
@ -41,9 +42,10 @@ import core, {
type TxCUD
} from '@hcengineering/core'
import { BlobClient, createClient } from '@hcengineering/server-client'
import { type StorageAdapter } from '@hcengineering/server-core'
import { estimateDocSize, type StorageAdapter } from '@hcengineering/server-core'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { deepEqual } from 'fast-equals'
import { createReadStream, createWriteStream, existsSync, mkdirSync, statSync } from 'node:fs'
import { rm } from 'node:fs/promises'
import { basename, dirname } from 'node:path'
@ -58,7 +60,6 @@ export * from './storage'
const dataBlobSize = 50 * 1024 * 1024
const dataUploadSize = 2 * 1024 * 1024
const retrieveChunkSize = 2 * 1024 * 1024
const defaultLevel = 9
@ -134,7 +135,7 @@ async function loadDigest (
date?: number
): Promise<Map<Ref<Doc>, string>> {
ctx = ctx.newChild('load digest', { domain, count: snapshots.length })
ctx.info('load-digest', { domain, count: snapshots.length })
ctx.info('loading-digest', { domain, snapshots: snapshots.length })
const result = new Map<Ref<Doc>, string>()
for (const s of snapshots) {
const d = s.domains[domain]
@ -186,6 +187,7 @@ async function loadDigest (
}
}
ctx.end()
ctx.info('load-digest', { domain, snapshots: snapshots.length, documents: result.size })
return result
}
async function verifyDigest (
@ -477,9 +479,8 @@ export async function cloneWorkspace (
idx = it.idx
let needRetrieve: Ref<Doc>[] = []
let needRetrieveSize = 0
for (const { id, hash, size } of it.docs) {
for (const { id, hash } of it.docs) {
processed++
if (Date.now() - st > 2500) {
ctx.info('processed', { processed, time: Date.now() - st, workspace: targetWorkspaceId.name })
@ -488,11 +489,9 @@ export async function cloneWorkspace (
changes.added.set(id as Ref<Doc>, hash)
needRetrieve.push(id as Ref<Doc>)
needRetrieveSize += size
if (needRetrieveSize > retrieveChunkSize) {
if (needRetrieve.length > 200) {
needRetrieveChunks.push(needRetrieve)
needRetrieveSize = 0
needRetrieve = []
}
}
@ -532,7 +531,7 @@ export async function cloneWorkspace (
for (const d of docs) {
if (d._class === core.class.Blob) {
const blob = d as Blob
await executor.exec(async () => {
await executor.add(async () => {
try {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
const readable = await storageAdapter.get(ctx, sourceWorkspaceId, blob._id)
@ -662,7 +661,8 @@ export async function backup (
include?: Set<string>
skipDomains: string[]
force: boolean
recheck: boolean
freshBackup: boolean // If true, all documents except blobs are downloaded as a new backup
clean: boolean // If set, old backup files are removed; only applies together with the fresh backup option
timeout: number
connectTimeout: number
skipBlobContentTypes: string[]
@ -676,7 +676,8 @@ export async function backup (
token?: string
} = {
force: false,
recheck: false,
freshBackup: false,
clean: false,
timeout: 0,
skipDomains: [],
connectTimeout: 30000,
@ -693,7 +694,7 @@ export async function backup (
ctx = ctx.newChild('backup', {
workspaceId: workspaceId.name,
force: options.force,
recheck: options.recheck,
recheck: options.freshBackup,
timeout: options.timeout
})
@ -741,7 +742,7 @@ export async function backup (
let lastTxChecked = false
// Skip backup if there is no transaction changes.
if (options.getLastTx !== undefined) {
if (options.getLastTx !== undefined && !options.freshBackup) {
lastTx = await options.getLastTx()
if (lastTx !== undefined) {
if (lastTx._id === backupInfo.lastTxId && !options.force) {
@ -771,7 +772,7 @@ export async function backup (
options.connectTimeout
)) as CoreClient & BackupClient)
if (!lastTxChecked) {
if (!lastTxChecked && !options.freshBackup) {
lastTx = await connection.findOne(
core.class.Tx,
{ objectSpace: { $ne: core.space.Model } },
@ -888,19 +889,13 @@ export async function backup (
}
while (true) {
try {
const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck))
const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx))
idx = currentChunk.idx
ops++
let needRetrieve: Ref<Doc>[] = []
let currentNeedRetrieveSize = 0
for (const { id, hash, size } of currentChunk.docs) {
if (domain === DOMAIN_BLOB) {
result.blobsSize += size
} else {
result.dataSize += size
}
for (const { id, hash } of currentChunk.docs) {
processed++
if (Date.now() - st > 2500) {
ctx.info('processed', {
@ -911,19 +906,18 @@ export async function backup (
})
st = Date.now()
}
const _hash = doTrimHash(hash) as string
const kHash = doTrimHash(digest.get(id as Ref<Doc>) ?? oldHash.get(id as Ref<Doc>))
if (kHash !== undefined) {
const serverDocHash = doTrimHash(hash) as string
const currentHash = doTrimHash(digest.get(id as Ref<Doc>) ?? oldHash.get(id as Ref<Doc>))
if (currentHash !== undefined) {
if (digest.delete(id as Ref<Doc>)) {
oldHash.set(id as Ref<Doc>, kHash)
oldHash.set(id as Ref<Doc>, currentHash)
}
if (kHash !== _hash) {
if (currentHash !== serverDocHash || (options.freshBackup && domain !== DOMAIN_BLOB)) {
if (changes.updated.has(id as Ref<Doc>)) {
removeFromNeedRetrieve(needRetrieve, id as Ref<Doc>)
}
changes.updated.set(id as Ref<Doc>, _hash)
changes.updated.set(id as Ref<Doc>, serverDocHash)
needRetrieve.push(id as Ref<Doc>)
currentNeedRetrieveSize += size
changed++
} else if (changes.updated.has(id as Ref<Doc>)) {
// We have same
@ -936,22 +930,19 @@ export async function backup (
// We need to clean old need retrieve in case of duplicates.
removeFromNeedRetrieve(needRetrieve, id)
}
changes.added.set(id as Ref<Doc>, _hash)
changes.added.set(id as Ref<Doc>, serverDocHash)
needRetrieve.push(id as Ref<Doc>)
changed++
currentNeedRetrieveSize += size
}
if (currentNeedRetrieveSize > retrieveChunkSize) {
if (needRetrieve.length > 0) {
needRetrieveChunks.push(needRetrieve)
}
currentNeedRetrieveSize = 0
if (needRetrieve.length > 200) {
needRetrieveChunks.push(needRetrieve)
needRetrieve = []
}
}
if (needRetrieve.length > 0) {
needRetrieveChunks.push(needRetrieve)
needRetrieve = []
}
if (currentChunk.finished) {
ctx.info('processed-end', {
@ -1036,6 +1027,8 @@ export async function backup (
global.gc?.()
} catch (err) {}
let lastSize = 0
while (needRetrieveChunks.length > 0) {
if (canceled()) {
return
@ -1048,11 +1041,13 @@ export async function backup (
ctx.info('Retrieve chunk', {
needRetrieve: needRetrieveChunks.reduce((v, docs) => v + docs.length, 0),
toLoad: needRetrieve.length,
workspace: workspaceId.name
workspace: workspaceId.name,
lastSize: Math.round((lastSize * 100) / (1024 * 1024)) / 100
})
let docs: Doc[] = []
try {
docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve))
lastSize = docs.reduce((p, it) => p + estimateDocSize(it), 0)
if (docs.length !== needRetrieve.length) {
const nr = new Set(docs.map((it) => it._id))
ctx.error('failed to retrieve all documents', { missing: needRetrieve.filter((it) => !nr.has(it)) })
@ -1146,6 +1141,12 @@ export async function backup (
break
}
if (domain === DOMAIN_BLOB) {
result.blobsSize += (d as Blob).size
} else {
result.dataSize += JSON.stringify(d).length
}
function processChanges (d: Doc, error: boolean = false): void {
processed++
progress(10 + (processed / totalChunks) * 90)
@ -1319,6 +1320,41 @@ export async function backup (
}
let processed = 0
if (!canceled()) {
backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
if (options.freshBackup && options.clean) {
// Preparing a list of files to clean
ctx.info('Cleaning old backup files...')
for (const sn of backupInfo.snapshots.slice(0, backupInfo.snapshots.length - 1)) {
const filesToDelete: string[] = []
for (const [domain, dsn] of [...Object.entries(sn.domains)]) {
if (domain === DOMAIN_BLOB) {
continue
}
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete (sn.domains as any)[domain]
filesToDelete.push(...(dsn.snapshots ?? []))
filesToDelete.push(...(dsn.storage ?? []))
if (dsn.snapshot !== undefined) {
filesToDelete.push(dsn.snapshot)
}
}
for (const file of filesToDelete) {
ctx.info('Removing file...', { file })
await storage.delete(file)
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete sizeInfo[file]
}
}
ctx.info('Cleaning complete...')
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
}
const addFileSize = async (file: string | undefined | null): Promise<void> => {
if (file != null) {
const sz = sizeInfo[file]
@ -1588,6 +1624,8 @@ export async function backupFind (storage: BackupStorage, id: Ref<Doc>, domain?:
/**
* @public
* Restore state of DB to specified point.
*
* Recheck means we download and compare every document on our side and, if a difference is found, upload the changed version to the server.
*/
export async function restore (
ctx: MeasureContext,
@ -1689,7 +1727,7 @@ export async function restore (
try {
while (true) {
const st = Date.now()
const it = await connection.loadChunk(c, idx, opt.recheck)
const it = await connection.loadChunk(c, idx)
chunks++
idx = it.idx
@ -1723,11 +1761,13 @@ export async function restore (
// Let's find difference
const docsToAdd = new Map(
Array.from(changeset.entries()).filter(
([it]) =>
!serverChangeset.has(it) ||
(serverChangeset.has(it) && doTrimHash(serverChangeset.get(it)) !== doTrimHash(changeset.get(it)))
)
opt.recheck === true // If recheck we check all documents.
? Array.from(changeset.entries())
: Array.from(changeset.entries()).filter(
([it]) =>
!serverChangeset.has(it) ||
(serverChangeset.has(it) && doTrimHash(serverChangeset.get(it)) !== doTrimHash(changeset.get(it)))
)
)
const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it))
@ -1738,15 +1778,12 @@ export async function restore (
async function sendChunk (doc: Doc | undefined, len: number): Promise<void> {
if (doc !== undefined) {
docsToAdd.delete(doc._id)
if (opt.recheck === true) {
// We need to clear %hash% in case our is wrong.
delete (doc as any)['%hash%']
}
docs.push(doc)
}
sendSize = sendSize + len
if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
let docsToSend = docs
totalSend += docs.length
ctx.info('upload-' + c, {
docs: docs.length,
@ -1757,32 +1794,42 @@ export async function restore (
})
// Correct docs without space
for (const d of docs) {
if (d._class === core.class.DocIndexState) {
// We need to clean old stuff from restored document.
if ('stages' in d) {
delete (d as any).stages
delete (d as any).attributes
;(d as any).needIndex = true
;(d as any)['%hash%'] = ''
}
}
if (d.space == null) {
d.space = core.space.Workspace
;(d as any)['%hash%'] = ''
}
if (TxProcessor.isExtendsCUD(d._class)) {
const tx = d as TxCUD<Doc>
if (tx.objectSpace == null) {
tx.objectSpace = core.space.Workspace
;(tx as any)['%hash%'] = ''
}
}
}
try {
await connection.upload(c, docs)
} catch (err: any) {
ctx.error('error during upload', { err, docs: JSON.stringify(docs) })
if (opt.recheck === true) {
// We need to download all documents and compare them.
const serverDocs = toIdMap(
await connection.loadDocs(
c,
docs.map((it) => it._id)
)
)
docsToSend = docs.filter((doc) => {
const serverDoc = serverDocs.get(doc._id)
if (serverDoc !== undefined) {
const { '%hash%': _h1, ...dData } = doc as any
const { '%hash%': _h2, ...sData } = serverDoc as any
return !deepEqual(dData, sData)
}
return true
})
} else {
try {
await connection.upload(c, docsToSend)
} catch (err: any) {
ctx.error('error during upload', { err, docs: JSON.stringify(docs) })
}
}
docs.length = 0
sendSize = 0
@ -1964,7 +2011,7 @@ export async function restore (
if (opt.skip?.has(c) === true) {
continue
}
await limiter.exec(async () => {
await limiter.add(async () => {
ctx.info('processing domain', { domain: c, workspaceId: workspaceId.name })
let retry = 5
let delay = 1

View File

@ -65,7 +65,8 @@ class BackupWorker {
externalStorage: StorageAdapter
) => DbConfiguration,
readonly region: string,
readonly recheck: boolean = false
readonly freshWorkspace: boolean = false,
readonly clean: boolean = false
) {}
canceled = false
@ -186,7 +187,8 @@ class BackupWorker {
backup(ctx, '', getWorkspaceId(ws.workspace), storage, {
skipDomains: [],
force: true,
recheck: this.recheck,
freshBackup: this.freshWorkspace,
clean: this.clean,
timeout: this.config.Timeout * 1000,
connectTimeout: 5 * 60 * 1000, // 5 minutes to,
blobDownloadLimit: this.downloadLimit,
@ -303,7 +305,8 @@ export async function doBackupWorkspace (
externalStorage: StorageAdapter
) => DbConfiguration,
region: string,
recheck: boolean,
freshWorkspace: boolean,
clean: boolean,
downloadLimit: number
): Promise<boolean> {
const backupWorker = new BackupWorker(
@ -313,7 +316,8 @@ export async function doBackupWorkspace (
workspaceStorageAdapter,
getConfig,
region,
recheck
freshWorkspace,
clean
)
backupWorker.downloadLimit = downloadLimit
const { processed } = await backupWorker.doBackup(ctx, [workspace], Number.MAX_VALUE)

View File

@ -101,6 +101,15 @@ export function initStatisticsContext (
let oldMetricsValue = ''
const serviceId = encodeURIComponent(os.hostname() + '-' + serviceName)
const handleError = (err: any): void => {
errorToSend++
if (errorToSend % 2 === 0) {
if (err.code !== 'UND_ERR_SOCKET') {
console.error(err)
}
}
}
const intTimer = setInterval(() => {
try {
if (metricsFile !== undefined || ops?.logConsole === true) {
@ -138,18 +147,10 @@ export function initStatisticsContext (
},
body: statData
}
).catch((err) => {
errorToSend++
if (errorToSend % 2 === 0) {
console.error(err)
}
})
).catch(handleError)
}
} catch (err: any) {
errorToSend++
if (errorToSend % 20 === 0) {
console.error(err)
}
handleError(err)
}
}, METRICS_UPDATE_INTERVAL)

View File

@ -9,7 +9,6 @@ import {
type StorageIterator,
type WorkspaceId
} from '@hcengineering/core'
import { estimateDocSize } from './utils'
export * from '@hcengineering/storage'
@ -20,7 +19,7 @@ export function getBucketId (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId)
}
const chunkSize = 512 * 1024
const chunkSize = 200
/**
* @public
@ -44,8 +43,7 @@ export class BackupClientOps {
loadChunk (
ctx: MeasureContext,
domain: Domain,
idx?: number,
recheck?: boolean
idx?: number
): Promise<{
idx: number
docs: DocInfo[]
@ -64,22 +62,18 @@ export class BackupClientOps {
}
}
} else {
chunk = { idx, iterator: this.storage.find(ctx, domain, recheck), finished: false, index: 0 }
chunk = { idx, iterator: this.storage.find(ctx, domain), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
let size = 0
const docs: DocInfo[] = []
while (size < chunkSize) {
while (docs.length < chunkSize) {
const _docs = await chunk.iterator.next(ctx)
if (_docs.length === 0) {
chunk.finished = true
break
}
for (const doc of _docs) {
size += estimateDocSize(doc)
docs.push(doc)
}
docs.push(..._docs)
}
return {

View File

@ -12,7 +12,6 @@ import core, {
type Class,
type Client,
type Doc,
type DocInfo,
type MeasureContext,
type ModelDb,
type Ref,
@ -21,7 +20,7 @@ import core, {
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import platform, { PlatformError, Severity, Status, unknownError } from '@hcengineering/platform'
import { createHash, type Hash } from 'crypto'
import { type Hash } from 'crypto'
import fs from 'fs'
import { BackupClientOps } from './storage'
import type { Pipeline } from './types'
@ -86,7 +85,7 @@ export function estimateDocSize (_obj: any): number {
return result
}
/**
* Return some estimation for object size
* Calculate hash for object
*/
export function updateHashForDoc (hash: Hash, _obj: any): void {
const toProcess = [_obj]
@ -98,7 +97,9 @@ export function updateHashForDoc (hash: Hash, _obj: any): void {
if (typeof obj === 'function') {
continue
}
for (const key in obj) {
const keys = Object.keys(obj).sort()
// We need a sorted list of keys to keep the hash consistent
for (const key of keys) {
// include prototype properties
const value = obj[key]
const type = getTypeOf(value)
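
The sort above matters because two documents with the same fields stored in a different property order must yield the same digest. A compact standalone sketch of the idea (stableDocHash is illustrative, not the updateHashForDoc implementation; JSON.stringify is used only for brevity):

import { createHash } from 'crypto'

// Illustrative: a content hash that does not depend on property insertion order.
function stableDocHash (doc: Record<string, unknown>): string {
  const hash = createHash('sha256')
  for (const key of Object.keys(doc).sort()) {
    hash.update(key)
    hash.update(JSON.stringify(doc[key]) ?? 'undefined')
  }
  return hash.digest('base64')
}

// stableDocHash({ a: 1, b: 2 }) === stableDocHash({ b: 2, a: 1 })
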
@ -220,38 +221,6 @@ export function loadBrandingMap (brandingPath?: string): BrandingMap {
return brandings
}
export function toDocInfo (d: Doc, bulkUpdate: Map<Ref<Doc>, string>, recheck?: boolean): DocInfo {
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
}
const pos = (digest ?? '').indexOf('|')
const oldDigest = digest
if (digest == null || digest === '' || recheck === true) {
const size = estimateDocSize(d)
const hash = createHash('sha256')
updateHashForDoc(hash, d)
digest = hash.digest('base64')
const newDigest = `${digest}|${size.toString(16)}`
if (recheck !== true || oldDigest !== newDigest) {
bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
}
return {
id: d._id,
hash: digest,
size
}
} else {
return {
id: d._id,
hash: pos >= 0 ? digest.slice(0, pos) : digest,
size: pos >= 0 ? parseInt(digest.slice(pos + 1), 16) : 0
}
}
}
export function wrapPipeline (
ctx: MeasureContext,
pipeline: Pipeline,
@ -284,7 +253,7 @@ export function wrapPipeline (
closeChunk: (idx) => backupOps.closeChunk(ctx, idx),
getHierarchy: () => pipeline.context.hierarchy,
getModel: () => pipeline.context.modelDb,
loadChunk: (domain, idx, recheck) => backupOps.loadChunk(ctx, domain, idx, recheck),
loadChunk: (domain, idx) => backupOps.loadChunk(ctx, domain, idx),
loadDocs: (domain, docs) => backupOps.loadDocs(ctx, domain, docs),
upload: (domain, docs) => backupOps.upload(ctx, domain, docs),
searchFulltext: async (query, options) => ({ docs: [], total: 0 }),

View File

@ -402,7 +402,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
): Promise<{ classUpdate: Ref<Class<Doc>>[], processed: number }> {
const _classUpdate = new Set<Ref<Class<Doc>>>()
let processed = 0
await rateLimiter.exec(async () => {
await rateLimiter.add(async () => {
let st = Date.now()
let groupBy = await this.storage.groupBy(ctx, DOMAIN_DOC_INDEX_STATE, 'objectClass', { needIndex: true })
@ -581,7 +581,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (docs.length === 0) {
return
}
await pushQueue.exec(async () => {
await pushQueue.add(async () => {
try {
try {
await ctx.with('push-elastic', {}, () => this.fulltextAdapter.updateMany(ctx, this.workspace, docs))
@ -646,7 +646,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
const docState = valueIds.get(doc._id as Ref<DocIndexState>) as WithLookup<DocIndexState>
const indexedDoc = createIndexedDoc(doc, this.hierarchy.findAllMixins(doc), doc.space)
await rateLimit.exec(async () => {
await rateLimit.add(async () => {
await ctx.with('process-document', { _class: doc._class }, async (ctx) => {
try {
// Copy content attributes as well.

View File

@ -43,8 +43,8 @@ export class LowLevelMiddleware extends BaseMiddleware implements Middleware {
}
const adapterManager = context.adapterManager
context.lowLevelStorage = {
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
return adapterManager.getAdapter(domain, false).find(ctx, domain, recheck)
find (ctx: MeasureContext, domain: Domain): StorageIterator {
return adapterManager.getAdapter(domain, false).find(ctx, domain)
},
load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {

View File

@ -62,8 +62,6 @@ import core, {
type WorkspaceId
} from '@hcengineering/core'
import {
estimateDocSize,
toDocInfo,
type DbAdapter,
type DbAdapterHandler,
type DomainHelperOperations,
@ -86,8 +84,8 @@ import {
} from 'mongodb'
import { DBCollectionHelper, getMongoClient, getWorkspaceMongoDB, type MongoClientReference } from './utils'
function translateDoc (doc: Doc): Doc {
return { ...doc, '%hash%': null } as any
function translateDoc (doc: Doc, hash: string): Doc {
return { ...doc, '%hash%': hash } as any
}
function isLookupQuery<T extends Doc> (query: DocumentQuery<T>): boolean {
@ -253,14 +251,14 @@ abstract class MongoAdapterBase implements DbAdapter {
operations: DocumentUpdate<T>
): Promise<void> {
if (isOperator(operations)) {
await this.db.collection(domain).updateMany(this.translateRawQuery(query), { $set: { '%hash%': null } })
await this.db.collection(domain).updateMany(this.translateRawQuery(query), { $set: { '%hash%': this.curHash() } })
await this.db
.collection(domain)
.updateMany(this.translateRawQuery(query), { ...operations } as unknown as UpdateFilter<Document>)
} else {
await this.db
.collection(domain)
.updateMany(this.translateRawQuery(query), { $set: { ...operations, '%hash%': null } })
.updateMany(this.translateRawQuery(query), { $set: { ...operations, '%hash%': this.curHash() } })
}
}
@ -1023,66 +1021,57 @@ abstract class MongoAdapterBase implements DbAdapter {
return docs
}
find (_ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
curHash (): string {
return Date.now().toString(16) // Current hash value
}
strimSize (str: string): string {
const pos = str.indexOf('|')
if (pos > 0) {
return str.substring(0, pos)
}
return str
}
find (_ctx: MeasureContext, domain: Domain): StorageIterator {
const ctx = _ctx.newChild('find', { domain })
const coll = this.db.collection<Doc>(domain)
let mode: 'hashed' | 'non-hashed' = 'hashed'
let iterator: FindCursor<Doc>
const bulkUpdate = new Map<Ref<Doc>, string>()
const flush = async (flush = false): Promise<void> => {
if (bulkUpdate.size > 1000 || flush) {
if (bulkUpdate.size > 0) {
await ctx.with('bulk-write-find', {}, () =>
coll.bulkWrite(
Array.from(bulkUpdate.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0], '%hash%': null },
update: { $set: { '%hash%': it[1] } }
}
})),
{ ordered: false }
)
)
}
bulkUpdate.clear()
}
}
return {
next: async () => {
if (iterator === undefined) {
await coll.updateMany({ '%hash%': { $in: [null, ''] } }, { $set: { '%hash%': this.curHash() } })
iterator = coll.find(
recheck === true ? {} : { '%hash%': { $nin: ['', null] } },
recheck === true
? {}
: {
projection: {
'%hash%': 1,
_id: 1
}
}
{},
{
projection: {
'%hash%': 1,
_id: 1
}
}
)
}
let d = await ctx.with('next', { mode }, () => iterator.next())
if (d == null && mode === 'hashed' && recheck !== true) {
mode = 'non-hashed'
await iterator.close()
await flush(true) // We need to flush, so wrong id documents will be updated.
iterator = coll.find({ '%hash%': { $in: ['', null] } })
d = await ctx.with('next', { mode }, () => iterator.next())
}
const d = await ctx.with('next', {}, () => iterator.next())
const result: DocInfo[] = []
if (d != null) {
result.push(toDocInfo(d, bulkUpdate, recheck))
result.push({
id: d._id,
hash: this.strimSize((d as any)['%hash%'])
})
}
if (iterator.bufferedCount() > 0) {
result.push(...iterator.readBufferedDocuments().map((it) => toDocInfo(it, bulkUpdate, recheck)))
result.push(
...iterator.readBufferedDocuments().map((it) => ({
id: it._id,
hash: this.strimSize((it as any)['%hash%'])
}))
)
}
await ctx.with('flush', {}, () => flush())
return result
},
close: async () => {
await ctx.with('flush', {}, () => flush(true))
await ctx.with('close', {}, () => iterator.close())
ctx.end()
}
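
The reworked find() no longer recomputes hashes while iterating: it streams the stored %hash% as-is and only strips the legacy size suffix that toDocInfo used to append. A tiny sketch of that trimming, with made-up values:

// Illustrative mirror of the adapter's strimSize helper: keep only the digest
// part of a legacy '<digest>|<size in hex>' value; new curHash()-style values
// pass through unchanged.
function trimLegacySuffix (value: string): string {
  const pos = value.indexOf('|')
  return pos > 0 ? value.substring(0, pos) : value
}

// trimLegacySuffix('k5W1aQ==|1f4')  -> 'k5W1aQ=='    (legacy toDocInfo format)
// trimLegacySuffix('19394c2af1a')   -> '19394c2af1a' (timestamp-based curHash value)
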
@ -1104,7 +1093,7 @@ abstract class MongoAdapterBase implements DbAdapter {
return ctx.with('upload', { domain }, (ctx) => {
const coll = this.collection(domain)
return uploadDocuments(ctx, docs, coll)
return uploadDocuments(ctx, docs, coll, this.curHash())
})
}
@ -1137,7 +1126,7 @@ abstract class MongoAdapterBase implements DbAdapter {
updateOne: {
filter: { _id: it[0] },
update: {
$set: { ...set, '%hash%': null },
$set: { ...set, '%hash%': this.curHash() },
...($unset !== undefined ? { $unset } : {})
}
}
@ -1419,7 +1408,7 @@ class MongoAdapter extends MongoAdapterBase {
protected txCreateDoc (bulk: OperationBulk, tx: TxCreateDoc<Doc>): void {
const doc = TxProcessor.createDoc2Doc(tx)
bulk.add.push(translateDoc(doc))
bulk.add.push(translateDoc(doc, this.curHash()))
}
protected txUpdateDoc (bulk: OperationBulk, tx: TxUpdateDoc<Doc>): void {
@ -1439,7 +1428,7 @@ class MongoAdapter extends MongoAdapterBase {
update: {
$set: {
...Object.fromEntries(Object.entries(desc.$update).map((it) => [arr + '.$.' + it[0], it[1]])),
'%hash%': null
'%hash%': this.curHash()
}
}
}
@ -1451,7 +1440,7 @@ class MongoAdapter extends MongoAdapterBase {
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
'%hash%': this.curHash()
}
}
}
@ -1470,7 +1459,7 @@ class MongoAdapter extends MongoAdapterBase {
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
'%hash%': this.curHash()
}
} as unknown as UpdateFilter<Document>,
{ returnDocument: 'after', includeResultMetadata: true }
@ -1488,7 +1477,7 @@ class MongoAdapter extends MongoAdapterBase {
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
'%hash%': this.curHash()
}
}
}
@ -1506,7 +1495,7 @@ class MongoAdapter extends MongoAdapterBase {
...tx.operations,
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
'%hash%': this.curHash()
})) {
;(upd as any)[k] = v
}
@ -1552,8 +1541,9 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
{ domain: 'tx' },
async () => {
try {
const hash = this.curHash()
await this.txCollection().insertMany(
baseTxes.map((it) => translateDoc(it)),
baseTxes.map((it) => translateDoc(it, hash)),
{
ordered: false
}
@ -1582,8 +1572,9 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
{ domain: DOMAIN_MODEL_TX },
async () => {
try {
const hash = this.curHash()
await this.db.collection<Doc>(DOMAIN_MODEL_TX).insertMany(
modelTxes.map((it) => translateDoc(it)),
modelTxes.map((it) => translateDoc(it, hash)),
{
ordered: false
}
@ -1637,22 +1628,23 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
}
}
export async function uploadDocuments (ctx: MeasureContext, docs: Doc[], coll: Collection<Document>): Promise<void> {
export async function uploadDocuments (
ctx: MeasureContext,
docs: Doc[],
coll: Collection<Document>,
curHash: string
): Promise<void> {
const ops = Array.from(docs)
while (ops.length > 0) {
const part = ops.splice(0, 500)
await coll.bulkWrite(
part.map((it) => {
const digest: string | null = (it as any)['%hash%']
if ('%hash%' in it) {
delete it['%hash%']
}
const size = digest != null ? estimateDocSize(it) : 0
const digest: string = (it as any)['%hash%'] ?? curHash
return {
replaceOne: {
filter: { _id: it._id },
replacement: { ...it, '%hash%': digest == null ? null : `${digest}|${size.toString(16)}` },
replacement: { ...it, '%hash%': digest },
upsert: true
}
}

View File

@ -31,7 +31,7 @@ BEGIN
EXECUTE format('
ALTER TABLE %I ADD COLUMN "%%hash%%" text;', tbl_name);
EXECUTE format('
UPDATE %I SET "%%hash%%" = data->>''%%data%%'';', tbl_name);
UPDATE %I SET "%%hash%%" = data->>''%%hash%%'';', tbl_name);
END IF;
END LOOP;
END $$;

View File

@ -59,9 +59,7 @@ import {
type DbAdapter,
type DbAdapterHandler,
type DomainHelperOperations,
estimateDocSize,
type ServerFindOptions,
toDocInfo,
type TxAdapter
} from '@hcengineering/server-core'
import type postgres from 'postgres'
@ -96,7 +94,7 @@ async function * createCursorGenerator (
client: postgres.ReservedSql,
sql: string,
schema: Schema,
bulkSize = 50
bulkSize = 1000
): AsyncGenerator<Doc[]> {
const cursor = client.unsafe(sql).cursor(bulkSize)
try {
@ -447,8 +445,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
;(operations as any) = { ...(operations as any).$set }
}
const isOps = isOperator(operations)
if ((operations as any)['%hash%'] === undefined) {
;(operations as any)['%hash%'] = null
if ((operations as any)['%hash%'] == null) {
;(operations as any)['%hash%'] = this.curHash()
}
const schemaFields = getSchemaAndFields(domain)
if (isOps) {
@ -459,7 +457,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
;(doc as any)['%hash%'] = null
;(doc as any)['%hash%'] = this.curHash()
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const params: any[] = [doc._id, this.workspaceId.name]
let paramsIndex = params.length + 1
@ -542,73 +540,78 @@ abstract class PostgresAdapterBase implements DbAdapter {
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
): Promise<FindResult<T>> {
return ctx.with('findAll', { _class }, async () => {
try {
const domain = translateDomain(options?.domain ?? this.hierarchy.getDomain(_class))
const sqlChunks: string[] = []
const joins = this.buildJoin(_class, options?.lookup)
if (options?.domainLookup !== undefined) {
const baseDomain = translateDomain(this.hierarchy.getDomain(_class))
let fquery = ''
return ctx.with(
'findAll',
{},
async () => {
try {
const domain = translateDomain(options?.domain ?? this.hierarchy.getDomain(_class))
const sqlChunks: string[] = []
const joins = this.buildJoin(_class, options?.lookup)
if (options?.domainLookup !== undefined) {
const baseDomain = translateDomain(this.hierarchy.getDomain(_class))
const domain = translateDomain(options.domainLookup.domain)
const key = options.domainLookup.field
const as = `dl_lookup_${domain}_${key}`
joins.push({
isReverse: false,
table: domain,
path: options.domainLookup.field,
toAlias: as,
toField: '_id',
fromField: key,
fromAlias: baseDomain,
toClass: undefined
})
const domain = translateDomain(options.domainLookup.domain)
const key = options.domainLookup.field
const as = `dl_lookup_${domain}_${key}`
joins.push({
isReverse: false,
table: domain,
path: options.domainLookup.field,
toAlias: as,
toField: '_id',
fromField: key,
fromAlias: baseDomain,
toClass: undefined
})
}
const select = `SELECT ${this.getProjection(domain, options?.projection, joins)} FROM ${domain}`
const secJoin = this.addSecurity(query, domain, ctx.contextData)
if (secJoin !== undefined) {
sqlChunks.push(secJoin)
}
if (joins.length > 0) {
sqlChunks.push(this.buildJoinString(joins))
}
sqlChunks.push(`WHERE ${this.buildQuery(_class, domain, query, joins, options)}`)
return (await this.mgr.read(ctx.id, async (connection) => {
let total = options?.total === true ? 0 : -1
if (options?.total === true) {
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
const totalSql = [totalReq, ...sqlChunks].join(' ')
const totalResult = await connection.unsafe(totalSql)
const parsed = Number.parseInt(totalResult[0].count)
total = Number.isNaN(parsed) ? 0 : parsed
}
if (options?.sort !== undefined) {
sqlChunks.push(this.buildOrder(_class, domain, options.sort, joins))
}
if (options?.limit !== undefined) {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = [select, ...sqlChunks].join(' ')
fquery = finalSql
const result = await connection.unsafe(finalSql)
if (options?.lookup === undefined && options?.domainLookup === undefined) {
return toFindResult(
result.map((p) => parseDocWithProjection(p as any, domain, options?.projection)),
total
)
} else {
const res = this.parseLookup<T>(result, joins, options?.projection, domain)
return toFindResult(res, total)
}
})) as FindResult<T>
} catch (err) {
ctx.error('Error in findAll', { err })
throw err
}
const select = `SELECT ${this.getProjection(domain, options?.projection, joins)} FROM ${domain}`
const secJoin = this.addSecurity(query, domain, ctx.contextData)
if (secJoin !== undefined) {
sqlChunks.push(secJoin)
}
if (joins.length > 0) {
sqlChunks.push(this.buildJoinString(joins))
}
sqlChunks.push(`WHERE ${this.buildQuery(_class, domain, query, joins, options)}`)
const findId = ctx.id ?? generateId()
return (await this.mgr.read(findId, async (connection) => {
let total = options?.total === true ? 0 : -1
if (options?.total === true) {
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
const totalSql = [totalReq, ...sqlChunks].join(' ')
const totalResult = await connection.unsafe(totalSql)
const parsed = Number.parseInt(totalResult[0].count)
total = Number.isNaN(parsed) ? 0 : parsed
}
if (options?.sort !== undefined) {
sqlChunks.push(this.buildOrder(_class, domain, options.sort, joins))
}
if (options?.limit !== undefined) {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = [select, ...sqlChunks].join(' ')
const result = await connection.unsafe(finalSql)
if (options?.lookup === undefined && options?.domainLookup === undefined) {
return toFindResult(
result.map((p) => parseDocWithProjection(p as any, domain, options?.projection)),
total
)
} else {
const res = this.parseLookup<T>(result, joins, options?.projection, domain)
return toFindResult(res, total)
}
})) as FindResult<T>
} catch (err) {
ctx.error('Error in findAll', { err })
throw err
}
})
},
() => ({ fquery })
)
}
addSecurity<T extends Doc>(query: DocumentQuery<T>, domain: string, sessionContext: SessionData): string | undefined {
@ -1231,6 +1234,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
return res
}
curHash (): string {
return Date.now().toString(16) // Current hash value
}
private getProjection<T extends Doc>(
baseDomain: string,
projection: Projection<T> | undefined,
@ -1269,60 +1276,31 @@ abstract class PostgresAdapterBase implements DbAdapter {
return []
}
find (_ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
strimSize (str: string): string {
const pos = str.indexOf('|')
if (pos > 0) {
return str.substring(0, pos)
}
return str
}
find (_ctx: MeasureContext, domain: Domain): StorageIterator {
const ctx = _ctx.newChild('find', { domain })
let initialized: boolean = false
let client: postgres.ReservedSql
let mode: 'hashed' | 'non_hashed' = 'hashed'
const bulkUpdate = new Map<Ref<Doc>, string>()
const tdomain = translateDomain(domain)
const schema = getSchema(domain)
const findId = generateId()
const flush = async (flush = false): Promise<void> => {
if (bulkUpdate.size > 1000 || flush) {
if (bulkUpdate.size > 0) {
const entries = Array.from(bulkUpdate.entries())
bulkUpdate.clear()
try {
while (entries.length > 0) {
const part = entries.splice(0, 200)
const data: string[] = part.flat()
const indexes = part.map((val, idx) => `($${2 * idx + 1}::text, $${2 * idx + 2}::text)`).join(', ')
await ctx.with('bulk-write-find', {}, () => {
return this.mgr.write(
findId,
async (client) =>
await client.unsafe(
`
UPDATE ${tdomain} SET "%hash%" = update_data.hash
FROM (values ${indexes}) AS update_data(_id, hash)
WHERE ${tdomain}."workspaceId" = '${this.workspaceId.name}' AND ${tdomain}."_id" = update_data._id
`,
data
)
)
})
}
} catch (err: any) {
ctx.error('failed to update hash', { err })
}
}
}
}
const workspaceId = this.workspaceId
function createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
const sql = `SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`
function createBulk (projection: string, limit = 50000): AsyncGenerator<Doc[]> {
const sql = `SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}'`
return createCursorGenerator(client, sql, schema, limit)
}
let bulk: AsyncGenerator<Doc[]>
let forcedRecheck = false
return {
next: async () => {
@ -1331,60 +1309,29 @@ abstract class PostgresAdapterBase implements DbAdapter {
client = await this.client.reserve()
}
if (recheck === true) {
await this.mgr.write(
findId,
async (client) =>
await client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
)
}
// We need the %hash% value to be set properly before iterating
await client.unsafe(
`UPDATE ${tdomain} SET "%hash%" = '${this.curHash()}' WHERE "workspaceId" = '${this.workspaceId.name}' AND "%hash%" IS NULL OR "%hash%" = ''`
)
initialized = true
await flush(true) // We need to flush, so wrong id documents will be updated.
bulk = createBulk('_id, "%hash%"', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
bulk = createBulk('_id, "%hash%"')
}
let docs = await ctx.with('next', { mode }, () => bulk.next())
if (!forcedRecheck && docs.done !== true && docs.value?.length > 0) {
// Check if we have wrong hash stored, and update all of them.
forcedRecheck = true
for (const d of docs.value) {
const digest: string | null = (d as any)['%hash%']
const pos = (digest ?? '').indexOf('|')
if (pos === -1) {
await bulk.return([]) // We need to close generator
docs = { done: true, value: undefined }
await this.mgr.write(
findId,
async (client) =>
await client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
)
break
}
}
}
if ((docs.done === true || docs.value.length === 0) && mode === 'hashed') {
forcedRecheck = true
mode = 'non_hashed'
bulk = createBulk('*', '"%hash%" IS NULL OR "%hash%" = \'\'')
docs = await ctx.with('next', { mode }, () => bulk.next())
}
const docs = await ctx.with('next', {}, () => bulk.next())
if (docs.done === true || docs.value.length === 0) {
return []
}
const result: DocInfo[] = []
for (const d of docs.value) {
result.push(toDocInfo(d, bulkUpdate))
result.push({
id: d._id,
hash: this.strimSize((d as any)['%hash%'])
})
}
await ctx.with('flush', {}, () => flush())
return result
},
close: async () => {
await ctx.with('flush', {}, () => flush(true))
await bulk.return([]) // We need to close generator, just in case
client?.release()
ctx.end()
@ -1431,12 +1378,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
const doc = part[i]
const variables: string[] = []
const digest: string | null = (doc as any)['%hash%']
if ('%hash%' in doc) {
delete doc['%hash%']
if (!('%hash%' in doc) || doc['%hash%'] === '' || doc['%hash%'] == null) {
;(doc as any)['%hash%'] = this.curHash() // We need to set current hash
}
const size = digest != null ? estimateDocSize(doc) : 0
;(doc as any)['%hash%'] = digest == null ? null : `${digest}|${size.toString(16)}`
const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
values.push(d.workspaceId)
@ -1472,7 +1416,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const tdomain = translateDomain(domain)
const toClean = [...docs]
while (toClean.length > 0) {
const part = toClean.splice(0, 200)
const part = toClean.splice(0, 2500)
await ctx.with('clean', {}, () => {
return this.mgr.write(
ctx.id,
@ -1492,7 +1436,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"`
return ctx.with('groupBy', { domain }, async (ctx) => {
try {
return await this.mgr.read(ctx.id ?? generateId(), async (connection) => {
return await this.mgr.read(ctx.id, async (connection) => {
const result = await connection.unsafe(
`SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(domain, query ?? {})} GROUP BY ${key}`
)
@ -1519,8 +1463,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
const doc = map.get(_id)
if (doc === undefined) continue
const op = { ...ops }
if ((op as any)['%hash%'] === undefined) {
;(op as any)['%hash%'] = null
if ((op as any)['%hash%'] == null) {
;(op as any)['%hash%'] = this.curHash()
}
TxProcessor.applyUpdate(doc, op)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
@ -1560,6 +1504,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
const values: DBDoc[] = []
for (let i = 0; i < part.length; i++) {
const doc = part[i]
if ((doc as any)['%hash%'] == null) {
;(doc as any)['%hash%'] = this.curHash()
}
const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
values.push(d)
}
@ -1622,7 +1569,7 @@ class PostgresAdapter extends PostgresAdapterBase {
const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return
TxProcessor.updateMixin4Doc(doc, tx)
;(doc as any)['%hash%'] = null
;(doc as any)['%hash%'] = this.curHash()
const domain = this.hierarchy.getDomain(tx.objectClass)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const { extractedFields } = parseUpdate(tx.attributes as Partial<Doc>, schemaFields)
@ -1714,7 +1661,7 @@ class PostgresAdapter extends PostgresAdapterBase {
for (const tx of withOperator ?? []) {
let doc: Doc | undefined
const ops: any = { '%hash%': null, ...tx.operations }
const ops: any = { '%hash%': this.curHash(), ...tx.operations }
result.push(
await ctx.with('tx-update-doc', { _class: tx.objectClass }, async (ctx) => {
await this.mgr.write(ctx.id, async (client) => {
@ -1723,7 +1670,7 @@ class PostgresAdapter extends PostgresAdapterBase {
ops.modifiedBy = tx.modifiedBy
ops.modifiedOn = tx.modifiedOn
TxProcessor.applyUpdate(doc, ops)
;(doc as any)['%hash%'] = null
;(doc as any)['%hash%'] = this.curHash()
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(ops, schemaFields)

View File

@ -304,9 +304,9 @@ export function convertDoc<T extends Doc> (
// Check if some fields are missing
for (const [key, _type] of Object.entries(schemaAndFields.schema)) {
if (!(key in doc)) {
// We missing required field, and we need to add a dummy value for it.
if (_type.notNull) {
if (_type.notNull) {
if (!(key in doc) || (doc as any)[key] == null) {
// We missing required field, and we need to add a dummy value for it.
// Null value is not allowed
switch (_type.type) {
case 'bigint':

View File

@ -94,7 +94,7 @@ class StorageBlobAdapter implements DbAdapter {
async close (): Promise<void> {}
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
find (ctx: MeasureContext, domain: Domain): StorageIterator {
return this.client.find(ctx, this.workspaceId)
}

View File

@ -263,10 +263,10 @@ export class ClientSession implements Session {
return this.ops
}
async loadChunk (ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
async loadChunk (ctx: ClientSessionCtx, domain: Domain, idx?: number): Promise<void> {
this.lastRequest = Date.now()
try {
const result = await this.getOps().loadChunk(ctx.ctx, domain, idx, recheck)
const result = await this.getOps().loadChunk(ctx.ctx, domain, idx)
await ctx.sendResponse(result)
} catch (err: any) {
await ctx.sendError('Failed to upload', unknownError(err))
@ -326,7 +326,7 @@ export class ClientSession implements Session {
* @public
*/
export interface BackupSession extends Session {
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean) => Promise<void>
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise<void>
closeChunk: (ctx: ClientSessionCtx, idx: number) => Promise<void>
loadDocs: (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]) => Promise<void>
}