diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml
index 7b7e42e677..e1590bcec1 100644
--- a/common/config/rush/pnpm-lock.yaml
+++ b/common/config/rush/pnpm-lock.yaml
@@ -4076,7 +4076,7 @@ packages:
version: 0.0.0
'@rush-temp/card-resources@file:projects/card-resources.tgz':
- resolution: {integrity: sha512-CdWQQYWbLy+4AITDmlv2q7Sw96mSzRsslUOU/87wDljEuIubpEKNhfccS42ImusNZMXj4yvd5wwM9ajjBMRKkg==, tarball: file:projects/card-resources.tgz}
+ resolution: {integrity: sha512-pMv5ceyUDDEJBQbuNCHXE72Px6MNS16zvjzmK7M+PZxOmOXFNwldGCq/jPhslXqJXC24WjqpHCiQZ0gs7dN9uA==, tarball: file:projects/card-resources.tgz}
version: 0.0.0
'@rush-temp/card@file:projects/card.tgz':
@@ -4412,7 +4412,7 @@ packages:
version: 0.0.0
'@rush-temp/mail-common@file:projects/mail-common.tgz':
- resolution: {integrity: sha512-4ThjG7u37BfKgOUzqyVxZ+uCt5Z7Yv2DX192wxBuTbl1EX3t50XjTERnjt4Iw0rYFhXc7lOKZygYF1u8a3APhA==, tarball: file:projects/mail-common.tgz}
+ resolution: {integrity: sha512-DuWl8K6KBwPKVeIaSqUfgTwasFy+KEpU0zrPicIkuDaMILOKGCkmVM7YESS/ggiBlKxtyIOwKRDYeka1dhtxZw==, tarball: file:projects/mail-common.tgz}
version: 0.0.0
'@rush-temp/mail@file:projects/mail.tgz':
@@ -4896,11 +4896,11 @@ packages:
version: 0.0.0
'@rush-temp/pod-gmail@file:projects/pod-gmail.tgz':
- resolution: {integrity: sha512-5rn2Y1X7GtTiOKR5vbkqo5oB/zEeQXOiSw4nTPL8xp/Jx66PDnGjkwkHqBlberol5hG2/ns8im3Zqxql5O2VXw==, tarball: file:projects/pod-gmail.tgz}
+ resolution: {integrity: sha512-zgEk+yi5UCh/xDEFIoancdkzC+k/3o+yuyZ7JW6OiH4oi8xam0FJLk0ZgpPgEheHiD0qU2OfDPS5cymaivQTew==, tarball: file:projects/pod-gmail.tgz}
version: 0.0.0
'@rush-temp/pod-inbound-mail@file:projects/pod-inbound-mail.tgz':
- resolution: {integrity: sha512-vRYTZhVYb2trqlb5uKFKAPJq7Ay7EaTFy6FijfUTRqitr9QJEREl4PKdf752cOFUfgC8L5uHTH6Q2rgbvsXSug==, tarball: file:projects/pod-inbound-mail.tgz}
+ resolution: {integrity: sha512-EQdkJNUsw+xUAs74SJ+44fJCrR/kNeSCRzm3Dah9tFlhIcTZl5d80ir8VotOXBZ4McKxAFo3is05crGMDxP04Q==, tarball: file:projects/pod-inbound-mail.tgz}
version: 0.0.0
'@rush-temp/pod-love@file:projects/pod-love.tgz':
@@ -4992,7 +4992,7 @@ packages:
version: 0.0.0
'@rush-temp/process@file:projects/process.tgz':
- resolution: {integrity: sha512-3vz9X1/VX0PmWuKw9m3Nlfp2CjFZo2MpgCRdLfPaeo6EHExPeSitbfYl7LojLsGo5Uk7tSnUOwMSgd/GOndOVA==, tarball: file:projects/process.tgz}
+ resolution: {integrity: sha512-FLoi1Ooh3ntse8Czmk9pyT2ASy8VHz1xGUUXf38t09UKTZ/8SfqwrOtNWYuVnyIrZ3zVVmSL7xcOWmcn5AdTSw==, tarball: file:projects/process.tgz}
version: 0.0.0
'@rush-temp/prod@file:projects/prod.tgz':
@@ -5216,7 +5216,7 @@ packages:
version: 0.0.0
'@rush-temp/server-gmail-resources@file:projects/server-gmail-resources.tgz':
- resolution: {integrity: sha512-VUZuCBA60lmGBsC072eJNhPcfGLWGWaQKj6NmEZSutJt2M0dzchgdC5iWr+AX+fhE3pmDJhNW8hYdU06Y5FeZg==, tarball: file:projects/server-gmail-resources.tgz}
+ resolution: {integrity: sha512-PsKe9cztrK9Mj6TZO5scM/IArHSVaiJ+SMNwMoz6j8Kb2jZBP3a36wWG0Ly5GuDU7m272NE8bjuKUNmzM+kfZQ==, tarball: file:projects/server-gmail-resources.tgz}
version: 0.0.0
'@rush-temp/server-gmail@file:projects/server-gmail.tgz':
@@ -5616,7 +5616,7 @@ packages:
version: 0.0.0
'@rush-temp/workbench-resources@file:projects/workbench-resources.tgz':
- resolution: {integrity: sha512-N00MR4MXHz3pMLbTWfIwQCKUpwtaLTtx2lu+UR9QYhn2CiMT/tyVqfxtN2Pkbf0sj5VG5euI6jsnKskYgUY74g==, tarball: file:projects/workbench-resources.tgz}
+ resolution: {integrity: sha512-UfLCpybz5EEeX1qnGkQgS0cKEvptFzYm8D8YR9uIqSlfRAnCjKpEFTexnvbXZQeUsjvbIwkMQYsSbyhk/pYrGg==, tarball: file:projects/workbench-resources.tgz}
version: 0.0.0
'@rush-temp/workbench@file:projects/workbench.tgz':
@@ -20013,6 +20013,8 @@ snapshots:
'@rush-temp/mail-common@file:projects/mail-common.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))':
dependencies:
'@hcengineering/communication-rest-client': 0.1.187(typescript@5.7.3)
+ '@hcengineering/communication-sdk-types': 0.1.187(typescript@5.7.3)
+ '@hcengineering/communication-shared': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-types': 0.1.187(typescript@5.7.3)
'@tsconfig/node16': 1.0.4
'@types/express': 4.17.21
@@ -20031,6 +20033,7 @@ snapshots:
eslint-plugin-node: 11.1.0(eslint@8.56.0)
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3))
+ kafkajs: 2.2.4
prettier: 3.2.5
sanitize-html: 2.16.0
ts-jest: 29.1.2(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(esbuild@0.24.2)(jest@29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3)))(typescript@5.7.3)
@@ -22412,6 +22415,7 @@ snapshots:
'@rush-temp/pod-gmail@file:projects/pod-gmail.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(bufferutil@4.0.8)(encoding@0.1.13)(utf-8-validate@6.0.4)':
dependencies:
+ '@hcengineering/communication-sdk-types': 0.1.187(typescript@5.3.3)
'@tsconfig/node16': 1.0.4
'@types/cors': 2.8.17
'@types/express': 4.17.21
@@ -22464,6 +22468,7 @@ snapshots:
'@rush-temp/pod-inbound-mail@file:projects/pod-inbound-mail.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))':
dependencies:
'@hcengineering/communication-rest-client': 0.1.187(typescript@5.7.3)
+ '@hcengineering/communication-sdk-types': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-types': 0.1.187(typescript@5.7.3)
'@tsconfig/node16': 1.0.4
'@types/cors': 2.8.17
diff --git a/services/gmail/pod-gmail/package.json b/services/gmail/pod-gmail/package.json
index 8d408ca6cd..c484d4c1b8 100644
--- a/services/gmail/pod-gmail/package.json
+++ b/services/gmail/pod-gmail/package.json
@@ -64,7 +64,9 @@
"@hcengineering/chat": "^0.6.0",
"@hcengineering/client": "^0.6.18",
"@hcengineering/client-resources": "^0.6.27",
+ "@hcengineering/communication-sdk-types": "0.1.187",
"@hcengineering/contact": "^0.6.24",
+ "@hcengineering/kvs-client": "^0.6.0",
"@hcengineering/mail-common": "^0.6.0",
"@hcengineering/core": "^0.6.32",
"@hcengineering/gmail": "^0.6.22",
@@ -74,7 +76,6 @@
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
- "@hcengineering/kvs-client": "^0.6.0",
"cors": "^2.8.5",
"dotenv": "~16.0.0",
"express": "^4.21.2",
diff --git a/services/gmail/pod-gmail/src/__tests__/config.test.ts b/services/gmail/pod-gmail/src/__tests__/config.test.ts
new file mode 100644
index 0000000000..f283776d92
--- /dev/null
+++ b/services/gmail/pod-gmail/src/__tests__/config.test.ts
@@ -0,0 +1,157 @@
+//
+// Copyright © 2025 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { IntegrationVersion } from '../types'
+
+describe('Config', () => {
+ // Store original environment
+ const originalEnv = { ...process.env }
+
+ // Mock required environment variables to prevent test failures
+ beforeEach(() => {
+ jest.resetModules() // Clear cache for config module between tests
+
+ // Set minimum required env variables
+ process.env.ACCOUNTS_URL = 'http://accounts.test'
+ process.env.SECRET = 'test-secret'
+ process.env.Credentials = 'test-credentials'
+ process.env.WATCH_TOPIC_NAME = 'test-topic'
+ process.env.KVS_URL = 'http://kvs.test'
+ process.env.STORAGE_CONFIG = 'test-storage-config'
+ })
+
+ // Restore original environment after tests
+ afterEach(() => {
+ process.env = { ...originalEnv }
+ })
+
+ it('should load default configuration values', () => {
+ // Import config inside test to ensure env variables are set
+ // eslint-disable-next-line @typescript-eslint/no-var-requires
+ const config = require('../config').default
+
+ // Check default values
+ expect(config.Port).toBe(8087)
+ expect(config.ServiceID).toBe('gmail-service')
+ expect(config.InitLimit).toBe(50)
+ expect(config.FooterMessage).toContain('Sent via Huly')
+ expect(config.Version).toBe(IntegrationVersion.V1)
+ expect(config.QueueRegion).toBe('')
+ expect(config.CommunicationTopic).toBe('hulygun')
+ })
+
+ it('should override defaults with environment variables', () => {
+ // Set custom values
+ process.env.PORT = '9000'
+ process.env.SERVICE_ID = 'custom-service'
+ process.env.INIT_LIMIT = '100'
+ process.env.FOOTER_MESSAGE = 'Custom footer'
+ process.env.VERSION = 'v2'
+ process.env.QUEUE_CONFIG = 'custom-queue-config'
+ process.env.QUEUE_REGION = 'custom-region'
+ process.env.COMMUNICATION_TOPIC = 'custom-topic'
+
+ // Load config with custom values
+ // eslint-disable-next-line @typescript-eslint/no-var-requires
+ const config = require('../config').default
+
+ // Check overridden values
+ expect(config.Port).toBe(9000)
+ expect(config.ServiceID).toBe('custom-service')
+ expect(config.InitLimit).toBe(100)
+ expect(config.FooterMessage).toBe('Custom footer')
+ expect(config.Version).toBe(IntegrationVersion.V2)
+ expect(config.QueueConfig).toBe('custom-queue-config')
+ expect(config.QueueRegion).toBe('custom-region')
+ expect(config.CommunicationTopic).toBe('custom-topic')
+ })
+
+ it('should throw error when required env variables are missing', () => {
+ // Remove required environment variables
+ process.env.ACCOUNTS_URL = undefined
+
+ // Expect error when loading config
+ expect(() => {
+ require('../config')
+ }).toThrow('Missing env variables: ACCOUNTS_URL')
+ })
+
+ it('should throw error for invalid version value', () => {
+ // Set invalid version
+ process.env.VERSION = 'invalid-version'
+
+ // Expect error when loading config
+ expect(() => {
+ require('../config')
+ }).toThrow("Invalid version: invalid-version. Must be 'v1' or 'v2'.")
+ })
+
+ it('should throw error when v2 is set but queue config is missing', () => {
+ // Set v2 without required queue config
+ process.env.VERSION = 'v2'
+ process.env.QUEUE_CONFIG = ''
+
+ // Expect error when loading config
+ expect(() => {
+ require('../config')
+ }).toThrow('Missing env variable: QUEUE_CONFIG')
+ })
+
+ it('should throw error when v2 is set but communication topic is missing', () => {
+ // Set v2 with queue config but missing communication topic
+ process.env.VERSION = 'v2'
+ process.env.QUEUE_CONFIG = 'test-queue'
+ process.env.COMMUNICATION_TOPIC = ''
+
+ // Expect error when loading config
+ expect(() => {
+ require('../config')
+ }).toThrow('Missing env variable: COMMUNICATION_TOPIC')
+ })
+
+ it('should correctly parse numeric values', () => {
+ // Set string values for numeric fields
+ process.env.PORT = '9999'
+ process.env.INIT_LIMIT = '200'
+
+ // Load config
+ // eslint-disable-next-line @typescript-eslint/no-var-requires
+ const config = require('../config').default
+
+ // Check parsed values
+ expect(config.Port).toBe(9999)
+ expect(config.InitLimit).toBe(200)
+ expect(typeof config.Port).toBe('number')
+ expect(typeof config.InitLimit).toBe('number')
+ })
+
+ it('should handle v2 configuration correctly', () => {
+ // Set up v2 configuration
+ process.env.VERSION = 'v2'
+ process.env.QUEUE_CONFIG = 'test-queue-config'
+ process.env.QUEUE_REGION = 'test-region'
+ process.env.COMMUNICATION_TOPIC = 'test-topic-name'
+
+ // Load config
+ // eslint-disable-next-line @typescript-eslint/no-var-requires
+ const config = require('../config').default
+
+ // Check v2 specific configuration
+ expect(config.Version).toBe(IntegrationVersion.V2)
+ expect(config.QueueConfig).toBe('test-queue-config')
+ expect(config.QueueRegion).toBe('test-region')
+ expect(config.CommunicationTopic).toBe('test-topic-name')
+ })
+})
diff --git a/services/gmail/pod-gmail/src/config.ts b/services/gmail/pod-gmail/src/config.ts
index a565c50e51..9f4f717424 100644
--- a/services/gmail/pod-gmail/src/config.ts
+++ b/services/gmail/pod-gmail/src/config.ts
@@ -28,6 +28,9 @@ interface Config extends BaseConfig {
FooterMessage: string
InitLimit: number
Version: IntegrationVersion
+ QueueConfig: string
+ QueueRegion: string
+ CommunicationTopic: string
}
const envMap: { [key in keyof Config]: string } = {
@@ -41,7 +44,10 @@ const envMap: { [key in keyof Config]: string } = {
InitLimit: 'INIT_LIMIT',
KvsUrl: 'KVS_URL',
StorageConfig: 'STORAGE_CONFIG',
- Version: 'VERSION'
+ Version: 'VERSION',
+ QueueConfig: 'QUEUE_CONFIG',
+ QueueRegion: 'QUEUE_REGION',
+ CommunicationTopic: 'COMMUNICATION_TOPIC'
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@@ -65,7 +71,10 @@ const config: Config = (() => {
FooterMessage: process.env[envMap.FooterMessage] ?? '
Sent via Huly
',
KvsUrl: process.env[envMap.KvsUrl],
StorageConfig: process.env[envMap.StorageConfig],
- Version: version
+ Version: version,
+ QueueConfig: process.env[envMap.QueueConfig] ?? '',
+ QueueRegion: process.env[envMap.QueueRegion] ?? '',
+ CommunicationTopic: process.env[envMap.CommunicationTopic] ?? 'hulygun'
}
const missingEnv = (Object.keys(params) as Array)
@@ -75,6 +84,14 @@ const config: Config = (() => {
if (missingEnv.length > 0) {
throw Error(`Missing env variables: ${missingEnv.join(', ')}`)
}
+ if (version === IntegrationVersion.V2) {
+ if (params.QueueConfig === '') {
+ throw Error('Missing env variable: QUEUE_CONFIG')
+ }
+ if (params.CommunicationTopic === '') {
+ throw Error('Missing env variable: COMMUNICATION_TOPIC')
+ }
+ }
return params as Config
})()
diff --git a/services/gmail/pod-gmail/src/main.ts b/services/gmail/pod-gmail/src/main.ts
index 86f83c3658..82a2e51745 100644
--- a/services/gmail/pod-gmail/src/main.ts
+++ b/services/gmail/pod-gmail/src/main.ts
@@ -23,13 +23,15 @@ import { isWorkspaceLoginInfo } from '@hcengineering/account-client'
import { initStatisticsContext, type StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { decodeToken } from '@hcengineering/server-token'
+import { initQueue, closeQueue } from '@hcengineering/mail-common'
import { type IncomingHttpHeaders } from 'http'
import { join } from 'path'
+
import { decode64 } from './base64'
import config from './config'
import { GmailController } from './gmailController'
import { createServer, listen } from './server'
-import { type Endpoint, type State } from './types'
+import { IntegrationVersion, type Endpoint, type State } from './types'
const extractToken = (header: IncomingHttpHeaders): any => {
try {
@@ -61,6 +63,10 @@ export const main = async (): Promise => {
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig)
+ if (config.Version === IntegrationVersion.V2) {
+ initQueue(ctx, 'gmail-service', config)
+ }
+
const gmailController = GmailController.create(ctx, storageAdapter)
await gmailController.startAll()
const endpoints: Endpoint[] = [
@@ -170,6 +176,7 @@ export const main = async (): Promise => {
const asyncClose = async (): Promise => {
await gmailController.close()
await storageAdapter.close()
+ await closeQueue()
}
const shutdown = (): void => {
diff --git a/services/gmail/pod-gmail/src/message/v2/message.ts b/services/gmail/pod-gmail/src/message/v2/message.ts
index 21750a3b81..812dc9807e 100644
--- a/services/gmail/pod-gmail/src/message/v2/message.ts
+++ b/services/gmail/pod-gmail/src/message/v2/message.ts
@@ -17,7 +17,13 @@ import { gmail_v1 } from 'googleapis'
import sanitizeHtml from 'sanitize-html'
import { TxOperations, type MeasureContext } from '@hcengineering/core'
-import { createMessages, parseEmailHeader, parseNameFromEmailHeader, EmailMessage } from '@hcengineering/mail-common'
+import {
+ createMessages,
+ parseEmailHeader,
+ parseNameFromEmailHeader,
+ EmailMessage,
+ getProducer
+} from '@hcengineering/mail-common'
import { type KeyValueClient } from '@hcengineering/kvs-client'
import { AccountClient, isWorkspaceLoginInfo, WorkspaceLoginInfo } from '@hcengineering/account-client'
@@ -55,6 +61,7 @@ export class MessageManagerV2 implements IMessageManager {
this.ctx,
this.txClient,
this.keyValueClient,
+ await getProducer(config.CommunicationTopic),
this.token,
this.wsInfo,
res,
diff --git a/services/mail/mail-common/package.json b/services/mail/mail-common/package.json
index 7cc80b4c6c..1461969229 100644
--- a/services/mail/mail-common/package.json
+++ b/services/mail/mail-common/package.json
@@ -50,12 +50,17 @@
"@hcengineering/card": "^0.6.0",
"@hcengineering/chat": "^0.6.0",
"@hcengineering/communication-rest-client": "0.1.187",
+ "@hcengineering/communication-sdk-types": "0.1.187",
+ "@hcengineering/communication-shared": "0.1.187",
"@hcengineering/communication-types": "0.1.187",
"@hcengineering/contact": "^0.6.24",
"@hcengineering/core": "^0.6.32",
"@hcengineering/kvs-client": "^0.6.0",
+ "@hcengineering/kafka": "^0.6.0",
"@hcengineering/mail": "^0.6.0",
+ "@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-storage": "^0.6.0",
+ "kafkajs": "^2.2.4",
"sanitize-html": "^2.15.0",
"turndown": "^7.2.0",
"uuid": "^8.3.2"
diff --git a/services/mail/mail-common/src/__tests__/queue.test.ts b/services/mail/mail-common/src/__tests__/queue.test.ts
new file mode 100644
index 0000000000..77e6d147f7
--- /dev/null
+++ b/services/mail/mail-common/src/__tests__/queue.test.ts
@@ -0,0 +1,271 @@
+import { Kafka, Producer } from 'kafkajs'
+import { parseQueueConfig } from '@hcengineering/kafka'
+import { MeasureContext } from '@hcengineering/core'
+import { initQueue, closeQueue, getProducer, KafkaQueueRegistry } from '../queue'
+import { BaseConfig } from '../types'
+
+// Mock dependencies
+jest.mock('kafkajs')
+jest.mock('@hcengineering/kafka')
+
+/* eslint-disable @typescript-eslint/unbound-method */
+describe('Kafka Queue Management', () => {
+ // Mock objects
+ let mockCtx: MeasureContext
+ let mockConfig: BaseConfig
+ let mockKafka: Kafka
+ let mockProducer: Producer
+ let disconnectMock: jest.Mock
+
+ // Setup function to create clean mocks for each test
+ beforeEach(() => {
+ // Reset modules to clear any singleton state
+ jest.resetModules()
+
+ // Create mock for producer
+ disconnectMock = jest.fn().mockResolvedValue(undefined)
+ mockProducer = {
+ connect: jest.fn().mockResolvedValue(undefined),
+ disconnect: disconnectMock,
+ send: jest.fn().mockResolvedValue(undefined),
+ on: jest.fn(),
+ events: {},
+ transaction: jest.fn(),
+ logger: jest.fn()
+ } as unknown as Producer
+
+ // Create mock for Kafka
+ mockKafka = {
+ producer: jest.fn().mockReturnValue(mockProducer),
+ consumer: jest.fn(),
+ admin: jest.fn()
+ } as unknown as Kafka
+
+ // Mock constructor for Kafka
+ ;(Kafka as jest.Mock).mockImplementation(() => mockKafka)
+
+ // Create context mock
+ mockCtx = {
+ info: jest.fn(),
+ error: jest.fn(),
+ warn: jest.fn(),
+ measure: jest.fn()
+ } as unknown as MeasureContext
+
+ // Create config mock
+ mockConfig = {
+ QueueConfig: 'kafka:broker:9092',
+ QueueRegion: 'test-region',
+ CommunicationTopic: 'test-topic'
+ } as any
+
+ // Mock parseQueueConfig
+ ;(parseQueueConfig as jest.Mock).mockReturnValue({
+ clientId: 'test-service',
+ brokers: ['broker:9092'],
+ ssl: false
+ })
+ })
+
+ afterEach(async () => {
+ // Clean up after each test
+ await closeQueue()
+ })
+
+ describe('initQueue', () => {
+ it('should initialize the queue registry with correct parameters', () => {
+ // Act
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Assert
+ expect(parseQueueConfig).toHaveBeenCalledWith(mockConfig.QueueConfig, 'test-service', mockConfig.QueueRegion)
+
+ expect(Kafka).toHaveBeenCalledWith({
+ clientId: 'test-service',
+ brokers: ['broker:9092'],
+ ssl: false
+ })
+
+ expect(mockCtx.info).toHaveBeenCalledWith('Kafka queue initialized', { serviceId: 'test-service' })
+ })
+
+ it('should throw an error when initialized twice', () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Act & Assert
+ expect(() => {
+ initQueue(mockCtx, 'test-service', mockConfig)
+ }).toThrow('Queue already initialized')
+ })
+
+ it('should throw an error when queue config is missing', () => {
+ // Arrange
+ const configWithoutQueue: BaseConfig = { ...mockConfig, QueueConfig: undefined } as any
+
+ // Act & Assert
+ expect(() => {
+ initQueue(mockCtx, 'test-service', configWithoutQueue)
+ }).toThrow('Please provide queue config')
+ })
+ })
+
+ describe('getProducer', () => {
+ it('should throw an error when queue is not initialized', async () => {
+ // Act & Assert
+ await expect(getProducer('test-topic')).rejects.toThrow('Queue not initialized')
+ })
+
+ it('should create a new producer when one does not exist for the topic', async () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Act
+ const producer = await getProducer('test-topic')
+
+ // Assert
+ expect(producer).toBe(mockProducer)
+ expect(mockKafka.producer).toHaveBeenCalled()
+ expect(mockProducer.connect).toHaveBeenCalled()
+ expect(mockCtx.info).toHaveBeenCalledWith('Created new Kafka producer', { key: 'test-topic' })
+ })
+
+ it('should reuse an existing producer for the same topic', async () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Act
+ const producer1 = await getProducer('test-topic')
+ const producer2 = await getProducer('test-topic')
+
+ // Assert
+ expect(producer1).toBe(producer2)
+ expect(mockKafka.producer).toHaveBeenCalledTimes(1) // Called only once
+ })
+
+ it('should create different producers for different topics', async () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Setup a second producer
+ const mockProducer2 = { ...mockProducer }
+ mockKafka.producer = jest.fn().mockReturnValueOnce(mockProducer).mockReturnValueOnce(mockProducer2)
+
+ // Act
+ const producer1 = await getProducer('topic1')
+ const producer2 = await getProducer('topic2')
+
+ // Assert
+ expect(mockKafka.producer).toHaveBeenCalledTimes(2)
+ expect(producer1).not.toBe(producer2)
+ })
+ })
+
+ describe('closeQueue', () => {
+ it('should do nothing if the queue is not initialized', async () => {
+ // Act
+ await closeQueue()
+
+ // Assert
+ expect(mockProducer.disconnect).not.toHaveBeenCalled()
+ })
+
+ it('should close all producers', async () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Get a couple of producers
+ await getProducer('topic1')
+ await getProducer('topic2')
+
+ // Act
+ await closeQueue()
+
+ // Assert
+ expect(mockProducer.disconnect).toHaveBeenCalledTimes(2)
+ expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed')
+ })
+
+ it('should handle errors when closing producers', async () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Get a producer
+ await getProducer('test-topic')
+
+ // Make disconnect throw an error
+ const error = new Error('Disconnect failed')
+ disconnectMock.mockRejectedValueOnce(error)
+
+ // Act
+ await closeQueue()
+
+ // Assert
+ expect(mockCtx.error).toHaveBeenCalledWith('Failed to close Kafka producer', {
+ topic: 'test-topic',
+ error
+ })
+
+ // It should still clear the producers
+ expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed')
+ })
+
+ it('should reset the registry after closing', async () => {
+ // Arrange
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Act
+ await closeQueue()
+
+ // The registry should be reset, so initializing again should work
+ initQueue(mockCtx, 'test-service', mockConfig)
+
+ // Assert - expect no errors and two initialization logs
+ expect(mockCtx.info).toHaveBeenCalledWith('Kafka queue initialized', { serviceId: 'test-service' })
+ })
+ })
+
+ describe('KafkaQueueRegistry', () => {
+ it('should create a Kafka instance with correct configuration', () => {
+ // Act
+ // eslint-disable-next-line no-new
+ new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig)
+
+ // Assert
+ expect(parseQueueConfig).toHaveBeenCalledWith(mockConfig.QueueConfig, 'test-service', mockConfig.QueueRegion)
+
+ expect(Kafka).toHaveBeenCalledWith({
+ clientId: 'test-service',
+ brokers: ['broker:9092'],
+ ssl: false
+ })
+ })
+
+ it('should log when a producer is created', async () => {
+ // Arrange
+ const registry = new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig)
+
+ // Act
+ await registry.getProducer('test-topic')
+
+ // Assert
+ expect(mockCtx.info).toHaveBeenCalledWith('Created new Kafka producer', { key: 'test-topic' })
+ })
+
+ it('should close all producers during shutdown', async () => {
+ // Arrange
+ const registry = new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig)
+
+ // Create some producers
+ await registry.getProducer('topic1')
+ await registry.getProducer('topic2')
+
+ // Act
+ await registry.close()
+
+ // Assert
+ expect(mockProducer.disconnect).toHaveBeenCalledTimes(2)
+ expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed')
+ })
+ })
+})
diff --git a/services/mail/mail-common/src/index.ts b/services/mail/mail-common/src/index.ts
index a169b586a6..df82f64f6b 100644
--- a/services/mail/mail-common/src/index.ts
+++ b/services/mail/mail-common/src/index.ts
@@ -17,3 +17,4 @@ export * from './message'
export * from './types'
export * from './utils'
export * from './mutex'
+export * from './queue'
diff --git a/services/mail/mail-common/src/message.ts b/services/mail/mail-common/src/message.ts
index 60f9e3fc6c..f88ecbb884 100644
--- a/services/mail/mail-common/src/message.ts
+++ b/services/mail/mail-common/src/message.ts
@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
+import { Producer } from 'kafkajs'
+
import { WorkspaceLoginInfo } from '@hcengineering/account-client'
import { type Card } from '@hcengineering/card'
-import {
- type RestClient as CommunicationClient,
- createRestClient as getCommunicationClient
-} from '@hcengineering/communication-rest-client'
import { MessageType } from '@hcengineering/communication-types'
import chat from '@hcengineering/chat'
import { PersonSpace } from '@hcengineering/contact'
@@ -27,14 +25,18 @@ import {
type PersonId,
type Ref,
type TxOperations,
+ Doc,
generateId,
PersonUuid,
- RateLimiter
+ RateLimiter,
+ Space
} from '@hcengineering/core'
import mail from '@hcengineering/mail'
import { type KeyValueClient } from '@hcengineering/kvs-client'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
+import { MessageRequestEventType } from '@hcengineering/communication-sdk-types'
+import { generateMessageId } from '@hcengineering/communication-shared'
import { BaseConfig, type Attachment } from './types'
import { EmailMessage } from './types'
@@ -49,6 +51,7 @@ export async function createMessages (
ctx: MeasureContext,
txClient: TxOperations,
keyValueClient: KeyValueClient,
+ producer: Producer,
token: string,
wsInfo: WorkspaceLoginInfo,
message: EmailMessage,
@@ -62,7 +65,6 @@ export async function createMessages (
const personSpacesCache = PersonSpacesCacheFactory.getInstance(ctx, txClient, wsInfo.workspace)
const channelCache = ChannelCacheFactory.getInstance(ctx, txClient, wsInfo.workspace)
const threadLookup = ThreadLookupService.getInstance(ctx, keyValueClient, token)
- const msgClient = getCommunicationClient(wsInfo.endpoint, wsInfo.workspace, wsInfo.token)
const fromPerson = await personCache.ensurePerson(from)
@@ -116,10 +118,12 @@ export async function createMessages (
const spaces = await personSpacesCache.getPersonSpaces(mailId, fromPerson.uuid, from.email)
if (spaces.length > 0) {
await saveMessageToSpaces(
+ config,
ctx,
txClient,
- msgClient,
+ producer,
threadLookup,
+ wsInfo,
mailId,
spaces,
participants,
@@ -148,10 +152,12 @@ export async function createMessages (
const spaces = await personSpacesCache.getPersonSpaces(mailId, to.uuid, to.address)
if (spaces.length > 0) {
await saveMessageToSpaces(
+ config,
ctx,
txClient,
- msgClient,
+ producer,
threadLookup,
+ wsInfo,
mailId,
spaces,
participants,
@@ -173,10 +179,12 @@ export async function createMessages (
}
async function saveMessageToSpaces (
+ config: BaseConfig,
ctx: MeasureContext,
client: TxOperations,
- msgClient: CommunicationClient,
+ producer: Producer,
threadLookup: ThreadLookupService,
+ wsInfo: WorkspaceLoginInfo,
mailId: string,
spaces: PersonSpace[],
participants: PersonId[],
@@ -208,8 +216,9 @@ async function saveMessageToSpaces (
ctx.info('Found existing thread', { mailId, threadId, spaceId })
}
}
+ let channel: Ref> | undefined
if (threadId === undefined) {
- const channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner)
+ channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner)
const newThreadId = await client.createDoc(
chat.masterTag.Thread,
space._id,
@@ -241,30 +250,62 @@ async function saveMessageToSpaces (
ctx.info('Created new thread', { mailId, threadId, spaceId })
}
- const { id: messageId, created: messageCreated } = await msgClient.createMessage(
- threadId,
- chat.masterTag.Thread,
- content,
- modifiedBy,
- MessageType.Message,
- {
- created: createdDate
- }
- )
- ctx.info('Created message', { mailId, messageId, threadId })
+ const messageId = generateMessageId()
+ const created = new Date(createdDate)
- for (const a of attachments) {
- await msgClient.createFile(
- threadId,
- messageId,
- messageCreated,
- a.id as Ref,
- a.contentType,
- a.name,
- a.data.length,
- modifiedBy
+ const messageData = Buffer.from(
+ JSON.stringify({
+ type: MessageRequestEventType.CreateMessage,
+ messageType: MessageType.Message,
+ card: threadId,
+ cardType: chat.masterTag.Thread,
+ content,
+ creator: modifiedBy,
+ created,
+ id: messageId
+ })
+ )
+ await producer.send({
+ topic: config.CommunicationTopic,
+ messages: [
+ {
+ key: Buffer.from(channel ?? spaceId),
+ value: messageData,
+ headers: {
+ WorkspaceUuid: wsInfo.workspace
+ }
+ }
+ ]
+ })
+ ctx.info('Send message event', { mailId, messageId, threadId })
+
+ const fileData: Buffer[] = attachments.map((a) =>
+ Buffer.from(
+ JSON.stringify({
+ type: MessageRequestEventType.CreateFile,
+ card: threadId,
+ message: messageId,
+ messageCreated: created,
+ blobId: a.id as Ref,
+ fileType: a.contentType,
+ filename: a.name,
+ size: a.data.length,
+ creator: modifiedBy
+ })
)
- }
+ )
+ const fileEvents = fileData.map((data) => ({
+ key: Buffer.from(channel ?? spaceId),
+ value: data,
+ headers: {
+ WorkspaceUuid: wsInfo.workspace
+ }
+ }))
+ await producer.send({
+ topic: config.CommunicationTopic,
+ messages: fileEvents
+ })
+ ctx.info('Send file events', { mailId, messageId, threadId, count: fileEvents.length })
await threadLookup.setThreadId(mailId, space._id, threadId)
})
diff --git a/services/mail/mail-common/src/queue.ts b/services/mail/mail-common/src/queue.ts
new file mode 100644
index 0000000000..878b2ee799
--- /dev/null
+++ b/services/mail/mail-common/src/queue.ts
@@ -0,0 +1,86 @@
+import { Kafka, Producer } from 'kafkajs'
+import { parseQueueConfig } from '@hcengineering/kafka'
+import { BaseConfig } from './types'
+import { MeasureContext } from '@hcengineering/core'
+
+let queueRegistry: KafkaQueueRegistry | undefined
+
+export function initQueue (ctx: MeasureContext, serviceId: string, config: BaseConfig): void {
+ if (queueRegistry !== undefined) {
+ throw new Error('Queue already initialized')
+ }
+
+ queueRegistry = new KafkaQueueRegistry(ctx, serviceId, config)
+ ctx.info('Kafka queue initialized', { serviceId })
+}
+
+export async function closeQueue (): Promise {
+ if (queueRegistry !== undefined) {
+ await queueRegistry.close()
+ queueRegistry = undefined
+ }
+}
+
+export async function getProducer (key: string): Promise {
+ if (queueRegistry === undefined) {
+ throw new Error('Queue not initialized')
+ }
+ return await queueRegistry.getProducer(key)
+}
+export class KafkaQueueRegistry {
+ private readonly kafka: Kafka
+ private readonly producers = new Map()
+
+ constructor (
+ private readonly ctx: MeasureContext,
+ serviceId: string,
+ serviceConfig: BaseConfig
+ ) {
+ this.kafka = getKafkaQueue(serviceId, serviceConfig)
+
+ ctx.info('Kafka client created', { serviceId })
+ }
+
+ public async getProducer (key: string): Promise {
+ const producer = this.producers.get(key)
+ if (producer !== undefined) {
+ return producer
+ }
+
+ const kafkaProducer = this.kafka.producer()
+
+ await kafkaProducer.connect()
+
+ this.producers.set(key, kafkaProducer)
+ this.ctx.info('Created new Kafka producer', { key })
+
+ return kafkaProducer
+ }
+
+ public async close (): Promise {
+ for (const [topic, producer] of this.producers.entries()) {
+ try {
+ await producer.disconnect()
+ this.ctx.info('Kafka producer closed', { topic })
+ } catch (err) {
+ this.ctx.error('Failed to close Kafka producer', { topic, error: err })
+ }
+ }
+
+ this.producers.clear()
+ this.ctx.info('KafkaQueueRegistry closed')
+ }
+}
+
+function getKafkaQueue (serviceId: string, serviceConfig: BaseConfig): Kafka {
+ const { QueueConfig, QueueRegion } = serviceConfig
+ if (QueueConfig === undefined) {
+ throw new Error('Please provide queue config')
+ }
+ const config = parseQueueConfig(QueueConfig, serviceId, QueueRegion)
+ return new Kafka({
+ clientId: config.clientId,
+ brokers: config.brokers,
+ ssl: config.ssl
+ })
+}
diff --git a/services/mail/mail-common/src/types.ts b/services/mail/mail-common/src/types.ts
index 9185042e7b..11ac14e731 100644
--- a/services/mail/mail-common/src/types.ts
+++ b/services/mail/mail-common/src/types.ts
@@ -45,4 +45,7 @@ export interface BaseConfig {
AccountsURL: string
KvsUrl: string
StorageConfig: string
+ QueueConfig: string
+ QueueRegion: string
+ CommunicationTopic: string
}
diff --git a/services/mail/pod-inbound-mail/package.json b/services/mail/pod-inbound-mail/package.json
index 18d9b6dc88..ed07ec12be 100644
--- a/services/mail/pod-inbound-mail/package.json
+++ b/services/mail/pod-inbound-mail/package.json
@@ -62,6 +62,7 @@
"@hcengineering/api-client": "^0.6.0",
"@hcengineering/card": "^0.6.0",
"@hcengineering/communication-rest-client": "0.1.187",
+ "@hcengineering/communication-sdk-types": "0.1.187",
"@hcengineering/communication-types": "0.1.187",
"@hcengineering/contact": "^0.6.24",
"@hcengineering/core": "^0.6.32",
diff --git a/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts b/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts
index 08731a8cd6..e162b3835a 100644
--- a/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts
+++ b/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts
@@ -22,7 +22,8 @@ import { createRestTxOperations } from '@hcengineering/api-client'
// Mock dependencies
jest.mock('@hcengineering/mail-common', () => ({
- createMessages: jest.fn()
+ createMessages: jest.fn(),
+ getProducer: jest.fn().mockReturnValue({})
}))
jest.mock('../client', () => ({
@@ -177,6 +178,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations, // This should be the TxOperations mock, not kvsClient
{}, // This should be the KeyValueClient mock
+ {},
client.mailServiceToken,
mockLoginInfo, // Added workspace login info
expect.objectContaining({
@@ -221,6 +223,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
+ {},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@@ -246,6 +249,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
+ {},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@@ -275,6 +279,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
+ {},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@@ -299,6 +304,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
+ {},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@@ -327,6 +333,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
+ {},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
diff --git a/services/mail/pod-inbound-mail/src/client.ts b/services/mail/pod-inbound-mail/src/client.ts
index 5e41b9b152..775e5406bf 100644
--- a/services/mail/pod-inbound-mail/src/client.ts
+++ b/services/mail/pod-inbound-mail/src/client.ts
@@ -25,6 +25,9 @@ export const mailServiceToken = generateToken(systemAccountUuid, undefined, { se
export const baseConfig: BaseConfig = {
AccountsURL: config.accountsUrl,
KvsUrl: config.kvsUrl,
- StorageConfig: config.storageConfig ?? ''
+ StorageConfig: config.storageConfig ?? '',
+ QueueConfig: config.queueConfig ?? '',
+ QueueRegion: config.queueRegion ?? '',
+ CommunicationTopic: config.communicationTopic
}
export const kvsClient = getClient('inbound-mail', baseConfig.KvsUrl, mailServiceToken)
diff --git a/services/mail/pod-inbound-mail/src/config.ts b/services/mail/pod-inbound-mail/src/config.ts
index b115f06ef2..fa4a695ae0 100644
--- a/services/mail/pod-inbound-mail/src/config.ts
+++ b/services/mail/pod-inbound-mail/src/config.ts
@@ -26,6 +26,9 @@ interface Config {
mailSizeLimit?: string
storageConfig?: string
kvsUrl: string
+ queueConfig: string
+ queueRegion: string
+ communicationTopic: string
}
const config: Config = {
@@ -47,7 +50,15 @@ const config: Config = {
return process.env.KVS_URL
}
throw Error('KVS_URL env var is not set')
- })()
+ })(),
+ queueConfig: (() => {
+ if (process.env.QUEUE_CONFIG !== undefined) {
+ return process.env.QUEUE_CONFIG
+ }
+ throw Error('QUEUE_CONFIG env var is not set')
+ })(),
+ queueRegion: process.env.QUEUE_REGION ?? '',
+ communicationTopic: process.env.COMMUNICATION_TOPIC ?? 'hulygun'
}
export default config
diff --git a/services/mail/pod-inbound-mail/src/handlerMta.ts b/services/mail/pod-inbound-mail/src/handlerMta.ts
index 985cdf7da9..976a60e687 100644
--- a/services/mail/pod-inbound-mail/src/handlerMta.ts
+++ b/services/mail/pod-inbound-mail/src/handlerMta.ts
@@ -18,12 +18,17 @@ import { Request, Response } from 'express'
import TurndownService from 'turndown'
import sanitizeHtml from 'sanitize-html'
import { MeasureContext } from '@hcengineering/core'
-import { type Attachment, type EmailContact, type EmailMessage, createMessages } from '@hcengineering/mail-common'
+import {
+ type Attachment,
+ type EmailContact,
+ type EmailMessage,
+ createMessages,
+ getProducer
+} from '@hcengineering/mail-common'
import { getClient as getAccountClient } from '@hcengineering/account-client'
import { createRestTxOperations } from '@hcengineering/api-client'
import { mailServiceToken, baseConfig, kvsClient } from './client'
-
import config from './config'
export interface MtaMessage {
@@ -118,7 +123,17 @@ export async function handleMtaHook (req: Request, res: Response, ctx: MeasureCo
const transactorUrl = wsInfo.endpoint.replace('ws://', 'http://').replace('wss://', 'https://')
const txClient = await createRestTxOperations(transactorUrl, wsInfo.workspace, wsInfo.token)
- await createMessages(baseConfig, ctx, txClient, kvsClient, mailServiceToken, wsInfo, convertedMessage, attachments)
+ await createMessages(
+ baseConfig,
+ ctx,
+ txClient,
+ kvsClient,
+ await getProducer(baseConfig.CommunicationTopic),
+ mailServiceToken,
+ wsInfo,
+ convertedMessage,
+ attachments
+ )
} catch (error) {
ctx.error('mta-hook', { error })
} finally {
diff --git a/services/mail/pod-inbound-mail/src/index.ts b/services/mail/pod-inbound-mail/src/index.ts
index 98372aa6f1..2a4bd61d5a 100644
--- a/services/mail/pod-inbound-mail/src/index.ts
+++ b/services/mail/pod-inbound-mail/src/index.ts
@@ -21,8 +21,11 @@ import { MeasureContext, MeasureMetricsContext, newMetrics } from '@hcengineerin
import { setMetadata } from '@hcengineering/platform'
import { initStatisticsContext } from '@hcengineering/server-core'
import serverToken from '@hcengineering/server-token'
+import { initQueue, closeQueue } from '@hcengineering/mail-common'
+
import { handleMtaHook } from './handlerMta'
import config from './config'
+import { baseConfig } from './client'
type RequestHandler = (req: Request, res: Response, ctx: MeasureContext, next?: NextFunction) => Promise
@@ -43,6 +46,8 @@ async function main (): Promise {
setMetadata(serverToken.metadata.Secret, config.secret)
+ initQueue(ctx, 'inbound-mail', baseConfig)
+
const app = express()
app.use(cors())
@@ -91,7 +96,7 @@ async function main (): Promise {
const shutdown = (): void => {
server.close(() => {
- process.exit()
+ void closeQueue().then(() => process.exit())
})
}