diff --git a/dev/tool/package.json b/dev/tool/package.json
index 7e61c8e731..474d390fb2 100644
--- a/dev/tool/package.json
+++ b/dev/tool/package.json
@@ -150,6 +150,8 @@
     "@hcengineering/telegram": "^0.6.21",
     "@hcengineering/tracker": "^0.6.24",
     "@hcengineering/collaboration": "^0.6.0",
+    "@hcengineering/datalake": "^0.6.0",
+    "@hcengineering/s3": "^0.6.0",
     "commander": "^8.1.0",
     "csv-parse": "~5.1.0",
     "email-addresses": "^5.0.0",
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index 829c9eca48..789a97df91 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -118,8 +118,10 @@ import { changeConfiguration } from './configuration'
 import { moveAccountDbFromMongoToPG, moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db'
 import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
 import { fixAccountEmails, renameAccount } from './renameAccount'
-import { moveFiles, showLostFiles } from './storage'
+import { copyToDatalake, moveFiles, showLostFiles } from './storage'
 import { getModelVersion } from '@hcengineering/model-all'
+import { type DatalakeConfig, DatalakeService, createDatalakeClient } from '@hcengineering/datalake'
+import { S3Service, type S3Config } from '@hcengineering/s3'
 
 const colorConstants = {
   colorRed: '\u001b[31m',
@@ -1184,6 +1186,54 @@ export function devTool (
       await storageAdapter.close()
     })
 
+  program
+    .command('copy-s3-datalake')
+    .description('migrate files from s3 to datalake')
+    .option('-w, --workspace <workspace>', 'Selected workspace only', '')
+    .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
+    .action(async (cmd: { workspace: string, concurrency: string }) => {
+      const params = {
+        concurrency: parseInt(cmd.concurrency)
+      }
+
+      const storageConfig = storageConfigFromEnv(process.env.STORAGE)
+
+      const storages = storageConfig.storages.filter((p) => p.kind === S3Service.config) as S3Config[]
+      if (storages.length === 0) {
+        throw new Error('S3 storage config is required')
+      }
+
+      const datalakeConfig = storageConfig.storages.find((p) => p.kind === DatalakeService.config)
+      if (datalakeConfig === undefined) {
+        throw new Error('Datalake storage config is required')
+      }
+
+      const datalake = createDatalakeClient(datalakeConfig as DatalakeConfig)
+
+      let workspaces: Workspace[] = []
+      const { dbUrl } = prepareTools()
+      await withDatabase(dbUrl, async (db) => {
+        workspaces = await listWorkspacesPure(db)
+        workspaces = workspaces
+          .filter((p) => p.mode !== 'archived')
+          .filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace)
+          .sort((a, b) => b.lastVisit - a.lastVisit)
+      })
+
+      const count = workspaces.length
+      let index = 0
+      for (const workspace of workspaces) {
+        index++
+        toolCtx.info('processing workspace', { workspace: workspace.workspace, index, count })
+        const workspaceId = getWorkspaceId(workspace.workspace)
+
+        for (const config of storages) {
+          const storage = new S3Service(config)
+          await copyToDatalake(toolCtx, workspaceId, config, storage, datalake, params)
+        }
+      }
+    })
+
   program
     .command('confirm-email <email>')
     .description('confirm user email')
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index d13f254c12..80273dff9c 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -14,8 +14,17 @@
 //
 
 import { type Attachment } from '@hcengineering/attachment'
-import { type Blob, type MeasureContext, type Ref, type WorkspaceId, RateLimiter } from '@hcengineering/core'
+import {
+  type Blob,
+  type MeasureContext,
+  type Ref,
+  type WorkspaceId,
+  concatLink,
+  RateLimiter
+} from '@hcengineering/core'
+import { type DatalakeClient } from '@hcengineering/datalake'
 import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
+import { type S3Config, type S3Service } from '@hcengineering/s3'
 import {
   type ListBlobResult,
   type StorageAdapter,
@@ -249,3 +258,127 @@ async function retryOnFailure (
   }
   throw lastError
 }
+
+export interface CopyDatalakeParams {
+  concurrency: number
+}
+
+export async function copyToDatalake (
+  ctx: MeasureContext,
+  workspaceId: WorkspaceId,
+  config: S3Config,
+  adapter: S3Service,
+  datalake: DatalakeClient,
+  params: CopyDatalakeParams
+): Promise<void> {
+  console.log('copying from', config.name, 'concurrency:', params.concurrency)
+
+  const exists = await adapter.exists(ctx, workspaceId)
+  if (!exists) {
+    console.log('no files to copy')
+    return
+  }
+
+  let time = Date.now()
+  let processedCnt = 0
+  let skippedCnt = 0
+
+  function printStats (): void {
+    const duration = Date.now() - time
+    console.log('...processed', processedCnt, 'skipped', skippedCnt, Math.round(duration / 1000) + 's')
+
+    time = Date.now()
+  }
+
+  const rateLimiter = new RateLimiter(params.concurrency)
+
+  const iterator = await adapter.listStream(ctx, workspaceId)
+
+  try {
+    while (true) {
+      const batch = await iterator.next()
+      if (batch.length === 0) break
+
+      for (const blob of batch) {
+        const objectName = blob._id
+        if (objectName.includes('%preview%') || objectName.includes('%size%') || objectName.endsWith('#history')) {
+          skippedCnt++
+          continue
+        }
+
+        await rateLimiter.add(async () => {
+          try {
+            await retryOnFailure(
+              ctx,
+              5,
+              async () => {
+                await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
+                processedCnt += 1
+              },
+              50
+            )
+          } catch (err) {
+            console.error('failed to process blob', objectName, err)
+          }
+        })
+      }
+      await rateLimiter.waitProcessing()
+      printStats()
+    }
+
+    await rateLimiter.waitProcessing()
+    printStats()
+  } finally {
+    await iterator.close()
+  }
+}
+
+export async function copyBlobToDatalake (
+  ctx: MeasureContext,
+  workspaceId: WorkspaceId,
+  blob: ListBlobResult,
+  config: S3Config,
+  adapter: S3Service,
+  datalake: DatalakeClient
+): Promise<void> {
+  const objectName = blob._id
+  const stat = await datalake.statObject(ctx, workspaceId, objectName)
+  if (stat !== undefined) {
+    return
+  }
+
+  if (blob.size < 1024 * 1024 * 64) {
+    // Handle small file
+    const { endpoint, accessKey: accessKeyId, secretKey: secretAccessKey, region } = config
+
+    const bucketId = adapter.getBucketId(workspaceId)
+    const objectId = adapter.getDocumentKey(workspaceId, encodeURIComponent(objectName))
+    const url = concatLink(endpoint, `${bucketId}/${objectId}`)
+
+    const params = { url, accessKeyId, secretAccessKey, region }
+    await datalake.uploadFromS3(ctx, workspaceId, objectName, params)
+  } else {
+    // Handle huge file
+    const stat = await adapter.stat(ctx, workspaceId, objectName)
+    if (stat !== undefined) {
+      const metadata = {
+        lastModified: stat.modifiedOn,
+        name: objectName,
+        type: stat.contentType,
+        size: stat.size
+      }
+      const readable = await adapter.get(ctx, workspaceId, objectName)
+      try {
+        readable.on('end', () => {
+          readable.destroy()
+        })
+        console.log('uploading huge blob', objectName, Math.round(stat.size / 1024 / 1024), 'MB')
+        const stream = readable.pipe(new PassThrough())
+        await datalake.uploadMultipart(ctx, workspaceId, objectName, stream, metadata)
+        console.log('done', objectName)
+      } finally {
+        readable.destroy()
+      }
+    }
+  }
+}
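Note: the following is a minimal sketch of driving the migration helper above directly, outside the `copy-s3-datalake` command. `createDatalakeClient`, `S3Service`, and `copyToDatalake` are used exactly as introduced in this diff, but the endpoints, credentials, workspace name, and context wiring are placeholders; treat it as an assumption about how the helper would be embedded, not as part of the change.

```ts
import { MeasureMetricsContext, getWorkspaceId } from '@hcengineering/core'
import { createDatalakeClient, type DatalakeConfig } from '@hcengineering/datalake'
import { S3Service, type S3Config } from '@hcengineering/s3'
import { copyToDatalake } from './storage'

// Placeholder configs: endpoints, credentials and names depend on the deployment.
const s3Config: S3Config = {
  kind: 's3',
  name: 'minio',
  endpoint: 'http://localhost:9000',
  accessKey: 'minioadmin',
  secretKey: 'minioadmin',
  region: 'us-east-1'
}
const datalakeConfig: DatalakeConfig = { kind: 'datalake', name: 'datalake', endpoint: 'http://localhost:4030' }

async function migrateWorkspace (workspace: string): Promise<void> {
  const ctx = new MeasureMetricsContext('copy-s3-datalake', {})
  const datalake = createDatalakeClient(datalakeConfig)
  const storage = new S3Service(s3Config)

  // Walks the S3 bucket and copies each blob into the datalake, 10 blobs at a time.
  await copyToDatalake(ctx, getWorkspaceId(workspace), s3Config, storage, datalake, { concurrency: 10 })
}

void migrateWorkspace('my-workspace')
```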
diff --git a/packages/presentation/src/file.ts b/packages/presentation/src/file.ts
index a49b011d4a..9e18569b38 100644
--- a/packages/presentation/src/file.ts
+++ b/packages/presentation/src/file.ts
@@ -258,8 +258,8 @@ async function uploadFileWithSignedUrl (file: File, uuid: string, uploadUrl: str
     method: 'PUT',
     headers: {
       'Content-Type': file.type,
-      'Content-Length': file.size.toString(),
-      'x-amz-meta-last-modified': file.lastModified.toString()
+      'Content-Length': file.size.toString()
+      // 'x-amz-meta-last-modified': file.lastModified.toString()
     }
   })
diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index ae3defb46b..042d785a0e 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -15,7 +15,7 @@
 import { type MeasureContext, type WorkspaceId, concatLink } from '@hcengineering/core'
 import FormData from 'form-data'
-import fetch, { type RequestInit, type Response } from 'node-fetch'
+import fetch, { type RequestInfo, type RequestInit, type Response } from 'node-fetch'
 import { Readable } from 'stream'
 
 import { DatalakeError, NetworkError, NotFoundError } from './error'
@@ -49,8 +49,18 @@ interface BlobUploadSuccess {
 
 type BlobUploadResult = BlobUploadSuccess | BlobUploadError
 
+interface MultipartUpload {
+  key: string
+  uploadId: string
+}
+
+interface MultipartUploadPart {
+  partNumber: number
+  etag: string
+}
+
 /** @public */
-export class Client {
+export class DatalakeClient {
   constructor (private readonly endpoint: string) {}
 
   getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string {
@@ -186,7 +196,7 @@
     }
   }
 
-  private async uploadWithFormData (
+  async uploadWithFormData (
     ctx: MeasureContext,
     workspace: WorkspaceId,
     objectName: string,
@@ -221,7 +231,35 @@
     }
   }
 
-  private async uploadWithSignedURL (
+  async uploadMultipart (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    stream: Readable | Buffer | string,
+    metadata: ObjectMetadata
+  ): Promise<void> {
+    const chunkSize = 10 * 1024 * 1024
+
+    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, metadata)
+
+    try {
+      const parts: MultipartUploadPart[] = []
+
+      let partNumber = 1
+      for await (const chunk of getChunks(stream, chunkSize)) {
+        const part = await this.multipartUploadPart(ctx, workspace, objectName, multipart, partNumber, chunk)
+        parts.push(part)
+        partNumber++
+      }
+
+      await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
+    } catch (err: any) {
+      await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
+      throw err
+    }
+  }
+
+  async uploadWithSignedURL (
     ctx: MeasureContext,
     workspace: WorkspaceId,
     objectName: string,
@@ -236,8 +274,8 @@
         method: 'PUT',
         headers: {
           'Content-Type': metadata.type,
-          'Content-Length': metadata.size?.toString() ?? '0',
-          'x-amz-meta-last-modified': metadata.lastModified.toString()
+          'Content-Length': metadata.size?.toString() ?? '0'
+          // 'x-amz-meta-last-modified': metadata.lastModified.toString()
         }
       })
     } catch (err) {
@@ -249,6 +287,30 @@
     await this.signObjectComplete(ctx, workspace, objectName)
   }
 
+  async uploadFromS3 (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    params: {
+      url: string
+      accessKeyId: string
+      secretAccessKey: string
+    }
+  ): Promise<void> {
+    const path = `/upload/s3/${workspace.name}/${encodeURIComponent(objectName)}`
+    const url = concatLink(this.endpoint, path)
+
+    await fetchSafe(ctx, url, {
+      method: 'POST',
+      headers: {
+        'Content-Type': 'application/json'
+      },
+      body: JSON.stringify(params)
+    })
+  }
+
+  // Signed URL
+
   private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
@@ -284,9 +346,123 @@
     const path = `/upload/signed-url/${workspace.name}/${encodeURIComponent(objectName)}`
     return concatLink(this.endpoint, path)
   }
+
+  // Multipart
+
+  private async multipartUploadStart (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    metadata: ObjectMetadata
+  ): Promise<MultipartUpload> {
+    const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}`
+    const url = concatLink(this.endpoint, path)
+
+    try {
+      const headers = {
+        'Content-Type': metadata.type,
+        'Content-Length': metadata.size?.toString() ?? '0',
+        'Last-Modified': new Date(metadata.lastModified).toUTCString()
+      }
+      const response = await fetchSafe(ctx, url, { method: 'POST', headers })
+      return (await response.json()) as MultipartUpload
+    } catch (err: any) {
+      ctx.error('failed to start multipart upload', { workspace, objectName, err })
+      throw new DatalakeError('Failed to start multipart upload')
+    }
+  }
+
+  private async multipartUploadPart (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    multipart: MultipartUpload,
+    partNumber: number,
+    body: Readable | Buffer | string
+  ): Promise<MultipartUploadPart> {
+    const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/part`
+    const url = new URL(concatLink(this.endpoint, path))
+    url.searchParams.append('key', multipart.key)
+    url.searchParams.append('uploadId', multipart.uploadId)
+    url.searchParams.append('partNumber', partNumber.toString())
+
+    try {
+      const response = await fetchSafe(ctx, url, { method: 'POST', body })
+      return (await response.json()) as MultipartUploadPart
+    } catch (err: any) {
+      ctx.error('failed to upload multipart part', { workspace, objectName, err })
+      throw new DatalakeError('Failed to upload multipart part')
+    }
+  }
+
+  private async multipartUploadComplete (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    multipart: MultipartUpload,
+    parts: MultipartUploadPart[]
+  ): Promise<void> {
+    const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/complete`
+    const url = new URL(concatLink(this.endpoint, path))
+    url.searchParams.append('key', multipart.key)
+    url.searchParams.append('uploadId', multipart.uploadId)
+
+    try {
+      await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+    } catch (err: any) {
+      ctx.error('failed to complete multipart upload', { workspace, objectName, err })
+      throw new DatalakeError('Failed to complete multipart upload')
+    }
+  }
+
+  private async multipartUploadAbort (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    multipart: MultipartUpload
+  ): Promise<void> {
+    const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/abort`
+    const url = new URL(concatLink(this.endpoint, path))
+    url.searchParams.append('key', multipart.key)
+    url.searchParams.append('uploadId', multipart.uploadId)
+
+    try {
+      await fetchSafe(ctx, url, { method: 'POST' })
+    } catch (err: any) {
+      ctx.error('failed to abort multipart upload', { workspace, objectName, err })
+      throw new DatalakeError('Failed to abort multipart upload')
+    }
+  }
 }
 
-async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit): Promise<Response> {
+async function * getChunks (data: Buffer | string | Readable, chunkSize: number): AsyncGenerator<Buffer> {
+  if (Buffer.isBuffer(data)) {
+    let offset = 0
+    while (offset < data.length) {
+      yield data.subarray(offset, offset + chunkSize)
+      offset += chunkSize
+    }
+  } else if (typeof data === 'string') {
+    const buffer = Buffer.from(data)
+    yield * getChunks(buffer, chunkSize)
+  } else if (data instanceof Readable) {
+    let buffer = Buffer.alloc(0)
+
+    for await (const chunk of data) {
+      buffer = Buffer.concat([buffer, chunk])
+
+      while (buffer.length >= chunkSize) {
+        yield buffer.subarray(0, chunkSize)
+        buffer = buffer.subarray(chunkSize)
+      }
+    }
+    if (buffer.length > 0) {
+      yield buffer
+    }
+  }
+}
+
+async function fetchSafe (ctx: MeasureContext, url: RequestInfo, init?: RequestInit): Promise<Response> {
   let response
   try {
     response = await fetch(url, init)
diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts
index c5f641dd11..d0f5c87469 100644
--- a/server/datalake/src/index.ts
+++ b/server/datalake/src/index.ts
@@ -24,22 +24,34 @@
 import { type UploadedObjectInfo } from '@hcengineering/server-core'
 import { type Readable } from 'stream'
 
-import { type ObjectMetadata, Client } from './client'
+import { type ObjectMetadata, DatalakeClient } from './client'
+export { DatalakeClient }
+
+/**
+ * @public
+ */
 export interface DatalakeConfig extends StorageConfig {
   kind: 'datalake'
 }
 
+/**
+ * @public
+ */
+export function createDatalakeClient (opt: DatalakeConfig): DatalakeClient {
+  const endpoint = Number.isInteger(opt.port) ? `${opt.endpoint}:${opt.port}` : opt.endpoint
+  return new DatalakeClient(endpoint)
+}
+
 /**
  * @public
  */
 export class DatalakeService implements StorageAdapter {
   static config = 'datalake'
-  private readonly client: Client
+  private readonly client: DatalakeClient
   constructor (readonly opt: DatalakeConfig) {
-    const endpoint = Number.isInteger(opt.port) ? `${opt.endpoint}:${opt.port}` : opt.endpoint
-    this.client = new Client(endpoint)
+    this.client = createDatalakeClient(opt)
   }
 
   async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts
index b58c51d380..7f118e3a90 100644
--- a/workers/datalake/src/blob.ts
+++ b/workers/datalake/src/blob.ts
@@ -16,17 +16,13 @@
 import { error, json } from 'itty-router'
 import postgres from 'postgres'
 import * as db from './db'
+import { cacheControl, hashLimit } from './const'
 import { toUUID } from './encodings'
+import { getSha256 } from './hash'
 import { selectStorage } from './storage'
 import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
 import { copyVideo, deleteVideo } from './video'
 
-const expires = 86400
-const cacheControl = `public,max-age=${expires}`
-
-// 1MB hash limit
-const HASH_LIMIT = 1 * 1024 * 1024
-
 interface BlobMetadata {
   lastModified: number
   type: string
@@ -143,7 +139,7 @@
     files.map(async ([file, key]) => {
       const { name, type, lastModified } = file
      try {
-        const metadata = await saveBlob(env, sql, file, type, workspace, name, lastModified)
+        const metadata = await saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
 
         // TODO this probably should happen via queue, let it be here for now
         if (type.startsWith('video/')) {
@@ -163,10 +159,11 @@
   return json(result)
 }
 
-async function saveBlob (
+export async function saveBlob (
   env: Env,
   sql: postgres.Sql,
-  file: File,
+  stream: ReadableStream,
+  size: number,
   type: string,
   workspace: string,
   name: string,
@@ -174,18 +171,19 @@
 ): Promise<BlobMetadata> {
   const { location, bucket } = selectStorage(env, workspace)
 
-  const size = file.size
-  const httpMetadata = { contentType: type, cacheControl }
+  const httpMetadata = { contentType: type, cacheControl, lastModified }
   const filename = getUniqueFilename()
 
-  if (file.size <= HASH_LIMIT) {
-    const hash = await getSha256(file)
+  if (size <= hashLimit) {
+    const [hashStream, uploadStream] = stream.tee()
+
+    const hash = await getSha256(hashStream)
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
       // Lucky boy, nothing to upload, use existing blob
       await db.createBlob(sql, { workspace, name, hash, location })
     } else {
-      await bucket.put(filename, file, { httpMetadata })
+      await bucket.put(filename, uploadStream, { httpMetadata })
       await sql.begin((sql) => [
         db.createData(sql, { hash, location, filename, type, size }),
         db.createBlob(sql, { workspace, name, hash, location })
@@ -196,7 +194,7 @@
   } else {
     // For large files we cannot calculate checksum beforehead
     // upload file with unique filename and then obtain checksum
-    const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
+    const { hash } = await uploadLargeFile(bucket, stream, filename, { httpMetadata })
     const data = await db.getData(sql, { hash, location })
     if (data !== null) {
       // We found an existing blob with the same hash
@@ -239,14 +237,13 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
 
 async function uploadLargeFile (
   bucket: R2Bucket,
-  file: File,
+  stream: ReadableStream,
   filename: string,
   options: R2PutOptions
 ): Promise<{ hash: UUID }> {
   const digestStream = new crypto.DigestStream('SHA-256')
 
-  const fileStream = file.stream()
-  const [digestFS, uploadFS] = fileStream.tee()
+  const [digestFS, uploadFS] = stream.tee()
 
   const digestPromise = digestFS.pipeTo(digestStream)
   const uploadPromise = bucket.put(filename, uploadFS, options)
@@ -262,14 +259,6 @@ function getUniqueFilename (): UUID {
   return crypto.randomUUID() as UUID
 }
 
-async function getSha256 (file: File): Promise<UUID> {
-  const digestStream = new crypto.DigestStream('SHA-256')
-  await file.stream().pipeTo(digestStream)
-  const digest = await digestStream.digest
-
-  return digestToUUID(digest)
-}
-
 function digestToUUID (digest: ArrayBuffer): UUID {
   return toUUID(new Uint8Array(digest))
 }
diff --git a/workers/datalake/src/const.ts b/workers/datalake/src/const.ts
new file mode 100644
index 0000000000..f267ea4594
--- /dev/null
+++ b/workers/datalake/src/const.ts
@@ -0,0 +1,19 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+export const expires = 86400
+export const cacheControl = `public,max-age=${expires}`
+
+export const hashLimit = 1 * 1024 * 1024
diff --git a/workers/datalake/src/hash.ts b/workers/datalake/src/hash.ts
new file mode 100644
index 0000000000..10c8b93ff3
--- /dev/null
+++ b/workers/datalake/src/hash.ts
@@ -0,0 +1,25 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { toUUID } from './encodings'
+import { type UUID } from './types'
+
+export async function getSha256 (stream: ReadableStream): Promise<UUID> {
+  const digestStream = new crypto.DigestStream('SHA-256')
+  await stream.pipeTo(digestStream)
+  const digest = await digestStream.digest
+
+  return toUUID(new Uint8Array(digest))
+}
diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts
index bc27f83249..d61b35e70f 100644
--- a/workers/datalake/src/index.ts
+++ b/workers/datalake/src/index.ts
@@ -19,8 +19,15 @@ import { type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router'
 import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
 import { cors } from './cors'
 import { handleImageGet } from './image'
+import { handleS3Blob } from './s3'
 import { handleVideoMetaGet } from './video'
 import { handleSignAbort, handleSignComplete, handleSignCreate } from './sign'
+import {
+  handleMultipartUploadStart,
+  handleMultipartUploadPart,
+  handleMultipartUploadComplete,
+  handleMultipartUploadAbort
+} from './multipart'
 import { type BlobRequest, type WorkspaceRequest } from './types'
 
 const { preflight, corsify } = cors({
@@ -64,6 +71,13 @@
   .post('/upload/signed-url/:workspace/:name', withBlob, handleSignCreate)
   .put('/upload/signed-url/:workspace/:name', withBlob, handleSignComplete)
   .delete('/upload/signed-url/:workspace/:name', withBlob, handleSignAbort)
+  // Multipart
+  .post('/upload/multipart/:workspace/:name', withBlob, handleMultipartUploadStart)
+  .post('/upload/multipart/:workspace/:name/part', withBlob, handleMultipartUploadPart)
+  .post('/upload/multipart/:workspace/:name/complete', withBlob, handleMultipartUploadComplete)
+  .post('/upload/multipart/:workspace/:name/abort', withBlob, handleMultipartUploadAbort)
+  // S3
+  .post('/upload/s3/:workspace/:name', withBlob, handleS3Blob)
   .all('/', () =>
     html(
       `Huly® Datalake™ https://huly.io
diff --git a/workers/datalake/src/multipart.ts b/workers/datalake/src/multipart.ts
new file mode 100644
index 0000000000..c842bbe522
--- /dev/null
+++ b/workers/datalake/src/multipart.ts
@@ -0,0 +1,144 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { error, json } from 'itty-router'
+import postgres from 'postgres'
+import * as db from './db'
+import { cacheControl } from './const'
+import { toUUID } from './encodings'
+import { selectStorage } from './storage'
+import { type BlobRequest, type UUID } from './types'
+
+export interface MultipartUpload {
+  key: string
+  uploadId: string
+}
+
+export interface MultipartUploadPart {
+  partNumber: number
+  etag: string
+}
+
+export interface MultipartUploadCompleteRequest {
+  parts: MultipartUploadPart[]
+}
+
+export async function handleMultipartUploadStart (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext
+): Promise<Response> {
+  const { workspace } = request
+
+  const { bucket } = selectStorage(env, workspace)
+
+  const contentType = request.headers.get('content-type') ?? 'application/octet-stream'
+  const lastModifiedHeader = request.headers.get('last-modified')
+  const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
+  const httpMetadata = { contentType, cacheControl, lastModified }
+  const uuid = crypto.randomUUID() as UUID
+
+  const multipart = await bucket.createMultipartUpload(uuid, { httpMetadata })
+  return json({ key: multipart.key, uploadId: multipart.uploadId })
+}
+
+export async function handleMultipartUploadPart (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext
+): Promise<Response> {
+  const { workspace } = request
+
+  const multipartKey = request.query?.key
+  const multipartUploadId = request.query?.uploadId
+  const partNumber = request.query?.partNumber
+  if (typeof multipartKey !== 'string' || typeof multipartUploadId !== 'string' || typeof partNumber !== 'string') {
+    return error(400, 'missing key or uploadId or partNumber')
+  }
+
+  if (request.body === null) {
+    return error(400, 'missing body')
+  }
+
+  const { bucket } = selectStorage(env, workspace)
+
+  const upload = bucket.resumeMultipartUpload(multipartKey, multipartUploadId)
+  const part = await upload.uploadPart(Number.parseInt(partNumber), request.body)
+
+  return json({ partNumber: part.partNumber, etag: part.etag })
+}
+
+export async function handleMultipartUploadComplete (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext
+): Promise<Response> {
+  const sql = postgres(env.HYPERDRIVE.connectionString)
+
+  const { workspace, name } = request
+
+  const multipartKey = request.query?.key
+  const multipartUploadId = request.query?.uploadId
+  if (typeof multipartKey !== 'string' || typeof multipartUploadId !== 'string') {
+    return error(400, 'missing key or uploadId')
+  }
+
+  const { parts } = await request.json<MultipartUploadCompleteRequest>()
+
+  const { bucket, location } = selectStorage(env, workspace)
+
+  const upload = bucket.resumeMultipartUpload(multipartKey, multipartUploadId)
+  const object = await upload.complete(parts)
+
+  const hash =
+    object.checksums.md5 !== undefined ? toUUID(new Uint8Array(object.checksums.md5)) : (crypto.randomUUID() as UUID)
+  const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
+  const size = object.size ?? 0
+  const filename = multipartKey as UUID
+
+  const data = await db.getData(sql, { hash, location })
+  if (data !== null) {
+    // blob already exists
+    await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+  } else {
+    // Otherwise register a new hash and blob
+    await sql.begin((sql) => [
+      db.createData(sql, { hash, location, filename, type, size }),
+      db.createBlob(sql, { workspace, name, hash, location })
+    ])
+  }
+
+  return new Response(null, { status: 204 })
+}
+
+export async function handleMultipartUploadAbort (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext
+): Promise<Response> {
+  const { workspace } = request
+
+  const multipartKey = request.query?.key
+  const multipartUploadId = request.query?.uploadId
+  if (typeof multipartKey !== 'string' || typeof multipartUploadId !== 'string') {
+    return error(400, 'missing key or uploadId')
+  }
+
+  const { bucket } = selectStorage(env, workspace)
+
+  const upload = bucket.resumeMultipartUpload(multipartKey, multipartUploadId)
+  await upload.abort()
+  return new Response(null, { status: 204 })
+}
diff --git a/workers/datalake/src/s3.ts b/workers/datalake/src/s3.ts
new file mode 100644
index 0000000000..0c8f4f46ac
--- /dev/null
+++ b/workers/datalake/src/s3.ts
@@ -0,0 +1,71 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { AwsClient } from 'aws4fetch'
+import { error, json } from 'itty-router'
+import postgres from 'postgres'
+import * as db from './db'
+import { saveBlob } from './blob'
+import { type BlobRequest } from './types'
+
+export interface S3UploadPayload {
+  url: string
+  region: string
+  accessKeyId: string
+  secretAccessKey: string
+}
+
+function getS3Client (payload: S3UploadPayload): AwsClient {
+  return new AwsClient({
+    service: 's3',
+    region: payload.region,
+    accessKeyId: payload.accessKeyId,
+    secretAccessKey: payload.secretAccessKey
+  })
+}
+
+export async function handleS3Blob (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+  const { workspace, name } = request
+  const sql = postgres(env.HYPERDRIVE.connectionString)
+
+  const payload = await request.json<S3UploadPayload>()
+
+  const client = getS3Client(payload)
+
+  // Ensure the blob does not exist
+  const blob = await db.getBlob(sql, { workspace, name })
+  if (blob !== null) {
+    return new Response(null, { status: 200 })
+  }
+
+  const object = await client.fetch(payload.url)
+  if (!object.ok || object.status !== 200) {
+    return error(object.status)
+  }
+
+  if (object.body === null) {
+    return error(400)
+  }
+
+  const contentType = object.headers.get('content-type') ?? 'application/octet-stream'
+  const contentLengthHeader = object.headers.get('content-length') ?? '0'
+  const lastModifiedHeader = object.headers.get('last-modified')
+
+  const contentLength = Number.parseInt(contentLengthHeader)
+  const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
+
+  const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
+  return json(result)
+}
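For reference, a minimal sketch of uploading a single large file through the new `DatalakeClient.uploadMultipart` API added in this diff (the client splits the stream into 10 MB parts and completes or aborts the multipart upload). The datalake endpoint, workspace name, file path, and metadata values are placeholders, and the surrounding wiring is an assumption, not part of the change; error handling and retries are omitted.

```ts
import { MeasureMetricsContext, getWorkspaceId } from '@hcengineering/core'
import { createDatalakeClient, type DatalakeConfig } from '@hcengineering/datalake'
import { createReadStream, statSync } from 'fs'

// Placeholder datalake config: endpoint depends on the deployment.
const config: DatalakeConfig = { kind: 'datalake', name: 'datalake', endpoint: 'http://localhost:4030' }

async function uploadBackup (): Promise<void> {
  const ctx = new MeasureMetricsContext('datalake-upload', {})
  const client = createDatalakeClient(config)
  const workspaceId = getWorkspaceId('my-workspace') // placeholder workspace

  const file = './backup.tar' // placeholder path
  // uploadMultipart streams the file in 10 MB parts and aborts the upload on failure.
  await client.uploadMultipart(ctx, workspaceId, 'backup.tar', createReadStream(file), {
    name: 'backup.tar',
    type: 'application/x-tar',
    size: statSync(file).size,
    lastModified: Date.now()
  })
}

void uploadBackup()
```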