-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecoder_preprocessor.py
More file actions
126 lines (111 loc) · 4.14 KB
/
decoder_preprocessor.py
File metadata and controls
126 lines (111 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import numpy as np
import pandas as pd
import random
class preprocess_single_trial_data:
    """
    Helpers for reshaping single-trial neural recording data for decoding.

    Maintains a "parameter map" (``self.pmap``) that pairs each trial with
    the stimulus parameters (spatial frequency, orientation, phase, condition
    id) shown on that trial, and provides utilities to select preferred-stimulus
    trials, build class labels, average trials into samples, and smooth in time.
    """

    def __init__(self):
        # Delegate to reset_pmap so the empty-map layout is defined once.
        self.reset_pmap()

    def reset_pmap(self):
        """Restore ``self.pmap`` to its empty dict-of-lists form."""
        self.pmap = {
            'sf': [],
            'ori': [],
            'phi': [],
            'id': []
        }

    def expand_pmap(self, row, n_trials):
        """
        Append one stimulus condition's parameters to the map, repeated
        ``n_trials`` times, preserving the order of appearance of conditions.

        Parameters
        ----------
        row : mapping
            A parameter-table row with keys 'Spatial Freq', 'Orientation',
            'Phase' and 'stim_condition_ids'.
        n_trials : int
            Number of trials recorded for this condition.
        """
        self.pmap['sf'] += [row['Spatial Freq']] * n_trials
        self.pmap['ori'] += [row['Orientation']] * n_trials
        self.pmap['phi'] += [row['Phase']] * n_trials
        self.pmap['id'] += [row['stim_condition_ids']] * n_trials

    def generate_class_labels(self, parameter_map, label, n_trials, slc=None):
        """
        Build an integer class-label vector: each unique value of
        ``parameter_map[label]`` repeated ``n_trials`` times, scaled by 100
        and truncated to int (so e.g. sf 0.05 becomes label 5).

        Parameters
        ----------
        parameter_map : pandas.DataFrame
            Table whose ``label`` column holds the stimulus parameter.
        label : str
            Column to derive labels from.
        n_trials : int
            Repetitions per unique value.
        slc : slice or None, optional
            Optional slice restricting which unique values are used.

        Returns
        -------
        numpy.ndarray of int
        """
        values = parameter_map[label].unique()
        # Fixed: original used `slc == None`; identity comparison is the
        # correct (and PEP 8 mandated) check for None.
        if slc is not None:
            values = values[slc]
        labels = np.repeat(values, n_trials)
        return (labels * 100).astype(int)

    def unpack_pref_data(self, l):
        """Flatten a list of dicts into a single list of their values."""
        # Keys are irrelevant here; iterate .values() directly.
        return [val for entry in l for val in entry.values()]

    def collect_max_indices(self, row):
        """
        Append to ``self.map_list`` the pmap rows matching this unit's
        preferred (sf, orientation, phase) combination.

        NOTE: assumes ``self.pmap`` has already been converted to a
        DataFrame and ``self.map_list`` initialized — both are done by
        select_max_data, which is the intended caller.
        """
        sf = round(row['sf'], 2)
        pref_ori = row['pref_ori']
        pref_phase = row['pref_phase']
        sf_map = self.pmap.loc[
            (self.pmap.sf == sf)
            & (self.pmap.ori == pref_ori)
            & (self.pmap.phi == pref_phase)
        ]
        self.map_list.append(sf_map)

    def select_max_data(self, trial_data, parameter_map, pref_phase_ori, n_trials):
        """
        For each unit, keep only the trials where its preferred stimulus
        (sf/orientation/phase) was shown.

        Parameters
        ----------
        trial_data : sequence of arrays
            Per-unit trial data, indexable as trial_data[unit][trial_idx].
        parameter_map : pandas.DataFrame
            Stimulus-condition table (one row per condition).
        pref_phase_ori : list
            Per-unit preference records (list of dicts of DataFrames;
            unpacked via unpack_pref_data).
        n_trials : int
            Trials per condition.

        Returns
        -------
        numpy.ndarray
            Stacked preferred-stimulus trials, one entry per unit.
        """
        pref_phase_ori = self.unpack_pref_data(pref_phase_ori)
        self.reset_pmap()
        parameter_map.apply(self.expand_pmap, n_trials=n_trials, axis=1)
        self.pmap = pd.DataFrame(self.pmap)
        unit_best_trial_indices = []
        for unit_pref in pref_phase_ori:
            self.map_list = []
            # Skip the first row — presumably a header/summary entry in the
            # preference table (TODO confirm against the caller).
            unit_pref.iloc[1:].apply(self.collect_max_indices, axis=1)
            unit_best_trial_indices.append(pd.concat(self.map_list))
        std_maxonly = []
        for unit_idx, unit in enumerate(unit_best_trial_indices):
            trial_idx = unit.index.values
            std_maxonly.append(trial_data[unit_idx][trial_idx])
        # Leave pmap in its pristine state for the next caller.
        self.reset_pmap()
        return np.array(std_maxonly)

    def get_trial_averaged_samples(self, trial_data, n_stim=10, n_trials=250, n_avg=5):
        """
        Average randomly-grouped trials into samples, per stimulus.

        Within each stimulus's block of ``n_trials`` trials, the trials are
        shuffled and partitioned into groups of ``n_avg``; each group is
        averaged over the trial axis, yielding ``n_trials // n_avg`` samples
        per stimulus.

        Parameters
        ----------
        trial_data : numpy.ndarray
            Array indexed as [unit, trial, time]; trials for stimulus k
            occupy the contiguous slice [k*n_trials, (k+1)*n_trials).
        n_stim : int, optional
            Number of stimulus conditions.
        n_trials : int, optional
            Trials per stimulus; must be divisible by ``n_avg``.
        n_avg : int, optional
            Trials averaged per sample.

        Returns
        -------
        numpy.ndarray
            Averaged samples, concatenated over stimuli along axis 1.

        Raises
        ------
        _NotAFactor
            If ``n_avg`` does not evenly divide ``n_trials``.
        """
        if n_trials % n_avg != 0:
            raise _NotAFactor(n_avg, n_trials)
        # Integer division: divisibility was just checked, no float cast needed.
        n_samples = n_trials // n_avg
        # (start, stop) trial-index bounds for each stimulus block.
        sf_indices = [
            (start, start + n_trials)
            for start in range(0, n_stim * n_trials, n_trials)
        ]
        trial_averaged_samples = []
        for start, stop in sf_indices:
            sf_slice = trial_data[:, start:stop, :]
            choices = list(range(n_trials))
            random.shuffle(choices)
            choices = np.array(choices).reshape(n_samples, n_avg)
            averages = [sf_slice[:, choice, :].mean(1) for choice in choices]
            trial_averaged_samples.append(np.stack(averages, axis=1))
        return np.concatenate(trial_averaged_samples, axis=1)

    def apply_temporal_smoothing(self, trial_data, kernel, t_axis):
        """
        Convolve ``kernel`` along axis ``t_axis`` of ``trial_data``
        (mode='same', so the output shape matches the input).
        """
        return np.apply_along_axis(
            lambda m: np.convolve(
                m, kernel,
                mode='same'
            ), axis=t_axis, arr=trial_data
        )
class _NotAFactor(Exception):
"""
Exception raised for when n_avg is not a factor
of n_trials.
"""
def __init__(self, arg0, arg1):
self.message = f"""
n_avg ({arg0}) is not a factor of n_trials ({arg1}).
"""
super().__init__(self.message)