UBERF-9752: properly handle streams to avoid datalake memory leak (#8502)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2025-04-09 13:16:55 +07:00 committed by GitHub
parent 540a4690d3
commit 88e38b12f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 322 additions and 145 deletions

View File

@ -29,6 +29,7 @@ export interface Config {
StreamUrl?: string
DbUrl: string
Buckets: BucketConfig[]
CleanupInterval: number
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
@ -73,6 +74,7 @@ function parseBucketConfig (str: string): BucketConfig {
const config: Config = (() => {
const params: Partial<Config> = {
Port: parseNumber(process.env.PORT) ?? 4030,
CleanupInterval: parseNumber(process.env.CLEANUP_INTERVAL) ?? 30 * 1000,
Secret: process.env.SECRET,
AccountsUrl: process.env.ACCOUNTS_URL,
DbUrl: process.env.DB_URL,

View File

@ -17,10 +17,12 @@ import { MeasureContext } from '@hcengineering/core'
import { type Request, type Response } from 'express'
import { UploadedFile } from 'express-fileupload'
import fs from 'fs'
import { pipeline, Readable } from 'stream'
import { cacheControl } from '../const'
import { type Datalake, wrapETag } from '../datalake'
import { getBufferSha256, getStreamSha256 } from '../hash'
import { getBufferSha256, getFileSha256 } from '../hash'
import { type TemporaryDir } from '../tempdir'
interface BlobParentRequest {
parent: string | null
@ -80,22 +82,18 @@ export async function handleBlobGet (
const status = range != null && blob.bodyLength !== blob.size ? 206 : 200
res.status(status)
const data = blob.body
data.pipe(res)
pipeline(blob.body, res, (err) => {
if (err != null) {
const error = err instanceof Error ? err.message : String(err)
ctx.error('error writing response', { workspace, name, error })
if (!res.headersSent) {
res.status(500).send('Internal Server Error')
}
}
})
await new Promise<void>((resolve, reject) => {
data.on('end', () => {
data.destroy()
res.end()
resolve()
})
data.on('error', (err) => {
ctx.error('error receive stream', { workspace, name, error: err })
res.end()
data.destroy()
reject(err)
})
req.on('close', () => {
blob.body.destroy()
})
}
@ -186,7 +184,8 @@ export async function handleUploadFormData (
ctx: MeasureContext,
req: Request,
res: Response,
datalake: Datalake
datalake: Datalake,
tempDir: TemporaryDir
): Promise<void> {
const { workspace } = req.params
@ -202,38 +201,46 @@ export async function handleUploadFormData (
const result = await Promise.all(
files.map(async ([file, key]) => {
const name = file.name
const size = file.size
const contentType = file.mimetype
let sha256: string
if (file.tempFilePath !== undefined) {
const stream = fs.createReadStream(file.tempFilePath)
try {
sha256 = await getStreamSha256(stream)
} finally {
stream.destroy()
}
} else {
sha256 = await getBufferSha256(file.data)
}
const data = file.tempFilePath !== undefined ? fs.createReadStream(file.tempFilePath) : file.data
try {
const metadata = await datalake.put(ctx, workspace, name, sha256, data, {
size,
contentType,
lastModified: Date.now()
})
const name = file.name
const size = file.size
const contentType = file.mimetype
ctx.info('uploaded', { workspace, name, etag: metadata.etag, type: contentType })
let sha256: string
try {
sha256 =
file.tempFilePath !== undefined ? await getFileSha256(file.tempFilePath) : await getBufferSha256(file.data)
} catch (err: any) {
const error = err instanceof Error ? err.message : String(err)
ctx.error('failed to calculate file hash', { error })
throw err
}
return { key, metadata }
} catch (err: any) {
const error = err instanceof Error ? err.message : String(err)
ctx.error('failed to upload blob', { error: err })
return { key, error }
const data = file.tempFilePath !== undefined ? fs.createReadStream(file.tempFilePath) : file.data
try {
const metadata = await datalake.put(ctx, workspace, name, sha256, data, {
size,
contentType,
lastModified: Date.now()
})
ctx.info('uploaded', { workspace, name, etag: metadata.etag, type: contentType })
return { key, metadata }
} catch (err: any) {
const error = err instanceof Error ? err.message : String(err)
ctx.error('failed to upload blob', { error: err })
return { key, error }
} finally {
if (data instanceof Readable) {
data.destroy()
}
}
} finally {
if (file.tempFilePath !== undefined) {
tempDir.rm(file.tempFilePath)
}
}
})
)

View File

@ -13,15 +13,14 @@
// limitations under the License.
//
import { generateId, MeasureContext } from '@hcengineering/core'
import { MeasureContext } from '@hcengineering/core'
import { type Request, type Response } from 'express'
import { createReadStream, mkdtempSync, rmSync } from 'fs'
import { writeFile } from 'fs/promises'
import { tmpdir } from 'os'
import { join } from 'path'
import { createReadStream, createWriteStream } from 'fs'
import sharp from 'sharp'
import { pipeline, type Readable } from 'stream'
import { type Datalake } from '../datalake'
import { TemporaryDir } from '../tempdir'
const cacheControl = 'public, max-age=31536000, immutable'
const prefferedImageFormats = ['webp', 'avif', 'jpeg', 'png']
@ -75,12 +74,23 @@ export async function handleImageGet (
ctx: MeasureContext,
req: Request,
res: Response,
datalake: Datalake
datalake: Datalake,
tempDir: TemporaryDir
): Promise<void> {
const { workspace, name, transform } = req.params
const accept = req.headers.accept ?? 'image/*'
const image = parseImageTransform(accept, transform)
const { format, width, height, fit } = getImageTransformParams(accept, transform)
const tmpFile = tempDir.tmpFile()
const outFile = tempDir.tmpFile()
const cleanup = (): void => {
tempDir.rm(tmpFile, outFile)
}
req.on('close', cleanup)
res.on('finish', cleanup)
const blob = await datalake.get(ctx, workspace, name, {})
if (blob == null) {
@ -88,23 +98,45 @@ export async function handleImageGet (
return
}
const dpr = image.dpr === undefined || Number.isNaN(image.dpr) ? 1 : image.dpr
const width =
image.width === undefined || Number.isNaN(image.width) ? undefined : Math.min(Math.round(image.width * dpr), 2048)
const height =
image.height === undefined || Number.isNaN(image.height)
? undefined
: Math.min(Math.round(image.height * dpr), 2048)
const fit = image.fit ?? 'cover'
const tempDir = mkdtempSync(join(tmpdir(), 'image-'))
const tmpFile = join(tempDir, generateId())
const outFile = join(tempDir, generateId())
await writeTempFile(tmpFile, blob.body)
try {
await writeFile(tmpFile, blob.body)
const { contentType } = await ctx.with('sharp', {}, () => {
return runPipeline(tmpFile, outFile, { format, width, height, fit })
})
let pipeline = sharp(tmpFile)
res.setHeader('Content-Type', contentType)
res.setHeader('Cache-Control', cacheControl)
await writeFileToResponse(ctx, outFile, res)
} catch (err: any) {
ctx.error('image processing error', { workspace, name, error: err })
res.setHeader('Content-Type', blob.contentType)
res.setHeader('Cache-Control', blob.cacheControl ?? cacheControl)
await writeFileToResponse(ctx, tmpFile, res)
}
}
interface ImageTransformParams {
format: string
width: number | undefined
height: number | undefined
fit: 'cover' | 'contain'
}
async function runPipeline (
inFile: string,
outFile: string,
params: ImageTransformParams
): Promise<{ contentType: string }> {
const { format, width, height, fit } = params
let pipeline: sharp.Sharp | undefined
try {
pipeline = sharp(inFile)
// auto orient image based on exif to prevent resize use wrong orientation
pipeline = pipeline.rotate()
@ -117,7 +149,7 @@ export async function handleImageGet (
})
let contentType = 'image/jpeg'
switch (image.format) {
switch (format) {
case 'jpeg':
pipeline = pipeline.jpeg({
progressive: true
@ -151,26 +183,65 @@ export async function handleImageGet (
break
}
res.setHeader('Content-Type', contentType)
res.setHeader('Cache-Control', cacheControl)
await pipeline.toFile(outFile)
await ctx.with('sharp', {}, () => pipeline.toFile(outFile))
pipeline.destroy()
createReadStream(outFile).pipe(res)
} catch (err: any) {
ctx.error('image processing error', { workspace, name, error: err })
res.setHeader('Content-Type', blob.contentType)
res.setHeader('Cache-Control', blob.cacheControl ?? cacheControl)
createReadStream(tmpFile).pipe(res)
return { contentType }
} finally {
pipeline?.destroy()
}
}
res.on('finish', () => {
try {
rmSync(tempDir, { recursive: true })
} catch (err: any) {
ctx.error('failed to remove temp dir', { workspace, name, error: err })
/**
 * Resolves the requested image transform into concrete sharp parameters.
 * Width/height are scaled by device pixel ratio and capped at 2048px;
 * invalid (NaN) values fall back to defaults (dpr=1, dimension omitted).
 */
function getImageTransformParams (accept: string, transform: string): ImageTransformParams {
  const image = parseImageTransform(accept, transform)

  const dpr = image.dpr !== undefined && !Number.isNaN(image.dpr) ? image.dpr : 1
  const scale = (value: number): number => Math.min(Math.round(value * dpr), 2048)

  const width = image.width !== undefined && !Number.isNaN(image.width) ? scale(image.width) : undefined
  const height = image.height !== undefined && !Number.isNaN(image.height) ? scale(image.height) : undefined

  return { format: image.format, width, height, fit: image.fit ?? 'cover' }
}
/**
 * Writes the contents of a readable stream to a file on disk.
 *
 * Uses stream.pipeline so that both the source stream and the file write
 * stream are always destroyed on success, failure, or premature close —
 * the manual event wiring this replaces could settle twice and did not
 * handle premature close, leaking file descriptors.
 *
 * Rejects with the first error emitted by either stream.
 */
async function writeTempFile (path: string, stream: Readable): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    pipeline(stream, createWriteStream(path), (err) => {
      if (err != null) {
        reject(err)
      } else {
        resolve()
      }
    })
  })
}
/**
 * Streams the file at the given path into the HTTP response.
 *
 * Resolves only after streaming has fully completed (or failed) — the
 * previous version returned immediately after starting the pipeline, so
 * awaiting callers (and cleanup keyed off them) ran before the response
 * was written. On a streaming error the error is logged and, if headers
 * have not been sent yet, a 500 is returned to the client.
 */
async function writeFileToResponse (ctx: MeasureContext, path: string, res: Response): Promise<void> {
  await new Promise<void>((resolve) => {
    pipeline(createReadStream(path), res, (err) => {
      if (err != null) {
        const error = err instanceof Error ? err.message : String(err)
        ctx.error('error writing response', { error })
        if (!res.headersSent) {
          res.status(500).send('Internal Server Error')
        }
      }
      // resolve (not reject) even on error: the failure has already been
      // reported; callers only need to know that streaming has ended
      resolve()
    })
  })
}

View File

@ -14,7 +14,8 @@
//
import crypto from 'node:crypto'
import { ReadStream } from 'fs'
import { createReadStream } from 'fs'
import { Readable } from 'stream'
export async function getBufferSha256 (buffer: Buffer): Promise<string> {
const hash = crypto.createHash('sha256')
@ -22,7 +23,7 @@ export async function getBufferSha256 (buffer: Buffer): Promise<string> {
return hash.digest('hex')
}
export async function getStreamSha256 (stream: ReadStream): Promise<string> {
export async function getStreamSha256 (stream: Readable): Promise<string> {
const hasher = crypto.createHash('sha256')
stream.pipe(hasher)
@ -37,3 +38,8 @@ export async function getStreamSha256 (stream: ReadStream): Promise<string> {
return hasher.digest('hex')
}
/**
 * Computes the sha256 hex digest of the file at the given path.
 * Stream teardown is handled by getStreamSha256.
 */
export async function getFileSha256 (path: string): Promise<string> {
  return await getStreamSha256(createReadStream(path))
}

View File

@ -14,9 +14,23 @@
//
import { extractToken } from '@hcengineering/server-client'
import { type Response, type Request, type NextFunction } from 'express'
import { type Response, type Request, type NextFunction, RequestHandler } from 'express'
import { ApiError } from './error'
export interface KeepAliveOptions {
  timeout: number
  max: number
}

/**
 * Express middleware advertising HTTP keep-alive to clients via the
 * Connection and Keep-Alive response headers.
 *
 * @param options timeout — idle timeout in seconds; max — requests per connection
 */
export const keepAlive = ({ timeout, max }: KeepAliveOptions): RequestHandler => {
  // the header value is constant for a given configuration, build it once
  const keepAliveHeader = `timeout=${timeout}, max=${max}`
  return (req: Request, res: Response, next: NextFunction) => {
    res.setHeader('Connection', 'keep-alive')
    res.setHeader('Keep-Alive', keepAliveHeader)
    next()
  }
}
export const withAuthorization = (req: Request, res: Response, next: NextFunction): void => {
try {
const token = extractToken(req.headers)

View File

@ -20,17 +20,14 @@ import { decodeToken, TokenError } from '@hcengineering/server-token'
import cors from 'cors'
import express, { type Express, type NextFunction, type Request, type Response } from 'express'
import fileUpload from 'express-fileupload'
import { mkdtempSync } from 'fs'
import { type Server } from 'http'
import morgan from 'morgan'
import { tmpdir } from 'os'
import { join } from 'path'
import onHeaders from 'on-headers'
import { cacheControl } from './const'
import { createDb } from './datalake/db'
import { ApiError } from './error'
import { withAuthorization, withBlob, withWorkspace } from './middleware'
import { keepAlive, withAuthorization, withBlob, withWorkspace } from './middleware'
import {
handleBlobDelete,
handleBlobDeleteList,
@ -53,15 +50,23 @@ import { Datalake, Location } from './datalake'
import { DatalakeImpl } from './datalake/datalake'
import { Config } from './config'
import { createBucket, createClient, S3Bucket } from './s3'
import { TemporaryDir } from './tempdir'
const cacheControlNoCache = 'public, no-store, no-cache, must-revalidate, max-age=0'
type AsyncRequestHandler = (ctx: MeasureContext, req: Request, res: Response, datalake: Datalake) => Promise<void>
type AsyncRequestHandler = (
ctx: MeasureContext,
req: Request,
res: Response,
datalake: Datalake,
tempDir: TemporaryDir
) => Promise<void>
const handleRequest = async (
ctx: MeasureContext,
name: string,
datalake: Datalake,
tempDir: TemporaryDir,
fn: AsyncRequestHandler,
req: Request,
res: Response,
@ -77,24 +82,19 @@ const handleRequest = async (
values.push(`${k};dur=${v.value.toFixed(2)}`)
}
if (values.length > 0) {
res.setHeader('Server-Timing', values.join(', '))
if (!res.headersSent) {
res.setHeader('Server-Timing', values.join(', '))
}
}
}
})
return fn(ctx, req, res, datalake)
return fn(ctx, req, res, datalake, tempDir)
})
} catch (err: unknown) {
next(err)
}
}
const wrapRequest =
(ctx: MeasureContext, name: string, datalake: Datalake, fn: AsyncRequestHandler) =>
(req: Request, res: Response, next: NextFunction) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
handleRequest(ctx, name, datalake, fn, req, res, next)
}
export function createServer (ctx: MeasureContext, config: Config): { app: Express, close: () => void } {
const buckets: Array<{ location: Location, bucket: S3Bucket }> = []
for (const bucket of config.Buckets) {
@ -115,13 +115,13 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre
const db = createDb(ctx, config.DbUrl)
const datalake = new DatalakeImpl(db, buckets, { cacheControl })
const tempFileDir = mkdtempSync(join(tmpdir(), 'datalake-'))
const tempDir = new TemporaryDir(ctx, 'datalake-', config.CleanupInterval)
const app = express()
app.use(cors())
app.use(express.json({ limit: '50mb' }))
app.use(fileUpload({ useTempFiles: true, tempFileDir }))
app.use(fileUpload({ useTempFiles: true, tempFileDir: tempDir.path }))
app.use(keepAlive({ timeout: 5, max: 1000 }))
const childLogger = ctx.logger.childLogger?.('requests', { enableConsole: 'true' })
const requests = ctx.newChild('requests', {}, {}, childLogger)
@ -131,51 +131,43 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre
}
}
const wrapRequest =
(ctx: MeasureContext, name: string, fn: AsyncRequestHandler) =>
(req: Request, res: Response, next: NextFunction) => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
handleRequest(ctx, name, datalake, tempDir, fn, req, res, next)
}
app.use(morgan('short', { stream: new LogStream() }))
app.get('/blob/:workspace', withAuthorization, withWorkspace, wrapRequest(ctx, 'listBlobs', datalake, handleBlobList))
app.get('/blob/:workspace', withAuthorization, withWorkspace, wrapRequest(ctx, 'listBlobs', handleBlobList))
app.head('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'headBlob', datalake, handleBlobHead))
app.head('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'headBlob', handleBlobHead))
app.head('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'headBlob', datalake, handleBlobHead))
app.head('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'headBlob', handleBlobHead))
app.get('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'getBlob', datalake, handleBlobGet))
app.get('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'getBlob', handleBlobGet))
app.get('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'getBlob', datalake, handleBlobGet))
app.get('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'getBlob', handleBlobGet))
app.delete(
'/blob/:workspace/:name',
withAuthorization,
withBlob,
wrapRequest(ctx, 'deleteBlob', datalake, handleBlobDelete)
)
app.delete('/blob/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'deleteBlob', handleBlobDelete))
app.delete(
'/blob/:workspace/:name/:filename',
withAuthorization,
withBlob,
wrapRequest(ctx, 'deleteBlob', datalake, handleBlobDelete)
wrapRequest(ctx, 'deleteBlob', handleBlobDelete)
)
app.delete(
'/blob/:workspace',
withAuthorization,
withWorkspace,
wrapRequest(ctx, 'deleteBlob', datalake, handleBlobDeleteList)
)
app.delete('/blob/:workspace', withAuthorization, withWorkspace, wrapRequest(ctx, 'deleteBlob', handleBlobDeleteList))
// Blob meta
app.get('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'getMeta', datalake, handleMetaGet))
app.get('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'getMeta', handleMetaGet))
app.put('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'putMeta', datalake, handleMetaPut))
app.put('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'putMeta', handleMetaPut))
app.patch(
'/meta/:workspace/:name',
withAuthorization,
withBlob,
wrapRequest(ctx, 'patchMeta', datalake, handleMetaPatch)
)
app.patch('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'patchMeta', handleMetaPatch))
// Form Data upload
@ -183,7 +175,7 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre
'/upload/form-data/:workspace',
withAuthorization,
withWorkspace,
wrapRequest(ctx, 'uploadFormData', datalake, handleUploadFormData)
wrapRequest(ctx, 'uploadFormData', handleUploadFormData)
)
// S3 upload
@ -192,48 +184,43 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre
'/upload/s3/:workspace',
withAuthorization,
withWorkspace,
wrapRequest(ctx, 's3UploadParams', datalake, handleS3CreateBlobParams)
wrapRequest(ctx, 's3UploadParams', handleS3CreateBlobParams)
)
app.post(
'/upload/s3/:workspace/:name',
withAuthorization,
withBlob,
wrapRequest(ctx, 's3Upload', datalake, handleS3CreateBlob)
)
app.post('/upload/s3/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 's3Upload', handleS3CreateBlob))
// Multipart upload
app.post(
'/upload/multipart/:workspace/:name',
withAuthorization,
withBlob,
wrapRequest(ctx, 'multipartUploadStart', datalake, handleMultipartUploadStart)
wrapRequest(ctx, 'multipartUploadStart', handleMultipartUploadStart)
)
app.put(
'/upload/multipart/:workspace/:name/part',
withAuthorization,
withBlob,
wrapRequest(ctx, 'multipartUploadPart', datalake, handleMultipartUploadPart)
wrapRequest(ctx, 'multipartUploadPart', handleMultipartUploadPart)
)
app.post(
'/upload/multipart/:workspace/:name/complete',
withAuthorization,
withBlob,
wrapRequest(ctx, 'multipartUploadComplete', datalake, handleMultipartUploadComplete)
wrapRequest(ctx, 'multipartUploadComplete', handleMultipartUploadComplete)
)
app.post(
'/upload/multipart/:workspace/:name/abort',
withAuthorization,
withBlob,
wrapRequest(ctx, 'multipartUploadAvort', datalake, handleMultipartUploadAbort)
wrapRequest(ctx, 'multipartUploadAvort', handleMultipartUploadAbort)
)
// Image
app.get('/image/:transform/:workspace/:name', withBlob, wrapRequest(ctx, 'transformImage', datalake, handleImageGet)) // no auth
app.get('/image/:transform/:workspace/:name', withBlob, wrapRequest(ctx, 'transformImage', handleImageGet)) // no auth
app.use((err: any, _req: any, res: any, _next: any) => {
ctx.error(err.message, { code: err.code, message: err.message })
@ -287,7 +274,9 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre
return {
app,
close: () => {}
close: () => {
void tempDir.close()
}
}
}

View File

@ -0,0 +1,88 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type MeasureContext, generateId } from '@hcengineering/core'
import { mkdtempSync, rmSync } from 'fs'
import { rm } from 'fs/promises'
import { tmpdir } from 'os'
import { join, sep } from 'path'
/**
 * Owns a process-private temporary directory and periodically deletes
 * files that have been scheduled for removal via rm().
 */
export class TemporaryDir {
  readonly path: string
  private readonly interval: NodeJS.Timeout
  private pendingCleanup: Set<string>
  // serializes cleanup passes so overlapping timer ticks never run concurrently
  private cleanupPromise: Promise<void> = Promise.resolve()

  constructor (
    private readonly ctx: MeasureContext,
    name: string,
    interval: number = 60 * 1000
  ) {
    this.path = mkdtempSync(join(tmpdir(), name))
    this.pendingCleanup = new Set()

    ctx.info('using temp dir', { path: this.path })

    this.interval = setInterval(() => {
      void this.cleanup()
    }, interval)
  }

  /** Returns a new unique file path inside the temporary directory. */
  tmpFile (): string {
    return join(this.path, generateId())
  }

  /**
   * Schedules files for deletion on the next cleanup pass.
   * Paths outside the temporary directory are silently ignored.
   */
  rm (...files: string[]): void {
    // trailing separator prevents a bare prefix match from accepting
    // sibling directories such as `${this.path}-other/file`
    const root = join(this.path) + sep
    for (const file of files) {
      if (join(file).startsWith(root)) {
        this.pendingCleanup.add(file)
      }
    }
  }

  // Chains onto the previous pass so at most one cleanup runs at a time.
  private async cleanup (): Promise<void> {
    await this.cleanupPromise
    this.cleanupPromise = this.doCleanup()
    await this.cleanupPromise
  }

  // Deletes all currently pending files; per-file failures are logged, not thrown.
  private async doCleanup (): Promise<void> {
    const pendingCleanup = this.pendingCleanup
    this.pendingCleanup = new Set()

    await this.ctx.with('cleanup', {}, async () => {
      const promises = Array.from(pendingCleanup).map(async (file) => {
        try {
          await rm(file, { force: true })
        } catch (err) {
          const error = err instanceof Error ? err.message : String(err)
          this.ctx.warn('failed to cleanup temp file', { file, error })
        }
      })
      this.ctx.info('temp files cleanup', { files: promises.length })
      await Promise.all(promises)
    })
  }

  /** Stops the timer, waits for any in-flight cleanup, and removes the directory. */
  async close (): Promise<void> {
    clearInterval(this.interval)
    await this.cleanupPromise
    rmSync(this.path, { force: true, recursive: true })
  }
}