From 5d8c1a6b76954e8c6050623f1dbd540dea1ab782 Mon Sep 17 00:00:00 2001
From: Andrey Sobolev <haiodo@users.noreply.github.com>
Date: Wed, 15 Nov 2023 11:48:49 +0700
Subject: [PATCH] UBERF-4287: Fix Indexer peak memory usage (#3993)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
---
 server/core/src/indexer/indexer.ts | 148 +++++++++++++++--------------
 1 file changed, 75 insertions(+), 73 deletions(-)

diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts
index 8511815cd6..785d8dd296 100644
--- a/server/core/src/indexer/indexer.ts
+++ b/server/core/src/indexer/indexer.ts
@@ -559,91 +559,93 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   }
 
   async checkIndexConsistency (dbStorage: ServerStorage): Promise<void> {
-    if (process.env.MODEL_VERSION !== undefined) {
-      const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]
-      if (modelVersion !== undefined) {
-        const modelVersionString = versionToString(modelVersion)
-        if (modelVersionString !== process.env.MODEL_VERSION) {
-          console.error(
-            `Indexer: Model version mismatch model: ${modelVersionString} env: ${process.env.MODEL_VERSION}`
-          )
-          return
+    await rateLimitter.exec(async () => {
+      if (process.env.MODEL_VERSION !== undefined) {
+        const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]
+        if (modelVersion !== undefined) {
+          const modelVersionString = versionToString(modelVersion)
+          if (modelVersionString !== process.env.MODEL_VERSION) {
+            console.error(
+              `Indexer: Model version mismatch model: ${modelVersionString} env: ${process.env.MODEL_VERSION}`
+            )
+            return
+          }
         }
       }
-    }
 
-    this.hierarchy.domains()
-    const allClasses = this.hierarchy.getDescendants(core.class.Doc)
-    for (const c of allClasses) {
-      if (this.cancelling) {
-        return
-      }
-
-      if (!isClassIndexable(this.hierarchy, c)) {
-        // No need, since no indexable fields or attachments.
-        continue
-      }
-
-      console.log(this.workspace.name, 'checking index', c)
-
-      // All saved state documents
-      const states = (
-        await this.storage.findAll(core.class.DocIndexState, { objectClass: c }, { projection: { _id: 1 } })
-      ).map((it) => it._id)
-
-      while (true) {
+      this.hierarchy.domains()
+      const allClasses = this.hierarchy.getDescendants(core.class.Doc)
+      for (const c of allClasses) {
         if (this.cancelling) {
           return
         }
-        const newDocs: DocIndexState[] = (
-          await dbStorage.findAll<Doc>(
-            this.metrics,
-            c,
-            { _class: c, _id: { $nin: states } },
-            { limit: 1000, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any }
-          )
-        ).map((it) => {
-          return createStateDoc(it._id, c, {
-            stages: {},
-            attributes: {},
-            removed: false,
-            space: it.space,
-            attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
-            attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
+
+        if (!isClassIndexable(this.hierarchy, c)) {
+          // No need, since no indexable fields or attachments.
+          continue
+        }
+
+        console.log(this.workspace.name, 'checking index', c)
+
+        // All saved state documents
+        const states = (
+          await this.storage.findAll(core.class.DocIndexState, { objectClass: c }, { projection: { _id: 1 } })
+        ).map((it) => it._id)
+
+        while (true) {
+          if (this.cancelling) {
+            return
+          }
+          const newDocs: DocIndexState[] = (
+            await dbStorage.findAll<Doc>(
+              this.metrics,
+              c,
+              { _class: c, _id: { $nin: states } },
+              { limit: 1000, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any }
+            )
+          ).map((it) => {
+            return createStateDoc(it._id, c, {
+              stages: {},
+              attributes: {},
+              removed: false,
+              space: it.space,
+              attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
+              attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
+            })
           })
-        })
 
-        states.push(...newDocs.map((it) => it._id))
+          states.push(...newDocs.map((it) => it._id))
 
-        if (newDocs.length === 0) {
-          // All updated for this class
-          break
-        }
-
-        try {
-          await this.storage.upload(DOMAIN_DOC_INDEX_STATE, newDocs)
-        } catch (err: any) {
-          console.error(err)
+          if (newDocs.length === 0) {
+            // All updated for this class
+            break
+          }
+
+          try {
+            await this.storage.upload(DOMAIN_DOC_INDEX_STATE, newDocs)
+          } catch (err: any) {
+            console.error(err)
+          }
         }
+        const statesSet = new Set(states)
+        const docIds = (await dbStorage.findAll<Doc>(this.metrics, c, { _class: c }, { projection: { _id: 1 } }))
+          .filter((it) => !statesSet.has(it._id as Ref<DocIndexState>))
+          .map((it) => it._id)
+        await this.storage.clean(DOMAIN_DOC_INDEX_STATE, docIds)
       }
-      const statesSet = new Set(states)
-      const docIds = (await dbStorage.findAll<Doc>(this.metrics, c, { _class: c }, { projection: { _id: 1 } }))
-        .filter((it) => !statesSet.has(it._id as Ref<DocIndexState>))
-        .map((it) => it._id)
-      await this.storage.clean(DOMAIN_DOC_INDEX_STATE, docIds)
-    }
 
-    // Clean for non existing classes
+      // Clean for non existing classes
 
-    const unknownClasses = (
-      await this.storage.findAll(
-        core.class.DocIndexState,
-        { objectClass: { $nin: allClasses } },
-        { projection: { _id: 1 } }
-      )
-    ).map((it) => it._id)
-    if (unknownClasses.length > 0) {
-      await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
-    }
+      const unknownClasses = (
+        await this.storage.findAll(
+          core.class.DocIndexState,
+          { objectClass: { $nin: allClasses } },
+          { projection: { _id: 1 } }
+        )
+      ).map((it) => it._id)
+      if (unknownClasses.length > 0) {
+        await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
+      }
+    })
   }
 }