Fix onConnect handler (#7265)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2024-12-05 20:29:54 +05:00 committed by GitHub
parent d13c80be5f
commit ecf4064f69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 21 additions and 17 deletions

View File

@ -85,7 +85,7 @@ export interface ClientConnection extends Storage, FulltextStorage, BackupClient
isConnected: () => boolean isConnected: () => boolean
close: () => Promise<void> close: () => Promise<void>
onConnect?: (event: ClientConnectEvent) => Promise<void> onConnect?: (event: ClientConnectEvent, data: any) => Promise<void>
// If hash is passed, will return LoadModelResponse // If hash is passed, will return LoadModelResponse
loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse> loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse>
@ -264,11 +264,11 @@ export async function createClient (
txHandler(...txBuffer) txHandler(...txBuffer)
txBuffer = undefined txBuffer = undefined
const oldOnConnect: ((event: ClientConnectEvent) => Promise<void>) | undefined = conn.onConnect const oldOnConnect: ((event: ClientConnectEvent, data: any) => Promise<void>) | undefined = conn.onConnect
conn.onConnect = async (event) => { conn.onConnect = async (event, data) => {
console.log('Client: onConnect', event) console.log('Client: onConnect', event)
if (event === ClientConnectEvent.Maintenance) { if (event === ClientConnectEvent.Maintenance) {
await oldOnConnect?.(ClientConnectEvent.Maintenance) await oldOnConnect?.(ClientConnectEvent.Maintenance, data)
return return
} }
// Find all new transactions and apply // Find all new transactions and apply
@ -282,7 +282,7 @@ export async function createClient (
model = new ModelDb(hierarchy) model = new ModelDb(hierarchy)
await ctx.with('build-model', {}, (ctx) => buildModel(ctx, loadModelResponse, modelFilter, hierarchy, model)) await ctx.with('build-model', {}, (ctx) => buildModel(ctx, loadModelResponse, modelFilter, hierarchy, model))
await oldOnConnect?.(ClientConnectEvent.Upgraded) await oldOnConnect?.(ClientConnectEvent.Upgraded, data)
// No need to fetch more stuff since upgrade was happened. // No need to fetch more stuff since upgrade was happened.
return return
@ -290,7 +290,7 @@ export async function createClient (
if (event === ClientConnectEvent.Connected) { if (event === ClientConnectEvent.Connected) {
// No need to do anything here since we connected. // No need to do anything here since we connected.
await oldOnConnect?.(event) await oldOnConnect?.(event, data)
return return
} }
@ -318,10 +318,10 @@ export async function createClient (
if (atxes.length < transactionThreshold && !needFullRefresh) { if (atxes.length < transactionThreshold && !needFullRefresh) {
console.log('applying input transactions', atxes.length) console.log('applying input transactions', atxes.length)
txHandler(...atxes) txHandler(...atxes)
await oldOnConnect?.(ClientConnectEvent.Reconnected) await oldOnConnect?.(ClientConnectEvent.Reconnected, data)
} else { } else {
// We need to trigger full refresh on queries, etc. // We need to trigger full refresh on queries, etc.
await oldOnConnect?.(ClientConnectEvent.Refresh) await oldOnConnect?.(ClientConnectEvent.Refresh, data)
} }
} }

View File

@ -101,6 +101,8 @@ class Connection implements ClientConnection {
private helloRecieved: boolean = false private helloRecieved: boolean = false
onConnect?: (event: ClientConnectEvent, data: any) => Promise<void>
rpcHandler = new RPCHandler() rpcHandler = new RPCHandler()
constructor ( constructor (
@ -130,6 +132,8 @@ class Connection implements ClientConnection {
this.sessionId = generateId() this.sessionId = generateId()
} }
this.onConnect = opt?.onConnect
this.scheduleOpen(this.ctx, false) this.scheduleOpen(this.ctx, false)
} }
@ -261,7 +265,7 @@ class Connection implements ClientConnection {
if (resp.id === -1) { if (resp.id === -1) {
this.delay = 0 this.delay = 0
if (resp.result?.state === 'upgrading') { if (resp.result?.state === 'upgrading') {
void this.opt?.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats) void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true this.upgrading = true
this.delay = 3 this.delay = 3
return return
@ -302,7 +306,7 @@ class Connection implements ClientConnection {
v.reconnect?.() v.reconnect?.()
} }
void this.opt?.onConnect?.( void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected, (resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected,
this.sessionId this.sessionId
) )

View File

@ -118,14 +118,14 @@ export default async () => {
reject(new Error(`Connection timeout, and no connection established to ${endpoint}`)) reject(new Error(`Connection timeout, and no connection established to ${endpoint}`))
} }
}, connectTimeout) }, connectTimeout)
newOpt.onConnect = (event) => { newOpt.onConnect = async (event, data) => {
// Any event is fine, it means server is alive. // Any event is fine, it means server is alive.
clearTimeout(connectTO) clearTimeout(connectTO)
resolve() resolve()
} }
}) })
} }
const clientConnection = connect(url, upgradeHandler, tokenPayload.workspace, tokenPayload.email, newOpt) const clientConnection = connect(url, upgradeHandler, tokenPayload.workspace, tokenPayload.email, opt)
if (connectPromise !== undefined) { if (connectPromise !== undefined) {
await connectPromise await connectPromise
} }

