Add openai transcription provider ()

Signed-off-by: Kristina Fefelova <kristin.fefelova@gmail.com>
This commit is contained in:
Kristina 2024-12-16 15:36:08 +04:00 committed by GitHub
parent 377fb908ba
commit 102066ac19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1093 additions and 333 deletions

View File

@ -19,6 +19,11 @@
"lint:fix": "eslint --fix src/**/*.ts",
"format": "prettier --write src/**/*.ts && pnpm lint:fix"
},
"pnpm": {
"overrides": {
"livekit-server-sdk": "2.8.1"
}
},
"devDependencies": {
"@types/node": "~20.11.16",
"@typescript-eslint/eslint-plugin": "^6.11.0",
@ -35,8 +40,9 @@
},
"dependencies": {
"@deepgram/sdk": "^3.9.0",
"@livekit/agents": "^0.4.6",
"@livekit/agents": "^0.5.1",
"@livekit/rtc-node": "^0.12.1",
"@livekit/agents-plugin-openai": "^0.7.1",
"dotenv": "^16.4.5"
}
}

File diff suppressed because it is too large Load Diff

View File

@ -17,9 +17,9 @@ import { cli, defineAgent, type JobContext, JobRequest, WorkerOptions } from '@l
import { fileURLToPath } from 'node:url'
import { RemoteParticipant, RemoteTrack, RemoteTrackPublication, RoomEvent, TrackKind } from '@livekit/rtc-node'
import { STT } from './deepgram/stt.js'
import { Metadata, TranscriptionStatus } from './type.js'
import { Metadata, TranscriptionStatus, Stt } from './type.js'
import config from './config.js'
import { getStt } from './utils.js'
function parseMetadata (metadata: string): Metadata {
try {
@ -70,7 +70,7 @@ const requestFunc = async (req: JobRequest): Promise<void> => {
await req.accept(identity.name, identity.identity)
}
function applyMetadata (data: string | undefined, stt: STT): void {
function applyMetadata (data: string | undefined, stt: Stt): void {
if (data == null || data === '') return
const metadata = parseMetadata(data)
@ -101,7 +101,13 @@ export default defineAgent({
return
}
const stt = new STT(roomName)
const stt = getStt(ctx.room)
if (stt === undefined) {
console.error('Transcription provider is not configured')
ctx.shutdown()
return
}
applyMetadata(ctx.room.metadata, stt)
@ -129,18 +135,6 @@ export default defineAgent({
}
)
ctx.room.on(RoomEvent.TrackMuted, (publication) => {
if (publication.kind === TrackKind.KIND_AUDIO) {
stt.mute(publication.sid)
}
})
ctx.room.on(RoomEvent.TrackUnmuted, (publication) => {
if (publication.kind === TrackKind.KIND_AUDIO) {
stt.unmute(publication.sid)
}
})
ctx.addShutdownCallback(async () => {
stt.close()
})

View File

@ -13,21 +13,25 @@
// limitations under the License.
//
import { SttProvider } from './type.'
interface Config {
TranscriptDelay: number
DeepgramApiKey: string
PlatformUrl: string
PlatformToken: string
OpenaiApiKey: string
OpenaiBaseUrl: string
SttProvider: SttProvider
}
const parseNumber = (str: string | undefined): number | undefined => (str !== undefined ? Number(str) : undefined)
const config: Config = (() => {
const params: Partial<Config> = {
DeepgramApiKey: process.env.DEEPGRAM_API_KEY,
TranscriptDelay: parseNumber(process.env.TRANSCRIPT_DELAY) ?? 3000,
PlatformUrl: process.env.PLATFORM_URL,
PlatformToken: process.env.PLATFORM_TOKEN
PlatformToken: process.env.PLATFORM_TOKEN,
OpenaiApiKey: process.env.OPENAI_API_KEY,
OpenaiBaseUrl: process.env.OPENAI_BASE_URL ?? '',
SttProvider: (process.env.STT_PROVIDER as SttProvider) ?? 'deepgram'
}
const missingEnv = (Object.keys(params) as Array<keyof Config>).filter((key) => params[key] === undefined)

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { AudioStream, RemoteParticipant, RemoteTrack, RemoteTrackPublication } from '@livekit/rtc-node'
import { AudioStream, RemoteParticipant, RemoteTrack, RemoteTrackPublication, Room } from '@livekit/rtc-node'
import {
createClient,
DeepgramClient,
@ -24,6 +24,7 @@ import {
SOCKET_STATES
} from '@deepgram/sdk'
import { Stt } from '../type'
import config from '../config.js'
const KEEP_ALIVE_INTERVAL = 10 * 1000
@ -41,7 +42,7 @@ const dgSchema: LiveSchema = {
language: 'en'
}
export class STT {
export class STT implements Stt {
private readonly deepgram: DeepgramClient
private isInProgress = false
@ -49,7 +50,6 @@ export class STT {
private readonly trackBySid = new Map<string, RemoteTrack>()
private readonly streamBySid = new Map<string, AudioStream>()
private readonly mutedTracks = new Set<string>()
private readonly participantBySid = new Map<string, RemoteParticipant>()
private readonly dgConnectionBySid = new Map<string, ListenLiveClient>()
@ -57,7 +57,7 @@ export class STT {
private transcriptionCount = 0
constructor (readonly name: string) {
constructor (readonly room: Room) {
this.deepgram = createClient(config.DeepgramApiKey)
}
@ -72,7 +72,7 @@ export class STT {
start (): void {
if (this.isInProgress) return
console.log('Starting transcription', this.name)
console.log('Starting transcription', this.room.name)
this.isInProgress = true
for (const sid of this.trackBySid.keys()) {
@ -82,28 +82,17 @@ export class STT {
stop (): void {
if (!this.isInProgress) return
console.log('Stopping transcription', this.name)
console.log('Stopping transcription', this.room.name)
this.isInProgress = false
for (const sid of this.trackBySid.keys()) {
this.stopDeepgram(sid)
}
}
mute (sid: string): void {
this.mutedTracks.add(sid)
}
unmute (sid: string): void {
this.mutedTracks.delete(sid)
}
subscribe (track: RemoteTrack, publication: RemoteTrackPublication, participant: RemoteParticipant): void {
if (this.trackBySid.has(publication.sid)) return
this.trackBySid.set(publication.sid, track)
this.participantBySid.set(publication.sid, participant)
if (track.muted) {
this.mutedTracks.add(publication.sid)
}
if (this.isInProgress) {
this.processTrack(publication.sid)
}
@ -112,7 +101,6 @@ export class STT {
unsubscribe (_: RemoteTrack | undefined, publication: RemoteTrackPublication, participant: RemoteParticipant): void {
this.trackBySid.delete(publication.sid)
this.participantBySid.delete(participant.sid)
this.mutedTracks.delete(publication.sid)
this.stopDeepgram(publication.sid)
}
@ -150,7 +138,7 @@ export class STT {
sample_rate: stream.sampleRate,
language: this.language ?? 'en'
})
console.log('Starting deepgram for track', this.name, sid)
console.log('Starting deepgram for track', this.room.name, sid)
const interval = setInterval(() => {
dgConnection.keepAlive()
@ -192,7 +180,6 @@ export class STT {
async streamToDeepgram (sid: string, stream: AudioStream): Promise<void> {
for await (const frame of stream) {
if (!this.isInProgress) continue
if (this.mutedTracks.has(sid)) continue
const dgConnection = this.dgConnectionBySid.get(sid)
if (dgConnection === undefined) {
stream.close()
@ -208,13 +195,13 @@ export class STT {
const request = {
transcript,
participant: this.participantBySid.get(sid)?.identity,
roomName: this.name
roomName: this.room.name
}
this.transcriptionCount++
if (this.transcriptionCount === 1 || this.transcriptionCount % 50 === 0) {
console.log('Sending transcript', this.name, this.transcriptionCount)
console.log('Sending transcript', this.room.name, this.transcriptionCount)
}
try {

View File

@ -0,0 +1,154 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { RemoteParticipant, RemoteTrack, RemoteTrackPublication, Room } from '@livekit/rtc-node'
import * as openai from '@livekit/agents-plugin-openai'
import { multimodal } from '@livekit/agents'
import { Stt } from '../type'
import config from '../config.js'
export class STT implements Stt {
private isInProgress = false
private readonly trackBySid = new Map<string, RemoteTrack>()
private readonly participantBySid = new Map<string, RemoteParticipant>()
private readonly connectionBySid = new Map<string, openai.realtime.RealtimeSession>()
private transcriptionCount = 0
private readonly model = new openai.realtime.RealtimeModel({
modalities: ['text'],
instructions:
'You are an expert transcription assistant. Your task is to listen to audio content and transcribe it into text with high accuracy. Do not summarize or skip any content; transcribe everything exactly as spoken.',
model: 'gpt-4o-realtime-preview',
apiKey: config.OpenaiApiKey,
...(config.OpenaiBaseUrl === '' ? {} : { baseUrl: config.OpenaiBaseUrl })
})
constructor (readonly room: Room) {}
updateLanguage (language: string): void {
/* noop */
}
async start (): Promise<void> {
if (this.isInProgress) return
console.log('Starting transcription', this.room.name)
this.isInProgress = true
for (const sid of this.trackBySid.keys()) {
await this.subscribeOpenai(sid)
}
}
stop (): void {
if (!this.isInProgress) return
console.log('Stopping transcription', this.room.name)
this.isInProgress = false
for (const sid of this.trackBySid.keys()) {
this.unsubscribeOpenai(sid)
}
}
async subscribe (
track: RemoteTrack,
publication: RemoteTrackPublication,
participant: RemoteParticipant
): Promise<void> {
if (this.trackBySid.has(publication.sid)) return
this.trackBySid.set(publication.sid, track)
this.participantBySid.set(publication.sid, participant)
if (this.isInProgress) {
await this.subscribeOpenai(publication.sid)
}
}
unsubscribe (_: RemoteTrack | undefined, publication: RemoteTrackPublication, participant: RemoteParticipant): void {
this.trackBySid.delete(publication.sid)
this.participantBySid.delete(participant.sid)
this.unsubscribeOpenai(publication.sid)
}
unsubscribeOpenai (sid: string): void {
const connection = this.connectionBySid.get(sid)
if (connection !== undefined) {
connection.removeAllListeners()
void connection.close()
}
this.connectionBySid.delete(sid)
}
async subscribeOpenai (sid: string): Promise<void> {
const track = this.trackBySid.get(sid)
if (track === undefined) return
if (this.connectionBySid.has(sid)) return
const participant = this.participantBySid.get(sid)
if (participant === undefined) return
const agent = new multimodal.MultimodalAgent({
model: this.model
})
const session = await agent
.start(this.room, participant)
.then((session) => session as openai.realtime.RealtimeSession)
session.on('input_speech_transcription_completed', (res) => {
if (res.transcript !== '') {
void this.sendToPlatform(res.transcript, sid)
}
})
this.connectionBySid.set(sid, session)
}
async sendToPlatform (transcript: string, sid: string): Promise<void> {
const request = {
transcript,
participant: this.participantBySid.get(sid)?.identity,
roomName: this.room.name
}
this.transcriptionCount++
if (this.transcriptionCount === 1 || this.transcriptionCount % 50 === 0) {
console.log('Sending transcript', this.room.name, this.transcriptionCount)
}
try {
await fetch(`${config.PlatformUrl}/love/transcript`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + config.PlatformToken
},
body: JSON.stringify(request)
})
} catch (e) {
console.error('Error sending to platform', e)
}
}
close (): void {
this.trackBySid.clear()
this.participantBySid.clear()
for (const sid of this.connectionBySid.keys()) {
this.unsubscribeOpenai(sid)
}
}
}

View File

@ -13,6 +13,8 @@
// limitations under the License.
//
import { RemoteParticipant, RemoteTrack, RemoteTrackPublication } from '@livekit/rtc-node'
export enum TranscriptionStatus {
Idle = 'idle',
InProgress = 'inProgress',
@ -23,3 +25,14 @@ export interface Metadata {
transcription?: TranscriptionStatus
language?: string
}
export type SttProvider = 'openai' | 'deepgram'
export interface Stt {
stop: () => void
start: () => void
subscribe: (track: RemoteTrack, publication: RemoteTrackPublication, participant: RemoteParticipant) => void
unsubscribe: (track: RemoteTrack | undefined, publication: RemoteTrackPublication, participant: RemoteParticipant) => void
updateLanguage: (language: string) => void
close: () => void
}

View File

@ -0,0 +1,18 @@
import { Room } from '@livekit/rtc-node'
import * as dg from './deepgram/stt.js'
import * as openai from './openai/stt.js'
import config from './config.js'
import { Stt } from './type.js'
export function getStt (room: Room): Stt | undefined {
const provider = config.SttProvider
switch (provider) {
case 'deepgram':
return new dg.STT(room)
case 'openai':
return new openai.STT(room)
default: return undefined
}
}