Metadata-Version: 2.4
Name: nodemanager
Version: 0.1.2
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Intended Audience :: Science/Research
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Rust
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
Classifier: Topic :: Scientific/Engineering
Classifier: Operating System :: OS Independent
Requires-Dist: networkx>=3.1,<4
License-File: LICENSE
Summary: A package to create and manage P2P applications
Keywords: nodemanager,node,p2p,peer,network,distributed
Author-email: Cyrille Kenfack <cyrille.kenfack@inria.fr>
Maintainer-email: Cyrille Kenfack <cyrille.kenfack@inria.fr>
Requires-Python: >=3.8
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: Bug Tracker, https://gitlab.inria.fr/peer-communication/nodemanager/-/issues
Project-URL: documentation, https://peer-communication.gitlabpages.inria.fr/nodemanager/
Project-URL: homepage, https://peer-communication.gitlabpages.inria.fr/nodemanager/
Project-URL: repository, https://gitlab.inria.fr/peer-communication/nodemanager/-/tree/main/py-nodemanager

# nodemanager

This project is a Python binding of the [nodemanager](https://gitlab.inria.fr/peer-communication/nodemanager/-/tree/main/py-nodemanager) project, which is implemented in Rust. It exposes the Rust peer-to-peer communication layer to Python applications.

## Installation

For testing purposes, you can install it from TestPyPI, the index for Python packages under test.

> Note: A stable release will soon be available on PyPI.

```bash
pip install -i https://test.pypi.org/simple/ nodemanager
```

For the stable version, run:

```bash
pip install nodemanager
```

---

## Example: Sharing Vectors Between Nodes

In this example, we create a **decentralized application** where each node:

1. Updates its vector by multiplying it by a matrix at each cycle.
2. Selects a random neighbor from its peer-to-peer view.
3. Sends the resulting vector to that neighbor.
4. Receives vectors from other nodes and processes them.

This demonstrates the core features of `nodemanager`: creating a node, configuring its network, defining a custom application, and running it on the P2P network.

---

### Step 1 — Imports

```python
import os
import random
import json
import numpy as np
from argparse import ArgumentParser
from dataclasses import asdict, dataclass
from logging import ERROR, INFO

# nodemanager core classes
from nodemanager.application import App
from nodemanager.node import Node
from nodemanager.utils import (
    Gbps,
    SelectionPolicy,
    PropagationPolicy,
    log,
    configure,
)
from nodemanager.network import (
    NetworkSettings,
    ResquestResponseSettings,
    YamuxSettings,
)
```

- **`App`**: Abstract base class that every application must extend. It defines the lifecycle methods (`periodic_run`, `handle_message`, `send_message`).
- **`Node`**: Represents a peer in the network. It handles connections, peer discovery (via mDNS and/or Kademlia), and message routing.
- **`Gbps`**: Configuration for the *Gossip-Based Peer Sampling* algorithm, which maintains a dynamic and random view of neighbors.
- **`log` / `configure`**: Logging utilities. `configure` sets up file-based logging; `log` writes log messages with a given severity level.
- **`NetworkSettings`**, **`ResquestResponseSettings`**, **`YamuxSettings`**: Fine-grained settings for the underlying libp2p network layer (timeouts, buffer sizes, stream limits, etc.).
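
To make the lifecycle concrete, here is a minimal, purely illustrative `App` subclass (the `EchoApp` name is hypothetical; the full `ShareVec` example below fills in real behavior):

```python
from logging import INFO

from nodemanager.application import App
from nodemanager.utils import log

class EchoApp(App):
    """Minimal sketch: log every incoming message, do nothing periodically."""

    def __init__(self) -> None:
        super().__init__(name="echo")  # the name is used to route messages to this app

    def handle_message(self, message: str, node_id: str = None):
        log(INFO, "Node %s received: %s", self.id, message)

    def periodic_run(self, view: list, node_id: str = None):
        pass  # no periodic work in this sketch
```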

---

### Step 2 — Utility function

```python
FILEDIR = os.path.dirname(os.path.abspath(__file__))

def convert_numpy_to_list(obj):
    """Recursively convert numpy arrays to Python lists for JSON serialization."""
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, dict):
        return {key: convert_numpy_to_list(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_numpy_to_list(element) for element in obj]
    else:
        return obj
```

This helper is necessary because `json.dumps` cannot serialize `numpy` arrays directly. It recursively converts all arrays to native Python lists.
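
For instance, a quick sanity check (not part of the example script):

```python
import json

import numpy as np

payload = {"vector": np.arange(3) / 2, "nested": [np.ones(2)]}
# json.dumps(payload) would raise: TypeError: Object of type ndarray is not JSON serializable
print(json.dumps(convert_numpy_to_list(payload)))
# -> {"vector": [0.0, 0.5, 1.0], "nested": [[1.0, 1.0]]}
```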

---

### Step 3 — Define the message structure

```python
@dataclass
class Message:
    sender: str      # Peer ID of the sender
    receiver: str    # Peer ID of the receiver
    data: list       # The vector payload (as a list of floats)
```

Using a `@dataclass` makes it easy to serialize/deserialize messages with `json.dumps(asdict(msg))` and `Message(**json.loads(raw))`.
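
A quick round trip with illustrative values:

```python
msg = Message(sender="peer-a", receiver="peer-b", data=[0.1, 0.2])
raw = json.dumps(asdict(msg))  # '{"sender": "peer-a", "receiver": "peer-b", "data": [0.1, 0.2]}'
assert Message(**json.loads(raw)) == msg  # dataclasses compare field by field
```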

---

### Step 4 — Create the application

Every application must inherit from `App` and implement two abstract methods:

- **`handle_message(message, node_id)`** — called when a message is received.
- **`periodic_run(view, node_id)`** — called periodically, every `elapsed_time` seconds, for `cycles` iterations.

```python
class ShareVec(App):
    def __init__(self, vector, matrix) -> None:
        # Application name: used to register and route messages
        super().__init__(name="share_vec")
        self.vector = vector    # Current vector state
        self.matrix = matrix    # Transformation matrix

    def handle_message(self, message: str, node_id: str = None):
        """Called when this node receives a message from another node."""
        log(INFO, "Node %s received a message (%s bytes)", self.id, len(message.encode()))
        log(INFO, "BEFORE -> Node %s : %s", self.id, self.vector)

        # Deserialize the message and average the received vector with our own
        rcv_msg = json.loads(message)
        msg = Message(**rcv_msg)
        self.vector = np.mean(
            np.array([self.vector, np.array(msg.data)]), axis=0
        )

        log(INFO, "AFTER  -> Node %s : %s", self.id, self.vector)

    def periodic_run(self, view: list, node_id: str = None):
        """
        Called periodically (every `elapsed_time` seconds, for `cycles` iterations).

        Parameters
        ----------
        view : list
            The current list of known peer IDs (provided by the peer sampling service).
        node_id : str
            The ID of the current node.
        """
        log(INFO, "Node %s — periodic_run — BEFORE: %s", self.id, self.vector)

        # Step 1: Transform the vector
        self.vector = np.dot(self.vector, self.matrix)

        # Step 2: If we have neighbors, pick one at random and send our vector
        if not view:
            log(INFO, "Node %s has no neighbors yet, skipping send.", self.id)
            return

        rec = random.choice(view)
        msge = Message(
            sender=self.id,
            receiver=rec,
            data=convert_numpy_to_list(self.vector),
        )

        # Step 3: Send the serialized message with a timeout of 30 seconds
        self.send_message(
            message=json.dumps(asdict(msge)),
            destination=rec,
            timeout=30,
        )

        log(INFO, "Node %s — periodic_run — AFTER: %s — sent to %s", self.id, self.vector, rec)

    def send_message(self, message: str, destination: str, timeout: int = 2) -> str:
        """
        Queue a message to be sent to a destination peer.

        Parameters
        ----------
        message : str
            Serialized message content.
        destination : str
            Peer ID of the target node.
        timeout : int
            Timeout in seconds for the send operation (default: 2).
        """
        return super().send_message(message, destination, timeout)
```

> **Key points:**
> - `periodic_run` receives the `view` parameter directly from the peer sampling service — no need to call `node.get_view()` manually.
> - `send_message` queues the message internally; Rust handles the actual network transmission asynchronously after each `periodic_run` / `handle_message` invocation.

---

### Step 5 — Parse arguments and configure logging

```python
if __name__ == "__main__":

    parser = ArgumentParser(description="Decentralized Vector Sharing Example.")
    parser.add_argument("--cid", type=int, help="ID of current node")
    parser.add_argument("--file", type=str, help="Prefix for log file names")
    parser.add_argument("--nb", type=int, help="Total number of nodes")
    args = parser.parse_args()

    # Configure file-based logging (one log file per node)
    configure(
        identifier=f"{args.file}{args.cid}",
        filename=f"logs/{args.file}{args.cid}.log",
    )
```

Each node instance writes its logs to a separate file (e.g., `logs/node0.log`, `logs/node1.log`, etc.), making debugging easier in a multi-node setup.

---

### Step 6 — Configure the peer sampling protocol

```python
    # Gossip-Based Peer Sampling (GBPS) configuration
    config_sampling = Gbps(
        view_size=4,                                  # Number of neighbors in the view
        heal=0,                                       # Healing parameter H (oldest entries replaced per exchange)
        swap=0,                                       # Swap parameter S (entries swapped with the peer per exchange)
        selection_policy=SelectionPolicy.OLD,         # Select the oldest neighbor
        propagation_policy=PropagationPolicy.PUSHPULL,# Exchange views bidirectionally
        delay=2,                                      # Period (seconds) between sampling cycles
        age=1,                                        # Age increment per cycle
    )

    # Generate the config file for this node
    config_path = os.path.join(FILEDIR, "config", f"node_{args.cid}.json")
    config_sampling.create(config_file=config_path)
```

The `Gbps` class implements the [Gossip-Based Peer Sampling](https://dl.acm.org/doi/10.1145/1275517.1275520) algorithm. The generated JSON config file is read by the Rust backend at startup.
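
The exact schema of that file is owned by the Rust backend, so the simplest way to see what `create` produced is to read it back (continuing from the snippet above; inspection only, field names may differ between versions):

```python
import json

with open(config_path) as f:
    print(json.dumps(json.load(f), indent=2))  # pretty-print the generated sampling config
```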

**Available sampling algorithms:**
| Algorithm | Class | Description |
|-----------|-------|-------------|
| GBPS | `Gbps` | Gossip-Based Peer Sampling |
| Brahms | `Brahams` | Byzantine-resilient random membership sampling |
| Basalt | `Basalt` | Byzantine fault-tolerant sampling |

---

### Step 7 — Configure the network layer

```python
    # Request-response protocol settings
    request_response_settings = ResquestResponseSettings(
        max_concurrent_streams=1024,     # Max concurrent inbound + outbound streams
        request_timeout_secs=12000,      # Timeout per request (seconds)
    )

    # Yamux multiplexer settings (controls stream/buffer limits)
    yamux_settings = YamuxSettings(
        max_buffer_size=512 * 1024 * 1024,    # 512 MB max buffer
        receive_window=512 * 1024 * 1024,     # 512 MB receive window
        max_num_streams=80000,                # Max multiplexed streams
    )

    # Assemble the full network configuration
    network_settings = NetworkSettings(
        request_response=request_response_settings,
        yamux=yamux_settings,
        idle_connection_timeout_secs=720,            # Close idle connections after 12 minutes
        max_negotiating_inbound_streams=128,         # Max inbound streams being negotiated
        enable_kad=True,                             # Enable Kademlia DHT for peer discovery
    )
```

> **Tip:** For local testing with mDNS (automatic local peer discovery), you can also pass `enable_mdns=True` and optionally provide custom `MdnsSettings`.
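
For example, a local-testing variant of the configuration above might look like the following sketch (the import location, the `mdns=` keyword, and the `MdnsSettings()` defaults are assumptions about the API; check the `NetworkSettings` signature in your installed version):

```python
from nodemanager.network import MdnsSettings, NetworkSettings

# Sketch: same settings as above, plus automatic peer discovery on the local network.
local_settings = NetworkSettings(
    request_response=request_response_settings,
    yamux=yamux_settings,
    idle_connection_timeout_secs=720,
    max_negotiating_inbound_streams=128,
    enable_kad=True,
    enable_mdns=True,     # advertise and discover peers via mDNS
    mdns=MdnsSettings(),  # assumed keyword; pass custom settings only if defaults don't fit
)
```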

---

### Step 8 — Create the node and register the application

```python
    try:
        # Create a node bound to all interfaces (0.0.0.0) using TCP
        node = Node(
            context="Test",                    # Network context (nodes only communicate within the same context)
            address="0.0.0.0",                 # Listen on all interfaces
            tcp=True,                          # Enable TCP transport
            udp=False,                         # Disable UDP transport
            config_path=config_path,           # Path to the sampling config file
            network_settings=network_settings, # Network layer configuration
        )

        # Create the application instance
        app = ShareVec(
            vector=(np.random.randint(1, 10, size=(5,))) / 10,
            matrix=(np.random.randint(1, 10, size=(5, 5))) / 10,
        )
        app.node = node           # Associate the node with the application
        app.cycles = 10           # Run periodic_run 10 times
        app.elapsed_time = 5      # Wait 5 seconds between each cycle

        # Start the node (begins listening and peer discovery in a background thread)
        node.start()

        # Register the application under the name "share_vec"
        # This name is used for routing messages to the correct handler
        node.register(app.name, app)

        # Block until all registered applications finish their cycles
        # timeout=10 means wait 10ms after the last app finishes before shutting down
        node.run(timeout=10)

        # Access sampling data after execution
        log(INFO, "Sampling data: %s", node.enode.get_data_sampling())

    except Exception as e:
        log(ERROR, "An error occurred: %s", e)

    log(INFO, "Node %s finished", args.cid)
```

---

### Running the example

To run multiple nodes locally, launch each one in a separate terminal:

```bash
# Terminal 1
python example.py --cid 0 --file node --nb 3

# Terminal 2
python example.py --cid 1 --file node --nb 3

# Terminal 3
python example.py --cid 2 --file node --nb 3
```

Each node will:
1. Start and discover peers via Kademlia/mDNS.
2. Execute `periodic_run` every 5 seconds for 10 cycles.
3. Send its computed vector to a random neighbor.
4. Receive vectors from other nodes and average them with its own.
5. Log all activity to `logs/nodeX.log`.
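
If juggling three terminals is inconvenient, a small launcher (a convenience sketch, not part of the package) can spawn all nodes from one shell:

```python
import subprocess
import sys

NB_NODES = 3  # must match the --nb argument passed to each node

procs = [
    subprocess.Popen(
        [sys.executable, "example.py",
         "--cid", str(cid), "--file", "node", "--nb", str(NB_NODES)]
    )
    for cid in range(NB_NODES)
]
for p in procs:
    p.wait()  # block until every node finishes its cycles
```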

---

### Summary of the lifecycle

```
Node.start()          →  Start listening + peer discovery (background thread)
Node.register(app)    →  Register the application for message routing
                          ↓
              ┌─────────────────────────────┐
              │   Every `elapsed_time` sec: │
              │   1. periodic_run(view)     │
              │   2. Send queued messages   │
              │                             │
              │   On message received:      │
              │   1. handle_message(msg)    │
              │   2. Send queued messages   │
              └─────────────────────────────┘
                          ↓
Node.run(timeout)     →  Block until all apps complete their cycles
```
