From f8a8c94bc2a0e1e840acfd2706c4f2304ea8988a Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 28 Jan 2025 00:36:37 +0700 Subject: [PATCH] UBERF-9224: Use context variables to hold context data (#7754) Signed-off-by: Andrey Sobolev --- dev/tool/src/db.ts | 7 ++- dev/tool/src/index.ts | 40 ++++++++++---- packages/core/src/memdb.ts | 6 +- pods/backup/src/index.ts | 26 +++++++-- pods/fulltext/src/server.ts | 25 +++++++-- pods/server/src/server.ts | 22 +++++++- server/account/src/operations.ts | 4 +- server/account/src/utils.ts | 3 +- server/backup-service/src/index.ts | 10 +++- server/backup/src/service.ts | 17 +++++- server/core/src/adapter.ts | 10 +++- server/core/src/benchmark/index.ts | 1 + server/core/src/dbAdapterManager.ts | 4 +- server/core/src/mem.ts | 1 + server/core/src/nullAdapter.ts | 1 + server/core/src/types.ts | 2 + server/middleware/src/dbAdapter.ts | 3 +- server/middleware/src/fulltext.ts | 4 -- server/mongo/src/__tests__/storage.test.ts | 6 +- server/mongo/src/index.ts | 2 +- server/mongo/src/storage.ts | 2 + server/mongo/src/utils.ts | 9 +-- server/postgres/src/__tests__/storage.test.ts | 12 ++-- server/postgres/src/index.ts | 14 ++++- server/postgres/src/storage.ts | 55 +++++++++++++------ server/postgres/src/utils.ts | 25 +++++---- server/server-pipeline/src/blobStorage.ts | 11 +++- server/server-pipeline/src/pipeline.ts | 13 ++++- server/server/src/client.ts | 2 +- server/workspace-service/src/service.ts | 30 ++++++++-- server/workspace-service/src/ws-operations.ts | 4 +- workers/transactor/src/transactor.ts | 9 ++- 32 files changed, 278 insertions(+), 102 deletions(-) diff --git a/dev/tool/src/db.ts b/dev/tool/src/db.ts index f266f7bc01..a45ef841b3 100644 --- a/dev/tool/src/db.ts +++ b/dev/tool/src/db.ts @@ -28,6 +28,7 @@ import { } from '@hcengineering/postgres' import { type DBDoc } from '@hcengineering/postgres/types/utils' import { getTransactorEndpoint } from '@hcengineering/server-client' +import { sharedPipelineContextVars } from 
'@hcengineering/server-pipeline' import { generateToken } from '@hcengineering/server-token' import { connect } from '@hcengineering/server-tool' import { type MongoClient, UUID } from 'mongodb' @@ -45,7 +46,7 @@ export async function moveFromMongoToPG ( } const client = getMongoClient(mongoUrl) const mongo = await client.getClient() - const pg = getDBClient(dbUrl) + const pg = getDBClient(sharedPipelineContextVars, dbUrl) const pgClient = await pg.getClient() for (let index = 0; index < workspaces.length; index++) { @@ -168,7 +169,7 @@ export async function moveWorkspaceFromMongoToPG ( } const client = getMongoClient(mongoUrl) const mongo = await client.getClient() - const pg = getDBClient(dbUrl) + const pg = getDBClient(sharedPipelineContextVars, dbUrl) const pgClient = await pg.getClient() await moveWorkspace(accountDb, mongo, pgClient, ws, region, include, force) @@ -306,7 +307,7 @@ export async function updateDataWorkspaceIdToUuid ( throw new Error('dbUrl is required') } - const pg = getDBClient(dbUrl) + const pg = getDBClient(sharedPipelineContextVars, dbUrl) try { const pgClient = await pg.getClient() diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 4e7067d27b..1c1854c91b 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -66,10 +66,11 @@ import { registerDestroyFactory, registerServerPlugins, registerStringLoaders, - registerTxAdapterFactory + registerTxAdapterFactory, + sharedPipelineContextVars } from '@hcengineering/server-pipeline' import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token' -import { FileModelLogger, buildModel } from '@hcengineering/server-tool' +import { buildModel, FileModelLogger } from '@hcengineering/server-tool' import { createWorkspace, upgradeWorkspace } from '@hcengineering/workspace-service' import path from 'path' @@ -105,12 +106,18 @@ import { createMongoTxAdapter, getMongoClient, getWorkspaceMongoDB, - shutdown + shutdownMongo } from '@hcengineering/mongo' import { 
backupDownload } from '@hcengineering/server-backup/src/backup' import { createDatalakeClient, CONFIG_KIND as DATALAKE_CONFIG_KIND, type DatalakeConfig } from '@hcengineering/datalake' import { getModelVersion } from '@hcengineering/model-all' +import { + createPostgreeDestroyAdapter, + createPostgresAdapter, + createPostgresTxAdapter, + shutdownPostgres +} from '@hcengineering/postgres' import { CONFIG_KIND as S3_CONFIG_KIND, S3Service, type S3Config } from '@hcengineering/s3' import type { PipelineFactory, StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core' import { deepEqual } from 'fast-equals' @@ -139,17 +146,16 @@ import { import { changeConfiguration } from './configuration' import { generateUuidMissingWorkspaces, - updateDataWorkspaceIdToUuid, moveAccountDbFromMongoToPG, moveFromMongoToPG, - moveWorkspaceFromMongoToPG + moveWorkspaceFromMongoToPG, + updateDataWorkspaceIdToUuid } from './db' -import { restoreControlledDocContentMongo, restoreWikiContentMongo, restoreMarkupRefsMongo } from './markup' +import { reindexWorkspace } from './fulltext' +import { restoreControlledDocContentMongo, restoreMarkupRefsMongo, restoreWikiContentMongo } from './markup' import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { fixAccountEmails, renameAccount, fillGithubUsers } from './account' import { copyToDatalake, moveFiles, showLostFiles } from './storage' -import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres' -import { reindexWorkspace } from './fulltext' const colorConstants = { colorRed: '\u001b[31m', @@ -163,6 +169,16 @@ const colorConstants = { reset: '\u001b[0m' } +// Register close on process exit. 
+process.on('exit', () => { + shutdownPostgres(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) + shutdownMongo(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) +}) + /** * @public */ @@ -220,7 +236,7 @@ export function devTool ( } closeAccountsDb() console.log(`closing database connection to '${uri}'...`) - await shutdown() + await shutdownMongo() } async function withStorage (f: (storageAdapter: StorageAdapter) => Promise): Promise { @@ -648,6 +664,7 @@ export function devTool ( true, true, 5000, // 5 gigabytes per blob + sharedPipelineContextVars, async (storage, workspaceStorage) => { if (cmd.remove) { await updateArchiveInfo(toolCtx, db, ws.workspace, true) @@ -667,7 +684,7 @@ export function devTool ( const destroyer = getWorkspaceDestroyAdapter(dbUrl) - await destroyer.deleteWorkspace(toolCtx, { name: ws.workspace }) + await destroyer.deleteWorkspace(toolCtx, sharedPipelineContextVars, { name: ws.workspace }) } } ) @@ -718,7 +735,8 @@ export function devTool ( cmd.region, false, false, - 100 + 100, + sharedPipelineContextVars ) ) { processed++ diff --git a/packages/core/src/memdb.ts b/packages/core/src/memdb.ts index 63b95536e6..79314d6e84 100644 --- a/packages/core/src/memdb.ts +++ b/packages/core/src/memdb.ts @@ -412,7 +412,7 @@ export class ModelDb extends MemDb { this.updateDoc(cud.objectId, doc, cud) TxProcessor.updateDoc2Doc(doc, cud) } else { - ctx.error('no document found, failed to apply model transaction, skipping', { + ctx.warn('no document found, failed to apply model transaction, skipping', { _id: tx._id, _class: tx._class, objectId: cud.objectId @@ -424,7 +424,7 @@ export class ModelDb extends MemDb { try { this.delDoc((tx as TxRemoveDoc).objectId) } catch (err: any) { - ctx.error('no document found, failed to apply model transaction, skipping', { + ctx.warn('no document found, failed to apply model transaction, skipping', { _id: tx._id, _class: tx._class, objectId: (tx as TxRemoveDoc).objectId @@ 
-438,7 +438,7 @@ export class ModelDb extends MemDb { this.updateDoc(mix.objectId, doc, mix) TxProcessor.updateMixin4Doc(doc, mix) } else { - ctx.error('no document found, failed to apply model transaction, skipping', { + ctx.warn('no document found, failed to apply model transaction, skipping', { _id: tx._id, _class: tx._class, objectId: mix.objectId diff --git a/pods/backup/src/index.ts b/pods/backup/src/index.ts index e14c6e0dbb..9aa888997f 100644 --- a/pods/backup/src/index.ts +++ b/pods/backup/src/index.ts @@ -23,20 +23,37 @@ import { getConfig, registerAdapterFactory, registerDestroyFactory, - registerTxAdapterFactory + registerTxAdapterFactory, + sharedPipelineContextVars } from '@hcengineering/server-pipeline' import { join } from 'path' -import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo' +import { + createMongoAdapter, + createMongoDestroyAdapter, + createMongoTxAdapter, + shutdownMongo +} from '@hcengineering/mongo' import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter, - setDBExtraOptions + setDBExtraOptions, + shutdownPostgres } from '@hcengineering/postgres' import { readFileSync } from 'node:fs' const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[] +// Register close on process exit. 
+process.on('exit', () => { + shutdownPostgres(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) + shutdownMongo(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) +}) + const metricsContext = initStatisticsContext('backup', { factory: () => new MeasureMetricsContext( @@ -84,5 +101,6 @@ startBackup( externalStorage, disableTriggers: true }) - } + }, + sharedPipelineContextVars ) diff --git a/pods/fulltext/src/server.ts b/pods/fulltext/src/server.ts index a87b7b9c82..b826a1ac2b 100644 --- a/pods/fulltext/src/server.ts +++ b/pods/fulltext/src/server.ts @@ -30,13 +30,19 @@ import { LowLevelMiddleware, ModelMiddleware } from '@hcengineering/middleware' -import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo' +import { + createMongoAdapter, + createMongoDestroyAdapter, + createMongoTxAdapter, + shutdownMongo +} from '@hcengineering/mongo' import { PlatformError, setMetadata, unknownError } from '@hcengineering/platform' import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter, - setDBExtraOptions + setDBExtraOptions, + shutdownPostgres } from '@hcengineering/postgres' import serverClientPlugin, { getTransactorEndpoint, getWorkspaceInfo } from '@hcengineering/server-client' import serverCore, { @@ -56,7 +62,8 @@ import { registerDestroyFactory, registerServerPlugins, registerStringLoaders, - registerTxAdapterFactory + registerTxAdapterFactory, + sharedPipelineContextVars } from '@hcengineering/server-pipeline' import serverToken, { decodeToken, generateToken, type Token } from '@hcengineering/server-token' import cors from '@koa/cors' @@ -104,7 +111,8 @@ class WorkspaceIndexer { branding: null, modelDb, hierarchy, - storageAdapter: externalStorage + storageAdapter: externalStorage, + contextVars: {} } result.pipeline = await createPipeline(ctx, middlewares, context) @@ -204,6 +212,15 @@ interface Search { interface Reindex { token: string } +// Register 
close on process exit. +process.on('exit', () => { + shutdownPostgres(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) + shutdownMongo(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) +}) export async function startIndexer ( ctx: MeasureContext, diff --git a/pods/server/src/server.ts b/pods/server/src/server.ts index 8850484962..46344ffd5e 100644 --- a/pods/server/src/server.ts +++ b/pods/server/src/server.ts @@ -33,21 +33,37 @@ import { registerDestroyFactory, registerServerPlugins, registerStringLoaders, - registerTxAdapterFactory + registerTxAdapterFactory, + sharedPipelineContextVars } from '@hcengineering/server-pipeline' -import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo' +import { + createMongoAdapter, + createMongoDestroyAdapter, + createMongoTxAdapter, + shutdownMongo +} from '@hcengineering/mongo' import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter, - setDBExtraOptions + setDBExtraOptions, + shutdownPostgres } from '@hcengineering/postgres' import { readFileSync } from 'node:fs' const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[] registerStringLoaders() +// Register close on process exit. 
+process.on('exit', () => { + shutdownPostgres(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) + shutdownMongo(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) +}) /** * @public */ diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index 2ee99fd63b..29778cd181 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -92,7 +92,7 @@ import { toAccountInfo, verifyPassword } from './utils' -import { getWorkspaceDestroyAdapter } from '@hcengineering/server-pipeline' +import { getWorkspaceDestroyAdapter, sharedPipelineContextVars } from '@hcengineering/server-pipeline' import MD5 from 'crypto-js/md5' function buildGravatarId (email: string): string { @@ -2364,7 +2364,7 @@ export async function dropWorkspaceFull ( const ws = await dropWorkspace(ctx, db, branding, workspaceId) const adapter = getWorkspaceDestroyAdapter(dbUrl) - await adapter.deleteWorkspace(ctx, { name: ws.workspace }) + await adapter.deleteWorkspace(ctx, sharedPipelineContextVars, { name: ws.workspace }) const wspace = getWorkspaceId(workspaceId) const hasBucket = await storageAdapter?.exists(ctx, wspace) diff --git a/server/account/src/utils.ts b/server/account/src/utils.ts index 01102423dc..d3d640294c 100644 --- a/server/account/src/utils.ts +++ b/server/account/src/utils.ts @@ -23,6 +23,7 @@ import { MongoAccountDB } from './collections/mongo' import { PostgresAccountDB } from './collections/postgres' import { accountPlugin } from './plugin' import type { Account, AccountDB, AccountInfo, RegionInfo, WorkspaceInfo } from './types' +import { sharedPipelineContextVars } from '@hcengineering/server-pipeline' export async function getAccountDB (uri: string, dbNs?: string): Promise<[AccountDB, () => void]> { const isMongo = uri.startsWith('mongodb://') @@ -41,7 +42,7 @@ export async function getAccountDB (uri: string, dbNs?: string): Promise<[Accoun } ] } else { - const client = getDBClient(uri) + const 
client = getDBClient(sharedPipelineContextVars, uri) const pgClient = await client.getClient() // TODO: if dbNs is provided put tables in that schema const pgAccount = new PostgresAccountDB(pgClient) diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts index 2f48216df5..0067a79451 100644 --- a/server/backup-service/src/index.ts +++ b/server/backup-service/src/index.ts @@ -37,7 +37,8 @@ export function startBackup ( workspace: WorkspaceIdWithUrl, branding: Branding | null, externalStorage: StorageAdapter - ) => DbConfiguration + ) => DbConfiguration, + contextVars: Record ): void { const config = _config() setMetadata(serverToken.metadata.Secret, config.Secret) @@ -66,7 +67,8 @@ export function startBackup ( (ctx, workspace, branding, externalStorage) => { return getConfig(ctx, mainDbUrl, workspace, branding, externalStorage) }, - config.Region + config.Region, + contextVars ) process.on('SIGINT', shutdown) @@ -94,6 +96,7 @@ export async function backupWorkspace ( freshBackup: boolean = false, clean: boolean = false, downloadLimit: number, + contextVars: Record, onFinish?: (backupStorage: StorageAdapter, workspaceStorage: StorageAdapter) => Promise ): Promise { @@ -130,7 +133,8 @@ export async function backupWorkspace ( freshBackup, clean, downloadLimit, - [] + [], + contextVars ) if (result && onFinish !== undefined) { await onFinish(storageAdapter, workspaceStorageAdapter) diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts index c60a3008d6..13d04eaec8 100644 --- a/server/backup/src/service.ts +++ b/server/backup/src/service.ts @@ -66,6 +66,7 @@ class BackupWorker { externalStorage: StorageAdapter ) => DbConfiguration, readonly region: string, + readonly contextVars: Record, readonly freshWorkspace: boolean = false, readonly clean: boolean = false, readonly skipDomains: string[] = [] @@ -211,6 +212,7 @@ class BackupWorker { const modelDb = new ModelDb(hierarchy) const txAdapter = await adapterConf.factory( ctx, 
+ this.contextVars, hierarchy, adapterConf.url, wsUrl, @@ -218,7 +220,7 @@ class BackupWorker { this.workspaceStorageAdapter ) try { - await txAdapter.init?.(ctx) + await txAdapter.init?.(ctx, this.contextVars) return ( await txAdapter.rawFindAll( @@ -292,9 +294,18 @@ export function backupService ( externalStorage: StorageAdapter ) => DbConfiguration, region: string, + contextVars: Record, recheck?: boolean ): () => void { - const backupWorker = new BackupWorker(storage, config, pipelineFactory, workspaceStorageAdapter, getConfig, region) + const backupWorker = new BackupWorker( + storage, + config, + pipelineFactory, + workspaceStorageAdapter, + getConfig, + region, + contextVars + ) const shutdown = (): void => { void backupWorker.close() @@ -322,6 +333,7 @@ export async function doBackupWorkspace ( clean: boolean, downloadLimit: number, skipDomains: string[], + contextVars: Record, notify?: (progress: number) => Promise ): Promise { const backupWorker = new BackupWorker( @@ -331,6 +343,7 @@ export async function doBackupWorkspace ( workspaceStorageAdapter, getConfig, region, + contextVars, freshWorkspace, clean, skipDomains diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index 26cba80b20..d905d4c3a2 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -65,7 +65,12 @@ export type DbAdapterHandler = ( * @public */ export interface DbAdapter extends LowLevelStorage { - init?: (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]) => Promise + init?: ( + ctx: MeasureContext, + contextVars: Record, + domains?: string[], + excludeDomains?: string[] + ) => Promise helper?: () => DomainHelperOperations @@ -103,7 +108,7 @@ export interface TxAdapter extends DbAdapter { * @public */ export interface WorkspaceDestroyAdapter { - deleteWorkspace: (ctx: MeasureContext, workspace: WorkspaceId) => Promise + deleteWorkspace: (ctx: MeasureContext, contextVars: Record, workspace: WorkspaceId) => Promise } /** @@ -111,6 
+116,7 @@ export interface WorkspaceDestroyAdapter { */ export type DbAdapterFactory = ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, diff --git a/server/core/src/benchmark/index.ts b/server/core/src/benchmark/index.ts index 05d0f1d803..afb5f99e1b 100644 --- a/server/core/src/benchmark/index.ts +++ b/server/core/src/benchmark/index.ts @@ -119,6 +119,7 @@ class BenchmarkDbAdapter extends DummyDbAdapter { */ export async function createBenchmarkAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, diff --git a/server/core/src/dbAdapterManager.ts b/server/core/src/dbAdapterManager.ts index 0bb863ea52..807718b8bc 100644 --- a/server/core/src/dbAdapterManager.ts +++ b/server/core/src/dbAdapterManager.ts @@ -130,7 +130,9 @@ export class DbAdapterManagerImpl implements DBAdapterManager { } } } - await ctx.with(`init adapter ${key}`, {}, (ctx) => adapter?.init?.(ctx, domains, excludeDomains)) + await ctx.with(`init adapter ${key}`, {}, (ctx) => + adapter?.init?.(ctx, this.context.contextVars, domains, excludeDomains) + ) } } } diff --git a/server/core/src/mem.ts b/server/core/src/mem.ts index 2dfb16f1a6..92bbdeb808 100644 --- a/server/core/src/mem.ts +++ b/server/core/src/mem.ts @@ -177,6 +177,7 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter { */ export async function createInMemoryAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId diff --git a/server/core/src/nullAdapter.ts b/server/core/src/nullAdapter.ts index 74ff2a7abd..45eec91dea 100644 --- a/server/core/src/nullAdapter.ts +++ b/server/core/src/nullAdapter.ts @@ -22,6 +22,7 @@ import { DummyDbAdapter } from './mem' */ export async function createNullAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, diff --git a/server/core/src/types.ts 
b/server/core/src/types.ts index f0214ce27f..24a0dc2063 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -181,6 +181,8 @@ export interface PipelineContext { derived?: Middleware head?: Middleware + contextVars: Record + broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise } /** diff --git a/server/middleware/src/dbAdapter.ts b/server/middleware/src/dbAdapter.ts index 78da46056f..efd0b4213d 100644 --- a/server/middleware/src/dbAdapter.ts +++ b/server/middleware/src/dbAdapter.ts @@ -55,6 +55,7 @@ export class DBAdapterMiddleware extends BaseMiddleware implements Middleware { key, await adapterConf.factory( ctx, + this.context.contextVars, this.context.hierarchy, adapterConf.url, this.context.workspace, @@ -69,7 +70,7 @@ export class DBAdapterMiddleware extends BaseMiddleware implements Middleware { const txAdapterName = this.conf.domains[DOMAIN_TX] const txAdapter = adapters.get(txAdapterName) as TxAdapter - await txAdapter.init?.(metrics, [DOMAIN_TX, DOMAIN_MODEL_TX]) + await txAdapter.init?.(metrics, this.context.contextVars, [DOMAIN_TX, DOMAIN_MODEL_TX]) const defaultAdapter = adapters.get(this.conf.defaultAdapter) if (defaultAdapter === undefined) { diff --git a/server/middleware/src/fulltext.ts b/server/middleware/src/fulltext.ts index 0ee86b7821..eada8a9416 100644 --- a/server/middleware/src/fulltext.ts +++ b/server/middleware/src/fulltext.ts @@ -37,7 +37,6 @@ import core, { type SessionData, type Tx } from '@hcengineering/core' -import { PlatformError, unknownError } from '@hcengineering/platform' import type { IndexedDoc, Middleware, @@ -91,9 +90,6 @@ export class FullTextMiddleware extends BaseMiddleware implements Middleware { } async init (ctx: MeasureContext): Promise { - if (this.context.adapterManager == null) { - throw new PlatformError(unknownError('Adapter manager should be specified')) - } this.contexts = new Map( this.context.modelDb.findAllSync(core.class.FullTextSearchContext, {}).map((it) => [it.toClass, it]) ) diff 
--git a/server/mongo/src/__tests__/storage.test.ts b/server/mongo/src/__tests__/storage.test.ts index 9257c440a0..62d4f2c7f9 100644 --- a/server/mongo/src/__tests__/storage.test.ts +++ b/server/mongo/src/__tests__/storage.test.ts @@ -28,7 +28,7 @@ import core, { } from '@hcengineering/core' import { type DbAdapter, wrapAdapterToClient } from '@hcengineering/server-core' import { createMongoAdapter, createMongoTxAdapter } from '..' -import { getMongoClient, type MongoClientReference, shutdown } from '../utils' +import { getMongoClient, type MongoClientReference, shutdownMongo } from '../utils' import { genMinModel } from './minmodel' import { createTaskModel, type Task, type TaskComment, taskPlugin } from './tasks' @@ -52,7 +52,7 @@ describe('mongo operations', () => { afterAll(async () => { mongoClient.close() - await shutdown() + await shutdownMongo() }) beforeEach(async () => { @@ -80,6 +80,7 @@ describe('mongo operations', () => { const mctx = new MeasureMetricsContext('', {}) const txStorage = await createMongoTxAdapter( new MeasureMetricsContext('', {}), + {}, hierarchy, mongodbUri, getWorkspaceId(dbId), @@ -88,6 +89,7 @@ describe('mongo operations', () => { serverStorage = await createMongoAdapter( new MeasureMetricsContext('', {}), + {}, hierarchy, mongodbUri, getWorkspaceId(dbId), diff --git a/server/mongo/src/index.ts b/server/mongo/src/index.ts index dcf90295e9..64a7113b38 100644 --- a/server/mongo/src/index.ts +++ b/server/mongo/src/index.ts @@ -22,7 +22,7 @@ export * from './utils' export function createMongoDestroyAdapter (url: string): WorkspaceDestroyAdapter { return { - deleteWorkspace: async (ctx, workspace): Promise => { + deleteWorkspace: async (ctx, contextVars, workspace): Promise => { const client = getMongoClient(url) try { await ctx.with('delete-workspace', {}, async () => { diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 644d23fdcc..60594e0d2c 100644 --- a/server/mongo/src/storage.ts +++ 
b/server/mongo/src/storage.ts @@ -1807,6 +1807,7 @@ function translateLikeQuery (pattern: string): { $regex: string, $options: strin */ export async function createMongoAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, @@ -1825,6 +1826,7 @@ export async function createMongoAdapter ( */ export async function createMongoTxAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, diff --git a/server/mongo/src/utils.ts b/server/mongo/src/utils.ts index e65404e1d5..1e76507a95 100644 --- a/server/mongo/src/utils.ts +++ b/server/mongo/src/utils.ts @@ -27,19 +27,12 @@ import { MongoClient, type Collection, type Db, type Document } from 'mongodb' const connections = new Map() -// Register mongo close on process exit. -process.on('exit', () => { - shutdown().catch((err) => { - console.error(err) - }) -}) - const clientRefs = new Map() /** * @public */ -export async function shutdown (): Promise { +export async function shutdownMongo (contextVars: Record = {}): Promise { for (const it of Array.from(clientRefs.values())) { console.error((it as any).stack) } diff --git a/server/postgres/src/__tests__/storage.test.ts b/server/postgres/src/__tests__/storage.test.ts index 332644c247..3a69fe87db 100644 --- a/server/postgres/src/__tests__/storage.test.ts +++ b/server/postgres/src/__tests__/storage.test.ts @@ -27,7 +27,7 @@ import core, { } from '@hcengineering/core' import { type DbAdapter, wrapAdapterToClient } from '@hcengineering/server-core' import { createPostgresAdapter, createPostgresTxAdapter } from '..' 
-import { getDBClient, type PostgresClientReference, shutdown } from '../utils' +import { getDBClient, type PostgresClientReference, shutdownPostgres } from '../utils' import { genMinModel } from './minmodel' import { createTaskModel, type Task, type TaskComment, taskPlugin } from './tasks' @@ -35,12 +35,14 @@ const txes = genMinModel() createTaskModel(txes) +const contextVars: Record = {} + describe('postgres operations', () => { const baseDbUri: string = process.env.DB_URL ?? 'postgresql://postgres:example@localhost:5433' let dbId: string = 'pg_testdb_' + generateId() let dbUuid: string = crypto.randomUUID() let dbUri: string = baseDbUri + '/' + dbId - const clientRef: PostgresClientReference = getDBClient(baseDbUri) + const clientRef: PostgresClientReference = getDBClient(contextVars, baseDbUri) let hierarchy: Hierarchy let model: ModelDb let client: Client @@ -49,7 +51,7 @@ describe('postgres operations', () => { afterAll(async () => { clientRef.close() - await shutdown() + await shutdownPostgres(contextVars) }) beforeEach(async () => { @@ -88,6 +90,7 @@ describe('postgres operations', () => { const mctx = new MeasureMetricsContext('', {}) const txStorage = await createPostgresTxAdapter( mctx, + contextVars, hierarchy, dbUri, { @@ -107,6 +110,7 @@ describe('postgres operations', () => { const ctx = new MeasureMetricsContext('client', {}) const serverStorage = await createPostgresAdapter( ctx, + contextVars, hierarchy, dbUri, { @@ -115,7 +119,7 @@ describe('postgres operations', () => { }, model ) - await serverStorage.init?.(ctx) + await serverStorage.init?.(ctx, contextVars) client = await createClient(async (handler) => { return wrapAdapterToClient(ctx, serverStorage, txes) }) diff --git a/server/postgres/src/index.ts b/server/postgres/src/index.ts index 7d339eb9ce..ec73ea786f 100644 --- a/server/postgres/src/index.ts +++ b/server/postgres/src/index.ts @@ -19,12 +19,20 @@ import { getDBClient, retryTxn } from './utils' export { getDocFieldsByDomains, 
translateDomain } from './schemas' export * from './storage' -export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setExtraOptions } from './utils' +export { + convertDoc, + createTables, + getDBClient, + retryTxn, + setDBExtraOptions, + setExtraOptions, + shutdownPostgres +} from './utils' export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdapter { return { - deleteWorkspace: async (ctx, workspace): Promise => { - const client = getDBClient(url) + deleteWorkspace: async (ctx, contextVars, workspace): Promise => { + const client = getDBClient(contextVars, url) try { const connection = await client.getClient() diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 566d37496c..532fc98689 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -93,7 +93,6 @@ import { parseUpdate, type PostgresClientReference } from './utils' - async function * createCursorGenerator ( client: postgres.ReservedSql, sql: string, @@ -192,11 +191,10 @@ class ConnectionInfo { } } -const connections = new Map() - class ConnectionMgr { constructor ( protected readonly client: postgres.Sql, + protected readonly connections: () => Map, readonly mgrId: string ) {} @@ -296,10 +294,10 @@ class ConnectionMgr { } release (id: string): void { - const conn = connections.get(id) + const conn = this.connections().get(id) if (conn !== undefined) { conn.released = true - connections.delete(id) // We need to delete first + this.connections().delete(id) // We need to delete first conn.release() } else { console.log('wrne') @@ -307,10 +305,11 @@ class ConnectionMgr { } close (): void { - for (const [k, conn] of Array.from(connections.entries()).filter( + const cnts = this.connections() + for (const [k, conn] of Array.from(cnts.entries()).filter( ([, it]: [string, ConnectionInfo]) => it.mgrId === this.mgrId )) { - connections.delete(k) + cnts.delete(k) try { conn.release() } catch (err: any) { @@ -320,12 
+319,12 @@ class ConnectionMgr { } getConnection (id: string, managed: boolean = true): ConnectionInfo { - let conn = connections.get(id) + let conn = this.connections().get(id) if (conn === undefined) { conn = new ConnectionInfo(this.mgrId, id, this.client, managed) } if (managed) { - connections.set(id, conn) + this.connections().set(id, conn) } return conn } @@ -407,6 +406,8 @@ abstract class PostgresAdapterBase implements DbAdapter { protected readonly tableFields = new Map() protected readonly workspaceId: WorkspaceId + protected connections = new Map() + mgr: ConnectionMgr constructor ( @@ -422,7 +423,7 @@ abstract class PostgresAdapterBase implements DbAdapter { name: enrichedWorkspaceId.uuid ?? enrichedWorkspaceId.name } this._helper = new DBCollectionHelper(this.client, this.workspaceId) - this.mgr = new ConnectionMgr(client, mgrId) + this.mgr = new ConnectionMgr(client, () => this.connections, mgrId) } reserveContext (id: string): () => void { @@ -430,7 +431,7 @@ abstract class PostgresAdapterBase implements DbAdapter { return () => { conn.released = true conn.release() - connections.delete(id) // We need to delete first + this.connections.delete(id) // We need to delete first } } @@ -477,7 +478,12 @@ abstract class PostgresAdapterBase implements DbAdapter { on?: ((handler: DbAdapterHandler) => void) | undefined - abstract init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise + abstract init ( + ctx: MeasureContext, + contextVars: Record, + domains?: string[], + excludeDomains?: string[] + ): Promise async close (): Promise { this.mgr.close() @@ -1672,7 +1678,14 @@ interface OperationBulk { const initRateLimit = new RateLimiter(1) class PostgresAdapter extends PostgresAdapterBase { - async init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise { + async init ( + ctx: MeasureContext, + contextVars: Record, + domains?: string[], + excludeDomains?: string[] + ): Promise { + this.connections = 
contextVars.cntInfoPG ?? new Map() + contextVars.cntInfoPG = this.connections let resultDomains = domains ?? this.hierarchy.domains() if (excludeDomains !== undefined) { resultDomains = resultDomains.filter((it) => !excludeDomains.includes(it)) @@ -1977,7 +1990,15 @@ class PostgresAdapter extends PostgresAdapterBase { } class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter { - async init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise { + async init ( + ctx: MeasureContext, + contextVars: Record, + domains?: string[], + excludeDomains?: string[] + ): Promise { + this.connections = contextVars.cntInfoPG ?? new Map() + contextVars.cntInfoPG = this.connections + const resultDomains = domains ?? [DOMAIN_TX, DOMAIN_MODEL_TX] await initRateLimit.exec(async () => { const url = this.refClient.url() @@ -2035,12 +2056,13 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter { */ export async function createPostgresAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, modelDb: ModelDb ): Promise { - const client = getDBClient(url) + const client = getDBClient(contextVars, url) const connection = await client.getClient() return new PostgresAdapter(connection, client, workspaceId, hierarchy, modelDb, 'default-' + workspaceId.name) } @@ -2050,12 +2072,13 @@ export async function createPostgresAdapter ( */ export async function createPostgresTxAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, modelDb: ModelDb ): Promise { - const client = getDBClient(url) + const client = getDBClient(contextVars, url) const connection = await client.getClient() return new PostgresTxAdapter(connection, client, workspaceId, hierarchy, modelDb, 'tx' + workspaceId.name) } diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index f00d2967ba..68410b9285 100644 --- 
a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -43,15 +43,6 @@ import { translateDomain } from './schemas' -const connections = new Map() - -// Register close on process exit. -process.on('exit', () => { - shutdown().catch((err) => { - console.error(err) - }) -}) - const clientRefs = new Map() const loadedDomains = new Set() @@ -195,7 +186,12 @@ async function createTable (client: postgres.Sql, domain: string): Promise /** * @public */ -export async function shutdown (): Promise { +export async function shutdownPostgres (contextVars: Record): Promise { + const connections: Map | undefined = + contextVars.pgConnections ?? new Map() + if (connections === undefined) { + return + } for (const c of connections.values()) { c.close(true) } @@ -305,9 +301,16 @@ export function setExtraOptions (options: DBExtraOptions): void { * Initialize a workspace connection to DB * @public */ -export function getDBClient (connectionString: string, database?: string): PostgresClientReference { +export function getDBClient ( + contextVars: Record, + connectionString: string, + database?: string +): PostgresClientReference { const extraOptions = JSON.parse(process.env.POSTGRES_OPTIONS ?? '{}') const key = `${connectionString}${extraOptions}` + const connections = contextVars.pgConnections ?? 
new Map() + contextVars.pgConnections = connections + let existing = connections.get(key) if (existing === undefined) { diff --git a/server/server-pipeline/src/blobStorage.ts b/server/server-pipeline/src/blobStorage.ts index cc68911bad..2f5aefeded 100644 --- a/server/server-pipeline/src/blobStorage.ts +++ b/server/server-pipeline/src/blobStorage.ts @@ -62,7 +62,15 @@ class StorageBlobAdapter implements DbAdapter { } } - init?: ((ctx: MeasureContext, domains?: string[], excludeDomains?: string[]) => Promise) | undefined + init?: + | (( + ctx: MeasureContext, + contextVars: Record, + domains?: string[], + excludeDomains?: string[] + ) => Promise) + | undefined + on?: ((handler: DbAdapterHandler) => void) | undefined async rawFindAll(domain: Domain, query: DocumentQuery, options?: FindOptions): Promise { @@ -130,6 +138,7 @@ export async function createStorageDataAdapter ( ctx: MeasureContext, + contextVars: Record, hierarchy: Hierarchy, url: string, workspaceId: WorkspaceId, diff --git a/server/server-pipeline/src/pipeline.ts b/server/server-pipeline/src/pipeline.ts index 5633501e2c..e0ff4b75f1 100644 --- a/server/server-pipeline/src/pipeline.ts +++ b/server/server-pipeline/src/pipeline.ts @@ -77,6 +77,12 @@ export function getTxAdapterFactory ( return adapter.factory } +/** + * A pipeline context used by standalone services to hold global variables. + * In case of Durable Objects, it should not be shared and individual context should be created. + */ +export const sharedPipelineContextVars: Record = {} + /** * @public */ @@ -94,6 +100,7 @@ export function createServerPipeline ( externalStorage: StorageAdapter extraLogging?: boolean // If passed, will log every request/etc.
+ pipelineContextVars?: Record }, extensions?: Partial ): PipelineFactory { @@ -137,7 +144,8 @@ export function createServerPipeline ( branding, modelDb, hierarchy, - storageAdapter: opt.externalStorage + storageAdapter: opt.externalStorage, + contextVars: opt.pipelineContextVars ?? sharedPipelineContextVars } return createPipeline(ctx, middlewares, context) } @@ -183,7 +191,8 @@ export function createBackupPipeline ( branding, modelDb, hierarchy, - storageAdapter: opt.externalStorage + storageAdapter: opt.externalStorage, + contextVars: {} } return createPipeline(ctx, middlewares, context) } diff --git a/server/server/src/client.ts b/server/server/src/client.ts index b867415df4..447fbdbcdd 100644 --- a/server/server/src/client.ts +++ b/server/server/src/client.ts @@ -102,7 +102,7 @@ export class ClientSession implements Session { async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise { this.includeSessionContext(ctx.ctx, ctx.pipeline) - const result = await ctx.ctx.with('load-model', {}, () => ctx.pipeline.loadModel(ctx.ctx, lastModelTx, hash)) + const result = await ctx.ctx.with('load-model', {}, (_ctx) => ctx.pipeline.loadModel(_ctx, lastModelTx, hash)) await ctx.sendResponse(ctx.requestId, result) } diff --git a/server/workspace-service/src/service.ts b/server/workspace-service/src/service.ts index 5ff47cebdf..9e7fe5dfa0 100644 --- a/server/workspace-service/src/service.ts +++ b/server/workspace-service/src/service.ts @@ -40,8 +40,18 @@ import { FileModelLogger, prepareTools } from '@hcengineering/server-tool' import path from 'path' import { Analytics } from '@hcengineering/analytics' -import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo' -import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres' +import { + createMongoAdapter, + createMongoDestroyAdapter, + createMongoTxAdapter, + shutdownMongo +} from 
'@hcengineering/mongo' +import { + createPostgreeDestroyAdapter, + createPostgresAdapter, + createPostgresTxAdapter, + shutdownPostgres +} from '@hcengineering/postgres' import { doBackupWorkspace, doRestoreWorkspace } from '@hcengineering/server-backup' import type { PipelineFactory, StorageAdapter } from '@hcengineering/server-core' import { @@ -52,7 +62,8 @@ import { registerDestroyFactory, registerServerPlugins, registerStringLoaders, - registerTxAdapterFactory + registerTxAdapterFactory, + sharedPipelineContextVars } from '@hcengineering/server-pipeline' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import { createWorkspace, upgradeWorkspace } from './ws-operations' @@ -72,6 +83,16 @@ export interface WorkspaceOptions { } } +// Register close on process exit. +process.on('exit', () => { + shutdownPostgres(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) + shutdownMongo(sharedPipelineContextVars).catch((err) => { + console.error(err) + }) +}) + export type WorkspaceOperation = 'create' | 'upgrade' | 'all' | 'all+backup' export class WorkspaceWorker { @@ -348,7 +369,7 @@ export class WorkspaceWorker { async doCleanup (ctx: MeasureContext, workspace: BaseWorkspaceInfo): Promise { const { dbUrl } = prepareTools([]) const adapter = getWorkspaceDestroyAdapter(dbUrl) - await adapter.deleteWorkspace(ctx, { name: workspace.workspace }) + await adapter.deleteWorkspace(ctx, sharedPipelineContextVars, { name: workspace.workspace }) } private async doWorkspaceOperation ( @@ -500,6 +521,7 @@ export class WorkspaceWorker { archive, 50000, ['blob'], + sharedPipelineContextVars, (_p: number) => { if (progress !== Math.round(_p)) { progress = Math.round(_p) diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts index 602408aa56..b3f8a43428 100644 --- a/server/workspace-service/src/ws-operations.ts +++ b/server/workspace-service/src/ws-operations.ts @@ -16,7 
+16,7 @@ import core, { import { consoleModelLogger, type MigrateOperation, type ModelLogger } from '@hcengineering/model' import { getTransactorEndpoint } from '@hcengineering/server-client' import { SessionDataImpl, wrapPipeline, type Pipeline, type StorageAdapter } from '@hcengineering/server-core' -import { getServerPipeline, getTxAdapterFactory } from '@hcengineering/server-pipeline' +import { getServerPipeline, getTxAdapterFactory, sharedPipelineContextVars } from '@hcengineering/server-pipeline' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import { generateToken } from '@hcengineering/server-token' import { initializeWorkspace, initModel, prepareTools, updateModel, upgradeModel } from '@hcengineering/server-tool' @@ -82,7 +82,7 @@ export async function createWorkspace ( externalStorage: storageAdapter, usePassedCtx: true }) - const txAdapter = await txFactory(ctx, hierarchy, dbUrl, wsId, modelDb, storageAdapter) + const txAdapter = await txFactory(ctx, sharedPipelineContextVars, hierarchy, dbUrl, wsId, modelDb, storageAdapter) await childLogger.withLog('init-workspace', {}, (ctx) => initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => {}) ) diff --git a/workers/transactor/src/transactor.ts b/workers/transactor/src/transactor.ts index d587208917..91068c616c 100644 --- a/workers/transactor/src/transactor.ts +++ b/workers/transactor/src/transactor.ts @@ -54,10 +54,10 @@ import { CloudFlareLogger } from './logger' import model from './model.json' // import { configureAnalytics } from '@hcengineering/analytics-service' // import { Analytics } from '@hcengineering/analytics' +import contactPlugin from '@hcengineering/contact' import serverAiBot from '@hcengineering/server-ai-bot' import serverNotification from '@hcengineering/server-notification' import serverTelegram from '@hcengineering/server-telegram' -import contactPlugin from '@hcengineering/contact' export const 
PREFERRED_SAVE_SIZE = 500 export const PREFERRED_SAVE_INTERVAL = 30 * 1000 @@ -75,6 +75,8 @@ export class Transactor extends DurableObject { private readonly sessions = new Map() + private readonly contextVars: Record = {} + constructor (ctx: DurableObjectState, env: Env) { super(ctx, env) @@ -135,11 +137,12 @@ export class Transactor extends DurableObject { adapterSecurity: false, disableTriggers: false, fulltextUrl: env.FULLTEXT_URL, - extraLogging: true + extraLogging: true, + pipelineContextVars: this.contextVars }) const result = await pipeline(ctx, ws, upgrade, broadcast, branding) - const client = getDBClient(dbUrl) + const client = getDBClient(this.contextVars, dbUrl) const connection = await client.getClient() const t1 = Date.now() await connection`select now()`