Hi LiveKit Team,
Room ID: RM_zaCJyUiW8PvA
Room ID: RM_DknoFff2STzj
I am experiencing an urgent issue where the on_session_end callback registered via the @server.rtc_session decorator is not being called when a room ends or the agent disconnects.
class InterviewAgent(Agent):
    """Agent subclass that keeps the job context and an end-of-session reason."""

    def __init__(self, instructions: str, ctx: JobContext):
        """Initialise the base agent and record per-session state.

        Args:
            instructions: Instruction text forwarded unchanged to ``Agent``.
            ctx: Job context this agent instance is created for.
        """
        super().__init__(instructions=instructions)
        # Job context handed in by the caller.
        self.ctx = ctx
        # Starts out as "customer-ended-call".
        self.end_reason = "customer-ended-call"
async def on_session_end(ctx: JobContext) -> None:
    """Build the session report and POST it to the backend webhook.

    The report from ``ctx.make_session_report()`` is extended with an
    ``ended_reason`` field and sent to
    ``<webhookURL>/ai-interview/livekit/session/end``. ``webhookURL`` and
    ``environment`` are read from the JSON job metadata.

    This function never raises: every failure (bad metadata, network error,
    non-2xx webhook response) is logged with its traceback instead.

    NOTE(review): the pasted source had lost its indentation, so the extent
    of the ``environment != "local"`` branch was ambiguous. This version
    assumes the whole send path is skipped for the local environment --
    confirm against the deployed code.

    Args:
        ctx: Job context of the session that just ended.
    """
    try:
        report_dict = ctx.make_session_report().to_dict()

        # Use the reason recorded on ctx.agent if one was set; otherwise fall
        # back to the default "customer-ended-call".
        report_dict["ended_reason"] = getattr(
            ctx.agent, "end_reason", "customer-ended-call"
        )

        payload = json.loads(ctx.job.metadata or "{}")
        environment = payload.get("environment", "")
        webhook_url = payload.get("webhookURL", "")

        # Round-trip through JSON so that default=str turns every value the
        # encoder cannot handle into a plain string before it is posted.
        report_json_string = json.dumps(report_dict, default=str)
        json_data = json.loads(report_json_string)

        if environment == "local":
            return

        logger.info(report_json_string)

        if not webhook_url:
            # Without a base URL the request target would be a relative path
            # and the request could not be sent; report the real cause.
            logger.error(
                f"Error sending session report to webhook: no webhookURL in job metadata for {ctx.room.name}"
            )
            return

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{webhook_url}/ai-interview/livekit/session/end",
                json=json_data,
            )
            # Treat 4xx/5xx as failures instead of logging them as delivered.
            response.raise_for_status()

        logger.info(f"Session report for {ctx.room.name} sended to {webhook_url}")
    except Exception as e:
        # Best-effort hook: log with traceback, never propagate.
        logger.exception(f"Error sending session report to webhook: {e}")
