"""
Robotiq Gripper Interface for aiofranka
This module provides an async wrapper for Robotiq grippers, following the same
design pattern as FrankaController - a background control loop continuously
sends commands while you update the target position.
Requires optional dependencies: pip install "aiofranka[robotiq]"
Example:
>>> import asyncio
>>> from aiofranka.gripper import GripperController
>>>
>>> async def main():
... gripper = GripperController("/dev/ttyUSB1")
... await gripper.start()
... gripper.speed = 128 # Set speed (like kp/kd)
... gripper.q_desired = 200 # Set target position
... await asyncio.sleep(1)
... await gripper.stop()
>>>
>>> asyncio.run(main())
"""
import asyncio
import threading
import time
from typing import Optional
import numpy as np
[docs]
class GripperController:
"""
Async gripper controller with background control loop.
Similar to FrankaController, this runs a background loop that continuously
sends goTo commands. You only need to set the target position - the loop
handles the communication.
Attributes:
q_desired (int): Target gripper position (0=open, 255=closed)
qpos (int): Current gripper position (read-only)
speed (int): Gripper speed setting (1-255), like kp gain
force (int): Gripper force setting (0-255), like kd gain
state (dict): Current gripper state (qpos, q_desired, speed, force)
running (bool): Whether control loop is active
Args:
port: Serial port for the gripper (e.g., "/dev/ttyUSB1")
speed: Initial speed setting (default: 255)
force: Initial force setting (default: 255)
loop_rate: Control loop frequency in Hz (default: 50)
Example:
>>> gripper = GripperController("/dev/ttyUSB1")
>>> await gripper.start()
>>>
>>> # Set gains (speed/force) like you set kp/kd
>>> gripper.speed = 128
>>> gripper.force = 200
>>>
>>> # Set target position (like q_desired for Franka)
>>> gripper.q_desired = 255 # Close
>>> await asyncio.sleep(0.5)
>>> gripper.q_desired = 0 # Open
>>> await asyncio.sleep(0.5)
>>>
>>> # Read current position (like qpos for Franka)
>>> print(gripper.qpos)
>>>
>>> await gripper.stop()
"""
[docs]
def __init__(
self,
port: str = "/dev/ttyUSB1",
speed: int = 255,
force: int = 255,
loop_rate: float = 50.0,
read_every_n: int = 2,
):
try:
from pyrobotiqgripper import RobotiqGripper
self._gripper_cls = RobotiqGripper
except ImportError:
raise ImportError(
"pyrobotiqgripper is required for gripper control. "
"Install it with: pip install 'aiofranka[robotiq]'"
)
self._port = port
self._gripper = None
self._loop_rate = loop_rate
self._read_every_n = read_every_n # Read position every Nth cycle
# Control parameters (like kp/kd for Franka)
self._speed = speed
self._force = force
# Target position (like q_desired for Franka)
self._position = 0
# State
self._current_position = 0
self._state_lock = threading.Lock()
# Control loop
self.running = False
self._thread = None
self._task = None
# Rate limiting for set() method
self._update_freq = 50.0
self._last_update_time = {}
@property
def q_desired(self) -> int:
"""Target gripper position (0=open, 255=closed)."""
with self._state_lock:
return self._position
@q_desired.setter
def q_desired(self, value: int):
"""Set target position (0-255)."""
value = int(np.clip(value, 0, 255))
with self._state_lock:
self._position = value
# Backwards compatibility alias
@property
def position(self) -> int:
"""Alias for q_desired (deprecated)."""
return self.q_desired
@position.setter
def position(self, value: int):
self.q_desired = value
@property
def speed(self) -> int:
"""Gripper speed (1-255). Higher = faster. Like kp gain."""
return self._speed
@speed.setter
def speed(self, value: int):
"""Set speed (1-255)."""
self._speed = int(np.clip(value, 1, 255))
@property
def force(self) -> int:
"""Gripper force (0-255). Higher = stronger grip. Like kd gain."""
return self._force
@force.setter
def force(self, value: int):
"""Set force (0-255)."""
self._force = int(np.clip(value, 0, 255))
@property
def qpos(self) -> int:
"""Current gripper position (read-only)."""
with self._state_lock:
return self._current_position
# Backwards compatibility alias
@property
def current_position(self) -> int:
"""Alias for qpos (deprecated)."""
return self.qpos
@property
def state(self) -> dict:
"""Current gripper state dictionary."""
with self._state_lock:
return {
'qpos': self._current_position,
'q_desired': self._position,
'speed': self._speed,
'force': self._force,
'error': abs(self._position - self._current_position)
}
[docs]
def set_freq(self, freq: float):
"""
Set the update frequency for rate-limited set() calls.
Args:
freq: Desired update frequency in Hz (typically 10-100 Hz)
"""
self._update_freq = freq
[docs]
async def set(self, attr: str, value):
"""
Rate-limited setter (same pattern as FrankaController).
Args:
attr: Attribute name ("q_desired", "speed", "force")
value: Value to set
"""
current_time = time.perf_counter()
dt = 1.0 / self._update_freq
if attr not in self._last_update_time:
self._last_update_time[attr] = current_time
await asyncio.sleep(dt)
setattr(self, attr, value)
self._last_update_time[attr] = current_time + dt
return
target_time = self._last_update_time[attr] + dt
sleep_time = target_time - current_time
if sleep_time > 0:
await asyncio.sleep(sleep_time)
setattr(self, attr, value)
self._last_update_time[attr] = target_time
def _write_goto(self, position: int, speed: int, force: int):
"""Direct register write for goTo - skips all the overhead in pyrobotiqgripper's goTo()."""
self._gripper.write_registers(1000, [
0b0000100100000000,
position,
speed * 0b100000000 + force
])
def _read_position(self) -> int:
"""Direct register read for position - skips readAll() overhead."""
registers = self._gripper.read_registers(2000, 3)
return (registers[2] >> 8) & 0xFF
def _control_loop_step(self, read: bool):
"""One iteration of the control loop (runs in dedicated thread).
Args:
read: Whether to read position this cycle.
"""
# Write target position first (lowest latency to gripper)
with self._state_lock:
target = self._position
speed = self._speed
force = self._force
try:
self._write_goto(target, speed, force)
except Exception as e:
print(f"Gripper command error: {e}")
# Read current position after (only every Nth cycle)
if read:
try:
pos = self._read_position()
with self._state_lock:
self._current_position = pos
except Exception:
pass
def _thread_loop(self):
"""Dedicated thread running the control loop.
Avoids run_in_executor overhead by running continuously in its own thread.
"""
dt = 1.0 / self._loop_rate
cycle = 0
while self.running:
t0 = time.perf_counter()
read = (cycle % self._read_every_n == 0)
self._control_loop_step(read)
cycle += 1
elapsed = time.perf_counter() - t0
sleep_time = dt - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
async def _run(self):
"""Start the control loop in a dedicated thread.
Uses a real thread instead of run_in_executor to avoid
thread pool scheduling overhead on every iteration.
"""
self.running = True
self._thread = threading.Thread(target=self._thread_loop, daemon=True)
self._thread.start()
# Keep the task alive until stopped
try:
while self.running:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
self.running = False
if self._thread:
self._thread.join(timeout=1.0)
[docs]
async def start(self):
"""
Initialize gripper and start background control loop.
Returns:
asyncio.Task: The background control loop task
"""
print(f"Starting gripper on {self._port}...")
# Initialize gripper
self._gripper = self._gripper_cls(self._port)
# Reduce serial timeout from 200ms to 50ms for faster round-trips
self._gripper.serial.timeout = 0.05
self._gripper.resetActivate()
# Read initial position using direct register read
self._current_position = self._read_position()
self._position = self._current_position
print(f"Gripper activated. Initial position: {self._current_position}")
# Start control loop
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._run())
await asyncio.sleep(0.5) # Let loop start
return self._task
[docs]
async def stop(self):
"""Stop the control loop and wait for thread to finish."""
self.running = False
if self._thread:
self._thread.join(timeout=2.0)
self._thread = None
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
print("Gripper control loop stopped.")
# Convenience methods
[docs]
def open(self):
"""Set target to fully open (0)."""
self.q_desired = 0
[docs]
def close(self):
"""Set target to fully closed (255)."""
self.q_desired = 255
[docs]
async def wait_until_reached(self, tolerance: int = 5, timeout: float = 5.0) -> bool:
"""
Wait until gripper reaches target position.
Args:
tolerance: Position tolerance in gripper units
timeout: Maximum wait time in seconds
Returns:
True if position reached, False if timeout
"""
start = time.perf_counter()
while time.perf_counter() - start < timeout:
if abs(self.qpos - self.q_desired) <= tolerance:
return True
await asyncio.sleep(0.02)
return False
# Backwards compatibility alias
RobotiqGripperInterface = GripperController
def create_gripper(port: str = "/dev/ttyUSB1") -> GripperController:
"""
Factory function to create a GripperController.
Note: You still need to call await gripper.start() to activate.
Args:
port: Serial port for the gripper
Returns:
GripperController instance (not yet started)
"""
return GripperController(port)