From 498b390b108eea202f3b878a91874798081e4c60 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov <aonnikov@hardcoreeng.com>
Date: Wed, 23 Oct 2024 14:33:20 +0700
Subject: [PATCH] fix: datalake memory limit issue (#7018)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
---
 workers/datalake/src/blob.ts | 58 +++++++++++++++++++++++-------------
 1 file changed, 37 insertions(+), 21 deletions(-)

diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts
index 7807aa9b11..e4478e8bfa 100644
--- a/workers/datalake/src/blob.ts
+++ b/workers/datalake/src/blob.ts
@@ -24,8 +24,8 @@ import { copyVideo, deleteVideo } from './video'
 const expires = 86400
 const cacheControl = `public,max-age=${expires}`
 
-// 64MB hash limit
-const HASH_LIMIT = 64 * 1024 * 1024
+// 1MB hash limit
+const HASH_LIMIT = 1 * 1024 * 1024
 
 interface BlobMetadata {
   lastModified: number
@@ -121,6 +121,12 @@ export async function deleteBlob (env: Env, workspace: string, name: string): Pr
 }
 
 export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> {
+  const contentType = request.headers.get('Content-Type')
+  if (contentType === null || !contentType.includes('multipart/form-data')) {
+    console.error({ error: 'expected multipart/form-data' })
+    return error(400, 'Expected multipart/form-data')
+  }
+
   const sql = postgres(env.HYPERDRIVE.connectionString)
   const formData = await request.formData()
 
@@ -168,14 +174,11 @@ async function saveBlob (
   const httpMetadata = { contentType: type, cacheControl }
   const filename = getUniqueFilename()
 
-  const sha256hash = await getSha256(file)
-
-  if (sha256hash !== null) {
-    // Lucky boy, nothing to upload, use existing blob
-    const hash = sha256hash
-
+  if (file.size <= HASH_LIMIT) {
+    const hash = await getSha256(file)
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
+      // A blob with the same hash is already stored: nothing to upload, reuse it
       await db.createBlob(sql, { workspace, name, hash, location })
     } else {
       await bucket.put(filename, file, { httpMetadata })
@@ -189,11 +192,7 @@ async function saveBlob (
   } else {
     // For large files we cannot calculate checksum beforehead
     // upload file with unique filename and then obtain checksum
-    const object = await bucket.put(filename, file, { httpMetadata })
-
-    const hash =
-      object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
-
+    const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
       // We found an existing blob with the same hash
@@ -220,7 +219,7 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
     throw Error('blob not found')
   }
 
-  const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
+  const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)
 
   const data = await db.getData(sql, { hash, location })
   if (data !== null) {
@@ -234,23 +233,40 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
   }
 }
 
+async function uploadLargeFile ( // puts `file` into `bucket` as `filename` while hashing the same stream with SHA-256
+  bucket: R2Bucket,
+  file: File,
+  filename: string,
+  options: R2PutOptions
+): Promise<{ hash: UUID }> {
+  const digestStream = new crypto.DigestStream('SHA-256') // writable sink that the file bytes are piped into for hashing
+
+  const fileStream = file.stream()
+  const [digestFS, uploadFS] = fileStream.tee() // two branches of one stream: one feeds the digest, the other the upload
+
+  const digestPromise = digestFS.pipeTo(digestStream)
+  const uploadPromise = bucket.put(filename, uploadFS, options)
+
+  await Promise.all([digestPromise, uploadPromise]) // both branches are awaited together; rejects if either one fails
+
+  const hash = digestToUUID(await digestStream.digest) // digest is awaited, then converted to the UUID-typed hash
+
+  return { hash }
+}
+
 function getUniqueFilename (): UUID {
   return crypto.randomUUID() as UUID
 }
 
-async function getSha256 (file: File): Promise<UUID | null> {
-  if (file.size > HASH_LIMIT) {
-    return null
-  }
-
+async function getSha256 (file: File): Promise<UUID> {
   const digestStream = new crypto.DigestStream('SHA-256')
   await file.stream().pipeTo(digestStream)
   const digest = await digestStream.digest
 
-  return toUUID(new Uint8Array(digest))
+  return digestToUUID(digest)
 }
 
-function getMd5Checksum (digest: ArrayBuffer): UUID {
+function digestToUUID (digest: ArrayBuffer): UUID {
   return toUUID(new Uint8Array(digest))
 }