Server Backup Client interface (#1813)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2022-05-23 22:53:33 +07:00 committed by GitHub
parent 37abd77e44
commit bc5f9c0631
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 1057 additions and 130 deletions

36
.vscode/launch.json vendored

@ -15,8 +15,8 @@
"APM_SERVER_URL2": "http://localhost:8200",
"METRICS_CONSOLE": "true", // Show metrics in console evert 30 seconds.,
"MINIO_ENDPOINT": "localhost",
"MINIO_ACCESS_KEY":"minioadmin",
"MINIO_SECRET_KEY":"minioadmin",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"SERVER_SECRET": "secret"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
@ -41,34 +41,34 @@
"request": "launch",
"args": ["src/index.ts", "gen-recruit", "ws1", "20"],
"env": {
"TRANSACTOR_URL":"ws:/localhost:3333",
"MINIO_ACCESS_KEY":"minioadmin",
"MINIO_SECRET_KEY":"minioadmin",
"MINIO_ENDPOINT":"localhost"
"TRANSACTOR_URL": "ws:/localhost:3333",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register" ],
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"cwd": "${workspaceRoot}/dev/generator",
"protocol": "inspector"
},
},
{
"name": "Debug tool",
"type": "node",
"request": "launch",
"args": ["src/index.ts", "upgrade-workspace", "ws1"],
"args": ["src/index.ts", "backup-workspace", "ws1", "../../../dump/test/ws1"],
"env": {
"MINIO_ACCESS_KEY":"minioadmin",
"MINIO_SECRET_KEY":"minioadmin",
"MINIO_ENDPOINT":"localhost",
"MONGO_URL":"mongodb://localhost:27017",
"TRANSACTOR_URL":"ws:/localhost:3333",
"TELEGRAM_DATABASE":"telegram-service",
"ELASTIC_URL":"http://localhost:9200",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost",
"MONGO_URL": "mongodb://localhost:27017",
"TRANSACTOR_URL": "ws:/localhost:3333",
"TELEGRAM_DATABASE": "telegram-service",
"ELASTIC_URL": "http://localhost:9200"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register" ],
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"cwd": "${workspaceRoot}/dev/tool",
"protocol": "inspector"
},
}
]
}


@ -267,6 +267,7 @@ specifiers:
svelte-loader: ^3.1.2
svelte-preprocess: ^4.10.5
svgo-loader: ^3.0.0
tar-stream: ~2.2.0
toposort: ^2.0.2
ts-loader: ^9.2.5
ts-node: ~10.5.0
@ -546,6 +547,7 @@ dependencies:
svelte-loader: 3.1.2_svelte@3.48.0
svelte-preprocess: 4.10.6_6851c101606d18b3be336719f14f900a
svgo-loader: 3.0.0
tar-stream: 2.2.0
toposort: 2.0.2
ts-loader: 9.3.0_typescript@4.6.4+webpack@5.72.0
ts-node: 10.5.0_fd4b5ff48b408bf425abfec4a641da13
@ -2488,6 +2490,12 @@ packages:
resolution: {integrity: sha512-W+bw9ds02rAQaMvaLYxAbJ6cvguW/iJXNT6lTssS1ps6QdrMKttqEAMEG/b5CR8TZl3/L7/lH0ZV5nNR1LXikA==}
dev: false
/@types/tar-stream/2.2.2:
resolution: {integrity: sha512-1AX+Yt3icFuU6kxwmPakaiGrJUwG44MpuiqPg4dSolRFk6jmvs4b3IbUol9wKDLIgU76gevn3EwE8y/DkSJCZQ==}
dependencies:
'@types/node': 16.11.33
dev: false
/@types/toposort/2.0.3:
resolution: {integrity: sha512-jRtyvEu0Na/sy0oIxBW0f6wPQjidgVqlmCTJVHEGTNEUdL1f0YSvdPzHY7nX7MUWAZS6zcAa0KkqofHjy/xDZQ==}
dev: false
@ -3350,6 +3358,14 @@ packages:
resolution: {integrity: sha512-nbE1WxOTTrUWIfsfZ4aHGYu5DOuNkbxGokjV6Z2kxfJK3uaAb8zNK1muzOeipoLHZjInT4Br88BHpzevc681xA==}
dev: false
/bl/4.1.0:
resolution: {integrity: sha512-1W07cM9gS6DcLperZfFSj+bWLtaPGSOHWhPiGzXmvVJbRLdG82sH/Kn8EtW1VqWVA54AKf2h5k5BbnIbwF3h6w==}
dependencies:
buffer: 5.7.1
inherits: 2.0.4
readable-stream: 3.6.0
dev: false
/block-stream2/2.1.0:
resolution: {integrity: sha512-suhjmLI57Ewpmq00qaygS8UgEq2ly2PCItenIyhMqVjo4t4pGzqMvfgJuX8iWTeSDdfSSqS6j38fL4ToNL7Pfg==}
dependencies:
@ -5400,6 +5416,10 @@ packages:
engines: {node: '>= 0.6'}
dev: false
/fs-constants/1.0.0:
resolution: {integrity: sha512-y6OAwoSIf7FyjMIv94u+b5rdheZEjzR63GTyZJm5qh4Bi+2YgwLCcI/fPFZkL5PSixOt6ZNKm+w+Hfp/Bciwow==}
dev: false
/fs-extra/7.0.1:
resolution: {integrity: sha512-YJDaCJZEnBmcbw13fvdAM9AwNOJwOzrE4pqMqBq5nFiEqXUqHwlK4B+3pUw6JNvfSPtX05xFHtYy/1ni01eGCw==}
engines: {node: '>=6 <7 || >=8'}
@ -9324,6 +9344,17 @@ packages:
engines: {node: '>=6'}
dev: false
/tar-stream/2.2.0:
resolution: {integrity: sha512-ujeqbceABgwMZxEJnk2HDY2DlnUZ+9oEcb1KzTVfYHio0UE6dG71n60d8D2I4qNvleWrrXpmjpt7vZeF1LnMZQ==}
engines: {node: '>=6'}
dependencies:
bl: 4.1.0
end-of-stream: 1.4.4
fs-constants: 1.0.0
inherits: 2.0.4
readable-stream: 3.6.0
dev: false
/terminal-link/2.1.1:
resolution: {integrity: sha512-un0FmiRUQNr5PJqy9kP7c40F5BOfpGlYTrxonDChEZB7pzZxRNp/bt+ymiy9/npwXya9KH99nJ/GXFIiUkYGFQ==}
engines: {node: '>=8'}
@ -11434,7 +11465,7 @@ packages:
dev: false
file:projects/login-resources.tgz_3b42a51b6c974062237d417c554d9dd7:
resolution: {integrity: sha512-eMjS1PLlboPHBexc6EnxyYyzaYc0RwpDn6wfd8lYt819DKg12jbhGxeeUya16VBcFThhUaE3JtoHoGWN/IbhOg==, tarball: file:projects/login-resources.tgz}
resolution: {integrity: sha512-NM5ASZ51LO6nG/+f24JOQlwQbaEsiCuXcFqz7/Wl9wJUg3JS+NapdgzqQKrB8Zj8woXKwpQRdMMCWtm/ia+G+w==, tarball: file:projects/login-resources.tgz}
id: file:projects/login-resources.tgz
name: '@rush-temp/login-resources'
version: 0.0.0
@ -14118,7 +14149,7 @@ packages:
dev: false
file:projects/tool.tgz:
resolution: {integrity: sha512-F55vTTO/9oh6IInFdnFBe1RJdOJ5RG7R++gf7t46Eo7l4T7XLFLRpXssTqi0ZVMFF3qIctlhcFJ/8TJKGr7XfQ==, tarball: file:projects/tool.tgz}
resolution: {integrity: sha512-FDtGsWkU/bXJkI2a4pv99KMKgUbBZIaH6OEcDApP6N18CAwPVT6neRFMTNb+3h98qwYOZLLU0amG3+ZFCkvrcw==, tarball: file:projects/tool.tgz}
name: '@rush-temp/tool'
version: 0.0.0
dependencies:
@ -14129,6 +14160,7 @@ packages:
'@types/minio': 7.0.13
'@types/node': 16.11.33
'@types/request': 2.48.8
'@types/tar-stream': 2.2.2
'@types/ws': 8.5.3
'@types/xml2js': 0.4.11
'@typescript-eslint/eslint-plugin': 5.22.0_27efc1da00e78084f5aa1809ff6483a1
@ -14147,6 +14179,7 @@ packages:
mongodb: 4.5.0
prettier: 2.6.2
request: 2.88.2
tar-stream: 2.2.0
ts-node: 10.5.0_fd4b5ff48b408bf425abfec4a641da13
typescript: 4.6.4
ws: 8.6.0
@ -14182,7 +14215,7 @@ packages:
dev: false
file:projects/tracker-resources.tgz_3b42a51b6c974062237d417c554d9dd7:
resolution: {integrity: sha512-1pqvSrfYuNekxpXAp1d1W7rorRKgZEr4W9/mp87dR+IBO21CFCaZxQNatPGqXom+ROv6Jv0TSAszQLB6dowCOA==, tarball: file:projects/tracker-resources.tgz}
resolution: {integrity: sha512-rmgG7PdVMOQB2GtA1cN+ppLMmJ7tRcju8tdHSGzxTX65FRuejH0zW2SU789JnGncjt1A1TT9bIZXSNwOV3V/yw==, tarball: file:projects/tracker-resources.tgz}
id: file:projects/tracker-resources.tgz
name: '@rush-temp/tracker-resources'
version: 0.0.0


@ -17,18 +17,21 @@ import {
Class,
ClientConnection,
Doc,
DocChunk,
DocumentQuery,
Domain,
DOMAIN_TX,
FindOptions,
FindResult,
MeasureMetricsContext,
Ref,
ServerStorage,
Tx,
TxHander,
TxResult,
DOMAIN_TX,
MeasureMetricsContext
TxResult
} from '@anticrm/core'
import { createInMemoryTxAdapter } from '@anticrm/dev-storage'
import devmodel from '@anticrm/devmodel'
import { protoDeserialize, protoSerialize, setMetadata } from '@anticrm/platform'
import {
createInMemoryAdapter,
@ -37,7 +40,6 @@ import {
FullTextAdapter,
IndexedDoc
} from '@anticrm/server-core'
import devmodel from '@anticrm/devmodel'
class ServerStorageWrapper implements ClientConnection {
measureCtx = new MeasureMetricsContext('client', {})
@ -62,6 +64,16 @@ class ServerStorageWrapper implements ClientConnection {
}
async close (): Promise<void> {}
async loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return { idx: -1, docs: {}, finished: true }
}
async closeChunk (idx: number): Promise<void> {}
async loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return []
}
}
class NullFullTextAdapter implements FullTextAdapter {


@ -24,7 +24,7 @@ import {
FullTextAdapter,
IndexedDoc
} from '@anticrm/server-core'
import { start as startJsonRpc } from '@anticrm/server-ws'
import { ClientSession, start as startJsonRpc } from '@anticrm/server-ws'
class NullFullTextAdapter implements FullTextAdapter {
async index (doc: IndexedDoc): Promise<TxResult> {
@ -78,6 +78,7 @@ export async function start (port: number, host?: string): Promise<void> {
}
return createPipeline(conf, [])
},
(token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
port,
host
)


@ -13,7 +13,18 @@
// limitations under the License.
//
import type { Class, Doc, DocumentQuery, FindOptions, FindResult, Ref, Tx, TxResult } from '@anticrm/core'
import type {
Class,
Doc,
DocumentQuery,
Domain,
FindOptions,
FindResult,
Ref,
StorageIterator,
Tx,
TxResult
} from '@anticrm/core'
import { Hierarchy, TxDb } from '@anticrm/core'
import builder from '@anticrm/model-all'
import type { TxAdapter } from '@anticrm/server-core'
@ -48,6 +59,17 @@ class InMemoryTxAdapter implements TxAdapter {
}
async close (): Promise<void> {}
find (domain: Domain): StorageIterator {
return {
next: async () => await Promise.reject(new Error('Not implemented')),
close: async () => {}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return []
}
}
/**


@ -39,7 +39,8 @@
"@types/ws": "^8.2.1",
"@types/xml2js": "~0.4.9",
"@types/mime-types": "~2.1.1",
"@types/request": "~2.48.8"
"@types/request": "~2.48.8",
"@types/tar-stream": "~2.2.2"
},
"dependencies": {
"mongodb": "^4.1.1",
@ -103,6 +104,7 @@
"@anticrm/server-telegram-resources": "~0.6.0",
"@anticrm/rekoni": "~0.6.0",
"request": "~2.88.2",
"@anticrm/tags": "~0.6.2"
"@anticrm/tags": "~0.6.2",
"tar-stream": "~2.2.0"
}
}

244
dev/tool/src/backup.ts Normal file

@ -0,0 +1,244 @@
//
// Copyright © 2020, 2021 Anticrm Platform Contributors.
// Copyright © 2021 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import core, {
BackupClient,
BlobData,
Client as CoreClient,
Doc,
Domain,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
Ref
} from '@anticrm/core'
import { createWriteStream, existsSync } from 'fs'
import { mkdir, readFile, writeFile } from 'fs/promises'
import { createGzip } from 'node:zlib'
import { join } from 'path'
import { Pack, pack } from 'tar-stream'
import { gunzipSync, gzipSync } from 'zlib'
import { connect } from './connect'
const dataBlobSize = 100 * 1024 * 1024
export interface Snapshot {
added: Record<Ref<Doc>, string>
updated: Record<Ref<Doc>, string>
removed: Ref<Doc>[]
}
export interface DomainData {
snapshot?: string
storage?: string[]
// Some statistics
added: number
updated: number
removed: number
}
export interface BackupSnapshot {
// _id => hash of added items.
domains: Record<Domain, DomainData>
date: number
}
export interface BackupInfo {
version: string
snapshots: BackupSnapshot[]
}
async function loadDigest (
fileName: string,
snapshots: BackupSnapshot[],
domain: Domain
): Promise<Map<Ref<Doc>, string>> {
const result = new Map<Ref<Doc>, string>()
for (const s of snapshots) {
const d = s.domains[domain]
if (d?.snapshot !== undefined) {
const dChanges: Snapshot = JSON.parse(gunzipSync(await readFile(join(fileName, d.snapshot))).toString())
for (const [k, v] of Object.entries(dChanges.added)) {
result.set(k as Ref<Doc>, v)
}
for (const [k, v] of Object.entries(dChanges.updated)) {
result.set(k as Ref<Doc>, v)
}
for (const d of dChanges.removed) {
result.delete(d)
}
}
}
return result
}
/**
* @public
*/
export async function backupWorkspace (transactorUrl: string, dbName: string, fileName: string): Promise<void> {
const connection = (await connect(transactorUrl, dbName, {
mode: 'backup'
})) as unknown as CoreClient & BackupClient
try {
const domains = connection
.getHierarchy()
.domains()
.filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL)
if (!existsSync(fileName)) {
await mkdir(fileName, { recursive: true })
}
let backupInfo: BackupInfo = {
version: '0.6',
snapshots: []
}
const infoFile = join(fileName, 'backup.json.gz')
if (existsSync(infoFile)) {
backupInfo = JSON.parse(gunzipSync(await readFile(infoFile)).toString())
}
const snapshot: BackupSnapshot = {
date: Date.now(),
domains: {}
}
backupInfo.snapshots.push(snapshot)
let backupIndex = `${backupInfo.snapshots.length}`
while (backupIndex.length < 6) {
backupIndex = '0' + backupIndex
}
const bdir = join(fileName, backupIndex)
if (!existsSync(bdir)) {
await mkdir(bdir, { recursive: true })
}
for (const c of domains) {
console.log('dumping domain...', c)
const changes: Snapshot = {
added: {},
updated: {},
removed: []
}
let changed = 0
let stIndex = 0
const domainInfo: Required<DomainData> = {
snapshot: join(backupIndex, `${c}-${snapshot.date}.json.gz`),
storage: [],
added: 0,
updated: 0,
removed: 0
}
// Cumulative digest
const digest = await loadDigest(fileName, backupInfo.snapshots, c)
let idx: number | undefined
let _pack: Pack | undefined
let addedDocuments = 0
// Iterate chunks, update the digest and stream changed documents into tar archives
while (true) {
const it = await connection.loadChunk(c, idx)
idx = it.idx
const needRetrieve: Ref<Doc>[] = []
for (const [k, v] of Object.entries(it.docs)) {
const kHash = digest.get(k as Ref<Doc>)
if (kHash !== undefined) {
digest.delete(k as Ref<Doc>)
if (kHash !== v) {
changes.updated[k as Ref<Doc>] = v
needRetrieve.push(k as Ref<Doc>)
changed++
}
} else {
changes.added[k as Ref<Doc>] = v
needRetrieve.push(k as Ref<Doc>)
changed++
}
}
if (needRetrieve.length > 0) {
const docs = await connection.loadDocs(c, needRetrieve)
// Chunk data into small pieces
if (addedDocuments > dataBlobSize && _pack !== undefined) {
_pack.finalize()
_pack = undefined
addedDocuments = 0
}
if (_pack === undefined) {
_pack = pack()
stIndex++
const storageFile = join(backupIndex, `${c}-data-${snapshot.date}-${stIndex}.tar.gz`)
console.log('storing from domain', c, storageFile)
domainInfo.storage.push(storageFile)
const dataStream = createWriteStream(join(fileName, storageFile))
const storageZip = createGzip()
_pack.pipe(storageZip)
storageZip.pipe(dataStream)
}
for (const d of docs) {
if (d._class === core.class.BlobData) {
const blob = d as BlobData
const data = Buffer.from(blob.base64Data, 'base64')
blob.base64Data = ''
const descrJson = JSON.stringify(d)
addedDocuments += descrJson.length
addedDocuments += data.length
_pack.entry({ name: d._id + '.json' }, descrJson, function (err) {
if (err != null) throw err
})
_pack.entry({ name: d._id }, data, function (err) {
if (err != null) throw err
})
} else {
const data = JSON.stringify(d)
addedDocuments += data.length
_pack.entry({ name: d._id + '.json' }, data, function (err) {
if (err != null) throw err
})
}
}
}
if (it.finished) {
break
}
}
changes.removed = Array.from(digest.keys())
if (changes.removed.length > 0) {
changed++
}
if (changed > 0) {
snapshot.domains[c] = domainInfo
domainInfo.added = Object.keys(changes.added).length
domainInfo.updated = Object.keys(changes.updated).length
domainInfo.removed = changes.removed.length
await writeFile(join(fileName, domainInfo.snapshot), gzipSync(JSON.stringify(changes)))
_pack?.finalize()
}
}
await writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
} finally {
await connection.close()
}
}
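For orientation, the backup.json.gz written above contains a BackupInfo structure roughly like the following (an illustrative sketch only, not part of the commit; the domain, date and counters are invented):

const exampleInfo: BackupInfo = {
  version: '0.6',
  snapshots: [
    {
      date: 1653321600000, // Date.now() at the time the snapshot was taken
      domains: {
        ['tx' as Domain]: {
          // gzipped Snapshot JSON with added/updated/removed maps
          snapshot: '000001/tx-1653321600000.json.gz',
          // tar.gz archives holding the document payloads
          storage: ['000001/tx-data-1653321600000-1.tar.gz'],
          added: 120,
          updated: 0,
          removed: 0
        }
      }
    }
  ]
}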


@ -7,9 +7,13 @@ import { generateToken } from '@anticrm/server-token'
// eslint-disable-next-line
const WebSocket = require('ws')
export async function connect (transactorUrl: string, workspace: string): Promise<Client> {
export async function connect (
transactorUrl: string,
workspace: string,
extra?: Record<string, string>
): Promise<Client> {
console.log('connecting to transactor...')
const token = generateToken('anticrm@hc.engineering', workspace)
const token = generateToken('anticrm@hc.engineering', workspace, extra)
// We need to override default factory with 'ws' one.
setMetadata(client.metadata.ClientSocketFactory, (url) => new WebSocket(url))


@ -19,6 +19,7 @@ import core, {
Class,
Doc,
DocumentQuery,
Domain,
DOMAIN_TX,
FindOptions,
FindResult,
@ -30,6 +31,7 @@ import core, {
newMetrics,
Ref,
ServerStorage,
StorageIterator,
Tx,
TxCollectionCUD,
TxCreateDoc,
@ -398,6 +400,17 @@ class MongoReadOnlyAdapter extends TxProcessor implements DbAdapter {
async close (): Promise<void> {
await this.adapter.close()
}
find (domain: Domain): StorageIterator {
return {
next: async () => undefined,
close: async () => {}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return []
}
}
class MongoReadOnlyTxAdapter extends MongoReadOnlyAdapter implements TxAdapter {


@ -33,6 +33,7 @@ import toolPlugin, { prepareTools, version } from '@anticrm/server-tool'
import { program } from 'commander'
import { Db, MongoClient } from 'mongodb'
import { exit } from 'process'
import { backupWorkspace } from './backup'
import { rebuildElastic } from './elastic'
import { importXml } from './importer'
import { updateCandidates } from './recruit'
@ -184,6 +185,13 @@ program
return await dumpWorkspace(mongodbUri, workspace, dirName, minio)
})
program
.command('backup-workspace <workspace> <dirName>')
.description('backup workspace transactions and minio resources')
.action(async (workspace, dirName, cmd) => {
return await backupWorkspace(transactorUrl, workspace, dirName)
})
program
.command('restore-workspace <workspace> <dirName>')
.description('restore workspace transactions and minio resources from previous dump.')


@ -23,7 +23,7 @@ export async function listMinioObjects (minio: Client, dbName: string): Promise<
const list = await minio.listObjects(dbName, undefined, true)
await new Promise((resolve) => {
list.on('data', (data) => {
items.push({ ...data, metaData: {} })
items.push({ metaData: {}, ...data })
})
list.on('end', () => {
resolve(null)


@ -26,6 +26,7 @@ import { join } from 'path'
import { rebuildElastic } from './elastic'
import { generateModelDiff, printDiff } from './mdiff'
import { listMinioObjects, MinioWorkspaceItem } from './minio'
interface CollectionInfo {
name: string
file: string


@ -24,6 +24,7 @@ import {
Doc,
Domain,
DOMAIN_MODEL,
DOMAIN_BLOB,
IndexKind,
Interface,
Mixin,
@ -34,7 +35,8 @@ import {
Space,
Timestamp,
Type,
Version
Version,
BlobData
} from '@anticrm/core'
import { Hidden, Index, Model, Prop, TypeIntlString, TypeRef, TypeString, TypeTimestamp, UX } from '@anticrm/model'
import type { IntlString } from '@anticrm/platform'
@ -173,3 +175,12 @@ export class TPluginConfiguration extends TDoc implements PluginConfiguration {
pluginId!: string
transactions!: Ref<Doc>[]
}
@Model(core.class.BlobData, core.class.Doc, DOMAIN_BLOB)
export class TBlobData extends TDoc implements BlobData {
name!: string
file!: string
size!: number
type!: string
base64Data!: string
}


@ -19,6 +19,7 @@ import {
TArrOf,
TAttachedDoc,
TAttribute,
TBlobData,
TClass,
TCollection,
TDoc,
@ -90,6 +91,7 @@ export function createModel (builder: Builder): void {
TTypeNumber,
TTypeIntlString,
TPluginConfiguration,
TUserStatus
TUserStatus,
TBlobData
)
}


@ -13,7 +13,7 @@
// limitations under the License.
//
import type { Class, Doc, Ref } from '../classes'
import type { Class, Doc, Domain, Ref } from '../classes'
import { ClientConnection } from '../client'
import core from '../component'
import { Hierarchy } from '../hierarchy'
@ -52,6 +52,16 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
return result[0]
// handler(tx) - we have only one client, should not update?
},
close: async () => {}
close: async () => {},
loadChunk: async (domain: Domain, idx?: number) => ({
idx: -1,
index: -1,
docs: {},
finished: true,
digest: ''
}),
closeChunk: async (idx: number) => {},
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => []
}
}


@ -0,0 +1,23 @@
import { Doc, Domain, Ref } from './classes'
/**
* @public
* Define a set of document _id => hash pairs for a chunk,
* so the backup client can decide which documents need to be downloaded.
*/
export interface DocChunk {
idx: number
// _id => hash mapping
docs: Record<string, string>
finished: boolean
}
/**
* @public
*/
export interface BackupClient {
loadChunk: (domain: Domain, idx?: number) => Promise<DocChunk>
closeChunk: (idx: number) => Promise<void>
loadDocs: (domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
}
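As an illustration of how a consumer is expected to drive these interfaces (a minimal sketch, not part of the commit; collectChanged and the previous digest map are hypothetical names, the types are the ones imported above):

async function collectChanged (
  client: BackupClient,
  domain: Domain,
  previous: Map<Ref<Doc>, string>
): Promise<Doc[]> {
  const changed: Ref<Doc>[] = []
  let idx: number | undefined
  while (true) {
    const chunk = await client.loadChunk(domain, idx)
    idx = chunk.idx
    for (const [id, hash] of Object.entries(chunk.docs)) {
      // Download only documents that are new or whose hash differs from the previous snapshot.
      if (previous.get(id as Ref<Doc>) !== hash) {
        changed.push(id as Ref<Doc>)
      }
    }
    if (chunk.finished) {
      await client.closeChunk(idx)
      break
    }
  }
  return changed.length > 0 ? await client.loadDocs(domain, changed) : []
}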


@ -234,6 +234,12 @@ export const DOMAIN_MODEL = 'model' as Domain
*/
export const DOMAIN_TRANSIENT = 'transient' as Domain
/**
* Special domain to access s3 blob data.
* @public
*/
export const DOMAIN_BLOB = 'blob' as Domain
// S P A C E
/**
@ -269,3 +275,14 @@ export interface Version extends Doc {
minor: number
patch: number
}
/**
* Blob data from s3 storage
* @public
*/
export interface BlobData extends Doc {
name: string
size: number
type: string
base64Data: string // base64 encoded data
}


@ -14,7 +14,8 @@
//
import { Plugin } from '@anticrm/platform'
import type { Class, Doc, PluginConfiguration, Ref } from './classes'
import { BackupClient, DocChunk } from './backup'
import type { Class, Doc, Domain, PluginConfiguration, Ref } from './classes'
import { DOMAIN_MODEL } from './classes'
import core from './component'
import { Hierarchy } from './hierarchy'
@ -47,11 +48,11 @@ export interface Client extends Storage {
/**
* @public
*/
export interface ClientConnection extends Storage {
export interface ClientConnection extends Storage, BackupClient {
close: () => Promise<void>
}
class ClientImpl implements Client {
class ClientImpl implements Client, BackupClient {
notify?: (tx: Tx) => void
constructor (
@ -119,6 +120,18 @@ class ClientImpl implements Client {
async close (): Promise<void> {
await this.conn.close()
}
async loadChunk (domain: Domain, idx?: number | undefined): Promise<DocChunk> {
return await this.conn.loadChunk(domain, idx)
}
async closeChunk (idx: number): Promise<void> {
return await this.conn.closeChunk(idx)
}
async loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this.conn.loadDocs(domain, docs)
}
}
/**


@ -20,6 +20,7 @@ import type {
AnyAttribute,
ArrOf,
AttachedDoc,
BlobData,
Class,
Collection,
Doc,
@ -85,7 +86,8 @@ export default plugin(coreId, {
Bag: '' as Ref<Class<Type<Record<string, PropertyType>>>>,
Version: '' as Ref<Class<Version>>,
PluginConfiguration: '' as Ref<Class<PluginConfiguration>>,
UserStatus: '' as Ref<Class<UserStatus>>
UserStatus: '' as Ref<Class<UserStatus>>,
BlobData: '' as Ref<Class<BlobData>>
},
space: {
Tx: '' as Ref<Space>,


@ -405,6 +405,15 @@ export class Hierarchy {
}
return result
}
domains (): Domain[] {
const classes = Array.from(this.classifiers.values()).filter(
(it) => this.isClass(it) || this._isMixin(it)
) as Class<Doc>[]
return (classes.map((it) => it.domain).filter((it) => it !== undefined) as Domain[]).filter(
(it, idx, array) => array.findIndex((pt) => pt === it) === idx
)
}
}
function addNew<T> (val: Set<T>, value: T): boolean {


@ -30,6 +30,7 @@ export * from './server'
export * from './storage'
export * from './tx'
export * from './utils'
export * from './backup'
addStringsLoader(coreId, async (lang: string) => {
return await import(`../lang/${lang}.json`)


@ -13,8 +13,8 @@
// limitations under the License.
//
import { MeasureContext } from '.'
import type { Doc, Class, Ref } from './classes'
import { MeasureContext } from './measurements'
import type { Doc, Class, Ref, Domain } from './classes'
import { Hierarchy } from './hierarchy'
import { ModelDb } from './memdb'
import type { DocumentQuery, FindOptions, FindResult, TxResult } from './storage'
@ -23,7 +23,31 @@ import type { Tx } from './tx'
/**
* @public
*/
export interface ServerStorage {
export interface DocInfo {
id: string
hash: string
size: number // Approximate size
}
/**
* @public
*/
export interface StorageIterator {
next: () => Promise<DocInfo | undefined>
close: () => Promise<void>
}
/**
* @public
*/
export interface LowLevelStorage {
// Low level streaming API to retrieve information
find: (domain: Domain) => StorageIterator
load: (domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
}
/**
* @public
*/
export interface ServerStorage extends LowLevelStorage {
hierarchy: Hierarchy
modelDb: ModelDb
findAll: <T extends Doc>(

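To make the intent of LowLevelStorage concrete, here is a small consumer sketch (an assumption for illustration, not part of the commit; domainDigest is a hypothetical helper) that builds an _id => hash digest for a domain from the iterator:

async function domainDigest (storage: LowLevelStorage, domain: Domain): Promise<Map<string, string>> {
  const digest = new Map<string, string>()
  const iterator = storage.find(domain)
  try {
    while (true) {
      const info = await iterator.next()
      if (info === undefined) break
      digest.set(info.id, info.hash)
    }
  } finally {
    // Always release the underlying cursor/stream.
    await iterator.close()
  }
  return digest
}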

@ -13,11 +13,23 @@
// limitations under the License.
//
import type { Class, Client, Doc, DocumentQuery, FindOptions, FindResult, Ref, Tx, TxResult } from '@anticrm/core'
import type {
BackupClient,
Class,
Client,
Doc,
DocumentQuery,
Domain,
FindOptions,
FindResult,
Ref,
Tx,
TxResult
} from '@anticrm/core'
import core, { DOMAIN_TX, Hierarchy, ModelDb, TxDb } from '@anticrm/core'
import { genMinModel } from './minmodel'
export async function connect (handler: (tx: Tx) => void): Promise<Client> {
export async function connect (handler: (tx: Tx) => void): Promise<Client & BackupClient> {
const txes = genMinModel()
const hierarchy = new Hierarchy()
@ -54,6 +66,15 @@ export async function connect (handler: (tx: Tx) => void): Promise<Client> {
// handler(tx)
return {}
},
close: async () => {}
close: async () => {},
loadChunk: async (domain: Domain, idx?: number) => ({
idx: -1,
index: -1,
docs: {},
finished: true,
digest: ''
}),
closeChunk: async (idx: number) => {},
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => []
}
}


@ -19,7 +19,9 @@ import type {
Class,
ClientConnection,
Doc,
DocChunk,
DocumentQuery,
Domain,
FindOptions,
FindResult,
Ref,
@ -127,6 +129,18 @@ class Connection implements ClientConnection {
tx (tx: Tx): Promise<TxResult> {
return this.sendRequest('tx', tx)
}
loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return this.sendRequest('loadChunk', domain, idx)
}
closeChunk (idx: number): Promise<void> {
return this.sendRequest('closeChunk', idx)
}
loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return this.sendRequest('loadDocs', domain, docs)
}
}
/**


@ -14,7 +14,7 @@
// limitations under the License.
//
import type { Resource, Plugin } from '@anticrm/platform'
import type { Plugin, Resource } from '@anticrm/platform'
import { plugin } from '@anticrm/platform'
import type { TriggerFunc } from '@anticrm/server-core'


@ -17,14 +17,17 @@ import {
Class,
Doc,
DocumentQuery,
Domain,
FindOptions,
FindResult,
Hierarchy,
ModelDb,
Ref,
StorageIterator,
Tx,
TxResult
} from '@anticrm/core'
import { Client } from 'minio'
/**
* @public
@ -41,6 +44,10 @@ export interface DbAdapter {
options?: FindOptions<T>
) => Promise<FindResult<T>>
tx: (tx: Tx) => Promise<TxResult>
find: (domain: Domain) => StorageIterator
load: (domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
}
/**
@ -53,7 +60,13 @@ export interface TxAdapter extends DbAdapter {
/**
* @public
*/
export type DbAdapterFactory = (hierarchy: Hierarchy, url: string, db: string, modelDb: ModelDb) => Promise<DbAdapter>
export type DbAdapterFactory = (
hierarchy: Hierarchy,
url: string,
db: string,
modelDb: ModelDb,
storage?: Client
) => Promise<DbAdapter>
/**
* @public
@ -89,6 +102,18 @@ class InMemoryAdapter implements DbAdapter {
}
async close (): Promise<void> {}
find (domain: Domain): StorageIterator {
// Not required for the in-memory adapter
return {
next: async () => undefined,
close: async () => {}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return []
}
}
/**


@ -17,11 +17,13 @@ import {
Class,
Doc,
DocumentQuery,
Domain,
FindOptions,
FindResult,
ModelDb,
Ref,
ServerStorage,
StorageIterator,
Tx,
TxResult
} from '@anticrm/core'
@ -39,7 +41,7 @@ export async function createPipeline (conf: DbConfiguration, constructors: Middl
class TPipeline implements Pipeline {
private readonly head: Middleware | undefined
readonly modelDb: ModelDb
constructor (private readonly storage: ServerStorage, constructors: MiddlewareCreator[]) {
constructor (readonly storage: ServerStorage, constructors: MiddlewareCreator[]) {
this.head = this.buildChain(constructors)
this.modelDb = storage.modelDb
}
@ -73,4 +75,12 @@ class TPipeline implements Pipeline {
async close (): Promise<void> {
await this.storage.close()
}
find (domain: Domain): StorageIterator {
return this.storage.find(domain)
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this.storage.load(domain, docs)
}
}


@ -32,6 +32,7 @@ import core, {
ModelDb,
Ref,
ServerStorage,
StorageIterator,
Tx,
TxBulkWrite,
TxCollectionCUD,
@ -72,13 +73,13 @@ class TServerStorage implements ServerStorage {
hierarchy: Hierarchy
constructor (
private readonly domains: Record<string, string>,
private readonly _domains: Record<string, string>,
private readonly defaultAdapter: string,
private readonly adapters: Map<string, DbAdapter>,
hierarchy: Hierarchy,
private readonly triggers: Triggers,
private readonly fulltextAdapter: FullTextAdapter,
private readonly storageAdapter: MinioClient | undefined,
readonly storageAdapter: MinioClient | undefined,
readonly modelDb: ModelDb,
private readonly workspace: string,
options?: ServerStorageOptions
@ -95,7 +96,7 @@ class TServerStorage implements ServerStorage {
}
private getAdapter (domain: Domain): DbAdapter {
const name = this.domains[domain] ?? this.defaultAdapter
const name = this._domains[domain] ?? this.defaultAdapter
const adapter = this.adapters.get(name)
if (adapter === undefined) {
throw new Error('adapter not provided: ' + name)
@ -404,6 +405,14 @@ class TServerStorage implements ServerStorage {
return [result, derived]
})
}
find (domain: Domain): StorageIterator {
return this.getAdapter(domain).find(domain)
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this.getAdapter(domain).load(domain, docs)
}
}
type Effect = () => Promise<void>
@ -447,10 +456,11 @@ export async function createServerStorage (
const triggers = new Triggers()
const adapters = new Map<string, DbAdapter>()
const modelDb = new ModelDb(hierarchy)
const storageAdapter = conf.storageFactory?.()
for (const key in conf.adapters) {
const adapterConf = conf.adapters[key]
adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb))
adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter))
}
const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter
@ -482,7 +492,6 @@ export async function createServerStorage (
}
const fulltextAdapter = await conf.fulltextAdapter.factory(conf.fulltextAdapter.url, conf.workspace)
const storageAdapter = conf.storageFactory?.()
return new TServerStorage(
conf.domains,


@ -20,6 +20,7 @@ import type {
DocumentQuery,
FindOptions,
FindResult,
LowLevelStorage,
MeasureContext,
ModelDb,
Obj,
@ -78,8 +79,9 @@ export type FindAllMiddlewareResult<T extends Doc> = [
/**
* @public
*/
export interface Pipeline {
export interface Pipeline extends LowLevelStorage {
modelDb: ModelDb
storage: ServerStorage
findAll: <T extends Doc>(
ctx: SessionContext,
_class: Ref<Class<T>>,


@ -19,7 +19,9 @@ import core, {
ClientConnection,
createClient,
Doc,
DocChunk,
DocumentQuery,
Domain,
DOMAIN_MODEL,
DOMAIN_TX,
FindOptions,
@ -31,10 +33,11 @@ import core, {
Ref,
SortingOrder,
Space,
StorageIterator,
toFindResult,
Tx,
TxOperations,
TxResult,
toFindResult
TxResult
} from '@anticrm/core'
import { createServerStorage, DbAdapter, DbConfiguration, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import { MongoClient } from 'mongodb'
@ -62,6 +65,17 @@ class NullDbAdapter implements DbAdapter {
}
async close (): Promise<void> {}
find (domain: Domain): StorageIterator {
return {
next: async () => undefined,
close: async () => {}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return []
}
}
async function createNullAdapter (hierarchy: Hierarchy, url: string, db: string, modelDb: ModelDb): Promise<DbAdapter> {
@ -168,7 +182,10 @@ describe('mongo operations', () => {
const st: ClientConnection = {
findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options),
tx: async (tx) => (await serverStorage.tx(ctx, tx))[0],
close: async () => {}
close: async () => {},
loadChunk: async (domain): Promise<DocChunk> => await Promise.reject(new Error('unsupported')),
closeChunk: async (idx) => {},
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => []
}
return st
})


@ -17,6 +17,7 @@ import core, {
Class,
Doc,
DocumentQuery,
Domain,
DOMAIN_MODEL,
DOMAIN_TX,
escapeLikeForRegexp,
@ -31,6 +32,7 @@ import core, {
ReverseLookups,
SortingOrder,
SortingQuery,
StorageIterator,
toFindResult,
Tx,
TxCreateDoc,
@ -46,6 +48,8 @@ import type { DbAdapter, TxAdapter } from '@anticrm/server-core'
import { Collection, Db, Document, Filter, MongoClient, Sort } from 'mongodb'
import { getMongoClient } from './utils'
import { createHash } from 'node:crypto'
function translateDoc (doc: Doc): Document {
return doc as Document
}
@ -420,6 +424,39 @@ abstract class MongoAdapterBase extends TxProcessor {
const res = await cursor.toArray()
return toFindResult(res, total)
}
find (domain: Domain): StorageIterator {
const coll = this.db.collection<Doc>(domain)
const iterator = coll.find({}, {})
return {
next: async () => {
const d = await iterator.next()
if (d === null) {
return undefined
}
const doc = JSON.stringify(d)
const hash = createHash('sha256')
hash.update(doc)
const digest = hash.digest('base64')
return {
id: d._id,
hash: digest,
size: doc.length // Approximate size of the document.
}
},
close: async () => {
await iterator.close()
}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this.db
.collection(domain)
.find<Doc>({ _id: { $in: docs } })
.toArray()
}
}
class MongoAdapter extends MongoAdapterBase {


@ -0,0 +1,84 @@
import { Doc, DocChunk, Domain, MeasureContext, Ref, StorageIterator } from '@anticrm/core'
import { Pipeline } from '@anticrm/server-core'
import { Token } from '@anticrm/server-token'
import { BroadcastCall, ClientSession, Session } from '@anticrm/server-ws'
const chunkSize = 1024 * 1024
interface ChunkInfo {
idx: number
index: 0
finished: boolean
iterator: StorageIterator
}
/**
* @public
*/
export interface BackupSession extends Session {
loadChunk: (ctx: MeasureContext, domain: Domain, idx?: number) => Promise<DocChunk>
closeChunk: (ctx: MeasureContext, idx: number) => Promise<void>
loadDocs: (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]) => Promise<Doc[]>
}
export class BackupClientSession extends ClientSession implements BackupSession {
constructor (
protected readonly broadcast: BroadcastCall,
protected readonly token: Token,
protected readonly _pipeline: Pipeline
) {
super(broadcast, token, _pipeline)
}
idIndex = 0
chunkInfo = new Map<number, ChunkInfo>()
async loadChunk (ctx: MeasureContext, domain: Domain, idx?: number): Promise<DocChunk> {
idx = idx ?? this.idIndex++
let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
if (chunk !== undefined) {
chunk.index++
if (chunk.finished === true) {
return {
idx,
docs: {},
finished: true
}
}
} else {
chunk = { idx, iterator: this._pipeline.storage.find(domain), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
let size = 0
const docs: Record<string, string> = {}
while (size < chunkSize) {
const doc = await chunk.iterator.next()
if (doc === undefined) {
chunk.finished = true
break
}
size = size + doc.id.length + doc.hash.length + doc.size
docs[doc.id] = doc.hash
}
return {
idx,
docs,
finished: chunk.finished
}
}
async closeChunk (ctx: MeasureContext, idx: number): Promise<void> {
const chunk = this.chunkInfo.get(idx)
this.chunkInfo.delete(idx)
if (chunk != null) {
await chunk.iterator.close()
}
}
async loadDocs (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this._pipeline.storage.load(domain, docs)
}
}

150
server/server/src/minio.ts Normal file

@ -0,0 +1,150 @@
//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import core, {
Class,
Doc,
DocumentQuery,
Domain,
FindOptions,
FindResult,
Hierarchy,
ModelDb,
Ref,
StorageIterator,
Tx,
TxResult,
BlobData,
Space
} from '@anticrm/core'
import { DbAdapter } from '@anticrm/server-core'
import { BucketItem, Client, ItemBucketMetadata } from 'minio'
class MinioBlobAdapter implements DbAdapter {
constructor (readonly db: string, readonly client: Client) {}
async findAll<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
return Object.assign([], { total: 0 })
}
async tx (tx: Tx): Promise<TxResult> {
return {}
}
async init (model: Tx[]): Promise<void> {}
async close (): Promise<void> {}
async listMinioObjects (): Promise<Map<string, BucketItem & { metaData: ItemBucketMetadata }>> {
const items = new Map<string, BucketItem & { metaData: ItemBucketMetadata }>()
const list = await this.client.listObjects(this.db, undefined, true)
await new Promise((resolve) => {
list.on('data', (data) => {
items.set(data.name, { metaData: {}, ...data })
})
list.on('end', () => {
resolve(null)
})
})
return items
}
async readMinioData (name: string): Promise<Buffer[]> {
const data = await this.client.getObject(this.db, name)
const chunks: Buffer[] = []
await new Promise((resolve) => {
data.on('readable', () => {
let chunk
while ((chunk = data.read()) !== null) {
const b = chunk as Buffer
chunks.push(b)
}
})
data.on('end', () => {
resolve(null)
})
})
return chunks
}
find (domain: Domain): StorageIterator {
let listReceived = false
let items: (BucketItem & { metaData: ItemBucketMetadata })[]
let pos = 0
return {
next: async () => {
if (!listReceived) {
items = Array.from((await this.listMinioObjects()).values())
listReceived = true
}
if (pos < items?.length) {
const item = items[pos]
const result = {
id: item.name,
hash: item.etag,
size: item.size
}
pos++
return result
}
},
close: async () => {}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
const result: Doc[] = []
for (const item of docs) {
const stat = await this.client.statObject(this.db, item)
const chunks: Buffer[] = await this.readMinioData(item)
const final = Buffer.concat(chunks)
const dta: BlobData = {
_id: item as Ref<BlobData>,
_class: core.class.BlobData,
name: item as string,
size: stat.size,
type: stat.metaData['content-type'],
space: 'blob' as Ref<Space>,
modifiedOn: stat.lastModified.getTime(),
modifiedBy: core.account.System,
base64Data: final.toString('base64')
}
result.push(dta)
}
return result
}
}
/**
* @public
*/
export async function createMinioDataAdapter (
hierarchy: Hierarchy,
url: string,
db: string,
modelDb: ModelDb,
storage?: Client
): Promise<DbAdapter> {
if (storage === undefined) {
throw new Error('minio storage adapter requires minio')
}
return new MinioBlobAdapter(db, storage)
}


@ -17,6 +17,8 @@ import {
Class,
Doc,
DocumentQuery,
Domain,
DOMAIN_BLOB,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
DOMAIN_TX,
@ -25,12 +27,13 @@ import {
Hierarchy,
ModelDb,
Ref,
StorageIterator,
toFindResult,
Tx,
TxResult
} from '@anticrm/core'
import { createElasticAdapter } from '@anticrm/elastic'
import { PrivateMiddleware, ModifiedMiddleware } from '@anticrm/middleware'
import { ModifiedMiddleware, PrivateMiddleware } from '@anticrm/middleware'
import { createMongoAdapter, createMongoTxAdapter } from '@anticrm/mongo'
import { addLocation } from '@anticrm/platform'
import { serverAttachmentId } from '@anticrm/server-attachment'
@ -43,7 +46,8 @@ import {
createPipeline,
DbAdapter,
DbConfiguration,
MiddlewareCreator
MiddlewareCreator,
Pipeline
} from '@anticrm/server-core'
import { serverGmailId } from '@anticrm/server-gmail'
import { serverInventoryId } from '@anticrm/server-inventory'
@ -54,9 +58,12 @@ import { serverSettingId } from '@anticrm/server-setting'
import { serverTagsId } from '@anticrm/server-tags'
import { serverTaskId } from '@anticrm/server-task'
import { serverTelegramId } from '@anticrm/server-telegram'
import { start as startJsonRpc } from '@anticrm/server-ws'
import { Token } from '@anticrm/server-token'
import { BroadcastCall, ClientSession, start as startJsonRpc } from '@anticrm/server-ws'
import { Client as MinioClient } from 'minio'
import { BackupClientSession } from './backup'
import { metricsContext } from './metrics'
import { createMinioDataAdapter } from './minio'
class NullDbAdapter implements DbAdapter {
async init (model: Tx[]): Promise<void> {}
@ -73,6 +80,17 @@ class NullDbAdapter implements DbAdapter {
}
async close (): Promise<void> {}
find (domain: Domain): StorageIterator {
return {
next: async () => undefined,
close: async () => {}
}
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return []
}
}
async function createNullAdapter (hierarchy: Hierarchy, url: string, db: string, modelDb: ModelDb): Promise<DbAdapter> {
@ -122,6 +140,7 @@ export function start (
domains: {
[DOMAIN_TX]: 'MongoTx',
[DOMAIN_TRANSIENT]: 'InMemory',
[DOMAIN_BLOB]: 'MinioData',
[DOMAIN_MODEL]: 'Null'
},
defaultAdapter: 'Mongo',
@ -141,6 +160,10 @@ export function start (
InMemory: {
factory: createInMemoryAdapter,
url: ''
},
MinioData: {
factory: createMinioDataAdapter,
url: ''
}
},
fulltextAdapter: {
@ -157,6 +180,12 @@ export function start (
}
return createPipeline(conf, middlewares)
},
(token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => {
if (token.extra?.mode === 'backup') {
return new BackupClientSession(broadcast, token, pipeline)
}
return new ClientSession(broadcast, token, pipeline)
},
port,
host
)


@ -32,7 +32,7 @@ export async function connect (
const token = generateToken(
email ?? 'anticrm@hc.engineering',
workspace,
reloadModel ? { model: 'reload' } : undefined
reloadModel ? { model: 'upgrade' } : undefined
)
// We need to override default factory with 'ws' one.


@ -31,10 +31,13 @@ import {
ModelDb,
MeasureMetricsContext,
toFindResult,
Hierarchy
Hierarchy,
ServerStorage,
Domain
} from '@anticrm/core'
import { SessionContext } from '@anticrm/server-core'
import { genMinModel } from './minmodel'
import { ClientSession } from '../client'
describe('server', () => {
disableLogging()
@ -63,8 +66,16 @@ describe('server', () => {
options?: FindOptions<T>
): Promise<FindResult<T>> => toFindResult([]),
tx: async (ctx: SessionContext, tx: Tx): Promise<[TxResult, Tx[], string | undefined]> => [{}, [], undefined],
close: async () => {}
close: async () => {},
storage: {} as unknown as ServerStorage,
domains: async () => [],
find: (domain: Domain) => ({
next: async () => undefined,
close: async () => {}
}),
load: async (domain: Domain, docs: Ref<Doc>[]) => []
}),
(token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
3333
)

66
server/ws/src/client.ts Normal file

@ -0,0 +1,66 @@
//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, Ref, Tx, TxResult } from '@anticrm/core'
import type { Pipeline, SessionContext } from '@anticrm/server-core'
import { Token } from '@anticrm/server-token'
import { BroadcastCall, Session } from './types'
/**
* @public
*/
export class ClientSession implements Session {
constructor (
protected readonly broadcast: BroadcastCall,
protected readonly token: Token,
protected readonly _pipeline: Pipeline
) {}
getUser (): string {
return this.token.email
}
pipeline (): Pipeline {
return this._pipeline
}
async ping (): Promise<string> {
console.log('ping')
return 'pong!'
}
async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const context = ctx as SessionContext
context.userEmail = this.token.email
return await this._pipeline.findAll(context, _class, query, options)
}
async tx (ctx: MeasureContext, tx: Tx): Promise<TxResult> {
const context = ctx as SessionContext
context.userEmail = this.token.email
const [result, derived, target] = await this._pipeline.tx(context, tx)
this.broadcast(this, this.token.workspace, { result: tx }, target)
for (const dtx of derived) {
this.broadcast(null, this.token.workspace, { result: dtx }, target)
}
return result
}
}


@ -15,3 +15,5 @@
//
export { start } from './server'
export * from './types'
export * from './client'


@ -13,25 +13,13 @@
// limitations under the License.
//
import core, {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
MeasureContext,
ModelDb,
Ref,
Space,
Tx,
TxFactory,
TxResult
} from '@anticrm/core'
import core, { MeasureContext, Ref, Space, TxFactory } from '@anticrm/core'
import { readRequest, Response, serialize, unknownError } from '@anticrm/platform'
import type { Pipeline, SessionContext } from '@anticrm/server-core'
import type { Pipeline } from '@anticrm/server-core'
import { decodeToken, Token } from '@anticrm/server-token'
import { createServer, IncomingMessage } from 'http'
import WebSocket, { Server } from 'ws'
import { BroadcastCall, Session } from './types'
let LOGGING_ENABLED = true
@ -39,58 +27,21 @@ export function disableLogging (): void {
LOGGING_ENABLED = false
}
class Session {
readonly modelDb: ModelDb
constructor (
private readonly manager: SessionManager,
private readonly token: Token,
private readonly pipeline: Pipeline
) {
this.modelDb = pipeline.modelDb
}
getUser (): string {
return this.token.email
}
async ping (): Promise<string> {
console.log('ping')
return 'pong!'
}
async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const context = ctx as SessionContext
context.userEmail = this.token.email
return await this.pipeline.findAll(context, _class, query, options)
}
async tx (ctx: MeasureContext, tx: Tx): Promise<TxResult> {
const context = ctx as SessionContext
context.userEmail = this.token.email
const [result, derived, target] = await this.pipeline.tx(context, tx)
this.manager.broadcast(this, this.token.workspace, { result: tx }, target)
for (const dtx of derived) {
this.manager.broadcast(null, this.token.workspace, { result: dtx }, target)
}
return result
}
}
interface Workspace {
pipeline: Pipeline
sessions: [Session, WebSocket][]
upgrade: boolean
}
class SessionManager {
private readonly workspaces = new Map<string, Workspace>()
constructor (readonly sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session) {}
createSession (token: Token, pipeline: Pipeline): Session {
return this.sessionFactory(token, pipeline, this.broadcast.bind(this))
}
async addSession (
ctx: MeasureContext,
ws: WebSocket,
@ -101,7 +52,7 @@ class SessionManager {
if (workspace === undefined) {
return await this.createWorkspace(ctx, pipelineFactory, token, ws)
} else {
if (token.extra?.model === 'reload') {
if (token.extra?.model === 'upgrade') {
console.log('reloading workspace', JSON.stringify(token))
// If upgrade client is used.
// Drop all existing clients
@ -113,7 +64,11 @@ class SessionManager {
return await this.createWorkspace(ctx, pipelineFactory, token, ws)
}
const session = new Session(this, token, workspace.pipeline)
if (workspace.upgrade) {
throw new Error('Upgrade in progress....')
}
const session = this.createSession(token, workspace.pipeline)
workspace.sessions.push([session, ws])
await this.setStatus(ctx, session, true)
return session
@ -123,7 +78,7 @@ class SessionManager {
private async setStatus (ctx: MeasureContext, session: Session, online: boolean): Promise<void> {
try {
const user = (
await session.modelDb.findAll(
await session.pipeline().modelDb.findAll(
core.class.Account,
{
email: session.getUser()
@ -157,10 +112,11 @@ class SessionManager {
ws: WebSocket
): Promise<Session> {
const pipeline = await pipelineFactory(token.workspace)
const session = new Session(this, token, pipeline)
const session = this.createSession(token, pipeline)
const workspace: Workspace = {
pipeline,
sessions: [[session, ws]]
sessions: [[session, ws]],
upgrade: token.extra?.model === 'upgrade'
}
this.workspaces.set(token.workspace, workspace)
await this.setStatus(ctx, session, true)
@ -242,14 +198,32 @@ async function handleRequest<S extends Session> (
export function start (
ctx: MeasureContext,
pipelineFactory: (workspace: string) => Promise<Pipeline>,
sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session,
port: number,
host?: string
): () => void {
console.log(`starting server on port ${port} ...`)
const sessions = new SessionManager()
const sessions = new SessionManager(sessionFactory)
const wss = new Server({ noServer: true })
const wss = new Server({
noServer: true,
perMessageDeflate: {
zlibDeflateOptions: {
// See zlib defaults.
chunkSize: 1024,
memLevel: 7,
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
// Below options specified as default values.
concurrencyLimit: 10, // Limits zlib concurrency for perf.
threshold: 1024 // Size (in bytes) below which messages
// should not be compressed if context takeover is disabled.
}
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
wss.on('connection', async (ws: WebSocket, request: any, token: Token) => {
const buffer: string[] = []

24
server/ws/src/types.ts Normal file

@ -0,0 +1,24 @@
import { Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, Ref, Tx, TxResult } from '@anticrm/core'
import { Response } from '@anticrm/platform'
import { Pipeline } from '@anticrm/server-core'
/**
* @public
*/
export interface Session {
getUser: () => string
pipeline: () => Pipeline
ping: () => Promise<string>
findAll: <T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
tx: (ctx: MeasureContext, tx: Tx) => Promise<TxResult>
}
/**
* @public
*/
export type BroadcastCall = (from: Session | null, workspaceId: string, resp: Response<any>, target?: string) => void