Source code for timedatamodel.collection

from __future__ import annotations

from datetime import datetime
from typing import Iterator

from ._base import _DataFrameMixin, _import_polars
from ._repr import _TimeSeriesCollectionReprMixin
from .location import GeoArea, GeoLocation
from .table import TimeSeriesTable
from .timeseries import TimeSeriesList


class TimeSeriesCollection(_TimeSeriesCollectionReprMixin, _DataFrameMixin):
    """Container for TimeSeriesList and/or TimeSeriesTable objects that don't share an index.

    Items are stored internally as an ordered
    ``dict[str, TimeSeriesList | TimeSeriesTable]``.

    The collection is treated as immutable by its own API: ``add``,
    ``remove`` and the spatial filters all return a *new* collection that
    carries over ``name`` and ``description``.
    """

    __slots__ = ("_series", "_name", "_description")

    def __init__(
        self,
        series: (
            list[TimeSeriesList | TimeSeriesTable]
            | dict[str, TimeSeriesList | TimeSeriesTable]
            | None
        ) = None,
        *,
        name: str | None = None,
        description: str | None = None,
    ) -> None:
        """Build a collection from a list or a mapping of series.

        Parameters
        ----------
        series:
            ``None`` for an empty collection, a ``dict`` whose keys are used
            verbatim (the mapping is shallow-copied), or an iterable of items
            whose keys are derived automatically: a ``TimeSeriesList`` uses
            its ``name``, a ``TimeSeriesTable`` uses its comma-joined
            ``column_names``, and anything without a usable name falls back
            to ``series_<position>``. A derived key that is already taken
            gets a numeric suffix (``key_1``, ``key_2``, ...).
        name:
            Optional name of the collection.
        description:
            Optional free-text description of the collection.
        """
        self._name = name
        self._description = description
        if series is None:
            self._series: dict[str, TimeSeriesList | TimeSeriesTable] = {}
        elif isinstance(series, dict):
            self._series = dict(series)
        else:
            self._series = {}
            suffix_counters: dict[str, int] = {}
            for idx, item in enumerate(series):
                base = self._default_key_for(item)
                if base is None:
                    base = f"series_{idx}"
                key = base
                # Check against the keys actually assigned so far (not just
                # the base names seen): a generated "a_1" must not clobber an
                # item that is literally named "a_1", and vice versa.
                while key in self._series:
                    suffix_counters[base] = suffix_counters.get(base, 0) + 1
                    key = f"{base}_{suffix_counters[base]}"
                self._series[key] = item

    @staticmethod
    def _default_key_for(item: TimeSeriesList | TimeSeriesTable) -> str | None:
        """Return the key derived from *item* itself, or None if it has none."""
        if isinstance(item, TimeSeriesList) and item.name:
            return item.name
        if isinstance(item, TimeSeriesTable):
            names = item.column_names
            if names:
                return ",".join(names)
        return None

    # ---- properties -------------------------------------------------------

    @property
    def name(self) -> str | None:
        """Name of the collection, if any."""
        return self._name

    @property
    def description(self) -> str | None:
        """Description of the collection, if any."""
        return self._description

    @property
    def names(self) -> list[str]:
        """Keys of the stored series, in insertion order."""
        return list(self._series.keys())

    @property
    def series_count(self) -> int:
        """Number of stored items."""
        return len(self._series)

    # ---- mapping / sequence protocol --------------------------------------

    def __len__(self) -> int:
        return len(self._series)

    def __bool__(self) -> bool:
        return bool(self._series)

    def __contains__(self, key: str) -> bool:
        return key in self._series

    def __iter__(self) -> Iterator[str]:
        return iter(self._series)

    def __getitem__(self, key: str | int) -> TimeSeriesList | TimeSeriesTable:
        """Look up an item by key, or by position when *key* is an ``int``.

        Raises ``KeyError`` for an unknown key and ``IndexError`` for an
        out-of-range position; negative positions count from the end.
        """
        if isinstance(key, int):
            keys = list(self._series.keys())
            return self._series[keys[key]]
        return self._series[key]

    def keys(self):
        """Return a view of the series keys."""
        return self._series.keys()

    def values(self):
        """Return a view of the stored items."""
        return self._series.values()

    def items(self):
        """Return a view of ``(key, item)`` pairs."""
        return self._series.items()

    # ---- mutation (returns new collection) --------------------------------

    def add(
        self,
        item: TimeSeriesList | TimeSeriesTable,
        name: str | None = None,
    ) -> TimeSeriesCollection:
        """Return a new collection with *item* stored under *name*.

        When *name* is omitted it is derived from the item (its ``name`` for
        a ``TimeSeriesList``, its comma-joined ``column_names`` for a
        ``TimeSeriesTable``). An explicit or derived name that already exists
        replaces that entry. If no name can be derived, the first unused
        ``series_<n>`` (starting at the current length) is chosen so that an
        anonymous item never overwrites an existing one.
        """
        if name is None:
            name = self._default_key_for(item)
        if name is None:
            idx = len(self._series)
            name = f"series_{idx}"
            # After a removal, ``series_<len>`` may already be in use.
            while name in self._series:
                idx += 1
                name = f"series_{idx}"
        new_series = dict(self._series)
        new_series[name] = item
        return TimeSeriesCollection(
            new_series, name=self._name, description=self._description
        )

    def remove(self, name: str) -> TimeSeriesCollection:
        """Return a new collection without the entry *name*.

        An unknown *name* is not an error; the result is then a copy.
        """
        new_series = {k: v for k, v in self._series.items() if k != name}
        return TimeSeriesCollection(
            new_series, name=self._name, description=self._description
        )

    # ---- spatial filtering -------------------------------------------------

    @staticmethod
    def _iter_locations(item: TimeSeriesList | TimeSeriesTable) -> Iterator[object]:
        """Yield the raw location attached to *item* (one per table column)."""
        if isinstance(item, TimeSeriesList):
            yield item.location
            return
        # Anything else is handled as a TimeSeriesTable: one location per column.
        for i in range(item.n_columns):
            yield item._get_attr(item.locations, i)

    @staticmethod
    def _iter_points(item: TimeSeriesList | TimeSeriesTable) -> Iterator[GeoLocation]:
        """Yield one point per located entry of *item*; areas yield their centroid."""
        for loc in TimeSeriesCollection._iter_locations(item):
            if isinstance(loc, GeoLocation):
                yield loc
            elif isinstance(loc, GeoArea):
                yield loc.centroid

    @staticmethod
    def _item_distance(
        item: TimeSeriesList | TimeSeriesTable, target: GeoLocation
    ) -> float | None:
        """Return the minimum distance from *item* to *target*, or None if no location."""
        distances = (
            point.distance_to(target)
            for point in TimeSeriesCollection._iter_points(item)
        )
        return min(distances, default=None)

    @staticmethod
    def _item_in_radius(
        item: TimeSeriesList | TimeSeriesTable,
        center: GeoLocation,
        radius_km: float,
    ) -> bool:
        """True if any location on *item* is within *radius_km* of *center*."""
        return any(
            point.distance_to(center) <= radius_km
            for point in TimeSeriesCollection._iter_points(item)
        )

    @staticmethod
    def _item_in_area(
        item: TimeSeriesList | TimeSeriesTable, area: GeoArea
    ) -> bool:
        """True if any location on *item* is inside *area*."""
        for loc in TimeSeriesCollection._iter_locations(item):
            if isinstance(loc, GeoLocation) and loc.is_within(area):
                return True
            if isinstance(loc, GeoArea) and area.contains_area(loc):
                return True
        return False

    def filter_by_location(
        self, center: GeoLocation, radius_km: float
    ) -> TimeSeriesCollection:
        """Keep series within *radius_km* of *center*."""
        new_series = {
            k: v
            for k, v in self._series.items()
            if self._item_in_radius(v, center, radius_km)
        }
        return TimeSeriesCollection(
            new_series, name=self._name, description=self._description
        )

    def filter_by_area(self, area: GeoArea) -> TimeSeriesCollection:
        """Keep series inside *area*."""
        new_series = {
            k: v for k, v in self._series.items() if self._item_in_area(v, area)
        }
        return TimeSeriesCollection(
            new_series, name=self._name, description=self._description
        )

    def nearest(
        self, target: GeoLocation, n: int = 1
    ) -> TimeSeriesCollection:
        """Keep the *n* nearest series to *target*.

        Items without any location are never selected. The result keeps the
        collection's original ordering rather than ordering by distance. A
        non-positive *n* yields an empty collection.
        """
        scored: list[tuple[float, str]] = []
        for key, item in self._series.items():
            d = self._item_distance(item, target)
            if d is not None:
                scored.append((d, key))
        scored.sort(key=lambda x: x[0])
        # Clamp: a negative stop would slice from the end and keep
        # "all but the farthest" instead of nothing.
        keep_keys = {key for _, key in scored[: max(n, 0)]}
        new_series = {k: v for k, v in self._series.items() if k in keep_keys}
        return TimeSeriesCollection(
            new_series, name=self._name, description=self._description
        )

    # ---- conversion --------------------------------------------------------

    def to_pandas_dataframe(self) -> "pd.DataFrame":
        """Outer-join all series into a single pandas DataFrame.

        An item that converts to a single-column frame becomes one column
        named by its key; an item with several columns contributes one column
        per source column, named ``"<key>/<column>"``. The index is the union
        of all timestamps (outer join), with ``NaN`` for missing values.
        An empty collection yields an empty DataFrame.
        """
        import pandas as pd

        if not self._series:
            return pd.DataFrame()

        frames: dict[str, "pd.Series"] = {}
        for key, item in self._series.items():
            df_item = item.to_pandas_dataframe()
            # TimeSeriesList produces a single-column DataFrame; extract the Series
            if df_item.shape[1] == 1:
                frames[key] = df_item.iloc[:, 0]
            else:
                # TimeSeriesTable: each column gets a composite key
                for col in df_item.columns:
                    frames[f"{key}/{col}"] = df_item[col]
        return pd.DataFrame(frames)

    def to_pd_df(self) -> "pd.DataFrame":
        """Alias for ``to_pandas_dataframe()``."""
        return self.to_pandas_dataframe()

    def to_polars_dataframe(self):
        """Outer-join all series into a single polars DataFrame.

        Built from ``to_pandas_dataframe()``; the pandas index is turned into
        a regular column via ``reset_index()`` before conversion.
        """
        pl = _import_polars()
        pdf = self.to_pandas_dataframe()
        return pl.from_pandas(pdf.reset_index())

    def to_pl_df(self):
        """Alias for ``to_polars_dataframe()``."""
        return self.to_polars_dataframe()

    def to_numpy(self) -> "dict[str, np.ndarray]":
        """Return each series as a numpy array in a dict keyed by series name."""
        import numpy as np

        result: dict[str, np.ndarray] = {
            key: item.to_numpy() for key, item in self._series.items()
        }
        return result

    @property
    def arr(self) -> "dict[str, np.ndarray]":
        """Shorthand for ``to_numpy()``."""
        return self.to_numpy()