mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-15 04:49:00 +00:00
QFIX: Fix Double model (#7617)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
5924c4aef5
commit
65e037cc7e
@ -14,10 +14,10 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
//
|
//
|
||||||
import { IntlString, Plugin } from '@hcengineering/platform'
|
import { IntlString, Plugin } from '@hcengineering/platform'
|
||||||
|
import { ClientConnectEvent, DocChunk } from '..'
|
||||||
import type { Account, Class, Data, Doc, Domain, PluginConfiguration, Ref, Timestamp } from '../classes'
|
import type { Account, Class, Data, Doc, Domain, PluginConfiguration, Ref, Timestamp } from '../classes'
|
||||||
import { ClassifierKind, DOMAIN_MODEL, Space } from '../classes'
|
import { ClassifierKind, DOMAIN_MODEL, Space } from '../classes'
|
||||||
import { ClientConnection, createClient } from '../client'
|
import { ClientConnection, createClient } from '../client'
|
||||||
import { clone } from '../clone'
|
|
||||||
import core from '../component'
|
import core from '../component'
|
||||||
import { Hierarchy } from '../hierarchy'
|
import { Hierarchy } from '../hierarchy'
|
||||||
import { ModelDb, TxDb } from '../memdb'
|
import { ModelDb, TxDb } from '../memdb'
|
||||||
@ -103,38 +103,62 @@ describe('client', () => {
|
|||||||
return await transactions.findAll(_class, query)
|
return await transactions.findAll(_class, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return new (class implements ClientConnection {
|
||||||
isConnected: () => true,
|
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
|
||||||
findAll,
|
|
||||||
|
|
||||||
searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
|
set onConnect (
|
||||||
|
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
|
||||||
|
) {
|
||||||
|
this.handler = handler
|
||||||
|
void this.handler?.(ClientConnectEvent.Connected, '', {})
|
||||||
|
}
|
||||||
|
|
||||||
|
get onConnect ():
|
||||||
|
| ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>)
|
||||||
|
| undefined {
|
||||||
|
return this.handler
|
||||||
|
}
|
||||||
|
|
||||||
|
isConnected = (): boolean => true
|
||||||
|
findAll = findAll
|
||||||
|
|
||||||
|
searchFulltext = async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
|
||||||
return { docs: [] }
|
return { docs: [] }
|
||||||
},
|
}
|
||||||
|
|
||||||
tx: async (tx: Tx): Promise<TxResult> => {
|
tx = async (tx: Tx): Promise<TxResult> => {
|
||||||
if (tx.objectSpace === core.space.Model) {
|
if (tx.objectSpace === core.space.Model) {
|
||||||
hierarchy.tx(tx)
|
hierarchy.tx(tx)
|
||||||
}
|
}
|
||||||
const result = await Promise.all([transactions.tx(tx)])
|
const result = await Promise.all([transactions.tx(tx)])
|
||||||
return result[0]
|
return result[0]
|
||||||
},
|
}
|
||||||
close: async () => {},
|
|
||||||
|
|
||||||
loadChunk: async (domain: Domain, idx?: number) => ({
|
close = async (): Promise<void> => {}
|
||||||
|
|
||||||
|
loadChunk = async (domain: Domain, idx?: number): Promise<DocChunk> => ({
|
||||||
idx: -1,
|
idx: -1,
|
||||||
index: -1,
|
|
||||||
docs: [],
|
docs: [],
|
||||||
finished: true,
|
finished: true
|
||||||
digest: ''
|
})
|
||||||
}),
|
|
||||||
closeChunk: async (idx: number) => {},
|
async closeChunk (idx: number): Promise<void> {}
|
||||||
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => [],
|
async loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
|
||||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
return []
|
||||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
|
}
|
||||||
loadModel: async (last: Timestamp) => clone(txes),
|
|
||||||
getAccount: async () => null as unknown as Account,
|
async upload (domain: Domain, docs: Doc[]): Promise<void> {}
|
||||||
sendForceClose: async () => {}
|
async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
|
||||||
}
|
async loadModel (last: Timestamp): Promise<Tx[]> {
|
||||||
|
return txes
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAccount (): Promise<Account> {
|
||||||
|
return null as unknown as Account
|
||||||
|
}
|
||||||
|
|
||||||
|
async sendForceClose (): Promise<void> {}
|
||||||
|
})()
|
||||||
}
|
}
|
||||||
const spyCreate = jest.spyOn(TxProcessor, 'createDoc2Doc')
|
const spyCreate = jest.spyOn(TxProcessor, 'createDoc2Doc')
|
||||||
const spyUpdate = jest.spyOn(TxProcessor, 'updateDoc2Doc')
|
const spyUpdate = jest.spyOn(TxProcessor, 'updateDoc2Doc')
|
||||||
|
@ -13,12 +13,13 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
//
|
//
|
||||||
|
|
||||||
|
import { ClientConnectEvent, DocChunk } from '..'
|
||||||
import type { Account, Class, Doc, Domain, Ref, Timestamp } from '../classes'
|
import type { Account, Class, Doc, Domain, Ref, Timestamp } from '../classes'
|
||||||
import { ClientConnection } from '../client'
|
import { ClientConnection } from '../client'
|
||||||
import core from '../component'
|
import core from '../component'
|
||||||
import { Hierarchy } from '../hierarchy'
|
import { Hierarchy } from '../hierarchy'
|
||||||
import { ModelDb, TxDb } from '../memdb'
|
import { ModelDb, TxDb } from '../memdb'
|
||||||
import type { DocumentQuery, FindResult, TxResult, SearchQuery, SearchOptions, SearchResult } from '../storage'
|
import type { DocumentQuery, FindResult, SearchOptions, SearchQuery, SearchResult, TxResult } from '../storage'
|
||||||
import type { Tx } from '../tx'
|
import type { Tx } from '../tx'
|
||||||
import { DOMAIN_TX } from '../tx'
|
import { DOMAIN_TX } from '../tx'
|
||||||
import { genMinModel } from './minmodel'
|
import { genMinModel } from './minmodel'
|
||||||
@ -42,37 +43,62 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
|
|||||||
return await model.findAll(_class, query)
|
return await model.findAll(_class, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
class ClientConnectionImpl implements ClientConnection {
|
||||||
isConnected: () => true,
|
isConnected = (): boolean => true
|
||||||
findAll,
|
findAll = findAll
|
||||||
|
|
||||||
searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
|
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
|
||||||
|
|
||||||
|
set onConnect (
|
||||||
|
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
|
||||||
|
) {
|
||||||
|
this.handler = handler
|
||||||
|
void this.handler?.(ClientConnectEvent.Connected, '', {})
|
||||||
|
}
|
||||||
|
|
||||||
|
get onConnect (): ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined {
|
||||||
|
return this.handler
|
||||||
|
}
|
||||||
|
|
||||||
|
async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
|
||||||
return { docs: [] }
|
return { docs: [] }
|
||||||
},
|
}
|
||||||
|
|
||||||
tx: async (tx: Tx): Promise<TxResult> => {
|
async tx (tx: Tx): Promise<TxResult> {
|
||||||
if (tx.objectSpace === core.space.Model) {
|
if (tx.objectSpace === core.space.Model) {
|
||||||
hierarchy.tx(tx)
|
hierarchy.tx(tx)
|
||||||
}
|
}
|
||||||
const result = await Promise.all([model.tx(tx), transactions.tx(tx)])
|
const result = await Promise.all([model.tx(tx), transactions.tx(tx)])
|
||||||
return result[0]
|
return result[0]
|
||||||
// handler(tx) - we have only one client, should not update?
|
}
|
||||||
},
|
|
||||||
close: async () => {},
|
|
||||||
|
|
||||||
loadChunk: async (domain: Domain, idx?: number) => ({
|
async close (): Promise<void> {}
|
||||||
idx: -1,
|
|
||||||
index: -1,
|
async loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
|
||||||
docs: [],
|
return {
|
||||||
finished: true,
|
idx: -1,
|
||||||
digest: ''
|
docs: [],
|
||||||
}),
|
finished: true
|
||||||
closeChunk: async (idx: number) => {},
|
}
|
||||||
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => [],
|
}
|
||||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
|
||||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
|
async closeChunk (idx: number): Promise<void> {}
|
||||||
loadModel: async (last: Timestamp) => txes,
|
async loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
|
||||||
getAccount: async () => null as unknown as Account,
|
return []
|
||||||
sendForceClose: async () => {}
|
}
|
||||||
|
|
||||||
|
async upload (domain: Domain, docs: Doc[]): Promise<void> {}
|
||||||
|
async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
|
||||||
|
async loadModel (last: Timestamp): Promise<Tx[]> {
|
||||||
|
return txes
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAccount (): Promise<Account> {
|
||||||
|
return null as unknown as Account
|
||||||
|
}
|
||||||
|
|
||||||
|
async sendForceClose (): Promise<void> {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return new ClientConnectionImpl()
|
||||||
}
|
}
|
||||||
|
@ -254,12 +254,9 @@ export async function createClient (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let initialized = false
|
||||||
const conn = await ctx.with('connect', {}, () => connect(txHandler))
|
const conn = await ctx.with('connect', {}, () => connect(txHandler))
|
||||||
|
|
||||||
await ctx.with('load-model', { reload: false }, (ctx) =>
|
|
||||||
loadModel(ctx, conn, modelFilter, hierarchy, model, false, txPersistence)
|
|
||||||
)
|
|
||||||
|
|
||||||
txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model)
|
txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model)
|
||||||
|
|
||||||
client = new ClientImpl(conn)
|
client = new ClientImpl(conn)
|
||||||
@ -271,102 +268,74 @@ export async function createClient (
|
|||||||
const oldOnConnect:
|
const oldOnConnect:
|
||||||
| ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>)
|
| ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>)
|
||||||
| undefined = conn.onConnect
|
| undefined = conn.onConnect
|
||||||
conn.onConnect = async (event, _lastTx, data) => {
|
|
||||||
console.log('Client: onConnect', event)
|
await new Promise<void>((resolve) => {
|
||||||
if (event === ClientConnectEvent.Maintenance) {
|
conn.onConnect = async (event, _lastTx, data) => {
|
||||||
|
console.log('Client: onConnect', event)
|
||||||
|
if (event === ClientConnectEvent.Maintenance) {
|
||||||
|
lastTx = _lastTx
|
||||||
|
await oldOnConnect?.(ClientConnectEvent.Maintenance, _lastTx, data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Find all new transactions and apply
|
||||||
|
const { mode, current, addition } = await ctx.with('load-model', {}, (ctx) => loadModel(ctx, conn, txPersistence))
|
||||||
|
if (!initialized) {
|
||||||
|
switch (mode) {
|
||||||
|
case 'same':
|
||||||
|
case 'upgrade':
|
||||||
|
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
|
||||||
|
break
|
||||||
|
case 'addition':
|
||||||
|
await ctx.with('build-model', {}, (ctx) =>
|
||||||
|
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
initialized = true
|
||||||
|
} else {
|
||||||
|
switch (mode) {
|
||||||
|
case 'upgrade':
|
||||||
|
// We have upgrade procedure and need rebuild all stuff.
|
||||||
|
hierarchy = new Hierarchy()
|
||||||
|
model = new ModelDb(hierarchy)
|
||||||
|
;(client as ClientImpl).setModel(hierarchy, model)
|
||||||
|
|
||||||
|
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, current, modelFilter, hierarchy, model))
|
||||||
|
await oldOnConnect?.(ClientConnectEvent.Upgraded, _lastTx, data)
|
||||||
|
// No need to fetch more stuff since upgrade was happened.
|
||||||
|
break
|
||||||
|
case 'addition':
|
||||||
|
await ctx.with('build-model', {}, (ctx) =>
|
||||||
|
buildModel(ctx, current.concat(addition), modelFilter, hierarchy, model)
|
||||||
|
)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resolve()
|
||||||
|
|
||||||
|
if (lastTx === undefined) {
|
||||||
|
// No need to do anything here since we connected.
|
||||||
|
await oldOnConnect?.(event, _lastTx, data)
|
||||||
|
lastTx = _lastTx
|
||||||
|
resolve()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastTx === _lastTx) {
|
||||||
|
// Same lastTx, no need to refresh
|
||||||
|
await oldOnConnect?.(ClientConnectEvent.Reconnected, _lastTx, data)
|
||||||
|
resolve()
|
||||||
|
return
|
||||||
|
}
|
||||||
lastTx = _lastTx
|
lastTx = _lastTx
|
||||||
await oldOnConnect?.(ClientConnectEvent.Maintenance, _lastTx, data)
|
// We need to trigger full refresh on queries, etc.
|
||||||
return
|
await oldOnConnect?.(ClientConnectEvent.Refresh, lastTx, data)
|
||||||
|
resolve()
|
||||||
}
|
}
|
||||||
// Find all new transactions and apply
|
})
|
||||||
const loadModelResponse = await ctx.with('connect', { reload: true }, (ctx) =>
|
|
||||||
loadModel(ctx, conn, modelFilter, hierarchy, model, true, txPersistence)
|
|
||||||
)
|
|
||||||
|
|
||||||
if (event === ClientConnectEvent.Reconnected && loadModelResponse.full) {
|
|
||||||
// We have upgrade procedure and need rebuild all stuff.
|
|
||||||
hierarchy = new Hierarchy()
|
|
||||||
model = new ModelDb(hierarchy)
|
|
||||||
|
|
||||||
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, loadModelResponse, modelFilter, hierarchy, model))
|
|
||||||
await oldOnConnect?.(ClientConnectEvent.Upgraded, _lastTx, data)
|
|
||||||
lastTx = _lastTx
|
|
||||||
// No need to fetch more stuff since upgrade was happened.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event === ClientConnectEvent.Connected && _lastTx !== lastTx && lastTx === undefined) {
|
|
||||||
// No need to do anything here since we connected.
|
|
||||||
await oldOnConnect?.(event, _lastTx, data)
|
|
||||||
lastTx = _lastTx
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_lastTx === lastTx) {
|
|
||||||
// Same lastTx, no need to refresh
|
|
||||||
await oldOnConnect?.(ClientConnectEvent.Reconnected, _lastTx, data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
lastTx = _lastTx
|
|
||||||
// We need to trigger full refresh on queries, etc.
|
|
||||||
await oldOnConnect?.(ClientConnectEvent.Refresh, lastTx, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
async function tryLoadModel (
|
|
||||||
ctx: MeasureContext,
|
|
||||||
conn: ClientConnection,
|
|
||||||
reload: boolean,
|
|
||||||
persistence?: TxPersistenceStore
|
|
||||||
): Promise<LoadModelResponse> {
|
|
||||||
const current = (await ctx.with('persistence-load', {}, () => persistence?.load())) ?? {
|
|
||||||
full: true,
|
|
||||||
transactions: [],
|
|
||||||
hash: ''
|
|
||||||
}
|
|
||||||
|
|
||||||
if (conn.getLastHash !== undefined && (await conn.getLastHash(ctx)) === current.hash) {
|
|
||||||
// We have same model hash.
|
|
||||||
current.full = false // Since we load, no need to send full
|
|
||||||
return current
|
|
||||||
}
|
|
||||||
const lastTxTime = getLastTxTime(current.transactions)
|
|
||||||
const result = await ctx.with('connection-load-model', { hash: current.hash !== '' }, (ctx) =>
|
|
||||||
conn.loadModel(lastTxTime, current.hash)
|
|
||||||
)
|
|
||||||
|
|
||||||
if (Array.isArray(result)) {
|
|
||||||
// Fallback to old behavior, only for tests
|
|
||||||
return {
|
|
||||||
full: true,
|
|
||||||
transactions: result,
|
|
||||||
hash: ''
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const transactions = current.transactions.concat(result.transactions)
|
|
||||||
if (result.hash !== current.hash) {
|
|
||||||
// Save concatenated, if have some more of them.
|
|
||||||
void ctx
|
|
||||||
.with('persistence-store', {}, (ctx) =>
|
|
||||||
persistence?.store({
|
|
||||||
...result,
|
|
||||||
transactions: !result.full ? transactions : result.transactions
|
|
||||||
})
|
|
||||||
)
|
|
||||||
.catch((err) => {
|
|
||||||
Analytics.handleError(err)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!result.full && !reload) {
|
|
||||||
result.transactions = transactions
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ignore Employee accounts.
|
// Ignore Employee accounts.
|
||||||
function isPersonAccount (tx: Tx): boolean {
|
function isPersonAccount (tx: Tx): boolean {
|
||||||
return (
|
return (
|
||||||
@ -381,37 +350,59 @@ function isPersonAccount (tx: Tx): boolean {
|
|||||||
async function loadModel (
|
async function loadModel (
|
||||||
ctx: MeasureContext,
|
ctx: MeasureContext,
|
||||||
conn: ClientConnection,
|
conn: ClientConnection,
|
||||||
modelFilter: ModelFilter | undefined,
|
|
||||||
hierarchy: Hierarchy,
|
|
||||||
model: ModelDb,
|
|
||||||
reload = false,
|
|
||||||
persistence?: TxPersistenceStore
|
persistence?: TxPersistenceStore
|
||||||
): Promise<LoadModelResponse> {
|
): Promise<{ mode: 'same' | 'addition' | 'upgrade', current: Tx[], addition: Tx[] }> {
|
||||||
const t = Date.now()
|
const t = Date.now()
|
||||||
|
|
||||||
const modelResponse = await ctx.with('try-load-model', { reload }, (ctx) =>
|
const current = (await ctx.with('persistence-load', {}, () => persistence?.load())) ?? {
|
||||||
tryLoadModel(ctx, conn, reload, persistence)
|
full: true,
|
||||||
|
transactions: [],
|
||||||
|
hash: ''
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conn.getLastHash !== undefined && (await conn.getLastHash(ctx)) === current.hash) {
|
||||||
|
// We have same model hash.
|
||||||
|
return { mode: 'same', current: current.transactions, addition: [] }
|
||||||
|
}
|
||||||
|
const lastTxTime = getLastTxTime(current.transactions)
|
||||||
|
const result = await ctx.with('connection-load-model', { hash: current.hash !== '' }, (ctx) =>
|
||||||
|
conn.loadModel(lastTxTime, current.hash)
|
||||||
)
|
)
|
||||||
|
|
||||||
if (reload && modelResponse.full) {
|
if (Array.isArray(result)) {
|
||||||
return modelResponse
|
// Fallback to old behavior, only for tests
|
||||||
|
return {
|
||||||
|
mode: 'same',
|
||||||
|
current: result,
|
||||||
|
addition: []
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Save concatenated, if have some more of them.
|
||||||
|
void ctx
|
||||||
|
.with('persistence-store', {}, (ctx) =>
|
||||||
|
persistence?.store({
|
||||||
|
...result,
|
||||||
|
// Store concatinated old + new txes
|
||||||
|
transactions: result.full ? result.transactions : current.transactions.concat(result.transactions)
|
||||||
|
})
|
||||||
|
)
|
||||||
|
.catch((err) => {
|
||||||
|
Analytics.handleError(err)
|
||||||
|
})
|
||||||
|
|
||||||
if (typeof window !== 'undefined') {
|
if (typeof window !== 'undefined') {
|
||||||
console.log(
|
console.log('find' + (result.full ? 'full model' : 'model diff'), result.transactions.length, Date.now() - t)
|
||||||
'find' + (modelResponse.full ? 'full model' : 'model diff'),
|
|
||||||
modelResponse.transactions.length,
|
|
||||||
Date.now() - t
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
if (result.full) {
|
||||||
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, modelResponse, modelFilter, hierarchy, model))
|
return { mode: 'upgrade', current: result.transactions, addition: [] }
|
||||||
return modelResponse
|
}
|
||||||
|
return { mode: 'addition', current: current.transactions, addition: result.transactions }
|
||||||
}
|
}
|
||||||
|
|
||||||
async function buildModel (
|
async function buildModel (
|
||||||
ctx: MeasureContext,
|
ctx: MeasureContext,
|
||||||
modelResponse: LoadModelResponse,
|
transactions: Tx[],
|
||||||
modelFilter: ModelFilter | undefined,
|
modelFilter: ModelFilter | undefined,
|
||||||
hierarchy: Hierarchy,
|
hierarchy: Hierarchy,
|
||||||
model: ModelDb
|
model: ModelDb
|
||||||
@ -419,7 +410,7 @@ async function buildModel (
|
|||||||
const systemTx: Tx[] = []
|
const systemTx: Tx[] = []
|
||||||
const userTx: Tx[] = []
|
const userTx: Tx[] = []
|
||||||
|
|
||||||
const atxes = modelResponse.transactions
|
const atxes = transactions
|
||||||
|
|
||||||
ctx.withSync('split txes', {}, () => {
|
ctx.withSync('split txes', {}, () => {
|
||||||
atxes.forEach((tx) =>
|
atxes.forEach((tx) =>
|
||||||
|
@ -13,26 +13,33 @@
|
|||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
//
|
//
|
||||||
|
|
||||||
import type {
|
import core, {
|
||||||
|
Account,
|
||||||
AccountClient,
|
AccountClient,
|
||||||
BackupClient,
|
BackupClient,
|
||||||
Class,
|
Class,
|
||||||
|
ClientConnectEvent,
|
||||||
|
ClientConnection,
|
||||||
Doc,
|
Doc,
|
||||||
|
DocChunk,
|
||||||
DocumentQuery,
|
DocumentQuery,
|
||||||
Domain,
|
Domain,
|
||||||
|
DOMAIN_TX,
|
||||||
FindOptions,
|
FindOptions,
|
||||||
FindResult,
|
FindResult,
|
||||||
|
FulltextStorage,
|
||||||
|
Hierarchy,
|
||||||
LoadModelResponse,
|
LoadModelResponse,
|
||||||
|
ModelDb,
|
||||||
Ref,
|
Ref,
|
||||||
|
SearchOptions,
|
||||||
|
SearchQuery,
|
||||||
|
SearchResult,
|
||||||
Timestamp,
|
Timestamp,
|
||||||
Tx,
|
Tx,
|
||||||
TxResult,
|
TxDb,
|
||||||
FulltextStorage,
|
TxResult
|
||||||
SearchQuery,
|
|
||||||
SearchOptions,
|
|
||||||
SearchResult
|
|
||||||
} from '@hcengineering/core'
|
} from '@hcengineering/core'
|
||||||
import core, { DOMAIN_TX, Hierarchy, ModelDb, TxDb } from '@hcengineering/core'
|
|
||||||
import { genMinModel } from './minmodel'
|
import { genMinModel } from './minmodel'
|
||||||
|
|
||||||
export async function connect (handler: (tx: Tx) => void): Promise<
|
export async function connect (handler: (tx: Tx) => void): Promise<
|
||||||
@ -55,49 +62,103 @@ FulltextStorage & {
|
|||||||
await model.tx(tx)
|
await model.tx(tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
async function findAll<T extends Doc> (
|
class TestConnection implements ClientConnection {
|
||||||
_class: Ref<Class<T>>,
|
private readonly hierarchy: Hierarchy
|
||||||
query: DocumentQuery<T>,
|
private readonly model: ModelDb
|
||||||
options?: FindOptions<T>
|
private readonly transactions: TxDb
|
||||||
): Promise<FindResult<T>> {
|
|
||||||
const domain = hierarchy.getClass(_class).domain
|
|
||||||
if (domain === DOMAIN_TX) return await transactions.findAll(_class, query, options)
|
|
||||||
return await model.findAll(_class, query, options)
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
constructor (hierarchy: Hierarchy, model: ModelDb, transactions: TxDb) {
|
||||||
isConnected: () => true,
|
this.hierarchy = hierarchy
|
||||||
findAll,
|
this.model = model
|
||||||
findOne: async (_class, query, options) => (await findAll(_class, query, { ...options, limit: 1 })).shift(),
|
this.transactions = transactions
|
||||||
getHierarchy: () => hierarchy,
|
}
|
||||||
getModel: () => model,
|
|
||||||
getAccount: async () => ({}) as unknown as any,
|
isConnected (): boolean {
|
||||||
tx: async (tx: Tx): Promise<TxResult> => {
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
async findAll<T extends Doc>(
|
||||||
|
_class: Ref<Class<T>>,
|
||||||
|
query: DocumentQuery<T>,
|
||||||
|
options?: FindOptions<T>
|
||||||
|
): Promise<FindResult<T>> {
|
||||||
|
const domain = this.hierarchy.getClass(_class).domain
|
||||||
|
if (domain === DOMAIN_TX) return await this.transactions.findAll(_class, query, options)
|
||||||
|
return await this.model.findAll(_class, query, options)
|
||||||
|
}
|
||||||
|
|
||||||
|
async findOne<T extends Doc>(
|
||||||
|
_class: Ref<Class<T>>,
|
||||||
|
query: DocumentQuery<T>,
|
||||||
|
options?: FindOptions<T>
|
||||||
|
): Promise<T | undefined> {
|
||||||
|
return (await this.findAll(_class, query, { ...options, limit: 1 })).shift()
|
||||||
|
}
|
||||||
|
|
||||||
|
getHierarchy (): Hierarchy {
|
||||||
|
return this.hierarchy
|
||||||
|
}
|
||||||
|
|
||||||
|
getModel (): ModelDb {
|
||||||
|
return this.model
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAccount (): Promise<Account> {
|
||||||
|
return {} as unknown as any
|
||||||
|
}
|
||||||
|
|
||||||
|
async tx (tx: Tx): Promise<TxResult> {
|
||||||
if (tx.objectSpace === core.space.Model) {
|
if (tx.objectSpace === core.space.Model) {
|
||||||
hierarchy.tx(tx)
|
this.hierarchy.tx(tx)
|
||||||
}
|
}
|
||||||
await Promise.all([model.tx(tx), transactions.tx(tx)])
|
await Promise.all([this.model.tx(tx), this.transactions.tx(tx)])
|
||||||
// Not required, since handled in client.
|
|
||||||
handler(tx)
|
handler(tx)
|
||||||
return {}
|
return {}
|
||||||
},
|
}
|
||||||
close: async () => {},
|
|
||||||
loadChunk: async (domain: Domain, idx?: number) => ({
|
|
||||||
idx: -1,
|
|
||||||
index: -1,
|
|
||||||
docs: [],
|
|
||||||
finished: true,
|
|
||||||
digest: ''
|
|
||||||
}),
|
|
||||||
loadModel: async (lastTxTime) => txes,
|
|
||||||
closeChunk: async (idx: number) => {},
|
|
||||||
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => [],
|
|
||||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
|
||||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
|
|
||||||
|
|
||||||
searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise<SearchResult> => {
|
async close (): Promise<void> {}
|
||||||
|
|
||||||
|
async loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
|
||||||
|
return {
|
||||||
|
idx: -1,
|
||||||
|
docs: [],
|
||||||
|
finished: true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async loadModel (lastTxTime: Timestamp): Promise<Tx[]> {
|
||||||
|
return txes
|
||||||
|
}
|
||||||
|
|
||||||
|
async closeChunk (idx: number): Promise<void> {}
|
||||||
|
|
||||||
|
async loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
|
||||||
|
async upload (domain: Domain, docs: Doc[]): Promise<void> {}
|
||||||
|
|
||||||
|
async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
|
||||||
|
|
||||||
|
async searchFulltext (query: SearchQuery, options: SearchOptions): Promise<SearchResult> {
|
||||||
return { docs: [] }
|
return { docs: [] }
|
||||||
},
|
}
|
||||||
sendForceClose: async () => {}
|
|
||||||
|
async sendForceClose (): Promise<void> {}
|
||||||
|
|
||||||
|
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
|
||||||
|
|
||||||
|
set onConnect (
|
||||||
|
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
|
||||||
|
) {
|
||||||
|
this.handler = handler
|
||||||
|
void this.handler?.(ClientConnectEvent.Connected, '', {})
|
||||||
|
}
|
||||||
|
|
||||||
|
get onConnect (): ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined {
|
||||||
|
return this.handler
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return new TestConnection(hierarchy, model, transactions)
|
||||||
}
|
}
|
||||||
|
@ -984,7 +984,7 @@ describe('query', () => {
|
|||||||
it('test clone ops', async () => {
|
it('test clone ops', async () => {
|
||||||
const { liveQuery, factory } = await getClient()
|
const { liveQuery, factory } = await getClient()
|
||||||
|
|
||||||
const counter = 10000
|
const counter = 1000
|
||||||
const ctx = new MeasureMetricsContext('tool', {})
|
const ctx = new MeasureMetricsContext('tool', {})
|
||||||
let data: Space[] = []
|
let data: Space[] = []
|
||||||
const pp = new Promise((resolve) => {
|
const pp = new Promise((resolve) => {
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import core, {
|
import core, {
|
||||||
AccountRole,
|
AccountRole,
|
||||||
|
ClientConnectEvent,
|
||||||
WorkspaceEvent,
|
WorkspaceEvent,
|
||||||
generateId,
|
generateId,
|
||||||
getTypeOf,
|
getTypeOf,
|
||||||
@ -11,17 +12,27 @@ import core, {
|
|||||||
type BulkUpdateEvent,
|
type BulkUpdateEvent,
|
||||||
type Class,
|
type Class,
|
||||||
type Client,
|
type Client,
|
||||||
|
type ClientConnection,
|
||||||
type Doc,
|
type Doc,
|
||||||
|
type DocChunk,
|
||||||
|
type DocumentQuery,
|
||||||
|
type Domain,
|
||||||
|
type FindOptions,
|
||||||
|
type FindResult,
|
||||||
type MeasureContext,
|
type MeasureContext,
|
||||||
type ModelDb,
|
type ModelDb,
|
||||||
type Ref,
|
type Ref,
|
||||||
|
type SearchResult,
|
||||||
type SessionData,
|
type SessionData,
|
||||||
|
type Tx,
|
||||||
|
type TxResult,
|
||||||
type TxWorkspaceEvent,
|
type TxWorkspaceEvent,
|
||||||
type WorkspaceIdWithUrl
|
type WorkspaceIdWithUrl
|
||||||
} from '@hcengineering/core'
|
} from '@hcengineering/core'
|
||||||
import platform, { PlatformError, Severity, Status, unknownError } from '@hcengineering/platform'
|
import platform, { PlatformError, Severity, Status, unknownError } from '@hcengineering/platform'
|
||||||
import { type Hash } from 'crypto'
|
import { type Hash } from 'crypto'
|
||||||
import fs from 'fs'
|
import fs from 'fs'
|
||||||
|
import type { DbAdapter } from './adapter'
|
||||||
import { BackupClientOps } from './storage'
|
import { BackupClientOps } from './storage'
|
||||||
import type { Pipeline } from './types'
|
import type { Pipeline } from './types'
|
||||||
|
|
||||||
@ -293,3 +304,65 @@ export function wrapPipeline (
|
|||||||
notify: (...tx) => {}
|
notify: (...tx) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function wrapAdapterToClient (ctx: MeasureContext, storageAdapter: DbAdapter, txes: Tx[]): ClientConnection {
|
||||||
|
class TestClientConnection implements ClientConnection {
|
||||||
|
isConnected = (): boolean => true
|
||||||
|
|
||||||
|
handler?: (event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>
|
||||||
|
|
||||||
|
set onConnect (
|
||||||
|
handler: ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined
|
||||||
|
) {
|
||||||
|
this.handler = handler
|
||||||
|
void this.handler?.(ClientConnectEvent.Connected, '', {})
|
||||||
|
}
|
||||||
|
|
||||||
|
get onConnect (): ((event: ClientConnectEvent, lastTx: string | undefined, data: any) => Promise<void>) | undefined {
|
||||||
|
return this.handler
|
||||||
|
}
|
||||||
|
|
||||||
|
async findAll<T extends Doc>(
|
||||||
|
_class: Ref<Class<Doc>>,
|
||||||
|
query: DocumentQuery<Doc>,
|
||||||
|
options?: FindOptions<Doc>
|
||||||
|
): Promise<FindResult<T>> {
|
||||||
|
return (await storageAdapter.findAll(ctx, _class, query, options)) as any
|
||||||
|
}
|
||||||
|
|
||||||
|
async tx (tx: Tx): Promise<TxResult> {
|
||||||
|
return await storageAdapter.tx(ctx, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
async searchFulltext (): Promise<SearchResult> {
|
||||||
|
return { docs: [] }
|
||||||
|
}
|
||||||
|
|
||||||
|
async close (): Promise<void> {}
|
||||||
|
|
||||||
|
async loadChunk (domain: Domain): Promise<DocChunk> {
|
||||||
|
throw new Error('unsupported')
|
||||||
|
}
|
||||||
|
|
||||||
|
async closeChunk (idx: number): Promise<void> {}
|
||||||
|
|
||||||
|
async loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
|
||||||
|
async upload (domain: Domain, docs: Doc[]): Promise<void> {}
|
||||||
|
|
||||||
|
async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
|
||||||
|
|
||||||
|
async loadModel (): Promise<Tx[]> {
|
||||||
|
return txes
|
||||||
|
}
|
||||||
|
|
||||||
|
async getAccount (): Promise<Account> {
|
||||||
|
return {} as any
|
||||||
|
}
|
||||||
|
|
||||||
|
async sendForceClose (): Promise<void> {}
|
||||||
|
}
|
||||||
|
return new TestClientConnection()
|
||||||
|
}
|
||||||
|
@ -15,11 +15,7 @@
|
|||||||
//
|
//
|
||||||
import core, {
|
import core, {
|
||||||
type Client,
|
type Client,
|
||||||
type ClientConnection,
|
|
||||||
createClient,
|
createClient,
|
||||||
type Doc,
|
|
||||||
type DocChunk,
|
|
||||||
type Domain,
|
|
||||||
generateId,
|
generateId,
|
||||||
getWorkspaceId,
|
getWorkspaceId,
|
||||||
Hierarchy,
|
Hierarchy,
|
||||||
@ -30,7 +26,7 @@ import core, {
|
|||||||
type Space,
|
type Space,
|
||||||
TxOperations
|
TxOperations
|
||||||
} from '@hcengineering/core'
|
} from '@hcengineering/core'
|
||||||
import { type DbAdapter } from '@hcengineering/server-core'
|
import { type DbAdapter, wrapAdapterToClient } from '@hcengineering/server-core'
|
||||||
import { createMongoAdapter, createMongoTxAdapter } from '..'
|
import { createMongoAdapter, createMongoTxAdapter } from '..'
|
||||||
import { getMongoClient, type MongoClientReference, shutdown } from '../utils'
|
import { getMongoClient, type MongoClientReference, shutdown } from '../utils'
|
||||||
import { genMinModel } from './minmodel'
|
import { genMinModel } from './minmodel'
|
||||||
@ -108,22 +104,7 @@ describe('mongo operations', () => {
|
|||||||
const ctx = new MeasureMetricsContext('client', {})
|
const ctx = new MeasureMetricsContext('client', {})
|
||||||
|
|
||||||
client = await createClient(async (handler) => {
|
client = await createClient(async (handler) => {
|
||||||
const st: ClientConnection = {
|
return wrapAdapterToClient(ctx, serverStorage, txes)
|
||||||
isConnected: () => true,
|
|
||||||
findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options),
|
|
||||||
tx: async (tx) => await serverStorage.tx(ctx, tx),
|
|
||||||
searchFulltext: async () => ({ docs: [] }),
|
|
||||||
close: async () => {},
|
|
||||||
loadChunk: async (domain): Promise<DocChunk> => await Promise.reject(new Error('unsupported')),
|
|
||||||
closeChunk: async (idx) => {},
|
|
||||||
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => [],
|
|
||||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
|
||||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
|
|
||||||
loadModel: async () => txes,
|
|
||||||
getAccount: async () => ({}) as any,
|
|
||||||
sendForceClose: async () => {}
|
|
||||||
}
|
|
||||||
return st
|
|
||||||
})
|
})
|
||||||
|
|
||||||
operations = new TxOperations(client, core.account.System)
|
operations = new TxOperations(client, core.account.System)
|
||||||
|
@ -14,11 +14,7 @@
|
|||||||
//
|
//
|
||||||
import core, {
|
import core, {
|
||||||
type Client,
|
type Client,
|
||||||
type ClientConnection,
|
|
||||||
createClient,
|
createClient,
|
||||||
type Doc,
|
|
||||||
type DocChunk,
|
|
||||||
type Domain,
|
|
||||||
generateId,
|
generateId,
|
||||||
getWorkspaceId,
|
getWorkspaceId,
|
||||||
Hierarchy,
|
Hierarchy,
|
||||||
@ -29,7 +25,7 @@ import core, {
|
|||||||
type Space,
|
type Space,
|
||||||
TxOperations
|
TxOperations
|
||||||
} from '@hcengineering/core'
|
} from '@hcengineering/core'
|
||||||
import { type DbAdapter } from '@hcengineering/server-core'
|
import { type DbAdapter, wrapAdapterToClient } from '@hcengineering/server-core'
|
||||||
import { createPostgresAdapter, createPostgresTxAdapter } from '..'
|
import { createPostgresAdapter, createPostgresTxAdapter } from '..'
|
||||||
import { getDBClient, type PostgresClientReference, shutdown } from '../utils'
|
import { getDBClient, type PostgresClientReference, shutdown } from '../utils'
|
||||||
import { genMinModel } from './minmodel'
|
import { genMinModel } from './minmodel'
|
||||||
@ -121,22 +117,7 @@ describe('postgres operations', () => {
|
|||||||
)
|
)
|
||||||
await serverStorage.init?.(ctx)
|
await serverStorage.init?.(ctx)
|
||||||
client = await createClient(async (handler) => {
|
client = await createClient(async (handler) => {
|
||||||
const st: ClientConnection = {
|
return wrapAdapterToClient(ctx, serverStorage, txes)
|
||||||
isConnected: () => true,
|
|
||||||
findAll: async (_class, query, options) => await serverStorage.findAll(ctx, _class, query, options),
|
|
||||||
tx: async (tx) => await serverStorage.tx(ctx, tx),
|
|
||||||
searchFulltext: async () => ({ docs: [] }),
|
|
||||||
close: async () => {},
|
|
||||||
loadChunk: async (domain): Promise<DocChunk> => await Promise.reject(new Error('unsupported')),
|
|
||||||
closeChunk: async (idx) => {},
|
|
||||||
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => [],
|
|
||||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
|
||||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
|
|
||||||
loadModel: async () => txes,
|
|
||||||
getAccount: async () => ({}) as any,
|
|
||||||
sendForceClose: async () => {}
|
|
||||||
}
|
|
||||||
return st
|
|
||||||
})
|
})
|
||||||
|
|
||||||
operations = new TxOperations(client, core.account.System)
|
operations = new TxOperations(client, core.account.System)
|
||||||
|
Loading…
Reference in New Issue
Block a user