UBERF-8333: Retry WS handshake (#6786)

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2024-10-02 20:10:52 +04:00 committed by GitHub
parent 386965194d
commit ce2621d472
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 14 deletions

View File

@ -102,30 +102,47 @@ export async function getTransactorEndpoint (
} }
} }
export function withRetryUntilTimeout<P extends any[], T> ( export function withRetry<P extends any[], T> (
f: (...params: P) => Promise<T>, f: (...params: P) => Promise<T>,
timeoutMs: number = 5000 shouldFail: (err: any, attempt: number) => boolean,
intervalMs: number = 1000
): (...params: P) => Promise<T> { ): (...params: P) => Promise<T> {
return async function (...params: P): Promise<T> { return async function (...params: P): Promise<T> {
const timeout = Date.now() + timeoutMs let attempt = 0
while (true) { while (true) {
try { try {
return await f(...params) return await f(...params)
} catch (err: any) { } catch (err: any) {
if (timeout < Date.now()) { if (shouldFail(err, attempt)) {
// Timeout happened
throw err
}
if (err?.cause?.code === 'ECONNRESET' || err?.cause?.code === 'ECONNREFUSED') {
await new Promise<void>((resolve) => setTimeout(resolve, 1000))
} else {
throw err throw err
} }
attempt++
await new Promise<void>((resolve) => setTimeout(resolve, intervalMs))
} }
} }
} }
} }
export function withRetryConnUntilTimeout<P extends any[], T> (
f: (...params: P) => Promise<T>,
timeoutMs: number = 5000
): (...params: P) => Promise<T> {
const timeout = Date.now() + timeoutMs
const shouldFail = (err: any): boolean =>
(err?.cause?.code !== 'ECONNRESET' && err?.cause?.code !== 'ECONNREFUSED') || timeout < Date.now()
return withRetry(f, shouldFail)
}
export function withRetryConnUntilSuccess<P extends any[], T> (
f: (...params: P) => Promise<T>
): (...params: P) => Promise<T> {
const shouldFail = (err: any): boolean => err?.cause?.code !== 'ECONNRESET' && err?.cause?.code !== 'ECONNREFUSED'
return withRetry(f, shouldFail)
}
export async function getPendingWorkspace ( export async function getPendingWorkspace (
token: string, token: string,
region: string, region: string,

View File

@ -28,7 +28,8 @@ import {
getPendingWorkspace, getPendingWorkspace,
updateWorkspaceInfo, updateWorkspaceInfo,
workerHandshake, workerHandshake,
withRetryUntilTimeout withRetryConnUntilTimeout,
withRetryConnUntilSuccess
} from '@hcengineering/server-client' } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token' import { generateToken } from '@hcengineering/server-token'
import { FileModelLogger } from '@hcengineering/server-tool' import { FileModelLogger } from '@hcengineering/server-tool'
@ -84,7 +85,12 @@ export class WorkspaceWorker {
} }
this.wakeup = this.defaultWakeup this.wakeup = this.defaultWakeup
const token = generateToken(systemAccountEmail, { name: '-' }, { service: 'workspace' }) const token = generateToken(systemAccountEmail, { name: '-' }, { service: 'workspace' })
await workerHandshake(token, this.region, this.version, this.operation)
ctx.info('Sending a handshake to the account service...')
await withRetryConnUntilSuccess(workerHandshake)(token, this.region, this.version, this.operation)
ctx.info('Successfully connected to the account service')
while (true) { while (true) {
await this.waitForAvailableThread() await this.waitForAvailableThread()
@ -156,7 +162,7 @@ export class WorkspaceWorker {
progress: number, progress: number,
message?: string message?: string
): Promise<void> => { ): Promise<void> => {
return withRetryUntilTimeout( return withRetryConnUntilTimeout(
() => updateWorkspaceInfo(token, ws.workspace, event, version, progress, message), () => updateWorkspaceInfo(token, ws.workspace, event, version, progress, message),
5000 5000
)() )()
@ -239,7 +245,7 @@ export class WorkspaceWorker {
progress: number, progress: number,
message?: string message?: string
): Promise<void> => { ): Promise<void> => {
return withRetryUntilTimeout( return withRetryConnUntilTimeout(
() => updateWorkspaceInfo(token, ws.workspace, event, version, progress, message), () => updateWorkspaceInfo(token, ws.workspace, event, version, progress, message),
5000 5000
)() )()