Source code for lib.Calibration.Qdemodulation

import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as scsig

import qubic.lib.Calibration.Qfiber as ft
from qubic.lib.Calibration.Qfiber import printnow


[docs] class interp_template: """ Allows to interpolate a template applying an amplitude, offset and phase This si to be used in order to fit this template in each epriod of some pseudo-periodic signal For instance for demodulation """ def __init__(self, xx, yy): self.period = xx[-1] - xx[0] self.xx = xx self.yy = yy def __call__(self, x, pars, extra_args=None): amp = pars[0] off = pars[1] dx = pars[2] return np.interp(x, self.xx - dx, self.yy, period=self.period) * amp + off
[docs] class Demodulation:
[docs] def read_cal_src_data(self, file_list, time_offset=7200): ttsrc_i = [] ddsrc_i = [] for ff in file_list: thett, thedd = np.loadtxt(ff).T ttsrc_i.append(thett + time_offset) ddsrc_i.append(thedd) t_src = np.concatenate(ttsrc_i) data_src = np.concatenate(ddsrc_i) return t_src, data_src
[docs] def bin_per_period(self, period, time, invec): period_index = ((time - time[0]) / period).astype(int) allperiods = np.unique(period_index) tper = np.zeros(len(allperiods)) nvec = np.shape(invec)[0] newvecs = np.zeros((nvec, len(allperiods))) for i in range(len(allperiods)): ok = period_index == allperiods[i] tper[i] = np.mean(time[ok]) newvecs[:, i] = np.mean(invec[:, ok], axis=1) return tper, newvecs
[docs] def hf_noise_estimate(self, tt, dd): sh = np.shape(dd) if len(sh) == 1: dd = np.reshape(dd, (1, len(dd))) ndet = 1 else: ndet = sh[0] estimate = np.zeros(ndet) for i in range(ndet): spectrum_f, freq_f = ft.power_spectrum(tt, dd[i, :], rebin=True) mean_level = np.mean(spectrum_f[np.abs(freq_f) > (np.max(freq_f) / 2)]) samplefreq = 1.0 / (tt[1] - tt[0]) estimate[i] = np.sqrt(mean_level * samplefreq / 2) return estimate
[docs] def return_rms_period(self, period, indata, others=None, verbose=False, remove_noise=False): if verbose: print("return_rms_period: indata length=", len(indata)) time = indata[0] data = indata[1] if verbose: printnow("Entering RMS/period") if data.ndim == 1: nTES = 1 else: sh = np.shape(data) nTES = sh[0] if verbose: print("return_rms_period: nTES=", nTES) # We label each data sample with a period period_index = ((time - time[0]) / period).astype(int) # We loop on periods to measure their respective amplitude and azimuth allperiods = np.unique(period_index) tper = np.zeros(len(allperiods)) ampdata = np.zeros((nTES, len(allperiods))) err_ampdata = np.zeros((nTES, len(allperiods))) if others is not None: newothers = plt.bin_per_period(period, time, others) if verbose: printnow( "Calculating RMS per period for {} periods and {} TES".format(len(allperiods), nTES) ) for i in range(len(allperiods)): ok = period_index == allperiods[i] tper[i] = np.mean(time[ok]) if nTES == 1: mm, ss = ft.meancut(data[ok], 3) ampdata[0, i] = ss err_ampdata[0, i] = 1 else: for j in range(nTES): mm, ss = ft.meancut(data[j, ok], 3) ampdata[j, i] = ss err_ampdata[j, i] = 1 if remove_noise: hf_noise = self.hf_noise_estimate(time, data) var_diff = np.zeros((nTES, len(tper))) for k in range(nTES): var_diff[k, :] = ampdata[k, :] ** 2 - hf_noise[k] ** 2 ampdata = np.sqrt(np.abs(var_diff)) * np.sign(var_diff) if others is None: return tper, ampdata, err_ampdata else: return tper, ampdata, err_ampdata, newothers
[docs] def fitperiod(self, x, y, fct): guess = np.array([np.std(y), np.mean(y), 0.0]) res = ft.do_minuit( x, y, y * 0 + 1, guess, functname=fct, verbose=False, nohesse=True, force_chi2_ndf=True ) return res
[docs] def return_fit_period(self, period, indata, others=None, verbose=False, template=None): time = indata[0] data = indata[1] if verbose: printnow("Entering Fit/period") if data.ndim == 1: nTES = 1 else: sh = np.shape(data) nTES = sh[0] if template is None: xxtemplate = np.linspace(0, period, 100) yytemplate = -np.sin(xxtemplate / period * 2 * np.pi) yytemplate /= np.std(yytemplate) else: xxtemplate = template[0] yytemplate = template[1] yytemplate /= np.std(yytemplate) fct = interp_template(xxtemplate, yytemplate) # class of interpolation period_index = ((time - time[0]) / period).astype(int) allperiods = np.unique(period_index) tper = np.zeros(len(allperiods)) ampdata = np.zeros((nTES, len(allperiods))) err_ampdata = np.zeros((nTES, len(allperiods))) if others is not None: newothers = self.bin_per_period(period, time, others) if verbose: printnow( "Performing fit per period for {} periods and {} TES".format(len(allperiods), nTES) ) for i in range(len(allperiods)): ok = period_index == allperiods[i] tper[i] = 0.5 * (np.min(time[ok]) + np.max(time[ok])) # np.mean(time[ok]) if nTES == 1: res = self.fitperiod(time[ok], data[ok], fct) ampdata[0, i] = res[1][0] err_ampdata[0, i] = res[2][0] else: for j in range(nTES): res = self.fitperiod(time[ok], data[j, ok], fct) ampdata[j, i] = res[1][0] err_ampdata[j, i] = res[2][0] if others is None: return tper, ampdata, err_ampdata else: return tper, ampdata, err_ampdata, newothers
[docs] def demodulate_JC( self, period, indata, indata_src, others=None, verbose=False, template=None, quadrature=False, remove_noise=False, ): time = indata[0] data = indata[1] sh = data.shape if len(sh) == 1: data = np.reshape(data, (1, sh[0])) time_src = indata_src[0] data_src = indata_src[1] # print(quadrature) if quadrature: # ## Shift src data by 1/2 period data_src_shift = np.interp(time_src - period / 2, time_src, data_src, period=period) # ## Now smooth over a period FREQ_SAMPLING = 1.0 / (time[1] - time[0]) size_period = int(FREQ_SAMPLING * period) + 1 filter_period = np.ones((size_period,)) / size_period demodulated = np.zeros_like(data) sh = np.shape(data) for i in range(sh[0]): if quadrature: demodulated[i, :] = scsig.fftconvolve( (np.sqrt((data[i, :] * data_src) ** 2 + (data[i, :] * data_src_shift) ** 2)) / np.sqrt(2), filter_period, mode="same", ) else: demodulated[i, :] = scsig.fftconvolve( data[i, :] * data_src, filter_period, mode="same" ) # Remove First and last periods nper = 4.0 nsamples = int(nper * period / (time[1] - time[0])) timereturn = time[nsamples:-nsamples] demodulated = demodulated[:, nsamples:-nsamples] if remove_noise: hf_noise = self.hf_noise_estimate(time, data) / np.sqrt(2) var_diff = np.zeros((sh[0], len(timereturn))) for k in range(sh[0]): var_diff[k, :] = demodulated[k, :] ** 2 - hf_noise[k] ** 2 demodulated = np.sqrt(np.abs(var_diff)) * np.sign(var_diff) if sh[0] == 1: demodulated = demodulated[0, :] return timereturn, demodulated, demodulated * 0 + 1
[docs] def demodulate_methods( self, data_in, fmod, fourier_cuts=None, verbose=False, src_data_in=None, method="demod", others=None, template=None, remove_noise=False, ): # Various demodulation methods # Others is a list of other vectors (with similar time sampling as the data to demodulate) # that we need to sample the same way as the data. # To filter the data before demodulation give the array fourier_cuts = [lowcut, highcut, notch]. # If both lowcut and highcut are given, a bandpass filter is applied. # If lowcut is given but highcut = None, a highpass filter at f_cut = lowcut is applied. # If highcut is given but lowcut = None, a lowpass filter at f_cut = highcut is applied. # If none of them is given, no cut frequency filter is applied. # In any case notch filter can still be used. If notch = None, notch filter is not applied. # if fourier_cuts is None: # Duplicate the input data data = data_in.copy() if src_data_in is None: src_data = None else: src_data = src_data_in.copy() else: # Filter data and source accordingly lowcut = fourier_cuts[0] highcut = fourier_cuts[1] notch = fourier_cuts[2] newtod = ft.filter_data( data_in[0], data_in[1], lowcut, highcut, notch=notch, rebin=True, verbose=verbose ) data = [data_in[0], newtod] if src_data_in is None: src_data = None else: new_src_tod = ft.filter_data( src_data_in[0], src_data_in[1], lowcut, highcut, notch=notch, rebin=True, verbose=verbose, ) src_data = [src_data_in[0], new_src_tod] # Now we have the "input" data, we can start demodulation period = 1.0 / fmod if method == "rms": # RMS method: calculate the RMS in each period (beware ! it returns noise+signal !) return self.return_rms_period( period, data, others=others, verbose=verbose, remove_noise=remove_noise ) elif method == "fit": return self.return_fit_period( period, data, others=others, verbose=verbose, template=template ) elif method == "demod": return self.demodulate_JC( period, data, src_data, others=others, verbose=verbose, template=None ) elif method == "demod_quad": return self.demodulate_JC( period, data, src_data, others=others, verbose=verbose, template=None, quadrature=True, remove_noise=remove_noise, ) elif method == "absolute_value": return np.abs(data)