Metadata-Version: 2.4
Name: amscrot-py
Version: 1.0.0.post17
Summary: AmSC Resource Orchestration Client Toolkit
Author-email: Abdelilah Essiari <aessiari@lbl.gov>, Ezra Kissel <kissel@es.net>
License: MIT License
Project-URL: Homepage, https://github.com/esnet/amsc-isro-toolkit
Keywords: AmSC Resource Orchestration Toolkit,Fabfed Framework,Cloud,FABRIC,Chameleon,MOC
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fabrictestbed-extensions
Requires-Dist: python-chi==0.17.11
Requires-Dist: sense-o-api==1.48
Requires-Dist: ansible==9.5.1
Requires-Dist: ansible-runner==2.3.6
Requires-Dist: xmltodict
Requires-Dist: boto3
Requires-Dist: google-cloud-compute
Requires-Dist: pydantic
Requires-Dist: paramiko==4.0.0
Requires-Dist: amsc-iri==1.0.0
Requires-Dist: kubernetes==35.0.0
Dynamic: license-file

# Table of contents

 - [Description](#descr)
 - [Installation](#install)
 - [Operating Instructions](#operate)
 - [Jupyter Notebook Examples](#jupyter)
 - [Apache Airflow Support](#airflow)

# <a name="descr"></a>Description
The American Science Cloud Infrastructure Services Resource Orchestration Toolkit (AmSC-ISRO-Toolkit, abbreviated AmSCROT) provides _infrastructure_ orchestration for AmSC use cases.

# <a name="install"></a>Installation

```bash
pip install amscrot-py
```

# <a name="operate"></a>Operating Instructions

## Credentials

AmSCROT reads provider credentials from `~/.amscrot/credentials.yml`. Each section key corresponds to a service type or profile name:

```yaml
esnet-iri:
  api_key: <token>
  api_endpoint: https://iri.es.net/api/v1

nersc-iri:
  api_key: <token>
  api_endpoint: https://api.iri.nersc.gov/api/v1

amsc-iro:
  api_key: <token>
  api_endpoint: https://...
```

An example credentials file is provided in [credentials-template.yml](examples/client/credentials-template.yml).
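For orientation, the flat profile layout above maps naturally to a nested dictionary keyed by profile name. The following is a minimal stdlib-only sketch of that mapping for illustration; the toolkit ships its own loader, and a real credentials file should be parsed with a proper YAML library:

```python
def load_credentials(text: str) -> dict[str, dict[str, str]]:
    """Parse the flat two-level profile layout shown above.

    Illustrative sketch only -- use a real YAML parser in practice.
    """
    profiles: dict[str, dict[str, str]] = {}
    current = None
    for line in text.splitlines():
        if not line.strip() or line.lstrip().startswith("#"):
            continue                            # skip blanks and comments
        if not line.startswith(" "):            # profile header, e.g. "nersc-iri:"
            current = line.rstrip(":").strip()
            profiles[current] = {}
        else:                                   # indented "key: value" pair
            key, _, value = line.strip().partition(":")
            profiles[current][key.strip()] = value.strip()
    return profiles

sample = """\
nersc-iri:
  api_key: abc123
  api_endpoint: https://api.iri.nersc.gov/api/v1
"""
creds = load_credentials(sample)
print(creds["nersc-iri"]["api_endpoint"])   # -> https://api.iri.nersc.gov/api/v1
```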


## Core Concepts

| Class | Role |
|---|---|
| `Client` | Top-level entry point; owns sessions and service clients |
| `ServiceClient` | Provider-specific driver (Kube/Kueue, ESnet IRI, NERSC IRI, AMSC-IRO) |
| `Session` | Named unit of work; groups jobs and persists state to disk |
| `Job` | A single compute task bound to a `ServiceClient` |
| `JobSpec` | Declares executable, arguments, resources, and provider attributes |
| `DiscoveryResult` | Typed result from `service_client.discover()` |

## Basic Usage

### 1. Set up a Client and ServiceClient

```python
from amscrot.client.client import Client
from amscrot.serviceclient import ServiceClient
from amscrot.util.constants import Constants

client = Client()

# Choose a provider: KUBE, AMSC_IRI, AMSC_IRO
svc = ServiceClient.create(
    type=Constants.ServiceType.AMSC_IRI,
    name="nersc-compute",
    profile="nersc-iri"           # matches credentials.yml section
)
client.add_service_client(svc)
```

### 2. Discover Available Resources

```python
result = svc.discover()      # returns DiscoveryResult

# Iterate typed resources
for item in result.by_type("compute"):
    print(item.data["id"], item.data["name"])

# Normalized Facility objects (provider-agnostic)
for facility in result.facilities:
    print(facility.name, [c.cores for c in (facility.compute or [])])
```

### 3. Define a Job

```python
from amscrot.client.job import Job, JobSpec, JobType, JobServiceType

spec = JobSpec(
    executable="python",
    arguments=["-c", "print('hello')"],
    resources={
        "node_count": 1,
        "process_count": 1,
        "processes_per_node": 1,
        "cpu_cores_per_process": 1,
        "exclusive_node_use": False,
        "memory": 268435456          # 268435456 = 256 MiB
    },
    attributes={
        "container": {"image": "python:3.12-slim"},  # provider image
        "resource_id": "<compute-resource-id>"       # from discovery
    }
)

job = Job(
    name="my-job",
    type=JobType.COMPUTE,
    service_type=JobServiceType.BATCH,
    service_client=svc,
    job_spec=spec
)
```

### 4. Create a Session and Submit

```python
session = client.create_session("my-session")
session.add_job(job)

# Validate (raises PlanError on failure)
session.plan(verbose=True)

# Submit all jobs
session.apply()
```

### 5. Wait for Completion

```python
from amscrot.client.job import JobState

results = session.wait(
    timeout=300,
    interval=5,
    verbose=True,
)

for job_name, status in results.items():
    print(f"{job_name}: {status.state}  message={status.message}")
```

`session.wait()` polls until all jobs reach a terminal state (`COMPLETED`, `FAILED`, or `CANCELED`). Pass `jobs=[job1, job2]` to wait on a subset.
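The polling contract above can be sketched as a plain loop. This is an illustrative stand-in, not the toolkit's implementation: `poll_status` is a hypothetical callable mapping a job name to its current state string, which the real `session.wait()` handles internally against the provider APIs:

```python
import time

TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}

def wait_all(poll_status, job_names, timeout=300, interval=5):
    """Poll until every job reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        states = {name: poll_status(name) for name in job_names}
        if all(s in TERMINAL_STATES for s in states.values()):
            return states
        if time.monotonic() > deadline:
            pending = [n for n, s in states.items() if s not in TERMINAL_STATES]
            raise TimeoutError(f"jobs still pending: {pending}")
        time.sleep(interval)

# Demo against a fake status source that completes on the second poll
polls = {"my-job": 0}
def fake_status(name):
    polls[name] += 1
    return "COMPLETED" if polls[name] >= 2 else "RUNNING"

result = wait_all(fake_status, ["my-job"], timeout=10, interval=0)
print(result)   # -> {'my-job': 'COMPLETED'}
```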

### 6. Clean Up

```python
session.destroy()   # cancels running jobs and removes session state
```

Sessions are persisted to `~/.amscrot/sessions/<session-name>/` so they survive process restarts. An existing session is restored automatically on `client.create_session(name)`.
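The restore-or-create behavior can be pictured with a small sketch. The on-disk layout here (one `state.json` per session directory) is an assumption for illustration; the actual format under `~/.amscrot/sessions/` may differ:

```python
import json
import tempfile
from pathlib import Path

def create_session(root: Path, name: str) -> dict:
    """Restore an existing session's state from disk, or create a fresh one."""
    state_file = root / name / "state.json"
    if state_file.exists():                      # restore an existing session
        return json.loads(state_file.read_text())
    state = {"name": name, "jobs": []}           # otherwise start fresh
    state_file.parent.mkdir(parents=True, exist_ok=True)
    state_file.write_text(json.dumps(state))
    return state

root = Path(tempfile.mkdtemp())
s1 = create_session(root, "my-session")
s1["jobs"].append("my-job")
(root / "my-session" / "state.json").write_text(json.dumps(s1))

s2 = create_session(root, "my-session")          # restored, not recreated
print(s2["jobs"])   # -> ['my-job']
```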

### 7. Fetch Output Files (IRI providers)

After a job completes, stdout/stderr can be downloaded from the remote filesystem:

```python
# Include stdout/stderr paths in the job spec attributes
spec = JobSpec(
    executable="python",
    arguments=["-c", "print('done')"],
    attributes={
        "resource_id": "<compute-resource-id>",
        "directory": "/path/to/workdir",        # remote working directory
        "stdout_path": "/path/to/workdir/out.log",
        "stderr_path": "/path/to/workdir/err.log",
    }
)

# After session.wait() returns COMPLETED:
fetched = session.fetch_output_files(jobs=[job])
# fetched == {"my-job": {"stdout": "/local/.amscrot/sessions/my-session/files/my-job/stdout.log",
#                        "stderr": "/local/.amscrot/sessions/my-session/files/my-job/stderr.log"}}
```

Files are written to `~/.amscrot/sessions/<session-name>/files/<job-name>/` by default. Pass `output_path=` to override.

### 8. Direct Filesystem Access (IRI providers)

ESnet IRI and NERSC IRI service clients expose an `IriFilesystem` interface for direct file operations independent of job submission:

```python
fs = svc.filesystem          # IriFilesystem instance (None if client unavailable)

# Upload a local file to remote storage
fs.upload(storage_resource_id, local_path="/tmp/input.txt", remote_path="/scratch/input.txt")

# Download a remote file
fs.download(storage_resource_id, remote_path="/scratch/out.log", local_path="/tmp/out.log")

# List a remote directory
entries = fs.list(storage_resource_id, remote_path="/scratch/")

# Compute checksum of a remote file
checksum = fs.checksum(storage_resource_id, remote_path="/scratch/data.tar")

# Create a tar archive of a remote directory
fs.compress(storage_resource_id, remote_path="/scratch/results/", archive_path="/scratch/results.tar.gz")
```

`storage_resource_id` is the UUID of a storage resource from `svc.discover()`. For most IRI deployments, the home storage resource is auto-resolved when calling `session.fetch_output_files()`.
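Conceptually, auto-resolution amounts to scanning the discovery result for the home storage entry. The helper and field names below (`type`, `kind`, `id` on raw dicts) are assumptions for illustration; the real `DiscoveryResult` exposes typed objects instead:

```python
def resolve_home_storage(items: list[dict]) -> str:
    """Hypothetical helper: pick the home storage resource from discovery data."""
    for item in items:
        if item.get("type") == "storage" and item.get("kind") == "home":
            return item["id"]
    raise LookupError("no home storage resource advertised")

# Mock discovery data standing in for svc.discover() output
discovered = [
    {"type": "compute", "id": "c-01"},
    {"type": "storage", "kind": "home", "id": "st-7f3a"},
]
storage_resource_id = resolve_home_storage(discovered)
print(storage_resource_id)   # -> st-7f3a
```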

## Kubernetes / Kueue Jobs

```python
spec = JobSpec(
    executable="sleep",
    arguments=["30"],
    resources={"requests": {"cpu": "1", "memory": "1Gi"}},
    attributes={
        "container": {"image": "busybox"},
        "namespace": "default",
        "labels": {"kueue.x-k8s.io/queue-name": "compute-queue"},
        "completions": 1,
        "restartPolicy": "Never"
    }
)
```
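For orientation, a spec like the one above corresponds roughly to a Kubernetes `batch/v1` Job manifest of the following shape. The exact fields the toolkit emits are an assumption here; only the `kueue.x-k8s.io/queue-name` label is required for Kueue to admit the Job:

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: my-job
  namespace: default
  labels:
    kueue.x-k8s.io/queue-name: compute-queue   # routes the Job to the LocalQueue
spec:
  completions: 1
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: main
          image: busybox
          command: ["sleep", "30"]
          resources:
            requests:
              cpu: "1"
              memory: 1Gi
```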

See [`scripts/kube/setup-kueue.sh`](scripts/kube/setup-kueue.sh) to install Kueue and create the required `ResourceFlavor`, `ClusterQueue`, `LocalQueue`, and `PriorityClass` resources on your cluster.


# <a name="jupyter"></a>Jupyter Notebook Examples

Interactive notebooks are provided under [`examples/notebooks/client/`](examples/notebooks/client/).

| Notebook | Description |
|---|---|
| [amsc_hello_world](examples/notebooks/client/amsc_hello_world.ipynb) | **Start here.** Walks through installation, credential setup, creating a `Client`/`Session`, submitting a job, monitoring with `session.wait()`, and cleanup with `session.destroy()`. |
| [amsc_gpt2_training_job](examples/notebooks/client/amsc_gpt2_training_job.ipynb) | Submits a GPT-2 training job to a remote IRI compute resource, polls for completion, and fetches log output. |
| [amsc_iri_multisite](examples/notebooks/client/amsc_iri_multisite.ipynb) | Demonstrates multi-site job submission across ESnet IRI East and West endpoints using a single session. |
| [amsc_iro_net_xfer](examples/notebooks/client/amsc_iro_net_xfer.ipynb) | Uses the AMSC-IRO backend to orchestrate networked data-transfer jobs with L2 network metadata. |

# <a name="airflow"></a>Apache Airflow Support

The [`airflow/`](airflow/) subdirectory provides custom Airflow operators for submitting and monitoring IRI compute jobs as part of larger data pipelines.

## Operators

- **`IriJobSubmitOperator`** — Plans, submits, and waits for an IRI job. Accepts `service_type`, `profile`, `executable`, `resources`, and `attributes`. Pushes `job_id` to XCom on completion.
- **`IriFetchOutputOperator`** — Downloads stdout/stderr from a completed job to a local directory.

## Quick Start

```bash
cd airflow/
bash setup_airflow.sh --start   # installs deps, initializes DB, launches standalone Airflow
```

Open [http://localhost:8080](http://localhost:8080), configure credentials in `~/.amscrot/credentials.yml`, then trigger the demo DAG (`esnet_iri_example` or `gpt2_training_job`) from the UI or via the REST API:

```bash
curl -X POST http://localhost:8080/api/v2/dags/esnet_iri_example/dagRuns \
     -H "Content-Type: application/json" \
     -u "admin:<password>" -d '{}'
```

See [`airflow/README.md`](airflow/README.md) for the full operator reference and configuration options.
