import numpy as np
from time import time
from statistics import mean
from scipy.stats import rankdata
from sklearn.metrics import roc_auc_score, roc_curve, auc
from sklearn.svm import SVC
from sklearn.preprocessing import label_binarize


class Scorer():
    """Wrap a metric function together with how it should be applied.

    greater_better: True if larger metric values indicate better models.
    prob: True if the metric expects class probabilities rather than predicted labels.
    kwargs: extra keyword arguments forwarded to the metric function.
    """

    def __init__(self, score_func, greater_better=True, prob=False, **kwargs):
        self.score_func_ = score_func
        self.greater_better = greater_better
        self.prob = prob
        self.kwargs_ = kwargs

    def score(self, y_true, y_pred):
        return self.score_func_(y_true, y_pred, **self.kwargs_)
def compute_scores(scores, y_true, y_pred, y_score=None):
    scores_val = {}
    for score_name, scorer in scores.items():
        if scorer.prob:
            scores_val[score_name] = scorer.score(y_true, y_score)
        else:
            scores_val[score_name] = scorer.score(y_true, y_pred)
    return scores_val
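

# Example sketch: one way a `scores` dictionary of Scorer objects could be
# assembled and evaluated with compute_scores. The metric choices and the toy
# predictions below are illustrative assumptions, not part of any pipeline
# defined in this module.
def _example_compute_scores():
    from sklearn.metrics import accuracy_score, f1_score

    scores = {
        'accuracy': Scorer(accuracy_score),
        'f1': Scorer(f1_score, average='macro'),
        'roc auc': Scorer(roc_auc_score, prob=True, multi_class='ovr'),
    }
    y_true = np.array([0, 1, 2, 1])
    y_pred = np.array([0, 1, 1, 1])
    # One probability column per class, each row summing to 1
    y_score = np.array([[0.8, 0.1, 0.1],
                        [0.2, 0.6, 0.2],
                        [0.1, 0.5, 0.4],
                        [0.3, 0.5, 0.2]])
    return compute_scores(scores, y_true=y_true, y_pred=y_pred, y_score=y_score)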


def grid_search_cv(
    base_clf,
    param_grid,
    scores,
    X,
    y,
    cv,
    scaler=False,
    best_score='mean accuracy',
    probability=False,
    verbose=False
):
    n_splits = cv.get_n_splits(X)
    n_params = len(list(param_grid))
    if verbose:
        print('Fitting %d folds for each of %d candidates, totalling %d fits'
              % (n_splits, n_params, n_splits*n_params))
    # Initialise the results table: per-fold, mean and concatenated scores
    # for every metric, plus timing and parameter columns
    results = {}
    results['params'] = []
    results['mean fit time'] = []
    results['mean score time'] = []
    for score_name in scores:
        for i in range(n_splits):
            results['split%d %s' % (i, score_name)] = []
        results['mean '+score_name] = []
        results['concat '+score_name] = []
    for param_comb in param_grid:
        for param in param_comb:
            results['param_'+param] = [None for _ in range(n_params)]
    # Iterate on parameters
    for p, params in enumerate(param_grid):
        y_pred_concat = np.array([])
        y_true_concat = np.array([])
        y_score_concat = None
        fit_times = []
        score_times = []
        folds_scores = {}
        for score_name in scores:
            folds_scores[score_name] = []
        for param_name, param_val in params.items():
            results['param_'+param_name][p] = param_val
        results['params'].append(params)
        # Cross validation
        for split, (train_index, test_index) in enumerate(cv.split(X, y)):
            if verbose:
                print('[CV %d/%d; %d/%d] %s '
                      % (split+1, n_splits, p+1, n_params, params), end='')
            X_train, X_val = X[train_index, :], X[test_index, :]
            y_train, y_val = y[train_index], y[test_index]
            # Data scaling
            if scaler:
                X_train = scaler.fit_transform(X_train)
                X_val = scaler.transform(X_val)
            # SVC only exposes predict_proba when fitted with probability=True
            if probability and base_clf == SVC:
                params['probability'] = True
            # Fit the model on training set
            fit_start = time()
            clf = base_clf(**params).fit(X_train, y_train)
            fit_times.append(time()-fit_start)
            # Test the model on validation set
            score_start = time()
            y_pred = clf.predict(X_val)
            score_times.append(time()-score_start)
            # Compute the probability associated to predictions
            y_score = None
            if probability:
                y_score = clf.predict_proba(X_val)
            # Concatenate predictions on this test fold
            y_pred_concat = np.concatenate([y_pred_concat, y_pred])
            y_true_concat = np.concatenate([y_true_concat, y_val])
            if probability:
                y_score_concat = np.concatenate([y_score_concat, y_score]) \
                    if y_score_concat is not None else y_score
            # Compute and store scores on this test fold
            curr_fold_scores = compute_scores(
                scores,
                y_pred=y_pred,
                y_true=y_val,
                y_score=y_score
            )
            for score_name, value in curr_fold_scores.items():
                results['split%d %s' % (split, score_name)].append(value)
                folds_scores[score_name].append(value)
                if verbose: print('%s=%f ' % (score_name, value), end='')
            if verbose: print()
        # Compute and store mean scores over folds
        for score_name, value in folds_scores.items():
            results['mean '+score_name].append(mean(value))
        results['mean fit time'].append(mean(fit_times))
        results['mean score time'].append(mean(score_times))
        # Compute and store scores on the concatenation of the predictions for each test fold
        concat_scores = compute_scores(
            scores,
            y_pred=y_pred_concat,
            y_true=y_true_concat,
            y_score=y_score_concat
        )
        for score_name, value in concat_scores.items():
            results['concat '+score_name].append(value)
    # Rank models (rank 1 = best): invert scores where greater is better so
    # that rankdata, which ranks in ascending order, puts the best model first
    aggr_rank = np.zeros(n_params)
    for score_name, scorer in scores.items():
        mean_score = np.array(results['mean '+score_name])
        if scorer.greater_better:
            mean_score = 1-mean_score
        rank_mean = rankdata(mean_score, method='min')
        aggr_rank += rank_mean
        results['rank mean '+score_name] = rank_mean.tolist()
        concat_score = np.array(results['concat '+score_name])
        if scorer.greater_better:
            concat_score = 1-concat_score
        rank_concat = rankdata(concat_score, method='min')
        aggr_rank += rank_concat
        results['rank concat '+score_name] = rank_concat.tolist()
    results['aggregated rank'] = rankdata(aggr_rank, method="min").tolist()
    # Find the index of the best params
    if best_score == 'aggregated rank':
        idx_best_params = results[best_score].index(1)
    else:
        # If more than one parameter combination is ranked first on the chosen
        # score, pick the one with the best (lowest) aggregated rank,
        # breaking any remaining tie by the lower index
        indices_best_params = [
            i for i, x in enumerate(results['rank '+best_score]) if x == 1
        ]
        bests_ranks = np.array(results['aggregated rank'])[indices_best_params]
        idx_best_params = indices_best_params[np.argmin(bests_ranks)]
    best_params = results['params'][idx_best_params]
    return results, best_params
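

# Example sketch: a possible call to grid_search_cv with an SVC and a small
# parameter grid. The dataset, grid values, fold count and scaler choice are
# illustrative assumptions for this example only.
def _example_grid_search_cv():
    from sklearn.datasets import load_iris
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import StratifiedKFold, ParameterGrid
    from sklearn.preprocessing import StandardScaler

    X, y = load_iris(return_X_y=True)
    scores = {'accuracy': Scorer(accuracy_score)}
    param_grid = ParameterGrid({'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']})
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    results, best_params = grid_search_cv(
        SVC,
        param_grid,
        scores,
        X,
        y,
        cv,
        scaler=StandardScaler(),
        best_score='mean accuracy',
        verbose=True,
    )
    return results, best_params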


def compute_roc_auc(y_true, y_score, weighted, labels):
    """Compute ROC curve points and AUC, averaged over classes when multi-class.

    If `weighted` is True, each class is weighted by its support; otherwise a
    plain macro average over classes is used.
    """
    n_classes = len(labels)
    # Binary case: use the probability of the positive class directly
    if n_classes <= 2:
        roc_auc = roc_auc_score(y_true, y_score[:, 1])
        fpr, tpr, _ = roc_curve(y_true, y_score[:, 1], pos_label=labels[1])
        return fpr, tpr, roc_auc
    y_true = label_binarize(y_true, classes=labels)
    fpr = dict()
    tpr = dict()
    roc_auc = dict()
    # Compute weights
    w = np.ones(n_classes)
    tot = n_classes
    if weighted:
        tot = y_true.shape[0]
        for i in range(n_classes):
            w[i] = np.count_nonzero(y_true[:, i] == 1)
    # Compute ROC curve points and AUC for each class and average them
    roc_auc_mean = 0
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_true[:, i], y_score[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
        roc_auc_mean += roc_auc[i] * w[i]
    roc_auc_mean /= tot
    # Concatenate unique fpr of all classes
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    # Compute mean tpr with interpolation
    mean_tpr = np.zeros_like(all_fpr)
    for i in range(n_classes):
        mean_tpr += np.interp(all_fpr, fpr[i], tpr[i]) * w[i]
    mean_tpr /= tot
    return all_fpr, mean_tpr, roc_auc_mean
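

# Example sketch: computing a support-weighted multi-class ROC curve from
# predicted probabilities. The classifier, dataset and split are illustrative
# assumptions for this example only.
def _example_compute_roc_auc():
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=0
    )
    # probability=True is required for SVC to expose predict_proba
    clf = SVC(probability=True).fit(X_train, y_train)
    y_score = clf.predict_proba(X_test)
    # Averaged ROC curve points and the support-weighted mean AUC
    fpr, tpr, roc_auc = compute_roc_auc(
        y_test, y_score, weighted=True, labels=[0, 1, 2]
    )
    return fpr, tpr, roc_auc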