feat: add auth to datalake (#7852)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2025-01-31 17:51:20 +04:00 committed by GitHub
parent b11e17aee0
commit e11e0eb0dc
GPG Key ID: B5690EEEBB952194
7 changed files with 117 additions and 45 deletions
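
In short: every DatalakeClient method now takes a bearer token immediately after the MeasureContext, and each HTTP request carries it as an Authorization header. Service-side callers mint a system-account token per workspace via generateToken. A minimal sketch of the resulting call pattern (statWithAuth is a hypothetical helper, and the import paths are assumptions based on the package names in this diff):

import { systemAccountEmail, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { generateToken } from '@hcengineering/server-token'
import { type DatalakeClient } from '@hcengineering/datalake'

// Hypothetical helper: every DatalakeClient method now takes the token
// right after the measurement context.
async function statWithAuth (
  ctx: MeasureContext,
  client: DatalakeClient,
  workspaceId: WorkspaceId,
  objectName: string
): Promise<void> {
  // Sign a token for the system account, scoped to a single workspace.
  const token = generateToken(systemAccountEmail, workspaceId)
  // The client forwards it as an 'Authorization: Bearer <token>' header.
  const stat = await client.statObject(ctx, token, workspaceId, objectName)
  console.log(objectName, stat)
}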

View File

@@ -3988,7 +3988,7 @@ packages:
     version: 0.0.0

   '@rush-temp/datalake@file:projects/datalake.tgz':
-    resolution: {integrity: sha512-UX1RJWMtrQY5HWrFKnwi2vrRYfR8ZSRo2PtLn04ozWueiiLS3Q61UauAUfPDRtO0K5cJgecH7+gX750dx8oUhQ==, tarball: file:projects/datalake.tgz}
+    resolution: {integrity: sha512-1vIORSiK/b09GyLIUKWdeisoiIuYbPozOorFt23QzYIszVe/OGZR4Zhqgo/7LFUt00tgFyNcBIUYyWg9xd4uDQ==, tarball: file:projects/datalake.tgz}
     version: 0.0.0

   '@rush-temp/desktop-1@file:projects/desktop-1.tgz':

View File

@@ -1279,10 +1279,11 @@ export function devTool (
           blobsSize: workspace.backupInfo?.blobsSize ?? 0
         })
         const workspaceId = getWorkspaceId(workspace.workspace)
+        const token = generateToken(systemAccountEmail, workspaceId)
         for (const config of storages) {
           const storage = new S3Service(config)
-          await copyToDatalake(toolCtx, workspaceId, config, storage, datalake, params)
+          await copyToDatalake(toolCtx, workspaceId, config, storage, datalake, token, params)
         }
       }
     })

View File

@@ -271,6 +271,7 @@ export async function copyToDatalake (
   config: S3Config,
   adapter: S3Service,
   datalake: DatalakeClient,
+  token: string,
   params: CopyDatalakeParams
 ): Promise<void> {
   console.log('copying from', config.name, 'concurrency:', params.concurrency)
@@ -311,7 +312,7 @@ export async function copyToDatalake (
   let cursor: string | undefined = ''
   let hasMore = true
   while (hasMore) {
-    const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
+    const res = await datalake.listObjects(ctx, token, workspaceId, cursor, 1000)
     cursor = res.cursor
     hasMore = res.cursor !== undefined
     for (const blob of res.blobs) {
@@ -349,7 +350,7 @@ export async function copyToDatalake (
         ctx,
         5,
         async () => {
-          await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
+          await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake, token)
           processedCnt += 1
           processedSize += blob.size
         },
@@ -378,7 +379,8 @@ export async function copyBlobToDatalake (
   blob: ListBlobResult,
   config: S3Config,
   adapter: S3Service,
-  datalake: DatalakeClient
+  datalake: DatalakeClient,
+  token: string
 ): Promise<void> {
   const objectName = blob._id
   if (blob.size < 1024 * 1024 * 64) {
@@ -390,7 +392,7 @@ export async function copyBlobToDatalake (
     const url = concatLink(endpoint, `${bucketId}/${objectId}`)
     const params = { url, accessKeyId, secretAccessKey, region }
-    await datalake.uploadFromS3(ctx, workspaceId, objectName, params)
+    await datalake.uploadFromS3(ctx, token, workspaceId, objectName, params)
   } else {
     // Handle huge file
     const stat = await adapter.stat(ctx, workspaceId, objectName)
@@ -405,7 +407,7 @@ export async function copyBlobToDatalake (
     const readable = await adapter.get(ctx, workspaceId, objectName)
     try {
       console.log('uploading huge blob', objectName, Math.round(stat.size / 1024 / 1024), 'MB')
-      await uploadMultipart(ctx, datalake, workspaceId, objectName, readable, metadata)
+      await uploadMultipart(ctx, token, datalake, workspaceId, objectName, readable, metadata)
       console.log('done', objectName)
     } finally {
       readable.destroy()
@@ -416,6 +418,7 @@ export async function copyBlobToDatalake (
 function uploadMultipart (
   ctx: MeasureContext,
+  token: string,
   datalake: DatalakeClient,
   workspaceId: WorkspaceId,
   objectName: string,
@@ -446,7 +449,7 @@
     stream.pipe(passthrough)
     datalake
-      .uploadMultipart(ctx, workspaceId, objectName, passthrough, metadata)
+      .uploadMultipart(ctx, token, workspaceId, objectName, passthrough, metadata)
       .then(() => {
        cleanup()
        resolve()

View File

@@ -39,6 +39,7 @@
     "@hcengineering/core": "^0.6.32",
     "@hcengineering/platform": "^0.6.11",
     "@hcengineering/server-core": "^0.6.1",
+    "@hcengineering/server-token": "^0.6.11",
     "node-fetch": "^2.6.6",
     "form-data": "^4.0.0"
   }

View File

@@ -89,6 +89,7 @@ export class DatalakeClient {
   async listObjects (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     cursor: string | undefined,
     limit: number = 100
@@ -100,16 +101,16 @@ export class DatalakeClient {
       url.searchParams.append('cursor', cursor)
     }
-    const response = await fetchSafe(ctx, url)
+    const response = await fetchSafe(ctx, url, { headers: { Authorization: 'Bearer ' + token } })
     return (await response.json()) as ListObjectOutput
   }

-  async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
+  async getObject (ctx: MeasureContext, token: string, workspace: WorkspaceId, objectName: string): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
     let response
     try {
-      response = await fetchSafe(ctx, url)
+      response = await fetchSafe(ctx, url, { headers: { Authorization: 'Bearer ' + token } })
     } catch (err: any) {
       if (err.name !== 'NotFoundError') {
         console.error('failed to get object', { workspace, objectName, err })
@@ -127,6 +128,7 @@ export class DatalakeClient {
   async getPartialObject (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     offset: number,
@@ -134,6 +136,7 @@ export class DatalakeClient {
   ): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
     const headers = {
+      Authorization: 'Bearer ' + token,
       Range: length !== undefined ? `bytes=${offset}-${offset + length - 1}` : `bytes=${offset}`
     }
@@ -157,6 +160,7 @@ export class DatalakeClient {
   async statObject (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string
   ): Promise<StatObjectOutput | undefined> {
@@ -164,7 +168,10 @@ export class DatalakeClient {
     let response: Response
     try {
-      response = await fetchSafe(ctx, url, { method: 'HEAD' })
+      response = await fetchSafe(ctx, url, {
+        method: 'HEAD',
+        headers: { Authorization: 'Bearer ' + token }
+      })
     } catch (err: any) {
       if (err.name === 'NotFoundError') {
         return
@@ -185,10 +192,13 @@ export class DatalakeClient {
     }
   }

-  async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+  async deleteObject (ctx: MeasureContext, token: string, workspace: WorkspaceId, objectName: string): Promise<void> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
     try {
-      await fetchSafe(ctx, url, { method: 'DELETE' })
+      await fetchSafe(ctx, url, {
+        method: 'DELETE',
+        headers: { Authorization: 'Bearer ' + token }
+      })
     } catch (err: any) {
       if (err.name !== 'NotFoundError') {
         console.error('failed to delete object', { workspace, objectName, err })
@@ -199,6 +209,7 @@ export class DatalakeClient {
   async putObject (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
@@ -219,11 +230,11 @@ export class DatalakeClient {
     try {
       if (size === undefined || size < 64 * 1024 * 1024) {
         return await ctx.with('direct-upload', {}, (ctx) =>
-          this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
+          this.uploadWithFormData(ctx, token, workspace, objectName, stream, { ...params, size })
         )
       } else {
         return await ctx.with('signed-url-upload', {}, (ctx) =>
-          this.uploadWithSignedURL(ctx, workspace, objectName, stream, { ...params, size })
+          this.uploadWithSignedURL(ctx, token, workspace, objectName, stream, { ...params, size })
         )
       }
     } catch (err) {
@@ -234,6 +245,7 @@ export class DatalakeClient {
   async uploadWithFormData (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
@@ -253,7 +265,11 @@ export class DatalakeClient {
     }
     form.append('file', stream, options)

-    const response = await fetchSafe(ctx, url, { method: 'POST', body: form })
+    const response = await fetchSafe(ctx, url, {
+      method: 'POST',
+      body: form,
+      headers: { Authorization: 'Bearer ' + token }
+    })

     const result = (await response.json()) as BlobUploadResult[]
     if (result.length !== 1) {
@@ -271,6 +287,7 @@ export class DatalakeClient {
   async uploadMultipart (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
@@ -278,39 +295,41 @@ export class DatalakeClient {
   ): Promise<ObjectMetadata> {
     const chunkSize = 10 * 1024 * 1024

-    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, params)
+    const multipart = await this.multipartUploadStart(ctx, token, workspace, objectName, params)

     try {
       const parts: MultipartUploadPart[] = []

       let partNumber = 1
       for await (const chunk of getChunks(stream, chunkSize)) {
-        const part = await this.multipartUploadPart(ctx, workspace, objectName, multipart, partNumber, chunk)
+        const part = await this.multipartUploadPart(ctx, token, workspace, objectName, multipart, partNumber, chunk)
         parts.push(part)
         partNumber++
       }

-      return await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
+      return await this.multipartUploadComplete(ctx, token, workspace, objectName, multipart, parts)
     } catch (err: any) {
-      await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
+      await this.multipartUploadAbort(ctx, token, workspace, objectName, multipart)
       throw err
     }
   }

   async uploadWithSignedURL (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
     params: UploadObjectParams
   ): Promise<ObjectMetadata> {
-    const url = await this.signObjectSign(ctx, workspace, objectName)
+    const url = await this.signObjectSign(ctx, token, workspace, objectName)

     try {
       await fetchSafe(ctx, url, {
         body: stream,
         method: 'PUT',
         headers: {
+          Authorization: 'Bearer ' + token,
           'Content-Type': params.type,
           'Content-Length': params.size?.toString() ?? '0'
           // 'x-amz-meta-last-modified': metadata.lastModified.toString()
@@ -318,17 +337,18 @@ export class DatalakeClient {
       })
     } catch (err) {
       ctx.error('failed to upload via signed url', { workspace, objectName, err })
-      await this.signObjectDelete(ctx, workspace, objectName)
+      await this.signObjectDelete(ctx, token, workspace, objectName)
       throw new DatalakeError('Failed to upload via signed URL')
     }

-    return await this.signObjectComplete(ctx, workspace, objectName)
+    return await this.signObjectComplete(ctx, token, workspace, objectName)
   }

   // S3

   async uploadFromS3 (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     params: {
@@ -343,6 +363,7 @@ export class DatalakeClient {
     await fetchSafe(ctx, url, {
       method: 'POST',
       headers: {
+        Authorization: 'Bearer ' + token,
         'Content-Type': 'application/json'
       },
       body: JSON.stringify(params)
@@ -351,17 +372,18 @@ export class DatalakeClient {

   // R2

-  async getR2UploadParams (ctx: MeasureContext, workspace: WorkspaceId): Promise<R2UploadParams> {
+  async getR2UploadParams (ctx: MeasureContext, token: string, workspace: WorkspaceId): Promise<R2UploadParams> {
     const path = `/upload/r2/${workspace.name}`
     const url = concatLink(this.endpoint, path)

-    const response = await fetchSafe(ctx, url)
+    const response = await fetchSafe(ctx, url, { headers: { Authorization: 'Bearer ' + token } })
     const json = (await response.json()) as R2UploadParams
     return json
   }

   async uploadFromR2 (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     params: {
@@ -374,6 +396,7 @@ export class DatalakeClient {
     await fetchSafe(ctx, url, {
       method: 'POST',
       headers: {
+        Authorization: 'Bearer ' + token,
         'Content-Type': 'application/json'
       },
       body: JSON.stringify(params)
@@ -382,10 +405,15 @@ export class DatalakeClient {

   // Signed URL

-  private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
+  private async signObjectSign (
+    ctx: MeasureContext,
+    token: string,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<string> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      const response = await fetchSafe(ctx, url, { method: 'POST' })
+      const response = await fetchSafe(ctx, url, { method: 'POST', headers: { Authorization: 'Bearer ' + token } })
       return await response.text()
     } catch (err: any) {
       ctx.error('failed to sign object', { workspace, objectName, err })
@@ -395,12 +423,13 @@ export class DatalakeClient {
   private async signObjectComplete (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string
   ): Promise<ObjectMetadata> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      const res = await fetchSafe(ctx, url, { method: 'PUT' })
+      const res = await fetchSafe(ctx, url, { method: 'PUT', headers: { Authorization: 'Bearer ' + token } })
       return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete signed url upload', { workspace, objectName, err })
@@ -408,10 +437,15 @@ export class DatalakeClient {
     }
   }

-  private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+  private async signObjectDelete (
+    ctx: MeasureContext,
+    token: string,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<void> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      await fetchSafe(ctx, url, { method: 'DELETE' })
+      await fetchSafe(ctx, url, { method: 'DELETE', headers: { Authorization: 'Bearer ' + token } })
     } catch (err: any) {
       ctx.error('failed to abort signed url upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to abort signed URL upload')
@@ -427,6 +461,7 @@ export class DatalakeClient {
   private async multipartUploadStart (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     params: UploadObjectParams
@@ -436,6 +471,7 @@ export class DatalakeClient {
     try {
       const headers = {
+        Authorization: 'Bearer ' + token,
         'Content-Type': params.type,
         'Content-Length': params.size?.toString() ?? '0',
         'Last-Modified': new Date(params.lastModified).toUTCString()
@@ -450,6 +486,7 @@ export class DatalakeClient {
   private async multipartUploadPart (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     multipart: MultipartUpload,
@@ -463,7 +500,11 @@ export class DatalakeClient {
     url.searchParams.append('partNumber', partNumber.toString())
     try {
-      const response = await fetchSafe(ctx, url, { method: 'POST', body })
+      const response = await fetchSafe(ctx, url, {
+        method: 'POST',
+        body,
+        headers: { Authorization: 'Bearer ' + token }
+      })
       return (await response.json()) as MultipartUploadPart
     } catch (err: any) {
       ctx.error('failed to upload multipart part', { workspace, objectName, err })
@@ -473,6 +514,7 @@ export class DatalakeClient {
   private async multipartUploadComplete (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     multipart: MultipartUpload,
@@ -484,7 +526,11 @@ export class DatalakeClient {
     url.searchParams.append('uploadId', multipart.uploadId)
     try {
-      const res = await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+      const res = await fetchSafe(ctx, url, {
+        method: 'POST',
+        body: JSON.stringify({ parts }),
+        headers: { Authorization: 'Bearer ' + token }
+      })
       return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete multipart upload', { workspace, objectName, err })
@@ -494,6 +540,7 @@ export class DatalakeClient {
   private async multipartUploadAbort (
     ctx: MeasureContext,
+    token: string,
     workspace: WorkspaceId,
     objectName: string,
     multipart: MultipartUpload
@@ -504,7 +551,7 @@ export class DatalakeClient {
     url.searchParams.append('uploadId', multipart.uploadId)
     try {
-      await fetchSafe(ctx, url, { method: 'POST' })
+      await fetchSafe(ctx, url, { method: 'POST', headers: { Authorization: 'Bearer ' + token } })
     } catch (err: any) {
       ctx.error('failed to abort multipart upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to abort multipart upload')
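
Note that putObject keeps its size-based routing: payloads under 64 MiB go through uploadWithFormData as a multipart/form-data POST, larger ones through uploadWithSignedURL, and the bearer token now travels on both paths. A hedged usage sketch (identifiers come from the diff; uploadReport and the import paths are assumptions, while the UploadObjectParams fields mirror those used above):

import { systemAccountEmail, getWorkspaceId, type MeasureContext } from '@hcengineering/core'
import { generateToken } from '@hcengineering/server-token'
import { type DatalakeClient, type UploadObjectParams } from '@hcengineering/datalake'

// Hypothetical caller: uploads a buffer with the new token-first signature.
async function uploadReport (ctx: MeasureContext, client: DatalakeClient, data: Buffer): Promise<void> {
  const workspaceId = getWorkspaceId('my-workspace') // hypothetical workspace name
  const token = generateToken(systemAccountEmail, workspaceId)
  const params: UploadObjectParams = {
    lastModified: Date.now(),
    type: 'application/pdf',
    size: data.length
  }
  // < 64 MiB goes as a direct form-data upload; >= 64 MiB as a signed-URL PUT.
  await client.putObject(ctx, token, workspaceId, 'reports/annual.pdf', data, params)
}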

View File

@@ -13,7 +13,14 @@
 // limitations under the License.
 //

-import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withContext } from '@hcengineering/core'
+import core, {
+  type Blob,
+  type MeasureContext,
+  type Ref,
+  type WorkspaceId,
+  systemAccountEmail,
+  withContext
+} from '@hcengineering/core'
 import {
   type BlobStorageIterator,
@@ -24,6 +31,7 @@ import {
   type StorageConfiguration,
   type UploadedObjectInfo
 } from '@hcengineering/server-core'
+import { generateToken } from '@hcengineering/server-token'
 import { type Readable } from 'stream'

 import { type UploadObjectParams, DatalakeClient } from './client'
@@ -76,9 +84,10 @@ export class DatalakeService implements StorageAdapter {
   @withContext('remove')
   async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
+    const token = generateToken(systemAccountEmail, workspaceId)
     await Promise.all(
       objectNames.map(async (objectName) => {
-        await this.client.deleteObject(ctx, workspaceId, objectName)
+        await this.client.deleteObject(ctx, token, workspaceId, objectName)
       })
     )
   }
@@ -90,6 +99,8 @@ export class DatalakeService implements StorageAdapter {
   @withContext('listStream')
   async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
+    const token = generateToken(systemAccountEmail, workspaceId)
+
     let hasMore = true
     const buffer: ListBlobResult[] = []
     let cursor: string | undefined
@@ -98,7 +109,7 @@ export class DatalakeService implements StorageAdapter {
       next: async () => {
         try {
           while (hasMore && buffer.length < 50) {
-            const res = await this.client.listObjects(ctx, workspaceId, cursor)
+            const res = await this.client.listObjects(ctx, token, workspaceId, cursor)
             hasMore = res.cursor !== undefined
             cursor = res.cursor
@@ -127,7 +138,8 @@ export class DatalakeService implements StorageAdapter {
   @withContext('stat')
   async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
     try {
-      const result = await this.client.statObject(ctx, workspaceId, objectName)
+      const token = generateToken(systemAccountEmail, workspaceId)
+      const result = await this.client.statObject(ctx, token, workspaceId, objectName)
       if (result !== undefined) {
         return {
           provider: '',
@@ -149,7 +161,8 @@ export class DatalakeService implements StorageAdapter {
   @withContext('get')
   async get (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Readable> {
-    return await this.client.getObject(ctx, workspaceId, objectName)
+    const token = generateToken(systemAccountEmail, workspaceId)
+    return await this.client.getObject(ctx, token, workspaceId, objectName)
   }

   @withContext('put')
@@ -161,6 +174,8 @@ export class DatalakeService implements StorageAdapter {
     contentType: string,
     size?: number
   ): Promise<UploadedObjectInfo> {
+    const token = generateToken(systemAccountEmail, workspaceId)
+
     const params: UploadObjectParams = {
       lastModified: Date.now(),
       type: contentType,
@@ -168,7 +183,7 @@ export class DatalakeService implements StorageAdapter {
     }

     const { etag } = await ctx.with('put', {}, (ctx) =>
-      withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, params))
+      withRetry(ctx, 5, () => this.client.putObject(ctx, token, workspaceId, objectName, stream, params))
     )

     return {
@@ -179,7 +194,8 @@ export class DatalakeService implements StorageAdapter {
   @withContext('read')
   async read (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Buffer[]> {
-    const data = await this.client.getObject(ctx, workspaceId, objectName)
+    const token = generateToken(systemAccountEmail, workspaceId)
+    const data = await this.client.getObject(ctx, token, workspaceId, objectName)
     const chunks: Buffer[] = []

     for await (const chunk of data) {
@@ -197,7 +213,8 @@ export class DatalakeService implements StorageAdapter {
     offset: number,
     length?: number
   ): Promise<Readable> {
-    return await this.client.getPartialObject(ctx, workspaceId, objectName, offset, length)
+    const token = generateToken(systemAccountEmail, workspaceId)
+    return await this.client.getPartialObject(ctx, token, workspaceId, objectName, offset, length)
   }

   async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<string> {

View File

@@ -13,10 +13,11 @@
 // limitations under the License.
 //

-import { Blob, MeasureContext, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
+import { Blob, MeasureContext, systemAccountEmail, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
 import { DatalakeConfig, DatalakeService, createDatalakeClient } from '@hcengineering/datalake'
 import { S3Config, S3Service } from '@hcengineering/s3'
 import { StorageConfig } from '@hcengineering/server-core'
+import { generateToken } from '@hcengineering/server-token'
 import { v4 as uuid } from 'uuid'

 export interface S3UploadParams {
@@ -105,8 +106,9 @@ async function getS3UploadParamsDatalake (
   config: DatalakeConfig,
   s3config: S3Config
 ): Promise<S3UploadParams> {
+  const token = generateToken(systemAccountEmail, workspaceId)
   const client = createDatalakeClient(config)
-  const { bucket } = await client.getR2UploadParams(ctx, workspaceId)
+  const { bucket } = await client.getR2UploadParams(ctx, token, workspaceId)

   const endpoint = s3config.endpoint
   const accessKey = s3config.accessKey
@@ -144,13 +146,14 @@ async function saveFileToDatalake (
   s3config: S3Config,
   filename: string
 ): Promise<Blob | undefined> {
+  const token = generateToken(systemAccountEmail, workspaceId)
   const client = createDatalakeClient(config)
   const storageAdapter = new DatalakeService(config)

   const prefix = rootPrefix(s3config, workspaceId)
   const uuid = stripPrefix(prefix, filename)

-  await client.uploadFromR2(ctx, workspaceId, uuid, { filename: uuid })
+  await client.uploadFromR2(ctx, token, workspaceId, uuid, { filename: uuid })

   return await storageAdapter.stat(ctx, workspaceId, uuid)
 }
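
The R2 migration path above follows the same recipe: mint a system token, ask the datalake for the staging bucket, copy the object over the S3 API, and register it. A condensed sketch of that flow (moveBlobToDatalake is hypothetical; import paths are assumed from the package names above, and the out-of-band S3 copy is elided):

import { systemAccountEmail, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { type DatalakeConfig, createDatalakeClient } from '@hcengineering/datalake'
import { generateToken } from '@hcengineering/server-token'

async function moveBlobToDatalake (
  ctx: MeasureContext,
  config: DatalakeConfig,
  workspaceId: WorkspaceId,
  objectId: string
): Promise<void> {
  const token = generateToken(systemAccountEmail, workspaceId)
  const client = createDatalakeClient(config)
  // Ask the datalake which R2 bucket to stage the object in...
  const { bucket } = await client.getR2UploadParams(ctx, token, workspaceId)
  console.log('staging bucket:', bucket)
  // ...copy the object into that bucket out of band (S3-compatible API),
  // then register it with the datalake:
  await client.uploadFromR2(ctx, token, workspaceId, objectId, { filename: objectId })
}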