mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-22 00:10:37 +00:00
UBERF-9172: Fix $lookup order by (#7714)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions
This commit is contained in:
parent
599b7b3375
commit
1840dc5edc
@ -41,7 +41,7 @@ import {
|
|||||||
createPostgreeDestroyAdapter,
|
createPostgreeDestroyAdapter,
|
||||||
createPostgresAdapter,
|
createPostgresAdapter,
|
||||||
createPostgresTxAdapter,
|
createPostgresTxAdapter,
|
||||||
setDbUnsafePrepareOptions
|
setDBExtraOptions
|
||||||
} from '@hcengineering/postgres'
|
} from '@hcengineering/postgres'
|
||||||
import { readFileSync } from 'node:fs'
|
import { readFileSync } from 'node:fs'
|
||||||
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]
|
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]
|
||||||
@ -83,11 +83,8 @@ export function start (
|
|||||||
|
|
||||||
const usePrepare = process.env.DB_PREPARE === 'true'
|
const usePrepare = process.env.DB_PREPARE === 'true'
|
||||||
|
|
||||||
setDbUnsafePrepareOptions({
|
setDBExtraOptions({
|
||||||
find: usePrepare,
|
prepare: usePrepare // We override defaults
|
||||||
model: false,
|
|
||||||
update: usePrepare,
|
|
||||||
upload: usePrepare
|
|
||||||
})
|
})
|
||||||
|
|
||||||
registerServerPlugins()
|
registerServerPlugins()
|
||||||
|
@ -109,7 +109,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async initAdapters (): Promise<void> {
|
async initAdapters (ctx: MeasureContext): Promise<void> {
|
||||||
for (const [key, adapter] of this.adapters) {
|
for (const [key, adapter] of this.adapters) {
|
||||||
// already initialized
|
// already initialized
|
||||||
if (key !== this.conf.domains[DOMAIN_TX] && adapter.init !== undefined) {
|
if (key !== this.conf.domains[DOMAIN_TX] && adapter.init !== undefined) {
|
||||||
@ -130,7 +130,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
await adapter?.init?.(this.metrics, domains, excludeDomains)
|
await ctx.with(`init adapter ${key}`, {}, (ctx) => adapter?.init?.(ctx, domains, excludeDomains))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,7 @@ import { getDBClient, retryTxn } from './utils'
|
|||||||
|
|
||||||
export { getDocFieldsByDomains, translateDomain } from './schemas'
|
export { getDocFieldsByDomains, translateDomain } from './schemas'
|
||||||
export * from './storage'
|
export * from './storage'
|
||||||
export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setDbUnsafePrepareOptions } from './utils'
|
export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setExtraOptions } from './utils'
|
||||||
|
|
||||||
export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdapter {
|
export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdapter {
|
||||||
return {
|
return {
|
||||||
|
@ -80,8 +80,8 @@ import {
|
|||||||
createTables,
|
createTables,
|
||||||
DBCollectionHelper,
|
DBCollectionHelper,
|
||||||
type DBDoc,
|
type DBDoc,
|
||||||
dbUnsafePrepareOptions,
|
|
||||||
getDBClient,
|
getDBClient,
|
||||||
|
getPrepare,
|
||||||
inferType,
|
inferType,
|
||||||
isDataField,
|
isDataField,
|
||||||
isOwner,
|
isOwner,
|
||||||
@ -325,14 +325,15 @@ class ValuesVariables {
|
|||||||
|
|
||||||
add (value: any, type: string = ''): string {
|
add (value: any, type: string = ''): string {
|
||||||
// Compact value if string and same
|
// Compact value if string and same
|
||||||
if (typeof value === 'string') {
|
if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') {
|
||||||
const v = this.valueHashes.get(value + ':' + type)
|
const vkey = `${value}:${type}`
|
||||||
|
const v = this.valueHashes.get(vkey)
|
||||||
if (v !== undefined) {
|
if (v !== undefined) {
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
this.values.push(value)
|
this.values.push(value)
|
||||||
const idx = type !== '' ? `$${this.index++}${type}` : `$${this.index++}`
|
const idx = type !== '' ? `$${this.index++}${type}` : `$${this.index++}`
|
||||||
this.valueHashes.set(value + ':' + type, idx)
|
this.valueHashes.set(vkey, idx)
|
||||||
return idx
|
return idx
|
||||||
} else {
|
} else {
|
||||||
this.values.push(value)
|
this.values.push(value)
|
||||||
@ -359,6 +360,33 @@ class ValuesVariables {
|
|||||||
})
|
})
|
||||||
return vv
|
return vv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
injectVars (sql: string): string {
|
||||||
|
const escQuote = (d: any | any[]): string => {
|
||||||
|
if (d == null) {
|
||||||
|
return 'NULL'
|
||||||
|
}
|
||||||
|
if (Array.isArray(d)) {
|
||||||
|
return 'ARRAY[' + d.map(escQuote).join(',') + ']'
|
||||||
|
}
|
||||||
|
switch (typeof d) {
|
||||||
|
case 'number':
|
||||||
|
if (isNaN(d) || !isFinite(d)) {
|
||||||
|
throw new Error('Invalid number value')
|
||||||
|
}
|
||||||
|
return d.toString()
|
||||||
|
case 'boolean':
|
||||||
|
return d ? 'TRUE' : 'FALSE'
|
||||||
|
case 'string':
|
||||||
|
return `'${d.replace(/'/g, "''")}'`
|
||||||
|
default:
|
||||||
|
throw new Error(`Unsupported value type: ${typeof d}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sql.replaceAll(/(\$\d+)/g, (_, v) => {
|
||||||
|
return escQuote(this.getValues()[parseInt(v.substring(1)) - 1] ?? v)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class PostgresAdapterBase implements DbAdapter {
|
abstract class PostgresAdapterBase implements DbAdapter {
|
||||||
@ -457,9 +485,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
}
|
}
|
||||||
const finalSql: string = [select, ...sqlChunks].join(' ')
|
const finalSql: string = [select, ...sqlChunks].join(' ')
|
||||||
const result: DBDoc[] = await this.mgr.retry(undefined, (client) =>
|
const result: DBDoc[] = await this.mgr.retry(undefined, (client) =>
|
||||||
client.unsafe(finalSql, vars.getValues(), {
|
client.unsafe(finalSql, vars.getValues(), getPrepare())
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
})
|
|
||||||
)
|
)
|
||||||
return result.map((p) => parseDocWithProjection(p, domain, options?.projection))
|
return result.map((p) => parseDocWithProjection(p, domain, options?.projection))
|
||||||
}
|
}
|
||||||
@ -519,9 +545,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
const res = await client.unsafe(
|
const res = await client.unsafe(
|
||||||
`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`,
|
`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`,
|
||||||
vars.getValues(),
|
vars.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
const docs = res.map((p) => parseDoc(p as any, schemaFields.schema))
|
const docs = res.map((p) => parseDoc(p as any, schemaFields.schema))
|
||||||
for (const doc of docs) {
|
for (const doc of docs) {
|
||||||
@ -553,9 +577,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
WHERE "workspaceId" = ${params.add(this.workspaceId.name, '::uuid')}
|
WHERE "workspaceId" = ${params.add(this.workspaceId.name, '::uuid')}
|
||||||
AND _id = ${params.add(doc._id, '::text')}`,
|
AND _id = ${params.add(doc._id, '::text')}`,
|
||||||
params.getValues(),
|
params.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.update
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -593,9 +615,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
await client.unsafe(
|
await client.unsafe(
|
||||||
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE ${translatedQuery};`,
|
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE ${translatedQuery};`,
|
||||||
vars.getValues(),
|
vars.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -604,9 +624,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
const vars = new ValuesVariables()
|
const vars = new ValuesVariables()
|
||||||
const translatedQuery = this.buildRawQuery(vars, domain, query)
|
const translatedQuery = this.buildRawQuery(vars, domain, query)
|
||||||
await this.mgr.retry(undefined, async (client) => {
|
await this.mgr.retry(undefined, async (client) => {
|
||||||
await client.unsafe(`DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`, vars.getValues(), {
|
await client.unsafe(
|
||||||
prepare: dbUnsafePrepareOptions.update
|
`DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`,
|
||||||
})
|
vars.getValues(),
|
||||||
|
getPrepare()
|
||||||
|
)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -670,18 +692,15 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
if (options?.total === true) {
|
if (options?.total === true) {
|
||||||
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
|
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
|
||||||
const totalSql = [totalReq, ...totalSqlChunks].join(' ')
|
const totalSql = [totalReq, ...totalSqlChunks].join(' ')
|
||||||
const totalResult = await connection.unsafe(totalSql, vars.getValues(), {
|
const totalResult = await connection.unsafe(totalSql, vars.getValues(), getPrepare())
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
})
|
|
||||||
const parsed = Number.parseInt(totalResult[0].count)
|
const parsed = Number.parseInt(totalResult[0].count)
|
||||||
total = Number.isNaN(parsed) ? 0 : parsed
|
total = Number.isNaN(parsed) ? 0 : parsed
|
||||||
}
|
}
|
||||||
|
|
||||||
const finalSql: string = [select, ...sqlChunks].join(' ')
|
const finalSql: string = [select, ...sqlChunks].join(' ')
|
||||||
fquery = finalSql
|
fquery = finalSql
|
||||||
const result = await connection.unsafe(finalSql, vars.getValues(), {
|
|
||||||
prepare: dbUnsafePrepareOptions.find
|
const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
|
||||||
})
|
|
||||||
if (
|
if (
|
||||||
options?.lookup === undefined &&
|
options?.lookup === undefined &&
|
||||||
options?.domainLookup === undefined &&
|
options?.domainLookup === undefined &&
|
||||||
@ -697,7 +716,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
}
|
}
|
||||||
})) as FindResult<T>
|
})) as FindResult<T>
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
ctx.error('Error in findAll', { err })
|
ctx.error('Error in findAll', { err, sql: vars.injectVars(fquery) })
|
||||||
throw err
|
throw err
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -1170,8 +1189,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
if (join.isReverse) {
|
if (join.isReverse) {
|
||||||
return `${join.toAlias}->'${tKey}'`
|
return `${join.toAlias}->'${tKey}'`
|
||||||
}
|
}
|
||||||
const res = isDataField(domain, tKey) ? (isDataArray ? `data->'${tKey}'` : `data#>>'{${tKey}}'`) : key
|
if (isDataField(domain, tKey)) {
|
||||||
return `${join.toAlias}.${res}`
|
if (isDataArray) {
|
||||||
|
return `${join.toAlias}."data"->'${tKey}'`
|
||||||
|
}
|
||||||
|
return `${join.toAlias}."data"#>>'{${tKey}}'`
|
||||||
|
}
|
||||||
|
return `${join.toAlias}."${tKey}"`
|
||||||
}
|
}
|
||||||
|
|
||||||
private transformKey<T extends Doc>(
|
private transformKey<T extends Doc>(
|
||||||
@ -1505,9 +1529,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
`SELECT * FROM ${translateDomain(domain)}
|
`SELECT * FROM ${translateDomain(domain)}
|
||||||
WHERE "workspaceId" = $1::uuid AND _id = ANY($2::text[])`,
|
WHERE "workspaceId" = $1::uuid AND _id = ANY($2::text[])`,
|
||||||
[this.workspaceId.name, docs],
|
[this.workspaceId.name, docs],
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
return res.map((p) => parseDocWithProjection(p as any, domain))
|
return res.map((p) => parseDocWithProjection(p as any, domain))
|
||||||
})
|
})
|
||||||
@ -1562,9 +1584,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
|
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
|
||||||
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
|
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
|
||||||
values.getValues(),
|
values.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.upload
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
@ -1574,9 +1594,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
await client.unsafe(
|
await client.unsafe(
|
||||||
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals};`,
|
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals};`,
|
||||||
values.getValues(),
|
values.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.upload
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -1598,9 +1616,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
client.unsafe(
|
client.unsafe(
|
||||||
`DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`,
|
`DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`,
|
||||||
[this.workspaceId.name, part],
|
[this.workspaceId.name, part],
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.upload
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@ -1619,9 +1635,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
|
|||||||
const vars = new ValuesVariables()
|
const vars = new ValuesVariables()
|
||||||
const finalSql = `SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(vars, domain, query ?? {})} GROUP BY ${key}`
|
const finalSql = `SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(vars, domain, query ?? {})} GROUP BY ${key}`
|
||||||
return await this.mgr.retry(ctx.id, async (connection) => {
|
return await this.mgr.retry(ctx.id, async (connection) => {
|
||||||
const result = await connection.unsafe(finalSql, vars.getValues(), {
|
const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
})
|
|
||||||
return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)]))
|
return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)]))
|
||||||
})
|
})
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@ -1722,9 +1736,7 @@ class PostgresAdapter extends PostgresAdapterBase {
|
|||||||
SET ${updates.join(', ')}
|
SET ${updates.join(', ')}
|
||||||
WHERE "workspaceId" = ${wsId} AND _id = ${oId}`,
|
WHERE "workspaceId" = ${wsId} AND _id = ${oId}`,
|
||||||
params.getValues(),
|
params.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.update
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -1837,9 +1849,7 @@ class PostgresAdapter extends PostgresAdapterBase {
|
|||||||
WHERE "workspaceId" = ${wsId}
|
WHERE "workspaceId" = ${wsId}
|
||||||
AND _id = ${oId}`,
|
AND _id = ${oId}`,
|
||||||
params.getValues(),
|
params.getValues(),
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.update
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
if (tx.retrieve === true && doc !== undefined) {
|
if (tx.retrieve === true && doc !== undefined) {
|
||||||
@ -1928,11 +1938,7 @@ class PostgresAdapter extends PostgresAdapterBase {
|
|||||||
WHERE "workspaceId" = $1::uuid AND "_id" = update_data.__id`
|
WHERE "workspaceId" = $1::uuid AND "_id" = update_data.__id`
|
||||||
|
|
||||||
await this.mgr.retry(ctx.id, (client) =>
|
await this.mgr.retry(ctx.id, (client) =>
|
||||||
ctx.with('bulk-update', {}, () =>
|
ctx.with('bulk-update', {}, () => client.unsafe(op, data, getPrepare()))
|
||||||
client.unsafe(op, data, {
|
|
||||||
prepare: dbUnsafePrepareOptions.update
|
|
||||||
})
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1966,9 +1972,7 @@ class PostgresAdapter extends PostgresAdapterBase {
|
|||||||
forUpdate ? ' FOR UPDATE' : ''
|
forUpdate ? ' FOR UPDATE' : ''
|
||||||
}`,
|
}`,
|
||||||
[this.workspaceId.name, _id],
|
[this.workspaceId.name, _id],
|
||||||
{
|
getPrepare()
|
||||||
prepare: dbUnsafePrepareOptions.find
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
const dbDoc = res[0] as any
|
const dbDoc = res[0] as any
|
||||||
return dbDoc !== undefined ? parseDoc(dbDoc, getSchema(domain)) : undefined
|
return dbDoc !== undefined ? parseDoc(dbDoc, getSchema(domain)) : undefined
|
||||||
@ -2015,9 +2019,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
|
|||||||
async getModel (ctx: MeasureContext): Promise<Tx[]> {
|
async getModel (ctx: MeasureContext): Promise<Tx[]> {
|
||||||
const res: DBDoc[] = await this.mgr.retry(undefined, (client) => {
|
const res: DBDoc[] = await this.mgr.retry(undefined, (client) => {
|
||||||
return client.unsafe(
|
return client.unsafe(
|
||||||
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = $1::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`,
|
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`
|
||||||
[this.workspaceId.name],
|
|
||||||
{ prepare: dbUnsafePrepareOptions.model }
|
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -55,6 +55,8 @@ process.on('exit', () => {
|
|||||||
const clientRefs = new Map<string, ClientRef>()
|
const clientRefs = new Map<string, ClientRef>()
|
||||||
const loadedDomains = new Set<string>()
|
const loadedDomains = new Set<string>()
|
||||||
|
|
||||||
|
let loadedTables = new Set<string>()
|
||||||
|
|
||||||
export async function retryTxn (
|
export async function retryTxn (
|
||||||
pool: postgres.Sql,
|
pool: postgres.Sql,
|
||||||
operation: (client: postgres.TransactionSql) => Promise<any>
|
operation: (client: postgres.TransactionSql) => Promise<any>
|
||||||
@ -83,26 +85,30 @@ export async function createTables (
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
const mapped = filtered.map((p) => translateDomain(p))
|
const mapped = filtered.map((p) => translateDomain(p))
|
||||||
const tables = await ctx.with('load-table', {}, () =>
|
const t = Date.now()
|
||||||
client.unsafe(
|
loadedTables =
|
||||||
`
|
loadedTables.size === 0
|
||||||
|
? new Set(
|
||||||
|
(
|
||||||
|
await ctx.with('load-table', {}, () =>
|
||||||
|
client.unsafe(`
|
||||||
SELECT table_name
|
SELECT table_name
|
||||||
FROM information_schema.tables
|
FROM information_schema.tables
|
||||||
WHERE table_name = ANY( $1::text[] )
|
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
|
||||||
`,
|
AND table_name NOT LIKE 'pg_%'`)
|
||||||
[mapped]
|
)
|
||||||
)
|
).map((it) => it.table_name)
|
||||||
)
|
)
|
||||||
|
: loadedTables
|
||||||
|
console.log('load-table', Date.now() - t)
|
||||||
|
|
||||||
const exists = new Set(tables.map((it) => it.table_name))
|
const domainsToLoad = mapped.filter((it) => loadedTables.has(it))
|
||||||
|
|
||||||
const domainsToLoad = mapped.filter((it) => exists.has(it))
|
|
||||||
if (domainsToLoad.length > 0) {
|
if (domainsToLoad.length > 0) {
|
||||||
await ctx.with('load-schemas', {}, () => getTableSchema(client, domainsToLoad))
|
await ctx.with('load-schemas', {}, () => getTableSchema(client, domainsToLoad))
|
||||||
}
|
}
|
||||||
const domainsToCreate: string[] = []
|
const domainsToCreate: string[] = []
|
||||||
for (const domain of mapped) {
|
for (const domain of mapped) {
|
||||||
if (!exists.has(domain)) {
|
if (!loadedTables.has(domain)) {
|
||||||
domainsToCreate.push(domain)
|
domainsToCreate.push(domain)
|
||||||
} else {
|
} else {
|
||||||
loadedDomains.add(url + domain)
|
loadedDomains.add(url + domain)
|
||||||
@ -120,13 +126,10 @@ export async function createTables (
|
|||||||
}
|
}
|
||||||
|
|
||||||
async function getTableSchema (client: postgres.Sql, domains: string[]): Promise<void> {
|
async function getTableSchema (client: postgres.Sql, domains: string[]): Promise<void> {
|
||||||
const res = await client.unsafe(
|
const res = await client.unsafe(`SELECT column_name::name, data_type::text, is_nullable::text, table_name::name
|
||||||
`SELECT column_name::name, data_type::text, is_nullable::text, table_name::name
|
|
||||||
FROM information_schema.columns
|
FROM information_schema.columns
|
||||||
WHERE table_name = ANY($1::text[]) and table_schema = 'public'::name
|
WHERE table_name IN (${domains.map((it) => `'${it}'`).join(', ')}) and table_schema = 'public'::name
|
||||||
ORDER BY table_name::name, ordinal_position::int ASC;`,
|
ORDER BY table_name::name, ordinal_position::int ASC;`)
|
||||||
[domains]
|
|
||||||
)
|
|
||||||
|
|
||||||
const schemas: Record<string, Schema> = {}
|
const schemas: Record<string, Schema> = {}
|
||||||
for (const column of res) {
|
for (const column of res) {
|
||||||
@ -277,27 +280,25 @@ export class ClientRef implements PostgresClientReference {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let dbExtraOptions: Partial<Options<any>> = {}
|
export let dbExtraOptions: Partial<Options<any>> = {}
|
||||||
export function setDBExtraOptions (options: Partial<Options<any>>): void {
|
export function setDBExtraOptions (options: Partial<Options<any>>): void {
|
||||||
dbExtraOptions = options
|
dbExtraOptions = options
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface DbUnsafePrepareOptions {
|
export function getPrepare (): { prepare: boolean } {
|
||||||
upload: boolean
|
return { prepare: dbExtraOptions.prepare ?? false }
|
||||||
find: boolean
|
|
||||||
update: boolean
|
|
||||||
model: boolean
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export let dbUnsafePrepareOptions: DbUnsafePrepareOptions = {
|
export interface DBExtraOptions {
|
||||||
upload: true,
|
useCF: boolean
|
||||||
find: true,
|
|
||||||
update: true,
|
|
||||||
model: true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function setDbUnsafePrepareOptions (options: DbUnsafePrepareOptions): void {
|
export let dbExtra: DBExtraOptions = {
|
||||||
dbUnsafePrepareOptions = options
|
useCF: false
|
||||||
|
}
|
||||||
|
|
||||||
|
export function setExtraOptions (options: DBExtraOptions): void {
|
||||||
|
dbExtra = options
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
"template": "cloud",
|
"template": "cloud",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"deploy": "wrangler deploy",
|
"deploy": "wrangler deploy",
|
||||||
"dev": "wrangler dev --port 3335",
|
"dev": "wrangler dev --port 3335 --remote",
|
||||||
"dev-local": "wrangler dev --port 3335 --local --upstream-protocol=http",
|
"dev-local": "wrangler dev --port 3335 --local --upstream-protocol=http",
|
||||||
"start": "wrangler dev --port 3335",
|
"start": "wrangler dev --port 3335",
|
||||||
"logs": "npx wrangler tail --format pretty",
|
"logs": "npx wrangler tail --format pretty",
|
||||||
|
@ -38,8 +38,9 @@ import {
|
|||||||
createPostgreeDestroyAdapter,
|
createPostgreeDestroyAdapter,
|
||||||
createPostgresAdapter,
|
createPostgresAdapter,
|
||||||
createPostgresTxAdapter,
|
createPostgresTxAdapter,
|
||||||
|
getDBClient,
|
||||||
setDBExtraOptions,
|
setDBExtraOptions,
|
||||||
setDbUnsafePrepareOptions
|
setExtraOptions
|
||||||
} from '@hcengineering/postgres'
|
} from '@hcengineering/postgres'
|
||||||
import {
|
import {
|
||||||
createServerPipeline,
|
createServerPipeline,
|
||||||
@ -75,13 +76,11 @@ export class Transactor extends DurableObject<Env> {
|
|||||||
ssl: false,
|
ssl: false,
|
||||||
connection: {
|
connection: {
|
||||||
application_name: 'cloud-transactor'
|
application_name: 'cloud-transactor'
|
||||||
}
|
},
|
||||||
|
prepare: false
|
||||||
})
|
})
|
||||||
setDbUnsafePrepareOptions({
|
setExtraOptions({
|
||||||
upload: false,
|
useCF: true
|
||||||
find: false,
|
|
||||||
update: false,
|
|
||||||
model: false
|
|
||||||
})
|
})
|
||||||
registerTxAdapterFactory('postgresql', createPostgresTxAdapter, true)
|
registerTxAdapterFactory('postgresql', createPostgresTxAdapter, true)
|
||||||
registerAdapterFactory('postgresql', createPostgresAdapter, true)
|
registerAdapterFactory('postgresql', createPostgresAdapter, true)
|
||||||
@ -105,23 +104,28 @@ export class Transactor extends DurableObject<Env> {
|
|||||||
console.log({ message: 'use stats', url: this.env.STATS_URL })
|
console.log({ message: 'use stats', url: this.env.STATS_URL })
|
||||||
console.log({ message: 'use fulltext', url: this.env.FULLTEXT_URL })
|
console.log({ message: 'use fulltext', url: this.env.FULLTEXT_URL })
|
||||||
|
|
||||||
|
const dbUrl = env.DB_MODE === 'direct' ? env.DB_URL ?? '' : env.HYPERDRIVE.connectionString
|
||||||
|
|
||||||
// TODO:
|
// TODO:
|
||||||
const storage = createDummyStorageAdapter()
|
const storage = createDummyStorageAdapter()
|
||||||
|
|
||||||
this.pipelineFactory = async (ctx, ws, upgrade, broadcast, branding) => {
|
this.pipelineFactory = async (ctx, ws, upgrade, broadcast, branding) => {
|
||||||
const pipeline = createServerPipeline(
|
const pipeline = createServerPipeline(this.measureCtx, dbUrl, model, {
|
||||||
this.measureCtx,
|
externalStorage: storage,
|
||||||
env.DB_MODE === 'direct' ? env.DB_URL ?? '' : env.HYPERDRIVE.connectionString,
|
adapterSecurity: false,
|
||||||
model,
|
disableTriggers: false,
|
||||||
{
|
fulltextUrl: env.FULLTEXT_URL,
|
||||||
externalStorage: storage,
|
extraLogging: true
|
||||||
adapterSecurity: false,
|
})
|
||||||
disableTriggers: false,
|
const result = await pipeline(ctx, ws, upgrade, broadcast, branding)
|
||||||
fulltextUrl: env.FULLTEXT_URL,
|
|
||||||
extraLogging: true
|
const client = getDBClient(dbUrl)
|
||||||
}
|
const connection = await client.getClient()
|
||||||
)
|
const t1 = Date.now()
|
||||||
return await pipeline(ctx, ws, upgrade, broadcast, branding)
|
await connection`select now()`
|
||||||
|
console.log('DB query time', Date.now() - t1)
|
||||||
|
client.close()
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
void this.ctx
|
void this.ctx
|
||||||
|
@ -13,8 +13,8 @@ head_sampling_rate = 1 # optional. default = 1.
|
|||||||
# If you are running back-end logic in a Worker, running it closer to your back-end infrastructure
|
# If you are running back-end logic in a Worker, running it closer to your back-end infrastructure
|
||||||
# rather than the end user may result in better performance.
|
# rather than the end user may result in better performance.
|
||||||
# Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement
|
# Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement
|
||||||
# [placement]
|
[placement]
|
||||||
# mode = "smart"
|
mode = "smart"
|
||||||
|
|
||||||
# Variable bindings. These are arbitrary, plaintext strings (similar to environment variables)
|
# Variable bindings. These are arbitrary, plaintext strings (similar to environment variables)
|
||||||
# Docs:
|
# Docs:
|
||||||
|
Loading…
Reference in New Issue
Block a user