From 9e183faf6b86e895ece34e6a3dbdeede9845d223 Mon Sep 17 00:00:00 2001
From: Andrey Sobolev <haiodo@users.noreply.github.com>
Date: Mon, 8 Apr 2024 17:43:01 +0700
Subject: [PATCH] UBERF-6313: Improve backup/restore (#5241)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
---
 dev/client-resources/src/connection.ts       |  2 +-
 packages/core/src/__tests__/client.test.ts   |  2 +-
 packages/core/src/__tests__/connection.ts    |  2 +-
 packages/core/src/backup.ts                  |  3 +-
 packages/platform-rig/bin/do-svelte-check.js | 20 ++++-
 packages/query/src/__tests__/connection.ts   |  2 +-
 server/backup/src/index.ts                   | 77 +++++++++++++-------
 server/server/src/backup.ts                  | 14 ++--
 server/ws/src/server_http.ts                 |  2 +-
 9 files changed, 81 insertions(+), 43 deletions(-)

diff --git a/dev/client-resources/src/connection.ts b/dev/client-resources/src/connection.ts
index bf1c73be98..8e2808d077 100644
--- a/dev/client-resources/src/connection.ts
+++ b/dev/client-resources/src/connection.ts
@@ -93,7 +93,7 @@ class ServerStorageWrapper implements ClientConnection {
   async close (): Promise<void> {}
 
   async loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
-    return { idx: -1, docs: {}, finished: true }
+    return { idx: -1, docs: [], finished: true }
   }
 
   async closeChunk (idx: number): Promise<void> {}
diff --git a/packages/core/src/__tests__/client.test.ts b/packages/core/src/__tests__/client.test.ts
index ba831ccf12..587fa28720 100644
--- a/packages/core/src/__tests__/client.test.ts
+++ b/packages/core/src/__tests__/client.test.ts
@@ -111,7 +111,7 @@ describe('client', () => {
         loadChunk: async (domain: Domain, idx?: number) => ({
           idx: -1,
           index: -1,
-          docs: {},
+          docs: [],
           finished: true,
           digest: ''
         }),
diff --git a/packages/core/src/__tests__/connection.ts b/packages/core/src/__tests__/connection.ts
index 69cf501eab..60fade2958 100644
--- a/packages/core/src/__tests__/connection.ts
+++ b/packages/core/src/__tests__/connection.ts
@@ -62,7 +62,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
     loadChunk: async (domain: Domain, idx?: number) => ({
       idx: -1,
       index: -1,
-      docs: {},
+      docs: [],
       finished: true,
       digest: ''
     }),
diff --git a/packages/core/src/backup.ts b/packages/core/src/backup.ts
index 7c55962862..103188e084 100644
--- a/packages/core/src/backup.ts
+++ b/packages/core/src/backup.ts
@@ -1,4 +1,5 @@
 import { Doc, Domain, Ref } from './classes'
+import { DocInfo } from './server'
 
 /**
  * @public
@@ -8,7 +9,7 @@ import { Doc, Domain, Ref } from './classes'
 export interface DocChunk {
   idx: number
   // _id => hash mapping
-  docs: Record<string, string>
+  docs: DocInfo[]
   finished: boolean
 }
 
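The core of this patch: `DocChunk.docs` changes from an `_id => hash` record to an ordered array of `DocInfo` entries. A minimal sketch of the new shape, with the `DocInfo` fields (`id`, `hash`, `size`) inferred from how the rest of this patch destructures them (the real declaration lives in `packages/core/src/server.ts` and may carry more fields):

```ts
// Sketch only: fields inferred from usage elsewhere in this patch.
interface DocInfo {
  id: string
  hash: string
  size: number
}

interface DocChunk {
  idx: number
  docs: DocInfo[]
  finished: boolean
}

// The array form carries a per-document size, which lets consumers
// batch retrievals by cumulative byte count instead of by entry count:
const chunk: DocChunk = { idx: -1, docs: [], finished: true }
for (const { id, hash, size } of chunk.docs) {
  // id/hash drive the digest diff; size drives batching (see below)
}
```
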
diff --git a/packages/platform-rig/bin/do-svelte-check.js b/packages/platform-rig/bin/do-svelte-check.js
index 6f73ff8fdd..3865c185da 100755
--- a/packages/platform-rig/bin/do-svelte-check.js
+++ b/packages/platform-rig/bin/do-svelte-check.js
@@ -2,7 +2,7 @@ const { join, dirname } = require("path")
 const { readFileSync, existsSync, mkdirSync, createWriteStream } = require('fs')
 const { spawn } = require('child_process')
 
-async function execProcess(cmd, logFile, args) {
+async function execProcess(cmd, logFile, args, useConsole) {
   let compileRoot = dirname(dirname(process.argv[1]))
   console.log("Svelte check...\n", process.cwd(), args)
 
@@ -22,6 +22,10 @@ async function execProcess(cmd, logFile, args) {
       compileOut.stdout.pipe(outPipe)
       compileOut.stdout.on('end', function (data) {
         outPipe.close()
+        if (useConsole) {
+          console.log(readFileSync(stdoutFilePath).toString())
+          console.log(readFileSync(stderrFilePath).toString())
+        }
         resolve()
       })
     } else {
@@ -64,14 +68,22 @@ async function execProcess(cmd, logFile, args) {
   }
 }
 
-let args = process.argv.splice(2)
+let args = []
+let useConsole = false
+for (const a of process.argv.slice(2)) {
+  if (a === '--console') {
+    useConsole = true
+  } else {
+    args.push(a)
+  }
+}
 let st = Date.now()
 execProcess(
   'svelte-check',
   'svelte-check', [
   '--output', 'human',
-  ...process.argv.splice(2)
-])
+  ...args
+], useConsole)
   .then(() => {
     console.log("Svelte check time: ", Date.now() - st)
   })
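
Two fixes land in this file. First, the old code consumed `process.argv` with `splice` twice; `splice` mutates the array, so the second `splice(2)` returned an empty list and the user-supplied arguments never actually reached svelte-check. The rewrite switches to the non-mutating `slice`. Second, a `--console` flag is stripped from the arguments before forwarding and, when present, the captured stdout/stderr log files are echoed back to the console. A hedged sketch of the same flag-splitting pattern as a reusable helper (`splitFlag` is hypothetical, not part of the repo):

```ts
// Hypothetical helper illustrating the flag splitting done inline above.
function splitFlag (argv: string[], flag: string): { rest: string[], present: boolean } {
  const rest: string[] = []
  let present = false
  for (const a of argv) {
    if (a === flag) {
      present = true
    } else {
      rest.push(a)
    }
  }
  return { rest, present }
}

const { rest: args, present: useConsole } = splitFlag(process.argv.slice(2), '--console')
```
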
diff --git a/packages/query/src/__tests__/connection.ts b/packages/query/src/__tests__/connection.ts
index 15cf96f98a..b78b59e9fc 100644
--- a/packages/query/src/__tests__/connection.ts
+++ b/packages/query/src/__tests__/connection.ts
@@ -83,7 +83,7 @@ FulltextStorage & {
     loadChunk: async (domain: Domain, idx?: number) => ({
       idx: -1,
       index: -1,
-      docs: {},
+      docs: [],
       finished: true,
       digest: ''
     }),
diff --git a/server/backup/src/index.ts b/server/backup/src/index.ts
index 17b9643eb8..a34d044b1d 100644
--- a/server/backup/src/index.ts
+++ b/server/backup/src/index.ts
@@ -39,6 +39,7 @@ export * from './storage'
 
 const dataBlobSize = 50 * 1024 * 1024
 const dataUploadSize = 2 * 1024 * 1024
+const retrieveChunkSize = 2 * 1024 * 1024
 
 const defaultLevel = 9
 
@@ -241,17 +242,25 @@ export async function cloneWorkspace (
           const it = await sourceConnection.loadChunk(c, idx)
           idx = it.idx
 
-          const needRetrieve: Ref<Doc>[] = []
+          let needRetrieve: Ref<Doc>[] = []
+          let needRetrieveSize = 0
 
-          for (const [k, v] of Object.entries(it.docs)) {
+          for (const { id, hash, size } of it.docs) {
             processed++
             if (Date.now() - st > 2500) {
               console.log('processed', processed, Date.now() - st)
               st = Date.now()
             }
 
-            changes.added.set(k as Ref<Doc>, v)
-            needRetrieve.push(k as Ref<Doc>)
+            changes.added.set(id as Ref<Doc>, hash)
+            needRetrieve.push(id as Ref<Doc>)
+            needRetrieveSize += size
+
+            if (needRetrieveSize > retrieveChunkSize) {
+              needRetrieveChunks.push(needRetrieve)
+              needRetrieveSize = 0
+              needRetrieve = []
+            }
           }
           if (needRetrieve.length > 0) {
             needRetrieveChunks.push(needRetrieve)
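
`cloneWorkspace` previously queued one retrieval batch per server chunk regardless of payload size; it now flushes `needRetrieve` into `needRetrieveChunks` whenever the accumulated `size` passes `retrieveChunkSize` (2 MiB), so each later document-retrieval request stays bounded. The same pattern as a standalone sketch:

```ts
// Standalone sketch of the size-bounded batching used above.
function batchBySize<T> (items: T[], sizeOf: (item: T) => number, limit: number): T[][] {
  const batches: T[][] = []
  let current: T[] = []
  let currentSize = 0
  for (const item of items) {
    current.push(item)
    currentSize += sizeOf(item)
    if (currentSize > limit) {
      batches.push(current)
      current = []
      currentSize = 0
    }
  }
  if (current.length > 0) {
    batches.push(current)
  }
  return batches
}

// e.g. batchBySize(chunk.docs, (d) => d.size, 2 * 1024 * 1024)
```
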
@@ -375,11 +384,11 @@ export async function backup (
       if (lastTx._id === backupInfo.lastTxId && !force) {
         console.log('No transaction changes. Skipping backup.')
         return
-      } else {
-        backupInfo.lastTxId = lastTx._id
       }
     }
 
+    backupInfo.lastTxId = '' // Clear until the full backup is complete
+
     const snapshot: BackupSnapshot = {
       date: Date.now(),
       domains: {}
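
Moving the `lastTxId` assignment out of the early-return branch is a crash-safety fix: the marker is cleared before any work starts, and the real id is written only after the snapshot completes (see the end of this function). If the process dies mid-backup, the persisted info file carries an empty `lastTxId`, so the next run cannot conclude "no transaction changes" and skip the unfinished work. The ordering, as a self-contained sketch with placeholder types:

```ts
// Ordering sketch; names mirror the surrounding code.
interface BackupInfoLike { lastTxId: string }

async function backupSketch (
  backupInfo: BackupInfoLike,
  lastTxId: string | undefined,
  writeInfo: () => Promise<void>
): Promise<void> {
  backupInfo.lastTxId = '' // 1. invalidate the marker before any work
  await writeInfo()        // a crash past this point forces a re-run next time
  // 2. ... back up all domains; the info file may be flushed many times ...
  backupInfo.lastTxId = lastTxId ?? '0' // 3. record it only once complete
  await writeInfo()
}
```
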
@@ -434,35 +443,44 @@ export async function backup (
       // Load all digest from collection.
       while (true) {
         try {
-          const it = await connection.loadChunk(domain, idx)
-          idx = it.idx
+          const currentChunk = await connection.loadChunk(domain, idx)
+          idx = currentChunk.idx
 
-          const needRetrieve: Ref<Doc>[] = []
+          let needRetrieve: Ref<Doc>[] = []
+          let currentNeedRetrieveSize = 0
 
-          for (const [k, v] of Object.entries(it.docs)) {
+          for (const { id, hash, size } of currentChunk.docs) {
             processed++
             if (Date.now() - st > 2500) {
               console.log('processed', processed, digest.size, Date.now() - st)
               st = Date.now()
             }
-            const kHash = digest.get(k as Ref<Doc>)
+            const kHash = digest.get(id as Ref<Doc>)
             if (kHash !== undefined) {
-              digest.delete(k as Ref<Doc>)
-              if (kHash !== v) {
-                changes.updated.set(k as Ref<Doc>, v)
-                needRetrieve.push(k as Ref<Doc>)
+              digest.delete(id as Ref<Doc>)
+              if (kHash !== hash) {
+                changes.updated.set(id as Ref<Doc>, hash)
+                needRetrieve.push(id as Ref<Doc>)
+                currentNeedRetrieveSize += size
                 changed++
               }
             } else {
-              changes.added.set(k as Ref<Doc>, v)
-              needRetrieve.push(k as Ref<Doc>)
+              changes.added.set(id as Ref<Doc>, hash)
+              needRetrieve.push(id as Ref<Doc>)
               changed++
+              currentNeedRetrieveSize += size
+            }
+
+            if (currentNeedRetrieveSize > retrieveChunkSize) {
+              needRetrieveChunks.push(needRetrieve)
+              currentNeedRetrieveSize = 0
+              needRetrieve = []
             }
           }
           if (needRetrieve.length > 0) {
             needRetrieveChunks.push(needRetrieve)
           }
-          if (it.finished) {
+          if (currentChunk.finished) {
             await connection.closeChunk(idx)
             break
           }
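
The rewritten loop does two jobs at once: it diffs each incoming `DocInfo` against the locally stored digest to classify documents as updated (hash mismatch) or added (unknown id), and it flushes retrieval batches by cumulative size, just like `cloneWorkspace`. The classification half, condensed into a sketch with plain string keys:

```ts
// Condensed sketch of the digest diff above (string keys for brevity).
function classifyDocs (
  incoming: Array<{ id: string, hash: string }>,
  digest: Map<string, string>
): { added: Map<string, string>, updated: Map<string, string> } {
  const added = new Map<string, string>()
  const updated = new Map<string, string>()
  for (const { id, hash } of incoming) {
    const known = digest.get(id)
    if (known !== undefined) {
      digest.delete(id)
      if (known !== hash) {
        updated.set(id, hash) // same id, new content
      }
    } else {
      added.set(id, hash) // never seen before
    }
  }
  // Anything still left in `digest` was not reported by the server
  // and can be treated as removed (handled outside this sketch).
  return { added, updated }
}
```
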
@@ -510,7 +528,10 @@ export async function backup (
             processedChanges.added.clear()
             processedChanges.removed = []
             processedChanges.updated.clear()
-            await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+            await storage.writeFile(
+              infoFile,
+              gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })
+            )
           }
         }
         if (_pack === undefined) {
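
Every flush of the info file now passes an explicit compression level. Node's `gzipSync` uses zlib's default level (6) when no options are given; `defaultLevel` is 9, trading a little CPU for the smallest possible metadata file. For reference:

```ts
import { gzipSync } from 'zlib'

// Without options, gzipSync compresses at zlib's default level (6).
// The patch pins the info file to level 9 via defaultLevel.
const payload = JSON.stringify({ example: true }, undefined, 2)
const compressed = gzipSync(payload, { level: 9 })
```
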
@@ -583,12 +604,13 @@ export async function backup (
         processedChanges.updated.clear()
         _pack?.finalize()
         // This will allow to retry in case of critical error.
-        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
       }
     }
 
     backupInfo.snapshotsIndex = backupInfo.snapshots.length
-    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+    backupInfo.lastTxId = lastTx?._id ?? '0' // Safe to store the last tx now that the full backup is complete
+    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
   } catch (err: any) {
     console.error(err)
   } finally {
@@ -680,8 +702,8 @@ export async function restore (
         idx = it.idx
         el += Date.now() - st
 
-        for (const [_id, hash] of Object.entries(it.docs)) {
-          serverChangeset.set(_id as Ref<Doc>, hash)
+        for (const { id, hash } of it.docs) {
+          serverChangeset.set(id as Ref<Doc>, hash)
           loaded++
         }
 
@@ -979,7 +1001,10 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa
             processedChanges.added.clear()
             processedChanges.removed = []
             processedChanges.updated.clear()
-            await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+            await storage.writeFile(
+              infoFile,
+              gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })
+            )
           }
         }
         if (_pack === undefined) {
@@ -1154,7 +1179,7 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa
         processedChanges.updated.clear()
         _pack?.finalize()
         // This will allow to retry in case of critical error.
-        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
       }
     }
 
@@ -1176,7 +1201,7 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa
     }
 
     backupInfo.snapshotsIndex = backupInfo.snapshots.length
-    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
   } catch (err: any) {
     console.error(err)
   } finally {
diff --git a/server/server/src/backup.ts b/server/server/src/backup.ts
index e1d5a82d84..a08e54f5ea 100644
--- a/server/server/src/backup.ts
+++ b/server/server/src/backup.ts
@@ -1,9 +1,9 @@
-import { Doc, DocChunk, Domain, MeasureContext, Ref, StorageIterator } from '@hcengineering/core'
-import { Pipeline } from '@hcengineering/server-core'
+import { Doc, DocChunk, DocInfo, Domain, MeasureContext, Ref, StorageIterator } from '@hcengineering/core'
+import { estimateDocSize, Pipeline } from '@hcengineering/server-core'
 import { Token } from '@hcengineering/server-token'
 import { BroadcastCall, ClientSession, Session } from '@hcengineering/server-ws'
 
-const chunkSize = 1024 * 1024
+const chunkSize = 2 * 1024 * 1024
 
 /**
  * @public
@@ -48,7 +48,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
         if (chunk.finished === undefined) {
           return {
             idx,
-            docs: {},
+            docs: [],
             finished: true
           }
         }
@@ -57,7 +57,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
         this.chunkInfo.set(idx, chunk)
       }
       let size = 0
-      const docs: Record<string, string> = {}
+      const docs: DocInfo[] = []
 
       while (size < chunkSize) {
         const doc = await chunk.iterator.next(ctx)
@@ -66,8 +66,8 @@ export class BackupClientSession extends ClientSession implements BackupSession
           break
         }
 
-        size = size + doc.size
-        docs[doc.id] = doc.hash
+        size += estimateDocSize(doc)
+        docs.push(doc)
       }
 
       return {
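
On the server side, chunks are now filled with whole `DocInfo` records, and the size budget is tracked with `estimateDocSize(doc)` instead of the iterator-reported `doc.size`, so the doubled 2 MiB `chunkSize` reflects what will actually be serialized to the client. The real estimator lives in `@hcengineering/server-core`; a hypothetical stand-in to show the idea:

```ts
// Hypothetical stand-in for estimateDocSize; the real implementation in
// @hcengineering/server-core may count fields differently.
function estimateDocSizeSketch (doc: Record<string, unknown>): number {
  // The JSON length is a cheap proxy for the over-the-wire size.
  return JSON.stringify(doc).length
}
```
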
diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts
index 22dbdc7676..3780e273bb 100644
--- a/server/ws/src/server_http.ts
+++ b/server/ws/src/server_http.ts
@@ -211,7 +211,7 @@ export function startHttpServer (
     )
     if ('upgrade' in session || 'error' in session) {
       if ('error' in session) {
-        void ctx.error('error', { error: session.error })
+        void ctx.error('error', { error: session.error?.message, stack: session.error?.stack })
       }
       cs.close()
       return
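
The logging fix matters because `Error`'s `message` and `stack` properties are non-enumerable: a structured logger that JSON-serializes its payload turns the old `{ error: session.error }` into an empty object. Extracting the fields explicitly preserves the details:

```ts
// Error fields are non-enumerable, so naive serialization drops them.
const err = new Error('boom')
JSON.stringify({ error: err })
// => '{"error":{}}'
JSON.stringify({ error: err.message, stack: err.stack })
// => '{"error":"boom","stack":"Error: boom\n    at ..."}'
```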