UBER-1263 Fix blobs migration on new workspaces (#6586)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-09-16 19:01:52 +07:00 committed by GitHub
parent afbe37d7ef
commit 9583f57cf0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 52 deletions

View File

@ -15,7 +15,6 @@
import { saveCollaborativeDoc } from '@hcengineering/collaboration' import { saveCollaborativeDoc } from '@hcengineering/collaboration'
import core, { import core, {
DOMAIN_BLOB,
DOMAIN_DOC_INDEX_STATE, DOMAIN_DOC_INDEX_STATE,
DOMAIN_SPACE, DOMAIN_SPACE,
DOMAIN_STATUS, DOMAIN_STATUS,
@ -28,11 +27,9 @@ import core, {
isClassIndexable, isClassIndexable,
makeCollaborativeDoc, makeCollaborativeDoc,
type AnyAttribute, type AnyAttribute,
type Blob,
type Doc, type Doc,
type Domain, type Domain,
type MeasureContext, type MeasureContext,
type Ref,
type Space, type Space,
type Status, type Status,
type TxCreateDoc type TxCreateDoc
@ -48,7 +45,7 @@ import {
type MigrationIterator, type MigrationIterator,
type MigrationUpgradeClient type MigrationUpgradeClient
} from '@hcengineering/model' } from '@hcengineering/model'
import { type StorageAdapter, type StorageAdapterEx } from '@hcengineering/storage' import { type StorageAdapter } from '@hcengineering/storage'
import { markupToYDoc } from '@hcengineering/text' import { markupToYDoc } from '@hcengineering/text'
async function migrateStatusesToModel (client: MigrationClient): Promise<void> { async function migrateStatusesToModel (client: MigrationClient): Promise<void> {
@ -278,7 +275,6 @@ export const coreOperation: MigrateOperation = {
} }
} }
) )
const exAdapter: StorageAdapterEx = client.storageAdapter as StorageAdapterEx
await tryMigrate(client, coreId, [ await tryMigrate(client, coreId, [
{ {
state: 'statuses-to-model', state: 'statuses-to-model',
@ -292,12 +288,6 @@ export const coreOperation: MigrateOperation = {
state: 'add-spaces-owner', state: 'add-spaces-owner',
func: migrateSpacesOwner func: migrateSpacesOwner
}, },
{
state: 'storage_blobs_v1',
func: async (client: MigrationClient) => {
await migrateBlobData(exAdapter, client)
}
},
{ {
state: 'old-statuses-transactions', state: 'old-statuses-transactions',
func: migrateStatusTransactions func: migrateStatusTransactions
@ -340,43 +330,3 @@ export const coreOperation: MigrateOperation = {
]) ])
} }
} }
async function migrateBlobData (exAdapter: StorageAdapterEx, client: MigrationClient): Promise<void> {
const ctx = new MeasureMetricsContext('storage_upgrade', {})
for (const [provider, adapter] of exAdapter.adapters?.entries() ?? []) {
if (!(await adapter.exists(ctx, client.workspaceId))) {
continue
}
const blobs = await adapter.listStream(ctx, client.workspaceId)
const bulk = new Map<Ref<Blob>, Blob>()
try {
const push = async (force: boolean): Promise<void> => {
if (bulk.size > 1000 || force) {
await client.deleteMany(DOMAIN_BLOB, { _id: { $in: Array.from(bulk.keys()) } })
await client.create(DOMAIN_BLOB, Array.from(bulk.values()))
bulk.clear()
}
}
while (true) {
const blob = await blobs.next()
if (blob === undefined) {
break
}
// We need to state details for blob.
const blobData = await adapter.stat(ctx, client.workspaceId, blob._id)
if (blobData !== undefined) {
bulk.set(blobData._id, {
...blobData,
provider
})
}
await push(false)
}
await push(true)
} catch (err: any) {
ctx.error('Error during blob migration', { error: err.message })
} finally {
await blobs.close()
}
}
}

View File

@ -200,9 +200,24 @@ export async function createWorkspace (
ctx.info('Starting init script if any') ctx.info('Starting init script if any')
await initializeWorkspace(ctx, branding, wsUrl, storageAdapter, client, ctxModellogger, async (value) => { await initializeWorkspace(ctx, branding, wsUrl, storageAdapter, client, ctxModellogger, async (value) => {
ctx.info('Init script progress', { value }) ctx.info('Init script progress', { value })
await handleWsEvent?.('progress', version, 30 + Math.round((Math.min(value, 100) / 100) * 70)) await handleWsEvent?.('progress', version, 30 + Math.round((Math.min(value, 100) / 100) * 60))
}) })
await upgradeWorkspace(
ctx,
version,
txes,
migrationOperation,
workspaceInfo,
ctxModellogger,
async (event, version, value) => {
ctx.info('Init script progress', { event, value })
await handleWsEvent?.('progress', version, 90 + Math.round((Math.min(value, 100) / 100) * 10))
},
false,
false
)
await pipeline.close() await pipeline.close()
await handleWsEvent?.('create-done', version, 100, '') await handleWsEvent?.('create-done', version, 100, '')