UBERF-7165: Storage + Backup improvements (#5913)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-06-25 13:11:21 +07:00 committed by GitHub
parent 7f2a8779c6
commit 26d9155ec2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1323 additions and 677 deletions

View File

@ -592,6 +592,12 @@ export function devTool (
.description('dump workspace transactions and minio resources')
.option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
.option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
.option(
'-ct, --contentTypes <contentTypes>',
'A list of ; separated content types for blobs to skip download if size >= limit',
''
)
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 15mb)', '15')
.option('-f, --force', 'Force backup', false)
.option('-c, --recheck', 'Force hash recheck on server', false)
.option('-t, --timeout <timeout>', 'Connect timeout in seconds', '30')
@ -599,7 +605,15 @@ export function devTool (
async (
dirName: string,
workspace: string,
cmd: { skip: string, force: boolean, recheck: boolean, timeout: string, include: string }
cmd: {
skip: string
force: boolean
recheck: boolean
timeout: string
include: string
blobLimit: string
contentTypes: string
}
) => {
const storage = await createFileBackupStorage(dirName)
await backup(toolCtx, transactorUrl, getWorkspaceId(workspace, productId), storage, {
@ -608,7 +622,12 @@ export function devTool (
include: cmd.include === '*' ? undefined : new Set(cmd.include.split(';').map((it) => it.trim())),
skipDomains: (cmd.skip ?? '').split(';').map((it) => it.trim()),
timeout: 0,
connectTimeout: parseInt(cmd.timeout) * 1000
connectTimeout: parseInt(cmd.timeout) * 1000,
blobDownloadLimit: parseInt(cmd.blobLimit),
skipBlobContentTypes: cmd.contentTypes
.split(';')
.map((it) => it.trim())
.filter((it) => it.length > 0)
})
}
)

View File

@ -136,11 +136,11 @@ export class TAttachedDoc extends TDoc implements AttachedDoc {
export class TBlob extends TDoc implements Blob {
@Prop(TypeString(), core.string.Blob)
@ReadOnly()
@Index(IndexKind.Indexed)
provider!: string
@Prop(TypeString(), core.string.BlobContentType)
@ReadOnly()
@Index(IndexKind.Indexed)
contentType!: string
@Prop(TypeString(), core.string.BlobStorageId)

View File

@ -87,10 +87,11 @@ import {
TSpace,
TSpaceType,
TSpaceTypeDescriptor,
TTypedSpace,
TSystemSpace
TSystemSpace,
TTypedSpace
} from './security'
import { TStatus, TStatusCategory, TDomainStatusPlaceholder } from './status'
import { defineSpaceType } from './spaceType'
import { TDomainStatusPlaceholder, TStatus, TStatusCategory } from './status'
import { TUserStatus } from './transient'
import {
TTx,
@ -103,7 +104,6 @@ import {
TTxUpdateDoc,
TTxWorkspaceEvent
} from './tx'
import { defineSpaceType } from './spaceType'
export { coreId } from '@hcengineering/core'
export * from './core'

View File

@ -91,7 +91,7 @@ export default plugin(presentationId, {
CollaboratorApiUrl: '' as Metadata<string>,
Token: '' as Metadata<string>,
FrontUrl: '' as Asset,
PreviewConfig: '' as Metadata<PreviewConfig>
PreviewConfig: '' as Metadata<PreviewConfig | undefined>
},
status: {
FileTooLarge: '' as StatusCode

View File

@ -44,22 +44,9 @@ const defaultPreview = (): ProviderPreviewConfig => ({
- contentTypes - a ',' separated list of content type patterns.
*/
export function parsePreviewConfig (config?: string): PreviewConfig {
export function parsePreviewConfig (config?: string): PreviewConfig | undefined {
if (config === undefined) {
// TODO: Remove after all migrated
return {
default: defaultPreview(),
previewers: {
'': [
{
providerId: '',
contentTypes: ['image/gif', 'image/apng', 'image/svg'], // Disable gif and apng format preview.
formats: [],
previewUrl: ''
}
]
}
}
return
}
const result: PreviewConfig = { previewers: {} }
const nolineData = config
@ -99,7 +86,21 @@ export function parsePreviewConfig (config?: string): PreviewConfig {
}
export function getPreviewConfig (): PreviewConfig {
return getMetadata(presentation.metadata.PreviewConfig) as PreviewConfig
return (
(getMetadata(presentation.metadata.PreviewConfig) as PreviewConfig) ?? {
default: defaultPreview(),
previewers: {
'': [
{
providerId: '',
contentTypes: ['image/gif', 'image/apng', 'image/svg'], // Disable gif and apng format preview.
formats: [],
previewUrl: ''
}
]
}
}
)
}
export async function getBlobRef (

View File

@ -1443,7 +1443,7 @@ export class LiveQuery implements WithTx, Client {
} else {
const d = await this.findOne(lookupClass, { _id: pops[pkey] }, { lookup: nestedLookup })
if (d !== undefined) {
pp[pkey].push()
pp[pkey].push(d)
}
}
}

View File

@ -15,12 +15,13 @@
import {
type Blob,
type Branding,
type DocumentUpdate,
type MeasureContext,
type Ref,
type StorageIterator,
type WorkspaceId,
type WorkspaceIdWithUrl,
type Branding
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import type { BlobLookup } from '@hcengineering/core/src/classes'
import { type Readable } from 'stream'
@ -103,6 +104,8 @@ export interface StorageAdapterEx extends StorageAdapter {
objectName: string,
provider?: string
) => Promise<void>
find: (ctx: MeasureContext, workspaceId: WorkspaceId) => StorageIterator
}
/**
@ -120,6 +123,13 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {
return false
}
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
return {
next: async (ctx) => undefined,
close: async (ctx) => {}
}
}
async listBuckets (ctx: MeasureContext, productId: string): Promise<BucketInfo[]> {
return []
}

View File

@ -141,7 +141,7 @@ export const ImageNode = Node.create<ImageOptions>({
if (fileId != null) {
const setBrokenImg = setTimeout(() => {
imgElement.src = this.options.loadingImgSrc ?? `platform://platform/files/workspace/?file=${fileId}`
}, 200)
}, 500)
if (fileId != null) {
void this.options.getBlobRef(fileId).then((val) => {
clearTimeout(setBrokenImg)

View File

@ -1,81 +1,27 @@
<script lang="ts">
import contact, { PersonAccount } from '@hcengineering/contact'
import { Metrics, systemAccountEmail } from '@hcengineering/core'
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { createQuery, isAdminUser } from '@hcengineering/presentation'
import {
Button,
CheckBox,
IconArrowRight,
Loading,
Panel,
TabItem,
TabList,
fetchMetadataLocalStorage,
ticker
} from '@hcengineering/ui'
import EditBox from '@hcengineering/ui/src/components/EditBox.svelte'
import Expandable from '@hcengineering/ui/src/components/Expandable.svelte'
import { ObjectPresenter } from '@hcengineering/view-resources'
import { onDestroy } from 'svelte'
import { workspacesStore } from '../utils'
import MetricsInfo from './statistics/MetricsInfo.svelte'
import ServerManagerCollaboratorStatistics from './ServerManagerCollaboratorStatistics.svelte'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
const token: string = getMetadata(presentation.metadata.Token) ?? ''
import ServerManagerFrontStatistics from './ServerManagerFrontStatistics.svelte'
let endpoint = _endpoint.replace(/^ws/g, 'http')
if (endpoint.endsWith('/')) {
endpoint = endpoint.substring(0, endpoint.length - 1)
}
import ServerManagerServerStatistics from './ServerManagerServerStatistics.svelte'
async function fetchStats (): Promise<void> {
await fetch(endpoint + `/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
data = await json.json()
admin = data?.admin ?? false
})
.catch((err) => {
console.error(err)
})
}
async function fetchUIStats (): Promise<void> {
await fetch(`/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
dataFront = await json.json()
})
.catch((err) => {
console.error(err)
})
}
async function fetchCollabStats (): Promise<void> {
const collaborator = getMetadata(presentation.metadata.CollaboratorApiUrl)
await fetch(collaborator + `/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
dataCollab = await json.json()
})
.catch((err) => {
console.error(err)
})
}
import ServerManagerUsers from './ServerManagerUsers.svelte'
import ServerManagerGeneral from './ServerManagerGeneral.svelte'
import { getEmbeddedLabel } from '@hcengineering/platform'
import { Panel, TabItem, TabList } from '@hcengineering/ui'
import ServerManagerAccountStatistics from './ServerManagerAccountStatistics.svelte'
let data: any
let dataFront: any
let dataCollab: any
let admin = false
onDestroy(
ticker.subscribe(() => {
void fetchStats()
void fetchUIStats()
void fetchCollabStats()
})
)
const tabs: TabItem[] = [
{
id: 'general',
labelIntl: getEmbeddedLabel('General')
},
{
id: 'account',
labelIntl: getEmbeddedLabel('Account')
},
{
id: 'statistics',
labelIntl: getEmbeddedLabel('Server')
@ -94,78 +40,9 @@
}
]
let selectedTab: string = tabs[0].id
interface StatisticsElement {
find: number
tx: number
}
$: activeSessions =
(data?.statistics?.activeSessions as Record<
string,
{
sessions: Array<{
userId: string
data?: Record<string, any>
total: StatisticsElement
mins5: StatisticsElement
current: StatisticsElement
}>
name: string
wsId: string
sessionsTotal: number
upgrading: boolean
closing: boolean
}
>) ?? {}
const employeeQuery = createQuery()
let employees = new Map<string, PersonAccount>()
employeeQuery.query(contact.class.PersonAccount, {}, (res) => {
const emp = new Map<string, PersonAccount>()
for (const r of res) {
emp.set(r.email, r)
}
employees = emp
})
let warningTimeout = 15
$: metricsData = data?.metrics as Metrics | undefined
$: metricsDataFront = dataFront?.metrics as Metrics | undefined
$: metricsDataCollab = dataCollab?.metrics as Metrics | undefined
$: totalStats = Array.from(Object.entries(activeSessions).values()).reduce(
(cur, it) => {
const totalFind = it[1].sessions.reduce((it, itm) => itm.current.find + it, 0)
const totalTx = it[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)
return {
find: cur.find + totalFind,
tx: cur.tx + totalTx
}
},
{ find: 0, tx: 0 }
)
let realUsers: boolean
</script>
<Panel on:close isFullSize useMaxWidth={true}>
<svelte:fragment slot="header">
{#if data}
<div class="flex-col">
<span>
Mem: {data.statistics.memoryUsed} / {data.statistics.memoryTotal} CPU: {data.statistics.cpuUsage}
</span>
<span>
TotalFind: {totalStats.find} / Total Tx: {totalStats.tx}
</span>
</div>
{/if}
</svelte:fragment>
<svelte:fragment slot="title">
<span class="p-3"> Server manager </span>
<TabList
@ -177,185 +54,18 @@
}}
/>
</svelte:fragment>
{#if data}
{#if selectedTab === 'general'}
{#if admin}
<div class="flex flex-col">
<div class="flex-row-center p-1">
<div class="p-3">1.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Set maintenance warning')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=maintenance&timeout=${warningTimeout}`, {
method: 'PUT'
})
}}
/>
<div class="flex-col p-1">
<div class="flex-row-center p-1">
<EditBox kind={'underline'} format={'number'} bind:value={warningTimeout} /> min
</div>
</div>
</div>
<div class="flex-row-center p-1">
<div class="p-3">2.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Reboot server')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=reboot`, {
method: 'PUT'
})
}}
/>
</div>
</div>
{/if}
{:else if selectedTab === 'users'}
<div class="ml-4 p-1 flex-row-center">
<CheckBox bind:checked={realUsers} />
<div class="ml-1">Show only users</div>
</div>
<div class="flex-column p-3 h-full" style:overflow="auto">
{#each Object.entries(activeSessions) as act}
{@const wsInstance = $workspacesStore.find((it) => it.workspaceId === act[0])}
{@const totalFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)}
{@const totalTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)}
{@const employeeGroups = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
(it) => systemAccountEmail !== it || !realUsers
)}
{@const realGroup = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
(it) => systemAccountEmail !== it
)}
{#if employeeGroups.length > 0}
<span class="flex-col">
<Expandable contentColor expanded={false} expandable={true} bordered>
<svelte:fragment slot="title">
<div class="flex flex-row-center flex-between flex-grow p-1">
<div class="fs-title" class:greyed={realGroup.length === 0}>
Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx}
{#if act[1].upgrading}
(Upgrading)
{/if}
{#if act[1].closing}
(Closing)
{/if}
</div>
{#if isAdminUser()}
<Button
label={getEmbeddedLabel('Force close')}
size={'small'}
kind={'ghost'}
on:click={() => {
void fetch(
endpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${act[1].wsId}`,
{
method: 'PUT'
}
)
}}
/>
{/if}
</div>
</svelte:fragment>
<div class="flex-col">
{#each employeeGroups as employeeId}
{@const employee = employees.get(employeeId)}
{@const connections = act[1].sessions.filter((it) => it.userId === employeeId)}
{@const find = connections.reduce((it, itm) => itm.current.find + it, 0)}
{@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)}
<div class="p-1 flex-col ml-4">
<Expandable>
<svelte:fragment slot="title">
<div class="flex-row-center p-1">
{#if employee}
<ObjectPresenter
_class={contact.mixin.Employee}
objectId={employee.person}
props={{ shouldShowAvatar: true, disabled: true }}
/>
{:else}
{employeeId}
{/if}
: {connections.length}
<div class="ml-4">
<div class="ml-1">{find}/{txes}</div>
</div>
</div>
</svelte:fragment>
{#each connections as user, i}
<div class="flex-row-center ml-10">
#{i}
{user.userId}
<div class="p-1">
Total: {user.total.find}/{user.total.tx}
</div>
<div class="p-1">
Previous 5 mins: {user.mins5.find}/{user.mins5.tx}
</div>
<div class="p-1">
Current 5 mins: {user.current.find}/{user.current.tx}
</div>
</div>
<div class="p-1 flex-col ml-10">
{#each Object.entries(user.data ?? {}) as [k, v]}
<div class="p-1">
{k}: {JSON.stringify(v)}
</div>
{/each}
</div>
{/each}
</Expandable>
</div>
{/each}
</div>
</Expandable>
</span>
{/if}
{/each}
</div>
{:else if selectedTab === 'statistics'}
{#if admin}
<div class="flex flex-col">
<div class="flex-row-center p-1">
<div class="p-3">1.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Wipe statistics')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=wipe-statistics`, {
method: 'PUT'
}).then(async () => {
await fetchStats()
})
}}
/>
</div>
</div>
{/if}
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsData !== undefined}
<MetricsInfo metrics={metricsData} />
{/if}
</div>
{:else if selectedTab === 'statistics-front'}
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsDataFront !== undefined}
<MetricsInfo metrics={metricsDataFront} />
{/if}
</div>
{:else if selectedTab === 'statistics-collab'}
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsDataCollab !== undefined}
<MetricsInfo metrics={metricsDataCollab} />
{/if}
</div>
{/if}
{:else}
<Loading />
{#if selectedTab === 'general'}
<ServerManagerGeneral />
{:else if selectedTab === 'users'}
<ServerManagerUsers />
{:else if selectedTab === 'statistics'}
<ServerManagerServerStatistics />
{:else if selectedTab === 'statistics-front'}
<ServerManagerFrontStatistics />
{:else if selectedTab === 'statistics-collab'}
<ServerManagerCollaboratorStatistics />
{:else if selectedTab === 'account'}
<ServerManagerAccountStatistics />
{/if}
</Panel>

View File

@ -0,0 +1,65 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation from '@hcengineering/presentation'
import { Button, IconArrowRight, ticker } from '@hcengineering/ui'
import MetricsInfo from './statistics/MetricsInfo.svelte'
const endpoint: string = getMetadata(login.metadata.AccountsUrl) ?? ''
const token: string = getMetadata(presentation.metadata.Token) ?? ''
async function fetchStats (time: number): Promise<void> {
await fetch(endpoint + `/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
data = await json.json()
admin = data?.admin ?? false
})
.catch((err) => {
console.error(err)
})
}
let data: any
let admin = false
$: void fetchStats($ticker)
$: metricsData = data?.metrics as Metrics | undefined
</script>
{#if data}
<div class="flex-col p-4">
<span>
Mem: {data?.statistics?.memoryUsed} / {data?.statistics?.memoryTotal} CPU: {data?.statistics?.cpuUsage}
</span>
</div>
{/if}
{#if admin}
<div class="flex flex-col">
<div class="flex-row-center p-1">
<div class="p-3">1.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Wipe statistics')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=wipe-statistics`, {
method: 'PUT'
}).then(async () => {
await fetchStats(0)
})
}}
/>
</div>
</div>
{/if}
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsData !== undefined}
<MetricsInfo metrics={metricsData} />
{/if}
</div>
<style lang="scss">
.greyed {
color: rgba(black, 0.5);
}
</style>

View File

@ -0,0 +1,36 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import { getMetadata } from '@hcengineering/platform'
import presentation from '@hcengineering/presentation'
import { ticker } from '@hcengineering/ui'
import MetricsInfo from './statistics/MetricsInfo.svelte'
const token: string = getMetadata(presentation.metadata.Token) ?? ''
async function fetchCollabStats (tick: number): Promise<void> {
const collaborator = getMetadata(presentation.metadata.CollaboratorApiUrl)
await fetch(collaborator + `/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
dataCollab = await json.json()
})
.catch((err) => {
console.error(err)
})
}
let dataCollab: any
$: void fetchCollabStats($ticker)
$: metricsDataCollab = dataCollab?.metrics as Metrics | undefined
</script>
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsDataCollab !== undefined}
<MetricsInfo metrics={metricsDataCollab} />
{/if}
</div>
<style lang="scss">
.greyed {
color: rgba(black, 0.5);
}
</style>

View File

@ -0,0 +1,35 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import { getMetadata } from '@hcengineering/platform'
import presentation from '@hcengineering/presentation'
import { ticker } from '@hcengineering/ui'
import MetricsInfo from './statistics/MetricsInfo.svelte'
const token: string = getMetadata(presentation.metadata.Token) ?? ''
async function fetchUIStats (time: number): Promise<void> {
await fetch(`/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
dataFront = await json.json()
})
.catch((err) => {
console.error(err)
})
}
let dataFront: any
$: void fetchUIStats($ticker)
$: metricsDataFront = dataFront?.metrics as Metrics | undefined
</script>
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsDataFront !== undefined}
<MetricsInfo metrics={metricsDataFront} />
{/if}
</div>
<style lang="scss">
.greyed {
color: rgba(black, 0.5);
}
</style>

View File

@ -0,0 +1,57 @@
<script lang="ts">
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { isAdminUser } from '@hcengineering/presentation'
import { Button, IconArrowRight, fetchMetadataLocalStorage } from '@hcengineering/ui'
import EditBox from '@hcengineering/ui/src/components/EditBox.svelte'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
const token: string = getMetadata(presentation.metadata.Token) ?? ''
let endpoint = _endpoint.replace(/^ws/g, 'http')
if (endpoint.endsWith('/')) {
endpoint = endpoint.substring(0, endpoint.length - 1)
}
let warningTimeout = 15
</script>
{#if isAdminUser()}
<div class="flex flex-col">
<div class="flex-row-center p-1">
<div class="p-3">1.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Set maintenance warning')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=maintenance&timeout=${warningTimeout}`, {
method: 'PUT'
})
}}
/>
<div class="flex-col p-1">
<div class="flex-row-center p-1">
<EditBox kind={'underline'} format={'number'} bind:value={warningTimeout} /> min
</div>
</div>
</div>
<div class="flex-row-center p-1">
<div class="p-3">2.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Reboot server')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=reboot`, {
method: 'PUT'
})
}}
/>
</div>
</div>
{/if}
<style lang="scss">
.greyed {
color: rgba(black, 0.5);
}
</style>

View File

@ -0,0 +1,109 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation from '@hcengineering/presentation'
import { Button, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui'
import MetricsInfo from './statistics/MetricsInfo.svelte'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
const token: string = getMetadata(presentation.metadata.Token) ?? ''
let endpoint = _endpoint.replace(/^ws/g, 'http')
if (endpoint.endsWith('/')) {
endpoint = endpoint.substring(0, endpoint.length - 1)
}
async function fetchStats (time: number): Promise<void> {
await fetch(endpoint + `/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
data = await json.json()
admin = data?.admin ?? false
})
.catch((err) => {
console.error(err)
})
}
let data: any
let admin = false
$: void fetchStats($ticker)
$: metricsData = data?.metrics as Metrics | undefined
interface StatisticsElement {
find: number
tx: number
}
$: activeSessions =
(data?.statistics?.activeSessions as Record<
string,
{
sessions: Array<{
userId: string
data?: Record<string, any>
total: StatisticsElement
mins5: StatisticsElement
current: StatisticsElement
}>
name: string
wsId: string
sessionsTotal: number
upgrading: boolean
closing: boolean
}
>) ?? {}
$: totalStats = Array.from(Object.entries(activeSessions).values()).reduce(
(cur, it) => {
const totalFind = it[1].sessions.reduce((it, itm) => itm.current.find + it, 0)
const totalTx = it[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)
return {
find: cur.find + totalFind,
tx: cur.tx + totalTx
}
},
{ find: 0, tx: 0 }
)
</script>
{#if data}
<div class="flex-col p-4">
<span>
Mem: {data.statistics.memoryUsed} / {data.statistics.memoryTotal} CPU: {data.statistics.cpuUsage}
</span>
<span>
TotalFind: {totalStats.find} / Total Tx: {totalStats.tx}
</span>
</div>
{/if}
{#if admin}
<div class="flex flex-col">
<div class="flex-row-center p-1">
<div class="p-3">1.</div>
<Button
icon={IconArrowRight}
label={getEmbeddedLabel('Wipe statistics')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=wipe-statistics`, {
method: 'PUT'
}).then(async () => {
await fetchStats(0)
})
}}
/>
</div>
</div>
{/if}
<div class="flex-column p-3 h-full" style:overflow="auto">
{#if metricsData !== undefined}
<MetricsInfo metrics={metricsData} />
{/if}
</div>
<style lang="scss">
.greyed {
color: rgba(black, 0.5);
}
</style>

View File

@ -0,0 +1,181 @@
<script lang="ts">
import contact, { PersonAccount } from '@hcengineering/contact'
import { systemAccountEmail } from '@hcengineering/core'
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { createQuery, isAdminUser } from '@hcengineering/presentation'
import { Button, CheckBox, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui'
import Expandable from '@hcengineering/ui/src/components/Expandable.svelte'
import { ObjectPresenter } from '@hcengineering/view-resources'
import { workspacesStore } from '../utils'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
const token: string = getMetadata(presentation.metadata.Token) ?? ''
let endpoint = _endpoint.replace(/^ws/g, 'http')
if (endpoint.endsWith('/')) {
endpoint = endpoint.substring(0, endpoint.length - 1)
}
async function fetchStats (time: number): Promise<void> {
await fetch(endpoint + `/api/v1/statistics?token=${token}`, {})
.then(async (json) => {
data = await json.json()
})
.catch((err) => {
console.error(err)
})
}
let data: any
$: void fetchStats($ticker)
interface StatisticsElement {
find: number
tx: number
}
$: activeSessions =
(data?.statistics?.activeSessions as Record<
string,
{
sessions: Array<{
userId: string
data?: Record<string, any>
total: StatisticsElement
mins5: StatisticsElement
current: StatisticsElement
}>
name: string
wsId: string
sessionsTotal: number
upgrading: boolean
closing: boolean
}
>) ?? {}
const employeeQuery = createQuery()
let employees = new Map<string, PersonAccount>()
employeeQuery.query(contact.class.PersonAccount, {}, (res) => {
const emp = new Map<string, PersonAccount>()
for (const r of res) {
emp.set(r.email, r)
}
employees = emp
})
let realUsers: boolean
</script>
<div class="p-6">
<div class="flex-row-center">
Uniq users: {Object.keys(activeSessions).length}
</div>
<div class="flex-row-center">
<CheckBox bind:checked={realUsers} />
<div class="ml-1">Show only users</div>
</div>
</div>
<div class="flex-column p-3 h-full" style:overflow="auto">
{#each Object.entries(activeSessions) as act}
{@const wsInstance = $workspacesStore.find((it) => it.workspaceId === act[0])}
{@const totalFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)}
{@const totalTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)}
{@const employeeGroups = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
(it) => systemAccountEmail !== it || !realUsers
)}
{@const realGroup = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
(it) => systemAccountEmail !== it
)}
{#if employeeGroups.length > 0}
<span class="flex-col">
<Expandable contentColor expanded={false} expandable={true} bordered>
<svelte:fragment slot="title">
<div class="flex flex-row-center flex-between flex-grow p-1">
<div class="fs-title" class:greyed={realGroup.length === 0}>
Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx}
{#if act[1].upgrading}
(Upgrading)
{/if}
{#if act[1].closing}
(Closing)
{/if}
</div>
{#if isAdminUser()}
<Button
label={getEmbeddedLabel('Force close')}
size={'small'}
kind={'ghost'}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${act[1].wsId}`, {
method: 'PUT'
})
}}
/>
{/if}
</div>
</svelte:fragment>
<div class="flex-col">
{#each employeeGroups as employeeId}
{@const employee = employees.get(employeeId)}
{@const connections = act[1].sessions.filter((it) => it.userId === employeeId)}
{@const find = connections.reduce((it, itm) => itm.current.find + it, 0)}
{@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)}
<div class="p-1 flex-col ml-4">
<Expandable>
<svelte:fragment slot="title">
<div class="flex-row-center p-1">
{#if employee}
<ObjectPresenter
_class={contact.mixin.Employee}
objectId={employee.person}
props={{ shouldShowAvatar: true, disabled: true }}
/>
{:else}
{employeeId}
{/if}
: {connections.length}
<div class="ml-4">
<div class="ml-1">{find}/{txes}</div>
</div>
</div>
</svelte:fragment>
{#each connections as user, i}
<div class="flex-row-center ml-10">
#{i}
{user.userId}
<div class="p-1">
Total: {user.total.find}/{user.total.tx}
</div>
<div class="p-1">
Previous 5 mins: {user.mins5.find}/{user.mins5.tx}
</div>
<div class="p-1">
Current 5 mins: {user.current.find}/{user.current.tx}
</div>
</div>
<div class="p-1 flex-col ml-10">
{#each Object.entries(user.data ?? {}) as [k, v]}
<div class="p-1">
{k}: {JSON.stringify(v)}
</div>
{/each}
</div>
{/each}
</Expandable>
</div>
{/each}
</div>
</Expandable>
</span>
{/if}
{/each}
</div>
<style lang="scss">
.greyed {
color: rgba(black, 0.5);
}
</style>

View File

@ -13,10 +13,17 @@ import accountEn from '@hcengineering/account/lang/en.json'
import accountRu from '@hcengineering/account/lang/ru.json'
import { Analytics } from '@hcengineering/analytics'
import { registerProviders } from '@hcengineering/auth-providers'
import { type Data, type MeasureContext, type Tx, type Version, type BrandingMap } from '@hcengineering/core'
import {
metricsAggregate,
type BrandingMap,
type Data,
type MeasureContext,
type Tx,
type Version
} from '@hcengineering/core'
import { type MigrateOperation } from '@hcengineering/model'
import platform, { Severity, Status, addStringsLoader, setMetadata } from '@hcengineering/platform'
import serverToken from '@hcengineering/server-token'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
import cors from '@koa/cors'
import { type IncomingHttpHeaders } from 'http'
@ -24,6 +31,7 @@ import Koa from 'koa'
import bodyParser from 'koa-bodyparser'
import Router from 'koa-router'
import { MongoClient } from 'mongodb'
import os from 'os'
/**
* @public
@ -107,15 +115,17 @@ export function serveAccount (
const performUpgrade = (process.env.PERFORM_UPGRADE ?? 'true') === 'true'
if (performUpgrade) {
worker = new UpgradeWorker(db, p, version, txes, migrateOperations, productId)
await worker.upgradeAll(measureCtx, {
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
parallel: parseInt(process.env.PARALLEL ?? '1')
await measureCtx.with('upgrade-all-models', {}, async (ctx) => {
worker = new UpgradeWorker(db, p, version, txes, migrateOperations, productId)
await worker.upgradeAll(ctx, {
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
parallel: parseInt(process.env.PARALLEL ?? '1')
})
})
}
})
@ -128,6 +138,32 @@ export function serveAccount (
}
}
router.get('/api/v1/statistics', (req, res) => {
try {
const token = req.query.token as string
const payload = decodeToken(token)
const admin = payload.extra?.admin === 'true'
const data: Record<string, any> = {
metrics: admin ? metricsAggregate((measureCtx as any).metrics) : {},
statistics: {}
}
data.statistics.totalClients = 0
data.statistics.memoryUsed = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100
data.statistics.memoryTotal = Math.round((process.memoryUsage().heapTotal / 1024 / 1024) * 100) / 100
data.statistics.cpuUsage = Math.round(os.loadavg()[0] * 100) / 100
data.statistics.freeMem = Math.round((os.freemem() / 1024 / 1024) * 100) / 100
data.statistics.totalMem = Math.round((os.totalmem() / 1024 / 1024) * 100) / 100
const json = JSON.stringify(data)
req.res.writeHead(200, { 'Content-Type': 'application/json' })
req.res.end(json)
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
req.res.writeHead(404, {})
req.res.end()
}
})
router.post('rpc', '/', async (ctx) => {
const token = extractToken(ctx.request.headers)
@ -153,7 +189,11 @@ export function serveAccount (
host = new URL(origin).host
}
const branding = host !== undefined ? brandings[host] : null
const result = await method(measureCtx, db, productId, branding, request, token)
const result = await measureCtx.with(
request.method,
{},
async (ctx) => await method(ctx, db, productId, branding, request, token)
)
worker?.updateResponseStatistics(result)
ctx.body = result

View File

@ -46,10 +46,11 @@ import core, {
type Branding
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model'
import { getModelVersion } from '@hcengineering/model-all'
import platform, { getMetadata, PlatformError, Severity, Status, translate } from '@hcengineering/platform'
import { cloneWorkspace } from '@hcengineering/server-backup'
import { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin, { connect, initModel, upgradeModel } from '@hcengineering/server-tool'
import toolPlugin, { connect, initModel, upgradeModel, getStorageAdapter } from '@hcengineering/server-tool'
import { pbkdf2Sync, randomBytes } from 'crypto'
import { Binary, Db, Filter, ObjectId, type MongoClient } from 'mongodb'
import fetch from 'node-fetch'
@ -930,7 +931,7 @@ export async function createWorkspace (
await updateInfo({ createProgress: 10 })
return await rateLimiter.exec(async () => {
const childLogger = ctx.newChild('createWorkspace', { workspace: workspaceInfo.workspace })
const childLogger = ctx.newChild('createUserWorkspace', {}, { workspace: workspaceInfo.workspace })
const ctxModellogger: ModelLogger = {
log: (msg, data) => {
childLogger.info(msg, data)
@ -945,19 +946,23 @@ export async function createWorkspace (
const wsId = getWorkspaceId(workspaceInfo.workspace, productId)
// We should not try to clone INIT_WS into INIT_WS during it's creation.
if (
initWS !== undefined &&
(await getWorkspaceById(db, productId, initWS)) !== null &&
initWS !== workspaceInfo.workspace
) {
let initWSInfo: Workspace | undefined
if (initWS !== undefined) {
initWSInfo = (await getWorkspaceById(db, productId, initWS)) ?? undefined
}
if (initWS !== undefined && initWSInfo !== undefined && initWS !== workspaceInfo.workspace) {
// Just any valid model for transactor to be able to function
await initModel(ctx, getTransactor(), wsId, txes, [], ctxModellogger, async (value) => {
await updateInfo({ createProgress: Math.round((Math.min(value, 100) / 100) * 20) })
await childLogger.with('init-model', {}, async (ctx) => {
await initModel(ctx, getTransactor(), wsId, txes, [], ctxModellogger, async (value) => {
await updateInfo({ createProgress: Math.round((Math.min(value, 100) / 100) * 20) })
})
})
await updateInfo({ createProgress: 20 })
// Clone init workspace.
await cloneWorkspace(
childLogger,
getTransactor(),
getWorkspaceId(initWS, productId),
getWorkspaceId(workspaceInfo.workspace, productId),
@ -965,25 +970,38 @@ export async function createWorkspace (
async (value) => {
await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 30) })
},
true
)
await updateInfo({ createProgress: 50 })
model = await upgradeModel(
ctx,
getTransactor(),
wsId,
txes,
migrationOperation,
ctxModellogger,
true,
async (value) => {
await updateInfo({ createProgress: Math.round(50 + (Math.min(value, 100) / 100) * 40) })
}
getStorageAdapter()
)
const modelVersion = getModelVersion()
await updateInfo({ createProgress: 50 })
// Skip tx update if version of init workspace are proper one.
const skipTxUpdate =
versionToString(modelVersion) === versionToString(initWSInfo.version ?? { major: 0, minor: 0, patch: 0 })
model = await childLogger.withLog(
'upgrade-model',
{},
async (ctx) =>
await upgradeModel(
ctx,
getTransactor(),
wsId,
txes,
migrationOperation,
ctxModellogger,
skipTxUpdate,
async (value) => {
await updateInfo({ createProgress: Math.round(50 + (Math.min(value, 100) / 100) * 40) })
}
)
)
await updateInfo({ createProgress: 90 })
} else {
await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => {
await updateInfo({ createProgress: Math.round(Math.min(value, 100)) })
await childLogger.withLog('init-workspace', {}, async (ctx) => {
await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => {
await updateInfo({ createProgress: Math.round(Math.min(value, 100)) })
})
})
}
} catch (err: any) {
@ -992,7 +1010,9 @@ export async function createWorkspace (
}
if (postInitHandler !== undefined) {
await postInitHandler?.(workspaceInfo, model)
await ctx.withLog('post-handler', {}, async (ctx) => {
await postInitHandler?.(workspaceInfo, model)
})
}
childLogger.end()

View File

@ -128,7 +128,11 @@ export class UpgradeWorker {
}
async upgradeAll (ctx: MeasureContext, opt: UpgradeOptions): Promise<void> {
const workspaces = await listWorkspacesRaw(this.db, this.productId)
const workspaces = await ctx.with(
'retrieve-workspaces',
{},
async (ctx) => await listWorkspacesRaw(this.db, this.productId)
)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
// We need to update workspaces with missing workspaceUrl
@ -157,7 +161,7 @@ export class UpgradeWorker {
for (const it of workspaces) {
await rateLimit.add(async () => {
try {
await ctx.with('do-upgrade', {}, async () => {
await ctx.with('do-upgrade', {}, async (ctx) => {
await this._upgradeWorkspace(ctx, it, opt)
})
} catch (err: any) {

View File

@ -33,7 +33,9 @@ import core, {
type Blob,
type DocIndexState
} from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { BlobClient, connect } from '@hcengineering/server-tool'
import { PassThrough } from 'node:stream'
import { createGzip } from 'node:zlib'
import { join } from 'path'
import { Writable } from 'stream'
@ -221,184 +223,255 @@ async function writeChanges (storage: BackupStorage, snapshot: string, changes:
* @public
*/
export async function cloneWorkspace (
ctx: MeasureContext,
transactorUrl: string,
sourceWorkspaceId: WorkspaceId,
targetWorkspaceId: WorkspaceId,
clearTime: boolean = true,
progress: (value: number) => Promise<void>,
skipFullText: boolean
skipFullText: boolean,
storageAdapter?: StorageAdapter
): Promise<void> {
const sourceConnection = (await connect(transactorUrl, sourceWorkspaceId, undefined, {
mode: 'backup'
})) as unknown as CoreClient & BackupClient
const targetConnection = (await connect(transactorUrl, targetWorkspaceId, undefined, {
mode: 'backup',
model: 'upgrade',
admin: 'true'
})) as unknown as CoreClient & BackupClient
await ctx.with(
'clone-workspace',
{},
async (ctx) => {
const sourceConnection = await ctx.with(
'connect-source',
{},
async (ctx) =>
(await connect(transactorUrl, sourceWorkspaceId, undefined, {
mode: 'backup'
})) as unknown as CoreClient & BackupClient
)
const targetConnection = await ctx.with(
'connect-target',
{},
async (ctx) =>
(await connect(transactorUrl, targetWorkspaceId, undefined, {
mode: 'backup',
model: 'upgrade',
admin: 'true'
})) as unknown as CoreClient & BackupClient
)
const blobClientSource = new BlobClient(transactorUrl, sourceWorkspaceId)
const blobClientTarget = new BlobClient(transactorUrl, targetWorkspaceId)
const blobClientSource = new BlobClient(transactorUrl, sourceWorkspaceId)
const blobClientTarget = new BlobClient(transactorUrl, targetWorkspaceId)
try {
const domains = sourceConnection
.getHierarchy()
.domains()
.filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL)
try {
const domains = sourceConnection
.getHierarchy()
.domains()
.filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL)
let i = 0
for (const c of domains) {
if (skipFullText && c === DOMAIN_FULLTEXT_BLOB) {
console.log('clone skip domain...', c)
continue
}
console.log('clone domain...', c)
// We need to clean target connection before copying something.
await cleanDomain(targetConnection, c)
const changes: Snapshot = {
added: new Map(),
updated: new Map(),
removed: []
}
let idx: number | undefined
// update digest tar
const needRetrieveChunks: Ref<Doc>[][] = []
let processed = 0
let st = Date.now()
// Load all digest from collection.
while (true) {
try {
const it = await sourceConnection.loadChunk(c, idx)
idx = it.idx
let needRetrieve: Ref<Doc>[] = []
let needRetrieveSize = 0
for (const { id, hash, size } of it.docs) {
processed++
if (Date.now() - st > 2500) {
console.log('processed', processed, Date.now() - st)
st = Date.now()
}
changes.added.set(id as Ref<Doc>, hash)
needRetrieve.push(id as Ref<Doc>)
needRetrieveSize += size
if (needRetrieveSize > retrieveChunkSize) {
needRetrieveChunks.push(needRetrieve)
needRetrieveSize = 0
needRetrieve = []
}
let i = 0
for (const c of domains) {
if (skipFullText && c === DOMAIN_FULLTEXT_BLOB) {
ctx.info('clone skip domain...', { domain: c, workspace: targetWorkspaceId.name })
continue
}
if (needRetrieve.length > 0) {
needRetrieveChunks.push(needRetrieve)
}
if (it.finished) {
await sourceConnection.closeChunk(idx)
break
}
} catch (err: any) {
console.error(err)
if (idx !== undefined) {
await sourceConnection.closeChunk(idx)
}
// Try again
idx = undefined
processed = 0
}
}
while (needRetrieveChunks.length > 0) {
const needRetrieve = needRetrieveChunks.shift() as Ref<Doc>[]
ctx.info('clone domain...', { domain: c, workspace: targetWorkspaceId.name })
console.log('Retrieve chunk:', needRetrieve.length)
let docs: Doc[] = []
try {
docs = await sourceConnection.loadDocs(c, needRetrieve)
if (clearTime) {
docs = docs.map((p) => {
let collectionCud = false
// We need to clean target connection before copying something.
await ctx.with('clean-domain', { domain: c }, async (ctx) => {
await cleanDomain(ctx, targetConnection, c)
})
const changes: Snapshot = {
added: new Map(),
updated: new Map(),
removed: []
}
let idx: number | undefined
// update digest tar
const needRetrieveChunks: Ref<Doc>[][] = []
let processed = 0
let st = Date.now()
// Load all digest from collection.
await ctx.with('retrieve-domain-info', { domain: c }, async (ctx) => {
while (true) {
try {
collectionCud = sourceConnection.getHierarchy().isDerived(p._class, core.class.TxCollectionCUD)
const it = await ctx.with('load-chunk', {}, async () => await sourceConnection.loadChunk(c, idx))
idx = it.idx
let needRetrieve: Ref<Doc>[] = []
let needRetrieveSize = 0
for (const { id, hash, size } of it.docs) {
processed++
if (Date.now() - st > 2500) {
ctx.info('processed', { processed, time: Date.now() - st, workspace: targetWorkspaceId.name })
st = Date.now()
}
changes.added.set(id as Ref<Doc>, hash)
needRetrieve.push(id as Ref<Doc>)
needRetrieveSize += size
if (needRetrieveSize > retrieveChunkSize) {
needRetrieveChunks.push(needRetrieve)
needRetrieveSize = 0
needRetrieve = []
}
}
if (needRetrieve.length > 0) {
needRetrieveChunks.push(needRetrieve)
}
if (it.finished) {
await ctx.with('close-chunk', {}, async () => {
await sourceConnection.closeChunk(idx as number)
})
break
}
} catch (err: any) {
ctx.error('failed to clone', { err, workspace: targetWorkspaceId.name })
if (idx !== undefined) {
await ctx.with('load-chunk', {}, async () => {
await sourceConnection.closeChunk(idx as number)
})
}
// Try again
idx = undefined
processed = 0
}
}
})
await ctx.with('clone-domain', { domain: c }, async (ctx) => {
while (needRetrieveChunks.length > 0) {
const needRetrieve = needRetrieveChunks.shift() as Ref<Doc>[]
ctx.info('Retrieve chunk:', { count: needRetrieve.length })
let docs: Doc[] = []
try {
docs = await ctx.with('load-docs', {}, async (ctx) => await sourceConnection.loadDocs(c, needRetrieve))
if (clearTime) {
docs = prepareClonedDocuments(docs, sourceConnection, skipFullText)
}
for (const d of docs) {
if (d._class === core.class.Blob) {
const blob = d as Blob
const blobs: Buffer[] = []
try {
if (storageAdapter !== undefined) {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
const readable = await storageAdapter.get(ctx, sourceWorkspaceId, blob._id)
const passThrue = new PassThrough()
readable.pipe(passThrue)
await storageAdapter.put(
ctx,
targetWorkspaceId,
blob._id,
passThrue,
blob.contentType,
blob.size
)
} else {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
await ctx.with('download-blob', { contentType: blob.contentType }, async (ctx) => {
await blobClientSource.writeTo(ctx, blob._id, blob.size, {
write: (b, cb) => {
blobs.push(b)
cb()
},
end: (cb) => {
cb()
}
})
})
await ctx.with('upload-blob', { contentType: blob.contentType }, async (ctx) => {
const buffer = Buffer.concat(blobs)
await blobClientTarget.upload(ctx, blob._id, buffer.length, blob.contentType, buffer)
})
}
} catch (err: any) {
console.error(err)
}
}
}
await ctx.with(
'upload-docs',
{},
async (ctx) => {
await targetConnection.upload(c, docs)
},
{ length: docs.length }
)
} catch (err: any) {
console.log(err)
}
// if full text is skipped, we need to clean stages for indexes.
if (p._class === core.class.DocIndexState && skipFullText) {
;(p as DocIndexState).stages = {}
}
if (collectionCud) {
return {
...p,
modifiedOn: Date.now(),
createdOn: Date.now(),
tx: {
...(p as TxCollectionCUD<Doc, AttachedDoc>).tx,
modifiedOn: Date.now(),
createdOn: Date.now()
}
}
} else {
return {
...p,
modifiedOn: Date.now(),
createdOn: Date.now()
}
}
})
}
for (const d of docs) {
if (d._class === core.class.Blob) {
const blob = d as Blob
const blobs: Buffer[] = []
try {
const ctx = new MeasureMetricsContext('upload', {})
await blobClientSource.writeTo(ctx, blob._id, blob.size, {
write: (b, cb) => {
blobs.push(b)
cb()
},
end: (cb) => {
cb()
}
})
await blobClientTarget.upload(ctx, blob._id, blob.size, blob.contentType, Buffer.concat(blobs))
} catch (err: any) {
console.error(err)
// Put back.
needRetrieveChunks.push(needRetrieve)
continue
}
}
}
await targetConnection.upload(c, docs)
} catch (err: any) {
console.log(err)
// Put back.
needRetrieveChunks.push(needRetrieve)
continue
}
}
})
i++
await progress((100 / domains.length) * i)
i++
await progress((100 / domains.length) * i)
}
} catch (err: any) {
console.error(err)
} finally {
ctx.info('end clone')
await ctx.with('close-source', {}, async (ctx) => {
await sourceConnection.close()
})
await ctx.with('close-target', {}, async (ctx) => {
await targetConnection.sendForceClose()
await targetConnection.close()
})
}
},
{
source: sourceWorkspaceId.name,
target: targetWorkspaceId.name
}
} catch (err: any) {
console.error(err)
} finally {
console.log('end clone')
await sourceConnection.close()
await targetConnection.sendForceClose()
await targetConnection.close()
}
)
}
async function cleanDomain (connection: CoreClient & BackupClient, domain: Domain): Promise<void> {
function prepareClonedDocuments (
docs: Doc[],
sourceConnection: CoreClient & BackupClient,
skipFullText: boolean
): Doc[] {
docs = docs.map((p) => {
let collectionCud = false
try {
collectionCud = sourceConnection.getHierarchy().isDerived(p._class, core.class.TxCollectionCUD)
} catch (err: any) {
console.log(err)
}
// if full text is skipped, we need to clean stages for indexes.
if (p._class === core.class.DocIndexState && skipFullText) {
;(p as DocIndexState).stages = {}
}
if (collectionCud) {
return {
...p,
modifiedOn: Date.now(),
createdOn: Date.now(),
tx: {
...(p as TxCollectionCUD<Doc, AttachedDoc>).tx,
modifiedOn: Date.now(),
createdOn: Date.now()
}
}
} else {
return {
...p,
modifiedOn: Date.now(),
createdOn: Date.now()
}
}
})
return docs
}
async function cleanDomain (ctx: MeasureContext, connection: CoreClient & BackupClient, domain: Domain): Promise<void> {
// Load all digest from collection.
let idx: number | undefined
const ids: Ref<Doc>[] = []
@ -439,7 +512,17 @@ export async function backup (
recheck: boolean
timeout: number
connectTimeout: number
} = { force: false, recheck: false, timeout: 0, skipDomains: [], connectTimeout: 30000 }
skipBlobContentTypes: string[]
blobDownloadLimit: number
} = {
force: false,
recheck: false,
timeout: 0,
skipDomains: [],
connectTimeout: 30000,
skipBlobContentTypes: [],
blobDownloadLimit: 15
}
): Promise<void> {
ctx = ctx.newChild('backup', {
workspaceId: workspaceId.name,
@ -758,8 +841,30 @@ export async function backup (
if (d._class === core.class.Blob) {
const blob = d as Blob
const descrJson = JSON.stringify(d)
addedDocuments += descrJson.length
addedDocuments += blob.size
if (blob.size > options.blobDownloadLimit * 1024 * 1024) {
ctx.info('skip blob download, limit excheed', {
blob: blob._id,
provider: blob.provider,
size: Math.round(blob.size / (1024 * 1024)),
limit: options.blobDownloadLimit
})
processChanges(d, true)
continue
}
if (
options.skipBlobContentTypes.length > 0 &&
options.skipBlobContentTypes.some((it) => blob.contentType.includes(it))
) {
ctx.info('skip blob download, contentType', {
blob: blob._id,
provider: blob.provider,
size: blob.size / (1024 * 1024)
})
processChanges(d, true)
continue
}
let blobFiled = false
if (blob.size !== 0 && !(await blobClient.checkFile(ctx, blob._id))) {
@ -768,6 +873,9 @@ export async function backup (
continue
}
addedDocuments += descrJson.length
addedDocuments += blob.size
_pack.entry({ name: d._id + '.json' }, descrJson, function (err) {
if (err != null) throw err
})
@ -775,12 +883,20 @@ export async function backup (
try {
const entry = _pack?.entry({ name: d._id, size: blob.size }, (err) => {
if (err != null) {
ctx.error('error packing file', err)
ctx.error('error packing file', { err })
}
})
if (blob.size === 0) {
entry.end()
} else {
// if (blob.size > 1024 * 1024) {
ctx.info('download blob', {
_id: blob._id,
contentType: blob.contentType,
size: blob.size,
provider: blob.provider
})
// }
await blobClient.writeTo(ctx, blob._id, blob.size, {
write (buffer, cb) {
entry.write(buffer, cb)
@ -1206,8 +1322,6 @@ export async function restore (
await endPromise
}
} else {
console.log('domain had no changes', c)
}
}
}

View File

@ -109,7 +109,9 @@ class BackupWorker {
force: false,
recheck: false,
timeout: this.config.Timeout * 1000,
connectTimeout: 5 * 60 * 1000 // 5 minutes to
connectTimeout: 5 * 60 * 1000, // 5 minutes to,
blobDownloadLimit: 100,
skipBlobContentTypes: []
})
})
} catch (err: any) {

View File

@ -183,27 +183,28 @@ export class MemRawDBAdapter implements RawDBAdapter {
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: { _class: Ref<Class<T>> } & DocumentQuery<T>,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
): Promise<FindResult<T>> {
const db = this.workspaces.get(workspace.name)
if (db === undefined) {
return toFindResult([])
}
return await db.findAll(query._class as Ref<Class<T>>, query, options)
return await db.findAll(core.class.Blob as Ref<Class<T>>, query, options)
}
async findStream<T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: { _class: Ref<Class<T>> } & DocumentQuery<T>,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
): Promise<RawDBAdapterStream<T>> {
const db = this.workspaces.get(workspace.name)
let result: T[] = []
if (db !== undefined) {
result = await db.findAll(query._class as Ref<Class<T>>, query, options)
result = await db.findAll(core.class.Blob as Ref<Class<T>>, query, options)
}
return {
next: async () => {

View File

@ -65,14 +65,14 @@ export interface RawDBAdapter {
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: { _class: Ref<Class<T>> } & DocumentQuery<T>,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
) => Promise<FindResult<T>>
findStream: <T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: { _class: Ref<Class<T>> } & DocumentQuery<T>,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
) => Promise<RawDBAdapterStream<T>>
upload: <T extends Doc>(ctx: MeasureContext, workspace: WorkspaceId, domain: Domain, docs: T[]) => Promise<void>

View File

@ -1,13 +1,15 @@
import core, {
DOMAIN_BLOB,
groupByArray,
withContext,
type Blob,
type BlobLookup,
type Branding,
type MeasureContext,
type Ref,
type StorageIterator,
type WorkspaceId,
type WorkspaceIdWithUrl,
type Branding
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import { type Readable } from 'stream'
import { type RawDBAdapter } from '../adapter'
@ -48,13 +50,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
providerId?: string
): Promise<void> {
let current: Blob | undefined = (
await this.dbAdapter.find<Blob>(
ctx,
workspaceId,
DOMAIN_BLOB,
{ _class: core.class.Blob, _id: objectName as Ref<Blob> },
{ limit: 1 }
)
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
if (current === undefined && providerId !== undefined) {
current = await this.adapters.get(providerId)?.stat(ctx, workspaceId, objectName)
@ -78,8 +74,40 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
}
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
// We need to initialize internal table if it miss documents.
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
const adapters = Array.from(this.adapters.values())
let iterator: BlobStorageIterator | undefined
return {
next: async (ctx) => {
while (true) {
if (iterator === undefined && adapters.length > 0) {
iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
}
if (iterator === undefined) {
return undefined
}
const docInfo = await iterator.next()
if (docInfo !== undefined) {
return {
hash: docInfo.etag,
id: docInfo._id,
size: docInfo.size
}
} else {
// We need to take next adapter
await iterator.close()
iterator = undefined
}
}
},
close: async (ctx) => {
if (iterator !== undefined) {
await iterator.close()
}
}
}
}
async close (): Promise<void> {
@ -98,6 +126,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
return true
}
@withContext('aggregator-make', {})
async make (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const a of this.adapters.values()) {
if (!(await a.exists(ctx, workspaceId))) {
@ -106,6 +135,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
}
}
@withContext('aggregator-listBuckets', {})
async listBuckets (ctx: MeasureContext, productId: string): Promise<BucketInfo[]> {
const result: BucketInfo[] = []
for (const a of this.adapters.values()) {
@ -114,6 +144,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
return result
}
@withContext('aggregator-delete', {})
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const a of this.adapters.values()) {
if (await a.exists(ctx, workspaceId)) {
@ -122,9 +153,9 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
}
}
@withContext('aggregator-remove', {})
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
const docs = await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, {
_class: core.class.Blob,
_id: { $in: objectNames as Ref<Blob>[] }
})
@ -149,7 +180,6 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
prefix?: string | undefined
): Promise<BlobStorageIterator> {
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {
_class: core.class.Blob,
_id: { $regex: `${prefix ?? ''}.*` }
})
return {
@ -162,35 +192,32 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
}
}
@withContext('aggregator-stat', {})
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Blob | undefined> {
const result = await this.dbAdapter.find<Blob>(
ctx,
workspaceId,
DOMAIN_BLOB,
{ _class: core.class.Blob, _id: name as Ref<Blob> },
{ _id: name as Ref<Blob> },
{ limit: 1 }
)
return result.shift()
}
@withContext('aggregator-get', {})
async get (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Readable> {
const { provider, stat } = await this.findProvider(workspaceId, ctx, name)
const { provider, stat } = await this.findProvider(ctx, workspaceId, name)
return await provider.get(ctx, workspaceId, stat.storageId)
}
@withContext('find-provider', {})
private async findProvider (
workspaceId: WorkspaceId,
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string
): Promise<{ provider: StorageAdapter, stat: Blob }> {
const stat = (
await this.dbAdapter.find<Blob>(
ctx,
workspaceId,
DOMAIN_BLOB,
{ _class: core.class.Blob, _id: objectName as Ref<Blob> },
{ limit: 1 }
)
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
if (stat === undefined) {
throw new NoSuchKeyError(`No such object found ${objectName}`)
@ -202,6 +229,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
return { provider, stat }
}
@withContext('aggregator-partial', {})
async partial (
ctx: MeasureContext,
workspaceId: WorkspaceId,
@ -209,12 +237,13 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
offset: number,
length?: number | undefined
): Promise<Readable> {
const { provider, stat } = await this.findProvider(workspaceId, ctx, objectName)
const { provider, stat } = await this.findProvider(ctx, workspaceId, objectName)
return await provider.partial(ctx, workspaceId, stat.storageId, offset, length)
}
@withContext('aggregator-read', {})
async read (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Buffer[]> {
const { provider, stat } = await this.findProvider(workspaceId, ctx, name)
const { provider, stat } = await this.findProvider(ctx, workspaceId, name)
return await provider.read(ctx, workspaceId, stat.storageId)
}
@ -239,6 +268,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
return { adapter: this.adapters.get(this.defaultAdapter), provider: this.defaultAdapter }
}
@withContext('aggregator-put', {})
async put (
ctx: MeasureContext,
workspaceId: WorkspaceId,
@ -249,13 +279,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
): Promise<UploadedObjectInfo> {
// We need to reuse same provider for existing documents.
const stat = (
await this.dbAdapter.find<Blob>(
ctx,
workspaceId,
DOMAIN_BLOB,
{ _class: core.class.Blob, _id: objectName as Ref<Blob> },
{ limit: 1 }
)
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
const { provider, adapter } = this.selectProvider(stat?.provider, contentType)
@ -293,6 +317,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
return result
}
@withContext('aggregator-lookup', {})
async lookup (
ctx: MeasureContext,
workspaceId: WorkspaceIdWithUrl,

View File

@ -31,6 +31,8 @@ import sharp from 'sharp'
import { v4 as uuid } from 'uuid'
import { preConditions } from './utils'
import fs from 'fs'
const cacheControlValue = 'public, max-age=365d'
const cacheControlNoCache = 'public, no-store, no-cache, must-revalidate, max-age=0'
@ -45,10 +47,11 @@ async function storageUpload (
): Promise<string> {
const id = uuid()
const data = file.tempFilePath !== undefined ? fs.createReadStream(file.tempFilePath) : file.data
const resp = await ctx.with(
'storage upload',
{ workspace: workspace.name },
async () => await storageAdapter.put(ctx, workspace, id, file.data, file.mimetype, file.size),
async (ctx) => await storageAdapter.put(ctx, workspace, id, data, file.mimetype, file.size),
{ file: file.name, contentType: file.mimetype }
)
@ -80,7 +83,6 @@ async function getFileRange (
range: string,
client: StorageAdapter,
workspace: WorkspaceId,
uuid: string,
res: Response
): Promise<void> {
const size: number = stat.size
@ -103,7 +105,7 @@ async function getFileRange (
const dataStream = await ctx.with(
'partial',
{},
async () => await client.partial(ctx, workspace, uuid, start, end - start + 1),
async (ctx) => await client.partial(ctx, workspace, stat._id, start, end - start + 1),
{}
)
res.writeHead(206, {
@ -155,7 +157,6 @@ async function getFile (
stat: PlatformBlob,
client: StorageAdapter,
workspace: WorkspaceId,
uuid: string,
req: Request,
res: Response
): Promise<void> {
@ -183,7 +184,7 @@ async function getFile (
{ contentType: stat.contentType },
async (ctx) => {
try {
const dataStream = await ctx.with('readable', {}, async () => await client.get(ctx, workspace, uuid))
const dataStream = await ctx.with('readable', {}, async (ctx) => await client.get(ctx, workspace, stat._id))
res.writeHead(200, {
'Content-Type': stat.contentType,
Etag: stat.etag,
@ -241,7 +242,11 @@ export function start (
const app = express()
app.use(cors())
app.use(fileUpload())
app.use(
fileUpload({
useTempFiles: true
})
)
app.use(bp.json())
app.use(bp.urlencoded({ extended: true }))
@ -359,10 +364,10 @@ export function start (
uuid = uuid.slice(0, uuid.length - format.length - 1)
}
const blobInfo = await ctx.with(
let blobInfo = await ctx.with(
'notoken-stat',
{ workspace: payload.workspace.name },
async () => await config.storageAdapter.stat(ctx, payload.workspace, uuid)
async (ctx) => await config.storageAdapter.stat(ctx, payload.workspace, uuid)
)
if (blobInfo === undefined) {
@ -397,24 +402,25 @@ export function start (
const size = req.query.size !== undefined ? parseInt(req.query.size as string) : undefined
if (format !== undefined && isImage && blobInfo.contentType !== 'image/gif') {
uuid = await ctx.with(
blobInfo = await ctx.with(
'resize',
{},
async () => await getGeneratePreview(ctx, blobInfo, size ?? -1, uuid, config, payload, format)
async (ctx) =>
await getGeneratePreview(ctx, blobInfo as PlatformBlob, size ?? -1, uuid, config, payload, format)
)
}
const range = req.headers.range
if (range !== undefined) {
await ctx.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, blobInfo, range, config.storageAdapter, payload.workspace, uuid, res)
await getFileRange(ctx, blobInfo as PlatformBlob, range, config.storageAdapter, payload.workspace, res)
})
} else {
await ctx.with(
'file',
{ workspace: payload.workspace.name },
async (ctx) => {
await getFile(ctx, blobInfo, config.storageAdapter, payload.workspace, uuid, req, res)
await getFile(ctx, blobInfo as PlatformBlob, config.storageAdapter, payload.workspace, req, res)
},
{ uuid }
)
@ -720,9 +726,9 @@ async function getGeneratePreview (
config: { storageAdapter: StorageAdapter },
payload: Token,
format: SupportedFormat = 'jpeg'
): Promise<string> {
): Promise<PlatformBlob> {
if (size === undefined) {
return uuid
return blob
}
const sizeId = uuid + `%preview%${size}${format !== 'jpeg' ? format : ''}`
@ -731,7 +737,7 @@ async function getGeneratePreview (
if (hasSmall) {
// We have cached small document, let's proceed with it.
uuid = sizeId
return d
} else {
let data: Buffer
try {
@ -785,7 +791,7 @@ async function getGeneratePreview (
// Add support of avif as well.
await config.storageAdapter.put(ctx, payload.workspace, sizeId, dataBuff, contentType, dataBuff.length)
uuid = sizeId
return (await config.storageAdapter.stat(ctx, payload.workspace, sizeId)) ?? blob
} catch (err: any) {
Analytics.handleError(err)
ctx.error('failed to resize image', {
@ -796,7 +802,9 @@ async function getGeneratePreview (
size: blob.size,
provider: blob.provider
})
// Return original in case of error
return blob
}
}
return uuid
}

View File

@ -13,12 +13,13 @@ import {
type WorkspaceId
} from '@hcengineering/core'
import type { RawDBAdapter, RawDBAdapterStream } from '@hcengineering/server-core'
import { type Document, type Filter, type FindCursor, type Sort } from 'mongodb'
import { type Document, type Filter, type FindCursor, type MongoClient, type Sort } from 'mongodb'
import { toArray, uploadDocuments } from './storage'
import { getMongoClient, getWorkspaceDB } from './utils'
export function createRawMongoDBAdapter (url: string): RawDBAdapter {
const client = getMongoClient(url)
let mongoClient: MongoClient | undefined
const collectSort = (options: FindOptions<Doc>): Sort | undefined => {
if (options?.sort === undefined) {
@ -46,7 +47,8 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
cursor: FindCursor<T>
total: number
}> {
const db = getWorkspaceDB(await client.getClient(), workspace)
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceDB(mongoClient, workspace)
const coll = db.collection(domain)
let cursor = coll.find<T>(query as Filter<Document>, {
checkKeys: false
@ -78,11 +80,18 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
): Promise<FindResult<T>> {
let { cursor, total } = await getCursor(workspace, domain, query, options)
let { cursor, total } = await ctx.with(
'get-cursor',
{},
async () => await getCursor(workspace, domain, query, options)
)
// Error in case of timeout
try {
const res = await toArray<T>(cursor)
const res = await ctx.with('to-array', {}, async () => await toArray<T>(cursor), {
...query,
...options
})
if (options?.total === true && options?.limit === undefined) {
total = res.length
}
@ -109,7 +118,8 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
}
},
upload: async (ctx: MeasureContext, workspace, domain, docs) => {
const db = getWorkspaceDB(await client.getClient(), workspace)
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceDB(mongoClient, workspace)
const coll = db.collection(domain)
await uploadDocuments(ctx, docs, coll)
},
@ -117,7 +127,8 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
client.close()
},
clean: async (ctx, workspace, domain, docs) => {
const db = getWorkspaceDB(await client.getClient(), workspace)
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceDB(mongoClient, workspace)
const coll = db.collection<Doc>(domain)
await coll.deleteMany({ _id: { $in: docs } })
},
@ -128,7 +139,8 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
operations: Map<Ref<Doc>, DocumentUpdate<Doc>>
): Promise<void> => {
await ctx.with('update', { domain }, async () => {
const db = getWorkspaceDB(await client.getClient(), workspace)
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceDB(mongoClient, workspace)
const coll = db.collection(domain)
// remove old and insert new ones

View File

@ -17,7 +17,7 @@ import { MeasureMetricsContext, generateId } from '@hcengineering/core'
import { objectsToArray, type StorageConfiguration } from '@hcengineering/server-core'
import { S3Service, processConfigFromEnv, type S3Config } from '..'
describe('minio operations', () => {
describe('s3 operations', () => {
const config: StorageConfiguration = { default: 'minio', storages: [] }
const minioConfigVar = processConfigFromEnv(config)
if (minioConfigVar !== undefined || config.storages[0] === undefined) {

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { GetObjectCommand, S3 } from '@aws-sdk/client-s3'
import { CopyObjectCommand, GetObjectCommand, PutObjectCommand, S3 } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
@ -22,11 +22,11 @@ import core, {
withContext,
type Blob,
type BlobLookup,
type Branding,
type MeasureContext,
type Ref,
type WorkspaceId,
type WorkspaceIdWithUrl,
type Branding
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import {
@ -268,6 +268,15 @@ export class S3Service implements StorageAdapter {
return this.opt.rootBucket !== undefined ? this.getBucketFolder(workspaceId) + '/' : undefined
}
async copy (sourceId: WorkspaceId, targetId: WorkspaceId, objectName: string): Promise<void> {
const copyOp = new CopyObjectCommand({
Bucket: this.getBucketId(targetId),
Key: this.getDocumentKey(targetId, objectName),
CopySource: `${this.getBucketId(sourceId)}/${this.getDocumentKey(sourceId, objectName)}`
})
await this.client.send(copyOp)
}
@withContext('listStream')
async listStream (
ctx: MeasureContext,
@ -382,41 +391,57 @@ export class S3Service implements StorageAdapter {
contentType: string,
size?: number
): Promise<UploadedObjectInfo> {
if (size === undefined) {
const uploadTask = new Upload({
client: this.client,
params: {
Bucket: this.getBucketId(workspaceId),
Key: this.getDocumentKey(workspaceId, objectName),
ContentType: contentType,
Body: stream
if (size !== undefined && size < 1024 * 1024 * 5) {
return await ctx.with(
'simple-put',
{},
async () => {
const cmd = new PutObjectCommand({
Bucket: this.getBucketId(workspaceId),
Key: this.getDocumentKey(workspaceId, objectName),
ContentType: contentType,
ContentLength: size,
Body: stream
})
const response = await this.client.send(cmd)
return {
etag: response.ETag ?? '',
versionId: response.VersionId ?? null
}
},
{ size, objectName, workspaceId: workspaceId.name }
)
// Less 5Mb
} else {
return await ctx.with(
'multipart-upload',
{},
async () => {
const uploadTask = new Upload({
client: this.client,
params: {
Bucket: this.getBucketId(workspaceId),
Key: this.getDocumentKey(workspaceId, objectName),
ContentType: contentType,
Body: stream
},
// (optional) concurrency configuration
queueSize: 1,
// (optional) concurrency configuration
// queueSize: 1,
// (optional) size of each part, in bytes, at least 5MB
partSize: 1024 * 1024 * 5,
leavePartsOnError: false
})
// (optional) size of each part, in bytes, at least 5MB
partSize: 1024 * 1024 * 5,
leavePartsOnError: false
})
const output = await uploadTask.done()
return {
etag: output.ETag ?? '',
versionId: output.VersionId ?? null
}
}
const result = await this.client.putObject({
Bucket: this.getBucketId(workspaceId),
Key: this.getDocumentKey(workspaceId, objectName),
ContentType: contentType,
ContentLength: size,
Body: stream
})
return {
etag: result.ETag ?? '',
versionId: result.VersionId ?? null
const output = await uploadTask.done()
return {
etag: output.ETag ?? '',
versionId: output.VersionId ?? null
}
},
{ size, objectName, workspaceId: workspaceId.name }
)
}
}
@ -426,12 +451,8 @@ export class S3Service implements StorageAdapter {
const chunks: Buffer[] = []
await new Promise((resolve, reject) => {
data.on('readable', () => {
let chunk
while ((chunk = data.read()) !== null) {
const b = chunk as Buffer
chunks.push(b)
}
data.on('data', (chunk) => {
chunks.push(chunk)
})
data.on('end', () => {
@ -439,6 +460,7 @@ export class S3Service implements StorageAdapter {
resolve(null)
})
data.on('error', (err) => {
data.destroy()
reject(err)
})
})

102
server/s3/src/perfTest.ts Normal file
View File

@ -0,0 +1,102 @@
import { MeasureMetricsContext, generateId } from '@hcengineering/core'
import type { StorageConfiguration } from '@hcengineering/server-core'
import { S3Service, processConfigFromEnv, type S3Config } from '.'
const MB = 1024 * 1024
const config: StorageConfiguration = { default: 'minio', storages: [] }
const minioConfigVar = processConfigFromEnv(config)
if (minioConfigVar !== undefined || config.storages[0] === undefined) {
console.error('No S3 config env is configured:' + minioConfigVar)
it.skip('No S3 config env is configured', async () => {})
process.exit(1)
}
const toolCtx = new MeasureMetricsContext('test', {})
const storageService = new S3Service({ ...(config.storages[0] as S3Config), rootBucket: 'haiodo-test-bucket' })
async function doTest (): Promise<void> {
const existingTestBuckets = await storageService.listBuckets(toolCtx, '')
// Delete old buckets
for (const b of existingTestBuckets) {
await b.delete()
}
const genWorkspaceId1 = generateId()
const ws1 = { name: genWorkspaceId1, productId: '' }
await storageService.make(toolCtx, ws1)
/// /////// Uploads
let st1 = Date.now()
const sz = 10
const stream = Buffer.alloc(sz * 1024 * 1024)
for (let i = 0; i < 10; i++) {
// We need 1Mb random file to check upload speed.
const st = Date.now()
await storageService.put(toolCtx, ws1, `testObject.${i}`, stream, 'application/octet-stream', stream.length)
console.log('upload time', Date.now() - st)
}
let now = Date.now()
console.log(`upload performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)
/// // Downloads 1
st1 = Date.now()
for (let i = 0; i < 10; i++) {
// We need 1Mb random file to check upload speed.
const st = Date.now()
await storageService.read(toolCtx, ws1, `testObject.${i}`)
console.log('download time', Date.now() - st)
}
now = Date.now()
console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)
/// Downloads 2
st1 = Date.now()
for (let i = 0; i < 10; i++) {
// We need 1Mb random file to check upload speed.
const st = Date.now()
const readable = await storageService.get(toolCtx, ws1, `testObject.${i}`)
const chunks: Buffer[] = []
readable.on('data', (chunk) => {
chunks.push(chunk)
})
await new Promise<void>((resolve) => {
readable.on('end', () => {
resolve()
readable.destroy()
})
})
console.log('download time 2', Date.now() - st)
}
now = Date.now()
console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)
/// Downloads 3
st1 = Date.now()
for (let i = 0; i < 10; i++) {
// We need 1Mb random file to check upload speed.
const st = Date.now()
for (let i = 0; i < sz; i++) {
const readable = await storageService.partial(toolCtx, ws1, `testObject.${i}`, i * MB, MB)
const chunks: Buffer[] = []
readable.on('data', (chunk) => {
chunks.push(chunk)
})
await new Promise<void>((resolve) => {
readable.on('end', () => {
resolve()
readable.destroy()
})
})
}
console.log('download time 2', Date.now() - st)
}
now = Date.now()
console.log(`download performance: ${Math.round((sz * 10 * 1000 * 100) / (now - st1)) / 100} mb per second`)
}
void doTest().catch((err) => {
console.error(err)
})
console.log('done')

View File

@ -75,12 +75,25 @@ class StorageBlobAdapter implements DbAdapter {
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
// We need to update docs to have provider === defualt one.
if ('adapters' in this.client) {
const toUpload: Doc[] = []
const adapterEx = this.client as StorageAdapterEx
for (const d of docs) {
// We need sync stats to be sure all info are correct from storage.
if (d._class === core.class.Blob) {
;(d as Blob).provider = adapterEx.defaultAdapter
const blob = d as Blob
const blobStat = await this.client.stat(ctx, this.workspaceId, blob.storageId)
if (blobStat !== undefined) {
blob.provider = adapterEx.defaultAdapter
blob.etag = blobStat.etag
blob.contentType = blobStat.contentType
blob.version = blobStat.version
blob.size = blobStat.size
toUpload.push(blob)
}
}
}
docs = toUpload
}
await this.blobAdapter.upload(ctx, domain, docs)
}

View File

@ -139,7 +139,7 @@ export class BlobClient {
}
): Promise<void> {
let written = 0
const chunkSize = 1024 * 1024
const chunkSize = 50 * 1024 * 1024
let writtenMb = 0
// Use ranges to iterave through file with retry if required.
@ -148,12 +148,25 @@ export class BlobClient {
let response: Response | undefined
for (; i < 5; i++) {
try {
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
})
const st = Date.now()
const header: Record<string, string> = {
Authorization: 'Bearer ' + this.token
}
if (!(size !== -1 && written === 0 && size < chunkSize)) {
header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
if (header.Range != null) {
ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
}
if (response.status === 403) {
i = 5
// No file, so make it empty
throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 404) {
i = 5
// No file, so make it empty
@ -169,6 +182,10 @@ export class BlobClient {
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
const chunk = Buffer.from(await response.arrayBuffer())
if (header.Range == null) {
size = chunk.length
}
// We need to parse
// 'Content-Range': `bytes ${start}-${end}/${size}`
// To determine if something is left
@ -223,12 +240,13 @@ export class BlobClient {
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`,
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': 'application/octet-stream'
'Content-Type': contentType
},
body: buffer
}
@ -239,9 +257,6 @@ export class BlobClient {
ctx.error('failed to upload file', { name })
throw err
}
await new Promise<void>((resolve) => {
setTimeout(resolve, 500)
})
}
}
}

View File

@ -174,6 +174,13 @@ export async function initModel (
}
}
export function getStorageAdapter (): StorageAdapter {
const { mongodbUri } = prepareTools([])
const storageConfig: StorageConfiguration = storageConfigFromEnv()
return buildStorageFromConfig(storageConfig, mongodbUri)
}
/**
* @public
*/
@ -213,16 +220,19 @@ export async function upgradeModel (
)
await progress(0)
await ctx.with('pre-migrate', {}, async () => {
await ctx.with('pre-migrate', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
if (op[1].preMigrate === undefined) {
continue
}
const preMigrate = op[1].preMigrate
const t = Date.now()
try {
await op[1].preMigrate(preMigrateClient, logger)
await ctx.with(op[0], {}, async (ctx) => {
await preMigrate(preMigrateClient, logger)
})
} catch (err: any) {
logger.error(`error during pre-migrate: ${op[0]} ${err.message}`, err)
throw err
@ -280,12 +290,14 @@ export async function upgradeModel (
workspaceId
)
await ctx.with('migrate', {}, async () => {
await ctx.with('migrate', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
try {
await op[1].migrate(migrateClient, logger)
await ctx.with(op[0], {}, async () => {
await op[1].migrate(migrateClient, logger)
})
} catch (err: any) {
logger.error(`error during migrate: ${op[0]} ${err.message}`, err)
throw err
@ -295,6 +307,7 @@ export async function upgradeModel (
i++
}
})
logger.log('Apply upgrade operations', { workspaceId: workspaceId.name })
let connection: (CoreClient & BackupClient) | undefined
@ -317,11 +330,13 @@ export async function upgradeModel (
return connection
})
try {
await ctx.with('upgrade', {}, async () => {
await ctx.with('upgrade', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
await op[1].upgrade(migrateState, getUpgradeClient, logger)
await ctx.with(op[0], {}, async () => {
await op[1].upgrade(migrateState, getUpgradeClient, logger)
})
logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name })
await progress(60 + ((100 / migrateOperations.length) * i * 40) / 100)
i++

View File

@ -176,21 +176,29 @@ export function startHttpServer (
const name = req.query.name as string
const contentType = req.query.contentType as string
const size = parseInt((req.query.size as string) ?? '-1')
void ctx
.with(
'storage upload',
{ workspace: payload.workspace.name },
async () => await externalStorage.put(ctx, payload.workspace, name, req, contentType),
async (ctx) =>
await externalStorage.put(ctx, payload.workspace, name, req, contentType, size !== -1 ? size : undefined),
{ file: name, contentType }
)
.then(() => {
res.writeHead(200, { 'Cache-Control': 'no-cache' })
res.end()
})
.catch((err) => {
Analytics.handleError(err)
ctx.error('/api/v1/blob put error', { err })
res.writeHead(404, {})
res.end()
})
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
ctx.error('/api/v1/blob put error', { err })
res.writeHead(404, {})
res.end()
}
@ -209,14 +217,27 @@ export function startHttpServer (
const range = req.headers.range
if (range !== undefined) {
void ctx.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res))
})
void ctx
.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res))
})
.catch((err) => {
Analytics.handleError(err)
ctx.error('/api/v1/blob get error', { err })
res.writeHead(404, {})
res.end()
})
} else {
void getFile(ctx, externalStorage, payload.workspace, name, wrapRes(res))
void getFile(ctx, externalStorage, payload.workspace, name, wrapRes(res)).catch((err) => {
Analytics.handleError(err)
ctx.error('/api/v1/blob get error', { err })
res.writeHead(404, {})
res.end()
})
}
} catch (err: any) {
Analytics.handleError(err)
ctx.error('/api/v1/blob get error', { err })
}
})

View File

@ -318,13 +318,22 @@ export function startUWebsocketServer (
const name = req.getQuery('name') as string
const contentType = req.getQuery('contentType') as string
const size = parseInt((req.getQuery('size') as string) ?? '-1')
const pipe = pipeFromRequest(res)
void ctx
.with(
'storage upload',
{ workspace: payload.workspace.name },
async () => await externalStorage.put(ctx, payload.workspace, name, pipe, contentType),
async () =>
await externalStorage.put(
ctx,
payload.workspace,
name,
pipe,
contentType,
size !== -1 ? size : undefined
),
{ file: name, contentType }
)
.then(() => {