fix: parallel blob processing in tools and migration (#6391)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2024-08-26 14:17:16 +07:00 committed by GitHub
parent c92bd622ac
commit 7ea8606922
3 changed files with 66 additions and 52 deletions


@@ -991,7 +991,8 @@ export function devTool (
     .command('move-files')
     .option('-w, --workspace <workspace>', 'Selected workspace only', '')
     .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
-    .action(async (cmd: { workspace: string, blobLimit: string }) => {
+    .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
+    .action(async (cmd: { workspace: string, blobLimit: string, concurrency: string }) => {
       const { mongodbUri } = prepareTools()
       await withDatabase(mongodbUri, async (db, client) => {
         await withStorage(mongodbUri, async (adapter) => {
@@ -1010,7 +1011,10 @@ export function devTool (
           }
           const wsId = getWorkspaceId(workspace.workspace)
-          await moveFiles(toolCtx, wsId, exAdapter, parseInt(cmd.blobLimit))
+          await moveFiles(toolCtx, wsId, exAdapter, {
+            blobSizeLimitMb: parseInt(cmd.blobLimit),
+            concurrency: parseInt(cmd.concurrency)
+          })
         }
       } catch (err: any) {
         console.error(err)
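For readers unfamiliar with the CLI wiring: below is a minimal, self-contained sketch of how a commander-style option chain like the one above turns the new `-c` flag into the typed `params` object handed to `moveFiles`. The standalone `program` object and the closing `console.log` are illustrative assumptions, not the tool's actual code.

```typescript
// Illustrative sketch only: assumes a commander-based CLI like the one above.
import { Command } from 'commander'

const program = new Command()

program
  .command('move-files')
  .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
  .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
  .action(async (cmd: { blobLimit: string, concurrency: string }) => {
    // Options arrive as strings; parse them once at the CLI boundary so the
    // worker code deals only with numbers.
    const params = {
      blobSizeLimitMb: parseInt(cmd.blobLimit),
      concurrency: parseInt(cmd.concurrency)
    }
    console.log('would move files with', params) // placeholder for moveFiles(...)
  })

program.parse(process.argv)
```

Parsing the string options once at the boundary keeps `moveFiles` itself working with plain numbers and makes the defaults visible in `--help` output.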


@@ -13,7 +13,7 @@
 // limitations under the License.
 //
-import { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
+import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
 import { type StorageAdapterEx } from '@hcengineering/server-core'
 import { PassThrough } from 'stream'
@@ -21,7 +21,10 @@ export async function moveFiles (
   ctx: MeasureContext,
   workspaceId: WorkspaceId,
   exAdapter: StorageAdapterEx,
-  blobSizeLimitMb: number
+  params: {
+    blobSizeLimitMb: number
+    concurrency: number
+  }
 ): Promise<void> {
   if (exAdapter.adapters === undefined) return
@@ -35,7 +38,11 @@ export async function moveFiles (
   for (const [name, adapter] of exAdapter.adapters.entries()) {
     if (name === target) continue
-    console.log('moving from', name)
+    console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)
+    let time = Date.now()
+    const rateLimiter = new RateLimiter(params.concurrency)
     const iterator = await adapter.listStream(ctx, workspaceId)
     while (true) {
@@ -46,11 +53,12 @@ export async function moveFiles (
       if (blob === undefined) continue
       if (blob.provider === target) continue
-      if (blob.size > blobSizeLimitMb * 1024 * 1024) {
+      if (blob.size > params.blobSizeLimitMb * 1024 * 1024) {
         console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
         continue
       }
+      await rateLimiter.exec(async () => {
         try {
           await retryOnFailure(
             ctx,
@@ -63,12 +71,19 @@ export async function moveFiles (
         } catch (err) {
           console.error('failed to process blob', name, data._id, err)
         }
+      })
       count += 1
       if (count % 100 === 0) {
-        console.log('...moved: ', count)
+        await rateLimiter.waitProcessing()
+        const duration = Date.now() - time
+        time = Date.now()
+        console.log('...moved: ', count, Math.round(duration / 1000))
       }
     }
+    await rateLimiter.waitProcessing()
     await iterator.close()
   }
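The core of this change is the pattern visible above: admit each blob through `rateLimiter.exec`, checkpoint with `waitProcessing` every 100 blobs, and drain once more before closing the iterator. To make that control flow concrete, here is a toy limiter with the same two-method surface; the real `RateLimiter` ships with `@hcengineering/core` and its exact semantics may differ.

```typescript
// Toy bounded-concurrency limiter, illustrative only. It mirrors the
// exec/waitProcessing surface used in moveFiles above, but it is NOT the
// @hcengineering/core implementation.
class ToyRateLimiter {
  private readonly inFlight = new Set<Promise<void>>()

  constructor (private readonly limit: number) {}

  // Wait for a free slot, then start `op` without awaiting its completion,
  // so the caller's loop keeps feeding work up to `limit` at a time.
  async exec (op: () => Promise<void>): Promise<void> {
    while (this.inFlight.size >= this.limit) {
      await Promise.race(this.inFlight)
    }
    const p = op()
      .catch((err) => { console.error('operation failed', err) })
      .finally(() => { this.inFlight.delete(p) })
    this.inFlight.add(p)
  }

  // Barrier: resolves once every operation admitted so far has settled.
  async waitProcessing (): Promise<void> {
    await Promise.all(this.inFlight)
  }
}

// Usage mirroring the loop in moveFiles: at most 10 items in flight, a drain
// point every 100 items, and a final drain before cleanup.
async function demo (items: string[], process: (item: string) => Promise<void>): Promise<void> {
  const limiter = new ToyRateLimiter(10)
  let count = 0
  for (const item of items) {
    await limiter.exec(async () => { await process(item) })
    count += 1
    if (count % 100 === 0) {
      await limiter.waitProcessing() // checkpoint: measure progress on a drained queue
    }
  }
  await limiter.waitProcessing() // nothing in flight past this point
}
```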


@@ -13,13 +13,14 @@
 // limitations under the License.
 //
-import { saveCollaborativeDoc, takeCollaborativeDocSnapshot } from '@hcengineering/collaboration'
+import { saveCollaborativeDoc } from '@hcengineering/collaboration'
 import core, {
   DOMAIN_BLOB,
   DOMAIN_DOC_INDEX_STATE,
   DOMAIN_STATUS,
   DOMAIN_TX,
   MeasureMetricsContext,
+  RateLimiter,
   collaborativeDocParse,
   coreId,
   generateId,
@@ -188,7 +189,10 @@ async function processMigrateContentFor (
   storageAdapter: StorageAdapter,
   iterator: MigrationIterator<Doc>
 ): Promise<void> {
+  const rateLimiter = new RateLimiter(10)
   let processed = 0
   while (true) {
     const docs = await iterator.next(1000)
     if (docs === null || docs.length === 0) {
@@ -201,6 +205,7 @@ async function processMigrateContentFor (
     const operations: { filter: MigrationDocumentQuery<Doc>, update: MigrateUpdate<Doc> }[] = []
     for (const doc of docs) {
+      await rateLimiter.exec(async () => {
       const update: MigrateUpdate<Doc> = {}
       for (const attribute of attributes) {
@@ -214,19 +219,6 @@ async function processMigrateContentFor (
         if (blob === undefined) {
           const ydoc = markupToYDoc(value, attribute.name)
           await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
-          await takeCollaborativeDocSnapshot(
-            storageAdapter,
-            client.workspaceId,
-            collaborativeDoc,
-            ydoc,
-            {
-              versionId: revisionId,
-              name: 'Migration to storage',
-              createdBy: core.account.System,
-              createdOn: Date.now()
-            },
-            ctx
-          )
         }
         update[attribute.name] = collaborativeDoc
@@ -238,8 +230,11 @@ async function processMigrateContentFor (
       if (Object.keys(update).length > 0) {
         operations.push({ filter: { _id: doc._id }, update })
       }
+      })
     }
+    await rateLimiter.waitProcessing()
     if (operations.length > 0) {
      await client.bulk(domain, operations)
     }
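One subtlety in this last hunk is the shared `operations` array: per-document callbacks push into it concurrently, and the `waitProcessing()` call acts as a barrier so the array is complete before the bulk write. A condensed sketch of that shape follows, reusing `ToyRateLimiter` from the earlier sketch; `migrateDoc` and `flushBulk` are hypothetical stand-ins for the real migration helpers.

```typescript
// Condensed sketch of the migrate-then-bulk-write pattern above. Reuses
// ToyRateLimiter from the previous sketch; migrateDoc/flushBulk are
// hypothetical stand-ins, not the platform's actual helpers.
interface BulkOp { id: string, update: Record<string, unknown> }

async function processBatch (
  docs: string[],
  migrateDoc: (doc: string) => Promise<BulkOp | undefined>,
  flushBulk: (ops: BulkOp[]) => Promise<void>
): Promise<void> {
  const rateLimiter = new ToyRateLimiter(10)
  const operations: BulkOp[] = []

  for (const doc of docs) {
    await rateLimiter.exec(async () => {
      const op = await migrateDoc(doc)
      // Pushing from concurrent callbacks is safe here: Node runs JS on a
      // single thread and push itself is synchronous.
      if (op !== undefined) {
        operations.push(op)
      }
    })
  }

  // Barrier: all callbacks have settled, so no push can race the flush.
  await rateLimiter.waitProcessing()
  if (operations.length > 0) {
    await flushBulk(operations)
  }
}
```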