Merge remote-tracking branch 'origin/develop' into staging

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-16 21:17:25 +07:00
commit 77a8cf142c
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
25 changed files with 1334 additions and 382 deletions

View File

@ -20,8 +20,10 @@ import core, {
type MeasureContext,
type Ref,
type TxCreateDoc,
type TxUpdateDoc,
type WorkspaceId,
DOMAIN_TX,
SortingOrder,
makeCollabYdocId,
makeDocCollabId
} from '@hcengineering/core'
@ -63,13 +65,18 @@ export async function restoreWikiContentMongo (
}
const correctCollabId = { objectClass: doc._class, objectId: doc._id, objectAttr: 'content' }
const wrongCollabId = { objectClass: doc._class, objectId: doc._id, objectAttr: 'description' }
const stat = storageAdapter.stat(ctx, workspaceId, makeCollabYdocId(wrongCollabId))
const wrongYdocId = await findWikiDocYdocName(ctx, db, workspaceId, doc._id)
if (wrongYdocId === undefined) {
console.log('current ydoc not found', doc._id)
continue
}
const stat = storageAdapter.stat(ctx, workspaceId, wrongYdocId)
if (stat === undefined) continue
const ydoc1 = await loadCollabYdoc(ctx, storageAdapter, workspaceId, correctCollabId)
const ydoc2 = await loadCollabYdoc(ctx, storageAdapter, workspaceId, wrongCollabId)
const ydoc2 = await loadCollabYdoc(ctx, storageAdapter, workspaceId, wrongYdocId)
if (ydoc1 !== undefined && ydoc1.share.has('content')) {
// There already is content, we should skip the document
@ -101,6 +108,81 @@ export async function restoreWikiContentMongo (
}
}
export async function findWikiDocYdocName (
ctx: MeasureContext,
db: Db,
workspaceId: WorkspaceId,
doc: Ref<Document>
): Promise<Ref<Blob> | undefined> {
const updateContentTx = await db.collection<TxUpdateDoc<Document & { content: string }>>(DOMAIN_TX).findOne(
{
_class: core.class.TxUpdateDoc,
objectId: doc,
objectClass: document.class.Document,
'operations.content': { $exists: true }
},
{
sort: { modifiedOn: SortingOrder.Descending }
}
)
if (updateContentTx?.operations?.content != null) {
const value = updateContentTx.operations.content as string
if (value.includes(':')) {
console.log('found update content tx', doc, value)
return value.split(':')[0] as Ref<Blob>
}
}
const updateDescriptionTx = await db.collection<TxUpdateDoc<Document & { description: string }>>(DOMAIN_TX).findOne(
{
_class: core.class.TxUpdateDoc,
objectId: doc,
objectClass: document.class.Document,
'operations.description': { $exists: true }
},
{
sort: { modifiedOn: SortingOrder.Descending }
}
)
if (updateDescriptionTx?.operations?.description != null) {
const value = updateDescriptionTx.operations.description
if (value.includes(':')) {
console.log('found update description tx', doc, value)
return value.split(':')[0] as Ref<Blob>
}
}
const createContentTx = await db.collection<TxCreateDoc<Document & { content: string }>>(DOMAIN_TX).findOne({
_class: core.class.TxCreateDoc,
objectId: doc,
objectClass: document.class.Document,
'attributes.content': { $exists: true }
})
if (createContentTx?.attributes?.content != null) {
const value = createContentTx.attributes.content
if (value.includes(':')) {
console.log('found create content tx', doc, value)
return value.split(':')[0] as Ref<Blob>
}
}
const createContentIdTx = await db.collection<TxCreateDoc<Document & { contentId: Ref<Blob> }>>(DOMAIN_TX).findOne({
_class: core.class.TxCreateDoc,
objectId: doc,
objectClass: document.class.Document,
'attributes.contentId': { $exists: true }
})
if (createContentIdTx?.attributes?.contentId != null) {
const value = createContentIdTx.attributes.contentId
console.log('found create contentId tx', doc, value)
return value
}
}
export interface RestoreControlledDocContentParams {
dryRun: boolean
}

