diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 54a130c343..ab76a7b39c 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -31,16 +31,16 @@ import core, {
   RateLimiter,
   Ref,
   SortingOrder,
+  systemAccountUuid,
   toIdMap,
   TxProcessor,
-  systemAccountUuid,
-  type WorkspaceUuid,
   type BackupStatus,
   type Blob,
   type DocIndexState,
   type Tx,
   type TxCUD,
-  type WorkspaceIds
+  type WorkspaceIds,
+  type WorkspaceUuid
 } from '@hcengineering/core'
 import { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client'
 import { estimateDocSize, type StorageAdapter } from '@hcengineering/server-core'
@@ -137,6 +137,8 @@ export interface BackupInfo {
 
   // A hash of current domain transactions, so we could skip all other checks if same.
   domainHashes: Record<Domain, string>
+
+  migrations: Record<string, boolean>
 }
 
 async function loadDigest (
@@ -220,6 +222,7 @@ async function verifyDigest (
     ctx.info('checking', { domain })
     // We have required documents here.
     const validDocs = new Set<Ref<Doc>>()
+    const zeroEntres = new Set<Ref<Doc>>()
 
     for (const sf of d.storage ?? []) {
       const blobs = new Map()
@@ -239,20 +242,27 @@
           })
           stream.on('end', () => {
             const bf = Buffer.concat(chunks as any)
-            const doc = JSON.parse(bf.toString()) as Doc
-            if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
-              const data = migradeBlobData(doc as Blob, '')
-              const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
-              if (d === undefined) {
-                blobs.set(bname, { doc, buffer: undefined })
-              } else {
-                blobs.delete(bname)
+            try {
+              const doc = JSON.parse(bf.toString()) as Doc
+              if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
+                const data = migradeBlobData(doc as Blob, '')
+                const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
+                if (d === undefined) {
+                  blobs.set(bname, { doc, buffer: undefined })
+                } else {
+                  blobs.delete(bname)
+                }
               }
+              validDocs.add(bname as Ref<Doc>)
+            } catch (err: any) {
+              // If not a json, skip
             }
-            validDocs.add(bname as Ref<Doc>)
             next()
           })
         } else {
+          if (headers.size === 0) {
+            zeroEntres.add(name as any)
+          }
           next()
         }
         stream.resume() // just auto drain the stream
@@ -286,6 +296,11 @@
         storageToRemove.add(sf)
       }
     }
+
+    // Clear zero-size files, they were potentially downloaded incorrectly.
+    for (const zz of zeroEntres.values()) {
+      validDocs.delete(zz)
+    }
     if (storageToRemove.size > 0) {
       modified = true
       d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
@@ -739,7 +754,10 @@ export async function backup (
       workspace: workspaceId,
       version: '0.6.2',
       snapshots: [],
-      domainHashes: {}
+      domainHashes: {},
+      migrations: {
+        zeroCheckSize: true // Assume already checked for new backups
+      }
     }
 
     // Version 0.6.2, format of digest file is changed to
@@ -751,6 +769,20 @@
     }
     backupInfo.version = '0.6.2'
 
+    if (backupInfo.migrations == null) {
+      backupInfo.migrations = {}
+    }
+
+    // Apply verification to backup, since we know it should have broken blobs
+    if (backupInfo.migrations.zeroCheckSize == null) {
+      await checkBackupIntegrity(ctx, storage)
+      if (await storage.exists(infoFile)) {
+        backupInfo = JSON.parse(gunzipSync(new Uint8Array(await storage.loadFile(infoFile))).toString())
+      }
+      backupInfo.migrations.zeroCheckSize = true
+      await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
+    }
+
     backupInfo.workspace = workspaceId
 
     if (backupInfo.domainHashes === undefined) {
@@ -2035,7 +2067,7 @@ export async function restore (
             }
           }
         } catch (err: any) {
-          ctx.warn('failed to upload blob', { _id: blob._id, err, workspace: workspaceId })
+          ctx.warn('failed to upload blob', { _id: blob._id, cause: err.cause?.message, workspace: workspaceId })
         }
       }
       docsToAdd.delete(blob._id)
@@ -2091,7 +2123,7 @@
             sz = bf.length
           }
           void sendBlob(blob, bf, next).catch((err) => {
-            ctx.error('failed to send blob', { err })
+            ctx.error('failed to send blob', { message: err.message })
           })
         }
       })
diff --git a/server/client/src/blob.ts b/server/client/src/blob.ts
index a1477e73f1..4edeabbd9b 100644
--- a/server/client/src/blob.ts
+++ b/server/client/src/blob.ts
@@ -13,7 +13,7 @@
 // limitations under the License.
 //
 
-import { type WorkspaceIds, type MeasureContext } from '@hcengineering/core'
+import { type MeasureContext, type WorkspaceIds } from '@hcengineering/core'
 import type { StorageAdapter } from '@hcengineering/server-core'
 import { Buffer } from 'node:buffer'
 
