From 81e02eb8ca8513bf36abfbcf158414887f12784a Mon Sep 17 00:00:00 2001
From: Denis Bykhov <bykhov.denis@gmail.com>
Date: Tue, 15 Oct 2024 00:44:06 +0500
Subject: [PATCH] Fix calendar and gmail services (#6918)

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
---
 .../calendar/pod-calendar/src/calendar.ts     |  1 -
 .../pod-calendar/src/calendarController.ts    | 55 +++++++++++++++---
 services/calendar/pod-calendar/src/config.ts  |  5 +-
 services/gmail/pod-gmail/src/config.ts        |  5 +-
 services/gmail/pod-gmail/src/gmail.ts         |  2 +-
 .../gmail/pod-gmail/src/gmailController.ts    | 57 ++++++++++++++++---
 .../gmail/pod-gmail/src/workspaceClient.ts    |  7 ++-
 7 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/services/calendar/pod-calendar/src/calendar.ts b/services/calendar/pod-calendar/src/calendar.ts
index 05b0672ae3..3e1b75b90a 100644
--- a/services/calendar/pod-calendar/src/calendar.ts
+++ b/services/calendar/pod-calendar/src/calendar.ts
@@ -102,7 +102,6 @@ export class CalendarClient {
       await calendarClient.setToken(user)
       await calendarClient.refreshToken()
       await calendarClient.addClient()
-      void calendarClient.startSync()
     }
     return calendarClient
   }
diff --git a/services/calendar/pod-calendar/src/calendarController.ts b/services/calendar/pod-calendar/src/calendarController.ts
index b13d08fe23..55b5f2e3e6 100644
--- a/services/calendar/pod-calendar/src/calendarController.ts
+++ b/services/calendar/pod-calendar/src/calendarController.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 //
 
-import { Account, Ref } from '@hcengineering/core'
+import { Account, RateLimiter, Ref } from '@hcengineering/core'
 import { type Db } from 'mongodb'
 import { type CalendarClient } from './calendar'
 import config from './config'
