Merge branch 'develop' into staging-new

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-06-04 00:07:57 +07:00
commit 764f3c8567
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
30 changed files with 1005 additions and 249 deletions

View File

@ -1677,7 +1677,7 @@ export function devTool (
name = systemAccountUuid
}
const wsByUrl = await db.workspace.findOne({ url: workspace })
const account = await db.socialId.findOne({ key: name })
const account = await db.socialId.findOne({ value: name })
console.log(
generateToken(account?.personUuid ?? (name as AccountUuid), wsByUrl?.uuid ?? (workspace as WorkspaceUuid), {
...(opt.admin ? { admin: 'true' } : {})

View File

@ -14,7 +14,7 @@
//
import { type Card, cardId, DOMAIN_CARD } from '@hcengineering/card'
import core, { TxOperations, type Client, type Data, type Doc } from '@hcengineering/core'
import core, { type Ref, TxOperations, type Client, type Data, type Doc } from '@hcengineering/core'
import {
tryMigrate,
tryUpgrade,
@ -67,11 +67,69 @@ export const cardOperation: MigrateOperation = {
{
state: 'default-labels',
func: defaultLabels
},
{
state: 'fill-parent-info',
mode: 'upgrade',
func: fillParentInfo
}
])
}
}
async function fillParentInfo (client: Client): Promise<void> {
const txOp = new TxOperations(client, core.account.System)
const cards = await client.findAll(card.class.Card, { parentInfo: { $exists: false }, parent: { $ne: null } })
const cache = new Map<Ref<Card>, Card>()
for (const val of cards) {
if (val.parent == null) continue
const parent = await getCardParentWithParentInfo(txOp, val.parent, cache)
if (parent !== undefined) {
const parentInfo = [
...(parent.parentInfo ?? []),
{
_id: parent._id,
_class: parent._class,
title: parent.title
}
]
await txOp.update(val, { parentInfo })
val.parentInfo = parentInfo
cache.set(val._id, val)
}
}
}
async function getCardParentWithParentInfo (
txOp: TxOperations,
_id: Ref<Card>,
cache: Map<Ref<Card>, Card>
): Promise<Card | undefined> {
const doc = cache.get(_id) ?? (await txOp.findOne(card.class.Card, { _id }))
if (doc === undefined) return
if (doc.parentInfo === undefined) {
if (doc.parent == null) {
doc.parentInfo = []
} else {
const parent = await getCardParentWithParentInfo(txOp, doc.parent, cache)
if (parent !== undefined) {
doc.parentInfo = [
...(parent.parentInfo ?? []),
{
_id: parent._id,
_class: parent._class,
title: parent.title
}
]
} else {
doc.parentInfo = []
}
}
}
cache.set(doc._id, doc)
return doc
}
async function removeVariantViewlets (client: Client): Promise<void> {
const txOp = new TxOperations(client, core.account.System)
const desc = client

View File

@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
import cardPlugin, { type Card } from '@hcengineering/card'
import chat from '@hcengineering/chat'
import { type LinkPreviewData, type Message, MessageType } from '@hcengineering/communication-types'
import { getEmployeeBySocialId } from '@hcengineering/contact'
import { employeeByPersonIdStore } from '@hcengineering/contact-resources'
import {
fillDefaults,
generateId,
@ -21,28 +26,23 @@ import {
type Ref,
SortingOrder
} from '@hcengineering/core'
import { jsonToMarkup, markupToJSON, markupToText } from '@hcengineering/text'
import { showPopup } from '@hcengineering/ui'
import { markdownToMarkup, markupToMarkdown } from '@hcengineering/text-markdown'
import { type Message, MessageType, type LinkPreviewData } from '@hcengineering/communication-types'
import emojiPlugin from '@hcengineering/emoji'
import {
canDisplayLinkPreview,
fetchLinkPreviewDetails,
getClient,
getCommunicationClient
} from '@hcengineering/presentation'
import { employeeByPersonIdStore } from '@hcengineering/contact-resources'
import cardPlugin, { type Card } from '@hcengineering/card'
import { openDoc } from '@hcengineering/view-resources'
import { getEmployeeBySocialId } from '@hcengineering/contact'
import { get } from 'svelte/store'
import chat from '@hcengineering/chat'
import { makeRank } from '@hcengineering/rank'
import emojiPlugin from '@hcengineering/emoji'
import { jsonToMarkup, markupToJSON, markupToText } from '@hcengineering/text'
import { markdownToMarkup, markupToMarkdown } from '@hcengineering/text-markdown'
import { showPopup } from '@hcengineering/ui'
import { openDoc } from '@hcengineering/view-resources'
import { get } from 'svelte/store'
import IconAt from './components/icons/IconAt.svelte'
import { type TextInputAction } from './types'
import uiNext from './plugin'
import { type TextInputAction } from './types'
export const defaultMessageInputActions: TextInputAction[] = [
{
@ -115,13 +115,22 @@ export async function replyToThread (message: Message, parentCard: Card): Promis
get(employeeByPersonIdStore).get(message.creator) ?? (await getEmployeeBySocialId(client, message.creator))
const lastOne = await client.findOne(cardPlugin.class.Card, {}, { sort: { rank: SortingOrder.Descending } })
const title = createThreadTitle(message, parentCard)
const data = fillDefaults(
const data = fillDefaults<Card>(
hierarchy,
{
title,
rank: makeRank(lastOne?.rank, undefined),
content: '' as MarkupBlobRef,
parent: parentCard._id
parent: parentCard._id,
blobs: {},
parentInfo: [
...(parentCard.parentInfo ?? []),
{
_id: parentCard._id,
_class: parentCard._class,
title: parentCard.title
}
]
},
chat.masterTag.Thread
)

View File

@ -80,13 +80,13 @@ async function sendJson (
extraHeaders?: OutgoingHttpHeaders
): Promise<void> {
// Calculate ETag
let body: any = JSON.stringify(result, rpcJSONReplacer)
let body: Buffer = Buffer.from(JSON.stringify(result, rpcJSONReplacer), 'utf8')
const etag = createHash('sha256').update(body).digest('hex')
const headers: OutgoingHttpHeaders = {
...(extraHeaders ?? {}),
...keepAliveOptions,
'Content-Type': 'application/json',
'Content-Type': 'application/json; charset=utf-8',
'Cache-Control': 'no-cache',
ETag: etag
}

View File

@ -403,6 +403,21 @@ async function OnCardCreate (ctx: TxCreateDoc<Card>[], control: TriggerControl):
}
})
)
if ((doc.parentInfo?.length ?? 0) === 0) {
const parentInfo = [
...(parent.parentInfo ?? []),
{
_id: parent._id,
_class: parent._class,
title: parent.title
}
]
res.push(
control.txFactory.createTxUpdateDoc(doc._class, doc.space, doc._id, {
parentInfo
})
)
}
}
}

View File

@ -662,8 +662,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $1)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`.replace(/\s+/g, ' ')
LIMIT 1`.replace(/\s+/g, ' ')
)
expect(mockClient.unsafe.mock.calls[0][1]).toEqual([NOW - processingTimeoutMs])
})
@ -721,8 +720,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $5)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')
@ -793,8 +791,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $5)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')
@ -883,8 +880,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $5)
AND (w.region IS NULL OR w.region = '')
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')
@ -936,8 +932,7 @@ describe('PostgresAccountDB', () => {
AND (s.last_processing_time IS NULL OR s.last_processing_time < $1)
AND region = $2
ORDER BY s.last_visit DESC
LIMIT 1
FOR UPDATE SKIP LOCKED`
LIMIT 1`
.replace(/\s+/g, ' ')
.replace(/\(\s/g, '(')
.replace(/\s\)/g, ')')

View File

@ -565,13 +565,17 @@ describe('account utils', () => {
test('should handle PlatformError', async () => {
const errorStatus = new Status(Severity.ERROR, 'test-error' as any, {})
const mockMethod = jest.fn().mockRejectedValue(new PlatformError(errorStatus))
Object.defineProperty(mockMethod, 'name', { value: 'mockAccMethod' })
const wrappedMethod = wrap(mockMethod)
const request = { id: 'req1', params: [] }
const result = await wrappedMethod(mockCtx, mockDb, mockBranding, request, 'token')
expect(result).toEqual({ error: errorStatus })
expect(mockCtx.error).toHaveBeenCalledWith('error', { status: errorStatus })
expect(mockCtx.error).toHaveBeenCalledWith('Error while processing account method', {
status: errorStatus,
method: 'mockAccMethod'
})
})
test('should handle TokenError', async () => {
@ -589,15 +593,17 @@ describe('account utils', () => {
test('should handle internal server error', async () => {
const error = new Error('unexpected error')
const mockMethod = jest.fn().mockRejectedValue(error)
Object.defineProperty(mockMethod, 'name', { value: 'mockAccMethod' })
const wrappedMethod = wrap(mockMethod)
const request = { id: 'req1', params: [] }
const result = await wrappedMethod(mockCtx, mockDb, mockBranding, request, 'token')
expect(result.error.code).toBe(platform.status.InternalServerError)
expect(mockCtx.error).toHaveBeenCalledWith('error', {
expect(mockCtx.error).toHaveBeenCalledWith('Error while processing account method', {
status: expect.any(Status),
err: error
origErr: error,
method: 'mockAccMethod'
})
})

View File

@ -131,6 +131,10 @@ implements DbCollection<T> {
}
}
async exists (query: Query<T>): Promise<boolean> {
return (await this.findOne(query)) !== null
}
async find (query: Query<T>, sort?: Sort<T>, limit?: number): Promise<T[]> {
return await this.findCursor(getFilteredQuery(query), sort, limit).toArray()
}
@ -315,6 +319,10 @@ export class WorkspaceStatusMongoDbCollection implements DbCollection<WorkspaceS
return res
}
async exists (query: Query<WorkspaceStatus>): Promise<boolean> {
return await this.wsCollection.exists(this.toWsQuery(query))
}
async find (query: Query<WorkspaceStatus>, sort?: Sort<WorkspaceStatus>, limit?: number): Promise<WorkspaceStatus[]> {
return (await this.wsCollection.find(this.toWsQuery(query), this.toWsSort(sort), limit)).map((ws) => ({
...ws.status,

View File

@ -28,7 +28,8 @@ export function getMigrations (ns: string): [string, string][] {
getV8Migration(ns),
getV9Migration(ns),
getV10Migration1(ns),
getV10Migration2(ns)
getV10Migration2(ns),
getV11Migration(ns)
]
}
@ -362,3 +363,18 @@ function getV10Migration2 (ns: string): [string, string] {
`
]
}
function getV11Migration (ns: string): [string, string] {
return [
'account_db_v10_add_migrated_to_person',
`
CREATE TABLE IF NOT EXISTS ${ns}._pending_workspace_lock (
id INT8 DEFAULT 1 PRIMARY KEY,
CONSTRAINT single_row CHECK (id = 1)
);
INSERT INTO ${ns}._pending_workspace_lock (id) VALUES (1)
ON CONFLICT (id) DO NOTHING;
`
]
}

View File

@ -258,6 +258,15 @@ implements DbCollection<T> {
}
}
async exists (query: Query<T>, client?: Sql): Promise<boolean> {
const [whereClause, whereValues] = this.buildWhereClause(query)
const sql = `SELECT EXISTS (SELECT 1 FROM ${this.getTableName()} ${whereClause})`
const result = await this.unsafe(sql, whereValues, client)
return result[0]?.exists === true
}
async find (query: Query<T>, sort?: Sort<T>, limit?: number, client?: Sql): Promise<T[]> {
const sqlChunks: string[] = [this.buildSelectClause()]
const [whereClause, whereValues] = this.buildWhereClause(query)
@ -483,6 +492,7 @@ export class PostgresAccountDB implements AccountDB {
}
readonly wsMembersName = 'workspace_members'
readonly pendingWorkspaceLockName = '_pending_workspace_lock'
person: PostgresDbCollection<Person, 'uuid'>
account: AccountPostgresDbCollection
@ -546,6 +556,10 @@ export class PostgresAccountDB implements AccountDB {
return `${this.ns}.${this.wsMembersName}`
}
getPendingWorkspaceLockTableName (): string {
return `${this.ns}.${this.pendingWorkspaceLockName}`
}
async init (): Promise<void> {
await this._init()
@ -943,10 +957,9 @@ export class PostgresAccountDB implements AccountDB {
sqlChunks.push(`WHERE ${whereChunks.join(' AND ')}`)
sqlChunks.push('ORDER BY s.last_visit DESC')
sqlChunks.push('LIMIT 1')
// Note: SKIP LOCKED is supported starting from Postgres 9.5 and CockroachDB v22.2.1
sqlChunks.push('FOR UPDATE SKIP LOCKED')
return await this.withRetry(async (rTx) => {
await rTx`SELECT 1 FROM ${this.client(this.getPendingWorkspaceLockTableName())} WHERE id = 1 FOR UPDATE;`
// We must have all the conditions in the DB query and we cannot filter anything in the code
// because of possible concurrency between account services.
const res: any = await rTx.unsafe(sqlChunks.join(' '), values)

View File

@ -57,7 +57,6 @@ import {
getRolePower,
getSocialIdByKey,
getWorkspaceById,
getWorkspaceInfoWithStatusById,
getWorkspacesInfoWithStatusByIds,
verifyAllowedServices,
wrap,
@ -289,8 +288,8 @@ export async function updateWorkspaceInfo (
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
}
const workspace = await getWorkspaceInfoWithStatusById(db, workspaceUuid)
if (workspace === null) {
const wsExists = await db.workspace.exists({ uuid: workspaceUuid })
if (!wsExists) {
throw new PlatformError(new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspaceUuid }))
}
progress = Math.round(progress)
@ -298,17 +297,24 @@ export async function updateWorkspaceInfo (
const ts = Date.now()
const update: Partial<WorkspaceStatus> = {}
const wsUpdate: Partial<Workspace> = {}
const query: Query<WorkspaceStatus> = { workspaceUuid: workspace.uuid }
const query: Query<WorkspaceStatus> = { workspaceUuid }
// Only read status for certain events because it is not needed for others
// and it interferes with status updates when concurrency is high
let wsStatus: WorkspaceStatus | null = null
if (['create-started', 'upgrade-started', 'migrate-clean-done'].includes(event)) {
wsStatus = await db.workspaceStatus.findOne({ workspaceUuid })
}
switch (event) {
case 'create-started':
update.mode = 'creating'
if (workspace.status.mode !== 'creating') {
if (wsStatus != null && wsStatus.mode !== 'creating') {
update.processingAttempts = 0
}
update.processingProgress = progress
break
case 'upgrade-started':
if (workspace.status.mode !== 'upgrading') {
if (wsStatus != null && wsStatus.mode !== 'upgrading') {
update.processingAttempts = 0
}
update.mode = 'upgrading'
@ -350,7 +356,7 @@ export async function updateWorkspaceInfo (
update.processingProgress = progress
break
case 'migrate-clean-done':
wsUpdate.region = workspace.status.targetRegion ?? ''
wsUpdate.region = wsStatus?.targetRegion ?? ''
update.mode = 'pending-restore'
update.processingProgress = progress
update.lastProcessingTime = Date.now() - processingTimeoutMs // To not wait for next step
@ -400,7 +406,7 @@ export async function updateWorkspaceInfo (
})
if (Object.keys(wsUpdate).length !== 0) {
await db.workspace.updateOne({ uuid: workspace.uuid }, wsUpdate)
await db.workspace.updateOne({ uuid: workspaceUuid }, wsUpdate)
}
}

View File

@ -222,6 +222,7 @@ export interface AccountDB {
}
export interface DbCollection<T> {
exists: (query: Query<T>) => Promise<boolean>
find: (query: Query<T>, sort?: Sort<T>, limit?: number) => Promise<T[]>
findOne: (query: Query<T>) => Promise<T | null>
insertOne: (data: Partial<T>) => Promise<any>

View File

@ -143,7 +143,7 @@ export function wrap (
): Promise<any> {
return await accountMethod(ctx, db, branding, token, { ...request.params }, meta)
.then((result) => ({ id: request.id, result }))
.catch((err) => {
.catch((err: Error) => {
const status =
err instanceof PlatformError
? err.status
@ -158,9 +158,9 @@ export function wrap (
if (status.code === platform.status.InternalServerError) {
Analytics.handleError(err)
ctx.error('error', { status, err })
ctx.error('Error while processing account method', { method: accountMethod.name, status, origErr: err })
} else {
ctx.error('error', { status })
ctx.error('Error while processing account method', { method: accountMethod.name, status })
}
return {

View File

@ -1,5 +1,5 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
@ -35,6 +35,7 @@ export interface StorageConfiguration {
ctx: MeasureContext
adapter: CollabStorageAdapter
transformer: Transformer
saveRetryInterval?: number
}
type DocumentName = string
@ -50,9 +51,30 @@ export class StorageExtension implements Extension {
private readonly configuration: StorageConfiguration
private readonly updates = new Map<DocumentName, DocumentUpdates>()
private readonly markups = new Map<DocumentName, Record<Markup, Markup>>()
private readonly promises = new Map<DocumentName, Promise<void>>()
private readonly saveRetryInterval: number
private stopped = false
constructor (configuration: StorageConfiguration) {
this.configuration = configuration
this.saveRetryInterval = configuration.saveRetryInterval ?? 1000
}
async onDestroy (): Promise<any> {
this.stopped = true
const documents = Array.from(this.promises.keys())
const promises = Array.from(this.promises.values())
if (promises.length > 0) {
const { ctx } = this.configuration
try {
ctx.info('waiting for pending document saves', { documents, count: promises.length })
await Promise.all(promises)
} catch (error) {
ctx.error('error while waiting for pending document saves', { documents, error })
}
}
}
async onChange ({ context, document, documentName }: withContext<onChangePayload>): Promise<any> {
@ -98,18 +120,15 @@ export class StorageExtension implements Extension {
const { ctx } = this.configuration
const { connectionId } = context
const updates = this.updates.get(documentName)
const connections = document.getConnectionsCount()
const collaborators = updates?.collaborators.size ?? 0
ctx.info('store document', { documentName, connectionId, connections, collaborators })
ctx.info('store document', { documentName, connectionId, connections })
if (updates === undefined || updates.collaborators.size === 0) {
if (this.hasNoUpdates(documentName)) {
ctx.info('no changes for document', { documentName, connectionId })
return
}
updates.collaborators.clear()
await this.storeDocument(documentName, document, updates.context)
await this.storeDocument(documentName, document, context)
}
async onConnect ({ context, documentName, instance }: withContext<onConnectPayload>): Promise<any> {
@ -122,13 +141,11 @@ export class StorageExtension implements Extension {
const { ctx } = this.configuration
const { connectionId } = context
const updates = this.updates.get(documentName)
const connections = document.getConnectionsCount()
const collaborators = updates?.collaborators.size ?? 0
const updatedAt = updates?.collaborators.get(connectionId)
ctx.info('disconnect from document', { documentName, connectionId, connections, collaborators, updatedAt })
ctx.info('disconnect from document', { documentName, connectionId, connections })
if (updates === undefined || !updates.collaborators.has(connectionId)) {
const noUpdates = this.hasNoUpdates(documentName, connectionId)
if (noUpdates) {
ctx.info('no changes for document', { documentName, connectionId })
return
}
@ -138,14 +155,14 @@ export class StorageExtension implements Extension {
return
}
updates.collaborators.clear()
await this.storeDocument(documentName, document, context)
await this.storeDocument(documentName, document, context, connectionId)
}
async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise<any> {
this.configuration.ctx.info('unload document', { documentName })
this.updates.delete(documentName)
this.markups.delete(documentName)
this.promises.delete(documentName)
}
private async loadDocument (documentName: string, context: Context): Promise<YDoc | undefined> {
@ -162,22 +179,93 @@ export class StorageExtension implements Extension {
}
}
private async storeDocument (documentName: string, document: Document, context: Context): Promise<void> {
const { ctx, adapter } = this.configuration
private async storeDocument (
documentName: string,
document: Document,
context: Context,
connectionId?: string
): Promise<void> {
const prev = this.promises.get(documentName)
const curr = async (): Promise<void> => {
if (prev !== undefined) {
await prev
}
// Check whether we still have changes after the previous save
const noUpdates = this.hasNoUpdates(documentName, connectionId)
if (!noUpdates) {
await this.performStoreDocument(documentName, document, context)
}
}
const promise = curr()
this.promises.set(documentName, promise)
try {
const currMarkup = await ctx.with('save-document', {}, (ctx) =>
adapter.saveDocument(ctx, documentName, document, context, {
prev: () => this.markups.get(documentName) ?? {},
curr: () => this.configuration.transformer.fromYdoc(document)
})
)
this.markups.set(documentName, currMarkup ?? {})
} catch (err: any) {
Analytics.handleError(err)
ctx.error('failed to save document', { documentName, error: err })
throw new Error('Failed to save document')
await promise
} finally {
if (this.promises.get(documentName) === promise) {
this.promises.delete(documentName)
}
}
}
private async performStoreDocument (documentName: string, document: Document, context: Context): Promise<void> {
const { ctx, adapter } = this.configuration
let attempt = 0
while (true) {
attempt++
const now = Date.now()
try {
const currMarkup = await ctx.with('save-document', {}, (ctx) =>
adapter.saveDocument(ctx, documentName, document, context, {
prev: () => this.markups.get(documentName) ?? {},
curr: () => this.configuration.transformer.fromYdoc(document)
})
)
this.markups.set(documentName, currMarkup ?? {})
this.clearUpdates(documentName, now)
return
} catch (err: any) {
Analytics.handleError(err)
ctx.error('failed to save document', { documentName, attempt, error: err })
if (this.stopped) {
ctx.info('storage extension stopped, skipping document save', { documentName })
throw new Error('Aborted')
}
await new Promise((resolve) => setTimeout(resolve, this.saveRetryInterval))
}
}
}
private clearUpdates (documentName: string, timestamp: number): void {
const updates = this.updates.get(documentName)
if (updates !== undefined) {
for (const [connectionId, updatedAt] of updates.collaborators.entries()) {
if (updatedAt < timestamp) {
updates.collaborators.delete(connectionId)
}
}
}
}
private hasNoUpdates (documentName: string, connectionId?: string): boolean {
const updates = this.updates.get(documentName)
if (updates === undefined) {
return true
}
if (connectionId !== undefined) {
return !updates.collaborators.has(connectionId)
}
return updates.collaborators.size === 0
}
}

View File

@ -119,7 +119,6 @@ export class DatalakeClient {
if (err.name === 'NotFoundError') {
return undefined
}
console.error('failed to get object', { workspace, objectName, err })
throw err
}
@ -151,7 +150,6 @@ export class DatalakeClient {
if (err.name === 'NotFoundError') {
return undefined
}
console.error('failed to get partial object', { workspace, objectName, err })
throw err
}
@ -180,7 +178,6 @@ export class DatalakeClient {
if (err.name === 'NotFoundError') {
return
}
console.error('failed to stat object', { workspace, objectName, err })
throw err
}
@ -205,7 +202,6 @@ export class DatalakeClient {
})
} catch (err: any) {
if (err.name !== 'NotFoundError') {
console.error('failed to delete object', { workspace, objectName, err })
throw err
}
}
@ -230,19 +226,14 @@ export class DatalakeClient {
}
}
try {
if (size === undefined || size < 64 * 1024 * 1024) {
return await ctx.with('direct-upload', {}, (ctx) =>
this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
)
} else {
return await ctx.with('multipart-upload', {}, (ctx) =>
this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
)
}
} catch (err) {
console.error('failed to put object', { workspace, objectName, err })
throw err
if (size === undefined || size < 64 * 1024 * 1024) {
return await ctx.with('direct-upload', {}, (ctx) =>
this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
)
} else {
return await ctx.with('multipart-upload', {}, (ctx) =>
this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
)
}
}

View File

@ -188,7 +188,7 @@ export class WorkspaceWorker {
try {
return await accountClient.getPendingWorkspace(this.region, this.version, this.operation)
} catch (err) {
ctx.error('Error getting pending workspace:', { err })
ctx.error('Error getting pending workspace:', { origErr: err })
}
})
if (workspace == null) {
@ -202,7 +202,7 @@ export class WorkspaceWorker {
await this.doWorkspaceOperation(opContext, workspace, opt)
} catch (err: any) {
Analytics.handleError(err)
opContext.error('error', { err })
opContext.error('Error while performing workspace operation', { origErr: err })
}
})
// sleep for a little bit to avoid bombarding the account service, also add jitter to avoid simultaneous requests from multiple workspace services

View File

@ -61,7 +61,9 @@ export async function createWorkspace (
}
const createPingHandle = setInterval(() => {
void handleWsEvent?.('ping', version, 0)
handleWsEvent?.('ping', version, 0).catch((err: any) => {
ctx.error('Error while updating progress', { origErr: err })
})
}, 5000)
try {
@ -309,7 +311,9 @@ export async function upgradeWorkspaceWith (
let progress = 0
const updateProgressHandle = setInterval(() => {
void handleWsEvent?.('progress', version, progress)
handleWsEvent?.('progress', version, progress).catch((err: any) => {
ctx.error('Error while updating progress', { origErr: err })
})
}, 5000)
try {

View File

@ -85,6 +85,10 @@ export async function handleBlobGet (
res.status(status)
pipeline(blob.body, res, (err) => {
if (!blob.body.destroyed) {
blob.body.destroy()
}
if (err != null) {
// ignore abort errors to avoid flooding the logs
if (err.name === 'AbortError' || err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
@ -98,10 +102,6 @@ export async function handleBlobGet (
}
}
})
req.on('close', () => {
blob.body.destroy()
})
}
export async function handleBlobHead (

View File

@ -117,7 +117,9 @@ export async function handleImageGet (
tempDir.rm(tmpFile, outFile)
}
req.on('error', cleanup)
req.on('close', cleanup)
res.on('error', cleanup)
res.on('finish', cleanup)
const blob = await datalake.get(ctx, workspace, name, {})
@ -165,7 +167,7 @@ async function runPipeline (
let pipeline: sharp.Sharp | undefined
try {
pipeline = sharp(inFile)
pipeline = sharp(inFile, { sequentialRead: true })
// auto orient image based on exif to prevent resize use wrong orientation
pipeline = pipeline.rotate()
@ -228,25 +230,21 @@ function getImageTransformParams (accept: string, transform: string): ImageTrans
async function writeTempFile (path: string, stream: Readable): Promise<void> {
const outp = createWriteStream(path)
stream.pipe(outp)
await new Promise<void>((resolve, reject) => {
stream.on('error', (err) => {
stream.destroy()
outp.destroy()
reject(err)
})
const cleanup = (err?: any): void => {
if (!stream.destroyed) stream.destroy()
if (!outp.destroyed) outp.destroy()
if (err !== undefined) reject(err)
}
outp.on('finish', () => {
stream.destroy()
resolve()
})
stream.on('error', cleanup)
outp.on('finish', resolve)
outp.on('error', cleanup)
outp.on('error', (err) => {
stream.destroy()
outp.destroy()
reject(err)
})
stream.pipe(outp)
}).finally(() => {
if (!stream.destroyed) stream.destroy()
if (!outp.destroyed) outp.destroy()
})
}

View File

@ -13,27 +13,23 @@
// limitations under the License.
//
import crypto from 'node:crypto'
import { createHash } from 'node:crypto'
import { createReadStream } from 'fs'
import { Readable } from 'stream'
export async function getBufferSha256 (buffer: Buffer): Promise<string> {
const hash = crypto.createHash('sha256')
const hash = createHash('sha256')
hash.write(buffer)
return hash.digest('hex')
}
export async function getStreamSha256 (stream: Readable): Promise<string> {
const hasher = crypto.createHash('sha256')
const hasher = createHash('sha256')
stream.pipe(hasher)
await new Promise<void>((resolve, reject) => {
stream.on('error', (err) => {
reject(err)
})
stream.on('end', () => {
resolve()
})
await new Promise((resolve, reject) => {
stream.on('end', resolve)
stream.on('error', reject)
})
return hasher.digest('hex')

View File

@ -141,9 +141,10 @@ describe('ChannelCache', () => {
const error = new Error('Database error')
mockClient.findOne.mockRejectedValue(error)
const result = await channelCache.getOrCreateChannel(spaceId, participants, emailAccount, personId)
await expect(channelCache.getOrCreateChannel(spaceId, participants, emailAccount, personId)).rejects.toThrow(
'Failed to create channel for test@example.com in space test-space-id: Database error'
)
expect(result).toBeUndefined()
expect(mockCtx.error).toHaveBeenCalledWith('Failed to create channel', {
me: emailAccount,
space: spaceId,

View File

@ -13,7 +13,8 @@
// limitations under the License.
//
import { MeasureContext, PersonId, Ref, TxOperations, Doc, WorkspaceUuid, generateId } from '@hcengineering/core'
import { MeasureContext, PersonId, Ref, TxOperations, WorkspaceUuid, generateId } from '@hcengineering/core'
import { type Card } from '@hcengineering/card'
import chat from '@hcengineering/chat'
import mail from '@hcengineering/mail'
import { PersonSpace } from '@hcengineering/contact'
@ -27,7 +28,7 @@ const createMutex = new SyncMutex()
*/
export class ChannelCache {
// Key is `${spaceId}:${normalizedEmail}`
private readonly cache = new Map<string, Ref<Doc>>()
private readonly cache = new Map<string, Ref<Card>>()
constructor (
private readonly ctx: MeasureContext,
@ -43,7 +44,7 @@ export class ChannelCache {
participants: PersonId[],
email: string,
owner: PersonId
): Promise<Ref<Doc> | undefined> {
): Promise<Ref<Card>> {
const normalizedEmail = normalizeEmail(email)
const cacheKey = `${spaceId}:${normalizedEmail}`
@ -78,7 +79,7 @@ export class ChannelCache {
participants: PersonId[],
email: string,
personId: PersonId
): Promise<Ref<Doc> | undefined> {
): Promise<Ref<Card>> {
const normalizedEmail = normalizeEmail(email)
try {
// First try to find existing channel
@ -86,7 +87,7 @@ export class ChannelCache {
if (channel != null) {
this.ctx.info('Using existing channel', { me: normalizedEmail, space, channel: channel._id })
return channel._id
return channel._id as Ref<Card>
}
return await this.createNewChannel(space, participants, normalizedEmail, personId)
@ -101,7 +102,9 @@ export class ChannelCache {
// Remove failed lookup from cache
this.cache.delete(`${space}:${normalizedEmail}`)
return undefined
throw new Error(
`Failed to create channel for ${normalizedEmail} in space ${space}: ${err instanceof Error ? err.message : String(err)}`
)
}
}
@ -110,7 +113,7 @@ export class ChannelCache {
participants: PersonId[],
email: string,
personId: PersonId
): Promise<Ref<Doc> | undefined> {
): Promise<Ref<Card>> {
const normalizedEmail = normalizeEmail(email)
const mutexKey = `channel:${this.workspace}:${space}:${normalizedEmail}`
const releaseLock = await createMutex.lock(mutexKey)
@ -124,7 +127,7 @@ export class ChannelCache {
space,
channel: existingChannel._id
})
return existingChannel._id
return existingChannel._id as Ref<Card>
}
// Create new channel if it doesn't exist
@ -156,7 +159,7 @@ export class ChannelCache {
personId
)
return channelId
return channelId as Ref<Card>
} finally {
releaseLock()
}

View File

@ -16,7 +16,7 @@ import { Producer } from 'kafkajs'
import { WorkspaceLoginInfo } from '@hcengineering/account-client'
import { type Card } from '@hcengineering/card'
import { MessageType } from '@hcengineering/communication-types'
import { MessageID, MessageType } from '@hcengineering/communication-types'
import chat from '@hcengineering/chat'
import { PersonSpace } from '@hcengineering/contact'
import {
@ -25,20 +25,25 @@ import {
type PersonId,
type Ref,
type TxOperations,
Doc,
AccountUuid,
generateId,
RateLimiter,
Space
RateLimiter
} from '@hcengineering/core'
import mail from '@hcengineering/mail'
import { type KeyValueClient } from '@hcengineering/kvs-client'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { MessageRequestEventType } from '@hcengineering/communication-sdk-types'
import {
AddCollaboratorsEvent,
MessageRequestEventType,
CreateFileEvent,
CreateMessageEvent,
CreateThreadEvent,
NotificationRequestEventType
} from '@hcengineering/communication-sdk-types'
import { generateMessageId } from '@hcengineering/communication-shared'
import { BaseConfig, type Attachment } from './types'
import { EmailMessage, MailRecipient } from './types'
import { EmailMessage, MailRecipient, MessageData } from './types'
import { getMdContent } from './utils'
import { PersonCacheFactory } from './person'
import { PersonSpacesCacheFactory } from './personSpaces'
@ -161,8 +166,7 @@ export async function createMessages (
subject,
content,
attachedBlobs,
person.email,
person.socialId,
person,
message.sendOn,
channelCache,
replyTo
@ -188,8 +192,7 @@ async function saveMessageToSpaces (
subject: string,
content: string,
attachments: Attachment[],
me: string,
owner: PersonId,
recipient: MailRecipient,
createdDate: number,
channelCache: ChannelCache,
inReplyTo?: string
@ -197,9 +200,8 @@ async function saveMessageToSpaces (
const rateLimiter = new RateLimiter(10)
for (const space of spaces) {
const spaceId = space._id
let isReply = false
await rateLimiter.add(async () => {
ctx.info('Saving message to space', { mailId, space: spaceId })
let threadId = await threadLookup.getThreadId(mailId, spaceId)
if (threadId !== undefined) {
ctx.info('Message is already in the thread, skip', { mailId, threadId, spaceId })
@ -208,13 +210,10 @@ async function saveMessageToSpaces (
if (inReplyTo !== undefined) {
threadId = await threadLookup.getParentThreadId(inReplyTo, spaceId)
if (threadId !== undefined) {
ctx.info('Found existing thread', { mailId, threadId, spaceId })
}
isReply = threadId !== undefined
}
let channel: Ref<Doc<Space>> | undefined
const channel = await channelCache.getOrCreateChannel(spaceId, participants, recipient.email, recipient.socialId)
if (threadId === undefined) {
channel = await channelCache.getOrCreateChannel(spaceId, participants, me, owner)
const newThreadId = await client.createDoc(
chat.masterTag.Thread,
space._id,
@ -233,78 +232,151 @@ async function saveMessageToSpaces (
createdDate,
modifiedBy
)
await client.createMixin(
newThreadId,
chat.masterTag.Thread,
space._id,
mail.tag.MailThread,
{},
createdDate,
owner
)
threadId = newThreadId as Ref<Card>
ctx.info('Created new thread', { mailId, threadId, spaceId })
}
const messageId = generateMessageId()
const created = new Date(createdDate)
const messageData = Buffer.from(
JSON.stringify({
type: MessageRequestEventType.CreateMessage,
messageType: MessageType.Message,
card: threadId,
cardType: chat.masterTag.Thread,
content,
creator: modifiedBy,
created,
id: messageId
})
)
await producer.send({
topic: config.CommunicationTopic,
messages: [
{
key: Buffer.from(channel ?? spaceId),
value: messageData,
headers: {
WorkspaceUuid: wsInfo.workspace
}
}
]
})
ctx.info('Send message event', { mailId, messageId, threadId })
const messageData: MessageData = {
subject,
content,
channel,
created,
modifiedBy,
mailId,
spaceId,
threadId,
workspace: wsInfo.workspace,
recipient,
isReply
}
const fileData: Buffer[] = attachments.map((a) =>
Buffer.from(
JSON.stringify({
type: MessageRequestEventType.CreateFile,
card: threadId,
message: messageId,
messageCreated: created,
blobId: a.id as Ref<Blob>,
fileType: a.contentType,
filename: a.name,
size: a.data.length,
creator: modifiedBy
})
)
)
const fileEvents = fileData.map((data) => ({
key: Buffer.from(channel ?? spaceId),
value: data,
headers: {
WorkspaceUuid: wsInfo.workspace
}
}))
await producer.send({
topic: config.CommunicationTopic,
messages: fileEvents
})
ctx.info('Send file events', { mailId, messageId, threadId, count: fileEvents.length })
const messageId = await createMailMessage(producer, config, messageData, threadId)
if (!isReply) {
await addCollaborators(producer, config, messageData, threadId)
await createMailThread(producer, config, messageData, messageId)
}
await createFiles(producer, config, attachments, messageData, threadId, messageId)
await threadLookup.setThreadId(mailId, space._id, threadId)
})
}
await rateLimiter.waitProcessing()
}
async function createMailThread (
producer: Producer,
config: BaseConfig,
data: MessageData,
messageId: MessageID
): Promise<void> {
const threadEvent: CreateThreadEvent = {
type: MessageRequestEventType.CreateThread,
card: data.channel,
message: messageId,
messageCreated: data.created,
thread: data.threadId,
threadType: chat.masterTag.Thread
}
const thread = Buffer.from(JSON.stringify(threadEvent))
await sendToCommunicationTopic(producer, config, data, thread)
}
async function createMailMessage (
producer: Producer,
config: BaseConfig,
data: MessageData,
threadId: Ref<Card>
): Promise<MessageID> {
const messageId = generateMessageId()
const createMessageEvent: CreateMessageEvent = {
type: MessageRequestEventType.CreateMessage,
messageType: MessageType.Message,
card: data.isReply ? threadId : data.channel,
cardType: chat.masterTag.Thread,
content: data.content,
creator: data.modifiedBy,
created: data.created,
id: messageId
}
const createMessageData = Buffer.from(JSON.stringify(createMessageEvent))
await sendToCommunicationTopic(producer, config, data, createMessageData)
return messageId
}
async function createFiles (
producer: Producer,
config: BaseConfig,
attachments: Attachment[],
messageData: MessageData,
threadId: Ref<Card>,
messageId: MessageID
): Promise<void> {
const fileData: Buffer[] = attachments.map((a) => {
const creeateFileEvent: CreateFileEvent = {
type: MessageRequestEventType.CreateFile,
card: threadId,
message: messageId,
messageCreated: messageData.created,
creator: messageData.modifiedBy,
data: {
blobId: a.id as Ref<Blob>,
type: a.contentType,
filename: a.name,
size: a.data.length
}
}
return Buffer.from(JSON.stringify(creeateFileEvent))
})
const fileEvents = fileData.map((data) => ({
key: Buffer.from(messageData.channel ?? messageData.spaceId),
value: data,
headers: {
WorkspaceUuid: messageData.workspace
}
}))
await producer.send({
topic: config.CommunicationTopic,
messages: fileEvents
})
}
async function addCollaborators (
producer: Producer,
config: BaseConfig,
data: MessageData,
threadId: Ref<Card>
): Promise<void> {
if (data.recipient.socialId === data.modifiedBy) {
return // Message author should be automatically added as a collaborator
}
const addCollaboratorsEvent: AddCollaboratorsEvent = {
type: NotificationRequestEventType.AddCollaborators,
card: threadId,
cardType: chat.masterTag.Thread,
collaborators: [data.recipient.uuid as AccountUuid],
creator: data.modifiedBy
}
const createMessageData = Buffer.from(JSON.stringify(addCollaboratorsEvent))
await sendToCommunicationTopic(producer, config, data, createMessageData)
}
async function sendToCommunicationTopic (
producer: Producer,
config: BaseConfig,
messageData: MessageData,
content: Buffer
): Promise<void> {
await producer.send({
topic: config.CommunicationTopic,
messages: [
{
key: Buffer.from(messageData.channel ?? messageData.spaceId),
value: content,
headers: {
WorkspaceUuid: messageData.workspace
}
}
]
})
}

View File

@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { PersonId, PersonUuid } from '@hcengineering/core'
import { Card } from '@hcengineering/card'
import { PersonSpace } from '@hcengineering/contact'
import { PersonId, PersonUuid, Ref, WorkspaceUuid } from '@hcengineering/core'
//
export interface Attachment {
@ -58,3 +60,17 @@ export interface BaseConfig {
QueueRegion: string
CommunicationTopic: string
}
export interface MessageData {
subject: string
content: string
channel: Ref<Card>
created: Date
modifiedBy: PersonId
mailId: string
spaceId: Ref<PersonSpace>
workspace: WorkspaceUuid
threadId: Ref<Card>
recipient: MailRecipient
isReply: boolean
}

View File

@ -0,0 +1,81 @@
{
"envelope": {
"from": {
"address": "example1@test.com"
},
"to": [
{
"address": "recipient2@example.com"
}
]
},
"message": {
"contents": "VGVzdCBlbmNvZGVkIGVtYWlsIGNvbnRlbnQ=",
"headers": [
[
"Received",
" from mail-nwsmtp-mxback-production-main-38.iva.yp-c.yandex.net (mail-nwsmtp-mxback-production-main-38.iva.yp-c.yandex.net [IPv6:2a02:6b8:c0c:1724:0:640:dee6:0])\r\n\tby forward500b.mail.yandex.net (Yandex) with ESMTPS id 65A23611AA\r\n\tfor <artyom@huly.app>; Tue, 3 Jun 2025 06:35:44 +0300 (MSK)\r\n"
],
[
"Received",
" from mail.yandex.ru (2a02:6b8:c0c:b187:0:640:6b88:0 [2a02:6b8:c0c:b187:0:640:6b88:0])\r\n\tby mail-nwsmtp-mxback-production-main-38.iva.yp-c.yandex.net (mxback/Yandex) with HTTPS id YZRhuVDL1eA0-m7PJdtq8;\r\n\tTue, 03 Jun 2025 06:35:44 +0300\r\n"
],
[
"X-Yandex-Fwd",
" 1\r\n"
],
[
"DKIM-Signature",
" v=1; a=rsa-sha256; c=relaxed/relaxed; d=yandex.ru; s=mail;\r\n\tt=1748921744; bh=DnseDjFgmtsB1kN2sgMKhzeGZ1TcOQm0aEN3ux6v8k0=;\r\n\th=Message-Id:Date:Subject:In-Reply-To:To:From;\r\n\tb=mOwsmrIzUHfrnHY6fjABtgU2IHkXKyHjoEmbKNGHPkFFdq9fqtNiw7rwX7HYJIFwN\r\n\t Jx9ZGkGNLpDGElAXs67xexAp6t/mAebaInU/5/C7nJd8YMlkauUGTKmQDD4rtOrBSG\r\n\t 2LAfXrsAyEVaeqnIjhNEir+sAWHyA1+kDPpA4jCc=\r\n"
],
[
"Authentication-Results",
" mail-nwsmtp-mxback-production-main-38.iva.yp-c.yandex.net; dkim=pass header.i=@yandex.ru\r\n"
],
[
"Received",
" by qvxj4z7i6zm4ub2j.iva.yp-c.yandex.net with HTTP;\r\n\tTue, 03 Jun 2025 06:35:43 +0300\r\n"
],
[
"From",
" =?utf-8?B?RXhhbXBsZSBVc2VyMQ==?= <example1@test.com>\r\n"
],
[
"To",
" \"Example Recipient2\" <recipient2@example.com>\r\n"
],
[
"In-Reply-To",
" YUvuZoQ3ypgAAAAAAfVzrQAAAAAAAAQu\r\n"
],
[
"Subject",
" =?utf-8?B?VGhpcyBpcyBlbmNvZGVkIGVtYWlsIHN1YmplY3Q=?=\r\n"
],
[
"MIME-Version",
" 1.0\r\n"
],
[
"X-Mailer",
" Yamail [ http://yandex.ru ] 5.0\r\n"
],
[
"Date",
" Tue, 03 Jun 2025 06:35:43 +0300\r\n"
],
[
"Message-Id",
" <1296141748921736@mail.yandex.ru>\r\n"
],
[
"Content-Transfer-Encoding",
" base64\r\n"
],
[
"Content-Type",
" text/html; charset=utf-8\r\n"
]
]
}
}

View File

@ -0,0 +1,230 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { decodeContent, decodeEncodedWords } from '../decode'
import { MeasureContext } from '@hcengineering/core'
jest.mock(
'../config',
() => ({
hookToken: 'test-hook-token',
ignoredAddresses: ['ignored@example.com'],
storageConfig: 'test-storage-config',
workspaceUrl: 'test-workspace'
}),
{ virtual: true }
)
const mockCtx: MeasureContext = {
info: jest.fn(),
error: jest.fn(),
warn: jest.fn()
} as any
describe('decodeContent', () => {
test('should return original content when encoding is undefined', () => {
const content = 'Hello World'
const result = decodeContent(mockCtx, content, undefined)
expect(result).toBe(content)
})
test('should return original content when encoding is empty string', () => {
const content = 'Hello World'
const result = decodeContent(mockCtx, content, '')
expect(result).toBe(content)
})
test('should decode base64 content', () => {
const base64Content = 'SGVsbG8gV29ybGQ=' // "Hello World" in base64
const result = decodeContent(mockCtx, base64Content, 'base64')
expect(result).toBe('Hello World')
})
test('should decode base64 content with case insensitive encoding', () => {
const base64Content = 'SGVsbG8gV29ybGQ='
const result = decodeContent(mockCtx, base64Content, 'BASE64')
expect(result).toBe('Hello World')
})
test('should decode quoted-printable content', () => {
const qpContent = 'Hello=20World=21'
const result = decodeContent(mockCtx, qpContent, 'quoted-printable')
expect(result).toBe('Hello World!')
})
test('should handle quoted-printable with soft line breaks', () => {
const qpContent = 'This is a very long line that needs=\r\nto be wrapped'
const result = decodeContent(mockCtx, qpContent, 'quoted-printable')
expect(result).toBe('This is a very long line that needsto be wrapped')
})
test('should return original content for 7bit encoding', () => {
const content = 'Plain text content'
const result = decodeContent(mockCtx, content, '7bit')
expect(result).toBe(content)
})
test('should return original content for 8bit encoding', () => {
const content = 'Plain text with émojis 🎉'
const result = decodeContent(mockCtx, content, '8bit')
expect(result).toBe(content)
})
test('should return original content for binary encoding', () => {
const content = 'Binary content'
const result = decodeContent(mockCtx, content, 'binary')
expect(result).toBe(content)
})
test('should return original content for unknown encoding', () => {
const content = 'Unknown encoding content'
const result = decodeContent(mockCtx, content, 'unknown-encoding')
expect(result).toBe(content)
})
})
describe('decodeEncodedWords', () => {
test('should return original text when no encoded words present', () => {
const text = 'Plain text without encoding'
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe(text)
})
test('should decode base64 encoded word', () => {
const text = '=?utf-8?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should decode quoted-printable encoded word', () => {
const text = '=?utf-8?Q?Hello=20World?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should decode quoted-printable with underscores as spaces', () => {
const text = '=?utf-8?Q?Hello_World?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle multiple encoded words in same text', () => {
const text = '=?utf-8?B?SGVsbG8=?= =?utf-8?B?V29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle mixed encoded and plain text', () => {
const text = 'Subject: =?utf-8?B?SGVsbG8=?= from sender'
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Subject: Hello from sender')
})
test('should handle case insensitive encoding (lowercase b)', () => {
const text = '=?utf-8?b?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle case insensitive encoding (lowercase q)', () => {
const text = '=?utf-8?q?Hello_World?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle unknown encoding gracefully', () => {
const text = '=?utf-8?X?unknown?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe(text) // Should return original
})
test('should decode real-world email subject', () => {
const text = '=?UTF-8?B?8J+OiSBXZWxjb21lIHRvIG91ciBwbGF0Zm9ybSE=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('🎉 Welcome to our platform!')
})
test('should handle empty encoded text', () => {
const text = '=?utf-8?B??='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('')
})
test('should handle different charset - ISO-8859-1', () => {
const text = '=?iso-8859-1?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle different charset - latin1', () => {
const text = '=?latin1?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle different charset - windows-1252', () => {
const text = '=?windows-1252?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle ASCII charset', () => {
const text = '=?us-ascii?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle case insensitive charset names', () => {
const text = '=?UTF-8?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle charset with whitespace', () => {
const text = '=? utf-8 ?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should default to utf8 for unsupported charset', () => {
const text = '=?gb2312?B?SGVsbG8gV29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World') // Should still decode as utf8
})
test('should handle mixed charsets in same text', () => {
const text = '=?utf-8?B?SGVsbG8=?= =?iso-8859-1?B?V29ybGQ=?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('Hello World')
})
test('should handle quoted-printable with different charset', () => {
const text = '=?iso-8859-1?Q?caf=E9?='
const result = decodeEncodedWords(mockCtx, text)
expect(result).toBe('café')
})
test('should handle error in charset conversion gracefully', () => {
const consoleSpy = jest.spyOn(mockCtx, 'warn')
// This might cause an encoding issue depending on the content
const text = '=?invalid-charset?B?invalid-content?='
const result = decodeEncodedWords(mockCtx, text)
// Should either decode successfully with fallback or return original
expect(typeof result).toBe('string')
consoleSpy.mockRestore()
})
})

View File

@ -13,6 +13,8 @@
// limitations under the License.
//
import fs from 'fs/promises'
import path from 'path'
import { Request, Response } from 'express'
import { MeasureContext } from '@hcengineering/core'
import { createMessages } from '@hcengineering/mail-common'
@ -486,4 +488,41 @@ This is an **HTML** test email`
[]
)
})
it('should decode encoded content in email', async () => {
// Create a multipart email with both text and HTML
const base64MessageData = await fs.readFile(path.join(__dirname, '__mocks__/base64Message.json'), 'utf-8')
const mtaMessage: MtaMessage = JSON.parse(base64MessageData)
mockReq = {
headers: { 'x-hook-token': 'test-hook-token' },
body: mtaMessage
}
await handleMtaHook(mockReq as Request, mockRes as Response, mockCtx)
// Should return 200
expect(mockStatus).toHaveBeenCalledWith(200)
expect(mockSend).toHaveBeenCalledWith({ action: 'accept' })
// Should process the message with both content types
expect(createMessages).toHaveBeenCalledWith(
client.baseConfig,
mockCtx,
mockTxOperations,
{},
{},
client.mailServiceToken,
mockLoginInfo,
expect.objectContaining({
mailId: expect.any(String),
from: { email: 'example1@test.com', firstName: 'Example', lastName: 'User1' },
to: [{ email: 'recipient2@example.com', firstName: 'Example', lastName: 'Recipient2' }],
subject: 'This is encoded email subject',
content: 'Test encoded email content',
incoming: true
}),
[]
)
})
})

View File

@ -0,0 +1,130 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { MeasureContext } from '@hcengineering/core'
import { MtaMessage } from './types'
import { getHeader } from './utils'
export function getDecodedContent (ctx: MeasureContext, mta: MtaMessage): string {
const contentEncoding = getHeader(mta, 'Content-Transfer-Encoding')
return decodeContent(ctx, mta.message.contents, contentEncoding)
}
export function decodeContent (ctx: MeasureContext, content: string, encoding: string | undefined): string {
if (encoding == null || encoding.trim() === '') {
return content
}
const normalizedEncoding = encoding.toLowerCase().trim()
switch (normalizedEncoding) {
case 'base64':
try {
return Buffer.from(content, 'base64').toString('utf-8')
} catch (error: any) {
ctx.warn('Failed to decode base64 content:', { error: error.message })
return content
}
case 'quoted-printable':
return decodeQuotedPrintable(content)
case '7bit':
case '8bit':
case 'binary':
default:
return content
}
}
function decodeQuotedPrintable (content: string): string {
return content
.replace(/=([0-9A-F]{2})/gi, (match, hex) => {
return String.fromCharCode(parseInt(hex, 16))
})
.replace(/=\r?\n/g, '') // Remove soft line breaks
.replace(/=$/gm, '') // Remove trailing = at end of lines
}
export function decodeEncodedWords (ctx: MeasureContext, text: string): string {
// RFC 2047 encoded word pattern: =?charset?encoding?encoded_text?=
const encodedWordPattern = /=\?([^?]+)\?([BQbq])\?([^?]*)\?=/g
return text.replace(encodedWordPattern, (match, charset, encoding, encodedText) => {
try {
const normalizedEncoding = encoding.toLowerCase()
let decodedBytes: Buffer
if (normalizedEncoding === 'b') {
// Base64 encoding
decodedBytes = Buffer.from(encodedText, 'base64')
} else if (normalizedEncoding === 'q') {
// Quoted-printable encoding (with some modifications for encoded words)
const qpDecoded = encodedText
.replace(/_/g, ' ') // Underscores represent spaces in encoded words
.replace(/=([0-9A-F]{2})/gi, (_match: any, hex: string) => {
return String.fromCharCode(parseInt(hex, 16))
})
decodedBytes = Buffer.from(qpDecoded, 'binary')
} else {
// Unknown encoding, return original
return match
}
// Convert to string using the specified charset
const normalizedCharset = normalizeCharset(charset)
return decodedBytes.toString(normalizedCharset)
} catch (error: any) {
ctx.warn('Failed to decode encoded word:', { match, error: error.message })
return match // Return original if decoding fails
}
})
}
function normalizeCharset (charset: string): BufferEncoding {
const normalized = charset.toLowerCase().trim()
// Map common charset aliases to Node.js Buffer encodings
switch (normalized) {
case 'utf-8':
case 'utf8':
return 'utf8'
case 'iso-8859-1':
case 'latin1':
case 'cp1252':
case 'windows-1252':
return 'latin1'
case 'ascii':
case 'us-ascii':
return 'ascii'
case 'utf-16':
case 'utf-16le':
case 'ucs-2':
case 'ucs2':
return 'utf16le'
case 'base64':
return 'base64'
case 'hex':
return 'hex'
// For any unsupported charset, default to utf8
default:
return 'utf8'
}
}

View File

@ -23,6 +23,7 @@ import { mailServiceToken, baseConfig, kvsClient } from './client'
import config from './config'
import { MtaMessage } from './types'
import { getHeader, parseContent } from './utils'
import { decodeEncodedWords } from './decode'
export async function handleMtaHook (req: Request, res: Response, ctx: MeasureContext): Promise<void> {
try {
@ -60,7 +61,7 @@ export async function handleMtaHook (req: Request, res: Response, ctx: MeasureCo
}
}
const subject = getHeader(mta, 'Subject') ?? ''
const subject = decodeEncodedWords(ctx, getHeader(mta, 'Subject') ?? '')
const inReplyTo = getHeader(mta, 'In-Reply-To')
const { content, attachments } = await parseContent(ctx, mta)
@ -132,7 +133,7 @@ function extractContactName (
// Match name part that appears before an email in angle brackets
const nameMatch = fromHeader.match(/^\s*"?([^"<]+?)"?\s*<.+?>/)
const encodedName = nameMatch?.[1].trim() ?? ''
const name = encodedName.length > 0 ? decodeMimeWord(ctx, encodedName) : ''
const name = encodedName.length > 0 ? decodeEncodedWords(ctx, encodedName) : ''
let [firstName, lastName] = name.split(' ')
if (firstName === undefined || firstName.length === 0) {
firstName = email.split('@')[0]
@ -143,28 +144,6 @@ function extractContactName (
return { firstName, lastName }
}
function decodeMimeWord (ctx: MeasureContext, text: string): string {
return text.replace(/=\?([^?]+)\?([BQ])\?([^?]+)\?=/gi, (match, charset, encoding, content) => {
try {
if (encoding.toUpperCase() === 'B') {
// Base64 encoding
const buffer = Buffer.from(content, 'base64')
return buffer.toString(charset as BufferEncoding)
} else if (encoding.toUpperCase() === 'Q') {
// Quoted-printable encoding
const decoded = content
.replace(/_/g, ' ')
.replace(/=([0-9A-F]{2})/gi, (_: any, hex: string) => String.fromCharCode(parseInt(hex, 16)))
return Buffer.from(decoded).toString(charset as BufferEncoding)
}
return match
} catch (error) {
ctx.warn('Failed to decode encoded word', { error })
return match
}
})
}
function stripTags (email: string): string {
const [name, domain] = email.split('@')
const tagStart = name.indexOf('+')

View File

@ -21,6 +21,7 @@ import { type Attachment } from '@hcengineering/mail-common'
import { MtaMessage } from './types'
import config from './config'
import { getDecodedContent } from './decode'
export async function parseContent (
ctx: MeasureContext,
@ -34,10 +35,10 @@ export async function parseContent (
}
if (contentType.toLowerCase().startsWith('text/plain')) {
return { content: mta.message.contents, attachments: [] }
return { content: getDecodedContent(ctx, mta), attachments: [] }
}
const email = await getEmailContent(mta)
const email = await getEmailContent(ctx, mta)
let content = email.text ?? ''
let isMarkdown = false
@ -83,14 +84,14 @@ export async function parseContent (
return { content, attachments }
}
export function convertMtaToEml (mta: MtaMessage): string {
export function convertMtaToEml (ctx: MeasureContext, mta: MtaMessage): string {
return `MIME-Version: 1.0
Date: ${new Date().toUTCString()}
From: ${mta.envelope.from.address}
To: ${mta.envelope.to.map((to) => to.address).join(', ')}
Content-Type: ${getHeader(mta, 'Content-Type') ?? 'text/plain; charset=utf-8'}
${unescapeString(mta.message.contents)}`
${unescapeString(getDecodedContent(ctx, mta))}`
}
function unescapeString (str: string): string {
@ -107,8 +108,8 @@ export function getHeader (mta: MtaMessage, header: string): string | undefined
return mta.message.headers.find((header) => header[0].toLowerCase() === h)?.[1]?.trim()
}
async function getEmailContent (mta: MtaMessage): Promise<ReadedEmlJson> {
const eml = convertMtaToEml(mta)
async function getEmailContent (ctx: MeasureContext, mta: MtaMessage): Promise<ReadedEmlJson> {
const eml = convertMtaToEml(ctx, mta)
const email = await new Promise<ReadedEmlJson>((resolve, reject) => {
readEml(eml, (err, json) => {
if (err !== undefined && err !== null) {
@ -123,7 +124,7 @@ async function getEmailContent (mta: MtaMessage): Promise<ReadedEmlJson> {
if (isEmptyString(email.text) && isEmptyString(email.html)) {
return {
...email,
text: removeContentTypeHeader(mta.message.contents)
text: removeContentTypeHeader(getDecodedContent(ctx, mta))
}
}
return email