diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 35d45d4d99..b24580d62b 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -190,7 +190,7 @@ dependencies: version: file:projects/core.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2) '@rush-temp/datalake': specifier: file:./projects/datalake.tgz - version: file:projects/datalake.tgz(esbuild@0.20.1)(ts-node@10.9.2) + version: file:projects/datalake.tgz(esbuild@0.20.1) '@rush-temp/desktop': specifier: file:./projects/desktop.tgz version: file:projects/desktop.tgz(bufferutil@4.0.8)(sass@1.71.1)(utf-8-validate@6.0.4) @@ -27080,13 +27080,12 @@ packages: - ts-node dev: false - file:projects/datalake.tgz(esbuild@0.20.1)(ts-node@10.9.2): - resolution: {integrity: sha512-pqgfJAfjDTa3AWRK263xljvkd1GLinDFrjTGW7res8krRskMMJ3K6gj3kfnLjyKmWeAesJQ5CSnFybPnPSJq/Q==, tarball: file:projects/datalake.tgz} + file:projects/datalake.tgz(esbuild@0.20.1): + resolution: {integrity: sha512-UX1RJWMtrQY5HWrFKnwi2vrRYfR8ZSRo2PtLn04ozWueiiLS3Q61UauAUfPDRtO0K5cJgecH7+gX750dx8oUhQ==, tarball: file:projects/datalake.tgz} id: file:projects/datalake.tgz name: '@rush-temp/datalake' version: 0.0.0 dependencies: - '@aws-sdk/client-s3': 3.577.0 '@types/jest': 29.5.12 '@types/node': 20.11.19 '@types/node-fetch': 2.6.11 @@ -27102,17 +27101,19 @@ packages: node-fetch: 2.7.0 prettier: 3.2.5 ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3) + ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3) typescript: 5.3.3 transitivePeerDependencies: - '@babel/core' - '@jest/types' + - '@swc/core' + - '@swc/wasm' - babel-jest - babel-plugin-macros - encoding - esbuild - node-notifier - supports-color - - ts-node dev: false file:projects/desktop-1.tgz(webpack@5.90.3): diff --git a/server/datalake/package.json b/server/datalake/package.json index f947ec1cb0..809cdd2cda 100644 --- a/server/datalake/package.json +++ b/server/datalake/package.json @@ -32,7 +32,8 @@ "jest": 
"^29.7.0", "ts-jest": "^29.1.1", "@types/jest": "^29.5.5", - "@types/node-fetch": "~2.6.2" + "@types/node-fetch": "~2.6.2", + "ts-node": "^10.8.0" }, "dependencies": { "@hcengineering/core": "^0.6.32", diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts index cfe03f95a0..acb5031b7b 100644 --- a/server/datalake/src/client.ts +++ b/server/datalake/src/client.ts @@ -49,11 +49,7 @@ type BlobUploadResult = BlobUploadSuccess | BlobUploadError /** @public */ export class Client { - private readonly endpoint: string - - constructor (host: string, port?: number) { - this.endpoint = port !== undefined ? `${host}:${port}` : host - } + constructor (private readonly endpoint: string) {} getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string { const path = `/blob/${workspace.name}/${encodeURIComponent(objectName)}` @@ -81,7 +77,7 @@ export class Client { ): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) const headers = { - Range: `bytes=${offset}-${length ?? ''}` + Range: length !== undefined ? 
`bytes=${offset}-${offset + length - 1}` : `bytes=${offset}-` } const response = await fetchSafe(ctx, url, { headers }) diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts index f743e97718..5cec66083e 100644 --- a/server/datalake/src/index.ts +++ b/server/datalake/src/index.ts @@ -37,7 +37,7 @@ export class DatalakeService implements StorageAdapter { static config = 'datalake' client: Client constructor (readonly opt: DatalakeConfig) { - this.client = new Client(opt.endpoint, opt.port) + this.client = new Client(opt.endpoint) } async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise {} @@ -167,23 +167,15 @@ export class DatalakeService implements StorageAdapter { } export function processConfigFromEnv (storageConfig: StorageConfiguration): string | undefined { - let endpoint = process.env.DATALAKE_ENDPOINT + const endpoint = process.env.DATALAKE_ENDPOINT if (endpoint === undefined) { return 'DATALAKE_ENDPOINT' } - let port = 80 - const sp = endpoint.split(':') - if (sp.length > 1) { - endpoint = sp[0] - port = parseInt(sp[1]) - } - const config: DatalakeConfig = { kind: 'datalake', name: 'datalake', - endpoint, - port + endpoint } storageConfig.storages.push(config) storageConfig.default = 'datalake' diff --git a/server/datalake/src/perfTest.ts b/server/datalake/src/perfTest.ts new file mode 100644 index 0000000000..8b016a25f6 --- /dev/null +++ b/server/datalake/src/perfTest.ts @@ -0,0 +1,102 @@ +import { MeasureMetricsContext, generateId } from '@hcengineering/core' +import type { StorageConfiguration } from '@hcengineering/server-core' +import { DatalakeService, processConfigFromEnv, type DatalakeConfig } from '.' 
+ +const MB = 1024 * 1024 + +const config: StorageConfiguration = { default: 'minio', storages: [] } +const minioConfigVar = processConfigFromEnv(config) +if (minioConfigVar !== undefined || config.storages[0] === undefined) { + console.error('No Datalake config env is configured:' + minioConfigVar) + // NOTE(review): jest's 'it' global is not available in this standalone script + process.exit(1) +} +const toolCtx = new MeasureMetricsContext('test', {}) +const storageService = new DatalakeService({ ...(config.storages[0] as DatalakeConfig) }) + +async function doTest (): Promise { + const genWorkspaceId1 = generateId() + + const ws1 = { name: genWorkspaceId1 } + await storageService.make(toolCtx, ws1) + + /// /////// Uploads + console.log('upload 1mb test') + let st1 = Date.now() + const sz = 10 + const stream = Buffer.alloc(sz * 1024 * 1024) + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + await storageService.put(toolCtx, ws1, `testObject.${i}`, stream, 'application/octet-stream', stream.length) + console.log('upload time', Date.now() - st) + } + let now = Date.now() + console.log(`upload performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) + + /// // Downloads 1 + console.log('download 1mb test') + st1 = Date.now() + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + await storageService.read(toolCtx, ws1, `testObject.${i}`) + console.log('download time', Date.now() - st) + } + + now = Date.now() + console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) + + /// Downloads 2 + st1 = Date.now() + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. 
+ const st = Date.now() + const readable = await storageService.get(toolCtx, ws1, `testObject.${i}`) + const chunks: Buffer[] = [] + readable.on('data', (chunk) => { + chunks.push(chunk) + }) + await new Promise((resolve) => { + readable.on('end', () => { + resolve() + readable.destroy() + }) + }) + console.log('download time 2', Date.now() - st) + } + + now = Date.now() + console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) + + /// Downloads 3 + console.log('download partial test') + st1 = Date.now() + for (let i = 0; i < 10; i++) { + // We need 1Mb random file to check upload speed. + const st = Date.now() + for (let i = 0; i < sz; i++) { + const readable = await storageService.partial(toolCtx, ws1, `testObject.${i}`, i * MB, MB) + const chunks: Buffer[] = [] + readable.on('data', (chunk) => { + chunks.push(chunk) + }) + await new Promise((resolve) => { + readable.on('end', () => { + resolve() + readable.destroy() + }) + }) + } + console.log('download time 2', Date.now() - st) + } + + now = Date.now() + console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`) +} + +void doTest().catch((err) => { + console.error(err) +}) + +console.log('done') diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts index e21f9c327e..7807aa9b11 100644 --- a/workers/datalake/src/blob.ts +++ b/workers/datalake/src/blob.ts @@ -75,7 +75,9 @@ export async function handleBlobGet ( const status = length !== undefined && length < object.size ? 206 : 200 const response = new Response(object?.body, { headers, status }) - ctx.waitUntil(cache.put(request, response.clone())) + if (response.status === 200) { + ctx.waitUntil(cache.put(request, response.clone())) + } return response }