mirror of
https://github.com/hcengineering/platform.git
synced 2025-06-09 09:20:54 +00:00
UBERF-11409 Retry collab doc save on storage failure (#9155)
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
26077b475d
commit
9c04340976
@ -1,5 +1,5 @@
|
|||||||
//
|
//
|
||||||
// Copyright © 2023 Hardcore Engineering Inc.
|
// Copyright © 2025 Hardcore Engineering Inc.
|
||||||
//
|
//
|
||||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||||
// you may not use this file except in compliance with the License. You may
|
// you may not use this file except in compliance with the License. You may
|
||||||
@ -35,6 +35,7 @@ export interface StorageConfiguration {
|
|||||||
ctx: MeasureContext
|
ctx: MeasureContext
|
||||||
adapter: CollabStorageAdapter
|
adapter: CollabStorageAdapter
|
||||||
transformer: Transformer
|
transformer: Transformer
|
||||||
|
saveRetryInterval?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
type DocumentName = string
|
type DocumentName = string
|
||||||
@ -50,9 +51,30 @@ export class StorageExtension implements Extension {
|
|||||||
private readonly configuration: StorageConfiguration
|
private readonly configuration: StorageConfiguration
|
||||||
private readonly updates = new Map<DocumentName, DocumentUpdates>()
|
private readonly updates = new Map<DocumentName, DocumentUpdates>()
|
||||||
private readonly markups = new Map<DocumentName, Record<Markup, Markup>>()
|
private readonly markups = new Map<DocumentName, Record<Markup, Markup>>()
|
||||||
|
private readonly promises = new Map<DocumentName, Promise<void>>()
|
||||||
|
|
||||||
|
private readonly saveRetryInterval: number
|
||||||
|
private stopped = false
|
||||||
|
|
||||||
constructor (configuration: StorageConfiguration) {
|
constructor (configuration: StorageConfiguration) {
|
||||||
this.configuration = configuration
|
this.configuration = configuration
|
||||||
|
this.saveRetryInterval = configuration.saveRetryInterval ?? 1000
|
||||||
|
}
|
||||||
|
|
||||||
|
async onDestroy (): Promise<any> {
|
||||||
|
this.stopped = true
|
||||||
|
const documents = Array.from(this.promises.keys())
|
||||||
|
const promises = Array.from(this.promises.values())
|
||||||
|
|
||||||
|
if (promises.length > 0) {
|
||||||
|
const { ctx } = this.configuration
|
||||||
|
try {
|
||||||
|
ctx.info('waiting for pending document saves', { documents, count: promises.length })
|
||||||
|
await Promise.all(promises)
|
||||||
|
} catch (error) {
|
||||||
|
ctx.error('error while waiting for pending document saves', { documents, error })
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async onChange ({ context, document, documentName }: withContext<onChangePayload>): Promise<any> {
|
async onChange ({ context, document, documentName }: withContext<onChangePayload>): Promise<any> {
|
||||||
@ -98,18 +120,15 @@ export class StorageExtension implements Extension {
|
|||||||
const { ctx } = this.configuration
|
const { ctx } = this.configuration
|
||||||
const { connectionId } = context
|
const { connectionId } = context
|
||||||
|
|
||||||
const updates = this.updates.get(documentName)
|
|
||||||
const connections = document.getConnectionsCount()
|
const connections = document.getConnectionsCount()
|
||||||
const collaborators = updates?.collaborators.size ?? 0
|
ctx.info('store document', { documentName, connectionId, connections })
|
||||||
ctx.info('store document', { documentName, connectionId, connections, collaborators })
|
|
||||||
|
|
||||||
if (updates === undefined || updates.collaborators.size === 0) {
|
if (this.hasNoUpdates(documentName)) {
|
||||||
ctx.info('no changes for document', { documentName, connectionId })
|
ctx.info('no changes for document', { documentName, connectionId })
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
updates.collaborators.clear()
|
await this.storeDocument(documentName, document, context)
|
||||||
await this.storeDocument(documentName, document, updates.context)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async onConnect ({ context, documentName, instance }: withContext<onConnectPayload>): Promise<any> {
|
async onConnect ({ context, documentName, instance }: withContext<onConnectPayload>): Promise<any> {
|
||||||
@ -122,13 +141,11 @@ export class StorageExtension implements Extension {
|
|||||||
const { ctx } = this.configuration
|
const { ctx } = this.configuration
|
||||||
const { connectionId } = context
|
const { connectionId } = context
|
||||||
|
|
||||||
const updates = this.updates.get(documentName)
|
|
||||||
const connections = document.getConnectionsCount()
|
const connections = document.getConnectionsCount()
|
||||||
const collaborators = updates?.collaborators.size ?? 0
|
ctx.info('disconnect from document', { documentName, connectionId, connections })
|
||||||
const updatedAt = updates?.collaborators.get(connectionId)
|
|
||||||
ctx.info('disconnect from document', { documentName, connectionId, connections, collaborators, updatedAt })
|
|
||||||
|
|
||||||
if (updates === undefined || !updates.collaborators.has(connectionId)) {
|
const noUpdates = this.hasNoUpdates(documentName, connectionId)
|
||||||
|
if (noUpdates) {
|
||||||
ctx.info('no changes for document', { documentName, connectionId })
|
ctx.info('no changes for document', { documentName, connectionId })
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -138,14 +155,14 @@ export class StorageExtension implements Extension {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
updates.collaborators.clear()
|
await this.storeDocument(documentName, document, context, connectionId)
|
||||||
await this.storeDocument(documentName, document, context)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise<any> {
|
async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise<any> {
|
||||||
this.configuration.ctx.info('unload document', { documentName })
|
this.configuration.ctx.info('unload document', { documentName })
|
||||||
this.updates.delete(documentName)
|
this.updates.delete(documentName)
|
||||||
this.markups.delete(documentName)
|
this.markups.delete(documentName)
|
||||||
|
this.promises.delete(documentName)
|
||||||
}
|
}
|
||||||
|
|
||||||
private async loadDocument (documentName: string, context: Context): Promise<YDoc | undefined> {
|
private async loadDocument (documentName: string, context: Context): Promise<YDoc | undefined> {
|
||||||
@ -162,22 +179,93 @@ export class StorageExtension implements Extension {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async storeDocument (documentName: string, document: Document, context: Context): Promise<void> {
|
private async storeDocument (
|
||||||
const { ctx, adapter } = this.configuration
|
documentName: string,
|
||||||
|
document: Document,
|
||||||
|
context: Context,
|
||||||
|
connectionId?: string
|
||||||
|
): Promise<void> {
|
||||||
|
const prev = this.promises.get(documentName)
|
||||||
|
|
||||||
|
const curr = async (): Promise<void> => {
|
||||||
|
if (prev !== undefined) {
|
||||||
|
await prev
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether we still have changes after the previous save
|
||||||
|
const noUpdates = this.hasNoUpdates(documentName, connectionId)
|
||||||
|
if (!noUpdates) {
|
||||||
|
await this.performStoreDocument(documentName, document, context)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const promise = curr()
|
||||||
|
this.promises.set(documentName, promise)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const currMarkup = await ctx.with('save-document', {}, (ctx) =>
|
await promise
|
||||||
adapter.saveDocument(ctx, documentName, document, context, {
|
} finally {
|
||||||
prev: () => this.markups.get(documentName) ?? {},
|
if (this.promises.get(documentName) === promise) {
|
||||||
curr: () => this.configuration.transformer.fromYdoc(document)
|
this.promises.delete(documentName)
|
||||||
})
|
}
|
||||||
)
|
|
||||||
|
|
||||||
this.markups.set(documentName, currMarkup ?? {})
|
|
||||||
} catch (err: any) {
|
|
||||||
Analytics.handleError(err)
|
|
||||||
ctx.error('failed to save document', { documentName, error: err })
|
|
||||||
throw new Error('Failed to save document')
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async performStoreDocument (documentName: string, document: Document, context: Context): Promise<void> {
|
||||||
|
const { ctx, adapter } = this.configuration
|
||||||
|
|
||||||
|
let attempt = 0
|
||||||
|
while (true) {
|
||||||
|
attempt++
|
||||||
|
const now = Date.now()
|
||||||
|
|
||||||
|
try {
|
||||||
|
const currMarkup = await ctx.with('save-document', {}, (ctx) =>
|
||||||
|
adapter.saveDocument(ctx, documentName, document, context, {
|
||||||
|
prev: () => this.markups.get(documentName) ?? {},
|
||||||
|
curr: () => this.configuration.transformer.fromYdoc(document)
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
this.markups.set(documentName, currMarkup ?? {})
|
||||||
|
this.clearUpdates(documentName, now)
|
||||||
|
|
||||||
|
return
|
||||||
|
} catch (err: any) {
|
||||||
|
Analytics.handleError(err)
|
||||||
|
ctx.error('failed to save document', { documentName, attempt, error: err })
|
||||||
|
|
||||||
|
if (this.stopped) {
|
||||||
|
ctx.info('storage extension stopped, skipping document save', { documentName })
|
||||||
|
throw new Error('Aborted')
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, this.saveRetryInterval))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private clearUpdates (documentName: string, timestamp: number): void {
|
||||||
|
const updates = this.updates.get(documentName)
|
||||||
|
if (updates !== undefined) {
|
||||||
|
for (const [connectionId, updatedAt] of updates.collaborators.entries()) {
|
||||||
|
if (updatedAt < timestamp) {
|
||||||
|
updates.collaborators.delete(connectionId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private hasNoUpdates (documentName: string, connectionId?: string): boolean {
|
||||||
|
const updates = this.updates.get(documentName)
|
||||||
|
if (updates === undefined) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connectionId !== undefined) {
|
||||||
|
return !updates.collaborators.has(connectionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
return updates.collaborators.size === 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user