UBERF-9500: Fix indexing on staging (#8231)

Andrey Sobolev 2025-03-14 21:46:15 +07:00 committed by GitHub
parent 06605ed619
commit 448d6239ce
6 changed files with 143 additions and 69 deletions


@@ -31,16 +31,16 @@ import core, {
RateLimiter,
Ref,
SortingOrder,
systemAccountUuid,
toIdMap,
TxProcessor,
systemAccountUuid,
type WorkspaceUuid,
type BackupStatus,
type Blob,
type DocIndexState,
type Tx,
type TxCUD,
type WorkspaceIds
type WorkspaceIds,
type WorkspaceUuid
} from '@hcengineering/core'
import { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import { estimateDocSize, type StorageAdapter } from '@hcengineering/server-core'
@@ -137,6 +137,8 @@ export interface BackupInfo {
// A hash of current domain transactions, so we can skip all other checks if it is the same.
domainHashes: Record<Domain, string>
migrations: Record<string, boolean>
}
async function loadDigest (
@@ -220,6 +222,7 @@ async function verifyDigest (
ctx.info('checking', { domain })
// We have required documents here.
const validDocs = new Set<Ref<Doc>>()
const zeroEntres = new Set<Ref<Doc>>()
for (const sf of d.storage ?? []) {
const blobs = new Map<string, { doc: Doc | undefined, buffer: Buffer | undefined }>()
@@ -239,20 +242,27 @@
})
stream.on('end', () => {
const bf = Buffer.concat(chunks as any)
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
const data = migradeBlobData(doc as Blob, '')
const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
if (d === undefined) {
blobs.set(bname, { doc, buffer: undefined })
} else {
blobs.delete(bname)
try {
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
const data = migradeBlobData(doc as Blob, '')
const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
if (d === undefined) {
blobs.set(bname, { doc, buffer: undefined })
} else {
blobs.delete(bname)
}
}
validDocs.add(bname as Ref<Doc>)
} catch (err: any) {
// If it is not valid JSON, skip it
}
validDocs.add(bname as Ref<Doc>)
next()
})
} else {
if (headers.size === 0) {
zeroEntres.add(name as any)
}
next()
}
stream.resume() // just auto drain the stream
@@ -286,6 +296,11 @@ async function verifyDigest (
storageToRemove.add(sf)
}
}
// Clear zero-size files, they were potentially downloaded incorrectly.
for (const zz of zeroEntres.values()) {
validDocs.delete(zz)
}
if (storageToRemove.size > 0) {
modified = true
d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
@@ -739,7 +754,10 @@ export async function backup (
workspace: workspaceId,
version: '0.6.2',
snapshots: [],
domainHashes: {}
domainHashes: {},
migrations: {
zeroCheckSize: true // Assume already checked for new backups
}
}
// Version 0.6.2, format of digest file is changed to
@@ -751,6 +769,20 @@
}
backupInfo.version = '0.6.2'
if (backupInfo.migrations == null) {
backupInfo.migrations = {}
}
// Apply verification to the backup, since we know it may contain broken blobs
if (backupInfo.migrations.zeroCheckSize == null) {
await checkBackupIntegrity(ctx, storage)
if (await storage.exists(infoFile)) {
backupInfo = JSON.parse(gunzipSync(new Uint8Array(await storage.loadFile(infoFile))).toString())
}
backupInfo.migrations.zeroCheckSize = true
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
backupInfo.workspace = workspaceId
if (backupInfo.domainHashes === undefined) {
@@ -2035,7 +2067,7 @@ export async function restore (
}
}
} catch (err: any) {
ctx.warn('failed to upload blob', { _id: blob._id, err, workspace: workspaceId })
ctx.warn('failed to upload blob', { _id: blob._id, cause: err.cause?.message, workspace: workspaceId })
}
}
docsToAdd.delete(blob._id)
@@ -2091,7 +2123,7 @@ export async function restore (
sz = bf.length
}
void sendBlob(blob, bf, next).catch((err) => {
ctx.error('failed to send blob', { err })
ctx.error('failed to send blob', { message: err.message })
})
}
})
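The backup hunks above gate a one-time integrity pass behind a persisted migrations.zeroCheckSize flag in the gzipped backup info file. Below is a minimal standalone sketch of that run-once pattern; SimpleStorage, BackupInfoLike and runOnce are hypothetical stand-ins for the real backup storage and checkBackupIntegrity APIs.

import { gunzipSync, gzipSync } from 'zlib'

// Hypothetical minimal storage surface; the real backup storage has more methods.
interface SimpleStorage {
  exists: (name: string) => Promise<boolean>
  loadFile: (name: string) => Promise<Buffer>
  writeFile: (name: string, data: Buffer) => Promise<void>
}

interface BackupInfoLike {
  migrations: Record<string, boolean>
}

// Run an expensive check at most once per backup by persisting a flag in the gzipped info file.
async function runOnce (
  storage: SimpleStorage,
  infoFile: string,
  key: string,
  check: () => Promise<void>
): Promise<void> {
  let info: BackupInfoLike = { migrations: {} }
  if (await storage.exists(infoFile)) {
    info = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
    info.migrations = info.migrations ?? {}
  }
  if (info.migrations[key] === true) {
    return // already applied, skip the check
  }
  await check()
  info.migrations[key] = true
  await storage.writeFile(infoFile, gzipSync(JSON.stringify(info, undefined, 2)))
}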


@@ -13,7 +13,7 @@
// limitations under the License.
//
import { type WorkspaceIds, type MeasureContext } from '@hcengineering/core'
import { type MeasureContext, type WorkspaceIds } from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { Buffer } from 'node:buffer'
@@ -206,22 +206,34 @@ export class BlobClient {
// TODO: We need to improve this logic to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': contentType
},
body: buffer
const resp = await (
await fetch(
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
keepalive: true,
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': contentType
},
body: buffer
}
)
).text()
try {
const json = JSON.parse(resp)
if (json.error !== undefined) {
ctx.error('failed to upload file, error from server', { name, message: json.error })
return
}
)
} catch (err) {
console.log(err)
}
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
ctx.error('failed to upload file', { name, message: err.message, cause: err.cause?.message })
throw err
}
}
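The BlobClient hunk above retries the PUT up to five times and now reads the response body, treating a JSON { error } payload as a server-side rejection rather than a transport failure. The following is a rough sketch of that retry-and-inspect pattern; the url, token and console logging are placeholders for the real client state, and keepalive mirrors the option added in the diff.

// Retry an upload a few times; an { error } JSON body means the server refused the blob.
async function uploadWithRetry (url: string, token: string, body: Buffer, contentType: string): Promise<void> {
  for (let attempt = 0; attempt < 5; attempt++) {
    try {
      const resp = await fetch(url, {
        method: 'PUT',
        keepalive: true,
        headers: { Authorization: 'Bearer ' + token, 'Content-Type': contentType },
        body
      })
      const text = await resp.text()
      try {
        const json = JSON.parse(text)
        if (json.error !== undefined) {
          console.error('failed to upload file, error from server', json.error)
          return
        }
      } catch {
        // Non-JSON body: assume a plain success response.
      }
      return // uploaded
    } catch (err: any) {
      if (attempt === 4) {
        throw err // network-level failure after the last retry
      }
    }
  }
}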


@@ -39,8 +39,9 @@ import core, {
type Space,
TxFactory,
type WithLookup,
coreId,
type WorkspaceIds,
type WorkspaceUuid,
coreId,
docKey,
generateId,
getFullTextIndexableAttributes,
@@ -48,10 +49,9 @@ import core, {
isClassIndexable,
isFullTextAttribute,
isIndexedAttribute,
toIdMap,
withContext,
systemAccount,
type WorkspaceUuid
toIdMap,
withContext
} from '@hcengineering/core'
import drivePlugin, { type FileVersion } from '@hcengineering/drive'
import type {
@@ -229,7 +229,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await this.addMigration(ctx, fullReindex)
}
const docStructure = 'full-text-structure-v5'
const docStructure = 'full-text-structure-v6'
if (migrations.find((it) => it.state === docStructure) === undefined) {
ctx.warn('verify document structure', { version: docStructure, workspace: this.workspace.uuid })
@@ -267,7 +267,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await this.storage.upload(ctx, DOMAIN_DOC_INDEX_STATE, missingDocs)
}
processed += docs.length
ctx.info('processed', { processed, allDocs: allDocs.length, domain })
ctx.info('check-processed', { processed, allDocs: allDocs.length, domain })
}
} catch (err: any) {
ctx.error('failed to restore index state', { err })
@@ -776,9 +776,17 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await pushToIndex()
await pushQueue.waitProcessing()
await ctx.with('update-index-state', {}, (ctx) =>
this.storage.rawUpdate(DOMAIN_DOC_INDEX_STATE, DOMAIN_DOC_INDEX_STATE, docUpdates)
)
await ctx.with('update-index-state', {}, async (ctx) => {
const ids = [...docUpdates.entries()]
const groups = groupByArray(ids, (it) => JSON.stringify(it[1]))
for (const [, values] of groups.entries()) {
const ids = values.map((it) => it[0])
while (ids.length > 0) {
const part = ids.splice(0, 200)
await this.storage.rawUpdate(DOMAIN_DOC_INDEX_STATE, { _id: { $in: part } }, values[0][1])
}
}
})
}
private createContextData (): SessionDataImpl {
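The FullTextIndexPipeline hunk above replaces a single bulk rawUpdate with grouped, chunked writes: document ids are grouped by an identical update payload and each group is flushed with $in batches of 200. Here is a self-contained sketch of that grouping and chunking, with a hypothetical rawUpdate callback standing in for the storage API.

type DocId = string

// Group ids by an identical update payload, then apply each group in chunks of `batchSize`.
async function applyGroupedUpdates<U> (
  updates: Map<DocId, U>,
  rawUpdate: (ids: DocId[], update: U) => Promise<void>,
  batchSize: number = 200
): Promise<void> {
  const groups = new Map<string, { ids: DocId[], update: U }>()
  for (const [id, update] of updates.entries()) {
    const key = JSON.stringify(update) // documents sharing an update share one query
    const group = groups.get(key) ?? { ids: [], update }
    group.ids.push(id)
    groups.set(key, group)
  }
  for (const { ids, update } of groups.values()) {
    while (ids.length > 0) {
      // In the real code the filter is { _id: { $in: part } } against DOMAIN_DOC_INDEX_STATE.
      await rawUpdate(ids.splice(0, batchSize), update)
    }
  }
}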


@@ -19,9 +19,13 @@ import plugin, {
} from '@hcengineering/server-core'
export function findSearchPresenter (hierarchy: Hierarchy, _class: Ref<Class<Doc>>): SearchPresenter | undefined {
const searchMixin = hierarchy.classHierarchyMixin(_class, plugin.mixin.SearchPresenter)
if (searchMixin !== undefined) {
return searchMixin
try {
const searchMixin = hierarchy.classHierarchyMixin(_class, plugin.mixin.SearchPresenter)
if (searchMixin !== undefined) {
return searchMixin
}
} catch (err: any) {
// Ignore missing classes
}
return undefined
}
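The findSearchPresenter hunk above wraps the mixin lookup in try/catch so documents whose class is missing from the model no longer break indexing. The same defensive-lookup idea as a tiny generic sketch; the resolver callback is hypothetical.

// Return undefined instead of throwing when a lookup hits a class missing from the model.
function safeLookup<T> (resolve: () => T | undefined): T | undefined {
  try {
    return resolve()
  } catch {
    // Ignore missing classes; callers treat undefined as "no presenter".
    return undefined
  }
}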


@@ -1705,7 +1705,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
const finalSql = sqlChunks.join(' ')
return await this.mgr.retry(ctx.id, async (connection) => {
const result = await connection.execute(finalSql, vars.getValues())
return new Map(result.map((r) => [r[`_${field.toLowerCase()}`], r.count]))
return new Map(
result.map((r) => [r[`_${field.toLowerCase()}`], typeof r.count === 'string' ? parseInt(r.count) : r.count])
)
})
} catch (err) {
ctx.error('Error while grouping by', { domain, field })
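The PostgresAdapterBase hunk above normalizes the aggregate count, which Postgres drivers commonly return as a string because COUNT() yields a bigint. A small sketch of that normalization, detached from the actual adapter and row types:

// COUNT(*) may arrive as a string from drivers that preserve bigint precision; coerce it for callers.
function toGroupByMap (rows: Array<Record<string, any>>, field: string): Map<any, number> {
  return new Map<any, number>(
    rows.map((r): [any, number] => {
      const count = r.count
      return [r[`_${field.toLowerCase()}`], typeof count === 'string' ? parseInt(count, 10) : count]
    })
  )
}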


@@ -66,6 +66,15 @@ import { retrieveJson } from './utils'
import { setImmediate } from 'timers/promises'
const MAX_FILE_SIZE = 100 * 1024 * 1024 // 100MB limit
const KEEP_ALIVE_TIMEOUT = 5 // seconds
const KEEP_ALIVE_MAX = 1000
const KEEP_ALIVE_HEADERS = {
Connection: 'keep-alive',
'Keep-Alive': `timeout=${KEEP_ALIVE_TIMEOUT}, max=${KEEP_ALIVE_MAX}`
}
export type RequestHandler = (req: Request, res: ExpressResponse, next?: NextFunction) => Promise<void>
const catchError = (fn: RequestHandler) => (req: Request, res: ExpressResponse, next: NextFunction) => {
@@ -285,7 +294,7 @@ export function startHttpServer (
try {
const authHeader = req.headers.authorization
if (authHeader === undefined) {
res.status(403).send({ error: 'Unauthorized' })
res.status(403).end(JSON.stringify({ error: 'Unauthorized' }))
return
}
@@ -293,16 +302,16 @@
const wsIds = await getWorkspaceIds(token)
if (wsIds.uuid == null) {
res.status(401).send({ error: 'No workspace found' })
res.status(401).end(JSON.stringify({ error: 'No workspace found' }))
return
}
const name = req.query.name as string
const contentType = req.query.contentType as string
const size = parseInt((req.query.size as string) ?? '-1')
const MAX_FILE_SIZE = 100 * 1024 * 1024 // 100MB limit
if (size > MAX_FILE_SIZE) {
res.writeHead(413, { 'Content-Type': 'application/json' })
res.writeHead(413, { 'Content-Type': 'application/json', ...KEEP_ALIVE_HEADERS })
res.end(JSON.stringify({ error: 'File too large' }))
return
}
@@ -312,32 +321,38 @@
name,
workspace: wsIds.uuid
})
res.writeHead(404, {})
res.writeHead(404, { ...KEEP_ALIVE_HEADERS })
res.end()
return
}
ctx
.with(
'storage upload',
{ workspace: wsIds.uuid },
(ctx) => externalStorage.put(ctx, wsIds, name, req, contentType, size !== -1 ? size : undefined),
{ file: name, contentType }
)
.then(() => {
res.writeHead(200, { 'Cache-Control': 'no-cache' })
res.end()
})
.catch((err) => {
Analytics.handleError(err)
ctx.error('/api/v1/blob put error', { err })
res.writeHead(404, {})
res.end()
})
await ctx.with(
'storage upload',
{ workspace: wsIds.uuid },
async (ctx) => {
await externalStorage.put(
ctx,
wsIds,
name,
size === 0 ? '' : req,
contentType,
size !== -1 ? size : undefined
)
res.writeHead(200, {
...KEEP_ALIVE_HEADERS,
'Cache-Control': 'no-cache'
})
res.end(JSON.stringify({ success: true }))
},
{ file: name, contentType }
)
} catch (err: any) {
Analytics.handleError(err)
ctx.error('/api/v1/blob put error', { err })
res.writeHead(404, {})
res.end()
res.writeHead(200, {
...KEEP_ALIVE_HEADERS,
'content-type': 'application/json'
})
res.end(JSON.stringify({ error: err.message }))
}
})
)
@@ -500,10 +515,11 @@
}, 1000)
}
if ('upgrade' in s) {
void cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
setTimeout(() => {
cs.close()
}, 5000)
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
cs.close()
})
}
})
void webSocketData.session.catch((err) => {
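The startHttpServer changes above make the blob PUT endpoint reply with a JSON body and explicit keep-alive headers in every branch, which is what lets the BlobClient retry loop above tell server-side errors apart from transport failures. Below is a minimal Express-style sketch under those assumptions; the putBlob hook and registerBlobPut wiring are hypothetical, not the real externalStorage API.

import type { Express, Request, Response } from 'express'

// Values mirrored from the constants in the diff above.
const KEEP_ALIVE_HEADERS = {
  Connection: 'keep-alive',
  'Keep-Alive': 'timeout=5, max=1000'
}

// Hypothetical upload hook; the real handler streams req into externalStorage.put.
type PutBlob = (name: string, body: NodeJS.ReadableStream | '', size?: number) => Promise<void>

export function registerBlobPut (app: Express, putBlob: PutBlob): void {
  app.put('/api/v1/blob', (req: Request, res: Response) => {
    const name = req.query.name as string
    const size = parseInt((req.query.size as string) ?? '-1')
    void putBlob(name, size === 0 ? '' : req, size !== -1 ? size : undefined)
      .then(() => {
        res.writeHead(200, { ...KEEP_ALIVE_HEADERS, 'Cache-Control': 'no-cache' })
        res.end(JSON.stringify({ success: true }))
      })
      .catch((err: any) => {
        // Failures still produce a well-formed JSON response on the kept-alive connection.
        res.writeHead(200, { ...KEEP_ALIVE_HEADERS, 'content-type': 'application/json' })
        res.end(JSON.stringify({ error: err.message }))
      })
  })
}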