From 0534c2b90e04dfec1f2979710ae91430a1374623 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov
Date: Tue, 21 Jan 2025 20:45:07 +0700
Subject: [PATCH] fix: properly handle stream errors in datalake migration tool (#7747)

---
 dev/tool/src/index.ts         |  9 +++++-
 dev/tool/src/storage.ts       | 55 +++++++++++++++++++++++++++++++----
 server/datalake/src/client.ts |  4 +--
 3 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index 97b17e971b..4095fac8cd 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -1197,12 +1197,14 @@ export function devTool (
     .description('copy files from s3 to datalake')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
+    .option('-s, --skip <skip>', 'Number of workspaces to skip', '0')
     .option('-e, --existing', 'Copy existing blobs', false)
-    .action(async (cmd: { workspace: string, concurrency: string, existing: boolean }) => {
+    .action(async (cmd: { workspace: string, concurrency: string, existing: boolean, skip: string }) => {
       const params = {
         concurrency: parseInt(cmd.concurrency),
         existing: cmd.existing
       }
+      const skip = parseInt(cmd.skip)
 
       const storageConfig = storageConfigFromEnv(process.env.STORAGE)
 
@@ -1245,6 +1247,11 @@ export function devTool (
       let index = 0
       for (const workspace of workspaces) {
         index++
+        if (index <= skip) {
+          toolCtx.info('skipping workspace', { workspace: workspace.workspace, index, count })
+          continue
+        }
+
         toolCtx.info('processing workspace', {
           workspace: workspace.workspace,
           index,
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index e4afd9bb51..9d550abdbc 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -23,6 +23,7 @@
 import { RateLimiter } from '@hcengineering/core'
 import { type DatalakeClient } from '@hcengineering/datalake'
+import { type UploadObjectParams } from '@hcengineering/datalake'
 import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
 import { type S3Config, type S3Service } from '@hcengineering/s3'
 import {
@@ -32,7 +33,7 @@ import {
   type UploadedObjectInfo
 } from '@hcengineering/server-core'
 import { type Db } from 'mongodb'
-import { PassThrough } from 'stream'
+import { PassThrough, type Readable } from 'stream'
 
 export interface MoveFilesParams {
   concurrency: number
@@ -400,14 +401,11 @@ export async function copyBlobToDatalake (
         type: stat.contentType,
         size: stat.size
       }
+
       const readable = await adapter.get(ctx, workspaceId, objectName)
       try {
-        readable.on('end', () => {
-          readable.destroy()
-        })
         console.log('uploading huge blob', objectName, Math.round(stat.size / 1024 / 1024), 'MB')
-        const stream = readable.pipe(new PassThrough())
-        await datalake.uploadMultipart(ctx, workspaceId, objectName, stream, metadata)
+        await uploadMultipart(ctx, datalake, workspaceId, objectName, readable, metadata)
         console.log('done', objectName)
       } finally {
         readable.destroy()
@@ -416,6 +414,51 @@
+function uploadMultipart (
+  ctx: MeasureContext,
+  datalake: DatalakeClient,
+  workspaceId: WorkspaceId,
+  objectName: string,
+  stream: Readable,
+  metadata: UploadObjectParams
+): Promise<void> {
+  return new Promise((resolve, reject) => {
+    const passthrough = new PassThrough()
+
+    const cleanup = (): void => {
+      stream.removeAllListeners()
+      passthrough.removeAllListeners()
+      stream.destroy()
+      passthrough.destroy()
+    }
+
+    stream.on('error', (err) => {
+      ctx.error('error reading blob', { err })
+      cleanup()
+      reject(err)
+    })
+    passthrough.on('error', (err) => {
+      ctx.error('error reading blob', { err })
+      cleanup()
+      reject(err)
+    })
+
+    stream.pipe(passthrough)
+
+    datalake
+      .uploadMultipart(ctx, workspaceId, objectName, passthrough, metadata)
+      .then(() => {
+        cleanup()
+        resolve()
+      })
+      .catch((err) => {
+        ctx.error('failed to upload blob', { err })
+        cleanup()
+        reject(err)
+      })
+  })
+}
+
 export function formatSize (size: number): string {
   const units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
   const pow = size === 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024))
diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index 553aaaeeb6..8a27e6eb66 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -466,8 +466,8 @@ export class DatalakeClient {
       const response = await fetchSafe(ctx, url, { method: 'POST', body })
       return (await response.json()) as MultipartUploadPart
     } catch (err: any) {
-      ctx.error('failed to abort multipart upload', { workspace, objectName, err })
-      throw new DatalakeError('Failed to abort multipart upload')
+      ctx.error('failed to upload multipart part', { workspace, objectName, err })
+      throw new DatalakeError('Failed to upload multipart part')
     }
   }
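
The helper added to dev/tool/src/storage.ts exists because stream.pipe() does not forward 'error' events to the destination: if the source stream fails mid-transfer, the datalake upload would stall and the surrounding promise would never settle. Below is a minimal, self-contained sketch of the same pattern using only Node streams. pipeWithErrors and the consume callback are illustrative stand-ins for the patch's uploadMultipart helper and DatalakeClient.uploadMultipart; they are not code from the repository.

// Illustrative sketch (not part of the patch): the error-propagation pattern
// used by the new uploadMultipart() helper, reduced to plain Node streams.
// pipeWithErrors() and consume() are hypothetical names; consume() stands in
// for DatalakeClient.uploadMultipart().
import { PassThrough, Readable } from 'stream'

function pipeWithErrors (
  source: Readable,
  consume: (stream: Readable) => Promise<void>
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const passthrough = new PassThrough()

    // Detach listeners and destroy both streams, whether the transfer
    // succeeds or fails; repeated calls are harmless.
    const cleanup = (): void => {
      source.removeAllListeners()
      passthrough.removeAllListeners()
      source.destroy()
      passthrough.destroy()
    }

    // Without these handlers a read error would leave the consumer waiting
    // for data that never arrives and the promise unresolved.
    source.on('error', (err) => {
      cleanup()
      reject(err)
    })
    passthrough.on('error', (err) => {
      cleanup()
      reject(err)
    })

    source.pipe(passthrough)

    consume(passthrough)
      .then(() => {
        cleanup()
        resolve()
      })
      .catch((err) => {
        cleanup()
        reject(err)
      })
  })
}

// Usage example: the consumer drains the stream; a simulated read error
// rejects the returned promise instead of hanging.
async function main (): Promise<void> {
  const source = new PassThrough()
  const done = pipeWithErrors(source, async (stream) => {
    for await (const chunk of stream) {
      console.log('got', (chunk as Buffer).length, 'bytes')
    }
  })
  source.write(Buffer.alloc(1024))
  source.destroy(new Error('simulated read failure'))
  await done.catch((err) => {
    console.error('upload failed as expected:', err.message)
  })
}

void main()

Calling cleanup() from the error handlers as well as the success and failure paths keeps teardown in one place; destroying an already-destroyed stream and removing listeners twice are no-ops, so the order in which the paths fire does not matter.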