From 9c04340976c9ad0d9392b6bc7b9561c8ad1ff221 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Tue, 3 Jun 2025 15:48:18 +0700 Subject: [PATCH] UBERF-11409 Retry collab doc save on storage failure (#9155) Signed-off-by: Alexander Onnikov --- server/collaborator/src/extensions/storage.ts | 144 ++++++++++++++---- 1 file changed, 116 insertions(+), 28 deletions(-) diff --git a/server/collaborator/src/extensions/storage.ts b/server/collaborator/src/extensions/storage.ts index 1f38b2baf5..213800b6a9 100644 --- a/server/collaborator/src/extensions/storage.ts +++ b/server/collaborator/src/extensions/storage.ts @@ -1,5 +1,5 @@ // -// Copyright © 2023 Hardcore Engineering Inc. +// Copyright © 2025 Hardcore Engineering Inc. // // Licensed under the Eclipse Public License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. You may @@ -35,6 +35,7 @@ export interface StorageConfiguration { ctx: MeasureContext adapter: CollabStorageAdapter transformer: Transformer + saveRetryInterval?: number } type DocumentName = string @@ -50,9 +51,30 @@ export class StorageExtension implements Extension { private readonly configuration: StorageConfiguration private readonly updates = new Map() private readonly markups = new Map>() + private readonly promises = new Map>() + + private readonly saveRetryInterval: number + private stopped = false constructor (configuration: StorageConfiguration) { this.configuration = configuration + this.saveRetryInterval = configuration.saveRetryInterval ?? 1000 + } + + async onDestroy (): Promise { + this.stopped = true + const documents = Array.from(this.promises.keys()) + const promises = Array.from(this.promises.values()) + + if (promises.length > 0) { + const { ctx } = this.configuration + try { + ctx.info('waiting for pending document saves', { documents, count: promises.length }) + await Promise.all(promises) + } catch (error) { + ctx.error('error while waiting for pending document saves', { documents, error }) + } + } } async onChange ({ context, document, documentName }: withContext): Promise { @@ -98,18 +120,15 @@ export class StorageExtension implements Extension { const { ctx } = this.configuration const { connectionId } = context - const updates = this.updates.get(documentName) const connections = document.getConnectionsCount() - const collaborators = updates?.collaborators.size ?? 0 - ctx.info('store document', { documentName, connectionId, connections, collaborators }) + ctx.info('store document', { documentName, connectionId, connections }) - if (updates === undefined || updates.collaborators.size === 0) { + if (this.hasNoUpdates(documentName)) { ctx.info('no changes for document', { documentName, connectionId }) return } - updates.collaborators.clear() - await this.storeDocument(documentName, document, updates.context) + await this.storeDocument(documentName, document, context) } async onConnect ({ context, documentName, instance }: withContext): Promise { @@ -122,13 +141,11 @@ export class StorageExtension implements Extension { const { ctx } = this.configuration const { connectionId } = context - const updates = this.updates.get(documentName) const connections = document.getConnectionsCount() - const collaborators = updates?.collaborators.size ?? 0 - const updatedAt = updates?.collaborators.get(connectionId) - ctx.info('disconnect from document', { documentName, connectionId, connections, collaborators, updatedAt }) + ctx.info('disconnect from document', { documentName, connectionId, connections }) - if (updates === undefined || !updates.collaborators.has(connectionId)) { + const noUpdates = this.hasNoUpdates(documentName, connectionId) + if (noUpdates) { ctx.info('no changes for document', { documentName, connectionId }) return } @@ -138,14 +155,14 @@ export class StorageExtension implements Extension { return } - updates.collaborators.clear() - await this.storeDocument(documentName, document, context) + await this.storeDocument(documentName, document, context, connectionId) } async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise { this.configuration.ctx.info('unload document', { documentName }) this.updates.delete(documentName) this.markups.delete(documentName) + this.promises.delete(documentName) } private async loadDocument (documentName: string, context: Context): Promise { @@ -162,22 +179,93 @@ export class StorageExtension implements Extension { } } - private async storeDocument (documentName: string, document: Document, context: Context): Promise { - const { ctx, adapter } = this.configuration + private async storeDocument ( + documentName: string, + document: Document, + context: Context, + connectionId?: string + ): Promise { + const prev = this.promises.get(documentName) + + const curr = async (): Promise => { + if (prev !== undefined) { + await prev + } + + // Check whether we still have changes after the previous save + const noUpdates = this.hasNoUpdates(documentName, connectionId) + if (!noUpdates) { + await this.performStoreDocument(documentName, document, context) + } + } + + const promise = curr() + this.promises.set(documentName, promise) try { - const currMarkup = await ctx.with('save-document', {}, (ctx) => - adapter.saveDocument(ctx, documentName, document, context, { - prev: () => this.markups.get(documentName) ?? {}, - curr: () => this.configuration.transformer.fromYdoc(document) - }) - ) - - this.markups.set(documentName, currMarkup ?? {}) - } catch (err: any) { - Analytics.handleError(err) - ctx.error('failed to save document', { documentName, error: err }) - throw new Error('Failed to save document') + await promise + } finally { + if (this.promises.get(documentName) === promise) { + this.promises.delete(documentName) + } } } + + private async performStoreDocument (documentName: string, document: Document, context: Context): Promise { + const { ctx, adapter } = this.configuration + + let attempt = 0 + while (true) { + attempt++ + const now = Date.now() + + try { + const currMarkup = await ctx.with('save-document', {}, (ctx) => + adapter.saveDocument(ctx, documentName, document, context, { + prev: () => this.markups.get(documentName) ?? {}, + curr: () => this.configuration.transformer.fromYdoc(document) + }) + ) + + this.markups.set(documentName, currMarkup ?? {}) + this.clearUpdates(documentName, now) + + return + } catch (err: any) { + Analytics.handleError(err) + ctx.error('failed to save document', { documentName, attempt, error: err }) + + if (this.stopped) { + ctx.info('storage extension stopped, skipping document save', { documentName }) + throw new Error('Aborted') + } + + await new Promise((resolve) => setTimeout(resolve, this.saveRetryInterval)) + } + } + } + + private clearUpdates (documentName: string, timestamp: number): void { + const updates = this.updates.get(documentName) + if (updates !== undefined) { + for (const [connectionId, updatedAt] of updates.collaborators.entries()) { + if (updatedAt < timestamp) { + updates.collaborators.delete(connectionId) + } + } + } + } + + private hasNoUpdates (documentName: string, connectionId?: string): boolean { + const updates = this.updates.get(documentName) + if (updates === undefined) { + return true + } + + if (connectionId !== undefined) { + return !updates.collaborators.has(connectionId) + } + + return updates.collaborators.size === 0 + } }