diff --git a/dev/client-resources/src/connection.ts b/dev/client-resources/src/connection.ts index bf1c73be98..8e2808d077 100644 --- a/dev/client-resources/src/connection.ts +++ b/dev/client-resources/src/connection.ts @@ -93,7 +93,7 @@ class ServerStorageWrapper implements ClientConnection { async close (): Promise {} async loadChunk (domain: Domain, idx?: number): Promise { - return { idx: -1, docs: {}, finished: true } + return { idx: -1, docs: [], finished: true } } async closeChunk (idx: number): Promise {} diff --git a/packages/core/src/__tests__/client.test.ts b/packages/core/src/__tests__/client.test.ts index ba831ccf12..587fa28720 100644 --- a/packages/core/src/__tests__/client.test.ts +++ b/packages/core/src/__tests__/client.test.ts @@ -111,7 +111,7 @@ describe('client', () => { loadChunk: async (domain: Domain, idx?: number) => ({ idx: -1, index: -1, - docs: {}, + docs: [], finished: true, digest: '' }), diff --git a/packages/core/src/__tests__/connection.ts b/packages/core/src/__tests__/connection.ts index 69cf501eab..60fade2958 100644 --- a/packages/core/src/__tests__/connection.ts +++ b/packages/core/src/__tests__/connection.ts @@ -62,7 +62,7 @@ export async function connect (handler: (tx: Tx) => void): Promise ({ idx: -1, index: -1, - docs: {}, + docs: [], finished: true, digest: '' }), diff --git a/packages/core/src/backup.ts b/packages/core/src/backup.ts index 7c55962862..103188e084 100644 --- a/packages/core/src/backup.ts +++ b/packages/core/src/backup.ts @@ -1,4 +1,5 @@ import { Doc, Domain, Ref } from './classes' +import { DocInfo } from './server' /** * @public @@ -8,7 +9,7 @@ import { Doc, Domain, Ref } from './classes' export interface DocChunk { idx: number // _id => hash mapping - docs: Record + docs: DocInfo[] finished: boolean } diff --git a/packages/platform-rig/bin/do-svelte-check.js b/packages/platform-rig/bin/do-svelte-check.js index 6f73ff8fdd..3865c185da 100755 --- a/packages/platform-rig/bin/do-svelte-check.js +++ b/packages/platform-rig/bin/do-svelte-check.js @@ -2,7 +2,7 @@ const { join, dirname } = require("path") const { readFileSync, existsSync, mkdirSync, createWriteStream } = require('fs') const { spawn } = require('child_process') -async function execProcess(cmd, logFile, args) { +async function execProcess(cmd, logFile, args, useConsole) { let compileRoot = dirname(dirname(process.argv[1])) console.log("Svelte check...\n", process.cwd(), args) @@ -22,6 +22,10 @@ async function execProcess(cmd, logFile, args) { compileOut.stdout.pipe(outPipe) compileOut.stdout.on('end', function (data) { outPipe.close() + if( useConsole ) { + console.log(readFileSync(stdoutFilePath).toString()) + console.log(readFileSync(stderrFilePath).toString()) + } resolve() }) } else { @@ -64,14 +68,22 @@ async function execProcess(cmd, logFile, args) { } } -let args = process.argv.splice(2) +let args = [] // process.argv.slice(2) +let useConsole = false +for(const a of process.argv.slice(2)) { + if( a === '--console') { + useConsole = true + } else { + args.push(a) + } +} let st = Date.now() execProcess( 'svelte-check', 'svelte-check', [ '--output', 'human', - ...process.argv.splice(2) -]) + ...args +], useConsole) .then(() => { console.log("Svelte check time: ", Date.now() - st) }) diff --git a/packages/query/src/__tests__/connection.ts b/packages/query/src/__tests__/connection.ts index 15cf96f98a..b78b59e9fc 100644 --- a/packages/query/src/__tests__/connection.ts +++ b/packages/query/src/__tests__/connection.ts @@ -83,7 +83,7 @@ FulltextStorage & { loadChunk: async (domain: Domain, idx?: number) => ({ idx: -1, index: -1, - docs: {}, + docs: [], finished: true, digest: '' }), diff --git a/server/backup/src/index.ts b/server/backup/src/index.ts index 17b9643eb8..a34d044b1d 100644 --- a/server/backup/src/index.ts +++ b/server/backup/src/index.ts @@ -39,6 +39,7 @@ export * from './storage' const dataBlobSize = 50 * 1024 * 1024 const dataUploadSize = 2 * 1024 * 1024 +const retrieveChunkSize = 2 * 1024 * 1024 const defaultLevel = 9 @@ -241,17 +242,25 @@ export async function cloneWorkspace ( const it = await sourceConnection.loadChunk(c, idx) idx = it.idx - const needRetrieve: Ref[] = [] + let needRetrieve: Ref[] = [] + let needRetrieveSize = 0 - for (const [k, v] of Object.entries(it.docs)) { + for (const { id, hash, size } of it.docs) { processed++ if (Date.now() - st > 2500) { console.log('processed', processed, Date.now() - st) st = Date.now() } - changes.added.set(k as Ref, v) - needRetrieve.push(k as Ref) + changes.added.set(id as Ref, hash) + needRetrieve.push(id as Ref) + needRetrieveSize += size + + if (needRetrieveSize > retrieveChunkSize) { + needRetrieveChunks.push(needRetrieve) + needRetrieveSize = 0 + needRetrieve = [] + } } if (needRetrieve.length > 0) { needRetrieveChunks.push(needRetrieve) @@ -375,11 +384,11 @@ export async function backup ( if (lastTx._id === backupInfo.lastTxId && !force) { console.log('No transaction changes. Skipping backup.') return - } else { - backupInfo.lastTxId = lastTx._id } } + backupInfo.lastTxId = '' // Clear until full backup will be complete + const snapshot: BackupSnapshot = { date: Date.now(), domains: {} @@ -434,35 +443,44 @@ export async function backup ( // Load all digest from collection. while (true) { try { - const it = await connection.loadChunk(domain, idx) - idx = it.idx + const currentChunk = await connection.loadChunk(domain, idx) + idx = currentChunk.idx - const needRetrieve: Ref[] = [] + let needRetrieve: Ref[] = [] + let currentNeedRetrieveSize = 0 - for (const [k, v] of Object.entries(it.docs)) { + for (const { id, hash, size } of currentChunk.docs) { processed++ if (Date.now() - st > 2500) { console.log('processed', processed, digest.size, Date.now() - st) st = Date.now() } - const kHash = digest.get(k as Ref) + const kHash = digest.get(id as Ref) if (kHash !== undefined) { - digest.delete(k as Ref) - if (kHash !== v) { - changes.updated.set(k as Ref, v) - needRetrieve.push(k as Ref) + digest.delete(id as Ref) + if (kHash !== hash) { + changes.updated.set(id as Ref, hash) + needRetrieve.push(id as Ref) + currentNeedRetrieveSize += size changed++ } } else { - changes.added.set(k as Ref, v) - needRetrieve.push(k as Ref) + changes.added.set(id as Ref, hash) + needRetrieve.push(id as Ref) changed++ + currentNeedRetrieveSize += size + } + + if (currentNeedRetrieveSize > retrieveChunkSize) { + needRetrieveChunks.push(needRetrieve) + currentNeedRetrieveSize = 0 + needRetrieve = [] } } if (needRetrieve.length > 0) { needRetrieveChunks.push(needRetrieve) } - if (it.finished) { + if (currentChunk.finished) { await connection.closeChunk(idx) break } @@ -510,7 +528,10 @@ export async function backup ( processedChanges.added.clear() processedChanges.removed = [] processedChanges.updated.clear() - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2))) + await storage.writeFile( + infoFile, + gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }) + ) } } if (_pack === undefined) { @@ -583,12 +604,13 @@ export async function backup ( processedChanges.updated.clear() _pack?.finalize() // This will allow to retry in case of critical error. - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2))) + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) } } backupInfo.snapshotsIndex = backupInfo.snapshots.length - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2))) + backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) } catch (err: any) { console.error(err) } finally { @@ -680,8 +702,8 @@ export async function restore ( idx = it.idx el += Date.now() - st - for (const [_id, hash] of Object.entries(it.docs)) { - serverChangeset.set(_id as Ref, hash) + for (const { id, hash } of it.docs) { + serverChangeset.set(id as Ref, hash) loaded++ } @@ -979,7 +1001,10 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa processedChanges.added.clear() processedChanges.removed = [] processedChanges.updated.clear() - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2))) + await storage.writeFile( + infoFile, + gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }) + ) } } if (_pack === undefined) { @@ -1154,7 +1179,7 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa processedChanges.updated.clear() _pack?.finalize() // This will allow to retry in case of critical error. - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2))) + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) } } @@ -1176,7 +1201,7 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa } backupInfo.snapshotsIndex = backupInfo.snapshots.length - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2))) + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) } catch (err: any) { console.error(err) } finally { diff --git a/server/server/src/backup.ts b/server/server/src/backup.ts index e1d5a82d84..a08e54f5ea 100644 --- a/server/server/src/backup.ts +++ b/server/server/src/backup.ts @@ -1,9 +1,9 @@ -import { Doc, DocChunk, Domain, MeasureContext, Ref, StorageIterator } from '@hcengineering/core' -import { Pipeline } from '@hcengineering/server-core' +import { Doc, DocChunk, DocInfo, Domain, MeasureContext, Ref, StorageIterator } from '@hcengineering/core' +import { estimateDocSize, Pipeline } from '@hcengineering/server-core' import { Token } from '@hcengineering/server-token' import { BroadcastCall, ClientSession, Session } from '@hcengineering/server-ws' -const chunkSize = 1024 * 1024 +const chunkSize = 2 * 1024 * 1024 /** * @public @@ -48,7 +48,7 @@ export class BackupClientSession extends ClientSession implements BackupSession if (chunk.finished === undefined) { return { idx, - docs: {}, + docs: [], finished: true } } @@ -57,7 +57,7 @@ export class BackupClientSession extends ClientSession implements BackupSession this.chunkInfo.set(idx, chunk) } let size = 0 - const docs: Record = {} + const docs: DocInfo[] = [] while (size < chunkSize) { const doc = await chunk.iterator.next(ctx) @@ -66,8 +66,8 @@ export class BackupClientSession extends ClientSession implements BackupSession break } - size = size + doc.size - docs[doc.id] = doc.hash + size += estimateDocSize(doc) + docs.push(doc) } return { diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 22dbdc7676..3780e273bb 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -211,7 +211,7 @@ export function startHttpServer ( ) if ('upgrade' in session || 'error' in session) { if ('error' in session) { - void ctx.error('error', { error: session.error }) + void ctx.error('error', { error: session.error?.message, stack: session.error?.stack }) } cs.close() return