platform/server/ws/src/server_u.ts_
Andrey Sobolev 84b7df4923
TSK-742: Use partial binary protocol with ability on/off (#3153)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
2023-05-11 00:16:13 +07:00

209 lines
6.2 KiB
Plaintext

//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Websocket server implementation based on uWebSockets.js.
// To use it, the following dependency should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
import { MeasureContext, generateId } from '@hcengineering/core'
import { Token, decodeToken } from '@hcengineering/server-token'
import { serialize } from '@hcengineering/rpc'
import uWebSockets, { SHARED_COMPRESSOR, WebSocket } from 'uWebSockets.js'
import { getStatistics } from './stats'
import {
ConnectionSocket,
HandleRequestFunction,
LOGGING_ENABLED,
PipelineFactory,
Session,
SessionManager
} from './types'
/**
 * Per-connection state stored as the uWebSockets.js user data of every socket
 * served by this module.
 */
interface WebsocketUserData {
  // --- set during upgrade ---

  // Token decoded from the request URL before the connection is upgraded.
  payload: Token
  // Messages that could not be sent and are kept for the drain handler.
  unsendMsg: any[]

  // --- set once the socket is open ---

  // Socket wrapper created in the open handler and handed to the session manager.
  wrapper?: ConnectionSocket
  // Result of `sessions.addSession` for this connection.
  session?: Promise<Session>

  // --- backpressure bookkeeping ---

  // Awaited by `send` before writing; set when a send is dropped, cleared by the drain handler.
  backPressure?: Promise<void>
  // Resolver of the `backPressure` promise.
  backPressureResolve?: () => void
}
/**
 * Starts a uWebSockets.js based server on the given port.
 *
 * Websocket connections are authenticated by the token passed as the URL path;
 * every other HTTP request with a valid token is answered with the server
 * statistics as JSON.
 *
 * @public
 * @param sessions - session manager; sessions are added on open and closed on disconnect.
 * @param handleRequest - called with the decoded text of every incoming websocket message.
 * @param ctx - measure context passed to the session manager, request handler and statistics.
 * @param pipelineFactory - forwarded to `sessions.addSession`.
 * @param port - port to listen on.
 * @param productId - tokens whose `workspace.productId` differs are rejected with 401.
 * @returns an async function that closes all workspaces of the session manager.
 */
export function startUWebsocketServer (
  sessions: SessionManager,
  handleRequest: HandleRequestFunction,
  ctx: MeasureContext,
  pipelineFactory: PipelineFactory,
  port: number,
  productId: string
): () => Promise<void> {
  if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`)

  // uWebSockets.js `ws.send` result: 1 = sent, 0 = buffered (will drain over time),
  // 2 = dropped because the backpressure limit was reached.
  const SEND_DROPPED = 2

  // decode() without the `stream` option keeps no state, so one decoder serves every connection.
  const decoder = new TextDecoder('utf-8')

  const uAPP = uWebSockets.App()

  uAPP
    .ws<WebsocketUserData>('/*', {
      /* There are many common helper features */
      // idleTimeout: 32,
      // maxBackpressure: 1024,
      maxPayloadLength: 50 * 1024 * 1024,
      compression: SHARED_COMPRESSOR,
      maxLifetime: 0,
      sendPingsAutomatically: true,

      upgrade (res, req, context) {
        // The token is passed as the URL path: ws://host:port/<token>
        const url = new URL('http://localhost' + (req.getUrl() ?? ''))
        const token = url.pathname.substring(1)

        try {
          const payload = decodeToken(token)

          if (payload.workspace.productId !== productId) {
            throw new Error('Invalid workspace product')
          }

          /* This immediately calls open handler, you must not use res after this call */
          res.upgrade<WebsocketUserData>(
            {
              payload,
              session: undefined,
              unsendMsg: []
            },
            /* Spell these correctly */
            req.getHeader('sec-websocket-key'),
            req.getHeader('sec-websocket-protocol'),
            req.getHeader('sec-websocket-extensions'),
            context
          )
        } catch (err) {
          if (LOGGING_ENABLED) console.error('invalid token', err)
          res.writeStatus('401 Unauthorized').end()
        }
      },

      open: (ws: WebSocket<WebsocketUserData>) => {
        const data = ws.getUserData()

        data.wrapper = {
          id: generateId(),
          close: () => {
            try {
              ws.close()
            } catch (err) {
              // Ignore closed
            }
          },
          send: async (ctx, msg): Promise<void> => {
            // Hold new messages while previously dropped ones are still waiting for drain.
            await ctx.with('backpressure', {}, async () => await data.backPressure)
            const serialized = await ctx.with('serialize', {}, async () => serialize(msg))
            const compress = Array.isArray(msg.result)
            ctx.measure('send-data', serialized.length)
            try {
              const sendR = await ctx.with('backpressure', {}, async () => ws.send(serialized, false, compress))
              if (sendR === SEND_DROPPED) {
                // Reuse an already pending promise: replacing it would orphan its resolver
                // and leave the senders awaiting it blocked forever.
                if (data.backPressure === undefined) {
                  data.backPressure = new Promise((resolve) => {
                    data.backPressureResolve = resolve
                  })
                }
                // Queue exactly what has to go on the wire, so drain can re-send it as is.
                data.unsendMsg.push({ data: serialized, compress })
              }
            } catch (err: any) {
              if (!((err?.message ?? '') as string).includes('Invalid access of closed')) {
                console.error(err)
              }
              // Ignore socket is closed
            }
          }
        }

        data.session = sessions.addSession(ctx, data.wrapper, data.payload, pipelineFactory, productId, undefined)
      },

      message: (ws, message) => {
        const data = ws.getUserData()
        // Decode synchronously: the request is handled later, once the session promise resolves.
        const tmsg = decoder.decode(message)
        void data.session?.then((s) => {
          void handleRequest(ctx, s, data.wrapper as ConnectionSocket, tmsg, data.payload.workspace.name)
        })
      },

      drain: (ws) => {
        console.log(`WebSocket backpressure: ${ws.getBufferedAmount()}`)
        const data = ws.getUserData()

        // Flush queued messages in order; stop as soon as one is dropped again.
        while (data.unsendMsg.length > 0) {
          const pending = data.unsendMsg[0]
          if (ws.send(pending.data, false, pending.compress) === SEND_DROPPED) {
            // Wait for next drain.
            return
          }
          data.unsendMsg.shift()
        }

        // Queue is empty, let waiting senders continue.
        data.backPressureResolve?.()
        data.backPressureResolve = undefined
        data.backPressure = undefined
      },

      close: (ws, code, message) => {
        const data = ws.getUserData()

        try {
          const tmsg = decoder.decode(message)
          if (tmsg !== '') {
            console.error(tmsg)
          }
        } catch (err) {
          console.error(err)
        }

        // Nothing can be delivered anymore: drop queued messages and release senders
        // blocked on backpressure (their send fails on the closed socket and is ignored).
        data.unsendMsg.length = 0
        data.backPressureResolve?.()
        data.backPressureResolve = undefined
        data.backPressure = undefined

        void data.session?.then(() => {
          void sessions.close(ctx, data.wrapper as ConnectionSocket, data.payload.workspace, code, '')
        })
      }
    })
    .any('/*', (response, request) => {
      const url = new URL('http://localhost' + (request.getUrl() ?? ''))
      const token = url.pathname.substring(1)

      try {
        // Statistics are only served to callers presenting a decodable token.
        const payload = decodeToken(token)
        console.log(payload.workspace, 'statistics request')

        const json = JSON.stringify(getStatistics(ctx, sessions))

        response
          .writeStatus('200 OK')
          .writeHeader('Content-Type', 'application/json')
          .writeHeader('Access-Control-Allow-Origin', '*')
          .writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS')
          .writeHeader('Access-Control-Allow-Headers', 'Content-Type')
          .end(json)
      } catch (err) {
        // uWebSockets.js responses have no node-style writeHead(); use writeStatus().
        response.writeStatus('404 Not Found').end()
      }
    })
    .listen(port, (s) => {})

  return async () => {
    await sessions.closeWorkspaces(ctx)
  }
}