From b8360399030da3e1fef2c0320706ca8136ddad9d Mon Sep 17 00:00:00 2001
From: Andrey Sobolev <haiodo@users.noreply.github.com>
Date: Wed, 3 Jul 2024 21:29:59 +0700
Subject: [PATCH] UBERF-7501: Copy a few blobs in parallel (#5995)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
---
 packages/model/package.json            |  1 +
 packages/model/src/migration.ts        | 17 ++++++++--
 server/account/src/operations.ts       |  8 ++---
 server/backup/package.json             |  1 +
 server/backup/src/backup.ts            | 46 ++++++++++----------------
 server/core/src/server/domainHelper.ts | 21 ++++++++----
 server/core/src/server/index.ts        |  2 +-
 server/tool/src/index.ts               | 18 +++++++---
 8 files changed, 68 insertions(+), 46 deletions(-)

diff --git a/packages/model/package.json b/packages/model/package.json
index 0fffd805e8..718f46bd79 100644
--- a/packages/model/package.json
+++ b/packages/model/package.json
@@ -41,6 +41,7 @@
     "@hcengineering/core": "^0.6.32",
     "@hcengineering/platform": "^0.6.11",
     "@hcengineering/storage": "^0.6.0",
+    "@hcengineering/analytics": "^0.6.0",
     "toposort": "^2.0.2",
     "fast-equals": "^5.0.1"
   },
diff --git a/packages/model/src/migration.ts b/packages/model/src/migration.ts
index 7cc54caf18..070535147f 100644
--- a/packages/model/src/migration.ts
+++ b/packages/model/src/migration.ts
@@ -1,3 +1,4 @@
+import { Analytics } from '@hcengineering/analytics'
 import core, {
   Class,
   Client,
@@ -154,7 +155,13 @@ export async function tryMigrate (client: MigrationClient, plugin: string, migra
   const states = client.migrateState.get(plugin) ?? new Set()
   for (const migration of migrations) {
     if (states.has(migration.state)) continue
-    await migration.func(client)
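+    // If a migration step fails, report the error and continue with the remaining steps instead of aborting.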
+    try {
+      await migration.func(client)
+    } catch (err: any) {
+      console.error(err)
+      Analytics.handleError(err)
+      continue
+    }
     const st: MigrationState = {
       plugin,
       state: migration.state,
@@ -181,7 +188,13 @@ export async function tryUpgrade (
   for (const migration of migrations) {
     if (states.has(migration.state)) continue
     const _client = await client()
-    await migration.func(_client)
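+    // As with tryMigrate, a failing upgrade step is reported and skipped so the remaining steps still run.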
+    try {
+      await migration.func(_client)
+    } catch (err: any) {
+      console.error(err)
+      Analytics.handleError(err)
+      continue
+    }
     const st: Data<MigrationState> = {
       plugin,
       state: migration.state
diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts
index 9f0358150f..2adc542461 100644
--- a/server/account/src/operations.ts
+++ b/server/account/src/operations.ts
@@ -968,13 +968,13 @@ export async function createWorkspace (
           getWorkspaceId(workspaceInfo.workspace, productId),
           true,
           async (value) => {
-            await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 30) })
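+            // Cloning the template workspace now covers the 20-90% range of the creation progress.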
+            await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) })
           },
           true,
           getStorageAdapter()
         )
         const modelVersion = getModelVersion()
-        await updateInfo({ createProgress: 50 })
+        await updateInfo({ createProgress: 90 })
 
         // Skip the tx update if the init workspace version is already the proper one.
         const skipTxUpdate =
@@ -992,11 +992,11 @@ export async function createWorkspace (
               ctxModellogger,
               skipTxUpdate,
               async (value) => {
-                await updateInfo({ createProgress: Math.round(50 + (Math.min(value, 100) / 100) * 40) })
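+                // The model upgrade maps to the remaining 90-99% range.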
+                await updateInfo({ createProgress: Math.round(90 + (Math.min(value, 100) / 100) * 10) })
               }
             )
         )
-        await updateInfo({ createProgress: 90 })
+        await updateInfo({ createProgress: 99 })
       } else {
         await childLogger.withLog('init-workspace', {}, async (ctx) => {
           await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => {
diff --git a/server/backup/package.json b/server/backup/package.json
index bcc197c704..ddb895f7c2 100644
--- a/server/backup/package.json
+++ b/server/backup/package.json
@@ -46,6 +46,7 @@
     "@hcengineering/client-resources": "^0.6.27",
     "@hcengineering/client": "^0.6.18",
     "@hcengineering/model": "^0.6.11",
+    "@hcengineering/analytics": "^0.6.0",
     "tar-stream": "^2.2.0",
     "@hcengineering/server-tool": "^0.6.0",
     "@hcengineering/server-core": "^0.6.1"
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 92ea016db6..ea256d4e0d 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -43,6 +43,7 @@ import { Writable } from 'stream'
 import { extract, Pack, pack } from 'tar-stream'
 import { createGunzip, gunzipSync, gzipSync } from 'zlib'
 import { BackupStorage } from './storage'
+import { Analytics } from '@hcengineering/analytics'
 export * from './storage'
 
 const dataBlobSize = 50 * 1024 * 1024
@@ -231,7 +232,7 @@ export async function cloneWorkspace (
   clearTime: boolean = true,
   progress: (value: number) => Promise<void>,
   skipFullText: boolean,
-  storageAdapter?: StorageAdapter
+  storageAdapter: StorageAdapter
 ): Promise<void> {
   await ctx.with(
     'clone-workspace',
@@ -255,10 +256,6 @@ export async function cloneWorkspace (
             admin: 'true'
           })) as unknown as CoreClient & BackupClient
       )
-
-      const blobClientSource = new BlobClient(transactorUrl, sourceWorkspaceId)
-      const blobClientTarget = new BlobClient(transactorUrl, targetWorkspaceId)
-
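+      // Blobs are copied through the storage adapter below; the direct BlobClient download/upload fallback is removed.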
       try {
         const domains = sourceConnection
           .getHierarchy()
@@ -290,6 +287,7 @@ export async function cloneWorkspace (
           const needRetrieveChunks: Ref<Doc>[][] = []
 
           let processed = 0
+          let domainProgress = 0
           let st = Date.now()
           // Load all digest from collection.
           await ctx.with('retrieve-domain-info', { domain: c }, async (ctx) => {
@@ -351,12 +349,12 @@ export async function cloneWorkspace (
                 if (clearTime) {
                   docs = prepareClonedDocuments(docs, sourceConnection, skipFullText)
                 }
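+                // Copy blobs with at most 10 concurrent transfers per batch.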
+                const executor = new RateLimiter(10)
                 for (const d of docs) {
                   if (d._class === core.class.Blob) {
                     const blob = d as Blob
-                    const blobs: Buffer[] = []
-                    try {
-                      if (storageAdapter !== undefined) {
+                    await executor.exec(async () => {
+                      try {
                         ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
                         const readable = await storageAdapter.get(ctx, sourceWorkspaceId, blob._id)
                         const passThrue = new PassThrough()
@@ -369,29 +367,18 @@ export async function cloneWorkspace (
                           blob.contentType,
                           blob.size
                         )
-                      } else {
-                        ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
-                        await ctx.with('download-blob', { contentType: blob.contentType }, async (ctx) => {
-                          await blobClientSource.writeTo(ctx, blob._id, blob.size, {
-                            write: (b, cb) => {
-                              blobs.push(b)
-                              cb()
-                            },
-                            end: (cb) => {
-                              cb()
-                            }
-                          })
-                        })
-                        await ctx.with('upload-blob', { contentType: blob.contentType }, async (ctx) => {
-                          const buffer = Buffer.concat(blobs)
-                          await blobClientTarget.upload(ctx, blob._id, buffer.length, blob.contentType, buffer)
-                        })
+                      } catch (err: any) {
+                        Analytics.handleError(err)
+                        console.error(err)
                       }
-                    } catch (err: any) {
-                      console.error(err)
-                    }
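+                      // Count this blob as processed and report per-domain progress.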
+                      domainProgress++
+                      await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress)
+                    })
+                  } else {
+                    domainProgress++
                   }
                 }
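+                // Wait for all queued blob copies to finish before uploading the documents.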
+                await executor.waitProcessing()
                 await ctx.with(
                   'upload-docs',
                   {},
@@ -400,8 +387,10 @@ export async function cloneWorkspace (
                   },
                   { length: docs.length }
                 )
+                await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress)
               } catch (err: any) {
                 console.log(err)
+                Analytics.handleError(err)
                 // Put back.
                 needRetrieveChunks.push(needRetrieve)
                 continue
@@ -414,6 +403,7 @@ export async function cloneWorkspace (
         }
       } catch (err: any) {
         console.error(err)
+        Analytics.handleError(err)
       } finally {
         ctx.info('end clone')
         await ctx.with('close-source', {}, async (ctx) => {
diff --git a/server/core/src/server/domainHelper.ts b/server/core/src/server/domainHelper.ts
index 7f84c004f8..e5b6892a15 100644
--- a/server/core/src/server/domainHelper.ts
+++ b/server/core/src/server/domainHelper.ts
@@ -1,3 +1,4 @@
+import { Analytics } from '@hcengineering/analytics'
 import type {
   Doc,
   Domain,
@@ -5,24 +6,32 @@ import type {
   FieldIndex,
   Hierarchy,
   MeasureContext,
-  ModelDb
+  ModelDb,
+  WorkspaceId
 } from '@hcengineering/core'
 import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core'
 import { deepEqual } from 'fast-equals'
 import type { DomainHelper, DomainHelperOperations } from '../adapter'
-import { Analytics } from '@hcengineering/analytics'
 
 export class DomainIndexHelperImpl implements DomainHelper {
   domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
   domainConfigurations: DomainIndexConfiguration[] = []
   constructor (
+    readonly ctx: MeasureContext,
     readonly hierarchy: Hierarchy,
-    readonly model: ModelDb
+    readonly model: ModelDb,
+    readonly workspaceId: WorkspaceId
   ) {
     const classes = model.findAllSync(core.class.Class, {})
 
-    this.domainConfigurations =
-      model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []
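+    // If the lookup fails, fall back to an empty configuration and report the error instead of failing construction.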
+    try {
+      this.domainConfigurations =
+        model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []
+    } catch (err: any) {
+      this.domainConfigurations = []
+      Analytics.handleError(err)
+      ctx.error('failed to find domain index configuration', { err })
+    }
 
     this.domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
     // Find all domains and indexed fields inside
@@ -81,7 +90,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
 
     if (forceCreate && !exists) {
       await operations.create(domain)
-      console.log('collection will be created', domain)
+      ctx.info('collection will be created', { domain })
       exists = true
     }
     if (!exists) {
diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts
index d5033da857..bddf5e2474 100644
--- a/server/core/src/server/index.ts
+++ b/server/core/src/server/index.ts
@@ -163,7 +163,7 @@ export async function createServerStorage (
     )
   }
 
-  const domainHelper = new DomainIndexHelperImpl(hierarchy, modelDb)
+  const domainHelper = new DomainIndexHelperImpl(metrics, hierarchy, modelDb, conf.workspace)
 
   return new TServerStorage(
     conf.domains,
diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts
index 81d3d49173..66e41a2168 100644
--- a/server/tool/src/index.ts
+++ b/server/tool/src/index.ts
@@ -155,9 +155,16 @@ export async function initModel (
       await progress(30)
 
       // Create update indexes
-      await createUpdateIndexes(ctx, connection, db, logger, async (value) => {
-        await progress(30 + (Math.min(value, 100) / 100) * 70)
-      })
+      await createUpdateIndexes(
+        ctx,
+        connection,
+        db,
+        logger,
+        async (value) => {
+          await progress(30 + (Math.min(value, 100) / 100) * 70)
+        },
+        workspaceId
+      )
       await progress(100)
     } catch (e: any) {
       logger.error('error', { error: e })
@@ -403,9 +410,10 @@ async function createUpdateIndexes (
   connection: CoreClient,
   db: Db,
   logger: ModelLogger,
-  progress: (value: number) => Promise<void>
+  progress: (value: number) => Promise<void>,
+  workspaceId: WorkspaceId
 ): Promise<void> {
-  const domainHelper = new DomainIndexHelperImpl(connection.getHierarchy(), connection.getModel())
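+  // DomainIndexHelperImpl now receives the measure context and workspace id for logging and error reporting.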
+  const domainHelper = new DomainIndexHelperImpl(ctx, connection.getHierarchy(), connection.getModel(), workspaceId)
   const dbHelper = new DBCollectionHelper(db)
   await dbHelper.init()
   let completed = 0