# Job entrypoint registered on `server` under the agent name "interview-agent",
# with `on_session_end` passed as the session-end callback.
# Visible steps: parse job metadata, define the `end_interview` tool, build an
# AgentSession with fallback STT/LLM/TTS, attach error/metrics/close handlers,
# start the session, start a max-duration timer, and request the opening reply.
# NOTE(review): this paste has lost its indentation and contains an elided
# section ("..... other code"); nesting below is inferred from syntax only.
@server.rtc_session(agent_name="interview-agent", on_session_end=on_session_end)
async def entry(ctx: JobContext):
logger.info("\n +++++++. entry +++++++++")
# --- 1. SETUP LOG INTERCEPTOR ---
# NOTE(review): `agent_logger` is not used anywhere in the visible code.
agent_logger = logging.getLogger("livekit.agents")
# Default model and turn-taking values; all are passed to AgentSession below.
# Presumably overridden from `agentConfig` in the elided section -- confirm.
gpt_model = "gpt-5"
preemptive_generation=False
min_interruption_duration=1.2
min_endpointing_delay=1.4
max_endpointing_delay=7.0
false_interruption_timeout=1.5
user_away_timeout=20
try:
# NOTE(review): `meta` is never assigned in the visible code before this
# line -- presumably the job metadata string; confirm it is defined.
payload = json.loads(meta)
# logger.info(f"\n +++++++. {payload} +++++++++ \n")
agent_config = payload.get("agentConfig", {})
# The next line is the author's placeholder for omitted code, not Python.
..... other code
except Exception as e:
# Any failure in the block above is logged; the raw metadata string is then
# used as the agent instructions.
logger.error(f"\n +++++++. Error parsing metadata: {e} +++++++++ \n")
instructions = meta
# Tool exposed to the LLM: records the end reason, speaks a non-interruptible
# goodbye, then disconnects the room. `session` is assigned after this
# definition and is looked up when the tool runs.
# NOTE(review): the reason is written to `ctx.agent`, which is also what
# `on_session_end` reads; the `end_reason` attribute set in
# `InterviewAgent.__init__` is not read anywhere in the visible code.
# `end_interview_instruction` is not defined in the visible code.
@llm.function_tool(description=end_interview_instruction)
async def end_interview():
logger.info("\n 🛑 TOOL CALLED: end_interview")
ctx.agent.end_reason = "assistant-said-end-call-phrase"
await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)
# await asyncio.sleep(4)
await ctx.room.disconnect()
# Session configuration. `stt` and `tts` (the first entry of each fallback
# list) are not defined in the visible code.
session = AgentSession(
tools=[end_interview],
stt=stt_plugin.FallbackAdapter([
stt,
assemblyai.STT(model="universal-streaming", api_key=ASSEMBLYAI_API_KEY)
]),
# llm="openai/gpt-4o",
# LLM fallback order: `gpt_model`, then "gpt-4o", then "gpt-4.1".
llm = llm.FallbackAdapter(llm=[
openai.LLM(
model=gpt_model,
api_key=OPENAI_API_KEY
),
openai.LLM(
model="gpt-4o"
),
openai.LLM(
model="gpt-4.1"
)
],
retry_on_chunk_sent = True
),
tts=tts_plugin.FallbackAdapter([
tts,
inworld.TTS(voice="Ashley", model="inworld-tts-1", api_key=INWORLD_API_KEY)
]),
vad=silero.VAD.load(),
turn_detection=MultilingualModel(),
preemptive_generation=preemptive_generation,
allow_interruptions=True,
min_interruption_duration=min_interruption_duration, # Minimum speech length (s) to register as an interruption
min_endpointing_delay=min_endpointing_delay, # Minimum time-in-seconds since the last detected speech before the agent declares the user's turn complete.
max_endpointing_delay=max_endpointing_delay, # Maximum time-in-seconds the agent will wait before terminating the turn
false_interruption_timeout=false_interruption_timeout, #
user_away_timeout=user_away_timeout,
use_tts_aligned_transcript=True
)
# --- 3. ADD THE SPECIFIC ERROR EVENT HANDLER ---
# Session "error" handler: logs the event and posts a Slack notification.
# NOTE(review): `test_info` and `environment` are not defined in the visible
# code -- presumably set in the elided section; confirm.
@session.on("error")
def on_agent_error(error):
email = test_info.get("email", "")
companyId = test_info.get("companyId", "")
testId = test_info.get("testId", "")
logger.info(f"\n +++++++. on_agent_error +++++++++ \n {error}")
# getattr prevents the code from crashing if the attribute is missing
actual_error = getattr(error, "error", error)
error_message = str(actual_error)
error_source = getattr(error, "source", "Unknown Source")
# 2. Send Slack Message
slack_send_message(
message=f"InviteId: {ctx.room.name} \n testId: {testId} \n companyId: {companyId} \n email: {email} \n Error: {error_message}",
type="ERROR",
notificationHeading=f"*LIVEKIT AGENT ERROR [{environment}]*"
)
# 3. Safely log to console
logger.error(f"===== Caught agent error from Agent Runtime: {error_source}: {error_message}")
# Collect metrics from each "metrics_collected" event; the summary is logged
# by a shutdown callback registered on the job context.
usage = metrics.UsageCollector()
@session.on("metrics_collected")
def _m(ev):
usage.collect(ev.metrics)
async def log_usage():
logger.info(f"Usage: {usage.get_summary()}")
ctx.add_shutdown_callback(log_usage)
# Start the session in the job's room with a new InterviewAgent.
await session.start(
room=ctx.room,
agent=InterviewAgent(instructions=instructions, ctx=ctx),
room_options=room_io.RoomOptions(
audio_input=room_io.AudioInputOptions(
noise_cancellation=noise_cancellation.BVC(),
),
# optional but nice for cleanup:
delete_room_on_close=True,
),
# record=True, // it is recording file in server
)
# 5. MAX DURATION TIMER TASK
# Sleeps for `session_max_duration` seconds, then records the end reason and
# disconnects the room; cancellation is swallowed silently.
# NOTE(review): `session_max_duration` is not defined in the visible code.
async def timeout_monitor():
try:
logger.info(f"Starting max duration timer: {session_max_duration} seconds")
await asyncio.sleep(session_max_duration)
# If we reach here, time is up
logger.info("Session exceeded max duration. Closing.")
ctx.agent.end_reason = "exceeded-max-duration"
await ctx.room.disconnect()
except asyncio.CancelledError:
pass # Timer cancelled (normal end)
timeout_task = asyncio.create_task(timeout_monitor())
# Wait until at least one non-agent participant is present
# while True:
# participants = list(ctx.room.remote_participants.values())
# if any(p.identity and not p.identity.startswith("agent") for p in participants):
# break
# await asyncio.sleep(0.05)
# Opening turn; tool_choice="none" is passed for this reply.
await session.generate_reply(
instructions="Begin by welcoming the candidate and follow '### Start With' instruction",
tool_choice="none",
)
# "close" handler: logs the usage summary and cancels the timer task.
# NOTE(review): it is registered only after the `generate_reply` await above
# has returned, so a close that happens earlier would not reach it -- confirm
# whether it should be registered before `session.start`.
@session.on("close")
def on_closed(session):
logger.info("\n ------ Cleaning up timeout task on session close ------")
logger.info(f"Session closed Usage: {usage.get_summary()}")
timeout_task.cancel()
# Script entry point: hand `server` to the LiveKit agents CLI runner.
if __name__ == "__main__":
    from livekit.agents import cli

    cli.run_app(server)
The Behavior:
- The agent starts and functions correctly (STT/LLM/TTS are all working).
- When the agent calls `await ctx.room.disconnect()` or the user leaves, the process seems to terminate, but the logs inside `on_session_end` never appear.
- This is preventing our backend from receiving the session report via webhook to process candidate results.
Urgency:
This is affecting our production environment and candidate results are not being processed.
Questions:
- Are there specific conditions (like task cancellation or specific disconnect reasons) where `on_session_end` is bypassed?
- Has there been a recent change in how `rtc_session` workers handle cleanup tasks?
- Is there a recommended way to debug why this lifecycle hook isn't firing?