Urgent: on_session_end callback not triggering in Agents SDK

Hi LiveKit Team,

RoomID : RM_zaCJyUiW8PvA
Room : RM_DknoFff2STzj

I am experiencing an urgent issue where the on_session_end callback registered via the @server.rtc_session decorator is not being called when a room ends or the agent disconnects.

class InterviewAgent(Agent):
    """Agent subclass that keeps the job context and an ``end_reason`` string."""

    def __init__(self, instructions: str, ctx: JobContext):
        """Forward ``instructions`` to ``Agent`` and store ``ctx`` on the instance."""
        super().__init__(instructions=instructions)
        self.ctx = ctx
        # Initial end reason stored on this agent instance.
        self.end_reason = "customer-ended-call"

async def on_session_end(ctx: JobContext) -> None:
    """Build the session report and POST it to the backend webhook.

    Reads ``environment`` and ``webhookURL`` from the JSON job metadata, adds
    ``ended_reason`` to the report and sends it to
    ``{webhookURL}/ai-interview/livekit/session/end``.

    Never raises: every failure is logged with its traceback.
    """
    try:
        report_dict = ctx.make_session_report().to_dict()
        # Use the reason stored on ctx.agent when one was set; otherwise fall
        # back to the default "customer-ended-call".
        report_dict["ended_reason"] = getattr(
            ctx.agent, "end_reason", "customer-ended-call"
        )

        payload = json.loads(ctx.job.metadata or "{}")
        environment = payload.get("environment", "")
        webhook_url = payload.get("webhookURL", "")

        # Round-trip through JSON: default=str stringifies any value that the
        # json module cannot encode, so the POST body is always serializable.
        report_json_string = json.dumps(report_dict, default=str)
        json_data = json.loads(report_json_string)
        if environment != "local":
            logger.info(report_json_string)

        if not webhook_url:
            # Posting to a URL without scheme/host can only fail; say why.
            logger.error(
                f"No webhookURL in job metadata; session report for {ctx.room.name} not sent"
            )
            return

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{webhook_url}/ai-interview/livekit/session/end",
                json=json_data,
            )
            # Without this check a 4xx/5xx reply was logged as a success.
            response.raise_for_status()
        logger.info(f"Session report for {ctx.room.name} sended to {webhook_url}")

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error sending session report to webhook: {e}")

