I have shared the code below showing how my agent calls the `end_interview` function. I am using `session.shutdown(drain=True)` as you suggested, but `on_session_end` is still not being called for some sessions when the room closes.
This API call is critical for my platform because post-screening scoring depends entirely on it. I cannot afford to lose this call under any circumstances.
Rooms that failed to call on_session_end:
- RM_2BZmRVtYDZ6f
- RM_fPKCaCtnWfn5
Could you please investigate these rooms and help me understand why on_session_end was not triggered?
Here is my current implementation for reference:
class InterviewAgent(Agent):
    """Agent subclass that also remembers the job context and an end reason.

    NOTE(review): the rest of the visible code reads and writes
    ``ctx.agent.end_reason`` rather than this instance's ``end_reason`` —
    confirm which object is meant to carry the reason.
    """

    def __init__(self, instructions: str, ctx: JobContext) -> None:
        """Initialise the base agent and store the job context.

        Args:
            instructions: Instruction text forwarded to ``Agent.__init__``.
            ctx: Job context this agent was created for.
        """
        # The dunder spelling matters: a method named plain ``init`` is never
        # invoked on construction, so the attributes below would not be set.
        super().__init__(
            instructions=instructions,
        )
        # Default reason; stays in place unless something overwrites it.
        self.end_reason = "customer-ended-call"
        self.ctx = ctx
def get_session_report_json(ctx: JobContext) -> dict:
    """Build the session report as a JSON-compatible dict.

    The report from ``ctx.make_session_report()`` is converted with
    ``to_dict()``, tagged with an ``"ended_reason"`` key, and round-tripped
    through ``json.dumps(..., default=str)`` / ``json.loads`` so that any
    value the JSON encoder cannot handle natively is stringified.

    Args:
        ctx: Job context of the session being reported.

    Returns:
        The parsed report (a dict, not a JSON string) including
        ``"ended_reason"``.
    """
    report_dict = ctx.make_session_report().to_dict()
    # Use a reason stored on ``ctx.agent`` when one exists; otherwise fall
    # back to the default "customer-ended-call".
    report_dict["ended_reason"] = getattr(
        ctx.agent, "end_reason", "customer-ended-call"
    )
    return json.loads(json.dumps(report_dict, default=str))
async def send_session_report(ctx: JobContext) -> None:
    """POST the session report to the webhook named in the job metadata.

    The job metadata is parsed as JSON and its ``"environment"`` and
    ``"webhookURL"`` keys are read. The report built by
    ``get_session_report_json`` is posted to
    ``{webhookURL}/ai-interview/livekit/session/end`` with a 10 second
    timeout. Outside the ``"local"`` environment the payload is also logged.

    Any failure (bad metadata, missing URL, network error, non-2xx response)
    is logged and swallowed; this function never raises.

    Args:
        ctx: Job context of the session that ended.
    """
    try:
        logger.info(f"Started session report sending for {ctx.room.name}")
        payload = json.loads(ctx.job.metadata or "{}")
        environment = payload.get("environment", "")
        webhook_url = payload.get("webhookURL", "")
        if not webhook_url:
            # Without a base URL the request cannot succeed; say so explicitly
            # instead of surfacing an opaque URL error from the HTTP client.
            logger.error(
                f"No webhookURL in job metadata for {ctx.room.name}; "
                "session report not sent"
            )
            return

        json_data = get_session_report_json(ctx)
        if environment != "local":
            logger.info(json.dumps(json_data, default=str))

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{webhook_url}/ai-interview/livekit/session/end",
                json=json_data,
            )
            # A 4xx/5xx reply means the report was NOT accepted; raise so it is
            # logged as an error below rather than reported as sent.
            response.raise_for_status()
        logger.info(f"Session report for {ctx.room.name} sended to {webhook_url}")
    except Exception as e:
        # Best-effort by design: log with traceback, never propagate.
        logger.exception(f"Error sending session report to webhook: {e}")
# current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
async def on_session_end(ctx: JobContext) -> None:
    """Session-end hook: send the finished session's report to the webhook.

    Passed as ``on_session_end`` to ``server.rtc_session`` for the ``entry``
    entrypoint.

    Args:
        ctx: Job context of the session that ended.
    """
    await send_session_report(ctx)
