diff --git a/.vscode/launch.json b/.vscode/launch.json
index e45be3c750..598c2370a0 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -745,6 +745,25 @@
       "sourceMaps": true,
       "cwd": "${workspaceRoot}/services/telegram-bot/pod-telegram-bot"
     },
+    {
+      "name": "Debug datalake",
+      "type": "node",
+      "request": "launch",
+      "args": ["src/index.ts"],
+      "env": {
+        "PORT": "4030",
+        "SECRET": "secret",
+        "DB_URL": "",
+        "BUCKETS": "",
+        "ACCOUNTS_URL": "http://localhost:3000",
+        "STATS_URL": "http://huly.local:4900",
+        "STREAM_URL": "http://huly.local:1080/recording"
+      },
+      "runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
+      "runtimeVersion": "20",
+      "sourceMaps": true,
+      "cwd": "${workspaceRoot}/services/datalake/pod-datalake"
+    },
     {
       "type": "chrome",
       "name": "Attach to Browser",
diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index 7f5a4635e0..4fe6790cc9 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -369,9 +369,7 @@ export class DatalakeClient {
     })
   }
 
-  // R2
-
-  async getR2UploadParams (ctx: MeasureContext, workspace: WorkspaceUuid): Promise<R2UploadParams> {
+  async getS3UploadParams (ctx: MeasureContext, workspace: WorkspaceUuid): Promise<R2UploadParams> {
     const path = `/upload/s3/${workspace}`
     const url = concatLink(this.endpoint, path)
 
@@ -380,7 +378,7 @@ export class DatalakeClient {
     return json
   }
 
-  async uploadFromR2 (
+  async createFromS3 (
     ctx: MeasureContext,
     workspace: WorkspaceUuid,
     objectName: string,
diff --git a/services/datalake/pod-datalake/src/handlers/blob.ts b/services/datalake/pod-datalake/src/handlers/blob.ts
index c3c1338fbd..e60c074371 100644
--- a/services/datalake/pod-datalake/src/handlers/blob.ts
+++ b/services/datalake/pod-datalake/src/handlers/blob.ts
@@ -123,7 +123,7 @@ export async function handleBlobHead (
   res.setHeader('Last-Modified', new Date(head.lastModified).toUTCString())
   res.setHeader('ETag', head.etag)
 
-  res.status(204).send()
+  res.status(200).send()
 }
 
 export async function handleBlobDelete (
diff --git a/services/datalake/pod-datalake/src/handlers/s3.ts b/services/datalake/pod-datalake/src/handlers/s3.ts
index db53e5a65b..ee84a443f4 100644
--- a/services/datalake/pod-datalake/src/handlers/s3.ts
+++ b/services/datalake/pod-datalake/src/handlers/s3.ts
@@ -47,7 +47,7 @@ export async function handleS3CreateBlob (
     res.status(200).send()
   } catch (err: any) {
     const error = err instanceof Error ? err.message : String(err)
-    console.error('failed to create blob', { workspace, name, error })
+    ctx.error('failed to create blob', { workspace, name, error })
     res.status(500).send()
   }
 }
diff --git a/services/datalake/pod-datalake/src/s3/bucket.ts b/services/datalake/pod-datalake/src/s3/bucket.ts
index 5d5879159e..c449b2ed73 100644
--- a/services/datalake/pod-datalake/src/s3/bucket.ts
+++ b/services/datalake/pod-datalake/src/s3/bucket.ts
@@ -41,47 +41,77 @@ class S3BucketImpl implements S3Bucket {
   ) {}
 
+  /**
+   * Reads an object's metadata with `headObject`.
+   *
+   * @returns the metadata, or `null` whenever the request throws; only
+   * failures whose HTTP status is not 404 are logged.
+   */
   async head (ctx: MeasureContext, key: string): Promise<S3Object | null> {
-    const result = await ctx.with('s3.headObject', {}, () => this.client.headObject({ Bucket: this.bucket, Key: key }))
+    try {
+      const result = await ctx.with('s3.headObject', {}, () =>
+        this.client.headObject({ Bucket: this.bucket, Key: key })
+      )
 
-    return {
-      key,
-      etag: result.ETag ?? '',
-      size: result.ContentLength ?? 0,
-      contentType: result.ContentType ?? '',
-      lastModified: result.LastModified?.getTime() ?? 0,
-      cacheControl: result.CacheControl
+      return {
+        key,
+        etag: result.ETag ?? '',
+        size: result.ContentLength ?? 0,
+        contentType: result.ContentType ?? '',
+        lastModified: result.LastModified?.getTime() ?? 0,
+        cacheControl: result.CacheControl
+      }
+    } catch (err: any) {
+      // 404 is the ordinary "missing object" answer, so only other failures are logged.
+      if (err?.$metadata?.httpStatusCode !== 404) {
+        ctx.warn('failed to head object', { error: err, key })
+      }
+      return null
     }
   }
 
+  /**
+   * Reads an object's body and metadata with `getObject`.
+   *
+   * @returns the object, or `null` when it has no body or the request throws;
+   * only failures whose HTTP status is not 404 are logged.
+   */
   async get (ctx: MeasureContext, key: string, options?: S3GetOptions): Promise<S3ObjectBody | null> {
-    const command = { Bucket: this.bucket, Key: key, Range: options?.range }
+    try {
+      const command = { Bucket: this.bucket, Key: key, Range: options?.range }
 
-    const result = await ctx.with('s3.getObject', {}, () => this.client.getObject(command))
+      const result = await ctx.with('s3.getObject', {}, () => this.client.getObject(command))
 
-    if (result.Body === undefined) {
+      if (result.Body === undefined) {
+        return null
+      }
+
+      const stream = result.Body?.transformToWebStream()
+      if (stream === undefined) {
+        return null
+      }
+
+      const lastModified =
+        result.Metadata?.['last-modified'] !== undefined
+          ? new Date(result.Metadata['last-modified']).getTime()
+          : result.LastModified?.getTime()
+
+      return {
+        key,
+        body: Readable.fromWeb(stream as ReadableStream<any>),
+        range: result.ContentRange,
+        etag: result.ETag ?? '',
+        size: result.ContentLength ?? 0,
+        contentType: result.ContentType ?? '',
+        lastModified: lastModified ?? 0,
+        cacheControl: result.CacheControl
+      }
+    } catch (err: any) {
+      // 404 is the ordinary "missing object" answer, so only other failures are logged.
+      if (err?.$metadata?.httpStatusCode !== 404) {
+        ctx.warn('failed to get object', { error: err, key })
+      }
       return null
     }
-
-    const stream = result.Body?.transformToWebStream()
-    if (stream === undefined) {
-      return null
-    }
-
-    const lastModified =
-      result.Metadata?.['last-modified'] !== undefined
-        ? new Date(result.Metadata['last-modified']).getTime()
-        : result.LastModified?.getTime()
-
-    return {
-      key,
-      body: Readable.fromWeb(stream as ReadableStream<any>),
-      range: result.ContentRange,
-      etag: result.ETag ?? '',
-      size: result.ContentLength ?? 0,
-      contentType: result.ContentType ?? '',
-      lastModified: lastModified ?? 0,
-      cacheControl: result.CacheControl
-    }
   }
 
   async put (
diff --git a/services/love/src/main.ts b/services/love/src/main.ts
index 3cb38ee38e..ff350b2d41 100644
--- a/services/love/src/main.ts
+++ b/services/love/src/main.ts
@@ -311,7 +311,8 @@ const startRecord = async (
           accessKey,
           region,
           secret,
-          bucket
+          bucket,
+          forcePathStyle: true
         })
       }
     })
diff --git a/services/love/src/storage.ts b/services/love/src/storage.ts
index 75b8d5e6de..fc5bd9f804 100644
--- a/services/love/src/storage.ts
+++ b/services/love/src/storage.ts
@@ -110,7 +110,7 @@ async function getS3UploadParamsDatalake (
 ): Promise<S3UploadParams> {
   const token = generateToken(systemAccountUuid, undefined, { service: 'love' })
   const client = createDatalakeClient(config, token)
-  const { bucket } = await client.getR2UploadParams(ctx, workspaceId)
+  const { bucket } = await client.getS3UploadParams(ctx, workspaceId)
 
   const endpoint = s3config.endpoint
   const accessKey = s3config.accessKey
@@ -156,7 +156,7 @@ async function saveFileToDatalake (
   const prefix = rootPrefix(s3config, wsIds.uuid)
   const uuid = stripPrefix(prefix, filename)
 
-  await client.uploadFromR2(ctx, wsIds.uuid, uuid, { filename: uuid })
+  await client.createFromS3(ctx, wsIds.uuid, uuid, { filename: uuid })
 
   return await storageAdapter.stat(ctx, wsIds, uuid)
 }