@@ -206,22 +206,34 @@
     // TODO: We need to improve this logig, to allow restore of huge blobs
     for (let i = 0; i < 5; i++) {
       try {
-        await fetch(
-          this.transactorAPIUrl +
-            `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
-          {
-            method: 'PUT',
-            headers: {
-              Authorization: 'Bearer ' + this.token,
-              'Content-Type': contentType
-            },
-            body: buffer
+        const resp = await (
+          await fetch(
+            this.transactorAPIUrl +
+              `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
+            {
+              keepalive: true,
+              method: 'PUT',
+              headers: {
+                Authorization: 'Bearer ' + this.token,
+                'Content-Type': contentType
+              },
+              body: buffer
+            }
+          )
+        ).text()
+        try {
+          const json = JSON.parse(resp)
+          if (json.error !== undefined) {
+            ctx.error('failed to upload file, error from server', { name, message: json.error })
+            return
           }
-        )
+        } catch (err) {
+          console.log(err)
+        }
         break
       } catch (err: any) {
         if (i === 4) {
-          ctx.error('failed to upload file', { name })
+          ctx.error('failed to upload file', { name, message: err.message, cause: err.cause?.message })
           throw err
         }
       }
diff --git a/server/indexer/src/indexer/indexer.ts b/server/indexer/src/indexer/indexer.ts
index 6c00cc3b7e..944dcfbf27 100644
--- a/server/indexer/src/indexer/indexer.ts
+++ b/server/indexer/src/indexer/indexer.ts
@@ -39,8 +39,9 @@ import core, {
   type Space,
   TxFactory,
   type WithLookup,
-  coreId,
   type WorkspaceIds,
+  type WorkspaceUuid,
+  coreId,
   docKey,
   generateId,
   getFullTextIndexableAttributes,
@@ -48,10 +49,9 @@ import core, {
   isClassIndexable,
   isFullTextAttribute,
   isIndexedAttribute,
-  toIdMap,
-  withContext,
   systemAccount,
-  type WorkspaceUuid
+  toIdMap,
+  withContext
 } from '@hcengineering/core'
 import drivePlugin, { type FileVersion } from '@hcengineering/drive'
 import type {
@@ -229,7 +229,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
       await this.addMigration(ctx, fullReindex)
     }
 
-    const docStructure = 'full-text-structure-v5'
+    const docStructure = 'full-text-structure-v6'
     if (migrations.find((it) => it.state === docStructure) === undefined) {
       ctx.warn('verify document structure', { version: docStructure, workspace: this.workspace.uuid })
 
@@ -267,7 +267,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
           await this.storage.upload(ctx, DOMAIN_DOC_INDEX_STATE, missingDocs)
         }
         processed += docs.length
-        ctx.info('processed', { processed, allDocs: allDocs.length, domain })
+        ctx.info('check-processed', { processed, allDocs: allDocs.length, domain })
       }
     } catch (err: any) {
       ctx.error('failed to restore index state', { err })
@@ -776,9 +776,17 @@ export class FullTextIndexPipeline implements FullTextPipeline {
     await pushToIndex()
     await pushQueue.waitProcessing()
 
-    await ctx.with('update-index-state', {}, (ctx) =>
-      this.storage.rawUpdate(DOMAIN_DOC_INDEX_STATE, DOMAIN_DOC_INDEX_STATE, docUpdates)
-    )
+    await ctx.with('update-index-state', {}, async (ctx) => {
+      const ids = [...docUpdates.entries()]
+      const groups = groupByArray(ids, (it) => JSON.stringify(it[1]))
+      for (const [, values] of groups.entries()) {
+        const ids = values.map((it) => it[0])
+        while (ids.length > 0) {
+          const part = ids.splice(0, 200)
+          await this.storage.rawUpdate(DOMAIN_DOC_INDEX_STATE, { _id: { $in: part } }, values[0][1])
+        }
+      }
+    })
   }
 
   private createContextData (): SessionDataImpl {
diff --git a/server/indexer/src/mapper.ts b/server/indexer/src/mapper.ts
index 12c8347c22..0975424def 100644
--- a/server/indexer/src/mapper.ts
+++ b/server/indexer/src/mapper.ts
@@ -19,9 +19,13 @@ import plugin, {
 } from '@hcengineering/server-core'
 
 export function findSearchPresenter (hierarchy: Hierarchy, _class: Ref<Class<Doc>>): SearchPresenter | undefined {
-  const searchMixin = hierarchy.classHierarchyMixin(_class, plugin.mixin.SearchPresenter)
-  if (searchMixin !== undefined) {
-    return searchMixin
+  try {
+    const searchMixin = hierarchy.classHierarchyMixin(_class, plugin.mixin.SearchPresenter)
+    if (searchMixin !== undefined) {
+      return searchMixin
+    }
+  } catch (err: any) {
+    // Ignore missing classes
   }
   return undefined
 }
diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts
index 46d6b37347..9738994a6c 100644
--- a/server/postgres/src/storage.ts
+++ b/server/postgres/src/storage.ts
@@ -1705,7 +1705,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
       const finalSql = sqlChunks.join(' ')
       return await this.mgr.retry(ctx.id, async (connection) => {
         const result = await connection.execute(finalSql, vars.getValues())
-        return new Map(result.map((r) => [r[`_${field.toLowerCase()}`], r.count]))
+        return new Map(
+          result.map((r) => [r[`_${field.toLowerCase()}`], typeof r.count === 'string' ? parseInt(r.count) : r.count])
+        )
       })
     } catch (err) {
       ctx.error('Error while grouping by', { domain, field })
diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts
index 4cbcd11a0c..4b7d881619 100644
--- a/server/ws/src/server_http.ts
+++ b/server/ws/src/server_http.ts
@@ -66,6 +66,15 @@
 import { retrieveJson } from './utils'
 import { setImmediate } from 'timers/promises'
 
+const MAX_FILE_SIZE = 100 * 1024 * 1024 // 100MB limit
+
+const KEEP_ALIVE_TIMEOUT = 5 // seconds
+const KEEP_ALIVE_MAX = 1000
+const KEEP_ALIVE_HEADERS = {
+  Connection: 'keep-alive',
+  'Keep-Alive': `timeout=${KEEP_ALIVE_TIMEOUT}, max=${KEEP_ALIVE_MAX}`
+}
+
 export type RequestHandler = (req: Request, res: ExpressResponse, next?: NextFunction) => Promise<void>
 
 const catchError = (fn: RequestHandler) => (req: Request, res: ExpressResponse, next: NextFunction) => {
@@ -285,7 +294,7 @@
       try {
         const authHeader = req.headers.authorization
         if (authHeader === undefined) {
-          res.status(403).send({ error: 'Unauthorized' })
+          res.status(403).end(JSON.stringify({ error: 'Unauthorized' }))
           return
         }
 
@@ -293,16 +302,16 @@
         const wsIds = await getWorkspaceIds(token)
         if (wsIds.uuid == null) {
-          res.status(401).send({ error: 'No workspace found' })
+          res.status(401).end(JSON.stringify({ error: 'No workspace found' }))
+          return
         }
 
         const name = req.query.name as string
         const contentType = req.query.contentType as string
         const size = parseInt((req.query.size as string) ?? '-1')
-        const MAX_FILE_SIZE = 100 * 1024 * 1024 // 100MB limit
 
         if (size > MAX_FILE_SIZE) {
-          res.writeHead(413, { 'Content-Type': 'application/json' })
+          res.writeHead(413, { 'Content-Type': 'application/json', ...KEEP_ALIVE_HEADERS })
           res.end(JSON.stringify({ error: 'File too large' }))
           return
         }
 
@@ -312,32 +321,38 @@
             name,
             workspace: wsIds.uuid
           })
-          res.writeHead(404, {})
+          res.writeHead(404, { ...KEEP_ALIVE_HEADERS })
           res.end()
           return
         }
-        ctx
-          .with(
-            'storage upload',
-            { workspace: wsIds.uuid },
-            (ctx) => externalStorage.put(ctx, wsIds, name, req, contentType, size !== -1 ? size : undefined),
-            { file: name, contentType }
-          )
-          .then(() => {
-            res.writeHead(200, { 'Cache-Control': 'no-cache' })
-            res.end()
-          })
-          .catch((err) => {
-            Analytics.handleError(err)
-            ctx.error('/api/v1/blob put error', { err })
-            res.writeHead(404, {})
-            res.end()
-          })
+        await ctx.with(
+          'storage upload',
+          { workspace: wsIds.uuid },
+          async (ctx) => {
+            await externalStorage.put(
+              ctx,
+              wsIds,
+              name,
+              size === 0 ? '' : req,
+              contentType,
+              size !== -1 ? size : undefined
+            )
+            res.writeHead(200, {
+              ...KEEP_ALIVE_HEADERS,
+              'Cache-Control': 'no-cache'
+            })
+            res.end(JSON.stringify({ success: true }))
+          },
+          { file: name, contentType }
+        )
       } catch (err: any) {
         Analytics.handleError(err)
         ctx.error('/api/v1/blob put error', { err })
-        res.writeHead(404, {})
-        res.end()
+        res.writeHead(200, {
+          ...KEEP_ALIVE_HEADERS,
+          'content-type': 'application/json'
+        })
+        res.end(JSON.stringify({ error: err.message }))
       }
     })
   )
@@ -500,10 +515,11 @@
         }, 1000)
       }
       if ('upgrade' in s) {
-        void cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
-        setTimeout(() => {
-          cs.close()
-        }, 5000)
+        void cs
+          .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
+          .then(() => {
+            cs.close()
+          })
       }
     })
     void webSocketData.session.catch((err) => {