lerobot
LeRobot usage and code walkthrough
Installation
- Follow the official install instructions.
- Linux is mostly fine; one error requires python3-dev instead of python-dev:

```
Package python-dev is not available, but is referred to by another package.
This may mean that the package is missing, has been obsoleted or is only available from another source
However the following packages replace it:
  python2-dev:i386 python2:i386 python2-dev python2 python-dev-is-python3
```

- pip install times out; switch to a mirror: pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
- Downloads are still slow and time out again; add --default-timeout=100
Datasets
Dataset sources: collect your own / download an open-source one.
## Collection workflow
There is real-robot collection (needs hardware) and simulation collection (needs software; some of the fancier setups also need hardware). Both go through the record_loop function.
### Real robot - SO100
Calibration: in record.py, keep the calibration flow of lerobot-so100-calibrate (see the Hugging Face docs or GitHub). You end up with two JSON files, follower_id.json and leader_id.json, whose id names must be passed to SO100FollowerConfig(id=...) / SO100LeaderConfig(id=...), as sketched below.
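A minimal sketch of wiring those calibration ids into the configs; the import paths, keyword names and port values here are my assumptions based on the SO100 examples elsewhere in these notes, not verified API:

```python
# Hedged sketch: ids must match the calibration JSON file names; ports are placeholders.
from lerobot.robots.so100_follower import SO100Follower, SO100FollowerConfig
from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig

follower = SO100Follower(SO100FollowerConfig(port="/dev/ttyACM0", id="follower_id"))
leader = SO100Leader(SO100LeaderConfig(port="/dev/ttyACM1", id="leader_id"))
follower.connect()
leader.connect()
```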
Simulation
record code
The record_loop function
Inside record_loop, the key input arguments are:
- teleop_action_processor: used for teleoperation; the leader's raw action (joints) goes through it and becomes EE.
- robot_action_processor: an EE-form action goes through it and becomes joints the robot can execute.
- robot_observation_processor: the raw observation (joints) goes through it and becomes EE.
The loop splits into four parts:

1. Process the observation:

```python
obs = robot.get_observation()
obs_processed = robot_observation_processor(obs)
# at this point obs_processed should hold every "observation"-prefixed feature
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
```

2. Get the action, either from the policy or from the teleoperator:

```python
if policy is not None:
    action_values = predict_action(
        observation=observation_frame,
        policy=policy,
    )
    # reformat the action inferred by the policy
    act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
elif policy is None and isinstance(teleop, Teleoperator):
    # raw action
    act = teleop.get_action()
    act_processed_teleop = teleop_action_processor((act, obs))
```

3. Convert the action into something the robot can execute, and send it:

```python
# Applies a pipeline to the action, default is IdentityProcessor
if policy is not None and act_processed_policy is not None:
    action_values = act_processed_policy
    robot_action_to_send = robot_action_processor((act_processed_policy, obs))
else:
    action_values = act_processed_teleop
    robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
# execute
_sent_action = robot.send_action(robot_action_to_send)
```

4. Build the frame and add it to the dataset (roughly as sketched after this block):

```python
action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION)
frame = {**observation_frame, **action_frame, "task": single_task}
dataset.add_frame(frame)
```
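For intuition, this is roughly what a single frame handed to dataset.add_frame() contains; the camera key, shapes and dtypes below are assumptions for illustration, the real keys come from dataset.features:

```python
import numpy as np

# Illustration only: keys follow dataset.features; the camera key and shapes are assumed.
frame = {
    "observation.state": np.zeros(6, dtype=np.float32),                   # joint positions
    "observation.images.wrist": np.zeros((480, 640, 3), dtype=np.uint8),  # HWC camera image
    "action": np.zeros(6, dtype=np.float32),                              # target joint positions
    "task": "pick up the yellow sachet and place it into the box.",
}
```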
Download
git clone approach (not very convenient)
- sudo apt-get install git-lfs
- git clone https://hf-mirror.com/datasets/lerobot/berkeley_autolab_ur5
### hf cli (via mirror)
Files obtained this way are tied to the Hugging Face cache layout (i.e. it is harder to find the real files on disk).
1. Set HF_ENDPOINT (one-shot example):

```bash
export HF_ENDPOINT=https://hf-mirror.com
```

Verify it is set:

```bash
echo $HF_ENDPOINT
```

It should output:

```
https://hf-mirror.com
```

2. Check for proxy environment variables:

```bash
env | grep -i proxy || echo "no proxy env"
```

If it prints something like

```
http_proxy=http://127.0.0.1:7890/
https_proxy=http://127.0.0.1:7890/
HTTP_PROXY=http://127.0.0.1:7890/
HTTPS_PROXY=http://127.0.0.1:7890/
```

and connections fail with "Connection refused", the proxy is the problem.
> no_proxy/NO_PROXY only means "these addresses bypass the proxy"; it does not affect direct connections.

3. Install the CLI and download:

```bash
pip install -U "huggingface_hub[cli]"
hf download lerobot/pi05_base
```

The files end up under ~/.cache/huggingface/hub/models--lerobot--pi0_base/snapshots. If you use lerobot's code, passing the repo id resolves to this location automatically; otherwise you have to move the files yourself, or set the target directory when calling hf download.
4. Where do hf download files land, and in what format? The files downloaded by hf download lerobot/pi0_base live under ~/.cache/huggingface/hub/models--lerobot--pi0_base/, which holds three folders: snapshots, refs and blobs. Only the contents of snapshots look like real files (they have names rather than strange hash strings), but they are not real files, only symlinks: everything under snapshots/... points to the actual file under blobs/.... This is the Hugging Face Hub deduplicated (content-addressable) storage mechanism. Once downloaded, setting repo_id / pretrained_path_or_name to the repo id is enough to access it (see the snapshot_download sketch below).
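A small sketch of doing the same download from Python instead of the CLI, using huggingface_hub.snapshot_download; the environment-variable handling mirrors the shell setup above:

```python
import os

# Point the hub client at the mirror before downloading (same as the shell export above).
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

from huggingface_hub import snapshot_download

# Downloads into ~/.cache/huggingface/hub/models--lerobot--pi0_base/ and returns the snapshot path.
local_path = snapshot_download(repo_id="lerobot/pi0_base")
print(local_path)
```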
Contents
- meta/info.json: config, features, etc.
- meta/stats.json: mean/std/min/max statistics (exposed as dataset.meta.stats)
- meta/tasks.jsonl: tasks
- meta/episodes/: per-episode records (lengths, tasks, offsets) stored as chunked Parquet for scalability.
- data/: frame-by-frame Parquet shards; each file typically contains many episodes.
- videos/: MP4 shards per camera; each file typically contains many episodes.
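A quick way to poke at these pieces from Python; a hedged sketch using a public repo id as a placeholder:

```python
from lerobot.datasets.lerobot_dataset import LeRobotDataset

# repo_id is a placeholder; any local or hub dataset id works.
dataset = LeRobotDataset(repo_id="lerobot/pusht")
print(dataset.meta.fps, dataset.meta.robot_type)  # from meta/info.json
print(dataset.features.keys())                    # feature schema
print(dataset.meta.stats.keys())                  # mean/std/min/max from meta/stats.json
print(dataset.num_episodes, dataset.num_frames)
```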
Tools
Visualization

```bash
python lerobot/scripts/visualize_dataset.py \
```
Dataset versions
The dataset format is now v3.0; older v2.1 datasets no longer load.
1. Install conversion support:

```bash
pip install "https://github.com/huggingface/lerobot/archive/33cad37054c2b594ceba57463e8f11ee374fa93c.zip"
```

2. Convert a v2.1 dataset to v3.0:

```bash
PYTHONPATH=src python -m lerobot.datasets.v30.convert_dataset_v21_to_v30 --repo-id=<HF_USER/DATASET_ID>
```

3. Edit a dataset, e.g. remove a feature:

```bash
lerobot-edit-dataset \
    --repo_id lerobot/pusht \
    --operation.type remove_feature \
    --operation.feature_names "['observation.images.top']"
```
A script I used to rename the features of an existing dataset into a new one (action -> ee_action, joint_action -> action, observation.state -> observation.ee_state, observation.joint_state -> observation.state):

```python
import torch
from lerobot.datasets.lerobot_dataset import LeRobotDataset

# ---------- load the old dataset ----------
old_dataset = LeRobotDataset(
    repo_id="test1110/merged",
)

# ---------- modify the features ----------
new_features = old_dataset.features.copy()

# action -> ee_action
if "action" in new_features:
    old_action_feature = new_features.pop("action")
    new_features["ee_action"] = {
        "dtype": old_action_feature["dtype"],
        "shape": old_action_feature["shape"],
        "names": old_action_feature.get("names", [f"{i}" for i in range(old_action_feature["shape"][0])]),
    }

# joint_action -> action
if "joint_action" in new_features:
    joint_action_feature = new_features.pop("joint_action")
    new_features["action"] = {
        "dtype": joint_action_feature["dtype"],
        "shape": joint_action_feature["shape"],
        "names": joint_action_feature.get("names", [f"{i}" for i in range(joint_action_feature["shape"][0])]),
    }

# observation.state -> observation.ee_state
if "observation.state" in new_features:
    state_feature = new_features.pop("observation.state")
    new_features["observation.ee_state"] = {
        "dtype": state_feature["dtype"],
        "shape": state_feature["shape"],
        "names": state_feature.get("names", [f"{i}" for i in range(state_feature["shape"][0])]),
    }

# observation.joint_state -> observation.state
if "observation.joint_state" in new_features:
    joint_state_feature = new_features.pop("observation.joint_state")
    new_features["observation.state"] = {
        "dtype": joint_state_feature["dtype"],
        "shape": joint_state_feature["shape"],
        "names": joint_state_feature.get("names", [f"{i}" for i in range(joint_state_feature["shape"][0])]),
    }

print(new_features)

# ---------- create the new, empty dataset ----------
FPS = old_dataset.meta.fps
ROBOT_TYPE = old_dataset.meta.robot_type
new_dataset = LeRobotDataset.create(
    repo_id="test1111/merged",
    features=new_features,
    fps=FPS,
    robot_type=ROBOT_TYPE,
    use_videos=True,
    image_writer_threads=4,
)

# ---------- iterate over the old dataset and copy the features ----------
for idx in range(len(old_dataset)):
    sample = old_dataset[idx]
    new_sample = sample.copy()
    # drop the extra index bookkeeping
    for k in ["timestamp", "frame_index", "episode_index", "task_index", "index"]:
        new_sample.pop(k, None)
    # joint_action -> action
    if "joint_action" in sample:
        new_sample["action"] = sample["joint_action"].clone() if isinstance(sample["joint_action"], torch.Tensor) else torch.tensor(sample["joint_action"])
    # action -> ee_action
    if "action" in sample:
        new_sample["ee_action"] = sample["action"].clone() if isinstance(sample["action"], torch.Tensor) else torch.tensor(sample["action"])
    # observation.joint_state -> observation.state
    if "observation.joint_state" in sample:
        new_sample["observation.state"] = sample["observation.joint_state"].clone() if isinstance(sample["observation.joint_state"], torch.Tensor) else torch.tensor(sample["observation.joint_state"])
    # observation.state -> observation.ee_state
    if "observation.state" in sample:
        new_sample["observation.ee_state"] = sample["observation.state"].clone() if isinstance(sample["observation.state"], torch.Tensor) else torch.tensor(sample["observation.state"])
    # drop the old joint_action key (its value now lives under "action")
    for k in ["joint_action"]:
        new_sample.pop(k, None)
    # drop the old observation.joint_state key; note: do NOT pop "observation.state" here,
    # it already holds the renamed joint state
    for k in ["observation.joint_state"]:
        new_sample.pop(k, None)
    # image shape (C,H,W) -> (H,W,C)
    for img_key in ["observation.images.wrist", "observation.images.side"]:
        if img_key in new_sample:
            new_sample[img_key] = new_sample[img_key].permute(1, 2, 0)
    # add to the new dataset
    new_dataset.add_frame(new_sample)
    # save every 1000 frames as an episode
    if (idx + 1) % 1000 == 0:
        new_dataset.save_episode()
        new_dataset.episode_buffer = None

# save any remaining frames
if new_dataset.episode_buffer is not None and new_dataset.episode_buffer["size"] > 0:
    new_dataset.save_episode()
    new_dataset.episode_buffer = None

new_dataset.finalize()
print("New dataset saved!")
```
Models
Downloading works much like datasets, except a model repo may contain multiple files.
Training
Basic
To train on specific GPU ids:
1. Directly from the command line:

```bash
HF_HUB_OFFLINE=1 CUDA_VISIBLE_DEVICES=0,1,2 accelerate launch \
    --multi_gpu \
    --num_processes=3 \
    src/lerobot/scripts/lerobot_train.py \
    --config_path=fscript/finetune/simple_test.yaml
```

You may see the warning:

```
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks... To disable this warning, you can either: - Avoid using `tokenizers` before the fork if possible - Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
```

The huggingface/tokenizers fast tokenizer uses multiple threads internally by default. A Python fork (e.g. from multiprocessing or a PyTorch DataLoader with num_workers>0) copies the parent process; if the parent has already started the tokenizer's thread pool, forking a child can deadlock. It is an ordering problem, and the fix is to set TOKENIZERS_PARALLELISM explicitly at the top of the program:

```python
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
```
Inference
How robot_client and record_loop differ around send_action
Directly inferring joints and calling send_action(joint) is fine, but if I do other work inside robot_client before send_action, the time that work takes affects how send_action behaves: the joint action may not be written synchronously to completion. To figure out:
- [ ] Compared with evaluate, what happens after send_action in robot_client_ee that interrupts the write?
- [ ] How to lengthen robot_client's wait so the write finishes before the next action?
1. What is sync? [[同步和异步]] <- sync means synchronous, so the write should finish before anything else runs.
2. evaluate paces itself by fps.
evaluate's timing: send_action always completes its write. Say the fps-paced loop has a 30 ms budget per iteration. If the earlier work took 20 ms and send_action takes 6 ms, we sleep 4 ms and move on. If the earlier work already took 30 ms and send_action takes 6 ms, we are over budget, so we skip the sleep and go straight into the next iteration.

```python
while timestamp < control_time_s:
```
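To make that pacing concrete, here is a minimal sketch of the evaluate-style budget logic; fps_paced_loop and step_fn are my own names for illustration, not lerobot API:

```python
import time

FPS = 30
dt = 1.0 / FPS  # ~33 ms budget per iteration

def fps_paced_loop(control_time_s: float, step_fn):
    """Each iteration gets a fixed budget: sleep whatever is left over, skip the sleep if over budget."""
    start = time.perf_counter()
    while time.perf_counter() - start < control_time_s:
        t0 = time.perf_counter()
        step_fn()  # get obs, predict, send_action -- the write always completes inside the step
        elapsed = time.perf_counter() - t0
        time.sleep(max(0.0, dt - elapsed))
```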
robot_client_ee.py's timing is more involved.
1. The core loop does three things: execute an action, capture an observation, and sleep, much like evaluate; check whether self.config.environment_dt matches 1/fps.
```python
def control_loop(self, task, verbose=False):
    while self.running:
        if self.actions_available():
            _performed_action = self.control_loop_action(verbose)
        if self._ready_to_send_observation():
            _captured_observation = self.control_loop_observation(task, verbose)
        time.sleep(max(0, self.config.environment_dt - (time.perf_counter() - control_loop_start)))
```

Inside control_loop_action:

```python
# take the action from the queue
timed_action = self.action_queue.get_nowait()
ee_action = self._action_tensor_to_action_dict(timed_action.get_action())
joint_action = self.ee_to_follower_joints((ee_action, obs))
_performed_action = self.robot.send_action(joint_action)
```
async inference
async inference with actions executed on CPU
Current bug: the server does the inference while the local machine executes on CPU; it fails with "RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU." Following that suggestion still errors out.
github.com/huggingface/lerobot/issues/2244 offers a fix: edit policy_server.py and move the tensors to CPU before sending them:

```python
"""5. Convert to TimedAction list"""
action_chunk = self._time_action_chunk(
    observation_t.get_timestamp(), list(action_tensor.cpu()), observation_t.get_timestep()
)
```
inference on policy_server
- policy & preprocessor/postprocessor

```python
self.policy = policy_class.from_pretrained(policy_specs.pretrained_name_or_path)
# Load preprocessor and postprocessor, overriding device to match requested device
device_override = {"device": self.device}
self.preprocessor, self.postprocessor = make_pre_post_processors(
    self.policy.config,
    pretrained_path=policy_specs.pretrained_name_or_path,
    preprocessor_overrides={
        "device_processor": device_override,
        "rename_observations_processor": {"rename_map": policy_specs.rename_map},
    },
    postprocessor_overrides={"device_processor": device_override},
)
```

- Most of the work happens in _predict_action_chunk, triggered by action_chunk = self._predict_action_chunk(obs):
```python
def _predict_action_chunk(self, observation_t: TimedObservation) -> list[TimedAction]:
    """Predict an action chunk based on an observation.
    Pipeline:
    1. Convert raw observation to LeRobot format
    2. Apply preprocessor (tokenization, normalization, batching, device placement)
    3. Run policy inference to get action chunk
    4. Apply postprocessor (unnormalization, device movement)
    5. Convert to TimedAction list
    """
    """1. Prepare observation"""
    observation: Observation = raw_observation_to_observation(
        observation_t.get_observation(),
        self.lerobot_features,
        self.policy_image_features,
    )

    """2. Apply preprocessor"""
    observation = self.preprocessor(observation)

    """3. Get action chunk"""
    action_tensor = self._get_action_chunk(observation)

    """4. Apply postprocessor"""
    # Apply postprocessor (handles unnormalization and device movement)
    # Postprocessor expects (B, action_dim) per action, but we have (B, chunk_size, action_dim)
    # So we process each action in the chunk individually
    _, chunk_size, _ = action_tensor.shape

    # Process each action in the chunk
    processed_actions = []
    for i in range(chunk_size):
        # Extract action at timestep i: (B, action_dim)
        single_action = action_tensor[:, i, :]
        processed_action = self.postprocessor(single_action)
        processed_actions.append(processed_action)

    # Stack back to (B, chunk_size, action_dim), then remove batch dim
    action_tensor = torch.stack(processed_actions, dim=1).squeeze(0)

    """5. Convert to TimedAction list"""
    action_chunk = self._time_action_chunk(
        observation_t.get_timestamp(), list(action_tensor), observation_t.get_timestep()
    )

    return action_chunk
```

- What does raw_observation_to_observation do?
```python
def raw_observation_to_observation(
    raw_observation: RawObservation,
    lerobot_features: dict[str, dict],
    policy_image_features: dict[str, PolicyFeature],
) -> Observation:
    observation = {}
    observation = prepare_raw_observation(raw_observation, lerobot_features, policy_image_features)
    for k, v in observation.items():
        if isinstance(v, torch.Tensor):  # VLAs present natural-language instructions in observations
            if "image" in k:
                # Policy expects images in shape (B, C, H, W)
                observation[k] = prepare_image(v).unsqueeze(0)
            else:
                observation[k] = v
    return observation
```

- Details are in prepare_raw_observation and prepare_image.
- prepare_raw_observation: in the resize function I found the difference between local inference and async inference (the problem asked in the title).
```python
def prepare_raw_observation(
    robot_obs: RawObservation,
    lerobot_features: dict[str, dict],
    policy_image_features: dict[str, PolicyFeature],
) -> Observation:
    """Matches keys from the raw robot_obs dict to the keys expected by a given policy (passed as
    policy_image_features)."""
    # 1. {motor.pos1:value1, motor.pos2:value2, ..., laptop:np.ndarray} ->
    #    -> {observation.state:[value1,value2,...], observation.images.laptop:np.ndarray}
    # Indeed it runs build_dataset_frame(lerobot_features, robot_obs, prefix=OBS_STR)
    lerobot_obs = make_lerobot_observation(robot_obs, lerobot_features)

    # 2. Greps all observation.images.<> keys
    image_keys = list(filter(is_image_key, lerobot_obs))

    # state's shape is expected as (B, state_dim)
    state_dict = {OBS_STATE: extract_state_from_raw_observation(lerobot_obs)}

    # extract_images_from_raw_observation return torch.tensor(lerobot_obs[camera_key])
    image_dict = {
        image_k: extract_images_from_raw_observation(lerobot_obs, image_k) for image_k in image_keys
    }

    # Turns the image features to (C, H, W) with H, W matching the policy image features.
    # This reduces the resolution of the images
    image_dict = {
        key: resize_robot_observation_image(torch.tensor(lerobot_obs[key]), policy_image_features[key].shape)
        for key in image_keys
    }

    if "task" in robot_obs:
        state_dict["task"] = robot_obs["task"]

    return {**state_dict, **image_dict}
```

- resize_robot_observation_image:
```python
def resize_robot_observation_image(image: torch.tensor, resize_dims: tuple[int, int, int]) -> torch.tensor:
    assert image.ndim == 3, f"Image must be (C, H, W)! Received {image.shape}"
    # (H, W, C) -> (C, H, W) for resizing from robot observation resolution to policy image resolution
    image = image.permute(2, 0, 1)
    dims = (resize_dims[1], resize_dims[2])
    # Add batch dimension for interpolate: (C, H, W) -> (1, C, H, W)
    image_batched = image.unsqueeze(0)
    # Interpolate and remove batch dimension: (1, C, H, W) -> (C, H, W)
    resized = torch.nn.functional.interpolate(image_batched, size=dims, mode="bilinear", align_corners=False)
    return resized.squeeze(0)
```

- prepare_image:
```python
def prepare_image(image: torch.Tensor) -> torch.Tensor:
    """Minimal preprocessing to turn int8 images to float32 in [0, 1], and create a memory-contiguous tensor"""
    image = image.type(torch.float32) / 255
    image = image.contiguous()
    return image
```
joint->ee
record
- Data logic in record:
  - follower_joints_to_ee: the follower's current state, in EE form.
  - leader_joints_to_ee: the leader's target action, in EE form.
  - ee_to_follower_joints: there is no joint->joint teleoperate path like older lerobot; instead the leader's EE action is used to compute the follower's joint action. (Which means it could be used to check whether it can teleoperate a different arm?)
- Procedure
  - Modify the settings in record.py in examples/so100_to_so100_EE to set cameras, robots, dataset episodes, etc.
  - Download the URDF and tell the code where to find it.
  - Then run python record.py directly to start recording. By default the dataset is saved with state/action in EE form.
  - The code uses follower_kinematics_solver and leader_kinematics_solver with the IK solver placo.

## Saving data in joint form
How record calls it:
```python
# Create the dataset
dataset = LeRobotDataset.create(
    repo_id=HF_REPO_ID,
    fps=FPS,
    features=combine_feature_dicts(
        # Run the feature contract of the pipelines
        # This tells you how the features would look like after the pipeline steps
        aggregate_pipeline_dataset_features(
            pipeline=leader_joints_to_ee,
            initial_features=create_initial_features(action=leader.action_features),
            use_videos=True,
        ),
        aggregate_pipeline_dataset_features(
            pipeline=follower_joints_to_ee,
            initial_features=create_initial_features(observation=follower.observation_features),
            use_videos=True,
        ),
    ),
    robot_type=follower.name,
    use_videos=True,
    image_writer_threads=4,
)
```
Inside the features argument, the flow is:

```
(raw robot data)
    ↓
create_initial_features()
    ↓
aggregate_pipeline_dataset_features(pipeline=leader_joints_to_ee)
    ↓
combine_feature_dicts(...)        ← merge the leader + follower features
    ↓
LeRobotDataset.create(...)        ← save the feature schema + data as an HF dataset
```
- create_initial_features() builds the initial feature structure (a feature dict), telling the pipeline which fields exist in the raw input.
- aggregate_pipeline_dataset_features() takes the features produced by a data-processing pipeline (DataProcessorPipeline), cleans them up, filters them, and formats them into the standard feature dict usable by a Hugging Face LeRobot dataset.
- combine_feature_dicts(*dicts) merges the feature schemas of several pipelines.

To save both EE and joint forms of action/state, the features have to change: on top of the existing pipelines, add steps that also keep the raw data, and modify the dataset features accordingly (roughly as in the sketch below).
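A hedged sketch of what the resulting feature dict could look like when both forms are kept; the key names mirror the renaming script earlier in these notes and are my choice, not lerobot's defaults:

```python
# Hypothetical combined schema keeping both joint- and EE-form state/action.
features = {
    "observation.state": {
        "dtype": "float32", "shape": (6,),
        "names": ["shoulder_pan.pos", "shoulder_lift.pos", "elbow_flex.pos",
                  "wrist_flex.pos", "wrist_roll.pos", "gripper.pos"],
    },
    "observation.ee_state": {
        "dtype": "float32", "shape": (7,),
        "names": ["ee.x", "ee.y", "ee.z", "ee.wx", "ee.wy", "ee.wz", "ee.gripper_pos"],
    },
    "action": {
        "dtype": "float32", "shape": (6,),
        "names": ["shoulder_pan.pos", "shoulder_lift.pos", "elbow_flex.pos",
                  "wrist_flex.pos", "wrist_roll.pos", "gripper.pos"],
    },
    "ee_action": {
        "dtype": "float32", "shape": (7,),
        "names": ["ee.x", "ee.y", "ee.z", "ee.wx", "ee.wy", "ee.wz", "ee.gripper_pos"],
    },
}
```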
Getting, processing and saving data in record
Two functions are involved: dataset = LeRobotDataset.create(features) and record_loop, responsible respectively for the empty dataset's feature schema and for filling in the data.
- Inside features, the original action and state are turned into joint-name-keyed entries. From the lerobot_record code I found:
```python
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
dataset_features = combine_feature_dicts(
    aggregate_pipeline_dataset_features(
        pipeline=teleop_action_processor,
        initial_features=create_initial_features(
            action=robot.action_features
        ),  # TODO(steven, pepijn): in future this should be come from teleop or policy
        use_videos=cfg.dataset.video,
    ),
    aggregate_pipeline_dataset_features(
        pipeline=robot_observation_processor,
        initial_features=create_initial_features(observation=robot.observation_features),
        use_videos=cfg.dataset.video,
    ),
)
```

- If I simply add new entries, action ends up with multiple parts; how do I tell them apart?
- Data filling
Overall workflow:
1. lerobot-record --config_path=tools/collect/collect_complex_movement.yaml — collect data in joint form.
2. python examples/so100_to_so100_EE/new_change.py — forward kinematics.
3. python examples/so100_to_so100_EE/new_replay.py — solve IK from the EE data and execute it; there still seems to be a small discrepancy.
4. python examples/so100_to_so100_EE/compare_2_actions.py — compare.
Open questions:
1. new_replay and compare_2_actions use different observations when computing the action; this should be changed to the observation taken after the action is executed.
2. Is the transfer itself problem-free? It uses the local state_joint values, but why does FK take the state?

How FK is computed in lerobot
🧩 1. Overall data flow of record_loop
In record_loop(), the execution loop is roughly:

```python
obs = robot.get_observation()                                                # 1. get the robot's joint state
obs_processed = robot_observation_processor(obs)                             # 2. process the observation (usually FK)
act = teleop.get_action()                                                    # 3. get the action from the teleop device (leader joints)
act_processed_teleop = teleop_action_processor((act, obs))                   # 4. process the action (usually FK)
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))   # 5. IK
robot.send_action(robot_action_to_send)                                      # 6. send to the follower for execution
```

From the data-flow point of view:
- teleop_action_processor: leader joints → leader EE
- robot_action_processor: EE → follower joints
- robot_observation_processor: follower joints → follower EE
metaworld
Download the dataset:

```bash
export HF_ENDPOINT=https://hf-mirror.com
hf download lerobot/metaworld_mt50 --repo-type=dataset --token <your token>  # requires access to huggingface.co to copy a token
```

Environment:

```bash
pip install -e ".[metaworld,smolvla]"
hf download HuggingFaceTB/SmolVLM2-500M-Instruct
hf download lerobot/smolvla_base
```

Test an already-trained model (this also checks the metaworld environment). Download it with hf download jadechoghari/smolvla_metaworld --token <your token> (requires access to huggingface.co), then:

```bash
HF_HUB_OFFLINE=1 lerobot-eval \
    --policy.path="jadechoghari/smolvla_metaworld" \
    --env.type=metaworld \
    --env.task=medium \
    --eval.batch_size=1 \
    --eval.n_episodes=2
```

Re-calibration

```bash
lerobot-calibrate \
    --teleop.type=so100_leader \
    --teleop.port=/dev/ttyACM1 \
    --teleop.id=middle_leader

lerobot-calibrate \
    --robot.type=so100_follower \
    --robot.port=/dev/ttyACM0 \
    --robot.id=middle_follower
```

- The saved action should not be what the follower reads back; then it is enough to guarantee no jumps plus sufficient precision?
- The depth-related package pyrealsense2 was not installed before.
- Collection: lerobot-record --config_path=tools/collect/collect_joint_test.yaml
- Replay under joint control: lerobot-replay --robot.type=so100_follower --robot.port=/dev/ttyACM0 --robot.id=follower --dataset.repo_id=test_10/ee --dataset.episode=2 (a recording script can run at the same time)
- joint to EE: python examples/so100_to_so100_EE/new_change.py
- visu
Collect-then-change feels wrong; better to collect in the target form directly. Prepare the model on the server first:
1. The download location is ~/.cache/huggingface/hub
2. export HF_ENDPOINT=https://hf-mirror.com
3. hf download HuggingFaceTB/SmolVLM2-500M-Instruct
4. hf download lerobot/smolvla_base
Which approach to choose? Look at the form in which the action is saved. When the pipelines are used:
- follower -> raw_obs -> robot_observation_processor -> obs_processed -> dataset
- leader -> raw_act -> teleop_action_processor -> act_processed_teleop = action_values -> dataset
- act_processed_teleop -> robot_action_processor -> robot_action_to_send -> execution

So saving through record is better; I adapted the code by modifying record_loop.
```bash
# sync a local folder to the remote machine
rsync -avz /path/to/local/folder/ username@remote_host:/path/to/remote/folder/
rsync -avz test1110/ user@10.10.16.18:/home/user/.cache/huggingface/lerobot/test1110/
```

```bash
accelerate launch \
    --multi_gpu \
    --num_processes=2 \
    $(which lerobot-train) \
    --policy.path=lerobot/smolvla_base \
    --dataset.repo_id=test1110/merged \
    --batch_size=64 \
    --steps=20000 \
    --output_dir=outputs/train/ee_action_ee_state \
    --job_name=my_smolvla_training \
    --policy.device=cuda \
    --wandb.enable=false
```

```bash
CUDA_VISIBLE_DEVICES=0 HF_HUB_OFFLINE=1 lerobot-train \
    --policy.path=lerobot/smolvla_base \
    --dataset.repo_id=test1110/merged \
    --batch_size=64 \
    --steps=20000 \
    --output_dir=outputs/train/ee_action_ee_state \
    --job_name=my_smolvla_training \
    --policy.device=cuda \
    --wandb.enable=false \
    --policy.push_to_hub=false \
    --rename_map='{"observation.images.side": "observation.images.camera1", "observation.images.wrist": "observation.images.camera2"}'
```

```bash
CUDA_VISIBLE_DEVICES=1 HF_HUB_OFFLINE=1 lerobot-train \
    --policy.path=lerobot/smolvla_base \
    --dataset.repo_id=test1110/merged \
    --batch_size=64 \
    --steps=20000 \
    --output_dir=outputs/train/joint_action_joint_state \
    --job_name=my_smolvla_training_joint \
    --policy.device=cuda \
    --wandb.enable=false \
    --policy.push_to_hub=false \
    --rename_map='{"observation.images.side": "observation.images.camera1", "observation.images.wrist": "observation.images.camera2"}'
```

```bash
# pull training outputs back, skipping the large weight files
rsync -av --progress --exclude='*.safetensors' user@10.10.16.18:/home/user/working_folder/lerobot/outputs/ /home/user/gene/lerobot/outputs/
```

```bash
python -m lerobot.async_inference.policy_server \
    --host=10.10.16.18 \
    --port=8080
```

```bash
python -m lerobot.async_inference.robot_client \
    --server_address=10.10.16.18:8080 \
    --robot.type=so100_follower \
    --robot.port=/dev/ttyACM0 \
    --robot.id=follower \
    --robot.cameras="{ camera1: {type: intelrealsense, serial_number_or_name: 806312060427, width: 640, height: 480, fps: 30, use_depth: False}, camera2: {type: opencv, index_or_path: 6, width: 640, height: 480, fps: 30}}" \
    --task="pick up the yellow sachet and place it into the box." \
    --policy_type=smolvla \
    --pretrained_name_or_path=outputs/train/ee_action_ee_state/checkpoints/020000/pretrained_model \
    --policy_device=cuda \
    --actions_per_chunk=50 \
    --chunk_size_threshold=0.5 \
    --aggregate_fn_name=weighted_average \
    --debug_visualize_queue_size=True
```
Action conversion
lerobot ships an example: examples/so100_to_so100_EE. What does this code do?
Four scripts: record, replay, evaluate and teleoperate.
lerobot-ee
record
evaluate
The core work happens in the predict_action function, which 1. normalizes the observation and converts images to CHW, 2. runs the preprocessor on the observation, 3. runs the policy, and 4. post-processes the policy output.
### evaluate.py
1. robot

```python
robot = SO100Follower(robot_config)
```

The policy:

```python
policy = SmolVLAPolicy.from_pretrained(HF_MODEL_ID)
```
The kinematics solver and the EE/joint pipelines:

```python
kinematics_solver = RobotKinematics(
    urdf_path="SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
    target_frame_name="gripper_frame_link",
    joint_names=list(robot.bus.motors.keys()),
)

# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
    [
        EEBoundsAndSafety(
            end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
            max_ee_step_m=0.10,
        ),
        InverseKinematicsEEToJoints(
            kinematics=kinematics_solver,
            motor_names=list(robot.bus.motors.keys()),
            initial_guess_current_joints=True,
        ),
    ],
    to_transition=robot_action_observation_to_transition,
    to_output=transition_to_robot_action,
)

# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
    steps=[
        ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
    ],
    to_transition=observation_to_transition,
    to_output=transition_to_observation,
)
```
The dataset:

```python
# Create the dataset
dataset = LeRobotDataset.create(
    repo_id=HF_DATASET_ID,
    fps=FPS,
    features=combine_feature_dicts(
        aggregate_pipeline_dataset_features(
            pipeline=robot_joints_to_ee_pose_processor,
            initial_features=create_initial_features(observation=robot.observation_features),
            use_videos=True,
        ),
        # User for now should be explicit on the feature keys that were used for record
        # Alternatively, the user can pass the processor step that has the right features
        aggregate_pipeline_dataset_features(
            pipeline=make_default_teleop_action_processor(),
            initial_features=create_initial_features(
                action={
                    f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
                    for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
                }
            ),
            use_videos=True,
        ),
    ),
    robot_type=robot.name,
    use_videos=True,
    image_writer_threads=4,
)
```

Note that dataset.meta.stats is None, since this creates a new dataset.
The pre/post processors:

```python
preprocessor, postprocessor = make_pre_post_processors(
    policy_cfg=policy,
    pretrained_path=HF_MODEL_ID,
    dataset_stats=dataset.meta.stats,
    # dataset_stats=None,
    # The inference device is automatically set to match the detected hardware, overriding any previous device settings from training to ensure compatibility.
    preprocessor_overrides={"device_processor": {"device": str(policy.config.device)}},
)
```
The record_loop call:

```python
record_loop(
    robot=robot,
    events=events,
    fps=FPS,
    policy=policy,
    preprocessor=preprocessor,  # Pass the pre and post policy processors
    postprocessor=postprocessor,
    dataset=dataset,
    control_time_s=EPISODE_TIME_SEC,
    single_task=TASK_DESCRIPTION,
    display_data=True,
    teleop_action_processor=make_default_teleop_action_processor(),  # effectively a no-op here
    robot_action_processor=robot_ee_to_joints_processor,
    robot_observation_processor=robot_joints_to_ee_pose_processor,
)
```
The relevant pieces inside record_loop:

```python
if policy is not None and preprocessor is not None and postprocessor is not None:
    policy.reset()
    preprocessor.reset()
    postprocessor.reset()
```

```python
obs = robot.get_observation()
```

```python
if policy is not None or dataset is not None:
    observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
```

```python
if policy is not None and preprocessor is not None and postprocessor is not None:
    action_values = predict_action(
        observation=observation_frame,
        policy=policy,
        device=get_safe_torch_device(policy.config.device),
        preprocessor=preprocessor,
        postprocessor=postprocessor,
        use_amp=policy.config.use_amp,
        task=single_task,
        robot_type=robot.robot_type,
    )
    act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
```

```python
if policy is not None and act_processed_policy is not None:
    action_values = act_processed_policy
    robot_action_to_send = robot_action_processor((act_processed_policy, obs))
```

```python
# didn't do action clip
_sent_action = robot.send_action(robot_action_to_send)
```
predict_action
Real observation -> policy -> action; here the observation is already in LeRobotDataset format. It lives in src/lerobot/utils/control_utils.py. As documented, it:
1. Prepares the observation by converting it to PyTorch tensors and adding a batch dimension.
2. Runs the preprocessor pipeline on the observation.
3. Feeds the processed observation to the policy to get a raw action.
4. Runs the postprocessor pipeline on the raw action.
5. Formats the final action by removing the batch dimension and moving it to the CPU.
Finally it returns a torch.Tensor containing the predicted action, ready for the robot.

```python
def predict_action(
```
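Since only the signature is quoted above, here is a hedged sketch of those five steps; the name predict_action_sketch and the exact call shapes are my own, not the lerobot source:

```python
import torch

def predict_action_sketch(observation, policy, device, preprocessor, postprocessor, task=None):
    # 1. numpy -> torch, add a batch dimension (image scaling/permuting is shown in
    #    prepare_observation_for_inference below)
    batch = {k: torch.as_tensor(v).unsqueeze(0).to(device) for k, v in observation.items()}
    if task is not None:
        batch["task"] = task
    # 2. preprocessor pipeline
    batch = preprocessor(batch)
    # 3. policy inference
    action = policy.select_action(batch)
    # 4. postprocessor pipeline
    action = postprocessor(action)
    # 5. strip the batch dimension and move to CPU
    return action.squeeze(0).to("cpu")
```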
prepare_observation_for_inference takes a dictionary of NumPy arrays, performs the necessary preprocessing, and prepares it for model inference. The steps are:
1. Converting NumPy arrays to PyTorch tensors.
2. Normalizing and permuting image data (if any).
3. Adding a batch dimension to each tensor.
4. Moving all tensors to the specified compute device.
```python
def prepare_observation_for_inference(
    observation: dict[str, np.ndarray],
    device: torch.device,
) -> RobotObservation:
    for name in observation:
        observation[name] = torch.from_numpy(observation[name])
        # image process: /255 and to CHW
        if "image" in name:
            observation[name] = observation[name].type(torch.float32) / 255
            observation[name] = observation[name].permute(2, 0, 1).contiguous()
        observation[name] = observation[name].unsqueeze(0)
        observation[name] = observation[name].to(device)
    return observation
```
local
async
Modifying async
Following [[#async]], directly modify what happens locally after an action is received: before execution, add the action pipeline, and also process the state before it goes to the policy.
In robot_client_ee.py (same as robot_client.py, with the class renamed):
1. In the init function, add the EE/joint conversion tools:
```python
# solver
kinematics_solver = RobotKinematics(
    urdf_path="SO-ARM100/Simulation/SO101/so101_new_calib.urdf",
    target_frame_name="gripper_frame_link",
    joint_names=list(robot.bus.motors.keys()),
)

# EE to joints (EEBoundsAndSafety belongs inside the steps list, as in evaluate.py)
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
    steps=[
        EEBoundsAndSafety(
            end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
            max_ee_step_m=0.10,
        ),
        InverseKinematicsEEToJoints(
            kinematics=kinematics_solver,
            motor_names=list(robot.bus.motors.keys()),
            initial_guess_current_joints=True,
        ),
    ],
    to_transition=robot_action_observation_to_transition,
    to_output=transition_to_robot_action,
)

# joints to EE
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
    steps=[
        ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys()))
    ],
    to_transition=observation_to_transition,
    to_output=transition_to_observation,
)
```

2. Convert the observation's joints to EE before it goes to the policy:

```python
ee_obs = self.joints_to_ee(raw_observation)
```
3. The action returned by the server changes from EE to joints. In control_loop_action, replace
_performed_action = self.robot.send_action(self._action_tensor_to_action_dict(timed_action.get_action()))
with:

```python
ee_action = self._action_tensor_to_action_dict(timed_action.get_action())
joint_action = self.ee_to_follower_joints((ee_action, self.latest_joint_observation))
_performed_action = self.robot.send_action(joint_action)
```

self.latest_joint_observation should stay in joint form, unchanged, since nothing earlier modified it.
5. Modify the features (sketched below).
    1. What does this feature schema need to match? It is passed to the server via RemotePolicyConfig: self.lerobot_features = policy_specs.lerobot_features.
    2. Original form: {'observation.state': {'dtype': 'float32', 'shape': (6,), 'names': ['shoulder_pan.pos', 'shoulder_lift.pos', 'elbow_flex.pos', 'wrist_flex.pos', 'wrist_roll.pos', 'gripper.pos']}}
    3. Required form: 'observation.state': {'dtype': 'float32', 'shape': (7,), 'names': ['ee.x', 'ee.y', 'ee.z', 'ee.wx', 'ee.wy', 'ee.wz', 'ee.gripper_pos']}
    4. Final change:
        1. lerobot_features_ee_state['observation.state'] = ee_feature['observation.state']
        2.
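A hedged sketch of that feature override (step 5); normally lerobot_features comes from map_robot_keys_to_lerobot_features(self.robot), here it is written out by hand for illustration:

```python
# Hypothetical: the 6-dof joint schema is replaced by the 7-dim EE schema before it
# is handed to RemotePolicyConfig.
lerobot_features = {
    "observation.state": {
        "dtype": "float32", "shape": (6,),
        "names": ["shoulder_pan.pos", "shoulder_lift.pos", "elbow_flex.pos",
                  "wrist_flex.pos", "wrist_roll.pos", "gripper.pos"],
    },
}
lerobot_features_ee_state = dict(lerobot_features)
lerobot_features_ee_state["observation.state"] = {
    "dtype": "float32",
    "shape": (7,),
    "names": ["ee.x", "ee.y", "ee.z", "ee.wx", "ee.wy", "ee.wz", "ee.gripper_pos"],
}
# lerobot_features_ee_state is then what gets passed into RemotePolicyConfig
print(lerobot_features_ee_state)
```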
Running async

```bash
sudo chmod 666 /dev/ttyACM0
```

Execution
Server side:

```bash
HF_HUB_OFFLINE=1 CUDA_VISIBLE_DEVICES=0 python -m lerobot.async_inference.policy_server \
    --host=10.10.16.18 \
    --port=8080 \
    --fps=30 \
    --inference_latency=0.033 \
    --obs_queue_timeout=1
```

Client side: use a yaml config, otherwise the command line gets unwieldy. One parameter I had not noticed before: use_degrees in the robot config. It tells the new hardware API whether the joint positions you write and read are in degrees, or in the new unified normalized range (-100…100).

```yaml
# test.yaml
```

Run:

```bash
HF_HUB_OFFLINE=1 python -m lerobot.async_inference.robot_client_ee --config_path=local_tools/eva/test.yaml
```

A new issue: the inputs to predict on the server and locally are not identical, because async resizes the images to match the policy; so it is best to also provide dataset.meta.stats on the server side.
Timing issues
With server-side inference, can send_action complete its write in time?
1. The robot_client code:

```python
class RobotClient:
prefix = "robot_client"
logger = get_logger(prefix)
def __init__(self, config: RobotClientConfig):
"""Initialize RobotClient with unified configuration.
Args:
config: RobotClientConfig containing all configuration parameters
"""
# Store configuration
self.config = config
self.robot = make_robot_from_config(config.robot)
self.robot.connect()
lerobot_features = map_robot_keys_to_lerobot_features(self.robot)
# Use environment variable if server_address is not provided in config
self.server_address = config.server_address
self.policy_config = RemotePolicyConfig(
config.policy_type,
config.pretrained_name_or_path,
lerobot_features,
config.actions_per_chunk,
config.policy_device,
)
self.channel = grpc.insecure_channel(
self.server_address, grpc_channel_options(initial_backoff=f"{config.environment_dt:.4f}s")
)
self.stub = services_pb2_grpc.AsyncInferenceStub(self.channel)
self.shutdown_event = threading.Event()
# Initialize client side variables
self.latest_action_lock = threading.Lock()
self.latest_action = -1
self.action_chunk_size = -1
self._chunk_size_threshold = config.chunk_size_threshold
self.action_queue = Queue()
self.action_queue_lock = threading.Lock() # Protect queue operations
self.action_queue_size = []
self.start_barrier = threading.Barrier(2) # 2 threads: action receiver, control loop
# FPS measurement
self.fps_tracker = FPSTracker(target_fps=self.config.fps)
# Use an event for thread-safe coordination
self.must_go = threading.Event()
self.must_go.set() # Initially set - observations qualify for direct processing
def running(self):
return not self.shutdown_event.is_set()
def start(self):
"""Start the robot client and connect to the policy server"""
try:
# client-server handshake
start_time = time.perf_counter()
self.stub.Ready(services_pb2.Empty())
end_time = time.perf_counter()
# send policy instructions
policy_config_bytes = pickle.dumps(self.policy_config)
policy_setup = services_pb2.PolicySetup(data=policy_config_bytes)
self.stub.SendPolicyInstructions(policy_setup)
self.shutdown_event.clear()
return True
except grpc.RpcError as e:
self.logger.error(f"Failed to connect to policy server: {e}")
return False
def stop(self):
"""Stop the robot client"""
self.shutdown_event.set()
self.robot.disconnect()
self.channel.close()
def send_observation(
self,
obs: TimedObservation,
) -> bool:
"""Send observation to the policy server.
Returns True if the observation was sent successfully, False otherwise."""
if not self.running:
raise RuntimeError("Client not running. Run RobotClient.start() before sending observations.")
if not isinstance(obs, TimedObservation):
raise ValueError("Input observation needs to be a TimedObservation!")
start_time = time.perf_counter()
observation_bytes = pickle.dumps(obs)
serialize_time = time.perf_counter() - start_time
try:
observation_iterator = send_bytes_in_chunks(
observation_bytes,
services_pb2.Observation,
log_prefix="[CLIENT] Observation",
silent=True,
)
_ = self.stub.SendObservations(observation_iterator)
obs_timestep = obs.get_timestep()
return True
except grpc.RpcError as e:
self.logger.error(f"Error sending observation #{obs.get_timestep()}: {e}")
return False
def _inspect_action_queue(self):
with self.action_queue_lock:
queue_size = self.action_queue.qsize()
timestamps = sorted([action.get_timestep() for action in self.action_queue.queue])
return queue_size, timestamps
def _aggregate_action_queues(
self,
incoming_actions: list[TimedAction],
aggregate_fn: Callable[[torch.Tensor, torch.Tensor], torch.Tensor] | None = None,
):
"""Finds the same timestep actions in the queue and aggregates them using the aggregate_fn"""
if aggregate_fn is None:
# default aggregate function: take the latest action
def aggregate_fn(x1, x2):
return x2
future_action_queue = Queue()
with self.action_queue_lock:
internal_queue = self.action_queue.queue
current_action_queue = {action.get_timestep(): action.get_action() for action in internal_queue}
for new_action in incoming_actions:
with self.latest_action_lock:
latest_action = self.latest_action
# New action is older than the latest action in the queue, skip it
if new_action.get_timestep() <= latest_action:
continue
# If the new action's timestep is not in the current action queue, add it directly
elif new_action.get_timestep() not in current_action_queue:
future_action_queue.put(new_action)
continue
# If the new action's timestep is in the current action queue, aggregate it
# TODO: There is probably a way to do this with broadcasting of the two action tensors
future_action_queue.put(
TimedAction(
timestamp=new_action.get_timestamp(),
timestep=new_action.get_timestep(),
action=aggregate_fn(
current_action_queue[new_action.get_timestep()], new_action.get_action()
),
)
)
with self.action_queue_lock:
self.action_queue = future_action_queue
def receive_actions(self, verbose: bool = False):
"""Receive actions from the policy server"""
# Wait at barrier for synchronized start
self.start_barrier.wait()
while self.running:
try:
# Use StreamActions to get a stream of actions from the server
actions_chunk = self.stub.GetActions(services_pb2.Empty())
if len(actions_chunk.data) == 0:
continue # received `Empty` from server, wait for next call
receive_time = time.time()
# Deserialize bytes back into list[TimedAction]
deserialize_start = time.perf_counter()
timed_actions = pickle.loads(actions_chunk.data) # nosec
deserialize_time = time.perf_counter() - deserialize_start
self.action_chunk_size = max(self.action_chunk_size, len(timed_actions))
# Calculate network latency if we have matching observations
if len(timed_actions) > 0 and verbose:
with self.latest_action_lock:
latest_action = self.latest_action
# Get queue state before changes
old_size, old_timesteps = self._inspect_action_queue()
if not old_timesteps:
old_timesteps = [latest_action] # queue was empty
# Log incoming actions
incoming_timesteps = [a.get_timestep() for a in timed_actions]
first_action_timestep = timed_actions[0].get_timestep()
server_to_client_latency = (receive_time - timed_actions[0].get_timestamp()) * 1000
# Update action queue
start_time = time.perf_counter()
self._aggregate_action_queues(timed_actions, self.config.aggregate_fn)
queue_update_time = time.perf_counter() - start_time
self.must_go.set() # after receiving actions, next empty queue triggers must-go processing!
if verbose:
# Get queue state after changes
new_size, new_timesteps = self._inspect_action_queue()
with self.latest_action_lock:
latest_action = self.latest_action
except grpc.RpcError as e:
self.logger.error(f"Error receiving actions: {e}")
def actions_available(self):
"""Check if there are actions available in the queue"""
with self.action_queue_lock:
return not self.action_queue.empty()
def _action_tensor_to_action_dict(self, action_tensor: torch.Tensor) -> dict[str, float]:
action = {key: action_tensor[i].item() for i, key in enumerate(self.robot.action_features)}
return action
def control_loop_action(self, verbose: bool = False) -> dict[str, Any]:
"""Reading and performing actions in local queue"""
# Lock only for queue operations
get_start = time.perf_counter()
with self.action_queue_lock:
self.action_queue_size.append(self.action_queue.qsize())
# Get action from queue
timed_action = self.action_queue.get_nowait()
get_end = time.perf_counter() - get_start
_performed_action = self.robot.send_action(
self._action_tensor_to_action_dict(timed_action.get_action())
)
with self.latest_action_lock:
self.latest_action = timed_action.get_timestep()
return _performed_action
def _ready_to_send_observation(self):
"""Flags when the client is ready to send an observation"""
with self.action_queue_lock:
return self.action_queue.qsize() / self.action_chunk_size <= self._chunk_size_threshold
def control_loop_observation(self, task: str, verbose: bool = False) -> RawObservation:
try:
# Get serialized observation bytes from the function
start_time = time.perf_counter()
raw_observation: RawObservation = self.robot.get_observation()
raw_observation["task"] = task
with self.latest_action_lock:
latest_action = self.latest_action
observation = TimedObservation(
timestamp=time.time(), # need time.time() to compare timestamps across client and server
observation=raw_observation,
timestep=max(latest_action, 0),
)
obs_capture_time = time.perf_counter() - start_time
# If there are no actions left in the queue, the observation must go through processing!
with self.action_queue_lock:
observation.must_go = self.must_go.is_set() and self.action_queue.empty()
current_queue_size = self.action_queue.qsize()
_ = self.send_observation(observation)
if observation.must_go:
# must-go event will be set again after receiving actions
self.must_go.clear()
return raw_observation
except Exception as e:
self.logger.error(f"Error in observation sender: {e}")
def control_loop(self, task: str, verbose: bool = False) -> tuple[Observation, Action]:
"""Combined function for executing actions and streaming observations"""
# Wait at barrier for synchronized start
self.start_barrier.wait()
_performed_action = None
_captured_observation = None
while self.running:
control_loop_start = time.perf_counter()
"""Control loop: (1) Performing actions, when available"""
if self.actions_available():
_performed_action = self.control_loop_action(verbose)
"""Control loop: (2) Streaming observations to the remote policy server"""
if self._ready_to_send_observation():
_captured_observation = self.control_loop_observation(task, verbose)
# Dynamically adjust sleep time to maintain the desired control frequency
time.sleep(max(0, self.config.environment_dt - (time.perf_counter() - control_loop_start)))
return _captured_observation, _performed_action
def async_client(cfg: RobotClientConfig):
logging.info(pformat(asdict(cfg)))
if cfg.robot.type not in SUPPORTED_ROBOTS:
raise ValueError(f"Robot {cfg.robot.type} not yet supported!")
client = RobotClient(cfg)
if client.start():
client.logger.info("Starting action receiver thread...")
# Create and start action receiver thread
action_receiver_thread = threading.Thread(target=client.receive_actions, daemon=True)
# Start action receiver thread
action_receiver_thread.start()
try:
# The main thread runs the control loop
client.control_loop(task=cfg.task)
finally:
client.stop()
action_receiver_thread.join()
if cfg.debug_visualize_queue_size:
visualize_action_queue_size(client.action_queue_size)
client.logger.info("Client stopped")
if __name__ == "__main__":
async_client() # run the client
```

2. The SO100Follower implementation (note what send_action actually does):

```python
class SO100Follower(Robot):
"""
[SO-100 Follower Arm](https://github.com/TheRobotStudio/SO-ARM100) designed by TheRobotStudio
"""
config_class = SO100FollowerConfig
name = "so100_follower"
def __init__(self, config: SO100FollowerConfig):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", norm_mode_body),
"shoulder_lift": Motor(2, "sts3215", norm_mode_body),
"elbow_flex": Motor(3, "sts3215", norm_mode_body),
"wrist_flex": Motor(4, "sts3215", norm_mode_body),
"wrist_roll": Motor(5, "sts3215", norm_mode_body),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
def action_features(self) -> dict[str, type]:
return self._motors_ft
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
if self.calibration:
# Calibration file exists, ask user whether to use it or run new calibration
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor]
print(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.bus.write("P_Coefficient", motor, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.bus.write("I_Coefficient", motor, 0)
self.bus.write("D_Coefficient", motor, 32)
if motor == "gripper":
self.bus.write("Max_Torque_Limit", motor, 500) # 50% of max torque to avoid burnout
self.bus.write("Protection_Current", motor, 250) # 50% of max current to avoid burnout
self.bus.write("Overload_Torque", motor, 25) # 25% torque when overloaded
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
# Send goal position to the arm
self.bus.sync_write("Goal_Position", goal_pos)
return {f"{motor}.pos": val for motor, val in goal_pos.items()}
```

Here send_action does a sync_write to the motors, while everything else only reads and never writes, so it does not block.
Saved debug launch configs
Server
Needed for training and async inference:

```json
{
"version": "0.2.0",
"configurations": [
{
"name": "lerobot_train训练",
"type": "python",
"request": "launch",
"program": "/home/user/miniconda3/envs/lerobot/bin/lerobot-train",
"args": [
"--policy.path=lerobot/smolvla_base",
"--dataset.repo_id=test1110/merged",
"--batch_size=64",
"--steps=20000",
"--output_dir=outputs/train/joint_action_joint_state",
"--job_name=my_smolvla_training_joint",
"--policy.device=cuda",
"--wandb.enable=false",
"--policy.push_to_hub=false",
"--rename_map={\"observation.images.side\":\"observation.images.camera1\",\"observation.images.wrist\":\"observation.images.camera2\"}"
],
// ,\"joint_action\":\"action\",\"action\":\"ee_action\",\"action_is_pad\":\"ee_action_is_pad\",\"joint_action_is_pad\":\"action_is_pad\"
"env": {
"CUDA_VISIBLE_DEVICES": "1",
"HF_HUB_OFFLINE": "1"
},
"console": "integratedTerminal",
"justMyCode": false,
"python": "/home/user/miniconda3/envs/lerobot/bin/python",
"cwd": "/home/user/working_folder/lerobot"
},
{
"name": "server async推理",
"type": "python",
"request": "launch",
"module": "lerobot.async_inference.policy_server",
"args": [
"--host=10.10.16.18",
"--port=8080",
"--fps=30",
"--inference_latency=0.033",
"--obs_queue_timeout=1"
],
"env": {
"CUDA_VISIBLE_DEVICES": "1",
"HF_HUB_OFFLINE": "1"
},
"console": "integratedTerminal",
"justMyCode": false,
"python": "/home/user/miniconda3/envs/lerobot/bin/python",
"cwd": "/home/user/working_folder/lerobot"
}
]
}
```
pipeline
Code that tormented me for a long time. Using joint->ee as the example.
### 1. DataProcessorPipeline
Works like an assembly line: the input data is passed through a sequence of processing steps (ProcessorStep). Core functions:
- __call__(data):
  - transition = self.to_transition(data) → converts the raw input (RobotObservation) into a standardized EnvTransition dict.
  - transformed_transition = self._forward(transition) → runs every ProcessorStep over the transition.
  - return self.to_output(transformed_transition) → converts the final EnvTransition back into the desired output format (here, a RobotObservation).
- _forward(transition): iterates over every ProcessorStep in the pipeline and runs the before/after hooks (no custom hooks here, so they can be ignored). Each step returns a new EnvTransition that replaces the previous one.
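To pin down the pattern, here is a minimal, generic sketch of that pipeline shape; MiniPipeline is my own toy class, not the lerobot implementation:

```python
from typing import Any, Callable

class MiniPipeline:
    """Toy version of the to_transition -> steps -> to_output pattern described above."""

    def __init__(self, steps, to_transition: Callable, to_output: Callable):
        self.steps = steps
        self.to_transition = to_transition
        self.to_output = to_output

    def _forward(self, transition: dict) -> dict:
        for step in self.steps:
            transition = step(transition)  # each step returns a new transition
        return transition

    def __call__(self, data: Any) -> Any:
        transition = self.to_transition(data)   # raw input -> standardized transition
        transition = self._forward(transition)  # run every processing step
        return self.to_output(transition)       # transition -> desired output format
```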
### 2. observation_to_transition
Converts the raw observation (RobotObservation, i.e. raw_observation_state) into an EnvTransition:

```python
{
    TransitionKey.OBSERVATION: observation,
    TransitionKey.ACTION: None,
    TransitionKey.REWARD: 0.0,
    TransitionKey.DONE: False,
    TransitionKey.TRUNCATED: False,
    TransitionKey.INFO: {},
    TransitionKey.COMPLEMENTARY_DATA: {},
}
```

In other words, it just wraps the raw observation and adds default action, reward, done, etc. fields.
### 3. ForwardKinematicsJointsToEE
A ProcessorStep that converts joint information into end-effector (EE) information.
- __post_init__(): initializes two sub-processors: ForwardKinematicsJointsToEEAction (joint→EE for actions, usually not needed) and ForwardKinematicsJointsToEEObservation (joint→EE for observations).
- __call__(transition): if the transition has an action, process it (none here, so skipped); if the transition has an observation, call ForwardKinematicsJointsToEEObservation.
Effect: runs FK on the EnvTransition's observation and adds the EE data.
### 4. ObservationProcessorStep
An abstract class dedicated to processing the OBSERVATION part of a transition; subclasses only need to implement the observation() method.
- __call__(transition): copies the transition, calls self.observation(observation), and puts the processed observation back into the transition.
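A generic sketch of that observation-step pattern; the names mirror the description above and are not the real lerobot classes:

```python
import copy

class MiniObservationStep:
    """Toy base class: subclasses override observation() and the step only touches that key."""

    def observation(self, observation: dict) -> dict:
        raise NotImplementedError

    def __call__(self, transition: dict) -> dict:
        new_transition = copy.copy(transition)                                  # copy the transition
        new_transition["observation"] = self.observation(new_transition["observation"])  # process and put back
        return new_transition
```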
### 5. ForwardKinematicsJointsToEEObservation
The concrete ObservationProcessorStep implementation.
- observation(observation): calls compute_forward_kinematics_joints_to_ee() to turn the joint information into an EE pose.
Core logic:

```python
motor_joint_values = [joints[f"{n}.pos"] for n in motor_names]
q = np.array(motor_joint_values)
t = kinematics.forward_kinematics(q)
pos = t[:3, 3]
tw = Rotation.from_matrix(t[:3, :3]).as_rotvec()
# remove the original joint entries, add the EE data
joints["ee.x"] = pos[0]
joints["ee.y"] = pos[1]
joints["ee.z"] = pos[2]
joints["ee.wx"] = tw[0]
joints["ee.wy"] = tw[1]
joints["ee.wz"] = tw[2]
joints["ee.gripper_pos"] = joints["gripper.pos"]
```

It returns the modified observation.
### 6. compute_forward_kinematics_joints_to_ee
The actual FK computation. Input: the joint dict, the kinematics object, and the motor names. Output: an observation dict containing the EE pose.
### 7. transition_to_observation
Returns only the OBSERVATION part of the EnvTransition. So the final obs_processed = self.robot_joints_to_ee_pose_processor(raw_observation_state) is the original observation plus the EE information.
Summary of the data flow
Input: raw_observation_state (a dict containing joint information).
1. observation_to_transition → wraps it into an EnvTransition.
2. _forward(): calls ForwardKinematicsJointsToEE.__call__(), which calls ForwardKinematicsJointsToEEObservation.__call__(), which calls observation() → compute_forward_kinematics_joints_to_ee() → computes the EE pose and modifies the observation.
3. transition_to_observation → returns the final RobotObservation (now containing the EE data).
Final result:
- The original joint entries are removed (if the motor_name joints are deleted).
- The EE position and rotation are added: 'ee.x', 'ee.y', 'ee.z', 'ee.wx', 'ee.wy', 'ee.wz', 'ee.gripper_pos'
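A self-contained numerical example of the transform-to-EE step in the core logic above, using an identity matrix as a stand-in for the FK result (illustration only; a real pose comes from kinematics.forward_kinematics(q)):

```python
import numpy as np
from scipy.spatial.transform import Rotation

t = np.eye(4)                                       # pretend forward kinematics returned this 4x4 transform
pos = t[:3, 3]                                      # EE position
tw = Rotation.from_matrix(t[:3, :3]).as_rotvec()    # EE orientation as a rotation vector
ee = {"ee.x": pos[0], "ee.y": pos[1], "ee.z": pos[2],
      "ee.wx": tw[0], "ee.wy": tw[1], "ee.wz": tw[2]}
print(ee)  # all zeros for the identity transform
```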