mirror of
https://github.com/hcengineering/platform.git
synced 2025-05-10 01:15:03 +00:00
Rewrite calendar service (#8851)
Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
parent
232eae5448
commit
44cecc1a73
@ -1475,7 +1475,15 @@ export async function setTimezoneIfNotDefined (
|
||||
}
|
||||
|
||||
// Move to config?
|
||||
export const integrationServices = ['github', 'telegram-bot', 'telegram', 'mailbox', 'caldav', 'gmail']
|
||||
export const integrationServices = [
|
||||
'github',
|
||||
'telegram-bot',
|
||||
'telegram',
|
||||
'mailbox',
|
||||
'caldav',
|
||||
'gmail',
|
||||
'google-calendar'
|
||||
]
|
||||
|
||||
export async function findExistingIntegration (
|
||||
account: AccountUuid,
|
||||
|
@ -65,6 +65,9 @@
|
||||
"@hcengineering/setting": "^0.6.17",
|
||||
"@hcengineering/text": "^0.6.5",
|
||||
"@hcengineering/server-client": "^0.6.0",
|
||||
"@hcengineering/server-core": "^0.6.1",
|
||||
"@hcengineering/analytics-service": "^0.6.0",
|
||||
"@hcengineering/kvs-client": "^0.6.0",
|
||||
"@hcengineering/server-token": "^0.6.11",
|
||||
"dotenv": "~16.0.0",
|
||||
"cors": "^2.8.5",
|
||||
|
295
services/calendar/pod-calendar/src/auth.ts
Normal file
295
services/calendar/pod-calendar/src/auth.ts
Normal file
@ -0,0 +1,295 @@
|
||||
//
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { AccountClient, IntegrationSecret } from '@hcengineering/account-client'
|
||||
import calendar from '@hcengineering/calendar'
|
||||
import contact, { getPrimarySocialId } from '@hcengineering/contact'
|
||||
import core, { AccountUuid, MeasureContext, PersonId, TxOperations, WorkspaceUuid } from '@hcengineering/core'
|
||||
import setting from '@hcengineering/setting'
|
||||
import { Credentials, OAuth2Client } from 'google-auth-library'
|
||||
import { calendar_v3, google } from 'googleapis'
|
||||
import { encode64 } from './base64'
|
||||
import { getClient } from './client'
|
||||
import { addUserByEmail, removeUserByEmail } from './kvsUtils'
|
||||
import { IncomingSyncManager, lock } from './sync'
|
||||
import { CALENDAR_INTEGRATION, GoogleEmail, SCOPES, State, Token, User } from './types'
|
||||
import { getGoogleClient, getServiceToken } from './utils'
|
||||
import { WatchController } from './watch'
|
||||
|
||||
interface AuthResult {
|
||||
success: boolean
|
||||
email: GoogleEmail
|
||||
}
|
||||
|
||||
export class AuthController {
|
||||
private readonly oAuth2Client: OAuth2Client
|
||||
private readonly googleClient: calendar_v3.Calendar
|
||||
email: GoogleEmail | undefined
|
||||
|
||||
private constructor (
|
||||
private readonly ctx: MeasureContext,
|
||||
private readonly accountClient: AccountClient,
|
||||
private readonly client: TxOperations,
|
||||
private readonly user: User
|
||||
) {
|
||||
const res = getGoogleClient()
|
||||
this.googleClient = res.google
|
||||
this.oAuth2Client = res.auth
|
||||
}
|
||||
|
||||
static async createAndSync (
|
||||
ctx: MeasureContext,
|
||||
accountClient: AccountClient,
|
||||
state: State,
|
||||
code: string
|
||||
): Promise<void> {
|
||||
await ctx.with('Create auth controller', { workspace: state.workspace, user: state.userId }, async () => {
|
||||
const mutex = await lock(`${state.workspace}:${state.userId}`)
|
||||
try {
|
||||
const client = await getClient(getServiceToken())
|
||||
const txOp = new TxOperations(client, core.account.System)
|
||||
const controller = new AuthController(ctx, accountClient, txOp, state)
|
||||
await controller.process(code)
|
||||
} finally {
|
||||
mutex()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
static async signout (
|
||||
ctx: MeasureContext<any>,
|
||||
accountClient: AccountClient,
|
||||
userId: PersonId,
|
||||
workspace: WorkspaceUuid,
|
||||
value: GoogleEmail
|
||||
): Promise<void> {
|
||||
await ctx.with('Signout auth controller', { workspace, userId }, async () => {
|
||||
const mutex = await lock(`${workspace}:${userId}`)
|
||||
try {
|
||||
const client = await getClient(getServiceToken())
|
||||
const txOp = new TxOperations(client, core.account.System)
|
||||
const controller = new AuthController(ctx, accountClient, txOp, {
|
||||
userId,
|
||||
workspace
|
||||
})
|
||||
await controller.signout(value)
|
||||
} catch (err) {
|
||||
ctx.error('signout', { workspace, userId, err })
|
||||
} finally {
|
||||
mutex()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private async signout (value: GoogleEmail): Promise<void> {
|
||||
const integration = await this.client.findOne(setting.class.Integration, {
|
||||
type: calendar.integrationType.Calendar,
|
||||
value
|
||||
})
|
||||
if (integration !== undefined) {
|
||||
await this.client.remove(integration)
|
||||
}
|
||||
await removeUserByEmail(this.user, value)
|
||||
const data = {
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: this.user.workspace,
|
||||
key: value,
|
||||
socialId: this.user.userId
|
||||
}
|
||||
const secret = await this.accountClient.getIntegrationSecret(data)
|
||||
if (secret == null) return
|
||||
const token = JSON.parse(secret.secret)
|
||||
const watchController = WatchController.get(this.accountClient)
|
||||
await watchController.unsubscribe(token)
|
||||
await this.accountClient.deleteIntegrationSecret(data)
|
||||
const left = await this.accountClient.listIntegrationsSecrets(data)
|
||||
if (left.length === 0) {
|
||||
await this.accountClient.deleteIntegration(data)
|
||||
}
|
||||
}
|
||||
|
||||
private async process (code: string): Promise<void> {
|
||||
const authRes = await this.authorize(code)
|
||||
await this.setWorkspaceIntegration(authRes)
|
||||
if (authRes.success) {
|
||||
void IncomingSyncManager.sync(
|
||||
this.ctx,
|
||||
this.accountClient,
|
||||
this.client,
|
||||
this.user,
|
||||
authRes.email,
|
||||
this.googleClient
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private async getEmail (): Promise<GoogleEmail> {
|
||||
if (this.email !== undefined) return this.email
|
||||
const info = await google.oauth2({ version: 'v2', auth: this.oAuth2Client }).userinfo.get()
|
||||
const email = info.data.email
|
||||
if (email == null) {
|
||||
throw new Error('Email not found')
|
||||
}
|
||||
this.email = email as GoogleEmail
|
||||
return this.email
|
||||
}
|
||||
|
||||
private async authorize (code: string): Promise<AuthResult> {
|
||||
const token = await this.oAuth2Client.getToken(code)
|
||||
this.oAuth2Client.setCredentials(token.tokens)
|
||||
const email = await this.getEmail()
|
||||
const providedScopes = token.tokens.scope?.split(' ') ?? []
|
||||
for (const scope of SCOPES) {
|
||||
if (providedScopes.findIndex((p) => p === scope) === -1) {
|
||||
this.ctx.error(`Not all scopes provided, provided: ${providedScopes.join(', ')} required: ${SCOPES.join(', ')}`)
|
||||
return { success: false, email }
|
||||
}
|
||||
}
|
||||
const res = await this.oAuth2Client.refreshAccessToken()
|
||||
await this.createAccIntegrationIfNotExists()
|
||||
await this.updateToken(res.credentials, email)
|
||||
|
||||
return { success: true, email }
|
||||
}
|
||||
|
||||
private async setWorkspaceIntegration (res: AuthResult): Promise<void> {
|
||||
await this.ctx.with(
|
||||
'Set workspace integration',
|
||||
{ user: this.user.userId, workspace: this.user.workspace, email: res.email },
|
||||
async () => {
|
||||
const integrations = await this.client.findAll(setting.class.Integration, {
|
||||
createdBy: this.user.userId,
|
||||
type: calendar.integrationType.Calendar
|
||||
})
|
||||
|
||||
const updated = integrations.find((p) => p.disabled && p.value === res.email)
|
||||
for (const integration of integrations.filter((p) => p.value === '')) {
|
||||
await this.client.remove(integration)
|
||||
}
|
||||
if (!res.success) {
|
||||
if (updated !== undefined) {
|
||||
await this.client.update(updated, {
|
||||
disabled: true,
|
||||
error: calendar.string.NotAllPermissions
|
||||
})
|
||||
} else {
|
||||
await this.client.createDoc(setting.class.Integration, core.space.Workspace, {
|
||||
type: calendar.integrationType.Calendar,
|
||||
disabled: true,
|
||||
error: calendar.string.NotAllPermissions,
|
||||
value: res.email
|
||||
})
|
||||
}
|
||||
throw new Error('Not all scopes provided')
|
||||
} else {
|
||||
if (updated !== undefined) {
|
||||
await this.client.update(updated, {
|
||||
disabled: false,
|
||||
error: null
|
||||
})
|
||||
} else {
|
||||
await this.client.createDoc(setting.class.Integration, core.space.Workspace, {
|
||||
type: calendar.integrationType.Calendar,
|
||||
disabled: false,
|
||||
value: res.email
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
private async createAccIntegrationIfNotExists (): Promise<void> {
|
||||
await this.ctx.with('Create account integration if not exists', { user: this.user.userId }, async () => {
|
||||
const integration = await this.accountClient.getIntegration({
|
||||
socialId: this.user.userId,
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: this.user.workspace
|
||||
})
|
||||
if (integration != null) {
|
||||
return
|
||||
}
|
||||
await this.accountClient.createIntegration({
|
||||
socialId: this.user.userId,
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: this.user.workspace
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
private async updateToken (token: Credentials, email: GoogleEmail): Promise<void> {
|
||||
const _token: Token = {
|
||||
...this.user,
|
||||
email,
|
||||
...token
|
||||
}
|
||||
const data: IntegrationSecret = {
|
||||
socialId: this.user.userId,
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: this.user.workspace,
|
||||
key: email,
|
||||
secret: JSON.stringify(_token)
|
||||
}
|
||||
try {
|
||||
const currentIntegration = this.accountClient.getIntegrationSecret({
|
||||
socialId: this.user.userId,
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: this.user.workspace,
|
||||
key: email
|
||||
})
|
||||
if (currentIntegration != null) {
|
||||
await this.accountClient.updateIntegrationSecret(data)
|
||||
} else {
|
||||
await this.accountClient.addIntegrationSecret(data)
|
||||
}
|
||||
await addUserByEmail(_token, email)
|
||||
} catch (err) {
|
||||
this.ctx.error('update token error', { workspace: this.user.workspace, user: this.user.userId, err })
|
||||
}
|
||||
}
|
||||
|
||||
static getAuthUrl (redirectURL: string, workspace: WorkspaceUuid, userId: PersonId, token: string): string {
|
||||
const res = getGoogleClient()
|
||||
const oAuth2Client = res.auth
|
||||
const state: State = {
|
||||
redirectURL,
|
||||
userId,
|
||||
workspace
|
||||
}
|
||||
const authUrl = oAuth2Client.generateAuthUrl({
|
||||
access_type: 'offline',
|
||||
prompt: 'consent',
|
||||
scope: SCOPES,
|
||||
state: encode64(JSON.stringify(state))
|
||||
})
|
||||
return authUrl
|
||||
}
|
||||
|
||||
static async getUserId (account: AccountUuid, token: string): Promise<PersonId> {
|
||||
const client = await getClient(token)
|
||||
const person = await client.findOne(contact.class.Person, { personUuid: account })
|
||||
if (person === undefined) {
|
||||
throw new Error('Person not found')
|
||||
}
|
||||
|
||||
const personId = await getPrimarySocialId(client, person._id)
|
||||
|
||||
if (personId === undefined) {
|
||||
throw new Error('PersonId not found')
|
||||
}
|
||||
|
||||
return personId
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
@ -13,206 +13,90 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { AccountClient, Integration } from '@hcengineering/account-client'
|
||||
import { Event } from '@hcengineering/calendar'
|
||||
import {
|
||||
PersonId,
|
||||
PersonUuid,
|
||||
MeasureContext,
|
||||
RateLimiter,
|
||||
WorkspaceInfoWithStatus,
|
||||
WorkspaceUuid,
|
||||
isActiveMode,
|
||||
isDeletingMode,
|
||||
parseSocialIdString,
|
||||
systemAccountUuid
|
||||
isDeletingMode
|
||||
} from '@hcengineering/core'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { Collection, type Db } from 'mongodb'
|
||||
import { type CalendarClient } from './calendar'
|
||||
import config from './config'
|
||||
import { type Token, type User } from './types'
|
||||
import { getIntegrations } from './integrations'
|
||||
import { WorkspaceClient } from './workspaceClient'
|
||||
import { getAccountClient } from '@hcengineering/server-client'
|
||||
|
||||
export class CalendarController {
|
||||
private readonly workspaces: Map<WorkspaceUuid, WorkspaceClient | Promise<WorkspaceClient>> = new Map<
|
||||
WorkspaceUuid,
|
||||
WorkspaceClient | Promise<WorkspaceClient>
|
||||
>()
|
||||
|
||||
private readonly tokens: Collection<Token>
|
||||
|
||||
protected static _instance: CalendarController
|
||||
|
||||
private constructor (private readonly mongo: Db) {
|
||||
this.tokens = mongo.collection<Token>('tokens')
|
||||
private constructor (
|
||||
private readonly ctx: MeasureContext,
|
||||
readonly accountClient: AccountClient
|
||||
) {
|
||||
CalendarController._instance = this
|
||||
setInterval(() => {
|
||||
if (this.workspaces.size > 0) {
|
||||
console.log('active workspaces', this.workspaces.size)
|
||||
}
|
||||
}, 60000)
|
||||
}
|
||||
|
||||
static getCalendarController (mongo?: Db): CalendarController {
|
||||
static getCalendarController (ctx: MeasureContext, accountClient: AccountClient): CalendarController {
|
||||
if (CalendarController._instance !== undefined) {
|
||||
return CalendarController._instance
|
||||
}
|
||||
if (mongo === undefined) throw new Error('CalendarController not exist')
|
||||
return new CalendarController(mongo)
|
||||
return new CalendarController(ctx, accountClient)
|
||||
}
|
||||
|
||||
async startAll (): Promise<void> {
|
||||
const tokens = await this.tokens.find().toArray()
|
||||
const groups = new Map<WorkspaceUuid, Token[]>()
|
||||
console.log('start calendar service', tokens.length)
|
||||
for (const token of tokens) {
|
||||
const group = groups.get(token.workspace)
|
||||
if (group === undefined) {
|
||||
groups.set(token.workspace, [token])
|
||||
} else {
|
||||
group.push(token)
|
||||
groups.set(token.workspace, group)
|
||||
}
|
||||
}
|
||||
try {
|
||||
const integrations = await getIntegrations(this.accountClient)
|
||||
this.ctx.info('Start integrations', { count: integrations.length })
|
||||
|
||||
const limiter = new RateLimiter(config.InitLimit)
|
||||
const token = generateToken(systemAccountUuid)
|
||||
const ids = [...groups.keys()]
|
||||
console.log('start workspaces', ids)
|
||||
const infos = await getAccountClient(token).getWorkspacesInfo(ids)
|
||||
console.log('infos', infos)
|
||||
for (const info of infos) {
|
||||
const tokens = groups.get(info.uuid)
|
||||
if (tokens === undefined) {
|
||||
console.log('no tokens for workspace', info.uuid)
|
||||
continue
|
||||
}
|
||||
if (isDeletingMode(info.mode)) {
|
||||
if (tokens !== undefined) {
|
||||
for (const token of tokens) {
|
||||
await this.tokens.deleteOne({ userId: token.userId, workspace: token.workspace })
|
||||
}
|
||||
const groups = new Map<WorkspaceUuid, Integration[]>()
|
||||
for (const int of integrations) {
|
||||
if (int.workspaceUuid === null) continue
|
||||
const group = groups.get(int.workspaceUuid)
|
||||
if (group === undefined) {
|
||||
groups.set(int.workspaceUuid, [int])
|
||||
} else {
|
||||
group.push(int)
|
||||
groups.set(int.workspaceUuid, group)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if (!isActiveMode(info.mode)) {
|
||||
continue
|
||||
|
||||
const ids = [...groups.keys()]
|
||||
if (ids.length === 0) return
|
||||
const limiter = new RateLimiter(config.InitLimit)
|
||||
const infos = await this.accountClient.getWorkspacesInfo(ids)
|
||||
for (const info of infos) {
|
||||
const integrations = groups.get(info.uuid) ?? []
|
||||
if (await this.checkWorkspace(info, integrations)) {
|
||||
await limiter.add(async () => {
|
||||
this.ctx.info('start workspace', { workspace: info.uuid })
|
||||
await WorkspaceClient.run(this.ctx, this.accountClient, info.uuid)
|
||||
})
|
||||
}
|
||||
}
|
||||
await limiter.add(async () => {
|
||||
console.log('start workspace', info.uuid)
|
||||
const workspace = await this.startWorkspace(info.uuid, tokens)
|
||||
await workspace.sync()
|
||||
})
|
||||
await limiter.waitProcessing()
|
||||
} catch (err: any) {
|
||||
this.ctx.error('Failed to start existing integrations', err)
|
||||
}
|
||||
}
|
||||
|
||||
async startWorkspace (workspace: WorkspaceUuid, tokens: Token[]): Promise<WorkspaceClient> {
|
||||
const workspaceClient = await this.getWorkspaceClient(workspace)
|
||||
for (const token of tokens) {
|
||||
try {
|
||||
const timeout = setTimeout(() => {
|
||||
console.warn('init client hang', token.workspace, token.userId)
|
||||
}, 60000)
|
||||
console.log('init client', token.workspace, token.userId)
|
||||
await workspaceClient.createCalendarClient(token)
|
||||
clearTimeout(timeout)
|
||||
} catch (err) {
|
||||
console.error(`Couldn't create client for ${workspace} ${token.userId}`)
|
||||
private async checkWorkspace (info: WorkspaceInfoWithStatus, integrations: Integration[]): Promise<boolean> {
|
||||
if (isDeletingMode(info.mode)) {
|
||||
if (integrations !== undefined) {
|
||||
for (const int of integrations) {
|
||||
await this.accountClient.deleteIntegration(int)
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
return workspaceClient
|
||||
}
|
||||
|
||||
async push (personId: PersonId, mode: 'events' | 'calendar', calendarId?: string): Promise<void> {
|
||||
const email = parseSocialIdString(personId).value
|
||||
const tokens = await this.tokens.find({ email, access_token: { $exists: true } }).toArray()
|
||||
const token = generateToken(systemAccountUuid)
|
||||
const workspaces = [...new Set(tokens.map((p) => p.workspace))]
|
||||
const infos = await getAccountClient(token).getWorkspacesInfo(workspaces)
|
||||
for (const token of tokens) {
|
||||
const info = infos.find((p) => p.uuid === token.workspace)
|
||||
if (info === undefined) {
|
||||
continue
|
||||
}
|
||||
if (isDeletingMode(info.mode)) {
|
||||
await this.tokens.deleteOne({ userId: token.userId, workspace: token.workspace })
|
||||
continue
|
||||
}
|
||||
if (!isActiveMode(info.mode)) {
|
||||
continue
|
||||
}
|
||||
const workspace = await this.getWorkspaceClient(token.workspace)
|
||||
const calendarClient = await workspace.createCalendarClient(token)
|
||||
if (mode === 'calendar') {
|
||||
await calendarClient.syncCalendars()
|
||||
}
|
||||
if (mode === 'events' && calendarId !== undefined) {
|
||||
await calendarClient.sync(calendarId)
|
||||
}
|
||||
if (!isActiveMode(info.mode)) {
|
||||
this.ctx.info('workspace is not active', { workspaceUuid: info.uuid })
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
async pushEvent (workspace: WorkspaceUuid, event: Event, type: 'create' | 'update' | 'delete'): Promise<void> {
|
||||
const workspaceController = await this.getWorkspaceClient(workspace)
|
||||
await workspaceController.pushEvent(event, type)
|
||||
}
|
||||
|
||||
async getUserId (account: PersonUuid, workspace: WorkspaceUuid): Promise<PersonId> {
|
||||
const workspaceClient = await this.getWorkspaceClient(workspace)
|
||||
|
||||
return await workspaceClient.getUserId(account)
|
||||
}
|
||||
|
||||
async signout (workspace: WorkspaceUuid, value: PersonId): Promise<void> {
|
||||
const workspaceClient = await this.getWorkspaceClient(workspace)
|
||||
const clients = await workspaceClient.signout(value)
|
||||
if (clients === 0) {
|
||||
this.removeWorkspace(workspace)
|
||||
}
|
||||
}
|
||||
|
||||
removeWorkspace (workspace: WorkspaceUuid): void {
|
||||
this.workspaces.delete(workspace)
|
||||
}
|
||||
|
||||
async close (): Promise<void> {
|
||||
for (let workspace of this.workspaces.values()) {
|
||||
if (workspace instanceof Promise) {
|
||||
workspace = await workspace
|
||||
}
|
||||
await workspace.close()
|
||||
}
|
||||
this.workspaces.clear()
|
||||
}
|
||||
|
||||
async createClient (user: Token): Promise<CalendarClient> {
|
||||
const workspace = await this.getWorkspaceClient(user.workspace)
|
||||
const newClient = await workspace.createCalendarClient(user)
|
||||
return newClient
|
||||
}
|
||||
|
||||
async newClient (user: User, code: string): Promise<CalendarClient> {
|
||||
const workspace = await this.getWorkspaceClient(user.workspace)
|
||||
const newClient = await workspace.newCalendarClient(user, code)
|
||||
return newClient
|
||||
}
|
||||
|
||||
private async getWorkspaceClient (workspace: WorkspaceUuid): Promise<WorkspaceClient> {
|
||||
const res = this.workspaces.get(workspace)
|
||||
if (res !== undefined) {
|
||||
if (res instanceof Promise) {
|
||||
return await res
|
||||
}
|
||||
return res
|
||||
}
|
||||
try {
|
||||
const client = WorkspaceClient.create(this.mongo, workspace, this)
|
||||
this.workspaces.set(workspace, client)
|
||||
const res = await client
|
||||
this.workspaces.set(workspace, res)
|
||||
return res
|
||||
} catch (err) {
|
||||
console.error(`Couldn't create workspace worker for ${workspace}, reason: ${JSON.stringify(err)}`)
|
||||
throw err
|
||||
}
|
||||
await WorkspaceClient.push(this.ctx, this.accountClient, workspace, event, type)
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ interface Config {
|
||||
AccountsURL: string
|
||||
ServiceID: string
|
||||
Secret: string
|
||||
KvsUrl: string
|
||||
Credentials: string
|
||||
WATCH_URL: string
|
||||
InitLimit: number
|
||||
@ -37,7 +38,8 @@ const envMap: { [key in keyof Config]: string } = {
|
||||
Secret: 'SECRET',
|
||||
Credentials: 'Credentials',
|
||||
WATCH_URL: 'WATCH_URL',
|
||||
InitLimit: 'INIT_LIMIT'
|
||||
InitLimit: 'INIT_LIMIT',
|
||||
KvsUrl: 'KVS_URL'
|
||||
}
|
||||
|
||||
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
|
||||
@ -52,7 +54,8 @@ const config: Config = (() => {
|
||||
Secret: process.env[envMap.Secret],
|
||||
Credentials: process.env[envMap.Credentials],
|
||||
InitLimit: parseNumber(process.env[envMap.InitLimit]) ?? 50,
|
||||
WATCH_URL: process.env[envMap.WATCH_URL]
|
||||
WATCH_URL: process.env[envMap.WATCH_URL],
|
||||
KvsUrl: process.env[envMap.KvsUrl]
|
||||
}
|
||||
|
||||
const missingEnv = (Object.keys(params) as Array<keyof Config>)
|
||||
|
@ -1,309 +0,0 @@
|
||||
//
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import { generateId, PersonId } from '@hcengineering/core'
|
||||
import type { Credentials, OAuth2Client } from 'google-auth-library'
|
||||
import { calendar_v3, google } from 'googleapis'
|
||||
import { Collection, Db } from 'mongodb'
|
||||
import { encode64 } from './base64'
|
||||
import { CalendarClient } from './calendar'
|
||||
import config from './config'
|
||||
import { RateLimiter } from './rateLimiter'
|
||||
import { ProjectCredentials, State, Token, User, Watch, WatchBase } from './types'
|
||||
|
||||
export const DUMMY_RESOURCE = 'Dummy'
|
||||
|
||||
const SCOPES = [
|
||||
'https://www.googleapis.com/auth/calendar.calendars.readonly',
|
||||
'https://www.googleapis.com/auth/calendar.calendarlist.readonly',
|
||||
'https://www.googleapis.com/auth/calendar.events',
|
||||
'https://www.googleapis.com/auth/userinfo.email'
|
||||
]
|
||||
|
||||
export class GoogleClient {
|
||||
private me: string | undefined = undefined
|
||||
private readonly credentials: ProjectCredentials
|
||||
private readonly oAuth2Client: OAuth2Client
|
||||
readonly calendar: calendar_v3.Calendar
|
||||
private readonly tokens: Collection<Token>
|
||||
private readonly watches: Collection<Watch>
|
||||
|
||||
private refreshTimer: NodeJS.Timeout | undefined = undefined
|
||||
|
||||
readonly rateLimiter = new RateLimiter(1000, 500)
|
||||
|
||||
constructor (
|
||||
private readonly user: User,
|
||||
mongo: Db,
|
||||
private readonly calendarClient: CalendarClient
|
||||
) {
|
||||
this.tokens = mongo.collection<Token>('tokens')
|
||||
this.credentials = JSON.parse(config.Credentials)
|
||||
const { client_secret, client_id, redirect_uris } = this.credentials.web // eslint-disable-line
|
||||
this.oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line
|
||||
this.calendar = google.calendar({ version: 'v3', auth: this.oAuth2Client })
|
||||
this.watches = mongo.collection<WatchBase>('watch')
|
||||
}
|
||||
|
||||
static getAuthUrl (redirectURL: string, workspace: string, userId: PersonId, token: string): string {
|
||||
const credentials = JSON.parse(config.Credentials)
|
||||
const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line
|
||||
const oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line
|
||||
const state: State = {
|
||||
token,
|
||||
redirectURL,
|
||||
workspace: workspace as any, // TODO: FIXME
|
||||
userId
|
||||
}
|
||||
const authUrl = oAuth2Client.generateAuthUrl({
|
||||
access_type: 'offline',
|
||||
scope: SCOPES,
|
||||
state: encode64(JSON.stringify(state))
|
||||
})
|
||||
return authUrl
|
||||
}
|
||||
|
||||
async signout (): Promise<void> {
|
||||
// get watch controller and unsubscibe
|
||||
await this.oAuth2Client.revokeCredentials()
|
||||
await this.tokens.deleteOne({
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace
|
||||
})
|
||||
}
|
||||
|
||||
async init (token: Token): Promise<void> {
|
||||
await this.setToken(token)
|
||||
await this.refreshToken()
|
||||
}
|
||||
|
||||
async authorize (code: string): Promise<string | undefined> {
|
||||
const token = await this.oAuth2Client.getToken(code)
|
||||
await this.setToken(token.tokens)
|
||||
const me = await this.getMe()
|
||||
const providedScopes = token.tokens.scope?.split(' ') ?? []
|
||||
for (const scope of SCOPES) {
|
||||
if (providedScopes.findIndex((p) => p === scope) === -1) {
|
||||
console.error(`Not all scopes provided, provided: ${providedScopes.join(', ')} required: ${SCOPES.join(', ')}`)
|
||||
return undefined
|
||||
}
|
||||
}
|
||||
await this.refreshToken()
|
||||
|
||||
return me
|
||||
}
|
||||
|
||||
close (): void {
|
||||
if (this.refreshTimer !== undefined) clearTimeout(this.refreshTimer)
|
||||
}
|
||||
|
||||
async getMe (): Promise<string> {
|
||||
if (this.me !== undefined) {
|
||||
return this.me
|
||||
}
|
||||
|
||||
const info = await google.oauth2({ version: 'v2', auth: this.oAuth2Client }).userinfo.get()
|
||||
this.me = info.data.email ?? ''
|
||||
return this.me
|
||||
}
|
||||
|
||||
private async setToken (token: Credentials): Promise<void> {
|
||||
try {
|
||||
this.oAuth2Client.setCredentials(token)
|
||||
} catch (err: any) {
|
||||
console.error('Set token error', this.user.workspace, this.user.userId, err)
|
||||
await this.checkError(err)
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
async checkError (err: any): Promise<boolean> {
|
||||
if (err?.response?.data?.error === 'invalid_grant') {
|
||||
await this.calendarClient.cleanIntegration()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private async updateToken (token: Credentials): Promise<void> {
|
||||
try {
|
||||
const currentToken = await this.getCurrentToken()
|
||||
if (currentToken != null) {
|
||||
await this.updateCurrentToken(token)
|
||||
} else {
|
||||
await this.tokens.insertOne({
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace,
|
||||
token: this.user.token,
|
||||
...token
|
||||
})
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('update token error', this.user.workspace, this.user.userId, err)
|
||||
}
|
||||
}
|
||||
|
||||
private async refreshToken (): Promise<void> {
|
||||
try {
|
||||
const res = await this.oAuth2Client.refreshAccessToken()
|
||||
await this.updateToken(res.credentials)
|
||||
this.refreshTimer = setTimeout(
|
||||
() => {
|
||||
void this.refreshToken()
|
||||
},
|
||||
30 * 60 * 1000
|
||||
)
|
||||
} catch (err: any) {
|
||||
console.error("Couldn't refresh token, error:", err)
|
||||
if (err?.response?.data?.error === 'invalid_grant' || err.message === 'No refresh token is set.') {
|
||||
await this.calendarClient.cleanIntegration()
|
||||
} else {
|
||||
this.refreshTimer = setTimeout(
|
||||
() => {
|
||||
void this.refreshToken()
|
||||
},
|
||||
15 * 60 * 1000
|
||||
)
|
||||
}
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
private async getCurrentToken (): Promise<Token | null> {
|
||||
return await this.tokens.findOne({
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace
|
||||
})
|
||||
}
|
||||
|
||||
private async updateCurrentToken (token: Credentials): Promise<void> {
|
||||
await this.tokens.updateOne(
|
||||
{
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
...token
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
async watchCalendar (): Promise<void> {
|
||||
try {
|
||||
const current = await this.watches.findOne({
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace,
|
||||
calendarId: null
|
||||
})
|
||||
if (current != null) {
|
||||
await this.rateLimiter.take(1)
|
||||
await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } })
|
||||
}
|
||||
const channelId = generateId()
|
||||
const me = await this.getMe()
|
||||
const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${me}&mode=calendar` }
|
||||
await this.rateLimiter.take(1)
|
||||
const res = await this.calendar.calendarList.watch({ requestBody: body })
|
||||
if (res.data.expiration != null && res.data.resourceId !== null) {
|
||||
if (current != null) {
|
||||
await this.watches.updateOne(
|
||||
{
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace,
|
||||
calendarId: null
|
||||
},
|
||||
{
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? ''
|
||||
}
|
||||
)
|
||||
} else {
|
||||
await this.watches.insertOne({
|
||||
calendarId: null,
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? '',
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace
|
||||
})
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Calendar watch error', err)
|
||||
}
|
||||
}
|
||||
|
||||
async watch (calendarId: string): Promise<boolean> {
|
||||
try {
|
||||
const current = await this.watches.findOne({
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace,
|
||||
calendarId
|
||||
})
|
||||
if (current != null) {
|
||||
await this.rateLimiter.take(1)
|
||||
await this.calendar.channels.stop({
|
||||
requestBody: { id: current.channelId, resourceId: current.resourceId }
|
||||
})
|
||||
}
|
||||
const channelId = generateId()
|
||||
const me = await this.getMe()
|
||||
const body = {
|
||||
id: channelId,
|
||||
address: config.WATCH_URL,
|
||||
type: 'webhook',
|
||||
token: `user=${me}&mode=events&calendarId=${calendarId}`
|
||||
}
|
||||
await this.rateLimiter.take(1)
|
||||
const res = await this.calendar.events.watch({ calendarId, requestBody: body })
|
||||
if (res.data.expiration != null && res.data.resourceId != null) {
|
||||
if (current != null) {
|
||||
await this.watches.updateOne(
|
||||
{
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace,
|
||||
calendarId
|
||||
},
|
||||
{
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? ''
|
||||
}
|
||||
)
|
||||
} else {
|
||||
await this.watches.insertOne({
|
||||
calendarId,
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? '',
|
||||
userId: this.user.userId,
|
||||
workspace: this.user.workspace
|
||||
})
|
||||
}
|
||||
}
|
||||
return true
|
||||
} catch (err: any) {
|
||||
if (err?.errors?.[0]?.reason === 'pushNotSupportedForRequestedResource') {
|
||||
return false
|
||||
} else {
|
||||
console.error('Watch error', err)
|
||||
await this.checkError(err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
//
|
||||
// Copyright © 2023 Hardcore Engineering Inc.
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
@ -13,23 +13,9 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { MongoClientReference, getMongoClient } from '@hcengineering/mongo'
|
||||
import { MongoClient } from 'mongodb'
|
||||
import { AccountClient, Integration } from '@hcengineering/account-client'
|
||||
import { CALENDAR_INTEGRATION } from './types'
|
||||
|
||||
import config from './config'
|
||||
|
||||
const clientRef: MongoClientReference = getMongoClient(config.MongoURI)
|
||||
let client: MongoClient | undefined
|
||||
export const getDB = (() => {
|
||||
return async () => {
|
||||
if (client === undefined) {
|
||||
client = await clientRef.getClient()
|
||||
}
|
||||
|
||||
return client.db(config.MongoDB)
|
||||
}
|
||||
})()
|
||||
|
||||
export const closeDB: () => Promise<void> = async () => {
|
||||
clientRef.close()
|
||||
export async function getIntegrations (client: AccountClient): Promise<Integration[]> {
|
||||
return (await client.listIntegrations({ kind: CALENDAR_INTEGRATION })) ?? []
|
||||
}
|
79
services/calendar/pod-calendar/src/kvsUtils.ts
Normal file
79
services/calendar/pod-calendar/src/kvsUtils.ts
Normal file
@ -0,0 +1,79 @@
|
||||
import { WorkspaceUuid } from '@hcengineering/core'
|
||||
import { KeyValueClient, getClient as getKeyValueClient } from '@hcengineering/kvs-client'
|
||||
import config from './config'
|
||||
import { CALENDAR_INTEGRATION, GoogleEmail, Token, User } from './types'
|
||||
import { getServiceToken } from './utils'
|
||||
|
||||
let keyValueClient: KeyValueClient | undefined
|
||||
|
||||
export function getKvsClient (): KeyValueClient {
|
||||
if (keyValueClient !== undefined) return keyValueClient
|
||||
keyValueClient = getKeyValueClient(CALENDAR_INTEGRATION, config.KvsUrl, getServiceToken())
|
||||
return keyValueClient
|
||||
}
|
||||
|
||||
export async function getSyncHistory (workspace: WorkspaceUuid): Promise<number | undefined | null> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:calendarSync:${workspace}`
|
||||
return await client.getValue(key)
|
||||
}
|
||||
|
||||
export async function setSyncHistory (workspace: WorkspaceUuid): Promise<void> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:calendarSync:${workspace}`
|
||||
await client.setValue(key, Date.now())
|
||||
}
|
||||
|
||||
function calendarsHistoryKey (user: User): string {
|
||||
return `${CALENDAR_INTEGRATION}:calendarsHistory:${user.workspace}:${user.userId}`
|
||||
}
|
||||
|
||||
export async function getCalendarsSyncHistory (user: User): Promise<string | undefined> {
|
||||
const client = getKvsClient()
|
||||
return (await client.getValue(calendarsHistoryKey(user))) ?? undefined
|
||||
}
|
||||
|
||||
export async function setCalendarsSyncHistory (user: User, historyId: string): Promise<void> {
|
||||
const client = getKvsClient()
|
||||
await client.setValue(calendarsHistoryKey(user), historyId)
|
||||
}
|
||||
|
||||
function eventHistoryKey (user: User, calendarId: string): string {
|
||||
return `${CALENDAR_INTEGRATION}:eventHistory:${user.workspace}:${user.userId}:${calendarId}`
|
||||
}
|
||||
|
||||
export async function getEventHistory (user: User, calendarId: string): Promise<string | undefined> {
|
||||
const client = getKvsClient()
|
||||
return (await client.getValue(eventHistoryKey(user, calendarId))) ?? undefined
|
||||
}
|
||||
|
||||
export async function setEventHistory (user: User, calendarId: string, historyId: string): Promise<void> {
|
||||
const client = getKvsClient()
|
||||
await client.setValue(eventHistoryKey(user, calendarId), historyId)
|
||||
}
|
||||
|
||||
export async function getUserByEmail (email: GoogleEmail): Promise<Token[]> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:users:${email}`
|
||||
return (await client.getValue<Token[]>(key)) ?? []
|
||||
}
|
||||
|
||||
export async function addUserByEmail (user: Token, email: GoogleEmail): Promise<void> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:users:${email}`
|
||||
const curr = (await client.getValue<Token[]>(key)) ?? []
|
||||
curr.push(user)
|
||||
await client.setValue<Token[]>(key, curr)
|
||||
}
|
||||
|
||||
export async function removeUserByEmail (user: User, email: GoogleEmail): Promise<void> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:users:${email}`
|
||||
const curr = (await client.getValue<User[]>(key)) ?? []
|
||||
const newCurr = curr.filter((p) => p.userId !== user.userId || p.workspace !== user.workspace)
|
||||
if (newCurr.length === 0) {
|
||||
await client.deleteKey(key)
|
||||
} else {
|
||||
await client.setValue<User[]>(key, newCurr)
|
||||
}
|
||||
}
|
@ -13,18 +13,23 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { SplitLogger } from '@hcengineering/analytics-service'
|
||||
import { MeasureMetricsContext, newMetrics } from '@hcengineering/core'
|
||||
import { setMetadata } from '@hcengineering/platform'
|
||||
import serverClient, { getAccountClient } from '@hcengineering/server-client'
|
||||
import { initStatisticsContext } from '@hcengineering/server-core'
|
||||
import serverToken, { decodeToken } from '@hcengineering/server-token'
|
||||
import { type IncomingHttpHeaders } from 'http'
|
||||
import { join } from 'path'
|
||||
import { AuthController } from './auth'
|
||||
import { decode64 } from './base64'
|
||||
import { CalendarController } from './calendarController'
|
||||
import config from './config'
|
||||
import { createServer, listen } from './server'
|
||||
import { closeDB, getDB } from './storage'
|
||||
import { type Endpoint, type State } from './types'
|
||||
import { setMetadata } from '@hcengineering/platform'
|
||||
import serverClient from '@hcengineering/server-client'
|
||||
import serverToken, { decodeToken } from '@hcengineering/server-token'
|
||||
import { GoogleClient } from './googleClient'
|
||||
import { CALENDAR_INTEGRATION, GoogleEmail, type Endpoint, type State } from './types'
|
||||
import { getServiceToken } from './utils'
|
||||
import { WatchController } from './watch'
|
||||
import { PushHandler } from './pushHandler'
|
||||
|
||||
const extractToken = (header: IncomingHttpHeaders): any => {
|
||||
try {
|
||||
@ -35,14 +40,30 @@ const extractToken = (header: IncomingHttpHeaders): any => {
|
||||
}
|
||||
|
||||
export const main = async (): Promise<void> => {
|
||||
const ctx = initStatisticsContext(CALENDAR_INTEGRATION, {
|
||||
factory: () =>
|
||||
new MeasureMetricsContext(
|
||||
'calendar',
|
||||
{},
|
||||
{},
|
||||
newMetrics(),
|
||||
new SplitLogger(CALENDAR_INTEGRATION, {
|
||||
root: join(process.cwd(), 'logs'),
|
||||
enableConsole: (process.env.ENABLE_CONSOLE ?? 'true') === 'true'
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
const accountClient = getAccountClient(getServiceToken())
|
||||
|
||||
setMetadata(serverClient.metadata.Endpoint, config.AccountsURL)
|
||||
setMetadata(serverClient.metadata.UserAgent, config.ServiceID)
|
||||
setMetadata(serverToken.metadata.Secret, config.Secret)
|
||||
|
||||
const db = await getDB()
|
||||
const calendarController = CalendarController.getCalendarController(db)
|
||||
const pushHandler = new PushHandler(ctx, accountClient)
|
||||
const calendarController = CalendarController.getCalendarController(ctx, accountClient)
|
||||
await calendarController.startAll()
|
||||
const watchController = WatchController.get(db)
|
||||
const watchController = WatchController.get(accountClient)
|
||||
watchController.startCheck()
|
||||
const endpoints: Endpoint[] = [
|
||||
{
|
||||
@ -59,11 +80,11 @@ export const main = async (): Promise<void> => {
|
||||
const redirectURL = req.query.redirectURL as string
|
||||
|
||||
const { account, workspace } = decodeToken(token)
|
||||
const userId = await calendarController.getUserId(account, workspace)
|
||||
const url = GoogleClient.getAuthUrl(redirectURL, workspace, userId, token)
|
||||
const userId = await AuthController.getUserId(account, token)
|
||||
const url = AuthController.getAuthUrl(redirectURL, workspace, userId, token)
|
||||
res.send(url)
|
||||
} catch (err) {
|
||||
console.error('signin error', err)
|
||||
ctx.error('signin error', { message: (err as any).message })
|
||||
res.status(500).send()
|
||||
}
|
||||
}
|
||||
@ -73,13 +94,16 @@ export const main = async (): Promise<void> => {
|
||||
type: 'get',
|
||||
handler: async (req, res) => {
|
||||
const code = req.query.code as string
|
||||
const state = JSON.parse(decode64(req.query.state as string)) as unknown as State
|
||||
try {
|
||||
await calendarController.newClient(state, code)
|
||||
res.redirect(state.redirectURL)
|
||||
const state = JSON.parse(decode64(req.query.state as string)) as unknown as State
|
||||
try {
|
||||
await AuthController.createAndSync(ctx, accountClient, state, code)
|
||||
res.redirect(state.redirectURL)
|
||||
} catch (err) {
|
||||
ctx.error('signin code error', { message: (err as any).message })
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
res.redirect(state.redirectURL)
|
||||
ctx.error('signin code state parse error', { message: (err as any).message })
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -95,14 +119,13 @@ export const main = async (): Promise<void> => {
|
||||
return
|
||||
}
|
||||
|
||||
const value = req.query.value as string
|
||||
|
||||
const { workspace } = decodeToken(token)
|
||||
await calendarController.signout(workspace, value as any) // TODO: FIXME
|
||||
const value = req.query.value as GoogleEmail
|
||||
const { account, workspace } = decodeToken(token)
|
||||
const userId = await AuthController.getUserId(account, token)
|
||||
await AuthController.signout(ctx, accountClient, userId, workspace, value)
|
||||
} catch (err) {
|
||||
console.error('signout error', err)
|
||||
ctx.error('signout', { message: (err as any).message })
|
||||
}
|
||||
|
||||
res.send()
|
||||
}
|
||||
},
|
||||
@ -125,7 +148,7 @@ export const main = async (): Promise<void> => {
|
||||
res.status(400).send({ err: "'data' is missing" })
|
||||
return
|
||||
}
|
||||
void calendarController.push(data.user as any, data.mode as 'events' | 'calendar', data.calendarId) // TODO: FIXME
|
||||
await pushHandler.push(data.user as GoogleEmail, data.mode as 'events' | 'calendar', data.calendarId)
|
||||
}
|
||||
|
||||
res.send()
|
||||
@ -152,12 +175,7 @@ export const main = async (): Promise<void> => {
|
||||
const shutdown = (): void => {
|
||||
server.close(() => {
|
||||
watchController.stop()
|
||||
void calendarController
|
||||
.close()
|
||||
.then(async () => {
|
||||
await closeDB()
|
||||
})
|
||||
.then(() => process.exit())
|
||||
process.exit()
|
||||
})
|
||||
}
|
||||
|
||||
|
56
services/calendar/pod-calendar/src/pushHandler.ts
Normal file
56
services/calendar/pod-calendar/src/pushHandler.ts
Normal file
@ -0,0 +1,56 @@
|
||||
//
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { AccountClient } from '@hcengineering/account-client'
|
||||
import core, { isActiveMode, MeasureContext, TxOperations } from '@hcengineering/core'
|
||||
import { getClient } from './client'
|
||||
import { getUserByEmail } from './kvsUtils'
|
||||
import { IncomingSyncManager } from './sync'
|
||||
import { GoogleEmail, Token } from './types'
|
||||
import { getGoogleClient, getServiceToken } from './utils'
|
||||
|
||||
export class PushHandler {
|
||||
constructor (
|
||||
private readonly ctx: MeasureContext,
|
||||
private readonly accountClient: AccountClient
|
||||
) {}
|
||||
|
||||
async sync (token: Token, calendarId: string | null): Promise<void> {
|
||||
await this.ctx.with('Push handler', { workspace: token.workspace, user: token.userId }, async () => {
|
||||
const client = await getClient(getServiceToken())
|
||||
const txOp = new TxOperations(client, core.account.System)
|
||||
const res = getGoogleClient()
|
||||
res.auth.setCredentials(token)
|
||||
await IncomingSyncManager.push(this.ctx, this.accountClient, txOp, token, res.google, calendarId)
|
||||
await txOp.close()
|
||||
})
|
||||
}
|
||||
|
||||
async push (email: GoogleEmail, mode: 'events' | 'calendar', calendarId?: string): Promise<void> {
|
||||
const tokens = await getUserByEmail(email)
|
||||
const workspaces = [...new Set(tokens.map((p) => p.workspace))]
|
||||
const infos = await this.accountClient.getWorkspacesInfo(workspaces)
|
||||
for (const token of tokens) {
|
||||
const info = infos.find((p) => p.uuid === token.workspace)
|
||||
if (info === undefined) {
|
||||
continue
|
||||
}
|
||||
if (!isActiveMode(info.mode)) {
|
||||
continue
|
||||
}
|
||||
await this.sync(token, mode === 'events' ? calendarId ?? null : null)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,3 +1,5 @@
|
||||
import { GoogleEmail } from './types'
|
||||
|
||||
export class RateLimiter {
|
||||
private tokens: number
|
||||
private lastRefillTime: number
|
||||
@ -32,3 +34,14 @@ export class RateLimiter {
|
||||
this.tokens -= count
|
||||
}
|
||||
}
|
||||
|
||||
const current = new Map<string, RateLimiter>()
|
||||
|
||||
export function getRateLimitter (email: GoogleEmail): RateLimiter {
|
||||
let limiter = current.get(email)
|
||||
if (limiter === undefined) {
|
||||
limiter = new RateLimiter(1000, 500)
|
||||
current.set(email, limiter)
|
||||
}
|
||||
return limiter
|
||||
}
|
||||
|
660
services/calendar/pod-calendar/src/sync.ts
Normal file
660
services/calendar/pod-calendar/src/sync.ts
Normal file
@ -0,0 +1,660 @@
|
||||
//
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import {
|
||||
AttachedData,
|
||||
Data,
|
||||
Doc,
|
||||
DocData,
|
||||
DocumentUpdate,
|
||||
generateId,
|
||||
MeasureContext,
|
||||
Mixin,
|
||||
Ref,
|
||||
SocialIdType,
|
||||
TxOperations,
|
||||
TxProcessor
|
||||
} from '@hcengineering/core'
|
||||
import { getCalendarsSyncHistory, getEventHistory, setCalendarsSyncHistory, setEventHistory } from './kvsUtils'
|
||||
import { GoogleEmail, Token, User } from './types'
|
||||
import { calendar_v3 } from 'googleapis'
|
||||
import { getRateLimitter, RateLimiter } from './rateLimiter'
|
||||
import calendar, {
|
||||
Calendar,
|
||||
Event,
|
||||
ExternalCalendar,
|
||||
ReccuringEvent,
|
||||
ReccuringInstance,
|
||||
Visibility
|
||||
} from '@hcengineering/calendar'
|
||||
import { parseRecurrenceStrings } from './utils'
|
||||
import { htmlToMarkup } from '@hcengineering/text'
|
||||
import { deepEqual } from 'fast-equals'
|
||||
import contact, { Contact, getPersonRefsBySocialIds, Person } from '@hcengineering/contact'
|
||||
import setting from '@hcengineering/setting'
|
||||
import { AccountClient } from '@hcengineering/account-client'
|
||||
import { WatchController } from './watch'
|
||||
|
||||
const locks = new Map<string, Promise<void>>()
|
||||
|
||||
export async function lock (key: string): Promise<() => void> {
|
||||
// Wait for any existing lock to be released
|
||||
const currentLock = locks.get(key)
|
||||
if (currentLock != null) {
|
||||
await currentLock
|
||||
}
|
||||
|
||||
// Create a new lock
|
||||
let releaseFn!: () => void
|
||||
const newLock = new Promise<void>((resolve) => {
|
||||
releaseFn = resolve
|
||||
})
|
||||
|
||||
// Store the lock
|
||||
locks.set(key, newLock)
|
||||
|
||||
// Return the release function
|
||||
return () => {
|
||||
if (locks.get(key) === newLock) {
|
||||
locks.delete(key)
|
||||
}
|
||||
releaseFn()
|
||||
}
|
||||
}
|
||||
|
||||
export class IncomingSyncManager {
|
||||
private readonly rateLimiter: RateLimiter
|
||||
private calendars: ExternalCalendar[] = []
|
||||
private readonly participants = new Map<string, Ref<Person>>()
|
||||
private constructor (
|
||||
private readonly ctx: MeasureContext,
|
||||
private readonly accountClient: AccountClient,
|
||||
private readonly client: TxOperations,
|
||||
private readonly user: User,
|
||||
private readonly email: GoogleEmail,
|
||||
private readonly googleClient: calendar_v3.Calendar
|
||||
) {
|
||||
this.rateLimiter = getRateLimitter(this.email)
|
||||
}
|
||||
|
||||
static async sync (
|
||||
ctx: MeasureContext,
|
||||
accountClient: AccountClient,
|
||||
client: TxOperations,
|
||||
user: User,
|
||||
email: GoogleEmail,
|
||||
googleClient: calendar_v3.Calendar
|
||||
): Promise<void> {
|
||||
const syncManager = new IncomingSyncManager(ctx, accountClient, client, user, email, googleClient)
|
||||
const mutex = await lock(`${user.workspace}:${user.userId}:${email}`)
|
||||
try {
|
||||
await syncManager.startSync()
|
||||
} finally {
|
||||
mutex()
|
||||
}
|
||||
}
|
||||
|
||||
private async fillParticipants (): Promise<void> {
|
||||
const personsBySocialId = await getPersonRefsBySocialIds(this.client)
|
||||
const emailSocialIds = await this.client.findAll(contact.class.SocialIdentity, { type: SocialIdType.EMAIL })
|
||||
const emails = await this.client.findAll(contact.class.Channel, { provider: contact.channelProvider.Email })
|
||||
const integrations = await this.client.findAll(setting.class.Integration, {
|
||||
type: calendar.integrationType.Calendar
|
||||
})
|
||||
this.participants.clear()
|
||||
|
||||
for (const sID of emailSocialIds) {
|
||||
if (sID.value === '') continue
|
||||
const pers = personsBySocialId[sID._id]
|
||||
if (pers != null) {
|
||||
this.participants.set(sID.value, pers)
|
||||
}
|
||||
}
|
||||
|
||||
for (const channel of emails) {
|
||||
if (channel.value === '') continue
|
||||
const pers = channel.attachedTo as Ref<Person>
|
||||
if (this.participants.has(channel.value)) continue
|
||||
this.participants.set(channel.value, pers)
|
||||
}
|
||||
|
||||
for (const integration of integrations) {
|
||||
if (integration.value === '') continue
|
||||
const pers = personsBySocialId[integration.createdBy ?? integration.modifiedBy]
|
||||
if (pers != null) {
|
||||
this.participants.set(integration.value, pers)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async getParticipantsMap (): Promise<Map<string, Ref<Person>>> {
|
||||
if (this.participants.size === 0) {
|
||||
await this.fillParticipants()
|
||||
}
|
||||
return this.participants
|
||||
}
|
||||
|
||||
static async push (
|
||||
ctx: MeasureContext,
|
||||
accountClient: AccountClient,
|
||||
client: TxOperations,
|
||||
user: Token,
|
||||
googleClient: calendar_v3.Calendar,
|
||||
calendarId: string | null
|
||||
): Promise<void> {
|
||||
const syncManager = new IncomingSyncManager(ctx, accountClient, client, user, user.email, googleClient)
|
||||
const mutex = await lock(`${user.workspace}:${user.userId}:${user.email}`)
|
||||
try {
|
||||
await syncManager.getMyCalendars()
|
||||
if (calendarId !== null) {
|
||||
await syncManager.sync(calendarId)
|
||||
} else {
|
||||
await syncManager.syncCalendars()
|
||||
}
|
||||
} finally {
|
||||
mutex()
|
||||
}
|
||||
}
|
||||
|
||||
private async sync (calendarId: string): Promise<void> {
|
||||
await this.syncEvents(calendarId)
|
||||
const watchController = WatchController.get(this.accountClient)
|
||||
await this.rateLimiter.take(1)
|
||||
await watchController.addWatch(this.user, this.email, calendarId, this.googleClient)
|
||||
}
|
||||
|
||||
private async startSync (): Promise<void> {
|
||||
try {
|
||||
await this.getMyCalendars()
|
||||
await this.syncCalendars()
|
||||
await this.getMyCalendars()
|
||||
for (const calendar of this.calendars) {
|
||||
if (calendar.externalId !== undefined) {
|
||||
await this.sync(calendar.externalId)
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
this.ctx.error('Start sync error', { workspace: this.user.workspace, user: this.user.userId, err })
|
||||
}
|
||||
}
|
||||
|
||||
private async syncEvents (calendarId: string): Promise<void> {
|
||||
const history = await getEventHistory(this.user, calendarId)
|
||||
await this.eventsSync(calendarId, history)
|
||||
}
|
||||
|
||||
private async eventsSync (calendarId: string, syncToken?: string, pageToken?: string): Promise<void> {
|
||||
try {
|
||||
await this.rateLimiter.take(1)
|
||||
const res = await this.googleClient.events.list({
|
||||
calendarId,
|
||||
syncToken,
|
||||
pageToken,
|
||||
showDeleted: syncToken != null
|
||||
})
|
||||
if (res.status === 410) {
|
||||
await this.eventsSync(calendarId)
|
||||
return
|
||||
}
|
||||
const nextPageToken = res.data.nextPageToken
|
||||
for (const event of res.data.items ?? []) {
|
||||
try {
|
||||
await this.syncEvent(calendarId, event, res.data.accessRole ?? 'reader')
|
||||
} catch (err) {
|
||||
this.ctx.error('save event error', { workspace: this.user.workspace, user: this.user.userId, err })
|
||||
}
|
||||
}
|
||||
if (nextPageToken != null) {
|
||||
await this.eventsSync(calendarId, syncToken, nextPageToken)
|
||||
}
|
||||
if (res.data.nextSyncToken != null) {
|
||||
await setEventHistory(this.user, calendarId, res.data.nextSyncToken)
|
||||
}
|
||||
// if resync
|
||||
} catch (err: any) {
|
||||
if (err?.response?.status === 410) {
|
||||
await this.eventsSync(calendarId)
|
||||
return
|
||||
}
|
||||
this.ctx.error('Event sync error', { workspace: this.user.workspace, user: this.user.userId, err })
|
||||
}
|
||||
}
|
||||
|
||||
private getEventCalendar (calendarId: string, event: calendar_v3.Schema$Event): ExternalCalendar | undefined {
|
||||
const _calendar =
|
||||
this.calendars.find((p) => p.externalId === event.organizer?.email) ??
|
||||
this.calendars.find((p) => p.externalId === calendarId) ??
|
||||
this.calendars[0]
|
||||
return _calendar
|
||||
}
|
||||
|
||||
async syncEvent (calendarId: string, event: calendar_v3.Schema$Event, accessRole: string): Promise<void> {
|
||||
if (event.id != null) {
|
||||
const _calendar = this.getEventCalendar(calendarId, event)
|
||||
if (_calendar !== undefined) {
|
||||
const exists = (await this.client.findOne(calendar.class.Event, {
|
||||
eventId: event.id,
|
||||
calendar: _calendar._id
|
||||
})) as Event | undefined
|
||||
if (exists === undefined) {
|
||||
await this.saveExtEvent(event, accessRole, _calendar)
|
||||
} else {
|
||||
await this.updateExtEvent(event, exists)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async updateExtEvent (event: calendar_v3.Schema$Event, current: Event): Promise<void> {
|
||||
if (event.status === 'cancelled' && current._class !== calendar.class.ReccuringInstance) {
|
||||
await this.client.remove(current)
|
||||
return
|
||||
}
|
||||
const data: Partial<AttachedData<Event>> = await this.parseUpdateData(event)
|
||||
if (event.recurringEventId != null) {
|
||||
const diff = this.getDiff<ReccuringInstance>(
|
||||
{
|
||||
...data,
|
||||
recurringEventId: event.recurringEventId as Ref<ReccuringEvent>,
|
||||
originalStartTime: parseDate(event.originalStartTime),
|
||||
isCancelled: event.status === 'cancelled'
|
||||
},
|
||||
current as ReccuringInstance
|
||||
)
|
||||
if (Object.keys(diff).length > 0) {
|
||||
await this.client.update(current, diff)
|
||||
}
|
||||
} else {
|
||||
if (event.recurrence != null) {
|
||||
const parseRule = parseRecurrenceStrings(event.recurrence)
|
||||
const diff = this.getDiff<ReccuringEvent>(
|
||||
{
|
||||
...data,
|
||||
rules: parseRule.rules,
|
||||
exdate: parseRule.exdate,
|
||||
rdate: parseRule.rdate
|
||||
},
|
||||
current as ReccuringEvent
|
||||
)
|
||||
if (Object.keys(diff).length > 0) {
|
||||
await this.client.update(current, diff)
|
||||
}
|
||||
} else {
|
||||
const diff = this.getDiff(data, current)
|
||||
if (Object.keys(diff).length > 0) {
|
||||
await this.client.update(current, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
await this.updateMixins(event, current)
|
||||
}
|
||||
|
||||
private async updateMixins (event: calendar_v3.Schema$Event, current: Event): Promise<void> {
|
||||
const mixins = this.parseMixins(event)
|
||||
if (mixins !== undefined) {
|
||||
for (const mixin in mixins) {
|
||||
const attr = mixins[mixin]
|
||||
if (typeof attr === 'object' && Object.keys(attr).length > 0) {
|
||||
if (this.client.getHierarchy().hasMixin(current, mixin as Ref<Mixin<Doc>>)) {
|
||||
const diff = this.getDiff(attr, this.client.getHierarchy().as(current, mixin as Ref<Mixin<Doc>>))
|
||||
if (Object.keys(diff).length > 0) {
|
||||
await this.client.updateMixin(
|
||||
current._id,
|
||||
current._class,
|
||||
calendar.space.Calendar,
|
||||
mixin as Ref<Mixin<Doc>>,
|
||||
diff
|
||||
)
|
||||
}
|
||||
} else {
|
||||
await this.client.createMixin(
|
||||
current._id,
|
||||
current._class,
|
||||
calendar.space.Calendar,
|
||||
mixin as Ref<Mixin<Doc>>,
|
||||
attr
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private getAccess (
|
||||
event: calendar_v3.Schema$Event,
|
||||
accessRole: string
|
||||
): 'freeBusyReader' | 'reader' | 'writer' | 'owner' {
|
||||
if (accessRole !== 'owner') {
|
||||
return accessRole as 'freeBusyReader' | 'reader' | 'writer'
|
||||
}
|
||||
if (event.creator?.self === true) {
|
||||
return 'owner'
|
||||
} else {
|
||||
return 'reader'
|
||||
}
|
||||
}
|
||||
|
||||
private async parseData (
|
||||
event: calendar_v3.Schema$Event,
|
||||
accessRole: string,
|
||||
_calendar: Ref<Calendar>
|
||||
): Promise<AttachedData<Event>> {
|
||||
const participants = await this.getParticipants(event)
|
||||
const res: AttachedData<Event> = {
|
||||
date: parseDate(event.start),
|
||||
dueDate: parseDate(event.end),
|
||||
allDay: event.start?.date != null,
|
||||
description: htmlToMarkup(event.description ?? ''),
|
||||
title: event.summary ?? '',
|
||||
location: event.location ?? undefined,
|
||||
participants: participants[0],
|
||||
eventId: event.id ?? '',
|
||||
calendar: _calendar,
|
||||
access: this.getAccess(event, accessRole),
|
||||
timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT',
|
||||
user: this.user.userId
|
||||
}
|
||||
if (participants[1].length > 0) {
|
||||
res.externalParticipants = participants[1]
|
||||
}
|
||||
if (event.visibility != null && event.visibility !== 'default') {
|
||||
res.visibility =
|
||||
event.visibility === 'public'
|
||||
? 'public'
|
||||
: (event.extendedProperties?.private?.visibility as Visibility) ?? 'private'
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
private getParticipant (
|
||||
map: Map<string, Ref<Person>>,
|
||||
value: string
|
||||
): {
|
||||
contact?: Ref<Contact>
|
||||
extra?: string
|
||||
} {
|
||||
const contact = map.get(value)
|
||||
if (contact !== undefined) {
|
||||
return {
|
||||
contact
|
||||
}
|
||||
}
|
||||
return {
|
||||
extra: value
|
||||
}
|
||||
}
|
||||
|
||||
private async getParticipants (event: calendar_v3.Schema$Event): Promise<[Ref<Contact>[], string[]]> {
|
||||
const map = await this.getParticipantsMap()
|
||||
const contacts = new Set<Ref<Contact>>()
|
||||
const extra = new Set<string>()
|
||||
if (event.creator?.email != null) {
|
||||
const res = this.getParticipant(map, event.creator.email)
|
||||
if (res.contact !== undefined) {
|
||||
contacts.add(res.contact)
|
||||
}
|
||||
if (res.extra !== undefined) {
|
||||
extra.add(res.extra)
|
||||
}
|
||||
}
|
||||
for (const attendee of event.attendees ?? []) {
|
||||
if (attendee.email != null) {
|
||||
const res = this.getParticipant(map, attendee.email)
|
||||
if (res.contact !== undefined) {
|
||||
contacts.add(res.contact)
|
||||
}
|
||||
if (res.extra !== undefined) {
|
||||
extra.add(res.extra)
|
||||
}
|
||||
}
|
||||
}
|
||||
return [Array.from(contacts), Array.from(extra)]
|
||||
}
|
||||
|
||||
private getDiff<T extends Doc>(data: Partial<DocData<T>>, current: T): Partial<DocData<T>> {
|
||||
const res = {}
|
||||
for (const key in data) {
|
||||
if (!deepEqual((data as any)[key], (current as any)[key])) {
|
||||
;(res as any)[key] = (data as any)[key]
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
private async parseUpdateData (event: calendar_v3.Schema$Event): Promise<Partial<AttachedData<Event>>> {
|
||||
const res: Partial<AttachedData<Event>> = {}
|
||||
if (event.attendees !== undefined) {
|
||||
const participants = await this.getParticipants(event)
|
||||
res.participants = participants[0]
|
||||
if (participants[1].length > 0) {
|
||||
res.externalParticipants = participants[1]
|
||||
}
|
||||
}
|
||||
if (event.location != null) {
|
||||
res.location = event.location
|
||||
}
|
||||
if (event.description != null) {
|
||||
res.description = htmlToMarkup(event.description)
|
||||
}
|
||||
if (event.summary != null) {
|
||||
res.title = event.summary
|
||||
}
|
||||
if (event.start != null) {
|
||||
res.date = parseDate(event.start)
|
||||
}
|
||||
if (event.end != null) {
|
||||
res.dueDate = parseDate(event.end)
|
||||
}
|
||||
if (event.visibility != null && event.visibility !== 'default') {
|
||||
res.visibility =
|
||||
event.visibility === 'public'
|
||||
? 'public'
|
||||
: (event.extendedProperties?.private?.visibility as Visibility) ?? 'private'
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
private async saveExtEvent (
|
||||
event: calendar_v3.Schema$Event,
|
||||
accessRole: string,
|
||||
_calendar: ExternalCalendar
|
||||
): Promise<void> {
|
||||
const data: AttachedData<Event> = await this.parseData(event, accessRole, _calendar._id)
|
||||
if (event.recurringEventId != null) {
|
||||
const parseRule = parseRecurrenceStrings(event.recurrence ?? [])
|
||||
const id = await this.client.addCollection(
|
||||
calendar.class.ReccuringInstance,
|
||||
calendar.space.Calendar,
|
||||
calendar.ids.NoAttached,
|
||||
calendar.class.Event,
|
||||
'events',
|
||||
{
|
||||
...data,
|
||||
recurringEventId: event.recurringEventId,
|
||||
originalStartTime: parseDate(event.originalStartTime),
|
||||
isCancelled: event.status === 'cancelled',
|
||||
rules: parseRule.rules,
|
||||
exdate: parseRule.exdate,
|
||||
rdate: parseRule.rdate,
|
||||
timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT'
|
||||
}
|
||||
)
|
||||
await this.saveMixins(event, id)
|
||||
} else if (event.status !== 'cancelled') {
|
||||
if (event.recurrence != null) {
|
||||
const parseRule = parseRecurrenceStrings(event.recurrence)
|
||||
const id = await this.client.addCollection(
|
||||
calendar.class.ReccuringEvent,
|
||||
calendar.space.Calendar,
|
||||
calendar.ids.NoAttached,
|
||||
calendar.class.Event,
|
||||
'events',
|
||||
{
|
||||
...data,
|
||||
rules: parseRule.rules,
|
||||
exdate: parseRule.exdate,
|
||||
rdate: parseRule.rdate,
|
||||
originalStartTime: data.date,
|
||||
timeZone: event.start?.timeZone ?? event.end?.timeZone ?? 'Etc/GMT'
|
||||
}
|
||||
)
|
||||
await this.saveMixins(event, id)
|
||||
} else {
|
||||
const id = await this.client.addCollection(
|
||||
calendar.class.Event,
|
||||
calendar.space.Calendar,
|
||||
calendar.ids.NoAttached,
|
||||
calendar.class.Event,
|
||||
'events',
|
||||
data
|
||||
)
|
||||
await this.saveMixins(event, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async saveMixins (event: calendar_v3.Schema$Event, _id: Ref<Event>): Promise<void> {
|
||||
const mixins = this.parseMixins(event)
|
||||
if (mixins !== undefined) {
|
||||
for (const mixin in mixins) {
|
||||
const attr = mixins[mixin]
|
||||
if (typeof attr === 'object' && Object.keys(attr).length > 0) {
|
||||
await this.client.createMixin(
|
||||
_id,
|
||||
calendar.class.Event,
|
||||
calendar.space.Calendar,
|
||||
mixin as Ref<Mixin<Doc>>,
|
||||
attr
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private parseMixins (event: calendar_v3.Schema$Event): Record<string, any> | undefined {
|
||||
if (event.extendedProperties?.shared?.mixins !== undefined) {
|
||||
const mixins = JSON.parse(event.extendedProperties.shared.mixins)
|
||||
return mixins
|
||||
}
|
||||
}
|
||||
|
||||
private async getMyCalendars (): Promise<void> {
|
||||
this.calendars = await this.client.findAll(calendar.class.ExternalCalendar, {
|
||||
createdBy: this.user.userId,
|
||||
hidden: false
|
||||
})
|
||||
}
|
||||
|
||||
async syncCalendars (): Promise<void> {
|
||||
const history = await getCalendarsSyncHistory(this.user)
|
||||
await this.calendarSync(history)
|
||||
const watchController = WatchController.get(this.accountClient)
|
||||
await this.rateLimiter.take(1)
|
||||
await watchController.addWatch(this.user, this.email, null, this.googleClient)
|
||||
}
|
||||
|
||||
private async calendarSync (sync?: string): Promise<void> {
|
||||
let syncToken = sync
|
||||
let pageToken: string | undefined
|
||||
while (true) {
|
||||
try {
|
||||
await this.rateLimiter.take(1)
|
||||
const res = await this.googleClient.calendarList.list({
|
||||
syncToken,
|
||||
pageToken
|
||||
})
|
||||
if (res.status === 410) {
|
||||
syncToken = undefined
|
||||
pageToken = undefined
|
||||
continue
|
||||
}
|
||||
const nextPageToken = res.data.nextPageToken
|
||||
for (const calendar of res.data.items ?? []) {
|
||||
try {
|
||||
await this.syncCalendar(calendar)
|
||||
} catch (err) {
|
||||
this.ctx.error('save calendars error', { calendar: JSON.stringify(calendar), err })
|
||||
}
|
||||
}
|
||||
if (nextPageToken != null) {
|
||||
pageToken = nextPageToken
|
||||
continue
|
||||
}
|
||||
if (res.data.nextSyncToken != null) {
|
||||
await setCalendarsSyncHistory(this.user, res.data.nextSyncToken)
|
||||
}
|
||||
return
|
||||
} catch (err: any) {
|
||||
this.ctx.error('Calendars sync error', { workspace: this.user.workspace, user: this.user.userId, err })
|
||||
if (err?.response?.status === 410) {
|
||||
syncToken = undefined
|
||||
pageToken = undefined
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async syncCalendar (val: calendar_v3.Schema$CalendarListEntry): Promise<void> {
|
||||
if (val.id != null) {
|
||||
const exists = this.calendars.find((p) => p.externalId === val.id && p.externalUser === this.email)
|
||||
if (exists === undefined) {
|
||||
const data: Data<ExternalCalendar> = {
|
||||
name: val.summary ?? '',
|
||||
visibility: 'freeBusy',
|
||||
hidden: false,
|
||||
externalId: val.id,
|
||||
externalUser: this.email,
|
||||
default: false
|
||||
}
|
||||
if (val.primary === true) {
|
||||
const primaryExists = this.calendars.length > 0
|
||||
if (primaryExists === undefined) {
|
||||
data.default = true
|
||||
}
|
||||
}
|
||||
const _id = generateId<ExternalCalendar>()
|
||||
const tx = this.client.txFactory.createTxCreateDoc<ExternalCalendar>(
|
||||
calendar.class.ExternalCalendar,
|
||||
calendar.space.Calendar,
|
||||
data,
|
||||
_id,
|
||||
undefined,
|
||||
this.user.userId
|
||||
)
|
||||
await this.client.tx(tx)
|
||||
this.calendars.push(TxProcessor.createDoc2Doc(tx))
|
||||
} else {
|
||||
const update: DocumentUpdate<ExternalCalendar> = {}
|
||||
if (exists.name !== val.summary) {
|
||||
update.name = val.summary ?? exists.name
|
||||
}
|
||||
if (Object.keys(update).length > 0) {
|
||||
await this.client.update(exists, update)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function parseDate (date: calendar_v3.Schema$EventDateTime | undefined): number {
|
||||
if (date?.dateTime != null) {
|
||||
return new Date(date.dateTime).getTime()
|
||||
}
|
||||
if (date?.date != null) {
|
||||
return new Date(date.date).getTime()
|
||||
}
|
||||
return 0
|
||||
}
|
29
services/calendar/pod-calendar/src/tokens.ts
Normal file
29
services/calendar/pod-calendar/src/tokens.ts
Normal file
@ -0,0 +1,29 @@
|
||||
//
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { AccountClient, IntegrationSecret } from '@hcengineering/account-client'
|
||||
import { WorkspaceUuid } from '@hcengineering/core'
|
||||
import { CALENDAR_INTEGRATION } from './types'
|
||||
|
||||
export async function getWorkspaceTokens (
|
||||
accountClient: AccountClient,
|
||||
workspace: WorkspaceUuid
|
||||
): Promise<IntegrationSecret[]> {
|
||||
const secrets = await accountClient.listIntegrationsSecrets({
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: workspace
|
||||
})
|
||||
return secrets
|
||||
}
|
@ -18,7 +18,12 @@ import type { PersonId, Timestamp, WorkspaceUuid } from '@hcengineering/core'
|
||||
import type { NextFunction, Request, Response } from 'express'
|
||||
import type { Credentials } from 'google-auth-library'
|
||||
|
||||
export const CALENDAR_INTEGRATION = 'google-calendar'
|
||||
|
||||
export type GoogleEmail = string & { googleEmail: true }
|
||||
|
||||
export interface WatchBase {
|
||||
email: GoogleEmail
|
||||
userId: PersonId
|
||||
workspace: WorkspaceUuid
|
||||
expired: Timestamp
|
||||
@ -42,7 +47,7 @@ export interface DummyWatch {
|
||||
calendarId: string
|
||||
}
|
||||
|
||||
export type Token = User & Credentials
|
||||
export type Token = User & Credentials & { email: GoogleEmail }
|
||||
|
||||
export interface CalendarHistory {
|
||||
userId: PersonId
|
||||
@ -71,7 +76,6 @@ export interface ReccuringData {
|
||||
export interface User {
|
||||
userId: PersonId
|
||||
workspace: WorkspaceUuid
|
||||
token: string
|
||||
}
|
||||
|
||||
export type State = User & {
|
||||
@ -109,3 +113,10 @@ export interface ProjectCredentialsData {
|
||||
client_secret: string
|
||||
redirect_uris: string[]
|
||||
}
|
||||
|
||||
export const SCOPES = [
|
||||
'https://www.googleapis.com/auth/calendar.calendars.readonly',
|
||||
'https://www.googleapis.com/auth/calendar.calendarlist.readonly',
|
||||
'https://www.googleapis.com/auth/calendar.events',
|
||||
'https://www.googleapis.com/auth/userinfo.email'
|
||||
]
|
||||
|
@ -1,5 +1,5 @@
|
||||
//
|
||||
// Copyright © 2023 Hardcore Engineering Inc.
|
||||
// Copyright © 2025 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
@ -14,8 +14,12 @@
|
||||
//
|
||||
|
||||
import { RecurringRule } from '@hcengineering/calendar'
|
||||
import { Timestamp } from '@hcengineering/core'
|
||||
import { ReccuringData, type Token, type User } from './types'
|
||||
import { systemAccountUuid, Timestamp, WorkspaceUuid } from '@hcengineering/core'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { OAuth2Client } from 'google-auth-library'
|
||||
import { calendar_v3, google } from 'googleapis'
|
||||
import config from './config'
|
||||
import { CALENDAR_INTEGRATION, ReccuringData, State, type Token, type User } from './types'
|
||||
|
||||
export class DeferredPromise<T = any> {
|
||||
public readonly promise: Promise<T>
|
||||
@ -38,7 +42,7 @@ export class DeferredPromise<T = any> {
|
||||
}
|
||||
}
|
||||
|
||||
export function isToken (user: User | Token): user is Token {
|
||||
export function isToken (user: User | Token | State): user is Token {
|
||||
return (user as Token).access_token !== undefined
|
||||
}
|
||||
|
||||
@ -248,3 +252,34 @@ export function encodeReccuring (rules: RecurringRule[], rdates: number[], exdat
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
let serviceToken: string | undefined
|
||||
|
||||
export function getServiceToken (): string {
|
||||
if (serviceToken === undefined) {
|
||||
serviceToken = generateServiceToken()
|
||||
}
|
||||
return serviceToken
|
||||
}
|
||||
|
||||
export function getWorkspaceToken (workspace: WorkspaceUuid): string {
|
||||
return generateToken(systemAccountUuid, workspace, { service: CALENDAR_INTEGRATION })
|
||||
}
|
||||
|
||||
function generateServiceToken (): string {
|
||||
return generateToken(systemAccountUuid, undefined, { service: CALENDAR_INTEGRATION })
|
||||
}
|
||||
|
||||
export function getGoogleClient (): {
|
||||
auth: OAuth2Client
|
||||
google: calendar_v3.Calendar
|
||||
} {
|
||||
const credentials = JSON.parse(config.Credentials)
|
||||
const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line
|
||||
const oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line
|
||||
const googleClient = google.calendar({ version: 'v3', auth: oAuth2Client })
|
||||
return {
|
||||
auth: oAuth2Client,
|
||||
google: googleClient
|
||||
}
|
||||
}
|
||||
|
@ -1,41 +1,44 @@
|
||||
import { generateId, isActiveMode, systemAccountUuid, WorkspaceUuid } from '@hcengineering/core'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { AccountClient } from '@hcengineering/account-client'
|
||||
import { generateId, isActiveMode, PersonId, WorkspaceUuid } from '@hcengineering/core'
|
||||
import { Credentials, OAuth2Client } from 'google-auth-library'
|
||||
import { calendar_v3, google } from 'googleapis'
|
||||
import { Collection, Db } from 'mongodb'
|
||||
import { calendar_v3 } from 'googleapis'
|
||||
import config from './config'
|
||||
import { RateLimiter } from './rateLimiter'
|
||||
import { EventWatch, Token, Watch, WatchBase } from './types'
|
||||
import { getAccountClient } from '@hcengineering/server-client'
|
||||
import { getKvsClient } from './kvsUtils'
|
||||
import { getRateLimitter, RateLimiter } from './rateLimiter'
|
||||
import { CALENDAR_INTEGRATION, EventWatch, GoogleEmail, Token, User, Watch, WatchBase } from './types'
|
||||
import { getGoogleClient } from './utils'
|
||||
|
||||
export class WatchClient {
|
||||
private readonly watches: Collection<Watch>
|
||||
private readonly oAuth2Client: OAuth2Client
|
||||
private readonly calendar: calendar_v3.Calendar
|
||||
private readonly user: Token
|
||||
private me: string = ''
|
||||
readonly rateLimiter = new RateLimiter(1000, 500)
|
||||
readonly rateLimiter: RateLimiter
|
||||
|
||||
private constructor (mongo: Db, token: Token) {
|
||||
private constructor (token: Token) {
|
||||
this.user = token
|
||||
this.watches = mongo.collection<WatchBase>('watch')
|
||||
const credentials = JSON.parse(config.Credentials)
|
||||
const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line
|
||||
this.oAuth2Client = new google.auth.OAuth2(client_id, client_secret, redirect_uris[0]) // eslint-disable-line
|
||||
this.calendar = google.calendar({ version: 'v3', auth: this.oAuth2Client })
|
||||
this.rateLimiter = getRateLimitter(this.user.email)
|
||||
|
||||
const res = getGoogleClient()
|
||||
this.calendar = res.google
|
||||
this.oAuth2Client = res.auth
|
||||
}
|
||||
|
||||
static async Create (mongo: Db, token: Token): Promise<WatchClient> {
|
||||
const watchClient = new WatchClient(mongo, token)
|
||||
await watchClient.init(token)
|
||||
static async Create (token: Token): Promise<WatchClient> {
|
||||
const watchClient = new WatchClient(token)
|
||||
await watchClient.setToken(token)
|
||||
return watchClient
|
||||
}
|
||||
|
||||
private async getWatches (): Promise<Record<string, Watch>> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:watch:${this.user.workspace}:${this.user.userId}`
|
||||
const watches = await client.listKeys<Watch>(key)
|
||||
return watches ?? {}
|
||||
}
|
||||
|
||||
private async setToken (token: Credentials): Promise<void> {
|
||||
try {
|
||||
this.oAuth2Client.setCredentials(token)
|
||||
const info = await google.oauth2({ version: 'v2', auth: this.oAuth2Client }).userinfo.get()
|
||||
this.me = info.data.email ?? ''
|
||||
} catch (err: any) {
|
||||
console.error('Set token error', this.user.workspace, this.user.userId, err)
|
||||
await this.checkError(err)
|
||||
@ -45,14 +48,14 @@ export class WatchClient {
|
||||
|
||||
async checkError (err: any): Promise<void> {
|
||||
if (err?.response?.data?.error === 'invalid_grant') {
|
||||
await this.watches.deleteMany({ userId: this.user.userId, workspace: this.user.workspace })
|
||||
const watches = await this.getWatches()
|
||||
const client = getKvsClient()
|
||||
for (const key in watches) {
|
||||
await client.deleteKey(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async init (token: Token): Promise<void> {
|
||||
await this.setToken(token)
|
||||
}
|
||||
|
||||
async subscribe (watches: Watch[]): Promise<void> {
|
||||
for (const watch of watches) {
|
||||
if (watch.calendarId == null) {
|
||||
@ -70,34 +73,16 @@ export class WatchClient {
|
||||
}
|
||||
|
||||
private async unsubscribeWatch (current: Watch): Promise<void> {
|
||||
await this.rateLimiter.take(1)
|
||||
await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } })
|
||||
try {
|
||||
await this.rateLimiter.take(1)
|
||||
await this.calendar.channels.stop({ requestBody: { id: current.channelId, resourceId: current.resourceId } })
|
||||
} catch {}
|
||||
}
|
||||
|
||||
private async watchCalendars (current: Watch): Promise<void> {
|
||||
try {
|
||||
await this.unsubscribeWatch(current)
|
||||
const channelId = generateId()
|
||||
const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${this.me}&mode=calendar` }
|
||||
await this.rateLimiter.take(1)
|
||||
const res = await this.calendar.calendarList.watch({ requestBody: body })
|
||||
if (res.data.expiration != null && res.data.resourceId !== null) {
|
||||
// eslint-disable-next-line
|
||||
this.watches.updateOne(
|
||||
{
|
||||
userId: current.userId,
|
||||
workspace: current.workspace,
|
||||
calendarId: null
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? ''
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
await watchCalendars(this.user, this.user.email, this.calendar)
|
||||
} catch (err) {
|
||||
console.error('Calendar watch error', err)
|
||||
}
|
||||
@ -106,66 +91,98 @@ export class WatchClient {
|
||||
private async watchCalendar (current: EventWatch): Promise<void> {
|
||||
try {
|
||||
await this.unsubscribeWatch(current)
|
||||
const channelId = generateId()
|
||||
const body = {
|
||||
id: channelId,
|
||||
address: config.WATCH_URL,
|
||||
type: 'webhook',
|
||||
token: `user=${this.me}&mode=events&calendarId=${current.calendarId}`
|
||||
}
|
||||
await this.rateLimiter.take(1)
|
||||
const res = await this.calendar.events.watch({ calendarId: current.calendarId, requestBody: body })
|
||||
if (res.data.expiration != null && res.data.resourceId != null) {
|
||||
// eslint-disable-next-line
|
||||
this.watches.updateOne(
|
||||
{
|
||||
userId: current.userId,
|
||||
workspace: current.workspace,
|
||||
calendarId: current.calendarId
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? ''
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
await watchCalendar(this.user, this.user.email, current.calendarId, this.calendar)
|
||||
} catch (err: any) {
|
||||
await this.checkError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function watchCalendars (user: User, email: GoogleEmail, googleClient: calendar_v3.Calendar): Promise<void> {
|
||||
const channelId = generateId()
|
||||
const body = { id: channelId, address: config.WATCH_URL, type: 'webhook', token: `user=${email}&mode=calendar` }
|
||||
const res = await googleClient.calendarList.watch({ requestBody: body })
|
||||
if (res.data.expiration != null && res.data.resourceId !== null) {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:watch:${user.workspace}:${user.userId}:null`
|
||||
await client.setValue<Watch>(key, {
|
||||
userId: user.userId,
|
||||
workspace: user.workspace,
|
||||
email,
|
||||
calendarId: null,
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? ''
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async function watchCalendar (
|
||||
user: User,
|
||||
email: GoogleEmail,
|
||||
calendarId: string,
|
||||
googleClient: calendar_v3.Calendar
|
||||
): Promise<void> {
|
||||
const channelId = generateId()
|
||||
const body = {
|
||||
id: channelId,
|
||||
address: config.WATCH_URL,
|
||||
type: 'webhook',
|
||||
token: `user=${email}&mode=events&calendarId=${calendarId}`
|
||||
}
|
||||
const res = await googleClient.events.watch({ calendarId, requestBody: body })
|
||||
if (res.data.expiration != null && res.data.resourceId != null) {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:watch:${user.workspace}:${user.userId}:${calendarId}`
|
||||
await client.setValue<Watch>(key, {
|
||||
userId: user.userId,
|
||||
workspace: user.workspace,
|
||||
email,
|
||||
calendarId,
|
||||
channelId,
|
||||
expired: Number.parseInt(res.data.expiration),
|
||||
resourceId: res.data.resourceId ?? ''
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// we have to refresh channels approx each week
|
||||
export class WatchController {
|
||||
private readonly watches: Collection<Watch>
|
||||
private readonly tokens: Collection<Token>
|
||||
|
||||
private timer: NodeJS.Timeout | undefined = undefined
|
||||
protected static _instance: WatchController
|
||||
|
||||
private constructor (private readonly mongo: Db) {
|
||||
this.watches = mongo.collection<WatchBase>('watch')
|
||||
this.tokens = mongo.collection<Token>('tokens')
|
||||
private constructor (private readonly accountClient: AccountClient) {
|
||||
console.log('watch started')
|
||||
}
|
||||
|
||||
static get (mongo: Db): WatchController {
|
||||
static get (accountClient: AccountClient): WatchController {
|
||||
if (WatchController._instance !== undefined) {
|
||||
return WatchController._instance
|
||||
}
|
||||
return new WatchController(mongo)
|
||||
return new WatchController(accountClient)
|
||||
}
|
||||
|
||||
private async getUserWatches (userId: PersonId, workspace: WorkspaceUuid): Promise<Record<string, Watch>> {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:watch:${workspace}:${userId}`
|
||||
return (await client.listKeys<Watch>(key)) ?? {}
|
||||
}
|
||||
|
||||
async unsubscribe (user: Token): Promise<void> {
|
||||
const allWatches = await this.watches.find({ userId: user.userId, workspae: user.workspace }).toArray()
|
||||
await this.watches.deleteMany({ userId: user.userId, workspae: user.workspace })
|
||||
const token = this.tokens.findOne({ user: user.userId, workspace: user.workspace })
|
||||
const client = getKvsClient()
|
||||
const watches = await this.getUserWatches(user.userId, user.workspace)
|
||||
for (const key in watches) {
|
||||
await client.deleteKey(key)
|
||||
}
|
||||
const token = await this.accountClient.getIntegrationSecret({
|
||||
socialId: user.userId,
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: user.workspace,
|
||||
key: user.email
|
||||
})
|
||||
if (token == null) return
|
||||
const watchClient = await WatchClient.Create(this.mongo, user)
|
||||
await watchClient.unsubscribe(allWatches)
|
||||
const watchClient = await WatchClient.Create(user)
|
||||
await watchClient.unsubscribe(Object.values(watches))
|
||||
}
|
||||
|
||||
stop (): void {
|
||||
@ -186,15 +203,21 @@ export class WatchController {
|
||||
|
||||
async checkAll (): Promise<void> {
|
||||
const expired = Date.now() + 24 * 60 * 60 * 1000
|
||||
const watches = await this.watches
|
||||
.find({
|
||||
expired: { $lt: expired }
|
||||
})
|
||||
.toArray()
|
||||
console.log('watch, found for update', watches.length)
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:watch:`
|
||||
const watches = (await client.listKeys<Watch>(key)) ?? {}
|
||||
const toRefresh: Watch[] = []
|
||||
for (const key in watches) {
|
||||
const watch = watches[key]
|
||||
if (watch.expired < expired) {
|
||||
toRefresh.push(watch)
|
||||
}
|
||||
}
|
||||
console.log('watch, found for update', toRefresh.length)
|
||||
if (toRefresh.length === 0) return
|
||||
const groups = new Map<string, WatchBase[]>()
|
||||
const workspaces = new Set<WorkspaceUuid>()
|
||||
for (const watch of watches) {
|
||||
for (const watch of toRefresh) {
|
||||
workspaces.add(watch.workspace)
|
||||
const key = `${watch.userId}:${watch.workspace}`
|
||||
const group = groups.get(key)
|
||||
@ -204,28 +227,56 @@ export class WatchController {
|
||||
groups.set(key, [watch])
|
||||
}
|
||||
}
|
||||
const token = generateToken(systemAccountUuid)
|
||||
const ids = [...workspaces]
|
||||
const infos = await getAccountClient(token).getWorkspacesInfo(ids)
|
||||
const tokens = await this.tokens.find({ workspace: { $in: ids } }).toArray()
|
||||
if (ids.length === 0) return
|
||||
const infos = await this.accountClient.getWorkspacesInfo(ids)
|
||||
const tokens = await this.accountClient.listIntegrationsSecrets({ kind: CALENDAR_INTEGRATION })
|
||||
for (const group of groups.values()) {
|
||||
try {
|
||||
const userId = group[0].userId
|
||||
const workspace = group[0].workspace
|
||||
const token = tokens.find((p) => p.workspace === workspace && p.userId === userId)
|
||||
const token = tokens.find((p) => p.workspaceUuid === workspace && p.socialId === userId)
|
||||
if (token === undefined) {
|
||||
await this.watches.deleteMany({ userId, workspace })
|
||||
const toRemove = await this.getUserWatches(userId, workspace)
|
||||
for (const key in toRemove) {
|
||||
await client.deleteKey(key)
|
||||
}
|
||||
continue
|
||||
}
|
||||
const info = infos.find((p) => p.uuid === workspace)
|
||||
if (info === undefined || isActiveMode(info.mode)) {
|
||||
await this.watches.deleteMany({ userId, workspace })
|
||||
if (info === undefined || !isActiveMode(info.mode)) {
|
||||
const toRemove = await this.getUserWatches(userId, workspace)
|
||||
for (const key in toRemove) {
|
||||
await client.deleteKey(key)
|
||||
}
|
||||
continue
|
||||
}
|
||||
const watchClient = await WatchClient.Create(this.mongo, token)
|
||||
const watchClient = await WatchClient.Create(JSON.parse(token.secret))
|
||||
await watchClient.subscribe(group)
|
||||
} catch {}
|
||||
}
|
||||
console.log('watch check done')
|
||||
}
|
||||
|
||||
async addWatch (
|
||||
user: User,
|
||||
email: GoogleEmail,
|
||||
calendarId: string | null,
|
||||
googleClient: calendar_v3.Calendar,
|
||||
force: boolean = false
|
||||
): Promise<void> {
|
||||
if (!force) {
|
||||
const client = getKvsClient()
|
||||
const key = `${CALENDAR_INTEGRATION}:watch:${user.workspace}:${user.userId}:${calendarId ?? 'null'}`
|
||||
const exists = await client.getValue<Watch>(key)
|
||||
if (exists != null) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if (calendarId != null) {
|
||||
await watchCalendar(user, email, calendarId, googleClient)
|
||||
} else {
|
||||
await watchCalendars(user, email, googleClient)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,274 +13,150 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { AccountClient } from '@hcengineering/account-client'
|
||||
import calendar, { Event, ExternalCalendar } from '@hcengineering/calendar'
|
||||
import contact, {
|
||||
Channel,
|
||||
Contact,
|
||||
Person,
|
||||
getPrimarySocialId,
|
||||
getPersonRefBySocialId,
|
||||
getPersonRefsBySocialIds,
|
||||
type Employee,
|
||||
SocialIdentityRef
|
||||
} from '@hcengineering/contact'
|
||||
import core, {
|
||||
PersonId,
|
||||
SocialIdType,
|
||||
TxMixin,
|
||||
MeasureContext,
|
||||
RateLimiter,
|
||||
SocialIdType,
|
||||
TxOperations,
|
||||
TxProcessor,
|
||||
WorkspaceUuid,
|
||||
toIdMap,
|
||||
type Client,
|
||||
type Doc,
|
||||
type Ref,
|
||||
type Tx,
|
||||
type TxCreateDoc,
|
||||
type TxRemoveDoc,
|
||||
type TxUpdateDoc,
|
||||
systemAccountUuid,
|
||||
PersonUuid
|
||||
type Ref
|
||||
} from '@hcengineering/core'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import setting, { Integration } from '@hcengineering/setting'
|
||||
import { Collection, type Db } from 'mongodb'
|
||||
import { CalendarClient } from './calendar'
|
||||
import { CalendarController } from './calendarController'
|
||||
import { getClient } from './client'
|
||||
import { SyncHistory, Token, type User } from './types'
|
||||
import config from './config'
|
||||
import { addUserByEmail, getSyncHistory, setSyncHistory } from './kvsUtils'
|
||||
import { getWorkspaceTokens } from './tokens'
|
||||
import { CALENDAR_INTEGRATION, GoogleEmail, Token } from './types'
|
||||
import { getWorkspaceToken } from './utils'
|
||||
import contact, { getPersonRefsBySocialIds, Person } from '@hcengineering/contact'
|
||||
import setting from '@hcengineering/setting'
|
||||
|
||||
export class WorkspaceClient {
|
||||
private readonly txHandlers: ((...tx: Tx[]) => Promise<void>)[] = []
|
||||
|
||||
client!: Client
|
||||
private readonly clients: Map<string, CalendarClient | Promise<CalendarClient>> = new Map<
|
||||
string,
|
||||
CalendarClient | Promise<CalendarClient>
|
||||
>()
|
||||
|
||||
private readonly syncHistory: Collection<SyncHistory>
|
||||
private readonly tokens: Collection<Token>
|
||||
private channels = new Map<Ref<Channel>, Channel>()
|
||||
private readonly externalIdByPersonId = new Map<PersonId, string | null>()
|
||||
readonly calendars = {
|
||||
byId: new Map<Ref<ExternalCalendar>, ExternalCalendar>(),
|
||||
byExternal: new Map<string, ExternalCalendar[]>()
|
||||
}
|
||||
|
||||
readonly contacts = {
|
||||
byId: new Map<Ref<Contact>, string>(),
|
||||
byEmail: new Map<string, Ref<Contact>>()
|
||||
}
|
||||
|
||||
readonly integrations = {
|
||||
byId: new Map<Ref<Integration>, Integration>(),
|
||||
byContact: new Map<Ref<Contact>, string[]>(),
|
||||
byEmail: new Map<string, Ref<Contact>>()
|
||||
}
|
||||
private readonly clients = new Map<GoogleEmail, CalendarClient>()
|
||||
private readonly calendarsByExternal = new Map<string, ExternalCalendar>()
|
||||
readonly calendarsById = new Map<Ref<ExternalCalendar>, ExternalCalendar>()
|
||||
readonly participants = new Map<Ref<Person>, string>()
|
||||
|
||||
private constructor (
|
||||
private readonly mongo: Db,
|
||||
private readonly workspace: WorkspaceUuid,
|
||||
private readonly serviceController: CalendarController
|
||||
) {
|
||||
this.tokens = mongo.collection<Token>('tokens')
|
||||
this.syncHistory = mongo.collection<SyncHistory>('syncHistories')
|
||||
private readonly ctx: MeasureContext,
|
||||
private readonly accountClient: AccountClient,
|
||||
private readonly client: TxOperations,
|
||||
private readonly workspace: WorkspaceUuid
|
||||
) {}
|
||||
|
||||
static async run (ctx: MeasureContext, accountClient: AccountClient, workspace: WorkspaceUuid): Promise<void> {
|
||||
const client = await getClient(getWorkspaceToken(workspace))
|
||||
const txOp = new TxOperations(client, core.account.System)
|
||||
const instance = new WorkspaceClient(ctx, accountClient, txOp, workspace)
|
||||
|
||||
await instance.init()
|
||||
await instance.startAll()
|
||||
await instance.close()
|
||||
}
|
||||
|
||||
static async create (
|
||||
mongo: Db,
|
||||
workspace: WorkspaceUuid,
|
||||
serviceController: CalendarController
|
||||
): Promise<WorkspaceClient> {
|
||||
const instance = new WorkspaceClient(mongo, workspace, serviceController)
|
||||
|
||||
await instance.initClient(workspace)
|
||||
return instance
|
||||
async init (): Promise<void> {
|
||||
const calendars = await this.client.findAll(calendar.class.ExternalCalendar, {})
|
||||
this.calendarsById.clear()
|
||||
this.calendarsByExternal.clear()
|
||||
for (const calendar of calendars) {
|
||||
this.calendarsById.set(calendar._id, calendar)
|
||||
this.calendarsByExternal.set(calendar.externalId, calendar)
|
||||
}
|
||||
await this.fillParticipants()
|
||||
}
|
||||
|
||||
async createCalendarClient (user: User): Promise<CalendarClient> {
|
||||
const current = this.getCalendarClient(user.userId)
|
||||
if (current !== undefined) {
|
||||
if (current instanceof Promise) {
|
||||
return await current
|
||||
private async fillParticipants (): Promise<void> {
|
||||
const personsBySocialId = await getPersonRefsBySocialIds(this.client)
|
||||
const emailSocialIds = await this.client.findAll(contact.class.SocialIdentity, { type: SocialIdType.EMAIL })
|
||||
const emails = await this.client.findAll(contact.class.Channel, { provider: contact.channelProvider.Email })
|
||||
const integrations = await this.client.findAll(setting.class.Integration, {
|
||||
type: calendar.integrationType.Calendar
|
||||
})
|
||||
this.participants.clear()
|
||||
|
||||
for (const sID of emailSocialIds) {
|
||||
if (sID.value === '') continue
|
||||
const pers = personsBySocialId[sID._id]
|
||||
if (pers != null) {
|
||||
this.participants.set(pers, sID.value)
|
||||
}
|
||||
}
|
||||
|
||||
for (const channel of emails) {
|
||||
if (channel.value === '') continue
|
||||
const pers = channel.attachedTo as Ref<Person>
|
||||
if (this.participants.has(pers)) continue
|
||||
this.participants.set(pers, channel.value)
|
||||
}
|
||||
|
||||
for (const integration of integrations) {
|
||||
if (integration.value === '') continue
|
||||
const pers = personsBySocialId[integration.createdBy ?? integration.modifiedBy]
|
||||
if (pers != null) {
|
||||
this.participants.set(pers, integration.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async startAll (): Promise<void> {
|
||||
const tokens = await getWorkspaceTokens(this.accountClient, this.workspace)
|
||||
for (const token of tokens) {
|
||||
if (token.workspaceUuid === null) continue
|
||||
await addUserByEmail(JSON.parse(token.secret), token.key as GoogleEmail)
|
||||
await this.createCalendarClient(JSON.parse(token.secret))
|
||||
}
|
||||
await this.getNewEvents()
|
||||
const limiter = new RateLimiter(config.InitLimit)
|
||||
for (const client of this.clients.values()) {
|
||||
await limiter.add(async () => {
|
||||
await client.startSync()
|
||||
})
|
||||
}
|
||||
await limiter.waitProcessing()
|
||||
}
|
||||
|
||||
private async createCalendarClient (user: Token): Promise<CalendarClient> {
|
||||
const current = this.clients.get(user.email)
|
||||
if (current !== undefined) {
|
||||
return current
|
||||
}
|
||||
const newClient = CalendarClient.create(user, this.mongo, this.client, this)
|
||||
this.clients.set(user.userId, newClient)
|
||||
const res = await newClient
|
||||
this.clients.set(user.userId, res)
|
||||
return res
|
||||
}
|
||||
|
||||
async newCalendarClient (user: User, code: string): Promise<CalendarClient> {
|
||||
const newClient = await CalendarClient.create(user, this.mongo, this.client, this)
|
||||
const userId = await newClient.authorize(code)
|
||||
if (this.clients.has(userId)) {
|
||||
newClient.close()
|
||||
throw new Error('Client already exist')
|
||||
}
|
||||
this.clients.set(userId, newClient)
|
||||
const newClient = await CalendarClient.create(this.ctx, this.accountClient, user, this.client, this)
|
||||
this.clients.set(user.email, newClient)
|
||||
return newClient
|
||||
}
|
||||
|
||||
async close (): Promise<void> {
|
||||
for (let client of this.clients.values()) {
|
||||
if (client instanceof Promise) {
|
||||
client = await client
|
||||
}
|
||||
client.close()
|
||||
}
|
||||
private async close (): Promise<void> {
|
||||
this.clients.clear()
|
||||
await this.client?.close()
|
||||
}
|
||||
|
||||
async getUserId (account: PersonUuid): Promise<PersonId> {
|
||||
const person = await this.client.findOne(contact.class.Person, { personUuid: account })
|
||||
if (person === undefined) {
|
||||
throw new Error('Person not found')
|
||||
}
|
||||
|
||||
const personId = await getPrimarySocialId(this.client, person._id)
|
||||
|
||||
if (personId === undefined) {
|
||||
throw new Error('PersonId not found')
|
||||
}
|
||||
|
||||
return personId
|
||||
}
|
||||
|
||||
async signout (personId: PersonId, byError: boolean = false): Promise<number> {
|
||||
let client = this.clients.get(personId)
|
||||
if (client !== undefined) {
|
||||
if (client instanceof Promise) {
|
||||
client = await client
|
||||
}
|
||||
await client.signout()
|
||||
} else {
|
||||
const integration = await this.client.findOne(setting.class.Integration, {
|
||||
type: calendar.integrationType.Calendar,
|
||||
value: personId
|
||||
})
|
||||
if (integration !== undefined) {
|
||||
const txOp = new TxOperations(this.client, core.account.System)
|
||||
if (byError) {
|
||||
await txOp.update(integration, { disabled: true })
|
||||
} else {
|
||||
await txOp.remove(integration)
|
||||
}
|
||||
}
|
||||
}
|
||||
this.clients.delete(personId)
|
||||
return this.clients.size
|
||||
}
|
||||
|
||||
removeClient (personId: PersonId): void {
|
||||
this.clients.delete(personId)
|
||||
if (this.clients.size > 0) return
|
||||
void this.close()
|
||||
this.serviceController.removeWorkspace(this.workspace)
|
||||
}
|
||||
|
||||
private getCalendarClient (personId: PersonId): CalendarClient | Promise<CalendarClient> | undefined {
|
||||
return this.clients.get(personId)
|
||||
}
|
||||
|
||||
private async getCalendarClientByCalendar (
|
||||
id: Ref<ExternalCalendar>,
|
||||
create: boolean = false
|
||||
): Promise<CalendarClient | undefined> {
|
||||
const calendar = this.calendars.byId.get(id)
|
||||
if (calendar === undefined) {
|
||||
console.warn("couldn't find calendar by id", id)
|
||||
return
|
||||
}
|
||||
const client = this.clients.get(calendar.externalUser)
|
||||
if (client instanceof Promise) {
|
||||
return await client
|
||||
}
|
||||
if (client === undefined && create) {
|
||||
const user = await this.tokens.findOne({
|
||||
workspace: this.workspace,
|
||||
access_token: { $exists: true },
|
||||
email: calendar.externalUser
|
||||
})
|
||||
if (user != null) {
|
||||
return await this.createCalendarClient(user)
|
||||
}
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
private async initClient (workspace: WorkspaceUuid): Promise<Client> {
|
||||
const token = generateToken(systemAccountUuid, workspace, { service: 'calendar' })
|
||||
const client = await getClient(token)
|
||||
client.notify = (...tx: Tx[]) => {
|
||||
void this.txHandler(...tx)
|
||||
}
|
||||
|
||||
this.client = client
|
||||
await this.init()
|
||||
return this.client
|
||||
}
|
||||
|
||||
private async txHandler (...tx: Tx[]): Promise<void> {
|
||||
await Promise.all(
|
||||
this.txHandlers.map(async (handler) => {
|
||||
await handler(...tx)
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
private async init (): Promise<void> {
|
||||
await this.checkUsers()
|
||||
await this.initContacts()
|
||||
await this.initIntegrations()
|
||||
await this.initCalendars()
|
||||
}
|
||||
|
||||
async sync (): Promise<void> {
|
||||
await this.getNewEvents()
|
||||
const limiter = new RateLimiter(config.InitLimit)
|
||||
for (let client of this.clients.values()) {
|
||||
void limiter.add(async () => {
|
||||
if (client instanceof Promise) {
|
||||
client = await client
|
||||
}
|
||||
await client.startSync()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// #region Events
|
||||
|
||||
private async getSyncTime (): Promise<number | undefined> {
|
||||
const res = await this.syncHistory.findOne({
|
||||
workspace: this.workspace
|
||||
})
|
||||
return res?.timestamp
|
||||
static async push (
|
||||
ctx: MeasureContext,
|
||||
accountClient: AccountClient,
|
||||
workspace: WorkspaceUuid,
|
||||
event: Event,
|
||||
type: 'create' | 'update' | 'delete'
|
||||
): Promise<void> {
|
||||
const client = await getClient(getWorkspaceToken(workspace))
|
||||
const txOp = new TxOperations(client, core.account.System)
|
||||
const token = await getTokenByEvent(accountClient, txOp, event, workspace)
|
||||
if (token != null) {
|
||||
const instance = new WorkspaceClient(ctx, accountClient, txOp, workspace)
|
||||
await instance.pushEvent(token, event, type)
|
||||
await instance.close()
|
||||
return
|
||||
}
|
||||
await txOp.close()
|
||||
}
|
||||
|
||||
async updateSyncTime (): Promise<void> {
|
||||
const timestamp = Date.now()
|
||||
await this.syncHistory.updateOne(
|
||||
{
|
||||
workspace: this.workspace
|
||||
},
|
||||
{
|
||||
$set: {
|
||||
timestamp
|
||||
}
|
||||
},
|
||||
{ upsert: true }
|
||||
)
|
||||
}
|
||||
|
||||
async pushEvent (event: Event, type: 'create' | 'update' | 'delete'): Promise<void> {
|
||||
const client = await this.getCalendarClientByCalendar(event.calendar as Ref<ExternalCalendar>, true)
|
||||
async pushEvent (user: Token, event: Event, type: 'create' | 'update' | 'delete'): Promise<void> {
|
||||
const client =
|
||||
(await this.getCalendarClientByCalendar(event.calendar as Ref<ExternalCalendar>)) ??
|
||||
(await this.createCalendarClient(user))
|
||||
if (client === undefined) {
|
||||
console.warn('Client not found', event.calendar, this.workspace)
|
||||
return
|
||||
@ -293,387 +169,57 @@ export class WorkspaceClient {
|
||||
await this.updateSyncTime()
|
||||
}
|
||||
|
||||
async getNewEvents (): Promise<void> {
|
||||
private async getSyncTime (): Promise<number | undefined> {
|
||||
return (await getSyncHistory(this.workspace)) ?? undefined
|
||||
}
|
||||
|
||||
private async updateSyncTime (): Promise<void> {
|
||||
await setSyncHistory(this.workspace)
|
||||
}
|
||||
|
||||
private async getNewEvents (): Promise<void> {
|
||||
const lastSync = await this.getSyncTime()
|
||||
const query = lastSync !== undefined ? { modifiedOn: { $gt: lastSync } } : {}
|
||||
const newEvents = await this.client.findAll(calendar.class.Event, query)
|
||||
this.txHandlers.push(async (...tx: Tx[]) => {
|
||||
await this.txEventHandler(...tx)
|
||||
})
|
||||
for (const newEvent of newEvents) {
|
||||
const client = await this.getCalendarClientByCalendar(newEvent.calendar as Ref<ExternalCalendar>)
|
||||
if (client === undefined) {
|
||||
console.warn('Client not found', newEvent.calendar, this.workspace)
|
||||
return
|
||||
this.ctx.warn('Client not found', { calendar: newEvent.calendar, workspace: this.workspace })
|
||||
continue
|
||||
}
|
||||
await client.syncMyEvent(newEvent)
|
||||
await this.updateSyncTime()
|
||||
}
|
||||
console.log('all outcoming messages synced', this.workspace)
|
||||
this.ctx.info('all outcoming messages synced', this.workspace)
|
||||
}
|
||||
|
||||
private async txEventHandler (...txes: Tx[]): Promise<void> {
|
||||
for (const tx of txes) {
|
||||
switch (tx._class) {
|
||||
case core.class.TxCreateDoc: {
|
||||
await this.txCreateEvent(tx as TxCreateDoc<Doc>)
|
||||
return
|
||||
}
|
||||
case core.class.TxUpdateDoc: {
|
||||
await this.txUpdateEvent(tx as TxUpdateDoc<Event>)
|
||||
return
|
||||
}
|
||||
case core.class.TxRemoveDoc: {
|
||||
await this.txRemoveEvent(tx as TxRemoveDoc<Doc>)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async txCreateEvent (tx: TxCreateDoc<Doc>): Promise<void> {
|
||||
const hierarhy = this.client.getHierarchy()
|
||||
if (hierarhy.isDerived(tx.objectClass, calendar.class.Event)) {
|
||||
const doc = TxProcessor.createDoc2Doc(tx as TxCreateDoc<Event>)
|
||||
if (doc.access !== 'owner') return
|
||||
const client = await this.getCalendarClientByCalendar(doc.calendar as Ref<ExternalCalendar>)
|
||||
if (client === undefined) {
|
||||
return
|
||||
}
|
||||
try {
|
||||
await client.createEvent(doc)
|
||||
await this.updateSyncTime()
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async handleMove (tx: TxUpdateDoc<Event>): Promise<void> {
|
||||
const event = await this.client.findOne(calendar.class.Event, { _id: tx.objectId })
|
||||
if (event === undefined) {
|
||||
private async getCalendarClientByCalendar (id: Ref<ExternalCalendar>): Promise<CalendarClient | undefined> {
|
||||
const calendar = this.calendarsByExternal.get(id)
|
||||
if (calendar === undefined) {
|
||||
console.warn("couldn't find calendar by id", id)
|
||||
return
|
||||
}
|
||||
try {
|
||||
const txes = await this.client.findAll(core.class.TxCUD, {
|
||||
objectId: tx.objectId
|
||||
})
|
||||
const extracted = txes.filter((p) => p._id !== tx._id)
|
||||
const ev = TxProcessor.buildDoc2Doc<Event>(extracted)
|
||||
if (ev != null) {
|
||||
const oldClient = await this.getCalendarClientByCalendar(ev.calendar as Ref<ExternalCalendar>)
|
||||
if (oldClient !== undefined) {
|
||||
const oldCalendar = this.calendars.byId.get(ev.calendar as Ref<ExternalCalendar>)
|
||||
if (oldCalendar !== undefined) {
|
||||
await oldClient.remove(event.eventId, oldCalendar.externalId)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Error on remove event', err)
|
||||
}
|
||||
try {
|
||||
const client = await this.getCalendarClientByCalendar(event.calendar as Ref<ExternalCalendar>)
|
||||
if (client !== undefined) {
|
||||
await client.syncMyEvent(event)
|
||||
}
|
||||
await this.updateSyncTime()
|
||||
} catch (err) {
|
||||
console.error('Error on move event', err)
|
||||
}
|
||||
return this.clients.get(calendar.externalUser as GoogleEmail)
|
||||
}
|
||||
|
||||
private async txUpdateEvent (tx: TxUpdateDoc<Event>): Promise<void> {
|
||||
const hierarhy = this.client.getHierarchy()
|
||||
if (hierarhy.isDerived(tx.objectClass, calendar.class.Event)) {
|
||||
if (tx.operations.calendar !== undefined) {
|
||||
await this.handleMove(tx)
|
||||
return
|
||||
}
|
||||
const event = await this.client.findOne(calendar.class.Event, { _id: tx.objectId })
|
||||
if (event === undefined) {
|
||||
return
|
||||
}
|
||||
if (event.access !== 'owner' && event.access !== 'writer') return
|
||||
const client = await this.getCalendarClientByCalendar(event.calendar as Ref<ExternalCalendar>)
|
||||
if (client === undefined) {
|
||||
return
|
||||
}
|
||||
try {
|
||||
await client.updateEvent(event)
|
||||
await this.updateSyncTime()
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async txRemoveEvent (tx: TxRemoveDoc<Doc>): Promise<void> {
|
||||
const hierarhy = this.client.getHierarchy()
|
||||
if (hierarhy.isDerived(tx.objectClass, calendar.class.Event)) {
|
||||
const txes = await this.client.findAll(core.class.TxCUD, {
|
||||
objectId: tx.objectId
|
||||
})
|
||||
const ev = TxProcessor.buildDoc2Doc<Event>(txes)
|
||||
if (ev == null) return
|
||||
if (ev.access !== 'owner' && ev.access !== 'writer') return
|
||||
const client = await this.getCalendarClientByCalendar(ev?.calendar as Ref<ExternalCalendar>)
|
||||
if (client === undefined) {
|
||||
return
|
||||
}
|
||||
await client.removeEvent(ev)
|
||||
await this.updateSyncTime()
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Calendars
|
||||
|
||||
private async initCalendars (): Promise<void> {
|
||||
const calendars = await this.client.findAll(calendar.class.ExternalCalendar, {})
|
||||
this.calendars.byId = toIdMap(calendars)
|
||||
this.calendars.byExternal.clear()
|
||||
for (const calendar of calendars) {
|
||||
const arrByExt = this.calendars.byExternal.get(calendar.externalId) ?? []
|
||||
arrByExt.push(calendar)
|
||||
this.calendars.byExternal.set(calendar.externalId, arrByExt)
|
||||
}
|
||||
this.txHandlers.push(async (...txes: Tx[]) => {
|
||||
for (const tx of txes) {
|
||||
await this.txCalendarHandler(tx)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async getExtIdByPersonId (personId: PersonId): Promise<string | null | undefined> {
|
||||
if (!this.externalIdByPersonId.has(personId)) {
|
||||
const socialIdentity = await this.client.findOne(contact.class.SocialIdentity, {
|
||||
_id: personId as SocialIdentityRef,
|
||||
type: SocialIdType.GOOGLE
|
||||
})
|
||||
this.externalIdByPersonId.set(personId, socialIdentity?.value ?? null)
|
||||
}
|
||||
|
||||
return this.externalIdByPersonId.get(personId)
|
||||
}
|
||||
|
||||
async getMyCalendars (personId: PersonId): Promise<ExternalCalendar[]> {
|
||||
const extId = await this.getExtIdByPersonId(personId)
|
||||
if (extId == null) {
|
||||
return []
|
||||
}
|
||||
|
||||
return this.calendars.byExternal.get(extId) ?? []
|
||||
}
|
||||
|
||||
private async txCalendarHandler (actualTx: Tx): Promise<void> {
|
||||
if (actualTx._class === core.class.TxCreateDoc) {
|
||||
if ((actualTx as TxCreateDoc<Doc>).objectClass === calendar.class.ExternalCalendar) {
|
||||
const calendar = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc<ExternalCalendar>)
|
||||
this.calendars.byId.set(calendar._id, calendar)
|
||||
const arr = this.calendars.byExternal.get(calendar.externalId) ?? []
|
||||
arr.push(calendar)
|
||||
this.calendars.byExternal.set(calendar.externalId, arr)
|
||||
}
|
||||
}
|
||||
if (actualTx._class === core.class.TxRemoveDoc) {
|
||||
const remTx = actualTx as TxRemoveDoc<ExternalCalendar>
|
||||
const calendar = this.calendars.byId.get(remTx.objectId)
|
||||
if (calendar !== undefined) {
|
||||
this.calendars.byId.delete(remTx.objectId)
|
||||
const arrByExt = this.calendars.byExternal.get(calendar.externalId) ?? []
|
||||
const indexByExt = arrByExt.findIndex((p) => p._id === calendar._id)
|
||||
if (indexByExt !== -1) {
|
||||
arrByExt.splice(indexByExt, 1)
|
||||
this.calendars.byExternal.set(calendar.externalId, arrByExt)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Contacts
|
||||
|
||||
private async initContacts (): Promise<void> {
|
||||
const channels = await this.client.findAll(contact.class.Channel, { provider: contact.channelProvider.Email })
|
||||
this.channels = toIdMap(channels)
|
||||
const emailSocialIds = await this.client.findAll(contact.class.SocialIdentity, { type: SocialIdType.EMAIL })
|
||||
for (const socialId of emailSocialIds) {
|
||||
this.contacts.byEmail.set(socialId.value, socialId.attachedTo)
|
||||
// Note: this doesn't seem to support multiple emails
|
||||
this.contacts.byId.set(socialId.attachedTo, socialId.value)
|
||||
}
|
||||
for (const channel of channels) {
|
||||
if (channel.value !== '') {
|
||||
this.contacts.byEmail.set(channel.value, channel.attachedTo as Ref<Contact>)
|
||||
this.contacts.byId.set(channel.attachedTo as Ref<Contact>, channel.value)
|
||||
}
|
||||
}
|
||||
this.txHandlers.push(async (...txes: Tx[]) => {
|
||||
for (const tx of txes) {
|
||||
await this.txChannelHandler(tx)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private async txChannelHandler (actualTx: Tx): Promise<void> {
|
||||
if (actualTx._class === core.class.TxCreateDoc) {
|
||||
if ((actualTx as TxCreateDoc<Doc>).objectClass === contact.class.Channel) {
|
||||
const channel = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc<Channel>)
|
||||
if (channel.provider === contact.channelProvider.Email) {
|
||||
this.contacts.byEmail.set(channel.value, channel.attachedTo as Ref<Contact>)
|
||||
this.contacts.byId.set(channel.attachedTo as Ref<Contact>, channel.value)
|
||||
this.channels.set(channel._id, channel)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (actualTx._class === core.class.TxUpdateDoc) {
|
||||
const updateTx = actualTx as TxUpdateDoc<Channel>
|
||||
if (updateTx.operations.value !== undefined) {
|
||||
const channel = this.channels.get(updateTx.objectId)
|
||||
if (channel !== undefined) {
|
||||
const oldValue = channel.value
|
||||
this.contacts.byEmail.delete(oldValue)
|
||||
TxProcessor.updateDoc2Doc(channel, updateTx)
|
||||
this.contacts.byEmail.set(channel.value, channel.attachedTo as Ref<Contact>)
|
||||
this.contacts.byId.set(channel.attachedTo as Ref<Contact>, channel.value)
|
||||
this.channels.set(channel._id, channel)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (actualTx._class === core.class.TxRemoveDoc) {
|
||||
const remTx = actualTx as TxRemoveDoc<Channel>
|
||||
const channel = this.channels.get(remTx.objectId)
|
||||
if (channel !== undefined) {
|
||||
this.contacts.byEmail.delete(channel.value)
|
||||
this.contacts.byId.delete(channel.attachedTo as Ref<Contact>)
|
||||
this.channels.delete(channel._id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Integrations
|
||||
|
||||
private async initIntegrations (): Promise<void> {
|
||||
const personsBySocialId = await getPersonRefsBySocialIds(this.client)
|
||||
const integrations = await this.client.findAll(setting.class.Integration, {
|
||||
type: calendar.integrationType.Calendar
|
||||
})
|
||||
for (const integration of integrations) {
|
||||
const person = personsBySocialId[integration.createdBy ?? integration.modifiedBy]
|
||||
if (person != null) {
|
||||
this.integrations.byEmail.set(integration.value, person)
|
||||
const arr = this.integrations.byContact.get(person) ?? []
|
||||
arr.push(integration.value)
|
||||
this.integrations.byContact.set(person, arr)
|
||||
this.integrations.byId.set(integration._id, integration)
|
||||
}
|
||||
}
|
||||
this.txHandlers.push(async (...txes: Tx[]) => {
|
||||
for (const tx of txes) {
|
||||
await this.txIntegrationHandler(tx)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private addContactIntegration (integration: Integration, person: Ref<Contact>): void {
|
||||
const arr = this.integrations.byContact.get(person) ?? []
|
||||
arr.push(integration.value)
|
||||
this.integrations.byContact.set(person, arr)
|
||||
}
|
||||
|
||||
private removeContactIntegration (integration: Integration, person: Ref<Contact>): void {
|
||||
const arr = this.integrations.byContact.get(person)
|
||||
if (arr !== undefined) {
|
||||
const index = arr.findIndex((p) => p === integration.value)
|
||||
if (index !== -1) {
|
||||
arr.splice(index, 1)
|
||||
if (arr.length > 0) {
|
||||
this.integrations.byContact.set(person, arr)
|
||||
} else {
|
||||
this.integrations.byContact.delete(person)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async addIntegration (integration: Integration): Promise<void> {
|
||||
const person = await getPersonRefBySocialId(this.client, integration.createdBy ?? integration.modifiedBy)
|
||||
if (person != null) {
|
||||
if (integration.value !== '') {
|
||||
this.integrations.byEmail.set(integration.value, person)
|
||||
this.addContactIntegration(integration, person)
|
||||
}
|
||||
this.integrations.byId.set(integration._id, integration)
|
||||
}
|
||||
}
|
||||
|
||||
private async removeIntegration (integration: Integration): Promise<void> {
|
||||
const person = await getPersonRefBySocialId(this.client, integration.createdBy ?? integration.modifiedBy)
|
||||
if (person != null) {
|
||||
this.removeContactIntegration(integration, person)
|
||||
}
|
||||
this.integrations.byEmail.delete(integration.value)
|
||||
this.integrations.byId.delete(integration._id)
|
||||
}
|
||||
|
||||
private async txIntegrationHandler (actualTx: Tx): Promise<void> {
|
||||
if (actualTx._class === core.class.TxCreateDoc) {
|
||||
if ((actualTx as TxCreateDoc<Doc>).objectClass === setting.class.Integration) {
|
||||
const integration = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc<Integration>)
|
||||
if (integration.type === calendar.integrationType.Calendar) {
|
||||
await this.addIntegration(integration)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (actualTx._class === core.class.TxRemoveDoc) {
|
||||
const remTx = actualTx as TxRemoveDoc<Integration>
|
||||
const integration = this.integrations.byId.get(remTx.objectId)
|
||||
if (integration !== undefined) {
|
||||
await this.removeIntegration(integration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Users
|
||||
|
||||
async checkUsers (): Promise<void> {
|
||||
const removedEmployees = await this.client.findAll(contact.mixin.Employee, {
|
||||
active: false
|
||||
})
|
||||
|
||||
this.txHandlers.push(async (...txes: Tx[]) => {
|
||||
for (const tx of txes) {
|
||||
await this.txEmployeeHandler(tx)
|
||||
}
|
||||
})
|
||||
for (const employee of removedEmployees) {
|
||||
await this.deactivateUser(employee._id)
|
||||
}
|
||||
}
|
||||
|
||||
private async deactivateUser (person: Ref<Contact>): Promise<void> {
|
||||
const integrations = this.integrations.byContact.get(person) ?? []
|
||||
for (const integration of integrations) {
|
||||
if (integration !== '') {
|
||||
await this.signout(integration as any, true) // TODO: FIXME
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async txEmployeeHandler (tx: Tx): Promise<void> {
|
||||
if (tx._class !== core.class.TxUpdateDoc) return
|
||||
const ctx = tx as TxMixin<Person, Employee>
|
||||
if (!this.client.getHierarchy().isDerived(ctx.objectClass, contact.mixin.Employee)) return
|
||||
if (ctx.attributes.active === false) {
|
||||
await this.deactivateUser(ctx.objectId)
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
}
|
||||
|
||||
async function getTokenByEvent (
|
||||
accountClient: AccountClient,
|
||||
txOp: TxOperations,
|
||||
event: Event,
|
||||
workspace: WorkspaceUuid
|
||||
): Promise<Token | undefined> {
|
||||
const _calendar = await txOp.findOne(calendar.class.ExternalCalendar, {
|
||||
_id: event.calendar as Ref<ExternalCalendar>
|
||||
})
|
||||
if (_calendar === undefined) return
|
||||
const res = await accountClient.getIntegrationSecret({
|
||||
socialId: event.user,
|
||||
kind: CALENDAR_INTEGRATION,
|
||||
workspaceUuid: workspace,
|
||||
key: _calendar.externalUser
|
||||
})
|
||||
if (res == null) return
|
||||
return JSON.parse(res.secret)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user