fix: datalake memory limit issue (#7018)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Author: Alexander Onnikov
Date: 2024-10-23 14:33:20 +07:00 (committed by GitHub)
parent 0bfb79b9d5
commit 498b390b10

@@ -24,8 +24,8 @@ import { copyVideo, deleteVideo } from './video'
const expires = 86400
const cacheControl = `public,max-age=${expires}`

-// 64MB hash limit
-const HASH_LIMIT = 64 * 1024 * 1024
+// 1MB hash limit
+const HASH_LIMIT = 1 * 1024 * 1024

interface BlobMetadata {
  lastModified: number
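
The constant above is the heart of the fix: only files up to HASH_LIMIT (1 * 1024 * 1024 = 1,048,576 bytes, i.e. 1 MiB) are now hashed up front, where the old 64 MiB ceiling meant a single request could pull tens of megabytes through memory. Cloudflare Workers isolates are capped at 128 MB, which is presumably the memory limit the commit title refers to. A sketch of how the constant gates the two paths changed below (names taken from this diff):

// Small files are hashed before upload; large ones are hashed while streaming.
if (file.size <= HASH_LIMIT) {
  // up to 1 MiB is read for hashing; a dedupe hit can skip the upload entirely
} else {
  // stream to R2 and compute the digest in transit (see uploadLargeFile below)
}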
@@ -121,6 +121,12 @@ export async function deleteBlob (env: Env, workspace: string, name: string): Pr
}

export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> {
+  const contentType = request.headers.get('Content-Type')
+  if (contentType === null || !contentType.includes('multipart/form-data')) {
+    console.error({ error: 'expected multipart/form-data' })
+    return error(400, 'Expected multipart/form-data')
+  }
+
  const sql = postgres(env.HYPERDRIVE.connectionString)

  const formData = await request.formData()
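
The added guard fails fast with a 400 when the Content-Type header is absent or not multipart/form-data, before request.formData() attempts to parse the body. For illustration, a hypothetical client call that passes the check (the endpoint path is an assumption, not taken from this diff):

const form = new FormData()
form.append('file', new File(['hello'], 'hello.txt', { type: 'text/plain' }))
// fetch sets Content-Type: multipart/form-data; boundary=... automatically;
// setting it by hand would drop the boundary parameter and break body parsing
await fetch(`/upload/form-data/${workspace}`, { method: 'POST', body: form })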
@@ -168,14 +174,11 @@ async function saveBlob (
  const httpMetadata = { contentType: type, cacheControl }
  const filename = getUniqueFilename()

-  const sha256hash = await getSha256(file)
-  if (sha256hash !== null) {
-    // Lucky boy, nothing to upload, use existing blob
-    const hash = sha256hash
+  if (file.size <= HASH_LIMIT) {
+    const hash = await getSha256(file)
    const data = await db.getData(sql, { hash, location })
    if (data !== null) {
+      // Lucky boy, nothing to upload, use existing blob
      await db.createBlob(sql, { workspace, name, hash, location })
    } else {
      await bucket.put(filename, file, { httpMetadata })
@@ -189,11 +192,7 @@ async function saveBlob (
  } else {
    // For large files we cannot calculate checksum beforehand
    // upload file with unique filename and then obtain checksum
-    const object = await bucket.put(filename, file, { httpMetadata })
-    const hash =
-      object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
+    const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
    const data = await db.getData(sql, { hash, location })
    if (data !== null) {
      // We found an existing blob with the same hash
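
Together the last two hunks reduce saveBlob to a single size-based branch; a condensed sketch (not the verbatim function), using only names from this diff:

if (file.size <= HASH_LIMIT) {
  // small file: hash first, so a dedupe hit avoids touching the bucket at all
  const hash = await getSha256(file)
} else {
  // large file: one streaming pass uploads and hashes simultaneously
  const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
}
// either way the hash is then checked via db.getData for content-addressed dedupe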
@@ -220,7 +219,7 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
    throw Error('blob not found')
  }

-  const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
+  const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)

  const data = await db.getData(sql, { hash, location })
  if (data !== null) {
@@ -234,23 +233,40 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
  }
}

+async function uploadLargeFile (
+  bucket: R2Bucket,
+  file: File,
+  filename: string,
+  options: R2PutOptions
+): Promise<{ hash: UUID }> {
+  const digestStream = new crypto.DigestStream('SHA-256')
+
+  const fileStream = file.stream()
+  const [digestFS, uploadFS] = fileStream.tee()
+
+  const digestPromise = digestFS.pipeTo(digestStream)
+  const uploadPromise = bucket.put(filename, uploadFS, options)
+  await Promise.all([digestPromise, uploadPromise])
+
+  const hash = digestToUUID(await digestStream.digest)
+  return { hash }
+}
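
uploadLargeFile is where the memory issue is actually solved: ReadableStream.tee() splits the upload body so one branch feeds crypto.DigestStream (a Workers-specific streaming extension of Web Crypto) while the other streams into R2, so the file is never buffered whole. Note that both consumers are started before either promise is awaited; if one tee branch were left unread, the runtime would have to queue every chunk the faster branch consumes. A minimal standalone sketch of the same pattern (hashAndPut is a hypothetical name, not part of this diff):

// Hash a body while forwarding it to R2, without materializing it in memory.
async function hashAndPut (bucket: R2Bucket, key: string, body: ReadableStream): Promise<ArrayBuffer> {
  const digest = new crypto.DigestStream('SHA-256')
  const [toDigest, toBucket] = body.tee()
  await Promise.all([toDigest.pipeTo(digest), bucket.put(key, toBucket)])
  return await digest.digest
}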
function getUniqueFilename (): UUID {
  return crypto.randomUUID() as UUID
}

-async function getSha256 (file: File): Promise<UUID | null> {
-  if (file.size > HASH_LIMIT) {
-    return null
-  }
+async function getSha256 (file: File): Promise<UUID> {
  const digestStream = new crypto.DigestStream('SHA-256')
  await file.stream().pipeTo(digestStream)
  const digest = await digestStream.digest

-  return toUUID(new Uint8Array(digest))
+  return digestToUUID(digest)
}

-function getMd5Checksum (digest: ArrayBuffer): UUID {
+function digestToUUID (digest: ArrayBuffer): UUID {
  return toUUID(new Uint8Array(digest))
}
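
Renaming getMd5Checksum to digestToUUID reflects that it now serves both digest kinds: 16-byte MD5 checksums from R2 and 32-byte SHA-256 digests from DigestStream. The imported toUUID helper is not shown in this diff; a hypothetical sketch of the conversion it presumably performs (formatting the leading 16 digest bytes as a canonical UUID string):

// Hypothetical: not the actual imported helper.
function toUUID (bytes: Uint8Array): UUID {
  const hex = Array.from(bytes.subarray(0, 16), (b) => b.toString(16).padStart(2, '0')).join('')
  return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}` as UUID
}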