UBERF-9496 (#8061)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-02-20 22:28:59 +07:00 committed by GitHub
parent 9e202352eb
commit 3cc324da77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 114 additions and 131 deletions

View File

@ -466,7 +466,11 @@ export class MarkdownState implements IState {
for (let i = 0; i < len; i++) {
const mark = state.marks[i]
if (!this.marks[mark.type].mixable) break
const mm = this.marks[mark.type]
if (mm == null) {
break
}
if (!mm.mixable) break
this.reorderMixableMark(state, mark, i, len)
}
}
@ -735,6 +739,9 @@ export class MarkdownState implements IState {
let value = mark.attrs?.marker
if (value === undefined) {
const info = this.marks[mark.type]
if (info == null) {
throw new Error(`No info for mark ${mark.type}`)
}
value = open ? info.open : info.close
}
return typeof value === 'string' ? value : value(this, mark, parent, index) ?? ''

View File

@ -47,7 +47,6 @@ import {
type Workspace
} from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import { handleSend } from './utils'
const useReserveContext = (process.env.USE_RESERVE_CTX ?? 'true') === 'true'
@ -222,9 +221,7 @@ export class ClientSession implements Session {
this.useCompression
)
} else {
void handleSend(ctx, socket, { result: tx }, 1024 * 1024, this.binaryMode, this.useCompression).catch((err) => {
ctx.error('failed to broadcast', err)
})
socket.send(ctx, { result: tx }, this.binaryMode, this.useCompression)
}
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { type FindResult, type MeasureContext } from '@hcengineering/core'
import { type MeasureContext } from '@hcengineering/core'
import type {
AddSessionActive,
@ -23,7 +23,6 @@ import type {
Session
} from '@hcengineering/server-core'
import { toFindResult } from '@hcengineering/core'
import { type Response } from '@hcengineering/rpc'
import type { Token } from '@hcengineering/server-token'
@ -80,64 +79,12 @@ export function processRequest (
}
}
export async function sendResponse (
export function sendResponse (
ctx: MeasureContext,
session: Session,
socket: ConnectionSocket,
resp: Response<any>
): Promise<void> {
await handleSend(ctx, socket, resp, 1024 * 1024, session.binaryMode, session.useCompression)
}
function waitNextTick (): Promise<void> | undefined {
return new Promise<void>((resolve) => {
setImmediate(resolve)
})
}
export async function handleSend (
ctx: MeasureContext,
ws: ConnectionSocket,
msg: Response<any>,
chunkLimit: number,
useBinary: boolean,
useCompression: boolean
): Promise<void> {
// ws.send(msg)
if (Array.isArray(msg.result) && msg.result.length > 1 && chunkLimit > 0) {
// Split and send by chunks
const data = [...msg.result]
let cid = 1
const dataSize = JSON.stringify(data).length
const avg = Math.round(dataSize / data.length)
const itemChunk = Math.round(chunkLimit / avg) + 1
while (data.length > 0 && !ws.isClosed) {
let itemChunkCurrent = itemChunk
if (data.length - itemChunk < itemChunk / 2) {
itemChunkCurrent = data.length
}
const chunk: FindResult<any> = toFindResult(data.splice(0, itemChunkCurrent))
if (data.length === 0) {
const orig = msg.result as FindResult<any>
chunk.total = orig.total ?? 0
chunk.lookupMap = orig.lookupMap
}
if (chunk !== undefined) {
ws.send(
ctx,
{ ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } },
useBinary,
useCompression
)
}
cid++
if (data.length > 0 && !ws.isClosed) {
await waitNextTick()
}
}
} else {
ws.send(ctx, msg, useBinary, useCompression)
}
socket.send(ctx, resp, session.binaryMode, session.useCompression)
return Promise.resolve()
}

View File

@ -11,6 +11,7 @@ import core, {
ClientConnectEvent,
DocumentUpdate,
isActiveMode,
isDeletingMode,
MeasureContext,
RateLimiter,
Ref,
@ -712,33 +713,36 @@ export class PlatformWorker {
return Array.from(workspaces)
}
async checkWorkspaceIsActive (token: string, workspace: string): Promise<ClientWorkspaceInfo | undefined> {
async checkWorkspaceIsActive (
token: string,
workspace: string
): Promise<{ workspaceInfo: ClientWorkspaceInfo | undefined, needRecheck: boolean }> {
let workspaceInfo: ClientWorkspaceInfo | undefined
try {
workspaceInfo = await getWorkspaceInfo(token)
} catch (err: any) {
this.ctx.error('Workspace not found:', { workspace })
return
return { workspaceInfo: undefined, needRecheck: false }
}
if (workspaceInfo?.workspace === undefined) {
this.ctx.error('No workspace exists for workspaceId', { workspace })
return
return { workspaceInfo: undefined, needRecheck: false }
}
if (workspaceInfo?.disabled === true || isDeletingMode(workspaceInfo?.mode)) {
this.ctx.warn('Workspace is disabled', { workspace })
return { workspaceInfo: undefined, needRecheck: false }
}
if (!isActiveMode(workspaceInfo?.mode)) {
this.ctx.warn('Workspace is in maitenance, skipping for now.', { workspace })
return
}
if (workspaceInfo?.disabled === true) {
this.ctx.warn('Workspace is disabled', { workspace })
return
this.ctx.warn('Workspace is in maitenance, skipping for now.', { workspace, mode: workspaceInfo?.mode })
return { workspaceInfo: undefined, needRecheck: true }
}
const lastVisit = (Date.now() - workspaceInfo.lastVisit) / (3600 * 24 * 1000) // In days
if (config.WorkspaceInactivityInterval > 0 && lastVisit > config.WorkspaceInactivityInterval) {
this.ctx.warn('Workspace is inactive for too long, skipping for now.', { workspace })
return
return { workspaceInfo: undefined, needRecheck: true }
}
return workspaceInfo
return { workspaceInfo, needRecheck: true }
}
private async checkWorkspaces (): Promise<boolean> {
@ -781,9 +785,11 @@ export class PlatformWorker {
},
{ mode: 'github' }
)
const workspaceInfo = await this.checkWorkspaceIsActive(token, workspace)
const { workspaceInfo, needRecheck } = await this.checkWorkspaceIsActive(token, workspace)
if (workspaceInfo === undefined) {
errors++
if (needRecheck) {
errors++
}
return
}
try {

View File

@ -497,7 +497,15 @@ export abstract class IssueSyncManagerBase {
return (pField.node.options ?? []).find((it) => it.id === field.optionId)
}
findOptionId (container: ContainerFocus, fieldId: string, value: string, target: IssueSyncTarget): string | undefined {
findOptionId (
container: ContainerFocus,
fieldId: string,
value: string | null,
target: IssueSyncTarget
): string | undefined {
if (value == null) {
return
}
const structure = container.container.projectStructure.get(target.target._id)
if (structure === undefined) {
return
@ -506,7 +514,7 @@ export abstract class IssueSyncManagerBase {
if (pField === undefined) {
return undefined
}
return (pField.node.options ?? []).find((it) => it.name.toLowerCase() === value.toLowerCase())?.id
return (pField.node.options ?? []).find((it) => it.name?.toLowerCase() === value.toLowerCase())?.id
}
async toPlatformField (

View File

@ -344,7 +344,7 @@ export class ProjectsSyncManager implements DocSyncManager {
derivedClient: TxOperations,
deleteExisting: boolean
): Promise<boolean> {
return false
return true
}
async externalSync (

View File

@ -127,6 +127,9 @@ export async function getSinceRaw (
export function gqlp (params: Record<string, string | number | string[] | undefined>): string {
let result = ''
let first = true
function escape (str: string): string {
return str.replace(/"/g, '\\"')
}
for (const [k, v] of Object.entries(params)) {
if (v !== undefined) {
if (!first) {
@ -136,9 +139,9 @@ export function gqlp (params: Record<string, string | number | string[] | undefi
if (typeof v === 'number') {
result += `${k}: ${v}`
} else if (Array.isArray(v)) {
result += `${k}: [${v.map((it) => `"${it}"`).join(', ')}]`
result += `${k}: [${v.map((it) => `"${escape(it)}"`).join(', ')}]`
} else {
result += `${k}: "${v}"`
result += `${k}: "${escape(v)}"`
}
}
}

View File

@ -226,7 +226,10 @@ export class GithubWorker implements IntegrationManager {
}
}
async getAccountU (user: User): Promise<PersonAccount | undefined> {
async getAccountU (user?: User): Promise<PersonAccount | undefined> {
if (user == null) {
return undefined
}
return await this.getAccount({
id: user.node_id,
login: user.login,
@ -1101,38 +1104,41 @@ export class GithubWorker implements IntegrationManager {
this.ctx.error('Failed to perform full sync', { error: err })
})
}
const { projects, repositories } = await this.collectActiveProjects()
if (projects.length === 0 && repositories.length === 0) {
await this.waitChanges()
continue
}
// Check if we have documents with external sync request's pending.
const hadExternalChanges = await this.performExternalSync(
projects,
repositories,
'externalVersion',
githubExternalSyncVersion
)
const hadSyncChanges = await this.performSync(projects, repositories)
// Perform derived operations
// Sync derived external data, like pull request reviews, files etc.
const hadDerivedChanges = await this.performExternalSync(
projects,
repositories,
'derivedVersion',
githubDerivedSyncVersion
)
if (!hadExternalChanges && !hadSyncChanges && !hadDerivedChanges) {
if (this.previousWait !== 0) {
this.ctx.info('Wait for changes:', { previousWait: this.previousWait, workspace: this.workspace.name })
this.previousWait = 0
try {
const { projects, repositories } = await this.collectActiveProjects()
if (projects.length === 0 && repositories.length === 0) {
await this.waitChanges()
continue
}
// Wait until some sync documents will be modified, updated.
await this.waitChanges()
// Check if we have documents with external sync request's pending.
const hadExternalChanges = await this.performExternalSync(
projects,
repositories,
'externalVersion',
githubExternalSyncVersion
)
const hadSyncChanges = await this.performSync(projects, repositories)
// Perform derived operations
// Sync derived external data, like pull request reviews, files etc.
const hadDerivedChanges = await this.performExternalSync(
projects,
repositories,
'derivedVersion',
githubDerivedSyncVersion
)
if (!hadExternalChanges && !hadSyncChanges && !hadDerivedChanges) {
if (this.previousWait !== 0) {
this.ctx.info('Wait for changes:', { previousWait: this.previousWait, workspace: this.workspace.name })
this.previousWait = 0
}
// Wait until some sync documents will be modified, updated.
await this.waitChanges()
}
} catch (err: any) {
this.ctx.error('failed to perform sync', { err, workspace: this.workspace.name })
}
}
}
@ -1327,25 +1333,30 @@ export class GithubWorker implements IntegrationManager {
const targetProject = await this.client.findOne(github.mixin.GithubProject, {
_id: existing.space as Ref<GithubProject>
})
if (await mapper.handleDelete(existing, info, derivedClient, false, parent)) {
const h = this._client.getHierarchy()
await derivedClient.remove(info)
if (h.hasMixin(existing, github.mixin.GithubIssue)) {
const mixinData = this._client.getHierarchy().as(existing, github.mixin.GithubIssue)
await this._client.update<GithubIssue>(
mixinData,
{
url: '',
githubNumber: 0,
repository: '' as Ref<GithubIntegrationRepository>
},
false,
Date.now(),
existing.modifiedBy
)
try {
if (await mapper.handleDelete(existing, info, derivedClient, false, parent)) {
const h = this._client.getHierarchy()
await derivedClient.remove(info)
if (h.hasMixin(existing, github.mixin.GithubIssue)) {
const mixinData = this._client.getHierarchy().as(existing, github.mixin.GithubIssue)
await this._client.update<GithubIssue>(
mixinData,
{
url: '',
githubNumber: 0,
repository: '' as Ref<GithubIntegrationRepository>
},
false,
Date.now(),
existing.modifiedBy
)
}
return
}
return
} catch (err: any) {
this.ctx.error('failed to handle delete', { err })
}
if (targetProject !== undefined) {
// We need to sync into new project.
await derivedClient.update<DocSyncInfo>(info, {
@ -1361,8 +1372,12 @@ export class GithubWorker implements IntegrationManager {
}
if (info.deleted === true) {
if (await mapper.handleDelete(existing, info, derivedClient, true)) {
await derivedClient.remove(info)
try {
if (await mapper.handleDelete(existing, info, derivedClient, true)) {
await derivedClient.remove(info)
}
} catch (err: any) {
this.ctx.error('failed to handle delete', { err })
}
return
}