UBERF-10375: Fix full email messages sync (#8758)

Signed-off-by: Artem Savchenko <armisav@gmail.com>
This commit is contained in:
Artyom Savchenko 2025-04-29 14:03:04 +07:00 committed by GitHub
parent 89d2d6872a
commit 5311096e05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -88,6 +88,16 @@ export class SyncManager {
await this.keyValueClient.setValue(historyKey, history)
}
private async getPageToken (userId: PersonId): Promise<string | null> {
const pageTokenKey = this.getPageTokenKey(userId)
return await this.keyValueClient.getValue<string>(pageTokenKey)
}
private async setPageToken (userId: PersonId, pageToken: string): Promise<void> {
const pageTokenKey = this.getPageTokenKey(userId)
await this.keyValueClient.setValue(pageTokenKey, pageToken)
}
private async partSync (userId: PersonId, userEmail: string | undefined, historyId: string): Promise<void> {
if (userEmail === undefined) {
throw new Error('Cannot sync without user email')
@ -151,7 +161,10 @@ export class SyncManager {
if (userEmail === undefined) {
throw new Error('Cannot sync without user email')
}
const pageToken: string | undefined = undefined
// Get saved page token if exists to resume sync
let pageToken: string | undefined = (await this.getPageToken(userId)) ?? undefined
const query: gmail_v1.Params$Resource$Users$Messages$List = {
userId: 'me',
pageToken
@ -160,48 +173,56 @@ export class SyncManager {
query.q = q
}
let currentHistoryId: string | undefined
let totalProcessedMessages = 0
try {
const messagesIds: string[] = []
// Process one page at a time
while (true) {
await this.rateLimiter.take(5)
const messages = await this.gmail.messages.list(query)
if (query.pageToken == null) {
this.ctx.info('Total messages', {
workspace: this.workspace,
userId,
resultSizeEstimate: messages.data.resultSizeEstimate
})
}
const ids = messages.data.messages?.map((p) => p.id) ?? []
for (let index = 0; index < ids.length; index++) {
const id = ids[index]
const ids = messages.data.messages?.map((p) => p.id).filter((id) => id != null) ?? []
this.ctx.info('Processing page', {
workspace: this.workspace,
userId,
messagesInPage: ids.length,
totalProcessed: totalProcessedMessages,
currentHistoryId,
pageToken: query.pageToken
})
for (const id of ids) {
if (id == null) continue
messagesIds.push(id)
try {
const message = await this.getMessage(id)
const historyId = message.data.historyId
await this.messageManager.saveMessage(message, userEmail)
if (historyId != null && q === undefined) {
if (currentHistoryId == null || Number(currentHistoryId) < Number(historyId)) {
currentHistoryId = historyId
}
}
} catch (err: any) {
this.ctx.error('Full sync message error', { workspace: this.workspace, userId, messageId: id, err })
}
}
totalProcessedMessages += ids.length
if (messages.data.nextPageToken == null) {
this.ctx.info('Break', { totalNewMessages: messagesIds.length })
this.ctx.info('Completed sync', { workspace: this.workspace, userId, totalMessages: totalProcessedMessages })
break
}
query.pageToken = messages.data.nextPageToken
// Update page token for the next iteration
pageToken = messages.data.nextPageToken
query.pageToken = pageToken
await this.setPageToken(userId, pageToken)
}
for (let index = messagesIds.length - 1; index >= 0; index--) {
const id = messagesIds[index]
try {
const message = await this.getMessage(id)
const historyId = message.data.historyId
await this.messageManager.saveMessage(message, userEmail)
if (historyId != null && q === undefined) {
if (currentHistoryId == null || Number(currentHistoryId) < Number(historyId)) {
await this.setHistoryId(userId, historyId)
currentHistoryId = historyId
}
}
if (index % 500 === 0) {
this.ctx.info('Remaining messages to sync', { workspace: this.workspace, userId, count: index })
}
} catch (err: any) {
this.ctx.error('Full sync message error', { workspace: this.workspace, userId, err })
}
if (currentHistoryId != null) {
await this.setHistoryId(userId, currentHistoryId)
}
this.ctx.info('Full sync finished', { workspaceUuid: this.workspace, userId, userEmail })
} catch (err) {
@ -243,4 +264,8 @@ export class SyncManager {
private getHistoryKey (userId: PersonId): string {
return `history:${this.workspace}:${userId}`
}
private getPageTokenKey (userId: PersonId): string {
return `page-token:${this.workspace}:${userId}`
}
}