@server.rtc_session(agent_name="interview-agent", on_session_end=on_session_end)
async def entry(ctx: JobContext):
    """RTC session entrypoint: configure the AgentSession, start it and greet the candidate.

    NOTE(review): part of this function was elided in the original post, so
    ``meta``, ``instructions``, ``stt``, ``tts``, ``test_info``, ``environment``,
    ``end_interview_instruction`` and ``session_max_duration`` are defined
    outside the visible code.
    """
    logger.info("\n +++++++. entry +++++++++")
    # --- 1. SETUP LOG INTERCEPTOR ---
    agent_logger = logging.getLogger("livekit.agents")
    gpt_model = "gpt-5"
    preemptive_generation = False
    min_interruption_duration = 1.2
    min_endpointing_delay = 1.4
    max_endpointing_delay = 7.0
    false_interruption_timeout = 1.5
    user_away_timeout = 20

    try:
        payload = json.loads(meta)
        # logger.info(f"\n +++++++. {payload} +++++++++ \n")
        agent_config = payload.get("agentConfig", {})
        # ..... other code (elided in the original post)

    except Exception as e:
        logger.error(f"\n +++++++. Error parsing metadata: {e} +++++++++ \n")
        # Fall back to using the raw metadata string as the instructions.
        instructions = meta

    @llm.function_tool(description=end_interview_instruction)
    async def end_interview():
        """Tool the LLM calls to end the interview: record the reason, say goodbye, leave."""
        logger.info("\n 🛑 TOOL CALLED: end_interview")
        ctx.agent.end_reason = "assistant-said-end-call-phrase"
        await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)
        # await asyncio.sleep(4)
        # NOTE(review): per the reply in this thread, room.disconnect() only drops
        # the agent's RTC connection; session.shutdown() is what triggers the
        # expected shutdown behaviour (and therefore on_session_end).
        await ctx.room.disconnect()

    session = AgentSession(
        tools=[end_interview],
        stt=stt_plugin.FallbackAdapter([
            stt,
            assemblyai.STT(model="universal-streaming", api_key=ASSEMBLYAI_API_KEY),
        ]),
        # llm="openai/gpt-4o",
        llm=llm.FallbackAdapter(
            llm=[
                openai.LLM(
                    model=gpt_model,
                    api_key=OPENAI_API_KEY,
                ),
                openai.LLM(
                    model="gpt-4o",
                ),
                openai.LLM(
                    model="gpt-4.1",
                ),
            ],
            retry_on_chunk_sent=True,
        ),
        tts=tts_plugin.FallbackAdapter([
            tts,
            inworld.TTS(voice="Ashley", model="inworld-tts-1", api_key=INWORLD_API_KEY),
        ]),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
        preemptive_generation=preemptive_generation,
        allow_interruptions=True,
        min_interruption_duration=min_interruption_duration,  # Minimum speech length (s) to register as an interruption
        min_endpointing_delay=min_endpointing_delay,  # Minimum time-in-seconds since the last detected speech before the agent declares the user’s turn complete.
        max_endpointing_delay=max_endpointing_delay,  # Maximum time-in-seconds the agent will wait before terminating the turn
        false_interruption_timeout=false_interruption_timeout,
        user_away_timeout=user_away_timeout,
        use_tts_aligned_transcript=True,
    )

    # --- 3. ADD THE SPECIFIC ERROR EVENT HANDLER ---
    @session.on("error")
    def on_agent_error(error):
        """Report a session error to Slack and to the log."""
        email = test_info.get("email", "")
        companyId = test_info.get("companyId", "")
        testId = test_info.get("testId", "")

        logger.info(f"\n +++++++. on_agent_error +++++++++ \n {error}")
        # getattr prevents the code from crashing if the attribute is missing
        actual_error = getattr(error, "error", error)
        error_message = str(actual_error)
        error_source = getattr(error, "source", "Unknown Source")

        # 2. Send Slack Message
        slack_send_message(
            message=f"InviteId: {ctx.room.name} \n testId: {testId} \n companyId: {companyId} \n email: {email} \n Error: {error_message}",
            type="ERROR",
            notificationHeading=f"*LIVEKIT AGENT ERROR [{environment}]*",
        )

        # 3. Safely log to console
        logger.error(f"===== Caught agent error from Agent Runtime: {error_source}: {error_message}")

    usage = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _m(ev):
        usage.collect(ev.metrics)

    async def log_usage():
        logger.info(f"Usage: {usage.get_summary()}")

    ctx.add_shutdown_callback(log_usage)

    await session.start(
        room=ctx.room,
        agent=InterviewAgent(instructions=instructions, ctx=ctx),
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                noise_cancellation=noise_cancellation.BVC(),
            ),
            # optional but nice for cleanup:
            delete_room_on_close=True,
        ),
        # record=True,  # it is recording file in server
    )

    # ✅ 5. MAX DURATION TIMER TASK
    async def timeout_monitor():
        """Sleep for session_max_duration seconds, then end the call."""
        try:
            logger.info(f"Starting max duration timer: {session_max_duration} seconds")
            await asyncio.sleep(session_max_duration)

            # If we reach here, time is up
            logger.info("Session exceeded max duration. Closing.")
            ctx.agent.end_reason = "exceeded-max-duration"

            # NOTE(review): same caveat as in end_interview — the thread's reply
            # recommends session.shutdown() over room.disconnect().
            await ctx.room.disconnect()
        except asyncio.CancelledError:
            pass  # Timer cancelled (normal end)

    timeout_task = asyncio.create_task(timeout_monitor())

    # Wait until at least one non-agent participant is present
    # while True:
    #     participants = list(ctx.room.remote_participants.values())
    #     if any(p.identity and not p.identity.startswith("agent") for p in participants):
    #         break
    #     await asyncio.sleep(0.05)

    await session.generate_reply(
        instructions="Begin by welcoming the candidate and follow '### Start With' instruction",
        tool_choice="none",
    )

    @session.on("close")
    def on_closed(session):
        """Log final usage and cancel the max-duration timer when the session closes."""
        logger.info("\n ------ Cleaning up timeout task on session close ------")
        logger.info(f"Session closed Usage: {usage.get_summary()}")
        timeout_task.cancel()

