mirror of
https://github.com/hcengineering/platform.git
synced 2025-06-09 09:20:54 +00:00
fix: datalake memory leak fixes
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
9c04340976
commit
4055bc6bce
@ -119,7 +119,6 @@ export class DatalakeClient {
|
||||
if (err.name === 'NotFoundError') {
|
||||
return undefined
|
||||
}
|
||||
console.error('failed to get object', { workspace, objectName, err })
|
||||
throw err
|
||||
}
|
||||
|
||||
@ -151,7 +150,6 @@ export class DatalakeClient {
|
||||
if (err.name === 'NotFoundError') {
|
||||
return undefined
|
||||
}
|
||||
console.error('failed to get partial object', { workspace, objectName, err })
|
||||
throw err
|
||||
}
|
||||
|
||||
@ -180,7 +178,6 @@ export class DatalakeClient {
|
||||
if (err.name === 'NotFoundError') {
|
||||
return
|
||||
}
|
||||
console.error('failed to stat object', { workspace, objectName, err })
|
||||
throw err
|
||||
}
|
||||
|
||||
@ -205,7 +202,6 @@ export class DatalakeClient {
|
||||
})
|
||||
} catch (err: any) {
|
||||
if (err.name !== 'NotFoundError') {
|
||||
console.error('failed to delete object', { workspace, objectName, err })
|
||||
throw err
|
||||
}
|
||||
}
|
||||
@ -230,19 +226,14 @@ export class DatalakeClient {
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
if (size === undefined || size < 64 * 1024 * 1024) {
|
||||
return await ctx.with('direct-upload', {}, (ctx) =>
|
||||
this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
|
||||
)
|
||||
} else {
|
||||
return await ctx.with('multipart-upload', {}, (ctx) =>
|
||||
this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('failed to put object', { workspace, objectName, err })
|
||||
throw err
|
||||
if (size === undefined || size < 64 * 1024 * 1024) {
|
||||
return await ctx.with('direct-upload', {}, (ctx) =>
|
||||
this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
|
||||
)
|
||||
} else {
|
||||
return await ctx.with('multipart-upload', {}, (ctx) =>
|
||||
this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,6 +85,10 @@ export async function handleBlobGet (
|
||||
res.status(status)
|
||||
|
||||
pipeline(blob.body, res, (err) => {
|
||||
if (!blob.body.destroyed) {
|
||||
blob.body.destroy()
|
||||
}
|
||||
|
||||
if (err != null) {
|
||||
// ignore abort errors to avoid flooding the logs
|
||||
if (err.name === 'AbortError' || err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
|
||||
@ -98,10 +102,6 @@ export async function handleBlobGet (
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
req.on('close', () => {
|
||||
blob.body.destroy()
|
||||
})
|
||||
}
|
||||
|
||||
export async function handleBlobHead (
|
||||
|
@ -117,7 +117,9 @@ export async function handleImageGet (
|
||||
tempDir.rm(tmpFile, outFile)
|
||||
}
|
||||
|
||||
req.on('error', cleanup)
|
||||
req.on('close', cleanup)
|
||||
res.on('error', cleanup)
|
||||
res.on('finish', cleanup)
|
||||
|
||||
const blob = await datalake.get(ctx, workspace, name, {})
|
||||
@ -165,7 +167,7 @@ async function runPipeline (
|
||||
let pipeline: sharp.Sharp | undefined
|
||||
|
||||
try {
|
||||
pipeline = sharp(inFile)
|
||||
pipeline = sharp(inFile, { sequentialRead: true })
|
||||
|
||||
// auto orient image based on exif to prevent resize use wrong orientation
|
||||
pipeline = pipeline.rotate()
|
||||
@ -228,25 +230,21 @@ function getImageTransformParams (accept: string, transform: string): ImageTrans
|
||||
async function writeTempFile (path: string, stream: Readable): Promise<void> {
|
||||
const outp = createWriteStream(path)
|
||||
|
||||
stream.pipe(outp)
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
stream.on('error', (err) => {
|
||||
stream.destroy()
|
||||
outp.destroy()
|
||||
reject(err)
|
||||
})
|
||||
const cleanup = (err?: any): void => {
|
||||
if (!stream.destroyed) stream.destroy()
|
||||
if (!outp.destroyed) outp.destroy()
|
||||
if (err !== undefined) reject(err)
|
||||
}
|
||||
|
||||
outp.on('finish', () => {
|
||||
stream.destroy()
|
||||
resolve()
|
||||
})
|
||||
stream.on('error', cleanup)
|
||||
outp.on('finish', resolve)
|
||||
outp.on('error', cleanup)
|
||||
|
||||
outp.on('error', (err) => {
|
||||
stream.destroy()
|
||||
outp.destroy()
|
||||
reject(err)
|
||||
})
|
||||
stream.pipe(outp)
|
||||
}).finally(() => {
|
||||
if (!stream.destroyed) stream.destroy()
|
||||
if (!outp.destroyed) outp.destroy()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -13,27 +13,23 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import crypto from 'node:crypto'
|
||||
import { createHash } from 'node:crypto'
|
||||
import { createReadStream } from 'fs'
|
||||
import { Readable } from 'stream'
|
||||
|
||||
export async function getBufferSha256 (buffer: Buffer): Promise<string> {
|
||||
const hash = crypto.createHash('sha256')
|
||||
const hash = createHash('sha256')
|
||||
hash.write(buffer)
|
||||
return hash.digest('hex')
|
||||
}
|
||||
|
||||
export async function getStreamSha256 (stream: Readable): Promise<string> {
|
||||
const hasher = crypto.createHash('sha256')
|
||||
const hasher = createHash('sha256')
|
||||
stream.pipe(hasher)
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
stream.on('error', (err) => {
|
||||
reject(err)
|
||||
})
|
||||
stream.on('end', () => {
|
||||
resolve()
|
||||
})
|
||||
await new Promise((resolve, reject) => {
|
||||
stream.on('end', resolve)
|
||||
stream.on('error', reject)
|
||||
})
|
||||
|
||||
return hasher.digest('hex')
|
||||
|
Loading…
Reference in New Issue
Block a user