Create queue producers in pipeline phase (#8643)

This commit is contained in:
Kristina 2025-04-22 05:47:33 +04:00 committed by GitHub
parent 7721300186
commit a1c74384ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 75 additions and 96 deletions

View File

@ -352,10 +352,8 @@ export function devTool (
const coreWsInfo = flattenStatus(wsInfo) const coreWsInfo = flattenStatus(wsInfo)
const accountClient = getAccountClient(getToolToken()) const accountClient = getAccountClient(getToolToken())
const wsProducer = getPlatformQueue('tool', cmd.region).createProducer<QueueWorkspaceMessage>( const queue = getPlatformQueue('tool', cmd.region)
toolCtx, const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
QueueTopic.Workspace
)
await createWorkspace( await createWorkspace(
measureCtx, measureCtx,
@ -377,9 +375,8 @@ export function devTool (
}) })
await wsProducer.send(res.workspaceUuid, [workspaceEvents.created()]) await wsProducer.send(res.workspaceUuid, [workspaceEvents.created()])
await wsProducer.close() await queue.shutdown()
console.log(queue)
console.log('create-workspace done')
}) })
}) })
@ -435,10 +432,8 @@ export function devTool (
const coreWsInfo = flattenStatus(wsInfo) const coreWsInfo = flattenStatus(wsInfo)
const measureCtx = new MeasureMetricsContext('upgrade-workspace', {}) const measureCtx = new MeasureMetricsContext('upgrade-workspace', {})
const accountClient = getAccountClient(getToolToken(wsInfo.uuid)) const accountClient = getAccountClient(getToolToken(wsInfo.uuid))
const wsProducer = getPlatformQueue('tool', info.region).createProducer<QueueWorkspaceMessage>( const queue = getPlatformQueue('tool', info.region)
toolCtx, const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
QueueTopic.Workspace
)
await upgradeWorkspace( await upgradeWorkspace(
measureCtx, measureCtx,
version, version,
@ -464,7 +459,7 @@ export function devTool (
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60)) console.log(metricsToString(measureCtx.metrics, 'upgrade', 60))
await wsProducer.send(info.uuid, [workspaceEvents.upgraded()]) await wsProducer.send(info.uuid, [workspaceEvents.upgraded()])
await wsProducer.close() await queue.shutdown()
console.log('upgrade-workspace done') console.log('upgrade-workspace done')
}) })
}) })
@ -1144,12 +1139,10 @@ export function devTool (
storageAdapter: workspaceStorage, storageAdapter: workspaceStorage,
historyFile: cmd.historyFile historyFile: cmd.historyFile
}) })
const wsProducer = getPlatformQueue('tool', ws.region).createProducer<QueueWorkspaceMessage>( const queue = getPlatformQueue('tool', ws.region)
toolCtx, const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
QueueTopic.Workspace
)
await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()]) await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()])
await wsProducer.close() await queue.shutdown()
await workspaceStorage?.close() await workspaceStorage?.close()
}) })
} }
@ -2092,12 +2085,10 @@ export function devTool (
} }
console.log('reindex workspace', workspace) console.log('reindex workspace', workspace)
const wsProducer = getPlatformQueue('tool', ws.region).createProducer<QueueWorkspaceMessage>( const queue = getPlatformQueue('tool', ws.region)
toolCtx, const wsProducer = queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
QueueTopic.Workspace
)
await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()]) await wsProducer.send(ws.uuid, [workspaceEvents.fullReindex()])
await wsProducer.close() await queue.shutdown()
console.log('done', workspace) console.log('done', workspace)
}) })
}) })

View File

