Merge branch 'develop' of https://github.com/hcengineering/platform into staging-new

Signed-off-by: Artem Savchenko <armisav@gmail.com>
This commit is contained in:
Artem Savchenko 2025-05-21 11:40:25 +07:00
commit c065918882
18 changed files with 694 additions and 51 deletions

View File

@ -4076,7 +4076,7 @@ packages:
version: 0.0.0
'@rush-temp/card-resources@file:projects/card-resources.tgz':
resolution: {integrity: sha512-CdWQQYWbLy+4AITDmlv2q7Sw96mSzRsslUOU/87wDljEuIubpEKNhfccS42ImusNZMXj4yvd5wwM9ajjBMRKkg==, tarball: file:projects/card-resources.tgz}
resolution: {integrity: sha512-pMv5ceyUDDEJBQbuNCHXE72Px6MNS16zvjzmK7M+PZxOmOXFNwldGCq/jPhslXqJXC24WjqpHCiQZ0gs7dN9uA==, tarball: file:projects/card-resources.tgz}
version: 0.0.0
'@rush-temp/card@file:projects/card.tgz':
@ -4412,7 +4412,7 @@ packages:
version: 0.0.0
'@rush-temp/mail-common@file:projects/mail-common.tgz':
resolution: {integrity: sha512-4ThjG7u37BfKgOUzqyVxZ+uCt5Z7Yv2DX192wxBuTbl1EX3t50XjTERnjt4Iw0rYFhXc7lOKZygYF1u8a3APhA==, tarball: file:projects/mail-common.tgz}
resolution: {integrity: sha512-DuWl8K6KBwPKVeIaSqUfgTwasFy+KEpU0zrPicIkuDaMILOKGCkmVM7YESS/ggiBlKxtyIOwKRDYeka1dhtxZw==, tarball: file:projects/mail-common.tgz}
version: 0.0.0
'@rush-temp/mail@file:projects/mail.tgz':
@ -4896,11 +4896,11 @@ packages:
version: 0.0.0
'@rush-temp/pod-gmail@file:projects/pod-gmail.tgz':
resolution: {integrity: sha512-5rn2Y1X7GtTiOKR5vbkqo5oB/zEeQXOiSw4nTPL8xp/Jx66PDnGjkwkHqBlberol5hG2/ns8im3Zqxql5O2VXw==, tarball: file:projects/pod-gmail.tgz}
resolution: {integrity: sha512-zgEk+yi5UCh/xDEFIoancdkzC+k/3o+yuyZ7JW6OiH4oi8xam0FJLk0ZgpPgEheHiD0qU2OfDPS5cymaivQTew==, tarball: file:projects/pod-gmail.tgz}
version: 0.0.0
'@rush-temp/pod-inbound-mail@file:projects/pod-inbound-mail.tgz':
resolution: {integrity: sha512-vRYTZhVYb2trqlb5uKFKAPJq7Ay7EaTFy6FijfUTRqitr9QJEREl4PKdf752cOFUfgC8L5uHTH6Q2rgbvsXSug==, tarball: file:projects/pod-inbound-mail.tgz}
resolution: {integrity: sha512-EQdkJNUsw+xUAs74SJ+44fJCrR/kNeSCRzm3Dah9tFlhIcTZl5d80ir8VotOXBZ4McKxAFo3is05crGMDxP04Q==, tarball: file:projects/pod-inbound-mail.tgz}
version: 0.0.0
'@rush-temp/pod-love@file:projects/pod-love.tgz':
@ -4992,7 +4992,7 @@ packages:
version: 0.0.0
'@rush-temp/process@file:projects/process.tgz':
resolution: {integrity: sha512-3vz9X1/VX0PmWuKw9m3Nlfp2CjFZo2MpgCRdLfPaeo6EHExPeSitbfYl7LojLsGo5Uk7tSnUOwMSgd/GOndOVA==, tarball: file:projects/process.tgz}
resolution: {integrity: sha512-FLoi1Ooh3ntse8Czmk9pyT2ASy8VHz1xGUUXf38t09UKTZ/8SfqwrOtNWYuVnyIrZ3zVVmSL7xcOWmcn5AdTSw==, tarball: file:projects/process.tgz}
version: 0.0.0
'@rush-temp/prod@file:projects/prod.tgz':
@ -5216,7 +5216,7 @@ packages:
version: 0.0.0
'@rush-temp/server-gmail-resources@file:projects/server-gmail-resources.tgz':
resolution: {integrity: sha512-VUZuCBA60lmGBsC072eJNhPcfGLWGWaQKj6NmEZSutJt2M0dzchgdC5iWr+AX+fhE3pmDJhNW8hYdU06Y5FeZg==, tarball: file:projects/server-gmail-resources.tgz}
resolution: {integrity: sha512-PsKe9cztrK9Mj6TZO5scM/IArHSVaiJ+SMNwMoz6j8Kb2jZBP3a36wWG0Ly5GuDU7m272NE8bjuKUNmzM+kfZQ==, tarball: file:projects/server-gmail-resources.tgz}
version: 0.0.0
'@rush-temp/server-gmail@file:projects/server-gmail.tgz':
@ -5616,7 +5616,7 @@ packages:
version: 0.0.0
'@rush-temp/workbench-resources@file:projects/workbench-resources.tgz':
resolution: {integrity: sha512-N00MR4MXHz3pMLbTWfIwQCKUpwtaLTtx2lu+UR9QYhn2CiMT/tyVqfxtN2Pkbf0sj5VG5euI6jsnKskYgUY74g==, tarball: file:projects/workbench-resources.tgz}
resolution: {integrity: sha512-UfLCpybz5EEeX1qnGkQgS0cKEvptFzYm8D8YR9uIqSlfRAnCjKpEFTexnvbXZQeUsjvbIwkMQYsSbyhk/pYrGg==, tarball: file:projects/workbench-resources.tgz}
version: 0.0.0
'@rush-temp/workbench@file:projects/workbench.tgz':
@ -20013,6 +20013,8 @@ snapshots:
'@rush-temp/mail-common@file:projects/mail-common.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))':
dependencies:
'@hcengineering/communication-rest-client': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-sdk-types': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-shared': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-types': 0.1.187(typescript@5.7.3)
'@tsconfig/node16': 1.0.4
'@types/express': 4.17.21
@ -20031,6 +20033,7 @@ snapshots:
eslint-plugin-node: 11.1.0(eslint@8.56.0)
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3))
kafkajs: 2.2.4
prettier: 3.2.5
sanitize-html: 2.16.0
ts-jest: 29.1.2(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(esbuild@0.24.2)(jest@29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3)))(typescript@5.7.3)
@ -22412,6 +22415,7 @@ snapshots:
'@rush-temp/pod-gmail@file:projects/pod-gmail.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(bufferutil@4.0.8)(encoding@0.1.13)(utf-8-validate@6.0.4)':
dependencies:
'@hcengineering/communication-sdk-types': 0.1.187(typescript@5.3.3)
'@tsconfig/node16': 1.0.4
'@types/cors': 2.8.17
'@types/express': 4.17.21
@ -22464,6 +22468,7 @@ snapshots:
'@rush-temp/pod-inbound-mail@file:projects/pod-inbound-mail.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))':
dependencies:
'@hcengineering/communication-rest-client': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-sdk-types': 0.1.187(typescript@5.7.3)
'@hcengineering/communication-types': 0.1.187(typescript@5.7.3)
'@tsconfig/node16': 1.0.4
'@types/cors': 2.8.17

