platform/server/core/src/pipeline.ts

138 lines
3.9 KiB
TypeScript
Raw Normal View History

//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
type Class,
type Doc,
type DocumentQuery,
type Domain,
type FindOptions,
type FindResult,
type MeasureContext,
type ModelDb,
type Ref,
type SearchOptions,
type SearchQuery,
type SearchResult,
type StorageIterator,
type Tx,
type TxResult
} from '@hcengineering/core'
import { type DbConfiguration } from './configuration'
import { createServerStorage } from './server'
import {
type BroadcastFunc,
type Middleware,
type MiddlewareCreator,
type Pipeline,
type ServerStorage,
type SessionContext
} from './types'
/**
 * Creates the server storage for `conf` and wraps it in a {@link Pipeline}
 * whose middleware chain is built from `constructors`.
 *
 * Storage creation is measured under the `create-server-storage` label and the
 * pipeline is given a child measure context named `pipeline-operations`.
 *
 * If building the pipeline fails, the storage created here is closed before
 * the error is rethrown, since the caller never receives a handle to close it.
 *
 * @param ctx - measure context used for storage creation and as the parent of the pipeline context
 * @param conf - database configuration forwarded to `createServerStorage`
 * @param constructors - middleware creators, in pipeline order (the first one becomes the head)
 * @param upgrade - forwarded to `createServerStorage` as the `upgrade` option
 * @param broadcast - forwarded to `createServerStorage` as the `broadcast` option
 * @returns the ready-to-use pipeline
 * @public
 */
export async function createPipeline (
  ctx: MeasureContext,
  conf: DbConfiguration,
  constructors: MiddlewareCreator[],
  upgrade: boolean,
  broadcast: BroadcastFunc
): Promise<Pipeline> {
  const storage = await ctx.with(
    'create-server-storage',
    {},
    async (ctx) =>
      await createServerStorage(ctx, conf, {
        upgrade,
        broadcast
      })
  )
  try {
    return await PipelineImpl.create(ctx.newChild('pipeline-operations', {}), storage, constructors)
  } catch (err: unknown) {
    // The storage is only reachable from this function at this point, so it
    // has to be released here or it is leaked.
    try {
      await storage.close()
    } catch {
      // A failure while closing must not hide the error that caused the cleanup.
    }
    throw err
  }
}
/**
 * {@link Pipeline} backed by a {@link ServerStorage} and an optional chain of
 * middlewares.
 *
 * `findAll`, `searchFulltext` and `tx` are sent to the head of the middleware
 * chain when one exists and straight to the storage otherwise. `find`, `load`,
 * `upload`, `clean` and `close` always go directly to the storage.
 */
class PipelineImpl implements Pipeline {
  private head: Middleware | undefined
  readonly modelDb: ModelDb

  private constructor (readonly storage: ServerStorage) {
    this.modelDb = storage.modelDb
  }

  /**
   * Builds a pipeline over `storage` with the middleware chain described by
   * `constructors`.
   */
  static async create (
    ctx: MeasureContext,
    storage: ServerStorage,
    constructors: MiddlewareCreator[]
  ): Promise<PipelineImpl> {
    const instance = new PipelineImpl(storage)
    const chainHead = await instance.buildChain(ctx, constructors)
    instance.head = chainHead
    return instance
  }

  /**
   * Instantiates the creators from last to first, handing each one the
   * middleware built just before it as its successor.
   *
   * @returns the middleware produced by `constructors[0]`, or `undefined`
   * when the list is empty
   */
  private async buildChain (ctx: MeasureContext, constructors: MiddlewareCreator[]): Promise<Middleware | undefined> {
    let successor: Middleware | undefined
    for (const creator of constructors.slice().reverse()) {
      const next = successor
      successor = await ctx.with('build chain', {}, async (ctx) => await creator(ctx, this.storage, next))
    }
    return successor
  }

  /** Runs a query through the middleware chain, or on the storage if there is none. */
  async findAll<T extends Doc>(
    ctx: SessionContext,
    _class: Ref<Class<T>>,
    query: DocumentQuery<T>,
    options?: FindOptions<T>
  ): Promise<FindResult<T>> {
    const entry = this.head
    if (entry === undefined) {
      // The storage takes the measure context carried by the session context.
      return await this.storage.findAll(ctx.ctx, _class, query, options)
    }
    return await entry.findAll(ctx, _class, query, options)
  }

  /** Runs a fulltext search through the middleware chain, or on the storage if there is none. */
  async searchFulltext (ctx: SessionContext, query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
    const entry = this.head
    if (entry === undefined) {
      return await this.storage.searchFulltext(ctx.ctx, query, options)
    }
    return await entry.searchFulltext(ctx, query, options)
  }

  /** Applies a transaction through the middleware chain, or on the storage if there is none. */
  async tx (ctx: SessionContext, tx: Tx): Promise<TxResult> {
    const entry = this.head
    if (entry !== undefined) {
      return await entry.tx(ctx, tx)
    }
    // Unlike the read paths, the storage is given the session context itself here.
    return await this.storage.tx(ctx, tx)
  }

  /** Closes the underlying storage. */
  async close (): Promise<void> {
    await this.storage.close()
  }

  /** Returns the storage iterator for `domain`. */
  find (ctx: MeasureContext, domain: Domain): StorageIterator {
    const iterator = this.storage.find(ctx, domain)
    return iterator
  }

  /** Loads the documents with the given ids from `domain` via the storage. */
  async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
    const loaded = await this.storage.load(ctx, domain, docs)
    return loaded
  }

  /** Uploads `docs` into `domain` via the storage. */
  async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
    await this.storage.upload(ctx, domain, docs)
  }

  /** Asks the storage to clean the documents with the given ids from `domain`. */
  async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
    await this.storage.clean(ctx, domain, docs)
  }
}