diff --git a/dev/.env b/dev/.env
index 90fe452486..8306222d28 100644
--- a/dev/.env
+++ b/dev/.env
@@ -1 +1,2 @@
-STORAGE_CONFIG="minio|minio?accessKey=minioadmin&secretKey=minioadmin"
\ No newline at end of file
+STORAGE_CONFIG="minio|minio?accessKey=minioadmin&secretKey=minioadmin"
+MONGO_URL=mongodb://mongodb:27017?compressors=snappy
\ No newline at end of file
diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml
index e7e9d000ac..2debbc9482 100644
--- a/dev/docker-compose.yaml
+++ b/dev/docker-compose.yaml
@@ -2,6 +2,15 @@ services:
   mongodb:
     image: 'mongo:7-jammy'
     container_name: mongodb
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
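+    # Health check: the piped eval swallows errors on purpose, so the probe
+    # succeeds as soon as mongosh can connect; start_interval polls every
+    # second while the container is starting up.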
+    healthcheck:
+      test: echo "try { db.currentOp().ok } catch (err) { }" | mongosh --port 27017 --quiet
+      interval: 5s
+      timeout: 30s
+      start_period: 0s
+      start_interval: 1s
+      retries: 30
     environment:
       - PUID=1000
       - PGID=1000
@@ -52,11 +61,11 @@ services:
     environment:
       - ACCOUNT_PORT=3000
       - SERVER_SECRET=secret
-      - MONGO_URL=mongodb://mongodb:27017?compressors=snappy
-      - TRANSACTOR_URL=ws://transactor:3333;ws://localhost:3333
+      - MONGO_URL=${MONGO_URL}
+      - TRANSACTOR_URL=ws://host.docker.internal:3333;ws://localhost:3333
       - SES_URL=
       - STORAGE_CONFIG=${STORAGE_CONFIG}
-      - FRONT_URL=http://localhost:8087
+      - FRONT_URL=http://host.docker.internal:8087
       - RESERVED_DB_NAMES=telegram,gmail,github
       - MODEL_ENABLED=*
       - LAST_NAME_FIRST=true
@@ -76,8 +85,8 @@ services:
     environment:
       - COLLABORATOR_PORT=3078
       - SECRET=secret
-      - ACCOUNTS_URL=http://account:3000
-      - MONGO_URL=mongodb://mongodb:27017?compressors=snappy
+      - ACCOUNTS_URL=http://host.docker.internal:3000
+      - MONGO_URL=${MONGO_URL}
       - 'MONGO_OPTIONS={"appName":"collaborator","maxPoolSize":2}'
       - STORAGE_CONFIG=${STORAGE_CONFIG}
     restart: unless-stopped
@@ -96,11 +105,11 @@ services:
       - UV_THREADPOOL_SIZE=10
      - SERVER_PORT=8080
       - SERVER_SECRET=secret
-      - MONGO_URL=mongodb://mongodb:27017?compressors=snappy
+      - MONGO_URL=${MONGO_URL}
       - 'MONGO_OPTIONS={"appName":"front","maxPoolSize":1}'
       - ACCOUNTS_URL=http://localhost:3000
       - UPLOAD_URL=/files
-      - ELASTIC_URL=http://elastic:9200
+      - ELASTIC_URL=http://host.docker.internal:9200
       - GMAIL_URL=http://localhost:8088
       - CALENDAR_URL=http://localhost:8095
       - TELEGRAM_URL=http://localhost:8086
@@ -135,17 +144,17 @@ services:
       - SERVER_PORT=3333
       - SERVER_SECRET=secret
       - ENABLE_COMPRESSION=true
-      - ELASTIC_URL=http://elastic:9200
-      - MONGO_URL=mongodb://mongodb:27017?compressors=snappy
+      - ELASTIC_URL=http://host.docker.internal:9200
+      - MONGO_URL=${MONGO_URL}
       - 'MONGO_OPTIONS={"appName": "transactor", "maxPoolSize": 10}'
       - METRICS_CONSOLE=false
       - METRICS_FILE=metrics.txt
       - STORAGE_CONFIG=${STORAGE_CONFIG}
-      - REKONI_URL=http://rekoni:4004
+      - REKONI_URL=http://host.docker.internal:4004
       - FRONT_URL=http://localhost:8087
 #      - APM_SERVER_URL=http://apm-server:8200
       - SES_URL=''
-      - ACCOUNTS_URL=http://account:3000
+      - ACCOUNTS_URL=http://host.docker.internal:3000
       - LAST_NAME_FIRST=true
       - ELASTIC_INDEX_NAME=local_storage_index
       - BRANDING_PATH=/var/cfg/branding.json
@@ -166,7 +175,7 @@ services:
       - 4005:4005
     environment:
       - SECRET=secret
-      - MONGO_URL=mongodb://mongodb:27017?compressors=snappy
+      - MONGO_URL=${MONGO_URL}
       - 'MONGO_OPTIONS={"appName":"print","maxPoolSize":1}'
       - STORAGE_CONFIG=${STORAGE_CONFIG}
     deploy:
@@ -183,11 +192,11 @@ services:
       - ../services/sign/pod-sign/debug/branding.json:/var/cfg/branding.json
     environment:
       - SECRET=secret
-      - MONGO_URL=mongodb://mongodb:27017
+      - MONGO_URL=${MONGO_URL}
       - 'MONGO_OPTIONS={"appName":"sign","maxPoolSize":1}'
       - MINIO_ENDPOINT=minio
       - MINIO_ACCESS_KEY=minioadmin
-      - ACCOUNTS_URL=http://account:3000
+      - ACCOUNTS_URL=http://host.docker.internal:3000
       - MINIO_SECRET_KEY=minioadmin
       - CERTIFICATE_PATH=/var/cfg/certificate.p12
       - SERVICE_ID=sign-service
@@ -204,10 +213,10 @@ services:
     environment:
       - SECRET=secret
       - PORT=4007
-      - MONGO_URL=mongodb://mongodb:27017
+      - MONGO_URL=${MONGO_URL}
       - 'MONGO_OPTIONS={"appName":"analytics","maxPoolSize":1}'
       - SERVICE_ID=analytics-collector-service
-      - ACCOUNTS_URL=http://account:3000
+      - ACCOUNTS_URL=http://host.docker.internal:3000
       - SUPPORT_WORKSPACE=support
     deploy:
       resources:
@@ -218,8 +227,8 @@ services:
     restart: unless-stopped
     environment:
       - SERVER_SECRET=secret
-      - MONGO_URL=mongodb://mongodb:27017
-      - ACCOUNTS_URL=http://account:3000
+      - MONGO_URL=${MONGO_URL}
+      - ACCOUNTS_URL=http://host.docker.internal:3000
       - SUPPORT_WORKSPACE=support
       - FIRST_NAME=Jolie
       - LAST_NAME=AI
@@ -236,11 +245,11 @@ services:
 #    environment:
 #      - PORT=4020
 #      - BOT_TOKEN=token
-#      - MONGO_URL=mongodb://mongodb:27017
+#      - MONGO_URL=${MONGO_URL}
 #      - MONGO_DB=telegram-bot
 #      - SECRET=secret
 #      - DOMAIN=domain
-#      - ACCOUNTS_URL=http://account:3000
+#      - ACCOUNTS_URL=http://host.docker.internal:3000
 #      - SERVICE_ID=telegram-bot-service
 #    deploy:
 #      resources:
diff --git a/dev/prod/src/platform.ts b/dev/prod/src/platform.ts
index c8724610ae..0ddfee20db 100644
--- a/dev/prod/src/platform.ts
+++ b/dev/prod/src/platform.ts
@@ -107,7 +107,7 @@ import github, { githubId } from '@hcengineering/github'
 import '@hcengineering/github-assets'
 
 import { coreId } from '@hcengineering/core'
-import presentation, { parsePreviewConfig, presentationId } from '@hcengineering/presentation'
+import presentation, { loadServerConfig, parsePreviewConfig, presentationId } from '@hcengineering/presentation'
 
 import { setMetadata } from '@hcengineering/platform'
 import { setDefaultLanguage } from '@hcengineering/theme'
@@ -239,13 +239,12 @@ export async function configurePlatform() {
   })
 
   configureI18n()
 
-  const config: Config = await (await fetch(
+  const config: Config = await loadServerConfig(
     devConfigHuly ? '/config-huly.json' : (
       devConfigBold ? '/config-bold.json' : (
         devConfig ? '/config-dev.json' : '/config.json'))
   )
-  ).json()
 
   const branding: BrandingMap = config.BRANDING_URL !== undefined ? await (await fetch(config.BRANDING_URL)).json() : {}
   const myBranding = branding[window.location.host] ?? {}
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index f7aebcb3d3..56671c74f9 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -90,7 +90,7 @@ import {
   restoreRecruitingTaskTypes
 } from './clean'
 import { changeConfiguration } from './configuration'
-import { fixJsonMarkup } from './markup'
+import { fixJsonMarkup, migrateMarkup } from './markup'
 import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
 import { fixAccountEmails, renameAccount } from './renameAccount'
 import { moveFiles } from './storage'
@@ -1290,6 +1290,34 @@ export function devTool (
       })
     })
 
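+  // Copies collaborative markup that is still stored inline in Mongo
+  // documents into blob storage (see migrateMarkup in ./markup.ts).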
+  program
+    .command('migrate-markup')
+    .description('migrates collaborative markup to storage')
+    .option('-w, --workspace <workspace>', 'Selected workspace only', '')
+    .option('-c, --concurrency <concurrency>', 'Number of documents being processed concurrently', '10')
+    .action(async (cmd: { workspace: string, concurrency: string }) => {
+      const { mongodbUri } = prepareTools()
+      await withDatabase(mongodbUri, async (db, client) => {
+        await withStorage(mongodbUri, async (adapter) => {
+          const workspaces = await listWorkspacesPure(db)
+          for (const workspace of workspaces) {
+            if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
+              continue
+            }
+
+            const wsId = getWorkspaceId(workspace.workspace)
+            const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsId), 'external')
+
+            console.log('processing workspace', workspace.workspace)
+
+            await migrateMarkup(toolCtx, adapter, wsId, client, endpoint, parseInt(cmd.concurrency))
+
+            console.log('...done', workspace.workspace)
+          }
+        })
+      })
+    })
+
   program
     .command('remove-duplicates-ids <workspaces>')
     .description('remove duplicates ids for futue migration')
diff --git a/dev/tool/src/markup.ts b/dev/tool/src/markup.ts
index da3cdb9d75..7c194924ff 100644
--- a/dev/tool/src/markup.ts
+++ b/dev/tool/src/markup.ts
@@ -1,19 +1,23 @@
+import { saveCollaborativeDoc } from '@hcengineering/collaboration'
 import core, {
   type AnyAttribute,
   type Class,
   type Client as CoreClient,
   type Doc,
   type Domain,
+  type Hierarchy,
   type MeasureContext,
   type Ref,
   type WorkspaceId,
+  RateLimiter,
+  collaborativeDocParse,
   makeCollaborativeDoc
 } from '@hcengineering/core'
 import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
 import { type StorageAdapter } from '@hcengineering/server-core'
 import { connect } from '@hcengineering/server-tool'
-import { jsonToText } from '@hcengineering/text'
-import { type Db } from 'mongodb'
+import { jsonToText, markupToYDoc } from '@hcengineering/text'
+import { type Db, type FindCursor, type MongoClient } from 'mongodb'
 
 export async function fixJsonMarkup (
   ctx: MeasureContext,
@@ -110,3 +114,105 @@ async function processFixJsonMarkupFor (
 
   console.log('...processed', docs.length)
 }
+
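+// Walks all classes that declare collaborative markup attributes and, for
+// every document whose attribute still holds inline JSON markup, converts the
+// value to a YDoc and saves it to storage. Documents that already have a blob
+// in storage are skipped, so the command is safe to re-run.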
+export async function migrateMarkup (
+  ctx: MeasureContext,
+  storageAdapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  client: MongoClient,
+  transactorUrl: string,
+  concurrency: number
+): Promise<void> {
+  const connection = (await connect(transactorUrl, workspaceId, undefined, {
+    mode: 'backup'
+  })) as unknown as CoreClient
+
+  const hierarchy = connection.getHierarchy()
+
+  const workspaceDb = client.db(workspaceId.name)
+
+  try {
+    const classes = hierarchy.getDescendants(core.class.Doc)
+    for (const _class of classes) {
+      const domain = hierarchy.findDomain(_class)
+      if (domain === undefined) continue
+
+      const allAttributes = hierarchy.getAllAttributes(_class)
+      const attributes = Array.from(allAttributes.values()).filter((attribute) => {
+        return hierarchy.isDerived(attribute.type._class, 'core:class:TypeCollaborativeMarkup' as Ref<Class<Doc>>)
+      })
+
+      if (attributes.length === 0) continue
+      if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
+
+      const collection = workspaceDb.collection(domain)
+
+      const filter = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
+
+      const count = await collection.countDocuments(filter)
+      const iterator = collection.find<Doc>(filter)
+
+      try {
+        console.log('processing', _class, '->', count)
+        await processMigrateMarkupFor(ctx, hierarchy, storageAdapter, workspaceId, attributes, iterator, concurrency)
+      } finally {
+        await iterator.close()
+      }
+    }
+  } finally {
+    await connection.close()
+  }
+}
+
+async function processMigrateMarkupFor (
+  ctx: MeasureContext,
+  hierarchy: Hierarchy,
+  storageAdapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  attributes: AnyAttribute[],
+  iterator: FindCursor<Doc>,
+  concurrency: number
+): Promise<void> {
+  const rateLimiter = new RateLimiter(concurrency)
+
+  let processed = 0
+
+  while (true) {
+    const doc = await iterator.next()
+    if (doc === null) break
+
+    const timestamp = Date.now()
+    const revisionId = `${timestamp}`
+
+    await rateLimiter.exec(async () => {
+      for (const attribute of attributes) {
+        const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
+        const { documentId } = collaborativeDocParse(collaborativeDoc)
+
+        const value = hierarchy.isMixin(attribute.attributeOf)
+          ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
+          : ((doc as any)[attribute.name] as string)
+
+        if (value != null && value.startsWith('{')) {
+          const blob = await storageAdapter.stat(ctx, workspaceId, documentId)
+          // only for documents not in storage
+          if (blob === undefined) {
+            const ydoc = markupToYDoc(value, attribute.name)
+            await saveCollaborativeDoc(storageAdapter, workspaceId, collaborativeDoc, ydoc, ctx)
+          }
+        }
+      }
+    })
+
+    processed += 1
+
+    if (processed % 100 === 0) {
+      await rateLimiter.waitProcessing()
+      console.log('...processing', processed)
+    }
+  }
+
+  await rateLimiter.waitProcessing()
+
+  console.log('processed', processed)
+}
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index 47387298de..27f7e23792 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -14,22 +14,22 @@
 //
 
 import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
-import { type StorageAdapterEx } from '@hcengineering/server-core'
+import { type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
 import { PassThrough } from 'stream'
 
+export interface MoveFilesParams {
+  blobSizeLimitMb: number
+  concurrency: number
+}
+
 export async function moveFiles (
   ctx: MeasureContext,
   workspaceId: WorkspaceId,
   exAdapter: StorageAdapterEx,
-  params: {
-    blobSizeLimitMb: number
-    concurrency: number
-  }
+  params: MoveFilesParams
 ): Promise<void> {
   if (exAdapter.adapters === undefined) return
 
-  let count = 0
-
   console.log('start', workspaceId.name)
 
   // We assume that the adapter moves all new files to the default adapter
@@ -38,74 +38,125 @@ export async function moveFiles (
   for (const [name, adapter] of exAdapter.adapters.entries()) {
     if (name === target) continue
+    console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)
 
-    let time = Date.now()
+    // we attempt to retry the whole process in case of failure
+    // files that were already moved will be skipped
+    await retryOnFailure(ctx, 5, async () => {
+      await processAdapter(ctx, exAdapter, adapter, workspaceId, params)
+    })
+  }
 
-    const rateLimiter = new RateLimiter(params.concurrency)
+  console.log('...done', workspaceId.name)
+}
 
-    const iterator = await adapter.listStream(ctx, workspaceId)
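+// Iterates the blob listing of a single source adapter and copies every blob
+// that does not yet live in the default adapter, tracking processed, moved
+// and skipped counts for the periodic progress log.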
+async function processAdapter (
+  ctx: MeasureContext,
+  exAdapter: StorageAdapterEx,
+  adapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  params: MoveFilesParams
+): Promise<void> {
+  const target = exAdapter.defaultAdapter
+
+  let time = Date.now()
+  let processedCnt = 0
+  let processedBytes = 0
+  let skippedCnt = 0
+  let movedCnt = 0
+  let movedBytes = 0
+  let batchBytes = 0
+
+  const rateLimiter = new RateLimiter(params.concurrency)
+
+  const iterator = await adapter.listStream(ctx, workspaceId)
+  try {
     while (true) {
       const data = await iterator.next()
       if (data === undefined) break
 
-      const blob = await exAdapter.stat(ctx, workspaceId, data._id)
-      if (blob === undefined) continue
-      if (blob.provider === target) continue
+      const blob =
+        (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await adapter.stat(ctx, workspaceId, data._id))
 
-      if (blob.size > params.blobSizeLimitMb * 1024 * 1024) {
-        console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
+      if (blob === undefined) {
+        console.error('blob not found', data._id)
         continue
       }
 
-      await rateLimiter.exec(async () => {
-        try {
-          await retryOnFailure(
-            ctx,
-            5,
-            async () => {
-              await moveFile(ctx, exAdapter, workspaceId, blob)
-            },
-            50
-          )
-        } catch (err) {
-          console.error('failed to process blob', name, data._id, err)
+      if (blob.provider !== target) {
+        if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) {
+          await rateLimiter.exec(async () => {
+            try {
+              await retryOnFailure(
+                ctx,
+                5,
+                async () => {
+                  await processFile(ctx, exAdapter, adapter, workspaceId, blob)
+                },
+                50
+              )
+              movedCnt += 1
+              movedBytes += blob.size
+              batchBytes += blob.size
+            } catch (err) {
+              console.error('failed to process blob', data._id, err)
+            }
+          })
+        } else {
+          skippedCnt += 1
+          console.log('skipping large blob', data._id, Math.round(blob.size / 1024 / 1024))
         }
-      })
+      }
 
-      count += 1
-      if (count % 100 === 0) {
+      processedCnt += 1
+      processedBytes += blob.size
+
+      if (processedCnt % 100 === 0) {
         await rateLimiter.waitProcessing()
+
         const duration = Date.now() - time
+
+        console.log(
+          '...processed',
+          processedCnt,
+          Math.round(processedBytes / 1024 / 1024) + 'MB',
+          'moved',
+          movedCnt,
+          Math.round(movedBytes / 1024 / 1024) + 'MB',
+          '+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
+          'skipped',
+          skippedCnt,
+          Math.round(duration / 1000) + 's'
+        )
+
+        batchBytes = 0
         time = Date.now()
-        console.log('...moved: ', count, Math.round(duration / 1000))
       }
     }
 
     await rateLimiter.waitProcessing()
-
+  } finally {
     await iterator.close()
   }
-
-  console.log('...done', workspaceId.name, count)
 }
 
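+// Copies one blob: read it from the source adapter and pipe the stream into
+// the aggregated adapter, which writes it to the default provider.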
-async function moveFile (
+async function processFile (
   ctx: MeasureContext,
   exAdapter: StorageAdapterEx,
+  adapter: StorageAdapter,
   workspaceId: WorkspaceId,
   blob: Blob
 ): Promise<void> {
-  const readable = await exAdapter.get(ctx, workspaceId, blob._id)
+  const readable = await adapter.get(ctx, workspaceId, blob._id)
   try {
     readable.on('end', () => {
       readable.destroy()
     })
     const stream = readable.pipe(new PassThrough())
     await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
-  } catch (err) {
+  } finally {
     readable.destroy()
-    throw err
   }
 }
 
@@ -115,18 +166,19 @@ async function retryOnFailure<T> (
   op: () => Promise<T>,
   delay: number = 0
 ): Promise<T> {
-  let error: any
+  let lastError: any
   while (retries > 0) {
     retries--
     try {
       return await op()
     } catch (err: any) {
-      error = err
+      console.error(err)
+      lastError = err
       ctx.error('error', { err, retries })
       if (retries !== 0 && delay > 0) {
         await new Promise((resolve) => setTimeout(resolve, delay))
       }
     }
   }
-  throw error
+  throw lastError
 }
diff --git a/models/core/src/migration.ts b/models/core/src/migration.ts
index 6efe17ca4c..7282d9ae27 100644
--- a/models/core/src/migration.ts
+++ b/models/core/src/migration.ts
@@ -165,16 +165,20 @@ async function migrateCollaborativeContentToStorage (client: MigrationClient): Promise<void> {
     const domain = hierarchy.findDomain(_class)
     if (domain === undefined) continue
 
-    const attributes = hierarchy.getAllAttributes(_class)
-    const filtered = Array.from(attributes.values()).filter((attribute) => {
+    const allAttributes = hierarchy.getAllAttributes(_class)
+    const attributes = Array.from(allAttributes.values()).filter((attribute) => {
       return hierarchy.isDerived(attribute.type._class, core.class.TypeCollaborativeDoc)
     })
-    if (filtered.length === 0) continue
 
-    const iterator = await client.traverse(domain, { _class })
+    if (attributes.length === 0) continue
+    if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
+
+    const query = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
+
+    const iterator = await client.traverse(domain, query)
     try {
       console.log('processing', _class)
-      await processMigrateContentFor(ctx, domain, filtered, client, storageAdapter, iterator)
+      await processMigrateContentFor(ctx, domain, attributes, client, storageAdapter, iterator)
     } finally {
       await iterator.close()
     }
@@ -189,6 +193,8 @@ async function processMigrateContentFor (
   storageAdapter: StorageAdapter,
   iterator: MigrationIterator<Doc>
 ): Promise<void> {
+  const hierarchy = client.hierarchy
+
   const rateLimiter = new RateLimiter(10)
 
   let processed = 0
@@ -211,7 +217,14 @@
       for (const attribute of attributes) {
         const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
 
-        const value = (doc as any)[attribute.name] as string
+        const value = hierarchy.isMixin(attribute.attributeOf)
+          ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
+          : ((doc as any)[attribute.name] as string)
+
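+        // Mixin values are stored under a '<mixin id>.<attribute>' key, so
+        // the update below must address the fully qualified attribute name.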
+        const attributeName = hierarchy.isMixin(attribute.attributeOf)
+          ? `${attribute.attributeOf}.${attribute.name}`
+          : attribute.name
+
         if (value != null && value.startsWith('{')) {
           const { documentId } = collaborativeDocParse(collaborativeDoc)
           const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId)
@@ -221,9 +234,9 @@
             await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
           }
 
-          update[attribute.name] = collaborativeDoc
-        } else if (value == null) {
-          update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
+          update[attributeName] = collaborativeDoc
+        } else if (value == null || value === '') {
+          update[attributeName] = collaborativeDoc
         }
       }
 
diff --git a/packages/presentation/src/utils.ts b/packages/presentation/src/utils.ts
index 5224918cb9..cfa4590862 100644
--- a/packages/presentation/src/utils.ts
+++ b/packages/presentation/src/utils.ts
@@ -693,3 +693,28 @@
 export function setDownloadProgress (percent: number): void {
   upgradeDownloadProgress.set(Math.round(percent))
 }
+
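+// Fetches the server config with up to five attempts, waiting an extra second
+// after each failure (1s, 2s, ...) before giving up.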
+export async function loadServerConfig (url: string): Promise<any> {
+  let retries = 5
+  let res: Response | undefined
+
+  do {
+    try {
+      res = await fetch(url)
+      break
+    } catch (e: any) {
+      retries--
+      if (retries === 0) {
+        throw new Error(`Failed to load server config: ${e}`)
+      }
+      await new Promise((resolve) => setTimeout(resolve, 1000 * (5 - retries)))
+    }
+  } while (retries > 0)
+
+  if (res === undefined) {
+    // In theory we should never get here
+    throw new Error('Failed to load server config')
+  }
+
+  return await res.json()
+}
diff --git a/plugins/guest-resources/src/connect.ts b/plugins/guest-resources/src/connect.ts
index 53285fc166..0d8c9349e5 100644
--- a/plugins/guest-resources/src/connect.ts
+++ b/plugins/guest-resources/src/connect.ts
@@ -11,10 +11,15 @@ import core, {
 } from '@hcengineering/core'
 import login, { loginId } from '@hcengineering/login'
 import { getMetadata, getResource, setMetadata } from '@hcengineering/platform'
-import presentation, { closeClient, refreshClient, setClient, setPresentationCookie } from '@hcengineering/presentation'
+import presentation, {
+  closeClient,
+  loadServerConfig,
+  refreshClient,
+  setClient,
+  setPresentationCookie
+} from '@hcengineering/presentation'
 import { fetchMetadataLocalStorage, getCurrentLocation, navigate, setMetadataLocalStorage } from '@hcengineering/ui'
 import { writable } from 'svelte/store'
-
 export const versionError = writable<string | undefined>(undefined)
 
 const versionStorageKey = 'last_server_version'
@@ -113,7 +118,7 @@ export async function connect (title: string): Promise<Client | undefined> {
       const frontUrl = getMetadata(presentation.metadata.FrontUrl) ?? ''
       const currentFrontVersion = getMetadata(presentation.metadata.FrontVersion)
       if (currentFrontVersion !== undefined) {
-        const frontConfig = await (await fetch(concatLink(frontUrl, '/config.json'))).json()
+        const frontConfig = await loadServerConfig(concatLink(frontUrl, '/config.json'))
         if (frontConfig?.version !== undefined && frontConfig.version !== currentFrontVersion) {
           location.reload()
         }
diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts
index 377ec30b8c..787ce42611 100644
--- a/plugins/workbench-resources/src/connect.ts
+++ b/plugins/workbench-resources/src/connect.ts
@@ -17,6 +17,7 @@ import login, { loginId } from '@hcengineering/login'
 import { broadcastEvent, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
 import presentation, {
   closeClient,
+  loadServerConfig,
   purgeClient,
   refreshClient,
   setClient,
@@ -221,7 +222,7 @@ export async function connect (title: string): Promise<Client | undefined> {
       const frontUrl = getMetadata(presentation.metadata.FrontUrl) ?? ''
       const currentFrontVersion = getMetadata(presentation.metadata.FrontVersion)
       if (currentFrontVersion !== undefined) {
-        const frontConfig = await (await fetch(concatLink(frontUrl, '/config.json'))).json()
+        const frontConfig = await loadServerConfig(concatLink(frontUrl, '/config.json'))
         if (frontConfig?.version !== undefined && frontConfig.version !== currentFrontVersion) {
           location.reload()
         }
diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts
index 051af5249c..4644eda2fb 100644
--- a/server/mongo/src/storage.ts
+++ b/server/mongo/src/storage.ts
@@ -75,7 +75,6 @@ import { createHash } from 'crypto'
 import {
   type AbstractCursor,
   type AnyBulkWriteOperation,
-  type BulkWriteResult,
   type Collection,
   type Db,
   type Document,
@@ -1107,57 +1106,6 @@ class MongoAdapter extends MongoAdapterBase {
     }
   }
 
-  bulkOps = new Map<Domain, AnyBulkWriteOperation<Document>[]>()
-
-  async _pushBulk (ctx: MeasureContext): Promise<void> {
-    const bulk = Array.from(this.bulkOps.entries())
-    this.bulkOps.clear()
-    if (bulk.length === 0) {
-      return
-    }
-    const promises: Promise<BulkWriteResult>[] = []
-    for (const [domain, ops] of bulk) {
-      if (ops === undefined || ops.length === 0) {
-        continue
-      }
-      const coll = this.db.collection<Document>(domain)
-
-      promises.push(
-        addOperation(
-          ctx,
-          'bulk-write',
-          { domain, operations: ops.length },
-          async (ctx) =>
-            await ctx.with(
-              'bulk-write',
-              { domain },
-              () =>
-                coll.bulkWrite(ops, {
-                  ordered: false
-                }),
-              {
-                domain,
-                operations: ops.length
-              }
-            )
-        )
-      )
-    }
-    await Promise.all(promises)
-  }
-
-  async pushBulk (ctx: MeasureContext, domain: Domain, ops: AnyBulkWriteOperation<Document>[]): Promise<void> {
-    const existing = this.bulkOps.get(domain)
-    if (existing !== undefined) {
-      existing.push(...ops)
-    } else {
-      this.bulkOps.set(domain, ops)
-    }
-    // We need to wait next cycle to send request
-    await new Promise((resolve) => setImmediate(resolve))
-    await this._pushBulk(ctx)
-  }
-
   async tx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult[]> {
     const result: TxResult[] = []
 
@@ -1171,6 +1119,7 @@
     const stTime = Date.now()
     const st = Date.now()
+    let promises: Promise<any>[] = []
     for (const [domain, txs] of byDomain) {
       if (domain === undefined) {
         continue
       }
@@ -1227,9 +1176,37 @@
       }
 
       if (ops.length > 0) {
-        await this.pushBulk(ctx, domain, ops)
+        if (ops === undefined || ops.length === 0) {
+          continue
+        }
+        const coll = this.db.collection<Document>(domain)
+
+        promises.push(
+          addOperation(
+            ctx,
+            'bulk-write',
+            { domain, operations: ops.length },
+            async (ctx) =>
+              await ctx.with(
+                'bulk-write',
+                { domain },
+                () =>
+                  coll.bulkWrite(ops, {
+                    ordered: false
+                  }),
+                {
+                  domain,
+                  operations: ops.length
+                }
+              )
+          )
+        )
       }
       if (domainBulk.findUpdate.size > 0) {
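+        // Bulk writes are collected into 'promises' and run concurrently;
+        // wait for them to land before issuing find-and-update calls.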
+        if (promises.length > 0) {
+          await Promise.all(promises)
+          promises = []
+        }
         const coll = this.db.collection(domain)
 
         await ctx.with(
@@ -1255,6 +1232,10 @@
       }
 
       if (domainBulk.raw.length > 0) {
+        if (promises.length > 0) {
+          await Promise.all(promises)
+          promises = []
+        }
         await ctx.with(
           'raw',
           {},
@@ -1270,6 +1251,9 @@
         )
       }
     }
+    if (promises.length > 0) {
+      await Promise.all(promises)
+    }
     return result
   }
 
@@ -1511,17 +1495,12 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
     await this._db.init(DOMAIN_TX)
   }
 
-  txBulk: Tx[] = []
-
-  async _bulkTx (ctx: MeasureContext): Promise<void> {
-    const txes = this.txBulk
-    this.txBulk = []
-
-    if (txes.length === 0) {
-      return
+  override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
+    if (tx.length === 0) {
+      return []
     }
 
-    const opName = txes.length === 1 ? 'tx-one' : 'tx'
     await addOperation(
       ctx,
+    const opName = tx.length === 1 ? 'tx-one' : 'tx'
       opName,
@@ -1532,31 +1511,20 @@
           { domain: 'tx' },
           () =>
             this.txCollection().insertMany(
-              txes.map((it) => translateDoc(it)),
+              tx.map((it) => translateDoc(it)),
               {
                 ordered: false
               }
             ),
           {
-            count: txes.length
+            count: tx.length
           }
         ),
-      { domain: 'tx', count: txes.length }
+      { domain: 'tx', count: tx.length }
     )
     ctx.withSync('handleEvent', {}, () => {
-      this.handleEvent(DOMAIN_TX, 'add', txes.length)
+      this.handleEvent(DOMAIN_TX, 'add', tx.length)
     })
-  }
-
-  override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
-    if (tx.length === 0) {
-      return []
-    }
-    this.txBulk.push(...tx)
-
-    // We need to wait next cycle to send request
-    await new Promise((resolve) => setImmediate(resolve))
-    await this._bulkTx(ctx)
 
     return []
   }
diff --git a/services/github/github-resources/src/components/presenters/GithubReviewPresenter.svelte b/services/github/github-resources/src/components/presenters/GithubReviewPresenter.svelte
index b06a6803eb..dcdb57ff49 100644
--- a/services/github/github-resources/src/components/presenters/GithubReviewPresenter.svelte
+++ b/services/github/github-resources/src/components/presenters/GithubReviewPresenter.svelte
@@ -3,24 +3,17 @@
 // -->