-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchewbite_utils.py
More file actions
328 lines (246 loc) · 13.1 KB
/
chewbite_utils.py
File metadata and controls
328 lines (246 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
from armetrics import utils as armutils
from armetrics import plotter
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Map raw annotation spellings (Spanish/English variants and one-letter
# abbreviations) to the two canonical activity names used throughout this module.
standardized_names = {"RUMIA PASTURA": "RUMIA", "PASTURA": "PASTOREO", "RUMIA PASTOREO": "RUMIA",
                      "RUMIA EN PASTURA": "RUMIA", "GRAZING": "PASTOREO", "RUMINATION": "RUMIA",
                      "R": "RUMIA", "P": "PASTOREO"}
# Collapse the activity labels into a single class when only segmentation
# (activity vs. nothing) is being evaluated.
segmentation_replacements = {"RUMIA": "SEGMENTACION", "PASTOREO": "SEGMENTACION", "REGULAR": "SEGMENTACION"}
# Default label sets used when filtering loaded annotations.
_names_of_interest = ["PASTOREO", "RUMIA"]
_name_of_segmentation = ["SEGMENTACION"]
def load_chewbite(filename: str, start: int = None, end: int = None, verbose=True, to_segmentation=False) -> pd.Series:
    """Load a chewbite label file and expand it into a per-second Series of labels.

    Parameters
    ----------
    filename : str
        Path to a whitespace-separated file with columns start, end, label
        (decimal commas are accepted).
    start, end : int, optional
        Crop window in seconds. Blocks partially inside the window are clipped
        to its limits; ``None`` keeps the full extent.
    verbose : bool
        If True, print the labels found in the selected span.
    to_segmentation : bool
        If True, collapse activity labels into the single "SEGMENTACION" label.

    Returns
    -------
    pd.Series
        One entry per second holding the active label, "" where unlabeled.
    """
    # sep=r'\s+' replaces the deprecated delim_whitespace=True (pandas >= 2.2).
    df = pd.read_table(filename, decimal=',', header=None, sep=r'\s+',
                       names=["bl_start", "bl_end", "label"], usecols=[0, 1, 2])
    df[["bl_start", "bl_end"]] = df[["bl_start", "bl_end"]].astype('float')
    df = df.round(0)
    df.label = df.label.str.strip().str.upper()
    # Normalize the many spellings found in the raw annotations (assignment
    # instead of inplace=True avoids the chained-assignment deprecation).
    df["label"] = df["label"].replace(standardized_names)
    df[["bl_start", "bl_end"]] = df[["bl_start", "bl_end"]].astype('int')

    # Clip partially selected labels: start/end may fall in the middle of a
    # block. Explicit None checks so 0 is a valid crop limit.
    if start is not None:
        df = df[df.bl_end > start]
        df.loc[df.bl_start < start, "bl_start"] = start
    if end is not None:
        df = df[df.bl_start < end]
        df.loc[df.bl_end > end, "bl_end"] = end

    names_of_interest = _names_of_interest
    if to_segmentation:
        names_of_interest = _name_of_segmentation
        df["label"] = df["label"].replace(segmentation_replacements)

    if verbose:
        print("Labels in (", start, ",", end, ") from", filename, "\n", df.label.unique())

    df = df.loc[df.label.isin(names_of_interest)]

    segments = [armutils.Segment(bl_start, bl_end, label)
                for _, (bl_start, bl_end, label) in df.iterrows()]
    indexes = [np.arange(bl_start, bl_end) for _, (bl_start, bl_end, label) in df.iterrows()]
    if len(segments) < 1:
        print("Warning, you are trying to load a span with no labels from:", filename)
        indexes = [np.array([])]  # Avoid np.concatenate error when no blocks are present

    frames = armutils.segments2frames(segments)
    indexes = np.concatenate(indexes)
    s = pd.Series(frames, index=indexes)

    if s.index.has_duplicates:
        print("Overlapping labels were found in", filename)
        print("Check labels corresponding to times given below (in seconds):")
        print(s.index[s.index.duplicated()])

    if len(segments) < 1:
        series_len = 1  # Single empty-label element, to avoid an empty-series error
    else:
        series_len = s.index[-1]  # Length up to the last second of the last block
    s_formatted = s.reindex(np.arange(series_len), fill_value="")
    return s_formatted
def length_signal_chewbite(filename, start=None, end=None, verbose=True, to_segmentation=False):
    """Return the labeled span length (in seconds) of a chewbite label file.

    Parameters
    ----------
    filename : str
        Tab-separated file with start, end, label columns (decimal commas).
    start, end : int, optional
        Crop window in seconds; blocks partially inside are clipped to its
        limits. ``None`` keeps the full extent.
    verbose, to_segmentation : unused
        Kept for signature compatibility with the other loaders.

    Returns
    -------
    int
        ``max(end) - min(start)`` over the (possibly cropped) blocks.
        NOTE(review): evaluates to NaN if the crop leaves no blocks.
    """
    df = pd.read_table(filename, decimal=',', header=None)
    # Drop columns that are entirely NaN (extra separators in the raw file).
    df.dropna(axis=1, how='all', inplace=True)
    df.columns = ["start", "end", "label"]
    df[["start", "end"]] = df[["start", "end"]].astype('float').round(0)
    df[["start", "end"]] = df[["start", "end"]].astype('int')
    # Clip partially selected labels: start/end may fall in the middle of a
    # block. Explicit None checks so 0 is a valid crop limit.
    if start is not None:
        df = df[df.end > start]
        df.loc[df.start < start, "start"] = start
    if end is not None:
        df = df[df.start < end]
        df.loc[df.end > end, "end"] = end
    return df["end"].max() - df["start"].min()
def merge_contiguous(df):
    """Merge consecutive rows that share the same label.

    Rows are grouped into runs of equal ``label``; each run collapses into its
    last row, whose ``start`` is replaced by the first start of the run.
    Unlike the previous row-by-row implementation, this is vectorized and does
    not require a consecutive integer index.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain ``start``, ``end`` and ``label`` columns in time order.

    Returns
    -------
    pd.DataFrame
        One row per run of equal labels (a copy; the input is not mutated).
    """
    labels = df["label"]
    # A new run starts wherever the label differs from the previous row.
    run_id = (labels != labels.shift()).cumsum()
    first_start = df.groupby(run_id)["start"].transform("first")
    # Keep the last row of each run; its end is already the run's end.
    last_of_run = labels != labels.shift(-1)
    merged = df.loc[last_of_run].copy()
    merged["start"] = first_start.loc[last_of_run]
    return merged
def remove_silences(filename_in, filename_out, max_len=300, sil_label="SILENCIO"):
    """Relabel short silence blocks that sit between two equally-labeled blocks.

    Reads the label file ``filename_in``; any ``sil_label`` block lasting at
    most ``max_len`` seconds whose neighbours on both sides share the same
    label is relabeled to that neighbour label. Contiguous equally-labeled
    blocks are then merged and the result is written to ``filename_out`` as a
    tab-separated file (no header, no index).
    """
    df = pd.read_table(filename_in, decimal=',', header=None)
    # Drop columns that are entirely NaN (extra separators in the raw file).
    df.dropna(axis=1, how='all', inplace=True)
    df.columns = ["start", "end", "label"]
    df[["start", "end"]] = df[["start", "end"]].astype('float')
    df = df.round(0)
    df.label = df.label.str.strip().str.upper()
    df[["start", "end"]] = df[["start", "end"]].astype('int')
    # Normalize the silence label the same way the file labels were normalized.
    sil_label = str.strip(str.upper(sil_label))
    # Skip the first and last rows: they have no neighbour on one side.
    # NOTE(review): prev_label is read from the live df, so a silence relabeled
    # in an earlier iteration can enable relabeling of the next one (cascade).
    for i, (start, end, label) in df.loc[df.index[1:-1]].iterrows():
        length = end - start
        prev_label = df.loc[i - 1].label
        next_label = df.loc[i + 1].label
        if label == sil_label and length <= max_len and prev_label == next_label:
            df.loc[i, "label"] = prev_label
    df = merge_contiguous(df)
    df.to_csv(filename_out,
              header=False, index=False, sep="\t")
def remove_between_given(filename_in, filename_out, search_label, max_len=300):
    """Relabel short blocks sandwiched between two ``search_label`` blocks.

    Any block other than ``search_label`` lasting at most ``max_len`` seconds
    whose neighbours on both sides carry ``search_label`` is relabeled to
    ``search_label``. Contiguous equally-labeled blocks are then merged and
    the result is written to ``filename_out`` as a tab-separated file.
    """
    df = pd.read_table(filename_in, decimal=',', header=None)
    df.dropna(axis=1, how='all', inplace=True)
    df.columns = ["start", "end", "label"]
    df[["start", "end"]] = df[["start", "end"]].astype('float')
    df = df.round(0)
    df.label = df.label.str.strip().str.upper()
    df[["start", "end"]] = df[["start", "end"]].astype('int')
    # Skip the first and last blocks: they have no neighbour on one side.
    for idx in df.index[1:-1]:
        block = df.loc[idx]
        duration = block.end - block.start
        neighbours = (df.loc[idx - 1].label, df.loc[idx + 1].label)
        if (block.label != search_label and duration <= max_len
                and neighbours == (search_label, search_label)):
            df.loc[idx, "label"] = search_label
    df = merge_contiguous(df)
    df.to_csv(filename_out, header=False, index=False, sep="\t")
def merge_file(filename_in, filename_out):
    """Read a label file, merge contiguous equally-labeled blocks, and save.

    The output goes to ``filename_out`` as a tab-separated file with no
    header and no index, matching the input layout.
    """
    blocks = pd.read_table(filename_in, decimal=',', header=None)
    blocks.dropna(axis=1, how='all', inplace=True)
    blocks.columns = ["start", "end", "label"]
    blocks[["start", "end"]] = blocks[["start", "end"]].astype('float')
    blocks = blocks.round(0)
    blocks.label = blocks.label.str.strip().str.upper()
    blocks[["start", "end"]] = blocks[["start", "end"]].astype('int')
    merged = merge_contiguous(blocks)
    merged.to_csv(filename_out, header=False, index=False, sep="\t")
def violinplot_metric_from_report(single_activity_report, metric):
    """Draw horizontal violin plots of *metric*, one row per predictor.

    The figure is saved as ``violin_<metric>_<activity>.pdf`` and ``.png``
    and also shown on screen. Non-finite metric values are dropped before
    plotting each predictor's distribution.
    """
    by_predictor = single_activity_report.groupby("predictor_name")
    activity = single_activity_report.activity.iloc[0]
    labels_seen = []
    plt.figure()
    centers = np.arange(len(by_predictor)) + .5  # violin centers on the y axis
    if len(by_predictor) > 10:
        print("Be careful! I cannot plot more than 10 labels.")
    for (predictor_name, predictor_report), center in zip(by_predictor, centers):
        labels_seen.append(predictor_name)
        values = predictor_report.loc[:, metric].values
        finite_values = values[np.isfinite(values)]
        plt.violinplot(finite_values, [center], points=50, vert=False, widths=0.65,
                       showmeans=False, showmedians=True, showextrema=True,
                       bw_method='silverman')
    # Reference lines at the metric's theoretical bounds.
    plt.axvline(x=0, color="k", linestyle="dashed")
    plt.axvline(x=1, color="k", linestyle="dashed")
    plt.yticks(centers, labels_seen)
    plt.gca().invert_yaxis()
    plt.minorticks_on()
    plt.xlabel('Frame F1-score')
    plt.tight_layout()
    plt.savefig('violin_' + metric + "_" + activity + '.pdf')
    plt.savefig('violin_' + metric + "_" + activity + '.png')
    plt.show()
def my_display_report(complete_report_df):
    """Print a banner and draw the frame-F1 violin plot for every activity."""
    for activity, activity_report in complete_report_df.groupby("activity"):
        print("\n================", activity, "================\n")
        violinplot_metric_from_report(activity_report, "frame_f1score")
def load_chewbite2(filename: str, start: float = None, end: float = None, verbose=True, to_segmentation=False,
                   decimals: int = 0, frame_len: float = 1.0, names_of_interest: list = None) -> pd.DataFrame:
    """Load a chewbite label file and resample it into fixed-length frames.

    Parameters
    ----------
    filename : str
        Whitespace-separated file with start, end, label columns (decimal points).
    start, end : float, optional
        Crop window in seconds; blocks partially inside are clipped.
        ``None`` defaults to 0.0 / the last block end.
    verbose : bool
        Print the labels found and the intermediate frames.
    to_segmentation : bool
        Collapse activity labels into the single "SEGMENTACION" label.
    decimals : int
        Rounding applied to the raw start/end times.
    frame_len : float
        Output frame length in seconds.
    names_of_interest : list, optional
        Labels to keep; defaults to the module-level activity names.

    Returns
    -------
    pd.DataFrame
        Columns ``start``, ``end``, ``label``, one row per frame. Frames that
        overlap several blocks get the label with the largest time overlap
        ("" competes as the unlabeled share).
    """
    # sep=r'\s+' replaces the deprecated delim_whitespace=True (pandas >= 2.2).
    blocks_in = pd.read_table(filename, decimal='.', header=None, sep=r'\s+',
                              names=["start", "end", "label"], usecols=[0, 1, 2])
    blocks_in.loc[:, "start":"end"] = blocks_in.loc[:, "start":"end"].astype('float').round(decimals)
    blocks_in.label = blocks_in.label.str.strip().str.upper().replace(standardized_names)

    # Clip partially selected blocks; explicit None checks so 0.0 is a valid limit.
    if start is not None:
        blocks_in = blocks_in[blocks_in.end > start]
        blocks_in.loc[blocks_in.start < start, "start"] = start
    else:
        start = 0.0
    if end is not None:
        blocks_in = blocks_in[blocks_in.start < end]
        blocks_in.loc[blocks_in.end > end, "end"] = end
    else:
        end = blocks_in.end.max()

    if not names_of_interest:
        names_of_interest = _names_of_interest
    if to_segmentation:
        names_of_interest = _name_of_segmentation
        # Assignment instead of inplace=True avoids the chained-assignment deprecation.
        blocks_in["label"] = blocks_in["label"].replace(segmentation_replacements)

    if verbose:
        print("Labels in (", start, ",", end, ") from", filename, "\n", blocks_in.label.unique())
    blocks_in = blocks_in.loc[blocks_in.label.isin(names_of_interest)]
    if verbose:
        print(blocks_in)

    # Build the fixed-length output frame grid.
    start_out = np.arange(start, end, frame_len)
    end_out = start_out + frame_len
    frames_out = pd.DataFrame({"start": start_out, "end": end_out, "label": ""})

    to_revise_label = "to_revise"
    for row_id, block in blocks_in.iterrows():
        # Frames fully inside the block take its label directly.
        criteria = (frames_out.start >= block.start) & (frames_out.end <= block.end)
        frames_out.loc[criteria, "label"] = block.label
        # Frames crossing either block edge need per-frame overlap resolution.
        criteria = (frames_out.start >= block.start) & (frames_out.start < block.end) & (frames_out.end > block.end)
        frames_out.loc[criteria, "label"] = to_revise_label
        criteria = (frames_out.start < block.start) & (frames_out.end > block.start) & (frames_out.end <= block.end)
        frames_out.loc[criteria, "label"] = to_revise_label

    def revise_frame(frame):
        # Pick the label with the largest time overlap inside this frame;
        # unlabeled time competes as the empty label "".
        criteria = (blocks_in.end >= frame.start) & (blocks_in.start < frame.end)
        blocks_in_frame = blocks_in.loc[criteria].copy()
        blocks_in_frame.loc[blocks_in_frame.start < frame.start, "start"] = frame.start
        blocks_in_frame.loc[blocks_in_frame.end > frame.end, "end"] = frame.end
        blocks_in_frame["overlap"] = (blocks_in_frame.end - blocks_in_frame.start) / frame_len
        null_overlap = 1.0 - blocks_in_frame["overlap"].sum()
        # DataFrame.append was removed in pandas 2.0; build the extra row with concat.
        null_row = pd.DataFrame([{"label": "", "overlap": null_overlap}])
        blocks_in_frame = pd.concat([blocks_in_frame, null_row], ignore_index=True)
        frame.label = blocks_in_frame.groupby("label").sum().sort_values("overlap").last_valid_index()
        return frame

    to_revise_frames = frames_out.label == to_revise_label
    frames_out.loc[to_revise_frames] = frames_out.loc[to_revise_frames].apply(revise_frame, axis="columns")

    if verbose:
        print(frames_out)
    if list(frames_out.label.unique()) == [""]:
        print("Warning, you are trying to load a span (", start, ",", end, ") with no labels from:", filename)
    return frames_out
def merge_true_pred(filename_true, filename_pred, **kwargs):
    """Load ground-truth and prediction label files and align them frame by frame.

    Returns a DataFrame with ``start``, ``end``, ``label_pred`` and
    ``label_true`` columns; frames present in only one source get "" labels.
    """
    y_true = load_chewbite2(filename_true, verbose=False, **kwargs)
    y_pred = load_chewbite2(filename_pred, verbose=False, **kwargs)
    aligned = pd.merge(y_pred, y_true, on=["start", "end"], how="outer",
                       suffixes=("_pred", "_true"), sort=True)
    return aligned.fillna("")
def compute_cm_and_plot(df_true_pred):
    """Plot a row-normalized confusion matrix of true vs. predicted labels."""
    confusion = pd.crosstab(df_true_pred.label_true, df_true_pred.label_pred,
                            rownames=['True'], colnames=['Predicted'],
                            normalize="index")
    sns.heatmap(confusion, annot=True, cmap="Blues", cbar=None, fmt=".2f", square=True)
    plt.show()
def cm_single_pred(true_filenames, pred_filenames, pred_name, starts_ends=None, **kwargs):
    """Build and plot the confusion matrix of one predictor over many files.

    ``starts_ends`` is an optional list of (start, end) crop windows, one per
    file pair; when omitted, whole files are used.
    """
    if starts_ends is None:
        starts_ends = [(None, None)] * len(true_filenames)
    print("\n================", pred_name, "================\n")
    merged_frames = []
    for true_file, pred_file, (span_start, span_end) in zip(true_filenames, pred_filenames, starts_ends):
        merged_frames.append(merge_true_pred(true_file, pred_file,
                                             start=span_start, end=span_end, **kwargs))
    compute_cm_and_plot(pd.concat(merged_frames, ignore_index=True))
def plot_predictors_cm(true_filenames, names_of_predictors, *argv_prediction_filenames, **kwargs):
    """Plot one confusion matrix per predictor against the shared ground truth."""
    for predictor_label, prediction_files in zip(names_of_predictors, argv_prediction_filenames):
        cm_single_pred(true_filenames, prediction_files, predictor_label, **kwargs)