I’m trying to send data from an external source to livekit agent. As mentioned in the docs, I’m using Data Packets and SendData to do this.
Below is agent.py
from dotenv import load_dotenv
import asyncio
import json
from livekit import agents, rtc
from livekit.agents import AgentServer, AgentSession, Agent, room_io
from livekit.plugins import aws, noise_cancellation
from utils import load_prompt
load_dotenv()
ROOM_NAME = "my-room"
agent_name = "ecom-agent"
class ContextAgent(Agent):
def __init__(self, context_vars=None, call_type=None) -> None:
instructions = load_prompt("instructions.yaml")
if context_vars:
instructions = instructions.format(**context_vars)
super().__init__(instructions=instructions)
async def on_enter(self):
await self.session.generate_reply(
instructions="""
Greet the customer.
Example:
Hi {first_name}, I'm Linda from Dalabey Live.
I saw you're interested in the product.
Offer your assistance. Start in English.
"""
)
# ------------------ SERVER ------------------
server = AgentServer()
@server.rtc_session(agent_name=agent_name)
async def my_agent(ctx: agents.JobContext):
await ctx.connect()
participant = await ctx.wait_for_participant()
# Create session
session = AgentSession(
llm=aws.realtime.RealtimeModel(voice="tiffany"),
)
agent = ContextAgent(
context_vars=participant.attributes,
call_type=participant.attributes.get("call_type"),
)
# Start session
await session.start(
room=ctx.room,
agent=agent,
room_options=room_io.RoomOptions(
audio_input=room_io.AudioInputOptions(
noise_cancellation=lambda params:
noise_cancellation.BVCTelephony()
if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP
else noise_cancellation.BVC(),
),
),
)
# ------------------ ROOM EVENT HANDLER ------------------
@ctx.room.on("data_received")
def on_data(packet: rtc.DataPacket):
print(f"DATA RECEIVED ...")
async def handle():
try:
payload = json.loads(packet.data.decode("utf-8"))
event_type = payload.get("type")
message = payload.get("message")
if message:
await session.say(message)
if event_type == "data_sent":
await session.say(
"Great! External Data has been sent."
)
except Exception as e:
print(f"Error handling data packet: {e}")
asyncio.create_task(handle())
if __name__ == "__main__":
agents.cli.run_app(server)
Below is me sending data to livekit agent
import os
from dotenv import load_dotenv
import base64
import json
import time
import jwt
import httpx
from fastapi import FastAPI
import requests
load_dotenv()
app = FastAPI()
LIVEKIT_URL = os.environ["LIVEKIT_URL"]
LIVEKIT_API_KEY = os.environ["LIVEKIT_API_KEY"]
LIVEKIT_API_SECRET = os.environ["LIVEKIT_API_SECRET"]
ROOM_NAME = "my-room"
AGENT_IDENTITY = "data-sender"
from livekit import api
import os
def create_room_admin_token(room: str) -> str:
token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'),
os.getenv('LIVEKIT_API_SECRET')) \
.with_identity("data-sender") \
.with_name("Kamal") \
.with_grants(api.VideoGrants(
room_join=True,
room_admin=True,
can_publish=True,
can_subscribe=True,
room=room)).to_jwt()
return token
@app.post("/send-data")
async def handle_event(event: dict):
token = create_room_admin_token(ROOM_NAME)
print(f"TOKEN:{token}")
payload = {
"type": event.get("type"),
"message": event.get("message"),
}
print(f"PAYLOAD: {payload}")
encoded_data = base64.b64encode(
json.dumps(payload).encode("utf-8")
).decode("utf-8")
print(f"Encoded Data: {encoded_data}")
# Convert WebSocket URL to HTTP API URL
api_url = os.environ.get('LIVEKIT_URL').replace('wss://', 'https://').replace('ws://', 'http://')
if not api_url.endswith('/'):
api_url += '/'
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
payload={
"room": ROOM_NAME,
"data": encoded_data,
"kind": "reliable",
"destination_identities": [AGENT_IDENTITY],
}
# Send data message via REST API
url = f"{api_url}twirp/livekit.RoomService/SendData"
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code == 200:
print(f"Successfully sent message to room")
return f"Successfully sent message to room"
else:
print(f"Failed to send message. Status: {response.status_code}, Response: {response.text}")
raise Exception(f"LiveKit API error: {response.status_code} - {response.text}")
Issue
When I send an event using RoomService.sendData(...), the agent doesn’t receive the data. And room.on("data_received") does not fire for messages sent via RoomService.sendData(...). I’m using version 1.5.10
Expected behavior
RoomService.sendData(...) should deliver data to connected participants and fires room.on("data_received") in the Python SDK.
I see this issue was reported here as well Server API SendData not delivered to Python data_received listeners in 1.0.16 (worked in 1.0.13) · Issue #519 · livekit/python-sdks · GitHub . One of the LK team said that this issue has been fixed. But I’m still facing the issue that the room doesn’t receive the data.
Am I doing anything wrong?