View File

@ -54,9 +54,8 @@ export function clone (obj: any, as?: (doc: any, m: any) => any, needAs?: (value
if (type === 'Array') {
result[key] = clone(value, as, needAs)
} else if (type === 'Object') {
const m = needAs?.(value)
const valClone = clone(value, as, needAs)
result[key] = m !== undefined && as !== undefined ? as(valClone, m) : valClone
result[key] = valClone
} else if (type === 'Date') {
result[key] = new Date(value.getTime())
} else {

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import type { Class, MarkupBlobRef, Doc, Ref } from './classes'
import type { Blob, Class, Doc, MarkupBlobRef, Ref } from './classes'
/** @public */
export interface CollaborativeDoc {
@ -40,9 +40,9 @@ export function makeDocCollabId<T extends Doc, U extends keyof T> (
}
/** @public */
export function makeCollabYdocId (doc: CollaborativeDoc): MarkupBlobRef {
export function makeCollabYdocId (doc: CollaborativeDoc): Ref<Blob> {
const { objectId, objectAttr } = doc
return `${objectId}%${objectAttr}` as MarkupBlobRef
return `${objectId}%${objectAttr}` as Ref<Blob>
}
/** @public */

View File

@ -18,7 +18,7 @@ import type { AnyAttribute, Class, Classifier, Doc, Domain, Interface, Mixin, Ob
import { ClassifierKind } from './classes'
import { clone as deepClone } from './clone'
import core from './component'
import { _createMixinProxy, _mixinClass, _toDoc } from './proxy'
import { _createMixinProxy, _mixinClass, _toDoc, PROXY_MIXIN_CLASS_KEY } from './proxy'
import type { Tx, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx'
import { TxProcessor } from './tx'
@ -53,7 +53,9 @@ export class Hierarchy {
}
as<D extends Doc, M extends D>(doc: D, mixin: Ref<Mixin<M>>): M {
return new Proxy(doc, this.getMixinProxyHandler(mixin)) as M
if ((doc as any)[PROXY_MIXIN_CLASS_KEY] === mixin) return doc as M
return new Proxy(Hierarchy.toDoc(doc), this.getMixinProxyHandler(mixin)) as M
}
asIf<D extends Doc, M extends D>(doc: D | undefined, mixin: Ref<Mixin<M>>): M | undefined {

View File

@ -3,7 +3,7 @@ import { Ref } from '.'
import type { Doc, Mixin } from './classes'
const PROXY_TARGET_KEY = '$___proxy_target'
const PROXY_MIXIN_CLASS_KEY = '$__mixin'
export const PROXY_MIXIN_CLASS_KEY = '$__mixin'
/**
* @internal

View File

@ -1,24 +1,12 @@
<script lang="ts">
import { Person, PersonAccount, getName } from '@hcengineering/contact'
import { personAccountByIdStore, personByIdStore } from '@hcengineering/contact-resources'
import { Account, IdMap, Ref } from '@hcengineering/core'
import { getClient } from '@hcengineering/presentation'
import core, { Account, Ref } from '@hcengineering/core'
import { ObjectPresenter } from '@hcengineering/view-resources'
export let reactionAccounts: Ref<Account>[]
const client = getClient()
function getAccName (acc: Ref<Account>, accounts: IdMap<PersonAccount>, employees: IdMap<Person>): string {
const account = accounts.get(acc as Ref<PersonAccount>)
if (account !== undefined) {
const emp = employees.get(account.person)
return emp ? getName(client.getHierarchy(), emp) : ''
}
return ''
}
</script>
{#each reactionAccounts as acc}
<div>
{getAccName(acc, $personAccountByIdStore, $personByIdStore)}
</div>
{/each}
<div class="m-2 flex-col flex-gap-2">
{#each reactionAccounts as acc}
<ObjectPresenter objectId={acc} _class={core.class.Account} disabled />
{/each}
</div>

View File

@ -106,9 +106,14 @@
const starredQuery = createQuery()
let isStarred = false
$: starredQuery.query(document.class.SavedDocument, { attachedTo: _id }, (res) => {
isStarred = res.length !== 0
})
$: starredQuery.query(
document.class.SavedDocument,
{ attachedTo: _id },
(res) => {
isStarred = res.length !== 0
},
{ limit: 1 }
)
async function createEmbedding (file: File): Promise<{ file: Ref<Blob>, type: string } | undefined> {
if (doc === undefined) {

View File

@ -34,9 +34,11 @@
products.class.ProductVersion,
{ space: objectId },
(res) => {
versions = res.length
versions = res.total
},
{
total: true,
limit: 1,
projection: { _id: 1 }
}
)

View File

@ -220,9 +220,10 @@
const spaceQuery = createQuery()
let vacancy: Vacancy | undefined
const me = getCurrentAccount()._id
$: if (_space) {
spaceQuery.query(recruit.class.Vacancy, { _id: _space }, (res) => {
spaceQuery.query(recruit.class.Vacancy, { _id: _space, members: me }, (res) => {
vacancy = res.shift()
})
}
@ -329,7 +330,11 @@
<div class="flex-grow">
<SpaceSelect
_class={recruit.class.Vacancy}
spaceQuery={{ archived: false, ...($selectedTypeStore !== undefined ? { type: $selectedTypeStore } : {}) }}
spaceQuery={{
archived: false,
members: me,
...($selectedTypeStore !== undefined ? { type: $selectedTypeStore } : {})
}}
spaceOptions={orgOptions}
readonly={preserveVacancy}
label={recruit.string.Vacancy}

View File

@ -30,9 +30,14 @@
let applications: number
const query = createQuery()
$: query.query(recruit.class.Applicant, { space: objectId }, (res) => {
applications = res.length
})
$: query.query(
recruit.class.Applicant,
{ space: objectId },
(res) => {
applications = res.total
},
{ total: true, limit: 1 }
)
const createApp = (ev: MouseEvent): void => {
showPopup(CreateApplication, { space: objectId, preserveVacancy: true }, ev.target as HTMLElement)

View File

@ -47,10 +47,12 @@
core.class.TypedSpace,
{ type: type._id },
(res) => {
spacesCount = res.length
spacesCount = res.total
loading = false
},
{
total: true,
limit: 1,
projection: { _id: 1 }
}
)

View File

@ -71,9 +71,11 @@
task.class.Task,
{ kind: taskType._id },
(res) => {
tasksCounter = res.length
tasksCounter = res.total
},
{
total: true,
limit: 1,
projection: {
_id: 1
}

View File

@ -29,7 +29,7 @@
testManagement.class.TestCase,
query,
(res) => {
testCases = res.length
testCases = res.total
},
{ total: true, limit: 1 }
)

View File

@ -19,6 +19,7 @@ import {
type Ref,
type WorkspaceId,
Markup,
MarkupBlobRef,
MeasureContext,
generateId,
makeCollabJsonId,
@ -35,9 +36,9 @@ export async function loadCollabYdoc (
ctx: MeasureContext,
storageAdapter: StorageAdapter,
workspace: WorkspaceId,
doc: CollaborativeDoc
doc: CollaborativeDoc | MarkupBlobRef
): Promise<YDoc | undefined> {
const blobId = makeCollabYdocId(doc)
const blobId = typeof doc === 'string' ? doc : makeCollabYdocId(doc)
const blob = await storageAdapter.stat(ctx, workspace, blobId)
if (blob === undefined) {
@ -61,10 +62,10 @@ export async function saveCollabYdoc (
ctx: MeasureContext,
storageAdapter: StorageAdapter,
workspace: WorkspaceId,
doc: CollaborativeDoc,
doc: CollaborativeDoc | MarkupBlobRef,
ydoc: YDoc
): Promise<Ref<Blob>> {
const blobId = makeCollabYdocId(doc)
const blobId = typeof doc === 'string' ? doc : makeCollabYdocId(doc)
const buffer = yDocToBuffer(ydoc)
await storageAdapter.put(ctx, workspace, blobId, buffer, 'application/ydoc', buffer.length)

View File

@ -16,7 +16,7 @@
import activity, { DocUpdateMessage } from '@hcengineering/activity'
import { loadCollabJson, loadCollabYdoc, saveCollabJson, saveCollabYdoc } from '@hcengineering/collaboration'
import { decodeDocumentId } from '@hcengineering/collaborator-client'
import core, { AttachedData, MeasureContext, TxOperations } from '@hcengineering/core'
import core, { AttachedData, MeasureContext, Ref, Space, TxOperations } from '@hcengineering/core'
import { StorageAdapter } from '@hcengineering/server-core'
import { markupToYDocNoSchema, areEqualMarkups } from '@hcengineering/text'
import { Doc as YDoc } from 'yjs'
@ -173,6 +173,8 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
await ctx.with('update', {}, () => client.diffUpdate(current, { [objectAttr]: blobId }))
await ctx.with('activity', {}, () => {
const space = hierarchy.isDerived(current._class, core.class.Space) ? (current._id as Ref<Space>) : current.space
const data: AttachedData<DocUpdateMessage> = {
objectId,
objectClass,
@ -189,7 +191,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
}
return client.addCollection(
activity.class.DocUpdateMessage,
current.space,
space,
current._id,
current._class,
'docUpdateMessages',

View File

@ -307,7 +307,9 @@ export class S3Service implements StorageAdapter {
version: result.VersionId ?? null
}
} catch (err: any) {
ctx.warn('no object found', { error: err, objectName, workspaceId: workspaceId.name })
if (err?.$metadata?.httpStatusCode !== 404) {
ctx.warn('no object found', { error: err, objectName, workspaceId: workspaceId.name })
}
}
}

View File

@ -19,6 +19,11 @@
"lint:fix": "eslint --fix src/**/*.ts",
"format": "prettier --write src/**/*.ts && pnpm lint:fix"
},
"pnpm": {
"overrides": {
"livekit-server-sdk": "2.8.1"
}
},
"devDependencies": {
"@types/node": "~20.11.16",
"@typescript-eslint/eslint-plugin": "^6.11.0",
@ -35,8 +40,9 @@
},
"dependencies": {
"@deepgram/sdk": "^3.9.0",
"@livekit/agents": "^0.4.6",
"@livekit/agents": "^0.5.1",
"@livekit/rtc-node": "^0.12.1",
"@livekit/agents-plugin-openai": "^0.7.1",
"dotenv": "^16.4.5"
}
}

File diff suppressed because it is too large Load Diff

View File

@ -17,9 +17,9 @@ import { cli, defineAgent, type JobContext, JobRequest, WorkerOptions } from '@l
import { fileURLToPath } from 'node:url'
import { RemoteParticipant, RemoteTrack, RemoteTrackPublication, RoomEvent, TrackKind } from '@livekit/rtc-node'
import { STT } from './deepgram/stt.js'
import { Metadata, TranscriptionStatus } from './type.js'
import { Metadata, TranscriptionStatus, Stt } from './type.js'
import config from './config.js'
import { getStt } from './utils.js'
function parseMetadata (metadata: string): Metadata {
try {
@ -70,7 +70,7 @@ const requestFunc = async (req: JobRequest): Promise<void> => {
await req.accept(identity.name, identity.identity)
}
function applyMetadata (data: string | undefined, stt: STT): void {
function applyMetadata (data: string | undefined, stt: Stt): void {
if (data == null || data === '') return
const metadata = parseMetadata(data)
@ -101,7 +101,13 @@ export default defineAgent({
return
}
const stt = new STT(roomName)
const stt = getStt(ctx.room)
if (stt === undefined) {
console.error('Transcription provider is not configured')
ctx.shutdown()
return
}
applyMetadata(ctx.room.metadata, stt)
@ -129,18 +135,6 @@ export default defineAgent({
}
)
ctx.room.on(RoomEvent.TrackMuted, (publication) => {
if (publication.kind === TrackKind.KIND_AUDIO) {
stt.mute(publication.sid)
}
})
ctx.room.on(RoomEvent.TrackUnmuted, (publication) => {
if (publication.kind === TrackKind.KIND_AUDIO) {
stt.unmute(publication.sid)
}
})
ctx.addShutdownCallback(async () => {
stt.close()
})

View File

@ -13,21 +13,25 @@
// limitations under the License.
//
import { SttProvider } from './type.'
interface Config {
TranscriptDelay: number
DeepgramApiKey: string
PlatformUrl: string
PlatformToken: string
OpenaiApiKey: string
OpenaiBaseUrl: string
SttProvider: SttProvider
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
const config: Config = (() => {
const params: Partial<Config> = {
DeepgramApiKey: process.env.DEEPGRAM_API_KEY,
TranscriptDelay: parseNumber(process.env.TRANSCRIPT_DELAY) ?? 3000,
PlatformUrl: process.env.PLATFORM_URL,
PlatformToken: process.env.PLATFORM_TOKEN
PlatformToken: process.env.PLATFORM_TOKEN,
OpenaiApiKey: process.env.OPENAI_API_KEY,
OpenaiBaseUrl: process.env.OPENAI_BASE_URL ?? '',
SttProvider: (process.env.STT_PROVIDER as SttProvider) ?? 'deepgram'
}
const missingEnv = (Object.keys(params) as Array<keyof Config>).filter((key) => params[key] === undefined)

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { AudioStream, RemoteParticipant, RemoteTrack, RemoteTrackPublication } from '@livekit/rtc-node'
import { AudioStream, RemoteParticipant, RemoteTrack, RemoteTrackPublication, Room } from '@livekit/rtc-node'
import {
createClient,
DeepgramClient,
@ -24,6 +24,7 @@ import {
SOCKET_STATES
} from '@deepgram/sdk'
import { Stt } from '../type'
import config from '../config.js'
const KEEP_ALIVE_INTERVAL = 10 * 1000
@ -41,7 +42,7 @@ const dgSchema: LiveSchema = {
language: 'en'
}
export class STT {
export class STT implements Stt {
private readonly deepgram: DeepgramClient
private isInProgress = false
@ -49,7 +50,6 @@ export class STT {
private readonly trackBySid = new Map<string, RemoteTrack>()
private readonly streamBySid = new Map<string, AudioStream>()
private readonly mutedTracks = new Set<string>()
private readonly participantBySid = new Map<string, RemoteParticipant>()
private readonly dgConnectionBySid = new Map<string, ListenLiveClient>()
@ -57,7 +57,7 @@ export class STT {
private transcriptionCount = 0
constructor (readonly name: string) {
constructor (readonly room: Room) {
this.deepgram = createClient(config.DeepgramApiKey)
}
@ -72,7 +72,7 @@ export class STT {
start (): void {
if (this.isInProgress) return
console.log('Starting transcription', this.name)
console.log('Starting transcription', this.room.name)
this.isInProgress = true
for (const sid of this.trackBySid.keys()) {
@ -82,28 +82,17 @@ export class STT {
stop (): void {
if (!this.isInProgress) return
console.log('Stopping transcription', this.name)
console.log('Stopping transcription', this.room.name)
this.isInProgress = false
for (const sid of this.trackBySid.keys()) {
this.stopDeepgram(sid)
}
}
mute (sid: string): void {
this.mutedTracks.add(sid)
}
unmute (sid: string): void {
this.mutedTracks.delete(sid)
}
subscribe (track: RemoteTrack, publication: RemoteTrackPublication, participant: RemoteParticipant): void {
if (this.trackBySid.has(publication.sid)) return
this.trackBySid.set(publication.sid, track)
this.participantBySid.set(publication.sid, participant)
if (track.muted) {
this.mutedTracks.add(publication.sid)
}
if (this.isInProgress) {
this.processTrack(publication.sid)
}
@ -112,7 +101,6 @@ export class STT {
unsubscribe (_: RemoteTrack | undefined, publication: RemoteTrackPublication, participant: RemoteParticipant): void {
this.trackBySid.delete(publication.sid)
this.participantBySid.delete(participant.sid)
this.mutedTracks.delete(publication.sid)
this.stopDeepgram(publication.sid)
}
@ -150,7 +138,7 @@ export class STT {
sample_rate: stream.sampleRate,
language: this.language ?? 'en'
})
console.log('Starting deepgram for track', this.name, sid)
console.log('Starting deepgram for track', this.room.name, sid)
const interval = setInterval(() => {
dgConnection.keepAlive()
@ -192,7 +180,6 @@ export class STT {
async streamToDeepgram (sid: string, stream: AudioStream): Promise<void> {
for await (const frame of stream) {
if (!this.isInProgress) continue
if (this.mutedTracks.has(sid)) continue
const dgConnection = this.dgConnectionBySid.get(sid)
if (dgConnection === undefined) {
stream.close()
@ -208,13 +195,13 @@ export class STT {
const request = {
transcript,
participant: this.participantBySid.get(sid)?.identity,
roomName: this.name
roomName: this.room.name
}
this.transcriptionCount++
if (this.transcriptionCount === 1 || this.transcriptionCount % 50 === 0) {
console.log('Sending transcript', this.name, this.transcriptionCount)
console.log('Sending transcript', this.room.name, this.transcriptionCount)
}
try {

View File

@ -0,0 +1,154 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { RemoteParticipant, RemoteTrack, RemoteTrackPublication, Room } from '@livekit/rtc-node'
import * as openai from '@livekit/agents-plugin-openai'
import { multimodal } from '@livekit/agents'
import { Stt } from '../type'
import config from '../config.js'
export class STT implements Stt {
private isInProgress = false
private readonly trackBySid = new Map<string, RemoteTrack>()
private readonly participantBySid = new Map<string, RemoteParticipant>()
private readonly connectionBySid = new Map<string, openai.realtime.RealtimeSession>()
private transcriptionCount = 0
private readonly model = new openai.realtime.RealtimeModel({
modalities: ['text'],
instructions:
'You are an expert transcription assistant. Your task is to listen to audio content and transcribe it into text with high accuracy. Do not summarize or skip any content; transcribe everything exactly as spoken.',
model: 'gpt-4o-realtime-preview',
apiKey: config.OpenaiApiKey,
...(config.OpenaiBaseUrl === '' ? {} : { baseUrl: config.OpenaiBaseUrl })
})
constructor (readonly room: Room) {}
updateLanguage (language: string): void {
/* noop */
}
async start (): Promise<void> {
if (this.isInProgress) return
console.log('Starting transcription', this.room.name)
this.isInProgress = true
for (const sid of this.trackBySid.keys()) {
await this.subscribeOpenai(sid)
}
}
stop (): void {
if (!this.isInProgress) return
console.log('Stopping transcription', this.room.name)
this.isInProgress = false
for (const sid of this.trackBySid.keys()) {
this.unsubscribeOpenai(sid)
}
}
async subscribe (
track: RemoteTrack,
publication: RemoteTrackPublication,
participant: RemoteParticipant
): Promise<void> {
if (this.trackBySid.has(publication.sid)) return
this.trackBySid.set(publication.sid, track)
this.participantBySid.set(publication.sid, participant)
if (this.isInProgress) {
await this.subscribeOpenai(publication.sid)
}
}
unsubscribe (_: RemoteTrack | undefined, publication: RemoteTrackPublication, participant: RemoteParticipant): void {
this.trackBySid.delete(publication.sid)
this.participantBySid.delete(participant.sid)
this.unsubscribeOpenai(publication.sid)
}
unsubscribeOpenai (sid: string): void {
const connection = this.connectionBySid.get(sid)
if (connection !== undefined) {
connection.removeAllListeners()
void connection.close()
}
this.connectionBySid.delete(sid)
}
async subscribeOpenai (sid: string): Promise<void> {
const track = this.trackBySid.get(sid)
if (track === undefined) return
if (this.connectionBySid.has(sid)) return
const participant = this.participantBySid.get(sid)
if (participant === undefined) return
const agent = new multimodal.MultimodalAgent({
model: this.model
})
const session = await agent
.start(this.room, participant)
.then((session) => session as openai.realtime.RealtimeSession)
session.on('input_speech_transcription_completed', (res) => {
if (res.transcript !== '') {
void this.sendToPlatform(res.transcript, sid)
}
})
this.connectionBySid.set(sid, session)
}
async sendToPlatform (transcript: string, sid: string): Promise<void> {
const request = {
transcript,
participant: this.participantBySid.get(sid)?.identity,
roomName: this.room.name
}
this.transcriptionCount++
if (this.transcriptionCount === 1 || this.transcriptionCount % 50 === 0) {
console.log('Sending transcript', this.room.name, this.transcriptionCount)
}
try {
await fetch(`${config.PlatformUrl}/love/transcript`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + config.PlatformToken
},
body: JSON.stringify(request)
})
} catch (e) {
console.error('Error sending to platform', e)
}
}
close (): void {
this.trackBySid.clear()
this.participantBySid.clear()
for (const sid of this.connectionBySid.keys()) {
this.unsubscribeOpenai(sid)
}
}
}

View File

@ -13,6 +13,8 @@
// limitations under the License.
//
import { RemoteParticipant, RemoteTrack, RemoteTrackPublication } from '@livekit/rtc-node'
export enum TranscriptionStatus {
Idle = 'idle',
InProgress = 'inProgress',
@ -23,3 +25,14 @@ export interface Metadata {
transcription?: TranscriptionStatus
language?: string
}
export type SttProvider = 'openai' | 'deepgram'
export interface Stt {
stop: () => void
start: () => void
subscribe: (track: RemoteTrack, publication: RemoteTrackPublication, participant: RemoteParticipant) => void
unsubscribe: (track: RemoteTrack | undefined, publication: RemoteTrackPublication, participant: RemoteParticipant) => void
updateLanguage: (language: string) => void
close: () => void
}

View File

@ -0,0 +1,18 @@
import { Room } from '@livekit/rtc-node'
import * as dg from './deepgram/stt.js'
import * as openai from './openai/stt.js'
import config from './config.js'
import { Stt } from './type.js'
export function getStt (room: Room): Stt | undefined {
const provider = config.SttProvider
switch (provider) {
case 'deepgram':
return new dg.STT(room)
case 'openai':
return new openai.STT(room)
default: return undefined
}
}

View File

@ -0,0 +1,95 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the 'License');
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an 'AS IS' BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type Env } from './env'
export async function withMetrics<T> (name: string, fn: (ctx: MetricsContext) => Promise<T>): Promise<T> {
const ctx = new MetricsContext()
const start = performance.now()
try {
return await fn(ctx)
} finally {
const total = performance.now() - start
const ops = ctx.metrics
const message = `${name} total=${total} ` + ctx.toString()
console.log({ message, total, ops })
}
}
export interface MetricsData {
op: string
time: number
}
export class MetricsContext {
metrics: Array<MetricsData> = []
debug (...data: any[]): void {
console.debug(...data)
}
log (...data: any[]): void {
console.log(...data)
}
error (...data: any[]): void {
console.error(...data)
}
async with<T>(op: string, fn: () => Promise<T>): Promise<T> {
const start = performance.now()
try {
return await fn()
} finally {
const time = performance.now() - start
this.metrics.push({ op, time })
}
}
withSync<T>(op: string, fn: () => T): T {
const start = performance.now()
try {
return fn()
} finally {
const time = performance.now() - start
this.metrics.push({ op, time })
}
}
toString (): string {
return this.metrics.map((p) => `${p.op}=${p.time}`).join(' ')
}
}
export class LoggedDatalake {
constructor (
private readonly datalake: Env['DATALAKE'],
private readonly ctx: MetricsContext
) {}
async getBlob (workspace: string, name: string): Promise<ArrayBuffer> {
return await this.ctx.with('datalake.getBlob', () => {
return this.datalake.getBlob(workspace, name)
})
}
async putBlob (workspace: string, name: string, data: ArrayBuffer | Blob | string, type: string): Promise<void> {
await this.ctx.with('datalake.putBlob', () => {
return this.datalake.putBlob(workspace, name, data, type)
})
}
}