diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 47acdf5..0000000 Binary files a/.DS_Store and /dev/null differ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8b2ae6e..7b15d6d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,7 +14,7 @@ jobs: matrix: platform: - ubuntu-latest - python-version: ['3.9', '3.10', '3.11', '3.12'] + python-version: ['3.10', '3.11', '3.12'] steps: - uses: actions/checkout@v3 diff --git a/.gitignore b/.gitignore index 93b30a0..c2ee334 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ python_adc_eval.egg-info dist .ruff_cache .coverage +.secret build diff --git a/README.rst b/README.rst index fa08338..d936b9c 100644 --- a/README.rst +++ b/README.rst @@ -42,7 +42,8 @@ Given an array of values representing the output of an ADC, the spectrum can be leak=, window=, no_plot=, - yaxis=<"power"/"fullscale"> + yaxis=<"power"/"fullscale"/"magnitude">, + single_sided= ) diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..d0d6f98 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +"""Init file for tests.""" diff --git a/adc_eval/__init__.py b/adc_eval/__init__.py index 00d2b6a..b4593a3 100644 --- a/adc_eval/__init__.py +++ b/adc_eval/__init__.py @@ -1,5 +1 @@ """Initialization file for module.""" - -from . import spectrum -from . import converters -from . import signals diff --git a/adc_eval/adcs/__init__.py b/adc_eval/adcs/__init__.py new file mode 100644 index 0000000..b4593a3 --- /dev/null +++ b/adc_eval/adcs/__init__.py @@ -0,0 +1 @@ +"""Initialization file for module.""" diff --git a/adc_eval/adcs/basic.py b/adc_eval/adcs/basic.py new file mode 100644 index 0000000..c9235af --- /dev/null +++ b/adc_eval/adcs/basic.py @@ -0,0 +1,152 @@ +"""Basic ADC models.""" + +import numpy as np + + +def dac(samples, nbits=8, vref=1): + """Digital to analog converter.""" + quants = 2**nbits + dv = vref / quants + return samples * dv + + +class ADC: + """ + Generic ADC Class. + + Parameters + ---------- + nbits : int, default=8 + Number of bits for the ADC. + fs : int or float, default=1 + Sample rate for the ADC in Hz. + vref : int or float, default=1 + Reference level of the ADC in Volts ([0, +vref] conversion range). + seed : int, default=1 + Seed for random variable generation. + **kwargs + Extra arguments. + + Attributes + ------- + vin : float + Sets or returns the current input voltage level. Assumed +/-vref/2 input + vlsb : float + LSB voltage of the converter. vref/2^nbits + noise : float, default=0 + Sets or returns the stdev of the noise generated by the converter. + mismatch : float, default=0 + Sets or returns the stdev of the mismatch of the converter. + offset : tuple of float, default=(0, 0) + Sets the (mean, stdev) of the offset of the converter. + gain_error : tuple of float, default=(0, 0) + Sets the (mean, stdev) of the gain error of the converter. + distortion : list of float, default=[1] + Sets the harmonic distortion values with index=0 corresponding to HD1. + Example: For unity gain and only -30dB of HD3, input is [1, 0, 0.032] + dout : int + Digital output code for current vin value. + + Methods + ------- + run_step + + """ + + def __init__(self, nbits=8, fs=1, vref=1, seed=1, **kwargs): + """Initialization function for Generic ADC.""" + np.random.seed(seed) + self.nbits = nbits + self.fs = fs + self.vref = vref + self.seed = seed + self.err = {"noise": 0, "gain": 0, "dist": [1], "offset": 0, "mismatch": 0} + self.dbits = np.zeros(nbits) + self.dval = 0 + + @property + def vin(self): + """Return input value.""" + return self._vin + + @vin.setter + def vin(self, x): + """Set the input value.""" + x += self.vref / 2 + x = max(0, min(x, self.vref)) + self._vin = x + + @property + def vlsb(self): + """Return the LSB voltage.""" + return self.vref / 2**self.nbits + + @property + def noise(self): + """Return noise status.""" + return self.err["noise"] + + @noise.setter + def noise(self, stdev): + """Set noise stdev in Vrms.""" + self.err["noise"] = stdev + + @property + def mismatch(self): + """Return noise stdev.""" + print("WARNING: 'mismatch' feature not implemented for this class.") + return False + + @mismatch.setter + def mismatch(self, stdev): + """Set mismatch stdev.""" + print("WARNING: 'mismatch' feature not implemented for this class.") + pass + + @property + def offset(self): + """Return offset value.""" + return self.err["offset"] + + @offset.setter + def offset(self, values): + """Set offset mean and stdev.""" + self.err["offset"] = np.random.normal(values[0], values[1]) + + @property + def gain_error(self): + """Return gain error status.""" + return self.err["gain"] + + @gain_error.setter + def gain_error(self, values): + """Set gain error mean and stdev.""" + self.err["gain"] = np.random.normal(values[0], values[1]) + + @property + def distortion(self): + """Return distortion gains (1st-order indexed).""" + return self.err["dist"] + + @distortion.setter + def distortion(self, gains): + """Set distortion gains (1st-order indexed).""" + self.err["dist"] = gains + + @property + def dout(self): + """Return digital output code.""" + return int(self.dval) + + def run_step(self): + """Run a single ADC step.""" + vinx = self.vin + dval = int( + min(max(int((2**self.nbits) * vinx / self.vref), 0), 2**self.nbits - 1) + ) + bits = [int(x) for x in bin(dval)[2:]] + + while len(bits) < self.nbits: + bits.insert(0, 0) + self.dbits = bits + self.dval = dval diff --git a/adc_eval/adcs/sar.py b/adc_eval/adcs/sar.py new file mode 100644 index 0000000..f4b59d7 --- /dev/null +++ b/adc_eval/adcs/sar.py @@ -0,0 +1,130 @@ +"""SAR ADC models""" + +import numpy as np +from adc_eval.adcs.basic import ADC + + +class SAR(ADC): + """ + SAR ADC Class. + + Parameters + ---------- + nbits : int, optional + Number of bits for the ADC. The default is 8. + fs : float, optional + Sample rate for the ADC in Hz. The default is 1Hz. + vref : float, optional + Reference level of the ADC in Volts ([0, +vref] conversion range). The default is 1. + seed : int, optional + Seed for random variable generation. The default is 1. + **kwargs + Extra arguments. + weights : list, optional + List of weights for SAR capacitors. Must be >= nbits. Defaults to binary weights. + MSB weight should be in index 0. + + Attributes + ------- + vin : float + Sets or returns the current input voltage level. Assumed +/-vref/2 input + vlsb : float + LSB voltage of the converter. vref/2^nbits + noise : float, default=0 + Sets or returns the stdev of the noise generated by the converter. + weights : list + Sets or returns the capacitor weighting of the array. Default is binary weighting. + mismatch : float + Sets or returns the stdev of the mismatch of the converter. Default is no mismatch. + comp_noise : float + Sets or returns the stdev of the comparator noise. Default is no noise. + offset : tuple of float + Sets the (mean, stdev) of the offset of the converter. Default is no offset. + gain_error : tuple of float + Sets the (mean, stdev) of the gain error of the converter. Default is no gain error. + distortion : list of float + Sets the harmonic distortion values with index=0 corresponding to HD1. + Example: For unity gain and only -30dB of HD3, input is [1, 0, 0.032] + dout : int + Digital output code for current vin value. + + Methods + ------- + run_step + + """ + + def __init__(self, nbits=8, fs=1, vref=1, seed=1, **kwargs): + """Initialization function for Generic ADC.""" + super().__init__(nbits, fs, vref, seed) + + self._mismatch = None + self._comp_noise = 0 + + # Get keyword arguments + self._weights = None + self.weights = kwargs.get("weights", self.weights) + + @property + def weights(self): + """Returns capacitor unit weights.""" + if self._weights is None: + self._weights = np.flip(2 ** np.linspace(0, self.nbits - 1, self.nbits)) + return np.array(self._weights) + + @weights.setter + def weights(self, values): + """Sets the capacitor unit weights.""" + self._weights = np.array(values) + self.dbits = np.zeros(len(values)) + if self._weights.size < self.nbits: + print( + f"WARNING: Capacitor weight array size is {self._weights.size} for {self.nbits}-bit ADC." + ) + self.mismatch = self.err["mismatch"] + + @property + def mismatch(self): + """Return noise stdev.""" + if self._mismatch is None: + self._mismatch = np.zeros(self.weights.size) + return self._mismatch + + @mismatch.setter + def mismatch(self, stdev): + """Sets mismatch stdev.""" + self.err["mismatch"] = stdev + self._mismatch = np.random.normal(0, stdev, self.weights.size) + self._mismatch /= np.sqrt(self.weights) + + @property + def comp_noise(self): + """Returns the noise of the comparator.""" + return self._comp_noise + + @comp_noise.setter + def comp_noise(self, value): + """Sets the noise of the comparator.""" + self._comp_noise = value + + def run_step(self): + """Run a single ADC step.""" + vinx = self.vin + + cweights = self.weights * (1 + self.mismatch) + cdenom = sum(cweights) + 1 + + comp_noise = np.random.normal(0, self.comp_noise, cweights.size) + + # Bit cycling + vdac = vinx + for n, _ in enumerate(cweights): + vcomp = vdac - self.vref / 2 + comp_noise[n] + compout = vcomp * 1e6 + compout = -1 if compout <= 0 else 1 + self.dbits[n] = max(0, compout) + vdac -= compout * self.vref / 2 * cweights[n] / cdenom + + # Re-scale the data + scalar = 2**self.nbits / cdenom + self.dval = min(2**self.nbits - 1, scalar * sum(self.weights * self.dbits)) diff --git a/adc_eval/converters.py b/adc_eval/converters.py deleted file mode 100644 index bea7480..0000000 --- a/adc_eval/converters.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Analog <-> Digital converters behavioral models.""" - -import numpy as np - - -def analog2digital( - sig_f, sample_freq=1e6, sample_n=1024, sample_bits=8, vref=3.3, noisy_lsb=1 -): - """Analog to digital converter.""" - sample_quants = 2**sample_bits - sample_prd = 1 / sample_freq - t = np.arange(0, sample_n * sample_prd, sample_prd) - dv = vref / sample_quants - samples = np.rint(sig_f(t) / dv).astype(int) - # apply noise - if noisy_lsb: - noise = np.random.normal(0, 2 ** (noisy_lsb - 1), size=sample_n).astype(int) - samples += noise - # to be sure that samples fit the range: - samples[np.argwhere(samples >= sample_quants)] = sample_quants - 1 - samples[np.argwhere(samples < 0)] = 0 - return samples - - -def digital2analog(samples, sample_bits=8, vref=3.3): - """Digital to analog converter.""" - quants = 2**sample_bits - dv = vref / quants - return samples * dv diff --git a/adc_eval/eval/calc.py b/adc_eval/eval/calc.py new file mode 100644 index 0000000..0d3ce61 --- /dev/null +++ b/adc_eval/eval/calc.py @@ -0,0 +1,241 @@ +"""Spectral analysis helper module.""" + +import numpy as np + + +def db_to_pow(value, places=3): + """ + Convert dBW to W. + + Parameters + ---------- + value : float or ndarray + Value to convert to power, in dBW. + places : int, optional + Number of places to round output value to. Default is 3. + + Returns + ------- + float or ndarray + Returns either the rounded and converted value, or the ndarray + """ + if isinstance(value, np.ndarray): + return np.round(10 ** (0.1 * value), places) + return round(10 ** (0.1 * value), places) + + +def dBW(value, places=1): + """ + Convert to dBW. + + Parameters + ---------- + value : float or ndarray + Value to convert to dBW, in W. + places : int, optional + Number of places to round output value to. Default is 1. + + Returns + ------- + float or ndarray + Returns either the rounded and converted value, or the ndarray + """ + if isinstance(value, np.ndarray): + return np.round(10 * np.log10(value), places) + return round(10 * np.log10(value), places) + + +def enob(sndr, places=1): + """ + Return ENOB for given SNDR. + + Parameters + ---------- + sndr : float + SNDR value in dBW to convert to ENOB. + places : int, optional + Number of places to round output value to. Default is 1. + + Returns + ------- + float or ndarray + Returns either the rounded and converted value, or the ndarray + """ + return round((sndr - 1.76) / 6.02, places) + + +def sndr_sfdr(spectrum, freq, fs, nfft, leak=0, full_scale=0): + """ + Get SNDR and SFDR. + + Parameters + ---------- + spectrum : ndarray + Power spectrum as ndarray in units of Watts. + freq : ndarray + Array of frequencies for the input power spectrum. + fs : float + Sample frequency of power spectrum in Hz. + nfft : int + Number of samples in the FFT. + leak : int, optional + Number of leakage bins to consider when looking for peaks. Default is 0. + full_scale : float, optional + Full scale reference value for spectrum in Watts. + + Returns + ------- + dict + Returns a dictionary of computed stats. + """ + # Zero the DC bin + for i in range(0, leak + 1): + spectrum[i] = 0 + bin_sig = np.argmax(spectrum) + psig = sum(spectrum[i] for i in range(bin_sig - leak, bin_sig + leak + 1)) + spectrum_n = spectrum.copy() + spectrum_n[bin_sig] = 0 + fbin = fs / nfft + + for i in range(bin_sig - leak, bin_sig + leak + 1): + spectrum_n[i] = 0 + + bin_spur = np.argmax(spectrum_n) + pspur = spectrum[bin_spur] + + noise_power = sum(spectrum_n) + noise_floor = 2 * noise_power / nfft + + stats = {} + + stats["sig"] = { + "freq": freq[bin_sig], + "bin": bin_sig, + "power": psig, + "dB": dBW(psig), + "dBFS": round(dBW(psig) - full_scale, 1), + } + + stats["spur"] = { + "freq": freq[bin_spur], + "bin": bin_spur, + "power": pspur, + "dB": dBW(pspur), + "dBFS": round(dBW(pspur) - full_scale, 1), + } + stats["noise"] = { + "floor": noise_floor, + "power": noise_power, + "rms": np.sqrt(noise_power), + "dBHz": round(dBW(noise_floor, 3) - full_scale, 1), + "NSD": round(dBW(noise_floor, 3) - full_scale - 2 * dBW(fbin, 3), 1), + } + stats["sndr"] = { + "dBc": dBW(psig / noise_power), + "dBFS": round(full_scale - dBW(noise_power), 1), + } + stats["sfdr"] = { + "dBc": dBW(psig / pspur), + "dBFS": round(full_scale - dBW(pspur), 1), + } + stats["enob"] = {"bits": enob(stats["sndr"]["dBFS"])} + + return stats + + +def find_harmonics(spectrum, freq, nfft, bin_sig, psig, harms=5, leak=20, fscale=1e6): + """ + Get the harmonic contents of the data. + + Parameters + ---------- + spectrum : ndarray + Power spectrum as ndarray in units of Watts. + freq : ndarray + Array of frequencies for the input power spectrum. + nfft : int + Number of samples in the FFT. + bin_sig : int + Frequency bin of the dominant signal. + psig : float + Power of dominant signal in spectrum. + harms : int, optional + Number of input harmonics to calculate. Default is 5. + leak : int, optional + Number of leakage bins to look at when finding harmonics. Default is 20. + fscale : float, optional + Value to scale frequencies by in Hz. Default is 1MHz. + + Returns + ------- + dict + Returns a dictionary of computed stats. + """ + harm_stats = {"harm": {}} + harm_index = 2 + for harm in bin_sig * np.arange(2, harms + 1): + harm_stats["harm"][harm_index] = {} + zone = np.floor(harm / (nfft / 2)) + 1 + if zone % 2 == 0: + bin_harm = int(nfft / 2 - (harm - (zone - 1) * nfft / 2)) + else: + bin_harm = int(harm - (zone - 1) * nfft / 2) + + # Make sure we pick the max bin where power is maximized; due to spectral leakage + # if bin_harm == nfft/2, set to bin of 0 + if bin_harm == nfft / 2: + bin_harm = 0 + pwr_max = spectrum[bin_harm] + bin_harm_max = bin_harm + for i in range(bin_harm - leak, bin_harm + leak + 1): + try: + pwr = spectrum[i] + if pwr > pwr_max: + bin_harm_max = i + pwr_max = pwr + except IndexError: + # bin + leakage out of bounds, so stop looking + break + + harm_stats["harm"][harm_index]["bin"] = bin_harm_max + harm_stats["harm"][harm_index]["power"] = pwr_max + harm_stats["harm"][harm_index]["freq"] = round(freq[bin_harm] / fscale, 1) + harm_stats["harm"][harm_index]["dBc"] = dBW(pwr_max / psig) + harm_stats["harm"][harm_index]["dB"] = dBW(pwr_max) + + harm_index = harm_index + 1 + + return harm_stats + + +def get_plot_string(stats, full_scale, fs, nfft, window, xscale=1e6, fscale="MHz"): + """Generate plot string from stats dict.""" + + plt_str = "==== FFT ====\n" + plt_str += f"NFFT = {nfft}\n" + plt_str += f"fbin = {round(fs/nfft / 1e3, 2)} kHz\n" + plt_str += f"window = {window}\n" + plt_str += "\n" + plt_str += "==== Signal ====\n" + plt_str += f"FullScale = {full_scale} dB\n" + plt_str += f"Psig = {stats['sig']['dBFS']} dBFS ({stats['sig']['dB']} dB)\n" + plt_str += f"fsig = {round(stats['sig']['freq']/xscale, 2)} {fscale}\n" + plt_str += f"fsamp = {round(fs/xscale, 2)} {fscale}\n" + plt_str += "\n" + plt_str += "==== SNDR/SFDR ====\n" + plt_str += f"ENOB = {stats['enob']['bits']} bits\n" + plt_str += f"SNDR = {stats['sndr']['dBFS']} dBFS ({stats['sndr']['dBc']} dBc)\n" + plt_str += f"SFDR = {stats['sfdr']['dBFS']} dBFS ({stats['sfdr']['dBc']} dBc)\n" + plt_str += f"Pspur = {stats['spur']['dBFS']} dBFS\n" + plt_str += f"fspur = {round(stats['spur']['freq']/xscale, 2)} {fscale}\n" + plt_str += f"Noise Floor = {stats['noise']['dBHz']} dBFS\n" + plt_str += f"NSD = {stats['noise']['NSD']} dBFS\n" + plt_str += "\n" + plt_str += "==== Harmonics ====\n" + + for hindex, hdata in stats["harm"].items(): + plt_str += f"HD{hindex} = {round(hdata['dB'] - full_scale, 1)} dBFS @ {hdata['freq']} {fscale}\n" + + plt_str += "\n" + + return plt_str diff --git a/adc_eval/eval/simulate.py b/adc_eval/eval/simulate.py new file mode 100644 index 0000000..a8b3b2f --- /dev/null +++ b/adc_eval/eval/simulate.py @@ -0,0 +1,70 @@ +"""Generic simulator class for adc evaluation.""" + +import numpy as np +from tqdm import tqdm + + +class Simulator: + """ + Class for handling simulation functions. + + Parameters + ---------- + adc_obj : ADC.__class__ + An ADC object from the adc_eval.eval.adc class list. + xarray : ndarray + Input signal array to simulate the adc_obj with. + + + Attributes + ---------- + out : ndarray of ADC output values. + adc : Reference to the input adc_obj. + vin : xarray with global signal errors included as set by adj_obj. + + Methods + ------- + run + + """ + + def __init__(self, adc_obj, xarray): + """Initialize the simulator class.""" + self.dval = [] + self.adc = adc_obj + self.vin = self.calc_error(xarray) + + @property + def out(self): + """Return output value array.""" + return np.array(self.dval) + + def calc_error(self, vin): + """Using the adc_obj, calculates global signal error before simulation.""" + vinx = vin + + # First calculate gain error + vinx *= (1 + self.adc.err["gain"]) * self.adc.err["dist"][0] + + # Next calculate the harmonic distortion + for index, gain in enumerate(self.adc.err["dist"]): + if index > 0: + vinx += gain * vin ** (index + 1) + + # Now add the offset + vinx += self.adc.err["offset"] + + # Now add random noise + vinx += self.adc.err["noise"] * np.random.randn(vin.size) + + return vinx + + def run(self): + with tqdm( + range(len(self.vin)), "RUNNING", unit=" samples", position=0, leave=True + ) as pbar: + for xval in self.vin: + self.adc.vin = xval + self.adc.run_step() + self.dval.append(self.adc.dout) + pbar.update() diff --git a/adc_eval/eval/spectrum.py b/adc_eval/eval/spectrum.py new file mode 100644 index 0000000..657a215 --- /dev/null +++ b/adc_eval/eval/spectrum.py @@ -0,0 +1,363 @@ +"""Spectral analysis module.""" + +import numpy as np +import matplotlib.pyplot as plt +from adc_eval.eval import calc + + +def calc_psd(data, fs=1, nfft=2**12): + """ + Calculate the PSD using the Bartlett method. + + Parameters + ---------- + data : ndarray + Time-series input data. + fs : float, optional + Sample frequency of the input time series data in Hz. Default is 1Hz. + nfft : int, optional + Number of FFT samples to use for PSD calculation. Default is 2^12. + + Returns + ------- + list + [freq_ss, psd_ss, freq_ds, psd_ds] + List containing single and double-sided PSDs along with frequncy array. + """ + nwindows = max(1, int(np.floor(len(data) / nfft))) + nfft = int(nfft) + xs = data[0 : int(nwindows * nfft)] + xt = xs.reshape(nwindows, nfft).T + XF = abs(np.fft.fft(xt, nfft, axis=0) / nfft) ** 2 + psd = np.mean(XF, axis=1) / (fs / nfft) # average the ffts and divide by bin width + psd += np.finfo(float).eps # Prevents zeros in the PSD + freq = np.fft.fftshift(np.fft.fftfreq(nfft, d=1 / fs)) + + # For single sided we double all the bins, then we halve the DC bin + psd_ss = 2 * psd[0 : int(nfft / 2)] + psd_ss[0] /= 2 + freq_ss = freq[int(nfft / 2) :] + + # Need to rotate DS PSD so 0Hz is in middle of graph + freq_ds = freq + psd_ds = np.concatenate([psd[int(nfft / 2) :], psd[0 : int(nfft / 2)]]) + + return [freq_ss, psd_ss, freq_ds, psd_ds] + + +def get_spectrum(data, fs=1, nfft=2**12, single_sided=True): + """ + Get the power spectrum for an input signal. + + Parameters + ---------- + data : ndarray + Time-series input data. + fs : float, optional + Sample frequency of the input time series data in Hz. Default is 1Hz. + nfft : int, optional + Number of FFT samples to use for PSD calculation. Default is 2^12. + single_sided : bool, optional + Set to `True` for single-sided spectrum or `False` for double-sided. + Default is `True`. + + Returns + ------- + tuple + (freq, psd) + Tuple containing frequency array and PSD of input data. + """ + (freq_ss, psd_ss, freq_ds, psd_ds) = calc_psd(np.array(data), fs=fs, nfft=nfft) + if single_sided: + return (freq_ss, psd_ss * fs / nfft) + return (freq_ds, psd_ds * fs / nfft) + + +def window_data(data, window="rectangular"): + """ + Applies a window to the time-domain data. + + Parameters + ---------- + data : ndarray + Time-series input data. + window : str, optional + Window to use for input data. Default is rectangular. + + Returns + ------- + ndarray + Windowed version of input data. + """ + try: + wsize = data.size + except AttributeError: + data = np.array(data) + wsize = data.size + + windows = { + "rectangular": (np.ones(wsize), 1.0), + "hanning": (np.hanning(wsize), 1.633), + } + + if window not in windows: + print(f"WARNING: {window} not implemented. Defaulting to 'rectangular'.") + window = "rectangular" + + wscale = windows[window][1] + + return data * windows[window][0] * wscale + + +def plot_spectrum( + data, + fs=1, + nfft=2**12, + dr=1, + harmonics=7, + leak=1, + window="rectangular", + no_plot=False, + yaxis="power", + single_sided=True, + fscale=("MHz", 1e6), +): + """ + Plot Power Spectrum for input signal. + + Parameters + ---------- + data : ndarray + Time-series input data. + fs : float, optional + Sample frequency of the input time series data in Hz. Default is 1Hz. + nfft : int, optional + Number of FFT samples to use for PSD calculation. Default is 2^12. + dr : float, optional + Dynamic range for input data to be referenced to. Default is 1. + harmonics : int, optional + Number of harmonics to calculate and annotate on plot. Default is 7. + leak : int, optional + Number of leakage bins to use in signal and harmonic calculation. Default is 1. + window : str, optional + Type of input window to use for input data. Default is rectangular. + no_plot : bool, optional + Selects whether to plot (`False`) or not (`True`). Default is `False`. + yaxis : str, optional + Selects y-axis reference units. Example: `power`, `fullscale`, etc. Default is `power`. + single_sided : bool, optional + Set to `True` for single-sided spectrum or `False` for double-sided. + Default is `True`. + fscale : tuple, optional + Selects x-axis scaling and units. Default is ('MHz', 1e6). + + Returns + ------- + tuple + (freq, psd, stats) + Tuple containing frequency array, PSD of input data, and calculated statstics dictionary. + """ + (freq, pwr) = get_spectrum(data, fs=fs, nfft=nfft, single_sided=single_sided) + + # Calculate the fullscale range of the spectrum in Watts + full_scale = calc.dBW(dr**2 / 8) + + # Determine what y-axis scaling to use + yaxis_lut = { + "power": [0, "dB"], + "fullscale": [full_scale, "dBFS"], + "normalize": [max(calc.dBW(pwr)), "dB Normalized"], + "magnitude": [0, "W"], + } + + lut_key = yaxis.lower() + scalar = yaxis_lut[lut_key][0] + yunits = yaxis_lut[lut_key][1] + xscale = fscale[1] + + # Convert to dBW and perform scalar based on y-axis scaling input + psd_out = calc.dBW(pwr, places=3) - scalar + + # Use Watts if magnitude y-axis scaling is desired + if lut_key in ["magnitude"]: + psd_out = pwr + + # Get single-sided spectrum for consistent SNDR and harmonic calculation behavior + f_ss = freq + psd_ss = pwr + if not single_sided: + # Get single-sided spectrum for SNDR and Harmonic stats + (f_ss, psd_ss) = get_spectrum(data, fs=fs, nfft=nfft, single_sided=True) + + sndr_stats = calc.sndr_sfdr( + psd_ss, f_ss, fs, nfft, leak=leak, full_scale=full_scale + ) + harm_stats = calc.find_harmonics( + psd_ss, + f_ss, + nfft, + sndr_stats["sig"]["bin"], + sndr_stats["sig"]["power"], + harms=harmonics, + leak=leak, + fscale=xscale, + ) + + # Merge the two stat dictionaries into one for convenient access + stats = {**sndr_stats, **harm_stats} + + # Change the x-axis minimum value based on single or dual-sided selection + xmin = 0 if single_sided else -fs / 2e6 + + # If plotting, prep plot and generate all required axis strings + if not no_plot: + plt_str = calc.get_plot_string( + stats, full_scale, fs, nfft, window, xscale, fscale[0] + ) + fig, ax = plt.subplots(figsize=(15, 8)) + ax.plot(freq / xscale, psd_out) + ax.set_ylabel(f"Power Spectrum ({yunits})", fontsize=18) + ax.set_xlabel(f"Frequency ({fscale[0]})", fontsize=16) + ax.set_title("Output Power Spectrum", fontsize=16) + ax.set_xlim([xmin, fs / (2 * xscale)]) + ax.set_ylim([1.1 * min(psd_out), 1]) + + ax.annotate( + plt_str, + xy=(1, 1), + xytext=(10, -80), + xycoords=("axes fraction", "figure fraction"), + textcoords="offset points", + size=11, + ha="left", + va="top", + ) + + # Get noise floor in dB/Hz (not dBFS/Hz) + noise_dB = stats["noise"]["dBHz"] + full_scale + + # Add points for harmonics and largest spur + if not single_sided: + scalar += 3 + for hindex in range(2, harmonics + 1): + if stats["harm"][hindex]["dB"] > (noise_dB + 3): + fharm = stats["harm"][hindex]["freq"] + aharm = stats["harm"][hindex]["dB"] - scalar + ax.plot( + fharm, + aharm, + marker="s", + mec="r", + ms=8, + fillstyle="none", + mew=3, + ) + ax.text( + fharm, + aharm + 3, + f"HD{hindex}", + ha="center", + weight="bold", + ) + if not single_sided: + ax.plot( + -fharm, + aharm, + marker="s", + mec="r", + ms=8, + fillstyle="none", + mew=3, + ) + ax.text( + -fharm, + aharm + 3, + f"HD{hindex}", + ha="center", + weight="bold", + ) + ax.tick_params(axis="both", which="major", labelsize=14) + ax.grid() + + return (freq, psd_out, stats) + + +def analyze( + data, + nfft, + fs=1, + dr=1, + harmonics=11, + leak=5, + window="rectangular", + no_plot=False, + yaxis="fullscale", + single_sided=True, + fscale="MHz", +): + """ + Perform spectral analysis on input waveform. + + Parameters + ---------- + data : ndarray + Time-series input data. + nfft : int + Number of FFT samples to use for PSD calculation. + fs : float, optional + Sample frequency of the input time series data in Hz. Default is 1Hz. + dr : float, optional + Dynamic range for input data to be referenced to. Default is 1. + harmonics : int, optional + Number of harmonics to calculate and annotate on plot. Default is 7. + leak : int, optional + Number of leakage bins to use in signal and harmonic calculation. Default is 1. + window : str, optional + Type of input window to use for input data. Default is rectangular. + no_plot : bool, optional + Selects whether to plot (`False`) or not (`True`). Default is `False`. + yaxis : str, optional + Selects y-axis reference units. Example: `power`, `fullscale`, etc. Default is `power`. + single_sided : bool, optional + Set to `True` for single-sided spectrum or `False` for double-sided. + Default is `True`. + fscale : str, optional + Selects x-axis units. Default is 'MHz'. + + Returns + ------- + tuple + (freq, psd, stats) + Tuple containing frequency array, PSD of input data, and calculated statstics dictionary. + """ + fscalar = { + "uHz": 1e-6, + "mHz": 1e-3, + "Hz": 1, + "kHz": 1e3, + "MHz": 1e6, + "GHz": 1e9, + "THz": 1e12, + } + if fscale not in fscalar: + print(f"WARNING: {fscale} not implemented. Defaulting to 'MHz'.") + fscale = "MHz" + + # Window the data + wdata = window_data(data, window=window) + + (freq, spectrum, stats) = plot_spectrum( + wdata, + fs=fs, + nfft=nfft, + dr=dr, + harmonics=harmonics, + leak=leak, + window=window, + no_plot=no_plot, + yaxis=yaxis, + single_sided=single_sided, + fscale=(fscale, fscalar[fscale]), + ) + + return (freq, spectrum, stats) diff --git a/adc_eval/filt.py b/adc_eval/filt.py new file mode 100644 index 0000000..5f6c383 --- /dev/null +++ b/adc_eval/filt.py @@ -0,0 +1,250 @@ +"""Implements filters and decimation.""" + +import numpy as np +from scipy.signal import remez, freqz +import matplotlib.pyplot as plt +from adc_eval import signals +from adc_eval.eval import spectrum +from adc_eval.eval import calc + + +class CICDecimate: + """ + Generic CIC Decimator Object. + + Parameters + ---------- + dec : int, default=2 + Output decimation factor. + order : int, default=1 + Filter order. + fs : int or float, default=1 + Sample rate for the filter in Hz. + + + Attributes + ---------- + gain : Gain normalization factor of CIC filter + out : Filtered and decimated output data + + + Methods + ------- + run + response + + """ + + def __init__(self, dec=2, order=1, fs=1): + """Initialize the CIC filter.""" + self._dec = dec + self._order = order + self.fs = fs + self.gain = dec**order + self._xout = None + self._xfilt = None + + @property + def dec(self): + """Returns the decimation factor.""" + return self._dec + + @dec.setter + def dec(self, value): + """Sets the decimation factor.""" + self._dec = value + self.gain = value**self._order + + @property + def order(self): + """Returns the order of the filter.""" + return self._order + + @order.setter + def order(self, value): + """Sets the filter order.""" + self._order = value + self.gain = self.dec**value + + @property + def out(self): + """Filtered and decimated output data.""" + return np.array(self._xout) + + def filt(self, xarray): + """CIC filtering routine.""" + yint = xarray + + # Integrate first + for _ in range(self.order): + yint = np.cumsum(yint) + + # Then comb, adding delays based on decimation factor + xcomb = yint + xcomb = np.insert(xcomb, 0, [0 for x in range(self.dec)]) + ycomb = xcomb + for _ in range(self.order): + ycomb = ycomb[self.dec :] - ycomb[0 : -self.dec] + xcomb = ycomb + xcomb = np.insert(xcomb, 0, [0 for x in range(self.dec)]) + ycomb = xcomb + + self._xfilt = ycomb / self.gain + + def decimate(self, xarray=None): + """decimation routine.""" + if xarray is None: + xarray = self._xfilt + self._xout = xarray[:: self.dec] + + def run(self, xarray): + """Runs filtering and decimation on input list.""" + self.filt(xarray) + self.decimate() + + def response(self, fft, no_plot=False): + """Plots the frequency response of the pre-decimated filter.""" + xin = signals.impulse(fft) + self.filt(xin) + (freq, psd, stats) = spectrum.analyze( + self._xfilt * fft / np.sqrt(2), + fft, + fs=self.fs, + dr=1, + harmonics=0, + leak=1, + window="rectangular", + no_plot=no_plot, + yaxis="power", + single_sided=True, + ) + if not no_plot: + ax = plt.gca() + n1 = 0 + n2 = int(fft / (2 * self.dec)) + x = freq[n1:n2] / 1e6 + y1 = psd[n1:n2] - max(psd) + y2 = -2000 * np.ones(np.size(x)) + ax.plot(x, y2, alpha=0) + ax.plot(x, y1, alpha=0) + ax.fill_between(x, y1, y2, color="green", alpha=0.1) + ax.set_xticks(np.linspace(0, self.fs / 2e6, 9)) + return (freq, psd) + + +class FIRLowPass: + """ + Generic FIR Low Pass Filter. + + Parameters + ---------- + dec : int, optional + Output decimation rate. The default is 1. + fs : int or float, optional + Sample rate for the filter in Hz. The default is 1Hz. + bit_depth : int, optional + Bit depth to store coefficients. The default is 16b. + coeffs : list, optional + List of coefficients if pre-determined. + + + Attributes + ---------- + out : Filtered and decimated output data. + ntaps : Number of filter taps. + + + Methods + ------- + run + response + + """ + + def __init__(self, dec=1, fs=1, bit_depth=16, coeffs=None): + """Initialize the FIR LowPass Class.""" + self.coeffs = coeffs + self.dec = dec + self.fs = fs + self.bit_depth = bit_depth + self.ntaps = np.size(coeffs) if coeffs is not None else 0 + self.yfilt = None + self._out = None + + @property + def out(self): + """Filtered and decimated output datat.""" + return np.array(self._out) + + def generate_taps(self, fbw, pbripple=1, stopatt=-60, deltaf=None): + """ + Generates FIR taps from key inputs. + + Parameters + ---------- + fbw : float + Bandwidth of the filter in Hz. + pbripple : float, optional + Acceptable passband ripple in percentage. The default is 1%. + stopatt : float, optional + Desired stop-band attenuation. The default is -60dB. + deltaf : float, optional + Desired transition band of the filter. The default is FS/100 if set to None. + + Returns + ------- + (ntaps, coeffs) : tuple + Minimum number of taps required, List of FIR tap coefficients. + + """ + if deltaf is None: + deltaf = self.fs / 100 + x1 = pbripple / 100 + x2 = 10 ** (stopatt / 20) + x3 = np.log10(1 / (10 * x1 * x2)) + _ntaps = 2 / 3 * x3 * self.fs / deltaf + + if self.ntaps > 0 and _ntaps > self.ntaps: + print( + f"WARNING: Required NTAPs calculated as {int(_ntaps)} but only {self.ntaps} were provided." + ) + elif self.ntaps == 0: + self.ntaps = int(_ntaps) + + _coeffs = remez( + self.ntaps, [0, fbw, fbw + deltaf, self.fs / 2], [1, 0], fs=self.fs + ) + self.coeffs = np.int32(_coeffs * 2**self.bit_depth).tolist() + return (self.ntaps, self.coeffs) + + def filt(self, xarray): + """Performs FIR filtering on input xarray.""" + _coeffs = np.array(self.coeffs) / 2**self.bit_depth + self.yfilt = np.convolve(_coeffs, xarray, mode="same") + + def decimate(self, xarray=None): + """Performs decimation on the input data.""" + if xarray is None: + xarray = self.yfilt + self._out = xarray[:: self.dec] + + def run(self, xarray): + """Runs FIR filtering and decimation on input xarray data.""" + self.filt(xarray) + self.decimate() + + def response(self, fft, no_plot=False): + """Plots the frequency response of the pre-decimated filter.""" + freq, mag = freqz(self.coeffs, [1], worN=fft, fs=self.fs) + yfft = calc.dBW(np.abs(mag)) + if not no_plot: + fig, ax = plt.subplots(figsize=(15, 8)) + ax.plot(freq / 1e6, yfft) + ax.grid(True) + ax.set_ylabel("Filter Magnitude Response (dB)", fontsize=18) + ax.set_xlabel("Frequency (MHz)", fontsize=16) + ax.set_title("FIR Low Pass Response", fontsize=16) + ax.set_xlim([0, self.fs / 2e6]) + ax.set_ylim([1.1 * min(yfft), 1]) + plt.show() + return (freq, yfft) diff --git a/adc_eval/signals.py b/adc_eval/signals.py index 0aeb71c..2d93d2c 100644 --- a/adc_eval/signals.py +++ b/adc_eval/signals.py @@ -3,11 +3,133 @@ import numpy as np -def sin(t, peak=1.5, offset=1.65, freq=1e3, ph0=0): - """Generate a sine wave.""" - return offset + peak * np.sin(ph0 + 2 * np.pi * freq * t) +def time(nlen, fs=1): + """ + Create time array based on signal length and sample rate. + Paraneters + ---------- + nlen : int + Desired length of time array. + fs : float, optional + Sample frequency of data in Hz. Default is 1 Hz. -def noise(t, mean=0, std=0.1): - """Generate random noise.""" - return np.random.normal(mean, std, size=len(t)) + Returns + ------- + ndarray + Time list stored in ndarray type. + + """ + return 1 / fs * np.linspace(0, nlen - 1, nlen) + + +def sin(t, amp=0.5, offset=0.5, freq=1e3, ph0=0): + """ + Generate a sine wave. + + Parameters + ---------- + t : ndarray + Time array list for sine wave. + amp : float, optional + Amplitude of desired sine wave. Default is 0.5. + offset : float, optional + DC offset of the desired sine wave. Default is 0.5. + freq : float, optional + Desired frequency of the sine wave in Hz. Default is 1kHz. + ph0 : float, optional + Desired phase shift of the sine wave in radians. Default is 0. + + Returns + ------- + ndarray + Sine wave stored in ndarray type. + + """ + return offset + amp * np.sin(ph0 + 2 * np.pi * freq * t) + + +def noise(nlen, mean=0, std=0.1): + """ + Generate random noise. + + Parameters + ---------- + nlen : int + Desired length of noise array. + mean : float, optional + Desired average of noise array. Default is 0. + std : float, optional + Desired standard deviation of noise array. Default is 0.1. + + Returns + ------- + ndarray + Gaussian distributed noise array. + """ + return np.random.normal(mean, std, size=nlen) + + +def impulse(nlen, mag=1): + """ + Generate an impulse input. + + Parameters + ---------- + nlen : int + Desired length of noise array. + mag : float, optional + Desired magnitude of impulse. Default is 1. + + Returns + ------- + ndarray + Impulse waveform in ndarray type. + """ + data = np.zeros(nlen) + data[0] = mag + return data + + +def tones(nlen, bins, amps, offset=0, fs=1, nfft=None, phases=None): + """ + Generate a time-series of multiple tones. + + Parameters + ---------- + nlen : int + Length of time-series array. + bins : list + List of signal bins to generate tones for. + amps : list + List of amplitudes for given bins. + offset : int, optional + Offset to apply to each signal (globally applied). + fs : float, optional + Sample rate of the signal in Hz. The default is 1Hz. + nfft : int, optional + Number of FFT samples, if different than length of signal. The default is None. + phases : list, optional + List of phase shifts for each bin. The default is None. + + Returns + ------- + tuple of ndarray + (time, signal) + Time-series and associated tone array. + """ + t = time(nlen, fs=fs) + + signal = np.zeros(nlen) + if phases is None: + phases = np.zeros(nlen) + if nfft is None: + nfft = nlen + + fbin = fs / nfft + for index, nbin in enumerate(bins): + signal += sin( + t, amp=amps[index], offset=offset, freq=nbin * fbin, ph0=phases[index] + ) + + return (t, signal) diff --git a/adc_eval/spectrum.py b/adc_eval/spectrum.py deleted file mode 100644 index 3c4b35e..0000000 --- a/adc_eval/spectrum.py +++ /dev/null @@ -1,301 +0,0 @@ -"""Spectral analysis module.""" - -import numpy as np -import matplotlib.pyplot as plt - - -def db_to_pow(value, places=3): - """Convert dBW to W.""" - if isinstance(value, np.ndarray): - return 10 ** (0.1 * value) - return round(10 ** (0.1 * value), places) - - -def dBW(value, places=1): - """Convert to dBW.""" - if isinstance(value, np.ndarray): - return 10 * np.log10(value) - return round(10 * np.log10(value), places) - - -def enob(sndr, places=1): - """Return ENOB for given SNDR.""" - return round((sndr - 1.76) / 6.02, places) - - -def sndr_sfdr(spectrum, freq, fs, nfft, leak, full_scale=0): - """Get SNDR and SFDR.""" - # Zero the DC bin - spectrum[0] = 0 - bin_sig = np.argmax(spectrum) - psig = sum(spectrum[i] for i in range(bin_sig - leak, bin_sig + leak + 1)) - spectrum_n = spectrum - spectrum_n[bin_sig] = 0 - fbin = fs / nfft - - for i in range(bin_sig - leak, bin_sig + leak + 1): - spectrum_n[i] = 0 - - bin_spur = np.argmax(spectrum_n) - pspur = spectrum[bin_spur] - - noise_power = sum(spectrum_n) - noise_floor = 2 * noise_power / nfft - - stats = {} - - stats["sig"] = { - "freq": freq[bin_sig], - "bin": bin_sig, - "power": psig, - "dB": dBW(psig), - "dBFS": round(dBW(psig) - full_scale, 1), - } - - stats["spur"] = { - "freq": freq[bin_spur], - "bin": bin_spur, - "power": pspur, - "dB": dBW(pspur), - "dBFS": round(dBW(pspur) - full_scale, 1), - } - stats["noise"] = { - "floor": noise_floor, - "power": noise_power, - "rms": np.sqrt(noise_power), - "dBHz": round(dBW(noise_floor, 3) - full_scale, 1), - "NSD": round(dBW(noise_floor, 3) - full_scale - 2 * dBW(fbin, 3), 1), - } - stats["sndr"] = { - "dBc": dBW(psig / noise_power), - "dBFS": round(full_scale - dBW(noise_power), 1), - } - stats["sfdr"] = { - "dBc": dBW(psig / pspur), - "dBFS": round(full_scale - dBW(pspur), 1), - } - stats["enob"] = {"bits": enob(stats["sndr"]["dBFS"])} - - return stats - - -def find_harmonics(spectrum, freq, nfft, bin_sig, psig, harms=5, leak=20): - """Get the harmonic contents of the data.""" - harm_stats = {"harm": {}} - harm_index = 2 - for harm in bin_sig * np.arange(2, harms + 1): - harm_stats["harm"][harm_index] = {} - zone = np.floor(harm / (nfft / 2)) + 1 - if zone % 2 == 0: - bin_harm = int(nfft / 2 - (harm - (zone - 1) * nfft / 2)) - else: - bin_harm = int(harm - (zone - 1) * nfft / 2) - - # Make sure we pick the max bin where power is maximized; due to spectral leakage - # if bin_harm == nfft/2, set to bin of 0 - if bin_harm == nfft / 2: - bin_harm = 0 - pwr_max = spectrum[bin_harm] - bin_harm_max = bin_harm - for i in range(bin_harm - leak, bin_harm + leak + 1): - try: - pwr = spectrum[i] - if pwr > pwr_max: - bin_harm_max = i - pwr_max = pwr - except IndexError: - # bin + leakage out of bounds, so stop looking - break - - harm_stats["harm"][harm_index]["bin"] = bin_harm_max - harm_stats["harm"][harm_index]["power"] = pwr_max - harm_stats["harm"][harm_index]["freq"] = round(freq[bin_harm] / 1e6, 1) - harm_stats["harm"][harm_index]["dBc"] = dBW(pwr_max / psig) - harm_stats["harm"][harm_index]["dB"] = dBW(pwr_max) - - harm_index = harm_index + 1 - - return harm_stats - - -def calc_psd(data, fs, nfft=2**12, single_sided=False): - """Calculate the PSD using the Bartlett method.""" - nwindows = int(np.floor(len(data) / nfft)) - nfft = int(nfft) - xs = data[0 : int(nwindows * nfft)] - xt = xs.reshape(nwindows, nfft).T - XF = abs(np.fft.fft(xt, nfft, axis=0) / nfft) ** 2 - psd = np.mean(XF, axis=1) / (fs / nfft) # average the ffts and divide by bin width - freq = fs * np.linspace(0, 1, nfft) - if single_sided: - # First we double all the bins, then we halve the DC bin - psd = 2 * psd[0 : int(nfft / 2)] - psd[0] /= 2 - freq = freq[0 : int(nfft / 2)] - return (freq, psd) - - -def get_spectrum(data, fs=1, nfft=2**12): - """Get the power spectrum for an input signal.""" - (freq, psd) = calc_psd(np.array(data), fs=fs, nfft=nfft, single_sided=True) - return (freq, psd * fs / nfft) - - -def plot_spectrum( - data, - fs=1, - nfft=2**12, - dr=1, - harmonics=7, - leak=1, - window="rectangular", - no_plot=False, - yaxis="power", -): - """Plot Power Spectrum for input signal.""" - wsize = data.size - windows = { - "rectangular": np.ones(wsize), - "hanning": np.hanning(wsize), - } - - if window not in windows: - print(f"WARNING: {window} not implemented. Defaulting to 'rectangular'.") - window = "rectangular" - - wscale = { - "rectangular": 1.0, - "hanning": 1.633, - }[window] - - (freq, pwr) = get_spectrum(data * windows[window] * wscale, fs=fs, nfft=nfft) - full_scale = dBW(dr**2 / 8) - - scalar = 0 - yunits = "dB" - if yaxis.lower() == "fullscale": - scalar = full_scale - yunits = "dBFS" - - pwr_dB = 10 * np.log10(pwr) - scalar - - sndr_stats = sndr_sfdr(pwr, freq, fs, nfft, leak=leak, full_scale=full_scale) - harm_stats = find_harmonics( - pwr, - freq, - nfft, - sndr_stats["sig"]["bin"], - sndr_stats["sig"]["power"], - harms=harmonics, - leak=leak, - ) - - stats = {**sndr_stats, **harm_stats} - - if not no_plot: - plt_str = get_plot_string(stats, full_scale, fs, nfft, window) - - fig, ax = plt.subplots(figsize=(15, 8)) - ax.plot(freq / 1e6, pwr_dB) - ax.set_ylabel(f"Power Spectrum ({yunits})", fontsize=18) - ax.set_xlabel("Frequency (MHz)", fontsize=16) - ax.set_title("Output Power Spectrum", fontsize=16) - ax.set_xlim([0, fs / 2e6]) - ax.set_ylim([1.1 * min(pwr_dB), 0]) - ax.annotate( - plt_str, - xy=(1, 1), - xytext=(10, -80), - xycoords=("axes fraction", "figure fraction"), - textcoords="offset points", - size=11, - ha="left", - va="top", - ) - - # Get noise floor in dB/Hz (not dBFS/Hz) - noise_dB = stats["noise"]["dBHz"] + full_scale - - # Add points for harmonics and largest spur - for hindex in range(2, harmonics + 1): - if stats["harm"][hindex]["dB"] > (noise_dB + 3): - ax.plot( - stats["harm"][hindex]["freq"], - stats["harm"][hindex]["dB"] - scalar, - marker="s", - mec="r", - ms=8, - fillstyle="none", - mew=3, - ) - ax.text( - stats["harm"][hindex]["freq"], - stats["harm"][hindex]["dB"] - scalar + 3, - f"HD{hindex}", - ha="center", - weight="bold", - ) - ax.tick_params(axis="both", which="major", labelsize=14) - ax.grid() - - return (pwr, stats) - - -def get_plot_string(stats, full_scale, fs, nfft, window): - """Generate plot string from stats dict.""" - - plt_str = "==== FFT ====\n" - plt_str += f"NFFT = {nfft}\n" - plt_str += f"fbin = {round(fs/nfft / 1e3, 2)} kHz\n" - plt_str += f"window = {window}\n" - plt_str += "\n" - plt_str += "==== Signal ====\n" - plt_str += f"FullScale = {full_scale} dB\n" - plt_str += f"Psig = {stats['sig']['dBFS']} dBFS ({stats['sig']['dB']} dB)\n" - plt_str += f"fsig = {round(stats['sig']['freq']/1e6, 2)} MHz\n" - plt_str += f"fsamp = {round(fs/1e6, 2)} MHz\n" - plt_str += "\n" - plt_str += "==== SNDR/SFDR ====\n" - plt_str += f"ENOB = {stats['enob']['bits']} bits\n" - plt_str += f"SNDR = {stats['sndr']['dBFS']} dBFS ({stats['sndr']['dBc']} dBc)\n" - plt_str += f"SFDR = {stats['sfdr']['dBFS']} dBFS ({stats['sfdr']['dBc']} dBc)\n" - plt_str += f"Pspur = {stats['spur']['dBFS']} dBFS\n" - plt_str += f"fspur = {round(stats['spur']['freq']/1e6, 2)} MHz\n" - plt_str += f"Noise Floor = {stats['noise']['dBHz']} dBFS\n" - plt_str += f"NSD = {stats['noise']['NSD']} dBFS\n" - plt_str += "\n" - plt_str += "==== Harmonics ====\n" - - for hindex, hdata in stats["harm"].items(): - plt_str += f"HD{hindex} = {round(hdata['dB'] - full_scale, 1)} dBFS @ {hdata['freq']} MHz\n" - - plt_str += "\n" - - return plt_str - - -def analyze( - data, - nfft, - fs=1, - dr=1, - harmonics=11, - leak=5, - window="rectangular", - no_plot=False, - yaxis="fullscale", -): - """Perform spectral analysis on input waveform.""" - (spectrum, stats) = plot_spectrum( - data, - fs=fs, - nfft=nfft, - dr=dr, - harmonics=harmonics, - leak=leak, - window=window, - no_plot=no_plot, - yaxis=yaxis, - ) - - return (spectrum, stats) diff --git a/examples/basic_adc_simulation.py b/examples/basic_adc_simulation.py new file mode 100644 index 0000000..a82abb6 --- /dev/null +++ b/examples/basic_adc_simulation.py @@ -0,0 +1,72 @@ +"""Runs a basic ADC simulation and plots the spectrum.""" +import numpy as np +import matplotlib.pyplot as plt +from scipy import signal +from adc_eval import signals +from adc_eval.adcs import basic +from adc_eval.eval import spectrum +from adc_eval.eval.simulate import Simulator + + +""" +Simulation Settings +""" +SEED = 42 +NBITS = 10 +FS = 200e6 +NLEN = 2**16 # Larger than NFFT to enable Bartlett method for PSD +NFFT = 2**12 +vref = 1 +fbin = NFFT / 4 - 31 +ftone = NFFT / 2 - 15 +vin_amp = 0.707 * vref / 2 + + +""" +VIN Generation +""" +(t, vin) = signals.tones(NLEN, [fbin, ftone], [vin_amp, vin_amp*0.2], offset=0, fs=FS, nfft=NFFT) + +""" +ADC Architecture Creation +""" +adc_dut = basic.ADC(nbits=NBITS, vref=vref, fs=FS, seed=SEED) + + +""" +Global ADC Error Settings +""" +adc_dut.noise = 0 # No internal noise generation +adc_dut.offset = (0, 0) # 10mV mean offset with no stdev +adc_dut.gain_err = (0, 0) # No internal gain error +adc_dut.distortion = [1, 0, 0.3] # HD3 only + + +""" +Run Simulation +""" +s = Simulator(adc_dut, vin) +s.run() + + +""" +Output Plotting +""" +(freq, ps, stats) = spectrum.analyze( + s.out, + NFFT, + fs=FS, + dr=2**NBITS, + harmonics=7, + leak=1, + window="rectangular", + no_plot=False, + yaxis="fullscale", + single_sided=True, + fscale="MHz" +) +ax = plt.gca() +ax.set_title("ADC Spectrum") +ax.set_ylim([-100, 0]) +ax.set_yticks(np.linspace(-100, 0, 11)) +ax.set_xticks(np.linspace(0, FS/2e6, 9)) diff --git a/pylintrc b/pylintrc index e937446..e981f97 100644 --- a/pylintrc +++ b/pylintrc @@ -9,6 +9,7 @@ disable= duplicate-code, implicit-str-concat, too-many-arguments, + too-many-positional-arguments, too-many-branches, too-many-instance-attributes, too-many-locals, diff --git a/pyproject.toml b/pyproject.toml index 8851284..bebfd06 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,10 @@ [build-system] -requires = ["setuptools~=68.0", "wheel~=0.40.0"] +requires = ["setuptools>=68,<76", "wheel~=0.40,<0.46"] build-backend = "setuptools.build_meta" [project] name = "python-adc-eval" -version = "0.3.0" +version = "0.4.0rc1" license = {text = "MIT"} description = "ADC Evaluation Library" readme = "README.rst" @@ -21,7 +21,7 @@ classifiers = [ ] requires-python = ">=3.9.0" dependencies = [ - "matplotlib==3.9.0", + "matplotlib==3.9.2", ] [project.urls] @@ -73,6 +73,7 @@ ignore = [ "PLR0912", # Too many branches ({branches} > {max_branches}) "PLR0913", # Too many arguments to function call ({c_args} > {max_args}) "PLR0915", # Too many statements ({statements} > {max_statements}) + "PLR0917", # Too many positional arguments "PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target "T201", # Allow print statements diff --git a/requirements.txt b/requirements.txt index 9a9169b..7c20cd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -matplotlib==3.9.0 +matplotlib==3.9.2 +tqdm==4.67.1 +scipy==1.15.2 diff --git a/requirements_test.txt b/requirements_test.txt index 13761f5..5a32c43 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,11 +1,11 @@ -black==24.4.2 -coverage==7.5.4 -pylint==3.2.3 -pytest==8.2.2 -pytest-cov==5.0.0 +black==24.10.0 +coverage==7.6.8 +pylint==3.3.1 +pytest==8.3.3 +pytest-cov==6.0.0 pytest-sugar==1.0.0 pytest-timeout==2.3.1 -ruff==0.4.10 -tox==4.15.1 +ruff==0.8.0 +tox==4.23.2 restructuredtext-lint==1.4.0 pygments==2.18.0 diff --git a/tests/eval/__init__.py b/tests/eval/__init__.py new file mode 100644 index 0000000..c9ae6f7 --- /dev/null +++ b/tests/eval/__init__.py @@ -0,0 +1 @@ +"""Init file for eval tests.""" diff --git a/tests/eval/test_calc.py b/tests/eval/test_calc.py new file mode 100644 index 0000000..a0a627a --- /dev/null +++ b/tests/eval/test_calc.py @@ -0,0 +1,261 @@ +"""Test the eval.calc functions.""" + +import pytest +import numpy as np +from unittest import mock +from adc_eval.eval import calc +from adc_eval.eval import spectrum + + +RTOL = 0.05 +NLEN = 2**16 +AMPLITUDE = 0.5 / np.sqrt(2) +RAND_HARMS = 4 +RAND_NFFT = 3 +RAND_LEAK = 3 +TEST_SNDR = [ + np.random.uniform(low=0.1, high=100, size=np.random.randint(4, 31)) + for _ in range(10) +] +TEST_VALS = [np.random.uniform(low=0.1, high=50) for _ in range(3)] +PLACES = [i for i in range(6)] + + +def gen_spectrum(sig_bin, harmonics, nfft): + """Generate a wave with arbitrary harmonics.""" + t = np.linspace(0, NLEN - 1, NLEN) + vin = np.zeros(len(t)) + fin = sig_bin / nfft + for i in range(1, harmonics + 1): + vin += np.sqrt(2) * AMPLITUDE / i * np.sin(2 * np.pi * i * fin * t) + + return spectrum.get_spectrum(vin, fs=1, nfft=nfft) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_db_to_pow_places(data, places): + """Test the db_to_pow conversion with multiple places.""" + exp_val = round(10 ** (data / 10), places) + assert exp_val == calc.db_to_pow(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_db_to_pow_ndarray(data, places): + """Test db_to_pow with ndarray input.""" + data = np.array(data) + exp_val = np.array(round(10 ** (data / 10), places)) + assert exp_val == calc.db_to_pow(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_dbW(data, places): + """Test the dbW conversion with normal inputs.""" + exp_val = round(10 * np.log10(data), places) + assert exp_val == calc.dBW(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_dbW_ndarray(data, places): + """Test dbW with ndarray input.""" + data = np.array(data) + exp_val = np.array(round(10 * np.log10(data), places)) + assert exp_val == calc.dBW(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_enob(data, places): + """Test enob with muliple places.""" + exp_val = round(1 / 6.02 * (data - 1.76), places) + assert exp_val == calc.enob(data, places=places) + + +@pytest.mark.parametrize("harms", np.random.randint(1, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(10, 16, RAND_NFFT))) +def test_find_harmonics(harms, nfft): + """Test the find harmonics method.""" + nbin = np.random.randint(1, int(nfft / (2 * harms)) - 1) + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=0, fscale=1e-6 + ) + + for n in range(2, harms + 1): + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, index={n}" + exp_bin = n * nbin + exp_power = (AMPLITUDE / n) ** 2 + exp_freq = freq[exp_bin] * 1e6 + assert stats["harm"][n]["bin"] == exp_bin, msg_txt + assert np.allclose(stats["harm"][n]["freq"], exp_freq, rtol=RTOL), msg_txt + assert np.allclose(stats["harm"][n]["power"], exp_power, rtol=RTOL), msg_txt + + +@pytest.mark.parametrize("harms", np.random.randint(1, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(12, 16, RAND_NFFT))) +@pytest.mark.parametrize("leak", np.random.randint(1, 10, RAND_LEAK)) +def test_find_harmonics_with_leakage(harms, nfft, leak): + """Test the find harmonics method with spectral leakage.""" + nbin = np.random.randint(2 * leak, int(nfft / (2 * harms)) - 1) + nbin = nbin + round( + np.random.uniform(-0.5, 0.5), 2 + ) # Ensures we're not coherently sampled + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=leak + ) + + for n in range(2, harms + 1): + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, leak={leak}, index={n}" + bin_low = n * nbin - leak + bin_high = n * nbin + leak + assert bin_low <= stats["harm"][n]["bin"] <= bin_high, msg_txt + + +@pytest.mark.parametrize("harms", np.random.randint(2, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(12, 16, RAND_NFFT))) +@pytest.mark.parametrize("leak", np.random.randint(1, 10, RAND_LEAK)) +def test_find_harmonics_with_leakage_outside_bounds(harms, nfft, leak): + """Test find harmonics with leakage bins exceeding array bounds.""" + nbin = nfft / 4 - 0.5 + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + stats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=leak + ) + # Only check second harmonic which is guaranteed to be at edge of FFT + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, leak={leak}" + assert nfft / 2 - leak <= stats["harm"][2]["bin"] <= nfft / 2 - 1 + + +@pytest.mark.parametrize("harms", np.random.randint(6, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(8, 16, RAND_NFFT))) +def test_find_harmonics_on_fft_bound(harms, nfft): + """Test find harmonics with harmonics landing at nfft/2.""" + nbin = nfft / 8 + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.find_harmonics(pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=0) + + exp_bin = { + 2: 2 * nbin, + 3: 3 * nbin, + 4: 0, + 5: nfft - 5 * nbin, + } + + for n, exp_val in exp_bin.items(): + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, index={n}" + assert stats["harm"][n]["bin"] == exp_val, msg_txt + + +@pytest.mark.parametrize("data", TEST_SNDR) +def test_sndr_sfdr_outputs(data): + """Test the sndr_sfdr method outputs.""" + freq = np.linspace(0, 1000, np.size(data)) + full_scale = -3 + nfft = 2**8 + fs = 1 + + psd_test = data.copy() + psd_exp = data.copy() + + result = calc.sndr_sfdr(psd_test, freq, fs, nfft, 0, full_scale=full_scale) + + data[0] = 0 + psd_exp[0] = 0 + data_string = f"F = {freq}\nD = {data}" + + indices = np.argsort(psd_exp) + sbin = indices[-1] + spurbin = indices[-2] + sfreq = freq[sbin] + spwr = psd_exp[sbin] + + psd_exp[sbin] = 0 + spurfreq = freq[spurbin] + spurpwr = psd_exp[spurbin] + + noise_pwr = np.sum(psd_exp[1:]) + + exp_return = { + "sig": { + "freq": sfreq, + "bin": sbin, + "power": spwr, + "dB": round(10 * np.log10(spwr), 1), + "dBFS": round(10 * np.log10(spwr) - full_scale, 1), + }, + "spur": { + "freq": spurfreq, + "bin": spurbin, + "power": spurpwr, + "dB": 10 * np.log10(spurpwr), + "dBFS": round(10 * np.log10(spurpwr) - full_scale, 1), + }, + "noise": { + "floor": 2 * noise_pwr / nfft, + "power": noise_pwr, + "rms": np.sqrt(noise_pwr), + "dBHz": round(10 * np.log10(2 * noise_pwr / nfft) - full_scale, 1), + "NSD": round( + 10 * np.log10(2 * noise_pwr / nfft) + - full_scale + - 2 * 10 * np.log10(fs / nfft), + 1, + ), + }, + "sndr": { + "dBc": round(10 * np.log10(spwr / noise_pwr), 1), + "dBFS": round(full_scale - 10 * np.log10(noise_pwr), 1), + }, + "sfdr": { + "dBc": round(10 * np.log10(spwr / spurpwr), 1), + "dBFS": round(full_scale - 10 * np.log10(spurpwr), 1), + }, + "enob": { + "bits": round((full_scale - 10 * np.log10(noise_pwr) - 1.76) / 6.02, 1), + }, + } + + for key, val in exp_return.items(): + for measure, measure_val in val.items(): + msg = f"{data_string}\n{key} -> {measure} | Expected {measure_val} | Got {result[key][measure]}" + assert np.allclose(measure_val, result[key][measure], rtol=RTOL), msg + + +@pytest.mark.parametrize("harms", np.random.randint(2, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(8, 16, RAND_NFFT))) +def test_plot_string(harms, nfft): + """Test proper return of plotting string.""" + nbin = np.random.randint(2, int(nfft / (2 * harms)) - 1) + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.sndr_sfdr(pwr, freq, 1, nfft, leak=0, full_scale=0) + hstats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=0, fscale=1 + ) + + all_stats = {**stats, **hstats} + + plt_str = calc.get_plot_string( + all_stats, 0, 1, nfft, window="rectangular", xscale=1, fscale="Hz" + ) + + # Check for important information, not everything + msg_txt = f"{all_stats}\n{plt_str}" + + assert f"NFFT = {nfft}" in plt_str, msg_txt + assert f"ENOB = {all_stats['enob']['bits']} bits" in plt_str, msg_txt + assert f"SNDR = {all_stats['sndr']['dBFS']} dBFS" in plt_str, msg_txt + assert f"SFDR = {all_stats['sfdr']['dBFS']} dBFS" in plt_str, msg_txt + assert f"Noise Floor = {all_stats['noise']['dBHz']} dBFS" in plt_str, msg_txt + + for n in range(2, harms + 1): + harm_power = round(all_stats["harm"][n]["dB"], 1) + harm_freq = all_stats["harm"][n]["freq"] + assert f"HD{n} = {harm_power} dBFS @ {harm_freq} Hz" in plt_str, msg_txt diff --git a/tests/eval/test_spectrum.py b/tests/eval/test_spectrum.py new file mode 100644 index 0000000..4f11331 --- /dev/null +++ b/tests/eval/test_spectrum.py @@ -0,0 +1,334 @@ +"""Test the spectrum module.""" + +import pytest +import numpy as np +from unittest import mock +from adc_eval.eval import spectrum + + +RTOL = 0.05 +NLEN = 2**18 +NFFT = 2**10 +DATA_SINE = [ + { + "f1": np.random.randint(1, NFFT / 4 - 1), + "f2": np.random.randint(NFFT / 4, NFFT / 2 - 1), + "a1": np.random.uniform(low=0.5, high=0.8), + "a2": np.random.uniform(low=0.1, high=0.4), + } + for _ in range(10) +] + + +@mock.patch("adc_eval.eval.spectrum.calc_psd") +def test_get_spectrum(mock_calc_psd): + """Test that the get_spectrum method returns power spectrum.""" + fs = 4 + nfft = 3 + data = np.array([1]) + exp_spectrum = np.array([fs / nfft]) + + mock_calc_psd.return_value = (None, data, None, 2 * data) + + assert (None, exp_spectrum) == spectrum.get_spectrum( + None, fs=fs, nfft=nfft, single_sided=True + ) + + +@mock.patch("adc_eval.eval.spectrum.calc_psd") +def test_get_spectrum_dual(mock_calc_psd): + """Test that the get_spectrum method returns dual-sided power spectrum.""" + fs = 4 + nfft = 3 + data = np.array([1]) + exp_spectrum = np.array([fs / nfft]) + + mock_calc_psd.return_value = (None, data, None, 2 * data) + + assert (None, 2 * exp_spectrum) == spectrum.get_spectrum( + None, fs=fs, nfft=nfft, single_sided=False + ) + + +@pytest.mark.parametrize("data", [np.random.randn(NLEN) for _ in range(10)]) +def test_calc_psd_randomized_dual(data): + """Test calc_psd with random data.""" + (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 1, rtol=RTOL) + + +@pytest.mark.parametrize("data", [np.random.randn(NLEN) for _ in range(10)]) +def test_calc_psd_randomized_single(data): + """Test calc_psd with random data and single-sided.""" + (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 2, rtol=RTOL) + + +def test_calc_psd_zeros_dual(): + """Test calc_psd with zeros.""" + data = np.zeros(NLEN) + (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 0, rtol=RTOL) + + +def test_calc_psd_zeros_single(): + """Test calc_psd with zeros and single-sided..""" + data = np.zeros(NLEN) + (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 0, rtol=RTOL) + + +def test_calc_psd_ones_dual(): + """Test calc_psd with ones.""" + data = np.ones(NLEN) + (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 1, rtol=RTOL) + + +def test_calc_psd_ones_single(): + """Test calc_psd with ones and single-sided.""" + data = np.ones(NLEN) + (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 2, rtol=RTOL) + + +@pytest.mark.parametrize("data", DATA_SINE) +def test_calc_psd_two_sine_dual(data): + """Test calc_psd with two sine waves.""" + fs = 1 + fbin = fs / NFFT + f1 = data["f1"] * fbin + f2 = data["f2"] * fbin + a1 = data["a1"] + a2 = data["a2"] + + t = 1 / fs * np.linspace(0, NLEN - 1, NLEN) + pin = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) + + (_, _, freq, psd) = spectrum.calc_psd(pin, fs, nfft=NFFT) + + exp_peaks = [ + round(a1**2 / 4 * NFFT, 3), + round(a2**2 / 4 * NFFT, 3), + ] + + exp_f1 = [round(-f1, 2), round(f1, 2)] + exp_f2 = [round(-f2, 2), round(f2, 2)] + + peak1 = max(psd) + ipeaks = np.where(psd >= peak1 * (1 - RTOL))[0] + fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] + + assertmsg = f"f1={f1} | f2={f2} | a1={a1} | a2={a2}" + + assert np.allclose(peak1, exp_peaks[0], rtol=RTOL), assertmsg + assert np.allclose(fpeaks, exp_f1, rtol=RTOL), assertmsg + + psd[ipeaks[0]] = 0 + psd[ipeaks[1]] = 0 + + peak2 = max(psd) + ipeaks = np.where(psd >= peak2 * (1 - RTOL))[0] + fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] + + assert np.allclose(peak2, exp_peaks[1], rtol=RTOL), assertmsg + assert np.allclose(fpeaks, exp_f2), assertmsg + + +@pytest.mark.parametrize("data", DATA_SINE) +def test_calc_psd_two_sine_single(data): + """Test calc_psd with two sine waves, single-eided.""" + fs = 1 + fbin = fs / NFFT + f1 = data["f1"] * fbin + f2 = data["f2"] * fbin + a1 = data["a1"] + a2 = data["a2"] + + t = 1 / fs * np.linspace(0, NLEN - 1, NLEN) + pin = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) + + (freq, psd, _, _) = spectrum.calc_psd(pin, fs, nfft=NFFT) + + exp_peaks = [ + round(a1**2 / 2 * NFFT, 3), + round(a2**2 / 2 * NFFT, 3), + ] + exp_f1 = round(f1, 2) + exp_f2 = round(f2, 2) + + peak1 = max(psd) + ipeak = np.where(psd == peak1)[0][0] + fpeak = round(freq[ipeak], 2) + + assertmsg = f"f1={f1} | f2={f2} | a1={a1} | a2={a2}" + + assert np.allclose(peak1, exp_peaks[0], rtol=RTOL), assertmsg + assert np.allclose(fpeak, exp_f1), assertmsg + + psd[ipeak] = 0 + + peak2 = max(psd) + ipeak = np.where(psd == peak2)[0][0] + fpeak = round(freq[ipeak], 2) + + assert np.allclose(peak2, exp_peaks[1], rtol=RTOL), assertmsg + assert np.allclose(fpeak, exp_f2), assertmsg + + +def test_window_data_as_list(): + """Tests the window_data function when given a list instead of numpy array.""" + data = np.random.rand(NLEN).tolist() + wdata = spectrum.window_data(data, window="rectangular") + + assert type(data) == type(list()) + assert type(wdata) == type(np.ndarray([])) + + +def test_window_data_bad_window_type(capfd): + """Tests the window_data function with an incorrect window selection.""" + data = np.random.rand(NLEN) + wdata = spectrum.window_data(data, window="foobar") + captured = capfd.readouterr() + + assert data.size == wdata.size + assert data.all() == wdata.all() + assert "WARNING" in captured.out + + +@mock.patch("adc_eval.eval.spectrum.plot_spectrum") +def test_analyze_bad_input_scalar(mock_plot_spectrum, capfd): + """Tests bad input scalar keys.""" + mock_plot_spectrum.return_value = (None, None, None) + mock_plot_spectrum.side_effect = lambda *args, **kwargs: (kwargs, None, None) + (kwargs, _, _) = spectrum.analyze([0], 1, fscale="foobar") + captured = capfd.readouterr() + + assert "WARNING" in captured.out + assert kwargs.get("fscale") == ("MHz", 1e6) + + +@mock.patch("adc_eval.eval.spectrum.plot_spectrum") +def test_analyze_valid_input_scalar(mock_plot_spectrum): + """Tests the valid input scalar keys.""" + mock_plot_spectrum.return_value = (None, None, None) + mock_plot_spectrum.side_effect = lambda *args, **kwargs: (kwargs, None, None) + + test_vals = { + "Hz": 1, + "kHz": 1e3, + "MHz": 1e6, + "GHz": 1e9, + } + for key, val in test_vals.items(): + (kwargs, _, _) = spectrum.analyze([0], 1, fscale=key) + assert kwargs.get("fscale") == (key, val) + + +@mock.patch("adc_eval.eval.calc.sndr_sfdr") +@mock.patch("adc_eval.eval.calc.find_harmonics") +def test_analyze_no_plot(mock_sndr_sfdr, mock_find_harmonics): + """Tests the psd output of the analyze function with no plotting.""" + data = np.random.rand(NLEN) + data_sndr = { + "sig": {"bin": 1, "power": 2}, + } + data_harms = {"harmonics": 3} + exp_stats = {**data_sndr, **data_harms} + + mock_sndr_sfdr.return_value = data_sndr + mock_find_harmonics = data_harms + + (freq, psd, stats) = spectrum.analyze( + data, + fs=1, + nfft=NFFT, + dr=1, + harmonics=0, + leak=0, + window="rectangular", + no_plot=True, + yaxis="power", + single_sided=True, + fscale="Hz", + ) + + assert freq.all() == np.linspace(0, 1, int(NFFT / 2)).all() + assert psd.size == int(NFFT / 2) + + for key, value in stats.items(): + assert value == exp_stats[key] + + +@mock.patch("adc_eval.eval.calc.sndr_sfdr") +@mock.patch("adc_eval.eval.calc.find_harmonics") +def test_analyze_no_plot_dual(mock_sndr_sfdr, mock_find_harmonics): + """Tests the psd output of the analyze function with no plotting.""" + data = np.random.rand(NLEN) + data_sndr = { + "sig": {"bin": 1, "power": 2}, + } + data_harms = {"harmonics": 3} + exp_stats = {**data_sndr, **data_harms} + + mock_sndr_sfdr.return_value = data_sndr + mock_find_harmonics = data_harms + + (freq, psd, stats) = spectrum.analyze( + data, + fs=1, + nfft=NFFT, + dr=1, + harmonics=0, + leak=0, + window="rectangular", + no_plot=True, + yaxis="power", + single_sided=False, + fscale="Hz", + ) + + assert freq.all() == np.linspace(-0.5, 0.5, NFFT - 1).all() + assert psd.size == NFFT + for key, value in stats.items(): + assert value == exp_stats[key] + + +@mock.patch("adc_eval.eval.calc.sndr_sfdr") +@mock.patch("adc_eval.eval.calc.find_harmonics") +def test_analyze_no_plot_magnitude(mock_sndr_sfdr, mock_find_harmonics): + """Tests the psd output of the analyze function with no plotting.""" + data = np.random.rand(NLEN) + data_sndr = { + "sig": {"bin": 1, "power": 2}, + } + data_harms = {"harmonics": 3} + exp_stats = {**data_sndr, **data_harms} + + mock_sndr_sfdr.return_value = data_sndr + mock_find_harmonics = data_harms + + (freq, psd, stats) = spectrum.analyze( + data, + fs=1, + nfft=NFFT, + dr=1, + harmonics=0, + leak=0, + window="rectangular", + no_plot=True, + yaxis="magnitude", + single_sided=True, + fscale="Hz", + ) + + assert freq.all() == np.linspace(0, 1, int(NFFT / 2)).all() + assert psd.size == int(NFFT / 2) + for key, value in stats.items(): + assert value == exp_stats[key] diff --git a/tests/test_calc_psd.py b/tests/test_calc_psd.py deleted file mode 100644 index 8aae2e8..0000000 --- a/tests/test_calc_psd.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Test the calc_psd method.""" - -import unittest -import numpy as np -from unittest import mock -from adc_eval import spectrum - - -class TestCalcPSD(unittest.TestCase): - """Test the calc_psd method.""" - - def setUp(self): - """Initialize tests.""" - self.nfft = 2**8 - self.nlen = 2**18 - accuracy = 0.01 - self.bounds = [1 - accuracy, 1 + accuracy] - np.random.seed(1) - - def test_calc_psd_randomized_dual(self): - """Test calc_psd with random data.""" - for i in range(0, 10): - data = np.random.randn(self.nlen) - (freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft, single_sided=False) - mean_val = np.mean(psd) - self.assertTrue(self.bounds[0] <= mean_val <= self.bounds[1], msg=mean_val) - - def test_calc_psd_randomized_single(self): - """Test calc_psd with random data and single-sided.""" - for i in range(0, 10): - data = np.random.randn(self.nlen) - (freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft, single_sided=True) - mean_val = np.mean(psd) - self.assertTrue( - 2 * self.bounds[0] <= mean_val <= 2 * self.bounds[1], msg=mean_val - ) - - def test_calc_psd_zeros_dual(self): - """Test calc_psd with zeros.""" - data = np.zeros(self.nlen) - (freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft, single_sided=False) - mean_val = np.mean(psd) - self.assertTrue( - self.bounds[0] - 1 <= mean_val <= self.bounds[1] - 1, msg=mean_val - ) - - def test_calc_psd_zeros_single(self): - """Test calc_psd with zeros and single-sided..""" - data = np.zeros(self.nlen) - (freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft, single_sided=True) - mean_val = np.mean(psd) - self.assertTrue( - self.bounds[0] - 1 <= mean_val <= self.bounds[1] - 1, msg=mean_val - ) - - def test_calc_psd_ones_dual(self): - """Test calc_psd with ones.""" - data = np.ones(self.nlen) - (freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft, single_sided=False) - mean_val = np.mean(psd) - self.assertTrue(self.bounds[0] <= mean_val <= self.bounds[1], msg=mean_val) - - def test_calc_psd_ones_single(self): - """Test calc_psd with ones and single-sided.""" - data = np.ones(self.nlen) - (freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft, single_sided=True) - mean_val = np.mean(psd) - self.assertTrue( - 2 * self.bounds[0] <= mean_val <= 2 * self.bounds[1], msg=mean_val - ) - - def test_calc_psd_two_sine_dual(self): - """Test calc_psd with two sine waves.""" - fs = 1 - fbin = fs / self.nfft - f1 = 29 * fbin - f2 = 97 * fbin - a1 = 0.37 - a2 = 0.11 - t = 1 / fs * np.linspace(0, self.nlen - 1, self.nlen) - data = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) - (freq, psd) = spectrum.calc_psd(data, fs, nfft=self.nfft, single_sided=False) - exp_peaks = [ - round(a1**2 / 4 * self.nfft, 3), - round(a2**2 / 4 * self.nfft, 3), - ] - exp_f1 = [round(f1, 2), round(fs - f1, 2)] - exp_f2 = [round(f2, 2), round(fs - f2, 2)] - - peak1 = max(psd) - ipeaks = np.where(psd >= peak1 * self.bounds[0])[0] - fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] - - self.assertEqual(round(peak1, 3), exp_peaks[0]) - self.assertListEqual(fpeaks, exp_f1) - - psd[ipeaks[0]] = 0 - psd[ipeaks[1]] = 0 - - peak2 = max(psd) - ipeaks = np.where(psd >= peak2 * self.bounds[0])[0] - fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] - - self.assertEqual(round(peak2, 3), exp_peaks[1]) - self.assertListEqual(fpeaks, exp_f2) - - def test_calc_psd_two_sine_single(self): - """Test calc_psd with two sine waves, single-eided.""" - fs = 1 - fbin = fs / self.nfft - f1 = 29 * fbin - f2 = 97 * fbin - a1 = 0.37 - a2 = 0.11 - t = 1 / fs * np.linspace(0, self.nlen - 1, self.nlen) - data = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) - (freq, psd) = spectrum.calc_psd(data, fs, nfft=self.nfft, single_sided=True) - exp_peaks = [ - round(a1**2 / 2 * self.nfft, 3), - round(a2**2 / 2 * self.nfft, 3), - ] - exp_f1 = round(f1, 2) - exp_f2 = round(f2, 2) - - peak1 = max(psd) - ipeak = np.where(psd == peak1)[0][0] - fpeak = round(freq[ipeak], 2) - - self.assertEqual(round(peak1, 3), exp_peaks[0]) - self.assertEqual(fpeak, exp_f1) - - psd[ipeak] = 0 - - peak2 = max(psd) - ipeak = np.where(psd == peak2)[0][0] - fpeak = round(freq[ipeak], 2) - - self.assertEqual(round(peak2, 3), exp_peaks[1]) - self.assertEqual(fpeak, exp_f2) diff --git a/tests/test_filt.py b/tests/test_filt.py new file mode 100644 index 0000000..cb4e83c --- /dev/null +++ b/tests/test_filt.py @@ -0,0 +1,177 @@ +"""Test the filter module.""" + +import pytest +import numpy as np +from unittest import mock +from adc_eval import filt +from adc_eval import signals + + +@pytest.mark.parametrize("dec", np.random.randint(1, 20, 4)) +def test_cic_decimate_set_dec_updates_gain(dec): + """Tests that changing decimation factor updates gain.""" + cicfilt = filt.CICDecimate(dec=1, order=2) + assert cicfilt.gain == 1 + + cicfilt.dec = dec + assert cicfilt.gain == (dec**2) + + +@pytest.mark.parametrize("order", np.random.randint(1, 20, 4)) +def test_cic_decimate_set_order_updates_gain(order): + """Tests that changing filter order updates gain.""" + cicfilt = filt.CICDecimate(dec=2, order=1) + assert cicfilt.gain == 2 + + cicfilt.order = order + assert cicfilt.gain == (2**order) + + +def test_cic_decimate_returns_ndarray(): + """Tests the CICDecimate output data conversion.""" + cicfilt = filt.CICDecimate() + data = np.random.randn(100).tolist() + cicfilt._xout = data + + assert type(cicfilt.out) == type(np.array(list())) + assert cicfilt.out.all() == np.array(data).all() + + +@pytest.mark.parametrize("dec", np.random.randint(1, 20, 4)) +def test_cic_decimate_function(dec): + """Tests the CICDecimate decimate function.""" + cicfilt = filt.CICDecimate(dec=dec) + data = np.random.randn(100) + cicfilt.decimate(data) + + exp_result = data[::dec] + + assert cicfilt.out.size == exp_result.size + assert cicfilt.out.all() == exp_result.all() + + +def test_cic_decimate_function_none_input(): + """Tests the CICDecimate decimate function with no input arg.""" + cicfilt = filt.CICDecimate(dec=1) + data = np.random.randn(100) + cicfilt._xfilt = data + cicfilt.decimate() + + exp_result = data + + assert cicfilt.out.size == exp_result.size + assert cicfilt.out.all() == exp_result.all() + + +@pytest.mark.parametrize("nlen", np.random.randint(8, 2**10, 4)) +def test_cic_decimate_all_ones(nlen): + """Test the CICDecimate filtering with all ones.""" + cicfilt = filt.CICDecimate(dec=1, order=1) + data = np.ones(nlen) + cicfilt.run(data) + + exp_data = data.copy() + exp_data[0] = 0 + + assert cicfilt.out.all() == exp_data.all() + + +@pytest.mark.parametrize("nlen", np.random.randint(8, 2**10, 4)) +def test_cic_decimate_all_zeros(nlen): + """Test the CICDecimate filtering with all zeros.""" + cicfilt = filt.CICDecimate(dec=1, order=1) + data = np.zeros(nlen) + cicfilt.run(data) + + exp_data = data.copy() + + assert cicfilt.out.all() == exp_data.all() + + +@pytest.mark.parametrize("nlen", np.random.randint(8, 2**10, 4)) +def test_cic_decimate_impulse(nlen): + """Test the CICDecimate filtering with impulse.""" + cicfilt = filt.CICDecimate(dec=1, order=1) + data = signals.impulse(nlen) + cicfilt.run(data) + + exp_data = np.concatenate([[0], data[0:-1]]) + + assert cicfilt.out.all() == exp_data.all() + + +def test_fir_lowpass_returns_ndarray(): + """Tests the FIRLowPass output data conversion.""" + fir = filt.FIRLowPass() + data = np.random.randn(100).tolist() + fir._out = data + + assert type(fir.out) == type(np.array(list())) + assert fir.out.all() == np.array(data).all() + + +@pytest.mark.parametrize("dec", np.random.randint(1, 20, 4)) +def test_fir_decimate_function(dec): + """Tests the FIRLowPass decimate function.""" + fir = filt.FIRLowPass(dec=dec) + data = np.random.randn(100) + fir.decimate(data) + + exp_result = data[::dec] + + assert fir.out.size == exp_result.size + assert fir.out.all() == exp_result.all() + + +def test_fir_decimate_function_none_input(): + """Tests the FIRLowPass decimate function with no input arg.""" + fir = filt.FIRLowPass(dec=1) + data = np.random.randn(100) + fir.yfilt = data + fir.decimate() + + exp_result = data + + assert fir.out.size == exp_result.size + assert fir.out.all() == exp_result.all() + + +@mock.patch("adc_eval.filt.remez") +def test_fir_lowpass_tap_generation(mock_remez, capfd): + """Tests the FIRLowPass decimate function.""" + fir = filt.FIRLowPass() + fir.ntaps = 3 + fir.bit_depth = 12 + mock_remez.return_value = np.ones(3) + + (taps, coeffs) = fir.generate_taps(0.1) + + captured = capfd.readouterr() + exp_coeffs = [2**12, 2**12, 2**12] + + assert "WARNING" in captured.out + assert taps == 3 + assert coeffs == exp_coeffs + + +@pytest.mark.parametrize("ntaps", np.random.randint(3, 511, 5)) +def test_fir_lowpass_run(ntaps): + """Tests the FIRLowPass run function.""" + fir = filt.FIRLowPass() + fir.ntaps = ntaps + fir.bit_depth = 10 + fir.coeffs = 2**10 * np.ones(ntaps) + data = signals.impulse(2**12) + fir.run(data) + + exp_sum = np.ceil((ntaps + 1) / 2) + out_sum = sum(fir.out) + + tap_val = int(exp_sum) + + assert fir.out.size == data.size + assert max(fir.out) == 1 + assert min(fir.out) == 0 + assert fir.out[0:tap_val].all() == 1 + assert fir.out[tap_val + 1 :].all() == 0 + assert out_sum == exp_sum diff --git a/tests/test_signals.py b/tests/test_signals.py new file mode 100644 index 0000000..9464dc1 --- /dev/null +++ b/tests/test_signals.py @@ -0,0 +1,113 @@ +"""Test the signals module.""" + +import pytest +import numpy as np +from scipy import stats +from adc_eval import signals + + +RTOL = 0.01 + +@pytest.mark.parametrize("nlen", np.random.randint(4, 2**16, 3)) +@pytest.mark.parametrize("fs", np.random.uniform(1, 1e9, 3)) +def test_time(nlen, fs): + """Test time with random data.""" + value = signals.time(nlen, fs=fs) + assert value.size == nlen + assert value[0] == 0 + assert np.isclose(value[nlen - 1], (nlen - 1) / fs, rtol=RTOL) + + +@pytest.mark.parametrize("nlen", np.random.randint(2**10, 2**16, 3)) +@pytest.mark.parametrize("offset", np.random.uniform(-10, 10, 3)) +@pytest.mark.parametrize("amp", np.random.uniform(0, 10, 3)) +def test_sin(nlen, offset, amp): + """Test sine generation with random data.""" + fs = np.random.uniform(1, 1e9) + fin = np.random.uniform(fs / 10, fs / 3) + + t = signals.time(nlen, fs=fs) + value = signals.sin(t, amp=amp, offset=offset, freq=fin) + + exp_peaks = [offset - amp, amp + offset] + + assert value.size == nlen + assert np.isclose(max(value), exp_peaks[1], rtol=RTOL) + assert np.isclose(min(value), exp_peaks[0], rtol=RTOL) + assert value[0] == offset + + +@pytest.mark.parametrize("nlen", np.random.randint(1, 2**16, 5)) +def test_noise_length(nlen): + """Test noise generation with random data.""" + value = signals.noise(nlen, mean=0, std=1) + + # Just check correct size + assert value.size == nlen + + +@pytest.mark.parametrize("std", np.random.uniform(0, 1, 4)) +def test_noise_length(std): + """Test noise is gaussian with random data.""" + nlen = 2**12 + noise = signals.noise(nlen, mean=0, std=std) + autocorr = np.correlate(noise, noise, mode="full") + autocorr /= max(autocorr) + asize = autocorr.size + + midlag = autocorr.size // 2 + acorr_nopeak = np.concatenate([autocorr[0 : midlag - 1], autocorr[midlag + 1 :]]) + + shapiro = stats.shapiro(acorr_nopeak) + + # Check that middle lag is 1 + assert autocorr[midlag] == 1 + + # Now check that noise is gaussian + assert shapiro.pvalue < 0.01 + + +@pytest.mark.parametrize("nlen", np.random.randint(2, 2**12, 3)) +@pytest.mark.parametrize("mag", np.random.uniform(0.1, 100, 3)) +def test_impulse(nlen, mag): + """Test impulse generation with random length and amplitude.""" + data = signals.impulse(nlen, mag) + + assert data.size == nlen + assert data[0] == mag + assert data[1:].all() == 0 + + +@pytest.mark.parametrize("nlen", np.random.randint(2, 2**12, 3)) +def test_tones_no_nfft_arg(nlen): + """Test tone generation with random length no nfft param.""" + (t, data) = signals.tones(nlen, [0.5], [0.5]) + + assert t.size == nlen + assert t[0] == 0 + assert t[-1] == nlen-1 + assert data.size == nlen + + +@pytest.mark.parametrize("fs", np.random.uniform(100, 1e9, 3)) +@pytest.mark.parametrize("nlen", np.random.randint(2, 2**12, 3)) +def test_tones_with_fs_arg(fs, nlen): + """Test tone generation with random length and fs given.""" + (t, data) = signals.tones(nlen, [0.5], [0.5], fs=fs) + + assert t.size == nlen + assert t[0] == 0 + assert np.isclose(t[-1], (nlen-1) / fs, rtol=RTOL) + assert data.size == nlen + + +@pytest.mark.parametrize("nlen", np.random.randint(2, 2**12, 3)) +def test_tones_with_empty_list( nlen): + """Test tone generation with random length and fs given.""" + (t, data) = signals.tones(nlen, [], []) + + assert t.size == nlen + assert t[0] == 0 + assert t[-1] == nlen-1 + assert data.size == nlen + assert data.all() == np.zeros(nlen).all() \ No newline at end of file diff --git a/tests/test_spectrum.py b/tests/test_spectrum.py deleted file mode 100644 index 625b314..0000000 --- a/tests/test_spectrum.py +++ /dev/null @@ -1,108 +0,0 @@ -"""Test the spectrum module.""" - -import unittest -import numpy as np -from unittest import mock -from adc_eval import spectrum - - -class TestSpectrum(unittest.TestCase): - """Test the spectrum module.""" - - def setUp(self): - """Initialize tests.""" - pass - - def test_db_to_pow_places(self): - """Test the db_to_pow conversion with multiple places.""" - test_val = 29.9460497 - exp_val = [988, 987.7, 987.65, 987.654, 987.6543] - - for i in range(0, len(exp_val)): - self.assertEqual(spectrum.db_to_pow(test_val, places=i), exp_val[i]) - - def test_db_to_pow_ndarray(self): - """Test db_to_pow with ndarray input.""" - test_val = np.array([30.0]) - self.assertEqual(spectrum.db_to_pow(test_val), np.array([1000.0])) - - def test_dbW(self): - """Test the dbW conversion with normal inputs.""" - test_val = 9.7197255 - exp_val = [10, 9.9, 9.88, 9.877, 9.8765] - - for i in range(0, len(exp_val)): - self.assertEqual(spectrum.dBW(test_val, places=i), exp_val[i]) - - def test_dbW_ndarray(self): - """Test dbW with ndarray input.""" - test_val = np.array([100.0]) - self.assertEqual(spectrum.dBW(test_val), np.array([20.0])) - - def test_enob(self): - """Test enob with muliple places.""" - test_val = 60.123456 - exp_val = [10, 9.7, 9.69, 9.695, 9.6949] - - for i in range(0, len(exp_val)): - self.assertEqual(spectrum.enob(test_val, places=i), exp_val[i]) - - @mock.patch("adc_eval.spectrum.calc_psd") - def test_get_spectrum(self, mock_calc_psd): - """Test that the get_spectrum method returns power spectrum.""" - fs = 4 - nfft = 3 - data = np.array([1]) - exp_spectrum = np.array([fs / nfft]) - - mock_calc_psd.return_value = (None, data) - - self.assertEqual( - spectrum.get_spectrum(None, fs=fs, nfft=nfft), (None, exp_spectrum) - ) - - def test_sndr_sfdr_outputs(self): - """Test the sndr_sfdr method outputs.""" - data = np.array([1, 2, 91, 7]) - freq = np.array([100, 200, 300, 400]) - full_scale = -3 - nfft = 2**8 - fs = 1 - exp_return = { - "sig": { - "freq": 300, - "bin": 2, - "power": 91, - "dB": 19.6, - "dBFS": round(19.590 - full_scale, 1), - }, - "spur": { - "freq": 400, - "bin": 3, - "power": 7, - "dB": 8.5, - "dBFS": round(8.451 - full_scale, 1), - }, - "noise": { - "floor": 18 / nfft, - "power": 9, - "rms": 3, - "dBHz": round(-11.529675 - full_scale, 1), - "NSD": round(36.6351 - full_scale, 1), - }, - "sndr": { - "dBc": 10.0, - "dBFS": round(full_scale - 9.542, 1), - }, - "sfdr": { - "dBc": 11.1, - "dBFS": round(full_scale - 8.451, 1), - }, - "enob": { - "bits": round((full_scale - 11.3024) / 6.02, 1), - }, - } - - result = spectrum.sndr_sfdr(data, freq, fs, nfft, 0, full_scale=full_scale) - for key, val in exp_return.items(): - self.assertDictEqual(result[key], val, msg=key) diff --git a/tests/test_spectrum_plotting.py b/tests/test_spectrum_plotting.py deleted file mode 100644 index 03a0d9f..0000000 --- a/tests/test_spectrum_plotting.py +++ /dev/null @@ -1,136 +0,0 @@ -"""Test the spectrum plotting functions.""" - -import unittest -import numpy as np -from unittest import mock -from adc_eval import spectrum - - -class TestSpectrumPlotting(unittest.TestCase): - """Test the spectrum module.""" - - def setUp(self): - """Initialize tests.""" - self.nlen = 2**16 - self.nfft = 2**12 - self.fs = 1 - self.bin = 13 - self.arms = 0.5 / np.sqrt(2) - self.fin = self.fs / self.nfft * self.bin - - def gen_spectrum(self, harmonics): - """Generate a wave with arbitrary harmonics.""" - t = 1 / self.fs * np.linspace(0, self.nlen - 1, self.nlen) - vin = np.zeros(len(t)) - for i in range(1, harmonics + 1): - vin += np.sqrt(2) * self.arms / i * np.sin(2 * np.pi * i * self.fin * t) - - return spectrum.get_spectrum(vin, fs=self.fs, nfft=self.nfft) - - def test_find_harmonics(self): - """Test the find harmonics method.""" - for i in range(2, 10): - (freq, pwr) = self.gen_spectrum(10) - - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=i, leak=0 - ) - - for x in range(2, i + 1): - msg_txt = f"harm={i}, index={x}" - self.assertEqual(stats["harm"][x]["bin"], x * self.bin, msg=msg_txt) - self.assertEqual( - round(stats["harm"][x]["power"], 4), - round((self.arms / x) ** 2, 4), - msg=msg_txt, - ) - self.assertEqual( - stats["harm"][x]["freq"], - round(freq[x * self.bin] / 1e6, 1), - msg=msg_txt, - ) - - def test_find_harmonics_with_leakage(self): - """Test the find harmonics method with spectral leakage.""" - self.bin = 13.5 - leakage_bins = 5 - for i in range(2, 10): - (freq, pwr) = self.gen_spectrum(10) - - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=i, leak=leakage_bins - ) - - for x in range(2, i + 1): - msg_txt = f"harm={i}, index={x}" - self.assertTrue( - x * self.bin - leakage_bins - <= stats["harm"][x]["bin"] - <= x * self.bin + leakage_bins, - msg=msg_txt, - ) - - def test_find_harmonics_with_leakage_outside_bounds(self): - """Test find harmonics with leakage bins exceeding array bounds.""" - self.bin = self.nfft / 4 - 0.5 - (freq, pwr) = self.gen_spectrum(5) - leakage_bins = 2 - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=2, leak=leakage_bins - ) - self.assertTrue(self.nfft / 2 - 3 <= stats["harm"][2]["bin"], self.nfft / 2 - 1) - - def test_find_harmonics_on_fft_bound(self): - """Test find harmonics with harmonics landing at nfft/2.""" - self.nfft = 2**12 - self.bin = self.nfft / 8 - (freq, pwr) = self.gen_spectrum(10) - leakage_bins = 0 - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=5, leak=leakage_bins - ) - self.assertEqual(stats["harm"][2]["bin"], 2 * self.bin) - self.assertEqual(stats["harm"][3]["bin"], 3 * self.bin) - self.assertEqual(stats["harm"][4]["bin"], 0) - self.assertEqual(stats["harm"][5]["bin"], self.nfft - 5 * self.bin) - - def test_plot_string(self): - """Test proper return of plotting string.""" - self.bin = 13 - (freq, pwr) = self.gen_spectrum(3) - stats = spectrum.sndr_sfdr(pwr, freq, 1, self.nfft, leak=0, full_scale=0) - harms = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=3, leak=0 - ) - all_stats = {**stats, **harms} - - plt_str = spectrum.get_plot_string( - all_stats, 0, self.fs, self.nfft, window="rectangular" - ) - - # Check for important information, not everything - msg_txt = f"{all_stats}\n{plt_str}" - self.assertTrue(f"NFFT = {self.nfft}" in plt_str, msg=msg_txt) - self.assertTrue( - f"ENOB = {all_stats['enob']['bits']} bits" in plt_str, msg=msg_txt - ) - self.assertTrue( - f"SNDR = {all_stats['sndr']['dBFS']} dBFS" in plt_str, msg=msg_txt - ) - self.assertTrue( - f"SFDR = {all_stats['sfdr']['dBFS']} dBFS" in plt_str, msg=msg_txt - ) - self.assertTrue( - f"Noise Floor = {all_stats['noise']['dBHz']} dBFS" in plt_str, - msg=msg_txt, - ) - self.assertTrue( - f"HD2 = {round(all_stats['harm'][2]['dB'], 1)} dBFS @ {all_stats['harm'][2]['freq']} MHz" - in plt_str, - msg=msg_txt, - ) - self.assertTrue( - f"HD3 = {round(all_stats['harm'][3]['dB'], 1)} dBFS @ {all_stats['harm'][3]['freq']} MHz" - in plt_str, - msg=msg_txt, - ) diff --git a/tox.ini b/tox.ini index fc9bbe1..f77285b 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = lint,build,py39,py310,py311,py312 +envlist = lint,build,py310,py311,py312 skip_missing_interpreters = True skipsdist = True