fix: datalake memory leak fixes (#9161)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2025-06-03 17:52:54 +07:00 committed by GitHub
parent ff16fac754
commit 94e9d0cbf1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 33 additions and 48 deletions

View File

@@ -119,7 +119,6 @@ export class DatalakeClient {
if (err.name === 'NotFoundError') { if (err.name === 'NotFoundError') {
return undefined return undefined
} }
console.error('failed to get object', { workspace, objectName, err })
throw err throw err
} }
@@ -151,7 +150,6 @@ export class DatalakeClient {
if (err.name === 'NotFoundError') { if (err.name === 'NotFoundError') {
return undefined return undefined
} }
console.error('failed to get partial object', { workspace, objectName, err })
throw err throw err
} }
@@ -180,7 +178,6 @@ export class DatalakeClient {
if (err.name === 'NotFoundError') { if (err.name === 'NotFoundError') {
return return
} }
console.error('failed to stat object', { workspace, objectName, err })
throw err throw err
} }
@@ -205,7 +202,6 @@ export class DatalakeClient {
}) })
} catch (err: any) { } catch (err: any) {
if (err.name !== 'NotFoundError') { if (err.name !== 'NotFoundError') {
console.error('failed to delete object', { workspace, objectName, err })
throw err throw err
} }
} }
@@ -230,19 +226,14 @@ export class DatalakeClient {
} }
} }
try { if (size === undefined || size < 64 * 1024 * 1024) {
if (size === undefined || size < 64 * 1024 * 1024) { return await ctx.with('direct-upload', {}, (ctx) =>
return await ctx.with('direct-upload', {}, (ctx) => this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size }) )
) } else {
} else { return await ctx.with('multipart-upload', {}, (ctx) =>
return await ctx.with('multipart-upload', {}, (ctx) => this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size }) )
)
}
} catch (err) {
console.error('failed to put object', { workspace, objectName, err })
throw err
} }
} }

View File

@@ -85,6 +85,10 @@ export async function handleBlobGet (
res.status(status) res.status(status)
pipeline(blob.body, res, (err) => { pipeline(blob.body, res, (err) => {
if (!blob.body.destroyed) {
blob.body.destroy()
}
if (err != null) { if (err != null) {
// ignore abort errors to avoid flooding the logs // ignore abort errors to avoid flooding the logs
if (err.name === 'AbortError' || err.code === 'ERR_STREAM_PREMATURE_CLOSE') { if (err.name === 'AbortError' || err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
@@ -98,10 +102,6 @@ export async function handleBlobGet (
} }
} }
}) })
req.on('close', () => {
blob.body.destroy()
})
} }
export async function handleBlobHead ( export async function handleBlobHead (

View File

@@ -117,7 +117,9 @@ export async function handleImageGet (
tempDir.rm(tmpFile, outFile) tempDir.rm(tmpFile, outFile)
} }
req.on('error', cleanup)
req.on('close', cleanup) req.on('close', cleanup)
res.on('error', cleanup)
res.on('finish', cleanup) res.on('finish', cleanup)
const blob = await datalake.get(ctx, workspace, name, {}) const blob = await datalake.get(ctx, workspace, name, {})
@@ -165,7 +167,7 @@ async function runPipeline (
let pipeline: sharp.Sharp | undefined let pipeline: sharp.Sharp | undefined
try { try {
pipeline = sharp(inFile) pipeline = sharp(inFile, { sequentialRead: true })
// auto orient image based on exif to prevent resize use wrong orientation // auto orient image based on exif to prevent resize use wrong orientation
pipeline = pipeline.rotate() pipeline = pipeline.rotate()
@@ -228,25 +230,21 @@ function getImageTransformParams (accept: string, transform: string): ImageTrans
async function writeTempFile (path: string, stream: Readable): Promise<void> { async function writeTempFile (path: string, stream: Readable): Promise<void> {
const outp = createWriteStream(path) const outp = createWriteStream(path)
stream.pipe(outp)
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
stream.on('error', (err) => { const cleanup = (err?: any): void => {
stream.destroy() if (!stream.destroyed) stream.destroy()
outp.destroy() if (!outp.destroyed) outp.destroy()
reject(err) if (err !== undefined) reject(err)
}) }
outp.on('finish', () => { stream.on('error', cleanup)
stream.destroy() outp.on('finish', resolve)
resolve() outp.on('error', cleanup)
})
outp.on('error', (err) => { stream.pipe(outp)
stream.destroy() }).finally(() => {
outp.destroy() if (!stream.destroyed) stream.destroy()
reject(err) if (!outp.destroyed) outp.destroy()
})
}) })
} }

View File

@@ -13,27 +13,23 @@
// limitations under the License. // limitations under the License.
// //
import crypto from 'node:crypto' import { createHash } from 'node:crypto'
import { createReadStream } from 'fs' import { createReadStream } from 'fs'
import { Readable } from 'stream' import { Readable } from 'stream'
export async function getBufferSha256 (buffer: Buffer): Promise<string> { export async function getBufferSha256 (buffer: Buffer): Promise<string> {
const hash = crypto.createHash('sha256') const hash = createHash('sha256')
hash.write(buffer) hash.write(buffer)
return hash.digest('hex') return hash.digest('hex')
} }
export async function getStreamSha256 (stream: Readable): Promise<string> { export async function getStreamSha256 (stream: Readable): Promise<string> {
const hasher = crypto.createHash('sha256') const hasher = createHash('sha256')
stream.pipe(hasher) stream.pipe(hasher)
await new Promise<void>((resolve, reject) => { await new Promise((resolve, reject) => {
stream.on('error', (err) => { stream.on('end', resolve)
reject(err) stream.on('error', reject)
})
stream.on('end', () => {
resolve()
})
}) })
return hasher.digest('hex') return hasher.digest('hex')