Fix pushes for new clients in calendar (#8115)

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2025-02-28 12:46:46 +05:00 committed by GitHub
parent b858dd7696
commit 143cf905b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 91 additions and 79 deletions

View File

@ -241,6 +241,7 @@ export class CalendarClient {
} }
close (): void { close (): void {
clearTimeout(this.inactiveTimer)
this.googleClient.close() this.googleClient.close()
for (const watch of this.dummyWatches) { for (const watch of this.dummyWatches) {
clearTimeout(watch.timer) clearTimeout(watch.timer)

View File

@ -108,6 +108,12 @@ export class CalendarController {
} }
} }
pushWorkspaceByEmail (email: string, workspace: string): void {
const arr = this.workspacesByEmail.get(email) ?? []
arr.push(workspace)
this.workspacesByEmail.set(email, arr)
}
async startWorkspace (workspace: string, tokens: Token[]): Promise<WorkspaceClient> { async startWorkspace (workspace: string, tokens: Token[]): Promise<WorkspaceClient> {
const workspaceClient = await this.getWorkspaceClient(workspace) const workspaceClient = await this.getWorkspaceClient(workspace)
for (const token of tokens) { for (const token of tokens) {
@ -117,9 +123,7 @@ export class CalendarController {
}, 60000) }, 60000)
console.log('init client', token.workspace, token.userId) console.log('init client', token.workspace, token.userId)
await workspaceClient.createCalendarClient(token, true) await workspaceClient.createCalendarClient(token, true)
const arr = this.workspacesByEmail.get(token.email) ?? [] this.pushWorkspaceByEmail(token.email, token.workspace)
arr.push(token.workspace)
this.workspacesByEmail.set(token.email, arr)
clearTimeout(timeout) clearTimeout(timeout)
} catch (err) { } catch (err) {
console.error(`Couldn't create client for ${workspace} ${token.userId} ${token.email}`) console.error(`Couldn't create client for ${workspace} ${token.userId} ${token.email}`)

View File

@ -212,42 +212,46 @@ export class GoogleClient {
workspace: this.user.workspace, workspace: this.user.workspace,
calendarId: null calendarId: null
}) })
if (current != null) { if (current == null || current.expired < Date.now() + 24 * 60 * 60 * 1000) {
await this.rateLimiter.take(1)
try {
await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } })
} catch {}
}
const channelId = generateId()
const me = await this.getMe()
const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${me}&mode=calendar` }
await this.rateLimiter.take(1)
const res = await this.calendar.calendarList.watch({ requestBody: body })
if (res.data.expiration != null && res.data.resourceId !== null) {
if (current != null) { if (current != null) {
await this.watches.updateOne( await this.rateLimiter.take(1)
{ try {
userId: this.user.userId, await this.calendar.channels.stop({
workspace: this.user.workspace, requestBody: { id: current.channelId, resourceId: current.resourceId }
calendarId: null })
}, } catch {}
{ }
$set: { const channelId = generateId()
channelId, const me = await this.getMe()
expired: Number.parseInt(res.data.expiration), const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${me}&mode=calendar` }
resourceId: res.data.resourceId ?? '' await this.rateLimiter.take(1)
const res = await this.calendar.calendarList.watch({ requestBody: body })
if (res.data.expiration != null && res.data.resourceId !== null) {
if (current != null) {
await this.watches.updateOne(
{
userId: this.user.userId,
workspace: this.user.workspace,
calendarId: null
},
{
$set: {
channelId,
expired: Number.parseInt(res.data.expiration),
resourceId: res.data.resourceId ?? ''
}
} }
} )
) } else {
} else { await this.watches.insertOne({
await this.watches.insertOne({ calendarId: null,
calendarId: null, channelId,
channelId, expired: Number.parseInt(res.data.expiration),
expired: Number.parseInt(res.data.expiration), resourceId: res.data.resourceId ?? '',
resourceId: res.data.resourceId ?? '', userId: this.user.userId,
userId: this.user.userId, workspace: this.user.workspace
workspace: this.user.workspace })
}) }
} }
} }
} catch (err: any) { } catch (err: any) {
@ -262,49 +266,51 @@ export class GoogleClient {
workspace: this.user.workspace, workspace: this.user.workspace,
calendarId calendarId
}) })
if (current != null) { if (current == null || current.expired < Date.now() + 24 * 60 * 60 * 1000) {
await this.rateLimiter.take(1)
try {
await this.calendar.channels.stop({
requestBody: { id: current.channelId, resourceId: current.resourceId }
})
} catch {}
}
const channelId = generateId()
const me = await this.getMe()
const body = {
id: channelId,
address: config.WATCH_URL,
type: 'webhook',
token: `user=${me}&mode=events&calendarId=${calendarId}`
}
await this.rateLimiter.take(1)
const res = await this.calendar.events.watch({ calendarId, requestBody: body })
if (res.data.expiration != null && res.data.resourceId != null) {
if (current != null) { if (current != null) {
await this.watches.updateOne( await this.rateLimiter.take(1)
{ try {
userId: this.user.userId, await this.calendar.channels.stop({
workspace: this.user.workspace, requestBody: { id: current.channelId, resourceId: current.resourceId }
calendarId })
}, } catch {}
{ }
$set: { const channelId = generateId()
channelId, const me = await this.getMe()
expired: Number.parseInt(res.data.expiration), const body = {
resourceId: res.data.resourceId ?? '' id: channelId,
address: config.WATCH_URL,
type: 'webhook',
token: `user=${me}&mode=events&calendarId=${calendarId}`
}
await this.rateLimiter.take(1)
const res = await this.calendar.events.watch({ calendarId, requestBody: body })
if (res.data.expiration != null && res.data.resourceId != null) {
if (current != null) {
await this.watches.updateOne(
{
userId: this.user.userId,
workspace: this.user.workspace,
calendarId
},
{
$set: {
channelId,
expired: Number.parseInt(res.data.expiration),
resourceId: res.data.resourceId ?? ''
}
} }
} )
) } else {
} else { await this.watches.insertOne({
await this.watches.insertOne({ calendarId,
calendarId, channelId,
channelId, expired: Number.parseInt(res.data.expiration),
expired: Number.parseInt(res.data.expiration), resourceId: res.data.resourceId ?? '',
resourceId: res.data.resourceId ?? '', userId: this.user.userId,
userId: this.user.userId, workspace: this.user.workspace
workspace: this.user.workspace })
}) }
} }
} }
return true return true
@ -312,7 +318,7 @@ export class GoogleClient {
if (err?.errors?.[0]?.reason === 'pushNotSupportedForRequestedResource') { if (err?.errors?.[0]?.reason === 'pushNotSupportedForRequestedResource') {
return false return false
} else { } else {
console.error('Watch error', err.message) console.error('Watch error', err)
await this.checkError(err) await this.checkError(err)
return false return false
} }

View File

@ -116,6 +116,7 @@ export class WorkspaceClient {
throw new Error('Client already exist') throw new Error('Client already exist')
} }
this.clients.set(email, newClient) this.clients.set(email, newClient)
this.serviceController.pushWorkspaceByEmail(email, user.workspace)
return newClient return newClient
} }