View File

@ -64,7 +64,9 @@
"@hcengineering/chat": "^0.6.0",
"@hcengineering/client": "^0.6.18",
"@hcengineering/client-resources": "^0.6.27",
"@hcengineering/communication-sdk-types": "0.1.187",
"@hcengineering/contact": "^0.6.24",
"@hcengineering/kvs-client": "^0.6.0",
"@hcengineering/mail-common": "^0.6.0",
"@hcengineering/core": "^0.6.32",
"@hcengineering/gmail": "^0.6.22",
@ -74,7 +76,6 @@
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/kvs-client": "^0.6.0",
"cors": "^2.8.5",
"dotenv": "~16.0.0",
"express": "^4.21.2",

View File

@ -0,0 +1,157 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { IntegrationVersion } from '../types'
describe('Config', () => {
// Store original environment
const originalEnv = { ...process.env }
// Mock required environment variables to prevent test failures
beforeEach(() => {
jest.resetModules() // Clear cache for config module between tests
// Set minimum required env variables
process.env.ACCOUNTS_URL = 'http://accounts.test'
process.env.SECRET = 'test-secret'
process.env.Credentials = 'test-credentials'
process.env.WATCH_TOPIC_NAME = 'test-topic'
process.env.KVS_URL = 'http://kvs.test'
process.env.STORAGE_CONFIG = 'test-storage-config'
})
// Restore original environment after tests
afterEach(() => {
process.env = { ...originalEnv }
})
it('should load default configuration values', () => {
// Import config inside test to ensure env variables are set
// eslint-disable-next-line @typescript-eslint/no-var-requires
const config = require('../config').default
// Check default values
expect(config.Port).toBe(8087)
expect(config.ServiceID).toBe('gmail-service')
expect(config.InitLimit).toBe(50)
expect(config.FooterMessage).toContain('Sent via <a href="https://huly.io">Huly</a>')
expect(config.Version).toBe(IntegrationVersion.V1)
expect(config.QueueRegion).toBe('')
expect(config.CommunicationTopic).toBe('hulygun')
})
it('should override defaults with environment variables', () => {
// Set custom values
process.env.PORT = '9000'
process.env.SERVICE_ID = 'custom-service'
process.env.INIT_LIMIT = '100'
process.env.FOOTER_MESSAGE = 'Custom footer'
process.env.VERSION = 'v2'
process.env.QUEUE_CONFIG = 'custom-queue-config'
process.env.QUEUE_REGION = 'custom-region'
process.env.COMMUNICATION_TOPIC = 'custom-topic'
// Load config with custom values
// eslint-disable-next-line @typescript-eslint/no-var-requires
const config = require('../config').default
// Check overridden values
expect(config.Port).toBe(9000)
expect(config.ServiceID).toBe('custom-service')
expect(config.InitLimit).toBe(100)
expect(config.FooterMessage).toBe('Custom footer')
expect(config.Version).toBe(IntegrationVersion.V2)
expect(config.QueueConfig).toBe('custom-queue-config')
expect(config.QueueRegion).toBe('custom-region')
expect(config.CommunicationTopic).toBe('custom-topic')
})
it('should throw error when required env variables are missing', () => {
// Remove required environment variables
process.env.ACCOUNTS_URL = undefined
// Expect error when loading config
expect(() => {
require('../config')
}).toThrow('Missing env variables: ACCOUNTS_URL')
})
it('should throw error for invalid version value', () => {
// Set invalid version
process.env.VERSION = 'invalid-version'
// Expect error when loading config
expect(() => {
require('../config')
}).toThrow("Invalid version: invalid-version. Must be 'v1' or 'v2'.")
})
it('should throw error when v2 is set but queue config is missing', () => {
// Set v2 without required queue config
process.env.VERSION = 'v2'
process.env.QUEUE_CONFIG = ''
// Expect error when loading config
expect(() => {
require('../config')
}).toThrow('Missing env variable: QUEUE_CONFIG')
})
it('should throw error when v2 is set but communication topic is missing', () => {
// Set v2 with queue config but missing communication topic
process.env.VERSION = 'v2'
process.env.QUEUE_CONFIG = 'test-queue'
process.env.COMMUNICATION_TOPIC = ''
// Expect error when loading config
expect(() => {
require('../config')
}).toThrow('Missing env variable: COMMUNICATION_TOPIC')
})
it('should correctly parse numeric values', () => {
// Set string values for numeric fields
process.env.PORT = '9999'
process.env.INIT_LIMIT = '200'
// Load config
// eslint-disable-next-line @typescript-eslint/no-var-requires
const config = require('../config').default
// Check parsed values
expect(config.Port).toBe(9999)
expect(config.InitLimit).toBe(200)
expect(typeof config.Port).toBe('number')
expect(typeof config.InitLimit).toBe('number')
})
it('should handle v2 configuration correctly', () => {
// Set up v2 configuration
process.env.VERSION = 'v2'
process.env.QUEUE_CONFIG = 'test-queue-config'
process.env.QUEUE_REGION = 'test-region'
process.env.COMMUNICATION_TOPIC = 'test-topic-name'
// Load config
// eslint-disable-next-line @typescript-eslint/no-var-requires
const config = require('../config').default
// Check v2 specific configuration
expect(config.Version).toBe(IntegrationVersion.V2)
expect(config.QueueConfig).toBe('test-queue-config')
expect(config.QueueRegion).toBe('test-region')
expect(config.CommunicationTopic).toBe('test-topic-name')
})
})

