From 74d76d34a4bca8cad250319a01ae0b0c986b4772 Mon Sep 17 00:00:00 2001 From: Denis Bykhov Date: Fri, 2 Aug 2024 19:36:51 +0500 Subject: [PATCH 1/3] Log errors in check model (#6235) --- common/scripts/check_model_version.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/scripts/check_model_version.js b/common/scripts/check_model_version.js index 295065d7df..095f783735 100755 --- a/common/scripts/check_model_version.js +++ b/common/scripts/check_model_version.js @@ -2,12 +2,14 @@ const exec = require('child_process').exec exec('git describe --tags `git rev-list --tags --max-count=1`', (err, stdout, stderr) => { if (err !== null) { + console.log('Error', err) process.exit(1) } const tag = stdout.trim() console.log('Check changes for tag:', tag) exec(`git fetch --tags && git diff ${tag} --name-only`, (err, stdout, stderr) => { if (err !== null) { + console.log('Error', err) process.exit(1) } const changedFiles = stdout.trim().split('\n') From 29b082fc7b83c76cb9edaa89d911da2080bef515 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Fri, 2 Aug 2024 21:40:15 +0700 Subject: [PATCH 2/3] Add file moving tool (#6223) --- dev/tool/src/index.ts | 36 +++++++++++- dev/tool/src/storage.ts | 60 ++++++++++++++++++++ server/core/src/__tests__/aggregator.spec.ts | 4 +- server/core/src/server/aggregator.ts | 10 +++- 4 files changed, 104 insertions(+), 6 deletions(-) create mode 100644 dev/tool/src/storage.ts diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index c46db8e771..ad20f05155 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -1,6 +1,6 @@ // // Copyright © 2020, 2021 Anticrm Platform Contributors. -// Copyright © 2021 Hardcore Engineering Inc. +// Copyright © 2021, 2024 Hardcore Engineering Inc. // // Licensed under the Eclipse Public License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. You may @@ -74,7 +74,7 @@ import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model' import contact from '@hcengineering/model-contact' import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { openAIConfigDefaults } from '@hcengineering/openai' -import type { StorageAdapter } from '@hcengineering/server-core' +import type { StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core' import { deepEqual } from 'fast-equals' import { createWriteStream, readFileSync } from 'fs' import { benchmark, benchmarkWorker } from './benchmark' @@ -95,6 +95,7 @@ import { fixJsonMarkup } from './markup' import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { openAIConfig } from './openai' import { fixAccountEmails, renameAccount } from './renameAccount' +import { moveFiles } from './storage' const colorConstants = { colorRed: '\u001b[31m', @@ -1040,6 +1041,37 @@ export function devTool ( }) }) + program + .command('move-files') + .option('-w, --workspace ', 'Selected workspace only', '') + .action(async (cmd: { workspace: string }) => { + const { mongodbUri } = prepareTools() + await withDatabase(mongodbUri, async (db, client) => { + await withStorage(mongodbUri, async (adapter) => { + try { + const exAdapter = adapter as StorageAdapterEx + if (exAdapter.adapters === undefined || exAdapter.adapters.size < 2) { + throw new Error('bad storage config, at least two storage providers are required') + } + + console.log('moving files to storage provider', exAdapter.defaultAdapter) + + const workspaces = await listWorkspacesPure(db, productId) + for (const workspace of workspaces) { + if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) { + continue + } + + const wsId = getWorkspaceId(workspace.workspace, productId) + await moveFiles(toolCtx, wsId, exAdapter) + } + } catch (err: any) { + console.error(err) + } + }) + }) + }) + program.command('fix-bw-workspace ').action(async (workspace: string) => { const { mongodbUri } = prepareTools() await withStorage(mongodbUri, async (adapter) => { diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts new file mode 100644 index 0000000000..098d4f6a1b --- /dev/null +++ b/dev/tool/src/storage.ts @@ -0,0 +1,60 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type MeasureContext, type WorkspaceId } from '@hcengineering/core' +import { type StorageAdapterEx } from '@hcengineering/server-core' +import { PassThrough } from 'stream' + +export async function moveFiles ( + ctx: MeasureContext, + workspaceId: WorkspaceId, + exAdapter: StorageAdapterEx +): Promise { + if (exAdapter.adapters === undefined) return + + let count = 0 + + console.log('start', workspaceId.name) + + // We assume that the adapter moves all new files to the default adapter + const target = exAdapter.defaultAdapter + await exAdapter.adapters.get(target)?.make(ctx, workspaceId) + + for (const [name, adapter] of exAdapter.adapters.entries()) { + if (name === target) continue + + const iterator = await adapter.listStream(ctx, workspaceId) + while (true) { + const data = await iterator.next() + if (data === undefined) break + + const blob = await exAdapter.stat(ctx, workspaceId, data._id) + if (blob === undefined) continue + if (blob.provider === target) continue + + const readable = await exAdapter.get(ctx, workspaceId, data._id) + const stream = readable.pipe(new PassThrough()) + await exAdapter.put(ctx, workspaceId, data._id, stream, blob.contentType, blob.size) + + count += 1 + if (count % 100 === 0) { + console.log('...moved: ', count) + } + } + await iterator.close() + } + + console.log('...done', workspaceId.name, count) +} diff --git a/server/core/src/__tests__/aggregator.spec.ts b/server/core/src/__tests__/aggregator.spec.ts index a190a6c8d0..3c80a02ee1 100644 --- a/server/core/src/__tests__/aggregator.spec.ts +++ b/server/core/src/__tests__/aggregator.spec.ts @@ -24,7 +24,7 @@ describe('aggregator tests', () => { const ws1: WorkspaceId = { name: 'ws1', productId: '' } return { mem1, mem2, aggr, ws1, testCtx } } - it('reuse existing storage', async () => { + it('not reuse existing storage', async () => { const { mem1, aggr, ws1, testCtx } = prepare1() // Test default provider @@ -37,7 +37,7 @@ describe('aggregator tests', () => { // Test content typed provider await aggr.put(testCtx, ws1, 'test', 'data2', 'text/plain') const stat2 = await aggr.stat(testCtx, ws1, 'test') - expect(stat2?.provider).toEqual('mem1') + expect(stat2?.provider).toEqual('mem2') const dta = Buffer.concat(await aggr.read(testCtx, ws1, 'test')).toString() expect(dta).toEqual('data2') diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts index 6ceee9a073..d7391a5691 100644 --- a/server/core/src/server/aggregator.ts +++ b/server/core/src/server/aggregator.ts @@ -317,12 +317,11 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE contentType: string, size?: number | undefined ): Promise { - // We need to reuse same provider for existing documents. const stat = ( await this.dbAdapter.find(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref }, { limit: 1 }) ).shift() - const { provider, adapter } = this.selectProvider(stat?.provider, contentType) + const { provider, adapter } = this.selectProvider(undefined, contentType) const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size) @@ -351,6 +350,13 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } await this.dbAdapter.upload(ctx, workspaceId, DOMAIN_BLOB, [blobDoc]) + + // If the file is already stored in different provider, we need to remove it. + if (stat !== undefined && stat.provider !== provider) { + const adapter = this.adapters.get(stat.provider) + await adapter?.remove(ctx, workspaceId, [stat._id]) + } + return result } } From 7c1a1619e8fc45dd9ee534924a4cf9a6a7a24abe Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 2 Aug 2024 22:38:34 +0700 Subject: [PATCH 3/3] Revert rate limiter changes (#6237) Signed-off-by: Andrey Sobolev --- packages/core/src/utils.ts | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 075f0f535e..2fe36d2e10 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -13,9 +13,8 @@ // limitations under the License. // -import { getEmbeddedLabel, IntlString, PlatformError, unknownError } from '@hcengineering/platform' +import { getEmbeddedLabel, IntlString } from '@hcengineering/platform' import { deepEqual } from 'fast-equals' -import { DOMAIN_BENCHMARK } from './benchmark' import { Account, AccountRole, @@ -47,6 +46,7 @@ import { TxOperations } from './operations' import { isPredicate } from './predicate' import { DocumentQuery, FindResult } from './storage' import { DOMAIN_TX } from './tx' +import { DOMAIN_BENCHMARK } from './benchmark' function toHex (value: number, chars: number): string { const result = value.toString(16) @@ -355,6 +355,7 @@ export class DocManager implements IDocManager { export class RateLimiter { idCounter: number = 0 + processingQueue = new Map>() last: number = 0 rate: number @@ -365,21 +366,21 @@ export class RateLimiter { } notify: (() => void)[] = [] - finished: boolean = false async exec = any>(op: (args?: B) => Promise, args?: B): Promise { - if (this.finished) { - throw new PlatformError(unknownError('No Possible to add/exec on finished queue')) - } - while (this.notify.length >= this.rate) { + const processingId = this.idCounter++ + + while (this.processingQueue.size >= this.rate) { await new Promise((resolve) => { this.notify.push(resolve) }) } try { const p = op(args) + this.processingQueue.set(processingId, p as Promise) return await p } finally { + this.processingQueue.delete(processingId) const n = this.notify.shift() if (n !== undefined) { n() @@ -388,7 +389,7 @@ export class RateLimiter { } async add = any>(op: (args?: B) => Promise, args?: B): Promise { - if (this.notify.length < this.rate) { + if (this.processingQueue.size < this.rate) { void this.exec(op, args) } else { await this.exec(op, args) @@ -396,12 +397,7 @@ export class RateLimiter { } async waitProcessing (): Promise { - this.finished = true - while (this.notify.length > 0) { - await new Promise((resolve) => { - this.notify.push(resolve) - }) - } + await Promise.all(this.processingQueue.values()) } }