UBERF-9224: Use context variables to hold context data (#7754)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-01-28 00:36:37 +07:00 committed by GitHub
parent 9046204e0a
commit f8a8c94bc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 278 additions and 102 deletions

View File

@ -28,6 +28,7 @@ import {
} from '@hcengineering/postgres'
import { type DBDoc } from '@hcengineering/postgres/types/utils'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { sharedPipelineContextVars } from '@hcengineering/server-pipeline'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { type MongoClient, UUID } from 'mongodb'
@ -45,7 +46,7 @@ export async function moveFromMongoToPG (
}
const client = getMongoClient(mongoUrl)
const mongo = await client.getClient()
const pg = getDBClient(dbUrl)
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
const pgClient = await pg.getClient()
for (let index = 0; index < workspaces.length; index++) {
@ -168,7 +169,7 @@ export async function moveWorkspaceFromMongoToPG (
}
const client = getMongoClient(mongoUrl)
const mongo = await client.getClient()
const pg = getDBClient(dbUrl)
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
const pgClient = await pg.getClient()
await moveWorkspace(accountDb, mongo, pgClient, ws, region, include, force)
@ -306,7 +307,7 @@ export async function updateDataWorkspaceIdToUuid (
throw new Error('dbUrl is required')
}
const pg = getDBClient(dbUrl)
const pg = getDBClient(sharedPipelineContextVars, dbUrl)
try {
const pgClient = await pg.getClient()

View File

@ -66,10 +66,11 @@ import {
registerDestroyFactory,
registerServerPlugins,
registerStringLoaders,
registerTxAdapterFactory
registerTxAdapterFactory,
sharedPipelineContextVars
} from '@hcengineering/server-pipeline'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import { FileModelLogger, buildModel } from '@hcengineering/server-tool'
import { buildModel, FileModelLogger } from '@hcengineering/server-tool'
import { createWorkspace, upgradeWorkspace } from '@hcengineering/workspace-service'
import path from 'path'
@ -105,12 +106,18 @@ import {
createMongoTxAdapter,
getMongoClient,
getWorkspaceMongoDB,
shutdown
shutdownMongo
} from '@hcengineering/mongo'
import { backupDownload } from '@hcengineering/server-backup/src/backup'
import { createDatalakeClient, CONFIG_KIND as DATALAKE_CONFIG_KIND, type DatalakeConfig } from '@hcengineering/datalake'
import { getModelVersion } from '@hcengineering/model-all'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
shutdownPostgres
} from '@hcengineering/postgres'
import { CONFIG_KIND as S3_CONFIG_KIND, S3Service, type S3Config } from '@hcengineering/s3'
import type { PipelineFactory, StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
@ -139,17 +146,16 @@ import {
import { changeConfiguration } from './configuration'
import {
generateUuidMissingWorkspaces,
updateDataWorkspaceIdToUuid,
moveAccountDbFromMongoToPG,
moveFromMongoToPG,
moveWorkspaceFromMongoToPG
moveWorkspaceFromMongoToPG,
updateDataWorkspaceIdToUuid
} from './db'
import { restoreControlledDocContentMongo, restoreWikiContentMongo, restoreMarkupRefsMongo } from './markup'
import { reindexWorkspace } from './fulltext'
import { restoreControlledDocContentMongo, restoreMarkupRefsMongo, restoreWikiContentMongo } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount, fillGithubUsers } from './account'
import { copyToDatalake, moveFiles, showLostFiles } from './storage'
import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres'
import { reindexWorkspace } from './fulltext'
const colorConstants = {
colorRed: '\u001b[31m',
@ -163,6 +169,16 @@ const colorConstants = {
reset: '\u001b[0m'
}
// Register close on process exit.
process.on('exit', () => {
shutdownPostgres(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
shutdownMongo(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
})
/**
* @public
*/
@ -220,7 +236,7 @@ export function devTool (
}
closeAccountsDb()
console.log(`closing database connection to '${uri}'...`)
await shutdown()
await shutdownMongo()
}
async function withStorage (f: (storageAdapter: StorageAdapter) => Promise<any>): Promise<void> {
@ -648,6 +664,7 @@ export function devTool (
true,
true,
5000, // 5 gigabytes per blob
sharedPipelineContextVars,
async (storage, workspaceStorage) => {
if (cmd.remove) {
await updateArchiveInfo(toolCtx, db, ws.workspace, true)
@ -667,7 +684,7 @@ export function devTool (
const destroyer = getWorkspaceDestroyAdapter(dbUrl)
await destroyer.deleteWorkspace(toolCtx, { name: ws.workspace })
await destroyer.deleteWorkspace(toolCtx, sharedPipelineContextVars, { name: ws.workspace })
}
}
)
@ -718,7 +735,8 @@ export function devTool (
cmd.region,
false,
false,
100
100,
sharedPipelineContextVars
)
) {
processed++

View File

@ -412,7 +412,7 @@ export class ModelDb extends MemDb {
this.updateDoc(cud.objectId, doc, cud)
TxProcessor.updateDoc2Doc(doc, cud)
} else {
ctx.error('no document found, failed to apply model transaction, skipping', {
ctx.warn('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: cud.objectId
@ -424,7 +424,7 @@ export class ModelDb extends MemDb {
try {
this.delDoc((tx as TxRemoveDoc<Doc>).objectId)
} catch (err: any) {
ctx.error('no document found, failed to apply model transaction, skipping', {
ctx.warn('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: (tx as TxRemoveDoc<Doc>).objectId
@ -438,7 +438,7 @@ export class ModelDb extends MemDb {
this.updateDoc(mix.objectId, doc, mix)
TxProcessor.updateMixin4Doc(doc, mix)
} else {
ctx.error('no document found, failed to apply model transaction, skipping', {
ctx.warn('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: mix.objectId

View File

@ -23,20 +23,37 @@ import {
getConfig,
registerAdapterFactory,
registerDestroyFactory,
registerTxAdapterFactory
registerTxAdapterFactory,
sharedPipelineContextVars
} from '@hcengineering/server-pipeline'
import { join } from 'path'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import {
createMongoAdapter,
createMongoDestroyAdapter,
createMongoTxAdapter,
shutdownMongo
} from '@hcengineering/mongo'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
setDBExtraOptions,
shutdownPostgres
} from '@hcengineering/postgres'
import { readFileSync } from 'node:fs'
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]
// Register close on process exit.
process.on('exit', () => {
shutdownPostgres(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
shutdownMongo(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
})
const metricsContext = initStatisticsContext('backup', {
factory: () =>
new MeasureMetricsContext(
@ -84,5 +101,6 @@ startBackup(
externalStorage,
disableTriggers: true
})
}
},
sharedPipelineContextVars
)

View File

@ -30,13 +30,19 @@ import {
LowLevelMiddleware,
ModelMiddleware
} from '@hcengineering/middleware'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import {
createMongoAdapter,
createMongoDestroyAdapter,
createMongoTxAdapter,
shutdownMongo
} from '@hcengineering/mongo'
import { PlatformError, setMetadata, unknownError } from '@hcengineering/platform'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
setDBExtraOptions,
shutdownPostgres
} from '@hcengineering/postgres'
import serverClientPlugin, { getTransactorEndpoint, getWorkspaceInfo } from '@hcengineering/server-client'
import serverCore, {
@ -56,7 +62,8 @@ import {
registerDestroyFactory,
registerServerPlugins,
registerStringLoaders,
registerTxAdapterFactory
registerTxAdapterFactory,
sharedPipelineContextVars
} from '@hcengineering/server-pipeline'
import serverToken, { decodeToken, generateToken, type Token } from '@hcengineering/server-token'
import cors from '@koa/cors'
@ -104,7 +111,8 @@ class WorkspaceIndexer {
branding: null,
modelDb,
hierarchy,
storageAdapter: externalStorage
storageAdapter: externalStorage,
contextVars: {}
}
result.pipeline = await createPipeline(ctx, middlewares, context)
@ -204,6 +212,15 @@ interface Search {
interface Reindex {
token: string
}
// Register close on process exit.
process.on('exit', () => {
shutdownPostgres(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
shutdownMongo(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
})
export async function startIndexer (
ctx: MeasureContext,

View File

@ -33,21 +33,37 @@ import {
registerDestroyFactory,
registerServerPlugins,
registerStringLoaders,
registerTxAdapterFactory
registerTxAdapterFactory,
sharedPipelineContextVars
} from '@hcengineering/server-pipeline'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import {
createMongoAdapter,
createMongoDestroyAdapter,
createMongoTxAdapter,
shutdownMongo
} from '@hcengineering/mongo'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
setDBExtraOptions,
shutdownPostgres
} from '@hcengineering/postgres'
import { readFileSync } from 'node:fs'
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]
registerStringLoaders()
// Register close on process exit.
process.on('exit', () => {
shutdownPostgres(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
shutdownMongo(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
})
/**
* @public
*/

View File

@ -92,7 +92,7 @@ import {
toAccountInfo,
verifyPassword
} from './utils'
import { getWorkspaceDestroyAdapter } from '@hcengineering/server-pipeline'
import { getWorkspaceDestroyAdapter, sharedPipelineContextVars } from '@hcengineering/server-pipeline'
import MD5 from 'crypto-js/md5'
function buildGravatarId (email: string): string {
@ -2364,7 +2364,7 @@ export async function dropWorkspaceFull (
const ws = await dropWorkspace(ctx, db, branding, workspaceId)
const adapter = getWorkspaceDestroyAdapter(dbUrl)
await adapter.deleteWorkspace(ctx, { name: ws.workspace })
await adapter.deleteWorkspace(ctx, sharedPipelineContextVars, { name: ws.workspace })
const wspace = getWorkspaceId(workspaceId)
const hasBucket = await storageAdapter?.exists(ctx, wspace)

View File

@ -23,6 +23,7 @@ import { MongoAccountDB } from './collections/mongo'
import { PostgresAccountDB } from './collections/postgres'
import { accountPlugin } from './plugin'
import type { Account, AccountDB, AccountInfo, RegionInfo, WorkspaceInfo } from './types'
import { sharedPipelineContextVars } from '@hcengineering/server-pipeline'
export async function getAccountDB (uri: string, dbNs?: string): Promise<[AccountDB, () => void]> {
const isMongo = uri.startsWith('mongodb://')
@ -41,7 +42,7 @@ export async function getAccountDB (uri: string, dbNs?: string): Promise<[Accoun
}
]
} else {
const client = getDBClient(uri)
const client = getDBClient(sharedPipelineContextVars, uri)
const pgClient = await client.getClient()
// TODO: if dbNs is provided put tables in that schema
const pgAccount = new PostgresAccountDB(pgClient)

View File

@ -37,7 +37,8 @@ export function startBackup (
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
externalStorage: StorageAdapter
) => DbConfiguration
) => DbConfiguration,
contextVars: Record<string, any>
): void {
const config = _config()
setMetadata(serverToken.metadata.Secret, config.Secret)
@ -66,7 +67,8 @@ export function startBackup (
(ctx, workspace, branding, externalStorage) => {
return getConfig(ctx, mainDbUrl, workspace, branding, externalStorage)
},
config.Region
config.Region,
contextVars
)
process.on('SIGINT', shutdown)
@ -94,6 +96,7 @@ export async function backupWorkspace (
freshBackup: boolean = false,
clean: boolean = false,
downloadLimit: number,
contextVars: Record<string, any>,
onFinish?: (backupStorage: StorageAdapter, workspaceStorage: StorageAdapter) => Promise<void>
): Promise<boolean> {
@ -130,7 +133,8 @@ export async function backupWorkspace (
freshBackup,
clean,
downloadLimit,
[]
[],
contextVars
)
if (result && onFinish !== undefined) {
await onFinish(storageAdapter, workspaceStorageAdapter)

View File

@ -66,6 +66,7 @@ class BackupWorker {
externalStorage: StorageAdapter
) => DbConfiguration,
readonly region: string,
readonly contextVars: Record<string, any>,
readonly freshWorkspace: boolean = false,
readonly clean: boolean = false,
readonly skipDomains: string[] = []
@ -211,6 +212,7 @@ class BackupWorker {
const modelDb = new ModelDb(hierarchy)
const txAdapter = await adapterConf.factory(
ctx,
this.contextVars,
hierarchy,
adapterConf.url,
wsUrl,
@ -218,7 +220,7 @@ class BackupWorker {
this.workspaceStorageAdapter
)
try {
await txAdapter.init?.(ctx)
await txAdapter.init?.(ctx, this.contextVars)
return (
await txAdapter.rawFindAll<Tx>(
@ -292,9 +294,18 @@ export function backupService (
externalStorage: StorageAdapter
) => DbConfiguration,
region: string,
contextVars: Record<string, any>,
recheck?: boolean
): () => void {
const backupWorker = new BackupWorker(storage, config, pipelineFactory, workspaceStorageAdapter, getConfig, region)
const backupWorker = new BackupWorker(
storage,
config,
pipelineFactory,
workspaceStorageAdapter,
getConfig,
region,
contextVars
)
const shutdown = (): void => {
void backupWorker.close()
@ -322,6 +333,7 @@ export async function doBackupWorkspace (
clean: boolean,
downloadLimit: number,
skipDomains: string[],
contextVars: Record<string, any>,
notify?: (progress: number) => Promise<void>
): Promise<boolean> {
const backupWorker = new BackupWorker(
@ -331,6 +343,7 @@ export async function doBackupWorkspace (
workspaceStorageAdapter,
getConfig,
region,
contextVars,
freshWorkspace,
clean,
skipDomains

View File

@ -65,7 +65,12 @@ export type DbAdapterHandler = (
* @public
*/
export interface DbAdapter extends LowLevelStorage {
init?: (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]) => Promise<void>
init?: (
ctx: MeasureContext,
contextVars: Record<string, any>,
domains?: string[],
excludeDomains?: string[]
) => Promise<void>
helper?: () => DomainHelperOperations
@ -103,7 +108,7 @@ export interface TxAdapter extends DbAdapter {
* @public
*/
export interface WorkspaceDestroyAdapter {
deleteWorkspace: (ctx: MeasureContext, workspace: WorkspaceId) => Promise<void>
deleteWorkspace: (ctx: MeasureContext, contextVars: Record<string, any>, workspace: WorkspaceId) => Promise<void>
}
/**
@ -111,6 +116,7 @@ export interface WorkspaceDestroyAdapter {
*/
export type DbAdapterFactory = (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -119,6 +119,7 @@ class BenchmarkDbAdapter extends DummyDbAdapter {
*/
export async function createBenchmarkAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -130,7 +130,9 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
}
}
}
await ctx.with(`init adapter ${key}`, {}, (ctx) => adapter?.init?.(ctx, domains, excludeDomains))
await ctx.with(`init adapter ${key}`, {}, (ctx) =>
adapter?.init?.(ctx, this.context.contextVars, domains, excludeDomains)
)
}
}
}

View File

@ -177,6 +177,7 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter {
*/
export async function createInMemoryAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId

View File

@ -22,6 +22,7 @@ import { DummyDbAdapter } from './mem'
*/
export async function createNullAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -181,6 +181,8 @@ export interface PipelineContext {
derived?: Middleware
head?: Middleware
contextVars: Record<string, any>
broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise<void>
}
/**

View File

@ -55,6 +55,7 @@ export class DBAdapterMiddleware extends BaseMiddleware implements Middleware {
key,
await adapterConf.factory(
ctx,
this.context.contextVars,
this.context.hierarchy,
adapterConf.url,
this.context.workspace,
@ -69,7 +70,7 @@ export class DBAdapterMiddleware extends BaseMiddleware implements Middleware {
const txAdapterName = this.conf.domains[DOMAIN_TX]
const txAdapter = adapters.get(txAdapterName) as TxAdapter
await txAdapter.init?.(metrics, [DOMAIN_TX, DOMAIN_MODEL_TX])
await txAdapter.init?.(metrics, this.context.contextVars, [DOMAIN_TX, DOMAIN_MODEL_TX])
const defaultAdapter = adapters.get(this.conf.defaultAdapter)
if (defaultAdapter === undefined) {

View File

@ -37,7 +37,6 @@ import core, {
type SessionData,
type Tx
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import type {
IndexedDoc,
Middleware,
@ -91,9 +90,6 @@ export class FullTextMiddleware extends BaseMiddleware implements Middleware {
}
async init (ctx: MeasureContext): Promise<void> {
if (this.context.adapterManager == null) {
throw new PlatformError(unknownError('Adapter manager should be specified'))
}
this.contexts = new Map(
this.context.modelDb.findAllSync(core.class.FullTextSearchContext, {}).map((it) => [it.toClass, it])
)

View File

@ -28,7 +28,7 @@ import core, {
} from '@hcengineering/core'
import { type DbAdapter, wrapAdapterToClient } from '@hcengineering/server-core'
import { createMongoAdapter, createMongoTxAdapter } from '..'
import { getMongoClient, type MongoClientReference, shutdown } from '../utils'
import { getMongoClient, type MongoClientReference, shutdownMongo } from '../utils'
import { genMinModel } from './minmodel'
import { createTaskModel, type Task, type TaskComment, taskPlugin } from './tasks'
@ -52,7 +52,7 @@ describe('mongo operations', () => {
afterAll(async () => {
mongoClient.close()
await shutdown()
await shutdownMongo()
})
beforeEach(async () => {
@ -80,6 +80,7 @@ describe('mongo operations', () => {
const mctx = new MeasureMetricsContext('', {})
const txStorage = await createMongoTxAdapter(
new MeasureMetricsContext('', {}),
{},
hierarchy,
mongodbUri,
getWorkspaceId(dbId),
@ -88,6 +89,7 @@ describe('mongo operations', () => {
serverStorage = await createMongoAdapter(
new MeasureMetricsContext('', {}),
{},
hierarchy,
mongodbUri,
getWorkspaceId(dbId),

View File

@ -22,7 +22,7 @@ export * from './utils'
export function createMongoDestroyAdapter (url: string): WorkspaceDestroyAdapter {
return {
deleteWorkspace: async (ctx, workspace): Promise<void> => {
deleteWorkspace: async (ctx, contextVars, workspace): Promise<void> => {
const client = getMongoClient(url)
try {
await ctx.with('delete-workspace', {}, async () => {

View File

@ -1807,6 +1807,7 @@ function translateLikeQuery (pattern: string): { $regex: string, $options: strin
*/
export async function createMongoAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,
@ -1825,6 +1826,7 @@ export async function createMongoAdapter (
*/
export async function createMongoTxAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -27,19 +27,12 @@ import { MongoClient, type Collection, type Db, type Document } from 'mongodb'
const connections = new Map<string, MongoClientReferenceImpl>()
// Register mongo close on process exit.
process.on('exit', () => {
shutdown().catch((err) => {
console.error(err)
})
})
const clientRefs = new Map<string, ClientRef>()
/**
* @public
*/
export async function shutdown (): Promise<void> {
export async function shutdownMongo (contextVars: Record<string, any> = {}): Promise<void> {
for (const it of Array.from(clientRefs.values())) {
console.error((it as any).stack)
}

View File

@ -27,7 +27,7 @@ import core, {
} from '@hcengineering/core'
import { type DbAdapter, wrapAdapterToClient } from '@hcengineering/server-core'
import { createPostgresAdapter, createPostgresTxAdapter } from '..'
import { getDBClient, type PostgresClientReference, shutdown } from '../utils'
import { getDBClient, type PostgresClientReference, shutdownPostgres } from '../utils'
import { genMinModel } from './minmodel'
import { createTaskModel, type Task, type TaskComment, taskPlugin } from './tasks'
@ -35,12 +35,14 @@ const txes = genMinModel()
createTaskModel(txes)
const contextVars: Record<string, any> = {}
describe('postgres operations', () => {
const baseDbUri: string = process.env.DB_URL ?? 'postgresql://postgres:example@localhost:5433'
let dbId: string = 'pg_testdb_' + generateId()
let dbUuid: string = crypto.randomUUID()
let dbUri: string = baseDbUri + '/' + dbId
const clientRef: PostgresClientReference = getDBClient(baseDbUri)
const clientRef: PostgresClientReference = getDBClient(contextVars, baseDbUri)
let hierarchy: Hierarchy
let model: ModelDb
let client: Client
@ -49,7 +51,7 @@ describe('postgres operations', () => {
afterAll(async () => {
clientRef.close()
await shutdown()
await shutdownPostgres(contextVars)
})
beforeEach(async () => {
@ -88,6 +90,7 @@ describe('postgres operations', () => {
const mctx = new MeasureMetricsContext('', {})
const txStorage = await createPostgresTxAdapter(
mctx,
contextVars,
hierarchy,
dbUri,
{
@ -107,6 +110,7 @@ describe('postgres operations', () => {
const ctx = new MeasureMetricsContext('client', {})
const serverStorage = await createPostgresAdapter(
ctx,
contextVars,
hierarchy,
dbUri,
{
@ -115,7 +119,7 @@ describe('postgres operations', () => {
},
model
)
await serverStorage.init?.(ctx)
await serverStorage.init?.(ctx, contextVars)
client = await createClient(async (handler) => {
return wrapAdapterToClient(ctx, serverStorage, txes)
})

View File

@ -19,12 +19,20 @@ import { getDBClient, retryTxn } from './utils'
export { getDocFieldsByDomains, translateDomain } from './schemas'
export * from './storage'
export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setExtraOptions } from './utils'
export {
convertDoc,
createTables,
getDBClient,
retryTxn,
setDBExtraOptions,
setExtraOptions,
shutdownPostgres
} from './utils'
export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdapter {
return {
deleteWorkspace: async (ctx, workspace): Promise<void> => {
const client = getDBClient(url)
deleteWorkspace: async (ctx, contextVars, workspace): Promise<void> => {
const client = getDBClient(contextVars, url)
try {
const connection = await client.getClient()

View File

@ -93,7 +93,6 @@ import {
parseUpdate,
type PostgresClientReference
} from './utils'
async function * createCursorGenerator (
client: postgres.ReservedSql,
sql: string,
@ -192,11 +191,10 @@ class ConnectionInfo {
}
}
const connections = new Map<string, ConnectionInfo>()
class ConnectionMgr {
constructor (
protected readonly client: postgres.Sql,
protected readonly connections: () => Map<string, ConnectionInfo>,
readonly mgrId: string
) {}
@ -296,10 +294,10 @@ class ConnectionMgr {
}
release (id: string): void {
const conn = connections.get(id)
const conn = this.connections().get(id)
if (conn !== undefined) {
conn.released = true
connections.delete(id) // We need to delete first
this.connections().delete(id) // We need to delete first
conn.release()
} else {
console.log('wrne')
@ -307,10 +305,11 @@ class ConnectionMgr {
}
close (): void {
for (const [k, conn] of Array.from(connections.entries()).filter(
const cnts = this.connections()
for (const [k, conn] of Array.from(cnts.entries()).filter(
([, it]: [string, ConnectionInfo]) => it.mgrId === this.mgrId
)) {
connections.delete(k)
cnts.delete(k)
try {
conn.release()
} catch (err: any) {
@ -320,12 +319,12 @@ class ConnectionMgr {
}
getConnection (id: string, managed: boolean = true): ConnectionInfo {
let conn = connections.get(id)
let conn = this.connections().get(id)
if (conn === undefined) {
conn = new ConnectionInfo(this.mgrId, id, this.client, managed)
}
if (managed) {
connections.set(id, conn)
this.connections().set(id, conn)
}
return conn
}
@ -407,6 +406,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
protected readonly tableFields = new Map<string, string[]>()
protected readonly workspaceId: WorkspaceId
protected connections = new Map<string, ConnectionInfo>()
mgr: ConnectionMgr
constructor (
@ -422,7 +423,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
name: enrichedWorkspaceId.uuid ?? enrichedWorkspaceId.name
}
this._helper = new DBCollectionHelper(this.client, this.workspaceId)
this.mgr = new ConnectionMgr(client, mgrId)
this.mgr = new ConnectionMgr(client, () => this.connections, mgrId)
}
reserveContext (id: string): () => void {
@ -430,7 +431,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
return () => {
conn.released = true
conn.release()
connections.delete(id) // We need to delete first
this.connections.delete(id) // We need to delete first
}
}
@ -477,7 +478,12 @@ abstract class PostgresAdapterBase implements DbAdapter {
on?: ((handler: DbAdapterHandler) => void) | undefined
abstract init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise<void>
abstract init (
ctx: MeasureContext,
contextVars: Record<string, any>,
domains?: string[],
excludeDomains?: string[]
): Promise<void>
async close (): Promise<void> {
this.mgr.close()
@ -1672,7 +1678,14 @@ interface OperationBulk {
const initRateLimit = new RateLimiter(1)
class PostgresAdapter extends PostgresAdapterBase {
async init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise<void> {
async init (
ctx: MeasureContext,
contextVars: Record<string, any>,
domains?: string[],
excludeDomains?: string[]
): Promise<void> {
this.connections = contextVars.cntInfoPG ?? new Map<string, ConnectionInfo>()
contextVars.cntInfoPG = this.connections
let resultDomains = domains ?? this.hierarchy.domains()
if (excludeDomains !== undefined) {
resultDomains = resultDomains.filter((it) => !excludeDomains.includes(it))
@ -1977,7 +1990,15 @@ class PostgresAdapter extends PostgresAdapterBase {
}
class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
async init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise<void> {
async init (
ctx: MeasureContext,
contextVars: Record<string, any>,
domains?: string[],
excludeDomains?: string[]
): Promise<void> {
this.connections = contextVars.cntInfoPG ?? new Map<string, ConnectionInfo>()
contextVars.cntInfoPG = this.connections
const resultDomains = domains ?? [DOMAIN_TX, DOMAIN_MODEL_TX]
await initRateLimit.exec(async () => {
const url = this.refClient.url()
@ -2035,12 +2056,13 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
*/
export async function createPostgresAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,
modelDb: ModelDb
): Promise<DbAdapter> {
const client = getDBClient(url)
const client = getDBClient(contextVars, url)
const connection = await client.getClient()
return new PostgresAdapter(connection, client, workspaceId, hierarchy, modelDb, 'default-' + workspaceId.name)
}
@ -2050,12 +2072,13 @@ export async function createPostgresAdapter (
*/
export async function createPostgresTxAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,
modelDb: ModelDb
): Promise<TxAdapter> {
const client = getDBClient(url)
const client = getDBClient(contextVars, url)
const connection = await client.getClient()
return new PostgresTxAdapter(connection, client, workspaceId, hierarchy, modelDb, 'tx' + workspaceId.name)
}

View File

@ -43,15 +43,6 @@ import {
translateDomain
} from './schemas'
const connections = new Map<string, PostgresClientReferenceImpl>()
// Register close on process exit.
process.on('exit', () => {
shutdown().catch((err) => {
console.error(err)
})
})
const clientRefs = new Map<string, ClientRef>()
const loadedDomains = new Set<string>()
@ -195,7 +186,12 @@ async function createTable (client: postgres.Sql, domain: string): Promise<void>
/**
* @public
*/
export async function shutdown (): Promise<void> {
export async function shutdownPostgres (contextVars: Record<string, any>): Promise<void> {
const connections: Map<string, PostgresClientReferenceImpl> | undefined =
contextVars.pgConnections ?? new Map<string, PostgresClientReferenceImpl>()
if (connections === undefined) {
return
}
for (const c of connections.values()) {
c.close(true)
}
@ -305,9 +301,16 @@ export function setExtraOptions (options: DBExtraOptions): void {
* Initialize a workspace connection to DB
* @public
*/
export function getDBClient (connectionString: string, database?: string): PostgresClientReference {
export function getDBClient (
contextVars: Record<string, any>,
connectionString: string,
database?: string
): PostgresClientReference {
const extraOptions = JSON.parse(process.env.POSTGRES_OPTIONS ?? '{}')
const key = `${connectionString}${extraOptions}`
const connections = contextVars.pgConnections ?? new Map<string, PostgresClientReferenceImpl>()
contextVars.pgConnections = connections
let existing = connections.get(key)
if (existing === undefined) {

View File

@ -62,7 +62,15 @@ class StorageBlobAdapter implements DbAdapter {
}
}
init?: ((ctx: MeasureContext, domains?: string[], excludeDomains?: string[]) => Promise<void>) | undefined
init?:
| ((
ctx: MeasureContext,
contextVars: Record<string, any>,
domains?: string[],
excludeDomains?: string[]
) => Promise<void>)
| undefined
on?: ((handler: DbAdapterHandler) => void) | undefined
async rawFindAll<T extends Doc>(domain: Domain, query: DocumentQuery<T>, options?: FindOptions<T>): Promise<T[]> {
@ -130,6 +138,7 @@ class StorageBlobAdapter implements DbAdapter {
*/
export async function createStorageDataAdapter (
ctx: MeasureContext,
contextVars: Record<string, any>,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -77,6 +77,12 @@ export function getTxAdapterFactory (
return adapter.factory
}
/**
* A pipelice context used by standalong services to hold global variables.
* In case of Durable Objects, it should not be shared and individual context should be created.
*/
export const sharedPipelineContextVars: Record<string, any> = {}
/**
* @public
*/
@ -94,6 +100,7 @@ export function createServerPipeline (
externalStorage: StorageAdapter
extraLogging?: boolean // If passed, will log every request/etc.
pipelineContextVars?: Record<string, any>
},
extensions?: Partial<DbConfiguration>
): PipelineFactory {
@ -137,7 +144,8 @@ export function createServerPipeline (
branding,
modelDb,
hierarchy,
storageAdapter: opt.externalStorage
storageAdapter: opt.externalStorage,
contextVars: opt.pipelineContextVars ?? sharedPipelineContextVars
}
return createPipeline(ctx, middlewares, context)
}
@ -183,7 +191,8 @@ export function createBackupPipeline (
branding,
modelDb,
hierarchy,
storageAdapter: opt.externalStorage
storageAdapter: opt.externalStorage,
contextVars: {}
}
return createPipeline(ctx, middlewares, context)
}

View File

@ -102,7 +102,7 @@ export class ClientSession implements Session {
async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise<void> {
this.includeSessionContext(ctx.ctx, ctx.pipeline)
const result = await ctx.ctx.with('load-model', {}, () => ctx.pipeline.loadModel(ctx.ctx, lastModelTx, hash))
const result = await ctx.ctx.with('load-model', {}, (_ctx) => ctx.pipeline.loadModel(_ctx, lastModelTx, hash))
await ctx.sendResponse(ctx.requestId, result)
}

View File

@ -40,8 +40,18 @@ import { FileModelLogger, prepareTools } from '@hcengineering/server-tool'
import path from 'path'
import { Analytics } from '@hcengineering/analytics'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres'
import {
createMongoAdapter,
createMongoDestroyAdapter,
createMongoTxAdapter,
shutdownMongo
} from '@hcengineering/mongo'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
shutdownPostgres
} from '@hcengineering/postgres'
import { doBackupWorkspace, doRestoreWorkspace } from '@hcengineering/server-backup'
import type { PipelineFactory, StorageAdapter } from '@hcengineering/server-core'
import {
@ -52,7 +62,8 @@ import {
registerDestroyFactory,
registerServerPlugins,
registerStringLoaders,
registerTxAdapterFactory
registerTxAdapterFactory,
sharedPipelineContextVars
} from '@hcengineering/server-pipeline'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { createWorkspace, upgradeWorkspace } from './ws-operations'
@ -72,6 +83,16 @@ export interface WorkspaceOptions {
}
}
// Register close on process exit.
process.on('exit', () => {
shutdownPostgres(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
shutdownMongo(sharedPipelineContextVars).catch((err) => {
console.error(err)
})
})
export type WorkspaceOperation = 'create' | 'upgrade' | 'all' | 'all+backup'
export class WorkspaceWorker {
@ -348,7 +369,7 @@ export class WorkspaceWorker {
async doCleanup (ctx: MeasureContext, workspace: BaseWorkspaceInfo): Promise<void> {
const { dbUrl } = prepareTools([])
const adapter = getWorkspaceDestroyAdapter(dbUrl)
await adapter.deleteWorkspace(ctx, { name: workspace.workspace })
await adapter.deleteWorkspace(ctx, sharedPipelineContextVars, { name: workspace.workspace })
}
private async doWorkspaceOperation (
@ -500,6 +521,7 @@ export class WorkspaceWorker {
archive,
50000,
['blob'],
sharedPipelineContextVars,
(_p: number) => {
if (progress !== Math.round(_p)) {
progress = Math.round(_p)

View File

@ -16,7 +16,7 @@ import core, {
import { consoleModelLogger, type MigrateOperation, type ModelLogger } from '@hcengineering/model'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { SessionDataImpl, wrapPipeline, type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
import { getServerPipeline, getTxAdapterFactory } from '@hcengineering/server-pipeline'
import { getServerPipeline, getTxAdapterFactory, sharedPipelineContextVars } from '@hcengineering/server-pipeline'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { generateToken } from '@hcengineering/server-token'
import { initializeWorkspace, initModel, prepareTools, updateModel, upgradeModel } from '@hcengineering/server-tool'
@ -82,7 +82,7 @@ export async function createWorkspace (
externalStorage: storageAdapter,
usePassedCtx: true
})
const txAdapter = await txFactory(ctx, hierarchy, dbUrl, wsId, modelDb, storageAdapter)
const txAdapter = await txFactory(ctx, sharedPipelineContextVars, hierarchy, dbUrl, wsId, modelDb, storageAdapter)
await childLogger.withLog('init-workspace', {}, (ctx) =>
initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => {})
)

View File

@ -54,10 +54,10 @@ import { CloudFlareLogger } from './logger'
import model from './model.json'
// import { configureAnalytics } from '@hcengineering/analytics-service'
// import { Analytics } from '@hcengineering/analytics'
import contactPlugin from '@hcengineering/contact'
import serverAiBot from '@hcengineering/server-ai-bot'
import serverNotification from '@hcengineering/server-notification'
import serverTelegram from '@hcengineering/server-telegram'
import contactPlugin from '@hcengineering/contact'
export const PREFERRED_SAVE_SIZE = 500
export const PREFERRED_SAVE_INTERVAL = 30 * 1000
@ -75,6 +75,8 @@ export class Transactor extends DurableObject<Env> {
private readonly sessions = new Map<WebSocket, WebsocketData>()
private readonly contextVars: Record<string, any> = {}
constructor (ctx: DurableObjectState, env: Env) {
super(ctx, env)
@ -135,11 +137,12 @@ export class Transactor extends DurableObject<Env> {
adapterSecurity: false,
disableTriggers: false,
fulltextUrl: env.FULLTEXT_URL,
extraLogging: true
extraLogging: true,
pipelineContextVars: this.contextVars
})
const result = await pipeline(ctx, ws, upgrade, broadcast, branding)
const client = getDBClient(dbUrl)
const client = getDBClient(this.contextVars, dbUrl)
const connection = await client.getClient()
const t1 = Date.now()
await connection`select now()`