Coverage for src/pycse/sklearn/lhc.py: 0.00%
65 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-23 16:23 -0400
"""
An sklearn-compatible Latin Hypercube library.

This is lightly tested on three-factor systems with 3 and 4 levels.
"""
import numpy as np
import pandas as pd
import scipy.stats as stats
class LatinSquare:
    """Latin-square experiment design and analysis, sklearn-fit-API flavored.

    A Latin square arranges three factors (rows, columns, and a cell
    "treatment" factor), each with the same number of levels, so that every
    treatment level appears exactly once in each row and each column.
    """

    # Seed for numpy's global RNG so shuffled designs are reproducible.
    seed = 42

    def __init__(self, vars=None):
        """Initialize the LatinSquare.

        Parameters
        ----------
        vars : dict
            Maps factor name -> list of levels. The first key is used for
            rows, the second for columns, the third for the cell (treatment)
            factor. Each entry should have the same number of levels.
            Tested on 3 and 4 levels for three factors.
        """
        self.vars = vars
        self.labels = list(vars.keys())
        # Seed the global RNG so design(shuffle=True) is reproducible.
        np.random.seed(self.seed)

    def design(self, shuffle=False):
        """Return a design matrix as a DataFrame.

        Each row of the result is one experiment to run. The treatment
        levels are assigned by cyclic rotation, so each level occurs once
        per row and once per column. If shuffle is True, the treatment
        starting order and the order of the returned rows are randomized.
        """
        # Treatment levels for the cells; copy so rotation/shuffle does not
        # mutate the user's list.
        levels = self.vars[self.labels[2]].copy()
        if shuffle:
            np.random.shuffle(levels)

        # Build the Latin square: each successive row is the previous one
        # rotated left by one position.
        square = [levels]
        for _ in range(1, len(levels)):
            levels = levels[1:] + levels[:1]
            square.append(levels)

        expts = [
            [row, col, square[i][j]]
            for i, row in enumerate(self.vars[self.labels[0]])
            for j, col in enumerate(self.vars[self.labels[1]])
        ]

        df = pd.DataFrame(expts, columns=self.labels)
        return df.sample(frac=1) if shuffle else df

    def pivot(self):
        """Show the pivot-table version of the (unshuffled) design."""
        df = self.design()[self.labels]
        return df.pivot(index=self.labels[0], columns=self.labels[1])

    def fit(self, X, y):
        """Fit the additive effects model.

        Parameters
        ----------
        X : pd.DataFrame
            The experiment design dataframe (as from design()).
        y : pd.Series
            The responses; must have a name. This signature is for
            compatibility with the sklearn fit API.

        Each response is modeled as
            y = avg + row_effect + col_effect + treatment_effect + residual
        where each effect is the deviation of that level's mean response
        from the grand mean.

        Returns the results DataFrame (also stored on self.results).
        """
        df = pd.concat([X, y], axis=1)

        avg = df[y.name].mean()
        df["avg"] = avg

        # Per-level effects: deviation of each level's mean from the grand
        # mean. These are Series indexed by level label.
        rows = df.groupby(self.labels[0])[y.name].mean() - avg
        cols = df.groupby(self.labels[1])[y.name].mean() - avg
        effs = df.groupby(self.labels[2])[y.name].mean() - avg

        # Broadcast each per-level effect onto the experiment rows.
        df[f"{self.labels[0]}_effect"] = df[self.labels[0]].map(rows)
        df[f"{self.labels[1]}_effect"] = df[self.labels[1]].map(cols)
        df[f"{self.labels[2]}_effect"] = df[self.labels[2]].map(effs)

        df["residuals"] = (
            df[y.name]
            - avg
            - df[f"{self.labels[0]}_effect"]
            - df[f"{self.labels[1]}_effect"]
            - df[f"{self.labels[2]}_effect"]
        )

        # BUG FIX: residuals were previously computed by subtracting the
        # per-level effect Series (indexed by level label) directly from y
        # (indexed by experiment number); pandas index alignment made that
        # all-NaN. Store the correctly aligned per-experiment residuals.
        self.effects = {
            "avg": avg,
            self.labels[0]: rows,
            self.labels[1]: cols,
            self.labels[2]: effs,
            "residuals": df["residuals"],
        }

        self.results = df
        self.y = y.name
        return self.results

    def anova(self):
        """Do an ANOVA on significance of each effect.

        Returns a DataFrame containing an F-like score for each effect
        (the factor's scaled variance relative to the residual variance)
        and whether it exceeds the 95% critical F value.
        """
        df = self.results
        # NOTE(review): N = (number of runs)^2 preserved from the original
        # implementation — confirm this scaling convention is intended.
        N = len(df) ** 2

        # Degrees of freedom: (levels - 1) per factor; the remainder goes
        # to the residuals.
        n0 = len(self.vars[self.labels[0]]) - 1
        n1 = len(self.vars[self.labels[1]]) - 1
        n2 = len(self.vars[self.labels[2]]) - 1
        dof = np.array([n0, n1, n2, len(df) - 1 - n0 - n1 - n2])
        ddof = N / dof  # algebraically identical to N / (N - (N - dof))

        # Scaled variance attributable to each factor (and the residuals).
        effect_cols = [
            f"{self.labels[0]}_effect",
            f"{self.labels[1]}_effect",
            f"{self.labels[2]}_effect",
            "residuals",
        ]
        S2 = df[effect_cols].var(ddof=0) * ddof

        # Ratio to the residual variance — this is like an F-score.
        ns = S2 / S2.loc["residuals"]

        # Critical F-score; bigger than this means significant at 95%.
        fc = stats.f.ppf(0.95, dof[0], dof[-1])

        table = np.vstack([ns.index.values, ns.values, ns > fc]).T
        return pd.DataFrame(
            table, columns=[f"{self.y} effect", f"F-score (fc={fc:1.1f})", "Significant"]
        )

    def predict(self, args):
        """Predict the response for ARGS.

        ARGS is a list of levels in (row, col, treatment) order. This only
        works for levels you have already defined, i.e. it is not a
        continuous function.
        """
        p = self.effects["avg"]
        for label, val in zip(self.labels, args):
            mask = self.results[label] == val
            # Mean of the (constant) effect column over matching runs.
            p += self.results.loc[mask, f"{label}_effect"].mean()
        return p