diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 35d45d4d99..b24580d62b 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -190,7 +190,7 @@ dependencies: version: file:projects/core.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2) '@rush-temp/datalake': specifier: file:./projects/datalake.tgz - version: file:projects/datalake.tgz(esbuild@0.20.1)(ts-node@10.9.2) + version: file:projects/datalake.tgz(esbuild@0.20.1) '@rush-temp/desktop': specifier: file:./projects/desktop.tgz version: file:projects/desktop.tgz(bufferutil@4.0.8)(sass@1.71.1)(utf-8-validate@6.0.4) @@ -27080,13 +27080,12 @@ packages: - ts-node dev: false - file:projects/datalake.tgz(esbuild@0.20.1)(ts-node@10.9.2): - resolution: {integrity: sha512-pqgfJAfjDTa3AWRK263xljvkd1GLinDFrjTGW7res8krRskMMJ3K6gj3kfnLjyKmWeAesJQ5CSnFybPnPSJq/Q==, tarball: file:projects/datalake.tgz} + file:projects/datalake.tgz(esbuild@0.20.1): + resolution: {integrity: sha512-UX1RJWMtrQY5HWrFKnwi2vrRYfR8ZSRo2PtLn04ozWueiiLS3Q61UauAUfPDRtO0K5cJgecH7+gX750dx8oUhQ==, tarball: file:projects/datalake.tgz} id: file:projects/datalake.tgz name: '@rush-temp/datalake' version: 0.0.0 dependencies: - '@aws-sdk/client-s3': 3.577.0 '@types/jest': 29.5.12 '@types/node': 20.11.19 '@types/node-fetch': 2.6.11 @@ -27102,17 +27101,19 @@ packages: node-fetch: 2.7.0 prettier: 3.2.5 ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3) + ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3) typescript: 5.3.3 transitivePeerDependencies: - '@babel/core' - '@jest/types' + - '@swc/core' + - '@swc/wasm' - babel-jest - babel-plugin-macros - encoding - esbuild - node-notifier - supports-color - - ts-node dev: false file:projects/desktop-1.tgz(webpack@5.90.3): diff --git a/dev/tool/src/db.ts b/dev/tool/src/db.ts index 1d88ac4d26..9ba0fcedf7 100644 --- a/dev/tool/src/db.ts +++ b/dev/tool/src/db.ts @@ -18,7 +18,14 @@ import { type MeasureMetricsContext } from '@hcengineering/core' import { getMongoClient, getWorkspaceMongoDB } from '@hcengineering/mongo' -import { convertDoc, createTable, getDBClient, retryTxn, translateDomain } from '@hcengineering/postgres' +import { + convertDoc, + createTable, + getDBClient, + getDocFieldsByDomains, + retryTxn, + translateDomain +} from '@hcengineering/postgres' import { getTransactorEndpoint } from '@hcengineering/server-client' import { generateToken } from '@hcengineering/server-token' import { connect } from '@hcengineering/server-tool' @@ -54,10 +61,6 @@ export async function moveFromMongoToPG ( client.close() } -function escapeBackticks (str: string): string { - return str.replaceAll("'", "''") -} - async function moveWorkspace ( accountDb: AccountDB, mongo: MongoClient, @@ -85,6 +88,13 @@ async function moveWorkspace ( const currentIds = new Set(current.rows.map((r) => r._id)) console.log('move domain', domain) const docs: Doc[] = [] + const fields = getDocFieldsByDomains(domain) + const filedsWithData = [...fields, 'data'] + const insertFields: string[] = [] + for (const field of filedsWithData) { + insertFields.push(`"${field}"`) + } + const insertStr = insertFields.join(', ') while (true) { while (docs.length < 50000) { const doc = (await cursor.next()) as Doc | null @@ -95,18 +105,29 @@ async function moveWorkspace ( if (docs.length === 0) break while (docs.length > 0) { const part = docs.splice(0, 500) - const vals = part - .map((doc) => { - const d = convertDoc(doc, ws.workspace) - return `('${d._id}', '${d.workspaceId}', '${d._class}', 
'${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', ${ - d.attachedTo != null ? `'${d.attachedTo}'` : 'NULL' - }, '${escapeBackticks(JSON.stringify(d.data))}')` - }) - .join(', ') + const values: any[] = [] + const vars: string[] = [] + let index = 1 + for (let i = 0; i < part.length; i++) { + const doc = part[i] + const variables: string[] = [] + const d = convertDoc(domain, doc, ws.workspace) + values.push(d.workspaceId) + variables.push(`$${index++}`) + for (const field of fields) { + values.push(d[field]) + variables.push(`$${index++}`) + } + values.push(d.data) + variables.push(`$${index++}`) + vars.push(`(${variables.join(', ')})`) + } + const vals = vars.join(',') try { await retryTxn(pgClient, async (client) => { await client.query( - `INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals}` + `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`, + values ) }) } catch (err) { diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 9bc6e84c8a..ff43160457 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -371,7 +371,7 @@ export function devTool ( lastProcessingTime: Date.now() + 1000 * 60 }) - await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations) + await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations, undefined, true) await updateWorkspace(db, wsInfo, { mode: 'active', @@ -1717,7 +1717,7 @@ export function devTool ( lastProcessingTime: Date.now() + 1000 * 60 }) - await createWorkspace(measureCtx, version, null, wsInfo, txes, migrateOperations) + await createWorkspace(measureCtx, version, null, wsInfo, txes, migrateOperations, undefined, true) await updateWorkspace(db, wsInfo, { mode: 'active', diff --git a/packages/core/src/__tests__/memdb.test.ts b/packages/core/src/__tests__/memdb.test.ts index 0913724475..e80d8642df 100644 --- a/packages/core/src/__tests__/memdb.test.ts +++ b/packages/core/src/__tests__/memdb.test.ts @@ -133,18 +133,6 @@ describe('memdb', () => { }) const objClass = (await model.findAll(core.class.Class, { _id: core.class.Obj }))[0] as any expect(objClass['test:mixin:TestMixin'].arr).toEqual(expect.arrayContaining(['hello'])) - - await ops.updateDoc(test.mixin.TestMixin, core.space.Model, core.class.Obj as unknown as Ref, { - $pushMixin: { - $mixin: test.mixin.TestMixin, - values: { - arr: 'there' - } - } - }) - - const objClass2 = (await model.findAll(core.class.Class, { _id: core.class.Obj }))[0] as any - expect(objClass2['test:mixin:TestMixin'].arr).toEqual(expect.arrayContaining(['hello', 'there'])) }) it('should allow delete', async () => { diff --git a/packages/core/src/operator.ts b/packages/core/src/operator.ts index f929351ee6..612426b066 100644 --- a/packages/core/src/operator.ts +++ b/packages/core/src/operator.ts @@ -116,37 +116,6 @@ function $update (document: Doc, keyval: Record): void { } } -function $move (document: Doc, keyval: Record): void { - const doc = document as any - for (const key in keyval) { - if (doc[key] === undefined) { - doc[key] = [] - } - const arr = doc[key] as Array - const desc = keyval[key] - doc[key] = (arr ?? 
[]).filter((val) => val !== desc.$value) - doc[key].splice(desc.$position, 0, desc.$value) - } -} - -function $pushMixin (document: Doc, options: any): void { - const doc = document as any - const mixinId = options.$mixin - if (mixinId === undefined) { - throw new Error('$mixin must be specified for $push_mixin operation') - } - const mixin = doc[mixinId] - const keyval = options.values - for (const key in keyval) { - const arr = mixin[key] - if (arr == null) { - mixin[key] = [keyval[key]] - } else { - arr.push(keyval[key]) - } - } -} - function $inc (document: Doc, keyval: Record): void { const doc = document as unknown as Record for (const key in keyval) { @@ -180,8 +149,6 @@ const operators: Record = { $push, $pull, $update, - $move, - $pushMixin, $inc, $unset, $rename diff --git a/packages/core/src/tx.ts b/packages/core/src/tx.ts index 61fda47304..ae36c4e936 100644 --- a/packages/core/src/tx.ts +++ b/packages/core/src/tx.ts @@ -247,7 +247,6 @@ export type OmitNever = Omit> export interface PushOptions { $push?: Partial>>> $pull?: Partial>>> - $move?: Partial>>> } /** @@ -269,16 +268,6 @@ export interface SetEmbeddedOptions { $update?: Partial>>> } -/** - * @public - */ -export interface PushMixinOptions { - $pushMixin?: { - $mixin: Ref> - values: Partial>> - } -} - /** * @public */ @@ -299,7 +288,6 @@ export interface SpaceUpdate { export type DocumentUpdate = Partial> & PushOptions & SetEmbeddedOptions & -PushMixinOptions & IncOptions & SpaceUpdate diff --git a/packages/model/src/migration.ts b/packages/model/src/migration.ts index 03f79a1443..8d0e7ac17d 100644 --- a/packages/model/src/migration.ts +++ b/packages/model/src/migration.ts @@ -29,7 +29,7 @@ import { ModelLogger } from './utils' * @public */ export type MigrateUpdate = Partial & -Omit, '$move'> & +PushOptions & IncOptions & UnsetOptions & Record diff --git a/server-plugins/contact-resources/src/index.ts b/server-plugins/contact-resources/src/index.ts index 391e08524f..0ccbbb07a6 100644 --- a/server-plugins/contact-resources/src/index.ts +++ b/server-plugins/contact-resources/src/index.ts @@ -117,7 +117,7 @@ async function createPersonSpace ( person: Ref, control: TriggerControl ): Promise[]> { - const personSpace = (await control.findAll(control.ctx, contact.class.PersonSpace, { person }, { limit: 1 })).shift() + const personSpace = (await control.findAll(control.ctx, contact.class.PersonSpace, { person }, { limit: 1 }))[0] if (personSpace !== undefined) { const toAdd = account.filter((it) => !personSpace.members.includes(it)) if (toAdd.length === 0) return [] diff --git a/server-plugins/notification-resources/src/index.ts b/server-plugins/notification-resources/src/index.ts index e2fb7f5cbc..b3c35d5f65 100644 --- a/server-plugins/notification-resources/src/index.ts +++ b/server-plugins/notification-resources/src/index.ts @@ -1717,7 +1717,7 @@ async function updateCollaborators ( if (hierarchy.classHierarchyMixin(objectClass, activity.mixin.ActivityDoc) === undefined) return res - const contexts = await control.findAll(control.ctx, notification.class.DocNotifyContext, { attachedTo: objectId }) + const contexts = await control.findAll(control.ctx, notification.class.DocNotifyContext, { objectId }) const addedInfo = await getUsersInfo(ctx, toAdd as Ref[], control) for (const addedUser of addedInfo.values()) { diff --git a/server/backup/src/storage.ts b/server/backup/src/storage.ts index 18e845c7c5..a343a6437f 100644 --- a/server/backup/src/storage.ts +++ b/server/backup/src/storage.ts @@ -118,6 +118,7 @@ class AdapterStorage 
implements BackupStorage { */ export async function createFileBackupStorage (fileName: string): Promise { if (!existsSync(fileName)) { + console.log(__dirname) await mkdir(fileName, { recursive: true }) } return new FileStorage(fileName) diff --git a/server/datalake/package.json b/server/datalake/package.json index f947ec1cb0..809cdd2cda 100644 --- a/server/datalake/package.json +++ b/server/datalake/package.json @@ -32,7 +32,8 @@ "jest": "^29.7.0", "ts-jest": "^29.1.1", "@types/jest": "^29.5.5", - "@types/node-fetch": "~2.6.2" + "@types/node-fetch": "~2.6.2", + "ts-node": "^10.8.0" }, "dependencies": { "@hcengineering/core": "^0.6.32", diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts index 6990cb60f9..807c63145a 100644 --- a/server/datalake/src/client.ts +++ b/server/datalake/src/client.ts @@ -49,11 +49,7 @@ type BlobUploadResult = BlobUploadSuccess | BlobUploadError /** @public */ export class Client { - private readonly endpoint: string - - constructor (host: string, port?: number) { - this.endpoint = port !== undefined ? `${host}:${port}` : host - } + constructor (private readonly endpoint: string) {} getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string { const path = `/blob/${workspace.name}/${encodeURIComponent(objectName)}` @@ -62,7 +58,14 @@ export class Client { async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - const response = await fetchSafe(ctx, url) + + let response + try { + response = await fetchSafe(ctx, url) + } catch (err) { + console.error('failed to get object', { workspace, objectName, err }) + throw err + } if (response.body == null) { ctx.error('bad datalake response', { objectName }) @@ -81,10 +84,16 @@ export class Client { ): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) const headers = { - Range: `bytes=${offset}-${length ?? ''}` + Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}` } - const response = await fetchSafe(ctx, url, { headers }) + let response + try { + response = await fetchSafe(ctx, url, { headers }) + } catch (err) { + console.error('failed to get partial object', { workspace, objectName, err }) + throw err + } if (response.body == null) { ctx.error('bad datalake response', { objectName }) @@ -101,7 +110,13 @@ export class Client { ): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - const response = await fetchSafe(ctx, url, { method: 'HEAD' }) + let response: Response + try { + response = await fetchSafe(ctx, url, { method: 'HEAD' }) + } catch (err) { + console.error('failed to stat object', { workspace, objectName, err }) + throw err + } const headers = response.headers const lastModified = Date.parse(headers.get('Last-Modified') ?? 
'') @@ -117,7 +132,12 @@ export class Client { async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - await fetchSafe(ctx, url, { method: 'DELETE' }) + try { + await fetchSafe(ctx, url, { method: 'DELETE' }) + } catch (err) { + console.error('failed to delete object', { workspace, objectName, err }) + throw err + } } async putObject ( @@ -128,14 +148,30 @@ export class Client { metadata: ObjectMetadata, size?: number ): Promise { - if (size === undefined || size < 64 * 1024 * 1024) { - await ctx.with('direct-upload', {}, async (ctx) => { - await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata) - }) - } else { - await ctx.with('signed-url-upload', {}, async (ctx) => { - await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata) - }) + if (size === undefined) { + if (Buffer.isBuffer(stream)) { + size = stream.length + } else if (typeof stream === 'string') { + size = Buffer.byteLength(stream) + } else { + // TODO: Implement size calculation for Readable streams + ctx.warn('unknown object size', { workspace, objectName }) + } + } + + try { + if (size === undefined || size < 64 * 1024 * 1024) { + await ctx.with('direct-upload', {}, async (ctx) => { + await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata) + }) + } else { + await ctx.with('signed-url-upload', {}, async (ctx) => { + await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata) + }) + } + } catch (err) { + console.error('failed to put object', { workspace, objectName, err }) + throw err } } @@ -164,14 +200,12 @@ export class Client { const result = (await response.json()) as BlobUploadResult[] if (result.length !== 1) { - ctx.error('bad datalake response', { objectName, result }) - throw new Error('Bad datalake response') + throw new Error('Bad datalake response: ' + result.toString()) } const uploadResult = result[0] if ('error' in uploadResult) { - ctx.error('error during blob upload', { objectName, error: uploadResult.error }) throw new Error('Upload failed: ' + uploadResult.error) } } @@ -195,26 +229,43 @@ export class Client { 'x-amz-meta-last-modified': metadata.lastModified.toString() } }) - await this.signObjectComplete(ctx, workspace, objectName) - } catch { + } catch (err) { await this.signObjectDelete(ctx, workspace, objectName) + throw new Error('Failed to upload via signed URL') } + + await this.signObjectComplete(ctx, workspace, objectName) } private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { - const url = this.getSignObjectUrl(workspace, objectName) - const response = await fetchSafe(ctx, url, { method: 'POST' }) - return await response.text() + try { + const url = this.getSignObjectUrl(workspace, objectName) + const response = await fetchSafe(ctx, url, { method: 'POST' }) + return await response.text() + } catch (err: any) { + ctx.error('failed to sign object', { workspace, objectName, err }) + throw new Error('Failed to sign URL') + } } private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { - const url = this.getSignObjectUrl(workspace, objectName) - await fetchSafe(ctx, url, { method: 'PUT' }) + try { + const url = this.getSignObjectUrl(workspace, objectName) + await fetchSafe(ctx, url, { method: 'PUT' }) + } catch (err: any) { + ctx.error('failed to complete signed url upload', { workspace, objectName, err }) + throw new 
Error('Failed to complete signed URL upload') + } } private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { - const url = this.getSignObjectUrl(workspace, objectName) - await fetchSafe(ctx, url, { method: 'DELETE' }) + try { + const url = this.getSignObjectUrl(workspace, objectName) + await fetchSafe(ctx, url, { method: 'DELETE' }) + } catch (err: any) { + ctx.error('failed to abort signed url upload', { workspace, objectName, err }) + throw new Error('Failed to abort signed URL upload') + } } private getSignObjectUrl (workspace: WorkspaceId, objectName: string): string { @@ -228,12 +279,13 @@ async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit): try { response = await fetch(url, init) } catch (err: any) { - ctx.error('network error', { error: err }) + ctx.error('network error', { err }) throw new Error(`Network error ${err}`) } if (!response.ok) { - throw new Error(response.status === 404 ? 'Not Found' : 'HTTP error ' + response.status) + const text = await response.text() + throw new Error(response.status === 404 ? 'Not Found' : 'HTTP error ' + response.status + ': ' + text) } return response diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts index f743e97718..5cec66083e 100644 --- a/server/datalake/src/index.ts +++ b/server/datalake/src/index.ts @@ -37,7 +37,7 @@ export class DatalakeService implements StorageAdapter { static config = 'datalake' client: Client constructor (readonly opt: DatalakeConfig) { - this.client = new Client(opt.endpoint, opt.port) + this.client = new Client(opt.endpoint) } async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise {} @@ -167,23 +167,15 @@ export class DatalakeService implements StorageAdapter { } export function processConfigFromEnv (storageConfig: StorageConfiguration): string | undefined { - let endpoint = process.env.DATALAKE_ENDPOINT + const endpoint = process.env.DATALAKE_ENDPOINT if (endpoint === undefined) { return 'DATALAKE_ENDPOINT' } - let port = 80 - const sp = endpoint.split(':') - if (sp.length > 1) { - endpoint = sp[0] - port = parseInt(sp[1]) - } - const config: DatalakeConfig = { kind: 'datalake', name: 'datalake', - endpoint, - port + endpoint } storageConfig.storages.push(config) storageConfig.default = 'datalake' diff --git a/server/datalake/src/perfTest.ts b/server/datalake/src/perfTest.ts new file mode 100644 index 0000000000..8b016a25f6 --- /dev/null +++ b/server/datalake/src/perfTest.ts @@ -0,0 +1,102 @@ +import { MeasureMetricsContext, generateId } from '@hcengineering/core' +import type { StorageConfiguration } from '@hcengineering/server-core' +import { DatalakeService, processConfigFromEnv, type DatalakeConfig } from '.' 
+ +const MB = 1024 * 1024 + +const config: StorageConfiguration = { default: 'minio', storages: [] } +const minioConfigVar = processConfigFromEnv(config) +if (minioConfigVar !== undefined || config.storages[0] === undefined) { + console.error('No Datalake config env is configured:' + minioConfigVar) + it.skip('No Datalake config env is configured', async () => {}) + process.exit(1) +} +const toolCtx = new MeasureMetricsContext('test', {}) +const storageService = new DatalakeService({ ...(config.storages[0] as DatalakeConfig) }) + +async function doTest (): Promise { + const genWorkspaceId1 = generateId() + + const ws1 = { name: genWorkspaceId1 } + await storageService.make(toolCtx, ws1) + + /// /////// Uploads + console.log('upload 1mb test') + let st1 = Date.now() + const sz = 10 + const stream = Buffer.alloc(sz * 1024 * 1024) + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + await storageService.put(toolCtx, ws1, `testObject.${i}`, stream, 'application/octet-stream', stream.length) + console.log('upload time', Date.now() - st) + } + let now = Date.now() + console.log(`upload performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) + + /// // Downloads 1 + console.log('download 1mb test') + st1 = Date.now() + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + await storageService.read(toolCtx, ws1, `testObject.${i}`) + console.log('download time', Date.now() - st) + } + + now = Date.now() + console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) + + /// Downloads 2 + st1 = Date.now() + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + const readable = await storageService.get(toolCtx, ws1, `testObject.${i}`) + const chunks: Buffer[] = [] + readable.on('data', (chunk) => { + chunks.push(chunk) + }) + await new Promise((resolve) => { + readable.on('end', () => { + resolve() + readable.destroy() + }) + }) + console.log('download time 2', Date.now() - st) + } + + now = Date.now() + console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) + + /// Downloads 3 + console.log('download partial test') + st1 = Date.now() + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + for (let i = 0; i < sz; i++) { + const readable = await storageService.partial(toolCtx, ws1, `testObject.${i}`, i * MB, MB) + const chunks: Buffer[] = [] + readable.on('data', (chunk) => { + chunks.push(chunk) + }) + await new Promise((resolve) => { + readable.on('end', () => { + resolve() + readable.destroy() + }) + }) + } + console.log('download time 2', Date.now() - st) + } + + now = Date.now() + console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) +} + +void doTest().catch((err) => { + console.error(err) +}) + +console.log('done') diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index b3bd76597e..2d7d28aff4 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -1407,23 +1407,6 @@ class MongoAdapter extends MongoAdapterBase { modifiedOn: tx.modifiedOn } if (isOperator(tx.attributes)) { - const operator = Object.keys(tx.attributes)[0] - if (operator === '$move') { - const keyval = (tx.attributes as any).$move - const arr = tx.mixin + '.' 
+ Object.keys(keyval)[0] - const desc = keyval[arr] - const ops: any = [ - { updateOne: { filter, update: { $pull: { [arr]: desc.$value } } } }, - { - updateOne: { - filter, - update: { $set: modifyOp, $push: { [arr]: { $each: [desc.$value], $position: desc.$position } } } - } - } - ] - bulk.bulkOperations.push(...ops) - return - } const update = { ...this.translateMixinAttrs(tx.mixin, tx.attributes), $set: { ...modifyOp } } bulk.bulkOperations.push({ @@ -1475,46 +1458,7 @@ class MongoAdapter extends MongoAdapterBase { protected txUpdateDoc (bulk: OperationBulk, tx: TxUpdateDoc): void { if (isOperator(tx.operations)) { const operator = Object.keys(tx.operations)[0] - if (operator === '$move') { - const keyval = (tx.operations as any).$move - const arr = Object.keys(keyval)[0] - const desc = keyval[arr] - - const ops: any = [ - { - updateOne: { - filter: { _id: tx.objectId }, - update: { - $set: { - '%hash%': null - }, - $pull: { - [arr]: desc.$value - } - } - } - }, - { - updateOne: { - filter: { _id: tx.objectId }, - update: { - $set: { - modifiedBy: tx.modifiedBy, - modifiedOn: tx.modifiedOn, - '%hash%': null - }, - $push: { - [arr]: { - $each: [desc.$value], - $position: desc.$position - } - } - } - } - } - ] - bulk.bulkOperations.push(...ops) - } else if (operator === '$update') { + if (operator === '$update') { const keyval = (tx.operations as any).$update const arr = Object.keys(keyval)[0] const desc = keyval[arr] as QueryUpdate diff --git a/server/postgres/src/index.ts b/server/postgres/src/index.ts index 0db9671940..12dce9b0eb 100644 --- a/server/postgres/src/index.ts +++ b/server/postgres/src/index.ts @@ -14,4 +14,4 @@ // export * from './storage' -export { getDBClient, convertDoc, createTable, retryTxn, translateDomain } from './utils' +export { getDBClient, convertDoc, createTable, retryTxn, translateDomain, getDocFieldsByDomains } from './utils' diff --git a/server/postgres/src/schemas.ts b/server/postgres/src/schemas.ts new file mode 100644 index 0000000000..9215cd65f6 --- /dev/null +++ b/server/postgres/src/schemas.ts @@ -0,0 +1,36 @@ +import { DOMAIN_SPACE } from '@hcengineering/core' + +type DataType = 'bigint' | 'bool' | 'text' | 'text[]' + +type Schema = Record + +export const defaultSchema: Schema = { + _id: ['text', true], + _class: ['text', true], + space: ['text', true], + modifiedBy: ['text', true], + createdBy: ['text', false], + modifiedOn: ['bigint', true], + createdOn: ['bigint', false], + attachedTo: ['text', false] +} + +export const spaceSchema: Schema = { + _id: ['text', true], + _class: ['text', true], + space: ['text', true], + modifiedBy: ['text', true], + createdBy: ['text', false], + modifiedOn: ['bigint', true], + createdOn: ['bigint', false], + private: ['bool', true], + members: ['text[]', true] +} + +export const domainSchemas: Record = { + [DOMAIN_SPACE]: spaceSchema +} + +export function getSchema (domain: string): Schema { + return domainSchemas[domain] ?? 
defaultSchema +} diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 332964e3bd..09c29a1c5d 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -66,33 +66,41 @@ import { } from '@hcengineering/server-core' import { createHash } from 'crypto' import { type Pool, type PoolClient } from 'pg' +import { type ValueType } from './types' import { convertDoc, createTable, DBCollectionHelper, - docFields, escapeBackticks, getDBClient, - getUpdateValue, + getDocFieldsByDomains, isDataField, isOwner, type JoinProps, + Mutex, parseDoc, parseDocWithProjection, + parseUpdate, type PostgresClientReference, - retryTxn, translateDomain } from './utils' abstract class PostgresAdapterBase implements DbAdapter { protected readonly _helper: DBCollectionHelper protected readonly tableFields = new Map() - protected readonly retryTxn = async (fn: (client: PoolClient) => Promise): Promise => { - return await retryTxn(this.client, fn) + protected readonly queue: ((client: PoolClient) => Promise)[] = [] + private readonly mutex = new Mutex() + + protected readonly retryTxn = async (fn: (client: PoolClient) => Promise): Promise => { + await this.mutex.runExclusive(async () => { + await this.processOps(this.txConnection, fn) + }) } constructor ( protected readonly client: Pool, + protected readonly connection: PoolClient, + protected readonly txConnection: PoolClient, protected readonly refClient: PostgresClientReference, protected readonly workspaceId: WorkspaceId, protected readonly hierarchy: Hierarchy, @@ -101,6 +109,33 @@ abstract class PostgresAdapterBase implements DbAdapter { this._helper = new DBCollectionHelper(this.client, this.workspaceId) } + private async processOps (client: PoolClient, operation: (client: PoolClient) => Promise): Promise { + const backoffInterval = 100 // millis + const maxTries = 5 + let tries = 0 + + while (true) { + await client.query('BEGIN;') + tries++ + + try { + const result = await operation(client) + await client.query('COMMIT;') + return result + } catch (err: any) { + await client.query('ROLLBACK;') + + if (err.code !== '40001' || tries === maxTries) { + throw err + } else { + console.log('Transaction failed. Retrying.') + console.log(err.message) + await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval)) + } + } + } + } + async traverse( _domain: Domain, query: DocumentQuery, @@ -163,6 +198,8 @@ abstract class PostgresAdapterBase implements DbAdapter { abstract init (): Promise async close (): Promise { + this.txConnection.release() + this.connection.release() this.refClient.close() } @@ -178,7 +215,7 @@ abstract class PostgresAdapterBase implements DbAdapter { sqlChunks.push(`LIMIT ${options.limit}`) } const finalSql: string = [select, ...sqlChunks].join(' ') - const result = await this.client.query(finalSql) + const result = await this.connection.query(finalSql) return result.rows.map((p) => parseDocWithProjection(p, options?.projection)) } @@ -190,7 +227,7 @@ abstract class PostgresAdapterBase implements DbAdapter { continue } if (typeof val === 'number') { - res.push(`${this.transformKey(core.class.Doc, key, false)} ${val === 1 ? 'ASC' : 'DESC'}`) + res.push(`${this.transformKey(domain, core.class.Doc, key, false)} ${val === 1 ? 
'ASC' : 'DESC'}`) } else { // todo handle custom sorting } @@ -203,8 +240,8 @@ abstract class PostgresAdapterBase implements DbAdapter { res.push(`"workspaceId" = '${this.workspaceId.name}'`) for (const key in query) { const value = query[key] - const tkey = this.transformKey(core.class.Doc, key, false) - const translated = this.translateQueryValue(tkey, value, false) + const tkey = this.transformKey(domain, core.class.Doc, key, false) + const translated = this.translateQueryValue(tkey, value, 'common') if (translated !== undefined) { res.push(translated) } @@ -231,18 +268,26 @@ abstract class PostgresAdapterBase implements DbAdapter { if (doc === undefined) continue const prevAttachedTo = (doc as any).attachedTo TxProcessor.applyUpdate(doc, operations) - const converted = convertDoc(doc, this.workspaceId.name) - const updates: string[] = [] - const { space, attachedTo, ...ops } = operations as any + const converted = convertDoc(domain, doc, this.workspaceId.name) + let paramsIndex = 3 const params: any[] = [doc._id, this.workspaceId.name] - if (space !== undefined) { - updates.push(`space = '${space}'`) + const updates: string[] = [] + const { extractedFields, remainingData } = parseUpdate(domain, operations) + const newAttachedTo = (doc as any).attachedTo + if (Object.keys(extractedFields).length > 0) { + for (const key in extractedFields) { + const val = (extractedFields as any)[key] + if (key === 'attachedTo' && val === prevAttachedTo) continue + updates.push(`"${key}" = $${paramsIndex++}`) + params.push(val) + } + } else if (prevAttachedTo !== undefined && prevAttachedTo !== newAttachedTo) { + updates.push(`"attachedTo" = $${paramsIndex++}`) + params.push(newAttachedTo) } - if ((doc as any).attachedTo !== prevAttachedTo) { - updates.push(`"attachedTo" = ${attachedTo != null ? "'" + attachedTo + "'" : 'NULL'}`) - } - if (Object.keys(ops).length > 0) { - updates.push('data = $3') + + if (Object.keys(remainingData).length > 0) { + updates.push(`data = $${paramsIndex++}`) params.push(converted.data) } await client.query( @@ -278,7 +323,7 @@ abstract class PostgresAdapterBase implements DbAdapter { if (options?.total === true) { const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}` const totalSql = [totalReq, ...sqlChunks].join(' ') - const totalResult = await this.client.query(totalSql) + const totalResult = await this.connection.query(totalSql) const parsed = Number.parseInt(totalResult.rows[0]?.count ?? '') total = Number.isNaN(parsed) ? 0 : parsed } @@ -290,7 +335,7 @@ abstract class PostgresAdapterBase implements DbAdapter { } const finalSql: string = [select, ...sqlChunks].join(' ') - const result = await this.client.query(finalSql) + const result = await this.connection.query(finalSql) if (options?.lookup === undefined) { return toFindResult( result.rows.map((p) => parseDocWithProjection(p, options?.projection)), @@ -315,9 +360,9 @@ abstract class PostgresAdapterBase implements DbAdapter { return } if (query.space === acc._id) return - const key = domain === DOMAIN_SPACE ? '_id' : domain === DOMAIN_TX ? 'data ->> "objectSpace"' : 'space' - const privateCheck = domain === DOMAIN_SPACE ? " OR sec.data ->> 'private' = 'false'" : '' - const q = `(sec.data -> 'members' @> '"${acc._id}"' OR sec."_class" = '${core.class.SystemSpace}'${privateCheck})` + const key = domain === DOMAIN_SPACE ? '_id' : domain === DOMAIN_TX ? "data ->> 'objectSpace'" : 'space' + const privateCheck = domain === DOMAIN_SPACE ? 
' OR sec.private = false' : '' + const q = `(sec.members @> '{"${acc._id}"}' OR sec."_class" = '${core.class.SystemSpace}'${privateCheck})` return `INNER JOIN ${translateDomain(DOMAIN_SPACE)} AS sec ON sec._id = ${domain}.${key} AND sec."workspaceId" = '${this.workspaceId.name}' AND ${q}` } } @@ -545,7 +590,7 @@ abstract class PostgresAdapterBase implements DbAdapter { const _class = Array.isArray(value) ? value[0] : value const nested = Array.isArray(value) ? value[1] : undefined const domain = translateDomain(this.hierarchy.getDomain(_class)) - const tkey = domain === DOMAIN_MODEL ? key : this.transformKey(clazz, key) + const tkey = domain === DOMAIN_MODEL ? key : this.transformKey(baseDomain, clazz, key) const as = `lookup_${domain}_${parentKey !== undefined ? parentKey + '_lookup_' + key : key}` res.push({ isReverse: false, @@ -643,9 +688,9 @@ abstract class PostgresAdapterBase implements DbAdapter { } const value = query[key] if (value === undefined) continue - const isDataArray = this.checkDataArray(_class, key) - const tkey = this.getKey(_class, baseDomain, key, joins, isDataArray) - const translated = this.translateQueryValue(tkey, value, isDataArray) + const valueType = this.getValueType(_class, key) + const tkey = this.getKey(_class, baseDomain, key, joins, valueType === 'dataArray') + const translated = this.translateQueryValue(tkey, value, valueType) if (translated !== undefined) { res.push(translated) } @@ -653,22 +698,23 @@ abstract class PostgresAdapterBase implements DbAdapter { return res.join(' AND ') } - private checkDataArray(_class: Ref>, key: string): boolean { + private getValueType(_class: Ref>, key: string): ValueType { const splitted = key.split('.') const mixinOrKey = splitted[0] + const domain = this.hierarchy.getDomain(_class) if (this.hierarchy.isMixin(mixinOrKey as Ref>)) { key = splitted.slice(1).join('.') const attr = this.hierarchy.findAttribute(mixinOrKey as Ref>, key) - if (attr !== undefined) { - return attr.type._class === core.class.ArrOf + if (attr !== undefined && attr.type._class === core.class.ArrOf) { + return isDataField(domain, key) ? 'dataArray' : 'array' } - return false + return 'common' } else { const attr = this.hierarchy.findAttribute(_class, key) - if (attr !== undefined) { - return attr.type._class === core.class.ArrOf + if (attr !== undefined && attr.type._class === core.class.ArrOf) { + return isDataField(domain, key) ? 'dataArray' : 'array' } - return false + return 'common' } } @@ -731,12 +777,12 @@ abstract class PostgresAdapterBase implements DbAdapter { isDataArray: boolean = false ): string { if (key.startsWith('$lookup')) { - return this.transformLookupKey(key, joins, isDataArray) + return this.transformLookupKey(baseDomain, key, joins, isDataArray) } - return `${baseDomain}.${this.transformKey(_class, key, isDataArray)}` + return `${baseDomain}.${this.transformKey(baseDomain, _class, key, isDataArray)}` } - private transformLookupKey (key: string, joins: JoinProps[], isDataArray: boolean = false): string { + private transformLookupKey (domain: string, key: string, joins: JoinProps[], isDataArray: boolean = false): string { const arr = key.split('.').filter((p) => p !== '$lookup') const tKey = arr.pop() ?? '' const path = arr.join('.') @@ -747,12 +793,17 @@ abstract class PostgresAdapterBase implements DbAdapter { if (join.isReverse) { return `${join.toAlias}->'${tKey}'` } - const res = isDataField(tKey) ? (isDataArray ? `data->'${tKey}'` : `data#>>'{${tKey}}'`) : key + const res = isDataField(domain, tKey) ? (isDataArray ? 
`data->'${tKey}'` : `data#>>'{${tKey}}'`) : key return `${join.toAlias}.${res}` } - private transformKey(_class: Ref>, key: string, isDataArray: boolean = false): string { - if (!isDataField(key)) return `"${key}"` + private transformKey( + domain: string, + _class: Ref>, + key: string, + isDataArray: boolean = false + ): string { + if (!isDataField(domain, key)) return `"${key}"` const arr = key.split('.').filter((p) => p) let tKey = '' let isNestedField = false @@ -799,7 +850,7 @@ abstract class PostgresAdapterBase implements DbAdapter { return key } - private translateQueryValue (tkey: string, value: any, isDataArray: boolean): string | undefined { + private translateQueryValue (tkey: string, value: any, type: ValueType): string | undefined { if (value === null) { return `${tkey} IS NULL` } else if (typeof value === 'object' && !Array.isArray(value)) { @@ -825,7 +876,7 @@ abstract class PostgresAdapterBase implements DbAdapter { break case '$in': res.push( - isDataArray + type !== 'common' ? `${tkey} ?| array[${val.length > 0 ? val.map((v: any) => `'${v}'`).join(', ') : 'NULL'}]` : `${tkey} IN (${val.length > 0 ? val.map((v: any) => `'${v}'`).join(', ') : 'NULL'})` ) @@ -856,9 +907,11 @@ abstract class PostgresAdapterBase implements DbAdapter { } return res.length === 0 ? undefined : res.join(' AND ') } - return isDataArray - ? `${tkey} @> '${typeof value === 'string' ? '"' + value + '"' : value}'` - : `${tkey} = '${value}'` + return type === 'common' + ? `${tkey} = '${value}'` + : type === 'array' + ? `${tkey} @> '${typeof value === 'string' ? '{"' + value + '"}' : value}'` + : `${tkey} @> '${typeof value === 'string' ? '"' + value + '"' : value}'` } private getProjectionsAliases (join: JoinProps): string[] { @@ -876,8 +929,9 @@ abstract class PostgresAdapterBase implements DbAdapter { `(SELECT jsonb_agg(${join.toAlias}.*) FROM ${join.table} AS ${join.toAlias} WHERE ${join.fromAlias}.${join.fromField} = ${join.toAlias}."${join.toField}" ${classsesQuery}) AS ${join.toAlias}` ] } + const fields = getDocFieldsByDomains(join.table) const res: string[] = [] - for (const key of [...docFields, 'data']) { + for (const key of [...fields, 'data']) { res.push(`${join.toAlias}."${key}" as "lookup_${join.path.replaceAll('.', '_')}_${key}"`) } return res @@ -897,7 +951,7 @@ abstract class PostgresAdapterBase implements DbAdapter { res.push(`${baseDomain}.*`) } else { for (const key in projection) { - if (isDataField(key)) { + if (isDataField(baseDomain, key)) { if (!dataAdded) { res.push(`${baseDomain}.data as data`) dataAdded = true @@ -1046,7 +1100,7 @@ abstract class PostgresAdapterBase implements DbAdapter { if (docs.length === 0) { return [] } - const res = await this.client.query( + const res = await this.connection.query( `SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [docs, this.workspaceId.name] ) @@ -1056,37 +1110,59 @@ abstract class PostgresAdapterBase implements DbAdapter { async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise { const arr = docs.concat() - return await this.retryTxn(async (client) => { - while (arr.length > 0) { - const part = arr.splice(0, 500) - const vals = part - .map((doc) => { - const d = convertDoc(doc, this.workspaceId.name) - return `('${d._id}', '${d.workspaceId}', '${d._class}', '${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', ${ - d.attachedTo != null ? 
`'${d.attachedTo}'` : 'NULL' - }, '${escapeBackticks(JSON.stringify(d.data))}')` - }) - .join(', ') - await client.query( - `INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals} - ON CONFLICT (_id, "workspaceId") DO UPDATE SET _class = EXCLUDED._class, "createdBy" = EXCLUDED."createdBy", "modifiedBy" = EXCLUDED."modifiedBy", "modifiedOn" = EXCLUDED."modifiedOn", "createdOn" = EXCLUDED."createdOn", space = EXCLUDED.space, "attachedTo" = EXCLUDED."attachedTo", data = EXCLUDED.data;` - ) + const fields = getDocFieldsByDomains(domain) + const filedsWithData = [...fields, 'data'] + const insertFields: string[] = [] + const onConflict: string[] = [] + for (const field of filedsWithData) { + insertFields.push(`"${field}"`) + onConflict.push(`"${field}" = EXCLUDED."${field}"`) + } + const insertStr = insertFields.join(', ') + const onConflictStr = onConflict.join(', ') + while (arr.length > 0) { + const part = arr.splice(0, 500) + const values: any[] = [] + const vars: string[] = [] + let index = 1 + for (let i = 0; i < part.length; i++) { + const doc = part[i] + const variables: string[] = [] + const d = convertDoc(domain, doc, this.workspaceId.name) + values.push(d.workspaceId) + variables.push(`$${index++}`) + for (const field of fields) { + values.push(d[field]) + variables.push(`$${index++}`) + } + values.push(d.data) + variables.push(`$${index++}`) + vars.push(`(${variables.join(', ')})`) } - }) + + const vals = vars.join(',') + await this.retryTxn(async (client) => { + await client.query( + `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals} + ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`, + values + ) + }) + } } async clean (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { - await this.client.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [ + await this.connection.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [ docs, this.workspaceId.name ]) } async groupBy(ctx: MeasureContext, domain: Domain, field: string): Promise> { - const key = isDataField(field) ? `data ->> '${field}'` : `"${field}"` + const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"` const result = await ctx.with('groupBy', { domain }, async (ctx) => { try { - const result = await this.client.query( + const result = await this.connection.query( `SELECT DISTINCT ${key} as ${field} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1`, [this.workspaceId.name] ) @@ -1117,19 +1193,18 @@ abstract class PostgresAdapterBase implements DbAdapter { ;(op as any)['%hash%'] = null } TxProcessor.applyUpdate(doc, op) - const converted = convertDoc(doc, this.workspaceId.name) + const converted = convertDoc(domain, doc, this.workspaceId.name) const updates: string[] = [] - const { space, attachedTo, ...data } = op as any + let paramsIndex = 3 + const { extractedFields, remainingData } = parseUpdate(domain, op) const params: any[] = [doc._id, this.workspaceId.name] - if (space !== undefined) { - updates.push(`space = '${space}'`) + for (const key in extractedFields) { + updates.push(`"${key}" = $${paramsIndex++}`) + params.push((extractedFields as any)[key]) } - if (attachedTo !== undefined) { - updates.push(`"attachedTo" = ${attachedTo != null ? 
"'" + attachedTo + "'" : 'NULL'}`) - } - if (Object.keys(data).length > 0) { - updates.push('data = $3') + if (Object.keys(remainingData).length > 0) { + updates.push(`data = $${paramsIndex++}`) params.push(converted.data) } await client.query( @@ -1145,22 +1220,41 @@ abstract class PostgresAdapterBase implements DbAdapter { } async insert (domain: string, docs: Doc[]): Promise { - return await this.retryTxn(async (client) => { - while (docs.length > 0) { - const part = docs.splice(0, 500) - const vals = part - .map((doc) => { - const d = convertDoc(doc, this.workspaceId.name) - return `('${d._id}', '${d.workspaceId}', '${d._class}', '${d.createdBy ?? d.modifiedBy}', '${d.modifiedBy}', ${d.modifiedOn}, ${d.createdOn ?? d.modifiedOn}, '${d.space}', ${ - d.attachedTo != null ? `'${d.attachedTo}'` : 'NULL' - }, '${escapeBackticks(JSON.stringify(d.data))}')` - }) - .join(', ') - await client.query( - `INSERT INTO ${translateDomain(domain)} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ${vals}` - ) + const fields = getDocFieldsByDomains(domain) + const filedsWithData = [...fields, 'data'] + const insertFields: string[] = [] + for (const field of filedsWithData) { + insertFields.push(`"${field}"`) + } + const insertStr = insertFields.join(', ') + while (docs.length > 0) { + const part = docs.splice(0, 500) + const values: any[] = [] + const vars: string[] = [] + let index = 1 + for (let i = 0; i < part.length; i++) { + const doc = part[i] + const variables: string[] = [] + const d = convertDoc(domain, doc, this.workspaceId.name) + values.push(d.workspaceId) + variables.push(`$${index++}`) + for (const field of fields) { + values.push(d[field]) + variables.push(`$${index++}`) + } + values.push(d.data) + variables.push(`$${index++}`) + vars.push(`(${variables.join(', ')})`) } - }) + const vals = vars.join(',') + await this.retryTxn(async (client) => { + await client.query( + `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`, + values + ) + }) + } + return {} } } @@ -1218,18 +1312,30 @@ class PostgresAdapter extends PostgresAdapterBase { } private async txMixin (ctx: MeasureContext, tx: TxMixin): Promise { - return await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async () => { - return await this.retryTxn(async (client) => { + await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async () => { + await this.retryTxn(async (client) => { const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) - if (doc === undefined) return {} + if (doc === undefined) return TxProcessor.updateMixin4Doc(doc, tx) - const converted = convertDoc(doc, this.workspaceId.name) + const domain = this.hierarchy.getDomain(tx.objectClass) + const converted = convertDoc(domain, doc, this.workspaceId.name) + const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2'] + let paramsIndex = 5 + const { extractedFields } = parseUpdate(domain, tx.attributes as Partial) + const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name] + for (const key in extractedFields) { + updates.push(`"${key}" = $${paramsIndex++}`) + params.push(converted[key]) + } + updates.push(`data = $${paramsIndex++}`) + params.push(converted.data) await client.query( - `UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET "modifiedBy" = $1, "modifiedOn" = $2, data = $5 WHERE _id = $3 AND "workspaceId" = $4`, - [tx.modifiedBy, tx.modifiedOn, 
tx.objectId, this.workspaceId.name, converted.data] + `UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`, + params ) }) }) + return {} } async tx (ctx: MeasureContext, ...txes: Tx[]): Promise { @@ -1277,22 +1383,22 @@ class PostgresAdapter extends PostgresAdapterBase { doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) if (doc === undefined) return {} TxProcessor.applyUpdate(doc, ops) - const converted = convertDoc(doc, this.workspaceId.name) + const domain = this.hierarchy.getDomain(tx.objectClass) + const converted = convertDoc(domain, doc, this.workspaceId.name) const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2'] - const { space, attachedTo, ...data } = ops as any + let paramsIndex = 5 + const { extractedFields, remainingData } = parseUpdate(domain, ops) const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name] - if (space !== undefined) { - updates.push(`space = '${space}'`) + for (const key in extractedFields) { + updates.push(`"${key}" = $${paramsIndex++}`) + params.push(converted[key]) } - if (attachedTo !== undefined) { - updates.push(`"attachedTo" = ${attachedTo != null ? "'" + attachedTo + "'" : 'NULL'}`) - } - if (Object.keys(data).length > 0) { - updates.push('data = $5') + if (Object.keys(remainingData).length > 0) { + updates.push(`data = $${paramsIndex++}`) params.push(converted.data) } await client.query( - `UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`, + `UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`, params ) }) @@ -1315,21 +1421,24 @@ class PostgresAdapter extends PostgresAdapterBase { ): Promise { return await ctx.with('update jsonb_set', {}, async () => { const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2'] + const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name] + let paramsIndex = 5 + const domain = this.hierarchy.getDomain(tx.objectClass) + const { extractedFields, remainingData } = parseUpdate(domain, tx.operations) const { space, attachedTo, ...ops } = tx.operations as any if (ops['%hash%'] === undefined) { ops['%hash%'] = null } - if (space !== undefined) { - updates.push(`space = '${space}'`) - } - if (attachedTo !== undefined) { - updates.push(`"attachedTo" = ${attachedTo != null ? 
"'" + attachedTo + "'" : 'NULL'}`) + for (const key in extractedFields) { + updates.push(`"${key}" = $${paramsIndex++}`) + params.push((extractedFields as any)[key]) } let from = 'data' let dataUpdated = false - for (const key in ops) { + for (const key in remainingData) { if (ops[key] === undefined) continue - from = `jsonb_set(${from}, '{${key}}', '${getUpdateValue(ops[key])}', true)` + from = `jsonb_set(${from}, '{${key}}', $${paramsIndex++}::jsonb, true)` + params.push(JSON.stringify((remainingData as any)[key])) dataUpdated = true } if (dataUpdated) { @@ -1340,13 +1449,13 @@ class PostgresAdapter extends PostgresAdapterBase { await this.retryTxn(async (client) => { await client.query( `UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`, - [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name] + params ) - if (retrieve) { - const object = await this.findDoc(ctx, client, tx.objectClass, tx.objectId) - return { object } - } }) + if (retrieve) { + const object = await this.findDoc(ctx, this.connection, tx.objectClass, tx.objectId) + return { object } + } } catch (err) { console.error(err) } @@ -1373,15 +1482,16 @@ class PostgresAdapter extends PostgresAdapterBase { } protected async txRemoveDoc (ctx: MeasureContext, tx: TxRemoveDoc): Promise { - return await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async () => { + await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async () => { const domain = translateDomain(this.hierarchy.getDomain(tx.objectClass)) - return await this.retryTxn(async (client) => { + await this.retryTxn(async (client) => { await client.query(`DELETE FROM ${domain} WHERE _id = $1 AND "workspaceId" = $2`, [ tx.objectId, this.workspaceId.name ]) }) }) + return {} } } @@ -1405,8 +1515,8 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter { } async getModel (ctx: MeasureContext): Promise { - const res = await this.client.query( - `SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC` + const res = await this.connection.query( + `SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC` ) const model = res.rows.map((p) => parseDoc(p)) // We need to put all core.account.System transactions first @@ -1428,7 +1538,10 @@ export async function createPostgresAdapter ( modelDb: ModelDb ): Promise { const client = getDBClient(url) - const adapter = new PostgresAdapter(await client.getClient(), client, workspaceId, hierarchy, modelDb) + const pool = await client.getClient() + const mainConnection = await pool.connect() + const txConnection = await pool.connect() + const adapter = new PostgresAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb) return adapter } @@ -1443,7 +1556,10 @@ export async function createPostgresTxAdapter ( modelDb: ModelDb ): Promise { const client = getDBClient(url) - const adapter = new PostgresTxAdapter(await client.getClient(), client, workspaceId, hierarchy, modelDb) + const pool = await client.getClient() + const mainConnection = await pool.connect() + const txConnection = await pool.connect() + const adapter = new PostgresTxAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb) await adapter.init() return adapter } diff 
--git a/server/postgres/src/types.ts b/server/postgres/src/types.ts new file mode 100644 index 0000000000..ede3b253f2 --- /dev/null +++ b/server/postgres/src/types.ts @@ -0,0 +1 @@ +export type ValueType = 'common' | 'array' | 'dataArray' diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index 58091da343..6640666531 100644 --- a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -18,9 +18,11 @@ import core, { AccountRole, type Class, type Doc, + type DocumentUpdate, type Domain, type FieldIndexConfig, generateId, + type MixinUpdate, type Projection, type Ref, type WorkspaceId @@ -28,6 +30,7 @@ import core, { import { PlatformError, unknownStatus } from '@hcengineering/platform' import { type DomainHelperOperations } from '@hcengineering/server-core' import { Pool, type PoolClient } from 'pg' +import { defaultSchema, domainSchemas, getSchema } from './schemas' const connections = new Map() @@ -87,24 +90,26 @@ export async function createTable (client: Pool, domains: string[]): Promise !exists.rows.map((it) => it.table_name).includes(it)) await retryTxn(client, async (client) => { for (const domain of toCreate) { + const schema = getSchema(domain) + const fields: string[] = [] + for (const key in schema) { + const val = schema[key] + fields.push(`"${key}" ${val[0]} ${val[1] ? 'NOT NULL' : ''}`) + } + const colums = fields.join(', ') await client.query( `CREATE TABLE ${domain} ( - "workspaceId" VARCHAR(255) NOT NULL, - _id VARCHAR(255) NOT NULL, - _class VARCHAR(255) NOT NULL, - "createdBy" VARCHAR(255), - "modifiedBy" VARCHAR(255) NOT NULL, - "modifiedOn" bigint NOT NULL, - "createdOn" bigint, - space VARCHAR(255) NOT NULL, - "attachedTo" VARCHAR(255), + "workspaceId" text NOT NULL, + ${colums}, data JSONB NOT NULL, PRIMARY KEY("workspaceId", _id) )` ) - await client.query(` - CREATE INDEX ${domain}_attachedTo ON ${domain} ("attachedTo") - `) + if (schema.attachedTo !== undefined) { + await client.query(` + CREATE INDEX ${domain}_attachedTo ON ${domain} ("attachedTo") + `) + } await client.query(` CREATE INDEX ${domain}_class ON ${domain} (_class) `) @@ -221,19 +226,67 @@ export function getDBClient (connectionString: string, database?: string): Postg return new ClientRef(existing) } -export function convertDoc (doc: T, workspaceId: string): DBDoc { - const { _id, _class, createdBy, modifiedBy, modifiedOn, createdOn, space, attachedTo, ...data } = doc as any - return { - _id, - _class, - createdBy, - modifiedBy, - modifiedOn, - createdOn, - space, - attachedTo, +export function convertDoc (domain: string, doc: T, workspaceId: string): DBDoc { + const extractedFields: Doc & Record = { + _id: doc._id, + space: doc.space, + createdBy: doc.createdBy, + modifiedBy: doc.modifiedBy, + modifiedOn: doc.modifiedOn, + createdOn: doc.createdOn, + _class: doc._class + } + const remainingData: Partial = {} + + for (const key in doc) { + if (Object.keys(extractedFields).includes(key)) continue + if (getDocFieldsByDomains(domain).includes(key)) { + extractedFields[key] = doc[key] + } else { + remainingData[key] = doc[key] + } + } + + const res: any = { + ...extractedFields, workspaceId, - data + data: remainingData + } + return res +} + +export function parseUpdate ( + domain: string, + ops: DocumentUpdate | MixinUpdate +): { + extractedFields: Partial + remainingData: Partial + } { + const extractedFields: Partial = {} + const remainingData: Partial = {} + + for (const key in ops) { + if (key === '$push' || key === '$pull') { + const val = (ops as any)[key] + for 
(const k in val) { + if (getDocFieldsByDomains(domain).includes(k)) { + ;(extractedFields as any)[k] = val[key] + } else { + ;(remainingData as any)[k] = val[key] + } + } + } else { + if (getDocFieldsByDomains(domain).includes(key)) { + ;(extractedFields as any)[key] = (ops as any)[key] + } else { + ;(remainingData as any)[key] = (ops as any)[key] + } + } + } + + return { + extractedFields, + remainingData } } @@ -343,39 +396,17 @@ export function parseDoc (doc: DBDoc): T { export interface DBDoc extends Doc { workspaceId: string - attachedTo?: Ref data: Record + [key: string]: any } -export function isDataField (field: string): boolean { - return !docFields.includes(field) +export function isDataField (domain: string, field: string): boolean { + return !getDocFieldsByDomains(domain).includes(field) } -export const docFields: string[] = [ - '_id', - '_class', - 'createdBy', - 'modifiedBy', - 'modifiedOn', - 'createdOn', - 'space', - 'attachedTo' -] as const - -export function getUpdateValue (value: any): string { - if (typeof value === 'string') { - return '"' + escapeDoubleQuotes(value) + '"' - } - if (typeof value === 'object') { - return JSON.stringify(value) - } - return value -} - -function escapeDoubleQuotes (jsonString: string): string { - const unescapedQuotes = /(?> classes?: Ref>[] // filter by classes } + +export class Mutex { + private locked: boolean = false + private readonly waitingQueue: Array<(value: boolean) => void> = [] + + private async acquire (): Promise { + while (this.locked) { + await new Promise((resolve) => { + this.waitingQueue.push(resolve) + }) + } + this.locked = true + } + + private release (): void { + if (!this.locked) { + throw new Error('Mutex is not locked') + } + + this.locked = false + const nextResolver = this.waitingQueue.shift() + if (nextResolver !== undefined) { + nextResolver(true) + } + } + + async runExclusive(fn: () => Promise | T): Promise { + await this.acquire() + try { + return await fn() + } finally { + this.release() + } + } +} diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts index 44e6d70f26..123811c904 100644 --- a/server/workspace-service/src/ws-operations.ts +++ b/server/workspace-service/src/ws-operations.ts @@ -86,7 +86,8 @@ export async function createWorkspace ( version: Data, progress: number, message?: string - ) => Promise + ) => Promise, + external: boolean = false ): Promise { const childLogger = ctx.newChild('createWorkspace', {}, { workspace: workspaceInfo.workspace }) const ctxModellogger: ModelLogger = { @@ -162,7 +163,8 @@ export async function createWorkspace ( await handleWsEvent?.('progress', version, 80 + Math.round((Math.min(value, 100) / 100) * 20)) }, false, - 'disable' + 'disable', + external ) await handleWsEvent?.('create-done', version, 100, '') diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 1175e6d3ab..be43bfc3c8 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -43,6 +43,7 @@ services: links: - mongodb - minio + - postgres ports: - 3003:3003 volumes: @@ -59,6 +60,7 @@ services: image: hardcoreeng/workspace links: - mongodb + - postgres - minio volumes: - ./branding-test.json:/var/cfg/branding.json @@ -106,6 +108,7 @@ services: - elastic - minio - rekoni + - postgres - account ports: - 3334:3334 diff --git a/tests/sanity-ws/000001/_migrations-1715107031571-0.snp.gz b/tests/sanity-ws/000001/_migrations-1715107031571-0.snp.gz deleted file mode 100644 index fed2a4adc9..0000000000 Binary files 
diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts
index 44e6d70f26..123811c904 100644
--- a/server/workspace-service/src/ws-operations.ts
+++ b/server/workspace-service/src/ws-operations.ts
@@ -86,7 +86,8 @@ export async function createWorkspace (
     version: Data,
     progress: number,
     message?: string
-  ) => Promise
+  ) => Promise,
+  external: boolean = false
 ): Promise {
   const childLogger = ctx.newChild('createWorkspace', {}, { workspace: workspaceInfo.workspace })
   const ctxModellogger: ModelLogger = {
@@ -162,7 +163,8 @@ export async function createWorkspace (
         await handleWsEvent?.('progress', version, 80 + Math.round((Math.min(value, 100) / 100) * 20))
       },
       false,
-      'disable'
+      'disable',
+      external
     )
 
     await handleWsEvent?.('create-done', version, 100, '')
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index 1175e6d3ab..be43bfc3c8 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -43,6 +43,7 @@ services:
     links:
       - mongodb
       - minio
+      - postgres
     ports:
       - 3003:3003
     volumes:
@@ -59,6 +60,7 @@ services:
     image: hardcoreeng/workspace
     links:
       - mongodb
+      - postgres
       - minio
     volumes:
       - ./branding-test.json:/var/cfg/branding.json
@@ -106,6 +108,7 @@ services:
       - elastic
       - minio
       - rekoni
+      - postgres
       - account
     ports:
       - 3334:3334
diff --git a/tests/sanity-ws/000001/_migrations-1715107031571-0.snp.gz b/tests/sanity-ws/000001/_migrations-1715107031571-0.snp.gz
deleted file mode 100644
index fed2a4adc9..0000000000
Binary files a/tests/sanity-ws/000001/_migrations-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/_migrations-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/_migrations-data-1715107031571-1.tar.gz
deleted file mode 100644
index 138541090e..0000000000
Binary files a/tests/sanity-ws/000001/_migrations-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/activity-1715107031571-0.snp.gz b/tests/sanity-ws/000001/activity-1715107031571-0.snp.gz
deleted file mode 100644
index 0470125a92..0000000000
Binary files a/tests/sanity-ws/000001/activity-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/activity-1729588583864-0.snp.gz b/tests/sanity-ws/000001/activity-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..b6f0ce426d
Binary files /dev/null and b/tests/sanity-ws/000001/activity-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/activity-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/activity-data-1715107031571-1.tar.gz
deleted file mode 100644
index c653ff04c2..0000000000
Binary files a/tests/sanity-ws/000001/activity-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/activity-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/activity-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..1fd7b0103c
Binary files /dev/null and b/tests/sanity-ws/000001/activity-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/attachment-1715107031571-0.snp.gz b/tests/sanity-ws/000001/attachment-1715107031571-0.snp.gz
deleted file mode 100644
index 6d6ee23eee..0000000000
Binary files a/tests/sanity-ws/000001/attachment-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/attachment-1729588583864-0.snp.gz b/tests/sanity-ws/000001/attachment-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..e0139da5f7
Binary files /dev/null and b/tests/sanity-ws/000001/attachment-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/attachment-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/attachment-data-1715107031571-1.tar.gz
deleted file mode 100644
index d78a2a3dbe..0000000000
Binary files a/tests/sanity-ws/000001/attachment-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/attachment-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/attachment-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..e41b2d7085
Binary files /dev/null and b/tests/sanity-ws/000001/attachment-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/blob-1715107031571-0.snp.gz b/tests/sanity-ws/000001/blob-1729588583864-0.snp.gz
similarity index 100%
rename from tests/sanity-ws/000001/blob-1715107031571-0.snp.gz
rename to tests/sanity-ws/000001/blob-1729588583864-0.snp.gz
diff --git a/tests/sanity-ws/000001/blob-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/blob-data-1715107031571-1.tar.gz
deleted file mode 100644
index 796e293510..0000000000
Binary files a/tests/sanity-ws/000001/blob-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/blob-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/blob-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..6aa1922190
Binary files /dev/null and b/tests/sanity-ws/000001/blob-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/channel-1715107031571-0.snp.gz b/tests/sanity-ws/000001/channel-1715107031571-0.snp.gz
deleted file mode 100644
index 5a7c6eb7a1..0000000000
Binary files a/tests/sanity-ws/000001/channel-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/channel-1729588583864-0.snp.gz b/tests/sanity-ws/000001/channel-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..2ca617a3cd
Binary files /dev/null and b/tests/sanity-ws/000001/channel-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/channel-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/channel-data-1715107031571-1.tar.gz
deleted file mode 100644
index 80ac2438e9..0000000000
Binary files a/tests/sanity-ws/000001/channel-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/channel-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/channel-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..83bf9cd342
Binary files /dev/null and b/tests/sanity-ws/000001/channel-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/chunter-1715107031571-0.snp.gz b/tests/sanity-ws/000001/chunter-1729588583864-0.snp.gz
similarity index 100%
rename from tests/sanity-ws/000001/chunter-1715107031571-0.snp.gz
rename to tests/sanity-ws/000001/chunter-1729588583864-0.snp.gz
diff --git a/tests/sanity-ws/000001/chunter-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/chunter-data-1715107031571-1.tar.gz
deleted file mode 100644
index 6eaf3fb918..0000000000
Binary files a/tests/sanity-ws/000001/chunter-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/chunter-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/chunter-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..3851f2b1b3
Binary files /dev/null and b/tests/sanity-ws/000001/chunter-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/contact-1715107031571-0.snp.gz b/tests/sanity-ws/000001/contact-1715107031571-0.snp.gz
deleted file mode 100644
index 7970311d22..0000000000
Binary files a/tests/sanity-ws/000001/contact-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/contact-1729588583864-0.snp.gz b/tests/sanity-ws/000001/contact-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..61d3188be9
Binary files /dev/null and b/tests/sanity-ws/000001/contact-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/contact-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/contact-data-1715107031571-1.tar.gz
deleted file mode 100644
index 5698bc438f..0000000000
Binary files a/tests/sanity-ws/000001/contact-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/contact-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/contact-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..e43f838af6
Binary files /dev/null and b/tests/sanity-ws/000001/contact-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/doc-index-state-1715107031571-0.snp.gz b/tests/sanity-ws/000001/doc-index-state-1715107031571-0.snp.gz
deleted file mode 100644
index d2e8cc9823..0000000000
Binary files a/tests/sanity-ws/000001/doc-index-state-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/doc-index-state-1729588583864-0.snp.gz b/tests/sanity-ws/000001/doc-index-state-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..deded16d78
Binary files /dev/null and b/tests/sanity-ws/000001/doc-index-state-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/doc-index-state-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/doc-index-state-data-1715107031571-1.tar.gz
deleted file mode 100644
index bd3f52ac59..0000000000
Binary files a/tests/sanity-ws/000001/doc-index-state-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/doc-index-state-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/doc-index-state-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..cee19fa486
Binary files /dev/null and b/tests/sanity-ws/000001/doc-index-state-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/fulltext-blob-1715107031571-0.snp.gz b/tests/sanity-ws/000001/fulltext-blob-1715107031571-0.snp.gz
deleted file mode 100644
index 4b45cd66af..0000000000
Binary files a/tests/sanity-ws/000001/fulltext-blob-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/fulltext-blob-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/fulltext-blob-data-1715107031571-1.tar.gz
deleted file mode 100644
index 5995f6bd0e..0000000000
Binary files a/tests/sanity-ws/000001/fulltext-blob-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/inventory-1715107031571-0.snp.gz b/tests/sanity-ws/000001/inventory-1715107031571-0.snp.gz
deleted file mode 100644
index 1dd3e241a4..0000000000
Binary files a/tests/sanity-ws/000001/inventory-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/inventory-1729588583864-0.snp.gz b/tests/sanity-ws/000001/inventory-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..98c2745906
Binary files /dev/null and b/tests/sanity-ws/000001/inventory-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/inventory-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/inventory-data-1715107031571-1.tar.gz
deleted file mode 100644
index 2bd95b9f51..0000000000
Binary files a/tests/sanity-ws/000001/inventory-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/inventory-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/inventory-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..f2585594b4
Binary files /dev/null and b/tests/sanity-ws/000001/inventory-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/kanban-1715107031571-0.snp.gz b/tests/sanity-ws/000001/kanban-1715107031571-0.snp.gz
deleted file mode 100644
index 8f96d3048e..0000000000
Binary files a/tests/sanity-ws/000001/kanban-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/kanban-1729588583864-0.snp.gz b/tests/sanity-ws/000001/kanban-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..7a80797f49
Binary files /dev/null and b/tests/sanity-ws/000001/kanban-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/kanban-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/kanban-data-1715107031571-1.tar.gz
deleted file mode 100644
index 61a45ae96b..0000000000
Binary files a/tests/sanity-ws/000001/kanban-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/kanban-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/kanban-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..82dd03b8be
Binary files /dev/null and b/tests/sanity-ws/000001/kanban-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/notification-1715107031571-0.snp.gz b/tests/sanity-ws/000001/notification-1715107031571-0.snp.gz
deleted file mode 100644
index 5c36d3427c..0000000000
Binary files a/tests/sanity-ws/000001/notification-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/notification-1729588583864-0.snp.gz b/tests/sanity-ws/000001/notification-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..5222cb208a
Binary files /dev/null and b/tests/sanity-ws/000001/notification-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/notification-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/notification-data-1715107031571-1.tar.gz
deleted file mode 100644
index 206e74abd2..0000000000
Binary files a/tests/sanity-ws/000001/notification-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/notification-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/notification-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..c4083eca77
Binary files /dev/null and b/tests/sanity-ws/000001/notification-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/space-1715107031571-0.snp.gz b/tests/sanity-ws/000001/space-1715107031571-0.snp.gz
deleted file mode 100644
index 1dda93654c..0000000000
Binary files a/tests/sanity-ws/000001/space-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/space-1729588583864-0.snp.gz b/tests/sanity-ws/000001/space-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..5cf596d84a
Binary files /dev/null and b/tests/sanity-ws/000001/space-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/space-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/space-data-1715107031571-1.tar.gz
deleted file mode 100644
index e5fb84f63b..0000000000
Binary files a/tests/sanity-ws/000001/space-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/space-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/space-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..790ee00274
Binary files /dev/null and b/tests/sanity-ws/000001/space-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/status-1715107031571-0.snp.gz b/tests/sanity-ws/000001/status-1715107031571-0.snp.gz
deleted file mode 100644
index 2922241d28..0000000000
Binary files a/tests/sanity-ws/000001/status-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/status-1729588583864-0.snp.gz b/tests/sanity-ws/000001/status-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..07735bf4a4
Binary files /dev/null and b/tests/sanity-ws/000001/status-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/status-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/status-data-1715107031571-1.tar.gz
deleted file mode 100644
index af21060f15..0000000000
Binary files a/tests/sanity-ws/000001/status-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/status-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/status-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..92b9c284a9
Binary files /dev/null and b/tests/sanity-ws/000001/status-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/tags-1715107031571-0.snp.gz b/tests/sanity-ws/000001/tags-1715107031571-0.snp.gz
deleted file mode 100644
index 255a0f3f12..0000000000
Binary files a/tests/sanity-ws/000001/tags-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/tags-1729588583864-0.snp.gz b/tests/sanity-ws/000001/tags-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..b39225ce11
Binary files /dev/null and b/tests/sanity-ws/000001/tags-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/tags-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/tags-data-1715107031571-1.tar.gz
deleted file mode 100644
index 811cf4924f..0000000000
Binary files a/tests/sanity-ws/000001/tags-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/tags-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/tags-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..8cc79dc357
Binary files /dev/null and b/tests/sanity-ws/000001/tags-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/task-1715107031571-0.snp.gz b/tests/sanity-ws/000001/task-1715107031571-0.snp.gz
deleted file mode 100644
index 6ba5a35582..0000000000
Binary files a/tests/sanity-ws/000001/task-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/task-1729588583864-0.snp.gz b/tests/sanity-ws/000001/task-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..a9c992c6ff
Binary files /dev/null and b/tests/sanity-ws/000001/task-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/task-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/task-data-1715107031571-1.tar.gz
deleted file mode 100644
index c1e5caa9e0..0000000000
Binary files a/tests/sanity-ws/000001/task-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/task-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/task-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..94dc24ac89
Binary files /dev/null and b/tests/sanity-ws/000001/task-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/time-1715107031571-0.snp.gz b/tests/sanity-ws/000001/time-1729588583864-0.snp.gz
similarity index 100%
rename from tests/sanity-ws/000001/time-1715107031571-0.snp.gz
rename to tests/sanity-ws/000001/time-1729588583864-0.snp.gz
diff --git a/tests/sanity-ws/000001/time-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/time-data-1715107031571-1.tar.gz
deleted file mode 100644
index b96ba896c3..0000000000
Binary files a/tests/sanity-ws/000001/time-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/time-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/time-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..8f3d3d43c3
Binary files /dev/null and b/tests/sanity-ws/000001/time-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/tracker-1715107031571-0.snp.gz b/tests/sanity-ws/000001/tracker-1715107031571-0.snp.gz
deleted file mode 100644
index 2e3a5539ca..0000000000
Binary files a/tests/sanity-ws/000001/tracker-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/tracker-1729588583864-0.snp.gz b/tests/sanity-ws/000001/tracker-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..0ecd92092d
Binary files /dev/null and b/tests/sanity-ws/000001/tracker-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/tracker-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/tracker-data-1715107031571-1.tar.gz
deleted file mode 100644
index 50e7ca87f4..0000000000
Binary files a/tests/sanity-ws/000001/tracker-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/tracker-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/tracker-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..651243d8b6
Binary files /dev/null and b/tests/sanity-ws/000001/tracker-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/000001/tx-1715107031571-0.snp.gz b/tests/sanity-ws/000001/tx-1715107031571-0.snp.gz
deleted file mode 100644
index 11aa9c8158..0000000000
Binary files a/tests/sanity-ws/000001/tx-1715107031571-0.snp.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/tx-1729588583864-0.snp.gz b/tests/sanity-ws/000001/tx-1729588583864-0.snp.gz
new file mode 100644
index 0000000000..4a2cee9ce5
Binary files /dev/null and b/tests/sanity-ws/000001/tx-1729588583864-0.snp.gz differ
diff --git a/tests/sanity-ws/000001/tx-data-1715107031571-1.tar.gz b/tests/sanity-ws/000001/tx-data-1715107031571-1.tar.gz
deleted file mode 100644
index 635a048463..0000000000
Binary files a/tests/sanity-ws/000001/tx-data-1715107031571-1.tar.gz and /dev/null differ
diff --git a/tests/sanity-ws/000001/tx-data-1729588583864-1.tar.gz b/tests/sanity-ws/000001/tx-data-1729588583864-1.tar.gz
new file mode 100644
index 0000000000..cd9155b45f
Binary files /dev/null and b/tests/sanity-ws/000001/tx-data-1729588583864-1.tar.gz differ
diff --git a/tests/sanity-ws/backup.json.gz b/tests/sanity-ws/backup.json.gz
index 3263eed53b..e985e060e5 100644
Binary files a/tests/sanity-ws/backup.json.gz and b/tests/sanity-ws/backup.json.gz differ
diff --git a/tests/sanity-ws/backup.size.gz b/tests/sanity-ws/backup.size.gz
new file mode 100644
index 0000000000..781156afb9
Binary files /dev/null and b/tests/sanity-ws/backup.size.gz differ
diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts
index e21f9c327e..7b79a184a0 100644
--- a/workers/datalake/src/blob.ts
+++ b/workers/datalake/src/blob.ts
@@ -24,8 +24,8 @@ import { copyVideo, deleteVideo } from './video'
 const expires = 86400
 const cacheControl = `public,max-age=${expires}`
 
-// 64MB hash limit
-const HASH_LIMIT = 64 * 1024 * 1024
+// 1MB hash limit
+const HASH_LIMIT = 1 * 1024 * 1024
 
 interface BlobMetadata {
   lastModified: number
@@ -75,7 +75,9 @@ export async function handleBlobGet (
   const status = length !== undefined && length < object.size ? 206 : 200
   const response = new Response(object?.body, { headers, status })
-  ctx.waitUntil(cache.put(request, response.clone()))
+  if (response.status === 200) {
+    ctx.waitUntil(cache.put(request, response.clone()))
+  }
 
   return response
 }
@@ -119,8 +121,22 @@ export async function deleteBlob (env: Env, workspace: string, name: string): Pr
 }
 
 export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise {
+  const contentType = request.headers.get('Content-Type')
+  if (contentType === null || !contentType.includes('multipart/form-data')) {
+    console.error({ error: 'expected multipart/form-data' })
+    return error(400, 'expected multipart/form-data')
+  }
+
   const sql = postgres(env.HYPERDRIVE.connectionString)
-  const formData = await request.formData()
+
+  let formData: FormData
+  try {
+    formData = await request.formData()
+  } catch (err: any) {
+    const message = err instanceof Error ? err.message : String(err)
+    console.error({ error: 'failed to parse form data', message })
+    return error(400, 'failed to parse form data')
+  }
 
   const files: [File, key: string][] = []
   formData.forEach((value: any, key: string) => {
@@ -166,14 +182,11 @@ async function saveBlob (
   const httpMetadata = { contentType: type, cacheControl }
   const filename = getUniqueFilename()
 
-  const sha256hash = await getSha256(file)
-
-  if (sha256hash !== null) {
-    // Lucky boy, nothing to upload, use existing blob
-    const hash = sha256hash
-
+  if (file.size <= HASH_LIMIT) {
+    const hash = await getSha256(file)
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
+      // Lucky boy, nothing to upload, use existing blob
       await db.createBlob(sql, { workspace, name, hash, location })
     } else {
       await bucket.put(filename, file, { httpMetadata })
@@ -187,11 +200,7 @@ async function saveBlob (
   } else {
     // For large files we cannot calculate checksum beforehead
     // upload file with unique filename and then obtain checksum
-    const object = await bucket.put(filename, file, { httpMetadata })
-
-    const hash =
-      object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
-
+    const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
       // We found an existing blob with the same hash
@@ -218,7 +227,7 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
     throw Error('blob not found')
   }
 
-  const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
+  const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)
 
   const data = await db.getData(sql, { hash, location })
   if (data !== null) {
@@ -232,23 +241,40 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
   }
 }
 
+async function uploadLargeFile (
+  bucket: R2Bucket,
+  file: File,
+  filename: string,
+  options: R2PutOptions
+): Promise<{ hash: UUID }> {
+  const digestStream = new crypto.DigestStream('SHA-256')
+
+  const fileStream = file.stream()
+  const [digestFS, uploadFS] = fileStream.tee()
+
+  const digestPromise = digestFS.pipeTo(digestStream)
+  const uploadPromise = bucket.put(filename, uploadFS, options)
+
+  await Promise.all([digestPromise, uploadPromise])
+
+  const hash = digestToUUID(await digestStream.digest)
+
+  return { hash }
+}
+
 function getUniqueFilename (): UUID {
   return crypto.randomUUID() as UUID
 }
 
-async function getSha256 (file: File): Promise<UUID | null> {
-  if (file.size > HASH_LIMIT) {
-    return null
-  }
-
+async function getSha256 (file: File): Promise<UUID> {
   const digestStream = new crypto.DigestStream('SHA-256')
   await file.stream().pipeTo(digestStream)
   const digest = await digestStream.digest
-  return toUUID(new Uint8Array(digest))
+  return digestToUUID(digest)
 }
 
-function getMd5Checksum (digest: ArrayBuffer): UUID {
+function digestToUUID (digest: ArrayBuffer): UUID {
   return toUUID(new Uint8Array(digest))
 }
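The uploadLargeFile helper introduced above avoids buffering large uploads: it tees the file stream, pipes one branch into a SHA-256 DigestStream and uploads the other branch to R2 in parallel. A standalone sketch of the same pattern, assuming Cloudflare Workers types; the bucket binding, object key and hex rendering are illustrative, not taken from this diff.

    // Sketch only: hash a file with crypto.DigestStream while uploading it,
    // mirroring the tee pattern used by uploadLargeFile above. The bucket
    // binding and object key are assumptions.
    async function hashWhileUploading (bucket: R2Bucket, file: File, key: string): Promise<string> {
      const [forDigest, forUpload] = file.stream().tee()

      const digestStream = new crypto.DigestStream('SHA-256')
      const [, digest] = await Promise.all([
        bucket.put(key, forUpload, { httpMetadata: { contentType: file.type } }),
        forDigest.pipeTo(digestStream).then(async () => await digestStream.digest)
      ])

      // Render as hex here; the worker above converts the digest to a UUID instead.
      return [...new Uint8Array(digest)].map((b) => b.toString(16).padStart(2, '0')).join('')
    }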