UBERF-9075 Support LiveKit recordings in Datalake (#7607)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2025-01-09 00:06:00 +07:00 committed by GitHub
parent bfe2960787
commit f8c27c3b10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 250 additions and 45 deletions

View File

@ -238,8 +238,8 @@
"summary": "Build docker with platform",
"description": "use to build all docker containers required for platform",
"safeForSimultaneousRushProcesses": true,
"shellCommand": "rush docker:build -p 20 --to @hcengineering/pod-server --to @hcengineering/pod-front --to @hcengineering/prod --to @hcengineering/pod-account --to @hcengineering/pod-workspace --to @hcengineering/pod-collaborator --to @hcengineering/tool --to @hcengineering/pod-print --to @hcengineering/pod-sign --to @hcengineering/pod-analytics-collector --to @hcengineering/rekoni-service --to @hcengineering/pod-ai-bot --to @hcengineering/import-tool --to @hcengineering/pod-stats --to @hcengineering/pod-fulltext"
},
"shellCommand": "rush docker:build -p 20 --to @hcengineering/pod-server --to @hcengineering/pod-front --to @hcengineering/prod --to @hcengineering/pod-account --to @hcengineering/pod-workspace --to @hcengineering/pod-collaborator --to @hcengineering/tool --to @hcengineering/pod-print --to @hcengineering/pod-sign --to @hcengineering/pod-analytics-collector --to @hcengineering/rekoni-service --to @hcengineering/pod-ai-bot --to @hcengineering/import-tool --to @hcengineering/pod-stats --to @hcengineering/pod-fulltext --to @hcengineering/pod-love"
},
{
"commandKind": "global",
"name": "docker:up",

View File

@ -73,6 +73,11 @@ interface MultipartUploadPart {
etag: string
}
/**
 * Parameters returned by the datalake describing where direct R2 uploads
 * for a workspace should go (see DatalakeClient.getR2UploadParams).
 */
export interface R2UploadParams {
  // R2 location identifier reported by the datalake
  location: string
  // Target R2 bucket name
  bucket: string
}
/** @public */
export class DatalakeClient {
constructor (private readonly endpoint: string) {}
@ -320,6 +325,8 @@ export class DatalakeClient {
return await this.signObjectComplete(ctx, workspace, objectName)
}
// S3
async uploadFromS3 (
ctx: MeasureContext,
workspace: WorkspaceId,
@ -342,6 +349,37 @@ export class DatalakeClient {
})
}
// R2
/** Asks the datalake which R2 location/bucket direct uploads for the workspace should target. */
async getR2UploadParams (ctx: MeasureContext, workspace: WorkspaceId): Promise<R2UploadParams> {
  const url = concatLink(this.endpoint, `/upload/r2/${workspace.name}`)
  const response = await fetchSafe(ctx, url)
  return (await response.json()) as R2UploadParams
}
/**
 * Notifies the datalake that an object was uploaded straight to R2 so it can
 * register it under the given object name.
 */
async uploadFromR2 (
  ctx: MeasureContext,
  workspace: WorkspaceId,
  objectName: string,
  params: {
    filename: string
  }
): Promise<void> {
  const path = `/upload/r2/${workspace.name}/${encodeURIComponent(objectName)}`
  const url = concatLink(this.endpoint, path)
  const request = {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(params)
  }
  await fetchSafe(ctx, url, request)
}
// Signed URL
private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {

View File

@ -65,6 +65,8 @@
"@hcengineering/platform": "^0.6.11",
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/datalake": "^0.6.0",
"@hcengineering/s3": "^0.6.0",
"livekit-server-sdk": "^2.0.10",
"jwt-simple": "^0.5.6",
"uuid": "^8.3.2",

View File

@ -24,6 +24,7 @@ interface Config {
StorageConfig: string
StorageProviderName: string
S3StorageConfig: string
Secret: string
MongoUrl: string
@ -39,6 +40,7 @@ const envMap: { [key in keyof Config]: string } = {
StorageConfig: 'STORAGE_CONFIG',
StorageProviderName: 'STORAGE_PROVIDER_NAME',
S3StorageConfig: 'S3_STORAGE_CONFIG',
Secret: 'SECRET',
ServiceID: 'SERVICE_ID',
MongoUrl: 'MONGO_URL'
@ -55,12 +57,13 @@ const config: Config = (() => {
ApiSecret: process.env[envMap.ApiSecret],
StorageConfig: process.env[envMap.StorageConfig],
StorageProviderName: process.env[envMap.StorageProviderName] ?? 's3',
S3StorageConfig: process.env[envMap.S3StorageConfig],
Secret: process.env[envMap.Secret],
ServiceID: process.env[envMap.ServiceID] ?? 'love-service',
MongoUrl: process.env[envMap.MongoUrl]
}
const optional = ['StorageConfig']
const optional = ['StorageConfig', 'S3StorageConfig']
const missingEnv = (Object.keys(params) as Array<keyof Config>)
.filter((key) => !optional.includes(key))

View File

@ -13,11 +13,11 @@
// limitations under the License.
//
import { Ref, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
import { MeasureContext, Ref, WorkspaceId } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import serverClient from '@hcengineering/server-client'
import { initStatisticsContext, StorageConfig, StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import { RoomMetadata, TranscriptionStatus, MeetingMinutes } from '@hcengineering/love'
import cors from 'cors'
@ -32,8 +32,8 @@ import {
S3Upload,
WebhookReceiver
} from 'livekit-server-sdk'
import { v4 as uuid } from 'uuid'
import config from './config'
import { getS3UploadParams, saveFile } from './storage'
import { WorkspaceClient } from './workspaceClient'
const extractToken = (header: IncomingHttpHeaders): any => {
@ -50,11 +50,14 @@ export const main = async (): Promise<void> => {
setMetadata(serverToken.metadata.Secret, config.Secret)
const storageConfigs: StorageConfiguration = storageConfigFromEnv()
const s3StorageConfigs: StorageConfiguration | undefined =
config.S3StorageConfig !== undefined ? storageConfigFromEnv(config.S3StorageConfig) : undefined
const ctx = initStatisticsContext('love', {})
const storageConfig = storageConfigs.storages.findLast((p) => p.name === config.StorageProviderName)
const storageAdapter = buildStorageFromConfig(storageConfigs)
const s3storageConfig = s3StorageConfigs?.storages.findLast((p) => p.kind === 's3')
const app = express()
const port = config.Port
app.use(cors())
@ -81,13 +84,11 @@ export const main = async (): Promise<void> => {
if (event.event === 'egress_ended' && event.egressInfo !== undefined) {
for (const res of event.egressInfo.fileResults) {
const data = dataByUUID.get(res.filename)
if (data !== undefined) {
const prefix = rootPrefix(storageConfig, data.workspaceId)
const filename = stripPrefix(prefix, res.filename)
const storedBlob = await storageAdapter.stat(ctx, data.workspaceId, filename)
if (data !== undefined && storageConfig !== undefined) {
const storedBlob = await saveFile(ctx, data.workspaceId, storageConfig, s3storageConfig, res.filename)
if (storedBlob !== undefined) {
const client = await WorkspaceClient.create(data.workspace, ctx)
await client.saveFile(filename, data.name, storedBlob, data.meetingMinutes)
await client.saveFile(storedBlob._id, data.name, storedBlob, data.meetingMinutes)
await client.close()
}
dataByUUID.delete(res.filename)
@ -135,7 +136,7 @@ export const main = async (): Promise<void> => {
try {
const dateStr = new Date().toISOString().replace('T', '_').slice(0, 19)
const name = `${room}_${dateStr}.mp4`
const id = await startRecord(storageConfig, egressClient, roomClient, roomName, workspace)
const id = await startRecord(ctx, storageConfig, s3storageConfig, egressClient, roomClient, roomName, workspace)
dataByUUID.set(id, { name, workspace: workspace.name, workspaceId: workspace, meetingMinutes })
ctx.info('Start recording', { workspace: workspace.name, roomName, meetingMinutes })
res.send()
@ -257,50 +258,26 @@ const checkRecordAvailable = async (storageConfig: StorageConfig | undefined): P
return storageConfig !== undefined
}
function getBucket (storageConfig: any, workspaceId: WorkspaceId): string {
return storageConfig.rootBucket ?? (storageConfig.bucketPrefix ?? '') + toWorkspaceString(workspaceId)
}
function getBucketFolder (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId)
}
function getDocumentKey (storageConfig: any, workspace: WorkspaceId, name: string): string {
return storageConfig.rootBucket === undefined ? name : `${getBucketFolder(workspace)}/${name}`
}
function stripPrefix (prefix: string | undefined, key: string): string {
if (prefix !== undefined && key.startsWith(prefix)) {
return key.slice(prefix.length)
}
return key
}
function rootPrefix (storageConfig: any, workspaceId: WorkspaceId): string | undefined {
return storageConfig.rootBucket !== undefined ? getBucketFolder(workspaceId) + '/' : undefined
}
const startRecord = async (
ctx: MeasureContext,
storageConfig: StorageConfig | undefined,
s3StorageConfig: StorageConfig | undefined,
egressClient: EgressClient,
roomClient: RoomServiceClient,
roomName: string,
workspaceId: WorkspaceId
): Promise<string> => {
if (storageConfig === undefined) {
console.error('please provide s3 storage configuration')
throw new Error('please provide s3 storage configuration')
console.error('please provide storage configuration')
throw new Error('please provide storage configuration')
}
const endpoint = storageConfig.endpoint
const accessKey = (storageConfig as any).accessKey
const secret = (storageConfig as any).secretKey
const region = (storageConfig as any).region ?? 'auto'
const bucket = getBucket(storageConfig, workspaceId)
const name = uuid()
const filepath = getDocumentKey(storageConfig, workspaceId, `${name}.mp4`)
const uploadParams = await getS3UploadParams(ctx, workspaceId, storageConfig, s3StorageConfig)
const { filepath, endpoint, accessKey, secret, region, bucket } = uploadParams
const output = new EncodedFileOutput({
fileType: EncodedFileType.MP4,
filepath,
disableManifest: true,
output: {
case: 's3',
value: new S3Upload({

View File

@ -0,0 +1,185 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { Blob, MeasureContext, toWorkspaceString, WorkspaceId } from '@hcengineering/core'
import { DatalakeConfig, DatalakeService, createDatalakeClient } from '@hcengineering/datalake'
import { S3Config, S3Service } from '@hcengineering/s3'
import { StorageConfig } from '@hcengineering/server-core'
import { v4 as uuid } from 'uuid'
/**
 * S3-compatible upload target handed to the LiveKit egress so it can write
 * the recording directly to object storage.
 */
export interface S3UploadParams {
  // Object key (optionally prefixed with the workspace folder) the egress writes to
  filepath: string
  // S3-compatible endpoint URL
  endpoint: string
  // Access key id for the target storage
  accessKey: string
  // Region; 'auto' when the config does not specify one
  region: string
  // Secret key for the target storage
  secret: string
  // Bucket the recording is written into
  bucket: string
}
/**
 * Resolves S3-compatible upload parameters for a LiveKit recording based on
 * the configured storage kind.
 *
 * - 's3': parameters are derived directly from the S3 config.
 * - 'datalake': the target bucket comes from the datalake R2 endpoint, while
 *   credentials come from the separate S3 (R2) config.
 *
 * @throws Error when the storage kind is unsupported, or when the datalake
 *         kind is used without an accompanying S3 config.
 */
export async function getS3UploadParams (
  ctx: MeasureContext,
  workspaceId: WorkspaceId,
  storageConfig: StorageConfig,
  s3StorageConfig: StorageConfig | undefined
): Promise<S3UploadParams> {
  if (storageConfig.kind === 's3') {
    // Fix: removed dead inner check `if (storageConfig.kind !== 's3') throw` —
    // it could never fire inside this branch.
    return await getS3UploadParamsS3(ctx, workspaceId, storageConfig as S3Config)
  } else if (storageConfig.kind === 'datalake') {
    // Datalake uploads still go through R2, so a real S3 config is required.
    if (s3StorageConfig === undefined || s3StorageConfig.kind !== 's3') {
      throw new Error('Please provide S3 storage config')
    }
    return await getS3UploadParamsDatalake(
      ctx,
      workspaceId,
      storageConfig as DatalakeConfig,
      s3StorageConfig as S3Config
    )
  } else {
    throw new Error('Unknown storage kind: ' + storageConfig.kind)
  }
}
/**
 * Registers a finished recording (identified by the object key the egress
 * reported) in the configured storage and returns its blob metadata.
 *
 * - 's3': the file is already in place; only a stat is performed.
 * - 'datalake': the R2 object is first registered with the datalake, then
 *   stat'ed through the datalake adapter.
 *
 * @returns the stored blob, or undefined when the adapter cannot find it.
 * @throws Error when the storage kind is unsupported, or when the datalake
 *         kind is used without an accompanying S3 config.
 */
export async function saveFile (
  ctx: MeasureContext,
  workspaceId: WorkspaceId,
  storageConfig: StorageConfig,
  s3StorageConfig: StorageConfig | undefined,
  filename: string
): Promise<Blob | undefined> {
  if (storageConfig.kind === 's3') {
    // Fix: removed dead inner check `if (storageConfig.kind !== 's3') throw` —
    // it could never fire inside this branch.
    return await saveFileToS3(ctx, workspaceId, storageConfig as S3Config, filename)
  } else if (storageConfig.kind === 'datalake') {
    // Datalake ingestion reads the object back from R2, so an S3 config is required.
    if (s3StorageConfig === undefined || s3StorageConfig.kind !== 's3') {
      throw new Error('Please provide S3 storage config')
    }
    return await saveFileToDatalake(
      ctx,
      workspaceId,
      storageConfig as DatalakeConfig,
      s3StorageConfig as S3Config,
      filename
    )
  } else {
    throw new Error('Unknown storage kind: ' + storageConfig.kind)
  }
}
async function getS3UploadParamsS3 (
ctx: MeasureContext,
workspaceId: WorkspaceId,
storageConfig: S3Config
): Promise<S3UploadParams> {
const endpoint = storageConfig.endpoint
const accessKey = storageConfig.accessKey
const secret = storageConfig.secretKey
const region = storageConfig.region ?? 'auto'
const bucket = getBucket(storageConfig, workspaceId)
const name = uuid()
const filepath = getDocumentKey(storageConfig, workspaceId, `${name}.mp4`)
return {
filepath,
endpoint,
accessKey,
region,
secret,
bucket
}
}
/**
 * Builds egress upload parameters for datalake-backed storage: the datalake
 * supplies the R2 bucket, the S3 (R2) config supplies endpoint and credentials.
 */
async function getS3UploadParamsDatalake (
  ctx: MeasureContext,
  workspaceId: WorkspaceId,
  config: DatalakeConfig,
  s3config: S3Config
): Promise<S3UploadParams> {
  const { bucket } = await createDatalakeClient(config).getR2UploadParams(ctx, workspaceId)
  // A fresh uuid per recording keeps concurrent egresses from colliding.
  const objectName = `${uuid()}.mp4`
  return {
    filepath: getDocumentKey(s3config, workspaceId, objectName),
    endpoint: s3config.endpoint,
    accessKey: s3config.accessKey,
    region: s3config.region ?? 'auto',
    secret: s3config.secretKey,
    bucket
  }
}
/**
 * Stats a recording that the egress already uploaded to S3 and returns its
 * blob metadata (undefined when not found).
 */
async function saveFileToS3 (
  ctx: MeasureContext,
  workspaceId: WorkspaceId,
  config: S3Config,
  filename: string
): Promise<Blob | undefined> {
  const storageAdapter = new S3Service(config)
  // The egress reports the full object key; strip the workspace folder prefix
  // (present only in shared-root-bucket mode) to recover the blob id.
  // Fix: renamed local from `uuid`, which shadowed the imported uuid() function.
  const objectName = stripPrefix(rootPrefix(config, workspaceId), filename)
  return await storageAdapter.stat(ctx, workspaceId, objectName)
}
/**
 * Registers a recording the egress uploaded to R2 with the datalake, then
 * stats it through the datalake adapter (undefined when not found).
 */
async function saveFileToDatalake (
  ctx: MeasureContext,
  workspaceId: WorkspaceId,
  config: DatalakeConfig,
  s3config: S3Config,
  filename: string
): Promise<Blob | undefined> {
  const client = createDatalakeClient(config)
  const storageAdapter = new DatalakeService(config)
  // The egress wrote to R2 using the S3-config key layout; strip the workspace
  // folder prefix (present only in shared-root-bucket mode) to get the blob id.
  // Fix: renamed local from `uuid`, which shadowed the imported uuid() function.
  const objectName = stripPrefix(rootPrefix(s3config, workspaceId), filename)
  // Make the datalake ingest the already-uploaded R2 object before stat'ing it.
  await client.uploadFromR2(ctx, workspaceId, objectName, { filename: objectName })
  return await storageAdapter.stat(ctx, workspaceId, objectName)
}
/** Picks the target bucket: a shared root bucket wins; otherwise a per-workspace (optionally prefixed) bucket. */
function getBucket (storageConfig: S3Config, workspaceId: WorkspaceId): string {
  const perWorkspaceBucket = (storageConfig.bucketPrefix ?? '') + toWorkspaceString(workspaceId)
  return storageConfig.rootBucket ?? perWorkspaceBucket
}
// Folder name used to namespace a workspace's objects inside a shared root bucket.
function getBucketFolder (workspaceId: WorkspaceId): string {
  return toWorkspaceString(workspaceId)
}
function getDocumentKey (storageConfig: any, workspace: WorkspaceId, name: string): string {
return storageConfig.rootBucket === undefined ? name : `${getBucketFolder(workspace)}/${name}`
}
/** Removes `prefix` from the front of `key`; returns `key` unchanged when the prefix is absent or does not match. */
function stripPrefix (prefix: string | undefined, key: string): string {
  const matches = prefix !== undefined && key.startsWith(prefix)
  return matches ? key.slice((prefix as string).length) : key
}
/** Key prefix for the workspace's objects; only used in shared-root-bucket mode, otherwise undefined. */
function rootPrefix (storageConfig: S3Config, workspaceId: WorkspaceId): string | undefined {
  if (storageConfig.rootBucket === undefined) {
    return undefined
  }
  return `${getBucketFolder(workspaceId)}/`
}