From 4055bc6bce82b96054ba7b5e9a312ddc0fa76001 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Tue, 3 Jun 2025 15:49:43 +0700
Subject: [PATCH] fix: datalake memory leak fixes

Signed-off-by: Alexander Onnikov
---
 server/datalake/src/client.ts              | 25 +++++----------
 .../pod-datalake/src/handlers/blob.ts      |  8 ++---
 .../pod-datalake/src/handlers/image.ts     | 32 +++++++++----------
 services/datalake/pod-datalake/src/hash.ts | 16 ++++------
 4 files changed, 33 insertions(+), 48 deletions(-)

diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index 012f994bf2..1eda805266 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -119,7 +119,6 @@ export class DatalakeClient {
       if (err.name === 'NotFoundError') {
         return undefined
       }
-      console.error('failed to get object', { workspace, objectName, err })
       throw err
     }
 
@@ -151,7 +150,6 @@ export class DatalakeClient {
       if (err.name === 'NotFoundError') {
         return undefined
       }
-      console.error('failed to get partial object', { workspace, objectName, err })
       throw err
     }
 
@@ -180,7 +178,6 @@ export class DatalakeClient {
       if (err.name === 'NotFoundError') {
         return
       }
-      console.error('failed to stat object', { workspace, objectName, err })
       throw err
     }
 
@@ -205,7 +202,6 @@ export class DatalakeClient {
       })
     } catch (err: any) {
       if (err.name !== 'NotFoundError') {
-        console.error('failed to delete object', { workspace, objectName, err })
         throw err
       }
     }
@@ -230,19 +226,14 @@ export class DatalakeClient {
       }
     }
 
-    try {
-      if (size === undefined || size < 64 * 1024 * 1024) {
-        return await ctx.with('direct-upload', {}, (ctx) =>
-          this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
-        )
-      } else {
-        return await ctx.with('multipart-upload', {}, (ctx) =>
-          this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
-        )
-      }
-    } catch (err) {
-      console.error('failed to put object', { workspace, objectName, err })
-      throw err
+    if (size === undefined || size < 64 * 1024 * 1024) {
+      return await ctx.with('direct-upload', {}, (ctx) =>
+        this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
+      )
+    } else {
+      return await ctx.with('multipart-upload', {}, (ctx) =>
+        this.uploadWithMultipart(ctx, workspace, objectName, stream, { ...params, size })
+      )
     }
   }
 
diff --git a/services/datalake/pod-datalake/src/handlers/blob.ts b/services/datalake/pod-datalake/src/handlers/blob.ts
index 658384c5b5..6e9fd65fb6 100644
--- a/services/datalake/pod-datalake/src/handlers/blob.ts
+++ b/services/datalake/pod-datalake/src/handlers/blob.ts
@@ -85,6 +85,10 @@ export async function handleBlobGet (
   res.status(status)
 
   pipeline(blob.body, res, (err) => {
+    if (!blob.body.destroyed) {
+      blob.body.destroy()
+    }
+
     if (err != null) {
       // ignore abort errors to avoid flooding the logs
       if (err.name === 'AbortError' || err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
@@ -98,10 +102,6 @@
       }
     }
   })
-
-  req.on('close', () => {
-    blob.body.destroy()
-  })
 }
 
 export async function handleBlobHead (
diff --git a/services/datalake/pod-datalake/src/handlers/image.ts b/services/datalake/pod-datalake/src/handlers/image.ts
index a5279a5e05..596b0687f4 100644
--- a/services/datalake/pod-datalake/src/handlers/image.ts
+++ b/services/datalake/pod-datalake/src/handlers/image.ts
@@ -117,7 +117,9 @@ export async function handleImageGet (
     tempDir.rm(tmpFile, outFile)
   }
 
+  req.on('error', cleanup)
   req.on('close', cleanup)
+  res.on('error', cleanup)
   res.on('finish', cleanup)
 
   const blob = await datalake.get(ctx, workspace, name, {})
@@ -165,7 +167,7 @@ async function runPipeline (
   let pipeline: sharp.Sharp | undefined
 
   try {
-    pipeline = sharp(inFile)
+    pipeline = sharp(inFile, { sequentialRead: true })
 
     // auto orient image based on exif to prevent resize use wrong orientation
     pipeline = pipeline.rotate()
@@ -228,25 +230,21 @@ function getImageTransformParams (accept: string, transform: string): ImageTrans
 async function writeTempFile (path: string, stream: Readable): Promise<void> {
   const outp = createWriteStream(path)
 
-  stream.pipe(outp)
-
   await new Promise<void>((resolve, reject) => {
-    stream.on('error', (err) => {
-      stream.destroy()
-      outp.destroy()
-      reject(err)
-    })
+    const cleanup = (err?: any): void => {
+      if (!stream.destroyed) stream.destroy()
+      if (!outp.destroyed) outp.destroy()
+      if (err !== undefined) reject(err)
+    }
 
-    outp.on('finish', () => {
-      stream.destroy()
-      resolve()
-    })
+    stream.on('error', cleanup)
+    outp.on('finish', resolve)
+    outp.on('error', cleanup)
 
-    outp.on('error', (err) => {
-      stream.destroy()
-      outp.destroy()
-      reject(err)
-    })
+    stream.pipe(outp)
+  }).finally(() => {
+    if (!stream.destroyed) stream.destroy()
+    if (!outp.destroyed) outp.destroy()
   })
 
 }
diff --git a/services/datalake/pod-datalake/src/hash.ts b/services/datalake/pod-datalake/src/hash.ts
index 13742f232c..889faeb4cf 100644
--- a/services/datalake/pod-datalake/src/hash.ts
+++ b/services/datalake/pod-datalake/src/hash.ts
@@ -13,27 +13,23 @@
 // limitations under the License.
 //
 
-import crypto from 'node:crypto'
+import { createHash } from 'node:crypto'
 import { createReadStream } from 'fs'
 import { Readable } from 'stream'
 
 export async function getBufferSha256 (buffer: Buffer): Promise<string> {
-  const hash = crypto.createHash('sha256')
+  const hash = createHash('sha256')
   hash.write(buffer)
   return hash.digest('hex')
 }
 
 export async function getStreamSha256 (stream: Readable): Promise<string> {
-  const hasher = crypto.createHash('sha256')
+  const hasher = createHash('sha256')
   stream.pipe(hasher)
 
-  await new Promise((resolve, reject) => {
-    stream.on('error', (err) => {
-      reject(err)
-    })
-    stream.on('end', () => {
-      resolve()
-    })
+  await new Promise<void>((resolve, reject) => {
+    stream.on('end', resolve)
+    stream.on('error', reject)
   })
 
   return hasher.digest('hex')
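
Reviewer note, appended after the patch rather than inside it: the hunks above repeatedly apply one discipline: destroy both ends of a pipe on every exit path (success, error, and client abort). The sketch below restates that discipline as a single self-contained helper so it can be read or unit-tested outside the handlers. It is grounded in the writeTempFile hunk; the helper name pipeToFile and the identifiers source and sink are invented for illustration and appear nowhere in the patch.

    import { createWriteStream } from 'node:fs'
    import type { Readable } from 'node:stream'

    // Generic form of the cleanup pattern used throughout the patch:
    // register handlers before calling pipe(), funnel every failure through
    // one cleanup function, and destroy both streams in finally so that no
    // exit path leaks a file descriptor or buffered data.
    async function pipeToFile (source: Readable, path: string): Promise<void> {
      const sink = createWriteStream(path)

      await new Promise<void>((resolve, reject) => {
        const cleanup = (err?: any): void => {
          if (!source.destroyed) source.destroy()
          if (!sink.destroyed) sink.destroy()
          if (err !== undefined) reject(err)
        }

        source.on('error', cleanup)
        sink.on('error', cleanup)
        sink.on('finish', resolve)

        source.pipe(sink)
      }).finally(() => {
        // Runs after resolve and reject alike; the destroyed guards make
        // this second pass a no-op when cleanup already ran.
        if (!source.destroyed) source.destroy()
        if (!sink.destroyed) sink.destroy()
      })
    }

Node's built-in pipeline from 'node:stream/promises' provides the same destroy-on-failure guarantee and could replace the hand-rolled promise; the explicit version above mirrors what the patch ships, where the handlers are wired before pipe() so an immediate error cannot fire unobserved.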