@server.rtc_session(agent_name="interview-agent", on_session_end=on_session_end)
async def entry(ctx: JobContext):
    """Run one interview session in the job's room.

    Builds an ``AgentSession`` with fallback STT/LLM/TTS adapters and an
    ``end_interview`` tool, registers error/metrics handlers, starts the
    session in ``ctx.room``, arms a max-duration timer, registers the
    ``close`` handler, and finally asks the agent for its opening reply.

    The ``close`` handler is registered *before* the opening reply is
    awaited, so a session that closes during the greeting still runs it.

    Args:
        ctx: Job context for this session.
    """
    logger.info("\n +++++++. entry +++++++++")
    # … setup code elided in the original post. The code below uses names it
    # presumably defines: end_interview_instruction, stt, tts, gpt_model,
    # instructions, test_info, environment, session_max_duration,
    # preemptive_generation, min_interruption_duration, min_endpointing_delay,
    # max_endpointing_delay, false_interruption_timeout, user_away_timeout.

    # Tool the LLM can call to finish the interview: records the reason, says
    # goodbye without allowing interruption, then requests a draining shutdown.
    # ``session`` is resolved at call time, so defining the tool first is fine.
    @llm.function_tool(description=end_interview_instruction)
    async def end_interview():
        logger.info("\n 🛑 TOOL CALLED: end_interview")
        ctx.agent.end_reason = "assistant-said-end-call-phrase"
        await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)
        session.shutdown(drain=True)

    session = AgentSession(
        tools=[end_interview],
        stt=stt_plugin.FallbackAdapter([
            stt,
            assemblyai.STT(model="universal-streaming", api_key=ASSEMBLYAI_API_KEY),
        ]),
        llm=llm.FallbackAdapter(
            llm=[
                openai.LLM(model=gpt_model, api_key=OPENAI_API_KEY),
                openai.LLM(model="gpt-4o"),
                openai.LLM(model="gpt-4.1"),
            ],
            retry_on_chunk_sent=True,
        ),
        tts=tts_plugin.FallbackAdapter([
            tts,
            inworld.TTS(voice="Ashley", model="inworld-tts-1", api_key=INWORLD_API_KEY),
        ]),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
        preemptive_generation=preemptive_generation,
        allow_interruptions=True,
        # Minimum speech length (s) to register as an interruption.
        min_interruption_duration=min_interruption_duration,
        # Minimum time (s) since the last detected speech before the agent
        # declares the user's turn complete.
        min_endpointing_delay=min_endpointing_delay,
        # Maximum time (s) the agent waits before terminating the turn.
        max_endpointing_delay=max_endpointing_delay,
        false_interruption_timeout=false_interruption_timeout,
        user_away_timeout=user_away_timeout,
        use_tts_aligned_transcript=True,
    )

    # Error handler: report every session error to Slack and to the log.
    @session.on("error")
    def on_agent_error(error):
        email = test_info.get("email", "")
        companyId = test_info.get("companyId", "")
        testId = test_info.get("testId", "")
        logger.info(f"\n +++++++. on_agent_error +++++++++ \n {error}")
        # getattr keeps the handler from crashing if an attribute is missing.
        actual_error = getattr(error, "error", error)
        error_message = str(actual_error)
        error_source = getattr(error, "source", "Unknown Source")
        slack_send_message(
            message=f"InviteId: {ctx.room.name} \n testId: {testId} \n companyId: {companyId} \n email: {email} \n Error: {error_message}",
            type="ERROR",
            notificationHeading=f"*LIVEKIT AGENT ERROR [{environment}]*",
        )
        logger.error(f"===== Caught agent error from Agent Runtime: {error_source}: {error_message}")

    # Accumulate usage metrics for the whole session.
    usage = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _m(ev):
        usage.collect(ev.metrics)

    async def log_usage():
        logger.info(f"Usage: {usage.get_summary()}")

    ctx.add_shutdown_callback(log_usage)

    await session.start(
        room=ctx.room,
        agent=InterviewAgent(instructions=instructions, ctx=ctx),
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                noise_cancellation=noise_cancellation.BVC(),
            ),
            # optional but nice for cleanup:
            delete_room_on_close=True,
        ),
        # ``record=True`` is intentionally not passed (original note: it
        # records a file on the server).
    )

    # Max-duration timer: once ``session_max_duration`` seconds have elapsed,
    # record the reason and request a draining shutdown.
    async def timeout_monitor():
        try:
            logger.info(f"Starting max duration timer: {session_max_duration} seconds")
            await asyncio.sleep(session_max_duration)
            # Reaching this point means the time is up.
            logger.info("Session exceeded max duration. Closing.")
            ctx.agent.end_reason = "exceeded-max-duration"
            session.shutdown(drain=True)
        except asyncio.CancelledError:
            pass  # Timer cancelled (normal end)

    timeout_task = asyncio.create_task(timeout_monitor())

    # Registered before the greeting is awaited: if the session closes while
    # the opening reply is still in progress, a handler attached afterwards
    # would never fire and the timer would keep running.
    @session.on("close")
    def on_closed(ev):
        logger.info("\n ------ Cleaning up timeout task on session close ------")
        # Cancel first so a failure in the logging below cannot skip it.
        timeout_task.cancel()
        logger.info(f"Session closed Usage: {usage.get_summary()}")
        try:
            json_data = get_session_report_json(ctx)
            if environment != "local":
                logger.info(f" Close event: {json.dumps(json_data, default=str)}")
        except Exception as e:
            logger.error(f"Error getting session report: {e}")

    await session.generate_reply(
        instructions="Begin by welcoming the candidate and follow '### Start With' instruction",
        tool_choice="none",
    )
if __name__ == "__main__":
    # Script entry point: hand this module's ``server`` to the LiveKit agents
    # CLI. The dunder spelling is required — a bare ``name`` is undefined.
    from livekit.agents import cli

    cli.run_app(server)