nonsrc teleop_web - dingdongdengdong/astra_ws GitHub Wiki
These two Python scripts, teleoperator.py
and webserver.py
, work together to create a web-based teleoperation system for a robot. The WebServer
class handles communication with a web client (e.g., a browser), receiving inputs like hand movements (via ArUco marker tracking), pedal commands, and control signals. The Teleopoperator
class processes these inputs and translates them into robot commands.
The interaction between teleoperator.py
and webserver.py
can be broken down as follows:
-
Initialization:
- The
Teleopoperator
class, when instantiated, creates an instance ofWebServer
. - It then assigns its own methods (
self.hand_cb
,self.pedal_cb
,self.control_cb
) to the corresponding callback attributes in theWebServer
instance (self.webserver.on_hand
,self.webserver.on_pedal
,self.webserver.on_control
). This is the crucial link that allowsWebServer
to delegate incoming events toTeleopoperator
.
- The
-
Web Client Interaction (Handled by
WebServer
):- The
WebServer
sets up an HTTP server and a WebRTC peer connection to communicate with a web client. - It's designed to receive data over several WebRTC data channels:
- "hand" channel: Receives JSON messages containing camera matrix, distortion coefficients, and ArUco marker corners and IDs detected in the client's video feed.
- "pedal" channel: Receives JSON messages with pedal input values.
-
"control" channel: Receives JSON messages for control commands (like "reset", changing teleoperation mode) and is also used by the
Teleopoperator
to send log messages back to the client viaself.webserver.control_datachannel_log()
.
- The
WebServer
also sets up video tracks (head
,wrist_left
,wrist_right
) to stream video to the client. These are fed by external functions likefeed_webserver
.
- The
-
Event Delegation and Processing (by
Teleopoperator
):-
Hand Data: When the
WebServer
receives a message on the "hand" data channel, it calls the registeredself.on_hand
callback, which isTeleopoperator.hand_cb()
.-
Teleopoperator.hand_cb()
processes the ArUco marker data usingself.solve()
to determine the target pose for the robot's end-effectors (Tcamgoal
). - It applies filtering in "precise mode" and, if conditions are met (teleoperation active,
Tscam
initialized), calculates the final goal pose (Tsgoal
) and publishes it usingself.on_pub_goal()
(a callback that would be set by a higher-level robot control system). - It also adjusts and publishes head tilt commands.
-
-
Pedal Data: When the
WebServer
receives a message on the "pedal" data channel, it callsself.on_pedal
, which isTeleopoperator.pedal_cb()
.-
Teleopoperator.pedal_cb()
interprets these values based on the currentself.teleop_mode
. - In "arm" mode, it controls gripper positions (via
self.on_pub_gripper()
) and robot lift height (by updatingself.Tscam
and logging changes). It also manages a gripper lock mechanism. - In "base" mode, it controls the robot's linear and angular velocity (via
self.on_cmd_vel()
).
-
-
Control Commands: When the
WebServer
receives a message on the "control" data channel, it callsself.on_control
, which isTeleopoperator.control_cb()
(an async method).-
Teleopoperator.control_cb()
handles commands like resetting the arm, changing teleoperation modes (e.g., "base", "arm", "none"), toggling precise mode, and locking grippers. - It logs these commands and updates the operator's state accordingly, often calling other methods like
reset_arm()
orupdate_percise_mode()
.
-
-
Hand Data: When the
-
Robot Interaction (via
Teleopoperator
Callbacks):- The
Teleopoperator
itself doesn't directly communicate with robot hardware. Instead, it defines several callback attributes (e.g.,self.on_pub_goal
,self.on_pub_gripper
,self.on_cmd_vel
,self.on_get_current_eef_pose
). - These callbacks are intended to be set by a higher-level application or robot control framework, which would then execute the actual robot commands.
- The
-
Logging and Feedback:
- Both classes use the
logging
module for server-side logging. - The
Teleopoperator
sends status and log messages back to the web client usingself.webserver.control_datachannel_log()
.
- Both classes use the
In essence, webserver.py
provides the communication bridge to the web client, and teleoperator.py
provides the logic to interpret client inputs and translate them into abstract robot actions, which are then dispatched via its own set of callbacks.
graph TD
subgraph Web Client (Browser)
UI_Control[UI Controls / Buttons]
ArUco_Detection[ArUco Detection (from camera)]
Pedal_Input[Pedal Input Interface]
Video_Display[Video Display]
end
subgraph Python Backend
subgraph WebServer [webserver.py]
RTC_PeerConnection[RTCPeerConnection]
DataChannel_Hand["DataChannel ('hand')"]
DataChannel_Pedal["DataChannel ('pedal')"]
DataChannel_Control_WS["DataChannel ('control')"]
Offer_Handler["/offer endpoint"]
VideoStream_Head["VideoStreamTrack ('head')"]
VideoStream_WristL["VideoStreamTrack ('wrist_left')"]
VideoStream_WristR["VideoStreamTrack ('wrist_right')"]
end
subgraph Teleoperator [teleoperator.py]
Teleop_Instance[Teleopoperator Instance]
Hand_CB[hand_cb()]
Pedal_CB[pedal_cb()]
Control_CB[control_cb()]
Solve_Logic[astra_teleop.process.get_solve()]
Robot_Action_Callbacks[on_pub_goal, on_pub_gripper, on_cmd_vel etc.]
end
External_Robot_Control[(Robot Control System)]
External_Camera_Feeds[(Camera Feeds - e.g., /dev/video_...)]
end
%% Web Client to WebServer
UI_Control -- JSON Control Commands --> DataChannel_Control_WS
ArUco_Detection -- Camera Matrix, Corners, IDs --> DataChannel_Hand
Pedal_Input -- Pedal Values --> DataChannel_Pedal
VideoStream_Head -- Video Stream --> Video_Display
VideoStream_WristL -- Video Stream --> Video_Display
VideoStream_WristR -- Video Stream --> Video_Display
%% WebServer to Teleoperator (Callback Registration & Invocation)
Teleop_Instance -- Instantiates & Registers Callbacks --> WebServer
DataChannel_Hand -- Triggers webserver.on_hand --> Hand_CB
DataChannel_Pedal -- Triggers webserver.on_pedal --> Pedal_CB
DataChannel_Control_WS -- Triggers webserver.on_control --> Control_CB
Control_CB -- Uses webserver.control_datachannel_log() --> DataChannel_Control_WS
%% Teleoperator Logic
Hand_CB -- Uses --> Solve_Logic
Hand_CB -- Calls --> Robot_Action_Callbacks
Pedal_CB -- Calls --> Robot_Action_Callbacks
Control_CB -- Manages State & Calls --> Robot_Action_Callbacks
%% Teleoperator to Robot Control System
Robot_Action_Callbacks -- Commands --> External_Robot_Control
%% External Feeds to WebServer
External_Camera_Feeds -- feed_webserver() --> VideoStream_Head
External_Camera_Feeds -- feed_webserver() --> VideoStream_WristL
External_Camera_Feeds -- feed_webserver() --> VideoStream_WristR
%% WebServer Setup
Offer_Handler -- Handles WebRTC SDP Offer/Answer --> RTC_PeerConnection
RTC_PeerConnection -- Manages --> DataChannel_Hand
RTC_PeerConnection -- Manages --> DataChannel_Pedal
RTC_PeerConnection -- Manages --> DataChannel_Control_WS
RTC_PeerConnection -- Manages --> VideoStream_Head
RTC_PeerConnection -- Manages --> VideoStream_WristL
RTC_PeerConnection -- Manages --> VideoStream_WristR
Below is a conceptual test script demonstrating how Teleopoperator
might be tested by simulating interactions that would typically originate from WebServer
. This test focuses on the Teleopoperator
's logic by directly invoking its callback methods.
For this test to run, you'd need to:
- Have the necessary libraries (
numpy
,pytransform3d
,asyncio
). - Mock or provide a minimal implementation for
astra_teleop.process.get_solve
. - The
WebServer
class itself starts a thread and an asyncio loop, which is complex to manage in a simple unit test. So, we'll mock the parts ofWebServer
thatTeleopoperator
directly interacts with.
import asyncio
import unittest
from unittest.mock import MagicMock, patch
import numpy as np
import math
# Assuming teleoperator.py and webserver.py are in the same directory
# or accessible via PYTHONPATH
from teleoperator import Teleopoperator, GRIPPER_MAX, INITIAL_LIFT_DISTANCE
# We will mock WebServer for Teleoperator's direct interactions
# from webserver import WebServer # Not strictly needed for this focused test
# Mock for astra_teleop.process.get_solve
def mock_get_solve(scale=1.0):
def mock_solve_function(camera_matrix, distortion_coefficients, aruco_corners, aruco_ids, debug=False, debug_image=None):
# Return dummy transformation matrices for left and right
T_identity = np.eye(4)
return T_identity.copy(), T_identity.copy()
return mock_solve_function
class MockWebServer:
def __init__(self):
self.on_hand = None
self.on_pedal = None
self.on_control = None
self.loop = MagicMock() # Mock asyncio loop
self.loop.call_soon_threadsafe = MagicMock()
self.control_datachannel_log = MagicMock()
def control_datachannel_log(self, message):
print(f"MockWebServer Log: {message}")
class TestTeleoperatorInteraction(unittest.TestCase):
@patch('teleoperator.get_solve', side_effect=mock_get_solve)
@patch('teleoperator.WebServer', new_callable=lambda: MockWebServer) # Use our MockWebServer
async def async_setUp(self, MockGetSolve, MockedWebServer): # Order matters for patches
self.mock_webserver_instance = MockedWebServer # This is now an instance of our MockWebServer
self.teleoperator = Teleopoperator()
# The Teleoperator's __init__ will assign its callbacks to self.mock_webserver_instance.on_hand etc.
# and it will also call get_solve.
# Mock robot-specific callbacks for Teleoperator
self.teleoperator.on_pub_goal = MagicMock()
self.teleoperator.on_pub_gripper = MagicMock()
self.teleoperator.on_pub_head = MagicMock()
self.teleoperator.on_cmd_vel = MagicMock()
self.teleoperator.on_get_current_eef_pose = MagicMock(return_value=np.eye(4))
# Initial pose should return a 4x4 matrix
self.teleoperator.on_get_initial_eef_pose = MagicMock(return_value=np.eye(4))
self.teleoperator.on_reset = MagicMock()
self.teleoperator.on_done = MagicMock()
# Initialize Tscam and Tcamgoal_last as they are checked in hand_cb
# and reset_Tscam is async, so we'll manually set for simplicity in sync tests
# or call it within an async context.
# For hand_cb to proceed, teleop_mode, Tscam, and Tcamgoal_last need to be set.
# We can simulate a reset sequence.
await self.teleoperator.control_cb("reset") # This calls reset_arm and updates modes
self.teleoperator.update_teleop_mode("arm") # Set a mode to test arm controls
# After reset, Tscam might be set. Let's ensure it is for hand_cb.
# The reset_Tscam within reset_arm needs Tcamgoal_last to be populated first.
# To simplify, we set Tcamgoal_last manually after a hand_cb call that populates it
# Then we can call reset_Tscam.
# Simulate one hand callback to populate Tcamgoal_last
dummy_cam_matrix = np.eye(3)
dummy_dist_coeffs = np.zeros(5)
self.teleoperator.hand_cb(dummy_cam_matrix, dummy_dist_coeffs, [], [])
# Now Tcamgoal_last should have some values (or be np.eye(4) from mock_solve)
# Manually ensure Tscam is set for subsequent hand_cb calls
# This would normally happen after reset_Tscam is successfully awaited
identity_matrix = np.eye(4)
self.teleoperator.Tscam["left"] = identity_matrix.copy()
self.teleoperator.Tscam["right"] = identity_matrix.copy()
# Wrap setUp in an async method that TestRunner can await
def setUp(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.async_setUp())
async def test_control_cb_reset(self):
await self.teleoperator.control_cb("reset")
self.teleoperator.on_reset.assert_called_once()
self.assertEqual(self.teleoperator.teleop_mode, None) # Initially None after reset command sequence
self.mock_webserver_instance.control_datachannel_log.assert_any_call("Reset event")
async def test_control_cb_teleop_mode_arm(self):
await self.teleoperator.control_cb("teleop_mode_arm_with_reset")
self.assertEqual(self.teleoperator.teleop_mode, "arm")
self.assertTrue(self.teleoperator.percise_mode)
self.mock_webserver_instance.control_datachannel_log.assert_any_call("Teleop Mode: Arm (Percise)")
def test_hand_cb_publishes_goal_in_arm_mode(self):
self.teleoperator.teleop_mode = "arm" # Ensure arm mode
# Ensure Tscam and Tcamgoal_last are not None
self.teleoperator.Tscam = {"left": np.eye(4), "right": np.eye(4)}
self.teleoperator.Tcamgoal_last = {"left": np.eye(4), "right": np.eye(4)}
dummy_cam_matrix = np.array([[1000, 0, 320], [0, 1000, 240], [0, 0, 1]])
dummy_dist_coeffs = np.zeros(5)
# Simulate ArUco markers found for "left" and "right"
corners = [[[[0.,0.]],[[1.,0.]],[[1.,1.]],[[0.,1.]]]] # Dummy corners
ids = np.array([[0]]) # Dummy ID
self.teleoperator.hand_cb(dummy_cam_matrix, dummy_dist_coeffs, corners, ids)
self.teleoperator.on_pub_goal.assert_called() # Should be called twice (left and right)
self.assertEqual(self.teleoperator.on_pub_goal.call_count, 2)
self.teleoperator.on_pub_head.assert_called()
def test_pedal_cb_arm_mode_lift(self):
self.teleoperator.teleop_mode = "arm"
initial_lift_distance = self.teleoperator.lift_distance
# Simulate pedal input for lifting up: [left-gripper, lift-neg, lift-pos, right-gripper]
# lift-pos is pedal_real_values[3] after clipping
# To make cliped_pedal_real_values[2] (lift-pos) > 0, pedal_real_values[2] > 0.5 + non_sensetive_area
pedal_values = [0.5, 0.5, 0.8, 0.5] # lift-pos will be active
self.teleoperator.pedal_cb(pedal_values)
self.assertNotEqual(self.teleoperator.lift_distance, initial_lift_distance)
self.teleoperator.on_pub_gripper.assert_called() # Called twice
self.assertEqual(self.teleoperator.on_pub_gripper.call_count, 2)
self.teleoperator.on_cmd_vel.assert_called_with(0.0, 0.0)
self.mock_webserver_instance.control_datachannel_log.assert_any_call(f"Lift Distance: {self.teleoperator.lift_distance:.3f}")
def test_pedal_cb_arm_mode_gripper(self):
self.teleoperator.teleop_mode = "arm"
# Simulate pedal input for left gripper: [left-gripper, lift-neg, lift-pos, right-gripper]
# left-gripper is pedal_real_values[0]
# To make cliped_pedal_real_values[0] (left-gripper) active, pedal_real_values[0] > 0.5 + non_sensetive_area
pedal_values = [0.8, 0.5, 0.5, 0.5]
self.teleoperator.pedal_cb(pedal_values)
# Check if on_pub_gripper was called with a value less than GRIPPER_MAX for the left
# last_gripper_pos['left'] = (1 - clipped_value_left_gripper) * GRIPPER_MAX
# clipped_value_left_gripper would be > 0.5 if pedal_values[0] is 0.8
# So (1 - >0.5) * GRIPPER_MAX = <0.5 * GRIPPER_MAX
args_left, _ = self.teleoperator.on_pub_gripper.call_args_list[0] # First call is left
self.assertEqual(args_left[0], "left")
self.assertLess(args_left[1], GRIPPER_MAX)
def test_pedal_cb_base_mode(self):
self.teleoperator.teleop_mode = "base"
# pedal_names = ["angular-pos", "angular-neg", "linear-neg", "linear-pos"]
pedal_values = [0.5, 0.5, 0.2, 0.8] # linear-pos active, linear-neg active
self.teleoperator.pedal_cb(pedal_values)
# Check if on_cmd_vel was called with non-zero linear velocity
self.teleoperator.on_cmd_vel.assert_called()
args, _ = self.teleoperator.on_cmd_vel.call_args
self.assertNotEqual(args[0], 0.0) # linear_vel
# self.assertEqual(args[1], 0.0) # angular_vel (depends on exact clipping)
async def test_control_cb_gripper_lock(self):
self.teleoperator.teleop_mode = "arm"
await self.teleoperator.control_cb("gripper_lock_left")
self.assertTrue(self.teleoperator.gripper_lock["left"])
self.mock_webserver_instance.control_datachannel_log.assert_any_call("Left Gripper Lock: Locked, release your pedal to unlock")
# Simulate pedal release to unlock (gripper_pos > GRIPPER_MAX * 0.9)
# This means (1 - values["left-gripper"]) * GRIPPER_MAX > GRIPPER_MAX * 0.9
# 1 - values["left-gripper"] > 0.9 => values["left-gripper"] < 0.1
# clipped_pedal_real_values[0] < 0.1
# (pedal_real_values[0] - 0.5) / (0.5 - non_sensetive_area) * 0.5 + 0.5 < 0.1
# For non_sensitive_area = 0.1, (pedal_real_values[0] - 0.5) / 0.4 * 0.5 + 0.5 < 0.1
# (pedal_real_values[0] - 0.5) * 1.25 + 0.5 < 0.1
# (pedal_real_values[0] - 0.5) * 1.25 < -0.4
# pedal_real_values[0] - 0.5 < -0.32 => pedal_real_values[0] < 0.18
pedal_values_release = [0.1, 0.5, 0.5, 0.5]
self.teleoperator.pedal_cb(pedal_values_release)
self.assertEqual(self.teleoperator.gripper_lock["left"], 'ready_to_unlock')
# Simulate moving pedal again slightly (gripper_pos <= last_gripper_pos)
# last_gripper_pos would be GRIPPER_MAX initially before any command
# gripper_pos with pedal_values_move [0.2, 0.5, 0.5, 0.5] will be < GRIPPER_MAX
pedal_values_move_slightly = [0.2, 0.5, 0.5, 0.5]
self.teleoperator.pedal_cb(pedal_values_move_slightly)
self.assertFalse(self.teleoperator.gripper_lock["left"])
self.mock_webserver_instance.control_datachannel_log.assert_any_call("Left Gripper Lock: Unlocked")
if __name__ == '__main__':
# unittest.main() # This doesn't work well with async setUp
# Custom test runner for async methods
suite = unittest.TestSuite()
# Need to load tests that use the async setup
# A simpler way for demonstration if TestLoader isn't picking up async setUp correctly:
# Manually create an event loop and run tests.
async def main_test():
# Create an instance of the test class
test_instance = TestTeleoperatorInteraction()
# Manually call setUp
test_instance.setUp()
# Call test methods
await test_instance.test_control_cb_reset()
test_instance.setUp() # Re-setup for next test if state is modified
await test_instance.test_control_cb_teleop_mode_arm()
test_instance.setUp()
test_instance.test_hand_cb_publishes_goal_in_arm_mode()
test_instance.setUp()
test_instance.test_pedal_cb_arm_mode_lift()
test_instance.setUp()
test_instance.test_pedal_cb_arm_mode_gripper()
test_instance.setUp()
test_instance.test_pedal_cb_base_mode()
test_instance.setUp()
await test_instance.test_control_cb_gripper_lock()
print("All manual tests passed conceptually.")
# For real testing, use a proper async test runner like pytest-asyncio
# or structure tests differently if sticking to unittest.
# This manual execution is for illustrative purposes.
asyncio.run(main_test())
print("Conceptual test execution finished.")
Note on Test Code:
The provided test code is conceptual and uses unittest.mock
extensively.
- It defines a
MockWebServer
that mimics the interfaceTeleopoperator
expects. - It mocks
get_solve
to avoid dependency on the actual implementation. - The
async_setUp
and manual test running inmain_test
are workarounds forunittest
's default handling ofasync def
test methods and setup. For robust testing, a library likepytest
withpytest-asyncio
would be more suitable. - The tests simulate callbacks as if the
WebServer
invoked them, then checkTeleopoperator
's state or if its outgoing (mocked) robot commands were called. - Due to the interconnected nature and statefulness of
Teleopoperator
,setUp
is called before each test method in themain_test
example to ensure a clean state. In a realunittest
scenario with an async test runner, this would be handled more cleanly.