RIP Sim2Real Control and Hardware Deployment Methodology
Part 1. RL Sim2Real Learning Methodology (Theory & Rigor)
This section outlines the core learning theories and environmental configurations applied to ensure the reinforcement learning (RL) agent operates stably under real-world physical laws.
1. Control Space Transition: Torque to Velocity
- Theoretical Background: Servo motors with reduction gears, such as Dynamixels, exhibit strong nonlinearities, including static friction, Coulomb friction, viscous friction, and gear backlash. Using pure torque control forces the RL agent to calculate and compensate for these internal hardware nonlinearities in addition to the inverted pendulum’s dynamics.
- Mathematical Advantage of Velocity Control: By setting the control command to a “target velocity,” the motor’s built-in, high-performance PI controller operates at several kHz to actively cancel out nonlinear friction and external disturbances. This frees the agent from low-level friction modeling, allowing it to focus entirely on high-level control—generating optimal velocity trajectories for balancing. This dramatically increases the Sim2Real success rate.
2. Time Horizon and 500Hz Control Frequency Synchronization
- Physics-Control Synchronization: The simulator’s physics step ($dt$) and the RL agent’s policy inference rate (decimation = 1) are fully synchronized at 500Hz (2ms). Given the latency and jitter of the real hardware communication chain (e.g., USB-to-RS485), this is the highest rate the system can sustain without introducing control delay.
- Value Estimation Horizon: The PPO algorithm’s `num_steps_per_env = 256` setting translates to approximately 0.51 seconds in a 500Hz system. When the agent calculates the Advantage Function, this is the most suitable time horizon for evaluating how the current $\Delta v$ command affects system stability 0.5 seconds later.
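As a quick sanity check of this arithmetic (a trivial sketch; the constant names are illustrative):

```python
# 256 PPO rollout steps at a 500Hz (2ms) control rate
DT = 1.0 / 500.0            # control step [s]
NUM_STEPS_PER_ENV = 256     # PPO rollout length
horizon_s = NUM_STEPS_PER_ENV * DT
# horizon_s ≈ 0.512 s, the ~0.51s value quoted above
```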
3. Delay Embedding via History Taps
- Theoretical Background: Relying on observation data from a single timestep ($t$) is insufficient for filtering sensor noise or inferring the motor’s acceleration state and communication dead-time. Such an observation breaks the Markov Property, which requires that the current state summarize all past information relevant to the dynamics.
- Implementation: By configuring `sim2real_obs_history_taps = [0, 10, 20, 30]`, the system concatenates data from 0ms, 20ms, 40ms, and 60ms ago (at 500Hz, one step is 2ms) to form a 28D state vector. This allows the neural network to implicitly infer dynamic system states, such as acceleration and delay rates.
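The tap-and-concatenate scheme can be sketched in plain Python (an illustrative stand-in for the torch ring buffers used in the actual environment; `push` and `stacked_observation` are hypothetical helper names):

```python
from collections import deque

TAPS = [0, 10, 20, 30]     # buffer indices -> 0/20/40/60 ms at 500Hz
BUF_LEN = max(TAPS) + 1    # 31 slots, newest sample at index 0

obs_buf = deque(maxlen=BUF_LEN)   # each entry: 6-D observation feature
act_buf = deque(maxlen=BUF_LEN)   # each entry: 1-D raw action

def push(obs_feat, raw_action):
    # appendleft keeps the newest sample at index 0, like torch.roll(+1)
    obs_buf.appendleft(obs_feat)
    act_buf.appendleft(raw_action)

def stacked_observation():
    """Concatenate 4 taps x 6 features + 4 taps x 1 action = 28 dims."""
    parts = []
    for tap in TAPS:
        parts.extend(obs_buf[tap])
    for tap in TAPS:
        parts.append(act_buf[tap])
    return parts

# warm-up: fill the buffer so every tap index is valid
for t in range(BUF_LEN):
    push([float(t)] * 6, float(t))

obs = stacked_observation()   # 28-D stacked state vector
```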
4. Simulating Physical Inertia: Command Low-Pass Filter (LPF)
- Theoretical Background: The target velocity output from a digital controller acts as a step function. However, a physical motor rotor cannot accelerate or decelerate instantly due to its moment of inertia ($I$) and coil inductance ($L$).
- Implementation: A 1st-order lag system filter was introduced into the simulation:$$v_{exec, t} = \alpha \cdot v_{cmd, t} + (1-\alpha) \cdot v_{exec, t-1}$$By randomly sampling the time constant ($\tau$) between 1.5ms and 5.0ms to update the $\alpha$ value, the agent is trained to be robust against mechanical and electrical response delays.
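A minimal sketch of this filter, including the discretization $\alpha = \Delta t / (\tau + \Delta t)$ used when $\tau$ is sampled (function names are illustrative):

```python
DT = 1.0 / 500.0  # 2 ms control step

def lpf_alpha(tau_s: float) -> float:
    # alpha = dt / (tau + dt), as in the env's env_cmd_lpf_alpha update
    return DT / (tau_s + DT)

def lpf_step(v_exec_prev: float, v_cmd: float, alpha: float) -> float:
    # v_exec,t = alpha * v_cmd,t + (1 - alpha) * v_exec,t-1
    return alpha * v_cmd + (1.0 - alpha) * v_exec_prev

alpha = lpf_alpha(0.002)   # tau = 2 ms -> alpha = 0.5
v = 0.0
for _ in range(10):        # unit-step velocity command
    v = lpf_step(v, 1.0, alpha)
# v approaches 1.0 geometrically: after n steps, v = 1 - 0.5**n
```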
5. Explicit Modeling of Hardware Communication Dead-Time
This technique is essential to overcome the phase lag between the simulator’s instantaneous feedback and the real hardware’s time delays.
- Theoretical Background: A real robot’s control loop inevitably experiences latency: Sensor data collection $\rightarrow$ Communication $\rightarrow$ Python node & Inference $\rightarrow$ Motor command execution. In a 500Hz system, a 2-4ms communication delay causes a 1-2 step phase lag. An agent trained under the assumption of instantaneous environmental changes will over-compensate on real hardware, leading to immediate oscillation and failure.
- Implementation Methodology:
- Independent Delay Sampling: Because system bottlenecks are unpredictable, `env_imu_delay`, `env_joint_delay`, and `env_action_delay` are independently sampled from a uniform distribution of 0 to 2 steps (0 to 4ms).
- Ring Buffer Architecture: Instead of immediately passing observations and actions to the physics engine, data is pushed into a raw buffer. The pipeline then retrieves past data based on the sampled delay index.
- Sim2Real Effect: This severe temporal randomization forces the agent to respect physical latency. Consequently, the agent utilizes the history taps to implicitly perform state prediction, resulting in a highly robust control policy that does not diverge even in hardware environments with fluctuating communication delays.
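A minimal single-signal sketch of the ring-buffer delay mechanism (class and method names are illustrative):

```python
import random

BUF_LEN = 31  # max history tap (30) + 1, as in the environment

class DelayLine:
    """Ring buffer with the newest sample at index 0."""
    def __init__(self):
        self.buf = [0.0] * BUF_LEN

    def push(self, x: float) -> None:
        self.buf = [x] + self.buf[:-1]   # roll by one: newest at index 0

    def read(self, delay_steps: int) -> float:
        return self.buf[delay_steps]     # delay_steps * 2 ms stale at 500Hz

line = DelayLine()
for t in range(5):
    line.push(float(t))
# index 0 -> t=4 (freshest), index 2 -> t=2 (4 ms stale)
delay = random.randint(0, 2)   # uniform 0..2 steps, as in Stage A config
stale = line.read(delay)
```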
Part 2. Reward Function Design and Sim2Real Control Theory
The reward function is strictly formulated using principles of control engineering to achieve balancing while deriving a physically feasible policy that real-world motors can withstand. It is divided into three pillars: survival, state regulation, and action smoothness.
1. Survival & Termination Penalty
This forms the objective function of the base Markov Decision Process (MDP).
- Alive Reward: $r_{alive} = +1.0$
- Termination Penalty: $r_{term} = -10.0$
- Theoretical Background: In this dense reward structure, surviving yields continuous benefit, while falling beyond a certain angle triggers a massive penalty. This acts as the strongest foundational condition for the agent to learn that maintaining the upright state is the global optimum.
2. State Regulation Penalty
This acts as a Linear Quadratic Regulator (LQR)-style state cost to converge the pendulum upright (0 rad).
- Pendulum Angle Penalty: $r_{\theta} = -2.0 \cdot \theta^2$
- Pendulum Angular Velocity Penalty: $r_{\dot{\theta}} = -0.1 \cdot \dot{\theta}^2$
- Theoretical Background: Using an L2 Norm (squared) rather than an L1 Norm (absolute value) is critical. The squared penalty applies strong corrective force for large errors, but the gradient flattens near the target point. This mathematically suppresses micro-oscillations by preventing over-actuation once the goal is reached, mirroring the Q-matrix penalty in Optimal Control.
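A tiny numerical illustration of why the squared penalty is gentler near the target (helper names are hypothetical; the coefficient matches the $-2.0 \cdot \theta^2$ term above):

```python
def l2_grad(theta: float, k: float = 2.0) -> float:
    return 2.0 * k * theta          # d/dtheta of k * theta^2

def l1_grad(theta: float, k: float = 2.0) -> float:
    return k if theta > 0 else -k   # d/dtheta of k * |theta| (theta != 0)

g2_small = l2_grad(0.01)   # 0.04: barely any corrective push left
g1_small = l1_grad(0.01)   # 2.0: full-strength push, invites chatter
```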
3. Action Smoothness & Control Effort (Sim2Real Core)
The primary reason perfect simulated controllers diverge in reality is hardware bandwidth limitations. This is the most crucial reward component for suppressing high-frequency jitter, protecting motors, and closing the Reality Gap.
- Control Effort Penalty (Magnitude): $$r_{mag} = -0.01 \cdot |a_t|^2$$ Purpose: Prevents the excessive use of high-velocity commands. It induces a policy that balances with minimal energy, reducing power consumption and motor heat.
- 1st-order Action Smoothness: $$r_{smooth1} = -0.02 \cdot (a_t - a_{t-1})^2$$ Purpose: Restricts acceleration. It prevents “bang-bang” control behavior (rapid switching from max forward to max reverse) and enforces smooth acceleration/deceleration profiles.
- 2nd-order Action Smoothness: $$r_{smooth2} = -0.005 \cdot (a_t - 2a_{t-1} + a_{t-2})^2$$ Purpose: Restricts jerk. It filters out high-frequency vibrations and micro-kinks in the control signal that a 1st-order penalty cannot catch. By heavily smoothing the agent’s action manifold, it allows Dynamixel motors with physical gear backlash to track commands without mechanical shock.
Summary: By penalizing the 1st and 2nd derivatives of the raw neural network output, the policy does not blindly rely on the physics engine’s perfection. Instead, it naturally respects the physical limits of real motors.
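Summing the pieces of this part, the full reward can be sketched as a scalar function (the environment uses a vectorized torch version; this scalar form is only illustrative):

```python
def reward(theta, theta_dot, a_t, a_tm1, a_tm2, terminated):
    # survival / termination (Part 2.1)
    r = -10.0 if terminated else 1.0
    # LQR-style state regulation (Part 2.2)
    r += -2.0 * theta ** 2 - 0.1 * theta_dot ** 2
    # control effort + 1st/2nd-order smoothness (Part 2.3)
    r += -0.01 * a_t ** 2
    r += -0.02 * (a_t - a_tm1) ** 2
    r += -0.005 * (a_t - 2.0 * a_tm1 + a_tm2) ** 2
    return r

# Upright, still, zero action: only the alive reward remains
r0 = reward(0.0, 0.0, 0.0, 0.0, 0.0, False)
```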
Part 3. Synchronization Protocols for Hardware Deployment
These software engineering protocols prevent control divergence between the virtual simulator and the physical hardware.
Rule 1: Absolute Ground Truth
Regardless of what hardware sensors or motors output, the coordinate system defined in the simulator (URDF/USD) must be treated as the absolute ground truth. Any discrepancies must be corrected immediately during the ingestion phase.
Rule 2: EBIMU Sensor Asymmetric Coordinate Sync
The EBIMU24GV6 sensor uses different reference axes internally for calculating angle and angular velocity.
- Cause: According to the manual, Euler Angle calculations use a right-facing Y-axis (pitching backward yields a positive output), whereas Local Velocity calculations use a left-facing Y-axis (pitch rate forward yields a positive output).
- Directive: To align with the simulator’s FLU (Forward-Left-Up) coordinate system, the pitch angle’s sign must be inverted upon receipt:

```python
# Pitch sign inversion upon receipt
msg.data = [-pitch, pitch_rate]
```
Rule 3: Motor Coordinate Alignment (XM430-W210)
Even if a motor is mechanically assembled facing upward, its default rotational direction may contradict the URDF joint axis definition.
- Cause: Robotis Dynamixel motors define counter-clockwise (CCW) rotation as positive when facing the horn. This often opposes the positive axis in the URDF.
- Directive: Signs must be inverted in both directions:
- Command Phase (C++ Node): Multiply the agent’s target velocity by -1 before writing to the motor register.
- Feedback Phase (Python Node): The raw encoder value accumulates in the opposite direction, so its sign must be inverted immediately upon conversion to radians:$$q_{wrapped} = -\left( \frac{\text{raw}}{4096} \times 2\pi - \pi \right)$$
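The feedback-phase formula of Rule 3 as a standalone sketch (function name is illustrative):

```python
import math

def encoder_to_rad(raw: int) -> float:
    # Invert sign while mapping 0..4095 encoder ticks onto (-pi, pi]
    return -((raw / 4096.0) * 2.0 * math.pi - math.pi)

# mid-scale tick 2048 maps to 0 rad; tick 0 maps to +pi
```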
Rule 4: Observation Space Continuity (Wrap & Unwrap)
Rotary joints experience discontinuous value spikes when wrapping from $-\pi$ to $\pi$.
- Directive:
  1. Provide angular data as separate $\sin(q)$ and $\cos(q)$ dimensions so the network understands the continuity.
  2. When calculating velocity, do not use the simple difference ($q_t - q_{t-1}$). Use the arctangent function to calculate the true angular delta:$$\Delta q = \text{atan2}\left( \sin(q_t - q_{t-1}), \cos(q_t - q_{t-1}) \right)$$
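The unwrap directive as a standalone sketch (function name is illustrative):

```python
import math

def angle_delta(q_t: float, q_tm1: float) -> float:
    # atan2(sin(dq), cos(dq)) folds the difference back into (-pi, pi]
    d = q_t - q_tm1
    return math.atan2(math.sin(d), math.cos(d))

# Crossing the -pi/pi seam: the naive difference jumps by ~2*pi,
# while the wrapped delta stays small.
naive = (-3.1) - 3.1              # -6.2
wrapped = angle_delta(-3.1, 3.1)  # small positive delta (2*pi - 6.2)
```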
Rule 5: Units & Quantization
Data exchanged between the agent and hardware must strictly follow the simulator’s units.
- IMU Unit Conversion: Degrees must be converted to radians ($\times \frac{\pi}{180}$).
- Motor Velocity Quantization: The Dynamixel XM430-W210 has a velocity resolution of 0.229 rev/min. The C++ driver must quantize the radian velocity (rad/s) into motor ticks to reflect the hardware’s discrete operational steps:$$\text{vel\_step\_radps} = 0.229 \times \frac{2\pi}{60}$$
- Input Normalization: Observations fed to the neural network must be divided by their designated limit scales (e.g., `obs_scale_imu_vel = 20.0`) so that all inputs are normalized to the $[-1, 1]$ range.
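Rule 5’s quantization and normalization steps as a standalone sketch (function names are illustrative; the step and scale values match `sim2real_base_vel_step` and `obs_scale_imu_vel` from the configuration):

```python
import math

VEL_STEP = 0.229 * 2.0 * math.pi / 60.0   # one Dynamixel velocity tick in rad/s

def quantize(vel_radps: float) -> float:
    # Snap a continuous rad/s command to the nearest hardware tick
    return round(vel_radps / VEL_STEP) * VEL_STEP

def normalize(x: float, scale: float) -> float:
    # Divide by the designated limit scale and clamp to [-1, 1]
    return max(-1.0, min(1.0, x / scale))

q = quantize(1.0)            # nearest representable velocity to 1.0 rad/s
n = normalize(10.0, 20.0)    # obs_scale_imu_vel = 20.0
```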
Part 4. Implementation Code
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import math
from isaaclab_assets.robots.rip_v1 import RIP_CFG
from isaaclab.assets import ArticulationCfg
from isaaclab.envs import DirectRLEnvCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sim import SimulationCfg
from isaaclab.sensors import ImuCfg
from isaaclab.utils import configclass
@configclass
class IsaaclabRipV0EnvCfg(DirectRLEnvCfg):
# env
decimation = 1 # 500Hz physics -> 500Hz control (sim dt = 1/500)
episode_length_s = 5.0
debug_log = False # If True, clones large tensors to extras["step_logs"]
# - spaces definition
action_space = 1
observation_space = 28 # obs_history(24) + raw_action_history(4)
state_space = 0
# ── sim2real: observation noise ──
sim2real_obs_noise_pole_angle = 0.000435 # IMU angle noise [rad]
sim2real_obs_noise_imu_vel = 0.005 # IMU ang vel noise [rad/s]
sim2real_obs_noise_base = 0.0 # limit encoder noise due to quantization [rad]
sim2real_obs_noise_vel = 0.01 # base angular velocity noise [rad/s]
# ── sim2real: observation bias (per env, episode-fixed) ──
sim2real_pole_angle_bias_rad_min = -0.00435
sim2real_pole_angle_bias_rad_max = 0.00435
sim2real_imu_vel_bias_min = -0.1
sim2real_imu_vel_bias_max = 0.1
sim2real_base_pos_bias_min = -0.0085
sim2real_base_pos_bias_max = 0.0085
sim2real_base_vel_bias_min = -0.15
sim2real_base_vel_bias_max = 0.15
# ── sim2real: observation scaling ──
obs_scale_imu_vel = 20.0 # [rad/s], imu angular velocity base
obs_scale_base_vel = 20.0 # [rad/s], base angular velocity base
raw_action_obs_scale = 1.0 # raw NN output history normalization
# ── sim2real: command & velocity LPF ──
sim2real_base_vel_lpf_alpha = 0.1 # IIR alpha: out = alpha*new + (1-alpha)*prev
sim2real_cmd_lpf_tau_ms_min = 1.5 # Motor 1st-order delay tau min
sim2real_cmd_lpf_tau_ms_max = 5.0 # Motor 1st-order delay tau max
# ── sim2real: encoder step size ──
sim2real_base_encoder_cpr = 4096
sim2real_base_encoder_step = 2 * math.pi / 4096
# ── sim2real: domain randomization (physics) ──
dr_mass_scale_min = 0.95
dr_mass_scale_max = 1.05
# ── sim2real: history taps ──
sim2real_obs_history_taps = [0, 10, 20, 30]
sim2real_raw_action_history_taps = [0, 10, 20, 30] # raw NN output history
imu_sensor: ImuCfg = ImuCfg(
prim_path="/World/envs/env_.*/Robot/.*link2",
update_period=0.0,
)
# simulation
sim: SimulationCfg = SimulationCfg(dt=1 / 500, render_interval=decimation)
# robot(s)
robot_cfg: ArticulationCfg = RIP_CFG.replace(prim_path="/World/envs/env_.*/Robot")
# scene
scene: InteractiveSceneCfg = InteractiveSceneCfg(
num_envs=2048, env_spacing=4.0, replicate_physics=True
)
# custom parameters/scales
# - controllable joint
cart_dof_name = "base________5"
pole_dof_name = "link1________4"
# - action scale
max_base_vel_cmd_radps = 7.75
sim2real_base_vel_step = 0.229 * 2 * math.pi / 60.0
# ── reward: upright-only ──
rew_scale_alive = 1.0
rew_scale_terminated = -10.0
rew_scale_pole_pos = -2.0 # r_θ = -k_θ · θ²
rew_scale_pole_vel = -0.1 # r_dθ = -k_dθ · dθ²
rew_scale_action_smooth1 = -0.02 # -k_s1 · (a_t - a_{t-1})² (raw NN output)
rew_scale_action_smooth2 = -0.005 # -k_s2 · (a_t - 2·a_{t-1} + a_{t-2})²
rew_scale_action_mag_cmd = -0.01 # raw NN output magnitude penalty
raw_action_rew_clip = 100.0
# ── progressive randomization stages ──
rand_steps_per_iteration = 256 # matches num_steps_per_env in PPO config
# Stage A (nominal-heavy)
rand_A_action_delay = (0, 2)
rand_A_imu_delay = (0, 2)
rand_A_joint_delay = (0, 2)
rand_A_noise_scale = 0.01
rand_A_pole_angle_range = [-20.0 * math.pi / 180.0, 20.0 * math.pi / 180.0]
rand_A_pole_vel_range = [-0.5, 0.5]
rand_A_base_vel_range = [-0.5, 0.5]
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import math
import torch
from collections.abc import Sequence
import isaaclab.sim as sim_utils
from isaaclab.assets import Articulation
from isaaclab.envs import DirectRLEnv
from isaaclab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
from isaaclab.sensors import Imu
from isaaclab.utils.math import sample_uniform
import isaaclab.utils.math as math_utils
from .isaaclab_rip_v0_env_cfg import IsaaclabRipV0EnvCfg
# Buffer length: max history tap (30) + 1 = 31
_BUF_LEN = 31
class IsaaclabRipV0Env(DirectRLEnv):
cfg: IsaaclabRipV0EnvCfg
def __init__(
self, cfg: IsaaclabRipV0EnvCfg, render_mode: str | None = None, **kwargs
):
super().__init__(cfg, render_mode, **kwargs)
self._cart_dof_idx, _ = self.robot.find_joints(self.cfg.cart_dof_name)
self._pole_dof_idx, _ = self.robot.find_joints(self.cfg.pole_dof_name)
# find indices of all links (bodies)
self._body_indices, _ = self.robot.find_bodies(".*")
self.joint_pos = self.robot.data.joint_pos
self.joint_vel = self.robot.data.joint_vel
# ── ring buffers (newest at index 0, oldest at index _BUF_LEN-1) ──
N = self.num_envs
dev = self.device
# raw NN output buffer [N, 31, 1]
self.action_buf = torch.zeros(N, _BUF_LEN, self.cfg.action_space, device=dev)
# raw IMU buffer: [pitch_rad, ang_vel] → [N, 31, 2]
self.raw_imu_buf = torch.zeros(N, _BUF_LEN, 2, device=dev)
# raw base buffer: [base_pos, base_vel] → [N, 31, 2]
self.raw_base_buf = torch.zeros(N, _BUF_LEN, 2, device=dev)
# perceived observation feature buffer: [sin(pole), cos(pole), pole_vel, sin(base), cos(base), base_vel] → [N, 31, 6]
self.obs_feat_buf = torch.zeros(N, _BUF_LEN, 6, device=dev)
# ── per-env sampled parameters ──
self.env_action_delay = torch.zeros(N, dtype=torch.long, device=dev)
self.env_imu_delay = torch.zeros(N, dtype=torch.long, device=dev)
self.env_joint_delay = torch.zeros(N, dtype=torch.long, device=dev)
self.env_pole_angle_bias = torch.zeros(N, 1, device=dev)
self.env_imu_vel_bias = torch.zeros(N, 1, device=dev)
self.env_base_pos_bias = torch.zeros(N, 1, device=dev)
self.env_base_vel_bias = torch.zeros(N, 1, device=dev)
# ── base velocity: previous position for differentiation ──
self.prev_base_pos = torch.zeros(N, 1, device=dev)
# ── base velocity LPF state ──
self.base_vel_lpf_state = torch.zeros(N, 1, device=dev)
# ── command LPF state ──
self.vel_cmd_exec_state = torch.zeros(N, 1, device=dev)
self.env_cmd_lpf_alpha = torch.zeros(N, 1, device=dev)
# physics dt [s]
self._dt = 1.0 / 500.0 # matches sim dt
# history tap indices as tensors
self._obs_taps = torch.tensor(
self.cfg.sim2real_obs_history_taps, dtype=torch.long, device=dev
)
self._raw_action_taps = torch.tensor(
self.cfg.sim2real_raw_action_history_taps,
dtype=torch.long,
device=dev,
)
# ── Logging buffers (episode sums & trackers) ──
self.episode_sums = {
"raw_mag_sum": torch.zeros(N, dtype=torch.float, device=dev),
"raw_mag_max": torch.zeros(N, dtype=torch.float, device=dev),
"raw_diff1_sum": torch.zeros(N, dtype=torch.float, device=dev),
"raw_diff2_sum": torch.zeros(N, dtype=torch.float, device=dev),
"pole_angle_sq_sum": torch.zeros(N, dtype=torch.float, device=dev),
"pole_vel_sq_sum": torch.zeros(N, dtype=torch.float, device=dev),
"base_vel_sq_sum": torch.zeros(N, dtype=torch.float, device=dev),
"sign_flip_count": torch.zeros(N, dtype=torch.float, device=dev),
"cmd_lpf_alpha_mean": torch.zeros(N, dtype=torch.float, device=dev),
"cmd_lpf_tau_ms_mean": torch.zeros(N, dtype=torch.float, device=dev),
}
# ── progressive randomization state ──
self._total_steps: int = 0
self.current_noise_scale: float = self.cfg.rand_A_noise_scale
def _setup_scene(self):
self.robot = Articulation(self.cfg.robot_cfg)
# add ground plane
spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
# clone and replicate
self.scene.clone_environments(copy_from_source=False)
# we need to explicitly filter collisions for CPU simulation
if self.device == "cpu":
self.scene.filter_collisions(global_prim_paths=[])
# add articulation to scene
self.scene.articulations["robot"] = self.robot
# add sensors
self.imu_sensor = Imu(self.cfg.imu_sensor)
self.scene.sensors["imu_sensor"] = self.imu_sensor
# add lights
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
light_cfg.func("/World/Light", light_cfg)
# ────────────────────────────────────────────────────────────────────────
# _pre_physics_step: store current raw NN output only
# ────────────────────────────────────────────────────────────────────────
def _pre_physics_step(self, actions: torch.Tensor) -> None:
self._total_steps += 1
# Roll buffer: shift everything by 1 (newest at index 0)
self.action_buf = torch.roll(self.action_buf, shifts=1, dims=1)
self.action_buf[:, 0, :] = actions.clone()
# ────────────────────────────────────────────────────────────────────────
# _apply_action: action delay → velocity command → quantization → LPF → velocity control
# ────────────────────────────────────────────────────────────────────────
def _apply_action(self) -> None:
N = self.num_envs
delay_idx = self.env_action_delay
a_delayed = self.action_buf[torch.arange(N, device=self.device), delay_idx, :]
# 1. Map the action continuously to a target velocity (rad/s)
vel_cmd_raw = (
torch.clamp(a_delayed, -1.0, 1.0) * self.cfg.max_base_vel_cmd_radps
)
# 2. Digital velocity quantization before Dynamixel communication
# - convert to stepped commands in units of 0.229 rev/min
vel_step = self.cfg.sim2real_base_vel_step
vel_cmd_quantized = torch.round(vel_cmd_raw / vel_step) * vel_step
# 3. Command LPF (models motor inertia and electrical lag)
# - filter the quantized step commands so the motor rotor tracks them smoothly
alpha_lpf = self.env_cmd_lpf_alpha
self.vel_cmd_exec_state = (
alpha_lpf * vel_cmd_quantized + (1.0 - alpha_lpf) * self.vel_cmd_exec_state
)
# 4. Send the final smoothed velocity to the physics engine
self.robot.set_joint_velocity_target(
self.vel_cmd_exec_state,
joint_ids=self._cart_dof_idx,
)
# 5. per-step log in extras summary (conditional)
if self.cfg.debug_log:
if "step_logs" not in self.extras:
self.extras["step_logs"] = {}
self.extras["step_logs"]["actions"] = self.action_buf[:, 0, :].clone()
self.extras["step_logs"]["a_delayed"] = a_delayed.clone()
self.extras["step_logs"]["vel_cmd_raw"] = vel_cmd_raw.clone()
self.extras["step_logs"]["vel_cmd_quantized"] = vel_cmd_quantized.clone()
self.extras["step_logs"]["vel_cmd_exec"] = self.vel_cmd_exec_state.clone()
# 6. update episode-level metrics (raw NN output based)
raw_cur = self.action_buf[:, 0, :] # current raw NN output [N, 1]
raw_prev1 = self.action_buf[:, 1, :] # previous raw NN output [N, 1]
raw_prev2 = self.action_buf[:, 2, :] # 2 steps ago [N, 1]
raw_mag = torch.abs(raw_cur).squeeze(-1)
self.episode_sums["raw_mag_sum"] += raw_mag
self.episode_sums["raw_mag_max"] = torch.maximum(
self.episode_sums["raw_mag_max"], raw_mag
)
# Raw NN output smoothness tracking
self.episode_sums["raw_diff1_sum"] += torch.abs(raw_cur - raw_prev1).squeeze(-1)
self.episode_sums["raw_diff2_sum"] += torch.abs(
raw_cur - 2.0 * raw_prev1 + raw_prev2
).squeeze(-1)
self.episode_sums["pole_angle_sq_sum"] += torch.square(
self.joint_pos[:, self._pole_dof_idx[0]]
)
self.episode_sums["pole_vel_sq_sum"] += torch.square(
self.joint_vel[:, self._pole_dof_idx[0]]
)
sign_flip = (torch.sign(raw_cur) * torch.sign(raw_prev1)) < 0
self.episode_sums["sign_flip_count"] += sign_flip.squeeze(-1).float()
self.episode_sums["cmd_lpf_alpha_mean"] += self.env_cmd_lpf_alpha.squeeze(-1)
self.episode_sums["cmd_lpf_tau_ms_mean"] += (
(self._dt * 1000.0 / (self.env_cmd_lpf_alpha + 1e-6)) - (self._dt * 1000.0)
).squeeze(-1)
# ────────────────────────────────────────────────────────────────────────
# _get_observations: raw → pos-diff vel → LPF → delay → noise → feature → history stack
# ────────────────────────────────────────────────────────────────────────
def _get_observations(self) -> dict:
N = self.num_envs
# 6-1. read raw sensors
quat = self.imu_sensor.data.quat_w
_, pitch, _ = math_utils.euler_xyz_from_quat(quat)
pole_angle_raw = pitch.unsqueeze(1) # [N, 1] pure rad
imu_ang_vel_raw = self.imu_sensor.data.ang_vel_b[:, 1].unsqueeze(1) # [N, 1]
base_pos_raw = self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(1) # [N, 1]
encoder_step = self.cfg.sim2real_base_encoder_step
base_pos_quant = torch.round(base_pos_raw / encoder_step) * encoder_step
# 6-2. base velocity via position differentiation
base_vel_raw_current = (
base_pos_quant - self.prev_base_pos
) / self._dt # [N, 1]
self.prev_base_pos = base_pos_quant.clone()
# 6-3. base velocity LPF (applied before buffer storage)
alpha = self.cfg.sim2real_base_vel_lpf_alpha
self.base_vel_lpf_state = (
alpha * base_vel_raw_current + (1.0 - alpha) * self.base_vel_lpf_state
)
base_vel_lpf = self.base_vel_lpf_state.clone()
# Update episode-level metrics: accumulate base velocity square
self.episode_sums["base_vel_sq_sum"] += torch.square(base_vel_lpf.squeeze(-1))
# 6-4. push into raw buffers
self.raw_imu_buf = torch.roll(self.raw_imu_buf, shifts=1, dims=1)
self.raw_imu_buf[:, 0, 0] = pole_angle_raw.squeeze(-1)
self.raw_imu_buf[:, 0, 1] = imu_ang_vel_raw.squeeze(-1)
self.raw_base_buf = torch.roll(self.raw_base_buf, shifts=1, dims=1)
self.raw_base_buf[:, 0, 0] = base_pos_quant.squeeze(-1)
self.raw_base_buf[:, 0, 1] = base_vel_lpf.squeeze(-1) # LPF'd velocity stored
# 6-5. env-specific sensor delay
arange_n = torch.arange(N, device=self.device)
imu_delayed = self.raw_imu_buf[arange_n, self.env_imu_delay, :] # [N, 2]
pole_angle_delayed = imu_delayed[:, 0:1] # [N, 1]
ang_vel_delayed = imu_delayed[:, 1:2] # [N, 1]
base_delayed = self.raw_base_buf[arange_n, self.env_joint_delay, :] # [N, 2]
base_pos_delayed = base_delayed[:, 0:1] # [N, 1]
base_vel_delayed = base_delayed[:, 1:2] # [N, 1]
# 6-6. observation noise (scaled by progressive randomization stage)
ns = self.current_noise_scale
pole_angle_obs = (
pole_angle_delayed
+ self.env_pole_angle_bias
+ torch.randn_like(pole_angle_delayed)
* self.cfg.sim2real_obs_noise_pole_angle
* ns
)
ang_vel_obs = (
ang_vel_delayed
+ self.env_imu_vel_bias
+ torch.randn_like(ang_vel_delayed)
* self.cfg.sim2real_obs_noise_imu_vel
* ns
)
base_pos_obs = (
base_pos_delayed
+ self.env_base_pos_bias
+ torch.randn_like(base_pos_delayed) * self.cfg.sim2real_obs_noise_base * ns
)
base_vel_obs = (
base_vel_delayed
+ self.env_base_vel_bias
+ torch.randn_like(base_vel_delayed) * self.cfg.sim2real_obs_noise_vel * ns
)
# 6-7. fixed observation scaling
pole_angle_sin = torch.sin(pole_angle_obs)
pole_angle_cos = torch.cos(pole_angle_obs)
ang_vel_scaled = torch.clamp(
ang_vel_obs / self.cfg.obs_scale_imu_vel, -1.0, 1.0
)
# base pos to sin and cos
base_pos_sin = torch.sin(base_pos_obs)
base_pos_cos = torch.cos(base_pos_obs)
base_vel_scaled = torch.clamp(
base_vel_obs / self.cfg.obs_scale_base_vel, -1.0, 1.0
)
# 6-8. compose current perceived feature [N, 6]
obs_feat = torch.cat(
[
pole_angle_sin,
pole_angle_cos,
ang_vel_scaled,
base_pos_sin,
base_pos_cos,
base_vel_scaled,
],
dim=-1,
)
# 6-9. push into perceived feature buffer
self.obs_feat_buf = torch.roll(self.obs_feat_buf, shifts=1, dims=1)
self.obs_feat_buf[:, 0, :] = obs_feat
# 6-10. observation history stack: taps [0, 10, 20, 30] (0/20/40/60 ms) → 4×6 = 24 dim
obs_history_parts = []
for tap in self._obs_taps:
obs_history_parts.append(self.obs_feat_buf[:, tap, :]) # [N, 6]
obs_history = torch.cat(obs_history_parts, dim=-1) # [N, 24]
# 6-11. raw action history stack: taps [0, 10, 20, 30] (0/20/40/60 ms) → 4×1 = 4 dim
raw_action_history_parts = []
for tap in self._raw_action_taps:
raw_action_history_parts.append(
torch.clamp(
self.action_buf[:, tap, :] / self.cfg.raw_action_obs_scale,
-1.0,
1.0,
)
) # [N, 1]
raw_action_history = torch.cat(
raw_action_history_parts,
dim=-1,
) # [N, 4]
# 6-12. final observation: 24 + 4 = 28 dim
obs = torch.cat(
[obs_history, raw_action_history],
dim=-1,
) # [N, 28]
return {"policy": obs}
def _get_rewards(self) -> torch.Tensor:
total_reward = compute_rewards(
self.cfg.rew_scale_alive,
self.cfg.rew_scale_terminated,
self.cfg.rew_scale_pole_pos,
self.cfg.rew_scale_pole_vel,
self.cfg.rew_scale_action_smooth1,
self.cfg.rew_scale_action_smooth2,
self.cfg.rew_scale_action_mag_cmd,
self.cfg.raw_action_rew_clip,
self.joint_pos[:, self._pole_dof_idx[0]],
self.joint_vel[:, self._pole_dof_idx[0]],
self.action_buf[:, 0, :], # raw NN output: current
self.action_buf[:, 1, :], # raw NN output: t-1
self.action_buf[:, 2, :], # raw NN output: t-2
self.reset_terminated,
)
return total_reward
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.robot.data.joint_pos
self.joint_vel = self.robot.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(
torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1
)
return out_of_bounds, time_out
# ────────────────────────────────────────────────────────────────────────
# Progressive randomization: determine current stage from iteration count
# ────────────────────────────────────────────────────────────────────────
def _get_rand_stage(self):
"""Return (delay_ranges, noise_scale, recovery_ranges) for current stage."""
return (
self.cfg.rand_A_action_delay,
self.cfg.rand_A_imu_delay,
self.cfg.rand_A_joint_delay,
self.cfg.rand_A_noise_scale,
(
self.cfg.rand_A_pole_angle_range,
self.cfg.rand_A_pole_vel_range,
self.cfg.rand_A_base_vel_range,
),
)
def _reset_idx(self, env_ids: Sequence[int] | None):
if env_ids is None:
env_ids = self.robot._ALL_INDICES
# ── episode logging output (Moved here to use correct episode lengths before reset) ──
if "episode" not in self.extras:
self.extras["episode"] = {}
ep_len = self.episode_length_buf[env_ids].float()
ep_len = torch.where(ep_len == 0.0, torch.ones_like(ep_len), ep_len)
self.extras["episode"]["mean_raw_mag"] = torch.mean(
self.episode_sums["raw_mag_sum"][env_ids] / ep_len
)
self.extras["episode"]["max_raw_mag"] = torch.mean(
self.episode_sums["raw_mag_max"][env_ids]
)
self.extras["episode"]["mean_raw_diff2"] = torch.mean(
self.episode_sums["raw_diff2_sum"][env_ids] / ep_len
)
self.extras["episode"]["pole_angle_rms"] = torch.mean(
torch.sqrt(self.episode_sums["pole_angle_sq_sum"][env_ids] / ep_len)
)
self.extras["episode"]["pole_vel_rms"] = torch.mean(
torch.sqrt(self.episode_sums["pole_vel_sq_sum"][env_ids] / ep_len)
)
self.extras["episode"]["base_vel_rms"] = torch.mean(
torch.sqrt(self.episode_sums["base_vel_sq_sum"][env_ids] / ep_len)
)
self.extras["episode"]["sign_flip_rate"] = torch.mean(
self.episode_sums["sign_flip_count"][env_ids] / ep_len
)
self.extras["episode"]["mean_cmd_lpf_alpha"] = torch.mean(
self.episode_sums["cmd_lpf_alpha_mean"][env_ids] / ep_len
)
self.extras["episode"]["mean_cmd_lpf_tau_ms"] = torch.mean(
self.episode_sums["cmd_lpf_tau_ms_mean"][env_ids] / ep_len
)
for key in self.episode_sums.keys():
self.episode_sums[key][env_ids] = 0.0
super()._reset_idx(env_ids)
n = len(env_ids)
# ── progressive randomization: update stage ──
(
act_delay_range,
imu_delay_range,
joint_delay_range,
noise_scale,
recovery_ranges,
) = self._get_rand_stage()
self.current_noise_scale = noise_scale
# ── sample per-env delays (from current stage) ──
self.env_action_delay[env_ids] = torch.randint(
act_delay_range[0],
act_delay_range[1] + 1,
(n,),
device=self.device,
)
self.env_imu_delay[env_ids] = torch.randint(
imu_delay_range[0],
imu_delay_range[1] + 1,
(n,),
device=self.device,
)
self.env_joint_delay[env_ids] = torch.randint(
joint_delay_range[0],
joint_delay_range[1] + 1,
(n,),
device=self.device,
)
# ── command LPF tau sampling ──
tau_ms = (
torch.rand(n, 1, device=self.device)
* (
self.cfg.sim2real_cmd_lpf_tau_ms_max
- self.cfg.sim2real_cmd_lpf_tau_ms_min
)
+ self.cfg.sim2real_cmd_lpf_tau_ms_min
)
dt_ms = self._dt * 1000.0
self.env_cmd_lpf_alpha[env_ids] = dt_ms / (tau_ms + dt_ms)
# ── bias sampling ──
self.env_pole_angle_bias[env_ids] = (
torch.rand(n, 1, device=self.device)
* (
self.cfg.sim2real_pole_angle_bias_rad_max
- self.cfg.sim2real_pole_angle_bias_rad_min
)
+ self.cfg.sim2real_pole_angle_bias_rad_min
)
self.env_imu_vel_bias[env_ids] = (
torch.rand(n, 1, device=self.device)
* (self.cfg.sim2real_imu_vel_bias_max - self.cfg.sim2real_imu_vel_bias_min)
+ self.cfg.sim2real_imu_vel_bias_min
)
self.env_base_pos_bias[env_ids] = (
torch.rand(n, 1, device=self.device)
* (
self.cfg.sim2real_base_pos_bias_max
- self.cfg.sim2real_base_pos_bias_min
)
+ self.cfg.sim2real_base_pos_bias_min
)
self.env_base_vel_bias[env_ids] = (
torch.rand(n, 1, device=self.device)
* (
self.cfg.sim2real_base_vel_bias_max
- self.cfg.sim2real_base_vel_bias_min
)
+ self.cfg.sim2real_base_vel_bias_min
)
# =========================================================================
# ── Domain Randomization: Mass ──
# =========================================================================
# The PhysX API always requires a full-size tensor ([num_envs, num_bodies]), even when updating only a subset of envs.
if not hasattr(self, "_current_masses"):
# On first call, create and store the mass buffer for all envs (num_envs x num_bodies) on the GPU
self._current_masses = self.robot.data.default_mass.to(self.device).clone()
num_bodies = len(self._body_indices)
# 1. Sample random scales only for the resetting envs (env_ids), range 0.95 ~ 1.05
mass_scale = (
torch.rand((n, num_bodies), device=self.device)
* (self.cfg.dr_mass_scale_max - self.cfg.dr_mass_scale_min)
+ self.cfg.dr_mass_scale_min
)
# 2. On the GPU, multiply the default masses by the scales and overwrite only the env_ids rows of the full buffer
self._current_masses[env_ids] = (
self.robot.data.default_mass.to(self.device)[env_ids] * mass_scale
)
# 3. Convert to a CPU tensor for the PhysX backend (keep the full size)
masses_cpu = self._current_masses.cpu().contiguous()
env_ids_cpu = (
env_ids.cpu().contiguous() if isinstance(env_ids, torch.Tensor) else env_ids
)
# 4. Apply to the physics engine (pass the full tensor together with the indices to update)
self.robot.root_physx_view.set_masses(masses_cpu, env_ids_cpu)
# =========================================================================
# ── reset robot state first (need initial state for buffer fill) ──
joint_pos = self.robot.data.default_joint_pos[env_ids]
joint_vel = self.robot.data.default_joint_vel[env_ids]
# Mixed reset logic: apply recovery everywhere, no stage B/C
pole_angle_range, pole_vel_range, base_vel_range = recovery_ranges
n_nom = n
if n_nom > 0:
joint_pos[:, self._pole_dof_idx[0]] += sample_uniform(
pole_angle_range[0],
pole_angle_range[1],
(n_nom,),
joint_pos.device,
)
joint_vel[:, self._pole_dof_idx[0]] += sample_uniform(
pole_vel_range[0],
pole_vel_range[1],
(n_nom,),
joint_vel.device,
)
joint_vel[:, self._cart_dof_idx[0]] += sample_uniform(
base_vel_range[0],
base_vel_range[1],
(n_nom,),
joint_vel.device,
)
default_root_state = self.robot.data.default_root_state[env_ids]
default_root_state[:, :3] += self.scene.env_origins[env_ids]
self.joint_pos[env_ids] = joint_pos
self.joint_vel[env_ids] = joint_vel
self.robot.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
self.robot.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
self.robot.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
# ── fill ring buffers with initial state ──
self.action_buf[env_ids] = 0.0
self.vel_cmd_exec_state[env_ids] = 0.0
# Compute initial sensor values from reset state
init_pole_angle = joint_pos[:, self._pole_dof_idx[0]] # [n] pure rad
init_ang_vel = joint_vel[:, self._pole_dof_idx[0]] # [n]
init_base_pos_raw = joint_pos[:, self._cart_dof_idx[0]] # [n]
# Initialize prev_base_pos for position differentiation
encoder_step = self.cfg.sim2real_base_encoder_step
init_base_pos = torch.round(init_base_pos_raw / encoder_step) * encoder_step
self.prev_base_pos[env_ids] = init_base_pos.unsqueeze(-1)
# Initialize base velocity LPF state with sampled bias (since raw physical velocity is 0 at reset)
self.base_vel_lpf_state[env_ids] = self.env_base_vel_bias[env_ids].clone()
# Fill raw IMU buffer: repeat initial state across all taps
self.raw_imu_buf[env_ids, :, 0] = init_pole_angle.unsqueeze(1)
self.raw_imu_buf[env_ids, :, 1] = init_ang_vel.unsqueeze(1)
# ── fill raw base buffer (velocity = 0 at reset, position differentiation has no history) ──
self.raw_base_buf[env_ids, :, 0] = init_base_pos.unsqueeze(1)
self.raw_base_buf[env_ids, :, 1] = 0.0
# Fill perceived obs feature buffer [sin(pole), cos(pole), pole_vel, sin(base_pos), cos(base_pos), base_vel]
# Adding initial sensor bias (base_vel = 0 + bias):
init_pole_angle_biased = init_pole_angle + self.env_pole_angle_bias[
env_ids
].squeeze(-1)
init_ang_vel_biased = init_ang_vel + self.env_imu_vel_bias[env_ids].squeeze(-1)
init_base_pos_biased = init_base_pos + self.env_base_pos_bias[env_ids].squeeze(
-1
)
init_pole_angle_sin = torch.sin(init_pole_angle_biased)
init_pole_angle_cos = torch.cos(init_pole_angle_biased)
init_ang_vel_scaled = torch.clamp(
init_ang_vel_biased / self.cfg.obs_scale_imu_vel, -1.0, 1.0
)
init_base_pos_sin = torch.sin(init_base_pos_biased)
init_base_pos_cos = torch.cos(init_base_pos_biased)
# base_vel_scaled: include bias at reset (base_vel_raw = 0)
init_base_vel_biased = self.env_base_vel_bias[env_ids].squeeze(-1)
init_base_vel_scaled = torch.clamp(
init_base_vel_biased / self.cfg.obs_scale_base_vel, -1.0, 1.0
)
init_feat = torch.stack(
[
init_pole_angle_sin,
init_pole_angle_cos,
init_ang_vel_scaled,
init_base_pos_sin,
init_base_pos_cos,
init_base_vel_scaled,
],
dim=-1,
) # [n, 6]
self.obs_feat_buf[env_ids] = init_feat.unsqueeze(1).expand(-1, _BUF_LEN, -1)
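The reset path above quantizes the cart position with `torch.round(pos / step) * step` before differentiating it, so the simulated sensor only ever reports positions the real encoder could produce. A minimal plain-Python sketch of that quantization (the step value here is illustrative; the real value comes from `cfg.sim2real_base_encoder_step`):

```python
# Snap a continuous position to a fixed encoder resolution,
# mirroring torch.round(pos / step) * step from the reset code.
def quantize(pos: float, step: float) -> float:
    return round(pos / step) * step

# With a hypothetical 0.001-unit encoder step, 0.01234 snaps to 0.012:
# the agent never sees sub-resolution position detail, in sim or on hardware.
print(quantize(0.01234, 0.001))
```

Because the velocity observation is obtained by differencing these quantized positions, the quantization also injects realistic stair-step noise into the velocity channel, which the low-pass filter state (`base_vel_lpf_state`) then smooths.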
@torch.jit.script
def compute_rewards(
rew_scale_alive: float,
rew_scale_terminated: float,
rew_scale_pole_pos: float,
rew_scale_pole_vel: float,
rew_scale_action_smooth1: float,
rew_scale_action_smooth2: float,
rew_scale_action_mag_cmd: float,
raw_action_rew_clip: float,
pole_pos: torch.Tensor,
pole_vel: torch.Tensor,
raw_action_t: torch.Tensor,
raw_action_t1: torch.Tensor,
raw_action_t2: torch.Tensor,
reset_terminated: torch.Tensor,
):
# alive / termination
rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
rew_termination = rew_scale_terminated * reset_terminated.float()
# pole angle: -k_θ * θ²
rew_pole_pos = rew_scale_pole_pos * torch.sum(
torch.square(pole_pos).unsqueeze(dim=1), dim=-1
)
# pole angular velocity: -k_dθ * dθ²
rew_pole_vel = rew_scale_pole_vel * torch.sum(
torch.square(pole_vel).unsqueeze(dim=1), dim=-1
)
# Clip values before squaring for action penalties
raw_t_r = torch.clamp(raw_action_t, -raw_action_rew_clip, raw_action_rew_clip)
raw_t1_r = torch.clamp(raw_action_t1, -raw_action_rew_clip, raw_action_rew_clip)
raw_t2_r = torch.clamp(raw_action_t2, -raw_action_rew_clip, raw_action_rew_clip)
# action smoothness 1 (raw NN output): -k_s1 * (a_t - a_{t-1})²
rew_smooth1 = rew_scale_action_smooth1 * torch.sum(
torch.square(raw_t_r - raw_t1_r), dim=-1
)
# action smoothness 2 (raw NN output): -k_s2 * (a_t - 2*a_{t-1} + a_{t-2})²
rew_smooth2 = rew_scale_action_smooth2 * torch.sum(
torch.square(raw_t_r - 2.0 * raw_t1_r + raw_t2_r), dim=-1
)
# raw NN output magnitude penalty: -k_cmd * |a_t|²
rew_mag_cmd = rew_scale_action_mag_cmd * torch.sum(torch.square(raw_t_r), dim=-1)
total_reward = (
rew_alive
+ rew_termination
+ rew_pole_pos
+ rew_pole_vel
+ rew_smooth1
+ rew_smooth2
+ rew_mag_cmd
)
return total_reward
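The total reward above is a weighted sum of an alive bonus, a termination penalty, quadratic pole-state penalties, and three action-regularization terms. A scalar sketch of the same shaping, using plain floats and illustrative weights (not the trained configuration's values), makes the structure easy to inspect:

```python
# Scalar sketch of the reward terms in compute_rewards.
# All k_* weights below are illustrative assumptions, not the project's config.
def reward(theta, theta_dot, a_t, a_t1, a_t2, terminated,
           k_alive=1.0, k_term=-2.0, k_theta=-1.0, k_dtheta=-0.01,
           k_s1=-0.05, k_s2=-0.05, k_cmd=-0.01):
    r = k_alive * (0.0 if terminated else 1.0)   # alive bonus
    r += k_term * (1.0 if terminated else 0.0)   # termination penalty
    r += k_theta * theta ** 2                    # pole angle: -k_θ * θ²
    r += k_dtheta * theta_dot ** 2               # pole velocity: -k_dθ * dθ²
    r += k_s1 * (a_t - a_t1) ** 2                # first-order smoothness
    r += k_s2 * (a_t - 2 * a_t1 + a_t2) ** 2     # second-order smoothness
    r += k_cmd * a_t ** 2                        # action magnitude penalty
    return r

# A balanced pole with a constant action ramp (0.0 -> 0.1 -> 0.2) incurs no
# second-order smoothness penalty, since a_t - 2*a_t1 + a_t2 == 0.
print(reward(0.0, 0.0, 0.2, 0.1, 0.0, False))
```

Note that the second-order term only penalizes changes in the rate of the action, so smooth velocity ramps remain cheap while jitter (sign-flipping commands) is penalized by both smoothness terms at once.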