Fix cockroach (#6965)

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2024-10-23 13:29:44 +05:00 committed by GitHub
parent 498b390b10
commit 5eeb828f9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
88 changed files with 447 additions and 314 deletions

View File

@ -18,7 +18,14 @@ import {
type MeasureMetricsContext
} from '@hcengineering/core'
import { getMongoClient, getWorkspaceMongoDB } from '@hcengineering/mongo'
import { convertDoc, createTable, getDBClient, retryTxn, translateDomain } from '@hcengineering/postgres'
import {
convertDoc,
createTable,
getDBClient,
getDocFieldsByDomains,
retryTxn,
translateDomain
} from '@hcengineering/postgres'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
@ -54,10 +61,6 @@ export async function moveFromMongoToPG (
client.close()
}
function escapeBackticks (str: string): string {
return str.replaceAll("'", "''")
}
async function moveWorkspace (
accountDb: AccountDB,
mongo: MongoClient,
@ -85,6 +88,13 @@ async function moveWorkspace (
const currentIds = new Set(current.rows.map((r) => r._id))
console.log('move domain', domain)
const docs: Doc[] = []
const fields = getDocFieldsByDomains(domain)
const filedsWithData = [...fields, 'data']
const insertFields: string[] = []
for (const field of filedsWithData) {
insertFields.push(`"${field}"`)
}
const insertStr = insertFields.join(', ')
while (true) {
while (docs.length < 50000) {
const doc = (await cursor.next()) as Doc | null
@ -95,18 +105,29 @@ async function moveWorkspace (
if (docs.length === 0) break
while (docs.length > 0) {
const part = docs.splice(0, 500)
const vals = part
.map((doc) => {
const d = convertDoc(doc, ws.workspace)
return `('${d._id}', '${d.workspaceId}', '${d._class}', '${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', ${
d.attachedTo != null ? `'${d.attachedTo}'` : 'NULL'
}, '${escapeBackticks(JSON.stringify(d.data))}')`
})
.join(', ')
const values: any[] = []
const vars: string[] = []
let index = 1
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []
const d = convertDoc(domain, doc, ws.workspace)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
values.push(d[field])
variables.push(`$${index++}`)
}
values.push(d.data)
variables.push(`$${index++}`)
vars.push(`(${variables.join(', ')})`)
}
const vals = vars.join(',')
try {
await retryTxn(pgClient, async (client) => {
await client.query(
`INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals}`
`INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`,
values
)
})
} catch (err) {

View File

@ -371,7 +371,7 @@ export function devTool (
lastProcessingTime: Date.now() + 1000 * 60
})
await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations)
await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations, undefined, true)
await updateWorkspace(db, wsInfo, {
mode: 'active',
@ -1717,7 +1717,7 @@ export function devTool (
lastProcessingTime: Date.now() + 1000 * 60
})
await createWorkspace(measureCtx, version, null, wsInfo, txes, migrateOperations)
await createWorkspace(measureCtx, version, null, wsInfo, txes, migrateOperations, undefined, true)
await updateWorkspace(db, wsInfo, {
mode: 'active',

View File

@ -133,18 +133,6 @@ describe('memdb', () => {
})
const objClass = (await model.findAll(core.class.Class, { _id: core.class.Obj }))[0] as any
expect(objClass['test:mixin:TestMixin'].arr).toEqual(expect.arrayContaining(['hello']))
await ops.updateDoc(test.mixin.TestMixin, core.space.Model, core.class.Obj as unknown as Ref<TestMixin>, {
$pushMixin: {
$mixin: test.mixin.TestMixin,
values: {
arr: 'there'
}
}
})
const objClass2 = (await model.findAll(core.class.Class, { _id: core.class.Obj }))[0] as any
expect(objClass2['test:mixin:TestMixin'].arr).toEqual(expect.arrayContaining(['hello', 'there']))
})
it('should allow delete', async () => {

View File

@ -116,37 +116,6 @@ function $update (document: Doc, keyval: Record<string, PropertyType>): void {
}
}
function $move (document: Doc, keyval: Record<string, PropertyType>): void {
const doc = document as any
for (const key in keyval) {
if (doc[key] === undefined) {
doc[key] = []
}
const arr = doc[key] as Array<any>
const desc = keyval[key]
doc[key] = (arr ?? []).filter((val) => val !== desc.$value)
doc[key].splice(desc.$position, 0, desc.$value)
}
}
function $pushMixin (document: Doc, options: any): void {
const doc = document as any
const mixinId = options.$mixin
if (mixinId === undefined) {
throw new Error('$mixin must be specified for $push_mixin operation')
}
const mixin = doc[mixinId]
const keyval = options.values
for (const key in keyval) {
const arr = mixin[key]
if (arr == null) {
mixin[key] = [keyval[key]]
} else {
arr.push(keyval[key])
}
}
}
function $inc (document: Doc, keyval: Record<string, number>): void {
const doc = document as unknown as Record<string, number | undefined>
for (const key in keyval) {
@ -180,8 +149,6 @@ const operators: Record<string, _OperatorFunc> = {
$push,
$pull,
$update,
$move,
$pushMixin,
$inc,
$unset,
$rename

View File

@ -247,7 +247,6 @@ export type OmitNever<T extends object> = Omit<T, KeysByType<T, never>>
export interface PushOptions<T extends object> {
$push?: Partial<OmitNever<ArrayAsElementPosition<Required<T>>>>
$pull?: Partial<OmitNever<ArrayAsElement<Required<T>>>>
$move?: Partial<OmitNever<ArrayMoveDescriptor<Required<T>>>>
}
/**
@ -269,16 +268,6 @@ export interface SetEmbeddedOptions<T extends object> {
$update?: Partial<OmitNever<ArrayAsElementUpdate<Required<T>>>>
}
/**
* @public
*/
export interface PushMixinOptions<D extends Doc> {
$pushMixin?: {
$mixin: Ref<Mixin<D>>
values: Partial<OmitNever<ArrayAsElement<D>>>
}
}
/**
* @public
*/
@ -299,7 +288,6 @@ export interface SpaceUpdate {
export type DocumentUpdate<T extends Doc> = Partial<Data<T>> &
PushOptions<T> &
SetEmbeddedOptions<T> &
PushMixinOptions<T> &
IncOptions<T> &
SpaceUpdate

View File

@ -29,7 +29,7 @@ import { ModelLogger } from './utils'
* @public
*/
export type MigrateUpdate<T extends Doc> = Partial<T> &
Omit<PushOptions<T>, '$move'> &
PushOptions<T> &
IncOptions<T> &
UnsetOptions &
Record<string, any>

View File

@ -117,7 +117,7 @@ async function createPersonSpace (
person: Ref<Person>,
control: TriggerControl
): Promise<TxCUD<PersonSpace>[]> {
const personSpace = (await control.findAll(control.ctx, contact.class.PersonSpace, { person }, { limit: 1 })).shift()
const personSpace = (await control.findAll(control.ctx, contact.class.PersonSpace, { person }, { limit: 1 }))[0]
if (personSpace !== undefined) {
const toAdd = account.filter((it) => !personSpace.members.includes(it))
if (toAdd.length === 0) return []

View File

@ -1717,7 +1717,7 @@ async function updateCollaborators (
if (hierarchy.classHierarchyMixin(objectClass, activity.mixin.ActivityDoc) === undefined) return res
const contexts = await control.findAll(control.ctx, notification.class.DocNotifyContext, { attachedTo: objectId })
const contexts = await control.findAll(control.ctx, notification.class.DocNotifyContext, { objectId })
const addedInfo = await getUsersInfo(ctx, toAdd as Ref<PersonAccount>[], control)
for (const addedUser of addedInfo.values()) {

View File

@ -118,6 +118,7 @@ class AdapterStorage implements BackupStorage {
*/
export async function createFileBackupStorage (fileName: string): Promise<BackupStorage> {
if (!existsSync(fileName)) {
console.log(__dirname)
await mkdir(fileName, { recursive: true })
}
return new FileStorage(fileName)

View File

@ -1407,23 +1407,6 @@ class MongoAdapter extends MongoAdapterBase {
modifiedOn: tx.modifiedOn
}
if (isOperator(tx.attributes)) {
const operator = Object.keys(tx.attributes)[0]
if (operator === '$move') {
const keyval = (tx.attributes as any).$move
const arr = tx.mixin + '.' + Object.keys(keyval)[0]
const desc = keyval[arr]
const ops: any = [
{ updateOne: { filter, update: { $pull: { [arr]: desc.$value } } } },
{
updateOne: {
filter,
update: { $set: modifyOp, $push: { [arr]: { $each: [desc.$value], $position: desc.$position } } }
}
}
]
bulk.bulkOperations.push(...ops)
return
}
const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } }
bulk.bulkOperations.push({
@ -1475,46 +1458,7 @@ class MongoAdapter extends MongoAdapterBase {
protected txUpdateDoc (bulk: OperationBulk, tx: TxUpdateDoc<Doc>): void {
if (isOperator(tx.operations)) {
const operator = Object.keys(tx.operations)[0]
if (operator === '$move') {
const keyval = (tx.operations as any).$move
const arr = Object.keys(keyval)[0]
const desc = keyval[arr]
const ops: any = [
{
updateOne: {
filter: { _id: tx.objectId },
update: {
$set: {
'%hash%': null
},
$pull: {
[arr]: desc.$value
}
}
}
},
{
updateOne: {
filter: { _id: tx.objectId },
update: {
$set: {
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
'%hash%': null
},
$push: {
[arr]: {
$each: [desc.$value],
$position: desc.$position
}
}
}
}
}
]
bulk.bulkOperations.push(...ops)
} else if (operator === '$update') {
if (operator === '$update') {
const keyval = (tx.operations as any).$update
const arr = Object.keys(keyval)[0]
const desc = keyval[arr] as QueryUpdate<any>

View File

@ -14,4 +14,4 @@
//
export * from './storage'
export { getDBClient, convertDoc, createTable, retryTxn, translateDomain } from './utils'
export { getDBClient, convertDoc, createTable, retryTxn, translateDomain, getDocFieldsByDomains } from './utils'

View File

@ -0,0 +1,36 @@
import { DOMAIN_SPACE } from '@hcengineering/core'
type DataType = 'bigint' | 'bool' | 'text' | 'text[]'
type Schema = Record<string, [DataType, boolean]>
export const defaultSchema: Schema = {
_id: ['text', true],
_class: ['text', true],
space: ['text', true],
modifiedBy: ['text', true],
createdBy: ['text', false],
modifiedOn: ['bigint', true],
createdOn: ['bigint', false],
attachedTo: ['text', false]
}
export const spaceSchema: Schema = {
_id: ['text', true],
_class: ['text', true],
space: ['text', true],
modifiedBy: ['text', true],
createdBy: ['text', false],
modifiedOn: ['bigint', true],
createdOn: ['bigint', false],
private: ['bool', true],
members: ['text[]', true]
}
export const domainSchemas: Record<string, Schema> = {
[DOMAIN_SPACE]: spaceSchema
}
export function getSchema (domain: string): Schema {
return domainSchemas[domain] ?? defaultSchema
}

View File

@ -66,33 +66,41 @@ import {
} from '@hcengineering/server-core'
import { createHash } from 'crypto'
import { type Pool, type PoolClient } from 'pg'
import { type ValueType } from './types'
import {
convertDoc,
createTable,
DBCollectionHelper,
docFields,
escapeBackticks,
getDBClient,
getUpdateValue,
getDocFieldsByDomains,
isDataField,
isOwner,
type JoinProps,
Mutex,
parseDoc,
parseDocWithProjection,
parseUpdate,
type PostgresClientReference,
retryTxn,
translateDomain
} from './utils'
abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
protected readonly retryTxn = async (fn: (client: PoolClient) => Promise<any>): Promise<any> => {
return await retryTxn(this.client, fn)
protected readonly queue: ((client: PoolClient) => Promise<any>)[] = []
private readonly mutex = new Mutex()
protected readonly retryTxn = async (fn: (client: PoolClient) => Promise<any>): Promise<void> => {
await this.mutex.runExclusive(async () => {
await this.processOps(this.txConnection, fn)
})
}
constructor (
protected readonly client: Pool,
protected readonly connection: PoolClient,
protected readonly txConnection: PoolClient,
protected readonly refClient: PostgresClientReference,
protected readonly workspaceId: WorkspaceId,
protected readonly hierarchy: Hierarchy,
@ -101,6 +109,33 @@ abstract class PostgresAdapterBase implements DbAdapter {
this._helper = new DBCollectionHelper(this.client, this.workspaceId)
}
private async processOps (client: PoolClient, operation: (client: PoolClient) => Promise<any>): Promise<void> {
const backoffInterval = 100 // millis
const maxTries = 5
let tries = 0
while (true) {
await client.query('BEGIN;')
tries++
try {
const result = await operation(client)
await client.query('COMMIT;')
return result
} catch (err: any) {
await client.query('ROLLBACK;')
if (err.code !== '40001' || tries === maxTries) {
throw err
} else {
console.log('Transaction failed. Retrying.')
console.log(err.message)
await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval))
}
}
}
}
async traverse<T extends Doc>(
_domain: Domain,
query: DocumentQuery<T>,
@ -163,6 +198,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
abstract init (): Promise<void>
async close (): Promise<void> {
this.txConnection.release()
this.connection.release()
this.refClient.close()
}
@ -178,7 +215,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = [select, ...sqlChunks].join(' ')
const result = await this.client.query(finalSql)
const result = await this.connection.query(finalSql)
return result.rows.map((p) => parseDocWithProjection(p, options?.projection))
}
@ -190,7 +227,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
continue
}
if (typeof val === 'number') {
res.push(`${this.transformKey(core.class.Doc, key, false)} ${val === 1 ? 'ASC' : 'DESC'}`)
res.push(`${this.transformKey(domain, core.class.Doc, key, false)} ${val === 1 ? 'ASC' : 'DESC'}`)
} else {
// todo handle custom sorting
}
@ -203,8 +240,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
res.push(`"workspaceId" = '${this.workspaceId.name}'`)
for (const key in query) {
const value = query[key]
const tkey = this.transformKey(core.class.Doc, key, false)
const translated = this.translateQueryValue(tkey, value, false)
const tkey = this.transformKey(domain, core.class.Doc, key, false)
const translated = this.translateQueryValue(tkey, value, 'common')
if (translated !== undefined) {
res.push(translated)
}
@ -231,18 +268,26 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
const converted = convertDoc(doc, this.workspaceId.name)
const updates: string[] = []
const { space, attachedTo, ...ops } = operations as any
const converted = convertDoc(domain, doc, this.workspaceId.name)
let paramsIndex = 3
const params: any[] = [doc._id, this.workspaceId.name]
if (space !== undefined) {
updates.push(`space = '${space}'`)
const updates: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, operations)
const newAttachedTo = (doc as any).attachedTo
if (Object.keys(extractedFields).length > 0) {
for (const key in extractedFields) {
const val = (extractedFields as any)[key]
if (key === 'attachedTo' && val === prevAttachedTo) continue
updates.push(`"${key}" = $${paramsIndex++}`)
params.push(val)
}
} else if (prevAttachedTo !== undefined && prevAttachedTo !== newAttachedTo) {
updates.push(`"attachedTo" = $${paramsIndex++}`)
params.push(newAttachedTo)
}
if ((doc as any).attachedTo !== prevAttachedTo) {
updates.push(`"attachedTo" = ${attachedTo != null ? "'" + attachedTo + "'" : 'NULL'}`)
}
if (Object.keys(ops).length > 0) {
updates.push('data = $3')
if (Object.keys(remainingData).length > 0) {
updates.push(`data = $${paramsIndex++}`)
params.push(converted.data)
}
await client.query(
@ -278,7 +323,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (options?.total === true) {
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
const totalSql = [totalReq, ...sqlChunks].join(' ')
const totalResult = await this.client.query(totalSql)
const totalResult = await this.connection.query(totalSql)
const parsed = Number.parseInt(totalResult.rows[0]?.count ?? '')
total = Number.isNaN(parsed) ? 0 : parsed
}
@ -290,7 +335,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const finalSql: string = [select, ...sqlChunks].join(' ')
const result = await this.client.query(finalSql)
const result = await this.connection.query(finalSql)
if (options?.lookup === undefined) {
return toFindResult(
result.rows.map((p) => parseDocWithProjection(p, options?.projection)),
@ -315,9 +360,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
return
}
if (query.space === acc._id) return
const key = domain === DOMAIN_SPACE ? '_id' : domain === DOMAIN_TX ? 'data ->> "objectSpace"' : 'space'
const privateCheck = domain === DOMAIN_SPACE ? " OR sec.data ->> 'private' = 'false'" : ''
const q = `(sec.data -> 'members' @> '"${acc._id}"' OR sec."_class" = '${core.class.SystemSpace}'${privateCheck})`
const key = domain === DOMAIN_SPACE ? '_id' : domain === DOMAIN_TX ? "data ->> 'objectSpace'" : 'space'
const privateCheck = domain === DOMAIN_SPACE ? ' OR sec.private = false' : ''
const q = `(sec.members @> '{"${acc._id}"}' OR sec."_class" = '${core.class.SystemSpace}'${privateCheck})`
return `INNER JOIN ${translateDomain(DOMAIN_SPACE)} AS sec ON sec._id = ${domain}.${key} AND sec."workspaceId" = '${this.workspaceId.name}' AND ${q}`
}
}
@ -545,7 +590,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const _class = Array.isArray(value) ? value[0] : value
const nested = Array.isArray(value) ? value[1] : undefined
const domain = translateDomain(this.hierarchy.getDomain(_class))
const tkey = domain === DOMAIN_MODEL ? key : this.transformKey(clazz, key)
const tkey = domain === DOMAIN_MODEL ? key : this.transformKey(baseDomain, clazz, key)
const as = `lookup_${domain}_${parentKey !== undefined ? parentKey + '_lookup_' + key : key}`
res.push({
isReverse: false,
@ -643,9 +688,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const value = query[key]
if (value === undefined) continue
const isDataArray = this.checkDataArray(_class, key)
const tkey = this.getKey(_class, baseDomain, key, joins, isDataArray)
const translated = this.translateQueryValue(tkey, value, isDataArray)
const valueType = this.getValueType(_class, key)
const tkey = this.getKey(_class, baseDomain, key, joins, valueType === 'dataArray')
const translated = this.translateQueryValue(tkey, value, valueType)
if (translated !== undefined) {
res.push(translated)
}
@ -653,22 +698,23 @@ abstract class PostgresAdapterBase implements DbAdapter {
return res.join(' AND ')
}
private checkDataArray<T extends Doc>(_class: Ref<Class<T>>, key: string): boolean {
private getValueType<T extends Doc>(_class: Ref<Class<T>>, key: string): ValueType {
const splitted = key.split('.')
const mixinOrKey = splitted[0]
const domain = this.hierarchy.getDomain(_class)
if (this.hierarchy.isMixin(mixinOrKey as Ref<Class<Doc>>)) {
key = splitted.slice(1).join('.')
const attr = this.hierarchy.findAttribute(mixinOrKey as Ref<Class<Doc>>, key)
if (attr !== undefined) {
return attr.type._class === core.class.ArrOf
if (attr !== undefined && attr.type._class === core.class.ArrOf) {
return isDataField(domain, key) ? 'dataArray' : 'array'
}
return false
return 'common'
} else {
const attr = this.hierarchy.findAttribute(_class, key)
if (attr !== undefined) {
return attr.type._class === core.class.ArrOf
if (attr !== undefined && attr.type._class === core.class.ArrOf) {
return isDataField(domain, key) ? 'dataArray' : 'array'
}
return false
return 'common'
}
}
@ -731,12 +777,12 @@ abstract class PostgresAdapterBase implements DbAdapter {
isDataArray: boolean = false
): string {
if (key.startsWith('$lookup')) {
return this.transformLookupKey(key, joins, isDataArray)
return this.transformLookupKey(baseDomain, key, joins, isDataArray)
}
return `${baseDomain}.${this.transformKey(_class, key, isDataArray)}`
return `${baseDomain}.${this.transformKey(baseDomain, _class, key, isDataArray)}`
}
private transformLookupKey (key: string, joins: JoinProps[], isDataArray: boolean = false): string {
private transformLookupKey (domain: string, key: string, joins: JoinProps[], isDataArray: boolean = false): string {
const arr = key.split('.').filter((p) => p !== '$lookup')
const tKey = arr.pop() ?? ''
const path = arr.join('.')
@ -747,12 +793,17 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (join.isReverse) {
return `${join.toAlias}->'${tKey}'`
}
const res = isDataField(tKey) ? (isDataArray ? `data->'${tKey}'` : `data#>>'{${tKey}}'`) : key
const res = isDataField(domain, tKey) ? (isDataArray ? `data->'${tKey}'` : `data#>>'{${tKey}}'`) : key
return `${join.toAlias}.${res}`
}
private transformKey<T extends Doc>(_class: Ref<Class<T>>, key: string, isDataArray: boolean = false): string {
if (!isDataField(key)) return `"${key}"`
private transformKey<T extends Doc>(
domain: string,
_class: Ref<Class<T>>,
key: string,
isDataArray: boolean = false
): string {
if (!isDataField(domain, key)) return `"${key}"`
const arr = key.split('.').filter((p) => p)
let tKey = ''
let isNestedField = false
@ -799,7 +850,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
return key
}
private translateQueryValue (tkey: string, value: any, isDataArray: boolean): string | undefined {
private translateQueryValue (tkey: string, value: any, type: ValueType): string | undefined {
if (value === null) {
return `${tkey} IS NULL`
} else if (typeof value === 'object' && !Array.isArray(value)) {
@ -825,7 +876,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
break
case '$in':
res.push(
isDataArray
type !== 'common'
? `${tkey} ?| array[${val.length > 0 ? val.map((v: any) => `'${v}'`).join(', ') : 'NULL'}]`
: `${tkey} IN (${val.length > 0 ? val.map((v: any) => `'${v}'`).join(', ') : 'NULL'})`
)
@ -856,9 +907,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
return res.length === 0 ? undefined : res.join(' AND ')
}
return isDataArray
? `${tkey} @> '${typeof value === 'string' ? '"' + value + '"' : value}'`
: `${tkey} = '${value}'`
return type === 'common'
? `${tkey} = '${value}'`
: type === 'array'
? `${tkey} @> '${typeof value === 'string' ? '{"' + value + '"}' : value}'`
: `${tkey} @> '${typeof value === 'string' ? '"' + value + '"' : value}'`
}
private getProjectionsAliases (join: JoinProps): string[] {
@ -876,8 +929,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
`(SELECT jsonb_agg(${join.toAlias}.*) FROM ${join.table} AS ${join.toAlias} WHERE ${join.fromAlias}.${join.fromField} = ${join.toAlias}."${join.toField}" ${classsesQuery}) AS ${join.toAlias}`
]
}
const fields = getDocFieldsByDomains(join.table)
const res: string[] = []
for (const key of [...docFields, 'data']) {
for (const key of [...fields, 'data']) {
res.push(`${join.toAlias}."${key}" as "lookup_${join.path.replaceAll('.', '_')}_${key}"`)
}
return res
@ -897,7 +951,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
res.push(`${baseDomain}.*`)
} else {
for (const key in projection) {
if (isDataField(key)) {
if (isDataField(baseDomain, key)) {
if (!dataAdded) {
res.push(`${baseDomain}.data as data`)
dataAdded = true
@ -1046,7 +1100,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (docs.length === 0) {
return []
}
const res = await this.client.query(
const res = await this.connection.query(
`SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`,
[docs, this.workspaceId.name]
)
@ -1056,37 +1110,59 @@ abstract class PostgresAdapterBase implements DbAdapter {
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
const arr = docs.concat()
return await this.retryTxn(async (client) => {
while (arr.length > 0) {
const part = arr.splice(0, 500)
const vals = part
.map((doc) => {
const d = convertDoc(doc, this.workspaceId.name)
return `('${d._id}', '${d.workspaceId}', '${d._class}', '${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', ${
d.attachedTo != null ? `'${d.attachedTo}'` : 'NULL'
}, '${escapeBackticks(JSON.stringify(d.data))}')`
})
.join(', ')
await client.query(
`INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals}
ON CONFLICT (_id, "workspaceId") DO UPDATE SET _class = EXCLUDED._class, "createdBy" = EXCLUDED."createdBy", "modifiedBy" = EXCLUDED."modifiedBy", "modifiedOn" = EXCLUDED."modifiedOn", "createdOn" = EXCLUDED."createdOn", space = EXCLUDED.space, "attachedTo" = EXCLUDED."attachedTo", data = EXCLUDED.data;`
)
const fields = getDocFieldsByDomains(domain)
const filedsWithData = [...fields, 'data']
const insertFields: string[] = []
const onConflict: string[] = []
for (const field of filedsWithData) {
insertFields.push(`"${field}"`)
onConflict.push(`"${field}" = EXCLUDED."${field}"`)
}
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
while (arr.length > 0) {
const part = arr.splice(0, 500)
const values: any[] = []
const vars: string[] = []
let index = 1
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []
const d = convertDoc(domain, doc, this.workspaceId.name)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
values.push(d[field])
variables.push(`$${index++}`)
}
values.push(d.data)
variables.push(`$${index++}`)
vars.push(`(${variables.join(', ')})`)
}
})
const vals = vars.join(',')
await this.retryTxn(async (client) => {
await client.query(
`INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values
)
})
}
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
await this.client.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [
await this.connection.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [
docs,
this.workspaceId.name
])
}
async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> {
const key = isDataField(field) ? `data ->> '${field}'` : `"${field}"`
const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"`
const result = await ctx.with('groupBy', { domain }, async (ctx) => {
try {
const result = await this.client.query(
const result = await this.connection.query(
`SELECT DISTINCT ${key} as ${field} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1`,
[this.workspaceId.name]
)
@ -1117,19 +1193,18 @@ abstract class PostgresAdapterBase implements DbAdapter {
;(op as any)['%hash%'] = null
}
TxProcessor.applyUpdate(doc, op)
const converted = convertDoc(doc, this.workspaceId.name)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const updates: string[] = []
const { space, attachedTo, ...data } = op as any
let paramsIndex = 3
const { extractedFields, remainingData } = parseUpdate(domain, op)
const params: any[] = [doc._id, this.workspaceId.name]
if (space !== undefined) {
updates.push(`space = '${space}'`)
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
params.push((extractedFields as any)[key])
}
if (attachedTo !== undefined) {
updates.push(`"attachedTo" = ${attachedTo != null ? "'" + attachedTo + "'" : 'NULL'}`)
}
if (Object.keys(data).length > 0) {
updates.push('data = $3')
if (Object.keys(remainingData).length > 0) {
updates.push(`data = $${paramsIndex++}`)
params.push(converted.data)
}
await client.query(
@ -1145,22 +1220,41 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
async insert (domain: string, docs: Doc[]): Promise<TxResult> {
return await this.retryTxn(async (client) => {
while (docs.length > 0) {
const part = docs.splice(0, 500)
const vals = part
.map((doc) => {
const d = convertDoc(doc, this.workspaceId.name)
return `('${d._id}', '${d.workspaceId}', '${d._class}', '${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', ${
d.attachedTo != null ? `'${d.attachedTo}'` : 'NULL'
}, '${escapeBackticks(JSON.stringify(d.data))}')`
})
.join(', ')
await client.query(
`INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals}`
)
const fields = getDocFieldsByDomains(domain)
const filedsWithData = [...fields, 'data']
const insertFields: string[] = []
for (const field of filedsWithData) {
insertFields.push(`"${field}"`)
}
const insertStr = insertFields.join(', ')
while (docs.length > 0) {
const part = docs.splice(0, 500)
const values: any[] = []
const vars: string[] = []
let index = 1
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []
const d = convertDoc(domain, doc, this.workspaceId.name)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
values.push(d[field])
variables.push(`$${index++}`)
}
values.push(d.data)
variables.push(`$${index++}`)
vars.push(`(${variables.join(', ')})`)
}
})
const vals = vars.join(',')
await this.retryTxn(async (client) => {
await client.query(
`INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`,
values
)
})
}
return {}
}
}
@ -1218,18 +1312,30 @@ class PostgresAdapter extends PostgresAdapterBase {
}
private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>): Promise<TxResult> {
return await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async () => {
return await this.retryTxn(async (client) => {
await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async () => {
await this.retryTxn(async (client) => {
const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return {}
if (doc === undefined) return
TxProcessor.updateMixin4Doc(doc, tx)
const converted = convertDoc(doc, this.workspaceId.name)
const domain = this.hierarchy.getDomain(tx.objectClass)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2']
let paramsIndex = 5
const { extractedFields } = parseUpdate(domain, tx.attributes as Partial<Doc>)
const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name]
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
params.push(converted[key])
}
updates.push(`data = $${paramsIndex++}`)
params.push(converted.data)
await client.query(
`UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET "modifiedBy" = $1, "modifiedOn" = $2, data = $5 WHERE _id = $3 AND "workspaceId" = $4`,
[tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name, converted.data]
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`,
params
)
})
})
return {}
}
async tx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult[]> {
@ -1277,22 +1383,22 @@ class PostgresAdapter extends PostgresAdapterBase {
doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return {}
TxProcessor.applyUpdate(doc, ops)
const converted = convertDoc(doc, this.workspaceId.name)
const domain = this.hierarchy.getDomain(tx.objectClass)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2']
const { space, attachedTo, ...data } = ops as any
let paramsIndex = 5
const { extractedFields, remainingData } = parseUpdate(domain, ops)
const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name]
if (space !== undefined) {
updates.push(`space = '${space}'`)
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
params.push(converted[key])
}
if (attachedTo !== undefined) {
updates.push(`"attachedTo" = ${attachedTo != null ? "'" + attachedTo + "'" : 'NULL'}`)
}
if (Object.keys(data).length > 0) {
updates.push('data = $5')
if (Object.keys(remainingData).length > 0) {
updates.push(`data = $${paramsIndex++}`)
params.push(converted.data)
}
await client.query(
`UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`,
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`,
params
)
})
@ -1315,21 +1421,24 @@ class PostgresAdapter extends PostgresAdapterBase {
): Promise<TxResult> {
return await ctx.with('update jsonb_set', {}, async () => {
const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2']
const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name]
let paramsIndex = 5
const domain = this.hierarchy.getDomain(tx.objectClass)
const { extractedFields, remainingData } = parseUpdate(domain, tx.operations)
const { space, attachedTo, ...ops } = tx.operations as any
if (ops['%hash%'] === undefined) {
ops['%hash%'] = null
}
if (space !== undefined) {
updates.push(`space = '${space}'`)
}
if (attachedTo !== undefined) {
updates.push(`"attachedTo" = ${attachedTo != null ? "'" + attachedTo + "'" : 'NULL'}`)
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
params.push((extractedFields as any)[key])
}
let from = 'data'
let dataUpdated = false
for (const key in ops) {
for (const key in remainingData) {
if (ops[key] === undefined) continue
from = `jsonb_set(${from}, '{${key}}', '${getUpdateValue(ops[key])}', true)`
from = `jsonb_set(${from}, '{${key}}', $${paramsIndex++}::jsonb, true)`
params.push(JSON.stringify((remainingData as any)[key]))
dataUpdated = true
}
if (dataUpdated) {
@ -1340,13 +1449,13 @@ class PostgresAdapter extends PostgresAdapterBase {
await this.retryTxn(async (client) => {
await client.query(
`UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`,
[tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name]
params
)
if (retrieve) {
const object = await this.findDoc(ctx, client, tx.objectClass, tx.objectId)
return { object }
}
})
if (retrieve) {
const object = await this.findDoc(ctx, this.connection, tx.objectClass, tx.objectId)
return { object }
}
} catch (err) {
console.error(err)
}
@ -1373,15 +1482,16 @@ class PostgresAdapter extends PostgresAdapterBase {
}
protected async txRemoveDoc (ctx: MeasureContext, tx: TxRemoveDoc<Doc>): Promise<TxResult> {
return await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async () => {
await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async () => {
const domain = translateDomain(this.hierarchy.getDomain(tx.objectClass))
return await this.retryTxn(async (client) => {
await this.retryTxn(async (client) => {
await client.query(`DELETE FROM ${domain} WHERE _id = $1 AND "workspaceId" = $2`, [
tx.objectId,
this.workspaceId.name
])
})
})
return {}
}
}
@ -1405,8 +1515,8 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
}
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const res = await this.client.query(
`SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC`
const res = await this.connection.query(
`SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC`
)
const model = res.rows.map((p) => parseDoc<Tx>(p))
// We need to put all core.account.System transactions first
@ -1428,7 +1538,10 @@ export async function createPostgresAdapter (
modelDb: ModelDb
): Promise<DbAdapter> {
const client = getDBClient(url)
const adapter = new PostgresAdapter(await client.getClient(), client, workspaceId, hierarchy, modelDb)
const pool = await client.getClient()
const mainConnection = await pool.connect()
const txConnection = await pool.connect()
const adapter = new PostgresAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb)
return adapter
}
@ -1443,7 +1556,10 @@ export async function createPostgresTxAdapter (
modelDb: ModelDb
): Promise<TxAdapter> {
const client = getDBClient(url)
const adapter = new PostgresTxAdapter(await client.getClient(), client, workspaceId, hierarchy, modelDb)
const pool = await client.getClient()
const mainConnection = await pool.connect()
const txConnection = await pool.connect()
const adapter = new PostgresTxAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb)
await adapter.init()
return adapter
}

View File

@ -0,0 +1 @@
export type ValueType = 'common' | 'array' | 'dataArray'

View File

@ -18,9 +18,11 @@ import core, {
AccountRole,
type Class,
type Doc,
type DocumentUpdate,
type Domain,
type FieldIndexConfig,
generateId,
type MixinUpdate,
type Projection,
type Ref,
type WorkspaceId
@ -28,6 +30,7 @@ import core, {
import { PlatformError, unknownStatus } from '@hcengineering/platform'
import { type DomainHelperOperations } from '@hcengineering/server-core'
import { Pool, type PoolClient } from 'pg'
import { defaultSchema, domainSchemas, getSchema } from './schemas'
const connections = new Map<string, PostgresClientReferenceImpl>()
@ -87,24 +90,26 @@ export async function createTable (client: Pool, domains: string[]): Promise<voi
const toCreate = mapped.filter((it) => !exists.rows.map((it) => it.table_name).includes(it))
await retryTxn(client, async (client) => {
for (const domain of toCreate) {
const schema = getSchema(domain)
const fields: string[] = []
for (const key in schema) {
const val = schema[key]
fields.push(`"${key}" ${val[0]} ${val[1] ? 'NOT NULL' : ''}`)
}
const colums = fields.join(', ')
await client.query(
`CREATE TABLE ${domain} (
"workspaceId" VARCHAR(255) NOT NULL,
_id VARCHAR(255) NOT NULL,
_class VARCHAR(255) NOT NULL,
"createdBy" VARCHAR(255),
"modifiedBy" VARCHAR(255) NOT NULL,
"modifiedOn" bigint NOT NULL,
"createdOn" bigint,
space VARCHAR(255) NOT NULL,
"attachedTo" VARCHAR(255),
"workspaceId" text NOT NULL,
${colums},
data JSONB NOT NULL,
PRIMARY KEY("workspaceId", _id)
)`
)
await client.query(`
CREATE INDEX ${domain}_attachedTo ON ${domain} ("attachedTo")
`)
if (schema.attachedTo !== undefined) {
await client.query(`
CREATE INDEX ${domain}_attachedTo ON ${domain} ("attachedTo")
`)
}
await client.query(`
CREATE INDEX ${domain}_class ON ${domain} (_class)
`)
@ -221,19 +226,67 @@ export function getDBClient (connectionString: string, database?: string): Postg
return new ClientRef(existing)
}
export function convertDoc<T extends Doc> (doc: T, workspaceId: string): DBDoc {
const { _id, _class, createdBy, modifiedBy, modifiedOn, createdOn, space, attachedTo, ...data } = doc as any
return {
_id,
_class,
createdBy,
modifiedBy,
modifiedOn,
createdOn,
space,
attachedTo,
export function convertDoc<T extends Doc> (domain: string, doc: T, workspaceId: string): DBDoc {
const extractedFields: Doc & Record<string, any> = {
_id: doc._id,
space: doc.space,
createdBy: doc.createdBy,
modifiedBy: doc.modifiedBy,
modifiedOn: doc.modifiedOn,
createdOn: doc.createdOn,
_class: doc._class
}
const remainingData: Partial<T> = {}
for (const key in doc) {
if (Object.keys(extractedFields).includes(key)) continue
if (getDocFieldsByDomains(domain).includes(key)) {
extractedFields[key] = doc[key]
} else {
remainingData[key] = doc[key]
}
}
const res: any = {
...extractedFields,
workspaceId,
data
data: remainingData
}
return res
}
export function parseUpdate<T extends Doc> (
domain: string,
ops: DocumentUpdate<T> | MixinUpdate<Doc, T>
): {
extractedFields: Partial<T>
remainingData: Partial<T>
} {
const extractedFields: Partial<T> = {}
const remainingData: Partial<T> = {}
for (const key in ops) {
if (key === '$push' || key === '$pull') {
const val = (ops as any)[key]
for (const k in val) {
if (getDocFieldsByDomains(domain).includes(k)) {
;(extractedFields as any)[k] = val[key]
} else {
;(remainingData as any)[k] = val[key]
}
}
} else {
if (getDocFieldsByDomains(domain).includes(key)) {
;(extractedFields as any)[key] = (ops as any)[key]
} else {
;(remainingData as any)[key] = (ops as any)[key]
}
}
}
return {
extractedFields,
remainingData
}
}
@ -343,39 +396,17 @@ export function parseDoc<T extends Doc> (doc: DBDoc): T {
export interface DBDoc extends Doc {
workspaceId: string
attachedTo?: Ref<Doc>
data: Record<string, any>
[key: string]: any
}
export function isDataField (field: string): boolean {
return !docFields.includes(field)
export function isDataField (domain: string, field: string): boolean {
return !getDocFieldsByDomains(domain).includes(field)
}
export const docFields: string[] = [
'_id',
'_class',
'createdBy',
'modifiedBy',
'modifiedOn',
'createdOn',
'space',
'attachedTo'
] as const
export function getUpdateValue (value: any): string {
if (typeof value === 'string') {
return '"' + escapeDoubleQuotes(value) + '"'
}
if (typeof value === 'object') {
return JSON.stringify(value)
}
return value
}
function escapeDoubleQuotes (jsonString: string): string {
const unescapedQuotes = /(?<!\\)"/g
return jsonString.replace(unescapedQuotes, '\\"')
export function getDocFieldsByDomains (domain: string): string[] {
const schema = domainSchemas[domain] ?? defaultSchema
return Object.keys(schema)
}
export interface JoinProps {
@ -389,3 +420,38 @@ export interface JoinProps {
toClass: Ref<Class<Doc>>
classes?: Ref<Class<Doc>>[] // filter by classes
}
export class Mutex {
private locked: boolean = false
private readonly waitingQueue: Array<(value: boolean) => void> = []
private async acquire (): Promise<void> {
while (this.locked) {
await new Promise<boolean>((resolve) => {
this.waitingQueue.push(resolve)
})
}
this.locked = true
}
private release (): void {
if (!this.locked) {
throw new Error('Mutex is not locked')
}
this.locked = false
const nextResolver = this.waitingQueue.shift()
if (nextResolver !== undefined) {
nextResolver(true)
}
}
async runExclusive<T>(fn: () => Promise<T> | T): Promise<T> {
await this.acquire()
try {
return await fn()
} finally {
this.release()
}
}
}

View File

@ -86,7 +86,8 @@ export async function createWorkspace (
version: Data<Version>,
progress: number,
message?: string
) => Promise<void>
) => Promise<void>,
external: boolean = false
): Promise<void> {
const childLogger = ctx.newChild('createWorkspace', {}, { workspace: workspaceInfo.workspace })
const ctxModellogger: ModelLogger = {
@ -162,7 +163,8 @@ export async function createWorkspace (
await handleWsEvent?.('progress', version, 80 + Math.round((Math.min(value, 100) / 100) * 20))
},
false,
'disable'
'disable',
external
)
await handleWsEvent?.('create-done', version, 100, '')

View File

@ -43,6 +43,7 @@ services:
links:
- mongodb
- minio
- postgres
ports:
- 3003:3003
volumes:
@ -59,6 +60,7 @@ services:
image: hardcoreeng/workspace
links:
- mongodb
- postgres
- minio
volumes:
- ./branding-test.json:/var/cfg/branding.json
@ -106,6 +108,7 @@ services:
- elastic
- minio
- rekoni
- postgres
- account
ports:
- 3334:3334

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.