UBERF-8849: Fix update performance (#7393)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-09 17:23:31 +07:00 committed by GitHub
parent 7b2bdb2c65
commit 37eff56860
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 37 additions and 56 deletions

12
.vscode/launch.json vendored
View File

@ -34,7 +34,8 @@
"request": "launch",
"args": ["src/__start.ts"],
"env": {
"FULLTEXT_URL": "http://localhost:4700",
// "FULLTEXT_URL": "http://localhost:4700",
"FULLTEXT_URL": "http://host.docker.internal:4702",
// "MONGO_URL": "mongodb://localhost:27017",
// "DB_URL": "mongodb://localhost:27017",
// "DB_URL": "postgresql://postgres:example@localhost:5432",
@ -58,7 +59,7 @@
"ELASTIC_INDEX_NAME": "local_storage_index",
"UPLOAD_URL": "/files",
"AI_BOT_URL": "http://localhost:4010",
"STATS_URL": "http://host.docker.internal:4900",
"STATS_URL": "http://host.docker.internal:4900"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"runtimeVersion": "20",
@ -74,11 +75,12 @@
"request": "launch",
"args": ["src/index.ts"],
"env": {
"PORT": "4700",// For mongo
// "PORT": "4701", // for pg
// "PORT": "4700",// For mongo
"PORT": "4702", // for cockroach
"FULLTEXT_DB_URL": "http://localhost:9200",
"DB_URL": "mongodb://localhost:27017",
// "DB_URL": "mongodb://localhost:27017",
// "DB_URL": "postgresql://postgres:example@localhost:5432",
"DB_URL": "postgresql://root@host.docker.internal:26257/defaultdb?sslmode=disable",
"STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"SERVER_SECRET": "secret",
"REKONI_URL": "http://localhost:4004",

View File

@ -87,7 +87,7 @@ export async function OnChange (txes: Tx[], control: TriggerControl): Promise<Tx
needIndex: true,
modifiedBy: tx.modifiedBy,
modifiedOn: tx.modifiedOn,
space: cud.space,
space: cud.objectSpace,
removed: cud._class === core.class.TxRemoveDoc
})
}

View File

