"""HoneyHive API Client.
This module provides the main HoneyHive client with an ergonomic interface
wrapping the auto-generated API code.
Usage:
from honeyhive.api import HoneyHive
client = HoneyHive(api_key="hh-example")
# Sync usage (project falls back to HH_PROJECT env var if not provided)
configs = client.configurations.list(project="my-project")
configs = client.configurations.list() # uses HH_PROJECT env var
# Async usage
configs = await client.configurations.list_async(project="my-project")
"""
from typing import Any, Dict, List, Optional, Union
from honeyhive._generated.api_config import APIConfig
# Import all models needed by the wrapper layer
from honeyhive._generated.models import (
AddDatapointsResponse,
AddDatapointsToDatasetRequest,
Configuration,
CreateConfigurationRequest,
CreateDatapointRequest,
CreateDatapointResponse,
CreateDatasetRequest,
CreateDatasetResponse,
CreateEventBatchRequest,
CreateModelEvent,
CreateModelEventBatchRequest,
CreateModelEventBatchResponse,
CreateModelEventRequestBody,
CreateProjectRequest,
CreateToolRequest,
CreateToolResponse,
DeleteDatapointResponse,
DeleteExperimentRunResponse,
Event,
ExperimentComparisonResponse,
ExperimentResultResponse,
GetDatapointResponse,
GetDatapointsResponse,
GetDatasetsResponse,
GetEventsRequest,
GetEventsResponse,
GetExperimentRunResponse,
GetRunsResponse,
Metric,
PostEventBatchResponse,
PostEventRequest,
PostEventRequestBody,
PostEventResponse,
PostExperimentRunRequest,
PostExperimentRunResponse,
Project,
PutExperimentRunRequest,
PutExperimentRunResponse,
SessionStartRequest,
StartSessionRequestBody,
StartSessionResponse,
Tool,
UpdateConfigurationRequest,
UpdateDatapointRequest,
UpdateDatasetRequest,
UpdateEventRequest,
UpdateMetricRequest,
UpdateProjectRequest,
UpdateToolRequest,
)
# Import async services
# Import sync services (9 services)
from honeyhive._generated.services import Configurations_service as configs_svc
from honeyhive._generated.services import Datapoints_service as datapoints_svc
from honeyhive._generated.services import Datasets_service as datasets_svc
from honeyhive._generated.services import Events_service as events_svc
from honeyhive._generated.services import Experiments_service as experiments_svc
from honeyhive._generated.services import Metrics_service as metrics_svc
from honeyhive._generated.services import Projects_service as projects_svc
from honeyhive._generated.services import Session_service as session_svc
from honeyhive._generated.services import Tools_service as tools_svc
from honeyhive._generated.services import (
async_Configurations_service as configs_svc_async,
)
from honeyhive._generated.services import (
async_Datapoints_service as datapoints_svc_async,
)
from honeyhive._generated.services import async_Datasets_service as datasets_svc_async
from honeyhive._generated.services import async_Events_service as events_svc_async
from honeyhive._generated.services import (
async_Experiments_service as experiments_svc_async,
)
from honeyhive._generated.services import async_Metrics_service as metrics_svc_async
from honeyhive._generated.services import async_Projects_service as projects_svc_async
from honeyhive._generated.services import async_Session_service as session_svc_async
from honeyhive._generated.services import async_Tools_service as tools_svc_async
from honeyhive.config.utils import resolve_project
# Import alias (Metric is used as the create request body in FED)
from honeyhive.models import CreateMetricRequest
from ._base import BaseAPI
[docs]
class ConfigurationsAPI(BaseAPI):
"""Configurations API."""
# Sync methods
[docs]
def list(
self,
project: Optional[str] = None,
env: Optional[str] = None,
name: Optional[str] = None,
) -> List[Configuration]:
"""List configurations.
Args:
project: Project name. Falls back to HH_PROJECT env var if not provided.
env: Optional environment filter.
name: Optional name filter.
"""
project = resolve_project(project)
return configs_svc.getConfigurations(
self._api_config, project=project, env=env, name=name
)
[docs]
def create(self, request: CreateConfigurationRequest) -> None:
"""Create a configuration."""
return configs_svc.createConfiguration(self._api_config, data=request)
[docs]
def update(self, id: str, request: UpdateConfigurationRequest) -> None:
"""Update a configuration."""
return configs_svc.updateConfiguration(self._api_config, id=id, data=request)
[docs]
def delete(self, id: str) -> None:
"""Delete a configuration."""
return configs_svc.deleteConfiguration(self._api_config, id=id)
# Async methods
[docs]
async def list_async(
self,
project: Optional[str] = None,
env: Optional[str] = None,
name: Optional[str] = None,
) -> List[Configuration]:
"""List configurations asynchronously."""
project = resolve_project(project)
return await configs_svc_async.getConfigurations(
self._api_config, project=project, env=env, name=name
)
[docs]
async def create_async(self, request: CreateConfigurationRequest) -> None:
"""Create a configuration asynchronously."""
return await configs_svc_async.createConfiguration(
self._api_config, data=request
)
[docs]
async def update_async(self, id: str, request: UpdateConfigurationRequest) -> None:
"""Update a configuration asynchronously."""
return await configs_svc_async.updateConfiguration(
self._api_config, id=id, data=request
)
[docs]
async def delete_async(self, id: str) -> None:
"""Delete a configuration asynchronously."""
return await configs_svc_async.deleteConfiguration(self._api_config, id=id)
# Backwards compatible aliases
[docs]
def create_configuration(self, request: CreateConfigurationRequest) -> None:
"""Create a configuration (backwards compatible alias)."""
return self.create(request)
[docs]
def update_configuration(
self, id: str, request: UpdateConfigurationRequest
) -> None:
"""Update a configuration (backwards compatible alias)."""
return self.update(id, request)
[docs]
def delete_configuration(self, id: str) -> None:
"""Delete a configuration (backwards compatible alias)."""
return self.delete(id)
[docs]
def list_configurations(
self, project: Optional[str] = None, **kwargs: Any
) -> List[Configuration]:
"""List configurations (backwards compatible alias)."""
return self.list(project=project, **kwargs)
[docs]
class DatapointsAPI(BaseAPI):
"""Datapoints API."""
# Sync methods
[docs]
def list(
self,
project: Optional[str] = None,
datapoint_ids: Optional[List[str]] = None,
dataset_name: Optional[str] = None,
) -> GetDatapointsResponse:
"""List datapoints.
Args:
project: Project name. Falls back to HH_PROJECT env var if not provided.
datapoint_ids: Optional list of datapoint IDs to fetch.
dataset_name: Optional dataset name to filter by.
"""
project = resolve_project(project)
return datapoints_svc.getDatapoints(
self._api_config,
project=project,
datapoint_ids=datapoint_ids,
dataset_name=dataset_name,
)
[docs]
def get(self, id: str) -> GetDatapointResponse:
"""Get a datapoint by ID."""
return datapoints_svc.getDatapoint(self._api_config, id=id)
[docs]
def create(self, request: CreateDatapointRequest) -> CreateDatapointResponse:
"""Create a datapoint."""
return datapoints_svc.createDatapoint(self._api_config, data=request)
[docs]
def update(self, id: str, request: UpdateDatapointRequest) -> None:
"""Update a datapoint."""
return datapoints_svc.updateDatapoint(self._api_config, id=id, data=request)
[docs]
def delete(self, id: str) -> DeleteDatapointResponse:
"""Delete a datapoint."""
return datapoints_svc.deleteDatapoint(self._api_config, id=id)
# Async methods
[docs]
async def list_async(
self,
project: Optional[str] = None,
datapoint_ids: Optional[List[str]] = None,
dataset_name: Optional[str] = None,
) -> GetDatapointsResponse:
"""List datapoints asynchronously."""
project = resolve_project(project)
return await datapoints_svc_async.getDatapoints(
self._api_config,
project=project,
datapoint_ids=datapoint_ids,
dataset_name=dataset_name,
)
[docs]
async def get_async(self, id: str) -> GetDatapointResponse:
"""Get a datapoint by ID asynchronously."""
return await datapoints_svc_async.getDatapoint(self._api_config, id=id)
[docs]
async def create_async(
self, request: CreateDatapointRequest
) -> CreateDatapointResponse:
"""Create a datapoint asynchronously."""
return await datapoints_svc_async.createDatapoint(
self._api_config, data=request
)
[docs]
async def update_async(self, id: str, request: UpdateDatapointRequest) -> None:
"""Update a datapoint asynchronously."""
return await datapoints_svc_async.updateDatapoint(
self._api_config, id=id, data=request
)
[docs]
async def delete_async(self, id: str) -> DeleteDatapointResponse:
"""Delete a datapoint asynchronously."""
return await datapoints_svc_async.deleteDatapoint(self._api_config, id=id)
# Backwards compatible aliases
[docs]
def get_datapoint(self, id: str) -> GetDatapointResponse:
"""Get a datapoint by ID (backwards compatible alias for get())."""
return self.get(id)
[docs]
def create_datapoint(
self, request: CreateDatapointRequest
) -> CreateDatapointResponse:
"""Create a datapoint (backwards compatible alias for create())."""
return self.create(request)
[docs]
def update_datapoint(self, id: str, request: UpdateDatapointRequest) -> None:
"""Update a datapoint (backwards compatible alias for update())."""
return self.update(id, request)
[docs]
def delete_datapoint(self, id: str) -> DeleteDatapointResponse:
"""Delete a datapoint (backwards compatible alias for delete())."""
return self.delete(id)
[docs]
def list_datapoints(
self, project: Optional[str] = None, **kwargs: Any
) -> GetDatapointsResponse:
"""List datapoints (backwards compatible alias)."""
return self.list(project=project, **kwargs)
[docs]
class DatasetsAPI(BaseAPI):
"""Datasets API."""
# Sync methods
[docs]
def list(
self,
project: Optional[str] = None,
type: Optional[str] = None,
dataset_id: Optional[str] = None,
) -> GetDatasetsResponse:
"""List datasets.
Args:
project: Project name. Falls back to HH_PROJECT env var if not provided.
type: Optional dataset type filter.
dataset_id: Optional dataset ID to fetch.
"""
project = resolve_project(project)
return datasets_svc.getDatasets(
self._api_config,
project=project,
type=type,
dataset_id=dataset_id,
)
[docs]
def create(self, request: CreateDatasetRequest) -> CreateDatasetResponse:
"""Create a dataset."""
return datasets_svc.createDataset(self._api_config, data=request)
[docs]
def update(self, request: UpdateDatasetRequest) -> None:
"""Update a dataset."""
return datasets_svc.updateDataset(self._api_config, data=request)
[docs]
def delete(self, id: str) -> None:
"""Delete a dataset."""
return datasets_svc.deleteDataset(self._api_config, dataset_id=id)
[docs]
def add_datapoints(
self,
dataset_id: str,
request: Union[AddDatapointsToDatasetRequest, Dict[str, Any]],
) -> AddDatapointsResponse:
"""Add datapoints to a dataset."""
if isinstance(request, AddDatapointsToDatasetRequest):
req = request
else:
req = AddDatapointsToDatasetRequest(**request)
return datasets_svc.addDatapoints(
self._api_config, dataset_id=dataset_id, data=req
)
# Async methods
[docs]
async def list_async(
self,
project: Optional[str] = None,
type: Optional[str] = None,
dataset_id: Optional[str] = None,
) -> GetDatasetsResponse:
"""List datasets asynchronously."""
project = resolve_project(project)
return await datasets_svc_async.getDatasets(
self._api_config,
project=project,
type=type,
dataset_id=dataset_id,
)
[docs]
async def create_async(
self, request: CreateDatasetRequest
) -> CreateDatasetResponse:
"""Create a dataset asynchronously."""
return await datasets_svc_async.createDataset(self._api_config, data=request)
[docs]
async def update_async(self, request: UpdateDatasetRequest) -> None:
"""Update a dataset asynchronously."""
return await datasets_svc_async.updateDataset(self._api_config, data=request)
[docs]
async def delete_async(self, id: str) -> None:
"""Delete a dataset asynchronously."""
return await datasets_svc_async.deleteDataset(self._api_config, dataset_id=id)
[docs]
async def add_datapoints_async(
self,
dataset_id: str,
request: Union[AddDatapointsToDatasetRequest, Dict[str, Any]],
) -> AddDatapointsResponse:
"""Add datapoints to a dataset asynchronously."""
if isinstance(request, AddDatapointsToDatasetRequest):
req = request
else:
req = AddDatapointsToDatasetRequest(**request)
return await datasets_svc_async.addDatapoints(
self._api_config, dataset_id=dataset_id, data=req
)
# Backwards compatible aliases
[docs]
def get_dataset(
self, id: str, project: Optional[str] = None
) -> GetDatasetsResponse:
"""Get a dataset by ID (backwards compatible alias)."""
return self.list(project=project, dataset_id=id)
[docs]
def create_dataset(self, request: CreateDatasetRequest) -> CreateDatasetResponse:
"""Create a dataset (backwards compatible alias for create())."""
return self.create(request)
[docs]
def update_dataset(self, request: UpdateDatasetRequest) -> None:
"""Update a dataset (backwards compatible alias for update())."""
return self.update(request)
[docs]
def delete_dataset(self, id: str) -> None:
"""Delete a dataset (backwards compatible alias for delete())."""
return self.delete(id)
[docs]
def list_datasets(
self, project: Optional[str] = None, **kwargs: Any
) -> GetDatasetsResponse:
"""List datasets (backwards compatible alias)."""
return self.list(project=project, **kwargs)
[docs]
class EventsAPI(BaseAPI):
"""Events API."""
# Sync methods
[docs]
def list(self, query: Union[GetEventsRequest, Dict[str, Any]]) -> GetEventsResponse:
"""Get events (POST /events/export)."""
if isinstance(query, GetEventsRequest):
req = query
else:
req = GetEventsRequest(**query)
return events_svc.getEvents(self._api_config, data=req)
[docs]
def create(
self, request: Union[PostEventRequestBody, PostEventRequest, Dict[str, Any]]
) -> PostEventResponse:
"""Create an event."""
if isinstance(request, PostEventRequestBody):
req = request
elif isinstance(request, dict):
req = PostEventRequestBody(event=PostEventRequest(**request))
else:
req = PostEventRequestBody(event=request)
return events_svc.createEvent(self._api_config, data=req)
[docs]
def update(self, data: Union[UpdateEventRequest, Dict[str, Any]]) -> None:
"""Update an event."""
if isinstance(data, UpdateEventRequest):
req = data
else:
req = UpdateEventRequest(**data)
return events_svc.updateEvent(self._api_config, data=req)
[docs]
def create_batch(
self, data: Union[CreateEventBatchRequest, Dict[str, Any]]
) -> PostEventBatchResponse:
"""Create events in batch."""
if isinstance(data, CreateEventBatchRequest):
req = data
else:
req = CreateEventBatchRequest(**data)
return events_svc.createEventBatch(self._api_config, data=req)
[docs]
def create_model_event(
self, data: Union[CreateModelEventRequestBody, CreateModelEvent, Dict[str, Any]]
) -> PostEventResponse:
"""Create a model event."""
if isinstance(data, CreateModelEventRequestBody):
req = data
elif isinstance(data, dict):
req = CreateModelEventRequestBody(model_event=CreateModelEvent(**data))
else:
req = CreateModelEventRequestBody(model_event=data)
return events_svc.createModelEvent(self._api_config, data=req)
[docs]
def create_model_event_batch(
self, data: Union[CreateModelEventBatchRequest, Dict[str, Any]]
) -> CreateModelEventBatchResponse:
"""Create model events in batch."""
if isinstance(data, CreateModelEventBatchRequest):
req = data
else:
req = CreateModelEventBatchRequest(**data)
return events_svc.createModelEventBatch(self._api_config, data=req)
# Async methods
[docs]
async def list_async(
self,
query: Union[GetEventsRequest, Dict[str, Any]],
) -> GetEventsResponse:
"""Get events asynchronously."""
if isinstance(query, GetEventsRequest):
req = query
else:
req = GetEventsRequest(**query)
return await events_svc_async.getEvents(self._api_config, data=req)
[docs]
async def create_async(
self,
request: Union[PostEventRequestBody, PostEventRequest, Dict[str, Any]],
) -> PostEventResponse:
"""Create an event asynchronously."""
if isinstance(request, PostEventRequestBody):
req = request
elif isinstance(request, dict):
req = PostEventRequestBody(event=PostEventRequest(**request))
else:
req = PostEventRequestBody(event=request)
return await events_svc_async.createEvent(self._api_config, data=req)
[docs]
async def update_async(
self, data: Union[UpdateEventRequest, Dict[str, Any]]
) -> None:
"""Update an event asynchronously."""
if isinstance(data, UpdateEventRequest):
req = data
else:
req = UpdateEventRequest(**data)
return await events_svc_async.updateEvent(self._api_config, data=req)
[docs]
async def create_batch_async(
self, data: Union[CreateEventBatchRequest, Dict[str, Any]]
) -> PostEventBatchResponse:
"""Create events in batch asynchronously."""
if isinstance(data, CreateEventBatchRequest):
req = data
else:
req = CreateEventBatchRequest(**data)
return await events_svc_async.createEventBatch(self._api_config, data=req)
# Backwards compatible aliases
[docs]
def create_event(
self, request: Union[PostEventRequestBody, PostEventRequest, Dict[str, Any]]
) -> PostEventResponse:
"""Create an event (backwards compatible alias for create())."""
return self.create(request)
[docs]
def update_event(self, data: Union[UpdateEventRequest, Dict[str, Any]]) -> None:
"""Update an event (backwards compatible alias for update())."""
return self.update(data)
[docs]
def list_events(
self, query: Union[GetEventsRequest, Dict[str, Any]]
) -> GetEventsResponse:
"""List events (backwards compatible alias for list())."""
return self.list(query)
[docs]
def get_events(
self, query: Union[GetEventsRequest, Dict[str, Any]]
) -> GetEventsResponse:
"""Get events (backwards compatible alias for list())."""
return self.list(query)
[docs]
def get_by_session_id(self, session_id: str) -> GetEventsResponse:
"""Get session event by session ID (GET /session/{session_id}).
Returns a GetEventsResponse with the session event in the 'events' list.
"""
result = session_svc.getSession(self._api_config, session_id=session_id)
# getSession returns an Event object; wrap in GetEventsResponse
if hasattr(result, "model_dump"):
return GetEventsResponse(events=[result])
elif isinstance(result, dict):
return GetEventsResponse(events=[Event(**result)])
return GetEventsResponse(events=[Event.model_validate(result)])
[docs]
class ExperimentsAPI(BaseAPI):
"""Experiments API."""
# Sync methods
[docs]
def list_runs(self, project: Optional[str] = None) -> GetRunsResponse:
"""List experiment runs.
Args:
project: Optional project name filter.
"""
return experiments_svc.getRuns(self._api_config, project=project)
[docs]
def get_run(self, run_id: str) -> GetExperimentRunResponse:
"""Get an experiment run by ID."""
return experiments_svc.getRun(self._api_config, run_id=run_id)
[docs]
def create_run(
self, request: PostExperimentRunRequest
) -> PostExperimentRunResponse:
"""Create an experiment run."""
return experiments_svc.createRun(self._api_config, data=request)
[docs]
def update_run(
self, run_id: str, request: PutExperimentRunRequest
) -> PutExperimentRunResponse:
"""Update an experiment run."""
return experiments_svc.updateRun(self._api_config, run_id=run_id, data=request)
[docs]
def delete_run(self, run_id: str) -> DeleteExperimentRunResponse:
"""Delete an experiment run."""
return experiments_svc.deleteRun(self._api_config, run_id=run_id)
[docs]
def get_result(
self,
run_id: str,
project_id: Optional[str] = None,
aggregate_function: Optional[str] = None,
) -> Dict[str, Any]:
"""Get experiment run result.
Args:
run_id: The experiment run ID.
project_id: The project ID. Falls back to HH_PROJECT env var if not provided.
aggregate_function: Aggregation function to apply.
"""
project_id = resolve_project(project_id)
result = experiments_svc.getExperimentResult(
self._api_config,
run_id=run_id,
project_id=project_id,
aggregate_function=aggregate_function,
)
return result.model_dump() if hasattr(result, "model_dump") else dict(result)
[docs]
def compare_runs(
self,
new_run_id: str,
old_run_id: str,
project_id: Optional[str] = None,
aggregate_function: Optional[str] = None,
) -> Dict[str, Any]:
"""Compare two experiment runs.
Args:
new_run_id: The new run ID to compare (maps to run_id_1).
old_run_id: The old run ID to compare against (maps to run_id_2).
project_id: The project ID. Falls back to HH_PROJECT env var if not provided.
aggregate_function: Aggregation function to apply.
"""
project_id = resolve_project(project_id)
result = experiments_svc.getExperimentComparison(
self._api_config,
project_id=project_id,
run_id_1=new_run_id,
run_id_2=old_run_id,
aggregate_function=aggregate_function,
)
return result.model_dump() if hasattr(result, "model_dump") else dict(result)
# Async methods
[docs]
async def list_runs_async(self, project: Optional[str] = None) -> GetRunsResponse:
"""List experiment runs asynchronously."""
return await experiments_svc_async.getRuns(self._api_config, project=project)
[docs]
async def get_run_async(self, run_id: str) -> GetExperimentRunResponse:
"""Get an experiment run by ID asynchronously."""
return await experiments_svc_async.getRun(self._api_config, run_id=run_id)
[docs]
async def create_run_async(
self, request: PostExperimentRunRequest
) -> PostExperimentRunResponse:
"""Create an experiment run asynchronously."""
return await experiments_svc_async.createRun(self._api_config, data=request)
[docs]
async def update_run_async(
self, run_id: str, request: PutExperimentRunRequest
) -> PutExperimentRunResponse:
"""Update an experiment run asynchronously."""
return await experiments_svc_async.updateRun(
self._api_config, run_id=run_id, data=request
)
[docs]
async def delete_run_async(self, run_id: str) -> DeleteExperimentRunResponse:
"""Delete an experiment run asynchronously."""
return await experiments_svc_async.deleteRun(self._api_config, run_id=run_id)
[docs]
async def get_result_async(
self,
run_id: str,
project_id: Optional[str] = None,
aggregate_function: Optional[str] = None,
) -> Dict[str, Any]:
"""Get experiment run result asynchronously."""
project_id = resolve_project(project_id)
result = await experiments_svc_async.getExperimentResult(
self._api_config,
run_id=run_id,
project_id=project_id,
aggregate_function=aggregate_function,
)
return result.model_dump() if hasattr(result, "model_dump") else dict(result)
[docs]
async def compare_runs_async(
self,
new_run_id: str,
old_run_id: str,
project_id: Optional[str] = None,
aggregate_function: Optional[str] = None,
) -> Dict[str, Any]:
"""Compare two experiment runs asynchronously."""
project_id = resolve_project(project_id)
result = await experiments_svc_async.getExperimentComparison(
self._api_config,
project_id=project_id,
run_id_1=new_run_id,
run_id_2=old_run_id,
aggregate_function=aggregate_function,
)
return result.model_dump() if hasattr(result, "model_dump") else dict(result)
# Backwards compatible aliases
[docs]
def get_run_result(
self,
run_id: str,
project_id: Optional[str] = None,
aggregate_function: Optional[str] = None,
) -> Dict[str, Any]:
"""Get experiment run result (alias for get_result)."""
return self.get_result(run_id, project_id, aggregate_function)
[docs]
class MetricsAPI(BaseAPI):
"""Metrics API."""
# Sync methods
[docs]
def list(self, project: Optional[str] = None) -> List[Metric]:
"""List metrics.
Args:
project: Project name. Falls back to HH_PROJECT env var if not provided.
"""
project = resolve_project(project)
return metrics_svc.getMetrics(self._api_config, project_name=project)
[docs]
def create(self, request: CreateMetricRequest) -> None:
"""Create a metric."""
return metrics_svc.createMetric(self._api_config, data=request)
[docs]
def update(self, request: UpdateMetricRequest) -> None:
"""Update a metric."""
return metrics_svc.updateMetric(self._api_config, data=request)
[docs]
def delete(self, id: str) -> None:
"""Delete a metric."""
return metrics_svc.deleteMetric(self._api_config, metric_id=id)
# Async methods
[docs]
async def list_async(self, project: Optional[str] = None) -> List[Metric]:
"""List metrics asynchronously."""
project = resolve_project(project)
return await metrics_svc_async.getMetrics(
self._api_config, project_name=project
)
[docs]
async def create_async(self, request: CreateMetricRequest) -> None:
"""Create a metric asynchronously."""
return await metrics_svc_async.createMetric(self._api_config, data=request)
[docs]
async def update_async(self, request: UpdateMetricRequest) -> None:
"""Update a metric asynchronously."""
return await metrics_svc_async.updateMetric(self._api_config, data=request)
[docs]
async def delete_async(self, id: str) -> None:
"""Delete a metric asynchronously."""
return await metrics_svc_async.deleteMetric(self._api_config, metric_id=id)
# Backwards compatible aliases
[docs]
def create_metric(self, request: CreateMetricRequest) -> None:
"""Create a metric (backwards compatible alias)."""
return self.create(request)
[docs]
def update_metric(self, request: UpdateMetricRequest) -> None:
"""Update a metric (backwards compatible alias)."""
return self.update(request)
[docs]
def delete_metric(self, id: str) -> None:
"""Delete a metric (backwards compatible alias)."""
return self.delete(id)
[docs]
def list_metrics(self, project: Optional[str] = None) -> List[Metric]:
"""List metrics (backwards compatible alias)."""
return self.list(project=project)
[docs]
class ProjectsAPI(BaseAPI):
"""Projects API."""
# Sync methods
[docs]
def list(self, name: Optional[str] = None) -> List[Project]:
"""List projects."""
return projects_svc.getProjects(self._api_config, name=name)
[docs]
def create(self, data: CreateProjectRequest) -> Project:
"""Create a project."""
return projects_svc.createProject(self._api_config, data=data)
[docs]
def update(self, data: UpdateProjectRequest) -> None:
"""Update a project."""
return projects_svc.updateProject(self._api_config, data=data)
[docs]
def delete(self, name: str) -> None:
"""Delete a project."""
return projects_svc.deleteProject(self._api_config, name=name)
# Async methods
[docs]
async def list_async(self, name: Optional[str] = None) -> List[Project]:
"""List projects asynchronously."""
return await projects_svc_async.getProjects(self._api_config, name=name)
[docs]
async def create_async(self, data: CreateProjectRequest) -> Project:
"""Create a project asynchronously."""
return await projects_svc_async.createProject(self._api_config, data=data)
[docs]
async def update_async(self, data: UpdateProjectRequest) -> None:
"""Update a project asynchronously."""
return await projects_svc_async.updateProject(self._api_config, data=data)
[docs]
async def delete_async(self, name: str) -> None:
"""Delete a project asynchronously."""
return await projects_svc_async.deleteProject(self._api_config, name=name)
# Backwards compatible aliases
[docs]
def get_project(self, id: str) -> List[Project]:
"""Get a project (backwards compatible alias)."""
return self.list(name=id)
[docs]
def create_project(self, data: Dict[str, Any]) -> Project:
"""Create a project (backwards compatible alias)."""
return self.create(CreateProjectRequest(**data))
[docs]
def update_project(self, data: Dict[str, Any]) -> None:
"""Update a project (backwards compatible alias)."""
return self.update(UpdateProjectRequest(**data))
[docs]
def delete_project(self, name: str) -> None:
"""Delete a project (backwards compatible alias)."""
return self.delete(name)
[docs]
def list_projects(self, name: Optional[str] = None) -> List[Project]:
"""List projects (backwards compatible alias)."""
return self.list(name=name)
[docs]
class SessionsAPI(BaseAPI):
"""Sessions API.
Supports startSession and getSession operations.
"""
# Sync methods
[docs]
def get(self, session_id: str) -> Event:
"""Get a session by ID."""
return session_svc.getSession(self._api_config, session_id=session_id)
[docs]
def start(
self, data: Union[StartSessionRequestBody, SessionStartRequest, Dict[str, Any]]
) -> StartSessionResponse:
"""Start a new session."""
if isinstance(data, StartSessionRequestBody):
req = data
elif isinstance(data, dict):
req = StartSessionRequestBody(session=SessionStartRequest(**data))
else:
req = StartSessionRequestBody(session=data)
return session_svc.startSession(self._api_config, data=req)
# Async methods
[docs]
async def get_async(self, session_id: str) -> Event:
"""Get a session by ID asynchronously."""
return await session_svc_async.getSession(
self._api_config, session_id=session_id
)
[docs]
async def start_async(
self, data: Union[StartSessionRequestBody, SessionStartRequest, Dict[str, Any]]
) -> StartSessionResponse:
"""Start a new session asynchronously."""
if isinstance(data, StartSessionRequestBody):
req = data
elif isinstance(data, dict):
req = StartSessionRequestBody(session=SessionStartRequest(**data))
else:
req = StartSessionRequestBody(session=data)
return await session_svc_async.startSession(self._api_config, data=req)
# Backwards compatible aliases
[docs]
def create_session(
self,
request: Union[StartSessionRequestBody, SessionStartRequest, Dict[str, Any]],
) -> StartSessionResponse:
"""Create/start a session (backwards compatible alias for start())."""
return self.start(request)
[docs]
def start_session(
self,
request: Union[StartSessionRequestBody, SessionStartRequest, Dict[str, Any]],
) -> StartSessionResponse:
"""Start a session (backwards compatible alias for start())."""
return self.start(request)
[docs]
def get_session(self, session_id: str) -> Event:
"""Get a session (backwards compatible alias for get())."""
return self.get(session_id)
[docs]
class HoneyHive:
"""Main HoneyHive API client.
Provides an ergonomic interface to the HoneyHive API with both
sync and async methods.
Usage:
client = HoneyHive(api_key="hh-example")
# Sync
configs = client.configurations.list(project="my-project")
# Async
configs = await client.configurations.list_async(project="my-project")
Attributes:
configurations: API for managing configurations.
datapoints: API for managing datapoints.
datasets: API for managing datasets.
events: API for managing events.
experiments: API for managing experiment runs.
metrics: API for managing metrics.
projects: API for managing projects.
sessions: API for managing sessions.
tools: API for managing tools.
"""
[docs]
def __init__(
self,
api_key: Optional[str] = None,
*,
# Primary URL parameters
base_url: Optional[str] = None,
cp_base_url: Optional[str] = None,
# Backwards compatible alias for base_url
server_url: Optional[str] = None,
# Backwards compatible parameters (accepted but not used in new client)
timeout: Optional[float] = None,
retry_config: Optional[Any] = None,
rate_limit_calls: Optional[int] = None,
rate_limit_window: Optional[float] = None,
max_connections: Optional[int] = None,
max_keepalive: Optional[int] = None,
test_mode: Optional[bool] = None,
verbose: Optional[bool] = None,
tracer_instance: Optional[Any] = None,
) -> None:
"""Initialize the HoneyHive client.
Args:
api_key: HoneyHive API key (typically starts with ``hh_``).
Falls back to HH_API_KEY environment variable.
base_url: API base URL for Data Plane/ingestion.
Falls back to HH_API_URL env var, then https://api.honeyhive.ai.
cp_base_url: Control Plane API URL for query endpoints.
Falls back to HH_CP_API_URL, then HH_API_URL env var.
server_url: Deprecated alias for base_url (for backwards compatibility).
timeout: Request timeout in seconds (accepted for backwards compat, not used).
retry_config: Retry configuration (accepted for backwards compat, not used).
rate_limit_calls: Max calls per time window (accepted for backwards compat).
rate_limit_window: Time window in seconds (accepted for backwards compat).
max_connections: Max connections in pool (accepted for backwards compat).
max_keepalive: Max keepalive connections (accepted for backwards compat).
test_mode: Enable test mode (accepted for backwards compat, not used).
verbose: Enable verbose logging (accepted for backwards compat, not used).
tracer_instance: Tracer instance (accepted for backwards compat, not used).
"""
import os
# Resolve API key from parameter or environment
self._api_key: str = (
api_key if api_key is not None else os.environ.get("HH_API_KEY", "")
)
# Resolve base URL: base_url > server_url (legacy) > env var > default
resolved_base_url = (
base_url
or server_url # Legacy parameter
or os.environ.get("HH_API_URL")
or "https://api.honeyhive.ai"
)
# Resolve CP base URL: cp_base_url > HH_CP_API_URL > HH_API_URL > base_url
resolved_cp_base_url = (
cp_base_url
or os.environ.get("HH_CP_API_URL")
or os.environ.get("HH_API_URL")
or resolved_base_url
)
# Store backwards compat params (silently accepted)
self._timeout = timeout
self._test_mode = test_mode if test_mode is not None else False
self._verbose = verbose if verbose is not None else False
self._tracer_instance = tracer_instance
# Create API config
self._api_config = APIConfig(
base_path=resolved_base_url,
access_token=self._api_key,
)
# Store CP URL separately
self._cp_base_url = resolved_cp_base_url
# Initialize API namespaces
self.configurations = ConfigurationsAPI(self._api_config)
self.datapoints = DatapointsAPI(self._api_config)
self.datasets = DatasetsAPI(self._api_config)
self.events = EventsAPI(self._api_config)
self.experiments = ExperimentsAPI(self._api_config)
self.metrics = MetricsAPI(self._api_config)
self.projects = ProjectsAPI(self._api_config)
self.sessions = SessionsAPI(self._api_config)
self.tools = ToolsAPI(self._api_config)
# Alias for backwards compatibility
self.evaluations = self.experiments
@property
def test_mode(self) -> bool:
"""Return whether client is in test mode."""
return self._test_mode
@property
def verbose(self) -> bool:
"""Return whether verbose mode is enabled."""
return self._verbose
@property
def timeout(self) -> Optional[float]:
"""Return the configured timeout."""
return self._timeout
@property
def api_config(self) -> APIConfig:
"""Access the underlying API configuration."""
return self._api_config
@property
def api_key(self) -> str:
"""Get the HoneyHive API key."""
return self._api_key
@property
def server_url(self) -> str:
"""Get the HoneyHive API server URL."""
return self._api_config.base_path
@server_url.setter
def server_url(self, value: str) -> None:
"""Set the HoneyHive API server URL."""
self._api_config.base_path = value