head_controller.md - dingdongdengdong/astra_ws GitHub Wiki
Head Controller (head_controller.py)
flowchart
graph TD
A[Instantiate HeadController] --> B[__init__ Method];
B --> C[Log Device Name];
C --> D[Initialize Callbacks and State Variables];
D --> E[Open Serial Port];
E --> F[Create Thread Locks];
F --> G[Create Quit Event];
G --> H[Create Receive Thread];
H --> I[Start Receive Thread];
I --> J[Receive Thread Runs];
J --> K[Continuously Read Serial Data];
K --> L{Is Header Byte?};
L -- No --> M{Is Newline/Carriage Return?};
M -- Yes --> N{Databuf Looks Like Debug?};
N -- Yes --> O[Parse Debug Data];
O --> P{Debug Callback Registered?};
P -- Yes --> Q[Call Debug Callback];
Q --> R[Reset Databuf];
P -- No --> R;
N -- No --> R;
M -- No --> S[Append Data to Databuf];
S --> T[Echo Data to Stdout];
T --> K;
R --> K;
L -- Yes --> U[Read Remaining Packet Bytes];
U --> V{Packet Length Correct?};
V -- No --> K;
V -- Yes --> Y{Packet Type?};
Y --> |COMM_TYPE_PONG| Z{Pong Callback Registered?};
Z -- Yes --> AA[Call Pong Callback];
AA --> K;
Z -- No --> K;
Y --> |COMM_TYPE_FEEDBACK| AB[Unpack Raw Feedback];
AB --> AC[Convert Raw to SI Units];
AC --> AD[Record Current Time];
AD --> AE[Acquire State Lock];
AE --> AF{First Update?};
AF -- Yes --> AG[Initialize Last State and Time];
AG --> AH[Release State Lock];
AH --> AI{State Callback Registered?};
AI -- Yes --> AJ[Call State Callback with State];
AJ --> K;
AI -- No --> K;
AF -- No --> AK[Calculate Velocity and Effort];
AK --> AL[Update Last State and Time];
AL --> AH;
I --> BA[Wait for First Feedback];
BA --> BB[Controller Ready];
BB --> BC[Call set_pos method];
BC --> BD[Clamp Positions to Limits];
BD --> BE{Original Pos Close to Clamped?};
BE -- Yes --> BF[Convert Clamped to Raw Units];
BF --> BG[Construct Control Packet];
BG --> BH[Call write method];
BH --> BI[Controller Ready];
BE -- No --> BJ[Log Limit Error];
BJ --> BI;
BB --> BK[Call set_torque method];
BK --> BL[Construct Torque Packet];
BL --> BH;
BH --> BO[Acquire Write Lock];
BO --> BP[Write Data to Serial];
BP --> BQ[Release Write Lock];
BQ --> BI;
BI --> BR[Call stop method];
BR --> BS[Set Quit Event];
BS --> BT[Receive Thread Exits Loop];
BT --> BU[Disable Torque];
BU --> BV[Flush Serial Output];
BV --> BW[Close Serial Port];
BW --> BX[Controller Stopped];
BY[Object Garbage Collected] --> BZ[__del__ Method];
BZ --> BR;
J -- Quit Event Set --> BT;
Class Structure
classDiagram
class HeadController {
-current_pan: float
-current_tilt: float
-target_pan: float
-target_tilt: float
-pan_limits: tuple
-tilt_limits: tuple
-motor_controller
+__init__()
+set_position(pan, tilt)
+get_position()
+home()
+stop()
-update_state()
-check_limits()
}
class MotorController {
+set_position()
+get_position()
+get_velocity()
+enable()
+disable()
}
class SafetyMonitor {
+check_limits()
+check_velocity()
+check_acceleration()
}
HeadController --> MotorController : controls
HeadController --> SafetyMonitor : uses
Component Interaction
flowchart TD
subgraph HeadController
PC[Position Controller]
SM[Safety Monitor]
FB[Feedback Handler]
end
subgraph Motors
PM[Pan Motor]
TM[Tilt Motor]
end
subgraph External
CMD[Commands]
ST[State]
end
CMD -->|position| PC
PC -->|check| SM
SM -->|validated| PM & TM
PM & TM -->|feedback| FB
FB -->|state| ST
style HeadController fill:#bbf,stroke:#333,stroke-width:2px
style Motors fill:#fbf,stroke:#333,stroke-width:2px
style External fill:#fbb,stroke:#333,stroke-width:2px
Control Flow
sequenceDiagram
participant External
participant Controller
participant Safety
participant Motors
participant Feedback
External->>Controller: set_position(pan, tilt)
Controller->>Safety: check_limits()
Safety-->>Controller: validated positions
Controller->>Motors: apply positions
loop State Update
Motors->>Feedback: motor_states
Feedback->>External: head_state
end
State Machine
stateDiagram-v2
[*] --> Initializing
Initializing --> Homing: Start
Homing --> Ready: Home Complete
Ready --> Moving: Position Command
Moving --> Ready: Motion Complete
Ready --> [*]: Shutdown
state Moving {
[*] --> ValidatingCommand
ValidatingCommand --> ExecutingMotion
ExecutingMotion --> UpdateState
UpdateState --> [*]
}
Motion Profiles
graph LR
subgraph Position Profile
P1[Current Position]
P2[Target Position]
P3[Interpolated Path]
end
subgraph Velocity Profile
V1[Acceleration]
V2[Constant Velocity]
V3[Deceleration]
end
subgraph Limits
L1[Position Limits]
L2[Velocity Limits]
L3[Acceleration Limits]
end
P1 --> P3
P2 --> P3
V1 & V2 & V3 --> P3
L1 & L2 & L3 -->|constrain| P3
style Position Profile fill:#bbf
style Velocity Profile fill:#fbf
style Limits fill:#fbb
Error Handling
flowchart TD
A[Error Detected] --> B{Error Type}
B -->|Position Limit| C[Clip to Limit]
B -->|Velocity Limit| D[Slow Down]
B -->|Motor Error| E[Emergency Stop]
C & D --> F[Resume Motion]
E --> G[Safe Position]
G --> H[Require Reset]
Configuration
Motor Parameters
graph LR
subgraph Pan Motor
P1[Position Range]
P2[Max Velocity]
P3[Max Acceleration]
end
subgraph Tilt Motor
T1[Position Range]
T2[Max Velocity]
T3[Max Acceleration]
end
subgraph Control
C1[PID Gains]
C2[Deadband]
C3[Smoothing]
end
style Pan Motor fill:#bbf
style Tilt Motor fill:#fbf
style Control fill:#fbb
Dependencies
- ROS2 Packages:
- rclpy
- sensor_msgs
- std_msgs
- Python Libraries:
Topics
Published Topics
/head/state
(sensor_msgs/JointState)
- pan_joint position
- tilt_joint position
- velocities
- efforts
Subscribed Topics
/head/command
(sensor_msgs/JointState)
- Target pan position
- Target tilt position
Notes
- Implements smooth position control
- Enforces joint limits
- Provides homing sequence
- Real-time state feedback
- Safety monitoring
- Configurable motion profiles
- Emergency stop support
- Independent pan/tilt control