fix: datalake fixes and perftest (#7016)

Alexander Onnikov 2024-10-23 12:51:25 +07:00 committed by GitHub
parent c1fe92536f
commit 0bfb79b9d5
6 changed files with 118 additions and 24 deletions

View File

@@ -190,7 +190,7 @@ dependencies:
version: file:projects/core.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2)
'@rush-temp/datalake':
specifier: file:./projects/datalake.tgz
version: file:projects/datalake.tgz(esbuild@0.20.1)(ts-node@10.9.2)
version: file:projects/datalake.tgz(esbuild@0.20.1)
'@rush-temp/desktop':
specifier: file:./projects/desktop.tgz
version: file:projects/desktop.tgz(bufferutil@4.0.8)(sass@1.71.1)(utf-8-validate@6.0.4)
@@ -27080,13 +27080,12 @@ packages:
- ts-node
dev: false
file:projects/datalake.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-pqgfJAfjDTa3AWRK263xljvkd1GLinDFrjTGW7res8krRskMMJ3K6gj3kfnLjyKmWeAesJQ5CSnFybPnPSJq/Q==, tarball: file:projects/datalake.tgz}
file:projects/datalake.tgz(esbuild@0.20.1):
resolution: {integrity: sha512-UX1RJWMtrQY5HWrFKnwi2vrRYfR8ZSRo2PtLn04ozWueiiLS3Q61UauAUfPDRtO0K5cJgecH7+gX750dx8oUhQ==, tarball: file:projects/datalake.tgz}
id: file:projects/datalake.tgz
name: '@rush-temp/datalake'
version: 0.0.0
dependencies:
'@aws-sdk/client-s3': 3.577.0
'@types/jest': 29.5.12
'@types/node': 20.11.19
'@types/node-fetch': 2.6.11
@@ -27102,17 +27101,19 @@ packages:
node-fetch: 2.7.0
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3)
typescript: 5.3.3
transitivePeerDependencies:
- '@babel/core'
- '@jest/types'
- '@swc/core'
- '@swc/wasm'
- babel-jest
- babel-plugin-macros
- encoding
- esbuild
- node-notifier
- supports-color
- ts-node
dev: false
file:projects/desktop-1.tgz(webpack@5.90.3):

View File

@@ -32,7 +32,8 @@
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5",
"@types/node-fetch": "~2.6.2"
"@types/node-fetch": "~2.6.2",
"ts-node": "^10.8.0"
},
"dependencies": {
"@hcengineering/core": "^0.6.32",

View File

@@ -49,11 +49,7 @@ type BlobUploadResult = BlobUploadSuccess | BlobUploadError
/** @public */
export class Client {
private readonly endpoint: string
constructor (host: string, port?: number) {
this.endpoint = port !== undefined ? `${host}:${port}` : host
}
constructor (private readonly endpoint: string) {}
getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string {
const path = `/blob/${workspace.name}/${encodeURIComponent(objectName)}`
@@ -81,7 +77,7 @@ export class Client {
): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName)
const headers = {
Range: `bytes=${offset}-${length ?? ''}`
Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}-`
}
const response = await fetchSafe(ctx, url, { headers })

View File

@@ -37,7 +37,7 @@ export class DatalakeService implements StorageAdapter {
static config = 'datalake'
client: Client
constructor (readonly opt: DatalakeConfig) {
this.client = new Client(opt.endpoint, opt.port)
this.client = new Client(opt.endpoint)
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
@@ -167,23 +167,15 @@ export class DatalakeService implements StorageAdapter {
}
export function processConfigFromEnv (storageConfig: StorageConfiguration): string | undefined {
let endpoint = process.env.DATALAKE_ENDPOINT
const endpoint = process.env.DATALAKE_ENDPOINT
if (endpoint === undefined) {
return 'DATALAKE_ENDPOINT'
}
let port = 80
const sp = endpoint.split(':')
if (sp.length > 1) {
endpoint = sp[0]
port = parseInt(sp[1])
}
const config: DatalakeConfig = {
kind: 'datalake',
name: 'datalake',
endpoint,
port
endpoint
}
storageConfig.storages.push(config)
storageConfig.default = 'datalake'
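
Note: with the port field gone from DatalakeConfig, DATALAKE_ENDPOINT is consumed verbatim, so a non-default port now belongs inside the URL itself. A hedged usage sketch (the endpoint value, error handling, and the '@hcengineering/datalake' package name are assumptions, not taken from the commit):

// Assumption: the datalake server package exports processConfigFromEnv.
import { processConfigFromEnv } from '@hcengineering/datalake'
import type { StorageConfiguration } from '@hcengineering/server-core'

process.env.DATALAKE_ENDPOINT = 'http://127.0.0.1:4021' // port stays inside the URL

const storageConfig: StorageConfiguration = { default: '', storages: [] }
const missing = processConfigFromEnv(storageConfig)
if (missing !== undefined) {
  throw new Error(`missing env variable: ${missing}`)
}
// storageConfig.default is now 'datalake' and the endpoint is passed through unchanged.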

View File

@@ -0,0 +1,102 @@
import { MeasureMetricsContext, generateId } from '@hcengineering/core'
import type { StorageConfiguration } from '@hcengineering/server-core'
import { DatalakeService, processConfigFromEnv, type DatalakeConfig } from '.'
const MB = 1024 * 1024
const config: StorageConfiguration = { default: 'datalake', storages: [] }
const missingConfigVar = processConfigFromEnv(config)

if (missingConfigVar !== undefined || config.storages[0] === undefined) {
  console.error('No Datalake config env is configured: ' + missingConfigVar)
  it.skip('No Datalake config env is configured', async () => {})
  process.exit(1)
}
const toolCtx = new MeasureMetricsContext('test', {})
const storageService = new DatalakeService({ ...(config.storages[0] as DatalakeConfig) })
async function doTest (): Promise<void> {
  const genWorkspaceId1 = generateId()
  const ws1 = { name: genWorkspaceId1 }
  await storageService.make(toolCtx, ws1)

  // Uploads
  console.log('upload 10mb test')
  let st1 = Date.now()
  const sz = 10
  const stream = Buffer.alloc(sz * MB)
  for (let i = 0; i < 10; i++) {
    // Upload a 10 MB zero-filled buffer to measure upload speed.
    const st = Date.now()
    await storageService.put(toolCtx, ws1, `testObject.${i}`, stream, 'application/octet-stream', stream.length)
    console.log('upload time', Date.now() - st)
  }
  let now = Date.now()
  console.log(`upload performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)

  // Downloads 1: read each object fully into memory
  console.log('download 10mb test')
  st1 = Date.now()
  for (let i = 0; i < 10; i++) {
    // Measure whole-object download speed.
    const st = Date.now()
    await storageService.read(toolCtx, ws1, `testObject.${i}`)
    console.log('download time', Date.now() - st)
  }
  now = Date.now()
  console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)

  // Downloads 2: stream each object and collect the chunks
  st1 = Date.now()
  for (let i = 0; i < 10; i++) {
    // Measure streaming download speed.
    const st = Date.now()
    const readable = await storageService.get(toolCtx, ws1, `testObject.${i}`)
    const chunks: Buffer[] = []
    readable.on('data', (chunk) => {
      chunks.push(chunk)
    })
    await new Promise<void>((resolve) => {
      readable.on('end', () => {
        resolve()
        readable.destroy()
      })
    })
    console.log('download time 2', Date.now() - st)
  }
  now = Date.now()
  console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)

  // Downloads 3: read each object in 1 MB partial ranges
  console.log('download partial test')
  st1 = Date.now()
  for (let i = 0; i < 10; i++) {
    // Measure partial-read speed, one 1 MB range at a time.
    const st = Date.now()
    for (let j = 0; j < sz; j++) {
      const readable = await storageService.partial(toolCtx, ws1, `testObject.${i}`, j * MB, MB)
      const chunks: Buffer[] = []
      readable.on('data', (chunk) => {
        chunks.push(chunk)
      })
      await new Promise<void>((resolve) => {
        readable.on('end', () => {
          resolve()
          readable.destroy()
        })
      })
    }
    console.log('download partial time', Date.now() - st)
  }
  now = Date.now()
  console.log(`download partial performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)
}

doTest()
  .then(() => {
    console.log('done')
  })
  .catch((err) => {
    console.error(err)
  })
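
Note: the throughput lines above compute megabytes per second rounded to two decimals: each phase moves sz * 10 MB (10 objects of sz MB each), and the elapsed time is in milliseconds. Factored out as a sketch for clarity (the helper is not part of the commit):

// mbPerSecond is an illustrative refactoring of the inline formula.
function mbPerSecond (sz: number, elapsedMs: number): number {
  // (sz * 10) MB / (elapsedMs / 1000) s, rounded to two decimal places.
  return Math.round((sz * 10 * 1000 * 100) / elapsedMs) / 100
}

console.log(mbPerSecond(10, 4000)) // 100 MB in 4 s -> 25

The ts-node devDependency added in the package.json hunk above presumably exists so this script can be run directly (e.g. via npx ts-node) against a configured DATALAKE_ENDPOINT.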

View File

@@ -75,7 +75,9 @@ export async function handleBlobGet (
const status = length !== undefined && length < object.size ? 206 : 200
const response = new Response(object?.body, { headers, status })
ctx.waitUntil(cache.put(request, response.clone()))
if (response.status === 200) {
ctx.waitUntil(cache.put(request, response.clone()))
}
return response
}
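
Note: the guard exists because the Workers Cache API refuses partial content; cache.put throws when handed a 206 response, so only complete 200 responses are written to the cache. A sketch of the same guard as a standalone helper (the function name is illustrative, not from the commit):

// cacheIfComplete is an illustrative helper, not part of the commit.
async function cacheIfComplete (cache: Cache, request: Request, response: Response): Promise<void> {
  // cache.put throws for 206 (partial) responses, so skip them.
  if (response.status === 200) {
    await cache.put(request, response.clone())
  }
}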