@ -15,9 +15,9 @@
import {
type Class,
type Data,
type Doc,
type DocumentQuery,
type DocumentUpdate,
type Domain,
type FieldIndexConfig,
type FindResult,
@ -81,7 +81,11 @@ export interface DbAdapter extends LowLevelStorage {
tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise<TxResult[]>
// Bulk update operations
update: (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>) => Promise<void>
// `operations` maps a document ref to the partial data to write for that document
// inside `domain`. Values are typed `Partial<Data<T>>`, i.e. a subset of the
// document's own data fields.
update: <T extends Doc>(
ctx: MeasureContext,
domain: Domain,
operations: Map<Ref<Doc>, Partial<Data<T>>>
) => Promise<void>
// Allows registering a handler that listens for domain operations
on?: (handler: DbAdapterHandler) => void

View File

@ -15,6 +15,7 @@
import core, {
type Class,
type Data,
type Doc,
type DocumentQuery,
type DocumentUpdate,
@ -101,7 +102,11 @@ export class DummyDbAdapter implements DbAdapter {
// No-op: the dummy adapter ignores the request (empty body).
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {}
// No-op: the dummy adapter accepts the bulk update map and discards it (empty body).
async update<T extends Doc>(
ctx: MeasureContext,
domain: Domain,
operations: Map<Ref<Doc>, Partial<Data<T>>>
): Promise<void> {}
async groupBy<T, P extends Doc>(
ctx: MeasureContext,

View File

@ -571,7 +571,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
const byClass = groupByArray<WithLookup<DocIndexState>, Ref<Class<Doc>>>(result, (it) => it.objectClass)
const docUpdates = new Map<Ref<Doc>, DocumentUpdate<DocIndexState>>()
const docUpdates = new Map<Ref<Doc>, Partial<DocIndexState>>()
const pushQueue = new RateLimiter(5)

View File

@ -1097,7 +1097,7 @@ abstract class MongoAdapterBase implements DbAdapter {
})
}
update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, Partial<Doc>>): Promise<void> {
return ctx.with('update', { domain }, async () => {
const coll = this.collection(domain)

View File

@ -481,7 +481,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
params.push(converted.data)
}
await client.unsafe(
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $1 AND "workspaceId" = $2`,
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE "workspaceId" = $2 AND _id = $1`,
params
)
}
@ -1347,7 +1347,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
return await this.mgr.read('', async (client) => {
const res =
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE "workspaceId" = ${this.workspaceId.name} AND _id = ANY(${docs})`
return res.map((p) => parseDocWithProjection(p as any, domain))
})
})
@ -1421,7 +1421,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
return this.mgr.write(
ctx.id,
(client) =>
client`DELETE FROM ${client(tdomain)} WHERE _id = ANY(${part}) AND "workspaceId" = ${this.workspaceId.name}`
client`DELETE FROM ${client(tdomain)} WHERE "workspaceId" = ${this.workspaceId.name} AND _id = ANY(${part})`
)
})
}
@ -1449,46 +1449,16 @@ abstract class PostgresAdapterBase implements DbAdapter {
})
}
update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
const ids = Array.from(operations.keys())
return this.mgr.write(ctx.id, async (client) => {
try {
const res: DBDoc[] =
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${ids}) AND "workspaceId" = ${this.workspaceId.name} FOR UPDATE`
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p, schema))
const map = new Map(docs.map((d) => [d._id, d]))
const schemaFields = getSchemaAndFields(domain)
for (const [_id, ops] of operations) {
const doc = map.get(_id)
if (doc === undefined) continue
const op = { ...ops }
if ((op as any)['%hash%'] == null) {
;(op as any)['%hash%'] = this.curHash()
}
TxProcessor.applyUpdate(doc, op)
const converted = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(op, schemaFields)
for (const key in extractedFields) {
columns.push(key)
}
if (Object.keys(remainingData).length > 0) {
columns.push('data')
}
columns.push('modifiedBy')
columns.push('modifiedOn')
await client`UPDATE ${client(translateDomain(domain))} SET ${client(
converted,
columns
)} WHERE _id = ${doc._id} AND "workspaceId" = ${this.workspaceId.name}`
}
} catch (err) {
ctx.error('Error while updating', { domain, operations, err })
throw err
// Bulk update: applies each entry of `operations` (document ref -> partial document)
// through `this.rawUpdate`, issuing one call per batch of documents that share the
// same update payload instead of one call per document.
// `ctx` is not used in this body.
async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, Partial<Doc>>): Promise<void> {
// Despite the name, `ids` holds [ref, update] entry pairs, not bare ids.
const ids = [...operations.entries()]
// Group entries by the JSON text of their update payload so identical payloads
// land in one group. NOTE(review): JSON.stringify is sensitive to property order,
// so equal payloads built with different key order form separate groups; that only
// costs extra calls, it does not change which values are written.
const groups = groupByArray(ids, (it) => JSON.stringify(it[1]))
for (const [, values] of groups.entries()) {
// This inner `ids` shadows the outer one; here it is the list of refs in the group.
const ids = values.map((it) => it[0])
// Drain the ref list in chunks of at most 200; `splice` removes them from `ids`.
while (ids.length > 0) {
const part = ids.splice(0, 200)
// Every entry in the group has the same serialized payload, so the first
// entry's payload is used for the whole chunk.
// NOTE(review): rawUpdate is defined outside this view; whether it maintains
// bookkeeping columns such as '%hash%' cannot be confirmed from here.
await this.rawUpdate(domain, { _id: { $in: part } }, values[0][1])
}
})
}
}
@withContext('insert')
@ -1581,7 +1551,7 @@ class PostgresAdapter extends PostgresAdapterBase {
columns.add('modifiedOn')
columns.add('data')
columns.add('%hash%')
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, Array.from(columns))} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}`
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, Array.from(columns))} WHERE "workspaceId" = ${this.workspaceId.name} AND _id = ${tx.objectId}`
})
})
return {}
@ -1683,7 +1653,7 @@ class PostgresAdapter extends PostgresAdapterBase {
if (!columns.includes('%hash%')) {
columns.push('%hash%')
}
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, columns)} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}`
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, columns)} WHERE "workspaceId" = ${this.workspaceId.name} AND _id = ${tx.objectId}`
})
if (tx.retrieve === true && doc !== undefined) {
return { object: doc }
@ -1802,7 +1772,7 @@ class PostgresAdapter extends PostgresAdapterBase {
const domain = this.hierarchy.getDomain(_class)
return ctx.with('find-doc', { _class }, async () => {
const res =
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ${_id} AND "workspaceId" = ${this.workspaceId.name} ${
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE "workspaceId" = ${this.workspaceId.name} AND _id = ${_id} ${
forUpdate ? client` FOR UPDATE` : client``
}`
const dbDoc = res[0] as any