mirror of
https://github.com/hcengineering/platform.git
synced 2025-06-11 21:11:57 +00:00
fix: implement infinite save attempts
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
61921126fe
commit
0b5308e2f0
@ -18,11 +18,9 @@ import { type Markup, MeasureContext } from '@hcengineering/core'
|
||||
import {
|
||||
Document,
|
||||
Extension,
|
||||
Hocuspocus,
|
||||
afterLoadDocumentPayload,
|
||||
afterUnloadDocumentPayload,
|
||||
onChangePayload,
|
||||
onConfigurePayload,
|
||||
onConnectPayload,
|
||||
onDisconnectPayload,
|
||||
onLoadDocumentPayload,
|
||||
@ -37,7 +35,7 @@ export interface StorageConfiguration {
|
||||
ctx: MeasureContext
|
||||
adapter: CollabStorageAdapter
|
||||
transformer: Transformer
|
||||
retryIntervalMs?: number
|
||||
saveRetryInterval?: number
|
||||
}
|
||||
|
||||
type DocumentName = string
|
||||
@ -53,26 +51,30 @@ export class StorageExtension implements Extension {
|
||||
private readonly configuration: StorageConfiguration
|
||||
private readonly updates = new Map<DocumentName, DocumentUpdates>()
|
||||
private readonly markups = new Map<DocumentName, Record<Markup, Markup>>()
|
||||
private readonly failedDocuments = new Map<DocumentName, Context>()
|
||||
private readonly retryInterval
|
||||
private instance: Hocuspocus | undefined
|
||||
private readonly promises = new Map<DocumentName, Promise<void>>()
|
||||
|
||||
private readonly saveRetryInterval: number
|
||||
private stopped = false
|
||||
|
||||
constructor (configuration: StorageConfiguration) {
|
||||
this.configuration = configuration
|
||||
|
||||
const retryIntervalMs = configuration.retryIntervalMs ?? 1000 * 60
|
||||
this.retryInterval = setInterval(() => {
|
||||
void this.retrySaveDocuments()
|
||||
}, retryIntervalMs)
|
||||
this.saveRetryInterval = configuration.saveRetryInterval ?? 1000
|
||||
}
|
||||
|
||||
async onDestroy (): Promise<any> {
|
||||
clearInterval(this.retryInterval)
|
||||
await this.retrySaveDocuments()
|
||||
}
|
||||
this.stopped = true
|
||||
const documents = Array.from(this.promises.keys())
|
||||
const promises = Array.from(this.promises.values())
|
||||
|
||||
async onConfigure ({ instance }: onConfigurePayload): Promise<any> {
|
||||
this.instance = instance
|
||||
if (promises.length > 0) {
|
||||
const { ctx } = this.configuration
|
||||
try {
|
||||
ctx.info('waiting for pending document saves', { documents, count: promises.length })
|
||||
await Promise.all(promises)
|
||||
} catch (error) {
|
||||
ctx.error('error while waiting for pending document saves', { documents, error })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async onChange ({ context, document, documentName }: withContext<onChangePayload>): Promise<any> {
|
||||
@ -115,29 +117,19 @@ export class StorageExtension implements Extension {
|
||||
}
|
||||
|
||||
async onStoreDocument ({ context, documentName, document }: withContext<onStoreDocumentPayload>): Promise<void> {
|
||||
console.log('onStoreDocument', { context, documentName })
|
||||
const { ctx } = this.configuration
|
||||
const { connectionId } = context
|
||||
|
||||
const updates = this.updates.get(documentName)
|
||||
const connections = document.getConnectionsCount()
|
||||
const collaborators = updates?.collaborators.size ?? 0
|
||||
ctx.info('store document', { documentName, connectionId, connections, collaborators })
|
||||
ctx.info('store document', { documentName, connectionId, connections })
|
||||
|
||||
if (updates === undefined || updates.collaborators.size === 0) {
|
||||
if (this.hasNoUpdates(documentName)) {
|
||||
ctx.info('no changes for document', { documentName, connectionId })
|
||||
return
|
||||
}
|
||||
|
||||
const now = Date.now()
|
||||
|
||||
await this.storeDocument(documentName, document, updates.context)
|
||||
|
||||
// Remove collaborators that were not updated from before save
|
||||
for (const [connectionId, updatedAt] of updates.collaborators.entries()) {
|
||||
if (updatedAt < now) {
|
||||
updates.collaborators.delete(connectionId)
|
||||
}
|
||||
}
|
||||
await this.storeDocument(documentName, document, context)
|
||||
}
|
||||
|
||||
async onConnect ({ context, documentName, instance }: withContext<onConnectPayload>): Promise<any> {
|
||||
@ -147,16 +139,15 @@ export class StorageExtension implements Extension {
|
||||
}
|
||||
|
||||
async onDisconnect ({ context, documentName, document }: withContext<onDisconnectPayload>): Promise<any> {
|
||||
console.log('onDisconnect', { context, documentName })
|
||||
const { ctx } = this.configuration
|
||||
const { connectionId } = context
|
||||
|
||||
const updates = this.updates.get(documentName)
|
||||
const connections = document.getConnectionsCount()
|
||||
const collaborators = updates?.collaborators.size ?? 0
|
||||
const updatedAt = updates?.collaborators.get(connectionId)
|
||||
ctx.info('disconnect from document', { documentName, connectionId, connections, collaborators, updatedAt })
|
||||
ctx.info('disconnect from document', { documentName, connectionId, connections })
|
||||
|
||||
if (updates === undefined || !updates.collaborators.has(connectionId)) {
|
||||
const noUpdates = this.hasNoUpdates(documentName, connectionId)
|
||||
if (noUpdates) {
|
||||
ctx.info('no changes for document', { documentName, connectionId })
|
||||
return
|
||||
}
|
||||
@ -166,23 +157,14 @@ export class StorageExtension implements Extension {
|
||||
return
|
||||
}
|
||||
|
||||
const now = Date.now()
|
||||
|
||||
await this.storeDocument(documentName, document, updates.context)
|
||||
|
||||
// Remove collaborators that were not updated from before save
|
||||
for (const [connectionId, updatedAt] of updates.collaborators.entries()) {
|
||||
if (updatedAt < now) {
|
||||
updates.collaborators.delete(connectionId)
|
||||
}
|
||||
}
|
||||
await this.storeDocument(documentName, document, context, connectionId)
|
||||
}
|
||||
|
||||
async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise<any> {
|
||||
this.configuration.ctx.info('unload document', { documentName })
|
||||
this.updates.delete(documentName)
|
||||
this.markups.delete(documentName)
|
||||
this.failedDocuments.delete(documentName)
|
||||
this.promises.delete(documentName)
|
||||
}
|
||||
|
||||
private async loadDocument (documentName: string, context: Context): Promise<YDoc | undefined> {
|
||||
@ -199,81 +181,93 @@ export class StorageExtension implements Extension {
|
||||
}
|
||||
}
|
||||
|
||||
private async storeDocument (documentName: string, document: Document, context: Context): Promise<void> {
|
||||
const { ctx, adapter } = this.configuration
|
||||
private async storeDocument (
|
||||
documentName: string,
|
||||
document: Document,
|
||||
context: Context,
|
||||
connectionId?: string
|
||||
): Promise<void> {
|
||||
const prev = this.promises.get(documentName)
|
||||
|
||||
const curr = async (): Promise<void> => {
|
||||
if (prev !== undefined) {
|
||||
await prev
|
||||
}
|
||||
|
||||
// Check whether we still have changes after the previous save
|
||||
const noUpdates = this.hasNoUpdates(documentName, connectionId)
|
||||
if (!noUpdates) {
|
||||
await this.performStoreDocument(documentName, document, context)
|
||||
}
|
||||
}
|
||||
|
||||
const promise = curr()
|
||||
this.promises.set(documentName, promise)
|
||||
|
||||
try {
|
||||
const currMarkup = await ctx.with('save-document', {}, (ctx) =>
|
||||
adapter.saveDocument(ctx, documentName, document, context, {
|
||||
prev: () => this.markups.get(documentName) ?? {},
|
||||
curr: () => this.configuration.transformer.fromYdoc(document)
|
||||
})
|
||||
)
|
||||
|
||||
this.markups.set(documentName, currMarkup ?? {})
|
||||
} catch (err: any) {
|
||||
this.failedDocuments.set(documentName, context)
|
||||
|
||||
Analytics.handleError(err)
|
||||
ctx.error('failed to save document', { documentName, error: err })
|
||||
throw new Error(`Failed to save document ${documentName}`)
|
||||
await promise
|
||||
} finally {
|
||||
if (this.promises.get(documentName) === promise) {
|
||||
this.promises.delete(documentName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async retrySaveDocuments (): Promise<void> {
|
||||
const ctx = this.configuration.ctx
|
||||
private async performStoreDocument (documentName: string, document: Document, context: Context): Promise<void> {
|
||||
const { ctx, adapter } = this.configuration
|
||||
|
||||
const count = this.failedDocuments.size
|
||||
if (count === 0) {
|
||||
return
|
||||
}
|
||||
let attempt = 0
|
||||
while (true) {
|
||||
attempt++
|
||||
const now = Date.now()
|
||||
|
||||
ctx.info('retry failed documents', { count })
|
||||
try {
|
||||
const currMarkup = await ctx.with('save-document', {}, (ctx) =>
|
||||
adapter.saveDocument(ctx, documentName, document, context, {
|
||||
prev: () => this.markups.get(documentName) ?? {},
|
||||
curr: () => this.configuration.transformer.fromYdoc(document)
|
||||
})
|
||||
)
|
||||
|
||||
const hocuspocus = this.instance
|
||||
if (hocuspocus === undefined) {
|
||||
ctx.warn('instance is not set, cannot retry failed documents')
|
||||
return
|
||||
}
|
||||
this.markups.set(documentName, currMarkup ?? {})
|
||||
this.clearUpdates(documentName, now)
|
||||
|
||||
const promises: Promise<void>[] = []
|
||||
return
|
||||
} catch (err: any) {
|
||||
Analytics.handleError(err)
|
||||
ctx.error('failed to save document', { documentName, attempt, error: err })
|
||||
|
||||
for (const [documentName, context] of this.failedDocuments.entries()) {
|
||||
const document = hocuspocus.documents.get(documentName)
|
||||
if (this.stopped) {
|
||||
ctx.info('storage extension stopped, skipping document save', { documentName })
|
||||
throw new Error('Aborted')
|
||||
}
|
||||
|
||||
if (document === undefined) {
|
||||
ctx.warn('document not found', { documentName })
|
||||
this.failedDocuments.delete(documentName)
|
||||
continue
|
||||
await new Promise((resolve) => setTimeout(resolve, this.saveRetryInterval))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const connections = document.getConnectionsCount()
|
||||
if (connections > 0) {
|
||||
// Someone is still connected to the document
|
||||
// We will retry later, when onStoreDocument or onDisconnect hook is called
|
||||
ctx.info('document is connected, skipping', { documentName, connections })
|
||||
this.failedDocuments.delete(documentName)
|
||||
continue
|
||||
private clearUpdates (documentName: string, timestamp: number): void {
|
||||
const updates = this.updates.get(documentName)
|
||||
if (updates !== undefined) {
|
||||
for (const [connectionId, updatedAt] of updates.collaborators.entries()) {
|
||||
if (updatedAt < timestamp) {
|
||||
updates.collaborators.delete(connectionId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
promises.push(
|
||||
ctx.with('retry-failed-document', {}, async (ctx) => {
|
||||
try {
|
||||
await this.storeDocument(documentName, document, context)
|
||||
this.failedDocuments.delete(documentName)
|
||||
if (document.getConnectionsCount() === 0) {
|
||||
await hocuspocus.unloadDocument(document)
|
||||
}
|
||||
ctx.info('successfully retried save document', { documentName })
|
||||
} catch (err: any) {
|
||||
ctx.error('failed to retry save document', { documentName, error: err })
|
||||
}
|
||||
})
|
||||
)
|
||||
private hasNoUpdates (documentName: string, connectionId?: string): boolean {
|
||||
const updates = this.updates.get(documentName)
|
||||
if (updates === undefined) {
|
||||
return true
|
||||
}
|
||||
|
||||
await ctx.with('retry-failed', {}, async () => {
|
||||
await Promise.all(promises)
|
||||
})
|
||||
if (connectionId !== undefined) {
|
||||
return !updates.collaborators.has(connectionId)
|
||||
}
|
||||
|
||||
return updates.collaborators.size === 0
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user