from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from html import escape
import numpy as np
from ._base import (
_MAX_PREVIEW,
_TimeSeriesBase,
_build_repr_html,
_import_pandas,
)
from .coverage import CoverageBar
from .enums import DataType, Frequency
@dataclass(frozen=True)
class Dimension:
    """One named axis of a cube together with the label of every position.

    The dataclass is frozen, so ``name`` and ``labels`` cannot be rebound
    after construction (the ``labels`` list object itself is still a normal,
    mutable list).
    """

    # Axis name, e.g. the key used to address this axis in ``sel``/``isel``.
    name: str
    # One label per position along the axis; datetimes, floats or strings.
    labels: list[datetime] | list[float] | list[str]
@dataclass(slots=True, repr=False, eq=False)
class TimeSeriesCube:
frequency: Frequency
timezone: str = "UTC"
name: str | None = None
unit: str | None = None
description: str | None = None
data_type: DataType | None = None
attributes: dict[str, str] = field(default_factory=dict)
dimensions: list[Dimension] = field(default_factory=list)
_values: np.ma.MaskedArray = field(
default_factory=lambda: np.ma.MaskedArray(np.empty(0)), repr=False
)
def __init__(
self,
frequency: Frequency,
*,
timezone: str = "UTC",
name: str | None = None,
unit: str | None = None,
description: str | None = None,
data_type: DataType | None = None,
attributes: dict[str, str] | None = None,
dimensions: list[Dimension] | None = None,
values: np.ndarray | np.ma.MaskedArray,
) -> None:
self.frequency = frequency
self.timezone = timezone
self.name = name
self.unit = unit
self.description = description
self.data_type = data_type
self.attributes = attributes if attributes is not None else {}
self.dimensions = dimensions if dimensions is not None else []
if isinstance(values, np.ma.MaskedArray):
self._values = values
else:
arr = np.asarray(values, dtype=np.float64)
self._values = np.ma.MaskedArray(arr, mask=np.isnan(arr))
expected = tuple(len(d.labels) for d in self.dimensions)
if self._values.shape != expected:
raise ValueError(
f"values shape {self._values.shape} does not match "
f"dimensions {expected}"
)
__hash__ = None
# ---- properties ----------------------------------------------------------
@property
def shape(self) -> tuple[int, ...]:
return self._values.shape
@property
def ndim(self) -> int:
return self._values.ndim
@property
def dim_names(self) -> tuple[str, ...]:
return tuple(d.name for d in self.dimensions)
@property
def coords(self) -> dict[str, list]:
return {d.name: list(d.labels) for d in self.dimensions}
@property
def primary_time_dim(self) -> Dimension:
# Prefer "valid_time"
for d in self.dimensions:
if d.name == "valid_time":
return d
# Fall back to first datetime-labelled dimension
for d in self.dimensions:
if d.labels and isinstance(d.labels[0], datetime):
return d
# Fall back to first dimension
return self.dimensions[0]
@property
def begin(self) -> datetime | float | str | None:
ptd = self.primary_time_dim
return ptd.labels[0] if ptd.labels else None
@property
def end(self) -> datetime | float | str | None:
ptd = self.primary_time_dim
return ptd.labels[-1] if ptd.labels else None
@property
def has_missing(self) -> bool:
return bool(self._values.mask.any()) if self._values.size else False
# ---- internal helpers ----------------------------------------------------
def _get_dim(self, name: str) -> Dimension:
for d in self.dimensions:
if d.name == name:
return d
raise KeyError(f"Dimension {name!r} not found. Available: {self.dim_names}")
def _dim_index(self, name: str) -> int:
for i, d in enumerate(self.dimensions):
if d.name == name:
return i
raise KeyError(f"Dimension {name!r} not found. Available: {self.dim_names}")
def _meta_kwargs(self) -> dict:
return dict(
name=self.name,
unit=self.unit,
description=self.description,
data_type=self.data_type,
attributes=self.attributes,
)
# ---- sel / isel ----------------------------------------------------------
[docs]
def sel(self, **kwargs) -> TimeSeriesCube | "TimeSeriesTable" | "TimeSeries":
remaining_dims = list(self.dimensions)
values = self._values
for dim_name, selector in kwargs.items():
axis = next(
i for i, d in enumerate(remaining_dims) if d.name == dim_name
)
dim = remaining_dims[axis]
if isinstance(selector, slice):
labels = list(dim.labels)
start_idx = 0 if selector.start is None else labels.index(selector.start)
stop_idx = len(labels) if selector.stop is None else labels.index(selector.stop) + 1
slc = slice(start_idx, stop_idx)
values = values[(slice(None),) * axis + (slc,)]
remaining_dims[axis] = Dimension(dim.name, labels[slc])
else:
try:
idx = list(dim.labels).index(selector)
except ValueError:
raise KeyError(
f"Label {selector!r} not found in dimension {dim_name!r}"
) from None
values = np.take(values, idx, axis=axis)
remaining_dims.pop(axis)
return self._maybe_collapse(values, remaining_dims)
[docs]
def isel(self, **kwargs) -> TimeSeriesCube | "TimeSeriesTable" | "TimeSeries":
remaining_dims = list(self.dimensions)
values = self._values
for dim_name, selector in kwargs.items():
axis = next(
i for i, d in enumerate(remaining_dims) if d.name == dim_name
)
dim = remaining_dims[axis]
if isinstance(selector, slice):
values = values[(slice(None),) * axis + (selector,)]
remaining_dims[axis] = Dimension(dim.name, list(dim.labels)[selector])
else:
values = np.take(values, selector, axis=axis)
remaining_dims.pop(axis)
return self._maybe_collapse(values, remaining_dims)
def _maybe_collapse(self, values, remaining_dims):
ndim = values.ndim if hasattr(values, 'ndim') else 0
if ndim == 0:
raise ValueError("Selection collapsed all dimensions; scalar result.")
if ndim >= 3:
return TimeSeriesCube(
self.frequency,
timezone=self.timezone,
dimensions=remaining_dims,
values=values,
**self._meta_kwargs(),
)
# Deferred imports to avoid circular imports
from .table import TimeSeriesTable
from .timeseries import TimeSeries
filled = np.ma.filled(values, fill_value=np.nan)
if ndim == 2:
# Find which dimension has datetime labels for timestamps
time_axis = None
for i, d in enumerate(remaining_dims):
if d.labels and isinstance(d.labels[0], datetime):
time_axis = i
break
if time_axis is None:
raise ValueError(
"Cannot collapse to TimeSeriesTable: no dimension "
"has datetime labels."
)
# Transpose so time is axis 0
if time_axis != 0:
filled = filled.T
remaining_dims = [remaining_dims[1], remaining_dims[0]]
timestamps = list(remaining_dims[0].labels)
col_dim = remaining_dims[1]
col_names = [str(lbl) for lbl in col_dim.labels]
return TimeSeriesTable(
self.frequency,
timezone=self.timezone,
timestamps=timestamps,
values=filled,
names=col_names,
)
# ndim == 1
dim0 = remaining_dims[0]
if not (dim0.labels and isinstance(dim0.labels[0], datetime)):
raise ValueError(
f"Cannot collapse to TimeSeries: dimension "
f"{dim0.name!r} labels are not datetimes."
)
timestamps = list(dim0.labels)
values_list = _TimeSeriesBase._from_float_array(filled)
return TimeSeries(
self.frequency,
timezone=self.timezone,
timestamps=timestamps,
values=values_list,
**self._meta_kwargs(),
)
# ---- conversion methods --------------------------------------------------
[docs]
def to_timeseries(self, **sel_kwargs) -> "TimeSeries":
from .timeseries import TimeSeries
if sel_kwargs:
result = self.sel(**sel_kwargs)
else:
result = self._maybe_collapse(self._values, list(self.dimensions))
if not isinstance(result, TimeSeries):
raise ValueError(
f"Selection did not collapse to TimeSeries, got {type(result).__name__}"
)
return result
[docs]
def to_table(self, **sel_kwargs) -> "TimeSeriesTable":
from .table import TimeSeriesTable
if sel_kwargs:
result = self.sel(**sel_kwargs)
else:
result = self._maybe_collapse(self._values, list(self.dimensions))
if not isinstance(result, TimeSeriesTable):
raise ValueError(
f"Selection did not collapse to TimeSeriesTable, got {type(result).__name__}"
)
return result
[docs]
def to_numpy(self) -> np.ma.MaskedArray:
return self._values.copy()
[docs]
def to_pandas_dataframe(self) -> "pd.DataFrame":
pd = _import_pandas()
dim_labels = [list(d.labels) for d in self.dimensions]
dim_names = [d.name for d in self.dimensions]
index = pd.MultiIndex.from_product(dim_labels, names=dim_names)
flat = np.ma.filled(self._values, fill_value=np.nan).ravel()
col_name = self.name or "value"
return pd.DataFrame({col_name: flat}, index=index)
# ---- class method constructors -------------------------------------------
[docs]
@classmethod
def from_numpy(
cls,
dimensions: list[Dimension],
values: np.ndarray | np.ma.MaskedArray,
frequency: Frequency,
*,
timezone: str = "UTC",
name: str | None = None,
unit: str | None = None,
description: str | None = None,
data_type: DataType | None = None,
attributes: dict[str, str] | None = None,
) -> TimeSeriesCube:
return cls(
frequency,
timezone=timezone,
name=name,
unit=unit,
description=description,
data_type=data_type,
attributes=attributes,
dimensions=dimensions,
values=values,
)
[docs]
@classmethod
def from_timeseries_list(
cls,
series: list,
dimension: Dimension,
*,
frequency: Frequency | None = None,
timezone: str | None = None,
name: str | None = None,
unit: str | None = None,
description: str | None = None,
data_type: DataType | None = None,
attributes: dict[str, str] | None = None,
) -> TimeSeriesCube:
if not series:
raise ValueError("Cannot build cube from an empty list of TimeSeries.")
if len(dimension.labels) != len(series):
raise ValueError(
f"dimension has {len(dimension.labels)} labels but "
f"{len(series)} series were provided."
)
# Compute sorted union of all timestamps
all_ts: set[datetime] = set()
for s in series:
all_ts.update(s.timestamps)
union_ts = sorted(all_ts)
ts_index = {t: i for i, t in enumerate(union_ts)}
n_series = len(series)
n_timestamps = len(union_ts)
data = np.full((n_series, n_timestamps), np.nan, dtype=np.float64)
for row, s in enumerate(series):
for t, v in zip(s.timestamps, s.values):
col = ts_index[t]
data[row, col] = v if v is not None else np.nan
mask = np.isnan(data)
values = np.ma.MaskedArray(data, mask=mask)
ref = series[0]
time_dim = Dimension("valid_time", union_ts)
return cls(
frequency=frequency or ref.frequency,
timezone=timezone or ref.timezone,
name=name or ref.name,
unit=unit or ref.unit,
description=description or ref.description,
data_type=data_type or ref.data_type,
attributes=attributes or ref.attributes,
dimensions=[dimension, time_dim],
values=values,
)
# ---- repr ----------------------------------------------------------------
def __repr__(self) -> str:
class_name = type(self).__name__
label_w = 18
# Dimensions line
dim_parts = [f"{d.name}: {len(d.labels)}" for d in self.dimensions]
dim_str = ", ".join(dim_parts)
meta_lines: list[str] = []
meta_lines.append(f"{'Dimensions:':<{label_w}}{dim_str}")
meta_lines.append(f"{'Shape:':<{label_w}}{self.shape}")
meta_lines.append(f"{'Frequency:':<{label_w}}{self.frequency}")
meta_lines.append(f"{'Timezone:':<{label_w}}{self.timezone}")
if self.name:
meta_lines.append(f"{'Name:':<{label_w}}{self.name}")
if self.unit:
meta_lines.append(f"{'Unit:':<{label_w}}{self.unit}")
if self.data_type:
meta_lines.append(f"{'Data type:':<{label_w}}{self.data_type}")
total = self._values.size
if total > 0:
n_masked = int(self._values.mask.sum()) if self._values.mask.any() else 0
if n_masked > 0:
pct = n_masked / total * 100
meta_lines.append(
f"{'Masked:':<{label_w}}{n_masked}/{total} ({pct:.1f}%)"
)
# Box drawing
padding = 2
max_w = max(len(line) for line in meta_lines)
box_inner = max_w + padding * 2
lines: list[str] = [class_name]
lines.append("\u250c" + "\u2500" * box_inner + "\u2510")
for line in meta_lines:
lines.append(
"\u2502" + " " * padding + line.ljust(max_w) + " " * padding + "\u2502"
)
lines.append("\u2514" + "\u2500" * box_inner + "\u2518")
return "\n".join(lines)
def _repr_html_(self) -> str:
n_dims = self.ndim
meta_rows: list[tuple[str, str]] = []
dim_parts = [f"{d.name}: {len(d.labels)}" for d in self.dimensions]
meta_rows.append(("Dimensions", ", ".join(dim_parts)))
meta_rows.append(("Shape", str(self.shape)))
meta_rows.append(("Frequency", escape(str(self.frequency))))
meta_rows.append(("Timezone", escape(self.timezone)))
if self.name:
meta_rows.append(("Name", escape(self.name)))
if self.unit:
meta_rows.append(("Unit", escape(self.unit)))
# Build a 2D preview slice: first two dims, index 0 on remaining
if n_dims >= 2:
# Slice down to 2D
slice_vals = self._values
slice_dims = list(self.dimensions)
while len(slice_dims) > 2:
slice_vals = np.take(slice_vals, 0, axis=len(slice_dims) - 1)
slice_dims = slice_dims[:-1]
dim0 = slice_dims[0]
dim1 = slice_dims[1]
n_rows = len(dim0.labels)
col_names = tuple(str(lbl) for lbl in dim1.labels)
def _html_row(i: int) -> str:
ts_cell = f"<td>{escape(str(dim0.labels[i]))}</td>"
val_cells = "".join(
f"<td>{escape(_TimeSeriesBase._fmt_value(float(v)))}</td>"
for v in np.ma.filled(slice_vals[i], fill_value=np.nan)
)
return f"<tr>{ts_cell}{val_cells}</tr>"
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=(dim0.name,),
column_names=col_names,
n_rows=n_rows,
html_row_fn=_html_row,
)
elif n_dims == 1:
dim0 = self.dimensions[0]
n_rows = len(dim0.labels)
col_name = self.name or "value"
def _html_row_1d(i: int) -> str:
ts_cell = f"<td>{escape(str(dim0.labels[i]))}</td>"
v = float(np.ma.filled(self._values[i], fill_value=np.nan))
val_cell = f"<td>{escape(_TimeSeriesBase._fmt_value(v))}</td>"
return f"<tr>{ts_cell}{val_cell}</tr>"
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=(dim0.name,),
column_names=(col_name,),
n_rows=n_rows,
html_row_fn=_html_row_1d,
)
else:
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=(),
column_names=(),
n_rows=0,
html_row_fn=lambda i: "",
)
[docs]
def coverage_bar(self) -> CoverageBar:
ptd = self.primary_time_dim
ptd_axis = self._dim_index(ptd.name)
if self.ndim == 1:
filled = np.ma.filled(self._values, fill_value=np.nan)
mask = [not np.isnan(v) for v in filled]
masks = [(self.name or "value", mask)]
else:
# Use the first non-time dimension for rows
other_axis = 1 if ptd_axis == 0 else 0
other_dim = self.dimensions[other_axis]
# Collapse remaining dims by taking index 0
vals = self._values
dims_to_remove = []
for i in range(self.ndim - 1, -1, -1):
if i != ptd_axis and i != other_axis:
vals = np.take(vals, 0, axis=i)
dims_to_remove.append(i)
masks = []
for j, label in enumerate(other_dim.labels):
if other_axis < ptd_axis:
row = np.take(vals, j, axis=0 if other_axis == 0 else other_axis)
else:
row = np.take(vals, j, axis=other_axis - len(dims_to_remove))
filled = np.ma.filled(row, fill_value=np.nan)
mask = [not np.isnan(float(v)) for v in filled]
masks.append((str(label), mask))
begin = ptd.labels[0] if ptd.labels and isinstance(ptd.labels[0], datetime) else None
end = ptd.labels[-1] if ptd.labels and isinstance(ptd.labels[-1], datetime) else None
return CoverageBar(masks, begin, end)
# ---- equality ------------------------------------------------------------
[docs]
def equals(self, other: object) -> bool:
if not isinstance(other, TimeSeriesCube):
return NotImplemented
if (
self.frequency != other.frequency
or self.timezone != other.timezone
or self.name != other.name
or self.unit != other.unit
or self.description != other.description
or self.data_type != other.data_type
or self.attributes != other.attributes
or len(self.dimensions) != len(other.dimensions)
):
return False
for d1, d2 in zip(self.dimensions, other.dimensions):
if d1.name != d2.name or list(d1.labels) != list(d2.labels):
return False
return bool(
np.array_equal(
np.ma.filled(self._values, fill_value=np.nan),
np.ma.filled(other._values, fill_value=np.nan),
equal_nan=True,
)
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, TimeSeriesCube):
return NotImplemented
return self.equals(other)
# Module-level alias: ``NDTimeSeries`` is the very same class object as
# ``TimeSeriesCube`` (presumably kept as an alternative public name — confirm
# against the package's exports).
NDTimeSeries = TimeSeriesCube