fix: properly handle stream errors in datalake migration tool (#7747)
parent f4c3eef699
commit 0534c2b90e
@@ -1197,12 +1197,14 @@ export function devTool (
     .description('copy files from s3 to datalake')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
+    .option('-s, --skip <number>', 'Number of workspaces to skip', '0')
     .option('-e, --existing', 'Copy existing blobs', false)
-    .action(async (cmd: { workspace: string, concurrency: string, existing: boolean }) => {
+    .action(async (cmd: { workspace: string, concurrency: string, existing: boolean, skip: string }) => {
       const params = {
         concurrency: parseInt(cmd.concurrency),
         existing: cmd.existing
       }
+      const skip = parseInt(cmd.skip)
 
       const storageConfig = storageConfigFromEnv(process.env.STORAGE)
 
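The new --skip option follows the commander-style convention already used by the tool: the default ('0') and any user-supplied value arrive as strings, which is why the action parses the value with parseInt. A minimal, self-contained sketch of that behaviour (the standalone program and the subcommand name are illustrative only, not the tool's actual wiring):

import { Command } from 'commander'

const program = new Command()

program
  .command('copy-s3-datalake') // hypothetical subcommand name, for illustration
  .option('-s, --skip <number>', 'Number of workspaces to skip', '0')
  .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
  .action(async (cmd: { skip: string, concurrency: string }) => {
    // commander hands option values (including defaults) to the action as strings,
    // so numeric options are converted explicitly before use
    const skip = parseInt(cmd.skip)
    const concurrency = parseInt(cmd.concurrency)
    console.log({ skip, concurrency })
  })

program.parse(process.argv)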
@@ -1245,6 +1247,11 @@ export function devTool (
       let index = 0
       for (const workspace of workspaces) {
         index++
+        if (index <= skip) {
+          toolCtx.info('processing workspace', { workspace: workspace.workspace, index, count })
+          continue
+        }
+
         toolCtx.info('processing workspace', {
           workspace: workspace.workspace,
           index,
@@ -23,6 +23,7 @@ import {
   RateLimiter
 } from '@hcengineering/core'
 import { type DatalakeClient } from '@hcengineering/datalake'
+import { type UploadObjectParams } from '@hcengineering/datalake/types/client'
 import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
 import { type S3Config, type S3Service } from '@hcengineering/s3'
 import {
@@ -32,7 +33,7 @@ import {
   type UploadedObjectInfo
 } from '@hcengineering/server-core'
 import { type Db } from 'mongodb'
-import { PassThrough } from 'stream'
+import { PassThrough, type Readable } from 'stream'
 
 export interface MoveFilesParams {
   concurrency: number
@@ -400,14 +401,11 @@ export async function copyBlobToDatalake (
       type: stat.contentType,
       size: stat.size
     }
 
     const readable = await adapter.get(ctx, workspaceId, objectName)
     try {
-      readable.on('end', () => {
-        readable.destroy()
-      })
       console.log('uploading huge blob', objectName, Math.round(stat.size / 1024 / 1024), 'MB')
-      const stream = readable.pipe(new PassThrough())
-      await datalake.uploadMultipart(ctx, workspaceId, objectName, stream, metadata)
+      await uploadMultipart(ctx, datalake, workspaceId, objectName, readable, metadata)
       console.log('done', objectName)
     } finally {
       readable.destroy()
@@ -416,6 +414,51 @@ export async function copyBlobToDatalake (
     }
   }
+
+function uploadMultipart (
+  ctx: MeasureContext,
+  datalake: DatalakeClient,
+  workspaceId: WorkspaceId,
+  objectName: string,
+  stream: Readable,
+  metadata: UploadObjectParams
+): Promise<void> {
+  return new Promise<void>((resolve, reject) => {
+    const passthrough = new PassThrough()
+
+    const cleanup = (): void => {
+      stream.removeAllListeners()
+      passthrough.removeAllListeners()
+      stream.destroy()
+      passthrough.destroy()
+    }
+
+    stream.on('error', (err) => {
+      ctx.error('error reading blob', { err })
+      cleanup()
+      reject(err)
+    })
+    passthrough.on('error', (err) => {
+      ctx.error('error reading blob', { err })
+      cleanup()
+      reject(err)
+    })
+
+    stream.pipe(passthrough)
+
+    datalake
+      .uploadMultipart(ctx, workspaceId, objectName, passthrough, metadata)
+      .then(() => {
+        cleanup()
+        resolve()
+      })
+      .catch((err) => {
+        ctx.error('failed to upload blob', { err })
+        cleanup()
+        reject(err)
+      })
+    })
+}
+
 export function formatSize (size: number): string {
   const units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
   const pow = size === 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024))
 
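The helper added above is the heart of this fix: an error on either the source stream or the PassThrough now rejects the promise and tears both streams down, instead of leaving the multipart upload hanging. As a point of comparison only (not part of this commit, and with the datalake client reduced to an assumed, simplified signature), the same source-to-PassThrough error propagation could also be expressed with Node's stream/promises pipeline:

import { PassThrough, type Readable } from 'stream'
import { pipeline } from 'stream/promises'

// Simplified stand-in for the datalake client: only the shape this sketch needs.
interface MultipartUploader {
  uploadMultipart: (stream: Readable) => Promise<void>
}

async function uploadMultipartSketch (datalake: MultipartUploader, source: Readable): Promise<void> {
  const passthrough = new PassThrough()
  try {
    await Promise.all([
      // pipeline rejects if the source or the PassThrough errors,
      // so a read failure surfaces to the caller instead of stalling the upload
      pipeline(source, passthrough),
      datalake.uploadMultipart(passthrough)
    ])
  } finally {
    // release both streams even when the upload side is what failed
    source.destroy()
    passthrough.destroy()
  }
}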
@@ -466,8 +466,8 @@ export class DatalakeClient {
       const response = await fetchSafe(ctx, url, { method: 'POST', body })
       return (await response.json()) as MultipartUploadPart
     } catch (err: any) {
-      ctx.error('failed to abort multipart upload', { workspace, objectName, err })
-      throw new DatalakeError('Failed to abort multipart upload')
+      ctx.error('failed to upload multipart part', { workspace, objectName, err })
+      throw new DatalakeError('Failed to upload multipart part')
     }
   }
 