From c9e41e92a28341001d3f702d12cdf1ae9a06fcfb Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Wed, 2 Oct 2024 17:12:21 +0700
Subject: [PATCH] UBERF-8329 Add missing methods to datalake adapter (#6784)

Signed-off-by: Alexander Onnikov
---
 server/datalake/src/client.ts | 111 ++++++++++++++++++++++++++++++++--
 server/datalake/src/index.ts  |  33 ++++++++--
 2 files changed, 133 insertions(+), 11 deletions(-)

diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index f160afa964..5571afce5a 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -26,6 +26,14 @@ export interface ObjectMetadata {
   size?: number
 }
 
+/** @public */
+export interface StatObjectOutput {
+  lastModified: number
+  type: string
+  etag?: string
+  size?: number
+}
+
 /** @public */
 export interface PutObjectOutput {
   id: string
@@ -55,9 +63,19 @@ export class Client {
 
   async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
-    const response = await fetch(url)
+
+    let response
+    try {
+      response = await fetch(url)
+    } catch (err: any) {
+      ctx.error('network error', { error: err })
+      throw new Error(`Network error ${err}`)
+    }
 
     if (!response.ok) {
+      if (response.status === 404) {
+        throw new Error('Not Found')
+      }
       throw new Error('HTTP error ' + response.status)
     }
 
@@ -69,12 +87,90 @@ export class Client {
     return Readable.from(response.body)
   }
 
+  async getPartialObject (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string,
+    offset: number,
+    length?: number
+  ): Promise<Readable> {
+    const url = this.getObjectUrl(ctx, workspace, objectName)
+    const headers = {
+      Range: `bytes=${offset}-${length ?? ''}`
+    }
+
+    let response
+    try {
+      response = await fetch(url, { headers })
+    } catch (err: any) {
+      ctx.error('network error', { error: err })
+      throw new Error(`Network error ${err}`)
+    }
+
+    if (!response.ok) {
+      if (response.status === 404) {
+        throw new Error('Not Found')
+      }
+      throw new Error('HTTP error ' + response.status)
+    }
+
+    if (response.body == null) {
+      ctx.error('bad datalake response', { objectName })
+      throw new Error('Missing response body')
+    }
+
+    return Readable.from(response.body)
+  }
+
+  async statObject (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<StatObjectOutput | undefined> {
+    const url = this.getObjectUrl(ctx, workspace, objectName)
+
+    let response
+    try {
+      response = await fetch(url, { method: 'HEAD' })
+    } catch (err: any) {
+      ctx.error('network error', { error: err })
+      throw new Error(`Network error ${err}`)
+    }
+
+    if (!response.ok) {
+      if (response.status === 404) {
+        return undefined
+      }
+      throw new Error('HTTP error ' + response.status)
+    }
+
+    const headers = response.headers
+    const lastModified = Date.parse(headers.get('Last-Modified') ?? '')
+    const size = parseInt(headers.get('Content-Length') ?? '0', 10)
+
+    return {
+      lastModified: isNaN(lastModified) ? 0 : lastModified,
+      size: isNaN(size) ? 0 : size,
+      type: headers.get('Content-Type') ?? '',
+      etag: headers.get('ETag') ?? ''
+    }
+  }
+
   async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
 
-    const response = await fetch(url, { method: 'DELETE' })
+    let response
+    try {
+      response = await fetch(url, { method: 'DELETE' })
+    } catch (err: any) {
+      ctx.error('network error', { error: err })
+      throw new Error(`Network error ${err}`)
+    }
 
     if (!response.ok) {
+      if (response.status === 404) {
+        throw new Error('Not Found')
+      }
       throw new Error('HTTP error ' + response.status)
     }
   }
@@ -100,10 +196,13 @@ export class Client {
     }
     form.append('file', stream, options)
 
-    const response = await fetch(url, {
-      method: 'POST',
-      body: form
-    })
+    let response
+    try {
+      response = await fetch(url, { method: 'POST', body: form })
+    } catch (err: any) {
+      ctx.error('network error', { error: err })
+      throw new Error(`Network error ${err}`)
+    }
 
     if (!response.ok) {
       throw new Error('HTTP error ' + response.status)
diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts
index 930be31961..2b3b48b4b2 100644
--- a/server/datalake/src/index.ts
+++ b/server/datalake/src/index.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 //
 
-import { withContext, type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
+import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withContext } from '@hcengineering/core'
 
 import {
   type BlobStorageIterator,
@@ -74,13 +74,36 @@ export class DatalakeService implements StorageAdapter {
 
   @withContext('listStream')
   async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
-    throw new Error('not supported')
+    return {
+      next: async () => [],
+      close: async () => {}
+    }
   }
 
   @withContext('stat')
   async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
-    // not supported
-    return undefined
+    try {
+      const result = await this.client.statObject(ctx, workspaceId, objectName)
+      if (result !== undefined) {
+        return {
+          provider: '',
+          _class: core.class.Blob,
+          _id: objectName as Ref<Blob>,
+          storageId: objectName,
+          contentType: result.type,
+          size: result.size ?? 0,
+          etag: result.etag ?? '',
+          space: core.space.Configuration,
+          modifiedBy: core.account.System,
+          modifiedOn: result.lastModified,
+          version: null
+        }
+      } else {
+        ctx.error('no object found', { objectName, workspaceId: workspaceId.name })
+      }
+    } catch (err) {
+      ctx.error('failed to stat object', { error: err, objectName, workspaceId: workspaceId.name })
+    }
   }
 
   @withContext('get')
@@ -134,7 +157,7 @@ export class DatalakeService implements StorageAdapter {
     offset: number,
     length?: number
   ): Promise<Readable> {
-    throw new Error('not implemented')
+    return await this.client.getPartialObject(ctx, workspaceId, objectName, offset, length)
   }
 
   async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<string> {