diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 051af5249c..4644eda2fb 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -75,7 +75,6 @@ import { createHash } from 'crypto' import { type AbstractCursor, type AnyBulkWriteOperation, - type BulkWriteResult, type Collection, type Db, type Document, @@ -1107,57 +1106,6 @@ class MongoAdapter extends MongoAdapterBase { } } - bulkOps = new Map[]>() - - async _pushBulk (ctx: MeasureContext): Promise { - const bulk = Array.from(this.bulkOps.entries()) - this.bulkOps.clear() - if (bulk.length === 0) { - return - } - const promises: Promise[] = [] - for (const [domain, ops] of bulk) { - if (ops === undefined || ops.length === 0) { - continue - } - const coll = this.db.collection(domain) - - promises.push( - addOperation( - ctx, - 'bulk-write', - { domain, operations: ops.length }, - async (ctx) => - await ctx.with( - 'bulk-write', - { domain }, - () => - coll.bulkWrite(ops, { - ordered: false - }), - { - domain, - operations: ops.length - } - ) - ) - ) - } - await Promise.all(promises) - } - - async pushBulk (ctx: MeasureContext, domain: Domain, ops: AnyBulkWriteOperation[]): Promise { - const existing = this.bulkOps.get(domain) - if (existing !== undefined) { - existing.push(...ops) - } else { - this.bulkOps.set(domain, ops) - } - // We need to wait next cycle to send request - await new Promise((resolve) => setImmediate(resolve)) - await this._pushBulk(ctx) - } - async tx (ctx: MeasureContext, ...txes: Tx[]): Promise { const result: TxResult[] = [] @@ -1171,6 +1119,7 @@ class MongoAdapter extends MongoAdapterBase { const stTime = Date.now() const st = Date.now() + let promises: Promise[] = [] for (const [domain, txs] of byDomain) { if (domain === undefined) { continue @@ -1227,9 +1176,37 @@ class MongoAdapter extends MongoAdapterBase { } if (ops.length > 0) { - await this.pushBulk(ctx, domain, ops) + if (ops === undefined || ops.length === 0) { + continue + } + const coll = this.db.collection(domain) + + promises.push( + addOperation( + ctx, + 'bulk-write', + { domain, operations: ops.length }, + async (ctx) => + await ctx.with( + 'bulk-write', + { domain }, + () => + coll.bulkWrite(ops, { + ordered: false + }), + { + domain, + operations: ops.length + } + ) + ) + ) } if (domainBulk.findUpdate.size > 0) { + if (promises.length > 0) { + await Promise.all(promises) + promises = [] + } const coll = this.db.collection(domain) await ctx.with( @@ -1255,6 +1232,10 @@ class MongoAdapter extends MongoAdapterBase { } if (domainBulk.raw.length > 0) { + if (promises.length > 0) { + await Promise.all(promises) + promises = [] + } await ctx.with( 'raw', {}, @@ -1270,6 +1251,9 @@ class MongoAdapter extends MongoAdapterBase { ) } } + if (promises.length > 0) { + await Promise.all(promises) + } return result } @@ -1511,17 +1495,12 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { await this._db.init(DOMAIN_TX) } - txBulk: Tx[] = [] - - async _bulkTx (ctx: MeasureContext): Promise { - const txes = this.txBulk - this.txBulk = [] - - if (txes.length === 0) { - return + override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { + if (tx.length === 0) { + return [] } - const opName = txes.length === 1 ? 'tx-one' : 'tx' + const opName = tx.length === 1 ? 'tx-one' : 'tx' await addOperation( ctx, opName, @@ -1532,31 +1511,20 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { { domain: 'tx' }, () => this.txCollection().insertMany( - txes.map((it) => translateDoc(it)), + tx.map((it) => translateDoc(it)), { ordered: false } ), { - count: txes.length + count: tx.length } ), - { domain: 'tx', count: txes.length } + { domain: 'tx', count: tx.length } ) ctx.withSync('handleEvent', {}, () => { - this.handleEvent(DOMAIN_TX, 'add', txes.length) + this.handleEvent(DOMAIN_TX, 'add', tx.length) }) - } - - override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { - if (tx.length === 0) { - return [] - } - this.txBulk.push(...tx) - - // We need to wait next cycle to send request - await new Promise((resolve) => setImmediate(resolve)) - await this._bulkTx(ctx) return [] }