In this project, we use the myController S570, a wearable motion capture device, to collect human motion data on Ubuntu 20.04. This data will be fed into LeRobot, an open-source Learning from Demonstration (LfD) platform, enabling robots to learn tasks by mimicking human demonstrations.
The S570 provides real-time orientation and motion data from multiple degrees of freedom, making it ideal for teleoperation and robot training applications.
LeRobot
LeRobot is an open-source, modular robotics platform from Hugging Face, built on PyTorch and designed for fast prototyping and real-world robot control. It supports easy integration with a wide range of hardware and AI models.
Project link: huggingface/lerobot
Make sure to use a machine with a CUDA-capable GPU; otherwise, the environment cannot be set up successfully.
myController S570
The myController S570 is highly compatible, adapting easily to the collaborative robots widely used in industrial applications. Its lightweight exoskeleton makes it easy to carry while supporting up to 14 degrees of freedom (DOF). Equipped with 2 joysticks and 2 buttons, it is well suited for remote operation, research on unmanned tasks, and data acquisition, making it an ideal tool for industrial automation and workstation setup.
Before installation, you need to prepare:
● Python 3.8+
● Miniconda for environment management
● Git and pip
1.1 Install Git Tool
sudo apt update
sudo apt install git wget -y
1.2 Install Miniconda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
When prompted, select 'yes' to initialize Conda.
Close and reopen your terminal or run:
source ~/.bashrc
1.3 Create a Conda Environment
conda create -n lerobot python=3.8 -y
conda activate lerobot
1.4 Clone the LeRobot Repository
git clone https://github.com/huggingface/lerobot.git
cd lerobot
pip install .
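After the install finishes, a quick sanity check confirms the package is importable from the active environment (this assumes the project installs under the package name "lerobot"):
python -c "import lerobot; print('LeRobot imported successfully')"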
1.5 Install Python Dependencies
pip install empy==3.4
pip install pymycobot
pip install
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
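To verify that FFmpeg and the conda-forge OpenCV build were picked up correctly, you can run:
python -c "import cv2; print(cv2.__version__)"
ffmpeg -version | head -n 1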
2. Install ROS Package of myController S570
2.1 Build ROS Workspace
cd ~
mkdir myController_ws && mkdir myController_ws/src && cd myController_ws/src
(Make sure to clone the correct branch.)
git clone -b mycontroller_s570 https://github.com/elephantrobotics/mycobot_ros.git
cd ..
catkin_make
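After the build completes, source the workspace so ROS can find the new package (optionally append this line to ~/.bashrc so every new terminal picks it up):
source ~/myController_ws/devel/setup.bash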
3. Data Collection Script
Create a new file, copy the following code into it, and save it as "data_record.py".
#!/usr/bin/env python3
"""
LeRobot Data Collection Script
Collects robot joint states and camera data, and saves them in LeRobot-compatible format.
"""
import rospy
import cv2
import numpy as np
import json
import os
import datetime
from sensor_msgs.msg import JointState, Image
from cv_bridge import CvBridge
import threading
import time
from collections import deque
import argparse
class LeRobotDataCollector:
def __init__(self, output_dir="lerobot_dataset", buffer_size=1000):
"""
Initialize data collector
Args:
output_dir: Directory to save data
buffer_size: Size of the data buffer
"""
rospy.init_node('lerobot_data_collector', anonymous=True)
self.output_dir = output_dir
self.bridge = CvBridge()
# Create output directories
self.setup_directories()
# Data buffers
self.joint_buffer = deque(maxlen=buffer_size)
self.image_buffer = deque(maxlen=buffer_size)
# Control flags
self.collecting = False
self.episode_count = 0
self.frame_count = 0
# Data lock
self.data_lock = threading.Lock()
# ROS subscribers
self.joint_sub = rospy.Subscriber('/joint_states', JointState, self.joint_callback)
self.image_sub = rospy.Subscriber('/camera/image_raw', Image, self.image_callback)
# Camera
self.camera_available = False
self.try_camera_sources()
# Current data
self.current_joint_data = None
self.current_image = None
self.last_joint_time = None
self.last_image_time = None
rospy.loginfo("LeRobot Data Collector initialized")
def setup_directories(self):
"""Create required directory structure"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
self.session_dir = os.path.join(self.output_dir, f"session_{timestamp}")
os.makedirs(self.session_dir, exist_ok=True)
os.makedirs(os.path.join(self.session_dir, "images"), exist_ok=True)
os.makedirs(os.path.join(self.session_dir, "episodes"), exist_ok=True)
rospy.loginfo(f"Data will be saved in: {self.session_dir}")
def try_camera_sources(self):
"""Try available USB camera sources"""
for i in range(3):
try:
cap = cv2.VideoCapture(i)
if cap.isOpened():
ret, frame = cap.read()
if ret:
self.camera_source = i
self.camera_available = True
cap.release()
rospy.loginfo(f"Found USB camera: /dev/video{i}")
return
cap.release()
except:
continue
rospy.logwarn("No usable USB camera found. Will use ROS image topic only.")
def joint_callback(self, msg):
"""Joint state callback"""
with self.data_lock:
if len(msg.position) >= 14:
joint_data = {
'timestamp': msg.header.stamp.to_sec(),
'joint_names': list(msg.name[:14]),
'joint_positions': list(msg.position[:14]),
'joint_velocities': list(msg.velocity[:14]) if len(msg.velocity) >= 14 else [0.0] * 14,
'joint_efforts': list(msg.effort[:14]) if len(msg.effort) >= 14 else [0.0] * 14
}
self.current_joint_data = joint_data
self.last_joint_time = rospy.Time.now()
if self.collecting:
self.joint_buffer.append(joint_data)
def image_callback(self, msg):
"""ROS image callback"""
try:
cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
with self.data_lock:
self.current_image = cv_image
self.last_image_time = rospy.Time.now()
if self.collecting:
image_data = {
'timestamp': msg.header.stamp.to_sec(),
'image': cv_image,
'frame_id': self.frame_count
}
self.image_buffer.append(image_data)
except Exception as e:
rospy.logerr(f"Image conversion error: {e}")
def capture_usb_camera(self):
"""Capture from USB camera"""
if not self.camera_available:
return None
try:
cap = cv2.VideoCapture(self.camera_source)
ret, frame = cap.read()
cap.release()
if ret:
return frame
except:
pass
return None
def sync_and_save_data(self):
"""Synchronize and save collected data"""
if not self.joint_buffer or not self.image_buffer:
rospy.logwarn("Buffers are empty. Cannot save episode.")
return
episode_data = {
'episode_id': self.episode_count,
'timestamp': datetime.datetime.now().isoformat(),
'total_frames': len(self.joint_buffer),
'data': []
}
joint_list = list(self.joint_buffer)
image_list = list(self.image_buffer)
min_length = min(len(joint_list), len(image_list))
for i in range(min_length):
joint_data = joint_list[i]
image_data = image_list[i]
image_filename = f"episode_{self.episode_count:04d}_frame_{i:06d}.jpg"
image_path = os.path.join(self.session_dir, "images", image_filename)
cv2.imwrite(image_path, image_data['image'])
data_record = {
'frame_id': i,
'timestamp': joint_data['timestamp'],
'joint_data': {
'names': joint_data['joint_names'],
'positions': joint_data['joint_positions'],
'velocities': joint_data['joint_velocities'],
'efforts': joint_data['joint_efforts']
},
'image_path': image_filename,
'image_timestamp': image_data['timestamp']
}
episode_data['data'].append(data_record)
episode_filename = f"episode_{self.episode_count:04d}.json"
episode_path = os.path.join(self.session_dir, "episodes", episode_filename)
with open(episode_path, 'w') as f:
json.dump(episode_data, f, indent=2)
rospy.loginfo(f"Episode {self.episode_count} saved: {min_length} frames")
self.joint_buffer.clear()
self.image_buffer.clear()
self.episode_count += 1
def start_collection(self):
"""Start data collection"""
with self.data_lock:
self.collecting = True
self.frame_count = 0
rospy.loginfo("Data collection started.")
def stop_collection(self):
"""Stop data collection and save"""
with self.data_lock:
if self.collecting:
self.collecting = False
rospy.loginfo("Data collection stopped.")
self.sync_and_save_data()
def display_status(self):
"""Display current status"""
joint_status = "✓" if self.current_joint_data else "✗"
image_status = "✓" if self.current_image is not None else "✗"
joint_time_diff = (rospy.Time.now() - self.last_joint_time).to_sec() if self.last_joint_time else float('inf')
image_time_diff = (rospy.Time.now() - self.last_image_time).to_sec() if self.last_image_time else float('inf')
status = f"""
--- System Status ---
Joint Data Received: {joint_status} | Age: {joint_time_diff:.2f}s
Image Data Received: {image_status} | Age: {image_time_diff:.2f}s
Episodes Collected: {self.episode_count}
Collecting Now: {self.collecting}
---------------------
"""
print(status)
if self.current_joint_data:
print("Current Joint Positions:")
for i, (name, pos) in enumerate(zip(self.current_joint_data['joint_names'],
self.current_joint_data['joint_positions'])):
print(f" {name}: {pos:.4f}")
def run_interactive(self):
"""Run in interactive terminal mode"""
print("""
=== LeRobot Data Collector ===
Press 's' to start episode
Press 'e' to end and save episode
Press 'status' to show current state
Press 'q' to quit
Press 'h' for help
""")
rate = rospy.Rate(10) # 10Hz
while not rospy.is_shutdown():
try:
import select
import sys
if select.select([sys.stdin], [], [], 0.1)[0]:
command = sys.stdin.readline().strip().lower()
if command == 's':
self.start_collection()
elif command == 'e':
self.stop_collection()
elif command == 'q':
if self.collecting:
self.stop_collection()
break
elif command == 'status':
self.display_status()
elif command == 'h':
print("""
Commands:
's' - start episode
'e' - stop episode and save
'q' - quit
'status' - show current status
'h' - help
""")
if self.current_image is not None:
display_img = cv2.resize(self.current_image, (640, 480))
status_text = f"Episode: {self.episode_count} | Collecting: {self.collecting}"
cv2.putText(display_img, status_text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow('LeRobot Data Collector', display_img)
cv2.waitKey(1)
rate.sleep()
except KeyboardInterrupt:
break
cv2.destroyAllWindows()
if self.collecting:
self.stop_collection()
self.save_session_summary()
def save_session_summary(self):
"""Save session metadata summary"""
summary = {
'session_info': {
'total_episodes': self.episode_count,
'session_directory': self.session_dir,
'collection_date': datetime.datetime.now().isoformat()
},
'data_format': {
'joint_names': self.current_joint_data['joint_names'] if self.current_joint_data else [],
'description': 'LeRobot compatible dataset format'
}
}
summary_path = os.path.join(self.session_dir, "session_summary.json")
with open(summary_path, 'w') as f:
json.dump(summary, f, indent=2)
rospy.loginfo(f"Session summary saved: {summary_path}")
def main():
parser = argparse.ArgumentParser(description='LeRobot data collection')
parser.add_argument('--output', '-o', default='lerobot_dataset',
help='Output directory (default: lerobot_dataset)')
parser.add_argument('--buffer-size', '-b', type=int, default=1000,
help='Buffer size (default: 1000)')
args = parser.parse_args()
try:
collector = LeRobotDataCollector(
output_dir=args.output,
buffer_size=args.buffer_size
)
rospy.loginfo("Initializing...")
rospy.sleep(2)
collector.run_interactive()
except rospy.ROSInterruptException:
rospy.loginfo("ROS interrupted.")
except Exception as e:
rospy.logerr(f"Error: {e}")
if __name__ == '__main__':
main()
4. Run Data Acquisition
Launch the RViz process for myController within the ROS environment, then execute the data collection script. The script automatically subscribes to the robot arm joint states and the external camera data on the ROS topics, processes them, and saves the results to files.
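A typical run sequence is sketched below. The launch file name is only illustrative; check the mycobot_ros package you cloned for the exact launch file used by the myController S570 demo.
# Terminal 1: start the ROS master
roscore
# Terminal 2: start the myController S570 node and RViz
# (the launch file name below is an assumption; check the cloned package for the exact name)
source ~/myController_ws/devel/setup.bash
roslaunch mycobot_ros mycontroller_s570.launch
# Terminal 3: run the data collection script inside the lerobot environment
conda activate lerobot
python3 data_record.py --output lerobot_dataset --buffer-size 1000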
This article provided a brief overview of how to use the myController S570 to generate training data for LeRobot. In the next phase, we will integrate a real myCobot 280 robotic arm for data acquisition and model training, aiming to achieve robotic arm control via the LeRobot framework. We also look forward to more researchers exploring the potential of exoskeletons in advanced scientific applications.