Metadata-Version: 2.4
Name: cawlib
Version: 0.0.7
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Operating System :: POSIX :: Linux
Classifier: Topic :: Scientific/Engineering
Classifier: Topic :: System :: Hardware
Classifier: Intended Audience :: Developers
Classifier: Development Status :: 3 - Alpha
Summary: A high-performance Rust-powered Python library for CAN bus communication and FOC motor control in robotics
Keywords: robotics,can-bus,motor-control,foc,socketcan
Author-email: noxrick91 <noxrick91@gmail.com>
License: Proprietary - All rights reserved
Requires-Python: >=3.8
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Bug Tracker, https://github.com/noxrick91/cawlib/issues
Project-URL: Homepage, https://github.com/noxrick91/cawlib
Project-URL: Repository, https://github.com/noxrick91/cawlib

# cawlib

A high-performance Rust-powered Python library for **CAN bus communication** and **FOC motor control** in robotics. Built with [PyO3](https://pyo3.rs/) and [Tokio](https://tokio.rs/) for blazing-fast async I/O, precise timing, and effortless concurrency.

## Features

- **CAN Bus Communication** — Blocking and non-blocking CAN device interfaces via Linux SocketCAN
- **FOC Motor Control** — Broadcast-style control of up to 4 motors per CAN bus with current/speed/position modes
- **High-Precision Timers** — Three tiers of timing precision, from ~1 ms down to ~1 μs
- **Async Task Execution** — Run Python functions as background tasks with cancellation and result retrieval
- **Lifecycle Management** — Global task manager with graceful shutdown and signal handling (SIGINT/SIGTERM)

## Requirements

- **OS**: Linux (SocketCAN required)
- **Python**: >= 3.8
- **CAN Interface**: A configured SocketCAN interface (e.g. `can0`)

## Installation

```bash
pip install cawlib
```

### CAN Interface Setup

Before using the library, bring up a CAN interface:

```bash
sudo ip link set can0 up type can bitrate 1000000
```

## Quick Start

```python
import cawlib

# Create a motor controller on CAN bus
motor = cawlib.Motor("can0", broadcast_id=0x100)

# Add motor devices (up to 4 per bus)
d1 = motor.add_device(slot_index=0, feedback_id=0x201, mode="speed")
d2 = motor.add_device(slot_index=1, feedback_id=0x202, mode="speed")

# Set target speeds and broadcast
d1.set_target(10.0)   # 10 rad/s
d2.set_target(-5.0)   # -5 rad/s
motor.broadcast()

# Run a periodic control loop at 100 Hz
@cawlib.timer(100.0)
def control_loop():
    motor.broadcast()
    feedback = d1.get_watch_data()
    print(f"pos={feedback['position']:.2f}, speed={feedback['speed']:.2f}")

handle = control_loop()

# Block until Ctrl+C, then gracefully shut down
cawlib.block_all(timeout_ms=5000)
```

## API Reference

### CAN Devices

#### `CanDevice(interface: str)`

Blocking CAN device for synchronous communication.

```python
can = cawlib.CanDevice("can0")
can.send(0x100, [0x01, 0x02, 0x03])
frame_id, data = can.receive()
print(can.interface())
```

| Method | Description |
|--------|-------------|
| `send(id: int, data: list[int])` | Send a CAN frame |
| `receive() -> tuple[int, bytes]` | Receive a CAN frame (blocking) |
| `interface() -> str` | Get CAN interface name |

#### `CanDeviceNonBlocking(interface: str)`

Non-blocking CAN device with background I/O.

```python
can = cawlib.CanDeviceNonBlocking("can0")
can.send(0x100, [0x01, 0x02])
result = can.try_receive()           # Returns None if no frame available
result = can.receive_timeout(1000)   # Wait up to 1000 ms
```

| Method | Description |
|--------|-------------|
| `send(id: int, data: list[int])` | Non-blocking send |
| `try_receive() -> tuple[int, bytes] \| None` | Non-blocking receive |
| `receive_timeout(timeout_ms: int) -> tuple[int, bytes] \| None` | Receive with timeout |
| `interface() -> str` | Get CAN interface name |

### Motor Control

#### `Motor(interface: str, broadcast_id: int)`

Broadcast motor controller supporting up to 4 devices per CAN bus. Uses FOC (Field-Oriented Control) protocol.

```python
motor = cawlib.Motor("can0", broadcast_id=0x100)

# Add devices in different control modes
d1 = motor.add_device(slot_index=0, feedback_id=0x201, mode="current")
d2 = motor.add_device(slot_index=1, feedback_id=0x202, mode="speed")
d3 = motor.add_device(slot_index=2, feedback_id=0x203, mode="position")

# Control
d1.set_target(1.5)     # 1.5 A
d2.set_target(10.0)    # 10 rad/s
d3.set_target(3.14)    # 3.14 rad
motor.broadcast()

# Read feedback
data = d1.get_watch_data()
# {"position": 0.0, "speed": 0.0, "current": 1.5}
```

| Method | Description |
|--------|-------------|
| `add_device(slot_index: int, feedback_id: int, mode: str) -> Device` | Add a motor device. `mode`: `"current"`, `"speed"`, or `"position"` |
| `set_target(slot_index: int, value: float)` | Set target for a slot |
| `broadcast()` | Send broadcast frame to all devices |
| `get_watch_data(slot_index: int) -> dict \| None` | Get feedback data for a slot |

#### `Device`

Individual motor unit within a broadcast group.

| Method / Property | Description |
|-------------------|-------------|
| `set_target(value: float)` | Set target value (unit depends on mode) |
| `get_target() -> float` | Get current target |
| `get_watch_data() -> dict` | Get feedback: `{"position", "speed", "current"}` |
| `slot_index` | Slot index (0–3) |
| `feedback_id` | Feedback CAN ID |

**Control Modes and Units:**

| Mode | Unit | Resolution | Range |
|------|------|------------|-------|
| `current` | A | 0.01 A | ±327.67 A |
| `speed` | rad/s | 0.1 rad/s | ±3276.7 rad/s |
| `position` | rad | 0.01 rad | ±327.67 rad |

### Timers

#### `@timer(frequency_hz: float)`

Standard timer decorator using Tokio async runtime. Resolution ~1–10 ms.

```python
@cawlib.timer(10.0)  # 10 Hz
def update():
    print("tick")

handle = update()
print(handle.actual_frequency)  # Achieved frequency
handle.stop()
```

#### `@precision_timer(frequency_hz: float, realtime: bool = False)`

High-precision timer. Two modes:

- **`realtime=False`** (default): Busy-wait loop, ~10–100 μs resolution
- **`realtime=True`**: Dedicated OS thread, ~1–10 μs resolution (CPU-intensive)

```python
@cawlib.precision_timer(1000.0)  # 1 kHz high-precision
def fast_loop():
    motor.broadcast()

handle = fast_loop()
```

#### `TimerHandle`

Returned when calling a timer-decorated function.

| Method / Property | Description |
|-------------------|-------------|
| `stop()` | Stop the timer |
| `is_running() -> bool` | Check if running |
| `frequency` | Target frequency (Hz) |
| `actual_frequency` | Achieved average frequency (Hz) |
| `tick_count` | Number of ticks elapsed |
| `elapsed_secs` | Elapsed time in seconds |
| `precision_mode` | `"normal"`, `"high"`, or `"realtime"` |
| `task_id` | Task ID |

### Async Tasks

#### `@run_async`

Run a Python function as a background async task.

```python
@cawlib.run_async
def long_task():
    import time
    time.sleep(5)
    return 42

handle = long_task()
handle.wait(timeout_ms=10000)
print(handle.try_result())  # 42
```

#### `AsyncHandle`

| Method / Property | Description |
|-------------------|-------------|
| `is_done() -> bool` | Task finished (success or failure) |
| `is_completed() -> bool` | Completed successfully |
| `is_cancelled() -> bool` | Task was cancelled |
| `cancel()` | Request cancellation |
| `try_result() -> Any \| None` | Get result if done |
| `wait(timeout_ms: int \| None = None)` | Block until done or timeout |
| `task_id` | Task ID |

### Task Management

#### `get_task_manager() -> TaskManager`

Access the global task manager for monitoring and lifecycle control.

```python
tm = cawlib.get_task_manager()
print(tm.status())
# {"running_timers": 2, "running_tasks": 1, "stopped": 0, "total": 3}

tm.stop_all()
tm.wait_all(timeout_ms=5000)
```

| Method / Property | Description |
|-------------------|-------------|
| `running_count` | Number of running tasks |
| `total_count` | Total registered tasks |
| `timer_count` | Running timers |
| `async_task_count` | Running async tasks |
| `stop_all()` | Stop all tasks |
| `wait_all(timeout_ms: int \| None = None)` | Wait for all tasks to finish |
| `shutdown(timeout_ms: int \| None = None)` | Stop all and wait |
| `cleanup()` | Remove finished tasks |
| `status() -> dict` | Get status summary |

### Module-Level Functions

| Function | Description |
|----------|-------------|
| `shutdown_all(timeout_ms=None)` | Stop all tasks and wait for completion |
| `block_all(timeout_ms=None)` | Block until SIGINT/SIGTERM, then gracefully shut down |
| `block_until(wait_ms=None, shutdown_timeout_ms=None)` | Block until timeout or signal, then shut down |

## Example: Complete Motor Control Loop

```python
import cawlib

# Setup
motor = cawlib.Motor("can0", broadcast_id=0x100)
d1 = motor.add_device(slot_index=0, feedback_id=0x201, mode="speed")
d2 = motor.add_device(slot_index=1, feedback_id=0x202, mode="speed")

target_speed = 5.0

# High-frequency control loop
@cawlib.precision_timer(500.0)  # 500 Hz
def control():
    d1.set_target(target_speed)
    d2.set_target(-target_speed)
    motor.broadcast()

# Low-frequency monitoring
@cawlib.timer(2.0)  # 2 Hz
def monitor():
    w1 = d1.get_watch_data()
    w2 = d2.get_watch_data()
    print(f"D1: speed={w1['speed']:.1f} rad/s, current={w1['current']:.2f} A")
    print(f"D2: speed={w2['speed']:.1f} rad/s, current={w2['current']:.2f} A")

# Start everything
ctrl_handle = control()
mon_handle = monitor()

# Block until Ctrl+C
cawlib.block_all(timeout_ms=3000)
```

## License

Proprietary - All rights reserved.