if __name__ == "__main__":
    # Script entry point: hand the server object to the LiveKit agents CLI.
    from livekit.agents import cli

    cli.run_app(server)

The Behavior:

  1. The agent starts and functions correctly (STT/LLM/TTS are all working).

  2. When the agent calls await ctx.room.disconnect() or the user leaves, the process seems to terminate, but the logs inside on_session_end never appear.

  3. This is preventing our backend from receiving the session report via webhook to process candidate results.

Urgency:
This is affecting our production environment and candidate results are not being processed.

Questions:

  • Are there specific conditions (like task cancellation or specific disconnect reasons) where on_session_end is bypassed?

  • Has there been a recent change in how rtc_session workers handle cleanup tasks?

  • Is there a recommended way to debug why this lifecycle hook isn’t firing?

ctx.room.disconnect() will drop the agent's RTC connection to the room, but to trigger the expected shutdown behaviour you should call session.shutdown(). See: Job lifecycle | LiveKit Documentation

1 Like

Does using the code below solve my problem?

@llm.function_tool(description=end_interview_instruction)
async def end_interview():
    """Tool the LLM calls to end the interview: record the reason, say goodbye, shut down."""
    logger.info("\n 🛑 TOOL CALLED: end_interview")
    ctx.agent.end_reason = "assistant-said-end-call-phrase"

    # Say goodbye first
    await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)

    # ✅ CHANGE THIS: Use session.shutdown instead of room.disconnect
    # drain=True is meant to let the "Goodbye" audio finish before closing
    session.shutdown(drain=True)

async def timeout_monitor():
    """Sleep for session_max_duration seconds, then shut the session down."""
    try:
        logger.info(f"Starting max duration timer: {session_max_duration} seconds")
        await asyncio.sleep(session_max_duration)

        logger.info("Session exceeded max duration. Closing.")
        ctx.agent.end_reason = "exceeded-max-duration"

        # ✅ CHANGE THIS: Use session.shutdown
        session.shutdown()
    except asyncio.CancelledError:
        # The timer was cancelled before it fired; nothing to do.
        pass

If yes, then what happens when the user disconnects from the frontend SDK? When the user clicks "End interview", I am calling:

const room = useRoomContext()
// Ends the interview from the client by disconnecting from the room.
const handleEndInterview = async () => {
  // Await the promise so callers can sequence on the disconnect completing
  // and any rejection surfaces through this handler's own promise.
  await room?.disconnect()
}

Does this cause the same issue or not?

I have implemented your solution, and it has been working so far.
Thanks for your support.

1 Like

I have shared the code showing how my agent calls the end interview function. I am using session.shutdown(drain=True) as suggested by you, but on_session_end is still not being called for some sessions when the room closes.

This API call is critical for my platform because post-screening scoring depends entirely on it. I cannot afford to lose this call under any circumstances.

Rooms that failed to call on_session_end:

  • RM_2BZmRVtYDZ6f

  • RM_fPKCaCtnWfn5

Could you please investigate these rooms and help me understand why on_session_end was not triggered?

Here is my current implementation for reference:

