Fix calendar and gmail services ()

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2024-10-15 00:44:06 +05:00 committed by Andrey Sobolev
parent 3ec53392b0
commit 568dd2f47e
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
7 changed files with 112 additions and 20 deletions

View File

@ -102,7 +102,6 @@ export class CalendarClient {
await calendarClient.setToken(user)
await calendarClient.refreshToken()
await calendarClient.addClient()
void calendarClient.startSync()
}
return calendarClient
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { Account, Ref } from '@hcengineering/core'
import { Account, RateLimiter, Ref } from '@hcengineering/core'
import { type Db } from 'mongodb'
import { type CalendarClient } from './calendar'
import config from './config'
@ -25,6 +25,7 @@ export class CalendarController {
private readonly credentials: ProjectCredentials
private readonly clients: Map<string, CalendarClient[]> = new Map<string, CalendarClient[]>()
private readonly initLimitter = new RateLimiter(config.InitLimit)
protected static _instance: CalendarController
@ -43,17 +44,57 @@ export class CalendarController {
async startAll (): Promise<void> {
const tokens = await this.mongo.collection<Token>('tokens').find().toArray()
const groups = new Map<string, Token[]>()
console.log('start calendar service', tokens.length)
for (const token of tokens) {
try {
await this.createClient(token)
} catch (err) {
console.error(`Couldn't create client for ${token.workspace} ${token.userId}`)
const group = groups.get(token.workspace)
if (group === undefined) {
groups.set(token.workspace, [token])
} else {
group.push(token)
groups.set(token.workspace, group)
}
}
for (const client of this.workspaces.values()) {
void client.sync()
const limiter = new RateLimiter(config.InitLimit)
for (const [workspace, tokens] of groups) {
await limiter.add(async () => {
const startPromise = this.startWorkspace(workspace, tokens)
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 60000)
})
await Promise.race([startPromise, timeoutPromise])
})
}
await limiter.waitProcessing()
console.log('Calendar service started')
}
async startWorkspace (workspace: string, tokens: Token[]): Promise<void> {
const workspaceClient = await this.getWorkspaceClient(workspace)
const clients: CalendarClient[] = []
for (const token of tokens) {
try {
const timeout = setTimeout(() => {
console.log('init client hang', token.workspace, token.userId)
}, 60000)
const client = await workspaceClient.createCalendarClient(token)
clearTimeout(timeout)
clients.push(client)
} catch (err) {
console.error(`Couldn't create client for ${workspace} ${token.userId}`)
}
}
for (const client of clients) {
void this.initLimitter.add(async () => {
await client.startSync()
})
}
void workspaceClient.sync()
}
push (email: string, mode: 'events' | 'calendar', calendarId?: string): void {

View File

@ -24,6 +24,7 @@ interface Config {
Credentials: string
WATCH_URL: string
SystemEmail: string
InitLimit: number
}
const envMap: { [key in keyof Config]: string } = {
@ -37,7 +38,8 @@ const envMap: { [key in keyof Config]: string } = {
Secret: 'SECRET',
Credentials: 'Credentials',
SystemEmail: 'SYSTEM_EMAIL',
WATCH_URL: 'WATCH_URL'
WATCH_URL: 'WATCH_URL',
InitLimit: 'INIT_LIMIT'
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@ -52,6 +54,7 @@ const config: Config = (() => {
Secret: process.env[envMap.Secret],
SystemEmail: process.env[envMap.SystemEmail] ?? 'anticrm@hc.engineering',
Credentials: process.env[envMap.Credentials],
InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50,
WATCH_URL: process.env[envMap.WATCH_URL]
}

View File

@ -26,6 +26,7 @@ interface Config {
WATCH_TOPIC_NAME: string
SystemEmail: string
FooterMessage: string
InitLimit: number
}
const envMap: { [key in keyof Config]: string } = {
@ -40,7 +41,8 @@ const envMap: { [key in keyof Config]: string } = {
Credentials: 'Credentials',
SystemEmail: 'SYSTEM_EMAIL',
WATCH_TOPIC_NAME: 'WATCH_TOPIC_NAME',
FooterMessage: 'FOOTER_MESSAGE'
FooterMessage: 'FOOTER_MESSAGE',
InitLimit: 'INIT_LIMIT'
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@ -56,6 +58,7 @@ const config: Config = (() => {
SystemEmail: process.env[envMap.SystemEmail] ?? 'anticrm@hc.engineering',
Credentials: process.env[envMap.Credentials],
WATCH_TOPIC_NAME: process.env[envMap.WATCH_TOPIC_NAME],
InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50,
FooterMessage: process.env[envMap.FooterMessage] ?? '<br><br><p>Sent via <a href="https://huly.io">Huly</a></p>'
}

View File

@ -211,6 +211,7 @@ export class GmailClient {
await this.setToken(token.tokens)
await this.refreshToken()
await this.addClient()
void this.startSync()
void this.getNewMessagesAfterAuth()
const me = await this.getMe()
@ -385,7 +386,6 @@ export class GmailClient {
private async addClient (): Promise<void> {
try {
const me = await this.getMe()
void this.startSync()
const controller = GmailController.getGmailController()
controller.addClient(me, this)
} catch (err) {

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { MeasureContext } from '@hcengineering/core'
import { MeasureContext, RateLimiter } from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { type Db } from 'mongodb'
@ -28,6 +28,7 @@ export class GmailController {
private readonly credentials: ProjectCredentials
private readonly clients: Map<string, GmailClient[]> = new Map<string, GmailClient[]>()
private readonly initLimitter = new RateLimiter(config.InitLimit)
protected static _instance: GmailController
@ -56,19 +57,57 @@ export class GmailController {
async startAll (): Promise<void> {
const tokens = await this.mongo.collection<Token>('tokens').find().toArray()
const groups = new Map<string, Token[]>()
console.log('start gmail service', tokens.length)
for (const token of tokens) {
try {
await this.createClient(token)
} catch (err) {
console.error(`Couldn't create client for ${token.workspace} ${token.userId}`)
const group = groups.get(token.workspace)
if (group === undefined) {
groups.set(token.workspace, [token])
} else {
group.push(token)
groups.set(token.workspace, group)
}
}
for (const client of this.workspaces.values()) {
void client.checkUsers().then(async () => {
await client.getNewMessages()
const limiter = new RateLimiter(config.InitLimit)
for (const [workspace, tokens] of groups) {
await limiter.add(async () => {
const startPromise = this.startWorkspace(workspace, tokens)
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 60000)
})
await Promise.race([startPromise, timeoutPromise])
})
}
await limiter.waitProcessing()
}
async startWorkspace (workspace: string, tokens: Token[]): Promise<void> {
const workspaceClient = await this.getWorkspaceClient(workspace)
const clients: GmailClient[] = []
for (const token of tokens) {
try {
const timeout = setTimeout(() => {
console.log('init client hang', token.workspace, token.userId)
}, 60000)
const client = await workspaceClient.createGmailClient(token)
clearTimeout(timeout)
clients.push(client)
} catch (err) {
console.error(`Couldn't create client for ${workspace} ${token.userId}`)
}
}
for (const client of clients) {
void this.initLimitter.add(async () => {
await client.startSync()
})
}
void workspaceClient.checkUsers().then(async () => {
await workspaceClient.getNewMessages()
})
}
push (message: string): void {
@ -121,8 +160,10 @@ export class GmailController {
let res = this.workspaces.get(workspace)
if (res === undefined) {
try {
console.log('create workspace worker for', workspace)
res = await WorkspaceClient.create(this.ctx, this.credentials, this.mongo, this.storageAdapter, workspace)
this.workspaces.set(workspace, res)
console.log('created workspace worker for', workspace)
} catch (err) {
console.error(`Couldn't create workspace worker for ${workspace}, reason: `, err)
throw err

View File

@ -159,11 +159,15 @@ export class WorkspaceClient {
const newMessages = await this.client.findAll(gmailP.class.NewMessage, {
status: 'new'
})
console.log('get new messages, recieved', this.workspace, newMessages.length)
await this.subscribeMessages()
for (const message of newMessages) {
const client = this.getGmailClient(message.from ?? message.createdBy ?? message.modifiedBy)
const from = message.from ?? message.createdBy ?? message.modifiedBy
const client = this.getGmailClient(from)
if (client !== undefined) {
await client.createMessage(message)
} else {
console.log('client not found, skip message', this.workspace, from, message._id)
}
}
}
@ -342,6 +346,7 @@ export class WorkspaceClient {
await this.txEmployeeHandler(tx)
}
})
console.log('deactivate users', this.workspace, accounts.length)
}
private async deactivateUser (acc: PersonAccount): Promise<void> {