From 751a65f37ec16d063f9020b95b7cd165a4054e2e Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 12 Apr 2023 00:43:37 +0700 Subject: [PATCH] LImit bulk operations from server (#2947) Signed-off-by: Andrey Sobolev --- dev/tool/src/index.ts | 15 +++++++++++---- server/elastic/src/adapter.ts | 2 +- server/mongo/src/storage.ts | 16 ++++++++-------- server/tool/src/index.ts | 14 +++++++------- server/ws/src/server.ts | 8 ++++++-- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index bb385a9d81..4fe76e006d 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -206,13 +206,20 @@ export function devTool ( program .command('upgrade') .description('upgrade') - .action(async (cmd) => { + .option('-p|--parallel', 'Parallel upgrade', false) + .action(async (cmd: { parallel: boolean }) => { const { mongodbUri, version, txes, migrateOperations } = prepareTools() return await withDatabase(mongodbUri, async (db) => { const workspaces = await listWorkspaces(db, productId) - for (const ws of workspaces) { - console.log('---UPGRADING----', ws.workspace) - await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace) + if (cmd.parallel) { + await Promise.all( + workspaces.map((ws) => upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace)) + ) + } else { + for (const ws of workspaces) { + console.log('---UPGRADING----', ws.workspace) + await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace) + } } }) }) diff --git a/server/elastic/src/adapter.ts b/server/elastic/src/adapter.ts index a9b1dff242..cc2bece329 100644 --- a/server/elastic/src/adapter.ts +++ b/server/elastic/src/adapter.ts @@ -280,7 +280,7 @@ class ElasticAdapter implements FullTextAdapter { async updateMany (docs: IndexedDoc[]): Promise { const parts = Array.from(docs) while (parts.length > 0) { - const part = parts.splice(0, 10000) + const part = parts.splice(0, 1000) const operations = part.flatMap((doc) => [ { index: { _index: toWorkspaceString(this.workspaceId), _id: doc.id } }, diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 13178a73ef..15cdc8b00a 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -575,13 +575,12 @@ abstract class MongoAdapterBase implements DbAdapter { async upload (domain: Domain, docs: Doc[]): Promise { const coll = this.db.collection(domain) - const docMap = new Map(docs.map((it) => [it._id, it])) + const ops = Array.from(docs) - // remove old and insert new ones - const keys = Array.from(docMap.keys()) - if (keys.length > 0) { + while (ops.length > 0) { + const part = ops.splice(0, 500) await coll.bulkWrite( - docs.map((it) => ({ + part.map((it) => ({ replaceOne: { filter: { _id: it._id }, replacement: it, @@ -597,9 +596,11 @@ abstract class MongoAdapterBase implements DbAdapter { try { // remove old and insert new ones - if (operations.size > 0) { + const ops = Array.from(operations.entries()) + if (ops.length > 0) { + const part = ops.splice(0, 500) await coll.bulkWrite( - Array.from(operations.entries()).map((it) => ({ + part.map((it) => ({ updateOne: { filter: { _id: it[0] }, update: { @@ -752,7 +753,6 @@ class MongoAdapter extends MongoAdapterBase { } } ] - // return await this.db.collection(domain).bulkWrite(ops as any) return { raw: async () => await this.db.collection(domain).bulkWrite(ops), domain, diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 7aeb6337c2..7e5414c8af 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -160,19 +160,19 @@ export async function upgradeModel ( await client.connect() const db = getWorkspaceDB(client, workspaceId) - console.log('removing model...') + console.log(`${workspaceId.name}: removing model...`) // we're preserving accounts (created by core.account.System). const result = await db.collection(DOMAIN_TX).deleteMany({ objectSpace: core.space.Model, modifiedBy: core.account.System, objectClass: { $ne: contact.class.EmployeeAccount } }) - console.log(`${result.deletedCount} transactions deleted.`) + console.log(`${workspaceId.name}: ${result.deletedCount} transactions deleted.`) - console.log('creating model...') + console.log(`${workspaceId.name}: creating model...`) const model = txes const insert = await db.collection(DOMAIN_TX).insertMany(model as Document[]) - console.log(`${insert.insertedCount} model transactions inserted.`) + console.log(`${workspaceId.name}: ${insert.insertedCount} model transactions inserted.`) const hierarchy = new Hierarchy() const modelDb = new ModelDb(hierarchy) @@ -189,11 +189,11 @@ export async function upgradeModel ( const migrateClient = new MigrateClientImpl(db, hierarchy, modelDb) for (const op of migrateOperations) { - console.log('migrate:', op[0]) + console.log(`${workspaceId.name}: migrate:`, op[0]) await op[1].migrate(migrateClient) } - console.log('Apply upgrade operations') + console.log(`${workspaceId.name}: Apply upgrade operations`) const connection = await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' }) @@ -201,7 +201,7 @@ export async function upgradeModel ( await createUpdateIndexes(connection, db) for (const op of migrateOperations) { - console.log('upgrade:', op[0]) + console.log(`${workspaceId.name}: upgrade:`, op[0]) await op[1].upgrade(connection) } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 06a027d5e2..6588470b71 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -335,15 +335,17 @@ async function handleRequest ( const userCtx = ctx.newChild(service.getUser(), { userId: service.getUser() }) as SessionContext userCtx.sessionId = service.sessionInstanceId ?? '' const f = (service as any)[request.method] + let timeout: any + let hangTimeout: any try { const params = [userCtx, ...request.params] const st = Date.now() - const timeout = setTimeout(() => { + timeout = setTimeout(() => { console.log('long request found', workspace, service.getUser(), request, params) }, 4000) - const hangTimeout = setTimeout(() => { + hangTimeout = setTimeout(() => { console.log('request hang found, 30sec', workspace, service.getUser(), request, params) }, 30000) @@ -367,6 +369,8 @@ async function handleRequest ( ws.send(serialize(resp)) } catch (err: any) { console.error(err) + clearTimeout(timeout) + clearTimeout(hangTimeout) const resp: Response = { id: request.id, error: unknownError(err)