Merge branch 'develop' into staging

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-01-22 22:14:42 +07:00
commit 1a3ee1de3e
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
11 changed files with 176 additions and 75 deletions

33
dev/tool/src/fulltext.ts Normal file
View File

@ -0,0 +1,33 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type MeasureContext } from '@hcengineering/core'
/**
 * Ask the fulltext service to rebuild the index of the workspace encoded in `token`.
 *
 * Best effort: any failure (network error or non-2xx response) is logged through
 * `ctx` and swallowed, so callers never observe a rejection.
 *
 * @param ctx - measurement/logging context used to report failures
 * @param fulltextUrl - base URL of the fulltext service (joined with '/api/v1/reindex')
 * @param token - workspace token; the service derives the target workspace from it
 */
export async function reindexWorkspace (ctx: MeasureContext, fulltextUrl: string, token: string): Promise<void> {
  try {
    const res = await fetch(fulltextUrl + '/api/v1/reindex', {
      method: 'PUT',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ token })
    })
    if (!res.ok) {
      throw new Error(`HTTP Error ${res.status} ${res.statusText}`)
    }
  } catch (err: any) {
    // Fixed log text: this path reindexes via the fulltext service; the old
    // 'failed to reset index' message was a copy/paste from the elastic-reset tool.
    ctx.error('failed to reindex workspace', { err })
  }
}

View File