View File

@ -28,6 +28,9 @@ interface Config extends BaseConfig {
FooterMessage: string
InitLimit: number
Version: IntegrationVersion
QueueConfig: string
QueueRegion: string
CommunicationTopic: string
}
const envMap: { [key in keyof Config]: string } = {
@ -41,7 +44,10 @@ const envMap: { [key in keyof Config]: string } = {
InitLimit: 'INIT_LIMIT',
KvsUrl: 'KVS_URL',
StorageConfig: 'STORAGE_CONFIG',
Version: 'VERSION'
Version: 'VERSION',
QueueConfig: 'QUEUE_CONFIG',
QueueRegion: 'QUEUE_REGION',
CommunicationTopic: 'COMMUNICATION_TOPIC'
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@ -65,7 +71,10 @@ const config: Config = (() => {
FooterMessage: process.env[envMap.FooterMessage] ?? '<br><br><p>Sent via <a href="https://huly.io">Huly</a></p>',
KvsUrl: process.env[envMap.KvsUrl],
StorageConfig: process.env[envMap.StorageConfig],
Version: version
Version: version,
QueueConfig: process.env[envMap.QueueConfig] ?? '',
QueueRegion: process.env[envMap.QueueRegion] ?? '',
CommunicationTopic: process.env[envMap.CommunicationTopic] ?? 'hulygun'
}
const missingEnv = (Object.keys(params) as Array<keyof Config>)
@ -75,6 +84,14 @@ const config: Config = (() => {
if (missingEnv.length > 0) {
throw Error(`Missing env variables: ${missingEnv.join(', ')}`)
}
if (version === IntegrationVersion.V2) {
if (params.QueueConfig === '') {
throw Error('Missing env variable: QUEUE_CONFIG')
}
if (params.CommunicationTopic === '') {
throw Error('Missing env variable: COMMUNICATION_TOPIC')
}
}
return params as Config
})()

View File

@ -23,13 +23,15 @@ import { isWorkspaceLoginInfo } from '@hcengineering/account-client'
import { initStatisticsContext, type StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import { initQueue, closeQueue } from '@hcengineering/mail-common'
import { type IncomingHttpHeaders } from 'http'
import { join } from 'path'
import { decode64 } from './base64'
import config from './config'
import { GmailController } from './gmailController'
import { createServer, listen } from './server'
import { type Endpoint, type State } from './types'
import { IntegrationVersion, type Endpoint, type State } from './types'
const extractToken = (header: IncomingHttpHeaders): any => {
try {
@ -61,6 +63,10 @@ export const main = async (): Promise<void> => {
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig)
if (config.Version === IntegrationVersion.V2) {
initQueue(ctx, 'gmail-service', config)
}
const gmailController = GmailController.create(ctx, storageAdapter)
await gmailController.startAll()
const endpoints: Endpoint[] = [
@ -170,6 +176,7 @@ export const main = async (): Promise<void> => {
const asyncClose = async (): Promise<void> => {
await gmailController.close()
await storageAdapter.close()
await closeQueue()
}
const shutdown = (): void => {

View File

@ -17,7 +17,13 @@ import { gmail_v1 } from 'googleapis'
import sanitizeHtml from 'sanitize-html'
import { TxOperations, type MeasureContext } from '@hcengineering/core'
import { createMessages, parseEmailHeader, parseNameFromEmailHeader, EmailMessage } from '@hcengineering/mail-common'
import {
createMessages,
parseEmailHeader,
parseNameFromEmailHeader,
EmailMessage,
getProducer
} from '@hcengineering/mail-common'
import { type KeyValueClient } from '@hcengineering/kvs-client'
import { AccountClient, isWorkspaceLoginInfo, WorkspaceLoginInfo } from '@hcengineering/account-client'
@ -55,6 +61,7 @@ export class MessageManagerV2 implements IMessageManager {
this.ctx,
this.txClient,
this.keyValueClient,
await getProducer(config.CommunicationTopic),
this.token,
this.wsInfo,
res,

View File

@ -50,12 +50,17 @@
"@hcengineering/card": "^0.6.0",
"@hcengineering/chat": "^0.6.0",
"@hcengineering/communication-rest-client": "0.1.187",
"@hcengineering/communication-sdk-types": "0.1.187",
"@hcengineering/communication-shared": "0.1.187",
"@hcengineering/communication-types": "0.1.187",
"@hcengineering/contact": "^0.6.24",
"@hcengineering/core": "^0.6.32",
"@hcengineering/kvs-client": "^0.6.0",
"@hcengineering/kafka": "^0.6.0",
"@hcengineering/mail": "^0.6.0",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-storage": "^0.6.0",
"kafkajs": "^2.2.4",
"sanitize-html": "^2.15.0",
"turndown": "^7.2.0",
"uuid": "^8.3.2"

View File

@ -0,0 +1,271 @@
import { Kafka, Producer } from 'kafkajs'
import { parseQueueConfig } from '@hcengineering/kafka'
import { MeasureContext } from '@hcengineering/core'
import { initQueue, closeQueue, getProducer, KafkaQueueRegistry } from '../queue'
import { BaseConfig } from '../types'
// Mock dependencies
jest.mock('kafkajs')
jest.mock('@hcengineering/kafka')
/* eslint-disable @typescript-eslint/unbound-method */
describe('Kafka Queue Management', () => {
// Mock objects
let mockCtx: MeasureContext
let mockConfig: BaseConfig
let mockKafka: Kafka
let mockProducer: Producer
let disconnectMock: jest.Mock
// Setup function to create clean mocks for each test
beforeEach(() => {
// Reset modules to clear any singleton state
jest.resetModules()
// Create mock for producer
disconnectMock = jest.fn().mockResolvedValue(undefined)
mockProducer = {
connect: jest.fn().mockResolvedValue(undefined),
disconnect: disconnectMock,
send: jest.fn().mockResolvedValue(undefined),
on: jest.fn(),
events: {},
transaction: jest.fn(),
logger: jest.fn()
} as unknown as Producer
// Create mock for Kafka
mockKafka = {
producer: jest.fn().mockReturnValue(mockProducer),
consumer: jest.fn(),
admin: jest.fn()
} as unknown as Kafka
// Mock constructor for Kafka
;(Kafka as jest.Mock).mockImplementation(() => mockKafka)
// Create context mock
mockCtx = {
info: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
measure: jest.fn()
} as unknown as MeasureContext
// Create config mock
mockConfig = {
QueueConfig: 'kafka:broker:9092',
QueueRegion: 'test-region',
CommunicationTopic: 'test-topic'
} as any
// Mock parseQueueConfig
;(parseQueueConfig as jest.Mock).mockReturnValue({
clientId: 'test-service',
brokers: ['broker:9092'],
ssl: false
})
})
afterEach(async () => {
// Clean up after each test
await closeQueue()
})
describe('initQueue', () => {
it('should initialize the queue registry with correct parameters', () => {
// Act
initQueue(mockCtx, 'test-service', mockConfig)
// Assert
expect(parseQueueConfig).toHaveBeenCalledWith(mockConfig.QueueConfig, 'test-service', mockConfig.QueueRegion)
expect(Kafka).toHaveBeenCalledWith({
clientId: 'test-service',
brokers: ['broker:9092'],
ssl: false
})
expect(mockCtx.info).toHaveBeenCalledWith('Kafka queue initialized', { serviceId: 'test-service' })
})
it('should throw an error when initialized twice', () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Act & Assert
expect(() => {
initQueue(mockCtx, 'test-service', mockConfig)
}).toThrow('Queue already initialized')
})
it('should throw an error when queue config is missing', () => {
// Arrange
const configWithoutQueue: BaseConfig = { ...mockConfig, QueueConfig: undefined } as any
// Act & Assert
expect(() => {
initQueue(mockCtx, 'test-service', configWithoutQueue)
}).toThrow('Please provide queue config')
})
})
describe('getProducer', () => {
it('should throw an error when queue is not initialized', async () => {
// Act & Assert
await expect(getProducer('test-topic')).rejects.toThrow('Queue not initialized')
})
it('should create a new producer when one does not exist for the topic', async () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Act
const producer = await getProducer('test-topic')
// Assert
expect(producer).toBe(mockProducer)
expect(mockKafka.producer).toHaveBeenCalled()
expect(mockProducer.connect).toHaveBeenCalled()
expect(mockCtx.info).toHaveBeenCalledWith('Created new Kafka producer', { key: 'test-topic' })
})
it('should reuse an existing producer for the same topic', async () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Act
const producer1 = await getProducer('test-topic')
const producer2 = await getProducer('test-topic')
// Assert
expect(producer1).toBe(producer2)
expect(mockKafka.producer).toHaveBeenCalledTimes(1) // Called only once
})
it('should create different producers for different topics', async () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Setup a second producer
const mockProducer2 = { ...mockProducer }
mockKafka.producer = jest.fn().mockReturnValueOnce(mockProducer).mockReturnValueOnce(mockProducer2)
// Act
const producer1 = await getProducer('topic1')
const producer2 = await getProducer('topic2')
// Assert
expect(mockKafka.producer).toHaveBeenCalledTimes(2)
expect(producer1).not.toBe(producer2)
})
})
describe('closeQueue', () => {
it('should do nothing if the queue is not initialized', async () => {
// Act
await closeQueue()
// Assert
expect(mockProducer.disconnect).not.toHaveBeenCalled()
})
it('should close all producers', async () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Get a couple of producers
await getProducer('topic1')
await getProducer('topic2')
// Act
await closeQueue()
// Assert
expect(mockProducer.disconnect).toHaveBeenCalledTimes(2)
expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed')
})
it('should handle errors when closing producers', async () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Get a producer
await getProducer('test-topic')
// Make disconnect throw an error
const error = new Error('Disconnect failed')
disconnectMock.mockRejectedValueOnce(error)
// Act
await closeQueue()
// Assert
expect(mockCtx.error).toHaveBeenCalledWith('Failed to close Kafka producer', {
topic: 'test-topic',
error
})
// It should still clear the producers
expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed')
})
it('should reset the registry after closing', async () => {
// Arrange
initQueue(mockCtx, 'test-service', mockConfig)
// Act
await closeQueue()
// The registry should be reset, so initializing again should work
initQueue(mockCtx, 'test-service', mockConfig)
// Assert - expect no errors and two initialization logs
expect(mockCtx.info).toHaveBeenCalledWith('Kafka queue initialized', { serviceId: 'test-service' })
})
})
describe('KafkaQueueRegistry', () => {
it('should create a Kafka instance with correct configuration', () => {
// Act
// eslint-disable-next-line no-new
new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig)
// Assert
expect(parseQueueConfig).toHaveBeenCalledWith(mockConfig.QueueConfig, 'test-service', mockConfig.QueueRegion)
expect(Kafka).toHaveBeenCalledWith({
clientId: 'test-service',
brokers: ['broker:9092'],
ssl: false
})
})
it('should log when a producer is created', async () => {
// Arrange
const registry = new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig)
// Act
await registry.getProducer('test-topic')
// Assert
expect(mockCtx.info).toHaveBeenCalledWith('Created new Kafka producer', { key: 'test-topic' })
})
it('should close all producers during shutdown', async () => {
// Arrange
const registry = new KafkaQueueRegistry(mockCtx, 'test-service', mockConfig)
// Create some producers
await registry.getProducer('topic1')
await registry.getProducer('topic2')
// Act
await registry.close()
// Assert
expect(mockProducer.disconnect).toHaveBeenCalledTimes(2)
expect(mockCtx.info).toHaveBeenCalledWith('KafkaQueueRegistry closed')
})
})
})

View File

@ -17,3 +17,4 @@ export * from './message'
export * from './types'
export * from './utils'
export * from './mutex'
export * from './queue'

View File

@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { Producer } from 'kafkajs'
import { WorkspaceLoginInfo } from '@hcengineering/account-client'
import { type Card } from '@hcengineering/card'
import {
type RestClient as CommunicationClient,
createRestClient as getCommunicationClient
} from '@hcengineering/communication-rest-client'
import { MessageType } from '@hcengineering/communication-types'
import chat from '@hcengineering/chat'
import { PersonSpace } from '@hcengineering/contact'
@ -27,14 +25,18 @@ import {
type PersonId,
type Ref,
type TxOperations,
Doc,
generateId,
PersonUuid,
RateLimiter
RateLimiter,
Space
} from '@hcengineering/core'
import mail from '@hcengineering/mail'
import { type KeyValueClient } from '@hcengineering/kvs-client'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { MessageRequestEventType } from '@hcengineering/communication-sdk-types'
import { generateMessageId } from '@hcengineering/communication-shared'
import { BaseConfig, type Attachment } from './types'
import { EmailMessage } from './types'
@ -49,6 +51,7 @@ export async function createMessages (
ctx: MeasureContext,
txClient: TxOperations,
keyValueClient: KeyValueClient,
producer: Producer,
token: string,
wsInfo: WorkspaceLoginInfo,
message: EmailMessage,
@ -62,7 +65,6 @@ export async function createMessages (
const personSpacesCache = PersonSpacesCacheFactory.getInstance(ctx, txClient, wsInfo.workspace)
const channelCache = ChannelCacheFactory.getInstance(ctx, txClient, wsInfo.workspace)
const threadLookup = ThreadLookupService.getInstance(ctx, keyValueClient, token)
const msgClient = getCommunicationClient(wsInfo.endpoint, wsInfo.workspace, wsInfo.token)
const fromPerson = await personCache.ensurePerson(from)
@ -116,10 +118,12 @@ export async function createMessages (
const spaces = await personSpacesCache.getPersonSpaces(mailId, fromPerson.uuid, from.email)
if (spaces.length > 0) {
await saveMessageToSpaces(
config,
ctx,
txClient,
msgClient,
producer,
threadLookup,
wsInfo,
mailId,
spaces,
participants,
@ -148,10 +152,12 @@ export async function createMessages (
const spaces = await personSpacesCache.getPersonSpaces(mailId, to.uuid, to.address)
if (spaces.length > 0) {
await saveMessageToSpaces(
config,
ctx,
txClient,
msgClient,
producer,
threadLookup,
wsInfo,
mailId,
spaces,
participants,
@ -173,10 +179,12 @@ export async function createMessages (
}
async function saveMessageToSpaces (
config: BaseConfig,
ctx: MeasureContext,
client: TxOperations,
msgClient: CommunicationClient,
producer: Producer,
threadLookup: ThreadLookupService,
wsInfo: WorkspaceLoginInfo,
mailId: string,
spaces: PersonSpace[],
participants: PersonId[],
@ -208,8 +216,9 @@ async function saveMessageToSpaces (
ctx.info('Found existing thread', { mailId, threadId, spaceId })
}
}
let channel: Ref<Doc<Space>> | undefined
if (threadId === undefined) {
const channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner)
channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner)
const newThreadId = await client.createDoc(
chat.masterTag.Thread,
space._id,
@ -241,30 +250,62 @@ async function saveMessageToSpaces (
ctx.info('Created new thread', { mailId, threadId, spaceId })
}
const { id: messageId, created: messageCreated } = await msgClient.createMessage(
threadId,
chat.masterTag.Thread,
content,
modifiedBy,
MessageType.Message,
{
created: createdDate
}
)
ctx.info('Created message', { mailId, messageId, threadId })
const messageId = generateMessageId()
const created = new Date(createdDate)
for (const a of attachments) {
await msgClient.createFile(
threadId,
messageId,
messageCreated,
a.id as Ref<Blob>,
a.contentType,
a.name,
a.data.length,
modifiedBy
const messageData = Buffer.from(
JSON.stringify({
type: MessageRequestEventType.CreateMessage,
messageType: MessageType.Message,
card: threadId,
cardType: chat.masterTag.Thread,
content,
creator: modifiedBy,
created,
id: messageId
})
)
await producer.send({
topic: config.CommunicationTopic,
messages: [
{
key: Buffer.from(channel ?? spaceId),
value: messageData,
headers: {
WorkspaceUuid: wsInfo.workspace
}
}
]
})
ctx.info('Send message event', { mailId, messageId, threadId })
const fileData: Buffer[] = attachments.map((a) =>
Buffer.from(
JSON.stringify({
type: MessageRequestEventType.CreateFile,
card: threadId,
message: messageId,
messageCreated: created,
blobId: a.id as Ref<Blob>,
fileType: a.contentType,
filename: a.name,
size: a.data.length,
creator: modifiedBy
})
)
}
)
const fileEvents = fileData.map((data) => ({
key: Buffer.from(channel ?? spaceId),
value: data,
headers: {
WorkspaceUuid: wsInfo.workspace
}
}))
await producer.send({
topic: config.CommunicationTopic,
messages: fileEvents
})
ctx.info('Send file events', { mailId, messageId, threadId, count: fileEvents.length })
await threadLookup.setThreadId(mailId, space._id, threadId)
})

View File

@ -0,0 +1,86 @@
import { Kafka, Producer } from 'kafkajs'
import { parseQueueConfig } from '@hcengineering/kafka'
import { BaseConfig } from './types'
import { MeasureContext } from '@hcengineering/core'
let queueRegistry: KafkaQueueRegistry | undefined
export function initQueue (ctx: MeasureContext, serviceId: string, config: BaseConfig): void {
if (queueRegistry !== undefined) {
throw new Error('Queue already initialized')
}
queueRegistry = new KafkaQueueRegistry(ctx, serviceId, config)
ctx.info('Kafka queue initialized', { serviceId })
}
export async function closeQueue (): Promise<void> {
if (queueRegistry !== undefined) {
await queueRegistry.close()
queueRegistry = undefined
}
}
export async function getProducer (key: string): Promise<Producer> {
if (queueRegistry === undefined) {
throw new Error('Queue not initialized')
}
return await queueRegistry.getProducer(key)
}
export class KafkaQueueRegistry {
private readonly kafka: Kafka
private readonly producers = new Map<string, Producer>()
constructor (
private readonly ctx: MeasureContext,
serviceId: string,
serviceConfig: BaseConfig
) {
this.kafka = getKafkaQueue(serviceId, serviceConfig)
ctx.info('Kafka client created', { serviceId })
}
public async getProducer (key: string): Promise<Producer> {
const producer = this.producers.get(key)
if (producer !== undefined) {
return producer
}
const kafkaProducer = this.kafka.producer()
await kafkaProducer.connect()
this.producers.set(key, kafkaProducer)
this.ctx.info('Created new Kafka producer', { key })
return kafkaProducer
}
public async close (): Promise<void> {
for (const [topic, producer] of this.producers.entries()) {
try {
await producer.disconnect()
this.ctx.info('Kafka producer closed', { topic })
} catch (err) {
this.ctx.error('Failed to close Kafka producer', { topic, error: err })
}
}
this.producers.clear()
this.ctx.info('KafkaQueueRegistry closed')
}
}
function getKafkaQueue (serviceId: string, serviceConfig: BaseConfig): Kafka {
const { QueueConfig, QueueRegion } = serviceConfig
if (QueueConfig === undefined) {
throw new Error('Please provide queue config')
}
const config = parseQueueConfig(QueueConfig, serviceId, QueueRegion)
return new Kafka({
clientId: config.clientId,
brokers: config.brokers,
ssl: config.ssl
})
}

View File

@ -45,4 +45,7 @@ export interface BaseConfig {
AccountsURL: string
KvsUrl: string
StorageConfig: string
QueueConfig: string
QueueRegion: string
CommunicationTopic: string
}

View File

@ -62,6 +62,7 @@
"@hcengineering/api-client": "^0.6.0",
"@hcengineering/card": "^0.6.0",
"@hcengineering/communication-rest-client": "0.1.187",
"@hcengineering/communication-sdk-types": "0.1.187",
"@hcengineering/communication-types": "0.1.187",
"@hcengineering/contact": "^0.6.24",
"@hcengineering/core": "^0.6.32",

View File

@ -22,7 +22,8 @@ import { createRestTxOperations } from '@hcengineering/api-client'
// Mock dependencies
jest.mock('@hcengineering/mail-common', () => ({
createMessages: jest.fn()
createMessages: jest.fn(),
getProducer: jest.fn().mockReturnValue({})
}))
jest.mock('../client', () => ({
@ -177,6 +178,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations, // This should be the TxOperations mock, not kvsClient
{}, // This should be the KeyValueClient mock
{},
client.mailServiceToken,
mockLoginInfo, // Added workspace login info
expect.objectContaining({
@ -221,6 +223,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
{},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@ -246,6 +249,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
{},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@ -275,6 +279,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
{},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@ -299,6 +304,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
{},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
@ -327,6 +333,7 @@ describe('handleMtaHook', () => {
mockCtx,
mockTxOperations,
{},
{},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({

View File

@ -25,6 +25,9 @@ export const mailServiceToken = generateToken(systemAccountUuid, undefined, { se
export const baseConfig: BaseConfig = {
AccountsURL: config.accountsUrl,
KvsUrl: config.kvsUrl,
StorageConfig: config.storageConfig ?? ''
StorageConfig: config.storageConfig ?? '',
QueueConfig: config.queueConfig ?? '',
QueueRegion: config.queueRegion ?? '',
CommunicationTopic: config.communicationTopic
}
export const kvsClient = getClient('inbound-mail', baseConfig.KvsUrl, mailServiceToken)

View File

@ -26,6 +26,9 @@ interface Config {
mailSizeLimit?: string
storageConfig?: string
kvsUrl: string
queueConfig: string
queueRegion: string
communicationTopic: string
}
const config: Config = {
@ -47,7 +50,15 @@ const config: Config = {
return process.env.KVS_URL
}
throw Error('KVS_URL env var is not set')
})()
})(),
queueConfig: (() => {
if (process.env.QUEUE_CONFIG !== undefined) {
return process.env.QUEUE_CONFIG
}
throw Error('QUEUE_CONFIG env var is not set')
})(),
queueRegion: process.env.QUEUE_REGION ?? '',
communicationTopic: process.env.COMMUNICATION_TOPIC ?? 'hulygun'
}
export default config

View File

@ -18,12 +18,17 @@ import { Request, Response } from 'express'
import TurndownService from 'turndown'
import sanitizeHtml from 'sanitize-html'
import { MeasureContext } from '@hcengineering/core'
import { type Attachment, type EmailContact, type EmailMessage, createMessages } from '@hcengineering/mail-common'
import {
type Attachment,
type EmailContact,
type EmailMessage,
createMessages,
getProducer
} from '@hcengineering/mail-common'
import { getClient as getAccountClient } from '@hcengineering/account-client'
import { createRestTxOperations } from '@hcengineering/api-client'
import { mailServiceToken, baseConfig, kvsClient } from './client'
import config from './config'
export interface MtaMessage {
@ -118,7 +123,17 @@ export async function handleMtaHook (req: Request, res: Response, ctx: MeasureCo
const transactorUrl = wsInfo.endpoint.replace('ws://', 'http://').replace('wss://', 'https://')
const txClient = await createRestTxOperations(transactorUrl, wsInfo.workspace, wsInfo.token)
await createMessages(baseConfig, ctx, txClient, kvsClient, mailServiceToken, wsInfo, convertedMessage, attachments)
await createMessages(
baseConfig,
ctx,
txClient,
kvsClient,
await getProducer(baseConfig.CommunicationTopic),
mailServiceToken,
wsInfo,
convertedMessage,
attachments
)
} catch (error) {
ctx.error('mta-hook', { error })
} finally {

View File

@ -21,8 +21,11 @@ import { MeasureContext, MeasureMetricsContext, newMetrics } from '@hcengineerin
import { setMetadata } from '@hcengineering/platform'
import { initStatisticsContext } from '@hcengineering/server-core'
import serverToken from '@hcengineering/server-token'
import { initQueue, closeQueue } from '@hcengineering/mail-common'
import { handleMtaHook } from './handlerMta'
import config from './config'
import { baseConfig } from './client'
type RequestHandler = (req: Request, res: Response, ctx: MeasureContext, next?: NextFunction) => Promise<void>
@ -43,6 +46,8 @@ async function main (): Promise<void> {
setMetadata(serverToken.metadata.Secret, config.secret)
initQueue(ctx, 'inbound-mail', baseConfig)
const app = express()
app.use(cors())
@ -91,7 +96,7 @@ async function main (): Promise<void> {
const shutdown = (): void => {
server.close(() => {
process.exit()
void closeQueue().then(() => process.exit())
})
}