I am facing an issue related to concurrent speech generation in LiveKit voice agent, and I would like to understand why it happens and how LiveKit internally handles speech scheduling.
Below is the complete code that reproduces the issue
import asyncio
from dataclasses import dataclass
import logging
from dotenv import load_dotenv
from typing import Literal
from livekit.agents import JobContext, WorkerOptions, cli, RoomInputOptions, Agent, RunContext, function_tool, AgentTask, inference
from livekit.agents.voice import AgentSession
from livekit.plugins import silero, noise_cancellation
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from livekit.agents.utils.log import log_exceptions
# Load environment
load_dotenv(dotenv_path='/home/shataxi/practise/livekit-voice-agent/.env')
logger = logging.getLogger("sample-agent")
@dataclass
class ChildProfile:
age: int
gender: Literal["male", "female", "other"]
@dataclass
class CollectAgeGenderResult:
age: int
gender: Literal["male", "female", "other"]
task_status: Literal["completed", "cancelled"] | None
task_cancellation_reason: str | None
class CollectAgeGender(AgentTask[CollectAgeGenderResult]):
def __init__(
self,
chat_ctx=None,
user_query: str = "",
age: int | None = None,
gender: Literal["male", "female", "other"] | None = None,
):
logger.debug(
f"CollectAgeGender Task initialized with user query: {user_query} age: {age} gender: {gender}"
)
self.user_query = user_query
super().__init__(
instructions="""
## Child Enrollment Information
**Objective**
You are responsible to collect the age and gender of the child currently being discussed.
Understand the context of the conversation to understand which child is being discussed in the latest message and collect the missing information of that child.
If the information is already provided, you can directly call the `record_child_profile` tool.
**Required Details**
- **Age**: Integer
- **Gender**: `male`, `female`, or `other`
**Tool Usage**
- Call `record_child_profile` immediately when **both age and gender** (explicit or inferred) are known.
- If the user refuses to provide **age** or gender cannot be inferred or provided, call `cancel_task` and exit.
- If the user has already provided the age and gender, call `record_child_profile` with the provided values.
**Rules**
- Only ask for missing information.
- If the user provides information about multiple children, acknowledge but focus on the child in the conversation and inform the user.
**Information already provided**
{age_str}
{gender_str}
""",
chat_ctx=chat_ctx,
tts=inference.TTS(model="cartesia/sonic-3", voice="f31cc6a7-c1e8-4764-980c-60a361443dd1")
)
self.age = age
self.gender = gender
self.age_gender_result = CollectAgeGenderResult(
age=None, gender=None, task_status=None, task_cancellation_reason=None
)
@log_exceptions(logger=logger)
async def on_enter(self) -> None:
logger.info(f"Generating reply from {self.__class__.__name__}")
age_str = (
("Age: " + str(self.age)) if self.age is not None else "Age not provided"
)
gender_str = (
("Gender: " + self.gender)
if self.gender is not None
else "Gender not provided"
)
try:
response = await self.session.generate_reply(
instructions=self.instructions.format(
age_str=age_str, gender_str=gender_str
),
user_input=self.user_query,
)
except asyncio.CancelledError:
logger.warning("CollectAgeGender task interrupted during on_enter")
@function_tool()
@log_exceptions(logger=logger)
async def record_child_profile(
self, context: RunContext, age: int, gender: Literal["male", "female", "other"]
) -> None:
"""
Call this tool when you have successfully collected the child's age and gender.
"""
logger.debug(f"Record child profile called with age {age} and gender {gender}")
self.voice_handle = context.speech_handle
self.age_gender_result = CollectAgeGenderResult(
age=age,
gender=gender,
task_status="completed",
task_cancellation_reason=None,
)
logger.debug(
f"Record child profile completed with age {age} and gender {gender}"
)
self.complete(
[CollectAgeGenderResult(
age=age,
gender=gender,
task_status="completed",
task_cancellation_reason=None,
), self.voice_handle]
)
@function_tool
@log_exceptions(logger=logger)
async def cancel_task(self, context: RunContext, reason: str) -> None:
"""
Call this tool when you want to cancel the task.
"""
self.voice_handle = context.speech_handle
self.age_gender_result = CollectAgeGenderResult(
age=None,
gender=None,
task_status="cancelled",
task_cancellation_reason=reason,
)
logger.debug(f"Task cancelled with reason {reason}")
self.complete([
CollectAgeGenderResult(
age=None,
gender=None,
task_status="cancelled",
task_cancellation_reason=reason,
), self.voice_handle]
)
class FrontDeskAgent(Agent):
def __init__(self, job_context=None):
self.job_context = job_context
super().__init__(
instructions="""You are a customer service agent for a gaming club, you tell the programs to the customer
Use `get_programs` tool to find the programs for the customer."""
)
@function_tool
@log_exceptions(logger=logger)
async def get_programs(
self,
context: RunContext,
):
"""Use this tool when the user is asking for programs."""
await context.wait_for_playout()
history = self.chat_ctx.to_dict()["items"]
user_messages = [m["content"][0] for m in history if m.get("role") == "user"]
user_query = user_messages[-1]
logger.info(f"Get Program Information called with query: {user_query}")
async def fillers():
logger.info("Going to say !!")
logger.info("Generating Fillers !!")
await context.session.say("Hmm--mm, I am looking into the details, please wait for a second.")
logger.info("Completed Fillers !!")
asyncio.create_task(fillers())
logger.debug("Program Registration Intent, starting CollectAgeGender Task")
profile, voice_handle = await CollectAgeGender(
chat_ctx=self.chat_ctx, user_query=user_query
)
logger.debug(
"Program Registration Intent, CollectAgeGender Task completed with age {} and gender {}".format(
profile.age, profile.gender
)
)
if profile.task_status == "cancelled":
return (
"Task could not be completed for the following reason: {}".format(
profile.task_cancellation_reason
)
)
return "We have following programs: Giants, Hustlers, Avengers, Incredibles, Marvels, Star wars"
async def entrypoint(ctx: JobContext):
logger.info(f"Starting booking agent in room {ctx.room.name}")
# Connect to the room
await ctx.connect()
# Create the agent session with LiveKit inference gateway
session = AgentSession(
vad=silero.VAD.load(),
stt="deepgram/nova-2",
llm="openai/gpt-4o",
tts="cartesia/sonic-3:a167e0f3-df7e-4d52-a9c3-f949145efdab",
turn_detection=MultilingualModel(),
)
# Start the session with FrontDeskAgent
logger.info("Starting session with FrontDeskAgent")
await session.start(
agent=FrontDeskAgent(),
room=ctx.room,
room_input_options=RoomInputOptions(
noise_cancellation=noise_cancellation.BVC(),
),
)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
I have:
1.A FrontDeskAgent that responds to the user
2.A long-running AgentTask (CollectAgeGender)
3.A filler speech (e.g. “Hmm… I am looking into this”) that runs asynchronously to avoid silence during processing
The fillers are generated using asyncio.create_task, while the CollectAgeGender task starts immediately and generates speech in its on_enter() method.
At runtime, I frequently get this error:
RuntimeError: cannot schedule new speech, the speech scheduling is draining/pausing
And sometimes, instead of the above, I get a circular wait error from the same code.
RuntimeError: cannot call SpeechHandle.wait_for_playout() from inside the function tool get_programs that owns this SpeechHandle.
This creates a circular wait: the speech handle is waiting for the function tool to complete, while the function tool is simultaneously
waiting for the speech handle.
To wait for the assistant’s spoken response prior to running this tool, use RunContext.wait_for_playout() instead.
Conceptually, I expected the flow to be:
1.FrontDeskAgent starts a filler using session.say
2.While the filler is playing, CollectAgeGender task initializes
3.Once the filler completes, the task starts generating speech.
4.This could result in smooth voice interaction with minimal silence
However, this is not happening in practice.
I am doing two independent speech requests at nearly the same time.
One using context.session.say for fillers and other using session.generate_reply inside the task.
What confuses me is why the error type changes and why “speech draining” appears more frequently than “circular wait”
I want to understand the root cause
1.What does “speech draining / pausing” mean internally in LiveKit?
2.Why does scheduling speech from a filler and a task at nearly the same time cause conflicts?
3.How does LiveKit decide when speech is allowed vs blocked?
4.Why does this sometimes turn into a circular wait instead of a speech-draining error?
I do not want to statically generate fillers first and then run the task sequentially
I have reviewed the LiveKit examples related to long-running tools and status updates, but I still do not clearly understand the above two errors and why they appear inconsistently.
An explanation would be really helpful @CWilson @darryncampbell
Thanks