From ff43872a4168f5ff6391b041f559a847a4e66b78 Mon Sep 17 00:00:00 2001 From: Alexey Zinoviev Date: Mon, 2 Jun 2025 02:57:45 +0400 Subject: [PATCH] uberf-8425: close migration iterators properly Signed-off-by: Alexey Zinoviev Signed-off-by: Alexey Zinoviev --- models/contact/src/migration.ts | 44 ++++----- models/core/src/migration.ts | 42 +++++---- services/github/model-github/src/migration.ts | 90 ++++++++++--------- 3 files changed, 94 insertions(+), 82 deletions(-) diff --git a/models/contact/src/migration.ts b/models/contact/src/migration.ts index e0949b94ef..82b54a4df2 100644 --- a/models/contact/src/migration.ts +++ b/models/contact/src/migration.ts @@ -195,13 +195,13 @@ async function assignEmployeeRoles (client: MigrationClient): Promise { client.logger.log('assigning roles to employees...', {}) const wsMembers = await client.accountClient.getWorkspaceMembers() - const persons = await client.traverse(DOMAIN_CONTACT, { + const personsIterator = await client.traverse(DOMAIN_CONTACT, { _class: contact.class.Person }) try { while (true) { - const docs = await persons.next(50) + const docs = await personsIterator.next(50) if (docs === null || docs?.length === 0) { break } @@ -236,7 +236,7 @@ async function assignEmployeeRoles (client: MigrationClient): Promise { } } } finally { - await persons.close() + await personsIterator.close() client.logger.log('finished assigning roles to employees...', {}) } } @@ -402,7 +402,7 @@ async function ensureGlobalPersonsForLocalAccounts (client: MigrationClient): Pr async function createUserProfiles (client: MigrationClient): Promise { client.logger.log('creating user profiles for persons...', {}) - const persons = await client.traverse(DOMAIN_CONTACT, { + const personsIterator = await client.traverse(DOMAIN_CONTACT, { _class: contact.class.Person, profile: { $exists: false } }) @@ -418,7 +418,7 @@ async function createUserProfiles (client: MigrationClient): Promise { try { while (true) { - const docs = await persons.next(200) + const docs = await personsIterator.next(200) if (docs === null || docs?.length === 0) { break } @@ -452,7 +452,7 @@ async function createUserProfiles (client: MigrationClient): Promise { } } } finally { - await persons.close() + await personsIterator.close() client.logger.log('finished creating user profiles for persons...', {}) } } @@ -460,27 +460,31 @@ async function createUserProfiles (client: MigrationClient): Promise { async function fixSocialIdCase (client: MigrationClient): Promise { client.logger.log('Fixing social id case...', {}) - const socialIds = await client.traverse(DOMAIN_CHANNEL, { + const socialIdsIterator = await client.traverse(DOMAIN_CHANNEL, { _class: contact.class.SocialIdentity }) - let updated = 0 - while (true) { - const docs = await socialIds.next(200) - if (docs === null || docs?.length === 0) { - break - } - for (const d of docs) { - const newKey = d.key.toLowerCase() - const newVal = d.value.toLowerCase() - if (newKey !== d.key || newVal !== d.value) { - await client.update(DOMAIN_CHANNEL, { _id: d._id }, { key: newKey, value: newVal }) - updated++ + try { + while (true) { + const docs = await socialIdsIterator.next(200) + if (docs === null || docs?.length === 0) { + break + } + + for (const d of docs) { + const newKey = d.key.toLowerCase() + const newVal = d.value.toLowerCase() + if (newKey !== d.key || newVal !== d.value) { + await client.update(DOMAIN_CHANNEL, { _id: d._id }, { key: newKey, value: newVal }) + updated++ + } } } + } finally { + await socialIdsIterator.close() + client.logger.log('Finished fixing social id case. Total updated:', { updated }) } - client.logger.log('Finished fixing social id case. Total updated:', { updated }) } export const contactOperation: MigrateOperation = { diff --git a/models/core/src/migration.ts b/models/core/src/migration.ts index bb09acdf73..1b7eccc7d1 100644 --- a/models/core/src/migration.ts +++ b/models/core/src/migration.ts @@ -270,31 +270,35 @@ export async function migrateBackupMixins (client: MigrationClient): Promise>(DOMAIN_TX, { _class: core.class.TxMixin }) - while (true) { - const mixinOps = await txIterator.next(500) - if (mixinOps === null || mixinOps.length === 0) break - const _classes = groupByArray(mixinOps, (it) => it.objectClass) + try { + while (true) { + const mixinOps = await txIterator.next(500) + if (mixinOps === null || mixinOps.length === 0) break + const _classes = groupByArray(mixinOps, (it) => it.objectClass) - for (const [_class, ops] of _classes.entries()) { - const domain = hierarchy.findDomain(_class) - if (domain === undefined) continue - let docs = await client.find(domain, { _id: { $in: ops.map((it) => it.objectId) } }) + for (const [_class, ops] of _classes.entries()) { + const domain = hierarchy.findDomain(_class) + if (domain === undefined) continue + let docs = await client.find(domain, { _id: { $in: ops.map((it) => it.objectId) } }) - docs = docs.filter((it) => { - // Check if mixin is last operation by modifiedOn - const mops = ops.filter((mi) => mi.objectId === it._id) - if (mops.length === 0) return false - return mops.some((mi) => mi.modifiedOn === it.modifiedOn && mi.modifiedBy === it.modifiedBy) - }) + docs = docs.filter((it) => { + // Check if mixin is last operation by modifiedOn + const mops = ops.filter((mi) => mi.objectId === it._id) + if (mops.length === 0) return false + return mops.some((mi) => mi.modifiedOn === it.modifiedOn && mi.modifiedBy === it.modifiedBy) + }) - if (docs.length > 0) { - // Check if docs has mixins from list - const toUpdate = docs.filter((it) => hierarchy.findAllMixins(it).length > 0) - if (toUpdate.length > 0) { - await client.update(domain, { _id: { $in: toUpdate.map((it) => it._id) } }, { '%hash%': curHash }) + if (docs.length > 0) { + // Check if docs has mixins from list + const toUpdate = docs.filter((it) => hierarchy.findAllMixins(it).length > 0) + if (toUpdate.length > 0) { + await client.update(domain, { _id: { $in: toUpdate.map((it) => it._id) } }, { '%hash%': curHash }) + } } } } + } finally { + await txIterator.close() } } diff --git a/services/github/model-github/src/migration.ts b/services/github/model-github/src/migration.ts index ff8ae04215..c3457a9ebd 100644 --- a/services/github/model-github/src/migration.ts +++ b/services/github/model-github/src/migration.ts @@ -160,7 +160,7 @@ async function migrateFixMissingDocSyncInfo (client: MigrationClient): Promise( + const issuesIterator = await client.traverse( DOMAIN_TASK, { _class: tracker.class.Issue, @@ -178,52 +178,56 @@ async function migrateFixMissingDocSyncInfo (client: MigrationClient): Promise it._id as unknown as Ref) } - }, - { - projection: { - _id: 1 - } + try { + while (true) { + const docs = await issuesIterator.next(1000) + if (docs === null || docs.length === 0) { + break } - ) - const infoIds = toIdMap(infos) - let repository: Ref | null = null - for (const issue of docs) { - if (!infoIds.has(issue._id)) { - if (client.hierarchy.hasMixin(issue, github.mixin.GithubIssue)) { - repository = client.hierarchy.as(issue, github.mixin.GithubIssue).repository - } - counter++ - // Missing - await client.create(DOMAIN_GITHUB, { + const infos = await client.find( + DOMAIN_GITHUB, + { _class: github.class.DocSyncInfo, - _id: issue._id as any, - url: '', - githubNumber: 0, - repository, - objectClass: issue._class, - externalVersion: '#', // We need to put this one to handle new documents. - needSync: '', - derivedVersion: '', - attachedTo: issue.attachedTo ?? tracker.ids.NoParent, - space: issue.space, - modifiedBy: issue.modifiedBy, - modifiedOn: issue.modifiedOn - }) + _id: { $in: docs.map((it) => it._id as unknown as Ref) } + }, + { + projection: { + _id: 1 + } + } + ) + const infoIds = toIdMap(infos) + let repository: Ref | null = null + for (const issue of docs) { + if (!infoIds.has(issue._id)) { + if (client.hierarchy.hasMixin(issue, github.mixin.GithubIssue)) { + repository = client.hierarchy.as(issue, github.mixin.GithubIssue).repository + } + counter++ + // Missing + await client.create(DOMAIN_GITHUB, { + _class: github.class.DocSyncInfo, + _id: issue._id as any, + url: '', + githubNumber: 0, + repository, + objectClass: issue._class, + externalVersion: '#', // We need to put this one to handle new documents. + needSync: '', + derivedVersion: '', + attachedTo: issue.attachedTo ?? tracker.ids.NoParent, + space: issue.space, + modifiedBy: issue.modifiedBy, + modifiedOn: issue.modifiedOn + }) + } } } - } - if (counter > 0) { - console.log('Created', counter, 'DocSyncInfos') + } finally { + await issuesIterator.close() + if (counter > 0) { + console.log('Created', counter, 'DocSyncInfos') + } } } }