diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml
index 1e5866b0ac..7cb4e50d43 100644
--- a/common/config/rush/pnpm-lock.yaml
+++ b/common/config/rush/pnpm-lock.yaml
@@ -3988,7 +3988,7 @@ packages:
     version: 0.0.0
 
   '@rush-temp/datalake@file:projects/datalake.tgz':
-    resolution: {integrity: sha512-UX1RJWMtrQY5HWrFKnwi2vrRYfR8ZSRo2PtLn04ozWueiiLS3Q61UauAUfPDRtO0K5cJgecH7+gX750dx8oUhQ==, tarball: file:projects/datalake.tgz}
+    resolution: {integrity: sha512-1vIORSiK/b09GyLIUKWdeisoiIuYbPozOorFt23QzYIszVe/OGZR4Zhqgo/7LFUt00tgFyNcBIUYyWg9xd4uDQ==, tarball: file:projects/datalake.tgz}
     version: 0.0.0
 
   '@rush-temp/desktop-1@file:projects/desktop-1.tgz':
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index 88cd088932..64bef7b067 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -1279,10 +1279,11 @@ export function devTool (
           blobsSize: workspace.backupInfo?.blobsSize ?? 0
         })
 
         const workspaceId = getWorkspaceId(workspace.workspace)
+        const token = generateToken(systemAccountEmail, workspaceId)
         for (const config of storages) {
           const storage = new S3Service(config)
-          await copyToDatalake(toolCtx, workspaceId, config, storage, datalake, params)
+          await copyToDatalake(toolCtx, workspaceId, config, storage, datalake, token, params)
         }
       }
     })
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index 9d550abdbc..f1aec4f0f0 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -271,6 +271,7 @@ export async function copyToDatalake (
   config: S3Config,
   adapter: S3Service,
   datalake: DatalakeClient,
+  token: string,
   params: CopyDatalakeParams
 ): Promise<void> {
   console.log('copying from', config.name, 'concurrency:', params.concurrency)
@@ -311,7 +312,7 @@ export async function copyToDatalake (
     let cursor: string | undefined = ''
     let hasMore = true
     while (hasMore) {
-      const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
+      const res = await datalake.listObjects(ctx, token, workspaceId, cursor, 1000)
       cursor = res.cursor
       hasMore = res.cursor !== undefined
       for (const blob of res.blobs) {
@@ -349,7 +350,7 @@ export async function copyToDatalake (
         ctx,
         5,
         async () => {
-          await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
+          await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake, token)
           processedCnt += 1
           processedSize += blob.size
         },
@@ -378,7 +379,8 @@ export async function copyBlobToDatalake (
   blob: ListBlobResult,
   config: S3Config,
   adapter: S3Service,
-  datalake: DatalakeClient
+  datalake: DatalakeClient,
+  token: string
 ): Promise<void> {
   const objectName = blob._id
   if (blob.size < 1024 * 1024 * 64) {
@@ -390,7 +392,7 @@ export async function copyBlobToDatalake (
     const url = concatLink(endpoint, `${bucketId}/${objectId}`)
     const params = { url, accessKeyId, secretAccessKey, region }
 
-    await datalake.uploadFromS3(ctx, workspaceId, objectName, params)
+    await datalake.uploadFromS3(ctx, token, workspaceId, objectName, params)
   } else {
     // Handle huge file
     const stat = await adapter.stat(ctx, workspaceId, objectName)
@@ -405,7 +407,7 @@ export async function copyBlobToDatalake (
     const readable = await adapter.get(ctx, workspaceId, objectName)
     try {
       console.log('uploading huge blob', objectName, Math.round(stat.size / 1024 / 1024), 'MB')
-      await uploadMultipart(ctx, datalake, workspaceId, objectName, readable, metadata)
+      await uploadMultipart(ctx, token, datalake, workspaceId, objectName, readable, metadata)
       console.log('done', objectName)
     } finally {
       readable.destroy()
@@ -416,6 +418,7 @@ export async function copyBlobToDatalake (
 
 function uploadMultipart (
   ctx: MeasureContext,
+  token: string,
   datalake: DatalakeClient,
   workspaceId: WorkspaceId,
   objectName: string,
@@ -446,7 +449,7 @@ function uploadMultipart (
     stream.pipe(passthrough)
 
     datalake
-      .uploadMultipart(ctx, workspaceId, objectName, passthrough, metadata)
+      .uploadMultipart(ctx, token, workspaceId, objectName, passthrough, metadata)
       .then(() => {
         cleanup()
         resolve()
diff --git a/server/datalake/package.json b/server/datalake/package.json
index 809cdd2cda..c099d7de73 100644
--- a/server/datalake/package.json
+++ b/server/datalake/package.json
@@ -39,6 +39,7 @@
     "@hcengineering/core": "^0.6.32",
     "@hcengineering/platform": "^0.6.11",
     "@hcengineering/server-core": "^0.6.1",
+    "@hcengineering/server-token": "^0.6.11",
    "node-fetch": "^2.6.6",
    "form-data": "^4.0.0"
  }
diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index 8a27e6eb66..151f9668ec 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -89,6 +89,7 @@ export class DatalakeClient {
 
   async listObjects (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     cursor: string | undefined,
     limit: number = 100
@@ -100,16 +101,16 @@ export class DatalakeClient {
       url.searchParams.append('cursor', cursor)
     }
 
-    const response = await fetchSafe(ctx, url)
+    const response = await fetchSafe(ctx, url, { headers: { Authorization: 'Bearer ' + token } })
     return (await response.json()) as ListObjectOutput
   }
 
-  async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
+  async getObject (ctx: MeasureContext, token: string, workspace: WorkspaceId, objectName: string): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
 
     let response
     try {
-      response = await fetchSafe(ctx, url)
+      response = await fetchSafe(ctx, url, { headers: { Authorization: 'Bearer ' + token } })
     } catch (err: any) {
       if (err.name !== 'NotFoundError') {
         console.error('failed to get object', { workspace, objectName, err })
@@ -127,6 +128,7 @@ export class DatalakeClient {
 
   async getPartialObject (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     offset: number,
@@ -134,6 +136,7 @@
   ): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
     const headers = {
+      Authorization: 'Bearer ' + token,
      Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}`
    }
 
@@ -157,6 +160,7 @@ export class DatalakeClient {
 
   async statObject (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string
   ): Promise<StatObjectOutput | undefined> {
@@ -164,7 +168,10 @@ export class DatalakeClient {
 
     let response: Response
     try {
-      response = await fetchSafe(ctx, url, { method: 'HEAD' })
+      response = await fetchSafe(ctx, url, {
+        method: 'HEAD',
+        headers: { Authorization: 'Bearer ' + token }
+      })
     } catch (err: any) {
       if (err.name === 'NotFoundError') {
         return
@@ -185,10 +192,13 @@ export class DatalakeClient {
     }
   }
 
-  async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+  async deleteObject (ctx: MeasureContext, token: string, workspace: WorkspaceId, objectName: string): Promise<void> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
     try {
-      await fetchSafe(ctx, url, { method: 'DELETE' })
+      await fetchSafe(ctx, url, {
+        method: 'DELETE',
+        headers: { Authorization: 'Bearer ' + token }
+      })
     } catch (err: any) {
       if (err.name !== 'NotFoundError') {
         console.error('failed to delete object', { workspace, objectName, err })
@@ -199,6 +209,7 @@ export class DatalakeClient {
 
   async putObject (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
@@ -219,11 +230,11 @@ export class DatalakeClient {
     try {
       if (size === undefined || size < 64 * 1024 * 1024) {
         return await ctx.with('direct-upload', {}, (ctx) =>
-          this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
+          this.uploadWithFormData(ctx, token, workspace, objectName, stream, { ...params, size })
         )
       } else {
         return await ctx.with('signed-url-upload', {}, (ctx) =>
-          this.uploadWithSignedURL(ctx, workspace, objectName, stream, { ...params, size })
+          this.uploadWithSignedURL(ctx, token, workspace, objectName, stream, { ...params, size })
         )
       }
     } catch (err) {
@@ -234,6 +245,7 @@ export class DatalakeClient {
 
   async uploadWithFormData (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
@@ -253,7 +265,11 @@ export class DatalakeClient {
     }
     form.append('file', stream, options)
 
-    const response = await fetchSafe(ctx, url, { method: 'POST', body: form })
+    const response = await fetchSafe(ctx, url, {
+      method: 'POST',
+      body: form,
+      headers: { Authorization: 'Bearer ' + token }
+    })
 
     const result = (await response.json()) as BlobUploadResult[]
     if (result.length !== 1) {
@@ -271,6 +287,7 @@ export class DatalakeClient {
 
   async uploadMultipart (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
@@ -278,39 +295,41 @@ export class DatalakeClient {
   ): Promise<ObjectMetadata> {
     const chunkSize = 10 * 1024 * 1024
 
-    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, params)
+    const multipart = await this.multipartUploadStart(ctx, token, workspace, objectName, params)
 
     try {
       const parts: MultipartUploadPart[] = []
 
       let partNumber = 1
       for await (const chunk of getChunks(stream, chunkSize)) {
-        const part = await this.multipartUploadPart(ctx, workspace, objectName, multipart, partNumber, chunk)
+        const part = await this.multipartUploadPart(ctx, token, workspace, objectName, multipart, partNumber, chunk)
         parts.push(part)
         partNumber++
       }
 
-      return await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
+      return await this.multipartUploadComplete(ctx, token, workspace, objectName, multipart, parts)
     } catch (err: any) {
-      await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
+      await this.multipartUploadAbort(ctx, token, workspace, objectName, multipart)
       throw err
     }
   }
 
   async uploadWithSignedURL (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
     params: UploadObjectParams
   ): Promise<ObjectMetadata> {
-    const url = await this.signObjectSign(ctx, workspace, objectName)
+    const url = await this.signObjectSign(ctx, token, workspace, objectName)
 
     try {
       await fetchSafe(ctx, url, {
         body: stream,
         method: 'PUT',
         headers: {
+          Authorization: 'Bearer ' + token,
           'Content-Type': params.type,
           'Content-Length': params.size?.toString() ?? '0'
           // 'x-amz-meta-last-modified': metadata.lastModified.toString()
@@ -318,17 +337,18 @@ export class DatalakeClient {
       })
     } catch (err) {
       ctx.error('failed to upload via signed url', { workspace, objectName, err })
-      await this.signObjectDelete(ctx, workspace, objectName)
+      await this.signObjectDelete(ctx, token, workspace, objectName)
       throw new DatalakeError('Failed to upload via signed URL')
     }
 
-    return await this.signObjectComplete(ctx, workspace, objectName)
+    return await this.signObjectComplete(ctx, token, workspace, objectName)
   }
 
   // S3
 
   async uploadFromS3 (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     params: {
@@ -343,6 +363,7 @@ export class DatalakeClient {
     await fetchSafe(ctx, url, {
       method: 'POST',
       headers: {
+        Authorization: 'Bearer ' + token,
         'Content-Type': 'application/json'
       },
       body: JSON.stringify(params)
@@ -351,17 +372,18 @@ export class DatalakeClient {
 
   // R2
 
-  async getR2UploadParams (ctx: MeasureContext, workspace: WorkspaceId): Promise<R2UploadParams> {
+  async getR2UploadParams (ctx: MeasureContext, token: string, workspace: WorkspaceId): Promise<R2UploadParams> {
     const path = `/upload/r2/${workspace.name}`
     const url = concatLink(this.endpoint, path)
 
-    const response = await fetchSafe(ctx, url)
+    const response = await fetchSafe(ctx, url, { headers: { Authorization: 'Bearer ' + token } })
     const json = (await response.json()) as R2UploadParams
     return json
   }
 
   async uploadFromR2 (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     params: {
@@ -374,6 +396,7 @@ export class DatalakeClient {
     await fetchSafe(ctx, url, {
       method: 'POST',
       headers: {
+        Authorization: 'Bearer ' + token,
         'Content-Type': 'application/json'
       },
       body: JSON.stringify(params)
@@ -382,10 +405,15 @@ export class DatalakeClient {
 
   // Signed URL
 
-  private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
+  private async signObjectSign (
+    ctx: MeasureContext,
+    token: string,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<string> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      const response = await fetchSafe(ctx, url, { method: 'POST' })
+      const response = await fetchSafe(ctx, url, { method: 'POST', headers: { Authorization: 'Bearer ' + token } })
       return await response.text()
     } catch (err: any) {
       ctx.error('failed to sign object', { workspace, objectName, err })
@@ -395,12 +423,13 @@ export class DatalakeClient {
 
   private async signObjectComplete (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string
   ): Promise<ObjectMetadata> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      const res = await fetchSafe(ctx, url, { method: 'PUT' })
+      const res = await fetchSafe(ctx, url, { method: 'PUT', headers: { Authorization: 'Bearer ' + token } })
       return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete signed url upload', { workspace, objectName, err })
@@ -408,10 +437,15 @@ export class DatalakeClient {
     }
   }
 
-  private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+  private async signObjectDelete (
+    ctx: MeasureContext,
+    token: string,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<void> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      await fetchSafe(ctx, url, { method: 'DELETE' })
+      await fetchSafe(ctx, url, { method: 'DELETE', headers: { Authorization: 'Bearer ' + token } })
     } catch (err: any) {
       ctx.error('failed to abort signed url upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to abort signed URL upload')
@@ -427,6 +461,7 @@ export class DatalakeClient {
 
   private async multipartUploadStart (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     params: UploadObjectParams
@@ -436,6 +471,7 @@ export class DatalakeClient {
 
     try {
       const headers = {
+        Authorization: 'Bearer ' + token,
         'Content-Type': params.type,
         'Content-Length': params.size?.toString() ?? '0',
         'Last-Modified': new Date(params.lastModified).toUTCString()
@@ -450,6 +486,7 @@ export class DatalakeClient {
 
   private async multipartUploadPart (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     multipart: MultipartUpload,
@@ -463,7 +500,11 @@ export class DatalakeClient {
     url.searchParams.append('partNumber', partNumber.toString())
 
     try {
-      const response = await fetchSafe(ctx, url, { method: 'POST', body })
+      const response = await fetchSafe(ctx, url, {
+        method: 'POST',
+        body,
+        headers: { Authorization: 'Bearer ' + token }
+      })
       return (await response.json()) as MultipartUploadPart
     } catch (err: any) {
       ctx.error('failed to upload multipart part', { workspace, objectName, err })
@@ -473,6 +514,7 @@ export class DatalakeClient {
 
   private async multipartUploadComplete (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     multipart: MultipartUpload,
@@ -484,7 +526,11 @@ export class DatalakeClient {
     url.searchParams.append('uploadId', multipart.uploadId)
 
     try {
-      const res = await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+      const res = await fetchSafe(ctx, url, {
+        method: 'POST',
+        body: JSON.stringify({ parts }),
+        headers: { Authorization: 'Bearer ' + token }
+      })
       return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete multipart upload', { workspace, objectName, err })
@@ -494,6 +540,7 @@ export class DatalakeClient {
 
   private async multipartUploadAbort (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     multipart: MultipartUpload
@@ -504,7 +551,7 @@ export class DatalakeClient {
     url.searchParams.append('uploadId', multipart.uploadId)
 
     try {
-      await fetchSafe(ctx, url, { method: 'POST' })
+      await fetchSafe(ctx, url, { method: 'POST', headers: { Authorization: 'Bearer ' + token } })
     } catch (err: any) {
       ctx.error('failed to abort multipart upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to abort multipart upload')
diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts
index 91e99bb2d8..1605de6a92 100644
--- a/server/datalake/src/index.ts
+++ b/server/datalake/src/index.ts
@@ -13,7 +13,14 @@
 // limitations under the License.
 //
 
-import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withContext } from '@hcengineering/core'
+import core, {
+  type Blob,
+  type MeasureContext,
+  type Ref,
+  type WorkspaceId,
+  systemAccountEmail,
+  withContext
+} from '@hcengineering/core'
 
 import {
   type BlobStorageIterator,
@@ -24,6 +31,7 @@ import {
   type StorageConfiguration,
   type UploadedObjectInfo
 } from '@hcengineering/server-core'
+import { generateToken } from '@hcengineering/server-token'
 import { type Readable } from 'stream'
 
 import { type UploadObjectParams, DatalakeClient } from './client'
@@ -76,9 +84,10 @@ export class DatalakeService implements StorageAdapter {
 
   @withContext('remove')
   async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
+    const token = generateToken(systemAccountEmail, workspaceId)
     await Promise.all(
       objectNames.map(async (objectName) => {
-        await this.client.deleteObject(ctx, workspaceId, objectName)
+        await this.client.deleteObject(ctx, token, workspaceId, objectName)
       })
     )
   }
@@ -90,6 +99,8 @@ export class DatalakeService implements StorageAdapter {
 
   @withContext('listStream')
   async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
+    const token = generateToken(systemAccountEmail, workspaceId)
+
     let hasMore = true
     const buffer: ListBlobResult[] = []
     let cursor: string | undefined
@@ -98,7 +109,7 @@ export class DatalakeService implements StorageAdapter {
       next: async () => {
         try {
           while (hasMore && buffer.length < 50) {
-            const res = await this.client.listObjects(ctx, workspaceId, cursor)
+            const res = await this.client.listObjects(ctx, token, workspaceId, cursor)
             hasMore = res.cursor !== undefined
             cursor = res.cursor
 
@@ -127,7 +138,8 @@ export class DatalakeService implements StorageAdapter {
   @withContext('stat')
   async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
     try {
-      const result = await this.client.statObject(ctx, workspaceId, objectName)
+      const token = generateToken(systemAccountEmail, workspaceId)
+      const result = await this.client.statObject(ctx, token, workspaceId, objectName)
       if (result !== undefined) {
         return {
           provider: '',
@@ -149,7 +161,8 @@ export class DatalakeService implements StorageAdapter {
 
   @withContext('get')
   async get (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Readable> {
-    return await this.client.getObject(ctx, workspaceId, objectName)
+    const token = generateToken(systemAccountEmail, workspaceId)
+    return await this.client.getObject(ctx, token, workspaceId, objectName)
   }
 
   @withContext('put')
@@ -161,6 +174,8 @@ export class DatalakeService implements StorageAdapter {
     contentType: string,
     size?: number
   ): Promise<UploadedObjectInfo> {
+    const token = generateToken(systemAccountEmail, workspaceId)
+
     const params: UploadObjectParams = {
       lastModified: Date.now(),
       type: contentType,
@@ -168,7 +183,7 @@ export class DatalakeService implements StorageAdapter {
     }
 
     const { etag } = await ctx.with('put', {}, (ctx) =>
-      withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, params))
+      withRetry(ctx, 5, () => this.client.putObject(ctx, token, workspaceId, objectName, stream, params))
     )
 
     return {
@@ -179,7 +194,8 @@ export class DatalakeService implements StorageAdapter {
 
   @withContext('read')
   async read (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Buffer[]> {
-    const data = await this.client.getObject(ctx, workspaceId, objectName)
+    const token = generateToken(systemAccountEmail, workspaceId)
+    const data = await this.client.getObject(ctx, token, workspaceId, objectName)
     const chunks: Buffer[] = []
 
     for await (const chunk of data) {
@@ -197,7 +213,8 @@ export class DatalakeService implements StorageAdapter {
     offset: number,
     length?: number
   ): Promise<Readable> {
-    return await this.client.getPartialObject(ctx, workspaceId, objectName, offset, length)
+    const token = generateToken(systemAccountEmail, workspaceId)
+    return await this.client.getPartialObject(ctx, token, workspaceId, objectName, offset, length)
   }
 
   async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<string> {
diff --git a/services/love/src/storage.ts b/services/love/src/storage.ts
index 044ad41439..2a8bb4cd39 100644
--- a/services/love/src/storage.ts
+++ b/services/love/src/storage.ts
@@ -13,10 +13,11 @@
 // limitations under the License.
 //
 
-import { Blob, MeasureContext, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
+import { Blob, MeasureContext, systemAccountEmail, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
 import { DatalakeConfig, DatalakeService, createDatalakeClient } from '@hcengineering/datalake'
 import { S3Config, S3Service } from '@hcengineering/s3'
 import { StorageConfig } from '@hcengineering/server-core'
+import { generateToken } from '@hcengineering/server-token'
 import { v4 as uuid } from 'uuid'
 
 export interface S3UploadParams {
@@ -105,8 +106,9 @@ async function getS3UploadParamsDatalake (
   config: DatalakeConfig,
   s3config: S3Config
 ): Promise<S3UploadParams> {
+  const token = generateToken(systemAccountEmail, workspaceId)
   const client = createDatalakeClient(config)
-  const { bucket } = await client.getR2UploadParams(ctx, workspaceId)
+  const { bucket } = await client.getR2UploadParams(ctx, token, workspaceId)
 
   const endpoint = s3config.endpoint
   const accessKey = s3config.accessKey
@@ -144,13 +146,14 @@ async function saveFileToDatalake (
   s3config: S3Config,
   filename: string
 ): Promise<Blob | undefined> {
+  const token = generateToken(systemAccountEmail, workspaceId)
   const client = createDatalakeClient(config)
   const storageAdapter = new DatalakeService(config)
 
   const prefix = rootPrefix(s3config, workspaceId)
   const uuid = stripPrefix(prefix, filename)
 
-  await client.uploadFromR2(ctx, workspaceId, uuid, { filename: uuid })
+  await client.uploadFromR2(ctx, token, workspaceId, uuid, { filename: uuid })
 
   return await storageAdapter.stat(ctx, workspaceId, uuid)
 }