Fix analytics collector and ai-bot services (#6331)

Signed-off-by: Kristina Fefelova <kristin.fefelova@gmail.com>
This commit is contained in:
Kristina 2024-08-16 11:02:24 +04:00 committed by GitHub
parent 9df6ba218c
commit d26a03f814
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 453 additions and 164 deletions

22
.vscode/launch.json vendored
View File

@ -347,28 +347,6 @@
"cwd": "${workspaceRoot}/services/rekoni",
"protocol": "inspector"
},
{
"name": "Debug AI bot",
"type": "node",
"request": "launch",
"args": ["src/index.ts"],
"env": {
"ACCOUNTS_URL": "http://localhost:3000",
"MONGO_URL": "mongodb://localhost:27017",
"PORT": "4008",
"SERVER_SECRET": "secret",
"SUPPORT_WORKSPACE": "support",
"FIRST_NAME": "Jolie",
"LAST_NAME": "AI",
"AVATAR_PATH": "./assets/avatar.png",
"AVATAR_CONTENT_TYPE": ".png"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"cwd": "${workspaceRoot}/services/ai-bot/pod-ai-bot",
"protocol": "inspector",
"outputCapture": "std"
},
{
"name": "Debug analytics collector",
"type": "node",

View File

@ -5,5 +5,6 @@
"UPLOAD_URL":"/files",
"REKONI_URL": "http://localhost:4004",
"PRINT_URL": "http://localhost:4005",
"SIGN_URL": "http://localhost:4006"
"SIGN_URL": "http://localhost:4006",
"ANALYTICS_COLLECTOR_URL":"http://localhost:4007"
}

View File

@ -50,6 +50,7 @@ import { trainingOperation } from '@hcengineering/model-training'
import { documentsOperation } from '@hcengineering/model-controlled-documents'
import { productsOperation } from '@hcengineering/model-products'
import { requestOperation } from '@hcengineering/model-request'
import { analyticsCollectorOperation } from '@hcengineering/model-analytics-collector'
export const migrateOperations: [string, MigrateOperation][] = [
['core', coreOperation],
@ -88,5 +89,6 @@ export const migrateOperations: [string, MigrateOperation][] = [
['activityServer', activityServerOperation],
['textEditorOperation', textEditorOperation],
// We should call notification migration after activityServer and chunter
['notification', notificationOperation]
['notification', notificationOperation],
['analyticsCollector', analyticsCollectorOperation]
]

View File

@ -32,6 +32,8 @@
"@hcengineering/chunter": "^0.6.20",
"@hcengineering/core": "^0.6.32",
"@hcengineering/model": "^0.6.11",
"@hcengineering/model-activity": "^0.6.0",
"@hcengineering/model-notification": "^0.6.0",
"@hcengineering/model-chunter": "^0.6.0",
"@hcengineering/model-core": "^0.6.0",
"@hcengineering/model-view": "^0.6.0",

View File

@ -21,6 +21,7 @@ import { TChannel } from '@hcengineering/model-chunter'
import analyticsCollector from './plugin'
export { analyticsCollectorId } from '@hcengineering/analytics-collector'
export { analyticsCollectorOperation } from './migration'
export default analyticsCollector
@Mixin(analyticsCollector.mixin.AnalyticsChannel, chunter.class.Channel)

View File

@ -0,0 +1,56 @@
//
// Copyright © 2020, 2021 Anticrm Platform Contributors.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
type MigrateOperation,
type MigrationClient,
type MigrationUpgradeClient,
tryMigrate
} from '@hcengineering/model'
import analyticsCollector, { analyticsCollectorId } from '@hcengineering/analytics-collector'
import { DOMAIN_SPACE } from '@hcengineering/model-core'
import { DOMAIN_DOC_NOTIFY, DOMAIN_NOTIFICATION } from '@hcengineering/model-notification'
import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
async function removeAnalyticsChannels (client: MigrationClient): Promise<void> {
const channels = await client.find(DOMAIN_SPACE, {
[`${analyticsCollector.mixin.AnalyticsChannel}`]: { $exists: true }
})
if (channels.length === 0) {
return
}
const channelsIds = channels.map((it) => it._id)
const contexts = await client.find(DOMAIN_DOC_NOTIFY, { objectId: { $in: channelsIds } })
const contextsIds = contexts.map((it) => it._id)
await client.deleteMany(DOMAIN_ACTIVITY, { attachedTo: { $in: channelsIds } })
await client.deleteMany(DOMAIN_NOTIFICATION, { docNotifyContext: { $in: contextsIds } })
await client.deleteMany(DOMAIN_DOC_NOTIFY, { _id: { $in: contextsIds } })
await client.deleteMany(DOMAIN_SPACE, { [`${analyticsCollector.mixin.AnalyticsChannel}`]: { $exists: true } })
}
export const analyticsCollectorOperation: MigrateOperation = {
async migrate (client: MigrationClient): Promise<void> {
await tryMigrate(client, analyticsCollectorId, [
{
state: 'remove-analytics-channels-v1',
func: removeAnalyticsChannels
}
])
},
async upgrade (state: Map<string, Set<string>>, client: () => Promise<MigrationUpgradeClient>): Promise<void> {}
}

View File

@ -65,6 +65,7 @@ export class TAIBotTransferEvent extends TAIBotEvent implements AIBotTransferEve
toEmail!: string
toWorkspace!: string
fromWorkspace!: string
fromWorkspaceUrl!: string
messageId!: Ref<ChatMessage>
parentMessageId?: Ref<ChatMessage>
}