class InterviewAgent(Agent):
    """Agent subclass that keeps the job context and an ``end_reason`` string."""

    def __init__(self, instructions: str, ctx: JobContext):
        """Forward ``instructions`` to ``Agent`` and store ``ctx`` on the instance."""
        # The dunder names were stripped by the forum's markdown rendering;
        # a plain ``init`` method would never run as the constructor.
        super().__init__(
            instructions=instructions,
        )
        # Initial end reason stored on this agent instance.
        self.end_reason = "customer-ended-call"
        self.ctx = ctx

def get_session_report_json(ctx: JobContext) -> dict:
    """Return the session report as a JSON-safe dict with ``ended_reason`` added.

    Args:
        ctx: Job context; must provide ``make_session_report()`` and ``agent``.

    Returns:
        The report's ``to_dict()`` output plus an ``ended_reason`` key, passed
        through a JSON round-trip so every value is JSON-serializable.
    """
    report_dict = ctx.make_session_report().to_dict()

    # Use the reason stored on ctx.agent when one was set; otherwise fall back
    # to the default "customer-ended-call".
    report_dict["ended_reason"] = getattr(
        ctx.agent, "end_reason", "customer-ended-call"
    )

    # default=str stringifies any value that the json module cannot encode.
    report_json_string = json.dumps(report_dict, default=str)
    return json.loads(report_json_string)

async def send_session_report(ctx: JobContext) -> None:
    """POST the session report to ``{webhookURL}/ai-interview/livekit/session/end``.

    ``environment`` and ``webhookURL`` are read from the JSON job metadata.
    Never raises: every failure is logged with its traceback.
    """
    try:
        logger.info(f"Started session report sending for {ctx.room.name}")

        meta = ctx.job.metadata or "{}"
        payload = json.loads(meta)
        environment = payload.get("environment", "")
        webhook_url = payload.get("webhookURL", "")

        json_data = get_session_report_json(ctx)
        if environment != "local":
            logger.info(json.dumps(json_data, default=str))

        if not webhook_url:
            # Posting to a URL without scheme/host can only fail; say why.
            logger.error(
                f"No webhookURL in job metadata; session report for {ctx.room.name} not sent"
            )
            return

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{webhook_url}/ai-interview/livekit/session/end",
                json=json_data,
            )
            # Without this check a 4xx/5xx reply was logged as a success.
            response.raise_for_status()
        logger.info(f"Session report for {ctx.room.name} sended to {webhook_url}")

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error sending session report to webhook: {e}")
# current_date = datetime.now().strftime("%Y%m%d_%H%M%S")

async def on_session_end(ctx: JobContext) -> None:
    """Session-end callback passed to ``server.rtc_session``; sends the session report."""
    await send_session_report(ctx)

