"""
Synchronous remote controller client for aiofranka.
Communicates with the aiofranka server process via shared memory (state reads)
and ZMQ (commands). Drop-in replacement for FrankaController with a sync API.
"""
import atexit
import contextlib
import json
import os
import time
import threading
import msgpack
import numpy as np
import zmq
from aiofranka.ipc import StateBlock, zmq_endpoint_for_ip, STATUS_ERROR
DEFAULT_IP = "172.16.0.2"
# Attributes that get sent to server via ZMQ when assigned
_REMOTE_ATTRS = {
"kp", "kd", "ki", "ee_kp", "ee_kd", "null_kp", "null_kd",
"torque_diff_limit", "torque_limit", "clip",
"q_desired", "ee_desired", "torque",
}
[docs]
class FrankaRemoteController:
"""
Synchronous remote controller for Franka robots.
Connects to a running aiofranka server process and provides the same
API as FrankaController, but fully synchronous (no async/await needed).
The 1kHz control loop runs in the server process. This client sends
commands via ZMQ and reads state from shared memory (zero-copy).
Examples:
>>> controller = FrankaRemoteController() # default IP
>>> controller.start()
>>> controller.switch("impedance")
>>> controller.set_freq(50)
>>> for i in range(100):
... state = controller.state
... controller.set("q_desired", target)
>>> controller.stop()
"""
[docs]
def __init__(self, robot_ip: str = None, *, home: bool = True):
if robot_ip is None:
robot_ip = _load_last_ip()
self.robot_ip = robot_ip
self._home = home
self._shm = None
self._zmq_ctx = None
self._zmq_sock = None
self._connected = False
self._server_proc = None
# Rate limiting (mirrors FrankaController)
self._update_freq = 50.0
self._last_update_time = {}
# Compatibility: no-op lock for code that uses `with controller.state_lock:`
self.state_lock = threading.Lock()
[docs]
def start(self):
"""Start the server subprocess and connect.
Spawns a 1kHz control loop in a child process, then connects to it
via shared memory and ZMQ. The subprocess terminates automatically
when this script exits.
The robot must already be unlocked with FCI active. If not, prints
a status summary and raises RuntimeError.
Raises:
RuntimeError: If the robot is not ready or the server fails to start.
"""
from aiofranka.server import start_subprocess, _resolve_from_config, _DeskClientV2
_BOLD = "\033[1m"
_DIM = "\033[2m"
_GREEN = "\033[32m"
_YELLOW = "\033[33m"
_RED = "\033[31m"
_RST = "\033[0m"
print(f"\n {_BOLD}aiofranka{_RST} {_DIM}|{_RST} "
f"starting server {_DIM}({self.robot_ip}){_RST}\n")
# Pre-check: is the robot ready?
try:
ip, username, password = _resolve_from_config(
self.robot_ip, None, None, interactive=False)
probe = _DeskClientV2(ip, username, password)
joints_ok = probe.are_joints_unlocked()
fci_ok = probe.is_fci_active()
j_status = f"{_GREEN}unlocked{_RST}" if joints_ok else f"{_RED}locked{_RST}"
f_status = f"{_GREEN}active{_RST}" if fci_ok else f"{_RED}inactive{_RST}"
print(f" Joints ........... {j_status}")
print(f" FCI .............. {f_status}")
print()
if not joints_ok or not fci_ok:
print(f" {_RED}Robot is not ready.{_RST} Unlock first:\n")
print(f" Add to your script before {_BOLD}ctrl.start(){_RST}:")
print(f" {_BOLD}aiofranka.unlock(){_RST}\n")
print(f" Or run from the terminal right now:")
print(f" {_BOLD}$ aiofranka unlock{_RST}\n")
import sys
sys.exit(1)
except Exception:
pass # Can't reach Desk API — let the server try
# Spawn server subprocess (no homing — user script controls movement)
self._server_proc = start_subprocess(self.robot_ip)
# Register cleanup so the subprocess dies when the script exits
atexit.register(self._terminate_server)
# Connect to the now-running server
self._shm = StateBlock(self.robot_ip, create=False, track=False)
self._zmq_ctx = zmq.Context()
self._zmq_sock = self._zmq_ctx.socket(zmq.REQ)
self._zmq_sock.setsockopt(zmq.RCVTIMEO, 2000)
self._zmq_sock.setsockopt(zmq.SNDTIMEO, 2000)
self._zmq_sock.connect(zmq_endpoint_for_ip(self.robot_ip))
resp = self._send({"cmd": "status"})
if not resp.get("running"):
raise RuntimeError("Server started but control loop is not running")
self._connected = True
print(f" {_GREEN}Ready{_RST} {_DIM}(PID {self._server_proc.pid}){_RST}\n")
[docs]
def stop(self):
"""Stop the server subprocess and disconnect."""
if self._connected:
try:
self._send({"cmd": "stop"})
except Exception:
pass
self._disconnect()
self._terminate_server()
def _disconnect(self):
if self._zmq_sock:
self._zmq_sock.close()
self._zmq_sock = None
if self._zmq_ctx:
self._zmq_ctx.term()
self._zmq_ctx = None
if self._shm:
self._shm.close()
self._shm = None
self._connected = False
def _terminate_server(self):
"""Terminate the server subprocess if it's still alive."""
proc = self._server_proc
if proc is None or not proc.is_alive():
self._server_proc = None
return
proc.terminate()
proc.join(timeout=10)
if proc.is_alive():
proc.kill()
proc.join(timeout=5)
self._server_proc = None
def _check_server_alive(self):
"""Raise if the server has died or errored."""
if self._shm is None:
raise RuntimeError("Not connected. Call start() first.")
status = self._shm.read_status()
if status == STATUS_ERROR:
error_msg = self._shm.read_error()
raise RuntimeError(f"Server control loop error: {error_msg}")
@property
def state(self) -> dict:
"""Read current robot state from shared memory (zero-copy, no IPC overhead)."""
self._check_server_alive()
return self._shm.read_state()
@property
def q_desired(self) -> np.ndarray:
"""Current desired joint positions (7,) from shared memory."""
return self.state["q_desired"]
@property
def ee_desired(self) -> np.ndarray:
"""Current desired end-effector pose (4,4) from shared memory."""
return self.state["ee_desired"]
@property
def torque(self) -> np.ndarray:
"""Current commanded torque (7,) from shared memory."""
return self.state["torque"]
@property
def initial_qpos(self) -> np.ndarray:
"""Initial joint positions (7,) set at last switch() from shared memory."""
return self.state["initial_qpos"]
@property
def initial_ee(self) -> np.ndarray:
"""Initial end-effector pose (4,4) set at last switch() from shared memory."""
return self.state["initial_ee"]
@property
def running(self) -> bool:
if not self._connected:
return False
try:
resp = self._send({"cmd": "status"})
return resp.get("running", False)
except Exception:
return False
[docs]
def switch(self, controller_type: str):
"""Switch control mode on the server ("impedance", "pid", "osc", "torque")."""
self._send({"cmd": "switch", "type": controller_type})
self._last_update_time.clear()
[docs]
def set_freq(self, freq: float):
"""Set update frequency for rate-limited set() calls (Hz)."""
self._update_freq = freq
self._send({"cmd": "set_freq", "freq": freq})
[docs]
def set(self, attr: str, value):
"""
Rate-limited setter for controller attributes.
Sends value to server and sleeps to maintain frequency from set_freq().
Args:
attr: "q_desired", "ee_desired", or "torque"
value: numpy array
"""
current_time = time.perf_counter()
dt = 1.0 / self._update_freq
value = np.asarray(value, dtype=np.float64)
if attr not in self._last_update_time:
self._last_update_time[attr] = current_time
self._send_set(attr, value)
time.sleep(dt)
self._last_update_time[attr] = current_time + dt
return
target_time = self._last_update_time[attr] + dt
self._send_set(attr, value)
sleep_time = target_time - time.perf_counter()
if sleep_time > 0:
time.sleep(sleep_time)
self._last_update_time[attr] = target_time
[docs]
def move(self, qpos=None):
"""
Move to target joint position. Blocks until complete.
Args:
qpos: Target joint positions (7,). Default: home position.
"""
if qpos is None:
qpos = [0, 0, 0.0, -1.57079, 0, 1.57079, -0.7853]
qpos = np.asarray(qpos, dtype=np.float64)
resp = self._send({"cmd": "move", "qpos": qpos.tobytes()})
if not resp.get("ok"):
raise RuntimeError(f"Move failed: {resp.get('error')}")
move_id = resp["move_id"]
while True:
resp = self._send({"cmd": "move_status", "move_id": move_id})
if resp.get("done"):
if resp.get("error"):
raise RuntimeError(f"Move error: {resp['error']}")
return
time.sleep(0.05)
[docs]
def initialize(self):
"""Re-initialize controller state to current robot position."""
self._send({"cmd": "initialize"})
def __setattr__(self, name, value):
if name.startswith('_') or name in ('robot_ip', 'state_lock'):
super().__setattr__(name, value)
return
if name in _REMOTE_ATTRS and self.__dict__.get('_connected', False):
self._send_set(name, value if not isinstance(value, np.ndarray) else value)
else:
super().__setattr__(name, value)
# --- Internal ---
def _send_set(self, attr: str, value):
if isinstance(value, np.ndarray):
msg = {
"cmd": "set",
"attr": attr,
"value": value.astype(np.float64).tobytes(),
"shape": list(value.shape),
}
else:
msg = {"cmd": "set", "attr": attr, "value": value}
self._send(msg)
def _send(self, msg: dict) -> dict:
if self._zmq_sock is None:
raise RuntimeError("Not connected. Call start() first.")
try:
self._zmq_sock.send(msgpack.packb(msg, use_bin_type=True))
raw = self._zmq_sock.recv()
return msgpack.unpackb(raw, raw=False)
except zmq.error.Again:
raise ConnectionError(
"Server not responding (timeout). Check if server is running."
)
def __del__(self):
self._disconnect()
self._terminate_server()
def _load_last_ip() -> str:
config_path = os.path.expanduser("~/.aiofranka/config.json")
try:
with open(config_path) as f:
return json.load(f).get("last_ip", DEFAULT_IP)
except (FileNotFoundError, json.JSONDecodeError):
return DEFAULT_IP