arm_protocol - dingdongdengdong/astra_ws GitHub Wiki
1. Introduction
This document provides a detailed explanation of the arm_controller.py
script. This script serves as the high-level Python interface for controlling the Astra robotic arm. It communicates directly with the ESP32-based AstraArmController
firmware over a serial connection.
As outlined in the AstraArmController_Connection_Analysis.md
, this script is the "master" in a master-slave architecture, sending commands and processing feedback from the firmware. It abstracts the low-level serial communication protocol into a clean, object-oriented Python class, ArmController
.
ArmController
Class
2. The The ArmController
class encapsulates all the functionality for communicating with the arm.
Key Responsibilities:
- Managing the serial connection.
- Encoding and sending command packets.
- Receiving, decoding, and validating feedback packets in a background thread.
- Converting data between raw hardware units and standard SI units (radians, meters).
- Estimating joint velocity and effort from position feedback.
- Providing callbacks for state updates, errors, and other events.
__init__
)
2.1. Initialization and Setup (When an ArmController
object is created, it performs several key setup steps:
# arm_controller.py
def __init__(self, name):
logger.info(f"Using device {name}")
self.state_cb = None
# ... other callbacks ...
self.ser = serial.Serial(name, 921600, timeout=None)
self.lock = threading.Lock()
# ... other state variables ...
self.quit = threading.Event()
self.t = threading.Thread(target=self.recv_thread, daemon=True)
self.t.start()
self.set_torque(1)
self.set_pid()
while self.last_position is None: # wait for init done
time.sleep(0.1)
- Open Serial Port: It initializes
serial.Serial
with the provided devicename
(e.g.,/dev/ttyUSB0
) and a high baud rate of 921,600, as specified in the system architecture. - Callbacks & State: It initializes placeholders for various callbacks (
state_cb
,pong_cb
, etc.) and state variables (last_position
,last_velocity
). Threading locks (self.lock
,self.write_lock
) are created to ensure data integrity in a multi-threaded context. - Start Receiver Thread: It spawns a daemon thread running the
recv_thread
method. This is crucial as it allows the controller to continuously listen for incoming data from the firmware without blocking the main application thread. - Initial Commands:
self.set_torque(1)
: It immediately sends a command to enable the motor torque.self.set_pid()
: It sends a default set of PID tuning parameters to the firmware.
- Wait for Feedback: The constructor blocks (
while self.last_position is None
) until the first feedback packet is received and processed byrecv_thread
. This ensures that the controller has a valid initial state before the application attempts to use it.
2.2. Communication Protocol In-Depth
The communication between arm_controller.py
and the firmware (comm.cpp
) relies on a custom serial protocol designed for real-time control. This section details its structure, key packet types, and a critical implementation nuance.
Packet Structure
All communication happens in fixed-size packets, which are processed atomically. The data is encoded in big-endian format.
A standard packet has the following structure:
[Header (1B)] [Type (1B)] [Payload (16B)] [Checksum (1B)]
- Header (
COMM_HEAD = 0x5A
): A constant byte that marks the beginning of a new packet. The receiver uses this to synchronize with the data stream. - Type: A single byte defining the purpose of the packet. Both the Python script and firmware define a set of
COMM_TYPE_*
constants for this purpose. - Payload: The data content. For most packets, this is 16 bytes. A notable exception is the
COMM_TYPE_PIDTUNE
packet, which has a 32-byte payload to carry floating-point PID parameters. - Checksum: A single byte used to verify data integrity. It is calculated as the sum of the
Type
byte and all bytes in thePayload
, with the result taken modulo 256.
# arm_controller.py: Checksum validation for incoming packets
@staticmethod
def checksum(data: bytes):
checksum = 0
for b in data[1:-1]: # Sums Type + Payload
checksum = checksum + b
checksum = checksum % 256
return checksum == data[-1]
Key Packet Types
-
From Python to Firmware:
COMM_TYPE_CTRL
(0x02): The primary command for motion control. Its 16-byte payload contains the target positions for the 6 joints, encoded asunsigned short
integers after conversion byto_raw_unit
.COMM_TYPE_TORQUE
(0x04): Enables (1
) or disables (0
) motor torque.COMM_TYPE_PIDTUNE
(0x08): Sends PID tuning parameters to the firmware's motor controllers. It has a larger 32-byte payload.
-
From Firmware to Python:
COMM_TYPE_FEEDBACK
(0x03): The primary data packet from the arm. Its 16-byte payload contains the current positions of the 6 joints, whicharm_controller.py
uses to estimate position, velocity, and effort.COMM_TYPE_PONG
(0x01): A response to aPING
, used for connection health checks.
set_pid
Command Construction Example: To understand how commands are assembled, let's examine the set_pid
method from arm_controller.py
. This function is responsible for sending PID tuning parameters to the firmware.
# arm_controller.py
def set_pid(self, p=30, i=0, d=0, i_max=800, p2=10, p2_err_thres=2, i_clip_thres=100000.0, i_clip_coef=1):
self.write(struct.pack('>BBffffffff', self.COMM_HEAD, self.COMM_TYPE_PIDTUNE, *[p, i, d, i_clip_thres, i_clip_coef, i_max, p2, p2_err_thres]))
The core of this operation is struct.pack()
, which converts Python data types into a sequence of bytes. Here's a breakdown of the format string '>BBffffffff'
:
>
: Specifies that the data should be packed in big-endian order. This is critical for ensuring the multi-byte numbers (the floats) are interpreted correctly by the ESP32's processor.B
: Packs an unsigned char (1 byte). The firstB
is forself.COMM_HEAD
(the0x5A
header).B
: A second unsigned char forself.COMM_TYPE_PIDTUNE
(the packet type identifier,0x08
).ffffffff
: Packs eight floating-point numbers (4 bytes each). These correspond to the eight PID parameters passed into the function.
The arguments are passed to struct.pack
in order:
self.COMM_HEAD
->B
self.COMM_TYPE_PIDTUNE
->B
- The list of 8 parameters is unpacked with
*
, so each float is passed as a separate argument, matching the eightf
s in the format string.
The result is a 34-byte bytes
object:
- 1 byte for the Header (
0x5A
) - 1 byte for the Type (
0x08
) - 32 bytes for the Payload (8 floats * 4 bytes/float)
This 34-byte object is then passed to self.write()
, which sends it directly over the serial port. This process perfectly constructs the [Header][Type][Payload]
sequence that the firmware's comm_recv_poll
function is expecting for a PIDTUNE
command.
Implementation Asymmetry: A Critical Detail
A deep analysis of arm_controller.py
and comm.cpp
reveals an important asymmetry in the protocol's implementation:
-
Firmware-to-Python (Robust): The firmware's
comm_send_blocking
function incomm.cpp
correctly calculates and appends a checksum to every outgoing packet. On the receiving end,arm_controller.py
'srecv_thread
validates this checksum using thechecksum
method. If the checksum fails, the packet is discarded. This ensures a reliable data stream from the arm to the host computer. -
Python-to-Firmware (Less Robust): The Python script does not calculate or append a checksum when sending commands like
set_pos
. Thestruct.pack
calls build an 18-byte packet (Header
+Type
+Payload
). Concurrently, the firmware'scomm_recv_poll
function incomm.cpp
is written to expect exactly this length; it checks forHeader + Type + Payload
but does not account for or verify a checksum byte.
Implication: This works for basic operation, but it means there is no integrity check on commands sent to the arm. A corrupted COMM_TYPE_CTRL
packet due to serial noise could be interpreted by the firmware as a valid (but incorrect) target position, potentially leading to unexpected movements. The protocol is effectively one-way validated.
2.3. Functional Deep Dive with Examples
This section provides a practical, step-by-step walkthrough of the core functions, illustrating the data flow with concrete examples.
Sending Commands to the Arm
set_torque(torque)
Example 1: This is the simplest command. Its purpose is to enable or disable the power to all motors.
# arm_controller.py
def set_torque(self, torque):
self.write(struct.pack('>BBBxxxxxxxxxxxxxxx', self.COMM_HEAD, self.COMM_TYPE_TORQUE, torque))
- Scenario: We want to enable the motors. We call
controller.set_torque(1)
. - Packing:
struct.pack
is called with the format>BBBxxxxxxxxxxxxxxx
.>
: Big-endian.B
:self.COMM_HEAD
(0x5A
).B
:self.COMM_TYPE_TORQUE
(0x04
).B
: Thetorque
value,1
(0x01
).x
is a padding byte. The 15x
's ensure the payload is padded out to 16 bytes, matching the standard packet length, even though the command only needs one byte.
- Resulting Packet:
5a 04 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00
. This 18-byte sequence is sent over the serial port. The firmware receives it, sees type0x04
, and reads the third byte (0x01
) to enable the motors.
set_pos(pos)
Example 2: This is the most common command, used to move the arm to a specific configuration.
- Scenario: We want to move the first joint (joint #2) to 0.5 radians and all other joints to 0. We call
controller.set_pos(np.array([0.5, 0, 0, 0, 0, 0]))
. - Step 1: Joint Limit Check: The method first checks if
0.5
is within the allowed range for the first joint. Assuming it is, the function proceeds. - Step 2: Unit Conversion: The
pos
array[0.5, 0, 0, 0, 0, 0]
is passed toto_raw_unit()
.- This static method converts the SI unit values (radians) into the 12-bit integer values (0-4095) that the firmware expects.
- For
0.5
radians, the calculation is roughlyint(-0.5 / (2 * pi) * 4096 + 2048)
, which results in1721
. The other zeros map to2048
. - The result is an integer array like
[1721, 2048, 2048, 2048, 2048, 2048]
.
- Step 3: Packing:
struct.pack
is called with the format>BBHHHHHHxxxx
.B
:self.COMM_HEAD
(0x5A
).B
:self.COMM_TYPE_CTRL
(0x02
).H
: An unsigned short (2 bytes). The sixH
's correspond to the six integer values from the previous step.1721
is packed as06 b9
.2048
is packed as08 00
.xxxx
: Four padding bytes are added to complete the 16-byte payload.
- Resulting Packet:
5a 02 06 b9 08 00 08 00 08 00 08 00 08 00 00 00 00 00
. This is sent to the firmware, which interprets it as a new target position for the arm's PID controllers.
Receiving and Processing Data from the Arm
recv_thread
processing a FEEDBACK
packet
Example: The recv_thread
runs constantly in the background, listening for incoming serial data.
- Scenario: The arm is powered on and sending its state. A complete feedback packet arrives at the serial port.
- Step 1: Synchronization and Read: The thread finds the
0x5A
header and reads the next 18 bytes to get a full packet. - Dummy Data: Let's assume the received
data
(after the header) is03 07 d0 08 00 08 00 08 00 08 00 08 00 2b
. - Step 2: Checksum Validation:
- The
checksum()
method is called on the full 19-byte packet. - It sums all bytes from the type (
0x03
) to the end of the payload. The last byte of the packet (0x2b
) is the checksum sent by the firmware. - If
(0x03 + 0x07 + 0xd0 + ... + 0x08 + 0x00) % 256
equals0x2b
, the check passes.
- The
- Step 3: Parsing:
- The packet
type
isdata[1]
, which is0x03
(COMM_TYPE_FEEDBACK
). - The
struct.unpack
function is called with format>HHHHHHxxxx
on the 16-byte payload07 d0 08 00 ... 08 00
. - This unpacks the raw joint values:
[2000, 2048, 2048, 2048, 2048, 2048]
.
- The packet
- Step 4: Unit Conversion:
- The raw integer array is passed to
to_si_unit()
. - For
2000
, the calculation is-(2000 - 2048) / 4096 * (2 * pi)
, which is0.0736
radians. The other2048
values become0
. - The resulting SI
position
is[0.0736, 0, 0, 0, 0, 0]
.
- The raw integer array is passed to
- Step 5: State Estimation and Callback:
- The thread acquires a lock to safely update the shared state variables.
- It calculates
delta_time
since the last packet. - It calculates
velocity
by subtracting theself.last_position
from the newposition
and dividing bydelta_time
. - It updates
self.last_position
,self.last_velocity
, etc. - Finally, if a
state_cb
function is registered, it's called with the newly computed position, velocity, and effort. This is how the rest of the application gets live updates from the arm.
2.4. Sending Commands
The class provides high-level methods to send commands to the arm. These methods handle the necessary data conversion and packet formatting.
set_pos
)
Setting Position (# arm_controller.py
def set_pos(self, pos):
ok = True
for i, (p, mn, mx) in enumerate(zip(pos, self.JOINT_MIN, self.JOINT_MAX)):
if not (mn <= p <= mx):
# ... log error and call error_cb ...
ok = False
if ok:
self.write(struct.pack('>BBHHHHHHxxxx', self.COMM_HEAD, self.COMM_TYPE_CTRL, *self.to_raw_unit(pos)))
This method takes a target position array in SI units.
- Joint Limit Check: It first validates the command against predefined
JOINT_MIN
andJOINT_MAX
limits to ensure safe operation. - Unit Conversion: It converts the SI position command back to the raw integer units expected by the firmware using
to_raw_unit
. - Packing and Sending: It uses
struct.pack
to assemble the finalCOMM_TYPE_CTRL
packet and sends it over the serial port via the thread-safeself.write()
method.
Other Commands
set_torque(torque)
: Sends aCOMM_TYPE_TORQUE
packet to enable (1
) or disable (0
) power to the motors.set_pid(...)
: Sends aCOMM_TYPE_PIDTUNE
packet. As noted in the analysis document, these parameters configure the advanced PID controller running on the firmware, which handles the real-time motor control loop.
to_si_unit
, to_raw_unit
)
2.5. Data Conversion (These static methods are helpers for translating between two different representations of the arm's state.
# arm_controller.py
@staticmethod
def to_si_unit(arr):
arr = arr.copy()
arr = -(np.array(arr) - 2048) / 4096 * (2*math.pi)
arr[-1] *= ArmController.GRIPPER_GEAR_R
arr[-1] += 0.03 # Gripper offset
return arr
@staticmethod
def to_raw_unit(arr):
arr = arr.copy()
arr[-1] -= 0.03
arr[-1] /= ArmController.GRIPPER_GEAR_R
arr = (-np.array(arr) / (2*math.pi) * 4096 + 2048).astype(int)
return arr
- The firmware represents joint positions as integers from 0 to 4095, corresponding to a full circle.
- The high-level Python code (and ROS) uses standard SI units: radians for rotational joints and meters for the linear motion of the gripper.
- These methods perform the linear transformation between these two coordinate systems. Note the special handling for the gripper, which involves a gear ratio (
GRIPPER_GEAR_R
) and a calibrated offset.
3. Conclusion
arm_controller.py
is a well-designed driver that serves as the critical bridge between high-level robotics software (like a ROS node) and the low-level AstraArmController
firmware. It effectively hides the complexities of serial communication, packet formatting, and state estimation, providing a simple and robust API. Its use of a dedicated background thread for receiving data ensures that the main application remains responsive while receiving a steady stream of state updates from the hardware.