From 40001d9a5a37ba3a9b7bb01fb80735b3e66c80df Mon Sep 17 00:00:00 2001 From: Artyom Savchenko Date: Wed, 21 May 2025 11:24:50 +0700 Subject: [PATCH] UBERF-10557: Use communication queue (#8993) Signed-off-by: Artem Savchenko --- common/config/rush/pnpm-lock.yaml | 19 +- services/gmail/pod-gmail/package.json | 3 +- .../pod-gmail/src/__tests__/config.test.ts | 157 ++++++++++ services/gmail/pod-gmail/src/config.ts | 21 +- services/gmail/pod-gmail/src/main.ts | 9 +- .../gmail/pod-gmail/src/message/v2/message.ts | 9 +- services/mail/mail-common/package.json | 5 + .../mail-common/src/__tests__/queue.test.ts | 271 ++++++++++++++++++ services/mail/mail-common/src/index.ts | 1 + services/mail/mail-common/src/message.ts | 105 ++++--- services/mail/mail-common/src/queue.ts | 86 ++++++ services/mail/mail-common/src/types.ts | 3 + services/mail/pod-inbound-mail/package.json | 1 + .../src/__tests__/handlerMta.test.ts | 9 +- services/mail/pod-inbound-mail/src/client.ts | 5 +- services/mail/pod-inbound-mail/src/config.ts | 13 +- .../mail/pod-inbound-mail/src/handlerMta.ts | 21 +- services/mail/pod-inbound-mail/src/index.ts | 7 +- 18 files changed, 694 insertions(+), 51 deletions(-) create mode 100644 services/gmail/pod-gmail/src/__tests__/config.test.ts create mode 100644 services/mail/mail-common/src/__tests__/queue.test.ts create mode 100644 services/mail/mail-common/src/queue.ts diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 7b7e42e677..e1590bcec1 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -4076,7 +4076,7 @@ packages: version: 0.0.0 '@rush-temp/card-resources@file:projects/card-resources.tgz': - resolution: {integrity: sha512-CdWQQYWbLy+4AITDmlv2q7Sw96mSzRsslUOU/87wDljEuIubpEKNhfccS42ImusNZMXj4yvd5wwM9ajjBMRKkg==, tarball: file:projects/card-resources.tgz} + resolution: {integrity: sha512-pMv5ceyUDDEJBQbuNCHXE72Px6MNS16zvjzmK7M+PZxOmOXFNwldGCq/jPhslXqJXC24WjqpHCiQZ0gs7dN9uA==, tarball: file:projects/card-resources.tgz} version: 0.0.0 '@rush-temp/card@file:projects/card.tgz': @@ -4412,7 +4412,7 @@ packages: version: 0.0.0 '@rush-temp/mail-common@file:projects/mail-common.tgz': - resolution: {integrity: sha512-4ThjG7u37BfKgOUzqyVxZ+uCt5Z7Yv2DX192wxBuTbl1EX3t50XjTERnjt4Iw0rYFhXc7lOKZygYF1u8a3APhA==, tarball: file:projects/mail-common.tgz} + resolution: {integrity: sha512-DuWl8K6KBwPKVeIaSqUfgTwasFy+KEpU0zrPicIkuDaMILOKGCkmVM7YESS/ggiBlKxtyIOwKRDYeka1dhtxZw==, tarball: file:projects/mail-common.tgz} version: 0.0.0 '@rush-temp/mail@file:projects/mail.tgz': @@ -4896,11 +4896,11 @@ packages: version: 0.0.0 '@rush-temp/pod-gmail@file:projects/pod-gmail.tgz': - resolution: {integrity: sha512-5rn2Y1X7GtTiOKR5vbkqo5oB/zEeQXOiSw4nTPL8xp/Jx66PDnGjkwkHqBlberol5hG2/ns8im3Zqxql5O2VXw==, tarball: file:projects/pod-gmail.tgz} + resolution: {integrity: sha512-zgEk+yi5UCh/xDEFIoancdkzC+k/3o+yuyZ7JW6OiH4oi8xam0FJLk0ZgpPgEheHiD0qU2OfDPS5cymaivQTew==, tarball: file:projects/pod-gmail.tgz} version: 0.0.0 '@rush-temp/pod-inbound-mail@file:projects/pod-inbound-mail.tgz': - resolution: {integrity: sha512-vRYTZhVYb2trqlb5uKFKAPJq7Ay7EaTFy6FijfUTRqitr9QJEREl4PKdf752cOFUfgC8L5uHTH6Q2rgbvsXSug==, tarball: file:projects/pod-inbound-mail.tgz} + resolution: {integrity: sha512-EQdkJNUsw+xUAs74SJ+44fJCrR/kNeSCRzm3Dah9tFlhIcTZl5d80ir8VotOXBZ4McKxAFo3is05crGMDxP04Q==, tarball: file:projects/pod-inbound-mail.tgz} version: 0.0.0 '@rush-temp/pod-love@file:projects/pod-love.tgz': @@ -4992,7 +4992,7 @@ packages: version: 0.0.0 '@rush-temp/process@file:projects/process.tgz': - resolution: {integrity: sha512-3vz9X1/VX0PmWuKw9m3Nlfp2CjFZo2MpgCRdLfPaeo6EHExPeSitbfYl7LojLsGo5Uk7tSnUOwMSgd/GOndOVA==, tarball: file:projects/process.tgz} + resolution: {integrity: sha512-FLoi1Ooh3ntse8Czmk9pyT2ASy8VHz1xGUUXf38t09UKTZ/8SfqwrOtNWYuVnyIrZ3zVVmSL7xcOWmcn5AdTSw==, tarball: file:projects/process.tgz} version: 0.0.0 '@rush-temp/prod@file:projects/prod.tgz': @@ -5216,7 +5216,7 @@ packages: version: 0.0.0 '@rush-temp/server-gmail-resources@file:projects/server-gmail-resources.tgz': - resolution: {integrity: sha512-VUZuCBA60lmGBsC072eJNhPcfGLWGWaQKj6NmEZSutJt2M0dzchgdC5iWr+AX+fhE3pmDJhNW8hYdU06Y5FeZg==, tarball: file:projects/server-gmail-resources.tgz} + resolution: {integrity: sha512-PsKe9cztrK9Mj6TZO5scM/IArHSVaiJ+SMNwMoz6j8Kb2jZBP3a36wWG0Ly5GuDU7m272NE8bjuKUNmzM+kfZQ==, tarball: file:projects/server-gmail-resources.tgz} version: 0.0.0 '@rush-temp/server-gmail@file:projects/server-gmail.tgz': @@ -5616,7 +5616,7 @@ packages: version: 0.0.0 '@rush-temp/workbench-resources@file:projects/workbench-resources.tgz': - resolution: {integrity: sha512-N00MR4MXHz3pMLbTWfIwQCKUpwtaLTtx2lu+UR9QYhn2CiMT/tyVqfxtN2Pkbf0sj5VG5euI6jsnKskYgUY74g==, tarball: file:projects/workbench-resources.tgz} + resolution: {integrity: sha512-UfLCpybz5EEeX1qnGkQgS0cKEvptFzYm8D8YR9uIqSlfRAnCjKpEFTexnvbXZQeUsjvbIwkMQYsSbyhk/pYrGg==, tarball: file:projects/workbench-resources.tgz} version: 0.0.0 '@rush-temp/workbench@file:projects/workbench.tgz': @@ -20013,6 +20013,8 @@ snapshots: '@rush-temp/mail-common@file:projects/mail-common.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))': dependencies: '@hcengineering/communication-rest-client': 0.1.187(typescript@5.7.3) + '@hcengineering/communication-sdk-types': 0.1.187(typescript@5.7.3) + '@hcengineering/communication-shared': 0.1.187(typescript@5.7.3) '@hcengineering/communication-types': 0.1.187(typescript@5.7.3) '@tsconfig/node16': 1.0.4 '@types/express': 4.17.21 @@ -20031,6 +20033,7 @@ snapshots: eslint-plugin-node: 11.1.0(eslint@8.56.0) eslint-plugin-promise: 6.1.1(eslint@8.56.0) jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3)) + kafkajs: 2.2.4 prettier: 3.2.5 sanitize-html: 2.16.0 ts-jest: 29.1.2(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(esbuild@0.24.2)(jest@29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3)))(typescript@5.7.3) @@ -22412,6 +22415,7 @@ snapshots: '@rush-temp/pod-gmail@file:projects/pod-gmail.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(bufferutil@4.0.8)(encoding@0.1.13)(utf-8-validate@6.0.4)': dependencies: + '@hcengineering/communication-sdk-types': 0.1.187(typescript@5.3.3) '@tsconfig/node16': 1.0.4 '@types/cors': 2.8.17 '@types/express': 4.17.21 @@ -22464,6 +22468,7 @@ snapshots: '@rush-temp/pod-inbound-mail@file:projects/pod-inbound-mail.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))': dependencies: '@hcengineering/communication-rest-client': 0.1.187(typescript@5.7.3) + '@hcengineering/communication-sdk-types': 0.1.187(typescript@5.7.3) '@hcengineering/communication-types': 0.1.187(typescript@5.7.3) '@tsconfig/node16': 1.0.4 '@types/cors': 2.8.17 diff --git a/services/gmail/pod-gmail/package.json b/services/gmail/pod-gmail/package.json index 8d408ca6cd..c484d4c1b8 100644 --- a/services/gmail/pod-gmail/package.json +++ b/services/gmail/pod-gmail/package.json @@ -64,7 +64,9 @@ "@hcengineering/chat": "^0.6.0", "@hcengineering/client": "^0.6.18", "@hcengineering/client-resources": "^0.6.27", + "@hcengineering/communication-sdk-types": "0.1.187", "@hcengineering/contact": "^0.6.24", + "@hcengineering/kvs-client": "^0.6.0", "@hcengineering/mail-common": "^0.6.0", "@hcengineering/core": "^0.6.32", "@hcengineering/gmail": "^0.6.22", @@ -74,7 +76,6 @@ "@hcengineering/server-client": "^0.6.0", "@hcengineering/server-storage": "^0.6.0", "@hcengineering/server-token": "^0.6.11", - "@hcengineering/kvs-client": "^0.6.0", "cors": "^2.8.5", "dotenv": "~16.0.0", "express": "^4.21.2", diff --git a/services/gmail/pod-gmail/src/__tests__/config.test.ts b/services/gmail/pod-gmail/src/__tests__/config.test.ts new file mode 100644 index 0000000000..f283776d92 --- /dev/null +++ b/services/gmail/pod-gmail/src/__tests__/config.test.ts @@ -0,0 +1,157 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { IntegrationVersion } from '../types' + +describe('Config', () => { + // Store original environment + const originalEnv = { ...process.env } + + // Mock required environment variables to prevent test failures + beforeEach(() => { + jest.resetModules() // Clear cache for config module between tests + + // Set minimum required env variables + process.env.ACCOUNTS_URL = 'http://accounts.test' + process.env.SECRET = 'test-secret' + process.env.Credentials = 'test-credentials' + process.env.WATCH_TOPIC_NAME = 'test-topic' + process.env.KVS_URL = 'http://kvs.test' + process.env.STORAGE_CONFIG = 'test-storage-config' + }) + + // Restore original environment after tests + afterEach(() => { + process.env = { ...originalEnv } + }) + + it('should load default configuration values', () => { + // Import config inside test to ensure env variables are set + // eslint-disable-next-line @typescript-eslint/no-var-requires + const config = require('../config').default + + // Check default values + expect(config.Port).toBe(8087) + expect(config.ServiceID).toBe('gmail-service') + expect(config.InitLimit).toBe(50) + expect(config.FooterMessage).toContain('Sent via Huly') + expect(config.Version).toBe(IntegrationVersion.V1) + expect(config.QueueRegion).toBe('') + expect(config.CommunicationTopic).toBe('hulygun') + }) + + it('should override defaults with environment variables', () => { + // Set custom values + process.env.PORT = '9000' + process.env.SERVICE_ID = 'custom-service' + process.env.INIT_LIMIT = '100' + process.env.FOOTER_MESSAGE = 'Custom footer' + process.env.VERSION = 'v2' + process.env.QUEUE_CONFIG = 'custom-queue-config' + process.env.QUEUE_REGION = 'custom-region' + process.env.COMMUNICATION_TOPIC = 'custom-topic' + + // Load config with custom values + // eslint-disable-next-line @typescript-eslint/no-var-requires + const config = require('../config').default + + // Check overridden values + expect(config.Port).toBe(9000) + expect(config.ServiceID).toBe('custom-service') + expect(config.InitLimit).toBe(100) + expect(config.FooterMessage).toBe('Custom footer') + expect(config.Version).toBe(IntegrationVersion.V2) + expect(config.QueueConfig).toBe('custom-queue-config') + expect(config.QueueRegion).toBe('custom-region') + expect(config.CommunicationTopic).toBe('custom-topic') + }) + + it('should throw error when required env variables are missing', () => { + // Remove required environment variables + process.env.ACCOUNTS_URL = undefined + + // Expect error when loading config + expect(() => { + require('../config') + }).toThrow('Missing env variables: ACCOUNTS_URL') + }) + + it('should throw error for invalid version value', () => { + // Set invalid version + process.env.VERSION = 'invalid-version' + + // Expect error when loading config + expect(() => { + require('../config') + }).toThrow("Invalid version: invalid-version. Must be 'v1' or 'v2'.") + }) + + it('should throw error when v2 is set but queue config is missing', () => { + // Set v2 without required queue config + process.env.VERSION = 'v2' + process.env.QUEUE_CONFIG = '' + + // Expect error when loading config + expect(() => { + require('../config') + }).toThrow('Missing env variable: QUEUE_CONFIG') + }) + + it('should throw error when v2 is set but communication topic is missing', () => { + // Set v2 with queue config but missing communication topic + process.env.VERSION = 'v2' + process.env.QUEUE_CONFIG = 'test-queue' + process.env.COMMUNICATION_TOPIC = '' + + // Expect error when loading config + expect(() => { + require('../config') + }).toThrow('Missing env variable: COMMUNICATION_TOPIC') + }) + + it('should correctly parse numeric values', () => { + // Set string values for numeric fields + process.env.PORT = '9999' + process.env.INIT_LIMIT = '200' + + // Load config + // eslint-disable-next-line @typescript-eslint/no-var-requires + const config = require('../config').default + + // Check parsed values + expect(config.Port).toBe(9999) + expect(config.InitLimit).toBe(200) + expect(typeof config.Port).toBe('number') + expect(typeof config.InitLimit).toBe('number') + }) + + it('should handle v2 configuration correctly', () => { + // Set up v2 configuration + process.env.VERSION = 'v2' + process.env.QUEUE_CONFIG = 'test-queue-config' + process.env.QUEUE_REGION = 'test-region' + process.env.COMMUNICATION_TOPIC = 'test-topic-name' + + // Load config + // eslint-disable-next-line @typescript-eslint/no-var-requires + const config = require('../config').default + + // Check v2 specific configuration + expect(config.Version).toBe(IntegrationVersion.V2) + expect(config.QueueConfig).toBe('test-queue-config') + expect(config.QueueRegion).toBe('test-region') + expect(config.CommunicationTopic).toBe('test-topic-name') + }) +}) diff --git a/services/gmail/pod-gmail/src/config.ts b/services/gmail/pod-gmail/src/config.ts index a565c50e51..9f4f717424 100644 --- a/services/gmail/pod-gmail/src/config.ts +++ b/services/gmail/pod-gmail/src/config.ts @@ -28,6 +28,9 @@ interface Config extends BaseConfig { FooterMessage: string InitLimit: number Version: IntegrationVersion + QueueConfig: string + QueueRegion: string + CommunicationTopic: string } const envMap: { [key in keyof Config]: string } = { @@ -41,7 +44,10 @@ const envMap: { [key in keyof Config]: string } = { InitLimit: 'INIT_LIMIT', KvsUrl: 'KVS_URL', StorageConfig: 'STORAGE_CONFIG', - Version: 'VERSION' + Version: 'VERSION', + QueueConfig: 'QUEUE_CONFIG', + QueueRegion: 'QUEUE_REGION', + CommunicationTopic: 'COMMUNICATION_TOPIC' } const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined) @@ -65,7 +71,10 @@ const config: Config = (() => { FooterMessage: process.env[envMap.FooterMessage] ?? '

Sent via Huly

', KvsUrl: process.env[envMap.KvsUrl], StorageConfig: process.env[envMap.StorageConfig], - Version: version + Version: version, + QueueConfig: process.env[envMap.QueueConfig] ?? '', + QueueRegion: process.env[envMap.QueueRegion] ?? '', + CommunicationTopic: process.env[envMap.CommunicationTopic] ?? 'hulygun' } const missingEnv = (Object.keys(params) as Array) @@ -75,6 +84,14 @@ const config: Config = (() => { if (missingEnv.length > 0) { throw Error(`Missing env variables: ${missingEnv.join(', ')}`) } + if (version === IntegrationVersion.V2) { + if (params.QueueConfig === '') { + throw Error('Missing env variable: QUEUE_CONFIG') + } + if (params.CommunicationTopic === '') { + throw Error('Missing env variable: COMMUNICATION_TOPIC') + } + } return params as Config })() diff --git a/services/gmail/pod-gmail/src/main.ts b/services/gmail/pod-gmail/src/main.ts index 86f83c3658..82a2e51745 100644 --- a/services/gmail/pod-gmail/src/main.ts +++ b/services/gmail/pod-gmail/src/main.ts @@ -23,13 +23,15 @@ import { isWorkspaceLoginInfo } from '@hcengineering/account-client' import { initStatisticsContext, type StorageConfiguration } from '@hcengineering/server-core' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import serverToken, { decodeToken } from '@hcengineering/server-token' +import { initQueue, closeQueue } from '@hcengineering/mail-common' import { type IncomingHttpHeaders } from 'http' import { join } from 'path' + import { decode64 } from './base64' import config from './config' import { GmailController } from './gmailController' import { createServer, listen } from './server' -import { type Endpoint, type State } from './types' +import { IntegrationVersion, type Endpoint, type State } from './types' const extractToken = (header: IncomingHttpHeaders): any => { try { @@ -61,6 +63,10 @@ export const main = async (): Promise => { const storageConfig: StorageConfiguration = storageConfigFromEnv() const storageAdapter = buildStorageFromConfig(storageConfig) + if (config.Version === IntegrationVersion.V2) { + initQueue(ctx, 'gmail-service', config) + } + const gmailController = GmailController.create(ctx, storageAdapter) await gmailController.startAll() const endpoints: Endpoint[] = [ @@ -170,6 +176,7 @@ export const main = async (): Promise => { const asyncClose = async (): Promise => { await gmailController.close() await storageAdapter.close() + await closeQueue() } const shutdown = (): void => { diff --git a/services/gmail/pod-gmail/src/message/v2/message.ts b/services/gmail/pod-gmail/src/message/v2/message.ts index 21750a3b81..812dc9807e 100644 --- a/services/gmail/pod-gmail/src/message/v2/message.ts +++ b/services/gmail/pod-gmail/src/message/v2/message.ts @@ -17,7 +17,13 @@ import { gmail_v1 } from 'googleapis' import sanitizeHtml from 'sanitize-html' import { TxOperations, type MeasureContext } from '@hcengineering/core' -import { createMessages, parseEmailHeader, parseNameFromEmailHeader, EmailMessage } from '@hcengineering/mail-common' +import { + createMessages, + parseEmailHeader, + parseNameFromEmailHeader, + EmailMessage, + getProducer +} from '@hcengineering/mail-common' import { type KeyValueClient } from '@hcengineering/kvs-client' import { AccountClient, isWorkspaceLoginInfo, WorkspaceLoginInfo } from '@hcengineering/account-client' @@ -55,6 +61,7 @@ export class MessageManagerV2 implements IMessageManager { this.ctx, this.txClient, this.keyValueClient, + await getProducer(config.CommunicationTopic), this.token, this.wsInfo, res, diff --git a/services/mail/mail-common/package.json b/services/mail/mail-common/package.json index 7cc80b4c6c..1461969229 100644 --- a/services/mail/mail-common/package.json +++ b/services/mail/mail-common/package.json @@ -50,12 +50,17 @@ "@hcengineering/card": "^0.6.0", "@hcengineering/chat": "^0.6.0", "@hcengineering/communication-rest-client": "0.1.187", + "@hcengineering/communication-sdk-types": "0.1.187", + "@hcengineering/communication-shared": "0.1.187", "@hcengineering/communication-types": "0.1.187", "@hcengineering/contact": "^0.6.24", "@hcengineering/core": "^0.6.32", "@hcengineering/kvs-client": "^0.6.0", + "@hcengineering/kafka": "^0.6.0", "@hcengineering/mail": "^0.6.0", + "@hcengineering/server-core": "^0.6.1", "@hcengineering/server-storage": "^0.6.0", + "kafkajs": "^2.2.4", "sanitize-html": "^2.15.0", "turndown": "^7.2.0", "uuid": "^8.3.2" diff --git a/services/mail/mail-common/src/__tests__/queue.test.ts b/services/mail/mail-common/src/__tests__/queue.test.ts new file mode 100644 index 0000000000..77e6d147f7 --- /dev/null +++ b/services/mail/mail-common/src/__tests__/queue.test.ts @@ -0,0 +1,271 @@ +import { Kafka, Producer } from 'kafkajs' +import { parseQueueConfig } from '@hcengineering/kafka' +import { MeasureContext } from '@hcengineering/core' +import { initQueue, closeQueue, getProducer, KafkaQueueRegistry } from '../queue' +import { BaseConfig } from '../types' + +// Mock dependencies +jest.mock('kafkajs') +jest.mock('@hcengineering/kafka') + +/* eslint-disable @typescript-eslint/unbound-method */ +describe('Kafka Queue Management', () => { + // Mock objects + let mockCtx: MeasureContext + let mockConfig: BaseConfig + let mockKafka: Kafka + let mockProducer: Producer + let disconnectMock: jest.Mock + + // Setup function to create clean mocks for each test + beforeEach(() => { + // Reset modules to clear any singleton state + jest.resetModules() + + // Create mock for producer + disconnectMock = jest.fn().mockResolvedValue(undefined) + mockProducer = { + connect: jest.fn().mockResolvedValue(undefined), + disconnect: disconnectMock, + send: jest.fn().mockResolvedValue(undefined), + on: jest.fn(), + events: {}, + transaction: jest.fn(), + logger: jest.fn() + } as unknown as Producer + + // Create mock for Kafka + mockKafka = { + producer: jest.fn().mockReturnValue(mockProducer), + consumer: jest.fn(), + admin: jest.fn() + } as unknown as Kafka + + // Mock constructor for Kafka + ;(Kafka as jest.Mock).mockImplementation(() => mockKafka) + + // Create context mock + mockCtx = { + info: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + measure: jest.fn() + } as unknown as MeasureContext + + // Create config mock + mockConfig = { + QueueConfig: 'kafka:broker:9092', + QueueRegion: 'test-region', + CommunicationTopic: 'test-topic' + } as any + + // Mock parseQueueConfig + ;(parseQueueConfig as jest.Mock).mockReturnValue({ + clientId: 'test-service', + brokers: ['broker:9092'], + ssl: false + }) + }) + + afterEach(async () => { + // Clean up after each test + await closeQueue() + }) + + describe('initQueue', () => { + it('should initialize the queue registry with correct parameters', () => { + // Act + initQueue(mockCtx, 'test-service', mockConfig) + + // Assert + expect(parseQueueConfig).toHaveBeenCalledWith(mockConfig.QueueConfig, 'test-service', mockConfig.QueueRegion) + + expect(Kafka).toHaveBeenCalledWith({ + clientId: 'test-service', + brokers: ['broker:9092'], + ssl: false + }) + + expect(mockCtx.info).toHaveBeenCalledWith('Kafka queue initialized', { serviceId: 'test-service' }) + }) + + it('should throw an error when initialized twice', () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Act & Assert + expect(() => { + initQueue(mockCtx, 'test-service', mockConfig) + }).toThrow('Queue already initialized') + }) + + it('should throw an error when queue config is missing', () => { + // Arrange + const configWithoutQueue: BaseConfig = { ...mockConfig, QueueConfig: undefined } as any + + // Act & Assert + expect(() => { + initQueue(mockCtx, 'test-service', configWithoutQueue) + }).toThrow('Please provide queue config') + }) + }) + + describe('getProducer', () => { + it('should throw an error when queue is not initialized', async () => { + // Act & Assert + await expect(getProducer('test-topic')).rejects.toThrow('Queue not initialized') + }) + + it('should create a new producer when one does not exist for the topic', async () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Act + const producer = await getProducer('test-topic') + + // Assert + expect(producer).toBe(mockProducer) + expect(mockKafka.producer).toHaveBeenCalled() + expect(mockProducer.connect).toHaveBeenCalled() + expect(mockCtx.info).toHaveBeenCalledWith('Created new Kafka producer', { key: 'test-topic' }) + }) + + it('should reuse an existing producer for the same topic', async () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Act + const producer1 = await getProducer('test-topic') + const producer2 = await getProducer('test-topic') + + // Assert + expect(producer1).toBe(producer2) + expect(mockKafka.producer).toHaveBeenCalledTimes(1) // Called only once + }) + + it('should create different producers for different topics', async () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Setup a second producer + const mockProducer2 = { ...mockProducer } + mockKafka.producer = jest.fn().mockReturnValueOnce(mockProducer).mockReturnValueOnce(mockProducer2) + + // Act + const producer1 = await getProducer('topic1') + const producer2 = await getProducer('topic2') + + // Assert + expect(mockKafka.producer).toHaveBeenCalledTimes(2) + expect(producer1).not.toBe(producer2) + }) + }) + + describe('closeQueue', () => { + it('should do nothing if the queue is not initialized', async () => { + // Act + await closeQueue() + + // Assert + expect(mockProducer.disconnect).not.toHaveBeenCalled() + }) + + it('should close all producers', async () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Get a couple of producers + await getProducer('topic1') + await getProducer('topic2') + + // Act + await closeQueue() + + // Assert + expect(mockProducer.disconnect).toHaveBeenCalledTimes(2) + expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed') + }) + + it('should handle errors when closing producers', async () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Get a producer + await getProducer('test-topic') + + // Make disconnect throw an error + const error = new Error('Disconnect failed') + disconnectMock.mockRejectedValueOnce(error) + + // Act + await closeQueue() + + // Assert + expect(mockCtx.error).toHaveBeenCalledWith('Failed to close Kafka producer', { + topic: 'test-topic', + error + }) + + // It should still clear the producers + expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed') + }) + + it('should reset the registry after closing', async () => { + // Arrange + initQueue(mockCtx, 'test-service', mockConfig) + + // Act + await closeQueue() + + // The registry should be reset, so initializing again should work + initQueue(mockCtx, 'test-service', mockConfig) + + // Assert - expect no errors and two initialization logs + expect(mockCtx.info).toHaveBeenCalledWith('Kafka queue initialized', { serviceId: 'test-service' }) + }) + }) + + describe('KafkaQueueRegistry', () => { + it('should create a Kafka instance with correct configuration', () => { + // Act + // eslint-disable-next-line no-new + new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig) + + // Assert + expect(parseQueueConfig).toHaveBeenCalledWith(mockConfig.QueueConfig, 'test-service', mockConfig.QueueRegion) + + expect(Kafka).toHaveBeenCalledWith({ + clientId: 'test-service', + brokers: ['broker:9092'], + ssl: false + }) + }) + + it('should log when a producer is created', async () => { + // Arrange + const registry = new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig) + + // Act + await registry.getProducer('test-topic') + + // Assert + expect(mockCtx.info).toHaveBeenCalledWith('Created new Kafka producer', { key: 'test-topic' }) + }) + + it('should close all producers during shutdown', async () => { + // Arrange + const registry = new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig) + + // Create some producers + await registry.getProducer('topic1') + await registry.getProducer('topic2') + + // Act + await registry.close() + + // Assert + expect(mockProducer.disconnect).toHaveBeenCalledTimes(2) + expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed') + }) + }) +}) diff --git a/services/mail/mail-common/src/index.ts b/services/mail/mail-common/src/index.ts index a169b586a6..df82f64f6b 100644 --- a/services/mail/mail-common/src/index.ts +++ b/services/mail/mail-common/src/index.ts @@ -17,3 +17,4 @@ export * from './message' export * from './types' export * from './utils' export * from './mutex' +export * from './queue' diff --git a/services/mail/mail-common/src/message.ts b/services/mail/mail-common/src/message.ts index 60f9e3fc6c..f88ecbb884 100644 --- a/services/mail/mail-common/src/message.ts +++ b/services/mail/mail-common/src/message.ts @@ -12,12 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. // +import { Producer } from 'kafkajs' + import { WorkspaceLoginInfo } from '@hcengineering/account-client' import { type Card } from '@hcengineering/card' -import { - type RestClient as CommunicationClient, - createRestClient as getCommunicationClient -} from '@hcengineering/communication-rest-client' import { MessageType } from '@hcengineering/communication-types' import chat from '@hcengineering/chat' import { PersonSpace } from '@hcengineering/contact' @@ -27,14 +25,18 @@ import { type PersonId, type Ref, type TxOperations, + Doc, generateId, PersonUuid, - RateLimiter + RateLimiter, + Space } from '@hcengineering/core' import mail from '@hcengineering/mail' import { type KeyValueClient } from '@hcengineering/kvs-client' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' +import { MessageRequestEventType } from '@hcengineering/communication-sdk-types' +import { generateMessageId } from '@hcengineering/communication-shared' import { BaseConfig, type Attachment } from './types' import { EmailMessage } from './types' @@ -49,6 +51,7 @@ export async function createMessages ( ctx: MeasureContext, txClient: TxOperations, keyValueClient: KeyValueClient, + producer: Producer, token: string, wsInfo: WorkspaceLoginInfo, message: EmailMessage, @@ -62,7 +65,6 @@ export async function createMessages ( const personSpacesCache = PersonSpacesCacheFactory.getInstance(ctx, txClient, wsInfo.workspace) const channelCache = ChannelCacheFactory.getInstance(ctx, txClient, wsInfo.workspace) const threadLookup = ThreadLookupService.getInstance(ctx, keyValueClient, token) - const msgClient = getCommunicationClient(wsInfo.endpoint, wsInfo.workspace, wsInfo.token) const fromPerson = await personCache.ensurePerson(from) @@ -116,10 +118,12 @@ export async function createMessages ( const spaces = await personSpacesCache.getPersonSpaces(mailId, fromPerson.uuid, from.email) if (spaces.length > 0) { await saveMessageToSpaces( + config, ctx, txClient, - msgClient, + producer, threadLookup, + wsInfo, mailId, spaces, participants, @@ -148,10 +152,12 @@ export async function createMessages ( const spaces = await personSpacesCache.getPersonSpaces(mailId, to.uuid, to.address) if (spaces.length > 0) { await saveMessageToSpaces( + config, ctx, txClient, - msgClient, + producer, threadLookup, + wsInfo, mailId, spaces, participants, @@ -173,10 +179,12 @@ export async function createMessages ( } async function saveMessageToSpaces ( + config: BaseConfig, ctx: MeasureContext, client: TxOperations, - msgClient: CommunicationClient, + producer: Producer, threadLookup: ThreadLookupService, + wsInfo: WorkspaceLoginInfo, mailId: string, spaces: PersonSpace[], participants: PersonId[], @@ -208,8 +216,9 @@ async function saveMessageToSpaces ( ctx.info('Found existing thread', { mailId, threadId, spaceId }) } } + let channel: Ref> | undefined if (threadId === undefined) { - const channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner) + channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner) const newThreadId = await client.createDoc( chat.masterTag.Thread, space._id, @@ -241,30 +250,62 @@ async function saveMessageToSpaces ( ctx.info('Created new thread', { mailId, threadId, spaceId }) } - const { id: messageId, created: messageCreated } = await msgClient.createMessage( - threadId, - chat.masterTag.Thread, - content, - modifiedBy, - MessageType.Message, - { - created: createdDate - } - ) - ctx.info('Created message', { mailId, messageId, threadId }) + const messageId = generateMessageId() + const created = new Date(createdDate) - for (const a of attachments) { - await msgClient.createFile( - threadId, - messageId, - messageCreated, - a.id as Ref, - a.contentType, - a.name, - a.data.length, - modifiedBy + const messageData = Buffer.from( + JSON.stringify({ + type: MessageRequestEventType.CreateMessage, + messageType: MessageType.Message, + card: threadId, + cardType: chat.masterTag.Thread, + content, + creator: modifiedBy, + created, + id: messageId + }) + ) + await producer.send({ + topic: config.CommunicationTopic, + messages: [ + { + key: Buffer.from(channel ?? spaceId), + value: messageData, + headers: { + WorkspaceUuid: wsInfo.workspace + } + } + ] + }) + ctx.info('Send message event', { mailId, messageId, threadId }) + + const fileData: Buffer[] = attachments.map((a) => + Buffer.from( + JSON.stringify({ + type: MessageRequestEventType.CreateFile, + card: threadId, + message: messageId, + messageCreated: created, + blobId: a.id as Ref, + fileType: a.contentType, + filename: a.name, + size: a.data.length, + creator: modifiedBy + }) ) - } + ) + const fileEvents = fileData.map((data) => ({ + key: Buffer.from(channel ?? spaceId), + value: data, + headers: { + WorkspaceUuid: wsInfo.workspace + } + })) + await producer.send({ + topic: config.CommunicationTopic, + messages: fileEvents + }) + ctx.info('Send file events', { mailId, messageId, threadId, count: fileEvents.length }) await threadLookup.setThreadId(mailId, space._id, threadId) }) diff --git a/services/mail/mail-common/src/queue.ts b/services/mail/mail-common/src/queue.ts new file mode 100644 index 0000000000..878b2ee799 --- /dev/null +++ b/services/mail/mail-common/src/queue.ts @@ -0,0 +1,86 @@ +import { Kafka, Producer } from 'kafkajs' +import { parseQueueConfig } from '@hcengineering/kafka' +import { BaseConfig } from './types' +import { MeasureContext } from '@hcengineering/core' + +let queueRegistry: KafkaQueueRegistry | undefined + +export function initQueue (ctx: MeasureContext, serviceId: string, config: BaseConfig): void { + if (queueRegistry !== undefined) { + throw new Error('Queue already initialized') + } + + queueRegistry = new KafkaQueueRegistry(ctx, serviceId, config) + ctx.info('Kafka queue initialized', { serviceId }) +} + +export async function closeQueue (): Promise { + if (queueRegistry !== undefined) { + await queueRegistry.close() + queueRegistry = undefined + } +} + +export async function getProducer (key: string): Promise { + if (queueRegistry === undefined) { + throw new Error('Queue not initialized') + } + return await queueRegistry.getProducer(key) +} +export class KafkaQueueRegistry { + private readonly kafka: Kafka + private readonly producers = new Map() + + constructor ( + private readonly ctx: MeasureContext, + serviceId: string, + serviceConfig: BaseConfig + ) { + this.kafka = getKafkaQueue(serviceId, serviceConfig) + + ctx.info('Kafka client created', { serviceId }) + } + + public async getProducer (key: string): Promise { + const producer = this.producers.get(key) + if (producer !== undefined) { + return producer + } + + const kafkaProducer = this.kafka.producer() + + await kafkaProducer.connect() + + this.producers.set(key, kafkaProducer) + this.ctx.info('Created new Kafka producer', { key }) + + return kafkaProducer + } + + public async close (): Promise { + for (const [topic, producer] of this.producers.entries()) { + try { + await producer.disconnect() + this.ctx.info('Kafka producer closed', { topic }) + } catch (err) { + this.ctx.error('Failed to close Kafka producer', { topic, error: err }) + } + } + + this.producers.clear() + this.ctx.info('KafkaQueueRegistry closed') + } +} + +function getKafkaQueue (serviceId: string, serviceConfig: BaseConfig): Kafka { + const { QueueConfig, QueueRegion } = serviceConfig + if (QueueConfig === undefined) { + throw new Error('Please provide queue config') + } + const config = parseQueueConfig(QueueConfig, serviceId, QueueRegion) + return new Kafka({ + clientId: config.clientId, + brokers: config.brokers, + ssl: config.ssl + }) +} diff --git a/services/mail/mail-common/src/types.ts b/services/mail/mail-common/src/types.ts index 9185042e7b..11ac14e731 100644 --- a/services/mail/mail-common/src/types.ts +++ b/services/mail/mail-common/src/types.ts @@ -45,4 +45,7 @@ export interface BaseConfig { AccountsURL: string KvsUrl: string StorageConfig: string + QueueConfig: string + QueueRegion: string + CommunicationTopic: string } diff --git a/services/mail/pod-inbound-mail/package.json b/services/mail/pod-inbound-mail/package.json index 18d9b6dc88..ed07ec12be 100644 --- a/services/mail/pod-inbound-mail/package.json +++ b/services/mail/pod-inbound-mail/package.json @@ -62,6 +62,7 @@ "@hcengineering/api-client": "^0.6.0", "@hcengineering/card": "^0.6.0", "@hcengineering/communication-rest-client": "0.1.187", + "@hcengineering/communication-sdk-types": "0.1.187", "@hcengineering/communication-types": "0.1.187", "@hcengineering/contact": "^0.6.24", "@hcengineering/core": "^0.6.32", diff --git a/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts b/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts index 08731a8cd6..e162b3835a 100644 --- a/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts +++ b/services/mail/pod-inbound-mail/src/__tests__/handlerMta.test.ts @@ -22,7 +22,8 @@ import { createRestTxOperations } from '@hcengineering/api-client' // Mock dependencies jest.mock('@hcengineering/mail-common', () => ({ - createMessages: jest.fn() + createMessages: jest.fn(), + getProducer: jest.fn().mockReturnValue({}) })) jest.mock('../client', () => ({ @@ -177,6 +178,7 @@ describe('handleMtaHook', () => { mockCtx, mockTxOperations, // This should be the TxOperations mock, not kvsClient {}, // This should be the KeyValueClient mock + {}, client.mailServiceToken, mockLoginInfo, // Added workspace login info expect.objectContaining({ @@ -221,6 +223,7 @@ describe('handleMtaHook', () => { mockCtx, mockTxOperations, {}, + {}, client.mailServiceToken, mockLoginInfo, expect.objectContaining({ @@ -246,6 +249,7 @@ describe('handleMtaHook', () => { mockCtx, mockTxOperations, {}, + {}, client.mailServiceToken, mockLoginInfo, expect.objectContaining({ @@ -275,6 +279,7 @@ describe('handleMtaHook', () => { mockCtx, mockTxOperations, {}, + {}, client.mailServiceToken, mockLoginInfo, expect.objectContaining({ @@ -299,6 +304,7 @@ describe('handleMtaHook', () => { mockCtx, mockTxOperations, {}, + {}, client.mailServiceToken, mockLoginInfo, expect.objectContaining({ @@ -327,6 +333,7 @@ describe('handleMtaHook', () => { mockCtx, mockTxOperations, {}, + {}, client.mailServiceToken, mockLoginInfo, expect.objectContaining({ diff --git a/services/mail/pod-inbound-mail/src/client.ts b/services/mail/pod-inbound-mail/src/client.ts index 5e41b9b152..775e5406bf 100644 --- a/services/mail/pod-inbound-mail/src/client.ts +++ b/services/mail/pod-inbound-mail/src/client.ts @@ -25,6 +25,9 @@ export const mailServiceToken = generateToken(systemAccountUuid, undefined, { se export const baseConfig: BaseConfig = { AccountsURL: config.accountsUrl, KvsUrl: config.kvsUrl, - StorageConfig: config.storageConfig ?? '' + StorageConfig: config.storageConfig ?? '', + QueueConfig: config.queueConfig ?? '', + QueueRegion: config.queueRegion ?? '', + CommunicationTopic: config.communicationTopic } export const kvsClient = getClient('inbound-mail', baseConfig.KvsUrl, mailServiceToken) diff --git a/services/mail/pod-inbound-mail/src/config.ts b/services/mail/pod-inbound-mail/src/config.ts index b115f06ef2..fa4a695ae0 100644 --- a/services/mail/pod-inbound-mail/src/config.ts +++ b/services/mail/pod-inbound-mail/src/config.ts @@ -26,6 +26,9 @@ interface Config { mailSizeLimit?: string storageConfig?: string kvsUrl: string + queueConfig: string + queueRegion: string + communicationTopic: string } const config: Config = { @@ -47,7 +50,15 @@ const config: Config = { return process.env.KVS_URL } throw Error('KVS_URL env var is not set') - })() + })(), + queueConfig: (() => { + if (process.env.QUEUE_CONFIG !== undefined) { + return process.env.QUEUE_CONFIG + } + throw Error('QUEUE_CONFIG env var is not set') + })(), + queueRegion: process.env.QUEUE_REGION ?? '', + communicationTopic: process.env.COMMUNICATION_TOPIC ?? 'hulygun' } export default config diff --git a/services/mail/pod-inbound-mail/src/handlerMta.ts b/services/mail/pod-inbound-mail/src/handlerMta.ts index 985cdf7da9..976a60e687 100644 --- a/services/mail/pod-inbound-mail/src/handlerMta.ts +++ b/services/mail/pod-inbound-mail/src/handlerMta.ts @@ -18,12 +18,17 @@ import { Request, Response } from 'express' import TurndownService from 'turndown' import sanitizeHtml from 'sanitize-html' import { MeasureContext } from '@hcengineering/core' -import { type Attachment, type EmailContact, type EmailMessage, createMessages } from '@hcengineering/mail-common' +import { + type Attachment, + type EmailContact, + type EmailMessage, + createMessages, + getProducer +} from '@hcengineering/mail-common' import { getClient as getAccountClient } from '@hcengineering/account-client' import { createRestTxOperations } from '@hcengineering/api-client' import { mailServiceToken, baseConfig, kvsClient } from './client' - import config from './config' export interface MtaMessage { @@ -118,7 +123,17 @@ export async function handleMtaHook (req: Request, res: Response, ctx: MeasureCo const transactorUrl = wsInfo.endpoint.replace('ws://', 'http://').replace('wss://', 'https://') const txClient = await createRestTxOperations(transactorUrl, wsInfo.workspace, wsInfo.token) - await createMessages(baseConfig, ctx, txClient, kvsClient, mailServiceToken, wsInfo, convertedMessage, attachments) + await createMessages( + baseConfig, + ctx, + txClient, + kvsClient, + await getProducer(baseConfig.CommunicationTopic), + mailServiceToken, + wsInfo, + convertedMessage, + attachments + ) } catch (error) { ctx.error('mta-hook', { error }) } finally { diff --git a/services/mail/pod-inbound-mail/src/index.ts b/services/mail/pod-inbound-mail/src/index.ts index 98372aa6f1..2a4bd61d5a 100644 --- a/services/mail/pod-inbound-mail/src/index.ts +++ b/services/mail/pod-inbound-mail/src/index.ts @@ -21,8 +21,11 @@ import { MeasureContext, MeasureMetricsContext, newMetrics } from '@hcengineerin import { setMetadata } from '@hcengineering/platform' import { initStatisticsContext } from '@hcengineering/server-core' import serverToken from '@hcengineering/server-token' +import { initQueue, closeQueue } from '@hcengineering/mail-common' + import { handleMtaHook } from './handlerMta' import config from './config' +import { baseConfig } from './client' type RequestHandler = (req: Request, res: Response, ctx: MeasureContext, next?: NextFunction) => Promise @@ -43,6 +46,8 @@ async function main (): Promise { setMetadata(serverToken.metadata.Secret, config.secret) + initQueue(ctx, 'inbound-mail', baseConfig) + const app = express() app.use(cors()) @@ -91,7 +96,7 @@ async function main (): Promise { const shutdown = (): void => { server.close(() => { - process.exit() + void closeQueue().then(() => process.exit()) }) }