import numpy as np
from scipy.spatial.transform import Rotation as R
from copy import deepcopy
import sys
sys.path.append(".")
import threading
import asyncio
import time
import logging
from tqdm import trange
from pathlib import Path
import numpy as np
import time
from aiofranka.robot import RobotInterface
from ruckig import InputParameter, Ruckig, Trajectory, Result
logger = logging.getLogger(__name__)
CUR_DIR = Path(__file__).parent.resolve()
[docs]
class FrankaController:
"""
High-level asyncio controller for Franka robots with multiple control modes.
This controller runs a 1kHz torque control loop in the background using asyncio,
while allowing you to send high-level commands asynchronously. Supports three
control modes:
1. Impedance Control: Joint-space spring-damper control
2. Operational Space Control (OSC): Task-space control with null-space
3. Direct Torque Control: Raw torque commands
The controller automatically handles:
- Background control loop at 1kHz
- Torque rate limiting for safety
- Thread-safe state updates
- Rate-limited command updates
- Smooth trajectory generation
Attributes:
robot (RobotInterface): Robot interface instance
type (str): Current controller type ("impedance", "osc", "torque")
running (bool): Whether control loop is active
state (dict): Current robot state (updated at 1kHz)
# Impedance control gains
kp (np.ndarray): Joint position stiffness [Nm/rad] (7,)
kd (np.ndarray): Joint damping [Nm⋅s/rad] (7,)
# OSC gains
ee_kp (np.ndarray): EE stiffness [N/m for xyz, Nm/rad for rpy] (6,)
ee_kd (np.ndarray): EE damping [N⋅s/m for xyz, Nm⋅s/rad for rpy] (6,)
null_kp (np.ndarray): Null-space stiffness [Nm/rad] (7,)
null_kd (np.ndarray): Null-space damping [Nm⋅s/rad] (7,)
# Target states
q_desired (np.ndarray): Desired joint positions [rad] (7,)
ee_desired (np.ndarray): Desired EE pose as 4x4 transform
torque (np.ndarray): Direct torque command [Nm] (7,)
# Safety
clip (bool): Enable torque rate limiting (default: True)
torque_diff_limit (float): Max torque rate [Nm/s] (default: 990)
Examples:
Basic usage:
>>> robot = RobotInterface("172.16.0.2")
>>> controller = FrankaController(robot)
>>> await controller.start()
>>> controller.switch("impedance")
>>> controller.set_freq(50)
>>> await controller.set("q_desired", target_joints)
>>> await controller.stop()
OSC control:
>>> controller.switch("osc")
>>> controller.ee_kp = np.array([300, 300, 300, 1000, 1000, 1000])
>>> controller.set_freq(50)
>>> desired_ee = np.eye(4)
>>> desired_ee[:3, 3] = [0.4, 0.0, 0.5]
>>> await controller.set("ee_desired", desired_ee)
Direct torque:
>>> controller.switch("torque")
>>> controller.torque = np.zeros(7) # Zero torques
Caveats:
- Must await controller.start() before sending commands
- Use set_freq() before set() to enforce timing
- High gains can cause instability or safety triggers
- Switching controllers resets initial state
- State access is thread-safe but copy if modifying
- Control loop must run continuously at ~1kHz
"""
[docs]
def __init__(self, robot: RobotInterface):
"""
Initialize controller with robot interface.
Args:
robot (RobotInterface): Initialized robot interface
Note:
Controller is initialized in "impedance" mode with conservative gains.
Call start() to begin the control loop, then switch() to change modes.
Default Gains:
- Impedance: kp=80, kd=4 (per joint)
- OSC: ee_kp=[100]*6, ee_kd=[4]*6
- Null-space: null_kp=1, null_kd=1
"""
self.robot = robot
self.initialize()
self.state_lock = threading.Lock()
self.type = "impedance"
self.running = False
self.task = None
self.clip = True
self.error_callback = None # Callback function(error_str) called on control loop exception
self.kp, self.kd = np.ones(7) * 80, np.ones(7) * 4
self.ki = np.ones(7) * 0.1 # Integral gains
self.error_integral = np.zeros(7) # Accumulated error
self.ee_kp, self.ee_kd = np.ones(6) * 100, np.ones(6) * 4
self.null_kp, self.null_kd = np.ones(7) * 1, np.ones(7) * 1
self.track = False
self.torque_diff_limit = 990.
self.torque_limit = np.array([87, 87, 87, 87, 12, 12, 12]) # Nm
# Rate limiting for .set() method
self._update_freq = 50.0 # Default 50Hz
self._last_update_time = {}
self._pending_updates = {}
self.state = None
self.verbose = False
[docs]
async def test_connection(self):
"""
Test control loop timing and diagnose connection quality.
Runs for 5 seconds and prints statistics about control loop performance:
- Actual frequency (should be ~1000 Hz)
- Mean/std/min/max loop time
- Jitter (max - min)
Use this to verify your setup is working correctly before running experiments.
Example Output:
Control loop stats (last 1000 iterations):
Frequency: 1000.2 Hz (target: 1000 Hz)
Mean dt: 1.000 ms, Std: 0.015 ms
Min dt: 0.985 ms, Max dt: 1.025 ms
Jitter (max-min): 0.040 ms
Caveats:
- High jitter (>0.5ms) indicates system load or network issues
- Frequency <990 Hz suggests performance problems
- Run on a dedicated realtime system for best results
"""
self.track = True
await asyncio.sleep(5)
self.track = False
[docs]
def initialize(self):
# Read from already-synced MuJoCo state to avoid calling readOnce()
# concurrently with the active torque control loop (would drop TCP connection)
self.initial_qpos = deepcopy(self.robot.data.qpos)
self.initial_qvel = deepcopy(self.robot.data.qvel)
self.initial_ee = self.robot._ee()
self.q_desired = self.initial_qpos
self.ee_desired = self.initial_ee
def _update_desired(self, desired):
"""
Update the desired joint positions, kp and kd
This function is called by the server when a client sends new values
Thread-safe update of shared state.
Args:
desired: Desired joint positions (7-element array)
kp: Joint position stiffness gains (7-element array)
kd: Joint velocity damping gains (7-element array)
"""
with self.state_lock:
self.q_desired = np.array(desired) if type(desired) == list else desired
[docs]
def set_freq(self, freq: float):
"""
Set the update frequency for rate-limited set() calls.
This enforces strict timing for subsequent set() calls, automatically
sleeping to maintain the specified frequency. Prevents sending commands
too fast and ensures smooth, consistent control.
Args:
freq (float): Desired update frequency in Hz (typically 10-100 Hz)
Note:
Must be called BEFORE the first set() call to take effect.
Each attribute tracked by set() has independent timing.
Examples:
>>> controller.set_freq(50) # 50 Hz updates
>>> for i in range(100):
... await controller.set("q_desired", compute_target())
... # Automatically sleeps to maintain 50 Hz
Caveats:
- Don't set freq > 200 Hz (unnecessary and may cause timing issues)
- Lower freq = smoother motion but slower response
- Higher freq = faster response but requires more computation
- Timing is per-attribute (q_desired and ee_desired tracked separately)
"""
self._update_freq = freq
[docs]
async def set(self, attr: str, value):
"""
Rate-limited setter that enforces strict timing for control updates.
This method ensures updates to controller attributes happen at the
frequency specified by set_freq(). It compensates for drift by tracking
the target time for each update, guaranteeing consistent timing even
if your computation time varies.
Args:
attr (str): Attribute name to set. Common values:
- "q_desired": Joint position target (impedance mode)
- "ee_desired": End-effector pose target (OSC mode)
- "torque": Direct torque command (torque mode)
value: Value to set. Type depends on attr:
- q_desired: np.ndarray (7,) [rad]
- ee_desired: np.ndarray (4, 4) homogeneous transform
- torque: np.ndarray (7,) [Nm]
Note:
- Automatically sleeps to maintain frequency set by set_freq()
- First call for an attribute initializes timing
- Each attribute has independent timing tracking
- Thread-safe update of shared state
Examples:
Impedance control:
>>> controller.set_freq(50)
>>> for i in range(100):
... target = initial_q + np.sin(i / 50.0 * np.pi) * 0.1
... await controller.set("q_desired", target)
OSC control:
>>> controller.set_freq(100)
>>> desired_ee = np.eye(4)
>>> desired_ee[:3, 3] = [0.5, 0.0, 0.3]
>>> await controller.set("ee_desired", desired_ee)
Caveats:
- Must call set_freq() before first set() call
- If computation takes longer than 1/freq, timing will slip
- Don't mix set() and direct attribute assignment
- Don't call set() faster than the specified frequency
"""
current_time = time.perf_counter()
dt = 1.0 / self._update_freq
# Initialize tracking for this attribute if first time
if attr not in self._last_update_time:
self._last_update_time[attr] = current_time
with self.state_lock:
setattr(self, attr, value)
await asyncio.sleep(dt)
self._last_update_time[attr] = current_time + dt
return
# Calculate target time for this update
target_time = self._last_update_time[attr] + dt
# Set value immediately to minimize latency
with self.state_lock:
setattr(self, attr, value)
# Then sleep to rate-limit the next call
sleep_time = target_time - current_time
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Update last update time to target (not actual) to avoid drift
self._last_update_time[attr] = target_time
async def _run(self):
"""Run the control loop continuously in the background"""
self.running = True
loop_times = []
last_time = time.perf_counter()
log_interval = 1000 # Log every 1000 iterations (1 second at 1000Hz)
iteration = 0
try:
while self.running:
t0 = time.time()
self.step()
# Track timing
if self.track:
current_time = time.perf_counter()
dt = current_time - last_time
loop_times.append(dt)
last_time = current_time
iteration += 1
# Log statistics every log_interval iterations
if iteration % log_interval == 0:
loop_times_array = np.array(loop_times)
mean_dt = np.mean(loop_times_array) * 1000 # Convert to ms
std_dt = np.std(loop_times_array) * 1000
min_dt = np.min(loop_times_array) * 1000
max_dt = np.max(loop_times_array) * 1000
actual_freq = 1.0 / np.mean(loop_times_array)
print(f"Control loop stats (last {log_interval} iterations):")
print(f" Frequency: {actual_freq:.1f} Hz (target: 1000 Hz)")
print(f" Mean dt: {mean_dt:.3f} ms, Std: {std_dt:.3f} ms")
print(f" Min dt: {min_dt:.3f} ms, Max dt: {max_dt:.3f} ms")
print(f" Jitter (max-min): {max_dt - min_dt:.3f} ms")
loop_times.clear()
dt = time.time() - t0
if not self.robot.real:
await asyncio.sleep(1/1000. - dt) # Yield control to event loop
else:
await asyncio.sleep(0) # Yield control to event loop
except Exception as e:
self.running = False
error_str = str(e)
print(f"Error in control loop: {error_str}")
# Call error callback if set
if self.error_callback is not None:
try:
self.error_callback(error_str)
except Exception as cb_err:
print(f"Error in error_callback: {cb_err}")
# diff = (self.torque - self.last_torque)/1e-3
# diff = np.abs(diff)
# if np.any(diff > 1000.):
# # print what axis is causing the issue
# arg_idxs = np.where(diff > 1000.)[0]
# print(f"High torque rate of change detected on axes: {arg_idxs}")
# print((self.torque - self.last_torque)/1e-3)
sys.exit(1) # Kill the entire script
[docs]
async def start(self):
"""
Start the 1kHz background control loop.
This creates an asyncio task that runs the control loop continuously
at ~1000 Hz. The loop reads robot state, computes control torques based
on the current controller type, and sends torque commands.
Returns:
asyncio.Task: The background control loop task
Note:
- Blocks for 1 second to ensure loop starts successfully
- Starts robot torque control mode automatically
- Control loop runs until stop() is called
- You can send commands via set() while loop is running
Example:
>>> await controller.start()
>>> # Control loop now running in background
>>> controller.set_freq(50)
>>> await controller.set("q_desired", target)
>>> await controller.stop()
Caveats:
- Must be awaited (async function)
- Don't call start() multiple times without stop()
- If loop crashes, robot will trigger safety stop
- Check terminal for error messages if robot stops unexpectedly
"""
logger.info("Starting robot control loop")
self.robot.start()
if self.task is None or self.task.done():
self.task = asyncio.create_task(self._run())
await asyncio.sleep(1) # Yield to ensure the task starts
return self.task
[docs]
async def stop(self):
"""
Stop the background control loop and robot.
Gracefully terminates the control loop task and stops robot torque control.
Robot will hold position briefly then release brakes.
Note:
- Blocks for 1 second to ensure clean shutdown
- Cancels asyncio control loop task
- Stops robot torque control mode
- Safe to call multiple times
Example:
>>> await controller.start()
>>> # ... do control ...
>>> await controller.stop()
Caveat:
Ensure robot is in a safe configuration before stopping. Robot
will briefly hold position then release brakes.
"""
self.running = False
if self.task:
self.task.cancel()
try:
await self.task
except asyncio.CancelledError:
print("Control loop task cancelled.")
self.robot.stop()
print("robot stopped")
await asyncio.sleep(1) # Yield to ensure the task starts
[docs]
def switch(self, controller_type: str):
"""
Switch between control modes at runtime.
Changes the active controller without stopping the control loop. Resets
the initial state (initial_qpos, initial_ee) to current robot state and
clears rate-limiting timing.
Args:
controller_type (str): Controller type to switch to:
- "impedance": Joint-space impedance control
- "pid": Joint-space PID control with integral term
- "osc": Operational space control (task space)
- "torque": Direct torque control
Example:
>>> controller.switch("impedance")
>>> controller.kp = np.ones(7) * 80
>>> # ... run impedance control ...
>>>
>>> controller.switch("osc")
>>> controller.ee_kp = np.array([300, 300, 300, 1000, 1000, 1000])
>>> # ... run OSC control ...
Note:
- Can be called while control loop is running
- Resets q_desired to current position
- Resets ee_desired to current end-effector pose
- Clears timing state from previous set() calls
- Resets integral term when switching to/from PID
Caveats:
- Switching causes brief discontinuity in control
- Adjust gains after switching for smooth transition
- Don't switch rapidly (< 1 Hz) as it resets state
"""
self.type = controller_type
self.initialize()
# Reset integral term when switching controllers
self.error_integral = np.zeros(7)
# Reset timing state when switching controllers
self._last_update_time.clear()
if self.verbose:
print("==================================")
print(f"Switched to {controller_type} controller.")
print("==================================")
[docs]
def step(self):
self.state = self.robot.state
if self.type == "impedance":
self._impedance_step(self.state)
elif self.type == "pid":
self._pid_step(self.state)
elif self.type == "osc":
self._osc_step(self.state)
elif self.type == "torque":
self._torque_step(self.state)
else:
raise ValueError(f"Unknown controller type: {self.type}")
def _torque_step(self, state):
self.robot.step(self.torque)
def _osc_step(self, state):
jac = state['jac']
ee = state['ee']
q = state['qpos']
dq = state['qvel']
mm = state['mm']
last_torque = state['last_torque']
with self.state_lock:
ee_goal = self.ee_desired
position_error = ee_goal[:3, 3] - ee[:3, 3]
rotation_error = R.from_matrix(ee_goal[:3, :3]) * R.from_matrix(ee[:3, :3]).inv()
rotation_error_vec = rotation_error.as_rotvec()
twist = np.zeros(6)
twist[:3] = position_error
twist[3:] = rotation_error_vec
ee_vel = jac @ dq
# minv = np.linalg.inv(mm)
if abs(np.linalg.det(mm)) > 1e-2:
minv = np.linalg.inv(mm)
else:
minv = np.linalg.pinv(mm)
mx_inv = jac @ minv @ jac.T
if abs(np.linalg.det(mx_inv)) > 1e-2:
mx = np.linalg.inv(mx_inv)
else:
mx = np.linalg.pinv(mx_inv)
# operational space feedback torque
feedback = jac.T @ mx @ (self.ee_kp * twist - self.ee_kd * ee_vel)
# null space torque
q0 = self.initial_qpos
ddq = self.null_kp * (q0 - q) - self.null_kd * dq
jbar = minv @ jac.T @ mx
null = (np.eye(7) - jac.T @ jbar.T) @ ddq
# Add coriolis compensation
tau_d = feedback + null
# make sure torque rate of change is not too high
if self.clip:
diff = (tau_d - last_torque)/1e-3
diff = np.clip(diff, -self.torque_diff_limit, self.torque_diff_limit)
tau_d = last_torque + diff * 1e-3
self.robot.step(tau_d)
def _pid_step(self, robot_state):
"""
PID control in joint space with integral term for steady-state error.
Computes: τ = Kp*e + Ki*∫e*dt - Kd*dq
where e = q_desired - q
"""
# Get state variables
q = np.array(robot_state['qpos'])
dq = np.array(robot_state['qvel'])
last_torque = robot_state['last_torque']
# Get current target (thread-safe)
with self.state_lock:
kp = self.kp
ki = self.ki
kd = self.kd
q_desired = self.q_desired
q_goal = q_desired
position_error = q_goal - q
# Update integral term (dt = 1ms = 0.001s)
self.error_integral += position_error * 1e-3
# Anti-windup: clamp integral term
integral_limit = 10.0 # Nm*s (adjust as needed)
self.error_integral = np.clip(self.error_integral, -integral_limit, integral_limit)
# Compute PID control
tau = position_error * kp + self.error_integral * ki - dq * kd
tau_d = tau
# Torque rate limiting
if self.clip:
diff = (tau_d - last_torque)/1e-3
diff = np.clip(diff, -self.torque_diff_limit, self.torque_diff_limit)
tau_d = last_torque + diff * 1e-3
self.torque = tau_d
self.robot.step(tau_d)
def _impedance_step(self, robot_state):
# Get state variables
q = np.array(robot_state['qpos'])
dq = np.array(robot_state['qvel'])
last_torque = robot_state['last_torque']
# Get current target from trajectory (thread-safe)
with self.state_lock:
kp = self.kp
kd = self.kd
q_desired = self.q_desired
q_goal = q_desired
position_error = q_goal - q
# Compute joint-space impedance control
tau = position_error * kp - dq * kd
# Add coriolis compensation
tau_d = tau #+ coriolis
# make sure torque rate of change is not too high
if self.clip:
diff = (tau_d - last_torque)/1e-3
diff = np.clip(diff, -self.torque_diff_limit, self.torque_diff_limit)
tau_d = last_torque + diff * 1e-3
tau_d = np.clip(tau_d, -self.torque_limit, self.torque_limit)
self.torque = tau_d
self.robot.step(tau_d)
[docs]
async def move(self, qpos = [0, 0, 0.0, -1.57079, 0, 1.57079, -0.7853]):
"""
Move robot to target joint position using smooth trajectory.
Generates a time-optimal, jerk-limited trajectory using Ruckig online
trajectory generation, then executes it at 50 Hz. Automatically switches
to impedance mode if not already active.
Args:
qpos (list | np.ndarray): Target joint positions [rad] (7,)
Default: Home position
Note:
- Uses Ruckig for smooth, time-optimal trajectories
- Respects velocity, acceleration, and jerk limits
- Switches to impedance control automatically
- Executes trajectory at 50 Hz (20ms updates)
- Duration depends on distance and limits
Trajectory Limits:
- Max velocity: 10 rad/s per joint
- Max acceleration: 5 rad/s² per joint
- Max jerk: 1 rad/s³ per joint
Examples:
Move to home position:
>>> await controller.move()
Move to custom position:
>>> target = [0, -0.785, 0, -2.356, 0, 1.571, 0.785]
>>> await controller.move(target)
Move to current position + offset:
>>> current = controller.state['qpos']
>>> await controller.move(current + np.array([0.1, 0, 0, 0, 0, 0, 0]))
Caveats:
- Large motions take longer (trajectory is time-optimal)
- Don't call while other control is active
- Blocks until motion completes
- Switches to impedance mode (resets controller state)
- May fail if target is at joint limits or in collision
"""
self.type = "impedance"
inp = InputParameter(7)
inp.current_position = self.robot.state['qpos']
inp.current_velocity = self.robot.state['qvel']
inp.current_acceleration = np.zeros(7)
inp.target_position = np.array(qpos)
inp.target_velocity = np.zeros(7)
inp.target_acceleration = np.zeros(7)
inp.max_velocity = np.ones(7) * 10
inp.max_acceleration = np.ones(7) * 5
inp.max_jerk = np.ones(7)
otg = Ruckig(7)
trajectory = Trajectory(7)
otg.calculate(inp, trajectory)
# create a trajectory to the desired qpos (linear interpolation)
n_steps = int(trajectory.duration * 50)
eta_total = trajectory.duration
for i in range(n_steps):
q_desired, _, _ = trajectory.at_time(i / 50.0)
await self.set("q_desired", q_desired)
done = (i + 1) * 20 // n_steps
bar = "█" * done + "░" * (20 - done)
eta = eta_total - (i + 1) / 50.0
print(f"\r Moving [{bar}] {(i + 1) * 100 // n_steps}% ETA {eta:.1f}s", end="", flush=True)
print()
# await self.stabilize()