diff --git a/services/datalake/pod-datalake/src/config.ts b/services/datalake/pod-datalake/src/config.ts index 5164d822db..d80a973a61 100644 --- a/services/datalake/pod-datalake/src/config.ts +++ b/services/datalake/pod-datalake/src/config.ts @@ -29,6 +29,7 @@ export interface Config { StreamUrl?: string DbUrl: string Buckets: BucketConfig[] + CleanupInterval: number } const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined) @@ -73,6 +74,7 @@ function parseBucketConfig (str: string): BucketConfig { const config: Config = (() => { const params: Partial = { Port: parseNumber(process.env.PORT) ?? 4030, + CleanupInterval: parseNumber(process.env.CLEANUP_INTERVAL) ?? 30 * 1000, Secret: process.env.SECRET, AccountsUrl: process.env.ACCOUNTS_URL, DbUrl: process.env.DB_URL, diff --git a/services/datalake/pod-datalake/src/handlers/blob.ts b/services/datalake/pod-datalake/src/handlers/blob.ts index 7398a6a8f1..398bbc1423 100644 --- a/services/datalake/pod-datalake/src/handlers/blob.ts +++ b/services/datalake/pod-datalake/src/handlers/blob.ts @@ -17,10 +17,12 @@ import { MeasureContext } from '@hcengineering/core' import { type Request, type Response } from 'express' import { UploadedFile } from 'express-fileupload' import fs from 'fs' +import { pipeline, Readable } from 'stream' import { cacheControl } from '../const' import { type Datalake, wrapETag } from '../datalake' -import { getBufferSha256, getStreamSha256 } from '../hash' +import { getBufferSha256, getFileSha256 } from '../hash' +import { type TemporaryDir } from '../tempdir' interface BlobParentRequest { parent: string | null @@ -80,22 +82,18 @@ export async function handleBlobGet ( const status = range != null && blob.bodyLength !== blob.size ? 206 : 200 res.status(status) - const data = blob.body - data.pipe(res) + pipeline(blob.body, res, (err) => { + if (err != null) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('error writing response', { workspace, name, error }) + if (!res.headersSent) { + res.status(500).send('Internal Server Error') + } + } + }) - await new Promise((resolve, reject) => { - data.on('end', () => { - data.destroy() - res.end() - resolve() - }) - data.on('error', (err) => { - ctx.error('error receive stream', { workspace, name, error: err }) - - res.end() - data.destroy() - reject(err) - }) + req.on('close', () => { + blob.body.destroy() }) } @@ -186,7 +184,8 @@ export async function handleUploadFormData ( ctx: MeasureContext, req: Request, res: Response, - datalake: Datalake + datalake: Datalake, + tempDir: TemporaryDir ): Promise { const { workspace } = req.params @@ -202,38 +201,46 @@ export async function handleUploadFormData ( const result = await Promise.all( files.map(async ([file, key]) => { - const name = file.name - const size = file.size - const contentType = file.mimetype - - let sha256: string - if (file.tempFilePath !== undefined) { - const stream = fs.createReadStream(file.tempFilePath) - try { - sha256 = await getStreamSha256(stream) - } finally { - stream.destroy() - } - } else { - sha256 = await getBufferSha256(file.data) - } - - const data = file.tempFilePath !== undefined ? fs.createReadStream(file.tempFilePath) : file.data - try { - const metadata = await datalake.put(ctx, workspace, name, sha256, data, { - size, - contentType, - lastModified: Date.now() - }) + const name = file.name + const size = file.size + const contentType = file.mimetype - ctx.info('uploaded', { workspace, name, etag: metadata.etag, type: contentType }) + let sha256: string + try { + sha256 = + file.tempFilePath !== undefined ? await getFileSha256(file.tempFilePath) : await getBufferSha256(file.data) + } catch (err: any) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to calculate file hash', { error }) + throw err + } - return { key, metadata } - } catch (err: any) { - const error = err instanceof Error ? err.message : String(err) - ctx.error('failed to upload blob', { error: err }) - return { key, error } + const data = file.tempFilePath !== undefined ? fs.createReadStream(file.tempFilePath) : file.data + + try { + const metadata = await datalake.put(ctx, workspace, name, sha256, data, { + size, + contentType, + lastModified: Date.now() + }) + + ctx.info('uploaded', { workspace, name, etag: metadata.etag, type: contentType }) + + return { key, metadata } + } catch (err: any) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('failed to upload blob', { error: err }) + return { key, error } + } finally { + if (data instanceof Readable) { + data.destroy() + } + } + } finally { + if (file.tempFilePath !== undefined) { + tempDir.rm(file.tempFilePath) + } } }) ) diff --git a/services/datalake/pod-datalake/src/handlers/image.ts b/services/datalake/pod-datalake/src/handlers/image.ts index b62419f8e4..bc8e462a1a 100644 --- a/services/datalake/pod-datalake/src/handlers/image.ts +++ b/services/datalake/pod-datalake/src/handlers/image.ts @@ -13,15 +13,14 @@ // limitations under the License. // -import { generateId, MeasureContext } from '@hcengineering/core' +import { MeasureContext } from '@hcengineering/core' import { type Request, type Response } from 'express' -import { createReadStream, mkdtempSync, rmSync } from 'fs' -import { writeFile } from 'fs/promises' -import { tmpdir } from 'os' -import { join } from 'path' +import { createReadStream, createWriteStream } from 'fs' import sharp from 'sharp' +import { pipeline, type Readable } from 'stream' import { type Datalake } from '../datalake' +import { TemporaryDir } from '../tempdir' const cacheControl = 'public, max-age=31536000, immutable' const prefferedImageFormats = ['webp', 'avif', 'jpeg', 'png'] @@ -75,12 +74,23 @@ export async function handleImageGet ( ctx: MeasureContext, req: Request, res: Response, - datalake: Datalake + datalake: Datalake, + tempDir: TemporaryDir ): Promise { const { workspace, name, transform } = req.params const accept = req.headers.accept ?? 'image/*' - const image = parseImageTransform(accept, transform) + const { format, width, height, fit } = getImageTransformParams(accept, transform) + + const tmpFile = tempDir.tmpFile() + const outFile = tempDir.tmpFile() + + const cleanup = (): void => { + tempDir.rm(tmpFile, outFile) + } + + req.on('close', cleanup) + res.on('finish', cleanup) const blob = await datalake.get(ctx, workspace, name, {}) if (blob == null) { @@ -88,23 +98,45 @@ export async function handleImageGet ( return } - const dpr = image.dpr === undefined || Number.isNaN(image.dpr) ? 1 : image.dpr - const width = - image.width === undefined || Number.isNaN(image.width) ? undefined : Math.min(Math.round(image.width * dpr), 2048) - const height = - image.height === undefined || Number.isNaN(image.height) - ? undefined - : Math.min(Math.round(image.height * dpr), 2048) - const fit = image.fit ?? 'cover' - - const tempDir = mkdtempSync(join(tmpdir(), 'image-')) - const tmpFile = join(tempDir, generateId()) - const outFile = join(tempDir, generateId()) + await writeTempFile(tmpFile, blob.body) try { - await writeFile(tmpFile, blob.body) + const { contentType } = await ctx.with('sharp', {}, () => { + return runPipeline(tmpFile, outFile, { format, width, height, fit }) + }) - let pipeline = sharp(tmpFile) + res.setHeader('Content-Type', contentType) + res.setHeader('Cache-Control', cacheControl) + + await writeFileToResponse(ctx, outFile, res) + } catch (err: any) { + ctx.error('image processing error', { workspace, name, error: err }) + + res.setHeader('Content-Type', blob.contentType) + res.setHeader('Cache-Control', blob.cacheControl ?? cacheControl) + + await writeFileToResponse(ctx, tmpFile, res) + } +} + +interface ImageTransformParams { + format: string + width: number | undefined + height: number | undefined + fit: 'cover' | 'contain' +} + +async function runPipeline ( + inFile: string, + outFile: string, + params: ImageTransformParams +): Promise<{ contentType: string }> { + const { format, width, height, fit } = params + + let pipeline: sharp.Sharp | undefined + + try { + pipeline = sharp(inFile) // auto orient image based on exif to prevent resize use wrong orientation pipeline = pipeline.rotate() @@ -117,7 +149,7 @@ export async function handleImageGet ( }) let contentType = 'image/jpeg' - switch (image.format) { + switch (format) { case 'jpeg': pipeline = pipeline.jpeg({ progressive: true @@ -151,26 +183,65 @@ export async function handleImageGet ( break } - res.setHeader('Content-Type', contentType) - res.setHeader('Cache-Control', cacheControl) + await pipeline.toFile(outFile) - await ctx.with('sharp', {}, () => pipeline.toFile(outFile)) - pipeline.destroy() - - createReadStream(outFile).pipe(res) - } catch (err: any) { - ctx.error('image processing error', { workspace, name, error: err }) - - res.setHeader('Content-Type', blob.contentType) - res.setHeader('Cache-Control', blob.cacheControl ?? cacheControl) - createReadStream(tmpFile).pipe(res) + return { contentType } + } finally { + pipeline?.destroy() } +} - res.on('finish', () => { - try { - rmSync(tempDir, { recursive: true }) - } catch (err: any) { - ctx.error('failed to remove temp dir', { workspace, name, error: err }) +function getImageTransformParams (accept: string, transform: string): ImageTransformParams { + const image = parseImageTransform(accept, transform) + const format = image.format + + const dpr = image.dpr === undefined || Number.isNaN(image.dpr) ? 1 : image.dpr + const width = + image.width === undefined || Number.isNaN(image.width) ? undefined : Math.min(Math.round(image.width * dpr), 2048) + const height = + image.height === undefined || Number.isNaN(image.height) + ? undefined + : Math.min(Math.round(image.height * dpr), 2048) + const fit = image.fit ?? 'cover' + + return { format, width, height, fit } +} + +async function writeTempFile (path: string, stream: Readable): Promise { + const outp = createWriteStream(path) + + stream.pipe(outp) + + await new Promise((resolve, reject) => { + stream.on('error', (err) => { + stream.destroy() + outp.destroy() + reject(err) + }) + + outp.on('finish', () => { + stream.destroy() + resolve() + }) + + outp.on('error', (err) => { + stream.destroy() + outp.destroy() + reject(err) + }) + }) +} + +async function writeFileToResponse (ctx: MeasureContext, path: string, res: Response): Promise { + const stream = createReadStream(path) + + pipeline(stream, res, (err) => { + if (err != null) { + const error = err instanceof Error ? err.message : String(err) + ctx.error('error writing response', { error }) + if (!res.headersSent) { + res.status(500).send('Internal Server Error') + } } }) } diff --git a/services/datalake/pod-datalake/src/hash.ts b/services/datalake/pod-datalake/src/hash.ts index 7158d779e8..13742f232c 100644 --- a/services/datalake/pod-datalake/src/hash.ts +++ b/services/datalake/pod-datalake/src/hash.ts @@ -14,7 +14,8 @@ // import crypto from 'node:crypto' -import { ReadStream } from 'fs' +import { createReadStream } from 'fs' +import { Readable } from 'stream' export async function getBufferSha256 (buffer: Buffer): Promise { const hash = crypto.createHash('sha256') @@ -22,7 +23,7 @@ export async function getBufferSha256 (buffer: Buffer): Promise { return hash.digest('hex') } -export async function getStreamSha256 (stream: ReadStream): Promise { +export async function getStreamSha256 (stream: Readable): Promise { const hasher = crypto.createHash('sha256') stream.pipe(hasher) @@ -37,3 +38,8 @@ export async function getStreamSha256 (stream: ReadStream): Promise { return hasher.digest('hex') } + +export async function getFileSha256 (path: string): Promise { + const stream = createReadStream(path) + return await getStreamSha256(stream) +} diff --git a/services/datalake/pod-datalake/src/middleware.ts b/services/datalake/pod-datalake/src/middleware.ts index 9edbbcd05d..12bf7c7097 100644 --- a/services/datalake/pod-datalake/src/middleware.ts +++ b/services/datalake/pod-datalake/src/middleware.ts @@ -14,9 +14,23 @@ // import { extractToken } from '@hcengineering/server-client' -import { type Response, type Request, type NextFunction } from 'express' +import { type Response, type Request, type NextFunction, RequestHandler } from 'express' import { ApiError } from './error' +export interface KeepAliveOptions { + timeout: number + max: number +} + +export const keepAlive = (options: KeepAliveOptions): RequestHandler => { + const { timeout, max } = options + return (req: Request, res: Response, next: NextFunction) => { + res.setHeader('Connection', 'keep-alive') + res.setHeader('Keep-Alive', `timeout=${timeout}, max=${max}`) + next() + } +} + export const withAuthorization = (req: Request, res: Response, next: NextFunction): void => { try { const token = extractToken(req.headers) diff --git a/services/datalake/pod-datalake/src/server.ts b/services/datalake/pod-datalake/src/server.ts index 34742e1810..871f35c5af 100644 --- a/services/datalake/pod-datalake/src/server.ts +++ b/services/datalake/pod-datalake/src/server.ts @@ -20,17 +20,14 @@ import { decodeToken, TokenError } from '@hcengineering/server-token' import cors from 'cors' import express, { type Express, type NextFunction, type Request, type Response } from 'express' import fileUpload from 'express-fileupload' -import { mkdtempSync } from 'fs' import { type Server } from 'http' import morgan from 'morgan' -import { tmpdir } from 'os' -import { join } from 'path' import onHeaders from 'on-headers' import { cacheControl } from './const' import { createDb } from './datalake/db' import { ApiError } from './error' -import { withAuthorization, withBlob, withWorkspace } from './middleware' +import { keepAlive, withAuthorization, withBlob, withWorkspace } from './middleware' import { handleBlobDelete, handleBlobDeleteList, @@ -53,15 +50,23 @@ import { Datalake, Location } from './datalake' import { DatalakeImpl } from './datalake/datalake' import { Config } from './config' import { createBucket, createClient, S3Bucket } from './s3' +import { TemporaryDir } from './tempdir' const cacheControlNoCache = 'public, no-store, no-cache, must-revalidate, max-age=0' -type AsyncRequestHandler = (ctx: MeasureContext, req: Request, res: Response, datalake: Datalake) => Promise +type AsyncRequestHandler = ( + ctx: MeasureContext, + req: Request, + res: Response, + datalake: Datalake, + tempDir: TemporaryDir +) => Promise const handleRequest = async ( ctx: MeasureContext, name: string, datalake: Datalake, + tempDir: TemporaryDir, fn: AsyncRequestHandler, req: Request, res: Response, @@ -77,24 +82,19 @@ const handleRequest = async ( values.push(`${k};dur=${v.value.toFixed(2)}`) } if (values.length > 0) { - res.setHeader('Server-Timing', values.join(', ')) + if (!res.headersSent) { + res.setHeader('Server-Timing', values.join(', ')) + } } } }) - return fn(ctx, req, res, datalake) + return fn(ctx, req, res, datalake, tempDir) }) } catch (err: unknown) { next(err) } } -const wrapRequest = - (ctx: MeasureContext, name: string, datalake: Datalake, fn: AsyncRequestHandler) => - (req: Request, res: Response, next: NextFunction) => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - handleRequest(ctx, name, datalake, fn, req, res, next) - } - export function createServer (ctx: MeasureContext, config: Config): { app: Express, close: () => void } { const buckets: Array<{ location: Location, bucket: S3Bucket }> = [] for (const bucket of config.Buckets) { @@ -115,13 +115,13 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre const db = createDb(ctx, config.DbUrl) const datalake = new DatalakeImpl(db, buckets, { cacheControl }) - - const tempFileDir = mkdtempSync(join(tmpdir(), 'datalake-')) + const tempDir = new TemporaryDir(ctx, 'datalake-', config.CleanupInterval) const app = express() app.use(cors()) app.use(express.json({ limit: '50mb' })) - app.use(fileUpload({ useTempFiles: true, tempFileDir })) + app.use(fileUpload({ useTempFiles: true, tempFileDir: tempDir.path })) + app.use(keepAlive({ timeout: 5, max: 1000 })) const childLogger = ctx.logger.childLogger?.('requests', { enableConsole: 'true' }) const requests = ctx.newChild('requests', {}, {}, childLogger) @@ -131,51 +131,43 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre } } + const wrapRequest = + (ctx: MeasureContext, name: string, fn: AsyncRequestHandler) => + (req: Request, res: Response, next: NextFunction) => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + handleRequest(ctx, name, datalake, tempDir, fn, req, res, next) + } + app.use(morgan('short', { stream: new LogStream() })) - app.get('/blob/:workspace', withAuthorization, withWorkspace, wrapRequest(ctx, 'listBlobs', datalake, handleBlobList)) + app.get('/blob/:workspace', withAuthorization, withWorkspace, wrapRequest(ctx, 'listBlobs', handleBlobList)) - app.head('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'headBlob', datalake, handleBlobHead)) + app.head('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'headBlob', handleBlobHead)) - app.head('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'headBlob', datalake, handleBlobHead)) + app.head('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'headBlob', handleBlobHead)) - app.get('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'getBlob', datalake, handleBlobGet)) + app.get('/blob/:workspace/:name', withBlob, wrapRequest(ctx, 'getBlob', handleBlobGet)) - app.get('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'getBlob', datalake, handleBlobGet)) + app.get('/blob/:workspace/:name/:filename', withBlob, wrapRequest(ctx, 'getBlob', handleBlobGet)) - app.delete( - '/blob/:workspace/:name', - withAuthorization, - withBlob, - wrapRequest(ctx, 'deleteBlob', datalake, handleBlobDelete) - ) + app.delete('/blob/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'deleteBlob', handleBlobDelete)) app.delete( '/blob/:workspace/:name/:filename', withAuthorization, withBlob, - wrapRequest(ctx, 'deleteBlob', datalake, handleBlobDelete) + wrapRequest(ctx, 'deleteBlob', handleBlobDelete) ) - app.delete( - '/blob/:workspace', - withAuthorization, - withWorkspace, - wrapRequest(ctx, 'deleteBlob', datalake, handleBlobDeleteList) - ) + app.delete('/blob/:workspace', withAuthorization, withWorkspace, wrapRequest(ctx, 'deleteBlob', handleBlobDeleteList)) // Blob meta - app.get('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'getMeta', datalake, handleMetaGet)) + app.get('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'getMeta', handleMetaGet)) - app.put('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'putMeta', datalake, handleMetaPut)) + app.put('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'putMeta', handleMetaPut)) - app.patch( - '/meta/:workspace/:name', - withAuthorization, - withBlob, - wrapRequest(ctx, 'patchMeta', datalake, handleMetaPatch) - ) + app.patch('/meta/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 'patchMeta', handleMetaPatch)) // Form Data upload @@ -183,7 +175,7 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre '/upload/form-data/:workspace', withAuthorization, withWorkspace, - wrapRequest(ctx, 'uploadFormData', datalake, handleUploadFormData) + wrapRequest(ctx, 'uploadFormData', handleUploadFormData) ) // S3 upload @@ -192,48 +184,43 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre '/upload/s3/:workspace', withAuthorization, withWorkspace, - wrapRequest(ctx, 's3UploadParams', datalake, handleS3CreateBlobParams) + wrapRequest(ctx, 's3UploadParams', handleS3CreateBlobParams) ) - app.post( - '/upload/s3/:workspace/:name', - withAuthorization, - withBlob, - wrapRequest(ctx, 's3Upload', datalake, handleS3CreateBlob) - ) + app.post('/upload/s3/:workspace/:name', withAuthorization, withBlob, wrapRequest(ctx, 's3Upload', handleS3CreateBlob)) // Multipart upload app.post( '/upload/multipart/:workspace/:name', withAuthorization, withBlob, - wrapRequest(ctx, 'multipartUploadStart', datalake, handleMultipartUploadStart) + wrapRequest(ctx, 'multipartUploadStart', handleMultipartUploadStart) ) app.put( '/upload/multipart/:workspace/:name/part', withAuthorization, withBlob, - wrapRequest(ctx, 'multipartUploadPart', datalake, handleMultipartUploadPart) + wrapRequest(ctx, 'multipartUploadPart', handleMultipartUploadPart) ) app.post( '/upload/multipart/:workspace/:name/complete', withAuthorization, withBlob, - wrapRequest(ctx, 'multipartUploadComplete', datalake, handleMultipartUploadComplete) + wrapRequest(ctx, 'multipartUploadComplete', handleMultipartUploadComplete) ) app.post( '/upload/multipart/:workspace/:name/abort', withAuthorization, withBlob, - wrapRequest(ctx, 'multipartUploadAvort', datalake, handleMultipartUploadAbort) + wrapRequest(ctx, 'multipartUploadAvort', handleMultipartUploadAbort) ) // Image - app.get('/image/:transform/:workspace/:name', withBlob, wrapRequest(ctx, 'transformImage', datalake, handleImageGet)) // no auth + app.get('/image/:transform/:workspace/:name', withBlob, wrapRequest(ctx, 'transformImage', handleImageGet)) // no auth app.use((err: any, _req: any, res: any, _next: any) => { ctx.error(err.message, { code: err.code, message: err.message }) @@ -287,7 +274,9 @@ export function createServer (ctx: MeasureContext, config: Config): { app: Expre return { app, - close: () => {} + close: () => { + void tempDir.close() + } } } diff --git a/services/datalake/pod-datalake/src/tempdir.ts b/services/datalake/pod-datalake/src/tempdir.ts new file mode 100644 index 0000000000..79d1243d90 --- /dev/null +++ b/services/datalake/pod-datalake/src/tempdir.ts @@ -0,0 +1,88 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type MeasureContext, generateId } from '@hcengineering/core' +import { mkdtempSync, rmSync } from 'fs' +import { rm } from 'fs/promises' +import { tmpdir } from 'os' +import { join } from 'path' + +export class TemporaryDir { + readonly path: string + private readonly interval: NodeJS.Timeout + private pendingCleanup: Set + private cleanupPromise: Promise = Promise.resolve() + + constructor ( + private readonly ctx: MeasureContext, + name: string, + interval: number = 60 * 1000 + ) { + this.path = mkdtempSync(join(tmpdir(), name)) + this.pendingCleanup = new Set() + + ctx.info('using temp dir', { path: this.path }) + + this.interval = setInterval(() => { + void this.cleanup() + }, interval) + } + + tmpFile (): string { + return join(this.path, generateId()) + } + + rm (...files: string[]): void { + const normalizedPath = join(this.path) + for (const file of files) { + const normalizedFile = join(file) + + if (normalizedFile.startsWith(normalizedPath)) { + this.pendingCleanup.add(file) + } + } + } + + private async cleanup (): Promise { + await this.cleanupPromise + this.cleanupPromise = this.doCleanup() + await this.cleanupPromise + } + + private async doCleanup (): Promise { + const pendingCleanup = this.pendingCleanup + this.pendingCleanup = new Set() + + await this.ctx.with('cleanup', {}, () => { + const promises = Array.from(pendingCleanup).map(async (file) => { + try { + await rm(file, { force: true }) + } catch (err) { + const error = err instanceof Error ? err.message : String(err) + this.ctx.warn('failed to cleanup temp file', { file, error }) + } + }) + + this.ctx.info('temp files cleanup', { files: promises.length }) + return Promise.all(promises) + }) + } + + async close (): Promise { + clearInterval(this.interval) + await this.cleanupPromise + rmSync(this.path, { force: true, recursive: true }) + } +}