diff --git a/services/mail/mail-common/src/__tests__/mutex.test.ts b/services/mail/mail-common/src/__tests__/mutex.test.ts index 66cf83cc6b..f491964a96 100644 --- a/services/mail/mail-common/src/__tests__/mutex.test.ts +++ b/services/mail/mail-common/src/__tests__/mutex.test.ts @@ -449,4 +449,320 @@ describe('SyncMutex', () => { expect(results).toEqual(['caught error', 'acquired lock', 'released lock']) }) + + it('should be resilient to unexpected errors during lock acquisition', async () => { + // Existing test... + }) + + it('should release locks when an error is thrown within a promise chain', async () => { + const lockKey = 'promise-error' + const executionOrder: string[] = [] + + // First lock with an error in the promise chain + try { + const promise = mutex.lock(lockKey).then((release) => { + executionOrder.push('lock1 acquired') + + // Create a promise chain with an error + return Promise.resolve() + .then(() => { + executionOrder.push('lock1 processing') + throw new Error('Error in promise chain') + }) + .finally(() => { + executionOrder.push('lock1 releasing') + release() + }) + }) + + await promise + executionOrder.push('lock1 completed') // Should not reach here + } catch (error) { + executionOrder.push('error caught') + } + + // Second lock should still work + const release2 = await mutex.lock(lockKey) + executionOrder.push('lock2 acquired') + release2() + + expect(executionOrder).toEqual([ + 'lock1 acquired', + 'lock1 processing', + 'lock1 releasing', + 'error caught', + 'lock2 acquired' + ]) + }) + + it('should handle async rejection with multiple lock requests', async () => { + const lockKey = 'async-rejection' + const results: string[] = [] + + // First task acquires the lock but fails + const failingTask = async (): Promise => { + const release = await mutex.lock(lockKey) + try { + results.push('task1 started') + await new Promise((resolve) => setTimeout(resolve, 20)) + throw new Error('Async task error') + } catch (error) { + results.push('task1 error caught') + throw error // Re-throw the error + } finally { + results.push('task1 releasing lock') + release() + } + } + + // Second task waits for the lock + const waitingTask = async (): Promise => { + results.push('task2 waiting') + const release = await mutex.lock(lockKey) + try { + results.push('task2 acquired lock') + await new Promise((resolve) => setTimeout(resolve, 10)) + results.push('task2 completed') + } finally { + release() + } + } + + // Run both tasks + try { + await failingTask() + } catch (error) { + results.push('outer catch') + } + + await waitingTask() + + // Verify error handling and lock release + expect(results).toEqual([ + 'task1 started', + 'task1 error caught', + 'task1 releasing lock', + 'outer catch', + 'task2 waiting', + 'task2 acquired lock', + 'task2 completed' + ]) + }) + + it('should handle nested lock acquisitions with errors', async () => { + const outerKey = 'outer-lock' + const innerKey = 'inner-lock' + const results: string[] = [] + + // Function that acquires nested locks with an error in the middle + const nestedLocks = async (shouldFail: boolean): Promise => { + const outerRelease = await mutex.lock(outerKey) + try { + results.push('outer lock acquired') + + // Acquire inner lock + const innerRelease = await mutex.lock(innerKey) + try { + results.push('inner lock acquired') + + // Simulate work that might fail + if (shouldFail) { + throw new Error('Error in nested locks') + } + + results.push('work completed successfully') + } finally { + results.push('inner lock releasing') + innerRelease() + } + } catch (error) { + results.push('nested error caught') + throw error // Re-throw to test outer catch + } finally { + results.push('outer lock releasing') + outerRelease() + } + } + + // First attempt - should fail + try { + await nestedLocks(true) + } catch (error) { + results.push('outer error handler') + } + + // Second attempt - should succeed + await nestedLocks(false) + + // Verify both locks were properly released during the error scenario + expect(results).toEqual([ + 'outer lock acquired', + 'inner lock acquired', + 'inner lock releasing', + 'nested error caught', + 'outer lock releasing', + 'outer error handler', + 'outer lock acquired', + 'inner lock acquired', + 'work completed successfully', + 'inner lock releasing', + 'outer lock releasing' + ]) + }) + + it('should handle complex promise interactions with multiple rejections', async () => { + const lockKey = 'complex-promises' + const results: string[] = [] + + // Simulate a complex chain of promises with potential errors + const complexOperation = async (): Promise => { + const release = await mutex.lock(lockKey) + + try { + results.push('lock acquired') + + // Create a promise that might reject + const operation = new Promise((resolve, reject) => { + setTimeout(() => { + // Randomly decide to succeed or fail + if (Math.random() < 0.5) { + reject(new Error('Random operation failure')) + } else { + resolve('operation succeeded') + } + }, 10) + }) + + // Create a race condition + const result = await Promise.race([ + operation, + // eslint-disable-next-line promise/param-names + new Promise((_, reject) => { + setTimeout(() => { + reject(new Error('Timeout')) + }, 15) + }) + ]) + + results.push(result) + } catch (error: any) { + results.push(`error: ${error.message}`) + } finally { + results.push('lock released') + release() + } + } + + // Run multiple operations in parallel + await Promise.all([complexOperation(), complexOperation(), complexOperation()]) + + // Each operation should have 3 log entries: acquired, result/error, released + expect(results.length).toBe(9) + + // Group the results in sets of 3 + const operations = [results.slice(0, 3), results.slice(3, 6), results.slice(6, 9)] + + // Each operation should follow the pattern: acquired -> result/error -> released + operations.forEach((op) => { + expect(op[0]).toBe('lock acquired') + expect(op[2]).toBe('lock released') + // Middle entry should be either the result or an error + expect(op[1]).toMatch(/^(operation succeeded|error: .*)/) + }) + }) + + it('should handle rejections in async/await expressions within lock', async () => { + const lockKey = 'async-await-rejection' + const executionOrder: string[] = [] + + // Function that will throw within an async/await expression + const asyncFunction = async (): Promise => { + const release = await mutex.lock(lockKey) + try { + executionOrder.push('lock acquired') + + // Nested async operation that fails + const nestedOperation = async (): Promise => { + await new Promise((resolve) => setTimeout(resolve, 10)) + throw new Error('Nested operation failed') + } + + // This should throw + const result = await nestedOperation() + executionOrder.push(`got result: ${result}`) // Should not be reached + } catch (error) { + executionOrder.push('caught nested error') + } finally { + executionOrder.push('releasing lock') + release() + } + } + + await asyncFunction() + + // Another lock on the same key should work + const secondOp = async (): Promise => { + const release = await mutex.lock(lockKey) + try { + executionOrder.push('second lock acquired') + } finally { + release() + } + } + + await secondOp() + + expect(executionOrder).toEqual(['lock acquired', 'caught nested error', 'releasing lock', 'second lock acquired']) + }) + + it('should handle errors thrown in finally blocks within lock', async () => { + const lockKey = 'finally-error' + const executionOrder: string[] = [] + + // Function that will throw in a finally block + const problematicFunction = async (): Promise => { + const release = await mutex.lock(lockKey) + try { + executionOrder.push('lock acquired') + throw new Error('Initial error') + } catch (error) { + executionOrder.push('caught initial error') + } finally { + executionOrder.push('in finally block') + + // This is bad practice but we need to test it works + try { + // Throw in finally - could suppress the original error if not caught + // eslint-disable-next-line no-unsafe-finally + throw new Error('Error in finally') + } catch (finallyError) { + executionOrder.push('caught finally error') + } + + // Still need to release the lock + release() + executionOrder.push('lock released') + } + } + + await problematicFunction() + + // Verify we can still get the lock + const secondOp = async (): Promise => { + const release = await mutex.lock(lockKey) + executionOrder.push('second lock acquired') + release() + } + + await secondOp() + + expect(executionOrder).toEqual([ + 'lock acquired', + 'caught initial error', + 'in finally block', + 'caught finally error', + 'lock released', + 'second lock acquired' + ]) + }) }) diff --git a/services/mail/mail-common/src/mutex.ts b/services/mail/mail-common/src/mutex.ts index 246154a04b..3aa4d14e00 100644 --- a/services/mail/mail-common/src/mutex.ts +++ b/services/mail/mail-common/src/mutex.ts @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. // + +interface LockRequest { + promise: Promise + resolve: () => void +} export class SyncMutex { // Queue of pending promises for each lock key - private readonly locks = new Map, resolve: () => void }>>() + private readonly locks = new Map>() async lock (key: string): Promise<() => void> { // Initialize queue if it doesn't exist @@ -22,7 +27,7 @@ export class SyncMutex { this.locks.set(key, []) } - const queue = this.locks.get(key) ?? [] + const queue = this.locks.get(key) as Array // Create a new lock request let releaseFn!: () => void @@ -44,11 +49,7 @@ export class SyncMutex { // If there are more locks in the queue, resolve the next one if (queue.length > 0) { - try { - queue[0].resolve() - } catch (error) { - console.error(`Error resolving next lock in queue for key "${key}":`, error) - } + queue[0].resolve() } // If queue is empty, clean up @@ -56,11 +57,7 @@ export class SyncMutex { this.locks.delete(key) } - try { - resolve() - } catch (error) { - console.error(`Error resolving lock release for key "${key}":`, error) - } + resolve() } })