Server API SendData not delivered to Python on data_received using version 1.5.10

I’m trying to send data from an external source to a LiveKit agent. As mentioned in the docs, I’m using data packets and SendData to do this.

Below is agent.py

from dotenv import load_dotenv
import asyncio
import json
from livekit import agents, rtc
from livekit.agents import AgentServer, AgentSession, Agent, room_io
from livekit.plugins import aws, noise_cancellation
from utils import load_prompt

load_dotenv()

ROOM_NAME = "my-room"
agent_name = "ecom-agent"

class ContextAgent(Agent):
    def __init__(self, context_vars=None, call_type=None) -> None:
        instructions = load_prompt("instructions.yaml")

        if context_vars:
            instructions = instructions.format(**context_vars)
        super().__init__(instructions=instructions)

    async def on_enter(self):
        await self.session.generate_reply(
            instructions="""
            Greet the customer.
            Example:
            Hi {first_name}, I'm Linda from Dalabey Live.
            I saw you're interested in the product.
            Offer your assistance. Start in English.
            """
        )
# ------------------ SERVER ------------------

server = AgentServer()

@server.rtc_session(agent_name=agent_name)
async def my_agent(ctx: agents.JobContext):

    await ctx.connect()
    participant = await ctx.wait_for_participant()

    # Create session
    session = AgentSession(
        llm=aws.realtime.RealtimeModel(voice="tiffany"),
    )

    agent = ContextAgent(
        context_vars=participant.attributes,
        call_type=participant.attributes.get("call_type"),
    )

    # Start session
    await session.start(
        room=ctx.room,
        agent=agent,
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                noise_cancellation=lambda params:
                    noise_cancellation.BVCTelephony()
                    if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP
                    else noise_cancellation.BVC(),
            ),
        ),
    )

    # ------------------ ROOM EVENT HANDLER ------------------

    @ctx.room.on("data_received")
    def on_data(packet: rtc.DataPacket):
      print(f"DATA RECEIVED ...")
      async def handle():
          try:
              payload = json.loads(packet.data.decode("utf-8"))
              event_type = payload.get("type")
              message = payload.get("message")
              if message:
                  await session.say(message)

              if event_type == "data_sent":
                  await session.say(
                      "Great! External Data has been sent."
                  )

          except Exception as e:
              print(f"Error handling data packet: {e}")
      asyncio.create_task(handle())

if __name__ == "__main__":
    agents.cli.run_app(server)

Below is the code that sends data to the LiveKit agent

import base64
import json
import os

import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from livekit import api

load_dotenv()

app = FastAPI()

LIVEKIT_URL = os.environ["LIVEKIT_URL"]
LIVEKIT_API_KEY = os.environ["LIVEKIT_API_KEY"]
LIVEKIT_API_SECRET = os.environ["LIVEKIT_API_SECRET"]
ROOM_NAME = "my-room"
AGENT_IDENTITY = "data-sender"

def create_room_admin_token(room: str) -> str:
  token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'),
                          os.getenv('LIVEKIT_API_SECRET')) \
      .with_identity("data-sender") \
      .with_name("Kamal") \
      .with_grants(api.VideoGrants(
          room_join=True,
          room_admin=True,
          can_publish=True,
          can_subscribe=True,
          room=room)).to_jwt()

  return token

@app.post("/send-data")
async def handle_event(event: dict):
    token = create_room_admin_token(ROOM_NAME)
    print(f"TOKEN:{token}")

    payload = {
        "type": event.get("type"),
        "message": event.get("message"),
    }

    print(f"PAYLOAD: {payload}")
    encoded_data = base64.b64encode(
        json.dumps(payload).encode("utf-8")
    ).decode("utf-8")
    print(f"Encoded Data: {encoded_data}")

    # Convert WebSocket URL to HTTP API URL

    api_url = os.environ.get('LIVEKIT_URL').replace('wss://', 'https://').replace('ws://', 'http://')

    if not api_url.endswith('/'):
        api_url += '/'

    headers={
      "Authorization": f"Bearer {token}",
      "Content-Type": "application/json"
    }

    payload={
      "room": ROOM_NAME,
      "data": encoded_data,
      "kind": "reliable",
      "destination_identities": [AGENT_IDENTITY],
    }

    # Send data message via REST API
    url = f"{api_url}twirp/livekit.RoomService/SendData"

    response = requests.post(url, json=payload, headers=headers, timeout=10)

    if response.status_code == 200:
        print(f"Successfully sent message to room")
        return f"Successfully sent message to room"
    else:
        print(f"Failed to send message. Status: {response.status_code}, Response: {response.text}")
        raise Exception(f"LiveKit API error: {response.status_code} - {response.text}")

Issue

When I send an event using RoomService.sendData(...), the agent doesn’t receive the data: room.on("data_received") does not fire for messages sent this way. I’m using version 1.5.10.

Expected behavior

RoomService.sendData(...) should deliver data to connected participants and fire room.on("data_received") in the Python SDK.

I see this issue was also reported in livekit/python-sdks issue #519, "Server API SendData not delivered to Python data_received listeners in 1.0.16 (worked in 1.0.13)". Someone from the LK team said that the issue has been fixed, but in my case the room still doesn’t receive the data.

Am I doing anything wrong?

To send data between an agent and a front end we recommend RPC; see "Remote method calls" in the LiveKit documentation.

There are a few examples that show this, e.g. complex-agents/drive-thru in the livekit-examples/python-agents-examples repository on GitHub.

Some things to try if I have misunderstood your use case and you want to continue to debug with SendData:

  • Try removing the destination_identities parameter to send the message to the entire room (this would eliminate the possibility that the provided identity was incorrect)
  • Verify that the participant is in the room before sending them the message.
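The first suggestion amounts to leaving destination_identities out of the request body. A minimal sketch, reusing the field names from the sender script earlier in this thread; build_send_data_body is a hypothetical helper for illustration, not part of any LiveKit SDK:

```python
import base64
import json


def build_send_data_body(room, event, destination_identities=None):
    """Build the JSON body for the RoomService/SendData request.

    Hypothetical helper: field names are copied from the sender script above.
    """
    encoded = base64.b64encode(json.dumps(event).encode("utf-8")).decode("utf-8")
    body = {"room": room, "data": encoded, "kind": "reliable"}
    # Only target specific participants when identities are given;
    # leaving the field out sends the packet to the entire room.
    if destination_identities:
        body["destination_identities"] = list(destination_identities)
    return body


broadcast = build_send_data_body("my-room", {"type": "data_sent", "message": "hi"})
targeted = build_send_data_body("my-room", {"type": "data_sent"}, ["some-identity"])
```

If the broadcast body is delivered and the targeted one is not, the identity in destination_identities is the first thing to check.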

@darryncampbell My use case is that I want my backend server (which doesn’t have a connection with LK) to send info to a LiveKit room when an event happens. The best option is to use data packets via SendData (Room service API | LiveKit Documentation).

Thanks for the guidance on removing the destination_identities parameter. When I set destination_identities=None, the data is sent to the entire room, and the agent is able to call generate_reply mid-call after it receives the data. But I have noticed that generate_reply runs twice. I suspect both participants receive the message, so generate_reply is triggered twice.

I want to use destination_identities so that I can be specific and so that the generate_reply is done once. I can confirm that the destination_identities I’m entering is correct and is in the room before I send the data.

I’m testing this in LK agent playground, and this is where I’m entering participant_identity

Isn’t this the correct way to enter a participant_identity when joining the room?

When I put specific identities under destination_identities when using SendData, the room doesn’t receive data. I don’t know why.

@Kamal_Moha, these symptoms (broadcast delivers, targeted doesn't) mean destination_identities is filtering out the agent. For a quick confirmation, call ListParticipants from the server side and dump the identities before sending (see Room service API | LiveKit Documentation). ListParticipants returns ParticipantInfo for every participant in the room, each with their identity string.
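A rough sketch of that check using only the standard library, assuming the same Twirp-style URL layout as the SendData call in the sender script and a token created the same way. The helper names and the response shape ({"participants": [{"identity": ...}]}) are assumptions to verify against real output:

```python
import json
import urllib.request


def list_participants(api_url, token, room):
    """POST to RoomService/ListParticipants and return the parsed JSON response."""
    request = urllib.request.Request(
        f"{api_url.rstrip('/')}/twirp/livekit.RoomService/ListParticipants",
        data=json.dumps({"room": room}).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


def identities(response):
    """Collect the identity string of every participant in the response."""
    # Assumed response shape: {"participants": [{"identity": "..."}, ...]}
    return [p.get("identity") for p in response.get("participants", [])]


# Shape check against a hand-written sample (not real server output).
sample = {"participants": [{"identity": "human-1"}, {"identity": "agent-xyz"}]}
found = identities(sample)
```

Printing identities(list_participants(api_url, token, ROOM_NAME)) right before the SendData request shows which strings are valid targets at that moment.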

The playground likely registers the joining human with the identity you typed, while the agent worker is a separate participant. Per the agents worker docs (Job lifecycle | LiveKit Documentation), “room.local_participant represents the agent”, so the agent’s ctx.room.local_participant.identity inside the agent code is what should go in destination_identities, not whatever the playground field shows.

GitHub issue livekit/python-sdks#519 is closed and the fix is in 1.5.10, so this isn’t that SDK regression.

The playground likely registers the joining human with the identity you typed, while the agent worker is a separate participant.

Correct. Yes, the playground is where I join the call as the human. My intention was that the data (from SendData) is sent to the human and the agent then takes an action (like generate_reply) when this data is received. Does that make sense, @Muhammad_Usman_Bashir?

so the agent’s ctx.room.local_participant.identity inside the agent code is what should go in destination_identities, not whatever the playground field shows.

Using the agent identity worked for me. When I used the agent identity (not the human’s) in destination_identities, the agent received the data.

I will have to use ListParticipants, find the identity for the agent, and then use that identity in destination_identities.
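One way that lookup could look, as a sketch only. It assumes each participant entry from ListParticipants carries a kind field that marks agents, serialized either as the enum name "AGENT" or as its number; both the field encoding and the helper name agent_identities are assumptions to check against the actual response:

```python
# Assumed encodings of the agent participant kind in the JSON response.
AGENT_KINDS = {"AGENT", 4}


def agent_identities(participants):
    """Return the identities of participants whose kind marks them as agents."""
    return [
        p["identity"]
        for p in participants
        if p.get("kind") in AGENT_KINDS and p.get("identity")
    ]


# Hand-written sample entries, not real server output.
participants = [
    {"identity": "kamal", "kind": "STANDARD"},
    {"identity": "agent-xyz", "kind": "AGENT"},
]
targets = agent_identities(participants)
```

The resulting list can be passed as destination_identities in the SendData request. If the kind field turns out to be unreliable, logging ctx.room.local_participant.identity from inside the agent and comparing is a simple cross-check.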

The nuance here is that the agent has an ID which is used as a unique identifier, so for example LiveKit dispatches a specific agent ID to the room. This is the ID you are seeing in the playground above.

After joining a room, the agent will subscribe to various tracks as a participant, so it can engage in conversation. As a participant it receives a separate ID, which is the ID returned by the local_participant.identity and used by the data_received API.

Apologies for the confusion. Like I said previously, this is not a common use case, so this overlap of the term ID doesn’t usually come up.

@darryncampbell No worries, all good. I’m happy that I have made some progress in the right direction.

One thing I wanted to ask you, @darryncampbell, is about best practice for room naming when doing CreateSIPParticipantRequest. Is it better to create a random room name every time a SIP participant joins the call rather than hard-coding the room name? Since the Server API SendData requires a room name, I’m thinking the backend needs to know the specific room name in order to send the data to the right room, right?

Any thoughts?

I thought we would have something in our docs, but I don’t see anything.

My personal recommendations for room naming:

  • Use unique room names for each session where possible. This makes it easier to debug issues and analyse logs / review analytics. You can use a random string for this.
  • Include a prefix in your room names to distinguish between environments (prod/staging/dev) or anything else that makes sense, such as different agents handling different workflows.
  • NEVER put PII or secret information in your room names.
  • Keep room names URL friendly and stick to characters [a-zA-Z0-9._-]. This is not required, but it makes the names easier to parse by humans.
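As an illustration of these recommendations, a small sketch that builds a unique, prefixed, URL-friendly room name. The helper make_room_name and the env-workflow-random layout are assumptions for the example, not a LiveKit convention:

```python
import re
import secrets

# Character set recommended above for room names.
ALLOWED = re.compile(r"^[a-zA-Z0-9._-]+$")


def make_room_name(env, workflow):
    """Return '<env>-<workflow>-<random hex>' using only URL-friendly characters."""
    for part in (env, workflow):
        if not ALLOWED.match(part):
            raise ValueError(f"prefix contains unsupported characters: {part!r}")
    # Random suffix keeps each session's room unique and carries no PII.
    return f"{env}-{workflow}-{secrets.token_hex(6)}"


name = make_room_name("staging", "ecom")
```

Because the name is generated per session, the backend has to store it (for example alongside the call or order record) so that a later SendData request can address the right room.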

Thanks for the advice.

I believe it would be nice to have a dedicated page in the LK docs covering best practices for building voice agents on LK.

Circling back to Darryn’s comment: he is 100% correct. I would add that it is use-case specific, but for 1:1 cases like with agents, a unique room name is what you want. There is a note about it here: