From ceb1c95c7cca9fe4387a35b4320637a57dc95934 Mon Sep 17 00:00:00 2001
From: Alexander Onnikov <Alexander.Onnikov@xored.com>
Date: Wed, 8 May 2024 13:06:38 +0700
Subject: [PATCH] EQMS-6844 Do not keep platform client open (#5539)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
---
 server/collaborator/src/context.ts            |  6 +-
 .../src/extensions/authentication.ts          | 23 ++++---
 server/collaborator/src/platform.ts           | 34 +++++------
 server/collaborator/src/server.ts             | 23 +++----
 server/collaborator/src/storage/platform.ts   | 60 ++++++++++---------
 5 files changed, 73 insertions(+), 73 deletions(-)

diff --git a/server/collaborator/src/context.ts b/server/collaborator/src/context.ts
index 36d60f249b..b0de5441ef 100644
--- a/server/collaborator/src/context.ts
+++ b/server/collaborator/src/context.ts
@@ -17,7 +17,7 @@ import { type DocumentId, type PlatformDocumentId } from '@hcengineering/collabo
 import { WorkspaceId, generateId } from '@hcengineering/core'
 import { decodeToken } from '@hcengineering/server-token'
 import { onAuthenticatePayload } from '@hocuspocus/server'
-import { ClientFactory, Controller, getClientFactory } from './platform'
+import { ClientFactory, simpleClientFactory } from './platform'
 
 export interface Context {
   connectionId: string
@@ -36,7 +36,7 @@ export type withContext<T extends WithContext> = Omit<T, 'context'> & {
   context: Context
 }
 
-export function buildContext (data: onAuthenticatePayload, controller: Controller): Context {
+export function buildContext (data: onAuthenticatePayload): Context {
   const context = data.context as Partial<Context>
 
   const connectionId = context.connectionId ?? generateId()
@@ -48,7 +48,7 @@ export function buildContext (data: onAuthenticatePayload, controller: Controlle
   return {
     connectionId,
     workspaceId: decodedToken.workspace,
-    clientFactory: getClientFactory(decodedToken, controller),
+    clientFactory: simpleClientFactory(decodedToken),
     initialContentId,
     platformDocumentId
   }
diff --git a/server/collaborator/src/extensions/authentication.ts b/server/collaborator/src/extensions/authentication.ts
index c82b209666..98b877084c 100644
--- a/server/collaborator/src/extensions/authentication.ts
+++ b/server/collaborator/src/extensions/authentication.ts
@@ -20,11 +20,9 @@ import { Extension, onAuthenticatePayload } from '@hocuspocus/server'
 
 import { getWorkspaceInfo } from '../account'
 import { Context, buildContext } from '../context'
-import { Controller } from '../platform'
 
 export interface AuthenticationConfiguration {
   ctx: MeasureContext
-  controller: Controller
 }
 
 export class AuthenticationExtension implements Extension {
@@ -35,20 +33,21 @@ export class AuthenticationExtension implements Extension {
   }
 
   async onAuthenticate (data: onAuthenticatePayload): Promise<Context> {
-    this.configuration.ctx.measure('authenticate', 1)
-
+    const ctx = this.configuration.ctx
     const { workspaceUrl, collaborativeDoc } = parseDocumentId(data.documentName as DocumentId)
 
-    // verify workspace can be accessed with the token
-    const workspaceInfo = await getWorkspaceInfo(data.token)
+    return await ctx.with('authenticate', { workspace: workspaceUrl }, async () => {
+      // verify workspace can be accessed with the token
+      const workspaceInfo = await getWorkspaceInfo(data.token)
 
-    // verify workspace url in the document matches the token
-    if (workspaceInfo.workspace !== workspaceUrl) {
-      throw new Error('documentName must include workspace')
-    }
+      // verify workspace url in the document matches the token
+      if (workspaceInfo.workspace !== workspaceUrl) {
+        throw new Error('documentName must include workspace')
+      }
 
-    data.connection.readOnly = isReadonlyDoc(collaborativeDoc)
+      data.connection.readOnly = isReadonlyDoc(collaborativeDoc)
 
-    return buildContext(data, this.configuration.controller)
+      return buildContext(data)
+    })
   }
 }
diff --git a/server/collaborator/src/platform.ts b/server/collaborator/src/platform.ts
index b072581864..0a9ff48480 100644
--- a/server/collaborator/src/platform.ts
+++ b/server/collaborator/src/platform.ts
@@ -15,7 +15,7 @@
 
 import client from '@hcengineering/client'
 import clientResources from '@hcengineering/client-resources'
-import core, { Client, Tx, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core'
+import core, { Client, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core'
 import { setMetadata } from '@hcengineering/platform'
 import { Token, generateToken } from '@hcengineering/server-token'
 import config from './config'
@@ -52,13 +52,25 @@ export interface ClientFactoryParams {
 /**
  * @public
  */
-export type ClientFactory = (params: ClientFactoryParams) => Promise<TxOperations>
+export type ClientFactory = (params?: ClientFactoryParams) => Promise<TxOperations>
 
 /**
  * @public
  */
-export function getClientFactory (token: Token, controller: Controller): ClientFactory {
-  return async ({ derived }: ClientFactoryParams) => {
+export function simpleClientFactory (token: Token): ClientFactory {
+  return async (params?: ClientFactoryParams) => {
+    const derived = params?.derived ?? false
+    const client = await connect(generateToken(token.email, token.workspace))
+    return await getTxOperations(client, token, derived)
+  }
+}
+
+/**
+ * @public
+ */
+export function reusableClientFactory (token: Token, controller: Controller): ClientFactory {
+  return async (params?: ClientFactoryParams) => {
+    const derived = params?.derived ?? false
     const workspaceClient = await controller.get(token.workspace)
     return await getTxOperations(workspaceClient.client, token, derived)
   }
@@ -94,16 +106,10 @@ export class Controller {
  * @public
  */
 export class WorkspaceClient {
-  private readonly txHandlers: ((...tx: Tx[]) => Promise<void>)[] = []
-
   private constructor (
     readonly workspace: WorkspaceId,
     readonly client: Client
-  ) {
-    this.client.notify = (...tx: Tx[]) => {
-      void this.txHandler(...tx)
-    }
-  }
+  ) {}
 
   static async create (workspace: WorkspaceId): Promise<WorkspaceClient> {
     const token = generateToken(systemAccountEmail, workspace)
@@ -114,10 +120,4 @@ export class WorkspaceClient {
   async close (): Promise<void> {
     await this.client.close()
   }
-
-  private async txHandler (...tx: Tx[]): Promise<void> {
-    for (const h of this.txHandlers) {
-      await h(...tx)
-    }
-  }
 }
diff --git a/server/collaborator/src/server.ts b/server/collaborator/src/server.ts
index 2b88d19e0c..51b6fc65a3 100644
--- a/server/collaborator/src/server.ts
+++ b/server/collaborator/src/server.ts
@@ -18,7 +18,7 @@ import { MeasureContext, generateId, metricsAggregate } from '@hcengineering/cor
 import { MinioService } from '@hcengineering/minio'
 import { Token, decodeToken } from '@hcengineering/server-token'
 import { ServerKit } from '@hcengineering/text'
-import { Hocuspocus, onDestroyPayload } from '@hocuspocus/server'
+import { Hocuspocus } from '@hocuspocus/server'
 import bp from 'body-parser'
 import compression from 'compression'
 import cors from 'cors'
@@ -31,7 +31,7 @@ import { Config } from './config'
 import { Context } from './context'
 import { AuthenticationExtension } from './extensions/authentication'
 import { StorageExtension } from './extensions/storage'
-import { Controller, getClientFactory } from './platform'
+import { simpleClientFactory } from './platform'
 import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc'
 import { PlatformStorageAdapter } from './storage/platform'
 import { MarkupTransformer } from './transformers/markup'
@@ -83,8 +83,6 @@ export async function start (
 
   const extensionsCtx = ctx.newChild('extensions', {})
 
-  const controller = new Controller()
-
   const transformer = new MarkupTransformer(extensions)
 
   const hocuspocus = new Hocuspocus({
@@ -124,18 +122,13 @@ export async function start (
 
     extensions: [
       new AuthenticationExtension({
-        ctx: extensionsCtx.newChild('authenticate', {}),
-        controller
+        ctx: extensionsCtx.newChild('authenticate', {})
       }),
       new StorageExtension({
         ctx: extensionsCtx.newChild('storage', {}),
         adapter: new PlatformStorageAdapter({ minio }, mongo, transformer)
       })
-    ],
-
-    async onDestroy (data: onDestroyPayload): Promise<void> {
-      await controller.close()
-    }
+    ]
   })
 
   const rpcCtx = ctx.newChild('rpc', {})
@@ -144,7 +137,7 @@ export async function start (
     return {
       connectionId: generateId(),
       workspaceId: token.workspace,
-      clientFactory: getClientFactory(token, controller)
+      clientFactory: simpleClientFactory(token)
     }
   }
 
@@ -192,9 +185,11 @@ export async function start (
       }
       res.status(400).send(response)
     } else {
-      await rpcCtx.withLog('/rpc', { method: request.method }, async (ctx) => {
+      await rpcCtx.with('/rpc', { method: request.method }, async (ctx) => {
         try {
-          const response: RpcResponse = await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
+          const response: RpcResponse = await rpcCtx.with(request.method, {}, async (ctx) => {
+            return await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
+          })
           res.status(200).send(response)
         } catch (err: any) {
           res.status(500).send({ error: err.message })
diff --git a/server/collaborator/src/storage/platform.ts b/server/collaborator/src/storage/platform.ts
index 41d0f0ff66..54f0474900 100644
--- a/server/collaborator/src/storage/platform.ts
+++ b/server/collaborator/src/storage/platform.ts
@@ -29,6 +29,7 @@ import core, {
   CollaborativeDoc,
   Doc,
   MeasureContext,
+  TxOperations,
   collaborativeDocWithLastVersion,
   toWorkspaceString
 } from '@hcengineering/core'
@@ -87,7 +88,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
       const { platformDocumentId } = context
       if (platformDocumentId !== undefined) {
         ctx.info('load document platform content', { documentId, platformDocumentId })
-        const ydoc = await ctx.with('load-document', { storage: 'platform' }, async (ctx) => {
+        const ydoc = await ctx.with('load-from-platform', {}, async (ctx) => {
           try {
             return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context)
           } catch (err) {
@@ -112,27 +113,37 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
   }
 
   async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> {
-    let snapshot: YDocVersion | undefined
-    try {
-      ctx.info('take document snapshot', { documentId })
-      snapshot = await this.takeSnapshot(ctx, documentId, document, context)
-    } catch (err) {
-      ctx.error('failed to take document snapshot', { documentId, error: err })
-    }
+    const { clientFactory } = context
+
+    const client = await ctx.with('connect', {}, async () => {
+      return await clientFactory()
+    })
 
     try {
-      ctx.info('save document content', { documentId })
-      await this.saveDocumentToStorage(ctx, documentId, document, context)
-    } catch (err) {
-      ctx.error('failed to save document', { documentId, error: err })
-    }
+      let snapshot: YDocVersion | undefined
+      try {
+        ctx.info('take document snapshot', { documentId })
+        snapshot = await this.takeSnapshot(ctx, client, documentId, document, context)
+      } catch (err) {
+        ctx.error('failed to take document snapshot', { documentId, error: err })
+      }
 
-    const { platformDocumentId } = context
-    if (platformDocumentId !== undefined) {
-      ctx.info('save document content to platform', { documentId, platformDocumentId })
-      await ctx.with('save-document', { storage: 'platform' }, async (ctx) => {
-        await this.saveDocumentToPlatform(ctx, documentId, platformDocumentId, document, snapshot, context)
-      })
+      try {
+        ctx.info('save document content', { documentId })
+        await this.saveDocumentToStorage(ctx, documentId, document, context)
+      } catch (err) {
+        ctx.error('failed to save document', { documentId, error: err })
+      }
+
+      const { platformDocumentId } = context
+      if (platformDocumentId !== undefined) {
+        ctx.info('save document content to platform', { documentId, platformDocumentId })
+        await ctx.with('save-to-platform', {}, async (ctx) => {
+          await this.saveDocumentToPlatform(ctx, client, documentId, platformDocumentId, document, snapshot, context)
+        })
+      }
+    } finally {
+      await client.close()
     }
   }
 
@@ -180,6 +191,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
 
   async takeSnapshot (
     ctx: MeasureContext,
+    client: Omit<TxOperations, 'close'>,
     documentId: DocumentId,
     document: YDoc,
     context: Context
@@ -187,9 +199,8 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
     const { storage, collaborativeDoc } = parseDocumentId(documentId)
     const adapter = this.getStorageAdapter(storage)
 
-    const { clientFactory, workspaceId } = context
+    const { workspaceId } = context
 
-    const client = await clientFactory({ derived: false })
     const timestamp = Date.now()
 
     const yDocVersion: YDocVersion = {
@@ -233,6 +244,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
 
   async saveDocumentToPlatform (
     ctx: MeasureContext,
+    client: Omit<TxOperations, 'close'>,
     documentName: string,
     platformDocumentId: PlatformDocumentId,
     document: YDoc,
@@ -241,12 +253,6 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
   ): Promise<void> {
     const { objectClass, objectId, objectAttr } = parsePlatformDocumentId(platformDocumentId)
 
-    const { clientFactory } = context
-
-    const client = await ctx.with('connect', {}, async () => {
-      return await clientFactory({ derived: false })
-    })
-
     const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr)
     if (attribute === undefined) {
       ctx.info('attribute not found', { documentName, objectClass, objectAttr })