Hi Team,
Please provide a proper and reliable solution for the following issue.
I am using session.shutdown() but for some sessions, the on_session_end function is still not being called. Since I am using LiveKit in production, this kind of issue is not acceptable as it directly impacts my users.
If I do not receive the session report, I am unable to score the interview results, which is a critical part of my product.
Could you suggest a more reliable approach to get the call transcription after the call ends? For example, is there a webhook available that guarantees the transcription is delivered at least once, even if the session end event is missed?
@llm.function_tool(description=end_interview_instruction)
async def end_interview():
logger.info("\n 🛑 TOOL CALLED: end_interview")
ctx.agent.end_reason = "assistant-said-end-call-phrase"
# Say goodbye first
await session.say("Thank you for your time. Goodbye!", allow_interruptions=False)
# ✅ CHANGE THIS: Use session.shutdown instead of room.disconnect
# drain=True ensures the "Goodbye" audio is sent completely before closing
session.shutdown(drain=True)
async def timeout_monitor():
try:
logger.info(f"Starting max duration timer: {session_max_duration} seconds")
await asyncio.sleep(session_max_duration)
logger.info("Session exceeded max duration. Closing.")
ctx.agent.end_reason = "exceeded-max-duration"
# ✅ CHANGE THIS: Use session.shutdown
session.shutdown()
except asyncio.CancelledError:
pass
def get_session_report_json(ctx: JobContext) -> str:
report = ctx.make_session_report()
report_dict = report.to_dict()
ended_reason = "customer-ended-call"
# Check if we stored a specific reason in the agent session
if hasattr(ctx.agent, "end_reason"):
ended_reason = ctx.agent.end_reason
report_dict["ended_reason"] = ended_reason
report_json_string = json.dumps(report_dict, default=str)
json_data = json.loads(report_json_string)
return json_data
async def send_session_report(ctx: JobContext) -> None:
try:
logger.info(f"Started session report sending for {ctx.room.name}")
meta = ctx.job.metadata or "{}"
payload = json.loads(meta)
environment = payload.get("environment", "")
webhook_url = payload.get("webhookURL", "")
json_data = get_session_report_json(ctx)
if environment != "local":
logger.info(json.dumps(json_data, default=str))
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{webhook_url}/ai-interview/livekit/session/end",
json=json_data
)
logger.info(f"Session report for {ctx.room.name} sended to {webhook_url}")
except Exception as e:
logger.error(f"Error sending session report to webhook: {e}")
# current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
async def on_session_end(ctx: JobContext) -> None:
await send_session_report(ctx)
@server.rtc_session(agent_name="interview-agent", on_session_end=on_session_end)