platform/server/server-pipeline/src/pipeline.ts
Andrey Sobolev 6fd8018baf
UBERF-8259: Do not store system model into DB (#6716)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
2024-09-25 09:27:45 +05:00

388 lines
10 KiB
TypeScript

/* eslint-disable @typescript-eslint/unbound-method */
import {
DOMAIN_BENCHMARK,
DOMAIN_BLOB,
DOMAIN_FULLTEXT_BLOB,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
DOMAIN_TX,
Hierarchy,
ModelDb,
type Branding,
type MeasureContext,
type Tx,
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import { createElasticAdapter, createElasticBackupDataAdapter } from '@hcengineering/elastic'
import {
ApplyTxMiddleware,
BroadcastMiddleware,
ConfigurationMiddleware,
ContextNameMiddleware,
DBAdapterInitMiddleware,
DBAdapterMiddleware,
DomainFindMiddleware,
DomainTxMiddleware,
LiveQueryMiddleware,
LookupMiddleware,
LowLevelMiddleware,
MarkDerivedEntryMiddleware,
ModelMiddleware,
ModifiedMiddleware,
PrivateMiddleware,
QueryJoinMiddleware,
SpacePermissionsMiddleware,
SpaceSecurityMiddleware,
TriggersMiddleware,
TxMiddleware,
NotificationsMiddleware
} from '@hcengineering/middleware'
import { createMongoAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres'
import {
buildStorageFromConfig,
createNullAdapter,
createRekoniAdapter,
createStorageDataAdapter,
createYDocAdapter,
storageConfigFromEnv
} from '@hcengineering/server'
import {
createBenchmarkAdapter,
createInMemoryAdapter,
createPipeline,
DummyDbAdapter,
DummyFullTextAdapter,
FullTextMiddleware,
type AggregatorStorageAdapter,
type DbAdapterFactory,
type DbConfiguration,
type Middleware,
type MiddlewareCreator,
type Pipeline,
type PipelineContext,
type PipelineFactory,
type StorageAdapter,
type StorageConfiguration
} from '@hcengineering/server-core'
import { createIndexStages } from './indexing'
/**
 * Resolves the adapter factory that serves the transaction domain.
 *
 * The configuration is built with `getConfig`, with `metrics` passed both as the base
 * and as the contextual measure context. The adapter name is taken from
 * `conf.domains[DOMAIN_TX]`, falling back to `conf.defaultAdapter`.
 *
 * @param metrics - measure context handed to `getConfig` (in both context positions)
 * @param dbUrls - database URL string forwarded to `getConfig`
 * @param workspace - workspace the configuration is built for
 * @param branding - branding forwarded to `getConfig`, may be `null`
 * @param opt - options forwarded unchanged to `getConfig`
 * @param extensions - optional configuration overrides forwarded to `getConfig`
 * @returns the `factory` of the adapter entry selected for `DOMAIN_TX`
 * @throws Error when no adapter is registered under the resolved name, e.g. when
 * `extensions` maps the domain (or the default adapter) to a name absent from `adapters`
 * @public
 */
export function getTxAdapterFactory (
  metrics: MeasureContext,
  dbUrls: string,
  workspace: WorkspaceIdWithUrl,
  branding: Branding | null,
  opt: {
    fullTextUrl: string
    rekoniUrl: string
    indexProcessing: number // 1000
    indexParallel: number // 2
    disableTriggers?: boolean
    usePassedCtx?: boolean
    externalStorage: StorageAdapter
  },
  extensions?: Partial<DbConfiguration>
): DbAdapterFactory {
  const conf = getConfig(metrics, dbUrls, workspace, branding, metrics, opt, extensions)
  const adapterName = conf.domains[DOMAIN_TX] ?? conf.defaultAdapter
  const adapter = conf.adapters[adapterName]
  // Extensions may point the domain at an adapter name that was never registered;
  // fail with an explicit message instead of an opaque property access on undefined.
  if (adapter === undefined) {
    throw new Error(`No adapter '${adapterName}' is configured for domain '${DOMAIN_TX}'`)
  }
  return adapter.factory
}
/**
 * Builds a `PipelineFactory` for regular server sessions.
 *
 * Every call of the returned factory derives a '🧲 session' child measure context,
 * builds the DB configuration through `getConfig`, assembles the middleware list in a
 * fixed order and passes it to `createPipeline` together with a fresh `Hierarchy` and
 * `ModelDb`.
 *
 * @param metrics - measure context used as parent of the session metrics unless
 * `opt.usePassedCtx` is `true`, in which case the factory's own `ctx` is used
 * @param dbUrls - database URL string forwarded to `getConfig`
 * @param model - transactions handed to `ModelMiddleware.create`
 * @param opt - pipeline options: `disableTriggers === true` leaves `TriggersMiddleware`
 * out of the list, `adapterSecurity` (default `false`) is forwarded to
 * `SpaceSecurityMiddleware.create`, `externalStorage` becomes the context `storageAdapter`
 * @param extensions - optional configuration overrides forwarded to `getConfig`
 * @returns a factory returning the result of `createPipeline`
 * @public
 */
export function createServerPipeline (
  metrics: MeasureContext,
  dbUrls: string,
  model: Tx[],
  opt: {
    fullTextUrl: string
    rekoniUrl: string
    indexProcessing: number // 1000
    indexParallel: number // 2
    disableTriggers?: boolean
    usePassedCtx?: boolean
    adapterSecurity?: boolean
    externalStorage: StorageAdapter
  },
  extensions?: Partial<DbConfiguration>
): PipelineFactory {
  const factory: PipelineFactory = (ctx, workspace, upgrade, broadcast, branding) => {
    const parentMetrics = opt.usePassedCtx === true ? ctx : metrics
    const sessionMetrics = parentMetrics.newChild('🧲 session', {})
    const conf = getConfig(metrics, dbUrls, workspace, branding, sessionMetrics, opt, extensions)

    // The security flag is read lazily, at the moment the middleware is created.
    const spaceSecurity: MiddlewareCreator = (
      measureCtx: MeasureContext,
      pipelineCtx: PipelineContext,
      next?: Middleware
    ) => SpaceSecurityMiddleware.create(opt.adapterSecurity ?? false, measureCtx, pipelineCtx, next)

    // Triggers take part in the pipeline unless explicitly disabled.
    const triggers: MiddlewareCreator[] = []
    if (opt.disableTriggers !== true) {
      triggers.push(TriggersMiddleware.create)
    }

    const middlewares: MiddlewareCreator[] = [
      LookupMiddleware.create,
      ModifiedMiddleware.create,
      PrivateMiddleware.create,
      NotificationsMiddleware.create,
      spaceSecurity,
      SpacePermissionsMiddleware.create,
      ConfigurationMiddleware.create,
      LowLevelMiddleware.create,
      ContextNameMiddleware.create,
      MarkDerivedEntryMiddleware.create,
      ApplyTxMiddleware.create, // Extract apply
      TxMiddleware.create, // Store tx into transaction domain
      ...triggers,
      FullTextMiddleware.create(conf, upgrade),
      QueryJoinMiddleware.create,
      LiveQueryMiddleware.create,
      DomainFindMiddleware.create,
      DomainTxMiddleware.create,
      DBAdapterInitMiddleware.create,
      ModelMiddleware.create(model),
      DBAdapterMiddleware.create(conf), // Configure DB adapters
      BroadcastMiddleware.create(broadcast)
    ]

    const hierarchy = new Hierarchy()
    const pipelineContext: PipelineContext = {
      workspace,
      branding,
      modelDb: new ModelDb(hierarchy),
      hierarchy,
      storageAdapter: opt.externalStorage
    }
    return createPipeline(ctx, middlewares, pipelineContext)
  }
  return factory
}
/**
 * Builds a `PipelineFactory` for backup processing.
 *
 * The configuration is produced by `getConfig` with empty full-text and Rekoni URLs,
 * zeroed indexing parameters and `disableTriggers: true`. The `FullTextBlob` adapter is
 * replaced by a `DummyDbAdapter` and the full-text adapter by a `DummyFullTextAdapter`
 * without stages. Only the low-level, context-name, domain-find, adapter-init, model
 * and DB-adapter middlewares are installed.
 *
 * @param metrics - measure context used as parent of the '🧲 backup' metrics unless
 * `opt.usePassedCtx` is `true`, in which case the factory's own `ctx` is used
 * @param dbUrls - database URL string forwarded to `getConfig`
 * @param systemTx - transactions handed to `ModelMiddleware.create`
 * @param opt - options spread into the ones given to `getConfig`; `externalStorage`
 * also becomes the context `storageAdapter`
 * @returns a factory returning the result of `createPipeline`
 * @public
 */
export function createBackupPipeline (
  metrics: MeasureContext,
  dbUrls: string,
  systemTx: Tx[],
  opt: {
    usePassedCtx?: boolean
    adapterSecurity?: boolean
    externalStorage: StorageAdapter
  }
): PipelineFactory {
  // Indexing and triggers are switched off for backup; the explicit fields override
  // whatever the spread of `opt` contributes.
  const backupOptions = {
    ...opt,
    fullTextUrl: '',
    indexParallel: 0,
    indexProcessing: 0,
    rekoniUrl: '',
    disableTriggers: true
  }

  // Full-text related adapters are replaced with dummy implementations.
  const backupExtensions: Partial<DbConfiguration> = {
    adapters: {
      FullTextBlob: {
        factory: async () => new DummyDbAdapter(),
        url: ''
      }
    },
    fulltextAdapter: {
      factory: async () => new DummyFullTextAdapter(),
      stages: () => [],
      url: ''
    }
  }

  const factory: PipelineFactory = (ctx, workspace, _upgrade, _broadcast, branding) => {
    const parentMetrics = opt.usePassedCtx === true ? ctx : metrics
    const backupMetrics = parentMetrics.newChild('🧲 backup', {})
    const conf = getConfig(metrics, dbUrls, workspace, branding, backupMetrics, backupOptions, backupExtensions)

    const middlewares: MiddlewareCreator[] = [
      LowLevelMiddleware.create,
      ContextNameMiddleware.create,
      DomainFindMiddleware.create,
      DBAdapterInitMiddleware.create,
      ModelMiddleware.create(systemTx),
      DBAdapterMiddleware.create(conf)
    ]

    const hierarchy = new Hierarchy()
    const pipelineContext: PipelineContext = {
      workspace,
      branding,
      modelDb: new ModelDb(hierarchy),
      hierarchy,
      storageAdapter: opt.externalStorage
    }
    return createPipeline(ctx, middlewares, pipelineContext)
  }
  return factory
}
/**
 * Creates a storage adapter from the environment configuration and a server pipeline
 * for the given workspace.
 *
 * The pipeline is built through `createServerPipeline` with triggers disabled, zeroed
 * indexing parameters and a `DummyFullTextAdapter` as the full-text adapter. If the
 * pipeline cannot be created, the storage adapter is closed and the error is rethrown.
 *
 * @param ctx - measure context used for the pipeline (`usePassedCtx` is set to `true`)
 * @param model - transactions forwarded to `createServerPipeline`
 * @param mongodbUri - Mongo URI; passed to `buildStorageFromConfig` and used as (part of)
 * the database URL string
 * @param dbUrl - optional URL; when defined it is joined in front of `mongodbUri` with `;`
 * @param wsUrl - workspace the pipeline is created for
 * @returns the created pipeline together with the storage adapter
 */
export async function getServerPipeline (
  ctx: MeasureContext,
  model: Tx[],
  mongodbUri: string,
  dbUrl: string | undefined,
  wsUrl: WorkspaceIdWithUrl
): Promise<{
    pipeline: Pipeline
    storageAdapter: AggregatorStorageAdapter
  }> {
  const dbUrls = dbUrl === undefined ? mongodbUri : `${dbUrl};${mongodbUri}`

  const storageConfig: StorageConfiguration = storageConfigFromEnv()
  const storageAdapter = buildStorageFromConfig(storageConfig, mongodbUri)

  const pipelineExtensions: Partial<DbConfiguration> = {
    fulltextAdapter: {
      factory: async () => new DummyFullTextAdapter(),
      url: '',
      stages: (adapter, storage, blobStorage, contentAdapter) =>
        createIndexStages(
          ctx.newChild('stages', {}),
          wsUrl,
          null,
          adapter,
          storage,
          blobStorage,
          contentAdapter,
          0,
          0
        )
    }
  }

  const pipelineFactory = createServerPipeline(
    ctx,
    dbUrls,
    model,
    {
      externalStorage: storageAdapter,
      fullTextUrl: 'http://localhost:9200',
      indexParallel: 0,
      indexProcessing: 0,
      rekoniUrl: '',
      usePassedCtx: true,
      disableTriggers: true
    },
    pipelineExtensions
  )

  let pipeline: Pipeline
  try {
    pipeline = await pipelineFactory(ctx, wsUrl, true, () => {}, null)
  } catch (err: unknown) {
    // Do not leave the storage adapter open when the pipeline could not be created.
    await storageAdapter.close()
    throw err
  }
  return { pipeline, storageAdapter }
}
/**
 * Assembles the `DbConfiguration` used by the pipelines in this file.
 *
 * `dbUrls` is split on `;`: the first part is the URL of the `Tx` and `Main` adapters,
 * the optional second part is used for the `StorageData` adapter (falling back to the
 * first part). When a second part is present the Postgres factories are selected for
 * `Tx` and `Main`, otherwise the Mongo ones.
 *
 * `extensions.domains`, `extensions.adapters` and `extensions.contentAdapters` are merged
 * over the defaults; `defaultAdapter`, `fulltextAdapter`, `serviceAdapters` and
 * `defaultContentAdapter` replace the defaults when provided.
 *
 * @param metrics - parent of the '🧲 session' metrics unless `opt.usePassedCtx` is `true`
 * @param dbUrls - one URL, or two URLs separated by `;`
 * @param workspace - workspace passed to `createIndexStages` by the default full-text adapter
 * @param branding - branding passed to `createIndexStages` by the default full-text adapter
 * @param ctx - parent of the '🧲 session' metrics when `opt.usePassedCtx` is `true`
 * @param opt - URLs and indexing parameters for the default full-text and content adapters
 * @param extensions - optional configuration overrides
 * @returns the assembled configuration
 */
export function getConfig (
  metrics: MeasureContext,
  dbUrls: string,
  workspace: WorkspaceIdWithUrl,
  branding: Branding | null,
  ctx: MeasureContext,
  opt: {
    fullTextUrl: string
    rekoniUrl: string
    indexProcessing: number // 1000
    indexParallel: number // 2
    disableTriggers?: boolean
    usePassedCtx?: boolean
    externalStorage: StorageAdapter
  },
  extensions?: Partial<DbConfiguration>
): DbConfiguration {
  const parentMetrics = opt.usePassedCtx === true ? ctx : metrics
  const sessionMetrics = parentMetrics.newChild('🧲 session', {})

  const urlParts = dbUrls.split(';')
  const dbUrl = urlParts[0]
  const mongoUrl = urlParts[1]

  // A second URL switches the Tx and Main adapters to the Postgres factories.
  const hasSecondUrl = mongoUrl !== undefined
  const txFactory = hasSecondUrl ? createPostgresTxAdapter : createMongoTxAdapter
  const mainFactory = hasSecondUrl ? createPostgresAdapter : createMongoAdapter

  return {
    domains: {
      [DOMAIN_TX]: 'Tx',
      [DOMAIN_TRANSIENT]: 'InMemory',
      [DOMAIN_BLOB]: 'StorageData',
      [DOMAIN_FULLTEXT_BLOB]: 'FullTextBlob',
      [DOMAIN_MODEL]: 'Null',
      [DOMAIN_BENCHMARK]: 'Benchmark',
      ...extensions?.domains
    },
    metrics: sessionMetrics,
    defaultAdapter: extensions?.defaultAdapter ?? 'Main',
    adapters: {
      Tx: { factory: txFactory, url: dbUrl },
      Main: { factory: mainFactory, url: dbUrl },
      Null: { factory: createNullAdapter, url: '' },
      InMemory: { factory: createInMemoryAdapter, url: '' },
      StorageData: { factory: createStorageDataAdapter, url: mongoUrl ?? dbUrl },
      FullTextBlob: { factory: createElasticBackupDataAdapter, url: opt.fullTextUrl },
      Benchmark: { factory: createBenchmarkAdapter, url: '' },
      ...extensions?.adapters
    },
    fulltextAdapter: extensions?.fulltextAdapter ?? {
      factory: createElasticAdapter,
      url: opt.fullTextUrl,
      // The 'stages' child context is created on each call of this callback.
      stages: (adapter, storage, storageAdapter, contentAdapter) =>
        createIndexStages(
          sessionMetrics.newChild('stages', {}),
          workspace,
          branding,
          adapter,
          storage,
          storageAdapter,
          contentAdapter,
          opt.indexParallel,
          opt.indexProcessing
        )
    },
    serviceAdapters: extensions?.serviceAdapters ?? {},
    contentAdapters: {
      Rekoni: { factory: createRekoniAdapter, contentType: '*', url: opt.rekoniUrl },
      YDoc: { factory: createYDocAdapter, contentType: 'application/ydoc', url: '' },
      ...extensions?.contentAdapters
    },
    defaultContentAdapter: extensions?.defaultContentAdapter ?? 'Rekoni'
  }
}