Merge branch 'develop' into staging-new

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2025-05-30 10:15:26 +07:00
commit b8ded04adf
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
55 changed files with 704 additions and 454 deletions

9
.vscode/launch.json vendored
View File

@ -124,7 +124,7 @@
// "DB_URL": "postgresql://postgres:example@localhost:5432",
"DB_URL": "postgresql://root@huly.local:26258/defaultdb?sslmode=disable",
// "GREEN_URL": "http://huly.local:6767?token=secret",
"SERVER_PORT": "3334",
"SERVER_PORT": "3335",
"METRICS_CONSOLE": "false",
"DEBUG_PRINT_SQL": "true",
"METRICS_FILE": "${workspaceRoot}/metrics.txt", // Show metrics in console evert 30 seconds.,
@ -135,7 +135,7 @@
"FRONT_URL": "http://localhost:8083",
"ACCOUNTS_URL": "http://localhost:3003",
"MODEL_JSON": "${workspaceRoot}/models/all/bundle/model.json",
"MODEL_VERSION": "0.7.75",
"MODEL_VERSION": "0.7.110",
"STATS_URL": "http://huly.local:4901",
"QUEUE_CONFIG": "localhost:19093"
},
@ -248,7 +248,7 @@
"request": "launch",
"args": ["src/__start.ts"],
"env": {
"PORT": "4900",
"PORT": "4901",
"SERVER_SECRET": "secret"
},
"runtimeVersion": "20",
@ -585,13 +585,12 @@
"PLATFORM_OPERATION_LOGGING": "true",
"FRONT_URL": "http://localhost:8080",
"PORT": "3500",
"STATS_URL": "http://huly.local:4900"
"STATS_URL": "http://huly.local:4901"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"cwd": "${workspaceRoot}/services/github/pod-github",
"protocol": "inspector",
"attachSimplePort": 0,
"outputCapture": "std"
},
{

View File

@ -254,14 +254,7 @@ export async function benchmark (
// operations = 0
requestTime = 0
// transfer = 0
const r = extract(
json.metrics as Metrics,
'🧲 session',
'client',
'handleRequest',
'process',
'find-all'
)
const r = extract(json.metrics as Metrics, '🧲 session', 'client', 'process', 'find-all')
operations = (r?.operations ?? 0) - oldOperations
oldOperations = r?.operations ?? 0

View File

@ -157,7 +157,7 @@ export async function backupRestore (
dataId: workspace.dataId,
url: workspace.url
}
const result: boolean = await ctx.with('restore', { workspace: workspace.url }, (ctx) =>
const result: boolean = await ctx.with('restore', {}, (ctx) =>
restore(ctx, '', wsUrl, storage, {
date: -1,
skip: new Set(skipDomains),

View File

@ -84,7 +84,9 @@ export function updateMeasure (
const fParams = typeof fullParams === 'function' ? fullParams() : fullParams
// Update params if required
for (const [k, v] of Object.entries(params)) {
const pparams = Object.entries(params)
if (pparams.length > 0) {
const [k, v] = pparams[0]
let params = metrics.params[k]
if (params === undefined) {
params = {}
@ -106,7 +108,22 @@ export function updateMeasure (
param.operations++
}
// Do not update top results for params.
// param.topResult = getUpdatedTopResult(param.topResult, ed - st, fParams)
if (pparams.length > 1) {
// We need to update all other params as counters.
if (param.topResult === undefined) {
param.topResult = []
}
for (const [, v] of pparams.slice(1)) {
const r = (param.topResult ?? []).find((it) => it.params[`${v}`] === true)
if (r !== undefined) {
r.value += 1 // Counter of operations
r.time = (r.time ?? 0) + (value ?? ed - st)
} else {
param.topResult.push({ params: { [`${v}`]: true }, value: 1, time: value ?? ed - st })
}
}
param.topResult.sort((a, b) => b.value - a.value)
}
}
// Update leaf data
if (override === true) {

View File

@ -21,6 +21,7 @@ export interface MetricsData {
value: number
topResult?: {
value: number
time?: number
params: FullParamsType
}[]
}

View File

@ -46,6 +46,7 @@ export interface SessionData {
contextCache: Map<string, any>
removedMap: Map<Ref<Doc>, Doc>
account: Account
service: string
sessionId: string
admin?: boolean
isTriggerCtx?: boolean

View File

@ -12,11 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<script context="module" lang="ts">
let fullScreenMode: boolean = false
export function getFullScreenMode (): boolean {
return fullScreenMode
}
function setFullScreenMode (value: boolean): void {
fullScreenMode = value
}
</script>
<script lang="ts">
import { popupstore as popups } from '../popups'
import { modalStore as modals } from '../modals'
import PopupInstance from './PopupInstance.svelte'
import { onDestroy, onMount } from 'svelte'
export let contentPanel: HTMLElement | undefined = undefined
export let fullScreen: boolean = false
@ -27,11 +39,18 @@
instances.forEach((p) => p.fitPopupInstance())
}
onMount(() => {
if (fullScreen) setFullScreenMode(true)
})
onDestroy(() => {
if (fullScreen) setFullScreenMode(false)
})
const shouldDisplayPopup = (popup: any): boolean => {
return (
(fullScreen && document.fullscreenElement != null && popup.element !== 'full-centered') ||
(!fullScreen && document.fullscreenElement != null && popup.element === 'full-centered') ||
(!fullScreen && document.fullscreenElement == null)
(fullScreen && fullScreenMode && popup.element !== 'full-centered') ||
(!fullScreen && fullScreenMode && popup.element === 'full-centered') ||
(!fullScreen && !fullScreenMode)
)
}

View File

@ -12,8 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-->
<script context="module" lang="ts">
let fullScreenMode: boolean = false
export function getFullScreenMode (): boolean {
return fullScreenMode
}
function setFullScreenMode (value: boolean): void {
fullScreenMode = value
}
</script>
<script lang="ts">
import { afterUpdate, onDestroy } from 'svelte'
import { afterUpdate, onDestroy, onMount } from 'svelte'
import { resizeObserver } from '../resize'
import { closeTooltip, tooltipstore as tooltip } from '../tooltips'
import { modalStore as modals } from '../modals'
@ -66,8 +77,14 @@
visibility: 'hidden',
classList: ''
}
const shouldHideTooltip = (): boolean =>
(fullScreen && document.fullscreenElement == null) || (!fullScreen && document.fullscreenElement != null)
onMount(() => {
if (fullScreen) setFullScreenMode(true)
})
onDestroy(() => {
if (fullScreen) setFullScreenMode(false)
})
const shouldHideTooltip = (): boolean => (fullScreen && !fullScreenMode) || (!fullScreen && fullScreenMode)
const clearStyles = (): void => {
shown = false

View File

@ -109,6 +109,10 @@ export function showPopup (
})
}
closePopupOp()
const anchor = document.activeElement as HTMLElement
const editable = anchor?.isContentEditable || anchor?.tagName === 'INPUT' || anchor?.tagName === 'TEXTAREA'
if (anchor != null && !editable) anchor.blur()
const _element = element instanceof HTMLElement ? getPopupPositionElement(element) : element
const data: Omit<CompAndProps, 'is'> = {
id,

View File

@ -33,7 +33,9 @@
<!-- svelte-ignore a11y-click-events-have-key-events -->
<!-- svelte-ignore a11y-no-static-element-interactions -->
<div class="flex-row-center" on:click>
<Avatar person={value} {size} {icon} name={value.name} on:accent-color {showStatus} />
{#key value._id}
<Avatar person={value} {size} {icon} name={value.name} on:accent-color {showStatus} />
{/key}
<div class="flex-col min-w-0 {size === 'tiny' || size === 'inline' ? 'ml-1' : 'ml-2'}" class:max-w-20={short}>
{#if subtitle}<div class="content-dark-color text-sm">{subtitle}</div>{/if}
<div class="label text-left overflow-label">{getName(client.getHierarchy(), value)}</div>

View File

@ -151,16 +151,25 @@
{#if childExpandable}
<div class="p-1" style:margin-left={`${level * 0.5 + 0.5}rem`}>
{#each vv.topResult ?? [] as r}
<Expandable>
<svelte:fragment slot="title">
<div class="flex-row-center flex-between flex-grow">
Time:{toTime(r.value)}
</div>
</svelte:fragment>
<pre class="select-text">
{JSON.stringify(r, null, 2)}
</pre>
</Expandable>
<FixedColumn key="row-f">
<div class="flex-row-center">
<FixedColumn key="f1">
<div class="p-1">
{Object.entries(r.params)[0][0]}
</div>
</FixedColumn>
<FixedColumn key="f2">
<div class="p-1">
{r.value}
</div>
</FixedColumn>
<FixedColumn key="f3">
<div class="p-1">
{toTime(r.time ?? 0)}
</div>
</FixedColumn>
</div>
</FixedColumn>
{/each}
</div>
{/if}

View File

@ -76,7 +76,7 @@ export async function preparePipeline (
const middlewares: MiddlewareCreator[] = [
TxMiddleware.create, // Store tx into transaction domain
FullTextMiddleware.create('', generateToken(systemAccountUuid, wsIds.uuid)),
FullTextMiddleware.create('', generateToken(systemAccountUuid, wsIds.uuid, { service: 'fulltext' })),
LowLevelMiddleware.create,
QueryJoinMiddleware.create,
DomainFindMiddleware.create,

View File

@ -116,7 +116,14 @@ export class WorkspaceManager {
for (const m of msg) {
const ws = m.id as WorkspaceUuid
const indexer = await this.getIndexer(this.ctx, ws, generateToken(systemAccountUuid, ws), true)
const indexer = await this.getIndexer(
this.ctx,
ws,
generateToken(systemAccountUuid, ws, {
service: 'fulltext'
}),
true
)
await indexer?.fulltext.processDocuments(this.ctx, m.value, control)
}
}
@ -135,7 +142,12 @@ export class WorkspaceManager {
mm.type === QueueWorkspaceEvent.Restored ||
mm.type === QueueWorkspaceEvent.FullReindex
) {
const indexer = await this.getIndexer(this.ctx, ws, generateToken(systemAccountUuid, ws), true)
const indexer = await this.getIndexer(
this.ctx,
ws,
generateToken(systemAccountUuid, ws, { service: 'fulltext' }),
true
)
if (indexer !== undefined) {
await indexer.dropWorkspace() // TODO: Add heartbeat
const classes = await indexer.getIndexClassess()
@ -149,7 +161,7 @@ export class WorkspaceManager {
mm.type === QueueWorkspaceEvent.Archived ||
mm.type === QueueWorkspaceEvent.ClearIndex
) {
const token = generateToken(systemAccountUuid, ws)
const token = generateToken(systemAccountUuid, ws, { service: 'fulltext' })
const workspaceInfo = await this.getWorkspaceInfo(token)
if (workspaceInfo !== undefined) {
if (workspaceInfo.dataId != null) {
@ -158,7 +170,12 @@ export class WorkspaceManager {
await this.fulltextAdapter.clean(this.ctx, workspaceInfo.uuid)
}
} else if (mm.type === QueueWorkspaceEvent.Reindex) {
const indexer = await this.getIndexer(this.ctx, ws, generateToken(systemAccountUuid, ws), true)
const indexer = await this.getIndexer(
this.ctx,
ws,
generateToken(systemAccountUuid, ws, { service: 'fulltext' }),
true
)
const mmd = mm as QueueWorkspaceReindexMessage
await indexer?.reindex(this.ctx, mmd.domain, mmd.classes, control)
}

View File

@ -95,7 +95,7 @@ export class WorkspaceIndexer {
throw new PlatformError(unknownError('Default adapter should be set'))
}
const token = generateToken(systemAccountUuid, workspace.uuid)
const token = generateToken(systemAccountUuid, workspace.uuid, { service: 'fulltext' })
const transactorEndpoint = await endpointProvider(token)
result.fulltext = new FullTextIndexPipeline(

View File

@ -40,6 +40,7 @@ interface RPCClientInfo {
client: ConnectionSocket
session: Session
workspaceId: string
context: MeasureContext
}
const gzipAsync = promisify(gzip)
@ -173,12 +174,12 @@ export function registerRPC (app: Express, sessions: SessionManager, ctx: Measur
})
return
}
transactorRpc = { session: s.session, client: cs, workspaceId: s.workspaceId }
transactorRpc = { session: s.session, client: cs, workspaceId: s.workspaceId, context: s.context }
rpcSessions.set(token, transactorRpc)
}
const rpc = transactorRpc
const rateLimit = await sessions.handleRPC(ctx, rpc.session, rpc.client, async (ctx, rateLimit) => {
const rateLimit = await sessions.handleRPC(rpc.context, rpc.session, rpc.client, async (ctx, rateLimit) => {
await operation(ctx, rpc.session, rateLimit, token)
})
if (rateLimit !== undefined) {

View File

@ -320,7 +320,7 @@ export function startHttpServer (
}
await ctx.with(
'storage upload',
{ workspace: wsIds.uuid },
{},
async (ctx) => {
await externalStorage.put(
ctx,
@ -336,7 +336,7 @@ export function startHttpServer (
})
res.end(JSON.stringify({ success: true }))
},
{ file: name, contentType }
{ file: name, contentType, workspace: wsIds.uuid }
)
} catch (err: any) {
Analytics.handleError(err)

View File

@ -15,6 +15,7 @@ import {
} from '@hcengineering/server-core'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import cors from '@koa/cors'
import type { IncomingHttpHeaders } from 'http'
import Koa from 'koa'
import bodyParser from 'koa-bodyparser'
import Router from 'koa-router'
@ -36,6 +37,14 @@ interface OverviewStatistics {
workspaces: WorkspaceStatistics[]
}
const extractAuthorizationToken = (headers: IncomingHttpHeaders): string | undefined => {
try {
return headers.authorization?.slice(7) ?? undefined
} catch {
return undefined
}
}
/**
* @public
*/
@ -70,7 +79,7 @@ export function serveStats (ctx: MeasureContext, onClose?: () => void): void {
router.get('/api/v1/overview', (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? extractAuthorizationToken(req.headers)
const payload = decodeToken(token)
const admin = payload.extra?.admin === 'true'
if (!admin) {
@ -98,7 +107,7 @@ export function serveStats (ctx: MeasureContext, onClose?: () => void): void {
const json: Record<string, Omit<ServiceStatistics, 'stats' | 'workspaces'>> = {}
for (const [k, v] of statistics.entries()) {
if (Date.now() - v.lastUpdate > serviceTimeout) {
timeouts.set(v.serviceName, (timeouts.get(v.serviceName) ?? 0) + 1)
timeouts.set(k, (timeouts.get(k) ?? 0) + 1)
toClean.push(k)
continue
}
@ -117,6 +126,7 @@ export function serveStats (ctx: MeasureContext, onClose?: () => void): void {
}
}
for (const k of toClean) {
timeouts.delete(k)
statistics.delete(k)
}
@ -139,7 +149,7 @@ export function serveStats (ctx: MeasureContext, onClose?: () => void): void {
router.get('/api/v1/statistics', (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? extractAuthorizationToken(req.headers)
const payload = decodeToken(token)
const admin = payload.extra?.admin === 'true'
ctx.info('get stats', { admin, service: req.query.name })
@ -167,9 +177,9 @@ export function serveStats (ctx: MeasureContext, onClose?: () => void): void {
})
router.put('/api/v1/statistics', (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? extractAuthorizationToken(req.headers)
const payload = decodeToken(token)
const service = payload.extra?.service === 'true'
const service = payload.extra?.service != null
const serviceName = (req.query.name as string) ?? ''
if (service) {
ctx.info('put stats', { service: req.query.name, len: req.request.length })

View File

@ -319,7 +319,7 @@ async function sendEventToService (
method: 'POST',
keepalive: true,
headers: {
Authorization: 'Bearer ' + generateToken(systemAccountUuid, workspace),
Authorization: 'Bearer ' + generateToken(systemAccountUuid, workspace, { service: 'calendar' }),
'Content-Type': 'application/json'
},
body: JSON.stringify({

View File

@ -295,7 +295,7 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap
router.put('/api/v1/manage', async (req, res) => {
try {
const token = req.query.token as string
const token = (req.query.token as string) ?? extractToken(req.headers)
const payload = decodeToken(token)
if (payload.extra?.admin !== 'true') {
req.res.writeHead(404, {})
@ -358,7 +358,15 @@ export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap
host = new URL(origin).host
}
const branding = host !== undefined ? brandings[host] : null
const result = await measureCtx.with(request.method, {}, (mctx) => {
let source = ''
try {
source = (token != null ? decodeToken(token).extra?.service : undefined) ?? '🤦user'
} catch (err) {
// Ignore
}
const result = await measureCtx.with(request.method, { source }, (mctx) => {
if (method === undefined || typeof method !== 'function') {
const response = {
id: request.id,

View File

@ -104,6 +104,7 @@ export interface PostgresDbCollectionOptions<T extends Record<string, any>, K ex
ns?: string
fieldTypes?: Record<string, string>
timestampFields?: Array<keyof T>
withRetryClient?: <R>(callback: (client: Sql) => Promise<R>) => Promise<R>
}
export class PostgresDbCollection<T extends Record<string, any>, K extends keyof T | undefined = undefined>
@ -247,6 +248,15 @@ implements DbCollection<T> {
return res as T
}
async unsafe (sql: string, values: any[], client?: Sql): Promise<any[]> {
if (this.options.withRetryClient !== undefined) {
return await this.options.withRetryClient((_client) => _client.unsafe(sql, values))
} else {
const _client = client ?? this.client
return await _client.unsafe(sql, values)
}
}
async find (query: Query<T>, sort?: Sort<T>, limit?: number, client?: Sql): Promise<T[]> {
const sqlChunks: string[] = [this.buildSelectClause()]
const [whereClause, whereValues] = this.buildWhereClause(query)
@ -264,8 +274,7 @@ implements DbCollection<T> {
}
const finalSql: string = sqlChunks.join(' ')
const _client = client ?? this.client
const result = await _client.unsafe(finalSql, whereValues)
const result = await this.unsafe(finalSql, whereValues, client)
return result.map((row) => this.convertToObj(row))
}
@ -281,8 +290,7 @@ implements DbCollection<T> {
const sql = `INSERT INTO ${this.getTableName()} (${keys.map((k) => `"${k}"`).join(', ')}) VALUES (${keys.map((_, idx) => `$${idx + 1}`).join(', ')}) RETURNING *`
const _client = client ?? this.client
const res: any | undefined = await _client.unsafe(sql, values)
const res: any | undefined = await this.unsafe(sql, values, client)
const idKey = this.idKey
if (idKey === undefined) {
@ -317,8 +325,7 @@ implements DbCollection<T> {
RETURNING *
`
const _client = client ?? this.client
const res: any = await _client.unsafe(sql, values)
const res: any = await this.unsafe(sql, values, client)
const idKey = this.idKey
if (idKey === undefined) {
@ -370,8 +377,7 @@ implements DbCollection<T> {
}
const finalSql = sqlChunks.join(' ')
const _client = client ?? this.client
await _client.unsafe(finalSql, [...updateValues, ...whereValues])
await this.unsafe(finalSql, [...updateValues, ...whereValues], client)
}
async deleteMany (query: Query<T>, client?: Sql): Promise<void> {
@ -383,8 +389,7 @@ implements DbCollection<T> {
}
const finalSql = sqlChunks.join(' ')
const _client = client ?? this.client
await _client.unsafe(finalSql, whereValues)
await this.unsafe(finalSql, whereValues, client)
}
}
@ -393,8 +398,12 @@ export class AccountPostgresDbCollection
implements DbCollection<Account> {
private readonly passwordKeys = ['hash', 'salt']
constructor (client: Sql, ns?: string) {
super('account', client, { idKey: 'uuid', ns })
constructor (
client: Sql,
ns?: string,
withRetryClient?: PostgresDbCollectionOptions<Account, 'uuid'>['withRetryClient']
) {
super('account', client, { idKey: 'uuid', ns, withRetryClient })
}
getPasswordsTableName (): string {
@ -466,6 +475,12 @@ export class AccountPostgresDbCollection
}
export class PostgresAccountDB implements AccountDB {
private readonly retryOptions = {
maxAttempts: 5,
initialDelayMs: 100,
maxDelayMs: 2000
}
readonly wsMembersName = 'workspace_members'
person: PostgresDbCollection<Person, 'uuid'>
@ -485,36 +500,45 @@ export class PostgresAccountDB implements AccountDB {
readonly client: Sql,
readonly ns: string = 'global_account'
) {
this.person = new PostgresDbCollection<Person, 'uuid'>('person', client, { ns, idKey: 'uuid' })
this.account = new AccountPostgresDbCollection(client, ns)
const withRetryClient = this.withRetry
this.person = new PostgresDbCollection<Person, 'uuid'>('person', client, { ns, idKey: 'uuid', withRetryClient })
this.account = new AccountPostgresDbCollection(client, ns, withRetryClient)
this.socialId = new PostgresDbCollection<SocialId, '_id'>('social_id', client, {
ns,
idKey: '_id',
timestampFields: ['createdOn', 'verifiedOn']
timestampFields: ['createdOn', 'verifiedOn'],
withRetryClient
})
this.workspaceStatus = new PostgresDbCollection<WorkspaceStatus>('workspace_status', client, {
ns,
timestampFields: ['lastProcessingTime', 'lastVisit']
timestampFields: ['lastProcessingTime', 'lastVisit'],
withRetryClient
})
this.workspace = new PostgresDbCollection<Workspace, 'uuid'>('workspace', client, {
ns,
idKey: 'uuid',
timestampFields: ['createdOn']
timestampFields: ['createdOn'],
withRetryClient
})
this.accountEvent = new PostgresDbCollection<AccountEvent>('account_events', client, {
ns,
timestampFields: ['time']
timestampFields: ['time'],
withRetryClient
})
this.otp = new PostgresDbCollection<OTP>('otp', client, { ns, timestampFields: ['expiresOn', 'createdOn'] })
this.invite = new PostgresDbCollection<WorkspaceInvite, 'id'>('invite', client, {
ns,
idKey: 'id',
timestampFields: ['expiresOn']
timestampFields: ['expiresOn'],
withRetryClient
})
this.mailbox = new PostgresDbCollection<Mailbox, 'mailbox'>('mailbox', client, { ns, withRetryClient })
this.mailboxSecret = new PostgresDbCollection<MailboxSecret>('mailbox_secrets', client, { ns, withRetryClient })
this.integration = new PostgresDbCollection<Integration>('integrations', client, { ns, withRetryClient })
this.integrationSecret = new PostgresDbCollection<IntegrationSecret>('integration_secrets', client, {
ns,
withRetryClient
})
this.mailbox = new PostgresDbCollection<Mailbox, 'mailbox'>('mailbox', client, { ns })
this.mailboxSecret = new PostgresDbCollection<MailboxSecret>('mailbox_secrets', client, { ns })
this.integration = new PostgresDbCollection<Integration>('integrations', client, { ns })
this.integrationSecret = new PostgresDbCollection<IntegrationSecret>('integration_secrets', client, { ns })
}
getWsMembersTableName (): string {
@ -672,6 +696,37 @@ export class PostgresAccountDB implements AccountDB {
}
}
withRetry = async <T>(callback: (client: Sql) => Promise<T>): Promise<T> => {
let attempt = 0
let delay = this.retryOptions.initialDelayMs
while (true) {
try {
return (await this.client.begin(callback)) as T
} catch (err: any) {
attempt++
if (!this.isRetryableError(err) || attempt >= this.retryOptions.maxAttempts) {
throw err
}
await new Promise((resolve) => setTimeout(resolve, delay))
delay = Math.min(delay * 2, this.retryOptions.maxDelayMs)
}
}
}
private isRetryableError (err: any): boolean {
const msg: string = err?.message ?? ''
return (
err.code === '40001' || // Retry transaction
err.code === '55P03' || // Lock not available
msg.includes('RETRY_SERIALIZABLE')
)
}
async createWorkspace (data: WorkspaceData, status: WorkspaceStatusData): Promise<WorkspaceUuid> {
return await this.client.begin(async (client) => {
const workspaceUuid = await this.workspace.insertOne(data, client)

View File

@ -39,6 +39,7 @@ import type {
IntegrationKey,
IntegrationSecret,
IntegrationSecretKey,
Query,
SocialId,
Workspace,
WorkspaceEvent,
@ -294,8 +295,10 @@ export async function updateWorkspaceInfo (
}
progress = Math.round(progress)
const ts = Date.now()
const update: Partial<WorkspaceStatus> = {}
const wsUpdate: Partial<Workspace> = {}
const query: Query<WorkspaceStatus> = { workspaceUuid: workspace.uuid }
switch (event) {
case 'create-started':
update.mode = 'creating'
@ -330,6 +333,7 @@ export async function updateWorkspaceInfo (
break
case 'progress':
update.processingProgress = progress
query.processingProgress = { $lte: progress }
break
case 'migrate-backup-started':
update.mode = 'migration-backup'
@ -382,6 +386,7 @@ export async function updateWorkspaceInfo (
break
case 'ping':
default:
query.lastProcessingTime = { $lte: ts }
break
}
@ -389,13 +394,10 @@ export async function updateWorkspaceInfo (
update.processingMessage = message
}
await db.workspaceStatus.updateOne(
{ workspaceUuid: workspace.uuid },
{
lastProcessingTime: Date.now(), // Some operations override it.
...update
}
)
await db.workspaceStatus.updateOne(query, {
lastProcessingTime: ts, // Some operations override it.
...update
})
if (Object.keys(wsUpdate).length !== 0) {
await db.workspace.updateOne({ uuid: workspace.uuid }, wsUpdate)

View File

@ -34,7 +34,7 @@ import {
} from '@hcengineering/core'
import { getMongoClient } from '@hcengineering/mongo' // TODO: get rid of this import later
import platform, { getMetadata, PlatformError, Severity, Status, translate } from '@hcengineering/platform'
import { getDBClient } from '@hcengineering/postgres'
import { getDBClient, setDBExtraOptions } from '@hcengineering/postgres'
import { pbkdf2Sync, randomBytes } from 'crypto'
import otpGenerator from 'otp-generator'
@ -82,6 +82,11 @@ export async function getAccountDB (uri: string, dbNs?: string): Promise<[Accoun
}
]
} else {
setDBExtraOptions({
connection: {
application_name: 'account'
}
})
const client = getDBClient(sharedPipelineContextVars, uri)
const pgClient = await client.getClient()
const pgAccount = new PostgresAccountDB(pgClient, dbNs ?? 'global_account')

View File

@ -36,30 +36,35 @@ export class AuthenticationExtension implements Extension {
const ctx = this.configuration.ctx
const { workspaceId } = decodeDocumentId(data.documentName)
return await ctx.with('authenticate', { workspaceId }, async () => {
const token = decodeToken(data.token)
const readonly = isGuest(token)
return await ctx.with(
'authenticate',
{},
async () => {
const token = decodeToken(data.token)
const readonly = isGuest(token)
ctx.info('authenticate', {
workspaceId,
account: token.account,
mode: token.extra?.mode ?? '',
readonly
})
ctx.info('authenticate', {
workspaceId,
account: token.account,
mode: token.extra?.mode ?? '',
readonly
})
if (readonly) {
data.connection.readOnly = true
}
if (readonly) {
data.connection.readOnly = true
}
// verify workspace can be accessed with the token
const ids = await getWorkspaceIds(data.token)
// verify workspace can be accessed with the token
const ids = await getWorkspaceIds(data.token)
// verify workspace uuid in the document matches the token
if (ids.uuid !== workspaceId) {
throw new Error('documentName must include workspace id')
}
// verify workspace uuid in the document matches the token
if (ids.uuid !== workspaceId) {
throw new Error('documentName must include workspace id')
}
return buildContext(data, ids)
})
return buildContext(data, ids)
},
{ workspaceId }
)
}
}

View File

@ -186,17 +186,24 @@ export async function start (ctx: MeasureContext, config: Config, storageAdapter
const context = await getContext(rawToken, token)
rpcCtx.info('rpc', { method: request.method, connectionId: context.connectionId, mode: token.extra?.mode ?? '' })
await rpcCtx.with('/rpc', { method: request.method }, async (ctx) => {
try {
const response: RpcResponse = await rpcCtx.with(request.method, {}, (ctx) => {
return method(ctx, context, documentId, request.payload, { hocuspocus, storageAdapter, transformer })
})
res.status(200).send(response)
} catch (err: any) {
Analytics.handleError(err)
res.status(500).send({ error: err.message })
await rpcCtx.with(
'/rpc',
{
source: token.extra?.service ?? '🤦user',
method: request.method
},
async (ctx) => {
try {
const response: RpcResponse = await rpcCtx.with(request.method, {}, (ctx) => {
return method(ctx, context, documentId, request.payload, { hocuspocus, storageAdapter, transformer })
})
res.status(200).send(response)
} catch (err: any) {
Analytics.handleError(err)
res.status(500).send({ error: err.message })
}
}
})
)
})
const wss = new WebSocketServer({

View File

@ -45,7 +45,7 @@ export async function createPipeline (
constructors: MiddlewareCreator[],
context: PipelineContext
): Promise<Pipeline> {
return await PipelineImpl.create(ctx.newChild('pipeline-operations', {}), constructors, context)
return await PipelineImpl.create(ctx, constructors, context)
}
class PipelineImpl implements Pipeline {

View File

@ -100,13 +100,14 @@ export function initStatisticsContext (
let errorToSend = 0
if (metricsFile !== undefined || ops?.logConsole === true || statsUrl !== undefined) {
metricsContext.info('using stats url', { statsUrl, service: serviceName ?? '' })
if (metricsFile !== undefined) {
console.info('storing measurements into local file', metricsFile)
}
let oldMetricsValue = ''
const serviceId = encodeURIComponent(os.hostname() + '-' + serviceName)
let prev: Promise<void> | undefined
let prev: Promise<void> | Promise<any> | undefined
const handleError = (err: any): void => {
errorToSend++
if (errorToSend % 2 === 0) {
@ -138,7 +139,7 @@ export function initStatisticsContext (
return
}
if (statsUrl !== undefined) {
const token = generateToken(systemAccountUuid, undefined, { service: 'true' })
const token = generateToken(systemAccountUuid, undefined, { service: serviceName })
const data: ServiceStatistics = {
serviceName: ops?.serviceName?.() ?? serviceName,
cpu: getCPUInfo(),
@ -149,20 +150,18 @@ export function initStatisticsContext (
const statData = JSON.stringify(data)
prev = fetch(
concatLink(statsUrl, '/api/v1/statistics') + `/?token=${encodeURIComponent(token)}&name=${serviceId}`,
{
method: 'PUT',
headers: {
'Content-Type': 'application/json'
},
body: statData
}
)
.catch(handleError)
.then(() => {
prev = fetch(concatLink(statsUrl, '/api/v1/statistics') + `/?name=${serviceId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
authorization: `Bearer ${token}`
},
body: statData
})
.finally(() => {
prev = undefined
})
.catch(handleError)
}
} catch (err: any) {
handleError(err)

View File

@ -563,6 +563,8 @@ export interface ClientSessionCtx {
*/
export interface Session {
workspace: WorkspaceIds
token: Token
createTime: number
// Session restore information

View File

@ -169,7 +169,8 @@ export class SessionDataImpl implements SessionData {
_removedMap: Map<Ref<Doc>, Doc> | undefined,
_contextCache: Map<string, any> | undefined,
readonly modelDb: ModelDb,
readonly socialStringsToUsers: Map<PersonId, AccountUuid>
readonly socialStringsToUsers: Map<PersonId, AccountUuid>,
readonly service: string
) {
this._removedMap = _removedMap
this._contextCache = _contextCache
@ -246,7 +247,8 @@ export function wrapPipeline (
undefined,
undefined,
pipeline.context.modelDb,
new Map()
new Map(),
'transactor'
)
ctx.contextData = contextData
if (pipeline.context.lowLevelStorage === undefined) {

View File

@ -57,9 +57,9 @@ async function storageUpload (
const data = file.tempFilePath !== undefined ? fs.createReadStream(file.tempFilePath) : file.data
const resp = await ctx.with(
'storage upload',
{ workspace: wsIds.uuid },
{},
(ctx) => storageAdapter.put(ctx, wsIds, uuid, data, file.mimetype, file.size),
{ file: file.name, contentType: file.mimetype }
{ file: file.name, contentType: file.mimetype, workspace: wsIds.uuid }
)
ctx.info('storage upload', resp)
@ -522,15 +522,18 @@ export function start (
const range = req.headers.range
if (range !== undefined) {
await ctx.with('file-range', { workspace: wsIds.uuid }, (ctx) =>
getFileRange(ctx, blobInfo as PlatformBlob, range, config.storageAdapter, wsIds, res)
await ctx.with(
'file-range',
{},
(ctx) => getFileRange(ctx, blobInfo as PlatformBlob, range, config.storageAdapter, wsIds, res),
{ workspace: wsIds.uuid }
)
} else {
await ctx.with(
'file',
{ workspace: wsIds.uuid },
{},
(ctx) => getFile(ctx, blobInfo as PlatformBlob, config.storageAdapter, wsIds, req, res),
{ uuid }
{ uuid, workspace: wsIds.uuid }
)
}
} catch (error: any) {

View File

@ -563,7 +563,8 @@ export class FullTextIndexPipeline implements FullTextPipeline {
undefined,
undefined,
this.model,
new Map()
new Map(),
'fulltext'
)
}

View File

@ -37,7 +37,7 @@ export class ContextNameMiddleware extends BaseMiddleware implements Middleware
return new ContextNameMiddleware(context, next)
}
async tx (ctx: MeasureContext, txes: Tx[]): Promise<TxMiddlewareResult> {
async tx (ctx: MeasureContext<SessionData>, txes: Tx[]): Promise<TxMiddlewareResult> {
let measureName: string | undefined
const tx = txes.find((it) => it._class === core.class.TxApplyIf)
@ -53,10 +53,10 @@ export class ContextNameMiddleware extends BaseMiddleware implements Middleware
const result = await ctx.with(
measureName !== undefined ? `📶 ${measureName}` : 'client-tx',
{ _class: tx?._class },
{ source: ctx.contextData.service },
(ctx) => {
;({ opLogMetrics, op } = registerOperationLog(ctx))
return this.provideTx(ctx as MeasureContext<SessionData>, txes)
return this.provideTx(ctx, txes)
}
)
updateOperationLog(opLogMetrics, op)

View File

@ -22,6 +22,7 @@ import {
type FindResult,
type MeasureContext,
type Ref,
type SessionData,
DOMAIN_MODEL
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
@ -51,7 +52,7 @@ export class DomainFindMiddleware extends BaseMiddleware implements Middleware {
}
findAll<T extends Doc>(
ctx: MeasureContext,
ctx: MeasureContext<SessionData>,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
@ -67,7 +68,7 @@ export class DomainFindMiddleware extends BaseMiddleware implements Middleware {
}
return ctx.with(
p + '-find-all',
{ _class },
{ source: ctx.contextData?.service ?? 'system', _class },
(ctx) => {
return this.adapterManager.getAdapter(domain, false).findAll(ctx, _class, query, options)
},

View File

@ -226,7 +226,8 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
sctx.removedMap,
sctx.contextCache,
this.context.modelDb,
sctx.socialStringsToUsers
sctx.socialStringsToUsers,
sctx.service
)
ctx.contextData = asyncContextData
const aresult = await this.triggers.apply(

View File

@ -247,6 +247,9 @@ describe('postgres operations', () => {
const sortDesc = await client.findAll(taskPlugin.class.Task, {}, { sort: { name: SortingOrder.Descending } })
expect(sortDesc[0].name).toMatch('my-task-4')
const sortEmpty = await client.findAll(taskPlugin.class.Task, {}, { sort: {} })
expect(sortEmpty).toHaveLength(5)
})
it('check attached', async () => {

View File

@ -522,7 +522,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
// todo handle custom sorting
}
}
return `ORDER BY ${res.join(', ')}`
if (res.length > 0) {
return `ORDER BY ${res.join(', ')}`
} else {
return ''
}
}
buildRawQuery<T extends Doc>(
@ -1170,7 +1174,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
// todo handle custom sorting
}
}
return `ORDER BY ${res.join(', ')}`
if (res.length > 0) {
return `ORDER BY ${res.join(', ')}`
} else {
return ''
}
}
private buildQuery<T extends Doc>(

View File

@ -133,7 +133,12 @@ export function createServerPipeline (
TxMiddleware.create, // Store tx into transaction domain
...(opt.disableTriggers === true ? [] : [TriggersMiddleware.create]),
...(opt.fulltextUrl !== undefined
? [FullTextMiddleware.create(opt.fulltextUrl, generateToken(systemAccountUuid, workspace.uuid))]
? [
FullTextMiddleware.create(
opt.fulltextUrl,
generateToken(systemAccountUuid, workspace.uuid, { service: 'transactor' })
)
]
: []),
LowLevelMiddleware.create,
QueryJoinMiddleware.create,

View File

@ -95,7 +95,7 @@ export class ClientSession implements Session {
isAdmin: boolean
constructor (
protected readonly token: Token,
readonly token: Token,
readonly workspace: WorkspaceIds,
readonly account: Account,
readonly info: LoginInfoWithWorkspaces,
@ -164,7 +164,8 @@ export class ClientSession implements Session {
undefined,
undefined,
ctx.pipeline.context.modelDb,
ctx.socialStringsToUsers
ctx.socialStringsToUsers,
this.token.extra?.service ?? '🤦user'
)
ctx.ctx.contextData = contextData
}

View File

@ -121,6 +121,7 @@ export class TSessionManager implements SessionManager {
now: number = Date.now()
ticksContext: MeasureContext
constructor (
readonly ctx: MeasureContext,
readonly timeouts: Timeouts,
@ -145,6 +146,8 @@ export class TSessionManager implements SessionManager {
}
this.workspaceProducer = this.queue.getProducer(ctx.newChild('queue', {}), QueueTopic.Workspace)
this.usersProducer = this.queue.getProducer(ctx.newChild('queue', {}), QueueTopic.Users)
this.ticksContext = ctx.newChild('ticks', {})
}
scheduleMaintenance (timeMinutes: number): void {
@ -218,7 +221,7 @@ export class TSessionManager implements SessionManager {
break
}
}
void this.getWorkspaceInfo(workspace.token, connected).catch(() => {
void this.getWorkspaceInfo(this.ticksContext, workspace.token, connected).catch(() => {
// Ignore
})
} catch (err: any) {
@ -280,7 +283,7 @@ export class TSessionManager implements SessionManager {
this.ctx.warn('session hang, closing...', { wsId, user: s.session.getUser() })
// Force close workspace if only one client and it hang.
void this.close(this.ctx, s.socket, wsId).catch((err) => {
void this.close(this.ticksContext, s.socket, wsId).catch((err) => {
this.ctx.error('failed to close', err)
})
continue
@ -293,7 +296,7 @@ export class TSessionManager implements SessionManager {
// And ping other wize
s.session.lastPing = now
if (s.socket.checkState()) {
void s.socket.send(this.ctx, { result: pingConst }, s.session.binaryMode, s.session.useCompression)
void s.socket.send(this.ticksContext, { result: pingConst }, s.session.binaryMode, s.session.useCompression)
}
}
for (const r of s.session.requests.values()) {
@ -343,7 +346,12 @@ export class TSessionManager implements SessionManager {
)
}
async getWorkspaceInfo (token: string, updateLastVisit = true): Promise<WorkspaceInfoWithStatus | undefined> {
@withContext('🧭 get-workspace-info')
async getWorkspaceInfo (
ctx: MeasureContext,
token: string,
updateLastVisit = true
): Promise<WorkspaceInfoWithStatus | undefined> {
try {
return await getAccountClient(this.accountsUrl, token).getWorkspaceInfo(updateLastVisit)
} catch (err: any) {
@ -354,7 +362,8 @@ export class TSessionManager implements SessionManager {
}
}
async getLoginWithWorkspaceInfo (token: string): Promise<LoginInfoWithWorkspaces | undefined> {
@withContext('🧭 get-login-with-workspace-info')
async getLoginWithWorkspaceInfo (ctx: MeasureContext, token: string): Promise<LoginInfoWithWorkspaces | undefined> {
try {
const accountClient = getAccountClient(this.accountsUrl, token)
return await accountClient.getLoginWithWorkspaceInfo()
@ -374,6 +383,7 @@ export class TSessionManager implements SessionManager {
tickCounter = 0
@withContext('🧭 get-workspace')
async getWorkspace (
ctx: MeasureContext,
workspaceUuid: WorkspaceUuid,
@ -480,7 +490,6 @@ export class TSessionManager implements SessionManager {
return { workspace }
}
@withContext('📲 add-session')
async addSession (
ctx: MeasureContext,
ws: ConnectionSocket,
@ -488,92 +497,94 @@ export class TSessionManager implements SessionManager {
rawToken: string,
sessionId: string | undefined
): Promise<AddSessionResponse> {
let account: LoginInfoWithWorkspaces | undefined
return await ctx.with('📲 add-session', { source: token.extra?.service ?? '🤦user' }, async (ctx) => {
let account: LoginInfoWithWorkspaces | undefined
try {
account = await this.getLoginWithWorkspaceInfo(rawToken)
} catch (err: any) {
return { error: err }
}
try {
account = await this.getLoginWithWorkspaceInfo(ctx, rawToken)
} catch (err: any) {
return { error: err }
}
if (account === undefined) {
return { error: new Error('Account not found or not available'), terminate: true }
}
if (account === undefined) {
return { error: new Error('Account not found or not available'), terminate: true }
}
let wsInfo = account.workspaces[token.workspace]
let wsInfo = account.workspaces[token.workspace]
if (wsInfo === undefined) {
// In case of guest or system account
// We need to get workspace info for system account.
const workspaceInfo = await this.getWorkspaceInfo(rawToken, false)
if (workspaceInfo === undefined) {
if (wsInfo === undefined) {
// In case of guest or system account
// We need to get workspace info for system account.
const workspaceInfo = await this.getWorkspaceInfo(ctx, rawToken, false)
if (workspaceInfo === undefined) {
return { error: new Error('Workspace not found or not available'), terminate: true }
}
wsInfo = {
url: workspaceInfo.url,
mode: workspaceInfo.mode,
dataId: workspaceInfo.dataId,
version: {
versionMajor: workspaceInfo.versionMajor,
versionMinor: workspaceInfo.versionMinor,
versionPatch: workspaceInfo.versionPatch
},
role: AccountRole.Owner,
endpoint: { externalUrl: '', internalUrl: '', region: workspaceInfo.region ?? '' },
progress: workspaceInfo.processingProgress
}
}
const { workspace, resp } = await this.getWorkspace(ctx.parent ?? ctx, token.workspace, wsInfo, token, ws)
if (resp !== undefined) {
return resp
}
if (workspace === undefined || account === undefined) {
// Should not happen
return { error: new Error('Workspace not found or not available'), terminate: true }
}
wsInfo = {
url: workspaceInfo.url,
mode: workspaceInfo.mode,
dataId: workspaceInfo.dataId,
version: {
versionMajor: workspaceInfo.versionMajor,
versionMinor: workspaceInfo.versionMinor,
versionPatch: workspaceInfo.versionPatch
},
role: AccountRole.Owner,
endpoint: { externalUrl: '', internalUrl: '', region: workspaceInfo.region ?? '' },
progress: workspaceInfo.processingProgress
const oldSession = sessionId !== undefined ? workspace.sessions?.get(sessionId) : undefined
if (oldSession !== undefined) {
// Just close old socket for old session id.
await this.close(ctx, oldSession.socket, workspace.wsId.uuid)
}
}
const { workspace, resp } = await this.getWorkspace(ctx, token.workspace, wsInfo, token, ws)
if (resp !== undefined) {
return resp
}
if (workspace === undefined || account === undefined) {
// Should not happen
return { error: new Error('Workspace not found or not available'), terminate: true }
}
const session = this.createSession(token, workspace.wsId, account)
const oldSession = sessionId !== undefined ? workspace.sessions?.get(sessionId) : undefined
if (oldSession !== undefined) {
// Just close old socket for old session id.
await this.close(ctx, oldSession.socket, workspace.wsId.uuid)
}
session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId()
session.sessionInstanceId = generateId()
const tickHash = this.tickCounter % ticksPerSecond
const session = this.createSession(token, workspace.wsId, account)
this.sessions.set(ws.id, { session, socket: ws, tickHash })
// We need to delete previous session with Id if found.
this.tickCounter++
workspace.sessions.set(session.sessionId, { session, socket: ws, tickHash })
session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId()
session.sessionInstanceId = generateId()
const tickHash = this.tickCounter % ticksPerSecond
const accountUuid = account.account
if (accountUuid !== systemAccountUuid && accountUuid !== guestAccount) {
await this.usersProducer.send(workspace.wsId.uuid, [
userEvents.login({
user: accountUuid,
sessions: this.countUserSessions(workspace, accountUuid),
socialIds: account.socialIds.map((it) => it._id)
})
])
}
this.sessions.set(ws.id, { session, socket: ws, tickHash })
// We need to delete previous session with Id if found.
this.tickCounter++
workspace.sessions.set(session.sessionId, { session, socket: ws, tickHash })
// Mark workspace as init completed and we had at least one client.
if (!workspace.workspaceInitCompleted) {
workspace.workspaceInitCompleted = true
}
const accountUuid = account.account
if (accountUuid !== systemAccountUuid && accountUuid !== guestAccount) {
await this.usersProducer.send(workspace.wsId.uuid, [
userEvents.login({
user: accountUuid,
sessions: this.countUserSessions(workspace, accountUuid),
socialIds: account.socialIds.map((it) => it._id)
})
])
}
// Mark workspace as init completed and we had at least one client.
if (!workspace.workspaceInitCompleted) {
workspace.workspaceInitCompleted = true
}
if (this.timeMinutes > 0) {
void ws
.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
.catch((err) => {
ctx.error('failed to send maintenance warning', err)
})
}
return { session, context: workspace.context, workspaceId: workspace.wsId.uuid }
if (this.timeMinutes > 0) {
void ws
.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
.catch((err) => {
ctx.error('failed to send maintenance warning', err)
})
}
return { session, context: workspace.context, workspaceId: workspace.wsId.uuid }
})
}
private async switchToUpgradeSession (
@ -744,7 +755,7 @@ export class TSessionManager implements SessionManager {
}
const workspace: Workspace = new Workspace(
context,
generateToken(systemAccountUuid, token.workspace),
generateToken(systemAccountUuid, token.workspace, { service: 'transactor' }),
factory,
this.tickCounter % ticksPerSecond,
workspaceSoftShutdownTicks,
@ -1077,14 +1088,17 @@ export class TSessionManager implements SessionManager {
() => Date.now()
)
handleRequest<S extends Session>(
async handleRequest<S extends Session>(
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
request: Request<any>,
workspaceId: WorkspaceUuid
): Promise<void> {
const userCtx = requestCtx.newChild('📞 client', {})
const userCtx = requestCtx.newChild('📞 client', {
source: service.token.extra?.service ?? '🤦user',
mode: '🧭 handleRequest'
})
const rateLimit = this.limitter.checkRateLimit(service.getUser())
// If remaining is 0, rate limit is exceeded
if (rateLimit?.remaining === 0) {
@ -1098,104 +1112,102 @@ export class TSessionManager implements SessionManager {
service.binaryMode,
service.useCompression
)
return Promise.resolve()
return
}
// Calculate total number of clients
const reqId = generateId()
const st = Date.now()
return userCtx
.with('🧭 handleRequest', {}, async (ctx) => {
if (request.time != null) {
const delta = Date.now() - request.time
requestCtx.measure('msg-receive-delta', delta)
}
const workspace = this.workspaces.get(workspaceId)
if (workspace === undefined || workspace.closing !== undefined) {
await ws.send(
ctx,
{
id: request.id,
error: unknownError('Workspace is closing')
},
service.binaryMode,
service.useCompression
)
return
}
if (request.id === -1 && request.method === 'hello') {
await this.handleHello<S>(request, service, ctx, workspace, ws, requestCtx)
return
}
if (request.id === -2 && request.method === 'forceClose') {
// TODO: we chould allow this only for admin or system accounts
let done = false
const wsRef = this.workspaces.get(workspaceId)
if (wsRef?.upgrade ?? false) {
done = true
this.ctx.warn('FORCE CLOSE', { workspace: workspaceId })
// In case of upgrade, we need to force close workspace not in interval handler
await this.forceClose(workspaceId, ws)
}
const forceCloseResponse: Response<any> = {
try {
if (request.time != null) {
const delta = Date.now() - request.time
requestCtx.measure('msg-receive-delta', delta)
}
const workspace = this.workspaces.get(workspaceId)
if (workspace === undefined || workspace.closing !== undefined) {
await ws.send(
userCtx,
{
id: request.id,
result: done
}
await ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression)
return
error: unknownError('Workspace is closing')
},
service.binaryMode,
service.useCompression
)
return
}
if (request.id === -1 && request.method === 'hello') {
await this.handleHello<S>(request, service, userCtx, workspace, ws, requestCtx)
return
}
if (request.id === -2 && request.method === 'forceClose') {
// TODO: we chould allow this only for admin or system accounts
let done = false
const wsRef = this.workspaces.get(workspaceId)
if (wsRef?.upgrade ?? false) {
done = true
this.ctx.warn('FORCE CLOSE', { workspace: workspaceId })
// In case of upgrade, we need to force close workspace not in interval handler
await this.forceClose(workspaceId, ws)
}
const forceCloseResponse: Response<any> = {
id: request.id,
result: done
}
await ws.send(userCtx, forceCloseResponse, service.binaryMode, service.useCompression)
return
}
service.requests.set(reqId, {
id: reqId,
params: request,
start: st
})
if (request.id === -1 && request.method === '#upgrade') {
ws.close()
return
}
const f = (service as any)[request.method]
try {
const params = [...request.params]
if (ws.isBackpressure()) {
await ws.backpressure(userCtx)
}
service.requests.set(reqId, {
id: reqId,
params: request,
start: st
})
if (request.id === -1 && request.method === '#upgrade') {
ws.close()
return
}
const f = (service as any)[request.method]
try {
const params = [...request.params]
if (ws.isBackpressure()) {
await ws.backpressure(ctx)
}
await workspace.with(async (pipeline, communicationApi) => {
await ctx.with('🧨 process', {}, (callTx) =>
f.apply(service, [
this.createOpContext(callTx, userCtx, pipeline, communicationApi, request.id, service, ws, rateLimit),
...params
])
)
})
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err, request })
}
await ws.send(
userCtx,
{
id: request.id,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
},
service.binaryMode,
service.useCompression
await workspace.with(async (pipeline, communicationApi) => {
await userCtx.with('🧨 process', {}, (callTx) =>
f.apply(service, [
this.createOpContext(callTx, userCtx, pipeline, communicationApi, request.id, service, ws, rateLimit),
...params
])
)
})
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err, request })
}
})
.finally(() => {
userCtx.end()
service.requests.delete(reqId)
})
await ws.send(
userCtx,
{
id: request.id,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
},
service.binaryMode,
service.useCompression
)
}
} finally {
userCtx.end()
service.requests.delete(reqId)
}
}
handleRPC<S extends Session>(
async handleRPC<S extends Session>(
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
@ -1204,65 +1216,66 @@ export class TSessionManager implements SessionManager {
const rateLimitStatus = this.limitter.checkRateLimit(service.getUser())
// If remaining is 0, rate limit is exceeded
if (rateLimitStatus?.remaining === 0) {
return Promise.resolve(rateLimitStatus)
return await Promise.resolve(rateLimitStatus)
}
const userCtx = requestCtx.newChild('📞 client', {})
const userCtx = requestCtx.newChild('📞 client', {
source: service.token.extra?.service ?? '🤦user',
mode: '🧭 handleRPC'
})
// Calculate total number of clients
const reqId = generateId()
const st = Date.now()
return userCtx
.with('🧭 handleRPC', {}, async (ctx) => {
const workspace = this.workspaces.get(service.workspace.uuid)
if (workspace === undefined || workspace.closing !== undefined) {
throw new Error('Workspace is closing')
}
try {
const workspace = this.workspaces.get(service.workspace.uuid)
if (workspace === undefined || workspace.closing !== undefined) {
throw new Error('Workspace is closing')
}
service.requests.set(reqId, {
id: reqId,
params: {},
start: st
})
service.requests.set(reqId, {
id: reqId,
params: {},
start: st
})
try {
await workspace.with(async (pipeline, communicationApi) => {
const uctx = this.createOpContext(
ctx,
userCtx,
pipeline,
communicationApi,
reqId,
service,
ws,
rateLimitStatus
)
await operation(uctx)
})
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err })
}
await ws.send(
try {
await workspace.with(async (pipeline, communicationApi) => {
const uctx = this.createOpContext(
userCtx,
{
id: reqId,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
},
service.binaryMode,
service.useCompression
userCtx,
pipeline,
communicationApi,
reqId,
service,
ws,
rateLimitStatus
)
throw err
await operation(uctx)
})
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err })
}
return undefined
})
.finally(() => {
userCtx.end()
service.requests.delete(reqId)
})
await ws.send(
userCtx,
{
id: reqId,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
},
service.binaryMode,
service.useCompression
)
throw err
}
return undefined
} finally {
userCtx.end()
service.requests.delete(reqId)
}
}
entryToUserStats = (session: Session, socket: ConnectionSocket): UserStatistics => {

View File

@ -322,7 +322,8 @@ export async function upgradeWorkspaceWith (
undefined,
undefined,
pipeline.context.modelDb,
new Map()
new Map(),
'workspace'
)
ctx.contextData = contextData
await handleWsEvent?.('upgrade-started', version, 0)

View File

@ -63,7 +63,10 @@ export const start = async (): Promise<void> => {
ctx.info('AI person uuid', { personUuid })
const storage = await getDbStorage()
const socialIds: SocialId[] = await getAccountClient(config.AccountsURL, generateToken(personUuid)).getSocialIds()
const socialIds: SocialId[] = await getAccountClient(
config.AccountsURL,
generateToken(personUuid, undefined, { service: 'aibot' })
).getSocialIds()
const aiControl = new AIControl(personUuid, socialIds, storage, ctx)

View File

@ -133,7 +133,7 @@ export class Collector {
return true
}
const rawToken = generateToken(token.account, token.workspace, token.extra)
const rawToken = generateToken(token.account, token.workspace, { ...token.extra, service: 'analytics-collector' })
const wsInfo = await getAccountClient(config.AccountsUrl, rawToken).getWorkspaceInfo()
this.ctx.info('workspace info', wsInfo)

View File

@ -24,7 +24,7 @@ export async function getClient (
workspaceUuid: WorkspaceUuid,
socialId?: PersonId
): Promise<{ client: TxOperations, accountClient: AccountClient }> {
const token = generateToken(systemAccountUuid, workspaceUuid)
const token = generateToken(systemAccountUuid, workspaceUuid, { service: 'calendar-mailer' })
let accountClient = getAccountClient(config.accountsUrl, token)
if (socialId !== undefined && socialId !== core.account.System) {
@ -32,7 +32,7 @@ export async function getClient (
if (personUuid === undefined) {
throw new Error('Global person not found')
}
const token = generateToken(personUuid, workspaceUuid)
const token = generateToken(personUuid, workspaceUuid, { service: 'calendar-mailer' })
accountClient = getAccountClient(config.accountsUrl, token)
}

View File

@ -55,17 +55,22 @@ export class AuthController {
state: State,
code: string
): Promise<void> {
await ctx.with('Create auth controller', { workspace: state.workspace, user: state.userId }, async () => {
const mutex = await lock(`${state.workspace}:${state.userId}`)
try {
const client = await getClient(getWorkspaceToken(state.workspace))
const txOp = new TxOperations(client, core.account.System)
const controller = new AuthController(ctx, accountClient, txOp, state)
await controller.process(code)
} finally {
mutex()
}
})
await ctx.with(
'Create auth controller',
{},
async () => {
const mutex = await lock(`${state.workspace}:${state.userId}`)
try {
const client = await getClient(getWorkspaceToken(state.workspace))
const txOp = new TxOperations(client, core.account.System)
const controller = new AuthController(ctx, accountClient, txOp, state)
await controller.process(code)
} finally {
mutex()
}
},
{ workspace: state.workspace, user: state.userId }
)
}
static async signout (
@ -75,22 +80,27 @@ export class AuthController {
workspace: WorkspaceUuid,
value: GoogleEmail
): Promise<void> {
await ctx.with('Signout auth controller', { workspace, userId }, async () => {
const mutex = await lock(`${workspace}:${userId}`)
try {
const client = await getClient(getWorkspaceToken(workspace))
const txOp = new TxOperations(client, core.account.System)
const controller = new AuthController(ctx, accountClient, txOp, {
userId,
workspace
})
await controller.signout(value)
} catch (err) {
ctx.error('signout', { workspace, userId, err })
} finally {
mutex()
}
})
await ctx.with(
'Signout auth controller',
{},
async () => {
const mutex = await lock(`${workspace}:${userId}`)
try {
const client = await getClient(getWorkspaceToken(workspace))
const txOp = new TxOperations(client, core.account.System)
const controller = new AuthController(ctx, accountClient, txOp, {
userId,
workspace
})
await controller.signout(value)
} catch (err) {
ctx.error('signout', { workspace, userId, err })
} finally {
mutex()
}
},
{ workspace, userId }
)
}
private async signout (value: GoogleEmail): Promise<void> {
@ -167,7 +177,7 @@ export class AuthController {
private async setWorkspaceIntegration (res: AuthResult): Promise<void> {
await this.ctx.with(
'Set workspace integration',
{ user: this.user.userId, workspace: this.user.workspace, email: res.email },
{},
async () => {
const integrations = await this.client.findAll(setting.class.Integration, {
createdBy: this.user.userId,
@ -207,26 +217,36 @@ export class AuthController {
})
}
}
},
{
user: this.user.userId,
workspace: this.user.workspace,
email: res.email
}
)
}
private async createAccIntegrationIfNotExists (): Promise<void> {
await this.ctx.with('Create account integration if not exists', { user: this.user.userId }, async () => {
const integration = await this.accountClient.getIntegration({
socialId: this.user.userId,
kind: CALENDAR_INTEGRATION,
workspaceUuid: this.user.workspace
})
if (integration != null) {
return
}
await this.accountClient.createIntegration({
socialId: this.user.userId,
kind: CALENDAR_INTEGRATION,
workspaceUuid: this.user.workspace
})
})
await this.ctx.with(
'Create account integration if not exists',
{},
async () => {
const integration = await this.accountClient.getIntegration({
socialId: this.user.userId,
kind: CALENDAR_INTEGRATION,
workspaceUuid: this.user.workspace
})
if (integration != null) {
return
}
await this.accountClient.createIntegration({
socialId: this.user.userId,
kind: CALENDAR_INTEGRATION,
workspaceUuid: this.user.workspace
})
},
{ user: this.user.userId }
)
}
private async updateToken (token: Credentials, email: GoogleEmail): Promise<void> {

View File

@ -28,14 +28,19 @@ export class PushHandler {
) {}
async sync (token: Token, calendarId: string | null): Promise<void> {
await this.ctx.with('Push handler', { workspace: token.workspace, user: token.userId }, async () => {
const client = await getClient(getWorkspaceToken(token.workspace))
const txOp = new TxOperations(client, core.account.System)
const res = getGoogleClient()
res.auth.setCredentials(token)
await IncomingSyncManager.push(this.ctx, this.accountClient, txOp, token, res.google, calendarId)
await txOp.close()
})
await this.ctx.with(
'Push handler',
{},
async () => {
const client = await getClient(getWorkspaceToken(token.workspace))
const txOp = new TxOperations(client, core.account.System)
const res = getGoogleClient()
res.auth.setCredentials(token)
await IncomingSyncManager.push(this.ctx, this.accountClient, txOp, token, res.google, calendarId)
await txOp.close()
},
{ workspace: token.workspace, user: token.userId }
)
}
async push (email: GoogleEmail, mode: 'events' | 'calendar', calendarId?: string): Promise<void> {

View File

@ -282,16 +282,21 @@ export class PlatformWorker {
}
ctx.info('add integration', { workspace, installationId, accountId })
await ctx.with('add integration', { workspace, installationId, accountId }, async (ctx) => {
await accountsClient.createIntegration({
kind: 'github',
workspaceUuid: record.workspace,
socialId: record.accountId,
data: { installationId: record.installationId }
})
await ctx.with(
'add integration',
{},
async (ctx) => {
await accountsClient.createIntegration({
kind: 'github',
workspaceUuid: record.workspace,
socialId: record.accountId,
data: { installationId: record.installationId }
})
this.integrations.push(record)
})
this.integrations.push(record)
},
{ workspace, installationId, accountId }
)
// We need to query installations to be sure we have it, in case event is delayed or not received.
await this.updateInstallation(installationId)
@ -491,14 +496,16 @@ export class PlatformWorker {
if (!revoke) {
const personSpace = await client.findOne(contact.class.PersonSpace, { person: person._id })
if (personSpace !== undefined && person.personUuid !== undefined) {
await createNotification(client, person, {
user: person.personUuid,
space: personSpace._id,
message: github.string.AuthenticatedWithGithub,
props: {
login: update.login
}
})
if (update.login != null) {
await createNotification(client, person, {
user: person.personUuid,
space: personSpace._id,
message: github.string.AuthenticatedWithGithub,
props: {
login: update.login
}
})
}
}
if (dta?._id !== undefined) {

View File

@ -887,7 +887,7 @@ export class IssueSyncManager extends IssueSyncManagerBase implements DocSyncMan
try {
const response: any = await this.ctx.with(
'graphql.listIssue',
{ prj: prj.name, repo: repo.name },
{},
() =>
integration.octokit.graphql(
`query listIssues {

View File

@ -1557,7 +1557,7 @@ export class GithubWorker implements IntegrationManager {
}
await this.ctx.withLog(
'external sync',
{ installation: integration.installationName, workspace: this.workspace.uuid },
{},
async () => {
const enabled = integration.enabled && integration.octokit !== undefined
@ -1652,13 +1652,15 @@ export class GithubWorker implements IntegrationManager {
}
await this.ctx.withLog(
'external sync',
{ _class: _class.join(', '), workspace: this.workspace.uuid },
{ _class: _class.join(', ') },
async () => {
await mapper.externalFullSync(integration, derivedClient, _projects, _repositories)
}
},
{ installation: integration.installationName, workspace: this.workspace.uuid }
)
}
}
},
{ installation: integration.installationName, workspace: this.workspace.uuid }
)
}
}

View File

@ -21,7 +21,7 @@ import { serviceToken } from './utils'
export async function getAccountPerson (account: AccountUuid): Promise<Person | undefined> {
try {
const accountClient = getAccountClient(generateToken(account))
const accountClient = getAccountClient(generateToken(account, undefined, { service: 'gmail' }))
return await accountClient.getPerson()
} catch (e) {
console.error(e)
@ -31,7 +31,7 @@ export async function getAccountPerson (account: AccountUuid): Promise<Person |
export async function getAccountSocialIds (account: AccountUuid): Promise<SocialId[]> {
try {
const accountClient = getAccountClient(generateToken(account))
const accountClient = getAccountClient(generateToken(account, undefined, { service: 'gmail' }))
return await accountClient.getSocialIds()
} catch (e) {
console.error(e)

View File

@ -77,7 +77,7 @@ export async function saveLiveKitSessionBilling (ctx: MeasureContext, sessionId:
const workspace = session.roomName.split('_')[0] as WorkspaceUuid
const endpoint = concatLink(config.BillingUrl, `/api/v1/billing/${workspace}/livekit/session`)
const token = generateToken(systemAccountUuid, workspace)
const token = generateToken(systemAccountUuid, workspace, { service: 'love' })
try {
const res = await fetch(endpoint, {
@ -116,7 +116,7 @@ export async function saveLiveKitEgressBilling (ctx: MeasureContext, egress: Egr
const workspace = egress.roomName.split('_')[0] as WorkspaceUuid
const endpoint = concatLink(config.BillingUrl, `/api/v1/billing/${workspace}/livekit/egress`)
const token = generateToken(systemAccountUuid, workspace)
const token = generateToken(systemAccountUuid, workspace, { service: 'love' })
try {
const res = await fetch(endpoint, {

View File

@ -21,7 +21,7 @@ import { getClient } from '@hcengineering/kvs-client'
import config from './config'
// TODO: Find account UUID from mailboxes and use personal workspace
export const mailServiceToken = generateToken(systemAccountUuid, undefined, { service: 'mail' }, config.secret)
export const mailServiceToken = generateToken(systemAccountUuid, undefined, { service: 'inbound-mail' }, config.secret)
export const baseConfig: BaseConfig = {
AccountsURL: config.accountsUrl,
KvsUrl: config.kvsUrl,

View File

@ -61,13 +61,14 @@ export async function job (ctx: MeasureContext, storage: StorageAdapter, db: Pos
await rateLimiter.add(async () => {
await ctx.with(
'process',
{},
async () => {
await processRecord(ctx, record, db, storage)
},
{
workspace: record.workspace,
card: record.card,
attempt: record.attempt
},
async () => {
await processRecord(ctx, record, db, storage)
}
)
})

View File

@ -30,7 +30,7 @@ import { IntegrationInfo } from './types'
export async function getAccountPerson (account: AccountUuid): Promise<Person | undefined> {
try {
const accountClient = getAccountClient(generateToken(account))
const accountClient = getAccountClient(generateToken(account, undefined, { service: 'telegram-bot' }))
return await accountClient.getPerson()
} catch (e) {
console.error(e)
@ -40,7 +40,7 @@ export async function getAccountPerson (account: AccountUuid): Promise<Person |
export async function getAccountSocialIds (account: AccountUuid): Promise<SocialId[]> {
try {
const accountClient = getAccountClient(generateToken(account))
const accountClient = getAccountClient(generateToken(account, undefined, { service: 'telegram-bot' }))
return await accountClient.getSocialIds()
} catch (e) {
console.error(e)
@ -49,7 +49,7 @@ export async function getAccountSocialIds (account: AccountUuid): Promise<Social
}
export async function listIntegrationsByAccount (account: AccountUuid): Promise<IntegrationInfo[]> {
const client = getAccountClient(generateToken(account))
const client = getAccountClient(generateToken(account, undefined, { serviece: 'telegram-bot' }))
const integrations = await client.listIntegrations({ kind: 'telegram-bot' })
if (integrations.length === 0) return []
const socialIds = await getAccountSocialIds(account)
@ -118,7 +118,7 @@ export async function getAnyIntegrationByAccount (
account: AccountUuid,
workspace?: WorkspaceUuid
): Promise<IntegrationInfo | undefined> {
const client = getAccountClient(generateToken(account))
const client = getAccountClient(generateToken(account, undefined, { service: 'telegram-bot' }))
const integrations = await client.listIntegrations({ kind: 'telegram-bot', workspaceUuid: workspace })
if (integrations.length === 0) return undefined

View File

@ -292,7 +292,7 @@ export class PlatformWorker {
}
try {
const accountClient = getAccountClient(generateToken(account, workspaceId))
const accountClient = getAccountClient(generateToken(account, workspaceId, { service: 'telegram-bot' }))
const result = await accountClient.getWorkspaceInfo(false)
if (result === undefined) {

View File

@ -55,7 +55,7 @@ export class WorkspaceClient {
ctx: MeasureContext,
storage: StorageAdapter
): Promise<WorkspaceClient> {
const token = generateToken(account, workspace)
const token = generateToken(account, workspace, { service: 'telegram-bot' })
const endpoint = await getTransactorEndpoint(token)
const client = createRestClient(endpoint, workspace, token)
const model = await client.getModel()

View File

@ -400,4 +400,4 @@ services:
- PASSWORD=password
- AVATAR_PATH=./avatar.png
- AVATAR_CONTENT_TYPE=.png
- STATS_URL=http://huly.local:4900
- STATS_URL=http://huly.local:4901