-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcw_spectrogram_v2_snddev.py
More file actions
235 lines (203 loc) · 7.56 KB
/
cw_spectrogram_v2_snddev.py
File metadata and controls
235 lines (203 loc) · 7.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
#!/usr/bin/python
"""
based on
PyAudio + PyQtGraph Spectrum Analyzer
Author:@sbarratt
Date Created: August 8, 2015
and Spectrum Analyzer with STFT see Yumi's blog
https://fairyonice.github.io/implement-the-spectrogram-from-scratch-in-python.html
as modified by waszee Oct 12, 2020
this version is using sounddevice instead of pyaudio
"""
import struct
import sys
import numpy as np
import IPython as ipy
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui, QtCore
import matplotlib.pyplot as plt
import queue
import sounddevice as sd
q = queue.Queue()
# Audio Format (check Audio MIDI Setup if on Mac)
FORMAT = np.int16
RATE = 44100
CHANNELS = 1
# Set Plot Range [-RANGE,RANGE], default is nyquist/2
URANGE =12000
if not URANGE:
URANGE = RATE/2
LRANGE=200
if not LRANGE:
LRANGE=0
TRACK=1024 #for spectrogram
OVERLAP=400
COLLECTSEC=30
#expect 44100 data points per sec so 30sec is about 1.2MB
# input block is used for the realtime pyqtgraph
INPUT_BLOCK_TIME = 0.1
INPUT_FRAMES_PER_BLOCK = int(RATE*INPUT_BLOCK_TIME) #for pyQTgraph
#print("block:",INPUT_FRAMES_PER_BLOCK)
# Which Channel if stereo? (L or R)
LR = "l"
class SpectrumAnalyzer():
def __init__(self):
self.sdinit_stream()
self.initUI()
def sdinit_stream(self):
self.sdstream = sd.InputStream(samplerate=RATE,channels=1,blocksize =INPUT_FRAMES_PER_BLOCK,callback = self.sdcallback, dtype=np.int16)
self.sdstream.start()
def sdcallback(self,indata, frames, time, status):
"""This is called (from a separate thread) for each audio block."""
if status:
print(status)
ndata=indata[:]
q.put(ndata)
def sdreadData(self):
try:
data = q.get()
except queue.Empty:
pass
return data
def initUI(self):
self.app = QtGui.QApplication([])
self.app.quitOnLastWindowClosed()
self.mainWindow = QtGui.QMainWindow()
self.mainWindow.setWindowTitle("Spectrum Analyzer")
self.mainWindow.resize(800,300)
self.centralWid = QtGui.QWidget()
self.mainWindow.setCentralWidget(self.centralWid)
self.lay = QtGui.QVBoxLayout()
self.centralWid.setLayout(self.lay)
self.specWid = pg.PlotWidget(name="spectrum")
self.specItem = self.specWid.getPlotItem()
self.specItem.setMouseEnabled(y=False)
self.specItem.setYRange(0,5000)
self.specItem.setXRange(-URANGE,URANGE, padding=0)
self.specAxis = self.specItem.getAxis("bottom")
self.specAxis.setLabel("Frequency [Hz]")
self.lay.addWidget(self.specWid)
self.mainWindow.show()
self.app.aboutToQuit.connect(self.close)
def close(self):
self.sdstream.stop()
sys.exit()
def get_spectrum(self, data):
T = 1.0/RATE
N = data.shape[0]
f = np.fft.fftfreq(N,T)
f = np.fft.fftshift(f)
w = np.blackman(N)
Pxx = np.fft.fft(data*w)
Pxx = np.fft.fftshift(Pxx)
Pxx = 2/N*np.abs(Pxx)
return f, Pxx
def create_spectrogram(self,ts,NFFT,noverlap = None):
'''
ts: original time series
NFFT: The number of data points used in each block for the DFT.
Fs: the number of points sampled per second, so called sample_rate
noverlap: The number of points of overlap between blocks. The default
value is NFFT/2.
'''
if noverlap is None:
noverlap = NFFT/2
noverlap = int(noverlap)
starts = np.arange(0,len(ts),NFFT-noverlap,dtype=int)
# remove any window with less than NFFT sample size
starts = starts[starts + NFFT < len(ts)]
xns = []
for start in starts:
# short term discrete fourier transform
#ts_window = get_xns(ts[start:start + NFFT])
f, Pxx = self.get_spectrum(ts[start:start + NFFT])
#xns.append(ts_window)
#stack the new readings in upper half array and transpose to horizontal
N=len(Pxx)
Pxx = 2/N*np.abs(Pxx[N//2:N-1])
xns.append(Pxx)
specX = np.array(xns).T
# rescale the absolute value of the spectrogram as rescaling is standard
spec = 20*np.log10(specX)
assert spec.shape[1] == len(starts)
return(starts,spec)
def plot_spectrogram(self,spec,ks,sample_rate, L, starts,tslen, mappable = None):
plt.figure(figsize=(7.5,3))
rlow = int(L*LRANGE/sample_rate)
rhigh = int(L*URANGE/sample_rate)
specshow = spec[rlow:rhigh,]
plt_spec = plt.imshow(specshow,origin='lower', cmap="twilight_r")
## create ylim
Nyticks = 10
V=int(specshow.shape[0])
ks = np.linspace(0,V,Nyticks)
ksHz = self.get_Hz_scale_vec(ks,sample_rate,V*2)
plt.yticks(ks,ksHz)
plt.ylabel("Frequency (Hz)")
## create xlim
Nxticks = 10
ts_spec = np.linspace(0,spec.shape[1],Nxticks)
total_ts_sec=int(tslen/RATE)
ts_spec_sec = ["{:4.2f}".format(i) for i in np.linspace(0,total_ts_sec*starts[-1]/tslen,Nxticks)]
plt.xticks(ts_spec,ts_spec_sec)
plt.xlabel("Time (sec)")
plt.title("Spectrogram L={} Spectrogram.shape={}".format(L,spec.shape))
#plt.colorbar(mappable,use_gridspec=True)
plt.show()
return(plt_spec)
def get_Hz_scale_vec(self,ks,sample_rate,Npoints):
maxrange=sample_rate/2
freq_Hz = ks*sample_rate/Npoints*(URANGE-LRANGE)/maxrange+LRANGE
freq_Hz = [int(i) for i in freq_Hz ]
return(freq_Hz )
# def alt_spectrogram(self,ts,sample_rate):
# dt = 1/sample_rate
# t = np.arange(0.0, COLLECTSEC, dt)
# NFFT = TRACK # the length of the windowing segments
# Fs = int(1.0 / dt) # the sampling frequency
# fig, (ax1, ax2) = plt.subplots(nrows=2)
# ax1.plot(t, ts)
# Pxx, freqs, bins, im = ax2.specgram(ts, NFFT=NFFT, Fs=Fs, noverlap=OVERLAP)
# return
def mainLoop(self):
ts=[]
#with self.stream:
while 1:
# Sometimes Input overflowed because of mouse events, ignore this
while (len(ts)<(RATE*COLLECTSEC)):
try:
#data = self.readData()
sddata=self.sdreadData()
data=sddata.reshape(-1)
#print(data)
except IOError:
continue
except Exception as e:
print("Exception:",e)
self.close()
try:
f, Pxx = self.get_spectrum(data)
self.specItem.plot(x=f,y=Pxx, clear=True)
QtGui.QApplication.processEvents()
ts=np.concatenate((ts,data))
except Exception as e:
print("exception = ",e)
print(len(data))
print(len(Pxx)," Pxx shape= ",Pxx.shape)
print("ts len",len(ts))
self.close()
break
L =TRACK
noverlap = OVERLAP
Nxlim=10
sample_rate=RATE
ks = np.linspace(0,len(Pxx),Nxlim)
starts, spec = self.create_spectrogram(ts,L,noverlap = noverlap )
tslen=len(ts)
self.plot_spectrogram(spec,ks,sample_rate,L, starts,tslen)
#self.alt_spectrogram(ts,sample_rate)
self.close()
if __name__ == '__main__':
sa = SpectrumAnalyzer()
sa.mainLoop()
sa.close()