UBERF-8285: Fix backup service lastVisit check (#6738)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-09-26 22:01:53 +07:00 committed by GitHub
parent b585a9a721
commit 8ac0555654
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 52 additions and 13 deletions

View File

@@ -682,7 +682,7 @@ export async function backup (
   let downloaded = 0
   const printDownloaded = (msg: string, size?: number | null): void => {
-    if (size == null || Number.isNaN(size)) {
+    if (size == null || Number.isNaN(size) || !Number.isInteger(size)) {
       return
     }
     ops++
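
A note on the widened guard above: Number.isInteger(NaN) is false, so the new check subsumes the old Number.isNaN test and additionally rejects fractional and non-finite sizes. A minimal standalone sketch of the same idea (shouldCount is an illustrative name, not part of the patch):

// Sketch only: `shouldCount` is a hypothetical name.
function shouldCount (size?: number | null): boolean {
  // Number.isInteger(NaN) === false and Number.isInteger(1.5) === false,
  // so a single check rejects null, undefined, NaN and fractional values.
  return size != null && Number.isInteger(size)
}

shouldCount(1024) // true
shouldCount(NaN) // false
shouldCount(10.5) // false
shouldCount(undefined) // false
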
@@ -710,6 +710,12 @@ export async function backup (
   let changed: number = 0
   const needRetrieveChunks: Ref<Doc>[][] = []
   // Load all digest from collection.
+  ctx.info('processed', {
+    processed,
+    digest: digest.size,
+    time: Date.now() - st,
+    workspace: workspaceId.name
+  })
   while (true) {
     try {
       const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck))

View File

@@ -115,27 +115,37 @@ class BackupWorker {
     ctx: MeasureContext
   ): Promise<{ failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }> {
     const workspacesIgnore = new Set(this.config.SkipWorkspaces.split(';'))
+    ctx.info('skipped workspaces', { workspacesIgnore })
+    let skipped = 0
     const workspaces = (await listAccountWorkspaces(this.config.Token)).filter((it) => {
       const lastBackup = it.backupInfo?.lastBackup ?? 0
       if ((Date.now() - lastBackup) / 1000 < this.config.Interval) {
         // No backup required, interval not elapsed
-        ctx.info('Skip backup', { workspace: it.workspace, lastBackup: Math.round((Date.now() - lastBackup) / 1000) })
+        skipped++
+        return false
+      }
+
+      if (it.lastVisit == null) {
+        skipped++
         return false
       }

       const lastVisitSec = Math.floor((Date.now() - it.lastVisit) / 1000)
       if (lastVisitSec > this.config.Interval) {
         // No backup required, interval not elapsed
-        ctx.info('Skip backup, since not visited since last check', {
-          workspace: it.workspace,
-          days: Math.floor(lastVisitSec / 3600 / 24),
-          seconds: lastVisitSec
-        })
+        skipped++
         return false
       }
       return !workspacesIgnore.has(it.workspace)
     })
     workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
+    ctx.info('Preparing for BACKUP', {
+      total: workspaces.length,
+      skipped,
+      workspaces: workspaces.map((it) => it.workspace)
+    })
     return await this.doBackup(ctx, workspaces)
   }
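
This hunk is the lastVisit fix named in the commit title: when a workspace had no recorded lastVisit, Date.now() - it.lastVisit evaluated to NaN at runtime, NaN > Interval is false, so such workspaces were never filtered out and were backed up on every cycle. They are now skipped explicitly and counted. A reduced sketch of the resulting decision, with types trimmed to what the check needs (needsBackup and intervalSec are illustrative names):

// Reduced sketch; `needsBackup` / `intervalSec` are made-up names.
interface WorkspaceInfo {
  workspace: string
  lastVisit?: number // epoch millis of the last user visit; may be absent
  backupInfo?: { lastBackup?: number }
}

function needsBackup (it: WorkspaceInfo, intervalSec: number): boolean {
  const lastBackup = it.backupInfo?.lastBackup ?? 0
  if ((Date.now() - lastBackup) / 1000 < intervalSec) {
    return false // backed up recently enough
  }
  if (it.lastVisit == null) {
    return false // never visited: previously NaN slipped past the age check
  }
  const lastVisitSec = Math.floor((Date.now() - it.lastVisit) / 1000)
  return lastVisitSec <= intervalSec // long-idle workspaces are skipped too
}
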
@@ -230,7 +240,7 @@ class BackupWorker {
       dataSize: Math.round((result.dataSize * 100) / (1024 * 1024)) / 100,
       blobsSize: Math.round((result.blobsSize * 100) / (1024 * 1024)) / 100
     }
-    rootCtx.warn('\n\nBACKUP STATS ', {
+    rootCtx.warn('BACKUP STATS', {
       workspace: ws.workspace,
       index,
       ...backupInfo,

View File

@@ -98,6 +98,7 @@ export class BlobClient {
         chunks.push(chunk)
       })
       readable.on('end', () => {
+        readable.destroy()
         resolve()
       })
     })
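
Calling readable.destroy() once the stream has ended releases the underlying socket or file handle immediately rather than leaving it to the garbage collector. A self-contained sketch of the same pattern (readAll is an illustrative helper, not the BlobClient API):

import { Readable } from 'stream'

// Illustrative helper showing the destroy-on-end pattern.
async function readAll (readable: Readable): Promise<Buffer> {
  const chunks: Buffer[] = []
  return await new Promise<Buffer>((resolve, reject) => {
    readable.on('data', (chunk: Buffer) => {
      chunks.push(chunk)
    })
    readable.on('error', reject)
    readable.on('end', () => {
      readable.destroy() // free the underlying resource right away
      resolve(Buffer.concat(chunks))
    })
  })
}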

View File

@@ -77,6 +77,16 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
   async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}

+  doTrimHash (s: string | undefined): string {
+    if (s == null) {
+      return ''
+    }
+    if (s.startsWith('"') && s.endsWith('"')) {
+      return s.slice(1, s.length - 1)
+    }
+    return s
+  }
+
   async doSyncDocs (ctx: MeasureContext, workspaceId: WorkspaceId, docs: ListBlobResult[]): Promise<void> {
     const existingBlobs = toIdMap(
       await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: { $in: docs.map((it) => it._id) } })
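
The new doTrimHash helper normalizes ETag values before they are compared in the next hunk: S3-compatible backends return ETags wrapped in double quotes (per the HTTP ETag format), while values already stored in the database may be bare, so the same hash in two spellings would otherwise look like a change and force a spurious re-sync. Expected behavior, for illustration:

doTrimHash('"d41d8cd98f00b204e9800998ecf8427e"') // => 'd41d8cd98f00b204e9800998ecf8427e'
doTrimHash('d41d8cd98f00b204e9800998ecf8427e') // => unchanged
doTrimHash(undefined) // => ''
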
@@ -84,10 +94,17 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
     const toUpdate: Blob[] = []
     for (const d of docs) {
       const blobInfo = existingBlobs.get(d._id)
-      if (blobInfo === undefined || blobInfo.etag !== d.etag || blobInfo.size !== d.size) {
-        const stat = await this.stat(ctx, workspaceId, d._id)
+      if (
+        blobInfo === undefined ||
+        this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) ||
+        blobInfo.size !== d.size
+      ) {
+        const stat = await this.adapters.get(d.provider)?.stat(ctx, workspaceId, d._id)
         if (stat !== undefined) {
+          stat.provider = d.provider
           toUpdate.push(stat)
+        } else {
+          ctx.error('blob not found for sync', { provider: d.provider, id: d._id, workspace: workspaceId.name })
         }
       }
     }
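
With the hunk above, a changed blob's metadata is re-fetched from the adapter registered for its provider instead of through the aggregator's own stat, and the provider key is written back onto the refreshed stat so the stored Blob document keeps pointing at the correct backend. A reduced sketch of that lookup (statViaProvider and the trimmed types are hypothetical):

// Hypothetical reduced types; only the lookup pattern matches the patch.
interface StatResult { etag: string, size: number, provider?: string }
interface ProviderAdapter { stat: (id: string) => Promise<StatResult | undefined> }

async function statViaProvider (
  adapters: Map<string, ProviderAdapter>,
  provider: string,
  id: string
): Promise<StatResult | undefined> {
  // Optional chaining: an unknown provider key simply yields undefined.
  const stat = await adapters.get(provider)?.stat(id)
  if (stat !== undefined) {
    stat.provider = provider // remember which backend answered
  }
  return stat
}
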
@@ -120,19 +137,24 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
   }

   private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator {
-    const adapters = Array.from(this.adapters.values())
+    const adapters = Array.from(this.adapters.entries())
+    let provider: [string, StorageAdapter] | undefined
     let iterator: BlobStorageIterator | undefined
     return {
       next: async () => {
         while (true) {
           if (iterator === undefined && adapters.length > 0) {
-            iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
+            provider = adapters.shift() as [string, StorageAdapter]
+            iterator = await provider[1].listStream(ctx, workspaceId)
           }
           if (iterator === undefined) {
             return []
           }
           const docInfos = await iterator.next()
           if (docInfos.length > 0) {
+            for (const d of docInfos) {
+              d.provider = provider?.[0] as string
+            }
             // We need to check if our stored version is fine
             return docInfos
           } else {
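
The iterator now walks this.adapters.entries() instead of values() so every batch of listed blobs can be stamped with the provider key it came from; the doSyncDocs change above depends on that field. A reduced, self-contained sketch of the chaining-and-tagging pattern (all names are illustrative):

// Illustrative sketch: drain per-provider iterators in order, tagging results.
interface BlobInfo { _id: string, provider?: string }
interface BlobIterator { next: () => Promise<BlobInfo[]> }

function chainProviders (providers: Array<[string, BlobIterator]>): BlobIterator {
  const pending = [...providers]
  let current: [string, BlobIterator] | undefined
  return {
    next: async () => {
      while (true) {
        if (current === undefined) {
          current = pending.shift()
          if (current === undefined) return [] // every provider is drained
        }
        const docs = await current[1].next()
        if (docs.length > 0) {
          for (const d of docs) d.provider = current[0] // stamp the source
          return docs
        }
        current = undefined // this provider is empty, move on
      }
    }
  }
}
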
@@ -323,7 +345,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
     const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size)

-    if (size === undefined || size === 0) {
+    if (size === undefined || size === 0 || !Number.isInteger(size)) {
       const docStats = await adapter.stat(ctx, workspaceId, objectName)
       if (docStats !== undefined) {
         if (contentType !== docStats.contentType) {