@ -127,7 +127,7 @@ describe('full-text-indexing', () => {
const queue = new TestQueue(toolCtx) const queue = new TestQueue(toolCtx)
await queue.start() await queue.start()
try { try {
const txProducer = queue.queue.createProducer<Tx>(toolCtx, QueueTopic.Tx) const txProducer = queue.queue.getProducer<Tx>(toolCtx, QueueTopic.Tx)
const personId = randomUUID().toString() as PersonUuid const personId = randomUUID().toString() as PersonUuid
const wsId: WorkspaceUuid = randomUUID().toString() as WorkspaceUuid const wsId: WorkspaceUuid = randomUUID().toString() as WorkspaceUuid
const token = generateToken(personId, wsId) const token = generateToken(personId, wsId)
@ -184,7 +184,7 @@ describe('full-text-indexing', () => {
const queue = new TestQueue(toolCtx) const queue = new TestQueue(toolCtx)
await queue.start() await queue.start()
const { pipeline, wsIds } = await preparePipeline(toolCtx, queue.queue, false) // Do not use broadcast const { pipeline, wsIds } = await preparePipeline(toolCtx, queue.queue, false) // Do not use broadcast
const wsProcessor = queue.queue.createProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace) const wsProcessor = queue.queue.getProducer<QueueWorkspaceMessage>(toolCtx, QueueTopic.Workspace)
try { try {
const pipelineClient = wrapPipeline(toolCtx, pipeline, wsIds) const pipelineClient = wrapPipeline(toolCtx, pipeline, wsIds)

View File

@ -51,7 +51,7 @@ export class WorkspaceManager {
this.sysHierarchy.tx(tx) this.sysHierarchy.tx(tx)
} }
this.workspaceProducer = this.opt.queue.createProducer<QueueWorkspaceMessage>(this.ctx, QueueTopic.Workspace) this.workspaceProducer = this.opt.queue.getProducer<QueueWorkspaceMessage>(this.ctx, QueueTopic.Workspace)
} }
shutdownInterval: any shutdownInterval: any

View File

@ -364,19 +364,16 @@ async function putEventToQueue (
modifiedBy: PersonId, modifiedBy: PersonId,
changes?: Record<string, any> changes?: Record<string, any>
): Promise<void> { ): Promise<void> {
if (control.queue === undefined) { if (control.queue === undefined) return
return const producer = control.queue.getProducer<EventCUDMessage>(
}
const producer = control.queue.createProducer<EventCUDMessage>(
control.ctx.newChild('queue', {}), control.ctx.newChild('queue', {}),
QueueTopic.CalendarEventCUD QueueTopic.CalendarEventCUD
) )
try { try {
await producer.send(control.workspace.uuid, [{ action, event, modifiedBy, changes }]) await producer.send(control.workspace.uuid, [{ action, event, modifiedBy, changes }])
} catch (err) { } catch (err) {
control.ctx.error('Could not queue calendar event', { err, action, event }) control.ctx.error('Could not queue calendar event', { err, action, event })
} finally {
await producer.close()
} }
} }

View File

@ -260,11 +260,7 @@ function hasAttachments (doc: ActivityMessage | undefined, hierarchy: Hierarchy)
const telegramNotificationCacheKey = 'telegram.notification.cache' const telegramNotificationCacheKey = 'telegram.notification.cache'
async function NotificationsHandler (txes: TxCreateDoc<InboxNotification>[], control: TriggerControl): Promise<Tx[]> { async function NotificationsHandler (txes: TxCreateDoc<InboxNotification>[], control: TriggerControl): Promise<Tx[]> {
const queue = control.queue if (control.queue === undefined) return []
if (queue === undefined) {
return []
}
const availableProviders: AvailableProvidersCache = control.contextCache.get(AvailableProvidersCacheKey) ?? new Map() const availableProviders: AvailableProvidersCache = control.contextCache.get(AvailableProvidersCacheKey) ?? new Map()
@ -280,14 +276,11 @@ async function NotificationsHandler (txes: TxCreateDoc<InboxNotification>[], con
} }
const result: Tx[] = [] const result: Tx[] = []
const producer = queue.createProducer(control.ctx, QueueTopic.TelegramBot) const producer = control.queue.getProducer<TelegramQueueMessage>(control.ctx, QueueTopic.TelegramBot)
try { for (const inboxNotification of all) {
for (const inboxNotification of all) { result.push(...(await processNotification(inboxNotification, control, producer)))
result.push(...(await processNotification(inboxNotification, control, producer)))
}
} finally {
await producer.close()
} }
return result return result
} }
@ -396,45 +389,36 @@ async function ProviderSettingsHandler (
txes: TxCUD<NotificationProviderSetting>[], txes: TxCUD<NotificationProviderSetting>[],
control: TriggerControl control: TriggerControl
): Promise<Tx[]> { ): Promise<Tx[]> {
const queue = control.queue if (control.queue === undefined) return []
const producer = control.queue.getProducer<TelegramQueueMessage>(control.ctx, QueueTopic.TelegramBot)
if (queue === undefined) { for (const tx of txes) {
return [] if (tx._class === core.class.TxCreateDoc) {
} const createTx = tx as TxCreateDoc<NotificationProviderSetting>
const setting = TxProcessor.createDoc2Doc(createTx)
const producer = queue.createProducer(control.ctx, QueueTopic.TelegramBot) if (setting.attachedTo === telegram.providers.TelegramNotificationProvider) {
await updateWorkspaceSubscription(producer, setting.enabled, setting.createdBy ?? setting.modifiedBy, control)
}
} else if (tx._class === core.class.TxUpdateDoc) {
const updateTx = tx as TxUpdateDoc<NotificationProviderSetting>
if (updateTx.operations.enabled !== undefined) {
const setting = (
await control.findAll(control.ctx, notification.class.NotificationProviderSetting, {
_id: updateTx.objectId
})
)[0]
try { if (setting !== undefined && setting.attachedTo === telegram.providers.TelegramNotificationProvider) {
for (const tx of txes) { await updateWorkspaceSubscription(
if (tx._class === core.class.TxCreateDoc) { producer,
const createTx = tx as TxCreateDoc<NotificationProviderSetting> updateTx.operations.enabled,
const setting = TxProcessor.createDoc2Doc(createTx) setting.createdBy ?? setting.modifiedBy,
control
if (setting.attachedTo === telegram.providers.TelegramNotificationProvider) { )
await updateWorkspaceSubscription(producer, setting.enabled, setting.createdBy ?? setting.modifiedBy, control)
}
} else if (tx._class === core.class.TxUpdateDoc) {
const updateTx = tx as TxUpdateDoc<NotificationProviderSetting>
if (updateTx.operations.enabled !== undefined) {
const setting = (
await control.findAll(control.ctx, notification.class.NotificationProviderSetting, {
_id: updateTx.objectId
})
)[0]
if (setting !== undefined && setting.attachedTo === telegram.providers.TelegramNotificationProvider) {
await updateWorkspaceSubscription(
producer,
updateTx.operations.enabled,
setting.createdBy ?? setting.modifiedBy,
control
)
}
} }
} }
} }
} finally {
await producer.close()
} }
return [] return []

View File

@ -22,7 +22,7 @@ class DummyQueueProducer<T> implements PlatformQueueProducer<T> {
* A dummy implementation of PlatformQueue for testing and development * A dummy implementation of PlatformQueue for testing and development
*/ */
export class DummyQueue implements PlatformQueue { export class DummyQueue implements PlatformQueue {
createProducer<T>(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer<T> { getProducer<T>(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer<T> {
return new DummyQueueProducer<T>() return new DummyQueueProducer<T>()
} }

View File

@ -35,7 +35,7 @@ export interface ConsumerControl {
} }
export interface PlatformQueue { export interface PlatformQueue {
createProducer: <T>(ctx: MeasureContext, topic: QueueTopic | string) => PlatformQueueProducer<T> getProducer: <T>(ctx: MeasureContext, topic: QueueTopic | string) => PlatformQueueProducer<T>
/** /**
* Create a consumer for a topic. * Create a consumer for a topic.

View File

@ -64,7 +64,7 @@ import { type Readable } from 'stream'
import type { DbAdapter, DomainHelper } from './adapter' import type { DbAdapter, DomainHelper } from './adapter'
import type { StatisticsElement, WorkspaceStatistics } from './stats' import type { StatisticsElement, WorkspaceStatistics } from './stats'
import { type StorageAdapter } from './storage' import { type StorageAdapter } from './storage'
import { type PlatformQueue } from './queue' import { type PlatformQueueProducer, type QueueTopic, type PlatformQueue } from './queue'
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> { export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
domain?: Domain // Allow to find for Doc's in specified domain only. domain?: Domain // Allow to find for Doc's in specified domain only.
@ -195,6 +195,7 @@ export interface PipelineContext {
lowLevelStorage?: LowLevelStorage lowLevelStorage?: LowLevelStorage
liveQuery?: LiveQuery liveQuery?: LiveQuery
queue?: PlatformQueue queue?: PlatformQueue
queueProducers?: Map<QueueTopic, PlatformQueueProducer<any>>
// Entry point for derived data procvessing // Entry point for derived data procvessing
derived?: Middleware derived?: Middleware

View File

@ -24,7 +24,7 @@ describe('queue', () => {
}) })
}) })
const producer = queue.createProducer<string>(testCtx, 'qtest') const producer = queue.getProducer<string>(testCtx, 'qtest')
for (let i = 0; i < docsCount; i++) { for (let i = 0; i < docsCount; i++) {
await producer.send(genId, ['msg' + i]) await producer.send(genId, ['msg' + i])
} }
@ -54,7 +54,7 @@ describe('queue', () => {
}) })
}) })
const producer = queue.createProducer<string>(testCtx, 'test') const producer = queue.getProducer<string>(testCtx, 'test')
await producer.send(genId, ['msg']) await producer.send(genId, ['msg'])
await p await p

View File

@ -57,7 +57,7 @@ function getKafkaTopicId (topic: QueueTopic | string, config: QueueConfig): stri
class PlatformQueueImpl implements PlatformQueue { class PlatformQueueImpl implements PlatformQueue {
consumers: ConsumerHandle[] = [] consumers: ConsumerHandle[] = []
producers: PlatformQueueProducerImpl[] = [] producers = new Map<QueueTopic | string, PlatformQueueProducerImpl>()
constructor ( constructor (
private readonly kafka: Kafka, private readonly kafka: Kafka,
readonly config: QueueConfig readonly config: QueueConfig
@ -68,7 +68,7 @@ class PlatformQueueImpl implements PlatformQueue {
} }
async shutdown (): Promise<void> { async shutdown (): Promise<void> {
for (const p of this.producers) { for (const [, p] of this.producers) {
try { try {
await p.close() await p.close()
} catch (err: any) { } catch (err: any) {
@ -84,10 +84,14 @@ class PlatformQueueImpl implements PlatformQueue {
} }
} }
createProducer<T>(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer<T> { getProducer<T>(ctx: MeasureContext, topic: QueueTopic | string): PlatformQueueProducer<T> {
const result = new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this) const producer = this.producers.get(topic)
this.producers.push(result) if (producer !== undefined && !producer.isClosed()) return producer
return result
const created = new PlatformQueueProducerImpl(ctx, this.kafka, getKafkaTopicId(topic, this.config), this)
this.producers.set(topic, created)
return created
} }
createConsumer<T>( createConsumer<T>(
@ -152,6 +156,8 @@ class PlatformQueueImpl implements PlatformQueue {
class PlatformQueueProducerImpl implements PlatformQueueProducer<any> { class PlatformQueueProducerImpl implements PlatformQueueProducer<any> {
txProducer: Producer txProducer: Producer
connected: Promise<void> | undefined connected: Promise<void> | undefined
private closed = false
constructor ( constructor (
readonly ctx: MeasureContext, readonly ctx: MeasureContext,
kafka: Kafka, kafka: Kafka,
@ -185,7 +191,12 @@ class PlatformQueueProducerImpl implements PlatformQueueProducer<any> {
) )
} }
isClosed (): boolean {
return this.closed
}
async close (): Promise<void> { async close (): Promise<void> {
this.closed = true
await this.ctx.with('disconnect', {}, () => this.txProducer.disconnect()) await this.ctx.with('disconnect', {}, () => this.txProducer.disconnect())
} }
} }

View File

@ -34,7 +34,7 @@ export class QueueMiddleware extends BaseMiddleware {
readonly queue: PlatformQueue readonly queue: PlatformQueue
) { ) {
super(context, next) super(context, next)
this.txProducer = queue.createProducer<Tx>(ctx, QueueTopic.Tx) this.txProducer = queue.getProducer<Tx>(ctx, QueueTopic.Tx)
} }
static create (queue: PlatformQueue): MiddlewareCreator { static create (queue: PlatformQueue): MiddlewareCreator {
@ -53,8 +53,4 @@ export class QueueMiddleware extends BaseMiddleware {
this.txProducer.send(this.context.workspace.uuid, ctx.contextData.broadcast.txes) this.txProducer.send(this.context.workspace.uuid, ctx.contextData.broadcast.txes)
]) ])
} }
async close (): Promise<void> {
await this.txProducer.close()
}
} }

View File

@ -133,8 +133,8 @@ export class TSessionManager implements SessionManager {
this.handleTick() this.handleTick()
}, 1000 / ticksPerSecond) }, 1000 / ticksPerSecond)
} }
this.workspaceProducer = this.queue.createProducer(ctx.newChild('queue', {}), QueueTopic.Workspace) this.workspaceProducer = this.queue.getProducer(ctx.newChild('queue', {}), QueueTopic.Workspace)
this.usersProducer = this.queue.createProducer(ctx.newChild('queue', {}), QueueTopic.Users) this.usersProducer = this.queue.getProducer(ctx.newChild('queue', {}), QueueTopic.Users)
} }
scheduleMaintenance (timeMinutes: number): void { scheduleMaintenance (timeMinutes: number): void {

View File

@ -123,7 +123,7 @@ export function serveWorkspaceAccount (
let canceled = false let canceled = false
const wsProducer = queue.createProducer<QueueWorkspaceMessage>(measureCtx, QueueTopic.Workspace) const wsProducer = queue.getProducer<QueueWorkspaceMessage>(measureCtx, QueueTopic.Workspace)
const worker = new WorkspaceWorker( const worker = new WorkspaceWorker(
wsProducer, wsProducer,
version, version,
@ -158,7 +158,6 @@ export function serveWorkspaceAccount (
const close = (): void => { const close = (): void => {
canceled = true canceled = true
void wsProducer.close()
void queue.shutdown() void queue.shutdown()
onClose?.() onClose?.()
} }