@ -77,7 +77,7 @@ import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv }
import { program, type Command } from 'commander'
import { addControlledDocumentRank } from './qms'
import { clearTelegramHistory } from './telegram'
import { diffWorkspace, recreateElastic, updateField } from './workspace'
import { diffWorkspace, updateField } from './workspace'
import core, {
AccountRole,
@ -149,6 +149,7 @@ import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { copyToDatalake, moveFiles, showLostFiles } from './storage'
import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres'
import { reindexWorkspace } from './fulltext'
const colorConstants = {
colorRed: '\u001b[31m',
@ -1925,27 +1926,43 @@ export function devTool (
)
// Trigger a fulltext reindex of a single workspace via the fulltext service.
// Replaces the old `recreate-elastic-indexes-mongo` command, which flipped
// DocIndexState flags directly in MongoDB.
program
  .command('fulltext-reindex <workspace>')
  .description('reindex workspace')
  .action(async (workspace: string) => {
    // The fulltext service does the actual work; we only need its URL and a
    // system token scoped to the target workspace.
    const fulltextUrl = process.env.FULLTEXT_URL
    if (fulltextUrl === undefined) {
      console.error('please provide FULLTEXT_URL')
      process.exit(1)
    }
    const wsid = getWorkspaceId(workspace)
    const token = generateToken(systemAccountEmail, wsid)
    console.log('reindex workspace', workspace)
    await reindexWorkspace(toolCtx, fulltextUrl, token)
    console.log('done', workspace)
  })
// Trigger a fulltext reindex of every known workspace, most recently visited
// first, so active workspaces regain search soonest.
program
  .command('fulltext-reindex-all')
  .description('reindex workspaces')
  .action(async () => {
    const fulltextUrl = process.env.FULLTEXT_URL
    if (fulltextUrl === undefined) {
      console.error('please provide FULLTEXT_URL')
      process.exit(1)
    }
    await withAccountDatabase(async (db) => {
      const workspaces = await listWorkspacesRaw(db)
      // Most recently visited workspaces first.
      workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
      for (const workspace of workspaces) {
        const wsid = getWorkspaceId(workspace.workspace)
        const token = generateToken(systemAccountEmail, wsid)
        // Log just the workspace id; dumping the whole workspace record is noisy.
        console.log('reindex workspace', workspace.workspace)
        await reindexWorkspace(toolCtx, fulltextUrl, token)
        console.log('done', workspace.workspace)
      }
    })
  })

View File

@ -20,7 +20,6 @@ import core, {
type Class,
type Client as CoreClient,
type Doc,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_TX,
type Ref,
type Tx,
@ -96,16 +95,3 @@ export async function updateField (
await connection.close()
}
}
// Force a full reindex of one workspace by marking every DocIndexState
// document in its MongoDB database with `needIndex: true`; the fulltext
// indexer later picks these up and rebuilds the index.
// NOTE(review): superseded by the HTTP-based `reindexWorkspace` helper that
// calls the fulltext service's /api/v1/reindex endpoint.
export async function recreateElastic (mongoUrl: string, workspaceId: WorkspaceId): Promise<void> {
const client = getMongoClient(mongoUrl)
const _client = await client.getClient()
try {
const db = getWorkspaceMongoDB(_client, workspaceId)
await db
.collection(DOMAIN_DOC_INDEX_STATE)
.updateMany({ _class: core.class.DocIndexState }, { $set: { needIndex: true } })
} finally {
// Release the shared Mongo client reference even if the update fails.
client.close()
}
}

View File

@ -72,7 +72,7 @@ describe('TimeRateLimiter', () => {
const operations = Promise.all([limiter.exec(mockFn), limiter.exec(mockFn), limiter.exec(mockFn)])
expect(mockFn).toHaveBeenCalledTimes(2)
expect(limiter.processingQueue.size).toBe(2)
expect(limiter.active).toBe(2)
jest.advanceTimersByTime(500)
await Promise.resolve()
@ -84,7 +84,7 @@ describe('TimeRateLimiter', () => {
await Promise.resolve()
await Promise.resolve()
expect(limiter.processingQueue.size).toBe(0)
expect(limiter.active).toBe(0)
expect(mockFn).toHaveBeenCalledTimes(3)
@ -104,7 +104,7 @@ describe('TimeRateLimiter', () => {
console.log('wait complete')
})
expect(limiter.processingQueue.size).toBe(1)
expect(limiter.active).toBe(1)
jest.advanceTimersByTime(1001)
await Promise.resolve()
@ -113,6 +113,6 @@ describe('TimeRateLimiter', () => {
await waitPromise
await operation
expect(limiter.processingQueue.size).toBe(0)
expect(limiter.active).toBe(0)
})
})

View File

@ -846,7 +846,7 @@ export function pluginFilterTx (
export class TimeRateLimiter {
idCounter: number = 0
processingQueue = new Map<number, Promise<void>>()
active: number = 0
last: number = 0
rate: number
period: number
@ -866,9 +866,7 @@ export class TimeRateLimiter {
}
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
const processingId = this.idCounter++
while (this.processingQueue.size >= this.rate || this.executions.length >= this.rate) {
while (this.active >= this.rate || this.executions.length >= this.rate) {
this.cleanupExecutions()
if (this.executions.length < this.rate) {
break
@ -882,11 +880,11 @@ export class TimeRateLimiter {
try {
this.executions.push(v)
const p = op(args)
this.processingQueue.set(processingId, p as Promise<void>)
this.active++
return await p
} finally {
v.running = false
this.processingQueue.delete(processingId)
this.active--
this.cleanupExecutions()
const n = this.notify.shift()
if (n !== undefined) {
@ -896,8 +894,8 @@ export class TimeRateLimiter {
}
async waitProcessing (): Promise<void> {
while (this.processingQueue.size > 0) {
console.log('wait', this.processingQueue.size)
while (this.active > 0) {
console.log('wait', this.active)
await new Promise<void>((resolve) => {
this.notify.push(resolve)
})

View File

@ -161,6 +161,14 @@ class WorkspaceIndexer {
return result
}
// Rebuild this workspace's fulltext index from scratch: stop any in-flight
// indexing, clear the persisted index state, then start indexing again.
async reindex (): Promise<void> {
await this.fulltext.cancel()
await this.fulltext.clearIndex()
// The callback fires on indexing progress; keeping lastUpdate fresh
// presumably prevents idle cleanup from closing this indexer while the
// rebuild is running — confirm against the indexer shutdown logic.
await this.fulltext.startIndexing(() => {
this.lastUpdate = Date.now()
})
}
async close (): Promise<void> {
await this.fulltext.cancel()
await this.pipeline.close()
@ -188,6 +196,10 @@ interface Search {
fullTextLimit: number
}
interface Reindex {
token: string
}
export async function startIndexer (
ctx: MeasureContext,
opt: {
@ -391,6 +403,26 @@ export async function startIndexer (
}
})
// PUT /api/v1/reindex — rebuild the fulltext index of one workspace.
// The target workspace is derived from the token in the request body;
// decoding the token doubles as the authorization check.
router.put('/api/v1/reindex', async (req, res) => {
try {
const request = req.request.body as Reindex
const decoded = decodeToken(request.token) // Just to be safe
req.body = {}
ctx.info('reindex', { workspace: decoded.workspace })
// NOTE(review): assuming the trailing `true` means "create the indexer if
// it is not already cached" — confirm against getIndexer's signature.
const indexer = await getIndexer(ctx, decoded.workspace, request.token, true)
if (indexer !== undefined) {
indexer.lastUpdate = Date.now()
await indexer.reindex()
}
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
// NOTE(review): every failure — including an invalid token — answers 404;
// a 401/500 split would be more informative to clients.
req.res.writeHead(404, {})
req.res.end()
}
})
app.use(router.routes()).use(router.allowedMethods())
const server = app.listen(opt.port, () => {

View File

@ -15,6 +15,7 @@
//
import activity, { ActivityMessage, DocUpdateMessage } from '@hcengineering/activity'
import { Analytics } from '@hcengineering/analytics'
import chunter, { ChatMessage } from '@hcengineering/chunter'
import contact, {
Employee,
@ -39,7 +40,6 @@ import core, {
generateId,
MeasureContext,
MixinUpdate,
RateLimiter,
Ref,
RefTo,
SortingOrder,
@ -82,7 +82,6 @@ import serverView from '@hcengineering/server-view'
import { markupToText, stripTags } from '@hcengineering/text-core'
import { encodeObjectURI } from '@hcengineering/view'
import { workbenchId } from '@hcengineering/workbench'
import { Analytics } from '@hcengineering/analytics'
import { Content, ContextsCache, ContextsCacheKey, NotifyParams, NotifyResult } from './types'
import {
@ -92,6 +91,7 @@ import {
getNotificationContent,
getNotificationLink,
getNotificationProviderControl,
getObjectSpace,
getTextPresenter,
getUsersInfo,
isAllowed,
@ -103,8 +103,7 @@ import {
replaceAll,
toReceiverInfo,
updateNotifyContextsSpace,
type NotificationProviderControl,
getObjectSpace
type NotificationProviderControl
} from './utils'
export function getPushCollaboratorTx (
@ -602,13 +601,7 @@ export async function createPushNotification (
}
}
const limiter = new RateLimiter(5)
for (const subscription of userSubscriptions) {
await limiter.add(async () => {
await sendPushToSubscription(sesURL, sesAuth, control, target, subscription, data)
})
}
await limiter.waitProcessing()
void sendPushToSubscription(sesURL, sesAuth, control, target, userSubscriptions, data)
}
async function sendPushToSubscription (
@ -616,11 +609,11 @@ async function sendPushToSubscription (
sesAuth: string | undefined,
control: TriggerControl,
targetUser: Ref<Account>,
subscription: PushSubscription,
subscriptions: PushSubscription[],
data: PushData
): Promise<void> {
try {
const result: 'ok' | 'clear-push' = (
const result: Ref<PushSubscription>[] = (
await (
await fetch(concatLink(sesURL, '/web-push'), {
method: 'post',
@ -629,15 +622,17 @@ async function sendPushToSubscription (
...(sesAuth != null ? { Authorization: `Bearer ${sesAuth}` } : {})
},
body: JSON.stringify({
subscription,
subscriptions,
data
})
})
).json()
).result
if (result === 'clear-push') {
const tx = control.txFactory.createTxRemoveDoc(subscription._class, subscription.space, subscription._id)
await control.apply(control.ctx, [tx])
if (result.length > 0) {
const domain = control.hierarchy.findDomain(notification.class.PushSubscription)
if (domain !== undefined) {
await control.lowLevel.clean(control.ctx, domain, result)
}
}
} catch (err) {
control.ctx.info('Cannot send push notification to', { user: targetUser, err })

View File

@ -165,6 +165,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
triggerIndexing = (): void => {}
async startIndexing (indexing: () => void): Promise<void> {
this.cancelling = false
this.verify = this.verifyWorkspace(this.metrics, indexing)
void this.verify.then(() => {
this.indexing = this.doIndexing(indexing)
@ -282,6 +283,19 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}
// Reset persisted fulltext indexing state by deleting the migration markers
// that record the index structures as already built; presumably the next
// startIndexing run then re-verifies and rebuilds the index from scratch —
// confirm against verifyWorkspace.
async clearIndex (): Promise<void> {
const ctx = this.metrics
// Locate the completed-migration markers belonging to the fulltext indexer.
const migrations = await this.storage.findAll<MigrationState>(ctx, core.class.MigrationState, {
plugin: coreId,
state: {
$in: ['verify-indexes-v2', 'full-text-indexer-v4', 'full-text-structure-v4']
}
})
const refs = migrations.map((it) => it._id)
await this.storage.clean(ctx, DOMAIN_MIGRATION, refs)
}
broadcastClasses = new Set<Ref<Class<Doc>>>()
broadcasts: number = 0

View File

@ -80,6 +80,7 @@ import {
createTables,
DBCollectionHelper,
type DBDoc,
dbExtra,
getDBClient,
getPrepare,
inferType,
@ -673,7 +674,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
sqlChunks.push(secJoin)
}
if (joins.length > 0) {
sqlChunks.push(this.buildJoinString(joins))
sqlChunks.push(this.buildJoinString(vars, joins))
}
sqlChunks.push(`WHERE ${this.buildQuery(vars, _class, domain, query, joins, options)}`)
@ -699,7 +700,9 @@ abstract class PostgresAdapterBase implements DbAdapter {
const finalSql: string = [select, ...sqlChunks].join(' ')
fquery = finalSql
const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
const result = dbExtra?.useCF
? await connection.unsafe(vars.injectVars(finalSql), undefined, { prepare: false })
: await connection.unsafe(finalSql, vars.getValues(), getPrepare())
if (
options?.lookup === undefined &&
options?.domainLookup === undefined &&
@ -934,19 +937,19 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
}
private buildJoinString (value: JoinProps[]): string {
private buildJoinString (vars: ValuesVariables, value: JoinProps[]): string {
const res: string[] = []
for (const val of value) {
if (val.isReverse) continue
if (val.table === DOMAIN_MODEL) continue
res.push(
`LEFT JOIN ${val.table} AS ${val.toAlias} ON ${val.fromAlias}.${val.fromField} = ${val.toAlias}."${val.toField}" AND ${val.toAlias}."workspaceId" = '${this.workspaceId.name}'`
`LEFT JOIN ${val.table} AS ${val.toAlias} ON ${val.fromAlias}.${val.fromField} = ${val.toAlias}."${val.toField}" AND ${val.toAlias}."workspaceId" = ${vars.add(this.workspaceId.name, '::uuid')}`
)
if (val.classes !== undefined) {
if (val.classes.length === 1) {
res.push(`AND ${val.toAlias}._class = '${val.classes[0]}'`)
res.push(`AND ${val.toAlias}._class = ${vars.add(val.classes[0], '::text')}`)
} else {
res.push(`AND ${val.toAlias}._class IN (${val.classes.map((c) => `'${c}'`).join(', ')})`)
res.push(`AND ${val.toAlias}._class = ANY (${vars.addArray(val.classes, '::text[]')})`)
}
}
}
@ -1251,7 +1254,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
private translateQueryValue (vars: ValuesVariables, tkey: string, value: any, type: ValueType): string | undefined {
const tkeyData = tkey.includes('data->') || tkey.includes('data#>>')
const tkeyData = tkey.includes('data') && (tkey.includes('->') || tkey.includes('#>>'))
if (tkeyData && (Array.isArray(value) || (typeof value !== 'object' && typeof value !== 'string'))) {
value = Array.isArray(value) ? value.map((it) => (it == null ? null : `${it}`)) : `${value}`
}

View File

@ -15,4 +15,8 @@
import { main } from './main'

// Start the service exactly once. The .catch turns an early startup failure
// into a logged error instead of an unhandled rejection. (A leftover bare
// `void main()` line previously invoked the service a second time.)
void main().catch((err) => {
  if (err != null) {
    console.error(err)
  }
})

View File

@ -13,6 +13,7 @@
// limitations under the License.
//
import type { Ref } from '@hcengineering/core'
import { PushSubscription, type PushData } from '@hcengineering/notification'
import type { Request, Response } from 'express'
import webpush, { WebPushError } from 'web-push'
@ -22,25 +23,39 @@ import { SES } from './ses'
import { Endpoint } from './types'
const errorMessages = ['expired', 'Unregistered', 'No such subscription']
async function sendPushToSubscription (subscription: PushSubscription, data: PushData): Promise<'ok' | 'clear-push'> {
try {
await webpush.sendNotification(subscription, JSON.stringify(data))
} catch (err: any) {
if (err instanceof WebPushError) {
if (errorMessages.some((p) => JSON.stringify((err as WebPushError).body).includes(p))) {
return 'clear-push'
async function sendPushToSubscription (
subscriptions: PushSubscription[],
data: PushData
): Promise<Ref<PushSubscription>[]> {
const result: Ref<PushSubscription>[] = []
for (const subscription of subscriptions) {
try {
await webpush.sendNotification(subscription, JSON.stringify(data))
} catch (err: any) {
if (err instanceof WebPushError) {
if (errorMessages.some((p) => JSON.stringify((err as WebPushError).body).includes(p))) {
result.push(subscription._id)
}
}
}
}
return 'ok'
return result
}
export const main = async (): Promise<void> => {
const ses = new SES()
console.log('SES service has been started')
let webpushInitDone = false
if (config.PushPublicKey !== undefined && config.PushPrivateKey !== undefined) {
webpush.setVapidDetails(config.PushSubject ?? 'mailto:hey@huly.io', config.PushPublicKey, config.PushPublicKey)
try {
const subj = config.PushSubject ?? 'mailto:hey@huly.io'
console.log('Setting VAPID details', subj, config.PushPublicKey.length, config.PushPrivateKey.length)
webpush.setVapidDetails(config.PushSubject ?? 'mailto:hey@huly.io', config.PushPublicKey, config.PushPrivateKey)
webpushInitDone = true
} catch (err: any) {
console.error(err)
}
}
const checkAuth = (req: Request<any>, res: Response<any>): boolean => {
@ -104,14 +119,18 @@ export const main = async (): Promise<void> => {
res.status(400).send({ err: "'data' is missing" })
return
}
const subscription: PushSubscription | undefined = req.body?.subscription
if (subscription === undefined) {
res.status(400).send({ err: "'subscription' is missing" })
const subscriptions: PushSubscription[] | undefined = req.body?.subscriptions
if (subscriptions === undefined) {
res.status(400).send({ err: "'subscriptions' is missing" })
return
}
if (!webpushInitDone) {
res.json({ result: [] }).end()
return
}
const result = await sendPushToSubscription(subscription, data)
res.json({ result })
const result = await sendPushToSubscription(subscriptions, data)
res.json({ result }).end()
}
}
]