diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index edbabf6767..20a1577a80 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -352,10 +352,8 @@ export function devTool ( const coreWsInfo = flattenStatus(wsInfo) const accountClient = getAccountClient(getToolToken()) - const wsProducer = getPlatformQueue('tool', cmd.region).createProducer( - toolCtx, - QueueTopic.Workspace - ) + const queue = getPlatformQueue('tool', cmd.region) + const wsProducer = queue.getProducer(toolCtx, QueueTopic.Workspace) await createWorkspace( measureCtx, @@ -377,9 +375,8 @@ export function devTool ( }) await wsProducer.send(res.workspaceUuid, [workspaceEvents.created()]) - await wsProducer.close() - - console.log('create-workspace done') + await queue.shutdown() + console.log(queue) }) }) @@ -435,10 +432,8 @@ export function devTool ( const coreWsInfo = flattenStatus(wsInfo) const measureCtx = new MeasureMetricsContext('upgrade-workspace', {}) const accountClient = getAccountClient(getToolToken(wsInfo.uuid)) - const wsProducer = getPlatformQueue('tool', info.region).createProducer( - toolCtx, - QueueTopic.Workspace - ) + const queue = getPlatformQueue('tool', info.region) + const wsProducer = queue.getProducer(toolCtx, QueueTopic.Workspace) await upgradeWorkspace( measureCtx, version, @@ -464,7 +459,7 @@ export function devTool ( console.log(metricsToString(measureCtx.metrics, 'upgrade', 60)) await wsProducer.send(info.uuid, [workspaceEvents.upgraded()]) - await wsProducer.close() + await queue.shutdown() console.log('upgrade-workspace done') }) }) @@ -1144,12 +1139,10 @@ export function devTool ( storageAdapter: workspaceStorage, historyFile: cmd.historyFile }) - const wsProducer = getPlatformQueue('tool', ws.region).createProducer( - toolCtx, - QueueTopic.Workspace - ) + const queue = getPlatformQueue('tool', ws.region) + const wsProducer = queue.getProducer(toolCtx, QueueTopic.Workspace) await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()]) - await wsProducer.close() + await queue.shutdown() await workspaceStorage?.close() }) } @@ -2092,12 +2085,10 @@ export function devTool ( } console.log('reindex workspace', workspace) - const wsProducer = getPlatformQueue('tool', ws.region).createProducer( - toolCtx, - QueueTopic.Workspace - ) + const queue = getPlatformQueue('tool', ws.region) + const wsProducer = queue.getProducer(toolCtx, QueueTopic.Workspace) await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()]) - await wsProducer.close() + await queue.shutdown() console.log('done', workspace) }) }) diff --git a/pods/fulltext/src/__tests__/indexing.spec.ts b/pods/fulltext/src/__tests__/indexing.spec.ts index c8dd03b0d8..7b9b99d7f7 100644 --- a/pods/fulltext/src/__tests__/indexing.spec.ts +++ b/pods/fulltext/src/__tests__/indexing.spec.ts @@ -127,7 +127,7 @@ describe('full-text-indexing', () => { const queue = new TestQueue(toolCtx) await queue.start() try { - const txProducer = queue.queue.createProducer(toolCtx, QueueTopic.Tx) + const txProducer = queue.queue.getProducer(toolCtx, QueueTopic.Tx) const personId = randomUUID().toString() as PersonUuid const wsId: WorkspaceUuid = randomUUID().toString() as WorkspaceUuid const token = generateToken(personId, wsId) @@ -184,7 +184,7 @@ describe('full-text-indexing', () => { const queue = new TestQueue(toolCtx) await queue.start() const { pipeline, wsIds } = await preparePipeline(toolCtx, queue.queue, false) // Do not use broadcast - const wsProcessor = queue.queue.createProducer(toolCtx, QueueTopic.Workspace) + const wsProcessor = queue.queue.getProducer(toolCtx, QueueTopic.Workspace) try { const pipelineClient = wrapPipeline(toolCtx, pipeline, wsIds) diff --git a/pods/fulltext/src/manager.ts b/pods/fulltext/src/manager.ts index 1bc9f78401..3acfe152c1 100644 --- a/pods/fulltext/src/manager.ts +++ b/pods/fulltext/src/manager.ts @@ -51,7 +51,7 @@ export class WorkspaceManager { this.sysHierarchy.tx(tx) } - this.workspaceProducer = this.opt.queue.createProducer(this.ctx, QueueTopic.Workspace) + this.workspaceProducer = this.opt.queue.getProducer(this.ctx, QueueTopic.Workspace) } shutdownInterval: any diff --git a/server-plugins/calendar-resources/src/index.ts b/server-plugins/calendar-resources/src/index.ts index 61e3d66e78..348ca15173 100644 --- a/server-plugins/calendar-resources/src/index.ts +++ b/server-plugins/calendar-resources/src/index.ts @@ -364,19 +364,16 @@ async function putEventToQueue ( modifiedBy: PersonId, changes?: Record ): Promise { - if (control.queue === undefined) { - return - } - const producer = control.queue.createProducer( + if (control.queue === undefined) return + const producer = control.queue.getProducer( control.ctx.newChild('queue', {}), QueueTopic.CalendarEventCUD ) + try { await producer.send(control.workspace.uuid, [{ action, event, modifiedBy, changes }]) } catch (err) { control.ctx.error('Could not queue calendar event', { err, action, event }) - } finally { - await producer.close() } } diff --git a/server-plugins/telegram-resources/src/index.ts b/server-plugins/telegram-resources/src/index.ts index 78e1c87729..49209a2454 100644 --- a/server-plugins/telegram-resources/src/index.ts +++ b/server-plugins/telegram-resources/src/index.ts @@ -260,11 +260,7 @@ function hasAttachments (doc: ActivityMessage | undefined, hierarchy: Hierarchy) const telegramNotificationCacheKey = 'telegram.notification.cache' async function NotificationsHandler (txes: TxCreateDoc[], control: TriggerControl): Promise { - const queue = control.queue - - if (queue === undefined) { - return [] - } + if (control.queue === undefined) return [] const availableProviders: AvailableProvidersCache = control.contextCache.get(AvailableProvidersCacheKey) ?? new Map() @@ -280,14 +276,11 @@ async function NotificationsHandler (txes: TxCreateDoc[], con } const result: Tx[] = [] - const producer = queue.createProducer(control.ctx, QueueTopic.TelegramBot) - try { - for (const inboxNotification of all) { - result.push(...(await processNotification(inboxNotification, control, producer))) - } - } finally { - await producer.close() + const producer = control.queue.getProducer(control.ctx, QueueTopic.TelegramBot) + for (const inboxNotification of all) { + result.push(...(await processNotification(inboxNotification, control, producer))) } + return result } @@ -396,45 +389,36 @@ async function ProviderSettingsHandler ( txes: TxCUD[], control: TriggerControl ): Promise { - const queue = control.queue + if (control.queue === undefined) return [] + const producer = control.queue.getProducer(control.ctx, QueueTopic.TelegramBot) - if (queue === undefined) { - return [] - } + for (const tx of txes) { + if (tx._class === core.class.TxCreateDoc) { + const createTx = tx as TxCreateDoc + const setting = TxProcessor.createDoc2Doc(createTx) - const producer = queue.createProducer(control.ctx, QueueTopic.TelegramBot) + if (setting.attachedTo === telegram.providers.TelegramNotificationProvider) { + await updateWorkspaceSubscription(producer, setting.enabled, setting.createdBy ?? setting.modifiedBy, control) + } + } else if (tx._class === core.class.TxUpdateDoc) { + const updateTx = tx as TxUpdateDoc + if (updateTx.operations.enabled !== undefined) { + const setting = ( + await control.findAll(control.ctx, notification.class.NotificationProviderSetting, { + _id: updateTx.objectId + }) + )[0] - try { - for (const tx of txes) { - if (tx._class === core.class.TxCreateDoc) { - const createTx = tx as TxCreateDoc - const setting = TxProcessor.createDoc2Doc(createTx) - - if (setting.attachedTo === telegram.providers.TelegramNotificationProvider) { - await updateWorkspaceSubscription(producer, setting.enabled, setting.createdBy ?? setting.modifiedBy, control) - } - } else if (tx._class === core.class.TxUpdateDoc) { - const updateTx = tx as TxUpdateDoc - if (updateTx.operations.enabled !== undefined) { - const setting = ( - await control.findAll(control.ctx, notification.class.NotificationProviderSetting, { - _id: updateTx.objectId - }) - )[0] - - if (setting !== undefined && setting.attachedTo === telegram.providers.TelegramNotificationProvider) { - await updateWorkspaceSubscription( - producer, - updateTx.operations.enabled, - setting.createdBy ?? setting.modifiedBy, - control - ) - } + if (setting !== undefined && setting.attachedTo === telegram.providers.TelegramNotificationProvider) { + await updateWorkspaceSubscription( + producer, + updateTx.operations.enabled, + setting.createdBy ?? setting.modifiedBy, + control + ) } } } - } finally { - await producer.close() } return [] diff --git a/server/core/src/queue/dummyQueue.ts b/server/core/src/queue/dummyQueue.ts index c9cd1f0223..e9cbca2810 100644 --- a/server/core/src/queue/dummyQueue.ts +++ b/server/core/src/queue/dummyQueue.ts @@ -22,7 +22,7 @@ class DummyQueueProducer implements PlatformQueueProducer { * A dummy implementation of PlatformQueue for testing and development */ export class DummyQueue implements PlatformQueue { - createProducer(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer { + getProducer(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer { return new DummyQueueProducer() } diff --git a/server/core/src/queue/types.ts b/server/core/src/queue/types.ts index c51d644314..fbdeea8072 100644 --- a/server/core/src/queue/types.ts +++ b/server/core/src/queue/types.ts @@ -35,7 +35,7 @@ export interface ConsumerControl { } export interface PlatformQueue { - createProducer: (ctx: MeasureContext, topic: QueueTopic | string) => PlatformQueueProducer + getProducer: (ctx: MeasureContext, topic: QueueTopic | string) => PlatformQueueProducer /** * Create a consumer for a topic. diff --git a/server/core/src/types.ts b/server/core/src/types.ts index f596f01e8c..d40405084a 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -64,7 +64,7 @@ import { type Readable } from 'stream' import type { DbAdapter, DomainHelper } from './adapter' import type { StatisticsElement, WorkspaceStatistics } from './stats' import { type StorageAdapter } from './storage' -import { type PlatformQueue } from './queue' +import { type PlatformQueueProducer, type QueueTopic, type PlatformQueue } from './queue' export interface ServerFindOptions extends FindOptions { domain?: Domain // Allow to find for Doc's in specified domain only. @@ -195,6 +195,7 @@ export interface PipelineContext { lowLevelStorage?: LowLevelStorage liveQuery?: LiveQuery queue?: PlatformQueue + queueProducers?: Map> // Entry point for derived data procvessing derived?: Middleware diff --git a/server/kafka/src/__test__/queue.spec.ts b/server/kafka/src/__test__/queue.spec.ts index f3bdaf07be..ece022f836 100644 --- a/server/kafka/src/__test__/queue.spec.ts +++ b/server/kafka/src/__test__/queue.spec.ts @@ -24,7 +24,7 @@ describe('queue', () => { }) }) - const producer = queue.createProducer(testCtx, 'qtest') + const producer = queue.getProducer(testCtx, 'qtest') for (let i = 0; i < docsCount; i++) { await producer.send(genId, ['msg' + i]) } @@ -54,7 +54,7 @@ describe('queue', () => { }) }) - const producer = queue.createProducer(testCtx, 'test') + const producer = queue.getProducer(testCtx, 'test') await producer.send(genId, ['msg']) await p diff --git a/server/kafka/src/index.ts b/server/kafka/src/index.ts index 04474ef5f0..816a57224b 100644 --- a/server/kafka/src/index.ts +++ b/server/kafka/src/index.ts @@ -57,7 +57,7 @@ function getKafkaTopicId (topic: QueueTopic | string, config: QueueConfig): stri class PlatformQueueImpl implements PlatformQueue { consumers: ConsumerHandle[] = [] - producers: PlatformQueueProducerImpl[] = [] + producers = new Map() constructor ( private readonly kafka: Kafka, readonly config: QueueConfig @@ -68,7 +68,7 @@ class PlatformQueueImpl implements PlatformQueue { } async shutdown (): Promise { - for (const p of this.producers) { + for (const [, p] of this.producers) { try { await p.close() } catch (err: any) { @@ -84,10 +84,14 @@ class PlatformQueueImpl implements PlatformQueue { } } - createProducer(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer { - const result = new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this) - this.producers.push(result) - return result + getProducer(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer { + const producer = this.producers.get(topic) + if (producer !== undefined && !producer.isClosed()) return producer + + const created = new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this) + this.producers.set(topic, created) + + return created } createConsumer( @@ -152,6 +156,8 @@ class PlatformQueueImpl implements PlatformQueue { class PlatformQueueProducerImpl implements PlatformQueueProducer { txProducer: Producer connected: Promise | undefined + private closed = false + constructor ( readonly ctx: MeasureContext, kafka: Kafka, @@ -185,7 +191,12 @@ class PlatformQueueProducerImpl implements PlatformQueueProducer { ) } + isClosed (): boolean { + return this.closed + } + async close (): Promise { + this.closed = true await this.ctx.with('disconnect', {}, () => this.txProducer.disconnect()) } } diff --git a/server/middleware/src/queue.ts b/server/middleware/src/queue.ts index 94e311086f..588bb9a943 100644 --- a/server/middleware/src/queue.ts +++ b/server/middleware/src/queue.ts @@ -34,7 +34,7 @@ export class QueueMiddleware extends BaseMiddleware { readonly queue: PlatformQueue ) { super(context, next) - this.txProducer = queue.createProducer(ctx, QueueTopic.Tx) + this.txProducer = queue.getProducer(ctx, QueueTopic.Tx) } static create (queue: PlatformQueue): MiddlewareCreator { @@ -53,8 +53,4 @@ export class QueueMiddleware extends BaseMiddleware { this.txProducer.send(this.context.workspace.uuid, ctx.contextData.broadcast.txes) ]) } - - async close (): Promise { - await this.txProducer.close() - } } diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index 5cf7ee8720..a151af7b6b 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -133,8 +133,8 @@ export class TSessionManager implements SessionManager { this.handleTick() }, 1000 / ticksPerSecond) } - this.workspaceProducer = this.queue.createProducer(ctx.newChild('queue', {}), QueueTopic.Workspace) - this.usersProducer = this.queue.createProducer(ctx.newChild('queue', {}), QueueTopic.Users) + this.workspaceProducer = this.queue.getProducer(ctx.newChild('queue', {}), QueueTopic.Workspace) + this.usersProducer = this.queue.getProducer(ctx.newChild('queue', {}), QueueTopic.Users) } scheduleMaintenance (timeMinutes: number): void { diff --git a/server/workspace-service/src/index.ts b/server/workspace-service/src/index.ts index fa5c926bd8..9c1cf558cc 100644 --- a/server/workspace-service/src/index.ts +++ b/server/workspace-service/src/index.ts @@ -123,7 +123,7 @@ export function serveWorkspaceAccount ( let canceled = false - const wsProducer = queue.createProducer(measureCtx, QueueTopic.Workspace) + const wsProducer = queue.getProducer(measureCtx, QueueTopic.Workspace) const worker = new WorkspaceWorker( wsProducer, version, @@ -158,7 +158,6 @@ export function serveWorkspaceAccount ( const close = (): void => { canceled = true - void wsProducer.close() void queue.shutdown() onClose?.() }