mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-13 03:40:48 +00:00
TSK-1152: Fix connections mess (#2969)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
dd6619985c
commit
ec2e505e28
@ -15,13 +15,13 @@
|
||||
|
||||
import { Plugin } from '@hcengineering/platform'
|
||||
import { BackupClient, DocChunk } from './backup'
|
||||
import { Class, DOMAIN_MODEL, Doc, Domain, PluginConfiguration, Ref } from './classes'
|
||||
import { AttachedDoc, Class, DOMAIN_MODEL, Doc, Domain, PluginConfiguration, Ref } from './classes'
|
||||
import core from './component'
|
||||
import { Hierarchy } from './hierarchy'
|
||||
import { ModelDb } from './memdb'
|
||||
import type { DocumentQuery, FindOptions, FindResult, Storage, TxResult, WithLookup } from './storage'
|
||||
import { SortingOrder } from './storage'
|
||||
import { Tx, TxCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx'
|
||||
import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx'
|
||||
import { toFindResult } from './utils'
|
||||
|
||||
const transactionThreshold = 500
|
||||
@ -199,10 +199,24 @@ export async function createClient (
|
||||
// We need to look for last {transactionThreshold} transactions and if it is more since lastTx one we receive, we need to perform full refresh.
|
||||
const atxes = await conn.findAll(
|
||||
core.class.Tx,
|
||||
{ modifiedOn: { $gt: lastTx } },
|
||||
{ modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } },
|
||||
{ sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold }
|
||||
)
|
||||
if (atxes.total < transactionThreshold) {
|
||||
|
||||
let needFullRefresh = false
|
||||
// if we have attachment document create/delete we need to full refresh, since some derived data could be missing
|
||||
for (const tx of atxes) {
|
||||
if (
|
||||
tx._class === core.class.TxCollectionCUD &&
|
||||
((tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class === core.class.TxCreateDoc ||
|
||||
(tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class === core.class.TxRemoveDoc)
|
||||
) {
|
||||
needFullRefresh = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (atxes.total < transactionThreshold && !needFullRefresh) {
|
||||
console.log('applying input transactions', atxes.length)
|
||||
for (const tx of atxes) {
|
||||
txHandler(tx)
|
||||
|
@ -14,7 +14,7 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import client, { ClientSocket } from '@hcengineering/client'
|
||||
import client, { ClientSocket, ClientSocketReadyState } from '@hcengineering/client'
|
||||
import core, {
|
||||
Class,
|
||||
ClientConnection,
|
||||
@ -24,27 +24,28 @@ import core, {
|
||||
Domain,
|
||||
FindOptions,
|
||||
FindResult,
|
||||
generateId,
|
||||
Ref,
|
||||
Tx,
|
||||
TxApplyIf,
|
||||
TxHandler,
|
||||
TxResult,
|
||||
TxWorkspaceEvent,
|
||||
WorkspaceEvent
|
||||
WorkspaceEvent,
|
||||
generateId
|
||||
} from '@hcengineering/core'
|
||||
import {
|
||||
getMetadata,
|
||||
PlatformError,
|
||||
readResponse,
|
||||
ReqId,
|
||||
serialize,
|
||||
UNAUTHORIZED,
|
||||
getMetadata,
|
||||
readResponse,
|
||||
serialize,
|
||||
unknownError
|
||||
} from '@hcengineering/platform'
|
||||
|
||||
const SECOND = 1000
|
||||
const pingTimeout = 10 * SECOND
|
||||
const hangTimeout = 5 * 60 * SECOND
|
||||
const dialTimeout = 20 * SECOND
|
||||
|
||||
class RequestPromise {
|
||||
@ -81,7 +82,7 @@ class Connection implements ClientConnection {
|
||||
this.interval = setInterval(() => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
|
||||
if (this.pingResponse !== 0 && Date.now() - this.pingResponse > 3 * pingTimeout) {
|
||||
if (this.pingResponse !== 0 && Date.now() - this.pingResponse > hangTimeout) {
|
||||
// No ping response from server.
|
||||
const s = this.websocket
|
||||
|
||||
@ -116,13 +117,21 @@ class Connection implements ClientConnection {
|
||||
}
|
||||
|
||||
delay = 1
|
||||
pending: Promise<ClientSocket> | undefined
|
||||
|
||||
private async waitOpenConnection (): Promise<ClientSocket> {
|
||||
while (true) {
|
||||
try {
|
||||
const conn = await this.openConnection()
|
||||
const socket = await this.pending
|
||||
if (socket != null && socket.readyState === ClientSocketReadyState.OPEN) {
|
||||
return socket
|
||||
}
|
||||
this.pending = this.openConnection()
|
||||
await this.pending
|
||||
this.delay = 5
|
||||
return conn
|
||||
return await this.pending
|
||||
} catch (err: any) {
|
||||
this.pending = undefined
|
||||
console.log('failed to connect', err)
|
||||
if (err?.code === UNAUTHORIZED.code) {
|
||||
this.onUnauthorized?.()
|
||||
|
@ -47,6 +47,18 @@ export interface ClientSocket {
|
||||
send: (data: string | ArrayBufferLike | Blob | ArrayBufferView) => void
|
||||
|
||||
close: (code?: number) => void
|
||||
|
||||
readyState: ClientSocketReadyState
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export enum ClientSocketReadyState {
|
||||
CONNECTING = 0,
|
||||
OPEN = 1,
|
||||
CLOSING = 2,
|
||||
CLOSED = 3
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user