@server.rtc_session(agent_name="interview-agent", on_session_end=on_session_end)
async def entry(ctx: JobContext):
    """RTC session entrypoint: configure the AgentSession, start it and greet the candidate.

    NOTE(review): part of this function was elided in the original post, so
    ``instructions``, ``stt``, ``tts``, ``test_info``, ``environment``,
    ``gpt_model``, ``end_interview_instruction``, ``session_max_duration`` and
    the turn-taking settings passed to ``AgentSession`` are defined outside
    the visible code.
    """
    logger.info("\n +++++++. entry +++++++++")
    # … code (elided in the original post)

    @llm.function_tool(description=end_interview_instruction)
    async def end_interview():
        """Tool the LLM calls to end the interview: record the reason, say goodbye, shut down."""
        logger.info("\n 🛑 TOOL CALLED: end_interview")
        ctx.agent.end_reason = "assistant-said-end-call-phrase"
        await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)
        session.shutdown(drain=True)

    session = AgentSession(
        tools=[end_interview],
        stt=stt_plugin.FallbackAdapter([
            stt,
            assemblyai.STT(model="universal-streaming", api_key=ASSEMBLYAI_API_KEY),
        ]),
        # llm="openai/gpt-4o",
        llm=llm.FallbackAdapter(
            llm=[
                openai.LLM(
                    model=gpt_model,
                    api_key=OPENAI_API_KEY,
                ),
                openai.LLM(
                    model="gpt-4o",
                ),
                openai.LLM(
                    model="gpt-4.1",
                ),
            ],
            retry_on_chunk_sent=True,
        ),
        tts=tts_plugin.FallbackAdapter([
            tts,
            inworld.TTS(voice="Ashley", model="inworld-tts-1", api_key=INWORLD_API_KEY),
        ]),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
        preemptive_generation=preemptive_generation,
        allow_interruptions=True,
        min_interruption_duration=min_interruption_duration,  # Minimum speech length (s) to register as an interruption
        min_endpointing_delay=min_endpointing_delay,  # Minimum time-in-seconds since the last detected speech before the agent declares the user’s turn complete.
        max_endpointing_delay=max_endpointing_delay,  # Maximum time-in-seconds the agent will wait before terminating the turn
        false_interruption_timeout=false_interruption_timeout,
        user_away_timeout=user_away_timeout,
        use_tts_aligned_transcript=True,
    )

    # --- 3. ADD THE SPECIFIC ERROR EVENT HANDLER ---
    @session.on("error")
    def on_agent_error(error):
        """Report a session error to Slack and to the log."""
        email = test_info.get("email", "")
        companyId = test_info.get("companyId", "")
        testId = test_info.get("testId", "")

        logger.info(f"\n +++++++.  on_agent_error +++++++++ \n {error}")
        # getattr prevents the code from crashing if the attribute is missing
        actual_error = getattr(error, "error", error)
        error_message = str(actual_error)
        error_source = getattr(error, "source", "Unknown Source")

        # 2. Send Slack Message
        slack_send_message(
            message=f"InviteId: {ctx.room.name} \n testId: {testId} \n companyId: {companyId} \n email: {email} \n Error: {error_message}",
            type="ERROR",
            notificationHeading=f"*LIVEKIT AGENT ERROR [{environment}]*",
        )

        # 3. Safely log to console
        logger.error(f"===== Caught agent error from Agent Runtime: {error_source}: {error_message}")

    usage = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _m(ev):
        usage.collect(ev.metrics)

    async def log_usage():
        logger.info(f"Usage: {usage.get_summary()}")

    ctx.add_shutdown_callback(log_usage)

    await session.start(
        room=ctx.room,
        agent=InterviewAgent(instructions=instructions, ctx=ctx),
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                noise_cancellation=noise_cancellation.BVC(),
            ),
            # optional but nice for cleanup:
            delete_room_on_close=True,
        ),
        # record=True,  # it is recording file in server
    )

    # ✅ 5. MAX DURATION TIMER TASK
    async def timeout_monitor():
        """Sleep for session_max_duration seconds, then shut the session down."""
        try:
            logger.info(f"Starting max duration timer: {session_max_duration} seconds")
            await asyncio.sleep(session_max_duration)

            # If we reach here, time is up
            logger.info("Session exceeded max duration. Closing.")
            ctx.agent.end_reason = "exceeded-max-duration"

            session.shutdown(drain=True)
        except asyncio.CancelledError:
            pass  # Timer cancelled (normal end)

    timeout_task = asyncio.create_task(timeout_monitor())

    await session.generate_reply(
        instructions="Begin by welcoming the candidate and follow '### Start With' instruction",
        tool_choice="none",
    )

    @session.on("close")
    def on_closed(session):
        """Log usage and the session report on close, then cancel the max-duration timer."""
        logger.info("\n ------ Cleaning up timeout task on session close ------")
        logger.info(f"Session closed Usage: {usage.get_summary()}")
        try:
            json_data = get_session_report_json(ctx)
            if environment != "local":
                logger.info( f" Close event: {json.dumps(json_data, default=str)}")
        except Exception as e:
            logger.error(f"Error getting session report: {e}")
        timeout_task.cancel()

# The dunder underscores and straight quotes were lost in the forum rendering.
if __name__ == "__main__":
    from livekit.agents import cli

    cli.run_app(server)