Urgent: on_session_end callback not triggering in Agents SDK

I have shared the code showing how my agent calls the end-interview function. I am calling `session.shutdown(drain=True)` as you suggested, but `on_session_end` is still not called for some sessions when the room closes.

This callback is critical for my platform because post-screening scoring depends entirely on the webhook call it makes. I cannot afford to lose this call under any circumstances.

Rooms in which `on_session_end` was not called:

  • RM_2BZmRVtYDZ6f

  • RM_fPKCaCtnWfn5

Could you please investigate these rooms and help me understand why on_session_end was not triggered?

Here is my current implementation for reference:

class InterviewAgent(Agent):
    """Interview agent that keeps a reference to its job context.

    ``end_reason`` starts as ``"customer-ended-call"``.
    NOTE(review): the visible code writes and reads ``ctx.agent.end_reason``,
    not this attribute, so ``self.end_reason`` is currently never consulted
    when the session report is built — confirm which object is intended.
    """

    def __init__(self, instructions: str, ctx: JobContext):
        # Must be the real constructor: a method named ``init`` is never
        # called, and ``Agent.__init__`` would then receive the ``ctx`` kwarg.
        super().__init__(
            instructions=instructions,
        )
        self.end_reason = "customer-ended-call"
        self.ctx = ctx

def get_session_report_json(ctx: JobContext) -> dict:
    """Build the JSON-safe session report payload for the scoring webhook.

    Takes ``ctx.make_session_report().to_dict()`` and adds an
    ``"ended_reason"`` key. That key is ``ctx.agent.end_reason`` when it can
    be read, otherwise ``"customer-ended-call"``. The result is round-tripped
    through ``json`` so that every value is JSON-serializable. Values ``json``
    cannot encode are converted with ``str``.

    Args:
        ctx: Job context of the session being reported.

    Returns:
        A plain ``dict`` (not a string), suitable for ``json=`` in an HTTP
        client call or for logging.
    """
    report_dict = ctx.make_session_report().to_dict()

    ended_reason = "customer-ended-call"
    # hasattr() only suppresses AttributeError; any other exception raised
    # while resolving ctx.agent (e.g. if it is no longer available during
    # teardown) would otherwise abort the whole report and lose the payload.
    # The end reason is optional, so fall back to the default instead.
    try:
        ended_reason = getattr(ctx.agent, "end_reason", ended_reason)
    except Exception:
        pass
    report_dict["ended_reason"] = ended_reason

    # dumps/loads round trip normalizes non-JSON values to strings.
    return json.loads(json.dumps(report_dict, default=str))

async def send_session_report(ctx: JobContext, *, max_attempts: int = 3) -> None:
    """POST the session report to ``{webhookURL}/ai-interview/livekit/session/end``.

    ``environment`` and ``webhookURL`` are read from the job metadata, which
    is parsed as JSON (an empty value is treated as ``{}``). The report is
    logged unless ``environment == "local"``.

    Delivery counts as successful only on a 2xx response. Transport errors
    and 5xx responses are retried up to ``max_attempts`` times in total, with
    a linear back-off of 1s, 2s, ... between attempts. 4xx responses are not
    retried. NOTE(review): retries can deliver the same report twice, so the
    endpoint should be idempotent (e.g. keyed on the room name).

    Every failure is logged with a traceback and swallowed, so this never
    raises into the session-end path.

    Args:
        ctx: Job context of the finished session.
        max_attempts: Total number of POST attempts; values below 1 are
            treated as 1.
    """
    try:
        logger.info(f"Started session report sending for {ctx.room.name}")

        payload = json.loads(ctx.job.metadata or "{}")
        environment = payload.get("environment", "")
        webhook_url = payload.get("webhookURL", "")
        if not webhook_url:
            # Without a base URL the POST would go to a bare relative path.
            logger.error(
                f"No webhookURL in job metadata for {ctx.room.name}; "
                "session report not sent"
            )
            return

        json_data = get_session_report_json(ctx)
        if environment != "local":
            logger.info(json.dumps(json_data, default=str))

        endpoint = f"{webhook_url}/ai-interview/livekit/session/end"
        attempts = max(1, max_attempts)
        async with httpx.AsyncClient(timeout=10.0) as client:
            for attempt in range(1, attempts + 1):
                try:
                    response = await client.post(endpoint, json=json_data)
                    # Without this, a 4xx/5xx reply was logged as delivered.
                    response.raise_for_status()
                    break
                except (httpx.TransportError, httpx.HTTPStatusError) as exc:
                    retryable = (
                        not isinstance(exc, httpx.HTTPStatusError)
                        or exc.response.status_code >= 500
                    )
                    if not retryable or attempt >= attempts:
                        raise
                    logger.warning(
                        f"Session report POST attempt {attempt}/{attempts} "
                        f"for {ctx.room.name} failed: {exc}; retrying"
                    )
                    await asyncio.sleep(attempt)
        logger.info(f"Session report for {ctx.room.name} sended to {webhook_url}")

    except Exception as e:
        # Deliberate best-effort boundary: log with traceback, do not raise.
        logger.exception(f"Error sending session report to webhook: {e}")

