From 44cecc1a7309ae83600df020234c8b33507ab24a Mon Sep 17 00:00:00 2001 From: Denis Bykhov Date: Tue, 6 May 2025 15:04:52 +0500 Subject: [PATCH] Rewrite calendar service (#8851) Signed-off-by: Denis Bykhov --- server/account/src/utils.ts | 10 +- services/calendar/pod-calendar/package.json | 3 + services/calendar/pod-calendar/src/auth.ts | 295 ++++++ .../calendar/pod-calendar/src/calendar.ts | 862 ++---------------- .../pod-calendar/src/calendarController.ts | 218 ++--- services/calendar/pod-calendar/src/config.ts | 7 +- .../calendar/pod-calendar/src/googleClient.ts | 309 ------- .../src/{storage.ts => integrations.ts} | 24 +- .../calendar/pod-calendar/src/kvsUtils.ts | 79 ++ services/calendar/pod-calendar/src/main.ts | 78 +- .../calendar/pod-calendar/src/pushHandler.ts | 56 ++ .../calendar/pod-calendar/src/rateLimiter.ts | 13 + services/calendar/pod-calendar/src/sync.ts | 660 ++++++++++++++ services/calendar/pod-calendar/src/tokens.ts | 29 + services/calendar/pod-calendar/src/types.ts | 15 +- services/calendar/pod-calendar/src/utils.ts | 43 +- services/calendar/pod-calendar/src/watch.ts | 257 +++--- .../pod-calendar/src/workspaceClient.ts | 754 +++------------ 18 files changed, 1670 insertions(+), 2042 deletions(-) create mode 100644 services/calendar/pod-calendar/src/auth.ts delete mode 100644 services/calendar/pod-calendar/src/googleClient.ts rename services/calendar/pod-calendar/src/{storage.ts => integrations.ts} (50%) create mode 100644 services/calendar/pod-calendar/src/kvsUtils.ts create mode 100644 services/calendar/pod-calendar/src/pushHandler.ts create mode 100644 services/calendar/pod-calendar/src/sync.ts create mode 100644 services/calendar/pod-calendar/src/tokens.ts diff --git a/server/account/src/utils.ts b/server/account/src/utils.ts index 9022f6ac35..e54048a177 100644 --- a/server/account/src/utils.ts +++ b/server/account/src/utils.ts @@ -1475,7 +1475,15 @@ export async function setTimezoneIfNotDefined ( } // Move to config? -export const integrationServices = ['github', 'telegram-bot', 'telegram', 'mailbox', 'caldav', 'gmail'] +export const integrationServices = [ + 'github', + 'telegram-bot', + 'telegram', + 'mailbox', + 'caldav', + 'gmail', + 'google-calendar' +] export async function findExistingIntegration ( account: AccountUuid, diff --git a/services/calendar/pod-calendar/package.json b/services/calendar/pod-calendar/package.json index 033ab63b77..d2cbfaa61f 100644 --- a/services/calendar/pod-calendar/package.json +++ b/services/calendar/pod-calendar/package.json @@ -65,6 +65,9 @@ "@hcengineering/setting": "^0.6.17", "@hcengineering/text": "^0.6.5", "@hcengineering/server-client": "^0.6.0", + "@hcengineering/server-core": "^0.6.1", + "@hcengineering/analytics-service": "^0.6.0", + "@hcengineering/kvs-client": "^0.6.0", "@hcengineering/server-token": "^0.6.11", "dotenv": "~16.0.0", "cors": "^2.8.5", diff --git a/services/calendar/pod-calendar/src/auth.ts b/services/calendar/pod-calendar/src/auth.ts new file mode 100644 index 0000000000..02c453cec7 --- /dev/null +++ b/services/calendar/pod-calendar/src/auth.ts @@ -0,0 +1,295 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { AccountClient, IntegrationSecret } from '@hcengineering/account-client' +import calendar from '@hcengineering/calendar' +import contact, { getPrimarySocialId } from '@hcengineering/contact' +import core, { AccountUuid, MeasureContext, PersonId, TxOperations, WorkspaceUuid } from '@hcengineering/core' +import setting from '@hcengineering/setting' +import { Credentials, OAuth2Client } from 'google-auth-library' +import { calendar_v3, google } from 'googleapis' +import { encode64 } from './base64' +import { getClient } from './client' +import { addUserByEmail, removeUserByEmail } from './kvsUtils' +import { IncomingSyncManager, lock } from './sync' +import { CALENDAR_INTEGRATION, GoogleEmail, SCOPES, State, Token, User } from './types' +import { getGoogleClient, getServiceToken } from './utils' +import { WatchController } from './watch' + +interface AuthResult { + success: boolean + email: GoogleEmail +} + +export class AuthController { + private readonly oAuth2Client: OAuth2Client + private readonly googleClient: calendar_v3.Calendar + email: GoogleEmail | undefined + + private constructor ( + private readonly ctx: MeasureContext, + private readonly accountClient: AccountClient, + private readonly client: TxOperations, + private readonly user: User + ) { + const res = getGoogleClient() + this.googleClient = res.google + this.oAuth2Client = res.auth + } + + static async createAndSync ( + ctx: MeasureContext, + accountClient: AccountClient, + state: State, + code: string + ): Promise { + await ctx.with('Create auth controller', { workspace: state.workspace, user: state.userId }, async () => { + const mutex = await lock(`${state.workspace}:${state.userId}`) + try { + const client = await getClient(getServiceToken()) + const txOp = new TxOperations(client, core.account.System) + const controller = new AuthController(ctx, accountClient, txOp, state) + await controller.process(code) + } finally { + mutex() + } + }) + } + + static async signout ( + ctx: MeasureContext, + accountClient: AccountClient, + userId: PersonId, + workspace: WorkspaceUuid, + value: GoogleEmail + ): Promise { + await ctx.with('Signout auth controller', { workspace, userId }, async () => { + const mutex = await lock(`${workspace}:${userId}`) + try { + const client = await getClient(getServiceToken()) + const txOp = new TxOperations(client, core.account.System) + const controller = new AuthController(ctx, accountClient, txOp, { + userId, + workspace + }) + await controller.signout(value) + } catch (err) { + ctx.error('signout', { workspace, userId, err }) + } finally { + mutex() + } + }) + } + + private async signout (value: GoogleEmail): Promise { + const integration = await this.client.findOne(setting.class.Integration, { + type: calendar.integrationType.Calendar, + value + }) + if (integration !== undefined) { + await this.client.remove(integration) + } + await removeUserByEmail(this.user, value) + const data = { + kind: CALENDAR_INTEGRATION, + workspaceUuid: this.user.workspace, + key: value, + socialId: this.user.userId + } + const secret = await this.accountClient.getIntegrationSecret(data) + if (secret == null) return + const token = JSON.parse(secret.secret) + const watchController = WatchController.get(this.accountClient) + await watchController.unsubscribe(token) + await this.accountClient.deleteIntegrationSecret(data) + const left = await this.accountClient.listIntegrationsSecrets(data) + if (left.length === 0) { + await this.accountClient.deleteIntegration(data) + } + } + + private async process (code: string): Promise { + const authRes = await this.authorize(code) + await this.setWorkspaceIntegration(authRes) + if (authRes.success) { + void IncomingSyncManager.sync( + this.ctx, + this.accountClient, + this.client, + this.user, + authRes.email, + this.googleClient + ) + } + } + + private async getEmail (): Promise { + if (this.email !== undefined) return this.email + const info = await google.oauth2({ version: 'v2', auth: this.oAuth2Client }).userinfo.get() + const email = info.data.email + if (email == null) { + throw new Error('Email not found') + } + this.email = email as GoogleEmail + return this.email + } + + private async authorize (code: string): Promise { + const token = await this.oAuth2Client.getToken(code) + this.oAuth2Client.setCredentials(token.tokens) + const email = await this.getEmail() + const providedScopes = token.tokens.scope?.split(' ') ?? [] + for (const scope of SCOPES) { + if (providedScopes.findIndex((p) => p === scope) === -1) { + this.ctx.error(`Not all scopes provided, provided: ${providedScopes.join(', ')} required: ${SCOPES.join(', ')}`) + return { success: false, email } + } + } + const res = await this.oAuth2Client.refreshAccessToken() + await this.createAccIntegrationIfNotExists() + await this.updateToken(res.credentials, email) + + return { success: true, email } + } + + private async setWorkspaceIntegration (res: AuthResult): Promise { + await this.ctx.with( + 'Set workspace integration', + { user: this.user.userId, workspace: this.user.workspace, email: res.email }, + async () => { + const integrations = await this.client.findAll(setting.class.Integration, { + createdBy: this.user.userId, + type: calendar.integrationType.Calendar + }) + + const updated = integrations.find((p) => p.disabled && p.value === res.email) + for (const integration of integrations.filter((p) => p.value === '')) { + await this.client.remove(integration) + } + if (!res.success) { + if (updated !== undefined) { + await this.client.update(updated, { + disabled: true, + error: calendar.string.NotAllPermissions + }) + } else { + await this.client.createDoc(setting.class.Integration, core.space.Workspace, { + type: calendar.integrationType.Calendar, + disabled: true, + error: calendar.string.NotAllPermissions, + value: res.email + }) + } + throw new Error('Not all scopes provided') + } else { + if (updated !== undefined) { + await this.client.update(updated, { + disabled: false, + error: null + }) + } else { + await this.client.createDoc(setting.class.Integration, core.space.Workspace, { + type: calendar.integrationType.Calendar, + disabled: false, + value: res.email + }) + } + } + } + ) + } + + private async createAccIntegrationIfNotExists (): Promise { + await this.ctx.with('Create account integration if not exists', { user: this.user.userId }, async () => { + const integration = await this.accountClient.getIntegration({ + socialId: this.user.userId, + kind: CALENDAR_INTEGRATION, + workspaceUuid: this.user.workspace + }) + if (integration != null) { + return + } + await this.accountClient.createIntegration({ + socialId: this.user.userId, + kind: CALENDAR_INTEGRATION, + workspaceUuid: this.user.workspace + }) + }) + } + + private async updateToken (token: Credentials, email: GoogleEmail): Promise { + const _token: Token = { + ...this.user, + email, + ...token + } + const data: IntegrationSecret = { + socialId: this.user.userId, + kind: CALENDAR_INTEGRATION, + workspaceUuid: this.user.workspace, + key: email, + secret: JSON.stringify(_token) + } + try { + const currentIntegration = this.accountClient.getIntegrationSecret({ + socialId: this.user.userId, + kind: CALENDAR_INTEGRATION, + workspaceUuid: this.user.workspace, + key: email + }) + if (currentIntegration != null) { + await this.accountClient.updateIntegrationSecret(data) + } else { + await this.accountClient.addIntegrationSecret(data) + } + await addUserByEmail(_token, email) + } catch (err) { + this.ctx.error('update token error', { workspace: this.user.workspace, user: this.user.userId, err }) + } + } + + static getAuthUrl (redirectURL: string, workspace: WorkspaceUuid, userId: PersonId, token: string): string { + const res = getGoogleClient() + const oAuth2Client = res.auth + const state: State = { + redirectURL, + userId, + workspace + } + const authUrl = oAuth2Client.generateAuthUrl({ + access_type: 'offline', + prompt: 'consent', + scope: SCOPES, + state: encode64(JSON.stringify(state)) + }) + return authUrl + } + + static async getUserId (account: AccountUuid, token: string): Promise { + const client = await getClient(token) + const person = await client.findOne(contact.class.Person, { personUuid: account }) + if (person === undefined) { + throw new Error('Person not found') + } + + const personId = await getPrimarySocialId(client, person._id) + + if (personId === undefined) { + throw new Error('PersonId not found') + } + + return personId + } +} diff --git a/services/calendar/pod-calendar/src/calendar.ts b/services/calendar/pod-calendar/src/calendar.ts index 6cdc0a832e..a5e659a171 100644 --- a/services/calendar/pod-calendar/src/calendar.ts +++ b/services/calendar/pod-calendar/src/calendar.ts @@ -13,753 +13,66 @@ // limitations under the License. // -import calendar, { - Calendar, - Event, - ExternalCalendar, - ReccuringEvent, - ReccuringInstance, - Visibility -} from '@hcengineering/calendar' -import { Contact } from '@hcengineering/contact' -import core, { - AttachedData, - Client, - Data, - Doc, - DocData, - DocumentUpdate, - Mixin, - Ref, - TxOperations, - parseSocialIdString -} from '@hcengineering/core' -import setting from '@hcengineering/setting' +import { AccountClient } from '@hcengineering/account-client' +import calendar, { Event, ExternalCalendar, ReccuringEvent, ReccuringInstance } from '@hcengineering/calendar' +import { Client, Doc, MeasureContext, Mixin, Ref, TxOperations } from '@hcengineering/core' import { htmlToMarkup, jsonToHTML, markupToJSON } from '@hcengineering/text' import { deepEqual } from 'fast-equals' +import { OAuth2Client } from 'google-auth-library' import { calendar_v3 } from 'googleapis' -import type { Collection, Db } from 'mongodb' -import { GoogleClient } from './googleClient' -import type { CalendarHistory, DummyWatch, EventHistory, Token, User } from './types' -import { encodeReccuring, isToken, parseRecurrenceStrings } from './utils' -import { WatchController } from './watch' +import { getRateLimitter, RateLimiter } from './rateLimiter' +import { IncomingSyncManager } from './sync' +import type { Token } from './types' +import { encodeReccuring, getGoogleClient } from './utils' import type { WorkspaceClient } from './workspaceClient' export class CalendarClient { private readonly calendar: calendar_v3.Calendar - private readonly calendarHistories: Collection - private readonly histories: Collection private readonly client: TxOperations - private readonly systemTxOp: TxOperations - private readonly activeSync: Record = {} - private readonly dummyWatches: DummyWatch[] = [] - // to do< find!!!! - private readonly googleClient - - private inactiveTimer: NodeJS.Timeout - - isClosed: boolean = false + private readonly oAuth2Client: OAuth2Client + readonly rateLimiter: RateLimiter private constructor ( - private readonly user: User, - private readonly mongo: Db, + private readonly ctx: MeasureContext, + private readonly accountClient: AccountClient, + private readonly user: Token, client: Client, - private readonly workspace: WorkspaceClient, - stayAlive: boolean = false + private readonly workspace: WorkspaceClient ) { this.client = new TxOperations(client, this.user.userId) - this.systemTxOp = new TxOperations(client, core.account.System) - this.googleClient = new GoogleClient(user, mongo, this) - this.calendar = this.googleClient.calendar - this.histories = mongo.collection('histories') - this.calendarHistories = mongo.collection('calendarHistories') - this.inactiveTimer = setTimeout(() => { - this.closeByTimer() - }, 60 * 1000) - } - - async cleanIntegration (): Promise { - const integration = await this.client.findOne(setting.class.Integration, { - createdBy: this.user.userId, - type: calendar.integrationType.Calendar, - value: this.getEmail() - }) - if (integration !== undefined) { - await this.client.update(integration, { disabled: true }) - } - this.workspace.removeClient(this.user.userId) - } - - private updateTimer (): void { - clearTimeout(this.inactiveTimer) - this.inactiveTimer = setTimeout(() => { - this.closeByTimer() - }, 60 * 1000) + this.rateLimiter = getRateLimitter(this.user.email) + const res = getGoogleClient() + this.calendar = res.google + this.oAuth2Client = res.auth } static async create ( - user: User | Token, - mongo: Db, + ctx: MeasureContext, + accountClient: AccountClient, + user: Token, client: Client, - workspace: WorkspaceClient, - stayAlive: boolean = false + workspace: WorkspaceClient ): Promise { - const calendarClient = new CalendarClient(user, mongo, client, workspace) - if (isToken(user)) { - await calendarClient.googleClient.init(user) - calendarClient.updateTimer() - } + const calendarClient = new CalendarClient(ctx, accountClient, user, client, workspace) + calendarClient.oAuth2Client.setCredentials(user) return calendarClient } - async authorize (code: string): Promise { - this.updateTimer() - const me = await this.googleClient.authorize(code) - if (me === undefined) { - const integrations = await this.client.findAll(setting.class.Integration, { - createdBy: this.user.userId, - type: calendar.integrationType.Calendar - }) - for (const integration of integrations.filter((p) => p.value === '')) { - await this.client.remove(integration) - } - - const updated = integrations.find((p) => p.disabled && p.value === me) - if (updated !== undefined) { - await this.client.update(updated, { - disabled: true, - error: calendar.string.NotAllPermissions - }) - } else { - const value = await this.googleClient.getMe() - await this.client.createDoc(setting.class.Integration, core.space.Workspace, { - type: calendar.integrationType.Calendar, - disabled: true, - error: calendar.string.NotAllPermissions, - value - }) - } - throw new Error('Not all scopes provided') - } - this.updateTimer() - - const integrations = await this.client.findAll(setting.class.Integration, { - createdBy: this.user.userId, - type: calendar.integrationType.Calendar - }) - - const updated = integrations.find((p) => p.disabled && p.value === this.user.userId) - - for (const integration of integrations.filter((p) => p.value === '')) { - await this.client.remove(integration) - } - if (updated !== undefined) { - await this.client.update(updated, { - disabled: false, - error: null - }) - } else { - await this.client.createDoc(setting.class.Integration, core.space.Workspace, { - type: calendar.integrationType.Calendar, - disabled: false, - value: this.user.userId - }) - } - - void this.syncOurEvents().then(async () => { - await this.startSync() - }) - - return this.user.userId - } - - async signout (): Promise { - this.updateTimer() - try { - this.close() - if (isToken(this.user)) { - const watch = WatchController.get(this.mongo) - await watch.unsubscribe(this.user) - } - await this.googleClient.signout() - } catch {} - - const integration = await this.client.findOne(setting.class.Integration, { - createdBy: this.user.userId, - type: calendar.integrationType.Calendar, - value: this.user.userId - }) - if (integration !== undefined) { - await this.client.remove(integration) - } - this.workspace.removeClient(this.user.userId) - } - async startSync (): Promise { try { - await this.syncCalendars() - const calendars = await this.workspace.getMyCalendars(this.user.userId) - for (const calendar of calendars) { - if (calendar.externalId !== undefined) { - await this.sync(calendar.externalId) - } - } + await IncomingSyncManager.sync( + this.ctx, + this.accountClient, + this.client, + this.user, + this.user.email, + this.calendar + ) } catch (err) { - console.error('Start sync error', this.user.workspace, this.user.userId, err) + this.ctx.error('Start sync error', { workspace: this.user.workspace, user: this.user.userId, err }) } } - async startSyncCalendar (calendar: ExternalCalendar): Promise { - await this.sync(calendar.externalId) - } - - private closeByTimer (): void { - this.close() - this.workspace.removeClient(this.user.userId) - } - - close (): void { - this.googleClient.close() - for (const watch of this.dummyWatches) { - clearTimeout(watch.timer) - } - this.isClosed = true - } - - // #region Calendars - - async syncCalendars (): Promise { - const history = await this.getCalendarHistory() - await this.calendarSync(history?.historyId) - await this.googleClient.watchCalendar() - } - - private getEmail (): string { - return parseSocialIdString(this.user.userId).value - } - - private async calendarSync (syncToken?: string, pageToken?: string): Promise { - try { - this.updateTimer() - await this.googleClient.rateLimiter.take(1) - const res = await this.googleClient.calendar.calendarList.list({ - syncToken, - pageToken - }) - if (res.status === 410) { - await this.calendarSync() - return - } - const nextPageToken = res.data.nextPageToken - for (const calendar of res.data.items ?? []) { - try { - await this.syncCalendar(calendar) - } catch (err) { - console.error('save calendar error', JSON.stringify(calendar), err) - } - } - if (nextPageToken != null) { - await this.calendarSync(syncToken, nextPageToken) - } - if (res.data.nextSyncToken != null) { - await this.setCalendarHistoryId(res.data.nextSyncToken) - } - } catch (err: any) { - if (err?.response?.status === 410) { - await this.calendarSync() - return - } - console.error('Calendar sync error', this.user.workspace, this.user.userId, err) - } - } - - private async syncCalendar (val: calendar_v3.Schema$CalendarListEntry): Promise { - if (val.id != null) { - const email = this.getEmail() - const exists = await this.client.findOne(calendar.class.ExternalCalendar, { - externalId: val.id, - externalUser: email - }) - if (exists === undefined) { - const data: Data = { - name: val.summary ?? '', - visibility: 'freeBusy', - hidden: false, - externalId: val.id, - externalUser: email, - default: false - } - if (val.primary === true) { - const primaryExists = await this.client.findOne( - calendar.class.ExternalCalendar, - { - createdBy: this.user.userId - }, - { projection: { _id: 1 } } - ) - if (primaryExists === undefined) { - data.default = true - } - } - await this.client.createDoc(calendar.class.ExternalCalendar, calendar.space.Calendar, data) - } else { - const update: DocumentUpdate = {} - if (exists.name !== val.summary) { - update.name = val.summary ?? exists.name - } - if (Object.keys(update).length > 0) { - await this.client.update(exists, update) - } - } - } - } - - private async getCalendarHistory (): Promise { - return await this.calendarHistories.findOne({ - userId: this.user.userId, - workspace: this.user.workspace - }) - } - - private async setCalendarHistoryId (historyId: string): Promise { - await this.calendarHistories.updateOne( - { - userId: this.user.userId, - workspace: this.user.workspace - }, - { - $set: { - historyId - } - }, - { upsert: true } - ) - } - - // #endregion - - // #region Events - - // #region Incoming - private async watch (calendarId: string): Promise { - if (!(await this.googleClient.watch(calendarId))) { - await this.dummyWatch(calendarId) - } - } - - private async dummyWatch (calendarId: string): Promise { - const timer = setTimeout( - () => { - void this.sync(calendarId) - }, - 6 * 60 * 60 * 1000 - ) - this.dummyWatches.push({ - calendarId, - timer - }) - } - - async sync (calendarId: string): Promise { - if (this.isClosed) return - if (this.activeSync[calendarId]) return - this.activeSync[calendarId] = true - await this.syncEvents(calendarId) - this.activeSync[calendarId] = false - await this.watch(calendarId) - } - - private async getEventHistory (calendarId: string): Promise { - return await this.histories.findOne({ - calendarId, - userId: this.user.userId, - workspace: this.user.workspace - }) - } - - private async setEventHistoryId (calendarId: string, historyId: string): Promise { - await this.histories.updateOne( - { - calendarId, - userId: this.user.userId, - workspace: this.user.workspace - }, - { - $set: { - historyId - } - }, - { upsert: true } - ) - } - - private async syncEvents (calendarId: string): Promise { - const history = await this.getEventHistory(calendarId) - await this.eventsSync(calendarId, history?.historyId) - } - - private async eventsSync (calendarId: string, syncToken?: string, pageToken?: string): Promise { - try { - await this.googleClient.rateLimiter.take(1) - const res = await this.calendar.events.list({ - calendarId, - syncToken, - pageToken, - showDeleted: syncToken != null - }) - if (res.status === 410) { - await this.eventsSync(calendarId) - return - } - const nextPageToken = res.data.nextPageToken - for (const event of res.data.items ?? []) { - try { - await this.syncEvent(calendarId, event, res.data.accessRole ?? 'reader') - } catch (err) { - console.error('save event error', JSON.stringify(event), err) - } - } - if (nextPageToken != null) { - await this.eventsSync(calendarId, syncToken, nextPageToken) - } - if (res.data.nextSyncToken != null) { - await this.setEventHistoryId(calendarId, res.data.nextSyncToken) - } - // if resync - } catch (err: any) { - if (err?.response?.status === 410) { - await this.eventsSync(calendarId) - return - } - await this.googleClient.checkError(err) - console.error('Event sync error', this.user.workspace, this.user.userId, err) - } - } - - private async syncEvent (calendarId: string, event: calendar_v3.Schema$Event, accessRole: string): Promise { - this.updateTimer() - if (event.id != null) { - const calendars = await this.workspace.getMyCalendars(this.user.userId) - const _calendar = - calendars.find((p) => p.externalId === event.organizer?.email) ?? - calendars.find((p) => p.externalId === calendarId) ?? - calendars[0] - if (_calendar !== undefined) { - const exists = (await this.client.findOne(calendar.class.Event, { - eventId: event.id, - calendar: _calendar._id - })) as Event | undefined - if (exists === undefined) { - await this.saveExtEvent(event, accessRole, _calendar) - } else { - await this.updateExtEvent(event, exists) - } - } - } - } - - private async updateExtEvent (event: calendar_v3.Schema$Event, current: Event): Promise { - this.updateTimer() - if (event.status === 'cancelled' && current._class !== calendar.class.ReccuringInstance) { - await this.systemTxOp.remove(current) - return - } - const data: Partial> = await this.parseUpdateData(event) - if (event.recurringEventId != null) { - const diff = this.getDiff( - { - ...data, - recurringEventId: event.recurringEventId as Ref, - originalStartTime: parseDate(event.originalStartTime), - isCancelled: event.status === 'cancelled' - }, - current as ReccuringInstance - ) - if (Object.keys(diff).length > 0) { - await this.systemTxOp.update(current, diff) - } - } else { - if (event.recurrence != null) { - const parseRule = parseRecurrenceStrings(event.recurrence) - const diff = this.getDiff( - { - ...data, - rules: parseRule.rules, - exdate: parseRule.exdate, - rdate: parseRule.rdate - }, - current as ReccuringEvent - ) - if (Object.keys(diff).length > 0) { - await this.systemTxOp.update(current, diff) - } - } else { - const diff = this.getDiff(data, current) - if (Object.keys(diff).length > 0) { - await this.systemTxOp.update(current, diff) - } - } - } - await this.updateMixins(event, current) - } - - private async updateMixins (event: calendar_v3.Schema$Event, current: Event): Promise { - const mixins = this.parseMixins(event) - if (mixins !== undefined) { - for (const mixin in mixins) { - const attr = mixins[mixin] - if (typeof attr === 'object' && Object.keys(attr).length > 0) { - if (this.client.getHierarchy().hasMixin(current, mixin as Ref>)) { - const diff = this.getDiff(attr, this.client.getHierarchy().as(current, mixin as Ref>)) - if (Object.keys(diff).length > 0) { - await this.systemTxOp.updateMixin( - current._id, - current._class, - calendar.space.Calendar, - mixin as Ref>, - diff - ) - } - } else { - await this.systemTxOp.createMixin( - current._id, - current._class, - calendar.space.Calendar, - mixin as Ref>, - attr - ) - } - } - } - } - } - - private parseMixins (event: calendar_v3.Schema$Event): Record | undefined { - if (event.extendedProperties?.shared?.mixins !== undefined) { - const mixins = JSON.parse(event.extendedProperties.shared.mixins) - return mixins - } - } - - private async saveMixins (event: calendar_v3.Schema$Event, _id: Ref): Promise { - const mixins = this.parseMixins(event) - if (mixins !== undefined) { - for (const mixin in mixins) { - const attr = mixins[mixin] - if (typeof attr === 'object' && Object.keys(attr).length > 0) { - await this.systemTxOp.createMixin( - _id, - calendar.class.Event, - calendar.space.Calendar, - mixin as Ref>, - attr - ) - } - } - } - } - - private async saveExtEvent ( - event: calendar_v3.Schema$Event, - accessRole: string, - _calendar: ExternalCalendar - ): Promise { - this.updateTimer() - const data: AttachedData = await this.parseData(event, accessRole, _calendar._id) - if (event.recurringEventId != null) { - const parseRule = parseRecurrenceStrings(event.recurrence ?? []) - const id = await this.systemTxOp.addCollection( - calendar.class.ReccuringInstance, - calendar.space.Calendar, - calendar.ids.NoAttached, - calendar.class.Event, - 'events', - { - ...data, - recurringEventId: event.recurringEventId, - originalStartTime: parseDate(event.originalStartTime), - isCancelled: event.status === 'cancelled', - rules: parseRule.rules, - exdate: parseRule.exdate, - rdate: parseRule.rdate, - timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT' - } - ) - await this.saveMixins(event, id) - } else if (event.status !== 'cancelled') { - if (event.recurrence != null) { - const parseRule = parseRecurrenceStrings(event.recurrence) - const id = await this.systemTxOp.addCollection( - calendar.class.ReccuringEvent, - calendar.space.Calendar, - calendar.ids.NoAttached, - calendar.class.Event, - 'events', - { - ...data, - rules: parseRule.rules, - exdate: parseRule.exdate, - rdate: parseRule.rdate, - originalStartTime: data.date, - timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT' - } - ) - await this.saveMixins(event, id) - } else { - const id = await this.systemTxOp.addCollection( - calendar.class.Event, - calendar.space.Calendar, - calendar.ids.NoAttached, - calendar.class.Event, - 'events', - data - ) - await this.saveMixins(event, id) - } - } - } - - private getDiff(data: Partial>, current: T): Partial> { - const res = {} - for (const key in data) { - if (!deepEqual((data as any)[key], (current as any)[key])) { - ;(res as any)[key] = (data as any)[key] - } - } - return res - } - - private async parseUpdateData (event: calendar_v3.Schema$Event): Promise>> { - const res: Partial> = {} - if (event.attendees !== undefined) { - const participants = await this.getParticipants(event) - res.participants = participants[0] - if (participants[1].length > 0) { - res.externalParticipants = participants[1] - } - } - if (event.location != null) { - res.location = event.location - } - if (event.description != null) { - res.description = htmlToMarkup(event.description) - } - if (event.summary != null) { - res.title = event.summary - } - if (event.start != null) { - res.date = parseDate(event.start) - } - if (event.end != null) { - res.dueDate = parseDate(event.end) - } - if (event.visibility != null && event.visibility !== 'default') { - res.visibility = - event.visibility === 'public' - ? 'public' - : (event.extendedProperties?.private?.visibility as Visibility) ?? 'private' - } - - return res - } - - private getAccess ( - event: calendar_v3.Schema$Event, - accessRole: string - ): 'freeBusyReader' | 'reader' | 'writer' | 'owner' { - if (accessRole !== 'owner') { - return accessRole as 'freeBusyReader' | 'reader' | 'writer' - } - if (event.creator?.self === true) { - return 'owner' - } else { - return 'reader' - } - } - - private async parseData ( - event: calendar_v3.Schema$Event, - accessRole: string, - _calendar: Ref - ): Promise> { - const participants = await this.getParticipants(event) - const res: AttachedData = { - date: parseDate(event.start), - dueDate: parseDate(event.end), - allDay: event.start?.date != null, - description: htmlToMarkup(event.description ?? ''), - title: event.summary ?? '', - location: event.location ?? undefined, - participants: participants[0], - eventId: event.id ?? '', - calendar: _calendar, - access: this.getAccess(event, accessRole), - timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT', - user: this.user.userId - } - if (participants[1].length > 0) { - res.externalParticipants = participants[1] - } - if (event.visibility != null && event.visibility !== 'default') { - res.visibility = - event.visibility === 'public' - ? 'public' - : (event.extendedProperties?.private?.visibility as Visibility) ?? 'private' - } - return res - } - - private getParticipant (value: string): { - contact?: Ref - extra?: string - } { - const integration = this.workspace.integrations.byEmail.get(value) - if (integration != null) { - return { - contact: integration - } - } - const contact = this.workspace.contacts.byEmail.get(value) - if (contact !== undefined) { - return { - contact - } - } - return { - extra: value - } - } - - private async getParticipants (event: calendar_v3.Schema$Event): Promise<[Ref[], string[]]> { - const contacts = new Set>() - const extra = new Set() - if (event.creator?.email != null) { - const res = this.getParticipant(event.creator.email) - if (res.contact !== undefined) { - contacts.add(res.contact) - } - if (res.extra !== undefined) { - extra.add(res.extra) - } - } - for (const attendee of event.attendees ?? []) { - if (attendee.email != null) { - const res = this.getParticipant(attendee.email) - if (res.contact !== undefined) { - contacts.add(res.contact) - } - if (res.extra !== undefined) { - extra.add(res.extra) - } - } - } - return [Array.from(contacts), Array.from(extra)] - } - - // #endregion - - // #region Outcoming - private areDatesEqual (first: calendar_v3.Schema$EventDateTime, second: calendar_v3.Schema$EventDateTime): boolean { if (first.date != null && second.date != null) { return new Date(first.date).getTime() === new Date(second.date).getTime() @@ -771,14 +84,12 @@ export class CalendarClient { } private async createRecInstance (calendarId: string, event: ReccuringInstance): Promise { - this.updateTimer() - const me = await this.googleClient.getMe() - const body = this.convertBody(event, me) + const body = this.convertBody(event) const req: calendar_v3.Params$Resource$Events$Instances = { calendarId, eventId: event.recurringEventId } - await this.googleClient.rateLimiter.take(1) + await this.rateLimiter.take(1) const instancesResp = await this.calendar.events.instances(req) const items = instancesResp.data.items const target = items?.find( @@ -789,7 +100,7 @@ export class CalendarClient { ) if (target?.id != null) { body.id = target.id - await this.googleClient.rateLimiter.take(1) + await this.rateLimiter.take(1) await this.calendar.events.update({ calendarId, eventId: target.id, @@ -800,15 +111,14 @@ export class CalendarClient { } async createEvent (event: Event): Promise { - const me = await this.googleClient.getMe() try { - const _calendar = this.workspace.calendars.byId.get(event.calendar as Ref) + const _calendar = this.workspace.calendarsById.get(event.calendar as Ref) if (_calendar !== undefined) { if (event._class === calendar.class.ReccuringInstance) { await this.createRecInstance(_calendar.externalId, event as ReccuringInstance) } else { - const body = this.convertBody(event, me) - await this.googleClient.rateLimiter.take(1) + const body = this.convertBody(event) + await this.rateLimiter.take(1) await this.calendar.events.insert({ calendarId: _calendar.externalId, requestBody: body @@ -816,24 +126,21 @@ export class CalendarClient { } } } catch (err: any) { - await this.googleClient.checkError(err) - // eslint-disable-next-line throw new Error(`Create event error, ${this.user.workspace}, ${this.user.userId}, ${event._id}, ${err?.message}`) } } async updateEvent (event: Event): Promise { - const me = await this.googleClient.getMe() - const _calendar = this.workspace.calendars.byId.get(event.calendar as Ref) + const _calendar = this.workspace.calendarsById.get(event.calendar as Ref) const calendarId = _calendar?.externalId if (calendarId !== undefined) { try { - await this.googleClient.rateLimiter.take(1) + await this.rateLimiter.take(1) const current = await this.calendar.events.get({ calendarId, eventId: event.eventId }) if (current?.data !== undefined) { if (current.data.organizer?.self === true) { - const ev = this.applyUpdate(current.data, event, me) - await this.googleClient.rateLimiter.take(1) + const ev = this.applyUpdate(current.data, event) + await this.rateLimiter.take(1) await this.calendar.events.update({ calendarId, eventId: event.eventId, @@ -846,18 +153,16 @@ export class CalendarClient { await this.createEvent(event) } else { console.error('Update event error', this.user.workspace, this.user.userId, err) - await this.googleClient.checkError(err) } } } } async remove (eventId: string, calendarId: string): Promise { - this.updateTimer() const current = await this.calendar.events.get({ calendarId, eventId }) if (current?.data !== undefined) { if (current.data.organizer?.self === true) { - await this.googleClient.rateLimiter.take(1) + await this.rateLimiter.take(1) await this.calendar.events.delete({ eventId, calendarId @@ -868,7 +173,7 @@ export class CalendarClient { async removeEvent (event: Event): Promise { try { - const _calendar = this.workspace.calendars.byId.get(event.calendar as Ref) + const _calendar = this.workspace.calendarsById.get(event.calendar as Ref) if (_calendar !== undefined) { await this.remove(event.eventId, _calendar.externalId) } @@ -877,26 +182,11 @@ export class CalendarClient { } } - async syncOurEvents (): Promise { - this.updateTimer() - const events = await this.client.findAll(calendar.class.Event, { - access: 'owner', - createdBy: this.user.userId, - calendar: { $in: (await this.workspace.getMyCalendars(this.user.userId)).map((p) => p._id) } - }) - for (const event of events) { - await this.syncMyEvent(event) - } - await this.workspace.updateSyncTime() - } - async syncMyEvent (event: Event): Promise { if (event.access === 'owner' || event.access === 'writer') { try { - const space = this.workspace.calendars.byId.get(event.calendar as Ref) - const email = this.getEmail() - if (space !== undefined && space.externalUser === email) { - this.updateTimer() + const space = this.workspace.calendarsById.get(event.calendar as Ref) + if (space !== undefined && space.externalUser === this.user.email) { if (!(await this.update(event, space))) { await this.create(event, space) } @@ -908,12 +198,10 @@ export class CalendarClient { } private async create (event: Event, space: ExternalCalendar): Promise { - this.updateTimer() - const me = await this.googleClient.getMe() - const body = this.convertBody(event, me) + const body = this.convertBody(event) const calendarId = space?.externalId if (calendarId !== undefined) { - await this.googleClient.rateLimiter.take(1) + await this.rateLimiter.take(1) await this.calendar.events.insert({ calendarId, requestBody: body @@ -922,16 +210,14 @@ export class CalendarClient { } private async update (event: Event, space: ExternalCalendar): Promise { - this.updateTimer() - const me = await this.googleClient.getMe() const calendarId = space?.externalId if (calendarId !== undefined) { try { - await this.googleClient.rateLimiter.take(1) + await this.rateLimiter.take(1) const current = await this.calendar.events.get({ calendarId, eventId: event.eventId }) if (current !== undefined) { - const ev = this.applyUpdate(current.data, event, me) - await this.googleClient.rateLimiter.take(1) + const ev = this.applyUpdate(current.data, event) + await this.rateLimiter.take(1) await this.calendar.events.update({ calendarId, eventId: event.eventId, @@ -968,7 +254,7 @@ export class CalendarClient { return res } - private convertBody (event: Event, me: string): calendar_v3.Schema$Event { + private convertBody (event: Event): calendar_v3.Schema$Event { const res: calendar_v3.Schema$Event = { start: convertDate(event.date, event.allDay, getTimezone(event)), end: convertDate(event.dueDate, event.allDay, getTimezone(event)), @@ -1007,11 +293,10 @@ export class CalendarClient { }) } } - const attendees = this.getAttendees(event, me) + const attendees = this.getAttendees(event) if (attendees.length > 0) { - const email = this.getEmail() res.attendees = attendees.map((p) => { - if (p === email) { + if (p === this.user.email) { return { email: p, responseStatus: 'accepted', self: true } } return { email: p } @@ -1030,11 +315,7 @@ export class CalendarClient { return res } - private applyUpdate ( - event: calendar_v3.Schema$Event, - current: Event, - me: string - ): calendar_v3.Schema$Event | undefined { + private applyUpdate (event: calendar_v3.Schema$Event, current: Event): calendar_v3.Schema$Event | undefined { let res: boolean = false if (current.title !== event.summary) { event.summary = current.title @@ -1065,7 +346,7 @@ export class CalendarClient { res = true event.location = current.location } - const attendees = this.getAttendees(current, me) + const attendees = this.getAttendees(current) if (attendees.length > 0 && event.attendees !== undefined) { for (const attendee of attendees) { if (event.attendees.findIndex((p) => p.email === attendee) === -1) { @@ -1095,19 +376,12 @@ export class CalendarClient { return res ? event : undefined } - private getAttendees (event: Event, me: string): string[] { + private getAttendees (event: Event): string[] { const res = new Set() - const email = this.getEmail() for (const participant of event.participants) { - const integrations = this.workspace.integrations.byContact.get(participant) ?? [] - const integration = integrations.find((p) => p === email) ?? integrations[0] - if (integration !== undefined && integration !== '') { - res.add(integration) - } else { - const contact = this.workspace.contacts.byId.get(participant) - if (contact !== undefined) { - res.add(contact) - } + const contact = this.workspace.participants.get(participant) + if (contact !== undefined) { + res.add(contact) } } for (const ext of event.externalParticipants ?? []) { @@ -1115,20 +389,6 @@ export class CalendarClient { } return Array.from(res) } - - // #endregion - - // #endregion -} - -function parseDate (date: calendar_v3.Schema$EventDateTime | undefined): number { - if (date?.dateTime != null) { - return new Date(date.dateTime).getTime() - } - if (date?.date != null) { - return new Date(date.date).getTime() - } - return 0 } function convertDate (value: number, allDay: boolean, timeZone: string | undefined): calendar_v3.Schema$EventDateTime { diff --git a/services/calendar/pod-calendar/src/calendarController.ts b/services/calendar/pod-calendar/src/calendarController.ts index 2a42298821..d554edb994 100644 --- a/services/calendar/pod-calendar/src/calendarController.ts +++ b/services/calendar/pod-calendar/src/calendarController.ts @@ -13,206 +13,90 @@ // limitations under the License. // +import { AccountClient, Integration } from '@hcengineering/account-client' import { Event } from '@hcengineering/calendar' import { - PersonId, - PersonUuid, + MeasureContext, RateLimiter, + WorkspaceInfoWithStatus, WorkspaceUuid, isActiveMode, - isDeletingMode, - parseSocialIdString, - systemAccountUuid + isDeletingMode } from '@hcengineering/core' -import { generateToken } from '@hcengineering/server-token' -import { Collection, type Db } from 'mongodb' -import { type CalendarClient } from './calendar' import config from './config' -import { type Token, type User } from './types' +import { getIntegrations } from './integrations' import { WorkspaceClient } from './workspaceClient' -import { getAccountClient } from '@hcengineering/server-client' export class CalendarController { - private readonly workspaces: Map> = new Map< - WorkspaceUuid, - WorkspaceClient | Promise - >() - - private readonly tokens: Collection - protected static _instance: CalendarController - private constructor (private readonly mongo: Db) { - this.tokens = mongo.collection('tokens') + private constructor ( + private readonly ctx: MeasureContext, + readonly accountClient: AccountClient + ) { CalendarController._instance = this - setInterval(() => { - if (this.workspaces.size > 0) { - console.log('active workspaces', this.workspaces.size) - } - }, 60000) } - static getCalendarController (mongo?: Db): CalendarController { + static getCalendarController (ctx: MeasureContext, accountClient: AccountClient): CalendarController { if (CalendarController._instance !== undefined) { return CalendarController._instance } - if (mongo === undefined) throw new Error('CalendarController not exist') - return new CalendarController(mongo) + return new CalendarController(ctx, accountClient) } async startAll (): Promise { - const tokens = await this.tokens.find().toArray() - const groups = new Map() - console.log('start calendar service', tokens.length) - for (const token of tokens) { - const group = groups.get(token.workspace) - if (group === undefined) { - groups.set(token.workspace, [token]) - } else { - group.push(token) - groups.set(token.workspace, group) - } - } + try { + const integrations = await getIntegrations(this.accountClient) + this.ctx.info('Start integrations', { count: integrations.length }) - const limiter = new RateLimiter(config.InitLimit) - const token = generateToken(systemAccountUuid) - const ids = [...groups.keys()] - console.log('start workspaces', ids) - const infos = await getAccountClient(token).getWorkspacesInfo(ids) - console.log('infos', infos) - for (const info of infos) { - const tokens = groups.get(info.uuid) - if (tokens === undefined) { - console.log('no tokens for workspace', info.uuid) - continue - } - if (isDeletingMode(info.mode)) { - if (tokens !== undefined) { - for (const token of tokens) { - await this.tokens.deleteOne({ userId: token.userId, workspace: token.workspace }) - } + const groups = new Map() + for (const int of integrations) { + if (int.workspaceUuid === null) continue + const group = groups.get(int.workspaceUuid) + if (group === undefined) { + groups.set(int.workspaceUuid, [int]) + } else { + group.push(int) + groups.set(int.workspaceUuid, group) } - continue } - if (!isActiveMode(info.mode)) { - continue + + const ids = [...groups.keys()] + if (ids.length === 0) return + const limiter = new RateLimiter(config.InitLimit) + const infos = await this.accountClient.getWorkspacesInfo(ids) + for (const info of infos) { + const integrations = groups.get(info.uuid) ?? [] + if (await this.checkWorkspace(info, integrations)) { + await limiter.add(async () => { + this.ctx.info('start workspace', { workspace: info.uuid }) + await WorkspaceClient.run(this.ctx, this.accountClient, info.uuid) + }) + } } - await limiter.add(async () => { - console.log('start workspace', info.uuid) - const workspace = await this.startWorkspace(info.uuid, tokens) - await workspace.sync() - }) + await limiter.waitProcessing() + } catch (err: any) { + this.ctx.error('Failed to start existing integrations', err) } } - async startWorkspace (workspace: WorkspaceUuid, tokens: Token[]): Promise { - const workspaceClient = await this.getWorkspaceClient(workspace) - for (const token of tokens) { - try { - const timeout = setTimeout(() => { - console.warn('init client hang', token.workspace, token.userId) - }, 60000) - console.log('init client', token.workspace, token.userId) - await workspaceClient.createCalendarClient(token) - clearTimeout(timeout) - } catch (err) { - console.error(`Couldn't create client for ${workspace} ${token.userId}`) + private async checkWorkspace (info: WorkspaceInfoWithStatus, integrations: Integration[]): Promise { + if (isDeletingMode(info.mode)) { + if (integrations !== undefined) { + for (const int of integrations) { + await this.accountClient.deleteIntegration(int) + } } + return false } - return workspaceClient - } - - async push (personId: PersonId, mode: 'events' | 'calendar', calendarId?: string): Promise { - const email = parseSocialIdString(personId).value - const tokens = await this.tokens.find({ email, access_token: { $exists: true } }).toArray() - const token = generateToken(systemAccountUuid) - const workspaces = [...new Set(tokens.map((p) => p.workspace))] - const infos = await getAccountClient(token).getWorkspacesInfo(workspaces) - for (const token of tokens) { - const info = infos.find((p) => p.uuid === token.workspace) - if (info === undefined) { - continue - } - if (isDeletingMode(info.mode)) { - await this.tokens.deleteOne({ userId: token.userId, workspace: token.workspace }) - continue - } - if (!isActiveMode(info.mode)) { - continue - } - const workspace = await this.getWorkspaceClient(token.workspace) - const calendarClient = await workspace.createCalendarClient(token) - if (mode === 'calendar') { - await calendarClient.syncCalendars() - } - if (mode === 'events' && calendarId !== undefined) { - await calendarClient.sync(calendarId) - } + if (!isActiveMode(info.mode)) { + this.ctx.info('workspace is not active', { workspaceUuid: info.uuid }) + return false } + return true } async pushEvent (workspace: WorkspaceUuid, event: Event, type: 'create' | 'update' | 'delete'): Promise { - const workspaceController = await this.getWorkspaceClient(workspace) - await workspaceController.pushEvent(event, type) - } - - async getUserId (account: PersonUuid, workspace: WorkspaceUuid): Promise { - const workspaceClient = await this.getWorkspaceClient(workspace) - - return await workspaceClient.getUserId(account) - } - - async signout (workspace: WorkspaceUuid, value: PersonId): Promise { - const workspaceClient = await this.getWorkspaceClient(workspace) - const clients = await workspaceClient.signout(value) - if (clients === 0) { - this.removeWorkspace(workspace) - } - } - - removeWorkspace (workspace: WorkspaceUuid): void { - this.workspaces.delete(workspace) - } - - async close (): Promise { - for (let workspace of this.workspaces.values()) { - if (workspace instanceof Promise) { - workspace = await workspace - } - await workspace.close() - } - this.workspaces.clear() - } - - async createClient (user: Token): Promise { - const workspace = await this.getWorkspaceClient(user.workspace) - const newClient = await workspace.createCalendarClient(user) - return newClient - } - - async newClient (user: User, code: string): Promise { - const workspace = await this.getWorkspaceClient(user.workspace) - const newClient = await workspace.newCalendarClient(user, code) - return newClient - } - - private async getWorkspaceClient (workspace: WorkspaceUuid): Promise { - const res = this.workspaces.get(workspace) - if (res !== undefined) { - if (res instanceof Promise) { - return await res - } - return res - } - try { - const client = WorkspaceClient.create(this.mongo, workspace, this) - this.workspaces.set(workspace, client) - const res = await client - this.workspaces.set(workspace, res) - return res - } catch (err) { - console.error(`Couldn't create workspace worker for ${workspace}, reason: ${JSON.stringify(err)}`) - throw err - } + await WorkspaceClient.push(this.ctx, this.accountClient, workspace, event, type) } } diff --git a/services/calendar/pod-calendar/src/config.ts b/services/calendar/pod-calendar/src/config.ts index dc17770a9d..6335023a7f 100644 --- a/services/calendar/pod-calendar/src/config.ts +++ b/services/calendar/pod-calendar/src/config.ts @@ -21,6 +21,7 @@ interface Config { AccountsURL: string ServiceID: string Secret: string + KvsUrl: string Credentials: string WATCH_URL: string InitLimit: number @@ -37,7 +38,8 @@ const envMap: { [key in keyof Config]: string } = { Secret: 'SECRET', Credentials: 'Credentials', WATCH_URL: 'WATCH_URL', - InitLimit: 'INIT_LIMIT' + InitLimit: 'INIT_LIMIT', + KvsUrl: 'KVS_URL' } const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined) @@ -52,7 +54,8 @@ const config: Config = (() => { Secret: process.env[envMap.Secret], Credentials: process.env[envMap.Credentials], InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50, - WATCH_URL: process.env[envMap.WATCH_URL] + WATCH_URL: process.env[envMap.WATCH_URL], + KvsUrl: process.env[envMap.KvsUrl] } const missingEnv = (Object.keys(params) as Array) diff --git a/services/calendar/pod-calendar/src/googleClient.ts b/services/calendar/pod-calendar/src/googleClient.ts deleted file mode 100644 index a8b2c514f9..0000000000 --- a/services/calendar/pod-calendar/src/googleClient.ts +++ /dev/null @@ -1,309 +0,0 @@ -// -// Copyright © 2025 Hardcore Engineering Inc. -// -// Licensed under the Eclipse Public License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. You may -// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// -// See the License for the specific language governing permissions and -// limitations under the License. - -import { generateId, PersonId } from '@hcengineering/core' -import type { Credentials, OAuth2Client } from 'google-auth-library' -import { calendar_v3, google } from 'googleapis' -import { Collection, Db } from 'mongodb' -import { encode64 } from './base64' -import { CalendarClient } from './calendar' -import config from './config' -import { RateLimiter } from './rateLimiter' -import { ProjectCredentials, State, Token, User, Watch, WatchBase } from './types' - -export const DUMMY_RESOURCE = 'Dummy' - -const SCOPES = [ - 'https://www.googleapis.com/auth/calendar.calendars.readonly', - 'https://www.googleapis.com/auth/calendar.calendarlist.readonly', - 'https://www.googleapis.com/auth/calendar.events', - 'https://www.googleapis.com/auth/userinfo.email' -] - -export class GoogleClient { - private me: string | undefined = undefined - private readonly credentials: ProjectCredentials - private readonly oAuth2Client: OAuth2Client - readonly calendar: calendar_v3.Calendar - private readonly tokens: Collection - private readonly watches: Collection - - private refreshTimer: NodeJS.Timeout | undefined = undefined - - readonly rateLimiter = new RateLimiter(1000, 500) - - constructor ( - private readonly user: User, - mongo: Db, - private readonly calendarClient: CalendarClient - ) { - this.tokens = mongo.collection('tokens') - this.credentials = JSON.parse(config.Credentials) - const { client_secret, client_id, redirect_uris } = this.credentials.web // eslint-disable-line - this.oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line - this.calendar = google.calendar({ version: 'v3', auth: this.oAuth2Client }) - this.watches = mongo.collection('watch') - } - - static getAuthUrl (redirectURL: string, workspace: string, userId: PersonId, token: string): string { - const credentials = JSON.parse(config.Credentials) - const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line - const oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line - const state: State = { - token, - redirectURL, - workspace: workspace as any, // TODO: FIXME - userId - } - const authUrl = oAuth2Client.generateAuthUrl({ - access_type: 'offline', - scope: SCOPES, - state: encode64(JSON.stringify(state)) - }) - return authUrl - } - - async signout (): Promise { - // get watch controller and unsubscibe - await this.oAuth2Client.revokeCredentials() - await this.tokens.deleteOne({ - userId: this.user.userId, - workspace: this.user.workspace - }) - } - - async init (token: Token): Promise { - await this.setToken(token) - await this.refreshToken() - } - - async authorize (code: string): Promise { - const token = await this.oAuth2Client.getToken(code) - await this.setToken(token.tokens) - const me = await this.getMe() - const providedScopes = token.tokens.scope?.split(' ') ?? [] - for (const scope of SCOPES) { - if (providedScopes.findIndex((p) => p === scope) === -1) { - console.error(`Not all scopes provided, provided: ${providedScopes.join(', ')} required: ${SCOPES.join(', ')}`) - return undefined - } - } - await this.refreshToken() - - return me - } - - close (): void { - if (this.refreshTimer !== undefined) clearTimeout(this.refreshTimer) - } - - async getMe (): Promise { - if (this.me !== undefined) { - return this.me - } - - const info = await google.oauth2({ version: 'v2', auth: this.oAuth2Client }).userinfo.get() - this.me = info.data.email ?? '' - return this.me - } - - private async setToken (token: Credentials): Promise { - try { - this.oAuth2Client.setCredentials(token) - } catch (err: any) { - console.error('Set token error', this.user.workspace, this.user.userId, err) - await this.checkError(err) - throw err - } - } - - async checkError (err: any): Promise { - if (err?.response?.data?.error === 'invalid_grant') { - await this.calendarClient.cleanIntegration() - return true - } - return false - } - - private async updateToken (token: Credentials): Promise { - try { - const currentToken = await this.getCurrentToken() - if (currentToken != null) { - await this.updateCurrentToken(token) - } else { - await this.tokens.insertOne({ - userId: this.user.userId, - workspace: this.user.workspace, - token: this.user.token, - ...token - }) - } - } catch (err) { - console.error('update token error', this.user.workspace, this.user.userId, err) - } - } - - private async refreshToken (): Promise { - try { - const res = await this.oAuth2Client.refreshAccessToken() - await this.updateToken(res.credentials) - this.refreshTimer = setTimeout( - () => { - void this.refreshToken() - }, - 30 * 60 * 1000 - ) - } catch (err: any) { - console.error("Couldn't refresh token, error:", err) - if (err?.response?.data?.error === 'invalid_grant' || err.message === 'No refresh token is set.') { - await this.calendarClient.cleanIntegration() - } else { - this.refreshTimer = setTimeout( - () => { - void this.refreshToken() - }, - 15 * 60 * 1000 - ) - } - throw err - } - } - - private async getCurrentToken (): Promise { - return await this.tokens.findOne({ - userId: this.user.userId, - workspace: this.user.workspace - }) - } - - private async updateCurrentToken (token: Credentials): Promise { - await this.tokens.updateOne( - { - userId: this.user.userId, - workspace: this.user.workspace - }, - { - $set: { - ...token - } - } - ) - } - - async watchCalendar (): Promise { - try { - const current = await this.watches.findOne({ - userId: this.user.userId, - workspace: this.user.workspace, - calendarId: null - }) - if (current != null) { - await this.rateLimiter.take(1) - await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } }) - } - const channelId = generateId() - const me = await this.getMe() - const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${me}&mode=calendar` } - await this.rateLimiter.take(1) - const res = await this.calendar.calendarList.watch({ requestBody: body }) - if (res.data.expiration != null && res.data.resourceId !== null) { - if (current != null) { - await this.watches.updateOne( - { - userId: this.user.userId, - workspace: this.user.workspace, - calendarId: null - }, - { - channelId, - expired: Number.parseInt(res.data.expiration), - resourceId: res.data.resourceId ?? '' - } - ) - } else { - await this.watches.insertOne({ - calendarId: null, - channelId, - expired: Number.parseInt(res.data.expiration), - resourceId: res.data.resourceId ?? '', - userId: this.user.userId, - workspace: this.user.workspace - }) - } - } - } catch (err) { - console.error('Calendar watch error', err) - } - } - - async watch (calendarId: string): Promise { - try { - const current = await this.watches.findOne({ - userId: this.user.userId, - workspace: this.user.workspace, - calendarId - }) - if (current != null) { - await this.rateLimiter.take(1) - await this.calendar.channels.stop({ - requestBody: { id: current.channelId, resourceId: current.resourceId } - }) - } - const channelId = generateId() - const me = await this.getMe() - const body = { - id: channelId, - address: config.WATCH_URL, - type: 'webhook', - token: `user=${me}&mode=events&calendarId=${calendarId}` - } - await this.rateLimiter.take(1) - const res = await this.calendar.events.watch({ calendarId, requestBody: body }) - if (res.data.expiration != null && res.data.resourceId != null) { - if (current != null) { - await this.watches.updateOne( - { - userId: this.user.userId, - workspace: this.user.workspace, - calendarId - }, - { - channelId, - expired: Number.parseInt(res.data.expiration), - resourceId: res.data.resourceId ?? '' - } - ) - } else { - await this.watches.insertOne({ - calendarId, - channelId, - expired: Number.parseInt(res.data.expiration), - resourceId: res.data.resourceId ?? '', - userId: this.user.userId, - workspace: this.user.workspace - }) - } - } - return true - } catch (err: any) { - if (err?.errors?.[0]?.reason === 'pushNotSupportedForRequestedResource') { - return false - } else { - console.error('Watch error', err) - await this.checkError(err) - return false - } - } - } -} diff --git a/services/calendar/pod-calendar/src/storage.ts b/services/calendar/pod-calendar/src/integrations.ts similarity index 50% rename from services/calendar/pod-calendar/src/storage.ts rename to services/calendar/pod-calendar/src/integrations.ts index 187f3ddaf6..8512277959 100644 --- a/services/calendar/pod-calendar/src/storage.ts +++ b/services/calendar/pod-calendar/src/integrations.ts @@ -1,5 +1,5 @@ // -// Copyright © 2023 Hardcore Engineering Inc. +// Copyright © 2025 Hardcore Engineering Inc. // // Licensed under the Eclipse Public License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. You may @@ -13,23 +13,9 @@ // limitations under the License. // -import { MongoClientReference, getMongoClient } from '@hcengineering/mongo' -import { MongoClient } from 'mongodb' +import { AccountClient, Integration } from '@hcengineering/account-client' +import { CALENDAR_INTEGRATION } from './types' -import config from './config' - -const clientRef: MongoClientReference = getMongoClient(config.MongoURI) -let client: MongoClient | undefined -export const getDB = (() => { - return async () => { - if (client === undefined) { - client = await clientRef.getClient() - } - - return client.db(config.MongoDB) - } -})() - -export const closeDB: () => Promise = async () => { - clientRef.close() +export async function getIntegrations (client: AccountClient): Promise { + return (await client.listIntegrations({ kind: CALENDAR_INTEGRATION })) ?? [] } diff --git a/services/calendar/pod-calendar/src/kvsUtils.ts b/services/calendar/pod-calendar/src/kvsUtils.ts new file mode 100644 index 0000000000..1944118dad --- /dev/null +++ b/services/calendar/pod-calendar/src/kvsUtils.ts @@ -0,0 +1,79 @@ +import { WorkspaceUuid } from '@hcengineering/core' +import { KeyValueClient, getClient as getKeyValueClient } from '@hcengineering/kvs-client' +import config from './config' +import { CALENDAR_INTEGRATION, GoogleEmail, Token, User } from './types' +import { getServiceToken } from './utils' + +let keyValueClient: KeyValueClient | undefined + +export function getKvsClient (): KeyValueClient { + if (keyValueClient !== undefined) return keyValueClient + keyValueClient = getKeyValueClient(CALENDAR_INTEGRATION, config.KvsUrl, getServiceToken()) + return keyValueClient +} + +export async function getSyncHistory (workspace: WorkspaceUuid): Promise { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:calendarSync:${workspace}` + return await client.getValue(key) +} + +export async function setSyncHistory (workspace: WorkspaceUuid): Promise { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:calendarSync:${workspace}` + await client.setValue(key, Date.now()) +} + +function calendarsHistoryKey (user: User): string { + return `${CALENDAR_INTEGRATION}:calendarsHistory:${user.workspace}:${user.userId}` +} + +export async function getCalendarsSyncHistory (user: User): Promise { + const client = getKvsClient() + return (await client.getValue(calendarsHistoryKey(user))) ?? undefined +} + +export async function setCalendarsSyncHistory (user: User, historyId: string): Promise { + const client = getKvsClient() + await client.setValue(calendarsHistoryKey(user), historyId) +} + +function eventHistoryKey (user: User, calendarId: string): string { + return `${CALENDAR_INTEGRATION}:eventHistory:${user.workspace}:${user.userId}:${calendarId}` +} + +export async function getEventHistory (user: User, calendarId: string): Promise { + const client = getKvsClient() + return (await client.getValue(eventHistoryKey(user, calendarId))) ?? undefined +} + +export async function setEventHistory (user: User, calendarId: string, historyId: string): Promise { + const client = getKvsClient() + await client.setValue(eventHistoryKey(user, calendarId), historyId) +} + +export async function getUserByEmail (email: GoogleEmail): Promise { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:users:${email}` + return (await client.getValue(key)) ?? [] +} + +export async function addUserByEmail (user: Token, email: GoogleEmail): Promise { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:users:${email}` + const curr = (await client.getValue(key)) ?? [] + curr.push(user) + await client.setValue(key, curr) +} + +export async function removeUserByEmail (user: User, email: GoogleEmail): Promise { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:users:${email}` + const curr = (await client.getValue(key)) ?? [] + const newCurr = curr.filter((p) => p.userId !== user.userId || p.workspace !== user.workspace) + if (newCurr.length === 0) { + await client.deleteKey(key) + } else { + await client.setValue(key, newCurr) + } +} diff --git a/services/calendar/pod-calendar/src/main.ts b/services/calendar/pod-calendar/src/main.ts index efc3d5afab..eabc5e61cb 100644 --- a/services/calendar/pod-calendar/src/main.ts +++ b/services/calendar/pod-calendar/src/main.ts @@ -13,18 +13,23 @@ // limitations under the License. // +import { SplitLogger } from '@hcengineering/analytics-service' +import { MeasureMetricsContext, newMetrics } from '@hcengineering/core' +import { setMetadata } from '@hcengineering/platform' +import serverClient, { getAccountClient } from '@hcengineering/server-client' +import { initStatisticsContext } from '@hcengineering/server-core' +import serverToken, { decodeToken } from '@hcengineering/server-token' import { type IncomingHttpHeaders } from 'http' +import { join } from 'path' +import { AuthController } from './auth' import { decode64 } from './base64' import { CalendarController } from './calendarController' import config from './config' import { createServer, listen } from './server' -import { closeDB, getDB } from './storage' -import { type Endpoint, type State } from './types' -import { setMetadata } from '@hcengineering/platform' -import serverClient from '@hcengineering/server-client' -import serverToken, { decodeToken } from '@hcengineering/server-token' -import { GoogleClient } from './googleClient' +import { CALENDAR_INTEGRATION, GoogleEmail, type Endpoint, type State } from './types' +import { getServiceToken } from './utils' import { WatchController } from './watch' +import { PushHandler } from './pushHandler' const extractToken = (header: IncomingHttpHeaders): any => { try { @@ -35,14 +40,30 @@ const extractToken = (header: IncomingHttpHeaders): any => { } export const main = async (): Promise => { + const ctx = initStatisticsContext(CALENDAR_INTEGRATION, { + factory: () => + new MeasureMetricsContext( + 'calendar', + {}, + {}, + newMetrics(), + new SplitLogger(CALENDAR_INTEGRATION, { + root: join(process.cwd(), 'logs'), + enableConsole: (process.env.ENABLE_CONSOLE ?? 'true') === 'true' + }) + ) + }) + + const accountClient = getAccountClient(getServiceToken()) + setMetadata(serverClient.metadata.Endpoint, config.AccountsURL) setMetadata(serverClient.metadata.UserAgent, config.ServiceID) setMetadata(serverToken.metadata.Secret, config.Secret) - const db = await getDB() - const calendarController = CalendarController.getCalendarController(db) + const pushHandler = new PushHandler(ctx, accountClient) + const calendarController = CalendarController.getCalendarController(ctx, accountClient) await calendarController.startAll() - const watchController = WatchController.get(db) + const watchController = WatchController.get(accountClient) watchController.startCheck() const endpoints: Endpoint[] = [ { @@ -59,11 +80,11 @@ export const main = async (): Promise => { const redirectURL = req.query.redirectURL as string const { account, workspace } = decodeToken(token) - const userId = await calendarController.getUserId(account, workspace) - const url = GoogleClient.getAuthUrl(redirectURL, workspace, userId, token) + const userId = await AuthController.getUserId(account, token) + const url = AuthController.getAuthUrl(redirectURL, workspace, userId, token) res.send(url) } catch (err) { - console.error('signin error', err) + ctx.error('signin error', { message: (err as any).message }) res.status(500).send() } } @@ -73,13 +94,16 @@ export const main = async (): Promise => { type: 'get', handler: async (req, res) => { const code = req.query.code as string - const state = JSON.parse(decode64(req.query.state as string)) as unknown as State try { - await calendarController.newClient(state, code) - res.redirect(state.redirectURL) + const state = JSON.parse(decode64(req.query.state as string)) as unknown as State + try { + await AuthController.createAndSync(ctx, accountClient, state, code) + res.redirect(state.redirectURL) + } catch (err) { + ctx.error('signin code error', { message: (err as any).message }) + } } catch (err) { - console.error(err) - res.redirect(state.redirectURL) + ctx.error('signin code state parse error', { message: (err as any).message }) } } }, @@ -95,14 +119,13 @@ export const main = async (): Promise => { return } - const value = req.query.value as string - - const { workspace } = decodeToken(token) - await calendarController.signout(workspace, value as any) // TODO: FIXME + const value = req.query.value as GoogleEmail + const { account, workspace } = decodeToken(token) + const userId = await AuthController.getUserId(account, token) + await AuthController.signout(ctx, accountClient, userId, workspace, value) } catch (err) { - console.error('signout error', err) + ctx.error('signout', { message: (err as any).message }) } - res.send() } }, @@ -125,7 +148,7 @@ export const main = async (): Promise => { res.status(400).send({ err: "'data' is missing" }) return } - void calendarController.push(data.user as any, data.mode as 'events' | 'calendar', data.calendarId) // TODO: FIXME + await pushHandler.push(data.user as GoogleEmail, data.mode as 'events' | 'calendar', data.calendarId) } res.send() @@ -152,12 +175,7 @@ export const main = async (): Promise => { const shutdown = (): void => { server.close(() => { watchController.stop() - void calendarController - .close() - .then(async () => { - await closeDB() - }) - .then(() => process.exit()) + process.exit() }) } diff --git a/services/calendar/pod-calendar/src/pushHandler.ts b/services/calendar/pod-calendar/src/pushHandler.ts new file mode 100644 index 0000000000..177e1d34fd --- /dev/null +++ b/services/calendar/pod-calendar/src/pushHandler.ts @@ -0,0 +1,56 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { AccountClient } from '@hcengineering/account-client' +import core, { isActiveMode, MeasureContext, TxOperations } from '@hcengineering/core' +import { getClient } from './client' +import { getUserByEmail } from './kvsUtils' +import { IncomingSyncManager } from './sync' +import { GoogleEmail, Token } from './types' +import { getGoogleClient, getServiceToken } from './utils' + +export class PushHandler { + constructor ( + private readonly ctx: MeasureContext, + private readonly accountClient: AccountClient + ) {} + + async sync (token: Token, calendarId: string | null): Promise { + await this.ctx.with('Push handler', { workspace: token.workspace, user: token.userId }, async () => { + const client = await getClient(getServiceToken()) + const txOp = new TxOperations(client, core.account.System) + const res = getGoogleClient() + res.auth.setCredentials(token) + await IncomingSyncManager.push(this.ctx, this.accountClient, txOp, token, res.google, calendarId) + await txOp.close() + }) + } + + async push (email: GoogleEmail, mode: 'events' | 'calendar', calendarId?: string): Promise { + const tokens = await getUserByEmail(email) + const workspaces = [...new Set(tokens.map((p) => p.workspace))] + const infos = await this.accountClient.getWorkspacesInfo(workspaces) + for (const token of tokens) { + const info = infos.find((p) => p.uuid === token.workspace) + if (info === undefined) { + continue + } + if (!isActiveMode(info.mode)) { + continue + } + await this.sync(token, mode === 'events' ? calendarId ?? null : null) + } + } +} diff --git a/services/calendar/pod-calendar/src/rateLimiter.ts b/services/calendar/pod-calendar/src/rateLimiter.ts index ea4a986ec4..0764879790 100644 --- a/services/calendar/pod-calendar/src/rateLimiter.ts +++ b/services/calendar/pod-calendar/src/rateLimiter.ts @@ -1,3 +1,5 @@ +import { GoogleEmail } from './types' + export class RateLimiter { private tokens: number private lastRefillTime: number @@ -32,3 +34,14 @@ export class RateLimiter { this.tokens -= count } } + +const current = new Map() + +export function getRateLimitter (email: GoogleEmail): RateLimiter { + let limiter = current.get(email) + if (limiter === undefined) { + limiter = new RateLimiter(1000, 500) + current.set(email, limiter) + } + return limiter +} diff --git a/services/calendar/pod-calendar/src/sync.ts b/services/calendar/pod-calendar/src/sync.ts new file mode 100644 index 0000000000..c3e7b2e76f --- /dev/null +++ b/services/calendar/pod-calendar/src/sync.ts @@ -0,0 +1,660 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { + AttachedData, + Data, + Doc, + DocData, + DocumentUpdate, + generateId, + MeasureContext, + Mixin, + Ref, + SocialIdType, + TxOperations, + TxProcessor +} from '@hcengineering/core' +import { getCalendarsSyncHistory, getEventHistory, setCalendarsSyncHistory, setEventHistory } from './kvsUtils' +import { GoogleEmail, Token, User } from './types' +import { calendar_v3 } from 'googleapis' +import { getRateLimitter, RateLimiter } from './rateLimiter' +import calendar, { + Calendar, + Event, + ExternalCalendar, + ReccuringEvent, + ReccuringInstance, + Visibility +} from '@hcengineering/calendar' +import { parseRecurrenceStrings } from './utils' +import { htmlToMarkup } from '@hcengineering/text' +import { deepEqual } from 'fast-equals' +import contact, { Contact, getPersonRefsBySocialIds, Person } from '@hcengineering/contact' +import setting from '@hcengineering/setting' +import { AccountClient } from '@hcengineering/account-client' +import { WatchController } from './watch' + +const locks = new Map>() + +export async function lock (key: string): Promise<() => void> { + // Wait for any existing lock to be released + const currentLock = locks.get(key) + if (currentLock != null) { + await currentLock + } + + // Create a new lock + let releaseFn!: () => void + const newLock = new Promise((resolve) => { + releaseFn = resolve + }) + + // Store the lock + locks.set(key, newLock) + + // Return the release function + return () => { + if (locks.get(key) === newLock) { + locks.delete(key) + } + releaseFn() + } +} + +export class IncomingSyncManager { + private readonly rateLimiter: RateLimiter + private calendars: ExternalCalendar[] = [] + private readonly participants = new Map>() + private constructor ( + private readonly ctx: MeasureContext, + private readonly accountClient: AccountClient, + private readonly client: TxOperations, + private readonly user: User, + private readonly email: GoogleEmail, + private readonly googleClient: calendar_v3.Calendar + ) { + this.rateLimiter = getRateLimitter(this.email) + } + + static async sync ( + ctx: MeasureContext, + accountClient: AccountClient, + client: TxOperations, + user: User, + email: GoogleEmail, + googleClient: calendar_v3.Calendar + ): Promise { + const syncManager = new IncomingSyncManager(ctx, accountClient, client, user, email, googleClient) + const mutex = await lock(`${user.workspace}:${user.userId}:${email}`) + try { + await syncManager.startSync() + } finally { + mutex() + } + } + + private async fillParticipants (): Promise { + const personsBySocialId = await getPersonRefsBySocialIds(this.client) + const emailSocialIds = await this.client.findAll(contact.class.SocialIdentity, { type: SocialIdType.EMAIL }) + const emails = await this.client.findAll(contact.class.Channel, { provider: contact.channelProvider.Email }) + const integrations = await this.client.findAll(setting.class.Integration, { + type: calendar.integrationType.Calendar + }) + this.participants.clear() + + for (const sID of emailSocialIds) { + if (sID.value === '') continue + const pers = personsBySocialId[sID._id] + if (pers != null) { + this.participants.set(sID.value, pers) + } + } + + for (const channel of emails) { + if (channel.value === '') continue + const pers = channel.attachedTo as Ref + if (this.participants.has(channel.value)) continue + this.participants.set(channel.value, pers) + } + + for (const integration of integrations) { + if (integration.value === '') continue + const pers = personsBySocialId[integration.createdBy ?? integration.modifiedBy] + if (pers != null) { + this.participants.set(integration.value, pers) + } + } + } + + private async getParticipantsMap (): Promise>> { + if (this.participants.size === 0) { + await this.fillParticipants() + } + return this.participants + } + + static async push ( + ctx: MeasureContext, + accountClient: AccountClient, + client: TxOperations, + user: Token, + googleClient: calendar_v3.Calendar, + calendarId: string | null + ): Promise { + const syncManager = new IncomingSyncManager(ctx, accountClient, client, user, user.email, googleClient) + const mutex = await lock(`${user.workspace}:${user.userId}:${user.email}`) + try { + await syncManager.getMyCalendars() + if (calendarId !== null) { + await syncManager.sync(calendarId) + } else { + await syncManager.syncCalendars() + } + } finally { + mutex() + } + } + + private async sync (calendarId: string): Promise { + await this.syncEvents(calendarId) + const watchController = WatchController.get(this.accountClient) + await this.rateLimiter.take(1) + await watchController.addWatch(this.user, this.email, calendarId, this.googleClient) + } + + private async startSync (): Promise { + try { + await this.getMyCalendars() + await this.syncCalendars() + await this.getMyCalendars() + for (const calendar of this.calendars) { + if (calendar.externalId !== undefined) { + await this.sync(calendar.externalId) + } + } + } catch (err) { + this.ctx.error('Start sync error', { workspace: this.user.workspace, user: this.user.userId, err }) + } + } + + private async syncEvents (calendarId: string): Promise { + const history = await getEventHistory(this.user, calendarId) + await this.eventsSync(calendarId, history) + } + + private async eventsSync (calendarId: string, syncToken?: string, pageToken?: string): Promise { + try { + await this.rateLimiter.take(1) + const res = await this.googleClient.events.list({ + calendarId, + syncToken, + pageToken, + showDeleted: syncToken != null + }) + if (res.status === 410) { + await this.eventsSync(calendarId) + return + } + const nextPageToken = res.data.nextPageToken + for (const event of res.data.items ?? []) { + try { + await this.syncEvent(calendarId, event, res.data.accessRole ?? 'reader') + } catch (err) { + this.ctx.error('save event error', { workspace: this.user.workspace, user: this.user.userId, err }) + } + } + if (nextPageToken != null) { + await this.eventsSync(calendarId, syncToken, nextPageToken) + } + if (res.data.nextSyncToken != null) { + await setEventHistory(this.user, calendarId, res.data.nextSyncToken) + } + // if resync + } catch (err: any) { + if (err?.response?.status === 410) { + await this.eventsSync(calendarId) + return + } + this.ctx.error('Event sync error', { workspace: this.user.workspace, user: this.user.userId, err }) + } + } + + private getEventCalendar (calendarId: string, event: calendar_v3.Schema$Event): ExternalCalendar | undefined { + const _calendar = + this.calendars.find((p) => p.externalId === event.organizer?.email) ?? + this.calendars.find((p) => p.externalId === calendarId) ?? + this.calendars[0] + return _calendar + } + + async syncEvent (calendarId: string, event: calendar_v3.Schema$Event, accessRole: string): Promise { + if (event.id != null) { + const _calendar = this.getEventCalendar(calendarId, event) + if (_calendar !== undefined) { + const exists = (await this.client.findOne(calendar.class.Event, { + eventId: event.id, + calendar: _calendar._id + })) as Event | undefined + if (exists === undefined) { + await this.saveExtEvent(event, accessRole, _calendar) + } else { + await this.updateExtEvent(event, exists) + } + } + } + } + + private async updateExtEvent (event: calendar_v3.Schema$Event, current: Event): Promise { + if (event.status === 'cancelled' && current._class !== calendar.class.ReccuringInstance) { + await this.client.remove(current) + return + } + const data: Partial> = await this.parseUpdateData(event) + if (event.recurringEventId != null) { + const diff = this.getDiff( + { + ...data, + recurringEventId: event.recurringEventId as Ref, + originalStartTime: parseDate(event.originalStartTime), + isCancelled: event.status === 'cancelled' + }, + current as ReccuringInstance + ) + if (Object.keys(diff).length > 0) { + await this.client.update(current, diff) + } + } else { + if (event.recurrence != null) { + const parseRule = parseRecurrenceStrings(event.recurrence) + const diff = this.getDiff( + { + ...data, + rules: parseRule.rules, + exdate: parseRule.exdate, + rdate: parseRule.rdate + }, + current as ReccuringEvent + ) + if (Object.keys(diff).length > 0) { + await this.client.update(current, diff) + } + } else { + const diff = this.getDiff(data, current) + if (Object.keys(diff).length > 0) { + await this.client.update(current, diff) + } + } + } + await this.updateMixins(event, current) + } + + private async updateMixins (event: calendar_v3.Schema$Event, current: Event): Promise { + const mixins = this.parseMixins(event) + if (mixins !== undefined) { + for (const mixin in mixins) { + const attr = mixins[mixin] + if (typeof attr === 'object' && Object.keys(attr).length > 0) { + if (this.client.getHierarchy().hasMixin(current, mixin as Ref>)) { + const diff = this.getDiff(attr, this.client.getHierarchy().as(current, mixin as Ref>)) + if (Object.keys(diff).length > 0) { + await this.client.updateMixin( + current._id, + current._class, + calendar.space.Calendar, + mixin as Ref>, + diff + ) + } + } else { + await this.client.createMixin( + current._id, + current._class, + calendar.space.Calendar, + mixin as Ref>, + attr + ) + } + } + } + } + } + + private getAccess ( + event: calendar_v3.Schema$Event, + accessRole: string + ): 'freeBusyReader' | 'reader' | 'writer' | 'owner' { + if (accessRole !== 'owner') { + return accessRole as 'freeBusyReader' | 'reader' | 'writer' + } + if (event.creator?.self === true) { + return 'owner' + } else { + return 'reader' + } + } + + private async parseData ( + event: calendar_v3.Schema$Event, + accessRole: string, + _calendar: Ref + ): Promise> { + const participants = await this.getParticipants(event) + const res: AttachedData = { + date: parseDate(event.start), + dueDate: parseDate(event.end), + allDay: event.start?.date != null, + description: htmlToMarkup(event.description ?? ''), + title: event.summary ?? '', + location: event.location ?? undefined, + participants: participants[0], + eventId: event.id ?? '', + calendar: _calendar, + access: this.getAccess(event, accessRole), + timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT', + user: this.user.userId + } + if (participants[1].length > 0) { + res.externalParticipants = participants[1] + } + if (event.visibility != null && event.visibility !== 'default') { + res.visibility = + event.visibility === 'public' + ? 'public' + : (event.extendedProperties?.private?.visibility as Visibility) ?? 'private' + } + return res + } + + private getParticipant ( + map: Map>, + value: string + ): { + contact?: Ref + extra?: string + } { + const contact = map.get(value) + if (contact !== undefined) { + return { + contact + } + } + return { + extra: value + } + } + + private async getParticipants (event: calendar_v3.Schema$Event): Promise<[Ref[], string[]]> { + const map = await this.getParticipantsMap() + const contacts = new Set>() + const extra = new Set() + if (event.creator?.email != null) { + const res = this.getParticipant(map, event.creator.email) + if (res.contact !== undefined) { + contacts.add(res.contact) + } + if (res.extra !== undefined) { + extra.add(res.extra) + } + } + for (const attendee of event.attendees ?? []) { + if (attendee.email != null) { + const res = this.getParticipant(map, attendee.email) + if (res.contact !== undefined) { + contacts.add(res.contact) + } + if (res.extra !== undefined) { + extra.add(res.extra) + } + } + } + return [Array.from(contacts), Array.from(extra)] + } + + private getDiff(data: Partial>, current: T): Partial> { + const res = {} + for (const key in data) { + if (!deepEqual((data as any)[key], (current as any)[key])) { + ;(res as any)[key] = (data as any)[key] + } + } + return res + } + + private async parseUpdateData (event: calendar_v3.Schema$Event): Promise>> { + const res: Partial> = {} + if (event.attendees !== undefined) { + const participants = await this.getParticipants(event) + res.participants = participants[0] + if (participants[1].length > 0) { + res.externalParticipants = participants[1] + } + } + if (event.location != null) { + res.location = event.location + } + if (event.description != null) { + res.description = htmlToMarkup(event.description) + } + if (event.summary != null) { + res.title = event.summary + } + if (event.start != null) { + res.date = parseDate(event.start) + } + if (event.end != null) { + res.dueDate = parseDate(event.end) + } + if (event.visibility != null && event.visibility !== 'default') { + res.visibility = + event.visibility === 'public' + ? 'public' + : (event.extendedProperties?.private?.visibility as Visibility) ?? 'private' + } + + return res + } + + private async saveExtEvent ( + event: calendar_v3.Schema$Event, + accessRole: string, + _calendar: ExternalCalendar + ): Promise { + const data: AttachedData = await this.parseData(event, accessRole, _calendar._id) + if (event.recurringEventId != null) { + const parseRule = parseRecurrenceStrings(event.recurrence ?? []) + const id = await this.client.addCollection( + calendar.class.ReccuringInstance, + calendar.space.Calendar, + calendar.ids.NoAttached, + calendar.class.Event, + 'events', + { + ...data, + recurringEventId: event.recurringEventId, + originalStartTime: parseDate(event.originalStartTime), + isCancelled: event.status === 'cancelled', + rules: parseRule.rules, + exdate: parseRule.exdate, + rdate: parseRule.rdate, + timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT' + } + ) + await this.saveMixins(event, id) + } else if (event.status !== 'cancelled') { + if (event.recurrence != null) { + const parseRule = parseRecurrenceStrings(event.recurrence) + const id = await this.client.addCollection( + calendar.class.ReccuringEvent, + calendar.space.Calendar, + calendar.ids.NoAttached, + calendar.class.Event, + 'events', + { + ...data, + rules: parseRule.rules, + exdate: parseRule.exdate, + rdate: parseRule.rdate, + originalStartTime: data.date, + timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT' + } + ) + await this.saveMixins(event, id) + } else { + const id = await this.client.addCollection( + calendar.class.Event, + calendar.space.Calendar, + calendar.ids.NoAttached, + calendar.class.Event, + 'events', + data + ) + await this.saveMixins(event, id) + } + } + } + + private async saveMixins (event: calendar_v3.Schema$Event, _id: Ref): Promise { + const mixins = this.parseMixins(event) + if (mixins !== undefined) { + for (const mixin in mixins) { + const attr = mixins[mixin] + if (typeof attr === 'object' && Object.keys(attr).length > 0) { + await this.client.createMixin( + _id, + calendar.class.Event, + calendar.space.Calendar, + mixin as Ref>, + attr + ) + } + } + } + } + + private parseMixins (event: calendar_v3.Schema$Event): Record | undefined { + if (event.extendedProperties?.shared?.mixins !== undefined) { + const mixins = JSON.parse(event.extendedProperties.shared.mixins) + return mixins + } + } + + private async getMyCalendars (): Promise { + this.calendars = await this.client.findAll(calendar.class.ExternalCalendar, { + createdBy: this.user.userId, + hidden: false + }) + } + + async syncCalendars (): Promise { + const history = await getCalendarsSyncHistory(this.user) + await this.calendarSync(history) + const watchController = WatchController.get(this.accountClient) + await this.rateLimiter.take(1) + await watchController.addWatch(this.user, this.email, null, this.googleClient) + } + + private async calendarSync (sync?: string): Promise { + let syncToken = sync + let pageToken: string | undefined + while (true) { + try { + await this.rateLimiter.take(1) + const res = await this.googleClient.calendarList.list({ + syncToken, + pageToken + }) + if (res.status === 410) { + syncToken = undefined + pageToken = undefined + continue + } + const nextPageToken = res.data.nextPageToken + for (const calendar of res.data.items ?? []) { + try { + await this.syncCalendar(calendar) + } catch (err) { + this.ctx.error('save calendars error', { calendar: JSON.stringify(calendar), err }) + } + } + if (nextPageToken != null) { + pageToken = nextPageToken + continue + } + if (res.data.nextSyncToken != null) { + await setCalendarsSyncHistory(this.user, res.data.nextSyncToken) + } + return + } catch (err: any) { + this.ctx.error('Calendars sync error', { workspace: this.user.workspace, user: this.user.userId, err }) + if (err?.response?.status === 410) { + syncToken = undefined + pageToken = undefined + } + } + } + } + + private async syncCalendar (val: calendar_v3.Schema$CalendarListEntry): Promise { + if (val.id != null) { + const exists = this.calendars.find((p) => p.externalId === val.id && p.externalUser === this.email) + if (exists === undefined) { + const data: Data = { + name: val.summary ?? '', + visibility: 'freeBusy', + hidden: false, + externalId: val.id, + externalUser: this.email, + default: false + } + if (val.primary === true) { + const primaryExists = this.calendars.length > 0 + if (primaryExists === undefined) { + data.default = true + } + } + const _id = generateId() + const tx = this.client.txFactory.createTxCreateDoc( + calendar.class.ExternalCalendar, + calendar.space.Calendar, + data, + _id, + undefined, + this.user.userId + ) + await this.client.tx(tx) + this.calendars.push(TxProcessor.createDoc2Doc(tx)) + } else { + const update: DocumentUpdate = {} + if (exists.name !== val.summary) { + update.name = val.summary ?? exists.name + } + if (Object.keys(update).length > 0) { + await this.client.update(exists, update) + } + } + } + } +} + +function parseDate (date: calendar_v3.Schema$EventDateTime | undefined): number { + if (date?.dateTime != null) { + return new Date(date.dateTime).getTime() + } + if (date?.date != null) { + return new Date(date.date).getTime() + } + return 0 +} diff --git a/services/calendar/pod-calendar/src/tokens.ts b/services/calendar/pod-calendar/src/tokens.ts new file mode 100644 index 0000000000..f839e71572 --- /dev/null +++ b/services/calendar/pod-calendar/src/tokens.ts @@ -0,0 +1,29 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { AccountClient, IntegrationSecret } from '@hcengineering/account-client' +import { WorkspaceUuid } from '@hcengineering/core' +import { CALENDAR_INTEGRATION } from './types' + +export async function getWorkspaceTokens ( + accountClient: AccountClient, + workspace: WorkspaceUuid +): Promise { + const secrets = await accountClient.listIntegrationsSecrets({ + kind: CALENDAR_INTEGRATION, + workspaceUuid: workspace + }) + return secrets +} diff --git a/services/calendar/pod-calendar/src/types.ts b/services/calendar/pod-calendar/src/types.ts index 2ac6ac0903..786dcd61e4 100644 --- a/services/calendar/pod-calendar/src/types.ts +++ b/services/calendar/pod-calendar/src/types.ts @@ -18,7 +18,12 @@ import type { PersonId, Timestamp, WorkspaceUuid } from '@hcengineering/core' import type { NextFunction, Request, Response } from 'express' import type { Credentials } from 'google-auth-library' +export const CALENDAR_INTEGRATION = 'google-calendar' + +export type GoogleEmail = string & { googleEmail: true } + export interface WatchBase { + email: GoogleEmail userId: PersonId workspace: WorkspaceUuid expired: Timestamp @@ -42,7 +47,7 @@ export interface DummyWatch { calendarId: string } -export type Token = User & Credentials +export type Token = User & Credentials & { email: GoogleEmail } export interface CalendarHistory { userId: PersonId @@ -71,7 +76,6 @@ export interface ReccuringData { export interface User { userId: PersonId workspace: WorkspaceUuid - token: string } export type State = User & { @@ -109,3 +113,10 @@ export interface ProjectCredentialsData { client_secret: string redirect_uris: string[] } + +export const SCOPES = [ + 'https://www.googleapis.com/auth/calendar.calendars.readonly', + 'https://www.googleapis.com/auth/calendar.calendarlist.readonly', + 'https://www.googleapis.com/auth/calendar.events', + 'https://www.googleapis.com/auth/userinfo.email' +] diff --git a/services/calendar/pod-calendar/src/utils.ts b/services/calendar/pod-calendar/src/utils.ts index 1238363a79..80bd624cf0 100644 --- a/services/calendar/pod-calendar/src/utils.ts +++ b/services/calendar/pod-calendar/src/utils.ts @@ -1,5 +1,5 @@ // -// Copyright © 2023 Hardcore Engineering Inc. +// Copyright © 2025 Hardcore Engineering Inc. // // Licensed under the Eclipse Public License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. You may @@ -14,8 +14,12 @@ // import { RecurringRule } from '@hcengineering/calendar' -import { Timestamp } from '@hcengineering/core' -import { ReccuringData, type Token, type User } from './types' +import { systemAccountUuid, Timestamp, WorkspaceUuid } from '@hcengineering/core' +import { generateToken } from '@hcengineering/server-token' +import { OAuth2Client } from 'google-auth-library' +import { calendar_v3, google } from 'googleapis' +import config from './config' +import { CALENDAR_INTEGRATION, ReccuringData, State, type Token, type User } from './types' export class DeferredPromise { public readonly promise: Promise @@ -38,7 +42,7 @@ export class DeferredPromise { } } -export function isToken (user: User | Token): user is Token { +export function isToken (user: User | Token | State): user is Token { return (user as Token).access_token !== undefined } @@ -248,3 +252,34 @@ export function encodeReccuring (rules: RecurringRule[], rdates: number[], exdat return res } + +let serviceToken: string | undefined + +export function getServiceToken (): string { + if (serviceToken === undefined) { + serviceToken = generateServiceToken() + } + return serviceToken +} + +export function getWorkspaceToken (workspace: WorkspaceUuid): string { + return generateToken(systemAccountUuid, workspace, { service: CALENDAR_INTEGRATION }) +} + +function generateServiceToken (): string { + return generateToken(systemAccountUuid, undefined, { service: CALENDAR_INTEGRATION }) +} + +export function getGoogleClient (): { + auth: OAuth2Client + google: calendar_v3.Calendar +} { + const credentials = JSON.parse(config.Credentials) + const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line + const oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line + const googleClient = google.calendar({ version: 'v3', auth: oAuth2Client }) + return { + auth: oAuth2Client, + google: googleClient + } +} diff --git a/services/calendar/pod-calendar/src/watch.ts b/services/calendar/pod-calendar/src/watch.ts index 80d179977a..9ed8b75096 100644 --- a/services/calendar/pod-calendar/src/watch.ts +++ b/services/calendar/pod-calendar/src/watch.ts @@ -1,41 +1,44 @@ -import { generateId, isActiveMode, systemAccountUuid, WorkspaceUuid } from '@hcengineering/core' -import { generateToken } from '@hcengineering/server-token' +import { AccountClient } from '@hcengineering/account-client' +import { generateId, isActiveMode, PersonId, WorkspaceUuid } from '@hcengineering/core' import { Credentials, OAuth2Client } from 'google-auth-library' -import { calendar_v3, google } from 'googleapis' -import { Collection, Db } from 'mongodb' +import { calendar_v3 } from 'googleapis' import config from './config' -import { RateLimiter } from './rateLimiter' -import { EventWatch, Token, Watch, WatchBase } from './types' -import { getAccountClient } from '@hcengineering/server-client' +import { getKvsClient } from './kvsUtils' +import { getRateLimitter, RateLimiter } from './rateLimiter' +import { CALENDAR_INTEGRATION, EventWatch, GoogleEmail, Token, User, Watch, WatchBase } from './types' +import { getGoogleClient } from './utils' export class WatchClient { - private readonly watches: Collection private readonly oAuth2Client: OAuth2Client private readonly calendar: calendar_v3.Calendar private readonly user: Token - private me: string = '' - readonly rateLimiter = new RateLimiter(1000, 500) + readonly rateLimiter: RateLimiter - private constructor (mongo: Db, token: Token) { + private constructor (token: Token) { this.user = token - this.watches = mongo.collection('watch') - const credentials = JSON.parse(config.Credentials) - const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line - this.oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line - this.calendar = google.calendar({ version: 'v3', auth: this.oAuth2Client }) + this.rateLimiter = getRateLimitter(this.user.email) + + const res = getGoogleClient() + this.calendar = res.google + this.oAuth2Client = res.auth } - static async Create (mongo: Db, token: Token): Promise { - const watchClient = new WatchClient(mongo, token) - await watchClient.init(token) + static async Create (token: Token): Promise { + const watchClient = new WatchClient(token) + await watchClient.setToken(token) return watchClient } + private async getWatches (): Promise> { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:watch:${this.user.workspace}:${this.user.userId}` + const watches = await client.listKeys(key) + return watches ?? {} + } + private async setToken (token: Credentials): Promise { try { this.oAuth2Client.setCredentials(token) - const info = await google.oauth2({ version: 'v2', auth: this.oAuth2Client }).userinfo.get() - this.me = info.data.email ?? '' } catch (err: any) { console.error('Set token error', this.user.workspace, this.user.userId, err) await this.checkError(err) @@ -45,14 +48,14 @@ export class WatchClient { async checkError (err: any): Promise { if (err?.response?.data?.error === 'invalid_grant') { - await this.watches.deleteMany({ userId: this.user.userId, workspace: this.user.workspace }) + const watches = await this.getWatches() + const client = getKvsClient() + for (const key in watches) { + await client.deleteKey(key) + } } } - private async init (token: Token): Promise { - await this.setToken(token) - } - async subscribe (watches: Watch[]): Promise { for (const watch of watches) { if (watch.calendarId == null) { @@ -70,34 +73,16 @@ export class WatchClient { } private async unsubscribeWatch (current: Watch): Promise { - await this.rateLimiter.take(1) - await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } }) + try { + await this.rateLimiter.take(1) + await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } }) + } catch {} } private async watchCalendars (current: Watch): Promise { try { await this.unsubscribeWatch(current) - const channelId = generateId() - const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${this.me}&mode=calendar` } - await this.rateLimiter.take(1) - const res = await this.calendar.calendarList.watch({ requestBody: body }) - if (res.data.expiration != null && res.data.resourceId !== null) { - // eslint-disable-next-line - this.watches.updateOne( - { - userId: current.userId, - workspace: current.workspace, - calendarId: null - }, - { - $set: { - channelId, - expired: Number.parseInt(res.data.expiration), - resourceId: res.data.resourceId ?? '' - } - } - ) - } + await watchCalendars(this.user, this.user.email, this.calendar) } catch (err) { console.error('Calendar watch error', err) } @@ -106,66 +91,98 @@ export class WatchClient { private async watchCalendar (current: EventWatch): Promise { try { await this.unsubscribeWatch(current) - const channelId = generateId() - const body = { - id: channelId, - address: config.WATCH_URL, - type: 'webhook', - token: `user=${this.me}&mode=events&calendarId=${current.calendarId}` - } - await this.rateLimiter.take(1) - const res = await this.calendar.events.watch({ calendarId: current.calendarId, requestBody: body }) - if (res.data.expiration != null && res.data.resourceId != null) { - // eslint-disable-next-line - this.watches.updateOne( - { - userId: current.userId, - workspace: current.workspace, - calendarId: current.calendarId - }, - { - $set: { - channelId, - expired: Number.parseInt(res.data.expiration), - resourceId: res.data.resourceId ?? '' - } - } - ) - } + await watchCalendar(this.user, this.user.email, current.calendarId, this.calendar) } catch (err: any) { await this.checkError(err) } } } +async function watchCalendars (user: User, email: GoogleEmail, googleClient: calendar_v3.Calendar): Promise { + const channelId = generateId() + const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${email}&mode=calendar` } + const res = await googleClient.calendarList.watch({ requestBody: body }) + if (res.data.expiration != null && res.data.resourceId !== null) { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:watch:${user.workspace}:${user.userId}:null` + await client.setValue(key, { + userId: user.userId, + workspace: user.workspace, + email, + calendarId: null, + channelId, + expired: Number.parseInt(res.data.expiration), + resourceId: res.data.resourceId ?? '' + }) + } +} + +async function watchCalendar ( + user: User, + email: GoogleEmail, + calendarId: string, + googleClient: calendar_v3.Calendar +): Promise { + const channelId = generateId() + const body = { + id: channelId, + address: config.WATCH_URL, + type: 'webhook', + token: `user=${email}&mode=events&calendarId=${calendarId}` + } + const res = await googleClient.events.watch({ calendarId, requestBody: body }) + if (res.data.expiration != null && res.data.resourceId != null) { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:watch:${user.workspace}:${user.userId}:${calendarId}` + await client.setValue(key, { + userId: user.userId, + workspace: user.workspace, + email, + calendarId, + channelId, + expired: Number.parseInt(res.data.expiration), + resourceId: res.data.resourceId ?? '' + }) + } +} + // we have to refresh channels approx each week export class WatchController { - private readonly watches: Collection - private readonly tokens: Collection - private timer: NodeJS.Timeout | undefined = undefined protected static _instance: WatchController - private constructor (private readonly mongo: Db) { - this.watches = mongo.collection('watch') - this.tokens = mongo.collection('tokens') + private constructor (private readonly accountClient: AccountClient) { console.log('watch started') } - static get (mongo: Db): WatchController { + static get (accountClient: AccountClient): WatchController { if (WatchController._instance !== undefined) { return WatchController._instance } - return new WatchController(mongo) + return new WatchController(accountClient) + } + + private async getUserWatches (userId: PersonId, workspace: WorkspaceUuid): Promise> { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:watch:${workspace}:${userId}` + return (await client.listKeys(key)) ?? {} } async unsubscribe (user: Token): Promise { - const allWatches = await this.watches.find({ userId: user.userId, workspae: user.workspace }).toArray() - await this.watches.deleteMany({ userId: user.userId, workspae: user.workspace }) - const token = this.tokens.findOne({ user: user.userId, workspace: user.workspace }) + const client = getKvsClient() + const watches = await this.getUserWatches(user.userId, user.workspace) + for (const key in watches) { + await client.deleteKey(key) + } + const token = await this.accountClient.getIntegrationSecret({ + socialId: user.userId, + kind: CALENDAR_INTEGRATION, + workspaceUuid: user.workspace, + key: user.email + }) if (token == null) return - const watchClient = await WatchClient.Create(this.mongo, user) - await watchClient.unsubscribe(allWatches) + const watchClient = await WatchClient.Create(user) + await watchClient.unsubscribe(Object.values(watches)) } stop (): void { @@ -186,15 +203,21 @@ export class WatchController { async checkAll (): Promise { const expired = Date.now() + 24 * 60 * 60 * 1000 - const watches = await this.watches - .find({ - expired: { $lt: expired } - }) - .toArray() - console.log('watch, found for update', watches.length) + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:watch:` + const watches = (await client.listKeys(key)) ?? {} + const toRefresh: Watch[] = [] + for (const key in watches) { + const watch = watches[key] + if (watch.expired < expired) { + toRefresh.push(watch) + } + } + console.log('watch, found for update', toRefresh.length) + if (toRefresh.length === 0) return const groups = new Map() const workspaces = new Set() - for (const watch of watches) { + for (const watch of toRefresh) { workspaces.add(watch.workspace) const key = `${watch.userId}:${watch.workspace}` const group = groups.get(key) @@ -204,28 +227,56 @@ export class WatchController { groups.set(key, [watch]) } } - const token = generateToken(systemAccountUuid) const ids = [...workspaces] - const infos = await getAccountClient(token).getWorkspacesInfo(ids) - const tokens = await this.tokens.find({ workspace: { $in: ids } }).toArray() + if (ids.length === 0) return + const infos = await this.accountClient.getWorkspacesInfo(ids) + const tokens = await this.accountClient.listIntegrationsSecrets({ kind: CALENDAR_INTEGRATION }) for (const group of groups.values()) { try { const userId = group[0].userId const workspace = group[0].workspace - const token = tokens.find((p) => p.workspace === workspace && p.userId === userId) + const token = tokens.find((p) => p.workspaceUuid === workspace && p.socialId === userId) if (token === undefined) { - await this.watches.deleteMany({ userId, workspace }) + const toRemove = await this.getUserWatches(userId, workspace) + for (const key in toRemove) { + await client.deleteKey(key) + } continue } const info = infos.find((p) => p.uuid === workspace) - if (info === undefined || isActiveMode(info.mode)) { - await this.watches.deleteMany({ userId, workspace }) + if (info === undefined || !isActiveMode(info.mode)) { + const toRemove = await this.getUserWatches(userId, workspace) + for (const key in toRemove) { + await client.deleteKey(key) + } continue } - const watchClient = await WatchClient.Create(this.mongo, token) + const watchClient = await WatchClient.Create(JSON.parse(token.secret)) await watchClient.subscribe(group) } catch {} } console.log('watch check done') } + + async addWatch ( + user: User, + email: GoogleEmail, + calendarId: string | null, + googleClient: calendar_v3.Calendar, + force: boolean = false + ): Promise { + if (!force) { + const client = getKvsClient() + const key = `${CALENDAR_INTEGRATION}:watch:${user.workspace}:${user.userId}:${calendarId ?? 'null'}` + const exists = await client.getValue(key) + if (exists != null) { + return + } + } + if (calendarId != null) { + await watchCalendar(user, email, calendarId, googleClient) + } else { + await watchCalendars(user, email, googleClient) + } + } } diff --git a/services/calendar/pod-calendar/src/workspaceClient.ts b/services/calendar/pod-calendar/src/workspaceClient.ts index 998d32239b..cf611b4bcb 100644 --- a/services/calendar/pod-calendar/src/workspaceClient.ts +++ b/services/calendar/pod-calendar/src/workspaceClient.ts @@ -13,274 +13,150 @@ // limitations under the License. // +import { AccountClient } from '@hcengineering/account-client' import calendar, { Event, ExternalCalendar } from '@hcengineering/calendar' -import contact, { - Channel, - Contact, - Person, - getPrimarySocialId, - getPersonRefBySocialId, - getPersonRefsBySocialIds, - type Employee, - SocialIdentityRef -} from '@hcengineering/contact' import core, { - PersonId, - SocialIdType, - TxMixin, + MeasureContext, RateLimiter, + SocialIdType, TxOperations, - TxProcessor, WorkspaceUuid, - toIdMap, - type Client, - type Doc, - type Ref, - type Tx, - type TxCreateDoc, - type TxRemoveDoc, - type TxUpdateDoc, - systemAccountUuid, - PersonUuid + type Ref } from '@hcengineering/core' -import { generateToken } from '@hcengineering/server-token' -import setting, { Integration } from '@hcengineering/setting' -import { Collection, type Db } from 'mongodb' import { CalendarClient } from './calendar' -import { CalendarController } from './calendarController' import { getClient } from './client' -import { SyncHistory, Token, type User } from './types' import config from './config' +import { addUserByEmail, getSyncHistory, setSyncHistory } from './kvsUtils' +import { getWorkspaceTokens } from './tokens' +import { CALENDAR_INTEGRATION, GoogleEmail, Token } from './types' +import { getWorkspaceToken } from './utils' +import contact, { getPersonRefsBySocialIds, Person } from '@hcengineering/contact' +import setting from '@hcengineering/setting' export class WorkspaceClient { - private readonly txHandlers: ((...tx: Tx[]) => Promise)[] = [] - - client!: Client - private readonly clients: Map> = new Map< - string, - CalendarClient | Promise - >() - - private readonly syncHistory: Collection - private readonly tokens: Collection - private channels = new Map, Channel>() - private readonly externalIdByPersonId = new Map() - readonly calendars = { - byId: new Map, ExternalCalendar>(), - byExternal: new Map() - } - - readonly contacts = { - byId: new Map, string>(), - byEmail: new Map>() - } - - readonly integrations = { - byId: new Map, Integration>(), - byContact: new Map, string[]>(), - byEmail: new Map>() - } + private readonly clients = new Map() + private readonly calendarsByExternal = new Map() + readonly calendarsById = new Map, ExternalCalendar>() + readonly participants = new Map, string>() private constructor ( - private readonly mongo: Db, - private readonly workspace: WorkspaceUuid, - private readonly serviceController: CalendarController - ) { - this.tokens = mongo.collection('tokens') - this.syncHistory = mongo.collection('syncHistories') + private readonly ctx: MeasureContext, + private readonly accountClient: AccountClient, + private readonly client: TxOperations, + private readonly workspace: WorkspaceUuid + ) {} + + static async run (ctx: MeasureContext, accountClient: AccountClient, workspace: WorkspaceUuid): Promise { + const client = await getClient(getWorkspaceToken(workspace)) + const txOp = new TxOperations(client, core.account.System) + const instance = new WorkspaceClient(ctx, accountClient, txOp, workspace) + + await instance.init() + await instance.startAll() + await instance.close() } - static async create ( - mongo: Db, - workspace: WorkspaceUuid, - serviceController: CalendarController - ): Promise { - const instance = new WorkspaceClient(mongo, workspace, serviceController) - - await instance.initClient(workspace) - return instance + async init (): Promise { + const calendars = await this.client.findAll(calendar.class.ExternalCalendar, {}) + this.calendarsById.clear() + this.calendarsByExternal.clear() + for (const calendar of calendars) { + this.calendarsById.set(calendar._id, calendar) + this.calendarsByExternal.set(calendar.externalId, calendar) + } + await this.fillParticipants() } - async createCalendarClient (user: User): Promise { - const current = this.getCalendarClient(user.userId) - if (current !== undefined) { - if (current instanceof Promise) { - return await current + private async fillParticipants (): Promise { + const personsBySocialId = await getPersonRefsBySocialIds(this.client) + const emailSocialIds = await this.client.findAll(contact.class.SocialIdentity, { type: SocialIdType.EMAIL }) + const emails = await this.client.findAll(contact.class.Channel, { provider: contact.channelProvider.Email }) + const integrations = await this.client.findAll(setting.class.Integration, { + type: calendar.integrationType.Calendar + }) + this.participants.clear() + + for (const sID of emailSocialIds) { + if (sID.value === '') continue + const pers = personsBySocialId[sID._id] + if (pers != null) { + this.participants.set(pers, sID.value) } + } + + for (const channel of emails) { + if (channel.value === '') continue + const pers = channel.attachedTo as Ref + if (this.participants.has(pers)) continue + this.participants.set(pers, channel.value) + } + + for (const integration of integrations) { + if (integration.value === '') continue + const pers = personsBySocialId[integration.createdBy ?? integration.modifiedBy] + if (pers != null) { + this.participants.set(pers, integration.value) + } + } + } + + async startAll (): Promise { + const tokens = await getWorkspaceTokens(this.accountClient, this.workspace) + for (const token of tokens) { + if (token.workspaceUuid === null) continue + await addUserByEmail(JSON.parse(token.secret), token.key as GoogleEmail) + await this.createCalendarClient(JSON.parse(token.secret)) + } + await this.getNewEvents() + const limiter = new RateLimiter(config.InitLimit) + for (const client of this.clients.values()) { + await limiter.add(async () => { + await client.startSync() + }) + } + await limiter.waitProcessing() + } + + private async createCalendarClient (user: Token): Promise { + const current = this.clients.get(user.email) + if (current !== undefined) { return current } - const newClient = CalendarClient.create(user, this.mongo, this.client, this) - this.clients.set(user.userId, newClient) - const res = await newClient - this.clients.set(user.userId, res) - return res - } - - async newCalendarClient (user: User, code: string): Promise { - const newClient = await CalendarClient.create(user, this.mongo, this.client, this) - const userId = await newClient.authorize(code) - if (this.clients.has(userId)) { - newClient.close() - throw new Error('Client already exist') - } - this.clients.set(userId, newClient) + const newClient = await CalendarClient.create(this.ctx, this.accountClient, user, this.client, this) + this.clients.set(user.email, newClient) return newClient } - async close (): Promise { - for (let client of this.clients.values()) { - if (client instanceof Promise) { - client = await client - } - client.close() - } + private async close (): Promise { this.clients.clear() await this.client?.close() } - async getUserId (account: PersonUuid): Promise { - const person = await this.client.findOne(contact.class.Person, { personUuid: account }) - if (person === undefined) { - throw new Error('Person not found') - } - - const personId = await getPrimarySocialId(this.client, person._id) - - if (personId === undefined) { - throw new Error('PersonId not found') - } - - return personId - } - - async signout (personId: PersonId, byError: boolean = false): Promise { - let client = this.clients.get(personId) - if (client !== undefined) { - if (client instanceof Promise) { - client = await client - } - await client.signout() - } else { - const integration = await this.client.findOne(setting.class.Integration, { - type: calendar.integrationType.Calendar, - value: personId - }) - if (integration !== undefined) { - const txOp = new TxOperations(this.client, core.account.System) - if (byError) { - await txOp.update(integration, { disabled: true }) - } else { - await txOp.remove(integration) - } - } - } - this.clients.delete(personId) - return this.clients.size - } - - removeClient (personId: PersonId): void { - this.clients.delete(personId) - if (this.clients.size > 0) return - void this.close() - this.serviceController.removeWorkspace(this.workspace) - } - - private getCalendarClient (personId: PersonId): CalendarClient | Promise | undefined { - return this.clients.get(personId) - } - - private async getCalendarClientByCalendar ( - id: Ref, - create: boolean = false - ): Promise { - const calendar = this.calendars.byId.get(id) - if (calendar === undefined) { - console.warn("couldn't find calendar by id", id) - return - } - const client = this.clients.get(calendar.externalUser) - if (client instanceof Promise) { - return await client - } - if (client === undefined && create) { - const user = await this.tokens.findOne({ - workspace: this.workspace, - access_token: { $exists: true }, - email: calendar.externalUser - }) - if (user != null) { - return await this.createCalendarClient(user) - } - } - return client - } - - private async initClient (workspace: WorkspaceUuid): Promise { - const token = generateToken(systemAccountUuid, workspace, { service: 'calendar' }) - const client = await getClient(token) - client.notify = (...tx: Tx[]) => { - void this.txHandler(...tx) - } - - this.client = client - await this.init() - return this.client - } - - private async txHandler (...tx: Tx[]): Promise { - await Promise.all( - this.txHandlers.map(async (handler) => { - await handler(...tx) - }) - ) - } - - private async init (): Promise { - await this.checkUsers() - await this.initContacts() - await this.initIntegrations() - await this.initCalendars() - } - - async sync (): Promise { - await this.getNewEvents() - const limiter = new RateLimiter(config.InitLimit) - for (let client of this.clients.values()) { - void limiter.add(async () => { - if (client instanceof Promise) { - client = await client - } - await client.startSync() - }) - } - } - // #region Events - private async getSyncTime (): Promise { - const res = await this.syncHistory.findOne({ - workspace: this.workspace - }) - return res?.timestamp + static async push ( + ctx: MeasureContext, + accountClient: AccountClient, + workspace: WorkspaceUuid, + event: Event, + type: 'create' | 'update' | 'delete' + ): Promise { + const client = await getClient(getWorkspaceToken(workspace)) + const txOp = new TxOperations(client, core.account.System) + const token = await getTokenByEvent(accountClient, txOp, event, workspace) + if (token != null) { + const instance = new WorkspaceClient(ctx, accountClient, txOp, workspace) + await instance.pushEvent(token, event, type) + await instance.close() + return + } + await txOp.close() } - async updateSyncTime (): Promise { - const timestamp = Date.now() - await this.syncHistory.updateOne( - { - workspace: this.workspace - }, - { - $set: { - timestamp - } - }, - { upsert: true } - ) - } - - async pushEvent (event: Event, type: 'create' | 'update' | 'delete'): Promise { - const client = await this.getCalendarClientByCalendar(event.calendar as Ref, true) + async pushEvent (user: Token, event: Event, type: 'create' | 'update' | 'delete'): Promise { + const client = + (await this.getCalendarClientByCalendar(event.calendar as Ref)) ?? + (await this.createCalendarClient(user)) if (client === undefined) { console.warn('Client not found', event.calendar, this.workspace) return @@ -293,387 +169,57 @@ export class WorkspaceClient { await this.updateSyncTime() } - async getNewEvents (): Promise { + private async getSyncTime (): Promise { + return (await getSyncHistory(this.workspace)) ?? undefined + } + + private async updateSyncTime (): Promise { + await setSyncHistory(this.workspace) + } + + private async getNewEvents (): Promise { const lastSync = await this.getSyncTime() const query = lastSync !== undefined ? { modifiedOn: { $gt: lastSync } } : {} const newEvents = await this.client.findAll(calendar.class.Event, query) - this.txHandlers.push(async (...tx: Tx[]) => { - await this.txEventHandler(...tx) - }) for (const newEvent of newEvents) { const client = await this.getCalendarClientByCalendar(newEvent.calendar as Ref) if (client === undefined) { - console.warn('Client not found', newEvent.calendar, this.workspace) - return + this.ctx.warn('Client not found', { calendar: newEvent.calendar, workspace: this.workspace }) + continue } await client.syncMyEvent(newEvent) await this.updateSyncTime() } - console.log('all outcoming messages synced', this.workspace) + this.ctx.info('all outcoming messages synced', this.workspace) } - private async txEventHandler (...txes: Tx[]): Promise { - for (const tx of txes) { - switch (tx._class) { - case core.class.TxCreateDoc: { - await this.txCreateEvent(tx as TxCreateDoc) - return - } - case core.class.TxUpdateDoc: { - await this.txUpdateEvent(tx as TxUpdateDoc) - return - } - case core.class.TxRemoveDoc: { - await this.txRemoveEvent(tx as TxRemoveDoc) - } - } - } - } - - private async txCreateEvent (tx: TxCreateDoc): Promise { - const hierarhy = this.client.getHierarchy() - if (hierarhy.isDerived(tx.objectClass, calendar.class.Event)) { - const doc = TxProcessor.createDoc2Doc(tx as TxCreateDoc) - if (doc.access !== 'owner') return - const client = await this.getCalendarClientByCalendar(doc.calendar as Ref) - if (client === undefined) { - return - } - try { - await client.createEvent(doc) - await this.updateSyncTime() - } catch (err) { - console.error(err) - } - } - } - - private async handleMove (tx: TxUpdateDoc): Promise { - const event = await this.client.findOne(calendar.class.Event, { _id: tx.objectId }) - if (event === undefined) { + private async getCalendarClientByCalendar (id: Ref): Promise { + const calendar = this.calendarsByExternal.get(id) + if (calendar === undefined) { + console.warn("couldn't find calendar by id", id) return } - try { - const txes = await this.client.findAll(core.class.TxCUD, { - objectId: tx.objectId - }) - const extracted = txes.filter((p) => p._id !== tx._id) - const ev = TxProcessor.buildDoc2Doc(extracted) - if (ev != null) { - const oldClient = await this.getCalendarClientByCalendar(ev.calendar as Ref) - if (oldClient !== undefined) { - const oldCalendar = this.calendars.byId.get(ev.calendar as Ref) - if (oldCalendar !== undefined) { - await oldClient.remove(event.eventId, oldCalendar.externalId) - } - } - } - } catch (err) { - console.error('Error on remove event', err) - } - try { - const client = await this.getCalendarClientByCalendar(event.calendar as Ref) - if (client !== undefined) { - await client.syncMyEvent(event) - } - await this.updateSyncTime() - } catch (err) { - console.error('Error on move event', err) - } + return this.clients.get(calendar.externalUser as GoogleEmail) } - - private async txUpdateEvent (tx: TxUpdateDoc): Promise { - const hierarhy = this.client.getHierarchy() - if (hierarhy.isDerived(tx.objectClass, calendar.class.Event)) { - if (tx.operations.calendar !== undefined) { - await this.handleMove(tx) - return - } - const event = await this.client.findOne(calendar.class.Event, { _id: tx.objectId }) - if (event === undefined) { - return - } - if (event.access !== 'owner' && event.access !== 'writer') return - const client = await this.getCalendarClientByCalendar(event.calendar as Ref) - if (client === undefined) { - return - } - try { - await client.updateEvent(event) - await this.updateSyncTime() - } catch (err) { - console.error(err) - } - } - } - - private async txRemoveEvent (tx: TxRemoveDoc): Promise { - const hierarhy = this.client.getHierarchy() - if (hierarhy.isDerived(tx.objectClass, calendar.class.Event)) { - const txes = await this.client.findAll(core.class.TxCUD, { - objectId: tx.objectId - }) - const ev = TxProcessor.buildDoc2Doc(txes) - if (ev == null) return - if (ev.access !== 'owner' && ev.access !== 'writer') return - const client = await this.getCalendarClientByCalendar(ev?.calendar as Ref) - if (client === undefined) { - return - } - await client.removeEvent(ev) - await this.updateSyncTime() - } - } - - // #endregion - - // #region Calendars - - private async initCalendars (): Promise { - const calendars = await this.client.findAll(calendar.class.ExternalCalendar, {}) - this.calendars.byId = toIdMap(calendars) - this.calendars.byExternal.clear() - for (const calendar of calendars) { - const arrByExt = this.calendars.byExternal.get(calendar.externalId) ?? [] - arrByExt.push(calendar) - this.calendars.byExternal.set(calendar.externalId, arrByExt) - } - this.txHandlers.push(async (...txes: Tx[]) => { - for (const tx of txes) { - await this.txCalendarHandler(tx) - } - }) - } - - async getExtIdByPersonId (personId: PersonId): Promise { - if (!this.externalIdByPersonId.has(personId)) { - const socialIdentity = await this.client.findOne(contact.class.SocialIdentity, { - _id: personId as SocialIdentityRef, - type: SocialIdType.GOOGLE - }) - this.externalIdByPersonId.set(personId, socialIdentity?.value ?? null) - } - - return this.externalIdByPersonId.get(personId) - } - - async getMyCalendars (personId: PersonId): Promise { - const extId = await this.getExtIdByPersonId(personId) - if (extId == null) { - return [] - } - - return this.calendars.byExternal.get(extId) ?? [] - } - - private async txCalendarHandler (actualTx: Tx): Promise { - if (actualTx._class === core.class.TxCreateDoc) { - if ((actualTx as TxCreateDoc).objectClass === calendar.class.ExternalCalendar) { - const calendar = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc) - this.calendars.byId.set(calendar._id, calendar) - const arr = this.calendars.byExternal.get(calendar.externalId) ?? [] - arr.push(calendar) - this.calendars.byExternal.set(calendar.externalId, arr) - } - } - if (actualTx._class === core.class.TxRemoveDoc) { - const remTx = actualTx as TxRemoveDoc - const calendar = this.calendars.byId.get(remTx.objectId) - if (calendar !== undefined) { - this.calendars.byId.delete(remTx.objectId) - const arrByExt = this.calendars.byExternal.get(calendar.externalId) ?? [] - const indexByExt = arrByExt.findIndex((p) => p._id === calendar._id) - if (indexByExt !== -1) { - arrByExt.splice(indexByExt, 1) - this.calendars.byExternal.set(calendar.externalId, arrByExt) - } - } - } - } - - // #endregion - - // #region Contacts - - private async initContacts (): Promise { - const channels = await this.client.findAll(contact.class.Channel, { provider: contact.channelProvider.Email }) - this.channels = toIdMap(channels) - const emailSocialIds = await this.client.findAll(contact.class.SocialIdentity, { type: SocialIdType.EMAIL }) - for (const socialId of emailSocialIds) { - this.contacts.byEmail.set(socialId.value, socialId.attachedTo) - // Note: this doesn't seem to support multiple emails - this.contacts.byId.set(socialId.attachedTo, socialId.value) - } - for (const channel of channels) { - if (channel.value !== '') { - this.contacts.byEmail.set(channel.value, channel.attachedTo as Ref) - this.contacts.byId.set(channel.attachedTo as Ref, channel.value) - } - } - this.txHandlers.push(async (...txes: Tx[]) => { - for (const tx of txes) { - await this.txChannelHandler(tx) - } - }) - } - - private async txChannelHandler (actualTx: Tx): Promise { - if (actualTx._class === core.class.TxCreateDoc) { - if ((actualTx as TxCreateDoc).objectClass === contact.class.Channel) { - const channel = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc) - if (channel.provider === contact.channelProvider.Email) { - this.contacts.byEmail.set(channel.value, channel.attachedTo as Ref) - this.contacts.byId.set(channel.attachedTo as Ref, channel.value) - this.channels.set(channel._id, channel) - } - } - } - if (actualTx._class === core.class.TxUpdateDoc) { - const updateTx = actualTx as TxUpdateDoc - if (updateTx.operations.value !== undefined) { - const channel = this.channels.get(updateTx.objectId) - if (channel !== undefined) { - const oldValue = channel.value - this.contacts.byEmail.delete(oldValue) - TxProcessor.updateDoc2Doc(channel, updateTx) - this.contacts.byEmail.set(channel.value, channel.attachedTo as Ref) - this.contacts.byId.set(channel.attachedTo as Ref, channel.value) - this.channels.set(channel._id, channel) - } - } - } - if (actualTx._class === core.class.TxRemoveDoc) { - const remTx = actualTx as TxRemoveDoc - const channel = this.channels.get(remTx.objectId) - if (channel !== undefined) { - this.contacts.byEmail.delete(channel.value) - this.contacts.byId.delete(channel.attachedTo as Ref) - this.channels.delete(channel._id) - } - } - } - - // #endregion - - // #region Integrations - - private async initIntegrations (): Promise { - const personsBySocialId = await getPersonRefsBySocialIds(this.client) - const integrations = await this.client.findAll(setting.class.Integration, { - type: calendar.integrationType.Calendar - }) - for (const integration of integrations) { - const person = personsBySocialId[integration.createdBy ?? integration.modifiedBy] - if (person != null) { - this.integrations.byEmail.set(integration.value, person) - const arr = this.integrations.byContact.get(person) ?? [] - arr.push(integration.value) - this.integrations.byContact.set(person, arr) - this.integrations.byId.set(integration._id, integration) - } - } - this.txHandlers.push(async (...txes: Tx[]) => { - for (const tx of txes) { - await this.txIntegrationHandler(tx) - } - }) - } - - private addContactIntegration (integration: Integration, person: Ref): void { - const arr = this.integrations.byContact.get(person) ?? [] - arr.push(integration.value) - this.integrations.byContact.set(person, arr) - } - - private removeContactIntegration (integration: Integration, person: Ref): void { - const arr = this.integrations.byContact.get(person) - if (arr !== undefined) { - const index = arr.findIndex((p) => p === integration.value) - if (index !== -1) { - arr.splice(index, 1) - if (arr.length > 0) { - this.integrations.byContact.set(person, arr) - } else { - this.integrations.byContact.delete(person) - } - } - } - } - - private async addIntegration (integration: Integration): Promise { - const person = await getPersonRefBySocialId(this.client, integration.createdBy ?? integration.modifiedBy) - if (person != null) { - if (integration.value !== '') { - this.integrations.byEmail.set(integration.value, person) - this.addContactIntegration(integration, person) - } - this.integrations.byId.set(integration._id, integration) - } - } - - private async removeIntegration (integration: Integration): Promise { - const person = await getPersonRefBySocialId(this.client, integration.createdBy ?? integration.modifiedBy) - if (person != null) { - this.removeContactIntegration(integration, person) - } - this.integrations.byEmail.delete(integration.value) - this.integrations.byId.delete(integration._id) - } - - private async txIntegrationHandler (actualTx: Tx): Promise { - if (actualTx._class === core.class.TxCreateDoc) { - if ((actualTx as TxCreateDoc).objectClass === setting.class.Integration) { - const integration = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc) - if (integration.type === calendar.integrationType.Calendar) { - await this.addIntegration(integration) - } - } - } - if (actualTx._class === core.class.TxRemoveDoc) { - const remTx = actualTx as TxRemoveDoc - const integration = this.integrations.byId.get(remTx.objectId) - if (integration !== undefined) { - await this.removeIntegration(integration) - } - } - } - - // #endregion - - // #region Users - - async checkUsers (): Promise { - const removedEmployees = await this.client.findAll(contact.mixin.Employee, { - active: false - }) - - this.txHandlers.push(async (...txes: Tx[]) => { - for (const tx of txes) { - await this.txEmployeeHandler(tx) - } - }) - for (const employee of removedEmployees) { - await this.deactivateUser(employee._id) - } - } - - private async deactivateUser (person: Ref): Promise { - const integrations = this.integrations.byContact.get(person) ?? [] - for (const integration of integrations) { - if (integration !== '') { - await this.signout(integration as any, true) // TODO: FIXME - } - } - } - - private async txEmployeeHandler (tx: Tx): Promise { - if (tx._class !== core.class.TxUpdateDoc) return - const ctx = tx as TxMixin - if (!this.client.getHierarchy().isDerived(ctx.objectClass, contact.mixin.Employee)) return - if (ctx.attributes.active === false) { - await this.deactivateUser(ctx.objectId) - } - } - // #endregion } + +async function getTokenByEvent ( + accountClient: AccountClient, + txOp: TxOperations, + event: Event, + workspace: WorkspaceUuid +): Promise { + const _calendar = await txOp.findOne(calendar.class.ExternalCalendar, { + _id: event.calendar as Ref + }) + if (_calendar === undefined) return + const res = await accountClient.getIntegrationSecret({ + socialId: event.user, + kind: CALENDAR_INTEGRATION, + workspaceUuid: workspace, + key: _calendar.externalUser + }) + if (res == null) return + return JSON.parse(res.secret) +}