-
Notifications
You must be signed in to change notification settings - Fork 808
/
Copy pathpush_to_talk.py
74 lines (55 loc) · 2.48 KB
/
push_to_talk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import logging
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import Agent, AgentSession, JobContext, RoomIO, WorkerOptions, cli
from livekit.agents.llm import ChatContext, ChatMessage, StopResponse
from livekit.plugins import cartesia, deepgram, openai
# Module-level logger for this example; emits records at INFO level and above.
logger = logging.getLogger("push-to-talk")
logger.setLevel(logging.INFO)
# Load environment variables from a .env file before any client is constructed
# (presumably the credentials read by the LiveKit/plugin clients - confirm).
load_dotenv()
# This example demonstrates push-to-talk for multi-participant conversations
# with a voice agent.
# Audio input is disabled by default and is only enabled when a client explicitly
# calls the `start_turn` RPC method.
class MyAgent(Agent):
    """Voice agent used by the push-to-talk example.

    Built from a Deepgram STT, the OpenAI ``gpt-4o-mini`` LLM and a Cartesia TTS.
    """

    def __init__(self) -> None:
        """Create the speech/LLM components and hand them to the base ``Agent``."""
        speech_to_text = deepgram.STT()
        # Alternative model left by the example's author:
        #   openai.realtime.RealtimeModel(voice="alloy", turn_detection=None)
        language_model = openai.LLM(model="gpt-4o-mini")
        text_to_speech = cartesia.TTS()

        super().__init__(
            instructions="You are a helpful assistant.",
            stt=speech_to_text,
            llm=language_model,
            tts=text_to_speech,
        )

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
        """Hook that runs after a user turn is committed, before a reply is generated.

        Args:
            turn_ctx: Chat context for the turn (not used here).
            new_message: The message produced by the committed user turn.

        Raises:
            StopResponse: When the message has no text content, to stop the agent
                from generating a reply.
        """
        if new_message.text_content:
            # Non-empty turn: fall through to the normal reply generation.
            return

        logger.info("ignore empty user turn")
        raise StopResponse()
async def entrypoint(ctx: JobContext):
    """Job entrypoint: start a manually-turned agent session and expose push-to-talk RPCs.

    Connects to the room, starts a ``RoomIO`` and an ``AgentSession`` configured with
    ``turn_detection="manual"``, disables audio input, and registers three RPC
    methods on the local participant:

    * ``start_turn``  - interrupt the agent, reset the pending user turn, listen to
      the caller, and enable audio input.
    * ``end_turn``    - disable audio input and commit the user turn.
    * ``cancel_turn`` - disable audio input and discard the user turn.

    Args:
        ctx: The job context providing the room connection.
    """
    await ctx.connect()

    voice_session = AgentSession(turn_detection="manual")
    participant_io = RoomIO(voice_session, room=ctx.room)
    await participant_io.start()
    await voice_session.start(agent=MyAgent())

    # Audio input stays off until a client explicitly opens a turn.
    voice_session.input.set_audio_enabled(False)

    rpc_host = ctx.room.local_participant

    @rpc_host.register_rpc_method("start_turn")
    async def handle_start_turn(data: rtc.RpcInvocationData):
        # Stop whatever the agent is doing and drop any pending user turn.
        voice_session.interrupt()
        voice_session.clear_user_turn()

        # With several users in the room, listen to the one who made the call.
        participant_io.set_participant(data.caller_identity)
        voice_session.input.set_audio_enabled(True)

    @rpc_host.register_rpc_method("end_turn")
    async def handle_end_turn(data: rtc.RpcInvocationData):
        # Close the microphone first, then commit the user turn.
        voice_session.input.set_audio_enabled(False)
        voice_session.commit_user_turn()

    @rpc_host.register_rpc_method("cancel_turn")
    async def handle_cancel_turn(data: rtc.RpcInvocationData):
        # Close the microphone and discard the user turn instead of committing it.
        voice_session.input.set_audio_enabled(False)
        voice_session.clear_user_turn()
        logger.info("cancel turn")
if __name__ == "__main__":
    # Run the worker CLI with `entrypoint` as the job entrypoint function.
    worker_options = WorkerOptions(entrypoint_fnc=entrypoint)
    cli.run_app(worker_options)