UBERF-7790: Fix connection timeout issue (#6301)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-08-09 17:39:38 +07:00 committed by Andrey Sobolev
parent e9f1032b7b
commit 2e59df60da
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
6 changed files with 53 additions and 38 deletions

View File

@ -91,6 +91,8 @@ class Connection implements ClientConnection {
private pingResponse: number = Date.now() private pingResponse: number = Date.now()
private helloRecieved: boolean = false
rpcHandler = new RPCHandler() rpcHandler = new RPCHandler()
constructor ( constructor (
@ -173,7 +175,7 @@ class Connection implements ClientConnection {
} }
isConnected (): boolean { isConnected (): boolean {
return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN && this.helloRecieved
} }
delay = 0 delay = 0
@ -240,6 +242,10 @@ class Connection implements ClientConnection {
return return
} }
if (resp.result === 'hello') { if (resp.result === 'hello') {
// We need to clear dial timer, since we recieve hello response.
clearTimeout(this.dialTimer)
this.dialTimer = null
this.helloRecieved = true
if (this.upgrading) { if (this.upgrading) {
// We need to call upgrade since connection is upgraded // We need to call upgrade since connection is upgraded
this.opt?.onUpgrade?.() this.opt?.onUpgrade?.()
@ -394,12 +400,15 @@ class Connection implements ClientConnection {
this.websocket = wsocket this.websocket = wsocket
const opened = false const opened = false
this.dialTimer = setTimeout(() => { if (this.dialTimer != null) {
if (!opened && !this.closed) { this.dialTimer = setTimeout(() => {
void this.opt?.onDialTimeout?.() this.dialTimer = null
this.scheduleOpen(true) if (!opened && !this.closed) {
} void this.opt?.onDialTimeout?.()
}, dialTimeout) this.scheduleOpen(true)
}
}, dialTimeout)
}
wsocket.onmessage = (event: MessageEvent) => { wsocket.onmessage = (event: MessageEvent) => {
if (this.closed) { if (this.closed) {
@ -419,10 +428,8 @@ class Connection implements ClientConnection {
} }
} }
wsocket.onclose = (ev) => { wsocket.onclose = (ev) => {
clearTimeout(this.dialTimer)
if (this.websocket !== wsocket) { if (this.websocket !== wsocket) {
wsocket.close() wsocket.close()
clearTimeout(this.dialTimer)
return return
} }
// console.log('client websocket closed', socketId, ev?.reason) // console.log('client websocket closed', socketId, ev?.reason)
@ -435,7 +442,7 @@ class Connection implements ClientConnection {
} }
const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false
clearTimeout(this.dialTimer) this.helloRecieved = false
const helloRequest: HelloRequest = { const helloRequest: HelloRequest = {
method: 'hello', method: 'hello',
params: [], params: [],
@ -447,7 +454,6 @@ class Connection implements ClientConnection {
} }
wsocket.onerror = (event: any) => { wsocket.onerror = (event: any) => {
clearTimeout(this.dialTimer)
if (this.websocket !== wsocket) { if (this.websocket !== wsocket) {
return return
} }

View File

@ -287,33 +287,19 @@ class TSessionManager implements SessionManager {
accountsUrl !== '' ? await this.getWorkspaceInfo(ctx, accountsUrl, rawToken) : this.wsFromToken(token) accountsUrl !== '' ? await this.getWorkspaceInfo(ctx, accountsUrl, rawToken) : this.wsFromToken(token)
} catch (err: any) { } catch (err: any) {
this.updateConnectErrorInfo(token) this.updateConnectErrorInfo(token)
// No connection to account service, retry from client.
await new Promise<void>((resolve) => {
setTimeout(resolve, 1000)
})
return { error: err } return { error: err }
} }
if (workspaceInfo === undefined) { if (workspaceInfo === undefined) {
this.updateConnectErrorInfo(token) this.updateConnectErrorInfo(token)
// No connection to account service, retry from client.
await new Promise<void>((resolve) => {
setTimeout(resolve, 1000)
})
return { upgrade: true } return { upgrade: true }
} }
if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) { if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) {
await new Promise<void>((resolve) => {
setTimeout(resolve, 1000)
})
// No access to workspace for token. // No access to workspace for token.
return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) } return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) }
} }
if (workspaceInfo === undefined && token.extra?.admin !== 'true') { if (workspaceInfo === undefined && token.extra?.admin !== 'true') {
await new Promise<void>((resolve) => {
setTimeout(resolve, 5000)
})
this.updateConnectErrorInfo(token) this.updateConnectErrorInfo(token)
// No access to workspace for token. // No access to workspace for token.
return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) } return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) }

View File

@ -15,7 +15,7 @@
import { Analytics } from '@hcengineering/analytics' import { Analytics } from '@hcengineering/analytics'
import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core' import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core'
import { UNAUTHORIZED } from '@hcengineering/platform' import { UNAUTHORIZED, unknownStatus } from '@hcengineering/platform'
import { RPCHandler, type Response } from '@hcengineering/rpc' import { RPCHandler, type Response } from '@hcengineering/rpc'
import { decodeToken, type Token } from '@hcengineering/server-token' import { decodeToken, type Token } from '@hcengineering/server-token'
import cors from 'cors' import cors from 'cors'
@ -277,13 +277,22 @@ export function startHttpServer (
if (webSocketData.session instanceof Promise) { if (webSocketData.session instanceof Promise) {
void webSocketData.session.then((s) => { void webSocketData.session.then((s) => {
if ('error' in s) { if ('error' in s) {
cs.close() void cs
.send(ctx, { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error') }, false, false)
.then(() => {
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
})
} }
if ('upgrade' in s) { if ('upgrade' in s) {
void cs void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => { .then(() => {
cs.close() setTimeout(() => {
cs.close()
}, 5000)
}) })
} }
}) })

View File

@ -29,6 +29,7 @@ import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSo
import { Readable } from 'stream' import { Readable } from 'stream'
import { getFile, getFileRange, type BlobResponse } from './blobs' import { getFile, getFileRange, type BlobResponse } from './blobs'
import { doSessionOp, processRequest, type WebsocketData } from './utils' import { doSessionOp, processRequest, type WebsocketData } from './utils'
import { unknownStatus } from '@hcengineering/platform'
const rpcHandler = new RPCHandler() const rpcHandler = new RPCHandler()
@ -139,13 +140,22 @@ export function startUWebsocketServer (
if (data.session instanceof Promise) { if (data.session instanceof Promise) {
void data.session.then((s) => { void data.session.then((s) => {
if ('error' in s) { if ('error' in s) {
ctx.error('error', { error: s.error?.message, stack: s.error?.stack }) void cs
.send(ctx, { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error') }, false, false)
.then(() => {
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
})
} }
if ('upgrade' in s) { if ('upgrade' in s) {
void cs void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => { .then(() => {
cs.close() setTimeout(() => {
cs.close()
}, 5000)
}) })
} }
}) })

View File

@ -739,6 +739,9 @@ export class PlatformWorker {
} }
} }
} }
this.ctx.info('************************* Check workspaces done ************************* ', {
workspaces: this.clients.size
})
return errors > 0 return errors > 0
} }

View File

@ -35,13 +35,6 @@ import core, {
type Blob, type Blob,
type MigrationState type MigrationState
} from '@hcengineering/core' } from '@hcengineering/core'
import { LiveQuery } from '@hcengineering/query'
import { StorageAdapter } from '@hcengineering/server-core'
import { getPublicLinkUrl } from '@hcengineering/server-guest-resources'
import task, { ProjectType, TaskType } from '@hcengineering/task'
import { MarkupNode, jsonToMarkup, isMarkdownsEquals } from '@hcengineering/text'
import tracker from '@hcengineering/tracker'
import { User } from '@octokit/webhooks-types'
import github, { import github, {
DocSyncInfo, DocSyncInfo,
GithubAuthentication, GithubAuthentication,
@ -53,6 +46,13 @@ import github, {
GithubUserInfo, GithubUserInfo,
githubId githubId
} from '@hcengineering/github' } from '@hcengineering/github'
import { LiveQuery } from '@hcengineering/query'
import { StorageAdapter } from '@hcengineering/server-core'
import { getPublicLinkUrl } from '@hcengineering/server-guest-resources'
import task, { ProjectType, TaskType } from '@hcengineering/task'
import { MarkupNode, isMarkdownsEquals, jsonToMarkup } from '@hcengineering/text'
import tracker from '@hcengineering/tracker'
import { User } from '@octokit/webhooks-types'
import { App, Octokit } from 'octokit' import { App, Octokit } from 'octokit'
import { createPlatformClient } from './client' import { createPlatformClient } from './client'
import { createCollaboratorClient } from './collaborator' import { createCollaboratorClient } from './collaborator'
@ -1487,7 +1487,7 @@ export class GithubWorker implements IntegrationManager {
ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName }) ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName })
let client: Client | undefined let client: Client | undefined
try { try {
client = await createPlatformClient(workspace.name, workspace.productId, 10000, (event: ClientConnectEvent) => { client = await createPlatformClient(workspace.name, workspace.productId, 30000, (event: ClientConnectEvent) => {
reconnect(workspace.name, event) reconnect(workspace.name, event)
}) })
@ -1506,6 +1506,7 @@ export class GithubWorker implements IntegrationManager {
return worker return worker
} }
} catch (err: any) { } catch (err: any) {
ctx.error('timeout during to connect', { workspace, error: err })
await client?.close() await client?.close()
} }
} }