async def on_session_end(ctx: JobContext) -> None:
    """Session-end hook registered on ``server.rtc_session``.

    Delegates to ``send_session_report``, which logs its own start, success
    and failure. If the "Started session report sending" log line is missing
    for a room, this hook was not invoked for it.
    """
    await send_session_report(ctx)

@server.rtc_session(agent_name="interview-agent", on_session_end=on_session_end)
async def entry(ctx: JobContext):
    """Entrypoint for an ``interview-agent`` RTC session.

    The entrypoint does the following:
    - Builds an ``AgentSession`` with fallback STT/LLM/TTS.
    - Wires error, metrics and close handlers.
    - Starts the session in ``ctx.room``, deleting the room on close.
    - Arms a max-duration timer.
    - Asks the agent to greet the candidate.

    The session report is sent by ``on_session_end`` (passed to the decorator).
    """
    logger.info("\n +++++++. entry +++++++++")
    # … code (elided in the original post; presumably defines instructions,
    # test_info, environment, stt, tts, gpt_model, session_max_duration and
    # the turn-taking settings used below — TODO confirm)

    @llm.function_tool(description=end_interview_instruction)
    async def end_interview():
        logger.info("\n 🛑 TOOL CALLED: end_interview")
        # NOTE(review): written to ctx.agent, which is what
        # get_session_report_json reads; InterviewAgent.end_reason is unused.
        ctx.agent.end_reason = "assistant-said-end-call-phrase"
        await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)
        session.shutdown(drain=True)

    session = AgentSession(
        tools=[end_interview],
        stt=stt_plugin.FallbackAdapter([
            stt,
            assemblyai.STT(model="universal-streaming", api_key=ASSEMBLYAI_API_KEY),
        ]),
        llm=llm.FallbackAdapter(
            llm=[
                openai.LLM(
                    model=gpt_model,
                    api_key=OPENAI_API_KEY,
                ),
                openai.LLM(
                    model="gpt-4o",
                ),
                openai.LLM(
                    model="gpt-4.1",
                ),
            ],
            retry_on_chunk_sent=True,
        ),
        tts=tts_plugin.FallbackAdapter([
            tts,
            inworld.TTS(voice="Ashley", model="inworld-tts-1", api_key=INWORLD_API_KEY),
        ]),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
        preemptive_generation=preemptive_generation,
        allow_interruptions=True,
        min_interruption_duration=min_interruption_duration,  # Minimum speech length (s) to register as an interruption
        min_endpointing_delay=min_endpointing_delay,  # Minimum time-in-seconds since the last detected speech before the agent declares the user's turn complete.
        max_endpointing_delay=max_endpointing_delay,  # Maximum time-in-seconds the agent will wait before terminating the turn
        false_interruption_timeout=false_interruption_timeout,
        user_away_timeout=user_away_timeout,
        use_tts_aligned_transcript=True,
    )

    # --- 3. ADD THE SPECIFIC ERROR EVENT HANDLER ---
    @session.on("error")
    def on_agent_error(error):
        email = test_info.get("email", "")
        companyId = test_info.get("companyId", "")
        testId = test_info.get("testId", "")

        logger.info(f"\n +++++++.  on_agent_error +++++++++ \n {error}")
        # getattr prevents the code from crashing if the attribute is missing
        actual_error = getattr(error, "error", error)
        error_message = str(actual_error)
        error_source = getattr(error, "source", "Unknown Source")

        # 2. Send Slack Message
        slack_send_message(
            message=f"InviteId: {ctx.room.name} \n testId: {testId} \n companyId: {companyId} \n email: {email} \n Error: {error_message}",
            type="ERROR",
            notificationHeading=f"*LIVEKIT AGENT ERROR [{environment}]*"
        )

        # 3. Safely log to console
        logger.error(f"===== Caught agent error from Agent Runtime: {error_source}: {error_message}")

    usage = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _m(ev):
        usage.collect(ev.metrics)

    async def log_usage():
        logger.info(f"Usage: {usage.get_summary()}")

    ctx.add_shutdown_callback(log_usage)

    # Assigned once the session has started; read lazily by the handlers below.
    timeout_task = None

    # Registered BEFORE session.start(). Previously this was attached only
    # after start() and the awaited greeting, so a session that closed early
    # (e.g. the candidate left during the greeting) never ran this handler
    # and its timeout task was never cancelled.
    @session.on("close")
    def on_closed(ev):
        # ``ev`` is the close event, not the session; the old parameter name
        # shadowed the outer ``session``.
        logger.info("\n ------ Cleaning up timeout task on session close ------")
        logger.info(f"Session close reason: {getattr(ev, 'reason', None)}")
        logger.info(f"Session closed Usage: {usage.get_summary()}")
        try:
            json_data = get_session_report_json(ctx)
            if environment != "local":
                logger.info( f" Close event: {json.dumps(json_data, default=str)}")
        except Exception as e:
            logger.error(f"Error getting session report: {e}")
        if timeout_task is not None:
            timeout_task.cancel()

    await session.start(
        room=ctx.room,
        agent=InterviewAgent(instructions=instructions, ctx=ctx),
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                noise_cancellation=noise_cancellation.BVC(),
            ),
            # optional but nice for cleanup:
            delete_room_on_close=True,
        ),
        # record=True,  # it is recording file in server
    )

    # 5. MAX DURATION TIMER TASK
    async def timeout_monitor():
        try:
            logger.info(f"Starting max duration timer: {session_max_duration} seconds")
            await asyncio.sleep(session_max_duration)

            # If we reach here, time is up
            logger.info("Session exceeded max duration. Closing.")
            ctx.agent.end_reason = "exceeded-max-duration"

            session.shutdown(drain=True)
        except asyncio.CancelledError:
            pass  # Timer cancelled (normal end)

    timeout_task = asyncio.create_task(timeout_monitor())

    # Also cancel the timer on job shutdown, covering a close that happened
    # before the task existed. Cancelling a finished task is a no-op.
    async def cancel_timeout():
        timeout_task.cancel()

    ctx.add_shutdown_callback(cancel_timeout)

    await session.generate_reply(
        instructions="Begin by welcoming the candidate and follow '### Start With' instruction",
        tool_choice="none",
    )

# Script entry point: hand the configured server to the LiveKit CLI runner.
# Must compare the real dunder names; ``if name == "main"`` never runs.
if __name__ == "__main__":
    from livekit.agents import cli

    cli.run_app(server)