Limit bulk operations from server (#2947)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2023-04-12 00:43:37 +07:00 committed by GitHub
parent 269d585638
commit 751a65f37e
5 changed files with 33 additions and 22 deletions

View File

@@ -206,13 +206,20 @@ export function devTool (
   program
     .command('upgrade')
     .description('upgrade')
-    .action(async (cmd) => {
+    .option('-p|--parallel', 'Parallel upgrade', false)
+    .action(async (cmd: { parallel: boolean }) => {
       const { mongodbUri, version, txes, migrateOperations } = prepareTools()
       return await withDatabase(mongodbUri, async (db) => {
         const workspaces = await listWorkspaces(db, productId)
-        for (const ws of workspaces) {
-          console.log('---UPGRADING----', ws.workspace)
-          await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace)
+        if (cmd.parallel) {
+          await Promise.all(
+            workspaces.map((ws) => upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace))
+          )
+        } else {
+          for (const ws of workspaces) {
+            console.log('---UPGRADING----', ws.workspace)
+            await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace)
+          }
         }
       })
     })
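For readers skimming the change: the parallel path maps every workspace onto an upgrade promise and awaits them all, while the default path keeps the original one-at-a-time loop. A minimal standalone sketch of that switch, using hypothetical `items`/`upgradeOne` names that are not part of this commit:

// Sketch of the sequential-vs-parallel switch used above.
// `items` and `upgradeOne` are hypothetical placeholders, not part of this commit.
async function runAll<T> (items: T[], upgradeOne: (item: T) => Promise<void>, parallel: boolean): Promise<void> {
  if (parallel) {
    // start everything at once and wait for all of them to finish
    await Promise.all(items.map((item) => upgradeOne(item)))
  } else {
    // one at a time: slower, but the log output stays ordered
    for (const item of items) {
      await upgradeOne(item)
    }
  }
}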

View File

@@ -280,7 +280,7 @@ class ElasticAdapter implements FullTextAdapter {
   async updateMany (docs: IndexedDoc[]): Promise<TxResult[]> {
     const parts = Array.from(docs)
     while (parts.length > 0) {
-      const part = parts.splice(0, 10000)
+      const part = parts.splice(0, 1000)
       const operations = part.flatMap((doc) => [
         { index: { _index: toWorkspaceString(this.workspaceId), _id: doc.id } },
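The only change here is the batch size: each Elasticsearch bulk request is now capped at 1000 indexed documents instead of 10000. The splice-based chunking idiom, shown in isolation as a sketch with hypothetical `docs`/`sendBulk` names rather than the adapter's actual code:

// Sketch of the splice-based chunking pattern used above (hypothetical names).
async function inBatches<T> (docs: T[], batchSize: number, sendBulk: (part: T[]) => Promise<void>): Promise<void> {
  const queue = Array.from(docs) // copy so the caller's array is untouched
  while (queue.length > 0) {
    const part = queue.splice(0, batchSize) // take up to batchSize items off the front
    await sendBulk(part)
  }
}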

View File

@@ -575,13 +575,12 @@ abstract class MongoAdapterBase implements DbAdapter {
   async upload (domain: Domain, docs: Doc[]): Promise<void> {
     const coll = this.db.collection(domain)
     const docMap = new Map(docs.map((it) => [it._id, it]))
+    const ops = Array.from(docs)
     // remove old and insert new ones
-    const keys = Array.from(docMap.keys())
-    if (keys.length > 0) {
+    while (ops.length > 0) {
+      const part = ops.splice(0, 500)
       await coll.bulkWrite(
-        docs.map((it) => ({
+        part.map((it) => ({
           replaceOne: {
             filter: { _id: it._id },
             replacement: it,
@@ -597,9 +596,11 @@ abstract class MongoAdapterBase implements DbAdapter {
     try {
       // remove old and insert new ones
-      if (operations.size > 0) {
+      const ops = Array.from(operations.entries())
+      if (ops.length > 0) {
+        const part = ops.splice(0, 500)
         await coll.bulkWrite(
-          Array.from(operations.entries()).map((it) => ({
+          part.map((it) => ({
             updateOne: {
               filter: { _id: it[0] },
               update: {
@@ -752,7 +753,6 @@ class MongoAdapter extends MongoAdapterBase {
         }
       }
     ]
-    // return await this.db.collection(domain).bulkWrite(ops as any)
     return {
       raw: async () => await this.db.collection(domain).bulkWrite(ops),
       domain,
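Both Mongo hunks above apply the same idea: instead of a single `bulkWrite` carrying every document, the operations are drained in slices of 500. A self-contained sketch of that pattern against the MongoDB Node.js driver, with a hypothetical `replaceInBatches` name that is not part of this commit:

// Sketch of batched bulkWrite calls (hypothetical helper, official mongodb driver assumed).
import { Collection, Document } from 'mongodb'

async function replaceInBatches (coll: Collection, docs: Document[]): Promise<void> {
  const ops = Array.from(docs)
  while (ops.length > 0) {
    const part = ops.splice(0, 500) // cap each bulkWrite at 500 operations
    await coll.bulkWrite(
      part.map((it) => ({
        replaceOne: { filter: { _id: it._id }, replacement: it, upsert: true }
      }))
    )
  }
}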

View File

@@ -160,19 +160,19 @@ export async function upgradeModel (
   await client.connect()
   const db = getWorkspaceDB(client, workspaceId)
-  console.log('removing model...')
+  console.log(`${workspaceId.name}: removing model...`)
   // we're preserving accounts (created by core.account.System).
   const result = await db.collection(DOMAIN_TX).deleteMany({
     objectSpace: core.space.Model,
     modifiedBy: core.account.System,
     objectClass: { $ne: contact.class.EmployeeAccount }
   })
-  console.log(`${result.deletedCount} transactions deleted.`)
+  console.log(`${workspaceId.name}: ${result.deletedCount} transactions deleted.`)
-  console.log('creating model...')
+  console.log(`${workspaceId.name}: creating model...`)
   const model = txes
   const insert = await db.collection(DOMAIN_TX).insertMany(model as Document[])
-  console.log(`${insert.insertedCount} model transactions inserted.`)
+  console.log(`${workspaceId.name}: ${insert.insertedCount} model transactions inserted.`)
   const hierarchy = new Hierarchy()
   const modelDb = new ModelDb(hierarchy)
@@ -189,11 +189,11 @@ export async function upgradeModel (
   const migrateClient = new MigrateClientImpl(db, hierarchy, modelDb)
   for (const op of migrateOperations) {
-    console.log('migrate:', op[0])
+    console.log(`${workspaceId.name}: migrate:`, op[0])
     await op[1].migrate(migrateClient)
   }
-  console.log('Apply upgrade operations')
+  console.log(`${workspaceId.name}: Apply upgrade operations`)
   const connection = await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' })
@@ -201,7 +201,7 @@ export async function upgradeModel (
   await createUpdateIndexes(connection, db)
   for (const op of migrateOperations) {
-    console.log('upgrade:', op[0])
+    console.log(`${workspaceId.name}: upgrade:`, op[0])
     await op[1].upgrade(connection)
   }
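The logging changes in this file exist because upgrades may now run for several workspaces at once (see the `--parallel` option above), so every line is prefixed with the workspace name to keep interleaved output readable. A hypothetical helper expressing the same idea, not part of this commit:

// Hypothetical helper: prefix every log line with the workspace name so that
// interleaved output from parallel upgrades stays attributable.
function wsLog (workspaceName: string, ...args: unknown[]): void {
  console.log(`${workspaceName}:`, ...args)
}

// usage (illustrative): wsLog(workspaceId.name, `${result.deletedCount} transactions deleted.`)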

View File

@@ -335,15 +335,17 @@ async function handleRequest<S extends Session> (
   const userCtx = ctx.newChild(service.getUser(), { userId: service.getUser() }) as SessionContext
   userCtx.sessionId = service.sessionInstanceId ?? ''
   const f = (service as any)[request.method]
+  let timeout: any
+  let hangTimeout: any
   try {
     const params = [userCtx, ...request.params]
     const st = Date.now()
-    const timeout = setTimeout(() => {
+    timeout = setTimeout(() => {
       console.log('long request found', workspace, service.getUser(), request, params)
     }, 4000)
-    const hangTimeout = setTimeout(() => {
+    hangTimeout = setTimeout(() => {
      console.log('request hang found, 30sec', workspace, service.getUser(), request, params)
     }, 30000)
@@ -367,6 +369,8 @@
       ws.send(serialize(resp))
     } catch (err: any) {
       console.error(err)
+      clearTimeout(timeout)
+      clearTimeout(hangTimeout)
       const resp: Response<any> = {
         id: request.id,
         error: unknownError(err)
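The websocket handler change hoists the two watchdog timers out of the `try` block so the `catch` path can clear them as well; otherwise a failed request would leave the 4s/30s timers running and log spurious warnings. A standalone sketch of that shape, with hypothetical `withWatchdogs`/`work` names rather than the server's actual code:

// Sketch of the hoisted-timer pattern above: declare the handles before `try`
// so both the success and the error path can clear them (hypothetical names).
async function withWatchdogs (work: () => Promise<void>): Promise<void> {
  let timeout: ReturnType<typeof setTimeout> | undefined
  let hangTimeout: ReturnType<typeof setTimeout> | undefined
  try {
    timeout = setTimeout(() => console.log('long request found'), 4000)
    hangTimeout = setTimeout(() => console.log('request hang found, 30sec'), 30000)
    await work()
    clearTimeout(timeout)
    clearTimeout(hangTimeout)
  } catch (err) {
    // mirror the commit's shape: clear the watchdogs on the error path too
    clearTimeout(timeout)
    clearTimeout(hangTimeout)
    throw err
  }
}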