UBERF-7946: Remove bulk in mongo adapter (#6395)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-08-27 12:28:27 +07:00 committed by GitHub
parent eaa23c771d
commit e231f89e0b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -75,7 +75,6 @@ import { createHash } from 'crypto'
import {
type AbstractCursor,
type AnyBulkWriteOperation,
type BulkWriteResult,
type Collection,
type Db,
type Document,
@ -1107,57 +1106,6 @@ class MongoAdapter extends MongoAdapterBase {
}
}
bulkOps = new Map<Domain, AnyBulkWriteOperation<Doc>[]>()
async _pushBulk (ctx: MeasureContext): Promise<void> {
const bulk = Array.from(this.bulkOps.entries())
this.bulkOps.clear()
if (bulk.length === 0) {
return
}
const promises: Promise<BulkWriteResult>[] = []
for (const [domain, ops] of bulk) {
if (ops === undefined || ops.length === 0) {
continue
}
const coll = this.db.collection<Doc>(domain)
promises.push(
addOperation(
ctx,
'bulk-write',
{ domain, operations: ops.length },
async (ctx) =>
await ctx.with(
'bulk-write',
{ domain },
() =>
coll.bulkWrite(ops, {
ordered: false
}),
{
domain,
operations: ops.length
}
)
)
)
}
await Promise.all(promises)
}
async pushBulk (ctx: MeasureContext, domain: Domain, ops: AnyBulkWriteOperation<Doc>[]): Promise<void> {
const existing = this.bulkOps.get(domain)
if (existing !== undefined) {
existing.push(...ops)
} else {
this.bulkOps.set(domain, ops)
}
// We need to wait next cycle to send request
await new Promise<void>((resolve) => setImmediate(resolve))
await this._pushBulk(ctx)
}
async tx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult[]> {
const result: TxResult[] = []
@ -1171,6 +1119,7 @@ class MongoAdapter extends MongoAdapterBase {
const stTime = Date.now()
const st = Date.now()
let promises: Promise<any>[] = []
for (const [domain, txs] of byDomain) {
if (domain === undefined) {
continue
@ -1227,9 +1176,37 @@ class MongoAdapter extends MongoAdapterBase {
}
if (ops.length > 0) {
await this.pushBulk(ctx, domain, ops)
if (ops === undefined || ops.length === 0) {
continue
}
const coll = this.db.collection<Doc>(domain)
promises.push(
addOperation(
ctx,
'bulk-write',
{ domain, operations: ops.length },
async (ctx) =>
await ctx.with(
'bulk-write',
{ domain },
() =>
coll.bulkWrite(ops, {
ordered: false
}),
{
domain,
operations: ops.length
}
)
)
)
}
if (domainBulk.findUpdate.size > 0) {
if (promises.length > 0) {
await Promise.all(promises)
promises = []
}
const coll = this.db.collection<Doc>(domain)
await ctx.with(
@ -1255,6 +1232,10 @@ class MongoAdapter extends MongoAdapterBase {
}
if (domainBulk.raw.length > 0) {
if (promises.length > 0) {
await Promise.all(promises)
promises = []
}
await ctx.with(
'raw',
{},
@ -1270,6 +1251,9 @@ class MongoAdapter extends MongoAdapterBase {
)
}
}
if (promises.length > 0) {
await Promise.all(promises)
}
return result
}
@ -1511,17 +1495,12 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
await this._db.init(DOMAIN_TX)
}
txBulk: Tx[] = []
async _bulkTx (ctx: MeasureContext): Promise<void> {
const txes = this.txBulk
this.txBulk = []
if (txes.length === 0) {
return
override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []
}
const opName = txes.length === 1 ? 'tx-one' : 'tx'
const opName = tx.length === 1 ? 'tx-one' : 'tx'
await addOperation(
ctx,
opName,
@ -1532,31 +1511,20 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
{ domain: 'tx' },
() =>
this.txCollection().insertMany(
txes.map((it) => translateDoc(it)),
tx.map((it) => translateDoc(it)),
{
ordered: false
}
),
{
count: txes.length
count: tx.length
}
),
{ domain: 'tx', count: txes.length }
{ domain: 'tx', count: tx.length }
)
ctx.withSync('handleEvent', {}, () => {
this.handleEvent(DOMAIN_TX, 'add', txes.length)
this.handleEvent(DOMAIN_TX, 'add', tx.length)
})
}
override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []
}
this.txBulk.push(...tx)
// We need to wait next cycle to send request
await new Promise<void>((resolve) => setImmediate(resolve))
await this._bulkTx(ctx)
return []
}