Source code for nrgpy.quality.quality

from collections import Counter
from datetime import datetime
import math
import numpy as np
import pandas as pd
from typing import Union


[docs]def check_intervals( df: pd.DataFrame, verbose: bool=True, return_info: bool=False, show_all_missing_timestamps: bool=False, show_all_duplicate_timestamps: bool=False, interval: Union[int, str]="", ) -> Union[dict, None]: """checks for missing or duplicate intervals in a pandas dataframe with a "Timestamp" column Parameters ---------- df : object the dataframe to be checked interval : int [deprecated] the averaging interval in seconds verbose : bool print results to terminal; False to skip return_info : bool set to True to return dict with below values show_all_missing_timestamps : bool set to True to show all missing timestamps in verbose option. otherwise, shows first and last 3. show_all_duplicate_timestamps : bool set to True to show all duplicate timestamps in verbose option. otherwise, shows first and last 3. Returns ---------- dict actual_rows : int actual number of rows in data section of export file (1 subtracted for column headers) expected_rows : int expected number of rows (assumes 10 min. AVG), converts result to whole integer time_range : str range of time represented in export file first_interval : str file starting timestamp last_interval : str file ending timestamp missing_timestamps : list a list of missing timestamps duplicate_timestamps : list a list of duplicate timestamps Examples ---------- ex. pass a reader.data dataframe for an interval check: >>> reader = nrgpy.sympro_txt_read() instance created, no filename specified >>> reader.concat_txt(txt_dir="C:/data/sympro_data/000110/") ... >>> nrgpy.check_intervals(reader.data, interval=600) Starting timestamp : 2019-01-01 00:00:00 Ending timestamp : 2019-07-01 04:50:00 Data set Duration : 181 days, 4:50:00 Expected rows in data set : 26093 Actual rows in data set : 26093 Data set complete. """ if "horz" in "".join(df.columns).lower() or isinstance( df["Timestamp"][0], datetime ): df2 = df.copy() # df2.Timestamp = df2.Timestamp.apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S")) # noqa: E501 df2.reset_index(level=0, inplace=True) _df = df2 first_interval = _df["Timestamp"].min() last_interval = _df["Timestamp"].max() else: _df = df.copy() time_fmt = "%Y-%m-%d %H:%M:%S" first_interval = datetime.strptime(_df["Timestamp"].min(), time_fmt) last_interval = datetime.strptime(_df["Timestamp"].max(), time_fmt) duplicate_timestamps, _df = find_duplicate_intervals(_df) # delete duplicate intervals before doing the rest of the interval checks _df = _df.drop_duplicates(subset=["Timestamp"], keep="first") _df.reset_index(drop=True, inplace=True) interval = int(select_interval_length(_df)) time_range = last_interval - first_interval expected_rows = int(time_range.total_seconds() / interval) actual_rows = len(_df) - 1 loss_pct = 100 * ((expected_rows - actual_rows) / expected_rows) if abs(loss_pct) > 1: loss_pct = round(loss_pct) else: loss_pct = round(loss_pct, 3) if expected_rows != actual_rows: missing_timestamps, _df = find_missing_intervals(_df, interval) if verbose: print("Statistical interval : {0} seconds".format(interval)) print("Starting timestamp : {0}".format(first_interval)) print("Ending timestamp : {0}".format(last_interval)) print("Data set Duration : {0}".format(time_range)) print("Expected rows in data set : {0}".format(expected_rows)) print("Actual rows in data set : {0}".format(actual_rows)) if expected_rows == actual_rows: print("\nData set complete.") else: print("Interval loss percentage : {0}".format(loss_pct)) print("\nMissing {0} timestamps:".format(len(missing_timestamps))) if len(missing_timestamps) <= 8 or show_all_missing_timestamps: for i, timestamp in enumerate(missing_timestamps): print("\t{0}\t{1}".format(i + 1, timestamp)) else: for timestamp in ( missing_timestamps[0:3] + ["..."] + missing_timestamps[-3:] ): print("\t{0}\t{1}".format(" ", timestamp)) print("Duplicate {0} timestamps:".format(len(duplicate_timestamps))) if len(duplicate_timestamps) <= 8 or show_all_duplicate_timestamps: for i, timestamp in enumerate(duplicate_timestamps): print("\t{0}\t{1}".format(i + 1, timestamp)) else: for timestamp in ( duplicate_timestamps[0:3] + ["..."] + duplicate_timestamps[-3:] ): print("\t{0}\t{1}".format(" ", timestamp)) if return_info: interval_info = {} interval_info["actual_rows"] = actual_rows interval_info["expected_rows"] = expected_rows interval_info["interval_length"] = interval interval_info["first_interval"] = first_interval interval_info["last_interval"] = last_interval interval_info["time_range"] = time_range interval_info["loss_pct"] = loss_pct try: interval_info["missing_timestamps"] = missing_timestamps except Exception: interval_info["missing_timestamps"] = [] interval_info["duplicate_timestamps"] = duplicate_timestamps return interval_info return None
[docs]def find_missing_intervals(__df: pd.DataFrame, interval: Union[int, str]) -> tuple: """find gaps in data dataframe returns ---------- list a list of all missing intervals """ _df = __df.copy() _df["data"] = True _df["Timestamp"] = pd.to_datetime(_df["Timestamp"]) _df.set_index("Timestamp", inplace=True) _df = _df.reindex( pd.date_range( start=_df.index[0], end=_df.index[-1], freq="{0}s".format(interval) ) ) missing_timestamps = [] for index, row in _df.iterrows(): try: if math.isnan(row["data"]): missing_timestamps.append(index) except TypeError: missing_timestamps.append(index) return missing_timestamps, _df
[docs]def find_duplicate_intervals(__df: pd.DataFrame) -> tuple: """find duplicate interval timestamps returns ------- list a list of all duplicate intervals """ _df = __df.copy() time_fmt = "%Y-%m-%d %H:%M:%S" observed_intervals = pd.to_datetime(_df["Timestamp"].values, format=time_fmt) unique_timestamps_set = set(observed_intervals) if len(observed_intervals) == len(unique_timestamps_set): duplicate_timestamps = [] else: # Record duplicates using Counter timestamp_counter = Counter(observed_intervals) duplicate_timestamps = [ timestamp for timestamp, count in timestamp_counter.items() if count > 1 ] return duplicate_timestamps, _df
[docs]def select_interval_length(df: pd.DataFrame, seconds: bool = True) -> Union[int, float]: """returns the mode of the first 10 intervals of the data set parameters ---------- reader : nrgpy reader object seconds : bool (True) set to False to get interval length in minutes returns ------- int """ from datetime import datetime formatter = "%Y-%m-%d %H:%M:%S" interval = [] for i in range(5): try: interval.append( int( ( datetime.strptime(df["Timestamp"].loc[i + 1], formatter) - datetime.strptime(df["Timestamp"].loc[i], formatter) ).seconds ) ) except Exception: formatter = "%Y-%m-%d %H:%M:%S.%f" interval.append(int((df["Timestamp"].iloc[i + 1] - df["Timestamp"].iloc[i]).seconds)) try: if seconds: return select_mode_from_list(interval) return select_mode_from_list(interval) / 60 except Exception: return False
[docs]def select_mode_from_list(lst: list) -> int: return max(set(lst), key=lst.count)
[docs]def check_for_missing_txt_files(txt_file_names): """check list of files for missing file numbers parameters ---------- txt_file_names : list list of SymphoniePRO text file exports returns ------- list "missing" text file numbers """ missing_file_numbers = [] for i, f in enumerate(sorted(txt_file_names)): file_number = int(f.split("_")[-2]) if i > 0: if file_number - _file_number > 1: # noqa: F821 missing_file_numbers.append(f) _file_number = file_number return missing_file_numbers