"""Centralized display / repr logic for all timedatamodel data classes.
This module contains:
- Module-level formatting helpers (moved from _base.py)
- CSS infrastructure for HTML reprs
- CoverageBar class (moved from coverage.py)
- HierarchyTree class (moved from hierarchy.py)
- DataPoint repr functions (standalone, monkey-patched)
- Mixin classes providing __repr__ / _repr_html_ for each data class
"""
from __future__ import annotations
from datetime import datetime
from html import escape
from itertools import product
from typing import TYPE_CHECKING, Callable
import numpy as np
from ._theme import THEME, get_theme_version
from .location import GeoArea, GeoLocation, Location
if TYPE_CHECKING:
from .hierarchy import HierarchyNode
# ---------------------------------------------------------------------------
# Module-level state
# ---------------------------------------------------------------------------
_default_repr_width: int | None = None # None = no limit
def set_repr_width(width: int | None) -> None:
"""Set max repr box width. None = no limit."""
global _default_repr_width
if width is not None and width < 10:
raise ValueError("repr width must be at least 10 or None")
_default_repr_width = width
def get_repr_width() -> int | None:
"""Return current max repr width (None = no limit)."""
return _default_repr_width
_MAX_PREVIEW = 3 # rows shown at head/tail in repr
_MAX_COL_PREVIEW = 4 # leaf columns shown at head/tail in array repr
# ---------------------------------------------------------------------------
# Pure formatting helpers
# ---------------------------------------------------------------------------
def _truncate(s: str, max_len: int) -> str:
"""Return s[:max_len-3] + '...' if too long, else s."""
if len(s) <= max_len:
return s
return s[: max_len - 3] + "..."
def _fmt_short_date(dt: datetime) -> str:
"""Format a datetime concisely: always include time, omit tzinfo."""
dt_naive = dt.replace(tzinfo=None)
return dt_naive.strftime("%Y-%m-%d %H:%M")
def _fmt_timestamp(ts: datetime | tuple[datetime, ...]) -> str:
"""Format a single or multi-index timestamp for terminal repr."""
if isinstance(ts, tuple):
return ", ".join(_fmt_short_date(t) for t in ts)
return _fmt_short_date(ts)
def _fmt_timestamp_cells(ts: datetime | tuple[datetime, ...]) -> str:
"""Format a single or multi-index timestamp as HTML ``<td>`` cells."""
if isinstance(ts, tuple):
return "".join(f"<td>{escape(_fmt_short_date(t))}</td>" for t in ts)
return f"<td>{escape(_fmt_short_date(ts))}</td>"
def _format_meta_lines(pairs: list[tuple[str, str]], label_w: int = 18) -> list[str]:
"""Convert (label, value) pairs to formatted terminal meta lines."""
return [f"{label + ':':<{label_w}}{value}" for label, value in pairs]
def _fmt_tz_with_offset(tz_str: str, timestamps: list) -> str:
"""Return e.g. ``'UTC (+00:00)'`` when the first timestamp is tz-aware."""
if timestamps:
first = timestamps[0]
if isinstance(first, tuple):
first = first[0]
if hasattr(first, 'utcoffset') and first.utcoffset() is not None:
offset = first.utcoffset()
total_seconds = int(offset.total_seconds())
sign = '+' if total_seconds >= 0 else '-'
hours, remainder = divmod(abs(total_seconds), 3600)
minutes = remainder // 60
return f"{tz_str} ({sign}{hours:02d}:{minutes:02d})"
return tz_str
def _fmt_value(v: float | None) -> str:
if v is None or (isinstance(v, float) and np.isnan(v)):
return "NaN"
if v == int(v):
return f"{v:.1f}"
return f"{v:g}"
def _fmt_location(loc: GeoLocation | GeoArea | None) -> str:
if loc is None:
return ""
if isinstance(loc, GeoLocation):
return f"{loc.latitude}\u00b0N, {loc.longitude}\u00b0E"
name = loc.name or "unnamed"
c = loc.centroid
return f"{name} (centroid {c.latitude}\u00b0N, {c.longitude}\u00b0E)"
# ---------------------------------------------------------------------------
# CSS infrastructure
# ---------------------------------------------------------------------------
_css_cache: str | None = None
_css_cache_version: int = -1
def _repr_css() -> str:
lt = THEME["light"]
dk = THEME["dark"]
return f"""\
<style>
.ts-repr {{ font-family: monospace; font-size: 13px; max-width: 640px; display: inline-grid; }}
.ts-repr .ts-header {{
font-weight: bold; font-size: 14px;
padding: 6px 10px; border-bottom: 2px solid {lt["header_border"]};
background: {lt["header_bg"]}; color: {lt["header_text"]};
}}
.ts-repr .ts-meta {{ padding: 6px 10px; background: {lt["meta_bg"]}; overflow: hidden; min-width: 0; }}
.ts-repr .ts-meta table {{ border-collapse: collapse; width: 100%; table-layout: fixed; }}
.ts-repr .ts-meta td {{ padding: 1px 8px 1px 0; white-space: nowrap; }}
.ts-repr .ts-meta td:first-child {{ color: {lt["meta_label"]}; font-weight: 600; width: 90px; }}
.ts-repr .ts-meta td:last-child {{ color: {lt["meta_value"]}; overflow: hidden; text-overflow: ellipsis; }}
.ts-repr .ts-data {{ padding: 6px 10px; }}
.ts-repr .ts-data table {{
border-collapse: collapse; text-align: right;
}}
.ts-repr .ts-data th {{
text-align: right; padding: 3px 10px; border-bottom: 1px solid {lt["col_header_border"]};
color: {lt["col_header_text"]}; font-weight: 600;
}}
.ts-repr .ts-data th.ts-idx {{ text-align: left; }}
.ts-repr .ts-data td {{ padding: 2px 10px; }}
.ts-repr .ts-data tr:hover {{ background: {lt["hover_bg"]}; }}
.ts-repr .ts-data td:first-child {{ text-align: left; color: {lt["index_text"]}; }}
.ts-repr .ts-data td.ts-idx {{ text-align: left; color: {lt["index_text"]}; }}
.ts-repr .ts-ellipsis {{ text-align: center !important; color: {lt["ellipsis"]}; }}
@media (prefers-color-scheme: dark) {{
.ts-repr .ts-header {{ background: {dk["header_bg"]}; color: {dk["header_text"]}; border-color: {dk["header_border"]}; }}
.ts-repr .ts-meta {{ background: {dk["meta_bg"]}; }}
.ts-repr .ts-meta td:first-child {{ color: {dk["meta_label"]}; }}
.ts-repr .ts-meta td:last-child {{ color: {dk["meta_value"]}; }}
.ts-repr .ts-data th {{ color: {dk["col_header_text"]}; border-color: {dk["col_header_border"]}; }}
.ts-repr .ts-data td {{ color: {dk["data_text"]}; }}
.ts-repr .ts-data td:first-child {{ color: {dk["index_text"]}; }}
.ts-repr .ts-data td.ts-idx {{ color: {dk["index_text"]}; }}
.ts-repr .ts-data tr:hover {{ background: {dk["hover_bg"]}; }}
.ts-repr .ts-ellipsis {{ color: {dk["ellipsis"]}; }}
}}
</style>"""
def _get_repr_css() -> str:
"""Return the CSS string, regenerating only when the theme version changes."""
global _css_cache, _css_cache_version
version = get_theme_version()
if _css_cache is not None and _css_cache_version == version:
return _css_cache
_css_cache = _repr_css()
_css_cache_version = version
return _css_cache
# ---------------------------------------------------------------------------
# Builders
# ---------------------------------------------------------------------------
def _build_repr_html(
class_name: str,
meta_rows: list[tuple[str, str]],
index_names: tuple[str, ...],
column_names: tuple[str, ...],
n_rows: int,
html_row_fn: Callable[[int], str],
max_preview: int = _MAX_PREVIEW,
) -> str:
"""Build a shared HTML repr for TimeSeriesList and TimeSeriesTable."""
total_cols = len(index_names) + len(column_names)
html = [_get_repr_css(), '<div class="ts-repr">']
html.append(f'<div class="ts-header">{escape(class_name)}</div>')
# Meta section
html.append('<div class="ts-meta"><table>')
for label, value in meta_rows:
html.append(f"<tr><td>{escape(label)}</td><td>{escape(value)}</td></tr>")
html.append("</table></div>")
# Data section
html.append('<div class="ts-data"><table>')
header_cells = "".join(f'<th class="ts-idx">{escape(c)}</th>' for c in index_names)
header_cells += "".join(f"<th>{escape(c)}</th>" for c in column_names)
html.append(f"<tr>{header_cells}</tr>")
if n_rows == 0:
html.append(
f'<tr><td colspan="{total_cols}" class="ts-ellipsis">'
f"(empty)</td></tr>"
)
else:
show_all = n_rows <= max_preview * 2 + 1
head_rows = range(min(max_preview, n_rows))
tail_rows = (
range(max(n_rows - max_preview, max_preview), n_rows)
if not show_all
else range(0)
)
for i in head_rows:
html.append(html_row_fn(i))
if not show_all:
ellipsis_cells = "".join(
f'<td class="ts-ellipsis">…</td>'
for _ in range(total_cols)
)
html.append(f"<tr>{ellipsis_cells}</tr>")
for i in tail_rows:
html.append(html_row_fn(i))
html.append("</table></div>")
html.append("</div>")
return "\n".join(html)
def _render_box(
class_name: str,
content_lines: list[str | None],
padding: int = 2,
max_width: int | None = None,
) -> str:
"""Render a Unicode box around content lines.
``None`` entries in *content_lines* are drawn as horizontal separators.
*max_width* caps the total box width (border + padding + content).
Defaults to the global ``get_repr_width()`` setting when ``None``.
"""
if max_width is None:
max_width = get_repr_width()
max_w = max((len(l) for l in content_lines if l is not None), default=0)
# Cap content width when a max_width is active
# Total width = 2 (borders) + 2*padding + content
if max_width is not None:
max_content = max_width - 2 * padding - 2
if max_content < 1:
max_content = 1
max_w = min(max_w, max_content)
box_inner = max_w + padding * 2
top = "\u250c" + "\u2500" * box_inner + "\u2510"
bot = "\u2514" + "\u2500" * box_inner + "\u2518"
sep = "\u251c" + "\u2500" * box_inner + "\u2524"
lines = [class_name, top]
for cl in content_lines:
if cl is None:
lines.append(sep)
else:
if max_width is not None and len(cl) > max_w:
cl = _truncate(cl, max_w)
lines.append(
"\u2502" + " " * padding + cl.ljust(max_w) + " " * padding + "\u2502"
)
lines.append(bot)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# CoverageBar (moved from coverage.py)
# ---------------------------------------------------------------------------
class CoverageBar:
"""Displayable coverage bar for TimeSeriesList objects."""
_TERM_BINS = 60
_SVG_BINS = 60
def __init__(
self,
masks: list[tuple[str, list[bool]]],
begin: datetime | None,
end: datetime | None,
) -> None:
self._masks = masks
self._begin = begin
self._end = end
@staticmethod
def _bin_coverage(mask: list[bool], n_bins: int) -> list[bool]:
n = len(mask)
if n == 0:
return [False] * n_bins
actual_bins = min(n_bins, n)
bins: list[bool] = []
for i in range(actual_bins):
lo = i * n // actual_bins
hi = (i + 1) * n // actual_bins
bins.append(any(mask[lo:hi]))
return bins
def __repr__(self) -> str:
if not self._masks:
return ""
n_bins = self._TERM_BINS
label_w = max(len(name) for name, _ in self._masks) + 2
lines: list[str] = []
for name, mask in self._masks:
binned = self._bin_coverage(mask, n_bins)
bar = "".join("\u2588" if b else "\u2591" for b in binned)
lines.append(f"{name:<{label_w}}{bar}")
bar_len = len(self._bin_coverage(self._masks[0][1], n_bins))
start_str = _fmt_short_date(self._begin) if self._begin else ""
end_str = _fmt_short_date(self._end) if self._end else ""
gap = bar_len - len(start_str) - len(end_str)
if gap < 2:
gap = 2
date_line = start_str + " " * gap + end_str
lines.append(f"{'':<{label_w}}{date_line}")
return "\n".join(lines)
def _repr_html_(self) -> str:
if not self._masks:
return ""
n_bins = self._SVG_BINS
# Use actual bin count (may be less than n_bins for short series)
max_mask_len = max(len(m) for _, m in self._masks) if self._masks else 0
actual_bins = min(n_bins, max_mask_len) if max_mask_len > 0 else n_bins
label_w = 120 # px reserved for labels
bar_w = 480 # px for the bar area
row_h = 22
n_rows = len(self._masks)
date_h = 18
total_h = n_rows * row_h + date_h + 4
parts: list[str] = []
parts.append(
f'<svg xmlns="http://www.w3.org/2000/svg" '
f'viewBox="0 0 {label_w + bar_w} {total_h}" '
f'width="100%" style="max-width:{label_w + bar_w}px;'
f'font-family:monospace;font-size:12px;">'
)
lt = THEME["light"]
for row_idx, (name, mask) in enumerate(self._masks):
y = row_idx * row_h
# label
parts.append(
f'<text x="{label_w - 6}" y="{y + 15}" '
f'text-anchor="end" fill="{lt["coverage_label"]}">{escape(name)}</text>'
)
# bar segments
binned = self._bin_coverage(mask, n_bins)
seg_w = bar_w / len(binned) if binned else bar_w
for i, b in enumerate(binned):
color = lt["coverage_present"] if b else lt["coverage_absent"]
x = label_w + i * seg_w
parts.append(
f'<rect x="{x:.1f}" y="{y + 2}" '
f'width="{seg_w:.2f}" height="{row_h - 4}" '
f'fill="{color}" />'
)
# date labels
date_y = n_rows * row_h + date_h
if self._begin:
parts.append(
f'<text x="{label_w}" y="{date_y}" '
f'text-anchor="start" fill="{lt["coverage_date"]}">'
f'{escape(_fmt_short_date(self._begin))}</text>'
)
if self._end:
parts.append(
f'<text x="{label_w + bar_w}" y="{date_y}" '
f'text-anchor="end" fill="{lt["coverage_date"]}">'
f'{escape(_fmt_short_date(self._end))}</text>'
)
parts.append("</svg>")
return "\n".join(parts)
# ---------------------------------------------------------------------------
# HierarchyTree (moved from hierarchy.py)
# ---------------------------------------------------------------------------
[docs]
class HierarchyTree:
"""Displayable tree visualization for a HierarchicalTimeSeries."""
__slots__ = ("_root",)
def __init__(self, root: HierarchyNode) -> None:
self._root = root
def __repr__(self) -> str:
lines: list[str] = []
self._build_tree(self._root, "", True, lines)
return "\n".join(lines)
@staticmethod
def _build_tree(
node: HierarchyNode,
prefix: str,
is_last: bool,
lines: list[str],
) -> None:
connector = "\u2514\u2500\u2500 " if is_last else "\u251c\u2500\u2500 "
label = node.key
if node.is_leaf and node.timeseries is not None:
label += f" [{len(node.timeseries)} pts]"
elif not node.is_leaf:
label += f" ({node.level})"
lines.append(f"{prefix}{connector}{label}")
new_prefix = prefix + (" " if is_last else "\u2502 ")
for i, child in enumerate(node.children):
HierarchyTree._build_tree(
child, new_prefix, i == len(node.children) - 1, lines
)
def _repr_html_(self) -> str:
css = """\
<style>
.tsh-tree { font-family: monospace; font-size: 13px; }
.tsh-tree details { margin-left: 16px; }
.tsh-tree summary { cursor: pointer; padding: 1px 0; }
.tsh-tree .tsh-leaf { margin-left: 16px; padding: 1px 0; }
</style>"""
return css + '\n<div class="tsh-tree">\n' + self._html_node(self._root) + "</div>"
@staticmethod
def _html_node(node: HierarchyNode) -> str:
if node.is_leaf:
label = escape(node.key)
if node.timeseries is not None:
label += f" [{len(node.timeseries)} pts]"
return f'<div class="tsh-leaf">{label}</div>\n'
label = f"{escape(node.key)} ({escape(node.level)})"
children_html = "".join(
HierarchyTree._html_node(c) for c in node.children
)
return (
f"<details open>"
f"<summary>{label}</summary>\n"
f"{children_html}"
f"</details>\n"
)
# ---------------------------------------------------------------------------
# DataPoint repr functions (standalone, monkey-patched onto NamedTuple)
# ---------------------------------------------------------------------------
def _datapoint_repr(self) -> str:
meta_lines: list[str] = []
ts_str = _fmt_short_date(self.timestamp)
meta_lines.append(f"Timestamp: {ts_str}")
if hasattr(self.timestamp, "utcoffset") and self.timestamp.utcoffset() is not None:
tz_str = str(self.timestamp.tzinfo)
tz_display = _fmt_tz_with_offset(tz_str, [self.timestamp])
meta_lines.append(f"Timezone: {tz_display}")
meta_lines.append(f"Value: {_fmt_value(self.value)}")
return _render_box("DataPoint", meta_lines)
def _datapoint_repr_html(self) -> str:
ts_str = escape(_fmt_short_date(self.timestamp))
val_str = escape(_fmt_value(self.value))
meta_rows = [("Timestamp", ts_str)]
if hasattr(self.timestamp, "utcoffset") and self.timestamp.utcoffset() is not None:
tz_str = str(self.timestamp.tzinfo)
tz_display = escape(_fmt_tz_with_offset(tz_str, [self.timestamp]))
meta_rows.append(("Timezone", tz_display))
meta_rows.append(("Value", val_str))
html = [_get_repr_css(), '<div class="ts-repr">']
html.append('<div class="ts-header">DataPoint</div>')
html.append('<div class="ts-meta"><table>')
for label, value in meta_rows:
html.append(f"<tr><td>{label}</td><td>{value}</td></tr>")
html.append("</table></div>")
html.append("</div>")
return "\n".join(html)
# ---------------------------------------------------------------------------
# Mixin: _TimeSeriesBaseReprMixin
# ---------------------------------------------------------------------------
class _TimeSeriesBaseReprMixin:
"""Repr mixin for _TimeSeriesBase (shared by List and Table)."""
__slots__ = ()
def _repr_meta_lines(self) -> list[str]:
"""Build terminal meta lines from ``_repr_meta_pairs()``."""
return _format_meta_lines(self._repr_meta_pairs())
def __repr__(self) -> str:
class_name = type(self).__name__
meta_lines = self._repr_meta_lines()
n = len(self._timestamps)
# Compute preview row indices
if n == 0:
indices: list[int] = []
truncated = False
elif n <= _MAX_PREVIEW * 2 + 1:
indices = list(range(n))
truncated = False
else:
indices = list(range(_MAX_PREVIEW)) + list(
range(n - _MAX_PREVIEW, n)
)
truncated = True
data_rows = self._repr_data_rows(indices) if indices else []
col_names = list(self.column_names)
show_col_header = len(col_names) > 1
# Build all content lines (without box chars)
content_lines: list[str] = []
# Meta section
for ml in meta_lines:
content_lines.append(ml)
# Data section
if n == 0:
content_lines.append(None) # separator marker
content_lines.append("(empty)")
else:
# Compute column widths for data rows
all_rows: list[list[str]] = []
if show_col_header:
all_rows.append([""] + col_names)
all_rows.extend(data_rows)
ncols_data = len(all_rows[0]) if all_rows else 0
col_widths = [0] * ncols_data
for row in all_rows:
for j, cell in enumerate(row):
col_widths[j] = max(col_widths[j], len(cell))
# Also account for ellipsis row
if truncated:
for j in range(ncols_data):
col_widths[j] = max(col_widths[j], 3)
def _format_row(row: list[str]) -> str:
parts: list[str] = []
for j, cell in enumerate(row):
if j == 0:
parts.append(f"{cell:<{col_widths[j]}}")
else:
parts.append(f"{cell:>{col_widths[j]}}")
return " ".join(parts)
content_lines.append(None) # separator marker
if show_col_header:
content_lines.append(_format_row([""] + col_names))
head_data = data_rows[:_MAX_PREVIEW] if truncated else data_rows
tail_data = data_rows[_MAX_PREVIEW:] if truncated else []
for row in head_data:
content_lines.append(_format_row(row))
if truncated:
ellipsis_row = ["..."] * ncols_data
content_lines.append(_format_row(ellipsis_row))
for row in tail_data:
content_lines.append(_format_row(row))
return _render_box(class_name, content_lines)
@staticmethod
def _fmt_value(v: float | None) -> str:
return _fmt_value(v)
@staticmethod
def _fmt_location(loc: GeoLocation | GeoArea | None) -> str:
return _fmt_location(loc)
# ---------------------------------------------------------------------------
# Mixin: _TimeSeriesListReprMixin
# ---------------------------------------------------------------------------
class _TimeSeriesListReprMixin:
"""Repr mixin for TimeSeriesList."""
__slots__ = ()
def _repr_meta_pairs(self) -> list[tuple[str, str]]:
pairs: list[tuple[str, str]] = [
("Name", self.name or "unnamed"),
("Length", str(len(self._timestamps))),
("Frequency", str(self.frequency)),
("Timezone", _fmt_tz_with_offset(self.timezone, self._timestamps)),
]
if self.unit:
pairs.append(("Unit", self.unit))
if self.data_type:
pairs.append(("Data type", str(self.data_type)))
if self.location:
pairs.append(("Location", _fmt_location(self.location)))
if self.description:
pairs.append(("Description", self.description))
if self.timeseries_type and self.timeseries_type != "FLAT":
pairs.append(("Timeseries type", str(self.timeseries_type)))
if self.labels:
pairs.append(("Labels", str(self.labels)))
return pairs
def _repr_data_rows(self, indices: list[int]) -> list[list[str]]:
rows: list[list[str]] = []
for i in indices:
rows.append([_fmt_timestamp(self._timestamps[i]), _fmt_value(self._values[i])])
return rows
def _repr_html_(self) -> str:
n = len(self._timestamps)
meta_rows = self._repr_meta_pairs()
def _html_row(i: int) -> str:
ts_cells = _fmt_timestamp_cells(self._timestamps[i])
val_cells = (
f"<td>{escape(_fmt_value(self._values[i]))}</td>"
)
return f"<tr>{ts_cells}{val_cells}</tr>"
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=self.index_names,
column_names=self.column_names,
n_rows=n,
html_row_fn=_html_row,
)
def _coverage_masks(self) -> list[tuple[str, list[bool]]]:
return [(self.name or "value", [v is not None for v in self._values])]
def coverage_bar(self) -> CoverageBar:
"""Return a displayable coverage bar."""
return CoverageBar(self._coverage_masks(), self.begin, self.end)
# ---------------------------------------------------------------------------
# Mixin: _TimeSeriesTableReprMixin
# ---------------------------------------------------------------------------
class _TimeSeriesTableReprMixin:
"""Repr mixin for TimeSeriesTable."""
__slots__ = ()
def _repr_meta_pairs(self) -> list[tuple[str, str]]:
cn = self.column_names
pairs: list[tuple[str, str]] = [
("Name", "unnamed"),
("Columns", ", ".join(cn)),
("Length", f"{len(self._timestamps)} \u00d7 {self.n_columns}"),
("Frequency", str(self.frequency)),
("Timezone", _fmt_tz_with_offset(self.timezone, self._timestamps)),
]
# Unit — show if any is set
unit_vals = [self._get_attr(self.units, i) for i in range(self.n_columns)]
if any(u is not None for u in unit_vals):
pairs.append(("Unit", ", ".join(str(u) if u else "-" for u in unit_vals)))
# Data type — show if any is set
dt_vals = [self._get_attr(self.data_types, i) for i in range(self.n_columns)]
if any(d is not None for d in dt_vals):
pairs.append(("Data type", ", ".join(str(d) if d else "-" for d in dt_vals)))
# Location — show if any is set
loc_vals = [self._get_attr(self.locations, i) for i in range(self.n_columns)]
if any(loc is not None for loc in loc_vals):
pairs.append(("Location", ", ".join(_fmt_location(loc) or "-" for loc in loc_vals)))
# Timeseries type — show if any is not FLAT
tst_vals = [self._get_attr(self.timeseries_types, i) for i in range(self.n_columns)]
if any(t != "FLAT" for t in tst_vals):
pairs.append(("Timeseries type", ", ".join(str(t) for t in tst_vals)))
# Labels — show if any is non-empty
lbl_vals = [self._get_attr(self.labels, i) for i in range(self.n_columns)]
if any(lbl for lbl in lbl_vals):
pairs.append(("Labels", ", ".join(str(lbl) for lbl in lbl_vals)))
return pairs
def _repr_data_rows(self, indices: list[int]) -> list[list[str]]:
rows: list[list[str]] = []
for i in indices:
rows.append(
[_fmt_timestamp(self._timestamps[i])] + [_fmt_value(float(v)) for v in self._values[i]]
)
return rows
def _repr_html_(self) -> str:
n = len(self._timestamps)
meta_rows = self._repr_meta_pairs()
def _html_row(i: int) -> str:
ts_cells = _fmt_timestamp_cells(self._timestamps[i])
val_cells = "".join(
f"<td>{escape(_fmt_value(float(v)))}</td>"
for v in self._values[i]
)
return f"<tr>{ts_cells}{val_cells}</tr>"
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=self.index_names,
column_names=self.column_names,
n_rows=n,
html_row_fn=_html_row,
)
def _coverage_masks(self) -> list[tuple[str, list[bool]]]:
masks: list[tuple[str, list[bool]]] = []
for col, name in enumerate(self.column_names):
col_data = self._values[:, col]
masks.append((name, [not np.isnan(v) for v in col_data]))
return masks
def coverage_bar(self) -> CoverageBar:
"""Return a displayable coverage bar."""
return CoverageBar(self._coverage_masks(), self.begin, self.end)
# ---------------------------------------------------------------------------
# Mixin: _TimeSeriesArrayReprMixin
# ---------------------------------------------------------------------------
class _TimeSeriesArrayReprMixin:
"""Repr mixin for TimeSeriesArray."""
__slots__ = ()
def _repr_meta_pairs(self) -> list[tuple[str, str]]:
dim_parts = [f"{d.name}: {len(d.labels)}" for d in self.dimensions]
pairs: list[tuple[str, str]] = [
("Name", self.name or "unnamed"),
("Dimensions", ", ".join(dim_parts)),
("Shape", str(self.shape)),
("Frequency", str(self.frequency)),
("Timezone", _fmt_tz_with_offset(self.timezone, self.primary_time_dim.labels)),
]
if self.unit:
pairs.append(("Unit", self.unit))
if self.data_type:
pairs.append(("Data type", str(self.data_type)))
total = self._values.size
if total > 0:
n_masked = int(self._values.mask.sum()) if self._values.mask.any() else 0
if n_masked > 0:
pct = n_masked / total * 100
pairs.append(("Masked", f"{n_masked}/{total} ({pct:.1f}%)"))
return pairs
def __repr__(self) -> str:
return _render_box(type(self).__name__, _format_meta_lines(self._repr_meta_pairs()))
def _repr_html_(self) -> str:
n_dims = self.ndim
meta_rows = self._repr_meta_pairs()
if n_dims >= 2:
# Classify dimensions: datetime → rows, others → columns
row_dims: list = []
col_dims: list = []
for d in self.dimensions:
if d.labels and isinstance(d.labels[0], datetime):
row_dims.append(d)
else:
col_dims.append(d)
# Edge: all datetime → move last to columns
if not col_dims:
col_dims.append(row_dims.pop())
# Edge: no datetime → move first to rows
elif not row_dims:
row_dims.append(col_dims.pop(0))
# Map dimension names to original axis indices
dim_to_axis = {d.name: i for i, d in enumerate(self.dimensions)}
# Cross-product index combinations
row_combos = list(
product(*(range(len(d.labels)) for d in row_dims))
)
col_combos = list(
product(*(range(len(d.labels)) for d in col_dims))
)
n_rows = len(row_combos)
n_cols = len(col_combos)
n_col_levels = len(col_dims)
# Visible row indices (truncation)
show_all_rows = n_rows <= _MAX_PREVIEW * 2 + 1
if show_all_rows:
vis_rows = list(range(n_rows))
else:
vis_rows = list(range(_MAX_PREVIEW)) + list(
range(n_rows - _MAX_PREVIEW, n_rows)
)
# Visible column indices (truncation)
show_all_cols = n_cols <= _MAX_COL_PREVIEW * 2 + 1
if show_all_cols:
vis_cols = list(range(n_cols))
else:
vis_cols = list(range(_MAX_COL_PREVIEW)) + list(
range(n_cols - _MAX_COL_PREVIEW, n_cols)
)
def _fmt_label(label):
if isinstance(label, datetime):
return _fmt_short_date(label)
return str(label)
# ---- build <thead> ----
def _group_header(indices, level):
"""Group consecutive column indices by label at *level*."""
cells: list[str] = []
if not indices:
return cells
cur_lbl = col_combos[indices[0]][level]
cur_cnt = 1
for k in range(1, len(indices)):
lbl = col_combos[indices[k]][level]
if lbl == cur_lbl:
cur_cnt += 1
else:
txt = escape(
_fmt_label(col_dims[level].labels[cur_lbl])
)
cells.append(
f'<th colspan="{cur_cnt}">{txt}</th>'
if cur_cnt > 1
else f"<th>{txt}</th>"
)
cur_lbl = lbl
cur_cnt = 1
txt = escape(_fmt_label(col_dims[level].labels[cur_lbl]))
cells.append(
f'<th colspan="{cur_cnt}">{txt}</th>'
if cur_cnt > 1
else f"<th>{txt}</th>"
)
return cells
thead_rows: list[str] = []
for level in range(n_col_levels):
tr: list[str] = []
if level == 0:
for rd in row_dims:
if n_col_levels > 1:
tr.append(
f'<th rowspan="{n_col_levels}">'
f"{escape(rd.name)}</th>"
)
else:
tr.append(f"<th>{escape(rd.name)}</th>")
if not show_all_cols:
head_cells = _group_header(
vis_cols[:_MAX_COL_PREVIEW], level
)
tail_cells = _group_header(
vis_cols[_MAX_COL_PREVIEW:], level
)
tr.extend(head_cells)
tr.append("<th>…</th>")
tr.extend(tail_cells)
else:
tr.extend(_group_header(vis_cols, level))
thead_rows.append(f'<tr>{"".join(tr)}</tr>')
# ---- build <tbody> ----
def _data_row(ri):
rc = row_combos[ri]
cells: list[str] = []
for rl, rd in enumerate(row_dims):
lbl = _fmt_label(rd.labels[rc[rl]])
cells.append(
f'<td class="ts-idx">{escape(lbl)}</td>'
)
for k, ci in enumerate(vis_cols):
cc = col_combos[ci]
idx = [0] * len(self.dimensions)
for rl, rd in enumerate(row_dims):
idx[dim_to_axis[rd.name]] = rc[rl]
for cl, cd in enumerate(col_dims):
idx[dim_to_axis[cd.name]] = cc[cl]
v = float(
np.ma.filled(
self._values[tuple(idx)], fill_value=np.nan
)
)
cells.append(
f"<td>"
f"{escape(_fmt_value(v))}</td>"
)
if not show_all_cols and k == _MAX_COL_PREVIEW - 1:
cells.append(
'<td class="ts-ellipsis">…</td>'
)
return f'<tr>{"".join(cells)}</tr>'
tbody: list[str] = []
head_vis = (
vis_rows[:_MAX_PREVIEW] if not show_all_rows else vis_rows
)
tail_vis = (
vis_rows[_MAX_PREVIEW:] if not show_all_rows else []
)
for ri in head_vis:
tbody.append(_data_row(ri))
if not show_all_rows:
n_td = (
len(row_dims)
+ len(vis_cols)
+ (1 if not show_all_cols else 0)
)
ell = "".join(
'<td class="ts-ellipsis">…</td>'
for _ in range(n_td)
)
tbody.append(f"<tr>{ell}</tr>")
for ri in tail_vis:
tbody.append(_data_row(ri))
# ---- assemble HTML ----
html = [_get_repr_css(), '<div class="ts-repr">']
html.append(
f'<div class="ts-header">'
f"{escape(type(self).__name__)}</div>"
)
html.append('<div class="ts-meta"><table>')
for label, value in meta_rows:
html.append(
f"<tr><td>{escape(label)}</td><td>{escape(value)}</td></tr>"
)
html.append("</table></div>")
html.append('<div class="ts-data"><table>')
html.append("<thead>")
for tr_str in thead_rows:
html.append(tr_str)
html.append("</thead>")
html.append("<tbody>")
for tr_str in tbody:
html.append(tr_str)
html.append("</tbody>")
html.append("</table></div>")
html.append("</div>")
return "\n".join(html)
elif n_dims == 1:
dim0 = self.dimensions[0]
n_rows = len(dim0.labels)
col_name = self.name or "value"
def _html_row_1d(i: int) -> str:
ts_cell = f"<td>{escape(str(dim0.labels[i]))}</td>"
v = float(np.ma.filled(self._values[i], fill_value=np.nan))
val_cell = f"<td>{escape(_fmt_value(v))}</td>"
return f"<tr>{ts_cell}{val_cell}</tr>"
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=(dim0.name,),
column_names=(col_name,),
n_rows=n_rows,
html_row_fn=_html_row_1d,
)
else:
return _build_repr_html(
class_name=type(self).__name__,
meta_rows=meta_rows,
index_names=(),
column_names=(),
n_rows=0,
html_row_fn=lambda i: "",
)
def coverage_bar(self) -> CoverageBar:
ptd = self.primary_time_dim
ptd_axis = self._dim_index(ptd.name)
if self.ndim == 1:
filled = np.ma.filled(self._values, fill_value=np.nan)
mask = [not np.isnan(v) for v in filled]
masks = [(self.name or "value", mask)]
else:
# Use the first non-time dimension for rows
other_axis = 1 if ptd_axis == 0 else 0
other_dim = self.dimensions[other_axis]
# Collapse remaining dims by taking index 0
vals = self._values
dims_to_remove = []
for i in range(self.ndim - 1, -1, -1):
if i != ptd_axis and i != other_axis:
vals = np.take(vals, 0, axis=i)
dims_to_remove.append(i)
masks = []
for j, label in enumerate(other_dim.labels):
if other_axis < ptd_axis:
row = np.take(vals, j, axis=0 if other_axis == 0 else other_axis)
else:
row = np.take(vals, j, axis=other_axis - len(dims_to_remove))
filled = np.ma.filled(row, fill_value=np.nan)
mask = [not np.isnan(float(v)) for v in filled]
masks.append((str(label), mask))
begin = ptd.labels[0] if ptd.labels and isinstance(ptd.labels[0], datetime) else None
end = ptd.labels[-1] if ptd.labels and isinstance(ptd.labels[-1], datetime) else None
return CoverageBar(masks, begin, end)
# ---------------------------------------------------------------------------
# Mixin: _HierarchicalTimeSeriesReprMixin
# ---------------------------------------------------------------------------
class _HierarchicalTimeSeriesReprMixin:
"""Repr mixin for HierarchicalTimeSeries."""
__slots__ = ()
_MAX_LEAF_ROWS = 7
def _repr_meta_pairs(self) -> list[tuple[str, str]]:
_tz_timestamps = [self._begin] if self._begin is not None else []
pairs: list[tuple[str, str]] = [
("Name", self._name or "unnamed"),
("Levels", ", ".join(self._levels)),
("Nodes", f"{self.n_nodes} ({self.n_leaves} leaves)"),
("Frequency", str(self._frequency)),
("Timezone", _fmt_tz_with_offset(self._timezone, _tz_timestamps)),
]
if self._unit:
pairs.append(("Unit", self._unit))
pairs.append(("Aggregation", str(self._aggregation)))
return pairs
def _leaf_summary_rows(self) -> list[dict[str, str]]:
"""Build summary rows for leaf nodes."""
leaf_nodes = self.leaves()
rows: list[dict[str, str]] = []
for node in leaf_nodes:
ts = node.timeseries
length = str(len(ts)) if ts is not None else "0"
begin = _fmt_short_date(ts.begin) if ts and ts.begin else "-"
end = _fmt_short_date(ts.end) if ts and ts.end else "-"
rows.append({
"name": node.key,
"level": node.level,
"length": length,
"begin": begin,
"end": end,
})
return rows
def _leaf_display_rows(self) -> list[dict[str, str]]:
"""Return leaf rows with head/tail truncation applied."""
rows = self._leaf_summary_rows()
if len(rows) <= self._MAX_LEAF_ROWS:
return rows
headers = ["name", "level", "length", "begin", "end"]
return rows[:3] + [{h: "..." for h in headers}] + rows[-3:]
def __repr__(self) -> str:
class_name = type(self).__name__
meta_lines = _format_meta_lines(self._repr_meta_pairs())
# Leaf table
rows = self._leaf_display_rows()
headers = ["name", "level", "length", "begin", "end"]
col_widths = {h: len(h) for h in headers}
for row in rows:
for h in headers:
col_widths[h] = max(col_widths[h], len(row[h]))
def _fmt_row(vals: dict[str, str]) -> str:
return " ".join(f"{vals[h]:<{col_widths[h]}}" for h in headers)
header_line = _fmt_row({h: h for h in headers})
# Combine all content lines
content_lines: list[str | None] = list(meta_lines)
content_lines.append(None) # separator
content_lines.append(header_line)
content_lines.append(None) # separator
for row in rows:
content_lines.append(_fmt_row(row))
return _render_box(class_name, content_lines)
def _repr_html_(self) -> str:
meta_rows = self._repr_meta_pairs()
html = [_get_repr_css(), '<div class="ts-repr">']
html.append(f'<div class="ts-header">{escape(type(self).__name__)}</div>')
html.append('<div class="ts-meta"><table>')
for label, value in meta_rows:
html.append(f"<tr><td>{escape(label)}</td><td>{escape(value)}</td></tr>")
html.append("</table></div>")
# Leaf table
headers = ["name", "level", "length", "begin", "end"]
display_rows = self._leaf_display_rows()
html.append('<div class="ts-data"><table style="text-align: left;">')
html.append(
"<tr>" + "".join(f"<th>{escape(h)}</th>" for h in headers) + "</tr>"
)
for row in display_rows:
html.append(
"<tr>"
+ "".join(f"<td>{escape(row[h])}</td>" for h in headers)
+ "</tr>"
)
html.append("</table></div></div>")
return "\n".join(html)
def tree(self) -> HierarchyTree:
"""Return a displayable tree visualization."""
return HierarchyTree(self._root)
# ---------------------------------------------------------------------------
# Mixin: _TimeSeriesCollectionReprMixin
# ---------------------------------------------------------------------------
class _TimeSeriesCollectionReprMixin:
"""Repr mixin for TimeSeriesCollection."""
__slots__ = ()
def _item_summary(self, key: str, item) -> dict:
"""Summarize a single item for repr tables."""
kind = type(item).__name__
freq = str(item.frequency) if hasattr(item, "frequency") else "-"
tz = item.timezone if hasattr(item, "timezone") else "-"
n = len(item)
begin = item.begin
end = item.end
begin_s = _fmt_short_date(begin) if begin else "-"
end_s = _fmt_short_date(end) if end else "-"
return {
"name": key,
"type": kind,
"freq": freq,
"tz": tz,
"length": str(n),
"begin": begin_s,
"end": end_s,
}
def __repr__(self) -> str:
class_name = type(self).__name__
if not self._series:
return f"{type(self).__name__}(empty)"
rows = [
self._item_summary(k, v) for k, v in self._series.items()
]
headers = ["name", "type", "freq", "tz", "length", "begin", "end"]
col_widths = {h: len(h) for h in headers}
for row in rows:
for h in headers:
col_widths[h] = max(col_widths[h], len(row[h]))
def _fmt_row(vals: dict) -> str:
return " ".join(f"{vals[h]:<{col_widths[h]}}" for h in headers)
header_line = _fmt_row({h: h for h in headers})
content_lines: list[str | None] = [header_line]
content_lines.append(None) # separator
for row in rows:
content_lines.append(_fmt_row(row))
return _render_box(class_name, content_lines)
def _repr_html_(self) -> str:
if not self._series:
return "<div><b>TimeSeriesCollection</b> (empty)</div>"
rows = [
self._item_summary(k, v) for k, v in self._series.items()
]
headers = ["name", "type", "freq", "tz", "length", "begin", "end"]
html = [_get_repr_css(), '<div class="ts-repr">']
html.append(f'<div class="ts-header">{escape(type(self).__name__)}</div>')
html.append('<div class="ts-data"><table style="text-align: left;">')
html.append(
"<tr>" + "".join(f"<th>{escape(h)}</th>" for h in headers) + "</tr>"
)
nowrap = {"begin", "end"}
for row in rows:
cells = []
for h in headers:
style = ' style="white-space:nowrap"' if h in nowrap else ""
cells.append(f"<td{style}>{escape(row[h])}</td>")
html.append("<tr>" + "".join(cells) + "</tr>")
html.append("</table></div></div>")
return "\n".join(html)
def coverage_bar(self) -> CoverageBar:
"""Return a multi-row CoverageBar spanning the global time range."""
masks: list[tuple[str, list[bool]]] = []
all_begins: list[datetime] = []
all_ends: list[datetime] = []
for key, item in self._series.items():
begin = item.begin
end = item.end
if begin is not None and end is not None:
# Unwrap tuples for multi-index
b = begin[0] if isinstance(begin, tuple) else begin
e = end[0] if isinstance(end, tuple) else end
all_begins.append(b)
all_ends.append(e)
if not all_begins:
return CoverageBar([], None, None)
global_begin = min(all_begins)
global_end = max(all_ends)
global_span = (global_end - global_begin).total_seconds()
n_bins = CoverageBar._TERM_BINS
for key, item in self._series.items():
begin = item.begin
end = item.end
# Import here to check type without circular import
from .timeseries import TimeSeriesList
from .table import TimeSeriesTable
if isinstance(item, TimeSeriesList):
label = key
if begin is None or end is None or global_span == 0:
masks.append((label, [False] * n_bins))
continue
b = begin[0] if isinstance(begin, tuple) else begin
e = end[0] if isinstance(end, tuple) else end
item_masks = item._coverage_masks()
_, raw_mask = item_masks[0]
# Map raw mask onto global bins
bin_mask = self._rebin_to_global(
raw_mask, b, e, global_begin, global_span, n_bins
)
masks.append((label, bin_mask))
elif isinstance(item, TimeSeriesTable):
if begin is None or end is None or global_span == 0:
for col_name in item.column_names:
label = f"{key}/{col_name}"
masks.append((label, [False] * n_bins))
continue
b = begin[0] if isinstance(begin, tuple) else begin
e = end[0] if isinstance(end, tuple) else end
for col_name, raw_mask in item._coverage_masks():
label = f"{key}/{col_name}"
bin_mask = self._rebin_to_global(
raw_mask, b, e, global_begin, global_span, n_bins
)
masks.append((label, bin_mask))
return CoverageBar(masks, global_begin, global_end)
@staticmethod
def _rebin_to_global(
raw_mask: list[bool],
item_begin: datetime,
item_end: datetime,
global_begin: datetime,
global_span: float,
n_bins: int,
) -> list[bool]:
"""Map an item's coverage mask onto global bins."""
if not raw_mask or global_span == 0:
return [False] * n_bins
result = [False] * n_bins
item_span = (item_end - item_begin).total_seconds()
n_points = len(raw_mask)
for i, present in enumerate(raw_mask):
if not present:
continue
# Position of this point in the item's time range
if n_points == 1:
t_offset = (item_begin - global_begin).total_seconds()
else:
t_offset = (
(item_begin - global_begin).total_seconds()
+ item_span * i / (n_points - 1)
)
bin_idx = int(t_offset / global_span * n_bins)
bin_idx = min(bin_idx, n_bins - 1)
result[bin_idx] = True
return result