mirror of
https://github.com/hcengineering/platform.git
synced 2025-01-24 04:17:50 +00:00
UBERF-7062: Fix backup memory usage and support missing blobs (#5665)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
ee04637f10
commit
2e6b0f3937
@ -44,6 +44,7 @@
|
||||
"tar-stream": "^2.2.0",
|
||||
"@hcengineering/server-tool": "^0.6.0",
|
||||
"@hcengineering/server-core": "^0.6.1",
|
||||
"@hcengineering/server-storage": "^0.6.0",
|
||||
"@hcengineering/server-backup": "^0.6.0",
|
||||
"@hcengineering/minio": "^0.6.0",
|
||||
"@hcengineering/server-token": "^0.6.7"
|
||||
|
@ -25,9 +25,7 @@ interface Config extends Omit<BackupConfig, 'Token'> {
|
||||
Timeout: number // Timeout in seconds
|
||||
BucketName: string
|
||||
|
||||
MinioEndpoint: string
|
||||
MinioAccessKey: string
|
||||
MinioSecretKey: string
|
||||
MongoURL: string
|
||||
}
|
||||
|
||||
const envMap: { [key in keyof Config]: string } = {
|
||||
@ -37,22 +35,11 @@ const envMap: { [key in keyof Config]: string } = {
|
||||
Secret: 'SECRET',
|
||||
BucketName: 'BUCKET_NAME',
|
||||
Interval: 'INTERVAL',
|
||||
MinioEndpoint: 'MINIO_ENDPOINT',
|
||||
MinioAccessKey: 'MINIO_ACCESS_KEY',
|
||||
MinioSecretKey: 'MINIO_SECRET_KEY',
|
||||
Timeout: 'TIMEOUT'
|
||||
Timeout: 'TIMEOUT',
|
||||
MongoURL: 'MONGO_URL'
|
||||
}
|
||||
|
||||
const required: Array<keyof Config> = [
|
||||
'TransactorURL',
|
||||
'AccountsURL',
|
||||
'Secret',
|
||||
'ServiceID',
|
||||
'BucketName',
|
||||
'MinioEndpoint',
|
||||
'MinioAccessKey',
|
||||
'MinioSecretKey'
|
||||
]
|
||||
const required: Array<keyof Config> = ['TransactorURL', 'AccountsURL', 'Secret', 'ServiceID', 'BucketName', 'MongoURL']
|
||||
|
||||
const config: Config = (() => {
|
||||
const params: Partial<Config> = {
|
||||
@ -62,10 +49,8 @@ const config: Config = (() => {
|
||||
BucketName: process.env[envMap.BucketName] ?? 'backups',
|
||||
ServiceID: process.env[envMap.ServiceID] ?? 'backup-service',
|
||||
Interval: parseInt(process.env[envMap.Interval] ?? '3600'),
|
||||
MinioEndpoint: process.env[envMap.MinioEndpoint],
|
||||
MinioAccessKey: process.env[envMap.MinioAccessKey],
|
||||
MinioSecretKey: process.env[envMap.MinioSecretKey],
|
||||
Timeout: parseInt(process.env[envMap.Timeout] ?? '3600')
|
||||
Timeout: parseInt(process.env[envMap.Timeout] ?? '3600'),
|
||||
MongoURL: process.env[envMap.MongoURL]
|
||||
}
|
||||
|
||||
const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])
|
||||
|
@ -14,32 +14,18 @@
|
||||
//
|
||||
|
||||
import { MeasureContext, systemAccountEmail } from '@hcengineering/core'
|
||||
import { MinioService } from '@hcengineering/minio'
|
||||
import { setMetadata } from '@hcengineering/platform'
|
||||
import { backupService } from '@hcengineering/server-backup'
|
||||
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
|
||||
import serverToken, { generateToken } from '@hcengineering/server-token'
|
||||
import toolPlugin from '@hcengineering/server-tool'
|
||||
import config from './config'
|
||||
import { StorageAdapter } from '@hcengineering/server-core'
|
||||
|
||||
export function startBackup (ctx: MeasureContext): void {
|
||||
setMetadata(serverToken.metadata.Secret, config.Secret)
|
||||
|
||||
let minioPort = 9000
|
||||
let minioEndpoint = config.MinioEndpoint
|
||||
const sp = minioEndpoint.split(':')
|
||||
if (sp.length > 1) {
|
||||
minioEndpoint = sp[0]
|
||||
minioPort = parseInt(sp[1])
|
||||
}
|
||||
|
||||
const storageAdapter: StorageAdapter = new MinioService({
|
||||
endpoint: minioEndpoint,
|
||||
port: minioPort,
|
||||
useSSL: 'false',
|
||||
accessKey: config.MinioAccessKey,
|
||||
secretKey: config.MinioSecretKey
|
||||
})
|
||||
const storageConfiguration = storageConfigFromEnv()
|
||||
const storageAdapter = buildStorageFromConfig(storageConfiguration, config.MongoURL)
|
||||
|
||||
setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID)
|
||||
|
||||
|
@ -664,32 +664,59 @@ export async function backup (
|
||||
break
|
||||
}
|
||||
|
||||
// Move processed document to processedChanges
|
||||
if (changes.added.has(d._id)) {
|
||||
processedChanges.added.set(d._id, changes.added.get(d._id) ?? '')
|
||||
changes.added.delete(d._id)
|
||||
} else {
|
||||
processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '')
|
||||
changes.updated.delete(d._id)
|
||||
function processChanges (d: Doc, error: boolean = false): void {
|
||||
// Move processed document to processedChanges
|
||||
if (changes.added.has(d._id)) {
|
||||
if (!error) {
|
||||
processedChanges.added.set(d._id, changes.added.get(d._id) ?? '')
|
||||
}
|
||||
changes.added.delete(d._id)
|
||||
} else {
|
||||
if (!error) {
|
||||
processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '')
|
||||
}
|
||||
changes.updated.delete(d._id)
|
||||
}
|
||||
}
|
||||
if (d._class === core.class.Blob) {
|
||||
const blob = d as Blob
|
||||
const descrJson = JSON.stringify(d)
|
||||
addedDocuments += descrJson.length
|
||||
addedDocuments += blob.size
|
||||
|
||||
let blobFiled = false
|
||||
if (!(await blobClient.checkFile(ctx, blob._id))) {
|
||||
ctx.error('failed to download blob', { blob: blob._id, provider: blob.provider })
|
||||
processChanges(d, true)
|
||||
continue
|
||||
}
|
||||
|
||||
_pack.entry({ name: d._id + '.json' }, descrJson, function (err) {
|
||||
if (err != null) throw err
|
||||
})
|
||||
|
||||
_pack.entry({ name: d._id }, await blobClient.pipeFromStorage(blob._id, blob.size), function (err) {
|
||||
if (err != null) throw err
|
||||
})
|
||||
try {
|
||||
const entry = _pack?.entry({ name: d._id, size: blob.size }, (err) => {
|
||||
if (err != null) {
|
||||
ctx.error('error packing file', err)
|
||||
}
|
||||
})
|
||||
await blobClient.writeTo(ctx, blob._id, blob.size, entry)
|
||||
} catch (err: any) {
|
||||
if (err.message?.startsWith('No file for') === true) {
|
||||
ctx.error('failed to download blob', { message: err.message })
|
||||
} else {
|
||||
ctx.error('failed to download blob', { err })
|
||||
}
|
||||
blobFiled = true
|
||||
}
|
||||
processChanges(d, blobFiled)
|
||||
} else {
|
||||
const data = JSON.stringify(d)
|
||||
addedDocuments += data.length
|
||||
_pack.entry({ name: d._id + '.json' }, data, function (err) {
|
||||
if (err != null) throw err
|
||||
})
|
||||
processChanges(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -127,5 +127,5 @@ export type DbAdapterFactory = (
|
||||
url: string,
|
||||
workspaceId: WorkspaceId,
|
||||
modelDb: ModelDb,
|
||||
storage?: StorageAdapter
|
||||
storage: StorageAdapter
|
||||
) => Promise<DbAdapter>
|
||||
|
@ -94,15 +94,13 @@ export async function createStorageDataAdapter (
|
||||
url: string,
|
||||
workspaceId: WorkspaceId,
|
||||
modelDb: ModelDb,
|
||||
storage?: StorageAdapter
|
||||
storage: StorageAdapter
|
||||
): Promise<DbAdapter> {
|
||||
if (storage === undefined) {
|
||||
throw new Error('minio storage adapter require minio')
|
||||
}
|
||||
// We need to create bucket if it doesn't exist
|
||||
if (storage !== undefined) {
|
||||
await storage.make(ctx, workspaceId)
|
||||
}
|
||||
await storage.make(ctx, workspaceId)
|
||||
const blobAdapter = await createMongoAdapter(ctx, hierarchy, url, workspaceId, modelDb, undefined, {
|
||||
calculateHash: (d) => {
|
||||
return (d as Blob).etag
|
||||
|
@ -15,11 +15,19 @@
|
||||
//
|
||||
|
||||
import client, { clientId } from '@hcengineering/client'
|
||||
import { Client, LoadModelResponse, systemAccountEmail, Tx, WorkspaceId } from '@hcengineering/core'
|
||||
import {
|
||||
Client,
|
||||
LoadModelResponse,
|
||||
systemAccountEmail,
|
||||
Tx,
|
||||
WorkspaceId,
|
||||
type MeasureContext
|
||||
} from '@hcengineering/core'
|
||||
import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { mkdtempSync } from 'fs'
|
||||
import crypto from 'node:crypto'
|
||||
import { type Writable } from 'stream'
|
||||
import plugin from './plugin'
|
||||
|
||||
/**
|
||||
@ -89,14 +97,33 @@ export class BlobClient {
|
||||
this.tmpDir = mkdtempSync('blobs')
|
||||
}
|
||||
|
||||
async pipeFromStorage (name: string, size: number): Promise<Buffer> {
|
||||
async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
|
||||
try {
|
||||
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
|
||||
headers: {
|
||||
Authorization: 'Bearer ' + this.token,
|
||||
Range: 'bytes=0-1'
|
||||
}
|
||||
})
|
||||
if (response.status === 404) {
|
||||
return false
|
||||
}
|
||||
const buff = await response.arrayBuffer()
|
||||
return buff.byteLength > 0
|
||||
} catch (err: any) {
|
||||
ctx.error('Failed to check file', { name, error: err })
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
async writeTo (ctx: MeasureContext, name: string, size: number, writable: Writable): Promise<void> {
|
||||
let written = 0
|
||||
const chunkSize = 1024 * 1024
|
||||
const chunks: Buffer[] = []
|
||||
|
||||
// Use ranges to iterave through file with retry if required.
|
||||
while (written < size) {
|
||||
for (let i = 0; i < 5; i++) {
|
||||
let i = 0
|
||||
for (; i < 5; i++) {
|
||||
try {
|
||||
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
|
||||
headers: {
|
||||
@ -104,23 +131,40 @@ export class BlobClient {
|
||||
Range: `bytes=${written}-${Math.min(size - 1, written + chunkSize)}`
|
||||
}
|
||||
})
|
||||
if (response.status === 404) {
|
||||
i = 5
|
||||
// No file, so make it empty
|
||||
throw new Error(`No file for ${this.transactorAPIUrl}/${name}`)
|
||||
}
|
||||
const chunk = Buffer.from(await response.arrayBuffer())
|
||||
chunks.push(chunk)
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
writable.write(chunk, (err) => {
|
||||
if (err != null) {
|
||||
reject(err)
|
||||
}
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
|
||||
written += chunk.length
|
||||
if (size > 1024 * 1024) {
|
||||
console.log('Downloaded', Math.round(written / (1024 * 1024)), 'Mb of', Math.round(size / (1024 * 1024)))
|
||||
ctx.info('Downloaded', {
|
||||
name,
|
||||
written: Math.round(written / (1024 * 1024)),
|
||||
of: Math.round(size / (1024 * 1024))
|
||||
})
|
||||
}
|
||||
break
|
||||
} catch (err: any) {
|
||||
if (i === 4) {
|
||||
console.error(err)
|
||||
if (i > 4) {
|
||||
writable.end()
|
||||
throw err
|
||||
}
|
||||
// retry
|
||||
}
|
||||
}
|
||||
}
|
||||
return Buffer.concat(chunks)
|
||||
writable.end()
|
||||
}
|
||||
|
||||
async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
|
||||
|
@ -419,12 +419,18 @@ function createWebsocketClientSocket (
|
||||
setImmediate(doSend)
|
||||
return
|
||||
}
|
||||
ws.send(smsg, { binary: true, compress: compression }, (err) => {
|
||||
if (err != null) {
|
||||
reject(err)
|
||||
try {
|
||||
ws.send(smsg, { binary: true, compress: compression }, (err) => {
|
||||
if (err != null) {
|
||||
reject(err)
|
||||
}
|
||||
resolve()
|
||||
})
|
||||
} catch (err: any) {
|
||||
if (err.message !== 'WebSocket is not open') {
|
||||
ctx.error('send error', { err })
|
||||
}
|
||||
resolve()
|
||||
})
|
||||
}
|
||||
}
|
||||
doSend()
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user