UBERF-11411: Add communication threads for emails (#9156)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / uitest-workspaces (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

* UBERF-11411: Add communication threads for emails

Signed-off-by: Artem Savchenko <armisav@gmail.com>

* UBERF-11411: Clean up

Signed-off-by: Artem Savchenko <armisav@gmail.com>

* UBERF-11411: Update test

Signed-off-by: Artem Savchenko <armisav@gmail.com>

* UBERF-11411: Fix formatting

Signed-off-by: Artem Savchenko <armisav@gmail.com>

* UBERF-11411: Do not create thread for reply

Signed-off-by: Artem Savchenko <armisav@gmail.com>

---------

Signed-off-by: Artem Savchenko <armisav@gmail.com>
This commit is contained in:
Artyom Savchenko 2025-06-03 18:51:03 +07:00 committed by GitHub
parent 94e9d0cbf1
commit c20116c631
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 185 additions and 93 deletions

View File

@ -141,9 +141,10 @@ describe('ChannelCache', () => {
const error = new Error('Database error')
mockClient.findOne.mockRejectedValue(error)
const result = await channelCache.getOrCreateChannel(spaceId, participants, emailAccount, personId)
await expect(channelCache.getOrCreateChannel(spaceId, participants, emailAccount, personId)).rejects.toThrow(
'Failed to create channel for test@example.com in space test-space-id: Database error'
)
expect(result).toBeUndefined()
expect(mockCtx.error).toHaveBeenCalledWith('Failed to create channel', {
me: emailAccount,
space: spaceId,

View File

@ -13,7 +13,8 @@
// limitations under the License.
//
import { MeasureContext, PersonId, Ref, TxOperations, Doc, WorkspaceUuid, generateId } from '@hcengineering/core'
import { MeasureContext, PersonId, Ref, TxOperations, WorkspaceUuid, generateId } from '@hcengineering/core'
import { type Card } from '@hcengineering/card'
import chat from '@hcengineering/chat'
import mail from '@hcengineering/mail'
import { PersonSpace } from '@hcengineering/contact'
@ -27,7 +28,7 @@ const createMutex = new SyncMutex()
*/
export class ChannelCache {
// Key is `${spaceId}:${normalizedEmail}`
private readonly cache = new Map<string, Ref<Doc>>()
private readonly cache = new Map<string, Ref<Card>>()
constructor (
private readonly ctx: MeasureContext,
@ -43,7 +44,7 @@ export class ChannelCache {
participants: PersonId[],
email: string,
owner: PersonId
): Promise<Ref<Doc> | undefined> {
): Promise<Ref<Card>> {
const normalizedEmail = normalizeEmail(email)
const cacheKey = `${spaceId}:${normalizedEmail}`
@ -78,7 +79,7 @@ export class ChannelCache {
participants: PersonId[],
email: string,
personId: PersonId
): Promise<Ref<Doc> | undefined> {
): Promise<Ref<Card>> {
const normalizedEmail = normalizeEmail(email)
try {
// First try to find existing channel
@ -86,7 +87,7 @@ export class ChannelCache {
if (channel != null) {
this.ctx.info('Using existing channel', { me: normalizedEmail, space, channel: channel._id })
return channel._id
return channel._id as Ref<Card>
}
return await this.createNewChannel(space, participants, normalizedEmail, personId)
@ -101,7 +102,9 @@ export class ChannelCache {
// Remove failed lookup from cache
this.cache.delete(`${space}:${normalizedEmail}`)
return undefined
throw new Error(
`Failed to create channel for ${normalizedEmail} in space ${space}: ${err instanceof Error ? err.message : String(err)}`
)
}
}
@ -110,7 +113,7 @@ export class ChannelCache {
participants: PersonId[],
email: string,
personId: PersonId
): Promise<Ref<Doc> | undefined> {
): Promise<Ref<Card>> {
const normalizedEmail = normalizeEmail(email)
const mutexKey = `channel:${this.workspace}:${space}:${normalizedEmail}`
const releaseLock = await createMutex.lock(mutexKey)
@ -124,7 +127,7 @@ export class ChannelCache {
space,
channel: existingChannel._id
})
return existingChannel._id
return existingChannel._id as Ref<Card>
}
// Create new channel if it doesn't exist
@ -156,7 +159,7 @@ export class ChannelCache {
personId
)
return channelId
return channelId as Ref<Card>
} finally {
releaseLock()
}

View File

@ -16,7 +16,7 @@ import { Producer } from 'kafkajs'
import { WorkspaceLoginInfo } from '@hcengineering/account-client'
import { type Card } from '@hcengineering/card'
import { MessageType } from '@hcengineering/communication-types'
import { MessageID, MessageType } from '@hcengineering/communication-types'
import chat from '@hcengineering/chat'
import { PersonSpace } from '@hcengineering/contact'
import {
@ -25,20 +25,25 @@ import {
type PersonId,
type Ref,
type TxOperations,
Doc,
AccountUuid,
generateId,
RateLimiter,
Space
RateLimiter
} from '@hcengineering/core'
import mail from '@hcengineering/mail'
import { type KeyValueClient } from '@hcengineering/kvs-client'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { MessageRequestEventType } from '@hcengineering/communication-sdk-types'
import {
AddCollaboratorsEvent,
MessageRequestEventType,
CreateFileEvent,
CreateMessageEvent,
CreateThreadEvent,
NotificationRequestEventType
} from '@hcengineering/communication-sdk-types'
import { generateMessageId } from '@hcengineering/communication-shared'
import { BaseConfig, type Attachment } from './types'
import { EmailMessage, MailRecipient } from './types'
import { EmailMessage, MailRecipient, MessageData } from './types'
import { getMdContent } from './utils'
import { PersonCacheFactory } from './person'
import { PersonSpacesCacheFactory } from './personSpaces'
@ -161,8 +166,7 @@ export async function createMessages (
subject,
content,
attachedBlobs,
person.email,
person.socialId,
person,
message.sendOn,
channelCache,
replyTo
@ -188,8 +192,7 @@ async function saveMessageToSpaces (
subject: string,
content: string,
attachments: Attachment[],
me: string,
owner: PersonId,
recipient: MailRecipient,
createdDate: number,
channelCache: ChannelCache,
inReplyTo?: string
@ -197,9 +200,8 @@ async function saveMessageToSpaces (
const rateLimiter = new RateLimiter(10)
for (const space of spaces) {
const spaceId = space._id
let isReply = false
await rateLimiter.add(async () => {
ctx.info('Saving message to space', { mailId, space: spaceId })
let threadId = await threadLookup.getThreadId(mailId, spaceId)
if (threadId !== undefined) {
ctx.info('Message is already in the thread, skip', { mailId, threadId, spaceId })
@ -208,13 +210,10 @@ async function saveMessageToSpaces (
if (inReplyTo !== undefined) {
threadId = await threadLookup.getParentThreadId(inReplyTo, spaceId)
if (threadId !== undefined) {
ctx.info('Found existing thread', { mailId, threadId, spaceId })
}
isReply = threadId !== undefined
}
let channel: Ref<Doc<Space>> | undefined
const channel = await channelCache.getOrCreateChannel(spaceId, participants, recipient.email, recipient.socialId)
if (threadId === undefined) {
channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner)
const newThreadId = await client.createDoc(
chat.masterTag.Thread,
space._id,
@ -233,78 +232,151 @@ async function saveMessageToSpaces (
createdDate,
modifiedBy
)
await client.createMixin(
newThreadId,
chat.masterTag.Thread,
space._id,
mail.tag.MailThread,
{},
createdDate,
owner
)
threadId = newThreadId as Ref<Card>
ctx.info('Created new thread', { mailId, threadId, spaceId })
}
const messageId = generateMessageId()
const created = new Date(createdDate)
const messageData = Buffer.from(
JSON.stringify({
type: MessageRequestEventType.CreateMessage,
messageType: MessageType.Message,
card: threadId,
cardType: chat.masterTag.Thread,
content,
creator: modifiedBy,
created,
id: messageId
})
)
await producer.send({
topic: config.CommunicationTopic,
messages: [
{
key: Buffer.from(channel ?? spaceId),
value: messageData,
headers: {
WorkspaceUuid: wsInfo.workspace
}
}
]
})
ctx.info('Send message event', { mailId, messageId, threadId })
const messageData: MessageData = {
subject,
content,
channel,
created,
modifiedBy,
mailId,
spaceId,
threadId,
workspace: wsInfo.workspace,
recipient,
isReply
}
const fileData: Buffer[] = attachments.map((a) =>
Buffer.from(
JSON.stringify({
type: MessageRequestEventType.CreateFile,
card: threadId,
message: messageId,
messageCreated: created,
blobId: a.id as Ref<Blob>,
fileType: a.contentType,
filename: a.name,
size: a.data.length,
creator: modifiedBy
})
)
)
const fileEvents = fileData.map((data) => ({
key: Buffer.from(channel ?? spaceId),
value: data,
headers: {
WorkspaceUuid: wsInfo.workspace
}
}))
await producer.send({
topic: config.CommunicationTopic,
messages: fileEvents
})
ctx.info('Send file events', { mailId, messageId, threadId, count: fileEvents.length })
const messageId = await createMailMessage(producer, config, messageData, threadId)
if (!isReply) {
await addCollaborators(producer, config, messageData, threadId)
await createMailThread(producer, config, messageData, messageId)
}
await createFiles(producer, config, attachments, messageData, threadId, messageId)
await threadLookup.setThreadId(mailId, space._id, threadId)
})
}
await rateLimiter.waitProcessing()
}
async function createMailThread (
producer: Producer,
config: BaseConfig,
data: MessageData,
messageId: MessageID
): Promise<void> {
const threadEvent: CreateThreadEvent = {
type: MessageRequestEventType.CreateThread,
card: data.channel,
message: messageId,
messageCreated: data.created,
thread: data.threadId,
threadType: chat.masterTag.Thread
}
const thread = Buffer.from(JSON.stringify(threadEvent))
await sendToCommunicationTopic(producer, config, data, thread)
}
async function createMailMessage (
producer: Producer,
config: BaseConfig,
data: MessageData,
threadId: Ref<Card>
): Promise<MessageID> {
const messageId = generateMessageId()
const createMessageEvent: CreateMessageEvent = {
type: MessageRequestEventType.CreateMessage,
messageType: MessageType.Message,
card: data.isReply ? threadId : data.channel,
cardType: chat.masterTag.Thread,
content: data.content,
creator: data.modifiedBy,
created: data.created,
id: messageId
}
const createMessageData = Buffer.from(JSON.stringify(createMessageEvent))
await sendToCommunicationTopic(producer, config, data, createMessageData)
return messageId
}
async function createFiles (
producer: Producer,
config: BaseConfig,
attachments: Attachment[],
messageData: MessageData,
threadId: Ref<Card>,
messageId: MessageID
): Promise<void> {
const fileData: Buffer[] = attachments.map((a) => {
const creeateFileEvent: CreateFileEvent = {
type: MessageRequestEventType.CreateFile,
card: threadId,
message: messageId,
messageCreated: messageData.created,
creator: messageData.modifiedBy,
data: {
blobId: a.id as Ref<Blob>,
type: a.contentType,
filename: a.name,
size: a.data.length
}
}
return Buffer.from(JSON.stringify(creeateFileEvent))
})
const fileEvents = fileData.map((data) => ({
key: Buffer.from(messageData.channel ?? messageData.spaceId),
value: data,
headers: {
WorkspaceUuid: messageData.workspace
}
}))
await producer.send({
topic: config.CommunicationTopic,
messages: fileEvents
})
}
async function addCollaborators (
producer: Producer,
config: BaseConfig,
data: MessageData,
threadId: Ref<Card>
): Promise<void> {
if (data.recipient.socialId === data.modifiedBy) {
return // Message author should be automatically added as a collaborator
}
const addCollaboratorsEvent: AddCollaboratorsEvent = {
type: NotificationRequestEventType.AddCollaborators,
card: threadId,
cardType: chat.masterTag.Thread,
collaborators: [data.recipient.uuid as AccountUuid],
creator: data.modifiedBy
}
const createMessageData = Buffer.from(JSON.stringify(addCollaboratorsEvent))
await sendToCommunicationTopic(producer, config, data, createMessageData)
}
async function sendToCommunicationTopic (
producer: Producer,
config: BaseConfig,
messageData: MessageData,
content: Buffer
): Promise<void> {
await producer.send({
topic: config.CommunicationTopic,
messages: [
{
key: Buffer.from(messageData.channel ?? messageData.spaceId),
value: content,
headers: {
WorkspaceUuid: messageData.workspace
}
}
]
})
}

View File

@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { PersonId, PersonUuid } from '@hcengineering/core'
import { Card } from '@hcengineering/card'
import { PersonSpace } from '@hcengineering/contact'
import { PersonId, PersonUuid, Ref, WorkspaceUuid } from '@hcengineering/core'
//
export interface Attachment {
@ -58,3 +60,17 @@ export interface BaseConfig {
QueueRegion: string
CommunicationTopic: string
}
export interface MessageData {
subject: string
content: string
channel: Ref<Card>
created: Date
modifiedBy: PersonId
mailId: string
spaceId: Ref<PersonSpace>
workspace: WorkspaceUuid
threadId: Ref<Card>
recipient: MailRecipient
isReply: boolean
}