View File

@ -62,7 +62,7 @@ export interface ClientFactoryOptions {
onUpgrade?: () => void onUpgrade?: () => void
onUnauthorized?: () => void onUnauthorized?: () => void
onArchived?: () => void onArchived?: () => void
onConnect?: (event: ClientConnectEvent, data: any) => void onConnect?: (event: ClientConnectEvent, data: any) => Promise<void>
ctx?: MeasureContext ctx?: MeasureContext
onDialTimeout?: () => void | Promise<void> onDialTimeout?: () => void | Promise<void>
} }

View File

@ -125,7 +125,7 @@ export async function connect (title: string): Promise<Client | undefined> {
}) })
}, },
// We need to refresh all active live queries and clear old queries. // We need to refresh all active live queries and clear old queries.
onConnect: (event: ClientConnectEvent, data: any) => { onConnect: async (event: ClientConnectEvent, data: any) => {
console.log('WorkbenchClient: onConnect', event) console.log('WorkbenchClient: onConnect', event)
try { try {
if (event === ClientConnectEvent.Connected) { if (event === ClientConnectEvent.Connected) {

View File

@ -212,7 +212,7 @@ export async function connect (title: string): Promise<Client | undefined> {
}) })
}, },
// We need to refresh all active live queries and clear old queries. // We need to refresh all active live queries and clear old queries.
onConnect: (event: ClientConnectEvent, data: any) => { onConnect: async (event: ClientConnectEvent, data: any): Promise<void> => {
console.log('WorkbenchClient: onConnect', event) console.log('WorkbenchClient: onConnect', event)
if (event === ClientConnectEvent.Maintenance) { if (event === ClientConnectEvent.Maintenance) {
if (data != null && data.total !== 0) { if (data != null && data.total !== 0) {

View File

@ -18,7 +18,7 @@ import config from './config'
export async function createPlatformClient ( export async function createPlatformClient (
workspace: string, workspace: string,
timeout: number, timeout: number,
reconnect?: (event: ClientConnectEvent) => void reconnect?: (event: ClientConnectEvent, data: any) => Promise<void>
): Promise<Client> { ): Promise<Client> {
setMetadata(client.metadata.ClientSocketFactory, (url) => { setMetadata(client.metadata.ClientSocketFactory, (url) => {
return new WebSocket(url, { return new WebSocket(url, {

View File

@ -1560,7 +1560,7 @@ export class GithubWorker implements IntegrationManager {
ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName }) ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName })
let client: Client | undefined let client: Client | undefined
try { try {
client = await createPlatformClient(workspace.name, 30000, (event: ClientConnectEvent) => { client = await createPlatformClient(workspace.name, 30000, async (event: ClientConnectEvent) => {
reconnect(workspace.name, event) reconnect(workspace.name, event)
}) })