From 7575a6661893cfc614c17427179be32b51a26b97 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 9 Aug 2024 18:52:44 +0700 Subject: [PATCH] UBERF-7854: Fix live query $lookup update (#6304) --- packages/query/src/index.ts | 20 +++-- .../src/components/Connect.svelte | 2 +- services/github/pod-github/src/platform.ts | 78 +++---------------- services/github/pod-github/src/worker.ts | 13 ++++ 4 files changed, 37 insertions(+), 76 deletions(-) diff --git a/packages/query/src/index.ts b/packages/query/src/index.ts index d7621db544..5232215221 100644 --- a/packages/query/src/index.ts +++ b/packages/query/src/index.ts @@ -1111,15 +1111,21 @@ export class LiveQuery implements WithTx, Client { for (const resDoc of docs) { const obj = getObjectValue(objWay, resDoc) if (obj === undefined) continue - const value = getObjectValue('$lookup.' + key, obj) + let value = getObjectValue('$lookup.' + key, obj) + const reverseCheck = reverseLookupKey !== undefined && (doc as any)[reverseLookupKey] === obj._id + if (value == null && reverseCheck) { + value = [] + obj.$lookup[key] = value + } if (Array.isArray(value)) { - if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc)) { - if (reverseLookupKey !== undefined && (doc as any)[reverseLookupKey] === obj._id) { - if ((value as Doc[]).find((p) => p._id === doc._id) === undefined) { - value.push(doc) - needCallback = true - } + if (this.client.getHierarchy().isDerived(doc._class, core.class.AttachedDoc) && reverseCheck) { + const idx = (value as Doc[]).findIndex((p) => p._id === doc._id) + if (idx === -1) { + value.push(doc) + } else { + value[idx] = doc } + needCallback = true } } else { if (obj[key] === doc._id) { diff --git a/services/github/github-resources/src/components/Connect.svelte b/services/github/github-resources/src/components/Connect.svelte index 46cce16a99..7eb6e3cdae 100644 --- a/services/github/github-resources/src/components/Connect.svelte +++ b/services/github/github-resources/src/components/Connect.svelte @@ -84,7 +84,7 @@ labelIntl: getEmbeddedLabel('Github Repositories') } ] - let selectedTab: string = tabs[0].id + let selectedTab = tabs[0].id $: loading = $ticker - (auth?.authRequestTime ?? 0) < 5000 diff --git a/services/github/pod-github/src/platform.ts b/services/github/pod-github/src/platform.ts index 950c6611c6..3875c61a79 100644 --- a/services/github/pod-github/src/platform.ts +++ b/services/github/pod-github/src/platform.ts @@ -15,7 +15,7 @@ import core, { Ref, TxOperations } from '@hcengineering/core' -import github, { GithubAuthentication, GithubIntegration, makeQuery } from '@hcengineering/github' +import github, { GithubAuthentication, makeQuery } from '@hcengineering/github' import { MongoClientReference, getMongoClient } from '@hcengineering/mongo' import { setMetadata } from '@hcengineering/platform' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' @@ -104,11 +104,6 @@ export class PlatformWorker { this.integrations = await this.integrationCollection.find({}).toArray() await this.queryInstallations(ctx) - const workspacesToCheck = new Set() - // We need to delete local integrations not retrieved by queryInstallations() - for (const intValue of this.integrations) { - workspacesToCheck.add(intValue.workspace) - } for (const integr of [...this.integrations]) { // We need to check and remove integrations without a real integration's if (!this.installations.has(integr.installationId)) { @@ -116,31 +111,11 @@ export class PlatformWorker { installationId: integr.installationId, workspace: integr.workspace }) + await this.integrationCollection.deleteOne({ installationId: integr.installationId }) this.integrations = this.integrations.filter((it) => it.installationId !== integr.installationId) } } - const checkClean = async (): Promise => { - const rateLimit = new RateLimiter(10) - for (const workspace of workspacesToCheck) { - // We need to connect to workspace and verify all installations and clean if required - try { - await rateLimit.add(async () => { - ctx.info('check clean', { workspace }) - try { - await this.cleanWorkspaceInstallations(ctx, workspace) - } catch (err: any) { - ctx.error('failed to check clean', { workspace }) - } - }) - } catch (err: any) { - ctx.error('failed to clean workspace', { err, workspace }) - } - } - await rateLimit.waitProcessing() - } - void checkClean() - void this.doSyncWorkspaces().catch((err) => { ctx.error('error during sync workspaces', { err }) process.exit(1) @@ -181,6 +156,7 @@ export class PlatformWorker { } await new Promise((resolve) => { this.triggerCheckWorkspaces = resolve + this.ctx.info('Workspaces check triggered') if (errors) { setTimeout(resolve, 5000) } @@ -217,44 +193,6 @@ export class PlatformWorker { return (await this.usersCollection.find({ _id: login }).toArray()).shift() } - async cleanWorkspaceInstallations (ctx: MeasureContext, workspace: string, installId?: number): Promise { - // TODO: Do not remove record from $github if we failed to clean github installations inside workspace. - const token = generateToken( - config.SystemEmail, - { - name: workspace, - productId: config.ProductID - }, - { mode: 'github' } - ) - let workspaceInfo: ClientWorkspaceInfo - try { - workspaceInfo = await getWorkspaceInfo(token) - } catch (err: any) { - ctx.error('Workspace not found:', { workspace }) - return - } - if (workspaceInfo === undefined) { - ctx.error('No workspace found', { workspace }) - return - } - let client: Client | undefined - try { - client = await createPlatformClient(workspace, config.ProductID, 10000) - const ops = new TxOperations(client, core.account.System) - - const wsIntegerations = await client.findAll(github.class.GithubIntegration, {}) - - for (const intValue of wsIntegerations) { - if (!this.installations.has(intValue.installationId) || intValue.installationId === installId) { - await ops.remove(intValue) - } - } - } finally { - await client?.close() - } - } - async mapInstallation ( ctx: MeasureContext, workspace: string, @@ -297,8 +235,6 @@ export class PlatformWorker { installation_id: installationId }) } - // Clean workspace - await this.cleanWorkspaceInstallations(ctx, workspace, installationId) this.triggerCheckWorkspaces() } @@ -679,6 +615,7 @@ export class PlatformWorker { index: widx, total: workspaces.length }) + const worker = await GithubWorker.create( this, workerCtx, @@ -708,6 +645,12 @@ export class PlatformWorker { // No if no integration, we will try connect one more time in a time period this.clients.set(workspace, worker) } else { + workerCtx.info('Failed Register worker, timeout or integrations removed', { + workspaceId: workspaceInfo.workspaceId, + workspace: workspaceInfo.workspace, + index: widx, + total: workspaces.length + }) errors++ } } catch (e: any) { @@ -729,7 +672,6 @@ export class PlatformWorker { const ws = this.clients.get(deleted) if (ws !== undefined) { try { - await ws.ctx.logger.close() this.ctx.info('workspace removed from tracking list', { workspace: deleted }) this.clients.delete(deleted) await ws.close() diff --git a/services/github/pod-github/src/worker.ts b/services/github/pod-github/src/worker.ts index ac61d6e615..44f15e84a6 100644 --- a/services/github/pod-github/src/worker.ts +++ b/services/github/pod-github/src/worker.ts @@ -1491,6 +1491,8 @@ export class GithubWorker implements IntegrationManager { reconnect(workspace.name, event) }) + await GithubWorker.checkIntegrations(client, installations) + const worker = new GithubWorker( ctx, platformWorker, @@ -1510,6 +1512,17 @@ export class GithubWorker implements IntegrationManager { await client?.close() } } + + static async checkIntegrations (client: Client, installations: Map): Promise { + const wsIntegerations = await client.findAll(github.class.GithubIntegration, {}) + + for (const intValue of wsIntegerations) { + if (!installations.has(intValue.installationId)) { + const ops = new TxOperations(client, core.account.System) + await ops.remove(intValue) + } + } + } } export async function syncUser (