From 703f808f8020bfd943036d86c2fac73dca119118 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 28 Mar 2025 13:28:58 +0100 Subject: [PATCH 1/5] publish transcriptions additionally via text stream APIs --- agents/src/constants.ts | 4 ++++ agents/src/multimodal/multimodal_agent.ts | 18 ++++++++++++++++-- agents/src/pipeline/pipeline_agent.ts | 15 ++++++++++++++- 3 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 agents/src/constants.ts diff --git a/agents/src/constants.ts b/agents/src/constants.ts new file mode 100644 index 00000000..e7c7332b --- /dev/null +++ b/agents/src/constants.ts @@ -0,0 +1,4 @@ +export const ATTRIBUTE_TRANSCRIPTION_TRACK_ID = 'lk.transcribed_track_id'; +export const ATTRIBUTE_TRANSCRIPTION_FINAL = 'lk.transcription_final'; +export const TOPIC_TRANSCRIPTION = 'lk.transcription'; +export const TOPIC_CHAT = 'lk.chat'; diff --git a/agents/src/multimodal/multimodal_agent.ts b/agents/src/multimodal/multimodal_agent.ts index 98dfe8b0..c3e92660 100644 --- a/agents/src/multimodal/multimodal_agent.ts +++ b/agents/src/multimodal/multimodal_agent.ts @@ -19,6 +19,9 @@ import { } from '@livekit/rtc-node'; import { EventEmitter } from 'node:events'; import { AudioByteStream } from '../audio.js'; +import { TOPIC_TRANSCRIPTION } from '../constants.js'; +import { ATTRIBUTE_TRANSCRIPTION_FINAL } from '../constants.js'; +import { ATTRIBUTE_TRANSCRIPTION_TRACK_ID } from '../constants.js'; import * as llm from '../llm/index.js'; import { log } from '../log.js'; import type { MultimodalLLMMetrics } from '../metrics/base.js'; @@ -467,13 +470,13 @@ export class MultimodalAgent extends EventEmitter { return this.#localTrackSid; } - #publishTranscription( + async #publishTranscription( participantIdentity: string, trackSid: string, text: string, isFinal: boolean, id: string, - ): void { + ): Promise { this.#logger.debug( `Publishing transcription ${participantIdentity} ${trackSid} ${text} ${isFinal} ${id}`, ); @@ -496,6 +499,17 @@ export class MultimodalAgent extends EventEmitter { }, ], }); + + const stream = await this.room.localParticipant.streamText({ + topic: TOPIC_TRANSCRIPTION, + senderIdentity: participantIdentity, + attributes: { + [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: this.#agentPublication!.sid!, + [ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(), + }, + }); + await stream.write(text); + await stream.close(); } #updateState() { diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts index 28b34960..d43f031f 100644 --- a/agents/src/pipeline/pipeline_agent.ts +++ b/agents/src/pipeline/pipeline_agent.ts @@ -12,6 +12,9 @@ import { import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { randomUUID } from 'node:crypto'; import EventEmitter from 'node:events'; +import { TOPIC_TRANSCRIPTION } from '../constants.js'; +import { ATTRIBUTE_TRANSCRIPTION_TRACK_ID } from '../constants.js'; +import { ATTRIBUTE_TRANSCRIPTION_FINAL } from '../constants.js'; import type { CallableFunctionResult, FunctionCallInfo, @@ -886,13 +889,23 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< source: string | LLMStream | AsyncIterable, ): SynthesisHandle { const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions); - synchronizer.on('textUpdated', (text) => { + // TODO: where possible we would want to use deltas instead of full text segments, esp for LLM streams over the streamText API + synchronizer.on('textUpdated', async (text) => { this.#agentTranscribedText = text.text; this.#room!.localParticipant!.publishTranscription({ participantIdentity: this.#room!.localParticipant!.identity, trackSid: this.#agentPublication!.sid!, segments: [text], }); + const stream = await this.#room!.localParticipant!.streamText({ + topic: TOPIC_TRANSCRIPTION, + attributes: { + [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: this.#agentPublication!.sid!, + [ATTRIBUTE_TRANSCRIPTION_FINAL]: text.final.toString(), + }, + }); + await stream.write(text.text); + await stream.close(); }); if (!this.#agentOutput) { From 693c29835761bbbabae71fb6e00e81c9685fbb67 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 28 Mar 2025 13:45:16 +0100 Subject: [PATCH 2/5] align publishTranscription signature --- agents/src/multimodal/multimodal_agent.ts | 24 +++-- agents/src/pipeline/pipeline_agent.ts | 101 ++++++++++++---------- 2 files changed, 72 insertions(+), 53 deletions(-) diff --git a/agents/src/multimodal/multimodal_agent.ts b/agents/src/multimodal/multimodal_agent.ts index c3e92660..2122ed88 100644 --- a/agents/src/multimodal/multimodal_agent.ts +++ b/agents/src/multimodal/multimodal_agent.ts @@ -249,8 +249,8 @@ export class MultimodalAgent extends EventEmitter { if (message.contentType === 'text') return; const synchronizer = new TextAudioSynchronizer(defaultTextSyncOptions); - synchronizer.on('textUpdated', (text) => { - this.#publishTranscription( + synchronizer.on('textUpdated', async (text) => { + await this.#publishTranscription( this.room!.localParticipant!.identity!, this.#getLocalTrackSid()!, text.text, @@ -300,25 +300,31 @@ export class MultimodalAgent extends EventEmitter { }); // eslint-disable-next-line @typescript-eslint/no-explicit-any - this.#session.on('input_speech_committed', (ev: any) => { + this.#session.on('input_speech_committed', async (ev: any) => { // openai.realtime.InputSpeechCommittedEvent const participantIdentity = this.linkedParticipant?.identity; const trackSid = this.subscribedTrack?.sid; if (participantIdentity && trackSid) { - this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId); + await this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId); } else { this.#logger.error('Participant or track not set'); } }); // eslint-disable-next-line @typescript-eslint/no-explicit-any - this.#session.on('input_speech_transcription_completed', (ev: any) => { + this.#session.on('input_speech_transcription_completed', async (ev: any) => { // openai.realtime.InputSpeechTranscriptionCompletedEvent const transcription = ev.transcript; const participantIdentity = this.linkedParticipant?.identity; const trackSid = this.subscribedTrack?.sid; if (participantIdentity && trackSid) { - this.#publishTranscription(participantIdentity, trackSid, transcription, true, ev.itemId); + await this.#publishTranscription( + participantIdentity, + trackSid, + transcription, + true, + ev.itemId, + ); } else { this.#logger.error('Participant or track not set'); } @@ -330,7 +336,7 @@ export class MultimodalAgent extends EventEmitter { this.#logger.child({ transcription }).debug('committed user speech'); }); - this.#session.on('input_speech_started', (ev: any) => { + this.#session.on('input_speech_started', async (ev: any) => { this.emit('user_started_speaking'); if (this.#playingHandle && !this.#playingHandle.done) { this.#playingHandle.interrupt(); @@ -347,7 +353,7 @@ export class MultimodalAgent extends EventEmitter { const participantIdentity = this.linkedParticipant?.identity; const trackSid = this.subscribedTrack?.sid; if (participantIdentity && trackSid) { - this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId); + await this.#publishTranscription(participantIdentity, trackSid, '…', false, ev.itemId); } }); @@ -504,7 +510,7 @@ export class MultimodalAgent extends EventEmitter { topic: TOPIC_TRANSCRIPTION, senderIdentity: participantIdentity, attributes: { - [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: this.#agentPublication!.sid!, + [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: trackSid, [ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(), }, }); diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts index d43f031f..9c045336 100644 --- a/agents/src/pipeline/pipeline_agent.ts +++ b/agents/src/pipeline/pipeline_agent.ts @@ -508,28 +508,21 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< this.emit(VPAEvent.USER_STOPPED_SPEAKING); this.#deferredValidation.onHumanEndOfSpeech(event); }); - this.#humanInput.on(HumanInputEvent.INTERIM_TRANSCRIPT, (event) => { + this.#humanInput.on(HumanInputEvent.INTERIM_TRANSCRIPT, async (event) => { if (!this.#transcriptionId) { this.#transcriptionId = randomUUID(); } this.#transcribedInterimText = event.alternatives![0].text; - this.#room!.localParticipant!.publishTranscription({ - participantIdentity: this.#humanInput!.participant.identity, - trackSid: this.#humanInput!.subscribedTrack!.sid!, - segments: [ - { - text: this.#transcribedInterimText, - id: this.#transcriptionId, - final: true, - startTime: BigInt(0), - endTime: BigInt(0), - language: '', - }, - ], - }); + await this.#publishTranscription( + this.#humanInput!.participant.identity, + this.#humanInput!.subscribedTrack!.sid!, + this.#transcribedInterimText, + false, + this.#transcriptionId, + ); }); - this.#humanInput.on(HumanInputEvent.FINAL_TRANSCRIPT, (event) => { + this.#humanInput.on(HumanInputEvent.FINAL_TRANSCRIPT, async (event) => { const newTranscript = event.alternatives![0].text; if (!newTranscript) return; @@ -540,20 +533,14 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< this.#lastFinalTranscriptTime = Date.now(); this.transcribedText += (this.transcribedText ? ' ' : '') + newTranscript; - this.#room!.localParticipant!.publishTranscription({ - participantIdentity: this.#humanInput!.participant.identity, - trackSid: this.#humanInput!.subscribedTrack!.sid!, - segments: [ - { - text: this.transcribedText, - id: this.#transcriptionId, - final: true, - startTime: BigInt(0), - endTime: BigInt(0), - language: '', - }, - ], - }); + await this.#publishTranscription( + this.#humanInput!.participant.identity, + this.#humanInput!.subscribedTrack!.sid!, + this.transcribedText, + true, + this.#transcriptionId, + ); + this.#transcriptionId = undefined; if ( @@ -884,6 +871,39 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< handle.setDone(); } + async #publishTranscription( + participantIdentity: string, + trackSid: string, + text: string, + isFinal: boolean, + id: string, + ) { + this.#room!.localParticipant!.publishTranscription({ + participantIdentity: participantIdentity, + trackSid: trackSid, + segments: [ + { + text: text, + final: isFinal, + id: id, + startTime: BigInt(0), + endTime: BigInt(0), + language: '', + }, + ], + }); + const stream = await this.#room!.localParticipant!.streamText({ + senderIdentity: participantIdentity, + topic: TOPIC_TRANSCRIPTION, + attributes: { + [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: trackSid, + [ATTRIBUTE_TRANSCRIPTION_FINAL]: isFinal.toString(), + }, + }); + await stream.write(text); + await stream.close(); + } + #synthesizeAgentSpeech( speechId: string, source: string | LLMStream | AsyncIterable, @@ -892,20 +912,13 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter< // TODO: where possible we would want to use deltas instead of full text segments, esp for LLM streams over the streamText API synchronizer.on('textUpdated', async (text) => { this.#agentTranscribedText = text.text; - this.#room!.localParticipant!.publishTranscription({ - participantIdentity: this.#room!.localParticipant!.identity, - trackSid: this.#agentPublication!.sid!, - segments: [text], - }); - const stream = await this.#room!.localParticipant!.streamText({ - topic: TOPIC_TRANSCRIPTION, - attributes: { - [ATTRIBUTE_TRANSCRIPTION_TRACK_ID]: this.#agentPublication!.sid!, - [ATTRIBUTE_TRANSCRIPTION_FINAL]: text.final.toString(), - }, - }); - await stream.write(text.text); - await stream.close(); + await this.#publishTranscription( + this.#room!.localParticipant!.identity!, + this.#agentPublication?.sid ?? '', + text.text, + text.final, + text.id, + ); }); if (!this.#agentOutput) { From 6c36cf7f8a063f07284dfe07006692cf2f3ae153 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 28 Mar 2025 13:46:55 +0100 Subject: [PATCH 3/5] cleanup --- agents/src/multimodal/multimodal_agent.ts | 8 +++++--- agents/src/pipeline/pipeline_agent.ts | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/agents/src/multimodal/multimodal_agent.ts b/agents/src/multimodal/multimodal_agent.ts index 2122ed88..94bffc9e 100644 --- a/agents/src/multimodal/multimodal_agent.ts +++ b/agents/src/multimodal/multimodal_agent.ts @@ -19,9 +19,11 @@ import { } from '@livekit/rtc-node'; import { EventEmitter } from 'node:events'; import { AudioByteStream } from '../audio.js'; -import { TOPIC_TRANSCRIPTION } from '../constants.js'; -import { ATTRIBUTE_TRANSCRIPTION_FINAL } from '../constants.js'; -import { ATTRIBUTE_TRANSCRIPTION_TRACK_ID } from '../constants.js'; +import { + ATTRIBUTE_TRANSCRIPTION_FINAL, + ATTRIBUTE_TRANSCRIPTION_TRACK_ID, + TOPIC_TRANSCRIPTION, +} from '../constants.js'; import * as llm from '../llm/index.js'; import { log } from '../log.js'; import type { MultimodalLLMMetrics } from '../metrics/base.js'; diff --git a/agents/src/pipeline/pipeline_agent.ts b/agents/src/pipeline/pipeline_agent.ts index 9c045336..253b0ced 100644 --- a/agents/src/pipeline/pipeline_agent.ts +++ b/agents/src/pipeline/pipeline_agent.ts @@ -12,9 +12,11 @@ import { import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { randomUUID } from 'node:crypto'; import EventEmitter from 'node:events'; -import { TOPIC_TRANSCRIPTION } from '../constants.js'; -import { ATTRIBUTE_TRANSCRIPTION_TRACK_ID } from '../constants.js'; -import { ATTRIBUTE_TRANSCRIPTION_FINAL } from '../constants.js'; +import { + ATTRIBUTE_TRANSCRIPTION_FINAL, + ATTRIBUTE_TRANSCRIPTION_TRACK_ID, + TOPIC_TRANSCRIPTION, +} from '../constants.js'; import type { CallableFunctionResult, FunctionCallInfo, From dacf3280b8e2326de6f3ed6d501f18210286026f Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 28 Mar 2025 13:49:57 +0100 Subject: [PATCH 4/5] reuse --- agents/src/constants.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/agents/src/constants.ts b/agents/src/constants.ts index e7c7332b..66975036 100644 --- a/agents/src/constants.ts +++ b/agents/src/constants.ts @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 export const ATTRIBUTE_TRANSCRIPTION_TRACK_ID = 'lk.transcribed_track_id'; export const ATTRIBUTE_TRANSCRIPTION_FINAL = 'lk.transcription_final'; export const TOPIC_TRANSCRIPTION = 'lk.transcription'; From 1392f155d5be58648974e6f484355ea9a21f0230 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 29 Apr 2025 12:22:59 +0200 Subject: [PATCH 5/5] Create heavy-cats-unite.md --- .changeset/heavy-cats-unite.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/heavy-cats-unite.md diff --git a/.changeset/heavy-cats-unite.md b/.changeset/heavy-cats-unite.md new file mode 100644 index 00000000..afda602d --- /dev/null +++ b/.changeset/heavy-cats-unite.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Publish transcriptions additionally via text stream APIs