diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts
index 7807aa9b11..e4478e8bfa 100644
--- a/workers/datalake/src/blob.ts
+++ b/workers/datalake/src/blob.ts
@@ -24,8 +24,8 @@ import { copyVideo, deleteVideo } from './video'
 const expires = 86400
 const cacheControl = `public,max-age=${expires}`
 
-// 64MB hash limit
-const HASH_LIMIT = 64 * 1024 * 1024
+// 1MB hash limit
+const HASH_LIMIT = 1 * 1024 * 1024
 
 interface BlobMetadata {
   lastModified: number
@@ -121,6 +121,12 @@ export async function deleteBlob (env: Env, workspace: string, name: string): Pr
 }
 
 export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> {
+  const contentType = request.headers.get('Content-Type')
+  if (contentType === null || !contentType.includes('multipart/form-data')) {
+    console.error({ error: 'expected multipart/form-data' })
+    return error(400, 'Expected multipart/form-data')
+  }
+
   const sql = postgres(env.HYPERDRIVE.connectionString)
 
   const formData = await request.formData()
@@ -168,14 +174,11 @@ async function saveBlob (
   const httpMetadata = { contentType: type, cacheControl }
   const filename = getUniqueFilename()
 
-  const sha256hash = await getSha256(file)
-
-  if (sha256hash !== null) {
-    // Lucky boy, nothing to upload, use existing blob
-    const hash = sha256hash
-
+  if (file.size <= HASH_LIMIT) {
+    const hash = await getSha256(file)
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
+      // Lucky boy, nothing to upload, use existing blob
       await db.createBlob(sql, { workspace, name, hash, location })
     } else {
       await bucket.put(filename, file, { httpMetadata })
@@ -189,11 +192,7 @@ async function saveBlob (
   } else {
     // For large files we cannot calculate checksum beforehead
     // upload file with unique filename and then obtain checksum
-    const object = await bucket.put(filename, file, { httpMetadata })
-
-    const hash =
-      object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
-
+    const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
       // We found an existing blob with the same hash
@@ -220,7 +219,7 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
     throw Error('blob not found')
   }
 
-  const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
+  const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)
 
   const data = await db.getData(sql, { hash, location })
   if (data !== null) {
@@ -234,23 +233,40 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
   }
 }
 
+async function uploadLargeFile (
+  bucket: R2Bucket,
+  file: File,
+  filename: string,
+  options: R2PutOptions
+): Promise<{ hash: UUID }> {
+  const digestStream = new crypto.DigestStream('SHA-256')
+
+  const fileStream = file.stream()
+  const [digestFS, uploadFS] = fileStream.tee()
+
+  const digestPromise = digestFS.pipeTo(digestStream)
+  const uploadPromise = bucket.put(filename, uploadFS, options)
+
+  await Promise.all([digestPromise, uploadPromise])
+
+  const hash = digestToUUID(await digestStream.digest)
+
+  return { hash }
+}
+
 function getUniqueFilename (): UUID {
   return crypto.randomUUID() as UUID
 }
 
-async function getSha256 (file: File): Promise<UUID | null> {
-  if (file.size > HASH_LIMIT) {
-    return null
-  }
-
+async function getSha256 (file: File): Promise<UUID> {
   const digestStream = new crypto.DigestStream('SHA-256')
   await file.stream().pipeTo(digestStream)
   const digest = await digestStream.digest
 
-  return toUUID(new Uint8Array(digest))
+  return digestToUUID(digest)
 }
 
-function getMd5Checksum (digest: ArrayBuffer): UUID {
+function digestToUUID (digest: ArrayBuffer): UUID {
   return toUUID(new Uint8Array(digest))
 }
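For reference, the tee-and-digest pattern that `uploadLargeFile` introduces can be exercised on its own. Below is a minimal sketch of the same technique, assuming the Cloudflare Workers runtime (`crypto.DigestStream` and `R2Bucket` are Workers-specific APIs from `@cloudflare/workers-types`); `putWithSha256` and its hex-string return value are illustrative, not part of the datalake worker:

```ts
// Sketch only: mirrors the tee-and-digest pattern from uploadLargeFile.
// Assumes the Cloudflare Workers runtime (crypto.DigestStream, R2Bucket).
// putWithSha256 is a hypothetical helper, not part of this PR.
async function putWithSha256 (bucket: R2Bucket, key: string, file: File): Promise<string> {
  // tee() yields two identical streams, so the body is read exactly once:
  // one branch feeds the hash while the other feeds the R2 upload
  const [digestBranch, uploadBranch] = file.stream().tee()

  const digestStream = new crypto.DigestStream('SHA-256')
  await Promise.all([
    digestBranch.pipeTo(digestStream),
    bucket.put(key, uploadBranch, { httpMetadata: { contentType: file.type } })
  ])

  // the digest promise resolves once the digest branch is fully consumed
  const digest = await digestStream.digest
  return Array.from(new Uint8Array(digest))
    .map((b) => b.toString(16).padStart(2, '0'))
    .join('')
}
```

The win over the replaced code path is that the blob is hashed and uploaded in a single pass without buffering, and the content hash no longer depends on `object.checksums.md5`, which R2 can leave undefined (hence the `crypto.randomUUID()` fallback retained in `handleBlobUploaded`).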