Merge remote-tracking branch 'origin/develop' into staging

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-11-22 21:19:57 +07:00
commit 449fd73610
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
13 changed files with 161 additions and 84 deletions

3
.vscode/launch.json vendored
View File

@ -411,7 +411,8 @@
"MINIO_SECRET_KEY": "minioadmin",
"PLATFORM_OPERATION_LOGGING": "true",
"FRONT_URL": "http://localhost:8080",
"PORT": "3500"
"PORT": "3500",
"STATS_URL": "http://host.docker.internal:4900"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,

View File

@ -997,6 +997,8 @@ export class LiveQuery implements WithTx, Client {
const result = q.result
this.refs.updateDocuments(q, result.getDocs())
if (bulkUpdate) {
this.queriesToUpdate.set(q.id, q)
} else {

View File

@ -1,5 +1,4 @@
import {
clone,
Hierarchy,
matchQuery,
toFindResult,
@ -59,14 +58,17 @@ export class Refs {
query: DocumentQuery<Doc>,
options?: FindOptions<T>
): FindResult<T> | null {
const classKey = _class + ':' + JSON.stringify(options?.lookup ?? {})
if (typeof query._id === 'string') {
// One document query
const doc = this.documentRefs.get(classKey)?.get(query._id)?.doc
if (doc !== undefined) {
const q = matchQuery([doc], query, _class, this.getHierarchy())
if (q.length > 0) {
return toFindResult(clone([doc]), 1)
const desc = this.getHierarchy().getDescendants(_class)
for (const des of desc) {
const classKey = des + ':' + JSON.stringify(options?.lookup ?? {})
// One document query
const doc = this.documentRefs.get(classKey)?.get(query._id)?.doc
if (doc !== undefined) {
const q = matchQuery([doc], query, _class, this.getHierarchy())
if (q.length > 0) {
return toFindResult(this.getHierarchy().clone([doc]), 1)
}
}
}
}
@ -76,13 +78,14 @@ export class Refs {
options?.sort === undefined &&
options?.projection === undefined
) {
const classKey = _class + ':' + JSON.stringify(options?.lookup ?? {})
const docs = this.documentRefs.get(classKey)
if (docs !== undefined) {
const _docs = Array.from(docs.values()).map((it) => it.doc)
const q = matchQuery(_docs, query, _class, this.getHierarchy())
if (q.length > 0) {
return toFindResult(clone([q[0]]), 1)
return toFindResult(this.getHierarchy().clone([q[0]]), 1)
}
}
}

View File

@ -97,7 +97,7 @@
let selecting = false
function handleMouseDown (): void {
function handleMouseDown (event: MouseEvent): void {
function handleMouseMove (): void {
if (editor !== undefined && !editor.state.selection.empty) {
selecting = true
@ -112,9 +112,11 @@
document.removeEventListener('mouseup', handleMouseUp)
}
if (editor !== undefined) {
document.addEventListener('mousemove', handleMouseMove)
document.addEventListener('mouseup', handleMouseUp)
if (editor !== undefined && visible && visibleActions.length > 0) {
if (event.target !== null && toolbar !== null && !toolbar.contains(event.target as Node)) {
document.addEventListener('mousemove', handleMouseMove)
document.addEventListener('mouseup', handleMouseUp)
}
}
}

View File

@ -150,8 +150,13 @@ class SvelteNodeView extends NodeView<SvelteNodeViewComponent, Editor, SvelteNod
handleSelectionUpdate (): void {
const { from, to } = this.editor.state.selection
const pos = this.getPos()
if (from <= this.getPos() && to >= this.getPos() + this.node.nodeSize) {
if (typeof pos !== 'number') {
return
}
if (from <= pos && to >= pos + this.node.nodeSize) {
if (this.renderer.props.selected === true) {
return
}

View File

@ -72,8 +72,7 @@ export function createTodoItemExtension (mode: TextEditorMode, ctx: any): AnyExt
return SvelteNodeViewRenderer(ToDoItemNodeView, {
contentAs: 'li',
contentClass: 'todo-item',
componentProps: { objectId, objectClass, objectSpace },
ignoreMutation: () => true
componentProps: { objectId, objectClass, objectSpace }
})
}
}).configure({
@ -90,7 +89,7 @@ export function createTodoListExtension (mode: TextEditorMode, ctx: any): AnyExt
return TodoListExtension.extend({
addNodeView () {
return SvelteNodeViewRenderer(ToDoListNodeView, { ignoreMutation: () => true })
return SvelteNodeViewRenderer(ToDoListNodeView, {})
}
}).configure({
HTMLAttributes: {

View File

@ -128,8 +128,9 @@ export async function updateIssueRelation (
}
export async function getIssueIdByIdentifier (identifier: string): Promise<Ref<Issue> | undefined> {
if (!isIssueId(identifier)) return
const client = getClient()
const issue = await client.findOne(tracker.class.Issue, { identifier }, { projection: { _id: 1 } })
const issue = await client.findOne(tracker.class.Issue, { identifier })
return issue?._id
}

View File

@ -15,6 +15,8 @@ const model = builder().getTxes()
// const dbURL = 'postgresql://root@localhost:26257/defaultdb?sslmode=disable'
const dbURL = 'postgresql://postgres:example@localhost:5432'
const STORAGE_CONFIG = 'minio|localhost:9000?accessKey=minioadmin&secretKey=minioadmin&useSSL=false'
// jest.setTimeout(4500000)
describe.skip('test-backup-find', () => {
it('check create/load/clean', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
@ -53,6 +55,47 @@ describe.skip('test-backup-find', () => {
const findDocs = await client.findAll(core.class.Tx, {})
expect(findDocs.length).toBe(0)
//
} finally {
await pipeline.close()
await storageAdapter.close()
}
})
it('check traverse', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
// We should setup a DB with docuemnts and try to backup them.
const wsUrl = { name: 'testdb-backup-test', workspaceName: 'test', workspaceUrl: 'test' }
const { pipeline, storageAdapter } = await getServerPipeline(toolCtx, model, dbURL, wsUrl, {
storageConfig: STORAGE_CONFIG,
disableTriggers: true
})
try {
const client = wrapPipeline(toolCtx, pipeline, wsUrl)
const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage
// We need to create a backup docs if they are missing.
await prepareTxes(lowLevel, toolCtx, 1500)
const iter = await lowLevel.traverse(DOMAIN_TX, {})
const allDocs: Doc[] = []
while (true) {
const docs = await iter.next(50)
if (docs == null || docs?.length === 0) {
break
}
await client.clean(
DOMAIN_TX,
docs.map((doc) => doc._id)
)
allDocs.push(...docs)
}
expect(allDocs.length).toBeGreaterThan(1449)
const findDocs = await client.findAll(core.class.Tx, {})
expect(findDocs.length).toBe(0)
//
} finally {
await pipeline.close()
@ -60,12 +103,16 @@ describe.skip('test-backup-find', () => {
}
})
})
async function prepareTxes (lowLevel: LowLevelStorage, toolCtx: MeasureMetricsContext): Promise<void> {
async function prepareTxes (
lowLevel: LowLevelStorage,
toolCtx: MeasureMetricsContext,
count: number = 500
): Promise<void> {
const docs = await lowLevel.rawFindAll(DOMAIN_TX, {})
if ((docs?.length ?? 0) < 500) {
if ((docs?.length ?? 0) < count) {
// We need to fill some documents to be pressent
const docs: TxCreateDoc<Doc>[] = []
for (let i = 0; i < 500; i++) {
for (let i = 0; i < count; i++) {
docs.push({
_class: core.class.TxCreateDoc,
_id: generateId(),

View File

@ -36,7 +36,7 @@ export class ModifiedMiddleware extends BaseMiddleware implements Middleware {
for (const tx of txes) {
if (tx.modifiedBy !== core.account.System && ctx.contextData.userEmail !== systemAccountEmail) {
tx.modifiedOn = Date.now()
tx.createdOn = tx.createdOn ?? tx.modifiedOn
tx.createdOn = tx.modifiedOn
}
}
return this.provideTx(ctx, txes)

View File

@ -225,8 +225,8 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
)
ctx.contextData = asyncContextData
if (!((ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false)) {
ctx.id = generateId()
if ((ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false) {
ctx.id = 'async_tr' + generateId()
}
const aresult = await this.triggers.apply(
ctx,

View File

@ -27,7 +27,6 @@ import core, {
DOMAIN_TX,
type FindOptions,
type FindResult,
generateId,
groupByArray,
type Hierarchy,
isOperator,
@ -64,7 +63,7 @@ import {
type TxAdapter
} from '@hcengineering/server-core'
import type postgres from 'postgres'
import { getDocFieldsByDomains, getSchema, translateDomain } from './schemas'
import { getDocFieldsByDomains, getSchema, type Schema, translateDomain } from './schemas'
import { type ValueType } from './types'
import {
convertDoc,
@ -83,6 +82,31 @@ import {
type PostgresClientReference
} from './utils'
async function * createCursorGenerator (
client: postgres.ReservedSql,
sql: string,
schema: Schema,
bulkSize = 50
): AsyncGenerator<Doc[]> {
const cursor = client.unsafe(sql).cursor(bulkSize)
try {
let docs: Doc[] = []
for await (const part of cursor) {
docs.push(...part.filter((it) => it != null).map((it) => parseDoc(it as any, schema)))
if (docs.length > 0) {
yield docs
docs = []
}
}
if (docs.length > 0) {
yield docs
docs = []
}
} catch (err: any) {
console.error('failed to recieve data', { err })
}
}
abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
@ -174,49 +198,31 @@ abstract class PostgresAdapterBase implements DbAdapter {
): Promise<Iterator<T>> {
const schema = getSchema(_domain)
const client = await this.client.reserve()
let closed = false
const cursorName = `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(_domain)}_${generateId()}`
const close = async (cursorName: string): Promise<void> => {
if (closed) return
try {
await client.unsafe(`CLOSE ${cursorName}`)
await client.unsafe('COMMIT;')
} finally {
client.release()
closed = true
}
const tdomain = translateDomain(_domain)
const sqlChunks: string[] = [`SELECT * FROM ${tdomain}`]
sqlChunks.push(`WHERE ${this.buildRawQuery(tdomain, query, options)}`)
if (options?.sort !== undefined) {
sqlChunks.push(this.buildRawOrder(tdomain, options.sort))
}
const init = async (): Promise<void> => {
const domain = translateDomain(_domain)
const sqlChunks: string[] = [`CURSOR FOR SELECT * FROM ${domain}`]
sqlChunks.push(`WHERE ${this.buildRawQuery(domain, query, options)}`)
if (options?.sort !== undefined) {
sqlChunks.push(this.buildRawOrder(domain, options.sort))
}
if (options?.limit !== undefined) {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = sqlChunks.join(' ')
await client.unsafe('BEGIN;')
await client.unsafe(`DECLARE ${cursorName} ${finalSql}`)
if (options?.limit !== undefined) {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = sqlChunks.join(' ')
const next = async (count: number): Promise<T[] | null> => {
const result = await client.unsafe(`FETCH ${count} FROM ${cursorName}`)
if (result.length === 0) {
await close(cursorName)
return null
}
return result.map((p) => parseDoc(p as any, schema))
}
await init()
const cursor: AsyncGenerator<Doc[]> = createCursorGenerator(client, finalSql, schema)
return {
next,
next: async (count: number): Promise<T[] | null> => {
const result = await cursor.next()
if (result.done === true || result.value.length === 0) {
return null
}
return result.value as T[]
},
close: async () => {
await close(cursorName)
await cursor.return([])
client.release()
}
}
}
@ -384,8 +390,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
params
)
})
} catch (err) {
} catch (err: any) {
console.error(err, { domain, params, updates })
throw err
} finally {
conn.release()
}
@ -1158,17 +1165,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
const workspaceId = this.workspaceId
async function * createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
const cursor = client
.unsafe(`SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`)
.cursor(limit)
try {
for await (const part of cursor) {
yield part.filter((it) => it != null).map((it) => parseDoc(it as any, schema))
}
} catch (err: any) {
ctx.error('failed to recieve data', { err })
}
function createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
const sql = `SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`
return createCursorGenerator(client, sql, schema, limit)
}
let bulk: AsyncGenerator<Doc[]>
let forcedRecheck = false
@ -1191,7 +1191,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
initialized = true
await flush(true) // We need to flush, so wrong id documents will be updated.
bulk = createBulk('_id, "%hash%"', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
// bulk = createBulk('_id, "%hash%, data', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
}
let docs = await ctx.with('next', { mode }, () => bulk.next())
@ -1235,6 +1234,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
},
close: async () => {
await ctx.with('flush', {}, () => flush(true))
await bulk.return([]) // We need to close generator, just in case
client?.release()
ctx.end()
}
@ -1246,11 +1246,14 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (docs.length === 0) {
return []
}
return await this.withConnection(ctx, async (connection) => {
const client = await this.client.reserve()
try {
const res =
await connection`SELECT * FROM ${connection(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
return res.map((p) => parseDocWithProjection(p as any, domain))
})
} finally {
client.release()
}
})
}
@ -1266,7 +1269,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
await this.withConnection(ctx, async (connection) => {
const client = await this.client.reserve()
try {
const domainFields = new Set(getDocFieldsByDomains(domain))
const toUpload = [...docs]
const tdomain = translateDomain(domain)
@ -1299,7 +1304,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const vals = vars.join(',')
await this.retryTxn(connection, (client) =>
await this.retryTxn(client, (client) =>
client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
@ -1307,7 +1312,12 @@ abstract class PostgresAdapterBase implements DbAdapter {
)
)
}
})
} catch (err: any) {
ctx.error('failed to upload', { err })
throw err
} finally {
client.release()
}
})
}

View File

@ -194,7 +194,7 @@ class PostgresClientReferenceImpl {
void (async () => {
this.onclose()
const cl = await this.client
await cl.end()
await cl.end({ timeout: 1 })
})()
}
}

View File

@ -12,9 +12,16 @@ services:
image: postgres
environment:
- POSTGRES_PASSWORD=example
- PGUSER=postgres
ports:
- 5433:5432
restart: unless-stopped
healthcheck:
#CHANGE 1: this command checks if the database is ready, right on the source db server
test: [ "CMD-SHELL", "pg_isready" ]
interval: 5s
timeout: 5s
retries: 5
minio:
image: 'minio/minio'
command: server /data --address ":9000" --console-address ":9001"