Connection mgr (#7031)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
Co-authored-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Denis Bykhov 2024-10-24 23:17:46 +05:00 committed by GitHub
parent 723026cc59
commit 34d1b2bdd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 195 additions and 101 deletions

View File

@ -69,6 +69,8 @@ export interface DbAdapter extends LowLevelStorage {
helper?: () => DomainHelperOperations helper?: () => DomainHelperOperations
closeContext?: (ctx: MeasureContext) => Promise<void>
close: () => Promise<void> close: () => Promise<void>
findAll: <T extends Doc>( findAll: <T extends Doc>(
ctx: MeasureContext, ctx: MeasureContext,

View File

@ -42,6 +42,18 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
private readonly adapters: Map<string, DbAdapter> private readonly adapters: Map<string, DbAdapter>
) {} ) {}
async closeContext (ctx: MeasureContext): Promise<void> {
for (const adapter of this.adapters.values()) {
try {
if (adapter.closeContext !== undefined) {
await adapter.closeContext(ctx)
}
} catch (err: any) {
Analytics.handleError(err)
}
}
}
getDefaultAdapter (): DbAdapter { getDefaultAdapter (): DbAdapter {
return this.defaultAdapter return this.defaultAdapter
} }

View File

@ -138,6 +138,8 @@ export interface DBAdapterManager {
registerHelper: (helper: DomainHelper) => Promise<void> registerHelper: (helper: DomainHelper) => Promise<void>
initAdapters: (ctx: MeasureContext) => Promise<void> initAdapters: (ctx: MeasureContext) => Promise<void>
closeContext: (ctx: MeasureContext) => Promise<void>
} }
export interface PipelineContext { export interface PipelineContext {
@ -157,6 +159,8 @@ export interface PipelineContext {
head?: Middleware head?: Middleware
broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise<void> broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise<void>
endContext?: (ctx: MeasureContext) => Promise<void>
} }
/** /**
* @public * @public

View File

@ -0,0 +1,32 @@
import { generateId, type MeasureContext, type Tx } from '@hcengineering/core'
import {
BaseMiddleware,
type Middleware,
type PipelineContext,
type TxMiddlewareResult
} from '@hcengineering/server-core'
/**
* Will support apply tx
* @public
*/
export class ConnectionMgrMiddleware extends BaseMiddleware implements Middleware {
static async create (
ctx: MeasureContext,
context: PipelineContext,
next?: Middleware
): Promise<Middleware | undefined> {
return new ConnectionMgrMiddleware(context, next)
}
async tx (ctx: MeasureContext, tx: Tx[]): Promise<TxMiddlewareResult> {
if (ctx.id === undefined) {
ctx.id = generateId()
}
const result = await this.provideTx(ctx, tx)
this.context.endContext = async (_ctx: MeasureContext) => {
await this.context.adapterManager?.closeContext?.(ctx)
}
return result
}
}

View File

@ -16,6 +16,7 @@
export * from './applyTx' export * from './applyTx'
export * from './broadcast' export * from './broadcast'
export * from './configuration' export * from './configuration'
export * from './connectionMgr'
export * from './contextName' export * from './contextName'
export * from './dbAdapter' export * from './dbAdapter'
export * from './dbAdapterHelper' export * from './dbAdapterHelper'
@ -27,10 +28,10 @@ export * from './lookup'
export * from './lowLevel' export * from './lowLevel'
export * from './model' export * from './model'
export * from './modified' export * from './modified'
export * from './notifications'
export * from './private' export * from './private'
export * from './queryJoin' export * from './queryJoin'
export * from './spacePermissions' export * from './spacePermissions'
export * from './spaceSecurity' export * from './spaceSecurity'
export * from './triggers' export * from './triggers'
export * from './txPush' export * from './txPush'
export * from './notifications'

View File

@ -229,6 +229,7 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
// We need to send all to recipients // We need to send all to recipients
await this.context.head?.handleBroadcast(ctx) await this.context.head?.handleBroadcast(ctx)
} }
await this.context.endContext?.(ctx)
} }
private async processDerivedTxes (ctx: MeasureContext<SessionData>, derived: Tx[]): Promise<void> { private async processDerivedTxes (ctx: MeasureContext<SessionData>, derived: Tx[]): Promise<void> {

View File

@ -65,7 +65,7 @@ import {
updateHashForDoc updateHashForDoc
} from '@hcengineering/server-core' } from '@hcengineering/server-core'
import { createHash } from 'crypto' import { createHash } from 'crypto'
import { type Pool, type PoolClient } from 'pg' import { Pool, type PoolClient } from 'pg'
import { type ValueType } from './types' import { type ValueType } from './types'
import { import {
convertDoc, convertDoc,
@ -89,18 +89,20 @@ abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>() protected readonly tableFields = new Map<string, string[]>()
protected readonly queue: ((client: PoolClient) => Promise<any>)[] = [] protected readonly queue: ((client: PoolClient) => Promise<any>)[] = []
private readonly mutex = new Mutex() protected readonly mutex = new Mutex()
protected readonly connections = new Map<string, PoolClient>()
protected readonly retryTxn = async (fn: (client: PoolClient) => Promise<any>): Promise<void> => { protected readonly retryTxn = async (
connection: Pool | PoolClient,
fn: (client: Pool | PoolClient) => Promise<any>
): Promise<void> => {
await this.mutex.runExclusive(async () => { await this.mutex.runExclusive(async () => {
await this.processOps(this.txConnection, fn) await this.processOps(connection, fn)
}) })
} }
constructor ( constructor (
protected readonly client: Pool, protected readonly client: Pool,
protected readonly connection: PoolClient,
protected readonly txConnection: PoolClient,
protected readonly refClient: PostgresClientReference, protected readonly refClient: PostgresClientReference,
protected readonly workspaceId: WorkspaceId, protected readonly workspaceId: WorkspaceId,
protected readonly hierarchy: Hierarchy, protected readonly hierarchy: Hierarchy,
@ -109,30 +111,56 @@ abstract class PostgresAdapterBase implements DbAdapter {
this._helper = new DBCollectionHelper(this.client, this.workspaceId) this._helper = new DBCollectionHelper(this.client, this.workspaceId)
} }
private async processOps (client: PoolClient, operation: (client: PoolClient) => Promise<any>): Promise<void> { async closeContext (ctx: MeasureContext): Promise<void> {
if (ctx.id === undefined) return
const conn = this.connections.get(ctx.id)
if (conn !== undefined) {
conn.release()
this.connections.delete(ctx.id)
}
}
protected async getConnection (ctx: MeasureContext): Promise<PoolClient | Pool> {
if (ctx.id === undefined) return this.client
const conn = this.connections.get(ctx.id)
if (conn !== undefined) return conn
const client = await this.client.connect()
this.connections.set(ctx.id, client)
return client
}
private async processOps (client: Pool | PoolClient, operation: (client: PoolClient) => Promise<any>): Promise<void> {
const backoffInterval = 100 // millis const backoffInterval = 100 // millis
const maxTries = 5 const maxTries = 5
let tries = 0 let tries = 0
while (true) { const _client = client instanceof Pool ? await client.connect() : client
await client.query('BEGIN;')
tries++
try { try {
const result = await operation(client) while (true) {
await client.query('COMMIT;') await _client.query('BEGIN;')
return result tries++
} catch (err: any) {
await client.query('ROLLBACK;')
if (err.code !== '40001' || tries === maxTries) { try {
throw err const result = await operation(_client)
} else { await _client.query('COMMIT;')
console.log('Transaction failed. Retrying.') return result
console.log(err.message) } catch (err: any) {
await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval)) await _client.query('ROLLBACK;')
if (err.code !== '40001' || tries === maxTries) {
throw err
} else {
console.log('Transaction failed. Retrying.')
console.log(err.message)
await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval))
}
} }
} }
} finally {
if (client instanceof Pool) {
_client.release()
}
} }
} }
@ -198,8 +226,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
abstract init (): Promise<void> abstract init (): Promise<void>
async close (): Promise<void> { async close (): Promise<void> {
this.txConnection.release() this.connections.forEach((c) => {
this.connection.release() c.release()
})
this.refClient.close() this.refClient.close()
} }
@ -215,7 +244,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
sqlChunks.push(`LIMIT ${options.limit}`) sqlChunks.push(`LIMIT ${options.limit}`)
} }
const finalSql: string = [select, ...sqlChunks].join(' ') const finalSql: string = [select, ...sqlChunks].join(' ')
const result = await this.connection.query(finalSql) const result = await this.client.query(finalSql)
return result.rows.map((p) => parseDocWithProjection(p, options?.projection)) return result.rows.map((p) => parseDocWithProjection(p, options?.projection))
} }
@ -261,7 +290,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
if ((operations as any)['%hash%'] === undefined) { if ((operations as any)['%hash%'] === undefined) {
;(operations as any)['%hash%'] = null ;(operations as any)['%hash%'] = null
} }
await this.retryTxn(async (client) => { await this.retryTxn(this.client, async (client) => {
const res = await client.query(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`) const res = await client.query(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`)
const docs = res.rows.map(parseDoc) const docs = res.rows.map(parseDoc)
for (const doc of docs) { for (const doc of docs) {
@ -309,7 +338,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const domain = translateDomain(options?.domain ?? this.hierarchy.getDomain(_class)) const domain = translateDomain(options?.domain ?? this.hierarchy.getDomain(_class))
const sqlChunks: string[] = [] const sqlChunks: string[] = []
const joins = this.buildJoin(_class, options?.lookup) const joins = this.buildJoin(_class, options?.lookup)
const select = `SELECT ${this.getProjection(ctx, _class, domain, options?.projection, joins)} FROM ${domain}` const select = `SELECT ${this.getProjection(_class, domain, options?.projection, joins)} FROM ${domain}`
const secJoin = this.addSecurity(query, domain, ctx.contextData) const secJoin = this.addSecurity(query, domain, ctx.contextData)
if (secJoin !== undefined) { if (secJoin !== undefined) {
sqlChunks.push(secJoin) sqlChunks.push(secJoin)
@ -318,12 +347,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
sqlChunks.push(this.buildJoinString(joins)) sqlChunks.push(this.buildJoinString(joins))
} }
sqlChunks.push(`WHERE ${this.buildQuery(_class, domain, query, joins, options)}`) sqlChunks.push(`WHERE ${this.buildQuery(_class, domain, query, joins, options)}`)
const connection = await this.getConnection(ctx)
let total = options?.total === true ? 0 : -1 let total = options?.total === true ? 0 : -1
if (options?.total === true) { if (options?.total === true) {
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}` const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
const totalSql = [totalReq, ...sqlChunks].join(' ') const totalSql = [totalReq, ...sqlChunks].join(' ')
const totalResult = await this.connection.query(totalSql) const totalResult = await connection.query(totalSql)
const parsed = Number.parseInt(totalResult.rows[0]?.count ?? '') const parsed = Number.parseInt(totalResult.rows[0]?.count ?? '')
total = Number.isNaN(parsed) ? 0 : parsed total = Number.isNaN(parsed) ? 0 : parsed
} }
@ -335,7 +365,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
} }
const finalSql: string = [select, ...sqlChunks].join(' ') const finalSql: string = [select, ...sqlChunks].join(' ')
const result = await this.connection.query(finalSql) const result = await connection.query(finalSql)
if (options?.lookup === undefined) { if (options?.lookup === undefined) {
return toFindResult( return toFindResult(
result.rows.map((p) => parseDocWithProjection(p, options?.projection)), result.rows.map((p) => parseDocWithProjection(p, options?.projection)),
@ -938,7 +968,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
} }
private getProjection<T extends Doc>( private getProjection<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>, _class: Ref<Class<T>>,
baseDomain: string, baseDomain: string,
projection: Projection<T> | undefined, projection: Projection<T> | undefined,
@ -1029,7 +1058,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
next: async () => { next: async () => {
if (!initialized) { if (!initialized) {
if (recheck === true) { if (recheck === true) {
await this.retryTxn(async (client) => { await this.retryTxn(client, async (client) => {
await client.query( await client.query(
`UPDATE ${translateDomain(domain)} SET jsonb_set(data, '{%hash%}', 'NULL', true) WHERE "workspaceId" = $1 AND data ->> '%hash%' IS NOT NULL`, `UPDATE ${translateDomain(domain)} SET jsonb_set(data, '{%hash%}', 'NULL', true) WHERE "workspaceId" = $1 AND data ->> '%hash%' IS NOT NULL`,
[this.workspaceId.name] [this.workspaceId.name]
@ -1100,7 +1129,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (docs.length === 0) { if (docs.length === 0) {
return [] return []
} }
const res = await this.connection.query( const connection = await this.getConnection(ctx)
const res = await connection.query(
`SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, `SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`,
[docs, this.workspaceId.name] [docs, this.workspaceId.name]
) )
@ -1109,60 +1139,65 @@ abstract class PostgresAdapterBase implements DbAdapter {
} }
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> { async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
const arr = docs.concat() await ctx.with('upload', { domain }, async (ctx) => {
const fields = getDocFieldsByDomains(domain) const arr = docs.concat()
const filedsWithData = [...fields, 'data'] const fields = getDocFieldsByDomains(domain)
const insertFields: string[] = [] const filedsWithData = [...fields, 'data']
const onConflict: string[] = [] const insertFields: string[] = []
for (const field of filedsWithData) { const onConflict: string[] = []
insertFields.push(`"${field}"`) for (const field of filedsWithData) {
onConflict.push(`"${field}" = EXCLUDED."${field}"`) insertFields.push(`"${field}"`)
} onConflict.push(`"${field}" = EXCLUDED."${field}"`)
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
while (arr.length > 0) {
const part = arr.splice(0, 500)
const values: any[] = []
const vars: string[] = []
let index = 1
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []
const d = convertDoc(domain, doc, this.workspaceId.name)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
values.push(d[field])
variables.push(`$${index++}`)
}
values.push(d.data)
variables.push(`$${index++}`)
vars.push(`(${variables.join(', ')})`)
} }
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
const connection = await this.getConnection(ctx)
while (arr.length > 0) {
const part = arr.splice(0, 500)
const values: any[] = []
const vars: string[] = []
let index = 1
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []
const d = convertDoc(domain, doc, this.workspaceId.name)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
values.push(d[field])
variables.push(`$${index++}`)
}
values.push(d.data)
variables.push(`$${index++}`)
vars.push(`(${variables.join(', ')})`)
}
const vals = vars.join(',') const vals = vars.join(',')
await this.retryTxn(async (client) => { await this.retryTxn(connection, async (client) => {
await client.query( await client.query(
`INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals} `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`, ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values values
) )
}) })
} }
})
} }
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> { async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
await this.connection.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [ const connection = await this.getConnection(ctx)
await connection.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [
docs, docs,
this.workspaceId.name this.workspaceId.name
]) ])
} }
async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> { async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> {
const connection = await this.getConnection(ctx)
const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"` const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"`
const result = await ctx.with('groupBy', { domain }, async (ctx) => { const result = await ctx.with('groupBy', { domain }, async (ctx) => {
try { try {
const result = await this.connection.query( const result = await connection.query(
`SELECT DISTINCT ${key} as ${field} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1`, `SELECT DISTINCT ${key} as ${field} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1`,
[this.workspaceId.name] [this.workspaceId.name]
) )
@ -1177,7 +1212,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> { async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
const ids = Array.from(operations.keys()) const ids = Array.from(operations.keys())
await this.retryTxn(async (client) => { const connection = await this.getConnection(ctx)
await this.retryTxn(connection, async (client) => {
try { try {
const res = await client.query( const res = await client.query(
`SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2 FOR UPDATE`, `SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2 FOR UPDATE`,
@ -1219,7 +1255,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}) })
} }
async insert (domain: string, docs: Doc[]): Promise<TxResult> { async insert (ctx: MeasureContext, domain: string, docs: Doc[]): Promise<TxResult> {
const fields = getDocFieldsByDomains(domain) const fields = getDocFieldsByDomains(domain)
const filedsWithData = [...fields, 'data'] const filedsWithData = [...fields, 'data']
const insertFields: string[] = [] const insertFields: string[] = []
@ -1247,7 +1283,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
vars.push(`(${variables.join(', ')})`) vars.push(`(${variables.join(', ')})`)
} }
const vals = vars.join(',') const vals = vars.join(',')
await this.retryTxn(async (client) => { const connection = await this.getConnection(ctx)
await this.retryTxn(connection, async (client) => {
await client.query( await client.query(
`INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`, `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`,
values values
@ -1312,8 +1349,9 @@ class PostgresAdapter extends PostgresAdapterBase {
} }
private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>): Promise<TxResult> { private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>): Promise<TxResult> {
await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async () => { await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async (ctx) => {
await this.retryTxn(async (client) => { const connection = await this.getConnection(ctx)
await this.retryTxn(connection, async (client) => {
const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return if (doc === undefined) return
TxProcessor.updateMixin4Doc(doc, tx) TxProcessor.updateMixin4Doc(doc, tx)
@ -1365,21 +1403,22 @@ class PostgresAdapter extends PostgresAdapterBase {
protected async txCreateDoc (ctx: MeasureContext, tx: TxCreateDoc<Doc>): Promise<TxResult> { protected async txCreateDoc (ctx: MeasureContext, tx: TxCreateDoc<Doc>): Promise<TxResult> {
const doc = TxProcessor.createDoc2Doc(tx) const doc = TxProcessor.createDoc2Doc(tx)
return await ctx.with('create-doc', { _class: doc._class }, async () => { return await ctx.with('create-doc', { _class: doc._class }, async (_ctx) => {
return await this.insert(translateDomain(this.hierarchy.getDomain(doc._class)), [doc]) return await this.insert(_ctx, translateDomain(this.hierarchy.getDomain(doc._class)), [doc])
}) })
} }
protected async txUpdateDoc (ctx: MeasureContext, tx: TxUpdateDoc<Doc>): Promise<TxResult> { protected async txUpdateDoc (ctx: MeasureContext, tx: TxUpdateDoc<Doc>): Promise<TxResult> {
return await ctx.with('tx-update-doc', { _class: tx.objectClass }, async () => { return await ctx.with('tx-update-doc', { _class: tx.objectClass }, async (_ctx) => {
if (isOperator(tx.operations)) { if (isOperator(tx.operations)) {
let doc: Doc | undefined let doc: Doc | undefined
const ops = { '%hash%': null, ...tx.operations } const ops = { '%hash%': null, ...tx.operations }
return await ctx.with( return await _ctx.with(
'update with operations', 'update with operations',
{ operations: JSON.stringify(Object.keys(tx.operations)) }, { operations: JSON.stringify(Object.keys(tx.operations)) },
async () => { async (ctx) => {
await this.retryTxn(async (client) => { const connection = await this.getConnection(ctx)
await this.retryTxn(connection, async (client) => {
doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return {} if (doc === undefined) return {}
TxProcessor.applyUpdate(doc, ops) TxProcessor.applyUpdate(doc, ops)
@ -1409,7 +1448,7 @@ class PostgresAdapter extends PostgresAdapterBase {
} }
) )
} else { } else {
return await this.updateDoc(ctx, tx, tx.retrieve ?? false) return await this.updateDoc(_ctx, tx, tx.retrieve ?? false)
} }
}) })
} }
@ -1419,7 +1458,7 @@ class PostgresAdapter extends PostgresAdapterBase {
tx: TxUpdateDoc<T>, tx: TxUpdateDoc<T>,
retrieve: boolean retrieve: boolean
): Promise<TxResult> { ): Promise<TxResult> {
return await ctx.with('update jsonb_set', {}, async () => { return await ctx.with('update jsonb_set', {}, async (_ctx) => {
const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2'] const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2']
const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name] const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name]
let paramsIndex = 5 let paramsIndex = 5
@ -1446,14 +1485,15 @@ class PostgresAdapter extends PostgresAdapterBase {
} }
try { try {
await this.retryTxn(async (client) => { const connection = await this.getConnection(_ctx)
await this.retryTxn(connection, async (client) => {
await client.query( await client.query(
`UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`, `UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`,
params params
) )
}) })
if (retrieve) { if (retrieve) {
const object = await this.findDoc(ctx, this.connection, tx.objectClass, tx.objectId) const object = await this.findDoc(_ctx, connection, tx.objectClass, tx.objectId)
return { object } return { object }
} }
} catch (err) { } catch (err) {
@ -1465,7 +1505,7 @@ class PostgresAdapter extends PostgresAdapterBase {
private async findDoc ( private async findDoc (
ctx: MeasureContext, ctx: MeasureContext,
client: PoolClient, client: Pool | PoolClient,
_class: Ref<Class<Doc>>, _class: Ref<Class<Doc>>,
_id: Ref<Doc>, _id: Ref<Doc>,
forUpdate: boolean = false forUpdate: boolean = false
@ -1482,9 +1522,10 @@ class PostgresAdapter extends PostgresAdapterBase {
} }
protected async txRemoveDoc (ctx: MeasureContext, tx: TxRemoveDoc<Doc>): Promise<TxResult> { protected async txRemoveDoc (ctx: MeasureContext, tx: TxRemoveDoc<Doc>): Promise<TxResult> {
await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async () => { await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async (_ctx) => {
const domain = translateDomain(this.hierarchy.getDomain(tx.objectClass)) const domain = translateDomain(this.hierarchy.getDomain(tx.objectClass))
await this.retryTxn(async (client) => { const connection = await this.getConnection(_ctx)
await this.retryTxn(connection, async (client) => {
await client.query(`DELETE FROM ${domain} WHERE _id = $1 AND "workspaceId" = $2`, [ await client.query(`DELETE FROM ${domain} WHERE _id = $1 AND "workspaceId" = $2`, [
tx.objectId, tx.objectId,
this.workspaceId.name this.workspaceId.name
@ -1507,7 +1548,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
return [] return []
} }
try { try {
await this.insert(DOMAIN_TX, tx) await this.insert(ctx, DOMAIN_TX, tx)
} catch (err) { } catch (err) {
console.error(err) console.error(err)
} }
@ -1515,7 +1556,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
} }
async getModel (ctx: MeasureContext): Promise<Tx[]> { async getModel (ctx: MeasureContext): Promise<Tx[]> {
const res = await this.connection.query( const res = await this.client.query(
`SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC` `SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC`
) )
const model = res.rows.map((p) => parseDoc<Tx>(p)) const model = res.rows.map((p) => parseDoc<Tx>(p))
@ -1539,9 +1580,7 @@ export async function createPostgresAdapter (
): Promise<DbAdapter> { ): Promise<DbAdapter> {
const client = getDBClient(url) const client = getDBClient(url)
const pool = await client.getClient() const pool = await client.getClient()
const mainConnection = await pool.connect() const adapter = new PostgresAdapter(pool, client, workspaceId, hierarchy, modelDb)
const txConnection = await pool.connect()
const adapter = new PostgresAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb)
return adapter return adapter
} }
@ -1557,9 +1596,7 @@ export async function createPostgresTxAdapter (
): Promise<TxAdapter> { ): Promise<TxAdapter> {
const client = getDBClient(url) const client = getDBClient(url)
const pool = await client.getClient() const pool = await client.getClient()
const mainConnection = await pool.connect() const adapter = new PostgresTxAdapter(pool, client, workspaceId, hierarchy, modelDb)
const txConnection = await pool.connect()
const adapter = new PostgresTxAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb)
await adapter.init() await adapter.init()
return adapter return adapter
} }

View File

@ -206,14 +206,17 @@ export class ClientRef implements PostgresClientReference {
* @public * @public
*/ */
export function getDBClient (connectionString: string, database?: string): PostgresClientReference { export function getDBClient (connectionString: string, database?: string): PostgresClientReference {
const key = `${connectionString}${process.env.postgree_OPTIONS ?? '{}'}` const extraOptions = JSON.parse(process.env.POSTGRES_OPTIONS ?? '{}')
const key = `${connectionString}${extraOptions}`
let existing = connections.get(key) let existing = connections.get(key)
if (existing === undefined) { if (existing === undefined) {
const pool = new Pool({ const pool = new Pool({
connectionString, connectionString,
application_name: 'transactor', application_name: 'transactor',
database database,
max: 10,
...extraOptions
}) })
existing = new PostgresClientReferenceImpl(pool, () => { existing = new PostgresClientReferenceImpl(pool, () => {

View File

@ -18,6 +18,7 @@ import {
ApplyTxMiddleware, ApplyTxMiddleware,
BroadcastMiddleware, BroadcastMiddleware,
ConfigurationMiddleware, ConfigurationMiddleware,
ConnectionMgrMiddleware,
ContextNameMiddleware, ContextNameMiddleware,
DBAdapterInitMiddleware, DBAdapterInitMiddleware,
DBAdapterMiddleware, DBAdapterMiddleware,
@ -129,6 +130,7 @@ export function createServerPipeline (
ConfigurationMiddleware.create, ConfigurationMiddleware.create,
LowLevelMiddleware.create, LowLevelMiddleware.create,
ContextNameMiddleware.create, ContextNameMiddleware.create,
ConnectionMgrMiddleware.create,
MarkDerivedEntryMiddleware.create, MarkDerivedEntryMiddleware.create,
ApplyTxMiddleware.create, // Extract apply ApplyTxMiddleware.create, // Extract apply
TxMiddleware.create, // Store tx into transaction domain TxMiddleware.create, // Store tx into transaction domain