View File

@ -39,6 +39,7 @@ export interface AIBotTransferEvent extends AIBotEvent {
toEmail: string
toWorkspace: string
fromWorkspace: string
fromWorkspaceUrl: string
messageId: Ref<ChatMessage>
parentMessageId?: Ref<ChatMessage>
}

View File

@ -244,7 +244,7 @@ export class ChannelDataProvider implements IChannelDataProvider {
this.isTailLoading.set(true)
const tailStart = metadata[startIndex]?.createdOn
this.loadTail(tailStart)
this.backwardNextPromise = this.loadNext('backward', metadata[startIndex]?.createdOn, this.limit)
this.backwardNextPromise = this.loadNext('backward', metadata[startIndex]?.createdOn, this.limit, false)
} else {
const newStart = Math.max(startPosition - this.limit / 2, 0)
await this.loadMore('forward', metadata[newStart]?.createdOn, this.limit)
@ -309,7 +309,7 @@ export class ChannelDataProvider implements IChannelDataProvider {
return index !== -1 ? metadata.length - index : -1
}
async loadChunk (isBackward: boolean, loadAfter: Timestamp, limit?: number): Promise<Chunk | undefined> {
async loadChunk (isBackward: boolean, loadAfter: Timestamp, limit?: number, equal = true): Promise<Chunk | undefined> {
const client = getClient()
const skipIds = this.getChunkSkipIds(loadAfter)
@ -319,7 +319,13 @@ export class ChannelDataProvider implements IChannelDataProvider {
attachedTo: this.chatId,
space: this.space,
_id: { $nin: skipIds },
createdOn: isBackward ? { $lte: loadAfter } : { $gte: loadAfter }
createdOn: equal
? isBackward
? { $lte: loadAfter }
: { $gte: loadAfter }
: isBackward
? { $lt: loadAfter }
: { $gt: loadAfter }
},
{
limit: limit ?? this.limit,
@ -359,7 +365,7 @@ export class ChannelDataProvider implements IChannelDataProvider {
.map(({ _id }) => _id)
}
async loadNext (mode: LoadMode, loadAfter?: Timestamp, limit?: number): Promise<void> {
async loadNext (mode: LoadMode, loadAfter?: Timestamp, limit?: number, equal = true): Promise<void> {
if (this.chatId === undefined || loadAfter === undefined) {
return
}
@ -384,7 +390,7 @@ export class ChannelDataProvider implements IChannelDataProvider {
return
}
const chunk = await this.loadChunk(isBackward, loadAfter, limit)
const chunk = await this.loadChunk(isBackward, loadAfter, limit, equal)
if (chunk !== undefined && isBackward) {
this.backwardNextStore.set(chunk)

View File

@ -25,7 +25,7 @@
canGroupMessages,
messageInFocus
} from '@hcengineering/activity-resources'
import { Class, Doc, getDay, Ref, Timestamp } from '@hcengineering/core'
import { Class, Doc, generateId, getDay, Ref, Timestamp } from '@hcengineering/core'
import { InboxNotificationsClientImpl } from '@hcengineering/notification-resources'
import { getResource } from '@hcengineering/platform'
import { getClient } from '@hcengineering/presentation'
@ -657,7 +657,7 @@
}
}
function handleScrollDown (): void {
async function handleScrollDown (): Promise<void> {
selectedMessageId = undefined
messageInFocus.set(undefined)
@ -665,8 +665,6 @@
const lastMetadata = metadata[metadata.length - 1]
const lastMessage = displayMessages[displayMessages.length - 1]
void inboxClient.readDoc(client, objectId)
if (lastMetadata._id !== lastMessage._id) {
separatorIndex = -1
provider.jumpToEnd(true)
@ -674,12 +672,17 @@
} else {
scrollToBottom()
}
const op = client.apply(generateId(), 'chunter.scrollDown')
await inboxClient.readDoc(op, objectId)
await op.commit()
}
$: forceReadContext(isScrollAtBottom, notifyContext)
let forceRead = false
$: void forceReadContext(isScrollAtBottom, notifyContext)
function forceReadContext (isScrollAtBottom: boolean, context?: DocNotifyContext): void {
if (context === undefined || !isScrollAtBottom) return
async function forceReadContext (isScrollAtBottom: boolean, context?: DocNotifyContext): Promise<void> {
if (context === undefined || !isScrollAtBottom || forceRead || !separatorElement) return
const { lastUpdateTimestamp = 0, lastViewedTimestamp = 0 } = context
if (lastViewedTimestamp >= lastUpdateTimestamp) return
@ -688,7 +691,10 @@
const unViewed = notifications.filter(({ isViewed }) => !isViewed)
if (unViewed.length === 0) {
void inboxClient.readDoc(client, objectId)
forceRead = true
const op = client.apply(generateId(), 'chunter.forceReadContext')
await inboxClient.readDoc(op, objectId)
await op.commit()
}
}

View File

@ -417,7 +417,7 @@ export function recheckNotifications (context: DocNotifyContext): void {
const toReadData = Array.from(toRead)
toRead.clear()
void (async () => {
const _client = client.apply(generateId())
const _client = client.apply(generateId(), 'recheckNotifications')
await inboxClient.readNotifications(_client, toReadData)
await _client.commit()
})()
@ -434,7 +434,7 @@ export async function readChannelMessages (
const inboxClient = InboxNotificationsClientImpl.getClient()
const client = getClient().apply(generateId())
const client = getClient().apply(generateId(), 'readViewportMessages')
try {
const readMessages = get(chatReadMessagesStore)
const allIds = getAllIds(messages).filter((id) => !readMessages.has(id))

View File

@ -157,12 +157,14 @@ export class InboxNotificationsClientImpl implements InboxNotificationsClient {
return
}
const inboxNotifications = (get(this.inboxNotifications) ?? []).filter(
(notification) => notification.docNotifyContext === docNotifyContext._id && !notification.isViewed
const inboxNotifications = await client.findAll(
notification.class.InboxNotification,
{ docNotifyContext: docNotifyContext._id, isViewed: false },
{ projection: { _id: 1, _class: 1, space: 1 } }
)
for (const notification of inboxNotifications) {
await client.update(notification, { isViewed: true })
await client.updateDoc(notification._class, notification.space, notification._id, { isViewed: true })
}
await client.update(docNotifyContext, { lastViewedTimestamp: Date.now() })
}

View File

@ -308,6 +308,7 @@ export class AnalyticsMiddleware extends BasePresentationMiddleware implements P
}
if (TxProcessor.isExtendsCUD(etx._class)) {
const cud = etx as TxCUD<Doc>
if (cud.objectClass === core.class.BenchmarkDoc) continue
const _class = this.client.getHierarchy().getClass(cud.objectClass)
if (_class.label !== undefined) {
const label = await translate(_class.label, {}, 'en')

Binary file not shown.

View File

@ -182,23 +182,11 @@ function getSupportWorkspaceId (): string | undefined {
return supportWorkspaceId
}
async function onBotDirectMessageSend (control: TriggerControl, message: ChatMessage): Promise<Tx[]> {
async function onBotDirectMessageSend (control: TriggerControl, message: ChatMessage): Promise<void> {
const supportWorkspaceId = getSupportWorkspaceId()
if (supportWorkspaceId === undefined) {
return []
}
const direct = (await getMessageDoc(message, control)) as DirectMessage
if (direct === undefined) {
return []
}
const isAvailable = await isDirectAvailable(direct, control)
if (!isAvailable) {
return []
return
}
const account = control.modelDb.findAllSync(contact.class.PersonAccount, {
@ -206,7 +194,19 @@ async function onBotDirectMessageSend (control: TriggerControl, message: ChatMes
})[0]
if (account === undefined || account.role !== AccountRole.Owner) {
return []
return
}
const direct = (await getMessageDoc(message, control)) as DirectMessage
if (direct === undefined) {
return
}
const isAvailable = await isDirectAvailable(direct, control)
if (!isAvailable) {
return
}
let data: Data<AIBotResponseEvent> | undefined
@ -218,7 +218,7 @@ async function onBotDirectMessageSend (control: TriggerControl, message: ChatMes
}
if (data === undefined) {
return []
return
}
const eventTx = control.txFactory.createTxCreateDoc(aiBot.class.AIBotTransferEvent, message.space, {
@ -228,34 +228,34 @@ async function onBotDirectMessageSend (control: TriggerControl, message: ChatMes
toWorkspace: supportWorkspaceId,
toEmail: account.email,
fromWorkspace: toWorkspaceString(control.workspace),
fromWorkspaceUrl: control.workspace.workspaceUrl,
messageId: message._id,
parentMessageId: await getThreadParent(control, message)
})
await control.apply([eventTx])
await processWorkspace(control)
return [eventTx]
}
async function onSupportWorkspaceMessage (control: TriggerControl, message: ChatMessage): Promise<Tx[]> {
async function onSupportWorkspaceMessage (control: TriggerControl, message: ChatMessage): Promise<void> {
const supportWorkspaceId = getSupportWorkspaceId()
if (supportWorkspaceId === undefined) {
return []
return
}
if (toWorkspaceString(control.workspace) !== supportWorkspaceId) {
return []
return
}
const channel = await getMessageDoc(message, control)
if (channel === undefined) {
return []
return
}
if (!control.hierarchy.hasMixin(channel, analytics.mixin.AnalyticsChannel)) {
return []
return
}
const mixin = control.hierarchy.as(channel, analytics.mixin.AnalyticsChannel)
@ -270,23 +270,24 @@ async function onSupportWorkspaceMessage (control: TriggerControl, message: Chat
}
if (data === undefined) {
return []
return
}
await processWorkspace(control)
const tx = control.txFactory.createTxCreateDoc(aiBot.class.AIBotTransferEvent, message.space, {
messageClass: data.messageClass,
message: message.message,
collection: data.collection,
toEmail: email,
toWorkspace: workspace,
fromWorkspace: toWorkspaceString(control.workspace),
fromWorkspaceUrl: control.workspace.workspaceUrl,
messageId: message._id,
parentMessageId: await getThreadParent(control, message)
})
return [
control.txFactory.createTxCreateDoc(aiBot.class.AIBotTransferEvent, message.space, {
messageClass: data.messageClass,
message: message.message,
collection: data.collection,
toEmail: email,
toWorkspace: workspace,
fromWorkspace: toWorkspaceString(control.workspace),
messageId: message._id,
parentMessageId: await getThreadParent(control, message)
})
]
await control.apply([tx])
await processWorkspace(control)
}
export async function OnMessageSend (
@ -312,19 +313,15 @@ export async function OnMessageSend (
return []
}
const res: Tx[] = []
if (docClass === chunter.class.DirectMessage) {
const txes = await onBotDirectMessageSend(control, message)
res.push(...txes)
await onBotDirectMessageSend(control, message)
}
if (docClass === chunter.class.Channel) {
const txes = await onSupportWorkspaceMessage(control, message)
res.push(...txes)
await onSupportWorkspaceMessage(control, message)
}
return res
return []
}
export async function OnMention (tx: TxCreateDoc<MentionInboxNotification>, control: TriggerControl): Promise<Tx[]> {

View File

@ -13,15 +13,17 @@
// limitations under the License.
//
import chunter, { Channel } from '@hcengineering/chunter'
import core, { AccountRole, Ref, TxOperations } from '@hcengineering/core'
import core, { AccountRole, MeasureContext, Ref, TxOperations } from '@hcengineering/core'
import analyticsCollector, { getAnalyticsChannelName } from '@hcengineering/analytics-collector'
import contact, { Person } from '@hcengineering/contact'
import { translate } from '@hcengineering/platform'
export async function getOrCreateAnalyticsChannel (
ctx: MeasureContext,
client: TxOperations,
email: string,
workspace: string,
workspaceUrl: string,
person?: Person
): Promise<Ref<Channel> | undefined> {
const channel = await client.findOne(chunter.class.Channel, {
@ -33,13 +35,14 @@ export async function getOrCreateAnalyticsChannel (
return channel._id
}
const accounts = await client.findAll(contact.class.PersonAccount, { role: { $ne: AccountRole.Guest } })
ctx.info('Creating analytics channel', { email, workspace })
const accounts = await client.findAll(contact.class.PersonAccount, { role: { $ne: AccountRole.Guest } })
const _id = await client.createDoc(chunter.class.Channel, core.space.Space, {
name: getAnalyticsChannelName(workspace, email),
name: getAnalyticsChannelName(workspaceUrl, email),
topic: await translate(analyticsCollector.string.AnalyticsChannelDescription, {
user: person?.name ?? email,
workspace
workspace: workspaceUrl
}),
description: '',
private: false,

View File

@ -32,6 +32,7 @@ const MAX_ASSIGN_ATTEMPTS = 5
export class AIBotController {
private readonly workspaces: Map<string, WorkspaceClient> = new Map<string, WorkspaceClient>()
private readonly closeWorkspaceTimeouts: Map<string, NodeJS.Timeout> = new Map<string, NodeJS.Timeout>()
private readonly connectingWorkspaces: Set<string> = new Set<string>()
private readonly db: Db
private readonly ctx: MeasureContext
@ -58,7 +59,13 @@ export class AIBotController {
for (const record of activeRecords) {
const id: WorkspaceId = { name: record.workspace, productId: record.productId }
if (this.workspaces.has(toWorkspaceString(id))) {
const ws = toWorkspaceString(id)
if (this.workspaces.has(ws)) {
continue
}
if (this.connectingWorkspaces.has(ws)) {
continue
}
@ -89,6 +96,7 @@ export class AIBotController {
await client.close()
this.workspaces.delete(workspace)
}
this.connectingWorkspaces.delete(workspace)
}
private async getWorkspaceInfo (ws: WorkspaceId): Promise<WorkspaceLoginInfo | undefined> {
@ -146,6 +154,7 @@ export class AIBotController {
async initWorkspaceClient (workspaceId: WorkspaceId, info: WorkspaceInfoRecord): Promise<void> {
const workspace = toWorkspaceString(workspaceId)
this.connectingWorkspaces.add(workspace)
if (!this.workspaces.has(workspace)) {
this.ctx.info('Listen workspace: ', { workspace })
@ -169,6 +178,7 @@ export class AIBotController {
}, CLOSE_INTERVAL_MS)
this.closeWorkspaceTimeouts.set(workspace, newTimeoutId)
this.connectingWorkspaces.delete(workspace)
}
async transfer (event: AIBotTransferEvent): Promise<void> {

View File

@ -0,0 +1,25 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { coreId } from '@hcengineering/core'
import coreEng from '@hcengineering/core/lang/en.json'
import platformEng from '@hcengineering/platform/lang/en.json'
import { addStringsLoader, platformId } from '@hcengineering/platform'
export function registerLoaders (): void {
addStringsLoader(coreId, async (lang: string) => coreEng)
addStringsLoader(platformId, async (lang: string) => platformEng)
}

View File

@ -23,6 +23,7 @@ import config from './config'
import { closeDB, getDB } from './storage'
import { AIBotController } from './controller'
import { createBotAccount } from './account'
import { registerLoaders } from './loaders'
export const start = async (): Promise<void> => {
setMetadata(serverToken.metadata.Secret, config.ServerSecret)
@ -30,6 +31,7 @@ export const start = async (): Promise<void> => {
setMetadata(serverClient.metadata.UserAgent, config.ServiceID)
setMetadata(serverClient.metadata.Endpoint, config.AccountsURL)
registerLoaders()
const ctx = new MeasureMetricsContext('ai-bot-service', {})
ctx.info('AI Bot Service started', { firstName: config.FirstName, lastName: config.LastName })

View File

@ -26,10 +26,11 @@ import core, {
TxOperations,
TxProcessor,
WorkspaceId,
Blob
Blob,
RateLimiter
} from '@hcengineering/core'
import aiBot, { AIBotEvent, aiBotAccountEmail, AIBotResponseEvent, AIBotTransferEvent } from '@hcengineering/ai-bot'
import chunter, { ChatMessage, DirectMessage, ThreadMessage } from '@hcengineering/chunter'
import chunter, { Channel, ChatMessage, DirectMessage, ThreadMessage } from '@hcengineering/chunter'
import contact, { AvatarType, combineName, getFirstName, getLastName, PersonAccount } from '@hcengineering/contact'
import { generateToken } from '@hcengineering/server-token'
import notification from '@hcengineering/notification'
@ -57,7 +58,11 @@ export class WorkspaceClient {
initializePromise: Promise<void> | undefined = undefined
channelByKey = new Map<string, Ref<Channel>>()
aiAccount: PersonAccount | undefined
rate = new RateLimiter(1)
directByEmail = new Map<string, Ref<DirectMessage>>()
constructor (
readonly transactorUrl: string,
@ -176,15 +181,13 @@ export class WorkspaceClient {
return
}
this.opClient = new TxOperations(this.client, aiBot.account.AIBot)
void this.opClient.findAll(aiBot.class.AIBotEvent, {}).then((res) => {
void this.processEvents(res)
})
await this.uploadAvatarFile(this.opClient)
const events = await this.opClient.findAll(aiBot.class.AIBotTransferEvent, {})
void this.processEvents(events)
this.client.notify = (...txes: Tx[]) => {
void this.txHandler(txes)
}
await this.uploadAvatarFile(this.opClient)
this.ctx.info('Initialized workspace', this.workspace)
}
@ -302,7 +305,7 @@ export class WorkspaceClient {
await this.opClient.remove(event)
}
async getAccount (email: string): Promise<Account | undefined> {
async getAccount (email: string): Promise<PersonAccount | undefined> {
if (this.opClient === undefined) {
return
}
@ -310,12 +313,12 @@ export class WorkspaceClient {
return await this.opClient.findOne(contact.class.PersonAccount, { email })
}
async getDirect (_id: Ref<Account>): Promise<Ref<DirectMessage> | undefined> {
async getDirect (email: string): Promise<Ref<DirectMessage> | undefined> {
if (this.opClient === undefined) {
return
}
const personAccount = await this.opClient.findOne(contact.class.PersonAccount, { _id: _id as Ref<PersonAccount> })
const personAccount = await this.getAccount(email)
if (personAccount === undefined) {
return
@ -331,7 +334,7 @@ export class WorkspaceClient {
}
}
const id = await this.opClient.createDoc<DirectMessage>(chunter.class.DirectMessage, core.space.Space, {
const dmId = await this.opClient.createDoc<DirectMessage>(chunter.class.DirectMessage, core.space.Space, {
name: '',
description: '',
private: true,
@ -339,31 +342,40 @@ export class WorkspaceClient {
members: accIds
})
if (this.aiAccount === undefined) return id
if (this.aiAccount === undefined) return dmId
const space = await this.opClient.findOne(contact.class.PersonSpace, { person: this.aiAccount.person })
if (space === undefined) return id
if (space === undefined) return dmId
await this.opClient.createDoc(notification.class.DocNotifyContext, space._id, {
user: aiBot.account.AIBot,
objectId: id,
objectId: dmId,
objectClass: chunter.class.DirectMessage,
objectSpace: core.space.Space,
isPinned: false
})
return id
return dmId
}
async transferToSupport (event: AIBotTransferEvent): Promise<void> {
if (this.opClient === undefined) {
return
}
const channel = await getOrCreateAnalyticsChannel(this.opClient, event.toEmail, event.fromWorkspace)
async transferToSupport (event: AIBotTransferEvent, channelRef?: Ref<Channel>): Promise<void> {
if (this.opClient === undefined) return
const key = `${event.toEmail}-${event.fromWorkspace}`
const channel =
channelRef ??
this.channelByKey.get(key) ??
(await getOrCreateAnalyticsChannel(
this.ctx,
this.opClient,
event.toEmail,
event.fromWorkspace,
event.fromWorkspaceUrl
))
if (channel === undefined) {
return
}
this.channelByKey.set(key, channel)
await this.createTransferMessage(this.opClient, event, channel, chunter.class.Channel, channel, event.message)
}
@ -372,30 +384,48 @@ export class WorkspaceClient {
return
}
const account = await this.getAccount(event.toEmail)
if (account === undefined) {
return
}
const direct = await this.getDirect(account._id)
const direct = this.directByEmail.get(event.toEmail) ?? (await this.getDirect(event.toEmail))
if (direct === undefined) {
return
}
this.directByEmail.set(event.toEmail, direct)
await this.createTransferMessage(this.opClient, event, direct, chunter.class.DirectMessage, direct, event.message)
}
getChannelRef (email: string, workspace: string): Ref<Channel> | undefined {
const key = `${email}-${workspace}`
return this.channelByKey.get(key)
}
async transfer (event: AIBotTransferEvent): Promise<void> {
if (this.initializePromise instanceof Promise) {
await this.initializePromise
}
if (event.toWorkspace === config.SupportWorkspace) {
await this.transferToSupport(event)
const channel = this.getChannelRef(event.toEmail, event.fromWorkspace)
if (channel !== undefined) {
await this.transferToSupport(event, channel)
} else {
// If we dont have AnalyticsChannel we should call it sync to prevent multiple channel for the same user and workspace
await this.rate.add(async () => {
await this.transferToSupport(event)
})
}
} else {
await this.transferToUserDirect(event)
if (this.directByEmail.has(event.toEmail)) {
await this.transferToUserDirect(event)
} else {
// If we dont have Direct with user we should call it sync to prevent multiple directs for the same user
await this.rate.add(async () => {
await this.transferToUserDirect(event)
})
}
}
}

View File

@ -53,6 +53,8 @@
"typescript": "^5.3.3"
},
"dependencies": {
"@hcengineering/analytics": "^0.6.0",
"@hcengineering/analytics-service": "^0.6.0",
"@hcengineering/account": "^0.6.0",
"@hcengineering/chunter": "^0.6.20",
"@hcengineering/chunter-assets": "^0.6.18",

View File

@ -1,7 +1,23 @@
import config from './config'
import { WorkspaceLoginInfo } from '@hcengineering/account'
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
export async function getWorkspaceInfo (token: string): Promise<WorkspaceLoginInfo | undefined> {
import { WorkspaceInfo } from '@hcengineering/account'
import config from './config'
export async function getWorkspaceInfo (token: string): Promise<WorkspaceInfo | undefined> {
const accountsUrl = config.AccountsUrl
const workspaceInfo = await (
await fetch(accountsUrl, {
@ -17,5 +33,5 @@ export async function getWorkspaceInfo (token: string): Promise<WorkspaceLoginIn
})
).json()
return workspaceInfo.result as WorkspaceLoginInfo
return workspaceInfo.result as WorkspaceInfo
}

View File

@ -1,6 +1,28 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { generateToken, Token } from '@hcengineering/server-token'
import { AnalyticEvent } from '@hcengineering/analytics-collector'
import { AccountRole, getWorkspaceId, Timestamp, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
import {
AccountRole,
getWorkspaceId,
MeasureContext,
Timestamp,
toWorkspaceString,
WorkspaceId
} from '@hcengineering/core'
import { Person } from '@hcengineering/contact'
import { WorkspaceClient } from './workspaceClient'
import config from './config'
@ -14,13 +36,16 @@ export class Collector {
private readonly workspaces: Map<string, WorkspaceClient> = new Map<string, WorkspaceClient>()
private readonly closeWorkspaceTimeouts: Map<string, NodeJS.Timeout> = new Map<string, NodeJS.Timeout>()
private readonly createdWorkspaces: Set<string> = new Set<string>()
private readonly workspaceUrlById = new Map<string, string>()
supportClient: WorkspaceClient | undefined = undefined
eventsByEmail = new Map<string, AnalyticEvent[]>()
periodicTimer: NodeJS.Timeout
constructor () {
persons = new Map<string, Person>()
constructor (private readonly ctx: MeasureContext) {
this.periodicTimer = setInterval(() => {
void this.clearEvents()
}, clearEventsTimeout)
@ -41,6 +66,7 @@ export class Collector {
}
async closeWorkspaceClient (workspaceId: WorkspaceId): Promise<void> {
this.ctx.info('Closing workspace client', { workspace: toWorkspaceString(workspaceId) })
const workspace = toWorkspaceString(workspaceId)
const timeoutId = this.closeWorkspaceTimeouts.get(workspace)
@ -57,11 +83,12 @@ export class Collector {
}
}
async getWorkspaceClient (workspaceId: WorkspaceId): Promise<WorkspaceClient> {
getWorkspaceClient (workspaceId: WorkspaceId): WorkspaceClient {
const workspace = toWorkspaceString(workspaceId)
const wsClient = this.workspaces.get(workspace) ?? new WorkspaceClient(workspaceId)
const wsClient = this.workspaces.get(workspace) ?? new WorkspaceClient(this.ctx, workspaceId)
if (!this.workspaces.has(workspace)) {
this.ctx.info('Creating workspace client', { workspace, allClients: Array.from(this.workspaces.keys()) })
this.workspaces.set(workspace, wsClient)
}
@ -93,7 +120,7 @@ export class Collector {
getSupportWorkspaceClient (): WorkspaceClient {
if (this.supportClient === undefined) {
this.supportClient = new WorkspaceClient(getWorkspaceId(config.SupportWorkspace))
this.supportClient = new WorkspaceClient(this.ctx, getWorkspaceId(config.SupportWorkspace))
}
return this.supportClient
@ -106,14 +133,17 @@ export class Collector {
return true
}
console.info('isWorkspaceCreated', token.email, token.workspace.name)
const info = await getWorkspaceInfo(generateToken(token.email, token.workspace, token.extra))
console.log('workspace info', info?.workspace, info?.email, info?.endpoint)
this.ctx.info('workspace info', info)
if (info === undefined) {
return false
}
if (info?.workspaceUrl != null) {
this.workspaceUrlById.set(ws, info.workspaceUrl)
}
if (info?.creating === true) {
return false
}
@ -122,6 +152,35 @@ export class Collector {
return true
}
async getPerson (email: string, workspace: WorkspaceId): Promise<Person | undefined> {
const wsString = toWorkspaceString(workspace)
const key = `${email}-${wsString}`
if (this.persons.has(key)) {
return this.persons.get(key)
}
const fromWsClient = this.getWorkspaceClient(workspace)
const account = await fromWsClient.getAccount(email)
if (account === undefined) {
this.ctx.error('Cannnot found account', { email, workspace: wsString })
return
}
if (account.role !== AccountRole.Owner) {
return
}
const person = await fromWsClient.getPerson(account)
if (person !== undefined) {
this.persons.set(key, person)
}
return person
}
async pushEvents (events: AnalyticEvent[], token: Token): Promise<void> {
const isCreated = await this.isWorkspaceCreated(token)
@ -129,22 +188,21 @@ export class Collector {
return
}
const fromWsClient = await this.getWorkspaceClient(token.workspace)
const account = await fromWsClient.getAccount(token.email)
const person = await this.getPerson(token.email, token.workspace)
if (account === undefined) {
console.error('Cannnot found account', { email: token.email, workspace: toWorkspaceString(token.workspace) })
if (person === undefined) {
return
}
if (account.role !== AccountRole.Owner) {
return
}
const person = await fromWsClient.getPerson(account)
const client = this.getSupportWorkspaceClient()
await client.pushEvents(events, token.email, token.workspace, person)
await client.pushEvents(
events,
token.email,
token.workspace,
person,
this.workspaceUrlById.get(toWorkspaceString(token.workspace))
)
}
getEvents (start?: Timestamp, end?: Timestamp): AnalyticEvent[] {

View File

@ -20,6 +20,7 @@ export interface Config {
ServiceID: string
SupportWorkspace: string
AccountsUrl: string
SentryDSN?: string
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@ -31,7 +32,8 @@ const config: Config = (() => {
Secret: process.env.SECRET,
ServiceID: process.env.SERVICE_ID ?? 'analytics-collector-service',
SupportWorkspace: process.env.SUPPORT_WORKSPACE,
AccountsUrl: process.env.ACCOUNTS_URL
AccountsUrl: process.env.ACCOUNTS_URL,
SentryDSN: process.env.SENTRY_DSN ?? ''
}
const missingEnv = (Object.keys(params) as Array<keyof Config>).filter((key) => params[key] === undefined)

View File

@ -1,3 +1,18 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { analyticsCollectorId } from '@hcengineering/analytics-collector'
import { chunterId } from '@hcengineering/chunter'
import { contactId } from '@hcengineering/contact'

View File

@ -15,26 +15,45 @@
import { setMetadata } from '@hcengineering/platform'
import serverToken from '@hcengineering/server-token'
import { Analytics } from '@hcengineering/analytics'
import { SplitLogger, configureAnalytics } from '@hcengineering/analytics-service'
import serverClient from '@hcengineering/server-client'
import { MeasureMetricsContext, newMetrics } from '@hcengineering/core'
import { join } from 'path'
import config from './config'
import { createServer, listen } from './server'
import { Collector } from './collector'
import { registerLoaders } from './loaders'
import serverClient from '@hcengineering/server-client'
const ctx = new MeasureMetricsContext(
'analytics-collector-service',
{},
{},
newMetrics(),
new SplitLogger('analytics-collector-service', {
root: join(process.cwd(), 'logs'),
enableConsole: (process.env.ENABLE_CONSOLE ?? 'true') === 'true'
})
)
configureAnalytics(config.SentryDSN, config)
Analytics.setTag('application', 'analytics-collector-service')
export const main = async (): Promise<void> => {
setMetadata(serverToken.metadata.Secret, config.Secret)
setMetadata(serverClient.metadata.Endpoint, config.AccountsUrl)
setMetadata(serverClient.metadata.UserAgent, config.ServiceID)
console.log('Analytics service')
console.log(config.AccountsUrl)
console.log(config.DbURL)
console.log(config.SupportWorkspace)
ctx.info('Analytics service started', {
accountsUrl: config.AccountsUrl,
dbUrl: config.DbURL,
supportWorkspace: config.SupportWorkspace
})
registerLoaders()
const collector = new Collector()
const collector = new Collector(ctx)
const app = createServer(collector)
const server = listen(app, config.Port)

View File

@ -15,6 +15,8 @@
import core, {
Client,
MeasureContext,
RateLimiter,
Ref,
systemAccountEmail,
toWorkspaceString,
@ -38,7 +40,12 @@ export class WorkspaceClient {
channelIdByKey = new Map<string, Ref<Channel>>()
constructor (readonly workspace: WorkspaceId) {
rate = new RateLimiter(1)
constructor (
readonly ctx: MeasureContext,
readonly workspace: WorkspaceId
) {
this.initializePromise = this.initClient().then(() => {
this.initializePromise = undefined
})
@ -79,35 +86,51 @@ export class WorkspaceClient {
return await this.opClient.findOne(contact.class.Person, { _id: account.person })
}
async pushEvents (events: AnalyticEvent[], email: string, workspace: WorkspaceId, person?: Person): Promise<void> {
if (this.initializePromise instanceof Promise) {
await this.initializePromise
async getChannel (
client: TxOperations,
workspace: string,
workspaceName: string,
email: string,
person?: Person
): Promise<Ref<Channel> | undefined> {
const key = `${email}-${workspace}`
if (this.channelIdByKey.has(key)) {
return this.channelIdByKey.get(key)
}
if (this.opClient === undefined) {
return
const channel = await getOrCreateAnalyticsChannel(this.ctx, client, email, workspace, workspaceName, person)
if (channel !== undefined) {
this.channelIdByKey.set(key, channel)
}
return channel
}
async processEvents (
client: TxOperations,
events: AnalyticEvent[],
email: string,
workspace: WorkspaceId,
person?: Person,
wsUrl?: string,
channelRef?: Ref<Channel>
): Promise<void> {
const wsString = toWorkspaceString(workspace)
const channelKey = `${email}-${wsString}`
const channel =
this.channelIdByKey.get(channelKey) ?? (await getOrCreateAnalyticsChannel(this.opClient, email, wsString, person))
const channel = channelRef ?? (await this.getChannel(client, wsString, wsUrl ?? wsString, email, person))
if (channel === undefined) {
return
}
this.channelIdByKey.set(channelKey, channel)
for (const event of events) {
const markup = await eventToMarkup(event, this.opClient.getHierarchy())
const markup = await eventToMarkup(event, client.getHierarchy())
if (markup === undefined) {
continue
}
await this.opClient.addCollection(
await client.addCollection(
chunter.class.ChatMessage,
channel,
channel,
@ -120,6 +143,36 @@ export class WorkspaceClient {
}
}
async pushEvents (
events: AnalyticEvent[],
email: string,
workspace: WorkspaceId,
person?: Person,
wsUrl?: string
): Promise<void> {
if (this.initializePromise instanceof Promise) {
await this.initializePromise
}
if (this.opClient === undefined) {
return
}
const wsString = toWorkspaceString(workspace)
const channelKey = `${email}-${wsString}`
if (this.channelIdByKey.has(channelKey)) {
const channel = this.channelIdByKey.get(channelKey)
await this.processEvents(this.opClient, events, email, workspace, person, wsUrl, channel)
} else {
// If we dont have AnalyticsChannel we should call it sync to prevent multiple channels for the same user and workspace
await this.rate.add(async () => {
if (this.opClient === undefined) return
await this.processEvents(this.opClient, events, email, workspace, person, wsUrl)
})
}
}
async close (): Promise<void> {
if (this.client === undefined) {
return