From 5311096e05aa8f5e742da8dee95f269f0318ea2c Mon Sep 17 00:00:00 2001 From: Artyom Savchenko Date: Tue, 29 Apr 2025 14:03:04 +0700 Subject: [PATCH] UBERF-10375: Fix full email messages sync (#8758) Signed-off-by: Artem Savchenko --- services/gmail/pod-gmail/src/message/sync.ts | 91 +++++++++++++------- 1 file changed, 58 insertions(+), 33 deletions(-) diff --git a/services/gmail/pod-gmail/src/message/sync.ts b/services/gmail/pod-gmail/src/message/sync.ts index d3a26afae4..28f1311fdc 100644 --- a/services/gmail/pod-gmail/src/message/sync.ts +++ b/services/gmail/pod-gmail/src/message/sync.ts @@ -88,6 +88,16 @@ export class SyncManager { await this.keyValueClient.setValue(historyKey, history) } + private async getPageToken (userId: PersonId): Promise { + const pageTokenKey = this.getPageTokenKey(userId) + return await this.keyValueClient.getValue(pageTokenKey) + } + + private async setPageToken (userId: PersonId, pageToken: string): Promise { + const pageTokenKey = this.getPageTokenKey(userId) + await this.keyValueClient.setValue(pageTokenKey, pageToken) + } + private async partSync (userId: PersonId, userEmail: string | undefined, historyId: string): Promise { if (userEmail === undefined) { throw new Error('Cannot sync without user email') @@ -151,7 +161,10 @@ export class SyncManager { if (userEmail === undefined) { throw new Error('Cannot sync without user email') } - const pageToken: string | undefined = undefined + + // Get saved page token if exists to resume sync + let pageToken: string | undefined = (await this.getPageToken(userId)) ?? undefined + const query: gmail_v1.Params$Resource$Users$Messages$List = { userId: 'me', pageToken @@ -160,48 +173,56 @@ export class SyncManager { query.q = q } let currentHistoryId: string | undefined + let totalProcessedMessages = 0 + try { - const messagesIds: string[] = [] + // Process one page at a time while (true) { await this.rateLimiter.take(5) const messages = await this.gmail.messages.list(query) - if (query.pageToken == null) { - this.ctx.info('Total messages', { - workspace: this.workspace, - userId, - resultSizeEstimate: messages.data.resultSizeEstimate - }) - } - const ids = messages.data.messages?.map((p) => p.id) ?? [] - for (let index = 0; index < ids.length; index++) { - const id = ids[index] + + const ids = messages.data.messages?.map((p) => p.id).filter((id) => id != null) ?? [] + this.ctx.info('Processing page', { + workspace: this.workspace, + userId, + messagesInPage: ids.length, + totalProcessed: totalProcessedMessages, + currentHistoryId, + pageToken: query.pageToken + }) + + for (const id of ids) { if (id == null) continue - messagesIds.push(id) + try { + const message = await this.getMessage(id) + const historyId = message.data.historyId + await this.messageManager.saveMessage(message, userEmail) + + if (historyId != null && q === undefined) { + if (currentHistoryId == null || Number(currentHistoryId) < Number(historyId)) { + currentHistoryId = historyId + } + } + } catch (err: any) { + this.ctx.error('Full sync message error', { workspace: this.workspace, userId, messageId: id, err }) + } } + + totalProcessedMessages += ids.length + if (messages.data.nextPageToken == null) { - this.ctx.info('Break', { totalNewMessages: messagesIds.length }) + this.ctx.info('Completed sync', { workspace: this.workspace, userId, totalMessages: totalProcessedMessages }) break } - query.pageToken = messages.data.nextPageToken + + // Update page token for the next iteration + pageToken = messages.data.nextPageToken + query.pageToken = pageToken + await this.setPageToken(userId, pageToken) } - for (let index = messagesIds.length - 1; index >= 0; index--) { - const id = messagesIds[index] - try { - const message = await this.getMessage(id) - const historyId = message.data.historyId - await this.messageManager.saveMessage(message, userEmail) - if (historyId != null && q === undefined) { - if (currentHistoryId == null || Number(currentHistoryId) < Number(historyId)) { - await this.setHistoryId(userId, historyId) - currentHistoryId = historyId - } - } - if (index % 500 === 0) { - this.ctx.info('Remaining messages to sync', { workspace: this.workspace, userId, count: index }) - } - } catch (err: any) { - this.ctx.error('Full sync message error', { workspace: this.workspace, userId, err }) - } + + if (currentHistoryId != null) { + await this.setHistoryId(userId, currentHistoryId) } this.ctx.info('Full sync finished', { workspaceUuid: this.workspace, userId, userEmail }) } catch (err) { @@ -243,4 +264,8 @@ export class SyncManager { private getHistoryKey (userId: PersonId): string { return `history:${this.workspace}:${userId}` } + + private getPageTokenKey (userId: PersonId): string { + return `page-token:${this.workspace}:${userId}` + } }