From 28dbc1bae50eea73c01698a9524ad62a7b103c25 Mon Sep 17 00:00:00 2001
From: Andrey Sobolev <haiodo@users.noreply.github.com>
Date: Fri, 27 Sep 2024 14:24:29 +0700
Subject: [PATCH] Fix for blobs backup (#6751)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
---
 .../src/components/SelectWorkspaceMenu.svelte |  9 +-
 server/backup/src/backup.ts                   | 92 +++++++++++++++----
 server/core/src/server/aggregator.ts          |  9 ++-
 3 files changed, 91 insertions(+), 19 deletions(-)

diff --git a/plugins/workbench-resources/src/components/SelectWorkspaceMenu.svelte b/plugins/workbench-resources/src/components/SelectWorkspaceMenu.svelte
index f1d46ca615..1c149feeff 100644
--- a/plugins/workbench-resources/src/components/SelectWorkspaceMenu.svelte
+++ b/plugins/workbench-resources/src/components/SelectWorkspaceMenu.svelte
@@ -196,7 +196,14 @@
                   {#if isAdmin && ws.lastVisit != null && ws.lastVisit !== 0}
                     <div class="text-sm">
                       {#if ws.backupInfo != null}
-                        {ws.backupInfo.backupSize}Mb -
+                        {@const sz = ws.backupInfo.dataSize + ws.backupInfo.blobsSize}
+                        {@const szGb = Math.round((sz * 100) / 1024) / 100}
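+                        <!-- Sizes from backupInfo are in Mb; show Gb once the rounded value is non-zero -->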
+                        {#if szGb > 0}
+                          {szGb}Gb -
+                        {:else}
+                          {Math.round(sz)}Mb -
+                        {/if}
                       {/if}
                       ({lastUsageDays} days)
                     </div>
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 2daa2fdb32..39a67c01bf 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -43,7 +43,7 @@ import { BlobClient, createClient } from '@hcengineering/server-client'
 import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
 import { generateToken } from '@hcengineering/server-token'
 import { connect } from '@hcengineering/server-tool'
-import { createWriteStream, existsSync, mkdirSync, statSync } from 'node:fs'
+import { createWriteStream, existsSync, mkdirSync } from 'node:fs'
 import { dirname } from 'node:path'
 import { PassThrough } from 'node:stream'
 import { createGzip } from 'node:zlib'
@@ -132,6 +132,7 @@ async function loadDigest (
   date?: number
 ): Promise<Map<Ref<Doc>, string>> {
   ctx = ctx.newChild('load digest', { domain, count: snapshots.length })
+  ctx.info('load-digest', { domain, count: snapshots.length })
   const result = new Map<Ref<Doc>, string>()
   for (const s of snapshots) {
     const d = s.domains[domain]
@@ -492,9 +493,10 @@ async function cleanDomain (ctx: MeasureContext, connection: CoreClient & Backup
   }
 }
 
-function doTrimHash (s: string | undefined): string {
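+// Returns undefined for a missing hash so callers can tell it apart from a real value.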
+function doTrimHash (s: string | undefined): string | undefined {
   if (s == null) {
-    return ''
+    return undefined
   }
   if (s.startsWith('"') && s.endsWith('"')) {
     return s.slice(1, s.length - 1)
@@ -716,6 +718,26 @@ export async function backup (
         time: Date.now() - st,
         workspace: workspaceId.name
       })
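+      // Hashes already removed from the digest, kept so duplicate ids in later chunks can still be matched.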
+      const oldHash = new Map<Ref<Doc>, string>()
+
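+      // Drop a duplicate id from the current chunk and any queued chunks, reverting the progress counters.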
+      function removeFromNeedRetrieve (needRetrieve: Ref<Doc>[], id: string): void {
+        const pos = needRetrieve.indexOf(id as Ref<Doc>)
+        if (pos !== -1) {
+          needRetrieve.splice(pos, 1)
+          processed--
+          changed--
+        }
+        for (const ch of needRetrieveChunks) {
+          const pos = ch.indexOf(id as Ref<Doc>)
+          if (pos !== -1) {
+            ch.splice(pos, 1)
+            processed--
+            changed--
+          }
+        }
+      }
       while (true) {
         try {
           const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck))
@@ -741,17 +763,32 @@
               })
               st = Date.now()
             }
-            const _hash = doTrimHash(hash)
-            const kHash = doTrimHash(digest.get(id as Ref<Doc>))
+            const _hash = doTrimHash(hash) as string
+            const kHash = doTrimHash(digest.get(id as Ref<Doc>) ?? oldHash.get(id as Ref<Doc>))
             if (kHash !== undefined) {
-              digest.delete(id as Ref<Doc>)
+              if (digest.delete(id as Ref<Doc>)) {
+                oldHash.set(id as Ref<Doc>, kHash)
+              }
               if (kHash !== _hash) {
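+                // The hash changed: drop any stale retrieval entry before queuing the id again.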
+                if (changes.updated.has(id as Ref<Doc>)) {
+                  removeFromNeedRetrieve(needRetrieve, id as Ref<Doc>)
+                }
                 changes.updated.set(id as Ref<Doc>, _hash)
                 needRetrieve.push(id as Ref<Doc>)
                 currentNeedRetrieveSize += size
                 changed++
+              } else if (changes.updated.has(id as Ref<Doc>)) {
+                // Hash is unchanged; revert the earlier update entry for this duplicate id.
+                changes.updated.delete(id as Ref<Doc>)
+                removeFromNeedRetrieve(needRetrieve, id as Ref<Doc>)
+                processed -= 1
               }
             } else {
+              if (domain === DOMAIN_BLOB && changes.added.has(id as Ref<Doc>)) {
+                // Remove the stale needRetrieve entry for this duplicate blob id.
+                removeFromNeedRetrieve(needRetrieve, id)
+              }
               changes.added.set(id as Ref<Doc>, _hash)
               needRetrieve.push(id as Ref<Doc>)
               changed++
@@ -759,7 +792,9 @@ export async function backup (
             }
 
             if (currentNeedRetrieveSize > retrieveChunkSize) {
-              needRetrieveChunks.push(needRetrieve)
+              if (needRetrieve.length > 0) {
+                needRetrieveChunks.push(needRetrieve)
+              }
               currentNeedRetrieveSize = 0
               needRetrieve = []
             }
@@ -841,12 +880,19 @@
 
       const totalChunks = needRetrieveChunks.flatMap((it) => it.length).reduce((p, c) => p + c, 0)
       let processed = 0
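+      // Blobs downloaded since the last throttled progress log.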
+      let blobs = 0
+
       while (needRetrieveChunks.length > 0) {
         if (canceled()) {
           return
         }
         const needRetrieve = needRetrieveChunks.shift() as Ref<Doc>[]
 
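+        // Skip chunks emptied by duplicate removal.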
+        if (needRetrieve.length === 0) {
+          continue
+        }
         ctx.info('Retrieve chunk', {
           needRetrieve: needRetrieveChunks.reduce((v, docs) => v + docs.length, 0),
           toLoad: needRetrieve.length,
@@ -855,6 +901,11 @@
         let docs: Doc[] = []
         try {
           docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve))
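+          // Log the ids of any documents that were not returned so gaps are visible.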
+          if (docs.length !== needRetrieve.length) {
+            const nr = new Set(docs.map((it) => it._id))
+            ctx.error('failed to retrieve all documents', { missing: needRetrieve.filter((it) => !nr.has(it)) })
+          }
           ops++
         } catch (err: any) {
           ctx.error('error loading docs', { domain, err, workspace: workspaceId.name })
@@ -998,7 +1049,9 @@
                   ctx.error('error packing file', { err })
                 }
               })
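+              // Log every blob over 1 Mb, and every tenth smaller blob, to show download progress.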
-              if (blob.size > 1024 * 1024) {
+              blobs++
+              if (blob.size > 1024 * 1024 || blobs >= 10) {
                 ctx.info('download blob', {
                   _id: blob._id,
                   contentType: blob.contentType,
@@ -1006,6 +1051,9 @@ export async function backup (
                   provider: blob.provider,
                   pending: docs.length
                 })
+                if (blobs >= 10) {
+                  blobs = 0
+                }
               }
 
               printDownloaded('', blob.size)
@@ -1179,15 +1235,17 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
 
   const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
   console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
-  const addFileSize = async (file: string | undefined | null): Promise<void> => {
-    if (file != null && (await storage.exists(file))) {
-      const fileSize = await storage.stat(file)
+
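+  // Download a file only when it is missing locally, or unconditionally when force is set.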
+  const addFileSize = async (file: string | undefined | null, force: boolean = false): Promise<void> => {
+    if (file != null) {
       const target = join(storeIn, file)
       const dir = dirname(target)
       if (!existsSync(dir)) {
         mkdirSync(dir, { recursive: true })
       }
-      if (!existsSync(target) || fileSize !== statSync(target).size) {
+      if (!existsSync(target) || force) {
+        const fileSize = await storage.stat(file)
         console.log('downloading', file, fileSize)
         const readStream = await storage.load(file)
         const outp = createWriteStream(target)
@@ -1200,8 +1258,10 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
             resolve()
           })
         })
+        size += fileSize
+      } else {
+        console.log('file-same', file)
       }
-      size += fileSize
     }
   }
 
@@ -1217,7 +1277,8 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
       }
     }
   }
-  await addFileSize(infoFile)
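+  // Always re-download the backup info file so the local copy is current.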
+  await addFileSize(infoFile, true)
 
   console.log('Backup size', size / (1024 * 1024), 'Mb')
 }
@@ -1693,7 +1754,8 @@ export async function compactBackup (
     const oldSnapshots = [...backupInfo.snapshots]
 
     backupInfo.snapshots = [snapshot]
-    let backupIndex = `${backupInfo.snapshotsIndex ?? oldSnapshots.length}`
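+    // Advance the snapshot index so the compacted snapshot is written under a new name.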
+    let backupIndex = `${(backupInfo.snapshotsIndex ?? oldSnapshots.length) + 1}`
     while (backupIndex.length < 6) {
       backupIndex = '0' + backupIndex
     }
diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts
index bde7f775e0..61a4e38dac 100644
--- a/server/core/src/server/aggregator.ts
+++ b/server/core/src/server/aggregator.ts
@@ -95,9 +95,12 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
     for (const d of docs) {
       const blobInfo = existingBlobs.get(d._id)
       if (
-        blobInfo === undefined ||
-        this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) ||
-        blobInfo.size !== d.size
+        blobInfo === undefined || // No stored blob info yet
+        // The providers match but the etag or size differs.
+        (d.provider === blobInfo.provider &&
+          (this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) || blobInfo.size !== d.size)) ||
+        // The blob was replaced and now lives in the default provider.
+        (d.provider === this.defaultAdapter && blobInfo?.provider !== d.provider)
       ) {
         const stat = await this.adapters.get(d.provider)?.stat(ctx, workspaceId, d._id)
         if (stat !== undefined) {