@@ -25,6 +25,7 @@ export class CalendarController {
 
   private readonly credentials: ProjectCredentials
   private readonly clients: Map<string, CalendarClient[]> = new Map<string, CalendarClient[]>()
+  private readonly initLimitter = new RateLimiter(config.InitLimit)
 
   protected static _instance: CalendarController
 
@@ -43,17 +44,57 @@ export class CalendarController {
 
   async startAll (): Promise<void> {
     const tokens = await this.mongo.collection<Token>('tokens').find().toArray()
+    const groups = new Map<string, Token[]>()
+    console.log('start calendar service', tokens.length)
     for (const token of tokens) {
-      try {
-        await this.createClient(token)
-      } catch (err) {
-        console.error(`Couldn't create client for ${token.workspace} ${token.userId}`)
+      const group = groups.get(token.workspace)
+      if (group === undefined) {
+        groups.set(token.workspace, [token])
+      } else {
+        group.push(token)
+        groups.set(token.workspace, group)
       }
     }
 
-    for (const client of this.workspaces.values()) {
-      void client.sync()
+    const limiter = new RateLimiter(config.InitLimit)
+
+    for (const [workspace, tokens] of groups) {
+      await limiter.add(async () => {
+        const startPromise = this.startWorkspace(workspace, tokens)
+        const timeoutPromise = new Promise<void>((resolve) => {
+          setTimeout(() => {
+            resolve()
+          }, 60000)
+        })
+        await Promise.race([startPromise, timeoutPromise])
+      })
     }
+
+    await limiter.waitProcessing()
+    console.log('Calendar service started')
+  }
+
+  async startWorkspace (workspace: string, tokens: Token[]): Promise<void> {
+    const workspaceClient = await this.getWorkspaceClient(workspace)
+    const clients: CalendarClient[] = []
+    for (const token of tokens) {
+      try {
+        const timeout = setTimeout(() => {
+          console.log('init client hang', token.workspace, token.userId)
+        }, 60000)
+        const client = await workspaceClient.createCalendarClient(token)
+        clearTimeout(timeout)
+        clients.push(client)
+      } catch (err) {
+        console.error(`Couldn't create client for ${workspace} ${token.userId}`)
+      }
+    }
+    for (const client of clients) {
+      void this.initLimitter.add(async () => {
+        await client.startSync()
+      })
+    }
+    void workspaceClient.sync()
   }
 
   push (email: string, mode: 'events' | 'calendar', calendarId?: string): void {
diff --git a/services/calendar/pod-calendar/src/config.ts b/services/calendar/pod-calendar/src/config.ts
index 1647c9cfd9..e93fb08446 100644
--- a/services/calendar/pod-calendar/src/config.ts
+++ b/services/calendar/pod-calendar/src/config.ts
@@ -24,6 +24,7 @@ interface Config {
   Credentials: string
   WATCH_URL: string
   SystemEmail: string
+  InitLimit: number
 }
 
 const envMap: { [key in keyof Config]: string } = {
@@ -37,7 +38,8 @@ const envMap: { [key in keyof Config]: string } = {
   Secret: 'SECRET',
   Credentials: 'Credentials',
   SystemEmail: 'SYSTEM_EMAIL',
-  WATCH_URL: 'WATCH_URL'
+  WATCH_URL: 'WATCH_URL',
+  InitLimit: 'INIT_LIMIT'
 }
 
 const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@@ -52,6 +54,7 @@ const config: Config = (() => {
     Secret: process.env[envMap.Secret],
     SystemEmail: process.env[envMap.SystemEmail] ?? 'anticrm@hc.engineering',
     Credentials: process.env[envMap.Credentials],
+    InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50,
     WATCH_URL: process.env[envMap.WATCH_URL]
   }
 
diff --git a/services/gmail/pod-gmail/src/config.ts b/services/gmail/pod-gmail/src/config.ts
index faba0c357b..3d98317309 100644
--- a/services/gmail/pod-gmail/src/config.ts
+++ b/services/gmail/pod-gmail/src/config.ts
@@ -26,6 +26,7 @@ interface Config {
   WATCH_TOPIC_NAME: string
   SystemEmail: string
   FooterMessage: string
+  InitLimit: number
 }
 
 const envMap: { [key in keyof Config]: string } = {
@@ -40,7 +41,8 @@ const envMap: { [key in keyof Config]: string } = {
   Credentials: 'Credentials',
   SystemEmail: 'SYSTEM_EMAIL',
   WATCH_TOPIC_NAME: 'WATCH_TOPIC_NAME',
-  FooterMessage: 'FOOTER_MESSAGE'
+  FooterMessage: 'FOOTER_MESSAGE',
+  InitLimit: 'INIT_LIMIT'
 }
 
 const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@@ -56,6 +58,7 @@ const config: Config = (() => {
     SystemEmail: process.env[envMap.SystemEmail] ?? 'anticrm@hc.engineering',
     Credentials: process.env[envMap.Credentials],
     WATCH_TOPIC_NAME: process.env[envMap.WATCH_TOPIC_NAME],
+    InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50,
     FooterMessage: process.env[envMap.FooterMessage] ?? '<br><br><p>Sent via <a href="https://huly.io">Huly</a></p>'
   }
 
diff --git a/services/gmail/pod-gmail/src/gmail.ts b/services/gmail/pod-gmail/src/gmail.ts
index f8d84a1789..8d7c80be4e 100644
--- a/services/gmail/pod-gmail/src/gmail.ts
+++ b/services/gmail/pod-gmail/src/gmail.ts
@@ -211,6 +211,7 @@ export class GmailClient {
     await this.setToken(token.tokens)
     await this.refreshToken()
     await this.addClient()
+    void this.startSync()
     void this.getNewMessagesAfterAuth()
 
     const me = await this.getMe()
@@ -385,7 +386,6 @@ export class GmailClient {
   private async addClient (): Promise<void> {
     try {
       const me = await this.getMe()
-      void this.startSync()
       const controller = GmailController.getGmailController()
       controller.addClient(me, this)
     } catch (err) {
diff --git a/services/gmail/pod-gmail/src/gmailController.ts b/services/gmail/pod-gmail/src/gmailController.ts
index 62a29286e4..8be43cdd5c 100644
--- a/services/gmail/pod-gmail/src/gmailController.ts
+++ b/services/gmail/pod-gmail/src/gmailController.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 //
 
-import { MeasureContext } from '@hcengineering/core'
+import { MeasureContext, RateLimiter } from '@hcengineering/core'
 import type { StorageAdapter } from '@hcengineering/server-core'
 
 import { type Db } from 'mongodb'
@@ -28,6 +28,7 @@ export class GmailController {
 
   private readonly credentials: ProjectCredentials
   private readonly clients: Map<string, GmailClient[]> = new Map<string, GmailClient[]>()
+  private readonly initLimitter = new RateLimiter(config.InitLimit)
 
   protected static _instance: GmailController
 
@@ -56,19 +57,57 @@ export class GmailController {
 
   async startAll (): Promise<void> {
     const tokens = await this.mongo.collection<Token>('tokens').find().toArray()
+    const groups = new Map<string, Token[]>()
+    console.log('start gmail service', tokens.length)
     for (const token of tokens) {
-      try {
-        await this.createClient(token)
-      } catch (err) {
-        console.error(`Couldn't create client for ${token.workspace} ${token.userId}`)
+      const group = groups.get(token.workspace)
+      if (group === undefined) {
+        groups.set(token.workspace, [token])
+      } else {
+        group.push(token)
+        groups.set(token.workspace, group)
       }
     }
 
-    for (const client of this.workspaces.values()) {
-      void client.checkUsers().then(async () => {
-        await client.getNewMessages()
+    const limiter = new RateLimiter(config.InitLimit)
+    for (const [workspace, tokens] of groups) {
+      await limiter.add(async () => {
+        const startPromise = this.startWorkspace(workspace, tokens)
+        const timeoutPromise = new Promise<void>((resolve) => {
+          setTimeout(() => {
+            resolve()
+          }, 60000)
+        })
+        await Promise.race([startPromise, timeoutPromise])
       })
     }
+
+    await limiter.waitProcessing()
+  }
+
+  async startWorkspace (workspace: string, tokens: Token[]): Promise<void> {
+    const workspaceClient = await this.getWorkspaceClient(workspace)
+    const clients: GmailClient[] = []
+    for (const token of tokens) {
+      try {
+        const timeout = setTimeout(() => {
+          console.log('init client hang', token.workspace, token.userId)
+        }, 60000)
+        const client = await workspaceClient.createGmailClient(token)
+        clearTimeout(timeout)
+        clients.push(client)
+      } catch (err) {
+        console.error(`Couldn't create client for ${workspace} ${token.userId}`)
+      }
+    }
+    for (const client of clients) {
+      void this.initLimitter.add(async () => {
+        await client.startSync()
+      })
+    }
+    void workspaceClient.checkUsers().then(async () => {
+      await workspaceClient.getNewMessages()
+    })
   }
 
   push (message: string): void {
@@ -121,8 +160,10 @@ export class GmailController {
     let res = this.workspaces.get(workspace)
     if (res === undefined) {
       try {
+        console.log('create workspace worker for', workspace)
         res = await WorkspaceClient.create(this.ctx, this.credentials, this.mongo, this.storageAdapter, workspace)
         this.workspaces.set(workspace, res)
+        console.log('created workspace worker for', workspace)
       } catch (err) {
         console.error(`Couldn't create workspace worker for ${workspace}, reason: `, err)
         throw err
diff --git a/services/gmail/pod-gmail/src/workspaceClient.ts b/services/gmail/pod-gmail/src/workspaceClient.ts
index d18c7c78c5..c5df09a49a 100644
--- a/services/gmail/pod-gmail/src/workspaceClient.ts
+++ b/services/gmail/pod-gmail/src/workspaceClient.ts
@@ -159,11 +159,15 @@ export class WorkspaceClient {
     const newMessages = await this.client.findAll(gmailP.class.NewMessage, {
       status: 'new'
     })
+    console.log('get new messages, recieved', this.workspace, newMessages.length)
     await this.subscribeMessages()
     for (const message of newMessages) {
-      const client = this.getGmailClient(message.from ?? message.createdBy ?? message.modifiedBy)
+      const from = message.from ?? message.createdBy ?? message.modifiedBy
+      const client = this.getGmailClient(from)
       if (client !== undefined) {
         await client.createMessage(message)
+      } else {
+        console.log('client not found, skip message', this.workspace, from, message._id)
       }
     }
   }
@@ -342,6 +346,7 @@ export class WorkspaceClient {
         await this.txEmployeeHandler(tx)
       }
     })
+    console.log('deactivate users', this.workspace, accounts.length)
   }
 
   private async deactivateUser (acc: PersonAccount): Promise<void> {