UBERF-9167 Enhance blob migration tool (#7697)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2025-01-17 17:22:01 +07:00 committed by GitHub
parent 8829859d1e
commit 08e4ee5ec5
3 changed files with 63 additions and 13 deletions


@@ -1193,12 +1193,14 @@ export function devTool (
   program
     .command('copy-s3-datalake')
-    .description('migrate files from s3 to datalake')
+    .description('copy files from s3 to datalake')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
     .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
-    .action(async (cmd: { workspace: string, concurrency: string }) => {
+    .option('-e, --existing', 'Copy existing blobs', false)
+    .action(async (cmd: { workspace: string, concurrency: string, existing: boolean }) => {
       const params = {
-        concurrency: parseInt(cmd.concurrency)
+        concurrency: parseInt(cmd.concurrency),
+        existing: cmd.existing
       }

       const storageConfig = storageConfigFromEnv(process.env.STORAGE)
@@ -1222,14 +1224,32 @@ export function devTool (
         workspaces = workspaces
           .filter((p) => isActiveMode(p.mode) || isArchivingMode(p.mode))
           .filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace)
-          .sort((a, b) => b.lastVisit - a.lastVisit)
+          // .sort((a, b) => b.lastVisit - a.lastVisit)
+          .sort((a, b) => {
+            if (a.backupInfo !== undefined && b.backupInfo !== undefined) {
+              return b.backupInfo.blobsSize - a.backupInfo.blobsSize
+            } else if (b.backupInfo !== undefined) {
+              return 1
+            } else if (a.backupInfo !== undefined) {
+              return -1
+            } else {
+              return b.lastVisit - a.lastVisit
+            }
+          })
       })

       const count = workspaces.length
       console.log('found workspaces', count)

       let index = 0
       for (const workspace of workspaces) {
         index++
-        toolCtx.info('processing workspace', { workspace: workspace.workspace, index, count })
+        toolCtx.info('processing workspace', {
+          workspace: workspace.workspace,
+          index,
+          count,
+          blobsSize: workspace.backupInfo?.blobsSize ?? 0
+        })
         const workspaceId = getWorkspaceId(workspace.workspace)
         for (const config of storages) {
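As a reading aid, the new ordering above can be summarized: workspaces that have backup info are processed first, largest blobsSize first; workspaces without backup info come last, most recently visited first. Below is a minimal standalone sketch of that comparator; the simplified WorkspaceLike shape is assumed for illustration only and is not the real Workspace type.

// Sketch of the comparator introduced above, using an assumed simplified workspace shape.
interface WorkspaceLike {
  workspace: string
  lastVisit: number
  backupInfo?: { blobsSize: number }
}

function compareWorkspaces (a: WorkspaceLike, b: WorkspaceLike): number {
  if (a.backupInfo !== undefined && b.backupInfo !== undefined) {
    // Both backed up: larger blob size first
    return b.backupInfo.blobsSize - a.backupInfo.blobsSize
  } else if (b.backupInfo !== undefined) {
    // Only b backed up: b first
    return 1
  } else if (a.backupInfo !== undefined) {
    // Only a backed up: a first
    return -1
  } else {
    // Neither backed up: most recently visited first
    return b.lastVisit - a.lastVisit
  }
}

const ordered = [
  { workspace: 'w1', lastVisit: 10 },
  { workspace: 'w2', lastVisit: 5, backupInfo: { blobsSize: 2048 } },
  { workspace: 'w3', lastVisit: 7, backupInfo: { blobsSize: 4096 } }
].sort(compareWorkspaces)
// ordered: w3 (4096 backed up), w2 (2048), then w1 (no backup info)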


@@ -261,6 +261,7 @@ async function retryOnFailure<T> (
 export interface CopyDatalakeParams {
   concurrency: number
+  existing: boolean
 }

 export async function copyToDatalake (
@@ -281,7 +282,9 @@ export async function copyToDatalake (
   let time = Date.now()
   let processedCnt = 0
+  let processedSize = 0
   let skippedCnt = 0
+  let existingCnt = 0
   let failedCnt = 0

   function printStats (): void {
@@ -291,14 +294,32 @@ export async function copyToDatalake (
       processedCnt,
       'skipped',
       skippedCnt,
+      'existing',
+      existingCnt,
       'failed',
       failedCnt,
-      Math.round(duration / 1000) + 's'
+      Math.round(duration / 1000) + 's',
+      formatSize(processedSize)
     )
     time = Date.now()
   }

+  const existing = new Set<string>()
+  let cursor: string | undefined = ''
+  let hasMore = true
+  while (hasMore) {
+    const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
+    cursor = res.cursor
+    hasMore = res.cursor !== undefined
+    for (const blob of res.blobs) {
+      existing.add(blob.name)
+    }
+  }
+  console.info('found blobs in datalake:', existing.size)

   const rateLimiter = new RateLimiter(params.concurrency)
   const iterator = await adapter.listStream(ctx, workspaceId)
@@ -315,6 +336,12 @@
         continue
       }
+      if (!params.existing && existing.has(objectName)) {
+        // TODO handle mutable blobs
+        existingCnt++
+        continue
+      }
       await rateLimiter.add(async () => {
         try {
           await retryOnFailure(
@@ -323,6 +350,7 @@
             async () => {
               await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
               processedCnt += 1
+              processedSize += blob.size
             },
             50
           )
@@ -352,11 +380,6 @@ export async function copyBlobToDatalake (
   datalake: DatalakeClient
 ): Promise<void> {
   const objectName = blob._id
-  const stat = await datalake.statObject(ctx, workspaceId, objectName)
-  if (stat !== undefined) {
-    return
-  }
   if (blob.size < 1024 * 1024 * 64) {
     // Handle small file
     const { endpoint, accessKey: accessKeyId, secretKey: secretAccessKey, region } = config
@@ -392,3 +415,10 @@ export async function copyBlobToDatalake (
     }
   }
 }
+
+export function formatSize (size: number): string {
+  const units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
+  const pow = size === 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024))
+  const val = (1.0 * size) / Math.pow(1024, pow)
+  return `${val.toFixed(2)} ${units[pow]}`
+}
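For reference, a few sample values from the formatSize helper added above (outputs follow directly from the function as written):

formatSize(0)         // '0.00 B'
formatSize(1536)      // '1.50 KB'
formatSize(734003200) // '700.00 MB'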


@@ -90,9 +90,9 @@ export class DatalakeClient {
   async listObjects (
     ctx: MeasureContext,
     workspace: WorkspaceId,
-    cursor: string | undefined
+    cursor: string | undefined,
+    limit: number = 100
   ): Promise<ListObjectOutput> {
-    const limit = 100
     const path = `/blob/${workspace.name}`
     const url = new URL(concatLink(this.endpoint, path))
     url.searchParams.append('limit', String(limit))
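With the extra limit parameter, callers can page through a workspace's blobs in larger batches. A caller-side sketch, mirroring the loop added in the migration tool above and assuming only that ListObjectOutput exposes cursor and blobs[].name as used there (imports from the platform packages are omitted):

// Collect every blob name in a workspace, 1000 objects per request (sketch).
async function listAllBlobNames (
  ctx: MeasureContext,
  datalake: DatalakeClient,
  workspaceId: WorkspaceId
): Promise<Set<string>> {
  const names = new Set<string>()
  let cursor: string | undefined = ''
  let hasMore = true
  while (hasMore) {
    const res = await datalake.listObjects(ctx, workspaceId, cursor, 1000)
    cursor = res.cursor
    hasMore = res.cursor !== undefined
    for (const blob of res.blobs) {
      names.add(blob.name)
    }
  }
  return names
}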