diff --git a/services/mail/mail-common/src/__tests__/channel.test.ts b/services/mail/mail-common/src/__tests__/channel.test.ts index b35d202200..94d039a1a1 100644 --- a/services/mail/mail-common/src/__tests__/channel.test.ts +++ b/services/mail/mail-common/src/__tests__/channel.test.ts @@ -141,9 +141,10 @@ describe('ChannelCache', () => { const error = new Error('Database error') mockClient.findOne.mockRejectedValue(error) - const result = await channelCache.getOrCreateChannel(spaceId, participants, emailAccount, personId) + await expect(channelCache.getOrCreateChannel(spaceId, participants, emailAccount, personId)).rejects.toThrow( + 'Failed to create channel for test@example.com in space test-space-id: Database error' + ) - expect(result).toBeUndefined() expect(mockCtx.error).toHaveBeenCalledWith('Failed to create channel', { me: emailAccount, space: spaceId, diff --git a/services/mail/mail-common/src/channel.ts b/services/mail/mail-common/src/channel.ts index 5e40907338..1da3165df6 100644 --- a/services/mail/mail-common/src/channel.ts +++ b/services/mail/mail-common/src/channel.ts @@ -13,7 +13,8 @@ // limitations under the License. // -import { MeasureContext, PersonId, Ref, TxOperations, Doc, WorkspaceUuid, generateId } from '@hcengineering/core' +import { MeasureContext, PersonId, Ref, TxOperations, WorkspaceUuid, generateId } from '@hcengineering/core' +import { type Card } from '@hcengineering/card' import chat from '@hcengineering/chat' import mail from '@hcengineering/mail' import { PersonSpace } from '@hcengineering/contact' @@ -27,7 +28,7 @@ const createMutex = new SyncMutex() */ export class ChannelCache { // Key is `${spaceId}:${normalizedEmail}` - private readonly cache = new Map>() + private readonly cache = new Map>() constructor ( private readonly ctx: MeasureContext, @@ -43,7 +44,7 @@ export class ChannelCache { participants: PersonId[], email: string, owner: PersonId - ): Promise | undefined> { + ): Promise> { const normalizedEmail = normalizeEmail(email) const cacheKey = `${spaceId}:${normalizedEmail}` @@ -78,7 +79,7 @@ export class ChannelCache { participants: PersonId[], email: string, personId: PersonId - ): Promise | undefined> { + ): Promise> { const normalizedEmail = normalizeEmail(email) try { // First try to find existing channel @@ -86,7 +87,7 @@ export class ChannelCache { if (channel != null) { this.ctx.info('Using existing channel', { me: normalizedEmail, space, channel: channel._id }) - return channel._id + return channel._id as Ref } return await this.createNewChannel(space, participants, normalizedEmail, personId) @@ -101,7 +102,9 @@ export class ChannelCache { // Remove failed lookup from cache this.cache.delete(`${space}:${normalizedEmail}`) - return undefined + throw new Error( + `Failed to create channel for ${normalizedEmail} in space ${space}: ${err instanceof Error ? err.message : String(err)}` + ) } } @@ -110,7 +113,7 @@ export class ChannelCache { participants: PersonId[], email: string, personId: PersonId - ): Promise | undefined> { + ): Promise> { const normalizedEmail = normalizeEmail(email) const mutexKey = `channel:${this.workspace}:${space}:${normalizedEmail}` const releaseLock = await createMutex.lock(mutexKey) @@ -124,7 +127,7 @@ export class ChannelCache { space, channel: existingChannel._id }) - return existingChannel._id + return existingChannel._id as Ref } // Create new channel if it doesn't exist @@ -156,7 +159,7 @@ export class ChannelCache { personId ) - return channelId + return channelId as Ref } finally { releaseLock() } diff --git a/services/mail/mail-common/src/message.ts b/services/mail/mail-common/src/message.ts index 51fd8693d6..d98921af92 100644 --- a/services/mail/mail-common/src/message.ts +++ b/services/mail/mail-common/src/message.ts @@ -16,7 +16,7 @@ import { Producer } from 'kafkajs' import { WorkspaceLoginInfo } from '@hcengineering/account-client' import { type Card } from '@hcengineering/card' -import { MessageType } from '@hcengineering/communication-types' +import { MessageID, MessageType } from '@hcengineering/communication-types' import chat from '@hcengineering/chat' import { PersonSpace } from '@hcengineering/contact' import { @@ -25,20 +25,25 @@ import { type PersonId, type Ref, type TxOperations, - Doc, + AccountUuid, generateId, - RateLimiter, - Space + RateLimiter } from '@hcengineering/core' -import mail from '@hcengineering/mail' import { type KeyValueClient } from '@hcengineering/kvs-client' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' -import { MessageRequestEventType } from '@hcengineering/communication-sdk-types' +import { + AddCollaboratorsEvent, + MessageRequestEventType, + CreateFileEvent, + CreateMessageEvent, + CreateThreadEvent, + NotificationRequestEventType +} from '@hcengineering/communication-sdk-types' import { generateMessageId } from '@hcengineering/communication-shared' import { BaseConfig, type Attachment } from './types' -import { EmailMessage, MailRecipient } from './types' +import { EmailMessage, MailRecipient, MessageData } from './types' import { getMdContent } from './utils' import { PersonCacheFactory } from './person' import { PersonSpacesCacheFactory } from './personSpaces' @@ -161,8 +166,7 @@ export async function createMessages ( subject, content, attachedBlobs, - person.email, - person.socialId, + person, message.sendOn, channelCache, replyTo @@ -188,8 +192,7 @@ async function saveMessageToSpaces ( subject: string, content: string, attachments: Attachment[], - me: string, - owner: PersonId, + recipient: MailRecipient, createdDate: number, channelCache: ChannelCache, inReplyTo?: string @@ -197,9 +200,8 @@ async function saveMessageToSpaces ( const rateLimiter = new RateLimiter(10) for (const space of spaces) { const spaceId = space._id + let isReply = false await rateLimiter.add(async () => { - ctx.info('Saving message to space', { mailId, space: spaceId }) - let threadId = await threadLookup.getThreadId(mailId, spaceId) if (threadId !== undefined) { ctx.info('Message is already in the thread, skip', { mailId, threadId, spaceId }) @@ -208,13 +210,10 @@ async function saveMessageToSpaces ( if (inReplyTo !== undefined) { threadId = await threadLookup.getParentThreadId(inReplyTo, spaceId) - if (threadId !== undefined) { - ctx.info('Found existing thread', { mailId, threadId, spaceId }) - } + isReply = threadId !== undefined } - let channel: Ref> | undefined + const channel = await channelCache.getOrCreateChannel(spaceId, participants, recipient.email, recipient.socialId) if (threadId === undefined) { - channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner) const newThreadId = await client.createDoc( chat.masterTag.Thread, space._id, @@ -233,78 +232,151 @@ async function saveMessageToSpaces ( createdDate, modifiedBy ) - await client.createMixin( - newThreadId, - chat.masterTag.Thread, - space._id, - mail.tag.MailThread, - {}, - createdDate, - owner - ) threadId = newThreadId as Ref - ctx.info('Created new thread', { mailId, threadId, spaceId }) } - const messageId = generateMessageId() const created = new Date(createdDate) - const messageData = Buffer.from( - JSON.stringify({ - type: MessageRequestEventType.CreateMessage, - messageType: MessageType.Message, - card: threadId, - cardType: chat.masterTag.Thread, - content, - creator: modifiedBy, - created, - id: messageId - }) - ) - await producer.send({ - topic: config.CommunicationTopic, - messages: [ - { - key: Buffer.from(channel ?? spaceId), - value: messageData, - headers: { - WorkspaceUuid: wsInfo.workspace - } - } - ] - }) - ctx.info('Send message event', { mailId, messageId, threadId }) + const messageData: MessageData = { + subject, + content, + channel, + created, + modifiedBy, + mailId, + spaceId, + threadId, + workspace: wsInfo.workspace, + recipient, + isReply + } - const fileData: Buffer[] = attachments.map((a) => - Buffer.from( - JSON.stringify({ - type: MessageRequestEventType.CreateFile, - card: threadId, - message: messageId, - messageCreated: created, - blobId: a.id as Ref, - fileType: a.contentType, - filename: a.name, - size: a.data.length, - creator: modifiedBy - }) - ) - ) - const fileEvents = fileData.map((data) => ({ - key: Buffer.from(channel ?? spaceId), - value: data, - headers: { - WorkspaceUuid: wsInfo.workspace - } - })) - await producer.send({ - topic: config.CommunicationTopic, - messages: fileEvents - }) - ctx.info('Send file events', { mailId, messageId, threadId, count: fileEvents.length }) + const messageId = await createMailMessage(producer, config, messageData, threadId) + if (!isReply) { + await addCollaborators(producer, config, messageData, threadId) + await createMailThread(producer, config, messageData, messageId) + } + await createFiles(producer, config, attachments, messageData, threadId, messageId) await threadLookup.setThreadId(mailId, space._id, threadId) }) } await rateLimiter.waitProcessing() } + +async function createMailThread ( + producer: Producer, + config: BaseConfig, + data: MessageData, + messageId: MessageID +): Promise { + const threadEvent: CreateThreadEvent = { + type: MessageRequestEventType.CreateThread, + card: data.channel, + message: messageId, + messageCreated: data.created, + thread: data.threadId, + threadType: chat.masterTag.Thread + } + const thread = Buffer.from(JSON.stringify(threadEvent)) + await sendToCommunicationTopic(producer, config, data, thread) +} + +async function createMailMessage ( + producer: Producer, + config: BaseConfig, + data: MessageData, + threadId: Ref +): Promise { + const messageId = generateMessageId() + const createMessageEvent: CreateMessageEvent = { + type: MessageRequestEventType.CreateMessage, + messageType: MessageType.Message, + card: data.isReply ? threadId : data.channel, + cardType: chat.masterTag.Thread, + content: data.content, + creator: data.modifiedBy, + created: data.created, + id: messageId + } + const createMessageData = Buffer.from(JSON.stringify(createMessageEvent)) + await sendToCommunicationTopic(producer, config, data, createMessageData) + return messageId +} + +async function createFiles ( + producer: Producer, + config: BaseConfig, + attachments: Attachment[], + messageData: MessageData, + threadId: Ref, + messageId: MessageID +): Promise { + const fileData: Buffer[] = attachments.map((a) => { + const creeateFileEvent: CreateFileEvent = { + type: MessageRequestEventType.CreateFile, + card: threadId, + message: messageId, + messageCreated: messageData.created, + creator: messageData.modifiedBy, + data: { + blobId: a.id as Ref, + type: a.contentType, + filename: a.name, + size: a.data.length + } + } + return Buffer.from(JSON.stringify(creeateFileEvent)) + }) + const fileEvents = fileData.map((data) => ({ + key: Buffer.from(messageData.channel ?? messageData.spaceId), + value: data, + headers: { + WorkspaceUuid: messageData.workspace + } + })) + await producer.send({ + topic: config.CommunicationTopic, + messages: fileEvents + }) +} + +async function addCollaborators ( + producer: Producer, + config: BaseConfig, + data: MessageData, + threadId: Ref +): Promise { + if (data.recipient.socialId === data.modifiedBy) { + return // Message author should be automatically added as a collaborator + } + const addCollaboratorsEvent: AddCollaboratorsEvent = { + type: NotificationRequestEventType.AddCollaborators, + card: threadId, + cardType: chat.masterTag.Thread, + collaborators: [data.recipient.uuid as AccountUuid], + creator: data.modifiedBy + } + const createMessageData = Buffer.from(JSON.stringify(addCollaboratorsEvent)) + await sendToCommunicationTopic(producer, config, data, createMessageData) +} + +async function sendToCommunicationTopic ( + producer: Producer, + config: BaseConfig, + messageData: MessageData, + content: Buffer +): Promise { + await producer.send({ + topic: config.CommunicationTopic, + messages: [ + { + key: Buffer.from(messageData.channel ?? messageData.spaceId), + value: content, + headers: { + WorkspaceUuid: messageData.workspace + } + } + ] + }) +} diff --git a/services/mail/mail-common/src/types.ts b/services/mail/mail-common/src/types.ts index f1e860d63c..2fc5f44804 100644 --- a/services/mail/mail-common/src/types.ts +++ b/services/mail/mail-common/src/types.ts @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { PersonId, PersonUuid } from '@hcengineering/core' +import { Card } from '@hcengineering/card' +import { PersonSpace } from '@hcengineering/contact' +import { PersonId, PersonUuid, Ref, WorkspaceUuid } from '@hcengineering/core' // export interface Attachment { @@ -58,3 +60,17 @@ export interface BaseConfig { QueueRegion: string CommunicationTopic: string } + +export interface MessageData { + subject: string + content: string + channel: Ref + created: Date + modifiedBy: PersonId + mailId: string + spaceId: Ref + workspace: WorkspaceUuid + threadId: Ref + recipient: MailRecipient + isReply: boolean +}