추가 의견 - dingdongdengdong/astra_ws GitHub Wiki
시스템 개요
본 시스템은 웹 브라우저를 통해 사용자가 so-100 로봇 팔을 원격으로 제어하는 것을 목표로 합니다. ESP32는 웹 서버로부터 제어 명령을 받아 STS3215 서보모터(TTL 통신 보드 사용)를 구동하여 로봇 팔을 움직입니다. 이 과정에서 ROS2를 사용하거나 사용하지 않는 두 가지 주요 접근 방식을 고려할 수 있습니다.
핵심 구성 요소 및 역할
-
so-100 로봇 팔:
- 제어 대상이 되는 물리적인 로봇 팔입니다. STS3215 서보모터에 의해 각 관절이 구동됩니다.
-
STS3215 서보모터 (TTL 통신 보드 포함):
- so-100 로봇 팔의 각 관절을 움직이는 액추에이터입니다.
- TTL 통신 보드를 통해 ESP32와 UART (Serial) 통신을 수행합니다. ESP32로부터 특정 프로토콜(예: Feetech/SCServo 프로토콜)에 따른 명령을 받아 위치, 속도 등을 제어하고, 현재 상태(위치, 온도 등)를 ESP32로 피드백할 수 있습니다.
-
ESP32 (마이크로컨트롤러):
- 하드웨어 직접 제어: STS3215 서보모터의 TTL 통신 보드와 UART를 통해 직접 통신하여 로봇 팔의 각 관절을 정밀하게 제어합니다. 웹 서버로부터 받은 상위 레벨의 명령 (예: "1번 관절 30도 회전")을 STS3215 서보가 이해할 수 있는 저수준 TTL 프로토콜 명령으로 변환하여 전송합니다.
- 네트워크 연결: Wi-Fi를 통해 로컬 네트워크에 연결됩니다.
- 통신: 웹소켓(WebSocket) 또는 HTTP 요청을 통해 웹 서버(ROS2 노드 또는 독립 Python 서버)와 통신합니다. 제어 명령을 수신하고, 로봇 팔의 상태 정보(서보 각도, 센서 값 등)를 송신합니다.
- 펌웨어: 수신된 명령 해석 및 서보 제어 로직이 포함된 코드가 업로드됩니다. STS3215 서보 제어를 위한 라이브러리(예: Feetech SCServo 라이브러리 또는 해당 프로토콜을 직접 구현)가 필요합니다.
-
웹 브라우저 (사용자 인터페이스):
- 사용자가 로봇 팔을 원격으로 조종할 수 있는 인터페이스를 제공합니다. (예시:
non_ros_src/astra_teleop_web/src/astra_teleop_web/static/index.html
기반으로 개발 가능) - HTML, CSS, JavaScript로 구성되며, 사용자의 입력(버튼 클릭, 슬라이더 조작 등)을 감지합니다.
- JavaScript를 사용하여 웹소켓을 통해 ESP32 또는 중간 서버로 제어 명령을 전송합니다.
- ESP32로부터 로봇 상태 정보를 받아 화면에 표시할 수 있습니다.
- 사용자가 로봇 팔을 원격으로 조종할 수 있는 인터페이스를 제공합니다. (예시:
-
웹 원격 조종 서버 (선택에 따라 구현):
- 옵션 1:
teleop_web_node.py
(ROS2 웹 원격 조종 노드):- ROS2 시스템과 웹 인터페이스, 그리고 ESP32 간의 중계 역할을 합니다.
astra_controller
패키지에 포함된 코드를 참고하여 개발할 수 있습니다. - 웹 서버 기능: 웹 브라우저로부터 웹소켓 또는 HTTP 요청을 수신합니다. (Python의
aiohttp
,flask
,fastapi
등 사용) - ROS2 연동: 웹 명령을 ROS2 토픽 (예:
/joint_commands
)으로 발행하여 다른 ROS2 노드(예: 로봇 팔 제어 노드)로 전달하거나, 직접 ESP32로 명령을 포워딩합니다. - ESP32 통신: 수신한 웹 명령을 ESP32가 이해할 수 있는 형태로 변환하여 네트워크(TCP/IP, UDP, 웹소켓)를 통해 ESP32로 전송합니다.
- ROS2 시스템과 웹 인터페이스, 그리고 ESP32 간의 중계 역할을 합니다.
- 옵션 2:
astra_teleop_web/src/astra_teleop_web/webserver.py
(독립형 웹 서버):- ROS2 없이 독립적인 웹 기반 원격 조종 시스템을 구성할 때 사용합니다.
- Python 기반 웹 서버(Flask, aiohttp 등)를 실행하여
static/index.html
과 같은 정적 파일을 제공하고, 웹소켓 또는 HTTP 엔드포인트를 통해 클라이언트(웹 브라우저, ESP32)와 직접 통신합니다. teleoperator.py
는 웹 서버로부터 받은 명령을 처리하고 ESP32와 통신하는 로직을 담당할 수 있습니다.
- 옵션 1:
데이터 송수신 및 구동 방식 다이어그램
제공해주신 다이어그램은 여전히 유효하며, 특히 ESP32와 하드웨어(여기서는 STS3215 서보모터) 간의 통신은 Serial (TTL UART) 을 통해 이루어진다는 점을 명확히 합니다.
graph TD
%% 사용자 인터페이스 서브그래프
subgraph 사용자 인터페이스
A[웹 브라우저 - index.html JS]
end
%% 네트워크 통신 서브그래프
subgraph 네트워크 통신
B[Websocket HTTP]
end
%% 서버 및 ROS2 통합 서브그래프
subgraph 서버/ROS2 통합
C[teleop_web_node.py - ROS2]
D[webserver.py - Non-ROS]
end
%% ROS2 시스템 서브그래프
subgraph ROS2 시스템
E[ROS2 Arm 제어 노드]
F[ROS2 토픽 - /joint_commands]
end
%% 마이크로컨트롤러 및 하드웨어 서브그래프
subgraph 마이크로컨트롤러/하드웨어
G[ESP32]
J[896C TTL 모터 제어 보드]
H[6DOF Arm 모터 STS3215]
end
%% 연결
A -->|제어 명령 JSON Text| B
B -->|웹 명령| C
B -->|웹 명령| D
%% 시나리오 1: teleop_web_node.py를 통한 ROS2 통합
C -->|파싱된 명령| F
F --> E
E -->|로봇 팔 명령 네트워크| J
J -->|TTL Serial 명령 STS3215 프로토콜| H
C -->|직접 명령 TCP UDP Websocket| G
%% 시나리오 2: 독립 웹 서버
D -->|파싱된 명령 TCP UDP Websocket| G
%% ESP32와 모터 제어 보드 간 통신
G -->|TTL Serial 명령 STS3215 프로토콜| J
J --> H
%% 하드웨어 상호작용 및 피드백
H -->|상태 피드백 예: 현재 각도| J
J -->|TTL Serial 데이터| G
G -->|상태 센서 데이터 Websocket TCP UDP| C
G -->|상태 센서 데이터 Websocket HTTP| D
D -->|상태 센서 데이터| B
C -->|상태 센서 데이터 Websocket 또는 ROS2 토픽| B
B -->|화면 업데이트 로봇 상태| A
%% 스타일링
classDef ui fill:#D5F5E3,stroke:#28B463,stroke-width:2px
classDef network fill:#E8DAEF,stroke:#8E44AD,stroke-width:2px
classDef ros fill:#D6EAF8,stroke:#2E86C1,stroke-width:2px
classDef hardware fill:#FDEDEC,stroke:#CB4335,stroke-width:2px
classDef topic fill:#FEF9E7,stroke:#F39C12,stroke-width:1px
class A ui
class B network
class C,E ros
class D ros
class F topic
class G,J,H hardware
데이터 송수신 및 구동 상세 설명
-
사용자 입력 (A → B): 웹 브라우저에서 사용자가 로봇 팔 제어 명령을 입력하면, JavaScript가 이를 JSON 또는 텍스트 형태로 변환하여 웹소켓 또는 HTTP를 통해 웹 서버로 전송합니다.
-
서버에서의 명령 수신 및 처리 (B → C 또는 B → D → G):
- ROS2 통합 (
teleop_web_node.py
, C):- 웹 서버는 명령을 수신하여 파싱한 후, 이를 ROS2 토픽(F)으로 발행하거나, ESP32(G)가 이해할 수 있는 형식으로 변환하여 네트워크를 통해 직접 전송합니다.
- 독립 실행 (
webserver.py
, D):- 웹 서버는 수신된 명령을 ESP32(G)가 이해할 수 있는 프로토콜로 변환하여 네트워크(Wi-Fi)를 통해 ESP32로 전송합니다.
- ROS2 통합 (
-
ESP32에서의 명령 수신 및 서보 제어 (G → I → H):
- ESP32(G)는 Wi-Fi를 통해 웹 서버로부터 제어 명령을 수신합니다.
- ESP32 펌웨어는 수신된 명령 (예: 특정 관절의 목표 각도)을 해석합니다.
- 해석된 명령에 따라, ESP32는 UART를 사용하여 STS3215 TTL 통신 보드(I)에 해당 서보모터(H)를 제어하기 위한 TTL 레벨의 시리얼 명령(STS3215 프로토콜에 따름)을 전송합니다. 이 명령에는 ID, 목표 위치, 속도 등의 정보가 포함될 수 있습니다.
- STS3215 서보모터(H)는 수신된 명령에 따라 구동됩니다.
-
상태 정보 피드백 (H → I → G → D/C → B → A) (선택 사항):
- STS3215 서보모터(H)는 현재 각도, 속도, 부하, 온도 등의 상태 정보를 TTL 통신 보드(I)를 통해 ESP32(G)로 전송할 수 있습니다.
- ESP32는 이 정보를 수집하여 웹 서버(D 또는 C)로 전송하고, 웹 서버는 다시 웹 브라우저(A)로 전달하여 사용자에게 로봇의 현재 상태를 표시합니다.
구체적인 수행 방법 (단계별)
-
하드웨어 연결 및 설정:
- so-100 로봇 팔 조립: STS3215 서보모터를 각 관절에 맞게 조립합니다.
- STS3215 서보 및 TTL 보드 연결: 각 STS3215 서보를 TTL 통신 보드에 연결합니다. 서보 ID를 각각 다르게 설정해야 개별 제어가 가능합니다. (STS3215 매뉴얼 참조)
- ESP32와 TTL 통신 보드 연결: ESP32의 UART TX/RX 핀을 STS3215 TTL 통신 보드의 RX/TX 핀에 연결합니다. (주의: ESP32는 3.3V 로직 레벨을 사용하므로, TTL 통신 보드가 3.3V와 호환되는지 확인 필요. 레벨 시프터가 필요할 수 있습니다.)
- 전원 공급: ESP32와 STS3215 서보모터에 충분한 전원을 각각 또는 공통으로 공급합니다. 서보모터는 상대적으로 큰 전류를 소모하므로 안정적인 전원 공급이 매우 중요합니다.
-
ESP32 펌웨어 개발:
- 개발 환경 설정: Arduino IDE, PlatformIO (VS Code 확장) 등 ESP32 개발 환경을 설정합니다.
- Wi-Fi 연결: ESP32가 로컬 Wi-Fi 네트워크에 접속하도록 설정합니다.
- 웹 서버 또는 클라이언트 구현:
- ESP32가 직접 웹 서버 역할을 하여 웹 브라우저로부터 HTTP 요청/웹소켓 연결을 받거나,
- ESP32가 클라이언트로서 중간 서버(Python 웹 서버)에 웹소켓으로 연결합니다.
- STS3215 서보 제어 로직:
- STS3215 서보모터의 통신 프로토콜(SCServo/Feetech 프로토콜)을 이해하고 구현합니다. 관련 라이브러리가 있다면 활용하고, 없다면 데이터시트를 참고하여 직접 패킷 생성 및 파싱 로직을 작성해야 합니다. (Baudrate, 데이터 패킷 구조 등 확인)
- ESP32의
HardwareSerial
을 사용하여 TTL 통신 보드와 시리얼 통신을 설정합니다. - 수신된 웹 명령(예: JSON 형식의
{ "joint": 1, "angle": 90 }
)을 파싱하여 해당 ID의 서보에 목표 각도, 속도 등을 설정하는 명령을 전송합니다. - (선택 사항) 서보로부터 현재 각도, 온도 등의 상태 정보를 읽어와 웹 서버로 전송하는 기능을 구현합니다.
- 펌웨어 업로드: 작성된 펌웨어를 ESP32에 업로드합니다.
-
웹 원격 조종 서버 개발 (Python, 선택 사항):
teleop_web_node.py
(ROS2) 또는webserver.py
(Non-ROS) 중 선택하여 개발합니다.- 웹소켓/HTTP 서버 설정: 웹 브라우저로부터 제어 명령을 수신하고, ESP32로 명령을 전달하는 엔드포인트를 구현합니다. (예:
aiohttp
,Flask-SocketIO
,FastAPI
사용) - 명령 중계: 브라우저에서 받은 명령을 ESP32로 전달하고, ESP32에서 받은 상태 정보를 브라우저로 전달합니다.
- ROS2 연동 (
teleop_web_node.py
의 경우):rclpy
를 사용하여 ROS2 노드를 생성합니다.- 웹 명령을 받아
/joint_commands
와 같은 토픽으로sensor_msgs/msg/JointState
또는 커스텀 메시지 형태로 발행합니다. - 다른 ROS2 노드(예: MoveIt2)와 연동하여 로봇 팔 제어 기능을 확장할 수 있습니다.
-
웹 브라우저 인터페이스 개발 (
index.html
, JavaScript):- HTML로 기본적인 UI 구조(버튼, 슬라이더, 상태 표시 영역 등)를 만듭니다.
- JavaScript를 사용하여 웹소켓 클라이언트를 구현하여 웹 서버와 양방향 통신 채널을 설정합니다.
- 사용자 입력(버튼 클릭, 슬라이더 값 변경)에 따라 제어 명령 (JSON 형식 권장)을 생성하여 웹소켓을 통해 서버로 전송합니다.
- 서버로부터 수신된 로봇 상태 정보를 화면에 동적으로 업데이트합니다.
astra_teleop_web/static/
내의 파일들을 참고하여 UI를 구성할 수 있습니다.
-
통합 테스트 및 디버깅:
- 각 구성 요소(ESP32 펌웨어, 웹 서버, 웹 UI)를 개별적으로 테스트한 후 통합 테스트를 진행합니다.
- ESP32와 STS3215 서보 간의 TTL 통신이 올바르게 이루어지는지 시리얼 모니터 등을 통해 확인합니다. (패킷 송수신 확인)
- 웹 브라우저에서 명령을 내렸을 때 로봇 팔이 의도대로 움직이는지, 상태 정보가 정확히 표시되는지 확인합니다.
주요 고려 사항:
- STS3215 통신 프로토콜: STS3215 서보 및 TTL 통신 보드의 정확한 데이터시트를 확보하여 통신 프로토콜(명령 패킷 구조, 응답 패킷 구조, 체크섬 계산 방법 등)을 완벽히 이해하고 ESP32 펌웨어에 구현해야 합니다. 일반적으로 ID, 명령어, 파라미터, 체크섬 등으로 구성됩니다.
- 전원 공급: STS3215 서보는 구동 시 상대적으로 큰 전류를 소모합니다. ESP32와 별도로 안정적인 서보 전용 전원 공급 장치를 사용하거나, ESP32와 전원을 공유할 경우 충분한 전류 용량을 확보해야 합니다. 노이즈 방지를 위한 전원 라인 설계도 중요합니다.
- ROS2 사용 여부: ROS2를 사용하면 MoveIt2와 같은 강력한 로봇 제어 프레임워크와 쉽게 통합할 수 있지만, 시스템 복잡도가 증가합니다. 단순 원격 제어가 목적이라면 Non-ROS 방식이 더 간결할 수 있습니다.
- 응답성 및 지연: 웹 기반 제어 시스템에서는 네트워크 지연이 발생할 수 있습니다. 실시간성이 매우 중요한 작업이라면 웹소켓 사용 및 데이터 전송 최적화를 고려해야 합니다.
제공된 astra_ws
내의 코드들은 본 시스템 구축에 유용한 참고 자료가 될 수 있습니다. 특히 teleop_web_node.py
, webserver.py
, index.html
등의 구조를 참고하여 개발하시면 도움이 될 것입니다. 다만, so-100
팔과 STS3215
서보에 맞는 ESP32 레벨의 저수준 제어 로직은 직접 구현하시거나 해당 하드웨어에 맞는 라이브러리를 찾아 적용해야 합니다.
astra.py
from dataclasses import dataclass, field, replace
import cv2
import numpy as np
import torch
from astra_controller.astra_controller import AstraController
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.robot_devices.robots.configs import AstraRobotConfig
########################################################################
# Astra robot arm
########################################################################
class AstraRobot:
"""
Example of highest frequency teleoperation without camera:
```python
# Defines how to communicate with the motors of the leader and follower arms
astra_controller = AstraController(
)
robot = AstraRobot(astra_controller)
# Connect motors buses and cameras if any (Required)
robot.connect()
while True:
robot.teleop_step()
```
Example of highest frequency data collection without camera:
```python
# Assumes leader and follower arms have been instantiated already (see first example)
robot = AstraRobot(astra_controller)
robot.connect()
while True:
observation, action = robot.teleop_step(record_data=True)
```
Example of highest frequency data collection with cameras:
```python
# Defines how to communicate with 2 cameras connected to the computer.
# Here, the webcam of the mackbookpro and the iphone (connected in USB to the macbookpro)
# can be reached respectively using the camera indices 0 and 1. These indices can be
# arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices.
cameras = {
"macbookpro": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
"iphone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
}
# Assumes leader and follower arms have been instantiated already (see first example)
robot = AstraRobot(astra_controller)
robot.connect()
while True:
observation, action = robot.teleop_step(record_data=True)
```
Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency):
```python
# Assumes leader and follower arms + cameras have been instantiated already (see previous example)
robot = AstraRobot(astra_controller)
robot.connect()
while True:
# Uses the follower arms and cameras to capture an observation
observation = robot.capture_observation()
# Assumes a policy has been instantiated
with torch.inference_mode():
action = policy.select_action(observation)
# Orders the robot to move
robot.send_action(action)
```
Example of disconnecting which is not mandatory since we disconnect when the object is deleted:
```python
robot.disconnect()
```
"""
def __init__(
self,
config: AstraRobotConfig | None = None,
**kwargs,
):
if config is None:
config = AstraRobotConfig()
# Overwrite config arguments using kwargs
self.config = replace(config, **kwargs)
self.robot_type = self.config.type
self.astra_controller: AstraController = AstraController(
space=self.config.space
)
self.is_connected = False
self.logs = {}
@property
def camera_features(self) -> dict:
cam_ft = {}
for cam_key in ["head", "wrist_left", "wrist_right"]:
key = f"observation.images.{cam_key}"
cam_ft[key] = {
"shape": (360, 640, 3),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def motor_features(self) -> dict:
features = {
"action.arm_l": {
"dtype": "float32",
"shape": (6,),
"names": list(range(6)),
},
"action.gripper_l": {
"dtype": "float32",
"shape": (1,),
"names": list(range(1)),
},
"action.arm_r": {
"dtype": "float32",
"shape": (6,),
"names": list(range(6)),
},
"action.gripper_r": {
"dtype": "float32",
"shape": (1,),
"names": list(range(1)),
},
"action.base": {
"dtype": "float32",
"shape": (2,),
"names": list(range(6)),
},
"action.eef_l": {
"dtype": "float32",
"shape": (7,),
"names": list(range(7)),
},
"action.eef_r": {
"dtype": "float32",
"shape": (7,),
"names": list(range(7)),
},
"action.head": {
"dtype": "float32",
"shape": (2,),
"names": list(range(2)),
},
"observation.state.arm_l": {
"dtype": "float32",
"shape": (6,),
"names": list(range(6)),
},
"observation.state.gripper_l": {
"dtype": "float32",
"shape": (1,),
"names": list(range(1)),
},
"observation.state.arm_r": {
"dtype": "float32",
"shape": (6,),
"names": list(range(6)),
},
"observation.state.gripper_r": {
"dtype": "float32",
"shape": (1,),
"names": list(range(1)),
},
"observation.state.base": {
"dtype": "float32",
"shape": (2,),
"names": list(range(6)),
},
"observation.state.eef_l": {
"dtype": "float32",
"shape": (7,),
"names": list(range(7)),
},
"observation.state.eef_r": {
"dtype": "float32",
"shape": (7,),
"names": list(range(7)),
},
"observation.state.odom": {
"dtype": "float32",
"shape": (7,),
"names": list(range(7)),
},
"observation.state.head": {
"dtype": "float32",
"shape": (2,),
"names": list(range(2)),
},
}
if self.astra_controller.space == "joint":
return {
"action": {
"dtype": "float32",
"shape": (6+1+6+1+2+2,),
"names": list(range(6+1+6+1+2+2)),
},
"observation.state": {
"dtype": "float32",
"shape": (6+1+6+1+2+2,),
"names": list(range(6+1+6+1+2+2)),
},
**features,
}
elif self.astra_controller.space == "cart":
raise NotImplementedError("Cartesian space is not supported for now")
else:
return features
@property
def features(self):
return {**self.motor_features, **self.camera_features}
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"AstraRobot is already connected. Do not run `robot.connect()` twice."
)
if not self.astra_controller:
raise ValueError(
"AstraRobot doesn't have any device to connect. See example of usage in docstring of the class."
)
# Connect the arms
self.astra_controller.connect()
self.is_connected = True
def wait_for_reset(self):
self.astra_controller.wait_for_reset()
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"AstraRobot is not connected. You need to run `robot.connect()`."
)
assert record_data, "Please use Astra Web Teleop"
# Prepare to assign the positions of the leader to the follower
action, action_arm_l, action_gripper_l, action_arm_r, action_gripper_r, action_base, action_eef_l, action_eef_r, action_head = self.astra_controller.read_leader_present_position()
# Leader-follower process will be automatically handle in astra controller.
# Reason for that is we want to deliver image from device camera to the operator as soon as possible.
# Also, delay of arm is all over the place. Strictly do as aloha does may not be necessary.
# TODO delay consideration
obs_dict = self.capture_observation()
action_dict = {}
if self.astra_controller.space == 'joint' or self.astra_controller.space == 'cartesian':
action_dict["action"] = torch.from_numpy(np.array(action)).to(torch.float32)
action_dict["action.arm_l"] = torch.from_numpy(np.array(action_arm_l)).to(torch.float32)
action_dict["action.gripper_l"] = torch.from_numpy(np.array(action_gripper_l)).to(torch.float32)
action_dict["action.arm_r"] = torch.from_numpy(np.array(action_arm_r)).to(torch.float32)
action_dict["action.gripper_r"] = torch.from_numpy(np.array(action_gripper_r)).to(torch.float32)
action_dict["action.base"] = torch.from_numpy(np.array(action_base)).to(torch.float32)
action_dict["action.eef_l"] = torch.from_numpy(np.array(action_eef_l)).to(torch.float32)
action_dict["action.eef_r"] = torch.from_numpy(np.array(action_eef_r)).to(torch.float32)
action_dict["action.head"] = torch.from_numpy(np.array(action_head)).to(torch.float32)
return obs_dict, action_dict
def capture_observation(self):
"""The returned observations do not have a batch dimension."""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"AstraRobot is not connected. You need to run `robot.connect()`."
)
# TODO(rcadene): Add velocity and other info
# Read follower position
state, state_arm_l, state_gripper_l, state_arm_r, state_gripper_r, state_base, state_eef_l, state_eef_r, state_odom, state_head = self.astra_controller.read_present_position()
# Capture images from cameras
images = self.astra_controller.read_cameras()
# Populate output dictionnaries and format to pytorch
obs_dict = {}
if self.astra_controller.space == 'joint' or self.astra_controller.space == 'cartesian':
obs_dict["observation.state"] = torch.from_numpy(np.array(state)).to(torch.float32)
obs_dict["observation.state.arm_l"] = torch.from_numpy(np.array(state_arm_l)).to(torch.float32)
obs_dict["observation.state.gripper_l"] = torch.from_numpy(np.array(state_gripper_l)).to(torch.float32)
obs_dict["observation.state.arm_r"] = torch.from_numpy(np.array(state_arm_r)).to(torch.float32)
obs_dict["observation.state.gripper_r"] = torch.from_numpy(np.array(state_gripper_r)).to(torch.float32)
obs_dict["observation.state.base"] = torch.from_numpy(np.array(state_base)).to(torch.float32)
obs_dict["observation.state.eef_l"] = torch.from_numpy(np.array(state_eef_l)).to(torch.float32)
obs_dict["observation.state.eef_r"] = torch.from_numpy(np.array(state_eef_r)).to(torch.float32)
obs_dict["observation.state.odom"] = torch.from_numpy(np.array(state_odom)).to(torch.float32)
obs_dict["observation.state.head"] = torch.from_numpy(np.array(state_head)).to(torch.float32)
# Convert to pytorch format: channel first and float32 in [0,1]
for name in images:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
obs_dict["done"] = self.astra_controller.done
self.astra_controller.done = False
return obs_dict
def send_action(self, action: torch.Tensor):
"""The provided action is expected to be a vector."""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"AstraRobot is not connected. You need to run `robot.connect()`."
)
self.astra_controller.write_goal_position(action.tolist())
action_dict = {}
action_dict["action"] = action
action_dict["action.arm_l"] = torch.zeros(6).to(torch.float32)
action_dict["action.gripper_l"] = torch.zeros(1).to(torch.float32)
action_dict["action.arm_r"] = torch.zeros(6).to(torch.float32)
action_dict["action.gripper_r"] = torch.zeros(1).to(torch.float32)
action_dict["action.base"] = torch.zeros(2).to(torch.float32)
action_dict["action.eef_l"] = torch.zeros(7).to(torch.float32)
action_dict["action.eef_r"] = torch.zeros(7).to(torch.float32)
action_dict["action.head"] = torch.zeros(2).to(torch.float32)
return action_dict
def log_control_info(self, log_dt):
pass
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"AstraRobot is not connected. You need to run `robot.connect()` before disconnecting."
)
self.astra_controller.disconnect()
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()