Core Functions

Primary functions for running experiments and managing execution.

evaluate()

evaluate(function, *, dataset=None, dataset_id=None, evaluators=None, instrumentors=None, api_key=None, server_url=None, project=None, name=None, max_workers=10, aggregate_function='average', verbose=False, print_results=True)

Run an experiment by executing a function against a dataset and evaluating outputs.

This is the main entry point for the experiments framework. It handles:

  • Function execution with tracer integration

  • Evaluator orchestration (sync and async)

  • Session/event linking

  • Results aggregation via backend

Parameters:
  • function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to test. Should accept Dict[str, Any] (datapoint) and return Dict[str, Any] (outputs).

  • dataset (Optional[List[Dict[str, Any]]]) – List of test cases with inputs and optional ground_truth. Mutually exclusive with dataset_id.

  • dataset_id (Optional[str]) – ID of HoneyHive-managed dataset. Mutually exclusive with dataset.

  • evaluators (Optional[List[Callable]]) – List of evaluator functions decorated with @evaluator or @aevaluator.

  • api_key (Optional[str]) – HoneyHive API key. Falls back to HH_API_KEY environment variable.

  • project (Optional[str]) – HoneyHive project name. Falls back to HH_PROJECT environment variable.

  • name (Optional[str]) – Human-readable name for this experiment run.

  • instrumentors (Optional[List[Callable[[], Any]]]) – List of instrumentor factory functions. Each factory returns a new instrumentor instance. Example: [lambda: OpenAIInstrumentor()]

  • server_url (Optional[str]) – HoneyHive server URL. Falls back to HH_API_URL environment variable.

  • max_workers (int) – Maximum number of concurrent workers for parallel execution. Default: 10.

  • print_results (bool) – Print a formatted results table after evaluation. Default: True.

  • aggregate_function (str) – Aggregation method for metrics ("average", "sum", "min", "max"). Default: "average".

  • verbose (bool) – Enable detailed logging.
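Note that instrumentors takes factories (callables) rather than instances: the framework can then construct a fresh instrumentor per datapoint, so parallel workers don't share mutable state. A minimal sketch of the pattern, using a hypothetical stand-in class (DummyInstrumentor is not part of the SDK):

```python
# Sketch of the instrumentor-factory pattern described for `instrumentors`.
# `DummyInstrumentor` is a stand-in class, not a real SDK type.
class DummyInstrumentor:
    def __init__(self):
        self.events = []  # per-instance state; must not be shared across workers

# Pass factories, not instances, e.g. [lambda: OpenAIInstrumentor()]
factories = [lambda: DummyInstrumentor()]

# The framework would call each factory once per datapoint:
a = factories[0]()
b = factories[0]()
assert a is not b  # each datapoint gets its own independent instance
```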

Returns:

Experiment result summary with aggregated metrics.

Return type:

ExperimentResultSummary

Raises:

ValueError – If neither dataset nor dataset_id is provided, or if both are provided.
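The exactly-one-of constraint can be sketched as a small helper. This is a hypothetical illustration of the documented behavior, not the SDK's internal validation code:

```python
# Minimal sketch of the dataset/dataset_id check described under Raises.
# Hypothetical helper; the SDK's actual validation may differ in detail.
def validate_dataset_args(dataset, dataset_id):
    # Raises unless exactly one of the two arguments is provided.
    if (dataset is None) == (dataset_id is None):
        raise ValueError("Provide exactly one of 'dataset' or 'dataset_id'.")

validate_dataset_args([{"inputs": {"q": "x"}}], None)  # ok: external dataset
validate_dataset_args(None, "dataset-abc-123")         # ok: managed dataset
```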

Basic Usage

from honeyhive.experiments import evaluate, evaluator

@evaluator
def accuracy_evaluator(outputs, inputs, ground_truth):
    return {"score": 1.0 if outputs == ground_truth else 0.0}

def my_llm_function(datapoint):
    inputs = datapoint["inputs"]
    # Your LLM logic here
    return {"answer": process(inputs["query"])}

result = evaluate(
    function=my_llm_function,
    dataset=[
        {"inputs": {"query": "Q1"}, "ground_truth": {"answer": "A1"}},
        {"inputs": {"query": "Q2"}, "ground_truth": {"answer": "A2"}},
    ],
    evaluators=[accuracy_evaluator],
    api_key="your-api-key",
    project="your-project",
    name="accuracy-test-v1"
)

print(f"Success: {result.success}")
print(f"Passed: {result.passed} / {result.passed + result.failed}")
print(f"Avg accuracy: {result.metrics.get_metric('accuracy_evaluator')}")

External Dataset (Client-Side Data)

# SDK auto-generates EXT- prefixed IDs
result = evaluate(
    function=my_function,
    dataset=[
        {"inputs": {"x": 1}, "ground_truth": {"y": 2}},
        {"inputs": {"x": 2}, "ground_truth": {"y": 4}},
    ],
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)

Managed Dataset (HoneyHive-Stored)

# Use existing dataset by ID
result = evaluate(
    function=my_function,
    dataset_id="dataset-abc-123",  # Pre-created in HoneyHive
    evaluators=[my_evaluator],
    api_key="key",
    project="project"
)

Multiple Evaluators

@evaluator
def accuracy(outputs, inputs, ground_truth):
    return {"score": calculate_accuracy(outputs, ground_truth)}

@evaluator
def relevance(outputs, inputs, ground_truth):
    return {"score": calculate_relevance(outputs, inputs)}

@aevaluator
async def external_check(outputs, inputs, ground_truth):
    result = await external_api.validate(outputs)
    return {"score": result.score}

result = evaluate(
    function=my_function,
    dataset=test_data,
    evaluators=[accuracy, relevance, external_check],
    api_key="key",
    project="project",
    max_workers=4  # Parallel execution
)

Accessing Results

result = evaluate(...)

# Overall status
print(f"Run ID: {result.run_id}")
print(f"Status: {result.status}")
print(f"Success: {result.success}")

# Aggregated metrics
accuracy_score = result.metrics.get_metric("accuracy")
all_metrics = result.metrics.get_all_metrics()

# Individual datapoints
for datapoint in result.datapoints:
    print(f"Datapoint: {datapoint}")
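The aggregate_function parameter determines how per-datapoint scores roll up into the aggregated metrics shown above. Assuming plain numeric scores, the behavior can be sketched as follows (the real aggregation runs on the HoneyHive backend and may differ):

```python
# Sketch of how `aggregate_function` rolls per-datapoint scores into one
# metric value. Illustration only; actual aggregation is server-side.
def aggregate(scores, method="average"):
    methods = {
        "average": lambda s: sum(s) / len(s),
        "sum": sum,
        "min": min,
        "max": max,
    }
    return methods[method](scores)

scores = [1.0, 0.0, 1.0, 1.0]  # e.g. accuracy_evaluator across 4 datapoints
aggregate(scores, "average")   # → 0.75
```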

run_experiment()

run_experiment(function, dataset, datapoint_ids, *, server_url=None, experiment_context, api_key=None, max_workers=10, verbose=False, instrumentors=None)

Low-level function to execute a function against a dataset with tracer integration.

Warning

This is a low-level API. Most users should use evaluate() instead, which provides a higher-level interface with evaluator support.

Parameters:
  • function (Callable[[Dict[str, Any]], Dict[str, Any]]) – Function to execute for each datapoint.

  • dataset (List[Dict[str, Any]]) – List of datapoints to process.

  • datapoint_ids (List[str]) – List of datapoint IDs (must match dataset length).

  • experiment_context (ExperimentContext) – Context with run_id, dataset_id, project, source.

  • server_url (Optional[str]) – HoneyHive server URL. Falls back to HH_API_URL env var.

  • api_key (Optional[str]) – HoneyHive API key. Falls back to HH_API_KEY env var.

  • max_workers (int) – Maximum concurrent workers. Default: 10.

  • verbose (bool) – Enable detailed logging.

  • instrumentors (Optional[List[Callable[[], Any]]]) – List of instrumentor factory functions per datapoint.

Returns:

List of execution results with outputs, errors, and session IDs.

Return type:

List[Dict[str, Any]]

Usage Example

from honeyhive.experiments import run_experiment, ExperimentContext

context = ExperimentContext(
    run_id="run-123",
    dataset_id="dataset-456",
    project="my-project",
    source="test"
)

results = run_experiment(
    function=my_function,
    dataset=test_data,
    datapoint_ids=["dp-1", "dp-2", "dp-3"],
    experiment_context=context,
    api_key="key",
    max_workers=2
)

for result in results:
    print(f"Datapoint: {result['datapoint_id']}")
    print(f"Status: {result['status']}")
    print(f"Outputs: {result['outputs']}")
    if result['error']:
        print(f"Error: {result['error']}")

ExperimentContext

class ExperimentContext

Context object storing experiment metadata for tracer integration.

Parameters:
  • run_id (str) – Unique experiment run identifier.

  • dataset_id (str) – Dataset identifier (may be EXT- prefixed for external datasets).

  • project (str) – HoneyHive project name.

  • run_name (Optional[str]) – Optional human-readable name for the run (used for session naming).

  • source (str) – Source identifier. Default: "evaluation".

  • metadata (Optional[Dict[str, Any]]) – Additional metadata dictionary.

Methods

to_tracer_config(datapoint_id)

Convert context to tracer configuration dictionary for a specific datapoint.

Parameters:

datapoint_id (str) – Datapoint identifier for this execution.

Returns:

Configuration dict for HoneyHiveTracer initialization.

Return type:

Dict[str, Any]

Usage Example

from honeyhive.experiments import ExperimentContext

context = ExperimentContext(
    run_id="run-abc-123",
    dataset_id="EXT-dataset-xyz",
    project="my-project",
    source="ci-pipeline"
)

# Convert to tracer config (requires a datapoint_id)
tracer_config = context.to_tracer_config("dp-1")

# Use with HoneyHiveTracer
from honeyhive import HoneyHiveTracer
tracer = HoneyHiveTracer(**tracer_config, api_key="key")

Best Practices

1. Function Signatures

Your function should accept a datapoint dict and return an outputs dict:

from typing import Any, Dict

def my_function(datapoint: Dict[str, Any]) -> Dict[str, Any]:
    """Run one test case.

    Args:
        datapoint: Contains 'inputs' and optionally 'ground_truth'.

    Returns:
        Dict with your outputs (e.g., {"answer": "...", "confidence": 0.9}).
    """
    inputs = datapoint["inputs"]
    # Process inputs
    return {"answer": process(inputs)}

2. Error Handling

Let exceptions bubble up; evaluate() catches and logs them:

def my_function(datapoint):
    try:
        result = risky_operation(datapoint["inputs"])
        return {"result": result}
    except SpecificError as e:
        # Log but don't suppress - let evaluate() handle it
        logger.warning(f"Operation failed: {e}")
        raise

3. Parallel Execution

Use max_workers for I/O-bound workloads:

# Good for API calls
result = evaluate(
    function=api_heavy_function,
    dataset=large_dataset,
    evaluators=[...],
    max_workers=10,  # High concurrency for I/O
    api_key="key",
    project="project"
)

# For CPU-bound work, keep lower
result = evaluate(
    function=cpu_intensive_function,
    dataset=dataset,
    max_workers=2,  # Lower for CPU work
    api_key="key",
    project="project"
)

4. Dataset Size Management

For large datasets, use batching:

def run_large_experiment(full_dataset, batch_size=100):
    """Process large dataset in batches."""
    results = []

    for i in range(0, len(full_dataset), batch_size):
        batch = full_dataset[i:i+batch_size]

        result = evaluate(
            function=my_function,
            dataset=batch,
            evaluators=[my_evaluator],
            name=f"experiment-batch-{i//batch_size}",
            api_key="key",
            project="project"
        )

        results.append(result)

    return results

See Also