QFIX: Restore services work (#7641)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-01-11 21:53:04 +07:00 committed by GitHub
parent 59141b9096
commit dc9240218f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 109 additions and 57 deletions

View File

@ -1 +1 @@
"0.6.287"
"0.6.382"

View File

@ -270,7 +270,7 @@ services:
- UV_THREADPOOL_SIZE=10
- SERVER_PORT=3333
- SERVER_SECRET=secret
- ENABLE_COMPRESSION=true
- ENABLE_COMPRESSION=false
- STATS_URL=http://host.docker.internal:4900
- FULLTEXT_URL=http://host.docker.internal:4700
# - DB_URL=postgresql://postgres:example@postgres:5432
@ -343,7 +343,7 @@ services:
# - UV_THREADPOOL_SIZE=10
- SERVER_PORT=3332
- SERVER_SECRET=secret
- ENABLE_COMPRESSION=true
- ENABLE_COMPRESSION=false
- FULLTEXT_URL=http://host.docker.internal:4702
- STATS_URL=http://host.docker.internal:4900
- DB_URL=postgresql://root@host.docker.internal:26257/defaultdb?sslmode=disable

View File

@ -513,8 +513,17 @@ class Connection implements ClientConnection {
console.error(err)
}
}
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
try {
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
} catch (err: any) {
if (!this.helloReceived) {
// Just error and ignore for now.
console.error(err)
} else {
throw err
}
}
})
} else {
let data = event.data
@ -526,8 +535,17 @@ class Connection implements ClientConnection {
console.error(err)
}
}
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
try {
const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
} catch (err: any) {
if (!this.helloReceived) {
// Just error and ignore for now.
console.error(err)
} else {
throw err
}
}
}
}
wsocket.onclose = (ev) => {

View File

@ -58,7 +58,7 @@ export class ClientSession implements Session {
createTime = Date.now()
requests = new Map<string, SessionRequest>()
binaryMode: boolean = false
useCompression: boolean = true
useCompression: boolean = false
sessionId = ''
lastRequest = Date.now()

View File

@ -498,12 +498,6 @@ class TSessionManager implements SessionManager {
workspace.workspaceInitCompleted = true
}
// We do not need to wait for set-status, just return session to client
const _workspace = workspace
void ctx
.with('set-status', {}, (ctx) => this.trySetStatus(ctx, pipeline, session, true, _workspace.workspaceId))
.catch(() => {})
if (this.timeMinutes > 0) {
ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
}
@ -837,7 +831,7 @@ class TSessionManager implements SessionManager {
s.workspaceClosed = true
if (reason === 'upgrade' || reason === 'force-close') {
// Override message handler, to wait for upgrading response from clients.
this.sendUpgrade(workspace.context, webSocket, s.binaryMode)
this.sendUpgrade(workspace.context, webSocket, s.binaryMode, s.useCompression)
}
webSocket.close()
this.reconnectIds.delete(s.sessionId)
@ -877,7 +871,7 @@ class TSessionManager implements SessionManager {
}
}
private sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): void {
private sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean, compression: boolean): void {
webSocket.send(
ctx,
{
@ -886,7 +880,7 @@ class TSessionManager implements SessionManager {
}
},
binary,
false
compression
)
}
@ -1081,39 +1075,49 @@ class TSessionManager implements SessionManager {
ws: ConnectionSocket,
requestCtx: MeasureContext<any>
): Promise<void> {
const hello = request as HelloRequest
service.binaryMode = hello.binary ?? false
service.useCompression = this.enableCompression ? hello.compression ?? false : false
try {
const hello = request as HelloRequest
service.binaryMode = hello.binary ?? false
service.useCompression = this.enableCompression ? hello.compression ?? false : false
if (LOGGING_ENABLED) {
ctx.info('hello happen', {
workspace,
user: service.getUser(),
if (LOGGING_ENABLED) {
ctx.info('hello happen', {
workspace,
user: service.getUser(),
binary: service.binaryMode,
compression: service.useCompression,
timeToHello: Date.now() - service.createTime,
workspaceUsers: this.workspaces.get(workspace)?.sessions?.size,
totalUsers: this.sessions.size
})
}
const reconnect = this.reconnectIds.has(service.sessionId)
if (reconnect) {
this.reconnectIds.delete(service.sessionId)
}
const pipeline =
service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline
const helloResponse: HelloResponse = {
id: -1,
result: 'hello',
binary: service.binaryMode,
compression: service.useCompression,
timeToHello: Date.now() - service.createTime,
workspaceUsers: this.workspaces.get(workspace)?.sessions?.size,
totalUsers: this.sessions.size
})
reconnect,
serverVersion: this.serverVersion,
lastTx: pipeline.context.lastTx,
lastHash: pipeline.context.lastHash,
account: service.getRawAccount(pipeline),
useCompression: service.useCompression
}
ws.send(requestCtx, helloResponse, false, false)
// We do not need to wait for set-status, just return session to client
const _workspace = service.workspace
void ctx
.with('set-status', {}, (ctx) => this.trySetStatus(ctx, pipeline, service, true, _workspace.workspaceId))
.catch(() => {})
} catch (err: any) {
ctx.error('error', { err })
}
const reconnect = this.reconnectIds.has(service.sessionId)
if (reconnect) {
this.reconnectIds.delete(service.sessionId)
}
const pipeline =
service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline
const helloResponse: HelloResponse = {
id: -1,
result: 'hello',
binary: service.binaryMode,
reconnect,
serverVersion: this.serverVersion,
lastTx: pipeline.context.lastTx,
lastHash: pipeline.context.lastHash,
account: service.getRawAccount(pipeline),
useCompression: service.useCompression
}
ws.send(requestCtx, helloResponse, false, false)
}
}

View File

@ -13,12 +13,14 @@
// limitations under the License.
//
import { Account, RateLimiter, Ref } from '@hcengineering/core'
import { Account, isActiveMode, RateLimiter, Ref, systemAccountEmail } from '@hcengineering/core'
import { type Db } from 'mongodb'
import { type CalendarClient } from './calendar'
import config from './config'
import { type ProjectCredentials, type Token, type User } from './types'
import { WorkspaceClient } from './workspaceClient'
import { getWorkspaceInfo } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
export class CalendarController {
private readonly workspaces: Map<string, WorkspaceClient> = new Map<string, WorkspaceClient>()
@ -60,6 +62,16 @@ export class CalendarController {
for (const [workspace, tokens] of groups) {
await limiter.add(async () => {
const wstok = generateToken(systemAccountEmail, { name: workspace })
const info = await getWorkspaceInfo(wstok)
if (info === undefined) {
console.log('workspace not found', workspace)
return
}
if (!isActiveMode(info.mode)) {
console.log('workspace is not active', workspace)
return
}
const startPromise = this.startWorkspace(workspace, tokens)
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(() => {

View File

@ -36,6 +36,8 @@ export async function createPlatformClient (
},
{ mode: 'github' }
)
setMetadata(client.metadata.UseBinaryProtocol, true)
setMetadata(client.metadata.UseProtocolCompression, true)
setMetadata(client.metadata.ConnectionTimeout, timeout)
setMetadata(client.metadata.FilterModel, 'client')
const endpoint = await getTransactorEndpoint(token)

View File

@ -795,6 +795,7 @@ export class PlatformWorker {
total: workspaces.length
})
let initialized = false
const worker = await GithubWorker.create(
this,
workerCtx,
@ -811,17 +812,20 @@ export class PlatformWorker {
if (event === ClientConnectEvent.Refresh || event === ClientConnectEvent.Upgraded) {
void this.clients.get(workspace)?.refreshClient(event === ClientConnectEvent.Upgraded)
}
// We need to check if workspace is inactive
void this.checkWorkspaceIsActive(token, workspace).then((res) => {
if (res === undefined) {
this.ctx.warn('Workspace is inactive, removing from clients list.', { workspace })
this.clients.delete(workspace)
void worker?.close()
}
})
if (initialized) {
// We need to check if workspace is inactive
void this.checkWorkspaceIsActive(token, workspace).then((res) => {
if (res === undefined) {
this.ctx.warn('Workspace is inactive, removing from clients list.', { workspace })
this.clients.delete(workspace)
void worker?.close()
}
})
}
}
)
if (worker !== undefined) {
initialized = true
workerCtx.info('************************* Register worker Done ************************* ', {
workspaceId: workspaceInfo.workspaceId,
workspace: workspaceInfo.workspace,

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { MeasureContext, RateLimiter } from '@hcengineering/core'
import { isActiveMode, MeasureContext, RateLimiter, systemAccountEmail } from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { type Db } from 'mongodb'
@ -22,6 +22,8 @@ import config from './config'
import { type GmailClient } from './gmail'
import { type ProjectCredentials, type Token, type User } from './types'
import { WorkspaceClient } from './workspaceClient'
import { generateToken } from '@hcengineering/server-token'
import { getWorkspaceInfo } from '@hcengineering/server-client'
export class GmailController {
private readonly workspaces: Map<string, WorkspaceClient> = new Map<string, WorkspaceClient>()
@ -72,6 +74,16 @@ export class GmailController {
const limiter = new RateLimiter(config.InitLimit)
for (const [workspace, tokens] of groups) {
await limiter.add(async () => {
const wstok = generateToken(systemAccountEmail, { name: workspace })
const info = await getWorkspaceInfo(wstok)
if (info === undefined) {
console.log('workspace not found', workspace)
return
}
if (!isActiveMode(info.mode)) {
console.log('workspace is not active', workspace)
return
}
const startPromise = this.startWorkspace(workspace, tokens)
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(() => {