mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-13 03:40:48 +00:00
Refresh query security (#3189)
Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
parent
bb53cd615f
commit
190b1078b5
@ -49,7 +49,8 @@ export interface Tx extends Doc {
|
||||
export enum WorkspaceEvent {
|
||||
UpgradeScheduled,
|
||||
Upgrade,
|
||||
IndexingUpdate
|
||||
IndexingUpdate,
|
||||
SecurityChange
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,7 +97,7 @@ export class LiveQuery extends TxProcessor implements Client {
|
||||
// Perform refresh of content since connection established.
|
||||
async refreshConnect (): Promise<void> {
|
||||
for (const q of [...this.queue]) {
|
||||
if (!(await this.removeFromQueue(q))) {
|
||||
if (!this.removeFromQueue(q)) {
|
||||
try {
|
||||
await this.refresh(q)
|
||||
} catch (err) {
|
||||
@ -622,9 +622,11 @@ export class LiveQuery extends TxProcessor implements Client {
|
||||
|
||||
private async refresh (q: Query): Promise<void> {
|
||||
const res = await this.client.findAll(q._class, q.query, q.options)
|
||||
q.result = res
|
||||
q.total = res.total
|
||||
await this.callback(q)
|
||||
if (!deepEqual(res, q.result)) {
|
||||
q.result = res
|
||||
q.total = res.total
|
||||
await this.callback(q)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if query is partially matched.
|
||||
@ -974,6 +976,7 @@ export class LiveQuery extends TxProcessor implements Client {
|
||||
async tx (tx: Tx): Promise<TxResult> {
|
||||
if (tx._class === core.class.TxWorkspaceEvent) {
|
||||
await this.checkUpdateFulltextQueries(tx)
|
||||
await this.changePrivateHandler(tx)
|
||||
return {}
|
||||
}
|
||||
return await super.tx(tx)
|
||||
@ -985,7 +988,7 @@ export class LiveQuery extends TxProcessor implements Client {
|
||||
const indexingParam = evt.params as IndexingUpdateEvent
|
||||
for (const q of [...this.queue]) {
|
||||
if (indexingParam._class.includes(q._class) && q.query.$search !== undefined) {
|
||||
if (!(await this.removeFromQueue(q))) {
|
||||
if (!this.removeFromQueue(q)) {
|
||||
try {
|
||||
await this.refresh(q)
|
||||
} catch (err) {
|
||||
@ -1008,6 +1011,34 @@ export class LiveQuery extends TxProcessor implements Client {
|
||||
}
|
||||
}
|
||||
|
||||
private async changePrivateHandler (tx: Tx): Promise<void> {
|
||||
const evt = tx as TxWorkspaceEvent
|
||||
if (evt.event === WorkspaceEvent.SecurityChange) {
|
||||
for (const q of [...this.queue]) {
|
||||
if (typeof q.query.space !== 'string') {
|
||||
if (!this.removeFromQueue(q)) {
|
||||
try {
|
||||
await this.refresh(q)
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (const v of this.queries.values()) {
|
||||
for (const q of v) {
|
||||
if (typeof q.query.space !== 'string') {
|
||||
try {
|
||||
await this.refresh(q)
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async __updateLookup (q: Query, updatedDoc: WithLookup<Doc>, ops: any): Promise<void> {
|
||||
for (const key in ops) {
|
||||
if (!key.startsWith('$')) {
|
||||
|
@ -40,7 +40,10 @@
|
||||
$: if (value.category === tracker.issueStatusCategory.Started) {
|
||||
const _s = [
|
||||
...$statusStore.filter(
|
||||
(it) => it.ofAttribute === value.ofAttribute && it.category === tracker.issueStatusCategory.Started
|
||||
(it) =>
|
||||
it.ofAttribute === value.ofAttribute &&
|
||||
it.category === tracker.issueStatusCategory.Started &&
|
||||
it.space === value.space
|
||||
)
|
||||
]
|
||||
_s.sort((a, b) => a.rank.localeCompare(b.rank))
|
||||
|
@ -29,7 +29,7 @@ import {
|
||||
TxResult
|
||||
} from '@hcengineering/core'
|
||||
import { DbConfiguration, createServerStorage } from './storage'
|
||||
import { Middleware, MiddlewareCreator, Pipeline, SessionContext } from './types'
|
||||
import { BroadcastFunc, Middleware, MiddlewareCreator, Pipeline, SessionContext } from './types'
|
||||
|
||||
/**
|
||||
* @public
|
||||
@ -39,13 +39,13 @@ export async function createPipeline (
|
||||
conf: DbConfiguration,
|
||||
constructors: MiddlewareCreator[],
|
||||
upgrade: boolean,
|
||||
broadcast: (tx: Tx[]) => void
|
||||
broadcast: BroadcastFunc
|
||||
): Promise<Pipeline> {
|
||||
const storage = await createServerStorage(conf, {
|
||||
upgrade,
|
||||
broadcast
|
||||
})
|
||||
const pipeline = PipelineImpl.create(ctx, storage, constructors)
|
||||
const pipeline = PipelineImpl.create(ctx, storage, constructors, broadcast)
|
||||
return await pipeline
|
||||
}
|
||||
|
||||
@ -59,18 +59,23 @@ class PipelineImpl implements Pipeline {
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
storage: ServerStorage,
|
||||
constructors: MiddlewareCreator[]
|
||||
constructors: MiddlewareCreator[],
|
||||
broadcast: BroadcastFunc
|
||||
): Promise<PipelineImpl> {
|
||||
const pipeline = new PipelineImpl(storage)
|
||||
pipeline.head = await pipeline.buildChain(ctx, constructors)
|
||||
pipeline.head = await pipeline.buildChain(ctx, constructors, broadcast)
|
||||
return pipeline
|
||||
}
|
||||
|
||||
private async buildChain (ctx: MeasureContext, constructors: MiddlewareCreator[]): Promise<Middleware | undefined> {
|
||||
private async buildChain (
|
||||
ctx: MeasureContext,
|
||||
constructors: MiddlewareCreator[],
|
||||
broadcast: BroadcastFunc
|
||||
): Promise<Middleware | undefined> {
|
||||
let current: Middleware | undefined
|
||||
for (let index = constructors.length - 1; index >= 0; index--) {
|
||||
const element = constructors[index]
|
||||
current = await element(ctx, this.storage, current)
|
||||
current = await element(ctx, broadcast, this.storage, current)
|
||||
}
|
||||
return current
|
||||
}
|
||||
|
@ -64,7 +64,17 @@ export interface Middleware {
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export type MiddlewareCreator = (ctx: MeasureContext, storage: ServerStorage, next?: Middleware) => Promise<Middleware>
|
||||
export type BroadcastFunc = (tx: Tx[], targets?: string[]) => void
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export type MiddlewareCreator = (
|
||||
ctx: MeasureContext,
|
||||
broadcast: BroadcastFunc,
|
||||
storage: ServerStorage,
|
||||
next?: Middleware
|
||||
) => Promise<Middleware>
|
||||
|
||||
/**
|
||||
* @public
|
||||
|
@ -29,7 +29,7 @@ import core, {
|
||||
TxCUD
|
||||
} from '@hcengineering/core'
|
||||
import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
|
||||
import { Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BroadcastFunc, Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BaseMiddleware } from './base'
|
||||
|
||||
const configurationAccountEmail = '#configurator@hc.engineering'
|
||||
@ -45,6 +45,7 @@ export class ConfigurationMiddleware extends BaseMiddleware implements Middlewar
|
||||
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
broadcast: BroadcastFunc,
|
||||
storage: ServerStorage,
|
||||
next?: Middleware
|
||||
): Promise<ConfigurationMiddleware> {
|
||||
|
@ -23,7 +23,7 @@ import core, {
|
||||
TxCollectionCUD,
|
||||
TxCreateDoc
|
||||
} from '@hcengineering/core'
|
||||
import { Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BroadcastFunc, Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BaseMiddleware } from './base'
|
||||
|
||||
/**
|
||||
@ -34,7 +34,12 @@ export class ModifiedMiddleware extends BaseMiddleware implements Middleware {
|
||||
super(storage, next)
|
||||
}
|
||||
|
||||
static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<ModifiedMiddleware> {
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
broadcast: BroadcastFunc,
|
||||
storage: ServerStorage,
|
||||
next?: Middleware
|
||||
): Promise<ModifiedMiddleware> {
|
||||
return new ModifiedMiddleware(storage, next)
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ import core, {
|
||||
TxCUD
|
||||
} from '@hcengineering/core'
|
||||
import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
|
||||
import { Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BroadcastFunc, Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { DOMAIN_PREFERENCE } from '@hcengineering/server-preference'
|
||||
import { BaseMiddleware } from './base'
|
||||
import { getUser, mergeTargets } from './utils'
|
||||
@ -44,7 +44,12 @@ export class PrivateMiddleware extends BaseMiddleware implements Middleware {
|
||||
super(storage, next)
|
||||
}
|
||||
|
||||
static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<PrivateMiddleware> {
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
broadcast: BroadcastFunc,
|
||||
storage: ServerStorage,
|
||||
next?: Middleware
|
||||
): Promise<PrivateMiddleware> {
|
||||
return new PrivateMiddleware(storage, next)
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ import {
|
||||
ServerStorage,
|
||||
Tx
|
||||
} from '@hcengineering/core'
|
||||
import { Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BroadcastFunc, Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BaseMiddleware } from './base'
|
||||
|
||||
import { deepEqual } from 'fast-equals'
|
||||
@ -47,7 +47,12 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
|
||||
super(storage, next)
|
||||
}
|
||||
|
||||
static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<QueryJoinMiddleware> {
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
broadcast: BroadcastFunc,
|
||||
storage: ServerStorage,
|
||||
next?: Middleware
|
||||
): Promise<QueryJoinMiddleware> {
|
||||
return new QueryJoinMiddleware(storage, next)
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import core, {
|
||||
DocumentQuery,
|
||||
FindOptions,
|
||||
FindResult,
|
||||
generateId,
|
||||
LookupData,
|
||||
MeasureContext,
|
||||
ObjQueryType,
|
||||
@ -34,10 +35,12 @@ import core, {
|
||||
TxCUD,
|
||||
TxProcessor,
|
||||
TxRemoveDoc,
|
||||
TxUpdateDoc
|
||||
TxUpdateDoc,
|
||||
TxWorkspaceEvent,
|
||||
WorkspaceEvent
|
||||
} from '@hcengineering/core'
|
||||
import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
|
||||
import { Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BroadcastFunc, Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BaseMiddleware } from './base'
|
||||
import { getUser, isOwner, mergeTargets } from './utils'
|
||||
|
||||
@ -56,16 +59,17 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
|
||||
core.space.Tx
|
||||
]
|
||||
|
||||
private constructor (storage: ServerStorage, next?: Middleware) {
|
||||
private constructor (private readonly broadcast: BroadcastFunc, storage: ServerStorage, next?: Middleware) {
|
||||
super(storage, next)
|
||||
}
|
||||
|
||||
static async create (
|
||||
ctx: MeasureContext,
|
||||
broadcast: BroadcastFunc,
|
||||
storage: ServerStorage,
|
||||
next?: Middleware
|
||||
): Promise<SpaceSecurityMiddleware> {
|
||||
const res = new SpaceSecurityMiddleware(storage, next)
|
||||
const res = new SpaceSecurityMiddleware(broadcast, storage, next)
|
||||
await res.init(ctx)
|
||||
return res
|
||||
}
|
||||
@ -125,42 +129,58 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
|
||||
}
|
||||
}
|
||||
|
||||
private pushMembersHandle (addedMembers: Ref<Account> | Position<Ref<Account>>, space: Ref<Space>): void {
|
||||
private async pushMembersHandle (
|
||||
addedMembers: Ref<Account> | Position<Ref<Account>>,
|
||||
space: Ref<Space>
|
||||
): Promise<void> {
|
||||
if (typeof addedMembers === 'object') {
|
||||
for (const member of addedMembers.$each) {
|
||||
this.addMemberSpace(member, space)
|
||||
}
|
||||
await this.brodcastEvent(addedMembers.$each)
|
||||
} else {
|
||||
this.addMemberSpace(addedMembers, space)
|
||||
await this.brodcastEvent([addedMembers])
|
||||
}
|
||||
}
|
||||
|
||||
private pullMembersHandle (removedMembers: Partial<Ref<Account>> | PullArray<Ref<Account>>, space: Ref<Space>): void {
|
||||
private async pullMembersHandle (
|
||||
removedMembers: Partial<Ref<Account>> | PullArray<Ref<Account>>,
|
||||
space: Ref<Space>
|
||||
): Promise<void> {
|
||||
if (typeof removedMembers === 'object') {
|
||||
const { $in } = removedMembers as PullArray<Ref<Account>>
|
||||
if ($in !== undefined) {
|
||||
for (const member of $in) {
|
||||
this.removeMemberSpace(member, space)
|
||||
}
|
||||
await this.brodcastEvent($in)
|
||||
}
|
||||
} else {
|
||||
this.removeMemberSpace(removedMembers, space)
|
||||
await this.brodcastEvent([removedMembers])
|
||||
}
|
||||
}
|
||||
|
||||
private syncMembers (members: Ref<Account>[], space: Space): void {
|
||||
private async syncMembers (members: Ref<Account>[], space: Space): Promise<void> {
|
||||
const oldMembers = new Set(space.members)
|
||||
const newMembers = new Set(members)
|
||||
const changed: Ref<Account>[] = []
|
||||
for (const old of oldMembers) {
|
||||
if (!newMembers.has(old)) {
|
||||
this.removeMemberSpace(old, space._id)
|
||||
changed.push(old)
|
||||
}
|
||||
}
|
||||
for (const newMem of newMembers) {
|
||||
if (!oldMembers.has(newMem)) {
|
||||
this.addMemberSpace(newMem, space._id)
|
||||
changed.push(newMem)
|
||||
}
|
||||
}
|
||||
if (changed.length > 0) {
|
||||
await this.brodcastEvent(changed)
|
||||
}
|
||||
}
|
||||
|
||||
private removePublicSpace (_id: Ref<Space>): void {
|
||||
@ -170,6 +190,26 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
|
||||
}
|
||||
}
|
||||
|
||||
private async brodcastEvent (users: Ref<Account>[]): Promise<void> {
|
||||
const targets = await this.getTargets(users)
|
||||
const tx: TxWorkspaceEvent = {
|
||||
_class: core.class.TxWorkspaceEvent,
|
||||
_id: generateId(),
|
||||
event: WorkspaceEvent.SecurityChange,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: Date.now(),
|
||||
objectSpace: core.space.DerivedTx,
|
||||
space: core.space.DerivedTx,
|
||||
params: null
|
||||
}
|
||||
this.broadcast([tx], targets)
|
||||
}
|
||||
|
||||
private async broadcastNonMembers (space: Space | undefined): Promise<void> {
|
||||
const users = await this.storage.modelDb.findAll(core.class.Account, { _id: { $nin: space?.members } })
|
||||
await this.brodcastEvent(users.map((p) => p._id))
|
||||
}
|
||||
|
||||
private async handleUpdate (ctx: SessionContext, tx: TxCUD<Space>): Promise<void> {
|
||||
const updateDoc = tx as TxUpdateDoc<Space>
|
||||
if (!this.storage.hierarchy.isDerived(updateDoc.objectClass, core.class.Space)) return
|
||||
@ -181,24 +221,27 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
|
||||
res.private = true
|
||||
this.addSpace(res)
|
||||
this.removePublicSpace(res._id)
|
||||
await this.broadcastNonMembers(res)
|
||||
}
|
||||
} else if (!updateDoc.operations.private) {
|
||||
const space = this.privateSpaces[updateDoc.objectId]
|
||||
this.removeSpace(updateDoc.objectId)
|
||||
this.publicSpaces.push(updateDoc.objectId)
|
||||
await this.broadcastNonMembers(space)
|
||||
}
|
||||
}
|
||||
|
||||
let space = this.privateSpaces[updateDoc.objectId]
|
||||
if (space !== undefined) {
|
||||
if (updateDoc.operations.members !== undefined) {
|
||||
this.syncMembers(updateDoc.operations.members, space)
|
||||
await this.syncMembers(updateDoc.operations.members, space)
|
||||
}
|
||||
if (updateDoc.operations.$push?.members !== undefined) {
|
||||
this.pushMembersHandle(updateDoc.operations.$push.members, space._id)
|
||||
await this.pushMembersHandle(updateDoc.operations.$push.members, space._id)
|
||||
}
|
||||
|
||||
if (updateDoc.operations.$pull?.members !== undefined) {
|
||||
this.pullMembersHandle(updateDoc.operations.$pull.members, space._id)
|
||||
await this.pullMembersHandle(updateDoc.operations.$pull.members, space._id)
|
||||
}
|
||||
space = TxProcessor.updateDoc2Doc(space, updateDoc)
|
||||
}
|
||||
@ -259,6 +302,12 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
|
||||
}
|
||||
}
|
||||
}
|
||||
if (h.isDerived(cudTx.objectClass, core.class.Account) && cudTx._class === core.class.TxUpdateDoc) {
|
||||
const ctx = cudTx as TxUpdateDoc<Account>
|
||||
if (ctx.operations.role !== undefined) {
|
||||
await this.brodcastEvent([ctx.objectId])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const res = await this.provideTx(ctx, tx)
|
||||
|
@ -151,11 +151,13 @@ class TSessionManager implements SessionManager {
|
||||
console.log(token.workspace.name, 'no sessions for workspace', wsString)
|
||||
}
|
||||
// Re-create pipeline.
|
||||
workspace.pipeline = pipelineFactory(ctx, token.workspace, true, (tx) => this.broadcastAll(workspace, tx))
|
||||
workspace.pipeline = pipelineFactory(ctx, token.workspace, true, (tx, targets) =>
|
||||
this.broadcastAll(workspace, tx, targets)
|
||||
)
|
||||
return await workspace.pipeline
|
||||
}
|
||||
|
||||
broadcastAll (workspace: Workspace, tx: Tx[]): void {
|
||||
broadcastAll (workspace: Workspace, tx: Tx[], targets?: string[]): void {
|
||||
if (workspace?.upgrade ?? false) {
|
||||
return
|
||||
}
|
||||
@ -163,6 +165,7 @@ class TSessionManager implements SessionManager {
|
||||
const sessions = [...workspace.sessions.values()]
|
||||
function send (): void {
|
||||
for (const session of sessions.splice(0, 1)) {
|
||||
if (targets !== undefined && !targets.includes(session.session.getUser())) continue
|
||||
for (const _tx of tx) {
|
||||
void session.socket.send(ctx, { result: _tx }, session.session.binaryResponseMode, false)
|
||||
}
|
||||
@ -180,7 +183,9 @@ class TSessionManager implements SessionManager {
|
||||
const upgrade = token.extra?.model === 'upgrade'
|
||||
const workspace: Workspace = {
|
||||
id: generateId(),
|
||||
pipeline: pipelineFactory(ctx, token.workspace, upgrade, (tx) => this.broadcastAll(workspace, tx)),
|
||||
pipeline: pipelineFactory(ctx, token.workspace, upgrade, (tx, targets) =>
|
||||
this.broadcastAll(workspace, tx, targets)
|
||||
),
|
||||
sessions: new Map(),
|
||||
upgrade
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ import {
|
||||
WorkspaceId
|
||||
} from '@hcengineering/core'
|
||||
import { Response } from '@hcengineering/rpc'
|
||||
import { Pipeline } from '@hcengineering/server-core'
|
||||
import { BroadcastFunc, Pipeline } from '@hcengineering/server-core'
|
||||
import { Token } from '@hcengineering/server-token'
|
||||
|
||||
/**
|
||||
@ -67,7 +67,7 @@ export type PipelineFactory = (
|
||||
ctx: MeasureContext,
|
||||
ws: WorkspaceId,
|
||||
upgrade: boolean,
|
||||
broadcast: (tx: Tx[]) => void
|
||||
broadcast: BroadcastFunc
|
||||
) => Promise<Pipeline>
|
||||
|
||||
/**
|
||||
@ -120,7 +120,7 @@ export interface SessionManager {
|
||||
sessionId?: string
|
||||
) => Promise<Session>
|
||||
|
||||
broadcastAll: (workspace: Workspace, tx: Tx[]) => void
|
||||
broadcastAll: (workspace: Workspace, tx: Tx[], targets?: string[]) => void
|
||||
|
||||
close: (
|
||||
ctx: MeasureContext,
|
||||
|
Loading…
Reference in New Issue
Block a user