A few fixes to uwebsocket and backup/restore (#5698)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-05-30 16:19:23 +07:00 committed by GitHub
parent 6bec327f61
commit cc7f962b90
25 changed files with 497 additions and 219 deletions

View File

@ -92,7 +92,6 @@ export async function cleanWorkspace (
}
}
await storageAdapter.remove(ctx, workspaceId, toClean)
// connection.loadChunk(DOMAIN_BLOB, idx = )
if (opt.recruit) {
const contacts = await ops.findAll(recruit.mixin.Candidate, {})

View File

@ -69,6 +69,7 @@ import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import { openAIConfigDefaults } from '@hcengineering/openai'
import type { StorageAdapter } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
import { createWriteStream, readFileSync } from 'fs'
import { benchmark, benchmarkWorker } from './benchmark'
import {
cleanArchivedSpaces,
@ -87,7 +88,6 @@ import { fixJsonMarkup } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { openAIConfig } from './openai'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { readFileSync } from 'fs'
const colorConstants = {
colorRed: '\u001b[31m',
@ -586,17 +586,24 @@ export function devTool (
.description('dump workspace transactions and minio resources')
.option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
.option('-f, --force', 'Force backup', false)
.action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => {
const storage = await createFileBackupStorage(dirName)
await backup(
toolCtx,
transactorUrl,
getWorkspaceId(workspace, productId),
storage,
(cmd.skip ?? '').split(';').map((it) => it.trim()),
cmd.force
)
})
.option('-c, --recheck', 'Force hash recheck on server', false)
.option('-t, --timeout <timeout>', 'Connect timeout in seconds', '30')
.action(
async (
dirName: string,
workspace: string,
cmd: { skip: string, force: boolean, recheck: boolean, timeout: string }
) => {
const storage = await createFileBackupStorage(dirName)
await backup(toolCtx, transactorUrl, getWorkspaceId(workspace, productId), storage, {
force: cmd.force,
recheck: cmd.recheck,
skipDomains: (cmd.skip ?? '').split(';').map((it) => it.trim()),
timeout: 0,
connectTimeout: parseInt(cmd.timeout) * 1000
})
}
)
program
.command('backup-compact <dirName>')
@ -611,19 +618,23 @@ export function devTool (
.command('backup-restore <dirName> <workspace> [date]')
.option('-m, --merge', 'Enable merge of remote and backup content.', false)
.option('-p, --parallel <parallel>', 'Number of parallel restore operations.', '1')
.option('-c, --recheck', 'Force hash recheck on server', false)
.description('restore workspace transactions and minio resources from a backup')
.action(async (dirName: string, workspace: string, date, cmd: { merge: boolean, parallel: string }) => {
const storage = await createFileBackupStorage(dirName)
await restore(
toolCtx,
transactorUrl,
getWorkspaceId(workspace, productId),
storage,
parseInt(date ?? '-1'),
cmd.merge,
parseInt(cmd.parallel ?? '1')
)
})
.action(
async (dirName: string, workspace: string, date, cmd: { merge: boolean, parallel: string, recheck: boolean }) => {
const storage = await createFileBackupStorage(dirName)
await restore(
toolCtx,
transactorUrl,
getWorkspaceId(workspace, productId),
storage,
parseInt(date ?? '-1'),
cmd.merge,
parseInt(cmd.parallel ?? '1'),
cmd.recheck
)
}
)
program
.command('backup-list <dirName>')
@ -848,6 +859,27 @@ export function devTool (
})
})
program
.command('download-file <workspace> <remote> <local>')
.action(async (workspace: string, remote: string, local: string, cmd: any) => {
const { mongodbUri } = prepareTools()
await withStorage(mongodbUri, async (adapter) => {
const blobClient = new BlobClient(transactorUrl, {
name: workspace,
productId
})
const wrstream = createWriteStream(local)
await blobClient.writeTo(toolCtx, remote, -1, {
write: (buffer, cb) => {
wrstream.write(buffer, cb)
},
end: (cb) => {
wrstream.end(cb)
}
})
})
})
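The new `download-file` command relies on the callback-style writable contract that `BlobClient.writeTo` expects, and passes `-1` as the size so the client discovers the real length from the server's `Content-Range` headers. A minimal sketch of the same adapter pattern, assuming only the `writeTo` shape shown in this diff (the helper name is hypothetical):

```ts
import { createWriteStream } from 'fs'
import type { Writable } from 'stream'

// Adapt any Node.js Writable into the { write, end } callback shape
// that BlobClient.writeTo consumes (helper name is hypothetical).
function asBlobWritable (stream: Writable): {
  write: (buffer: Buffer, cb: (err?: any) => void) => void
  end: (cb: () => void) => void
} {
  return {
    write: (buffer, cb) => { stream.write(buffer, cb) },
    end: (cb) => { stream.end(cb) }
  }
}

// usage (mirrors the command above):
// await blobClient.writeTo(toolCtx, remote, -1, asBlobWritable(createWriteStream(local)))
```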
program.command('fix-bw-workspace <workspace>').action(async (workspace: string) => {
const { mongodbUri } = prepareTools()
await withStorage(mongodbUri, async (adapter) => {

View File

@ -93,6 +93,7 @@ describe('client', () => {
}
return {
isConnected: () => true,
findAll,
searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
@ -108,7 +109,7 @@ describe('client', () => {
},
close: async () => {},
loadChunk: async (domain: Domain, idx?: number) => ({
loadChunk: async (domain: Domain, idx?: number, recheck?: boolean) => ({
idx: -1,
index: -1,
docs: [],

View File

@ -43,6 +43,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
}
return {
isConnected: () => true,
findAll,
searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
@ -59,7 +60,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
},
close: async () => {},
loadChunk: async (domain: Domain, idx?: number) => ({
loadChunk: async (domain: Domain, idx?: number, recheck?: boolean) => ({
idx: -1,
index: -1,
docs: [],

View File

@ -17,7 +17,7 @@ export interface DocChunk {
* @public
*/
export interface BackupClient {
loadChunk: (domain: Domain, idx?: number) => Promise<DocChunk>
loadChunk: (domain: Domain, idx?: number, recheck?: boolean) => Promise<DocChunk>
closeChunk: (idx: number) => Promise<void>
loadDocs: (domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
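For context, the chunk protocol this interface describes is cursor-like: `loadChunk` returns a server-side cursor index plus a batch of document digests, and `closeChunk` releases the cursor. A hedged sketch of a full iteration, assuming `DocChunk` exposes `docs` entries with an `id` and a `finished` flag, as the server-side `ChunkInfo` elsewhere in this commit suggests:

```ts
import type { Doc, Domain, Ref } from '@hcengineering/core'

// Sketch only: iterate a domain chunk by chunk, optionally forcing the
// server to recompute %hash% values via the new recheck flag.
async function iterateDomain (client: BackupClient, domain: Domain, recheck = false): Promise<Ref<Doc>[]> {
  const ids: Ref<Doc>[] = []
  let idx: number | undefined
  try {
    while (true) {
      const chunk = await client.loadChunk(domain, idx, recheck)
      idx = chunk.idx
      for (const info of chunk.docs) {
        ids.push(info.id as Ref<Doc>)
      }
      if (chunk.finished) break
    }
  } finally {
    if (idx !== undefined) {
      await client.closeChunk(idx) // always release the server-side cursor
    }
  }
  return ids
}
```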

View File

@ -89,6 +89,8 @@ export enum ClientConnectEvent {
* @public
*/
export interface ClientConnection extends Storage, FulltextStorage, BackupClient {
isConnected: () => boolean
close: () => Promise<void>
onConnect?: (event: ClientConnectEvent) => Promise<void>
@ -184,8 +186,8 @@ class ClientImpl implements AccountClient, BackupClient, MeasureClient {
await this.conn.close()
}
async loadChunk (domain: Domain, idx?: number | undefined): Promise<DocChunk> {
return await this.conn.loadChunk(domain, idx)
async loadChunk (domain: Domain, idx?: number, recheck?: boolean): Promise<DocChunk> {
return await this.conn.loadChunk(domain, idx, recheck)
}
async closeChunk (idx: number): Promise<void> {

View File

@ -54,7 +54,8 @@ export interface SessionOperationContext {
*/
export interface LowLevelStorage {
// Low level streaming API to retrieve information
find: (ctx: MeasureContext, domain: Domain) => StorageIterator
// If recheck is passed, all %hash% values for documents will be recalculated.
find: (ctx: MeasureContext, domain: Domain, recheck?: boolean) => StorageIterator
// Load passed documents from domain
load: (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>

View File

@ -39,6 +39,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<
AccountClient &
BackupClient &
FulltextStorage & {
isConnected: () => boolean
loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse>
}
> {
@ -65,6 +66,7 @@ FulltextStorage & {
}
return {
isConnected: () => true,
findAll,
findOne: async (_class, query, options) => (await findAll(_class, query, { ...options, limit: 1 })).shift(),
getHierarchy: () => hierarchy,
@ -80,7 +82,7 @@ FulltextStorage & {
return {}
},
close: async () => {},
loadChunk: async (domain: Domain, idx?: number) => ({
loadChunk: async (domain: Domain, idx?: number, recheck?: boolean) => ({
idx: -1,
index: -1,
docs: [],

View File

@ -83,6 +83,7 @@ export interface StorageAdapter {
}
export interface StorageAdapterEx extends StorageAdapter {
defaultAdapter: string
adapters?: Map<string, StorageAdapter>
syncBlobFromStorage: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<void>
@ -92,6 +93,7 @@ export interface StorageAdapterEx extends StorageAdapter {
* A dummy storage adapter for tests
*/
export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {
defaultAdapter: string = ''
async syncBlobFromStorage (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<void> {}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}

View File

@ -102,7 +102,9 @@ class Connection implements ClientConnection {
private readonly onUpgrade?: () => void,
private readonly onUnauthorized?: () => void,
readonly onConnect?: (event: ClientConnectEvent, data?: any) => Promise<void>
) {}
) {
this.scheduleOpen(false)
}
private schedulePing (socketId: number): void {
clearInterval(this.interval)
@ -154,11 +156,15 @@ class Connection implements ClientConnection {
}
}
isConnected (): boolean {
return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN
}
delay = 0
onConnectHandlers: (() => void)[] = []
private waitOpenConnection (): Promise<void> | undefined {
if (this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN) {
if (this.isConnected()) {
return undefined
}
@ -625,8 +631,8 @@ class Connection implements ClientConnection {
})
}
loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return this.sendRequest({ method: 'loadChunk', params: [domain, idx] })
loadChunk (domain: Domain, idx?: number, recheck?: boolean): Promise<DocChunk> {
return this.sendRequest({ method: 'loadChunk', params: [domain, idx, recheck] })
}
closeChunk (idx: number): Promise<void> {
@ -657,7 +663,7 @@ class Connection implements ClientConnection {
/**
* @public
*/
export async function connect (
export function connect (
url: string,
handler: TxHandler,
workspace: string,
@ -665,7 +671,7 @@ export async function connect (
onUpgrade?: () => void,
onUnauthorized?: () => void,
onConnect?: (event: ClientConnectEvent, data?: any) => void
): Promise<ClientConnection> {
): ClientConnection {
return new Connection(url, handler, workspace, user, onUpgrade, onUnauthorized, async (event, data) => {
onConnect?.(event, data)
})

View File

@ -25,7 +25,8 @@ import core, {
TxWorkspaceEvent,
WorkspaceEvent,
concatLink,
createClient
createClient,
type ClientConnection
} from '@hcengineering/core'
import platform, {
Severity,
@ -109,7 +110,7 @@ export default async () => {
handler(...txes)
}
const tokenPayload: { workspace: string, email: string } = decodeTokenPayload(token)
return connect(
const clientConnection = connect(
url,
upgradeHandler,
tokenPayload.workspace,
@ -118,6 +119,24 @@ export default async () => {
onUnauthorized,
onConnect
)
const connectTimeout = getMetadata(clientPlugin.metadata.ConnectionTimeout)
if ((connectTimeout ?? 0) > 0) {
return new Promise<ClientConnection>((resolve, reject) => {
const connectTO = setTimeout(() => {
if (!clientConnection.isConnected()) {
clientConnection.onConnect = undefined
void clientConnection?.close()
reject(new Error(`Connection timeout, and no connection established to ${endpoint}`))
}
}, connectTimeout)
clientConnection.onConnect = async (event) => {
// Any event is fine, it means server is alive.
clearTimeout(connectTO)
resolve(clientConnection)
}
})
}
return Promise.resolve(clientConnection)
},
filterModel ? [...getPlugins(), ...(getMetadata(clientPlugin.metadata.ExtraPlugins) ?? [])] : undefined,
createModelPersistence(getWSFromToken(token)),
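The timeout wrapper above only engages when the new `ConnectionTimeout` metadata is set to a non-zero value; any `onConnect` event resolves the promise, since any event proves the server is alive. A minimal sketch of opting in (the value is illustrative):

```ts
import { setMetadata } from '@hcengineering/platform'
import client from '@hcengineering/client'

// With a non-zero timeout, client creation rejects when no connection
// event arrives in time, instead of retrying forever.
setMetadata(client.metadata.ConnectionTimeout, 30 * 1000) // 30 seconds
```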

View File

@ -80,6 +80,7 @@ export default plugin(clientId, {
ExtraPlugins: '' as Metadata<Plugin[]>,
UseBinaryProtocol: '' as Metadata<boolean>,
UseProtocolCompression: '' as Metadata<boolean>,
ConnectionTimeout: '' as Metadata<number>,
OverridePersistenceStore: '' as Metadata<TxPersistenceStore>
},
function: {

View File

@ -351,7 +351,9 @@ export async function cloneWorkspace (
blobs.push(b)
cb()
},
end: () => {}
end: (cb) => {
cb()
}
})
await blobClientTarget.upload(blob._id, blob.size, blob.contentType, Buffer.concat(blobs))
@ -416,40 +418,50 @@ export async function backup (
transactorUrl: string,
workspaceId: WorkspaceId,
storage: BackupStorage,
skipDomains: string[] = [],
force: boolean = false,
timeout: number = -1
options: {
skipDomains: string[]
force: boolean
recheck: boolean
timeout: number
connectTimeout: number
} = { force: false, recheck: false, timeout: 0, skipDomains: [], connectTimeout: 30000 }
): Promise<void> {
ctx = ctx.newChild('backup', { workspaceId: workspaceId.name, force })
const connection = await ctx.with(
'connect',
{},
async () =>
(await connect(transactorUrl, workspaceId, undefined, {
mode: 'backup'
})) as unknown as CoreClient & BackupClient
)
const blobClient = new BlobClient(transactorUrl, workspaceId)
ctx.info('starting backup', { workspace: workspaceId.name })
ctx = ctx.newChild('backup', {
workspaceId: workspaceId.name,
force: options.force,
recheck: options.recheck,
timeout: options.timeout
})
let canceled = false
let timer: any
if (timeout > 0) {
if (options.timeout > 0) {
timer = setTimeout(() => {
ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: timeout / 1000 })
ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: options.timeout / 1000 })
canceled = true
}, timeout)
}, options.timeout)
}
const connection = (await connect(
transactorUrl,
workspaceId,
undefined,
{
mode: 'backup'
},
undefined,
options.connectTimeout
)) as unknown as CoreClient & BackupClient
const blobClient = new BlobClient(transactorUrl, workspaceId)
ctx.info('starting backup', { workspace: workspaceId.name })
try {
const domains = [
...connection
.getHierarchy()
.domains()
.filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && !skipDomains.includes(it))
.filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && !options.skipDomains.includes(it))
]
ctx.info('domains for dump', { domains: domains.length })
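With this change `backup` takes a single options object instead of a growing list of positional parameters. A sketch of a migrated call site, reusing the names from the dev-tool change earlier in this commit (values are illustrative):

```ts
await backup(toolCtx, transactorUrl, getWorkspaceId(workspace, productId), storage, {
  skipDomains: [],           // domain names to leave out of the dump
  force: false,              // back up even if lastTxId has not changed
  recheck: false,            // ask the server to recompute %hash% values
  timeout: 0,                // overall backup timeout in ms; 0 disables it
  connectTimeout: 30 * 1000  // fail fast if the transactor is unreachable
})
```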
@ -479,7 +491,7 @@ export async function backup (
{ limit: 1, sort: { modifiedOn: SortingOrder.Descending } }
)
if (lastTx !== undefined) {
if (lastTx._id === backupInfo.lastTxId && !force) {
if (lastTx._id === backupInfo.lastTxId && !options.force) {
ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name })
return
}
@ -498,6 +510,22 @@ export async function backup (
backupIndex = '0' + backupIndex
}
let downloadedMb = 0
let downloaded = 0
const printDownloaded = (msg: string, size: number): void => {
downloaded += size
const newDownloadedMb = Math.round(downloaded / (1024 * 1024))
const newId = Math.round(newDownloadedMb / 10)
if (downloadedMb !== newId) {
downloadedMb = newId
ctx.info('Downloaded', {
msg,
written: newDownloadedMb
})
}
}
async function loadChangesFromServer (
ctx: MeasureContext,
domain: Domain,
@ -512,7 +540,11 @@ export async function backup (
// Load all digest from collection.
while (true) {
try {
const currentChunk = await ctx.with('loadChunk', {}, async () => await connection.loadChunk(domain, idx))
const currentChunk = await ctx.with(
'loadChunk',
{},
async () => await connection.loadChunk(domain, idx, options.recheck)
)
idx = currentChunk.idx
let needRetrieve: Ref<Doc>[] = []
@ -564,7 +596,7 @@ export async function backup (
console.error(err)
ctx.error('failed to load chunks', { error: err })
if (idx !== undefined) {
await ctx.with('loadChunk', {}, async () => {
await ctx.with('closeChunk', {}, async () => {
await connection.closeChunk(idx as number)
})
}
@ -709,7 +741,7 @@ export async function backup (
addedDocuments += blob.size
let blobFiled = false
if (!(await blobClient.checkFile(ctx, blob._id))) {
if (blob.size !== 0 && !(await blobClient.checkFile(ctx, blob._id))) {
ctx.error('failed to download blob', { blob: blob._id, provider: blob.provider })
processChanges(d, true)
continue
@ -718,13 +750,27 @@ export async function backup (
_pack.entry({ name: d._id + '.json' }, descrJson, function (err) {
if (err != null) throw err
})
printDownloaded(blob._id, descrJson.length)
try {
const entry = _pack?.entry({ name: d._id, size: blob.size }, (err) => {
if (err != null) {
ctx.error('error packing file', err)
}
})
await blobClient.writeTo(ctx, blob._id, blob.size, entry)
if (blob.size === 0) {
entry.end()
} else {
await blobClient.writeTo(ctx, blob._id, blob.size, {
write (buffer, cb) {
entry.write(buffer, cb)
},
end: (cb: () => void) => {
entry.end(cb)
}
})
}
printDownloaded(blob._id, blob.size)
} catch (err: any) {
if (err.message?.startsWith('No file for') === true) {
ctx.error('failed to download blob', { message: err.message })
@ -741,6 +787,7 @@ export async function backup (
if (err != null) throw err
})
processChanges(d)
printDownloaded(d._id, data.length)
}
}
}
@ -788,7 +835,7 @@ export async function backup (
ctx.info('end backup', { workspace: workspaceId.name })
await connection.close()
ctx.end()
if (timeout !== -1) {
if (options.timeout !== -1) {
clearTimeout(timer)
}
}
@ -821,7 +868,8 @@ export async function restore (
storage: BackupStorage,
date: number,
merge?: boolean,
parallel?: number
parallel?: number,
recheck?: boolean
): Promise<void> {
const infoFile = 'backup.json.gz'
@ -875,7 +923,7 @@ export async function restore (
try {
while (true) {
const st = Date.now()
const it = await connection.loadChunk(c, idx)
const it = await connection.loadChunk(c, idx, recheck)
chunks++
idx = it.idx

View File

@ -62,19 +62,32 @@ class BackupWorker {
async schedule (ctx: MeasureContext): Promise<void> {
console.log('schedule timeout for', this.config.Interval, ' seconds')
this.interval = setTimeout(() => {
void this.backup(ctx).then(() => {
void this.schedule(ctx)
void this.backup(ctx).then((failed) => {
if (failed.length > 0) {
ctx.info('Failed to backup workspaces, retrying failed workspaces once.', { failed: failed.length })
void this.doBackup(ctx, failed).then(() => {
void this.schedule(ctx)
})
} else {
void this.schedule(ctx)
}
})
}, this.config.Interval * 1000)
}
async backup (ctx: MeasureContext): Promise<void> {
async backup (ctx: MeasureContext): Promise<BaseWorkspaceInfo[]> {
const workspaces = await getWorkspaces(this.config.AccountsURL, this.config.Token)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
return await this.doBackup(ctx, workspaces)
}
async doBackup (ctx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise<BaseWorkspaceInfo[]> {
let index = 0
const failedWorkspaces: BaseWorkspaceInfo[] = []
for (const ws of workspaces) {
if (this.canceled) {
return
return failedWorkspaces
}
index++
ctx.info('\n\nBACKUP WORKSPACE ', {
@ -91,20 +104,20 @@ class BackupWorker {
ws.workspace
)
await ctx.with('backup', { workspace: ws.workspace }, async (ctx) => {
await backup(
ctx,
this.config.TransactorURL,
getWorkspaceId(ws.workspace, ws.productId),
storage,
[],
false,
this.config.Timeout * 1000
)
await backup(ctx, this.config.TransactorURL, getWorkspaceId(ws.workspace, ws.productId), storage, {
skipDomains: [],
force: false,
recheck: false,
timeout: this.config.Timeout * 1000,
connectTimeout: 5 * 60 * 1000 // 5 minutes to connect
})
})
} catch (err: any) {
ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
failedWorkspaces.push(ws)
}
}
return failedWorkspaces
}
}

View File

@ -107,7 +107,7 @@ export interface DbAdapter {
) => Promise<FindResult<T>>
tx: (ctx: MeasureContext, ...tx: Tx[]) => Promise<TxResult[]>
find: (ctx: MeasureContext, domain: Domain) => StorageIterator
find: (ctx: MeasureContext, domain: Domain, recheck?: boolean) => StorageIterator
load: (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
upload: (ctx: MeasureContext, domain: Domain, docs: Doc[]) => Promise<void>

View File

@ -933,8 +933,8 @@ export class TServerStorage implements ServerStorage {
})
}
find (ctx: MeasureContext, domain: Domain): StorageIterator {
return this.getAdapter(domain, false).find(ctx, domain)
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
return this.getAdapter(domain, false).find(ctx, domain, recheck)
}
async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {

View File

@ -193,6 +193,7 @@ describe('mongo operations', () => {
}
client = await createClient(async (handler) => {
const st: ClientConnection = {
isConnected: () => true,
findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options),
tx: async (tx) => await serverStorage.tx(soCtx, tx),
searchFulltext: async () => ({ docs: [] }),

View File

@ -74,6 +74,7 @@ import {
type Db,
type Document,
type Filter,
type FindCursor,
type Sort,
type UpdateFilter
} from 'mongodb'
@ -718,20 +719,11 @@ abstract class MongoAdapterBase implements DbAdapter {
return docs
}
find (_ctx: MeasureContext, domain: Domain): StorageIterator {
find (_ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
const ctx = _ctx.newChild('find', { domain })
const coll = this.db.collection<Doc>(domain)
let mode: 'hashed' | 'non-hashed' = 'hashed'
let iterator = coll.find(
{ '%hash%': { $nin: ['', null] } },
{
projection: {
'%hash%': 1,
_id: 1
}
}
)
let iterator: FindCursor<Doc>
const bulkUpdate = new Map<Ref<Doc>, string>()
const flush = async (flush = false): Promise<void> => {
if (bulkUpdate.size > 1000 || flush) {
@ -756,6 +748,20 @@ abstract class MongoAdapterBase implements DbAdapter {
return {
next: async () => {
if (iterator === undefined) {
if (recheck === true) {
await coll.updateMany({ '%hash%': { $ne: null } }, { $set: { '%hash%': null } })
}
iterator = coll.find(
{ '%hash%': { $nin: ['', null] } },
{
projection: {
'%hash%': 1,
_id: 1
}
}
)
}
let d = await ctx.with('next', { mode }, async () => await iterator.next())
if (d == null && mode === 'hashed') {
mode = 'non-hashed'
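The adapter streams in two passes: 'hashed' first (documents with a stored `%hash%`, fetched with a minimal projection), then 'non-hashed' for documents whose digest must be computed and bulk-written back. Passing `recheck` nulls every stored digest before the cursor is created, so the whole domain goes through the recompute pass. The reset in isolation, assuming a `mongodb` collection:

```ts
import type { Collection, Document } from 'mongodb'

// Invalidate every stored digest so the backup iterator recomputes
// %hash% for each document (standalone sketch of the recheck branch).
async function resetHashes (coll: Collection<Document>): Promise<number> {
  const res = await coll.updateMany({ '%hash%': { $ne: null } }, { $set: { '%hash%': null } })
  return res.modifiedCount
}
```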

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import {
import core, {
Class,
Doc,
DocumentQuery,
@ -34,7 +34,7 @@ import {
} from '@hcengineering/core'
import { createMongoAdapter } from '@hcengineering/mongo'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { DbAdapter, StorageAdapter } from '@hcengineering/server-core'
import { DbAdapter, StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
class StorageBlobAdapter implements DbAdapter {
constructor (
@ -73,6 +73,15 @@ class StorageBlobAdapter implements DbAdapter {
}
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
// We need to update docs to have provider === default one.
if ('adapters' in this.client) {
const adapterEx = this.client as StorageAdapterEx
for (const d of docs) {
if (d._class === core.class.Blob) {
;(d as Blob).provider = adapterEx.defaultAdapter
}
}
}
await this.blobAdapter.upload(ctx, domain, docs)
}

View File

@ -19,7 +19,7 @@ export interface ChunkInfo {
* @public
*/
export interface BackupSession extends Session {
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise<void>
loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean) => Promise<void>
closeChunk: (ctx: ClientSessionCtx, idx: number) => Promise<void>
loadDocs: (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]) => Promise<void>
}
@ -38,7 +38,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
idIndex = 0
chunkInfo = new Map<number, ChunkInfo>()
async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number): Promise<void> {
async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
this.lastRequest = Date.now()
await _ctx.ctx.with('load-chunk', { domain }, async (ctx) => {
idx = idx ?? this.idIndex++
@ -53,7 +53,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
}
}
} else {
chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain), finished: false, index: 0 }
chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
let size = 0

View File

@ -30,13 +30,16 @@ import plugin from './plugin'
/**
* @public
*
* If connectTimeout is set, connect will only try to connect for the specified amount of time and will return a failure if no connection is established.
*/
export async function connect (
transactorUrl: string,
workspace: WorkspaceId,
email?: string,
extra?: Record<string, string>,
model?: Tx[]
model?: Tx[],
connectTimeout: number = 0
): Promise<Client> {
const token = generateToken(email ?? systemAccountEmail, workspace, extra)
@ -46,6 +49,7 @@ export async function connect (
setMetadata(client.metadata.UseBinaryProtocol, true)
setMetadata(client.metadata.UseProtocolCompression, true)
setMetadata(client.metadata.ConnectionTimeout, connectTimeout)
setMetadata(client.metadata.ClientSocketFactory, (url) => {
const socket = new WebSocket(url, {
@ -98,26 +102,31 @@ export class BlobClient {
url = url.slice(0, url.length - 1)
}
this.transactorAPIUrl = url.replaceAll('wss://', 'https://').replace('ws://', 'http://') + '/api/v1/blob/'
this.transactorAPIUrl = url.replaceAll('wss://', 'https://').replace('ws://', 'http://') + '/api/v1/blob'
}
async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: 'bytes=0-1'
for (let i = 0; i < 5; i++) {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: 'bytes=0-1'
}
})
if (response.status === 404) {
return false
}
})
if (response.status === 404) {
return false
const buff = await response.arrayBuffer()
return buff.byteLength > 0
} catch (err: any) {
if (i === 4) {
ctx.error('Failed to check file', { name, error: err })
}
await new Promise<void>((resolve) => setTimeout(resolve, 500))
}
const buff = await response.arrayBuffer()
return buff.byteLength > 0
} catch (err: any) {
ctx.error('Failed to check file', { name, error: err })
return false
}
return false
}
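`checkFile` now retries up to five times with a 500 ms pause, logging only on the final failure, which smooths over transient transactor hiccups during large backups. The same pattern as a generic helper (a hypothetical utility, not part of this commit):

```ts
// Bounded retry with a fixed delay; rethrows the last error.
async function withRetry<T> (attempts: number, delayMs: number, op: () => Promise<T>): Promise<T> {
  let lastErr: any
  for (let i = 0; i < attempts; i++) {
    try {
      return await op()
    } catch (err: any) {
      lastErr = err
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  throw lastErr
}

// usage sketch: const exists = await withRetry(5, 500, async () => await blobClient.checkFile(ctx, name))
```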
async writeTo (
@ -126,21 +135,23 @@ export class BlobClient {
size: number,
writable: {
write: (buffer: Buffer, cb: (err?: any) => void) => void
end: () => void
end: (cb: () => void) => void
}
): Promise<void> {
let written = 0
const chunkSize = 1024 * 1024
let writtenMb = 0
// Use ranges to iterate through the file, with retry if required.
while (written < size) {
while (written < size || size === -1) {
let i = 0
let response: Response | undefined
for (; i < 5; i++) {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: `bytes=${written}-${Math.min(size - 1, written + chunkSize)}`
Range: `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
})
if (response.status === 404) {
@ -148,7 +159,27 @@ export class BlobClient {
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 416) {
if (size === -1) {
size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
continue
}
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
const chunk = Buffer.from(await response.arrayBuffer())
// We need to parse
// 'Content-Range': `bytes ${start}-${end}/${size}`
// To determine if something is left
const range = response.headers.get('Content-Range')
if (range !== null) {
const [, total] = range.split(' ')[1].split('/')
if (total !== undefined) {
size = parseInt(total)
}
}
await new Promise<void>((resolve, reject) => {
writable.write(chunk, (err) => {
if (err != null) {
@ -159,24 +190,32 @@ export class BlobClient {
})
written += chunk.length
if (size > 1024 * 1024) {
ctx.info('Downloaded', {
const newWrittenMb = Math.round(written / (1024 * 1024))
const newWrittenId = Math.round(newWrittenMb / 5)
if (writtenMb !== newWrittenId) {
writtenMb = newWrittenId
ctx.info(' >>>>Chunk', {
name,
written: Math.round(written / (1024 * 1024)),
written: newWrittenMb,
of: Math.round(size / (1024 * 1024))
})
}
break
} catch (err: any) {
if (i > 4) {
writable.end()
await new Promise<void>((resolve) => {
writable.end(resolve)
})
throw err
}
await new Promise<void>((resolve) => setTimeout(resolve, 1000))
// retry
}
}
}
writable.end()
await new Promise<void>((resolve) => {
writable.end(resolve)
})
}
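`writeTo` can now be called with `size === -1`: the first ranged request either succeeds, in which case the 206 reply's `Content-Range` carries the real total after the `/`, or comes back 416, whose `Content-Range: bytes */<total>` also reveals the total. A worked example of the header parsing used above, with illustrative values:

```ts
// 416 probe response: the total size follows '*/'
const h416 = 'bytes */5242880'
const discovered = parseInt(h416.split('*/')[1]) // 5242880 (5 MiB)

// 206 partial response: 'bytes <start>-<end>/<total>'
const h206 = 'bytes 0-1048575/5242880'
const [, total] = h206.split(' ')[1].split('/')
console.log(discovered, parseInt(total)) // 5242880 5242880 — keeps `size` in sync
```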
async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {

View File

@ -6,10 +6,12 @@ import type { Readable } from 'stream'
const cacheControlNoCache = 'public, no-store, no-cache, must-revalidate, max-age=0'
export interface BlobResponse {
aborted: boolean
writeHead: (code: number, header: Record<string, string | number>) => void
status: (code: number) => void
end: () => void
pipeFrom: (readable: Readable, size: number) => void
cork: (cb: () => void) => void
}
export async function getFile (
@ -22,8 +24,10 @@ export async function getFile (
const stat = await ctx.with('stat', {}, async () => await client.stat(ctx, workspace, file))
if (stat === undefined) {
ctx.error('No such key', { file })
res.status(404)
res.end()
res.cork(() => {
res.status(404)
res.end()
})
return
}
@ -33,31 +37,37 @@ export async function getFile (
async (ctx) => {
try {
const dataStream = await ctx.with('readable', {}, async () => await client.get(ctx, workspace, file))
res.writeHead(200, {
'Content-Type': stat.contentType,
Etag: stat.etag,
'Last-Modified': new Date(stat.modifiedOn).toISOString(),
'Cache-Control': cacheControlNoCache
})
res.pipeFrom(dataStream, stat.size)
await new Promise<void>((resolve, reject) => {
dataStream.on('end', function () {
res.end()
dataStream.destroy()
resolve()
})
dataStream.on('error', function (err) {
Analytics.handleError(err)
ctx.error('error', { err })
reject(err)
res.cork(() => {
res.writeHead(200, {
'Content-Type': stat.contentType,
Etag: stat.etag,
'Last-Modified': new Date(stat.modifiedOn).toISOString(),
'Cache-Control': cacheControlNoCache
})
res.pipeFrom(dataStream, stat.size)
dataStream.on('end', function () {
res.cork(() => {
res.end()
})
dataStream.destroy()
resolve()
})
dataStream.on('error', function (err) {
Analytics.handleError(err)
ctx.error('error', { err })
reject(err)
})
})
})
} catch (err: any) {
ctx.error('get-file-error', { workspace: workspace.name, err })
Analytics.handleError(err)
res.status(500)
res.end()
res.cork(() => {
res.status(500)
res.end()
})
}
},
{}
@ -92,19 +102,26 @@ export async function getFileRange (
const stat = await ctx.with('stats', {}, async () => await client.stat(ctx, workspace, uuid))
if (stat === undefined) {
ctx.error('No such key', { file: uuid })
res.status(404)
res.end()
res.cork(() => {
res.status(404)
res.end()
})
return
}
const size: number = stat.size
const [start, end] = getRange(range, size)
let [start, end] = getRange(range, size)
if (start >= size || end >= size) {
res.writeHead(416, {
'Content-Range': `bytes */${size}`
if (end >= size) {
end = size // Allow iterative return of the entire document
}
if (start >= size) {
res.cork(() => {
res.writeHead(416, {
'Content-Range': `bytes */${size}`
})
res.end()
})
res.end()
return
}
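These changes make ranged downloads iterable: an over-long `end` is clamped to the file size instead of triggering 416, and 416 is reserved for a `start` past the end of the file, with the total size reported so clients (such as `BlobClient.writeTo` above) can learn it. A sketch of the clamping rule with an assumed 1000-byte file (the helper is illustrative):

```ts
function clampRange (start: number, end: number, size: number): { status: 206 | 416, end: number } {
  if (end >= size) end = size // allow iterative return of the entire document
  if (start >= size) return { status: 416, end } // Content-Range: bytes */<size>
  return { status: 206, end }
}

console.log(clampRange(0, 1048575, 1000)) // { status: 206, end: 1000 }
console.log(clampRange(2000, 2001, 1000)) // { status: 416, end: 1000 }
```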
@ -119,41 +136,52 @@ export async function getFileRange (
async () => await client.partial(ctx, workspace, uuid, start, end - start + 1),
{}
)
res.writeHead(206, {
Connection: 'keep-alive',
'Content-Range': `bytes ${start}-${end}/${size}`,
'Accept-Ranges': 'bytes',
'Content-Length': end - start + 1,
'Content-Type': stat.contentType,
Etag: stat.etag,
'Last-Modified': new Date(stat.modifiedOn).toISOString()
})
res.pipeFrom(dataStream, end - start)
await new Promise<void>((resolve, reject) => {
dataStream.on('end', function () {
res.end()
dataStream.destroy()
resolve()
})
dataStream.on('error', function (err) {
Analytics.handleError(err)
ctx.error('error', { err })
reject(err)
res.cork(() => {
res.writeHead(206, {
Connection: 'keep-alive',
'Content-Range': `bytes ${start}-${end}/${size}`,
'Accept-Ranges': 'bytes',
// 'Content-Length': end - start + 1,
'Content-Type': stat.contentType,
Etag: stat.etag,
'Last-Modified': new Date(stat.modifiedOn).toISOString()
})
res.pipeFrom(dataStream, end - start)
dataStream.on('end', function () {
res.cork(() => {
res.end()
})
dataStream.destroy()
resolve()
})
dataStream.on('error', function (err) {
Analytics.handleError(err)
ctx.error('error', { err })
res.cork(() => {
res.end()
})
reject(err)
})
})
})
} catch (err: any) {
if (err?.code === 'NoSuchKey' || err?.code === 'NotFound') {
ctx.info('No such key', { workspace: workspace.name, uuid })
res.status(404)
res.end()
res.cork(() => {
res.status(404)
res.end()
})
return
} else {
Analytics.handleError(err)
ctx.error(err)
}
res.status(500)
res.end()
res.cork(() => {
res.status(500)
res.end()
})
}
},
{ uuid, start, end: end - start + 1 }
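`BlobResponse` grows `aborted` and `cork` so the same handler code runs on both HTTP backends: µWebSockets.js requires every header and body write for a response to happen inside `res.cork(...)`, while the Express wrapper implements `cork` as an immediate pass-through. A sketch of the contract as the handlers above use it:

```ts
// Group all writes for one response into a single cork callback; the
// uWS wrapper forwards to res.cork, the Express wrapper just invokes cb.
function sendNotFound (res: BlobResponse): void {
  res.cork(() => {
    res.status(404)
    res.end()
  })
}
```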

View File

@ -31,7 +31,7 @@ export const serverFactories: Record<string, ServerFactory> = {
externalStorage
)
} catch (err: any) {
console.error('uwebsocket.js is not supported, switch back to nodejs ws')
console.error('uwebsocket.js is not supported, switch back to nodejs ws', err)
return startHttpServer(
sessions,
handleRequest,

View File

@ -419,18 +419,18 @@ function createWebsocketClientSocket (
setImmediate(doSend)
return
}
try {
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
if (!`${err.message}`.includes('WebSocket is not open')) {
ctx.error('send error', { err })
Analytics.handleError(err)
reject(err)
}
// In case the socket is not open, just resolve
resolve()
})
} catch (err: any) {
if (err.message !== 'WebSocket is not open') {
ctx.error('send error', { err })
}
}
resolve()
})
}
doSend()
})
@ -441,6 +441,10 @@ function createWebsocketClientSocket (
}
function wrapRes (res: ExpressResponse): BlobResponse {
return {
aborted: false,
cork: (cb) => {
cb()
},
end: () => res.end(),
pipeFrom: (readable) => readable.pipe(res),
status: (code) => res.status(code),

View File

@ -31,10 +31,10 @@ import {
} from './types'
import type { StorageAdapter } from '@hcengineering/server-core'
import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws'
import { Readable } from 'stream'
import { getFile, getFileRange, type BlobResponse } from './blobs'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws'
interface WebsocketUserData extends WebsocketData {
backPressure?: Promise<void>
@ -70,9 +70,6 @@ export function startUWebsocketServer (
}
uAPP
.trace('/*', (res, req) => {
console.log(req.getUrl(), req.getMethod())
})
.ws<WebsocketUserData>('/*', {
/* There are many common helper features */
maxPayloadLength: 250 * 1024 * 1024,
@ -275,17 +272,22 @@ export function startUWebsocketServer (
const payload = decodeToken(authHeader.split(' ')[1])
const name = req.getQuery('name') as string
const wrappedRes = wrapRes(res)
res.onAborted(() => {
wrappedRes.aborted = true
})
const range = req.getHeader('range')
if (range !== undefined) {
void ctx.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res))
await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrappedRes)
})
} else {
void getFile(ctx, externalStorage, payload.workspace, name, wrapRes(res))
void getFile(ctx, externalStorage, payload.workspace, name, wrappedRes)
}
} catch (err: any) {
Analytics.handleError(err)
writeStatus(res, '404 ERROR').end()
}
})
.put('/api/v1/blob/*', (res, req) => {
@ -318,12 +320,11 @@ export function startUWebsocketServer (
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
res.writeStatus('404 ERROR')
res.end()
writeStatus(res, '404 ERROR').end()
}
})
.any('/*', (res, req) => {
res.end('')
writeStatus(res, '404 ERROR').end()
})
.listen(port, (s) => {})
@ -408,28 +409,39 @@ function pipeFromRequest (res: HttpResponse): Readable {
return readable
}
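The rewritten `pipeStreamOverResponse` below corks each write and handles uWS backpressure explicitly: `tryEnd` returns `[ok, done]`; when `ok` is false the socket buffer is full, so the stream pauses and `onWritable` resumes from the reported offset. A simplified single-chunk sketch of that idiom, assuming the µWebSockets.js `HttpResponse` API (the real code below stashes the chunk on the response object instead of a closure):

```ts
import type { HttpResponse } from '@hcengineering/uws'
import type { Readable } from 'stream'

// Illustrative single-chunk write with backpressure handling.
function writeWithBackpressure (res: HttpResponse, stream: Readable, ab: ArrayBuffer, totalSize: number): void {
  const lastOffset = res.getWriteOffset()
  res.cork(() => {
    const [ok, done] = res.tryEnd(ab, totalSize)
    if (done) return // the whole response has been sent
    if (!ok) {
      stream.pause() // socket buffer is full
      res.onWritable((offset: number) => {
        // retry from where the socket stopped accepting data
        const [okRetry, doneRetry] = res.tryEnd(ab.slice(offset - lastOffset), totalSize)
        if (okRetry && !doneRetry) stream.resume()
        return okRetry
      })
    }
  })
}
```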
function pipeStreamOverResponse (res: HttpResponse, readStream: Readable, totalSize: number): void {
function pipeStreamOverResponse (
res: HttpResponse,
readStream: Readable,
totalSize: number,
checkAborted: () => boolean
): void {
readStream
.on('data', (chunk) => {
if (checkAborted()) {
readStream.destroy()
return
}
const ab = toArrayBuffer(chunk)
const lastOffset = res.getWriteOffset()
const [ok, done] = res.tryEnd(ab, totalSize)
if (done) {
onAbortedOrFinishedResponse(res, readStream)
} else if (!ok) {
readStream.pause()
res.ab = ab
res.abOffset = lastOffset
res.onWritable((offset) => {
const [ok, done] = res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize)
if (done) {
onAbortedOrFinishedResponse(res, readStream)
} else if (ok) {
readStream.resume()
}
return ok
})
}
res.cork(() => {
const [ok, done] = res.tryEnd(ab, totalSize)
if (done) {
onAbortedOrFinishedResponse(res, readStream)
} else if (!ok) {
readStream.pause()
res.ab = ab
res.abOffset = lastOffset
res.onWritable((offset) => {
const [ok, done] = res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize)
if (done) {
onAbortedOrFinishedResponse(res, readStream)
} else if (ok) {
readStream.resume()
}
return ok
})
}
})
})
.on('error', (err) => {
Analytics.handleError(err)
@ -443,12 +455,64 @@ function pipeStreamOverResponse (res: HttpResponse, readStream: Readable, totalS
}
function wrapRes (res: HttpResponse): BlobResponse {
return {
end: () => res.end(),
status: (code) => res.status(code),
pipeFrom: (readable, size) => {
pipeStreamOverResponse(res, readable, size)
const result: BlobResponse = {
aborted: false,
cork: (cb: () => void) => {
if (result.aborted || res.id === -1) {
cb()
return
}
res.cork(cb)
},
writeHead: (code, header) => res.writeHead(code, header)
end: () => {
if (result.aborted || res.id === -1) {
return
}
res.end()
},
status: (code) => {
if (result.aborted || res.id === -1) {
return
}
switch (code) {
case 200:
res.writeStatus(`${code} OK`)
break
case 206:
res.writeStatus(`${code} Partial Content`)
break
case 304:
res.writeStatus(`${code} Not Modified`)
break
case 400:
res.writeStatus(`${code} Bad Request`)
break
case 404:
res.writeStatus(`${code} Not Found`)
break
case 416:
res.writeStatus(`${code} Range Not Satisfiable`)
break
default:
res.writeStatus(`${code} ERROR`)
break
}
},
pipeFrom: (readable, size) => {
if (result.aborted || res.id === -1) {
return
}
pipeStreamOverResponse(res, readable, size, () => result.aborted)
},
writeHead: (code, header) => {
if (result.aborted || res.id === -1) {
return
}
result.status(code)
for (const [k, v] of Object.entries(header)) {
res.writeHeader(k, `${v}`)
}
}
}
return result
}