Source code for lib.Qcoaddition

### General modules
import os

import healpy as hp
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
from scipy import interpolate
from scipy.interpolate import interp1d
from scipy.signal import medfilt

from qubic.lib.Calibration import Qfiber as ft
from qubic.lib.Calibration import Qselfcal as scal
from qubic.lib.Instrument.Qinstrument import QubicInstrument
from qubic.lib.QdataHandling import get_mode
from qubic.lib.Qdictionary import qubicDict
from qubic.lib.Qutilities import progress_bar

__all__ = ["MySplineFitting", "Pip1Tes", "FocalPlane", "PipAllTes"]


[docs] class MySplineFitting: def __init__(self, xin, yin, covarin, nbspl, logspace=False): # input parameters self.x = xin self.y = yin self.nbspl = nbspl if np.size(np.shape(covarin)) == 1: err = covarin self.covar = np.diag(err**2) self.invcovar = np.diag(1.0 / err**2) else: self.covar = covarin self.invcovar = np.linalg.inv(covarin) # Prepare splines xspl = np.linspace(np.min(self.x), np.max(self.x), nbspl) if logspace: xspl = np.logspace(np.log10(np.min(self.x)), np.log10(np.max(self.x)), nbspl) self.xspl = xspl F = np.zeros((np.size(xin), nbspl)) self.F = F for i in np.arange(nbspl): self.F[:, i] = self.get_spline_tofit(xspl, i, xin) # solution of the chi square ft_cinv_y = np.dot(np.transpose(F), np.dot(self.invcovar, self.y)) covout = np.linalg.inv(np.dot(np.transpose(F), np.dot(self.invcovar, F))) alpha = np.dot(covout, ft_cinv_y) fitted = np.dot(F, alpha) # output self.residuals = self.y - fitted self.chi2 = np.dot(np.transpose(self.residuals), np.dot(self.invcovar, self.residuals)) self.ndf = np.size(xin) - np.size(alpha) self.alpha = alpha self.covout = covout self.dalpha = np.sqrt(np.diagonal(covout)) def __call__(self, x): theF = np.zeros((np.size(x), self.nbspl)) for i in np.arange(self.nbspl): theF[:, i] = self.get_spline_tofit(self.xspl, i, x) return np.dot(theF, self.alpha)
[docs] def with_alpha(self, x, alpha): theF = np.zeros((np.size(x), self.nbspl)) for i in np.arange(self.nbspl): theF[:, i] = self.get_spline_tofit(self.xspl, i, x) return np.dot(theF, alpha)
[docs] def get_spline_tofit(self, xspline, index, xx): yspline = np.zeros(np.size(xspline)) yspline[index] = 1.0 tck = interpolate.splrep(xspline, yspline) yy = interpolate.splev(xx, tck, der=0) return yy
[docs] class Pip1Tes: """ Instance to treat TES one by one. """ def __init__(self, tt, tod, az, el, thk): self.tt = tt - tt[0] self.tod = tod - tod[0] self.az = az self.el = el self.thk = thk self.scantype_hk, self.azt, self.elt, self.scantype = self._scantype(0.035)
[docs] def identify_scans(self, inthk, az, el, tt=None, median_size=101, thr_speedmin=0.1, plotrange=[0, 1000]): """ This function identifies and assign numbers the various regions of a back-and-forth scanning using the housepkeeping time, az, el - a numbering for each back & forth scan - a region to remove at the end of each scan (bad data due to FLL reset, slowingg down of the moiunt, possibly HWP rotation - is the scan back or forth ? It optionnaly iinterpolate this information to the TOD sampling iif provided. Parameters ---------- input thk : np.array() time samples (seconds) for az and el at the housekeeeping sampling rate az : np.array() azimuth in degrees at the housekeeping sampling rate el : np.array() elevation in degrees at the housekeeping sampling rate tt : Optional : np.array() None buy default, if not None: time samples (seconds) at the TOD sampling rate Then. the output will also containe az,el and scantype interpolated at TOD sampling rate thr_speedmin : Optional : float Threshold for angular velocity to be considered as slow doplot : [Optional] : Boolean If True displays some useeful plot output : scantype_hk: np.array(int) type of scan for each sample at housekeeping sampling rate: * 0 = speed to slow - Bad data * n = scanning towards positive azimuth * -n = scanning towards negative azimuth where n is the scan number (starting at 1) azt : [optional] np.array() azimuth (degrees) at the TOD sampling rate elt : [optional] np.array() elevation (degrees) at the TOD sampling rate scantype : [optiona] np.array() same as scantype_hk, but interpolated at TOD sampling rate """ thk = inthk - inthk[0] ### Sampling for HK data timesample = np.median(thk[1:] - thk[:-1]) ### angular velocity medaz_dt = medfilt(np.gradient(az, timesample), median_size) ### Identify regions of change # Low velocity -> Bad c0 = np.abs(medaz_dt) < thr_speedmin # Positive velicity => Good cpos = (~c0) * (medaz_dt > 0) # Negative velocity => Good cneg = (~c0) * (medaz_dt < 0) ### Scan identification at HK sampling scantype_hk = np.zeros(len(thk), dtype="int") - 10 scantype_hk[c0] = 0 scantype_hk[cpos] = 1 scantype_hk[cneg] = -1 # check that we have them all count_them = np.sum(scantype_hk == 0) + np.sum(scantype_hk == -1) + np.sum(scantype_hk == 1) if count_them != len(scantype_hk): print("Identify_scans: Bad Scan counting at HK sampling level - Error: {} {}".format(len(scantype_hk), count_them)) raise RuntimeError("Bad scan counting at HK sampling level") ### Now give a number to each back and forth scan num = 0 previous = 0 for i in range(len(scantype_hk)): if scantype_hk[i] == 0: previous = 0 else: if (previous == 0) & (scantype_hk[i] > 0): # we have a change num += 1 previous = 1 scantype_hk[i] *= num if tt is not None: ### We propagate these at TOD sampling rate (this is an "step interpolation": we do not want intermediatee values") scantype = interp1d(thk, scantype_hk, kind="previous", fill_value="extrapolate")(tt) scantype = scantype.astype(int) count_them = np.sum(scantype == 0) + np.sum(scantype <= -1) + np.sum(scantype >= 1) if count_them != len(scantype): print("Bad Scan counting at data sampling level - Error") raise RuntimeError("Bad scan counting at data sampling level") ### Interpolate azimuth and elevation to TOD sampling azt = np.interp(tt, thk, az) elt = np.interp(tt, thk, el) ### Return evereything return scantype_hk, azt, elt, scantype else: ### Return scantype at HK sampling only return scantype_hk
def _scantype(self, speedmin): """ Method to run scantype analysis. Parameters : ------------ - speedmin : float number that describe the mount speed. """ scantype_hk, azt, elt, scantype = self.identify_scans(self.thk, self.az, self.el, tt=self.tt, plotrange=[0, 2000], thr_speedmin=speedmin) return scantype_hk, azt, elt, scantype def _remove_drifts_spline(self, nsplines=40, nresample=1000, nedge=100): """ Method to remove drifts using spline interpolation. """ ### Linear function using the beginning and end of the samples (nn samples) ### In order to approach periodicity of the signal to be resampled x0 = np.mean(self.tt[:nedge]) y0 = np.mean(self.tod[:nedge]) x1 = np.mean(self.tt[-nedge:]) y1 = np.mean(self.tod[-nedge:]) p = np.poly1d(np.array([(y1 - y0) / (x1 - x0), y0])) ### Resample TOD-linearfunction to apply spline tt_resample = np.linspace(self.tt[0], self.tt[-1], nresample) tod_resample = scipy.signal.resample(self.tod - p(self.tt), nresample) + p(tt_resample) ### Perform spline fitting splfit = MySplineFitting(tt_resample, tod_resample, tod_resample * 0 + 1, nsplines) return self.tod - splfit(self.tt) def _remove_offset_scan(self, method="meancut", apply_to_bad=True): """ Method to remove offsets. """ ### We remove offsets for each good scan but we also need to remove a coomparable offset for the scantype==0 reggiions in order to keep coninuity ### This si donee by apply_to_bad=True indices = np.arange(len(self.tod)) last_index = 0 myoffsetn = 0 myoffsetp = 0 donefirst = 0 nscans = np.max(np.abs(self.scantype)) for n in range(1, nscans + 1): # scan + ok = self.scantype == n if method == "meancut": myoffsetp, _ = ft.meancut(self.tod[ok], 3) elif method == "median": myoffsetp = np.median(self.tod[ok]) elif method == "mode": myoffsetp = get_mode(self.tod[ok]) else: break self.tod[ok] -= myoffsetp if apply_to_bad: first_index = np.min(indices[ok]) if (n == 1) & (donefirst == 0): myoffsetn = myoffsetp ### deal with first region vals_offsets = myoffsetn + np.linspace(0, 1, first_index - last_index - 1) * (myoffsetp - myoffsetn) self.tod[last_index + 1 : first_index] -= vals_offsets last_index = np.max(indices[ok]) donefirst = 1 # scan - ok = self.scantype == (-n) if method == "meancut": myoffsetn, _ = ft.meancut(self.tod[ok], 3) elif method == "median": myoffsetn = np.median(self.tod[ok]) elif method == "mode": myoffsetn = get_mode(self.tod[ok]) else: break self.tod[ok] -= myoffsetn if apply_to_bad: first_index = np.min(indices[ok]) if (n == 1) & (donefirst == 0): myoffsetp = myoffsetn ### deal with first region vals_offsets = myoffsetp + np.linspace(0, 1, first_index - last_index - 1) * (myoffsetn - myoffsetp) self.tod[last_index + 1 : first_index] -= vals_offsets last_index = np.max(indices[ok]) donefirst = 1 return self.tod
[docs] def decorel_azel(self, nbins=50, n_el=30, degree=None, nbspl=10): """ Method to remove correlation in azimuth. """ ### Profiling in Azimuth and elevation el_lims = np.linspace(np.min(self.elt) - 0.0001, np.max(self.elt) + 0.0001, n_el + 1) el_av = 0.5 * (el_lims[1:] + el_lims[:-1]) okall = np.abs(self.scantype) > 0 okpos = self.scantype > 0 okneg = self.scantype < 0 oks = [okpos, okneg] minaz = np.min(self.azt[okall]) maxaz = np.max(self.azt[okall]) ### Use polynomials or spline fitting to remove drifts and large features if degree is not None: coefficients = np.zeros((2, n_el, degree + 1)) else: coefficients = np.zeros((2, n_el, nbspl)) for i in range(len(oks)): for j in range(n_el): ok = oks[i] & (self.elt >= el_lims[j]) & (self.elt < el_lims[j + 1]) if np.sum(ok) == 0: break xc, yc, dx, dy, _ = ft.profile(self.azt[ok], self.tod[ok], rng=[minaz, maxaz], nbins=nbins, mode=True, dispersion=True, plot=False) if degree is not None: ### Polynomial Fitting z = np.polyfit(xc, yc, degree, w=1.0 / dy) coefficients[i, j, :] = z else: ### Spline Fitting splfit = MySplineFitting(xc, yc, dy, nbspl) coefficients[i, j, :] = splfit.alpha ### Now interpolate this to remove it to the data nscans = np.max(np.abs(self.scantype)) for i in range(1, nscans + 1): okp = self.scantype == i okn = self.scantype == (-i) for ok in [okp, okn]: the_el = np.median(self.elt[ok]) if degree is not None: myp = np.poly1d([np.interp(the_el, el_av, coefficients[0, :, i]) for i in np.arange(degree + 1)]) self.tod[ok] -= myp(self.azt[ok]) else: myalpha = [np.interp(the_el, el_av, coefficients[0, :, i]) for i in np.arange(nbspl)] self.tod[ok] -= splfit.with_alpha(self.azt[ok], myalpha) ### And interpolate for scantype==0 regions bad_chunks = self.get_chunks(0) self.tod = self.linear_rescale_chunks(bad_chunks, sz=100) return self.tod
[docs] def linear_rescale_chunks(self, chunks, sz=1000): """ """ for i in range(len(chunks)): thechunk = chunks[i] chunklen = thechunk[1] - thechunk[0] + 1 if thechunk[0] == 0: # this is the starting index => just the average vals = np.zeros(chunklen) + np.median(self.tod[thechunk[1] + 1 : thechunk[1] + sz]) + np.median(self.tod[thechunk[0] : thechunk[1]]) self.tod[thechunk[0] : thechunk[1] + 1] -= vals elif thechunk[1] == (len(self.tod) - 1): # this is the last one => just the average vals = np.zeros(chunklen) + np.median(self.tod[thechunk[0] - 1 - sz : thechunk[0] - 1]) + np.median(self.tod[thechunk[0] : thechunk[1]]) self.tod[thechunk[0] : thechunk[1] + 1] -= vals else: left = np.median(self.tod[thechunk[0] - 1 - sz : thechunk[0] - 1]) right = np.median(self.tod[thechunk[1] + 1 : thechunk[1] + sz]) vals = left + np.linspace(0, 1, chunklen) * (right - left) self.tod[thechunk[0] : thechunk[1] + 1] -= np.median(self.tod[thechunk[0] : thechunk[1] + 1]) - vals return self.tod
[docs] def get_chunks(self, value): """ """ ### returns chunks corresponding to a given value current_chunk = [] chunk_idx = [] inchunk = 0 chunknum = 0 for i in range(len(self.scantype)): if self.scantype[i] == value: inchunk = 1 current_chunk.append(i) else: if inchunk == 1: chunknum += 1 chunk_idx.append([current_chunk[0], current_chunk[len(current_chunk) - 1]]) current_chunk = [] inchunk = 0 if inchunk == 1: chunk_idx.append([current_chunk[0], current_chunk[len(current_chunk) - 1]]) return chunk_idx
[docs] def healpix_map_(self, nside=128, countcut=0, unseen_val=hp.UNSEEN): """ Method to project data on the sky using coaddition. """ ips = hp.ang2pix(nside, self.azt, self.elt, lonlat=True) mymap = np.zeros(12 * nside**2) mapcount = np.zeros(12 * nside**2) for i in range(len(self.azt)): mymap[ips[i]] -= self.tod[i] mapcount[ips[i]] += 1 unseen = mapcount <= countcut mymap[unseen] = unseen_val mapcount[unseen] = unseen_val mymap[~unseen] = mymap[~unseen] / mapcount[~unseen] return mymap
[docs] def run(self, remove_drift=False, remove_offset=False, decorel=False, healpix_map=False): """ Main method to run the pipeline. """ if remove_drift: self.tod = self._remove_drifts_spline() if remove_offset: self.tod = self._remove_offset_scan() if decorel: self.tod = self.decorel_azel() if healpix_map: m = self.healpix_map_() return self.tod, m else: return self.tod
[docs] class FocalPlane: """ Instance to display data on the FP (assuming Technical Demonstrator) """ def __init__(self, tt, tod, az, el, thk): ### Save tod self.tt, self.tod = tt, tod self.ndets = self.tod.shape[0] self.az, self.el = az, el self.thk = thk ### Useful arguments to plot focal plane self.coord_thermo = np.array([17 * 11 + 1, 17 * 12 + 1, 17 * 13 + 1, 17 * 14 + 1, 275, 276, 277, 278]) self.numbering_tes = np.arange(1, 129, 1) x = np.linspace(-0.0504, -0.0024, 17) y = np.linspace(-0.0024, -0.0504, 17) ### Detector coordinates self.X, self.Y = np.meshgrid(x, y) def _get_instrument(self): """ Method to initiate QUBIC instrument. """ dictfilename = "global_source_oneDet.dict" d = qubicDict() d.read_from_file(dictfilename) d["synthbeam"] = "CalQubic_Synthbeam_Calibrated_Multifreq_FI.fits" q = QubicInstrument(d) return q def _get_place_ONAFP_with_TES(self, tes, asic): """ Method to compute coordintates for FP plot using TES and ASIC numbering. """ xtes, ytes, FP_index, index_q = scal.TES_Instru2coord(TES=tes, ASIC=asic, q=self._get_instrument(), frame="ONAFP", verbose=False) ind = np.where((np.round(xtes, 4) == np.round(self.X, 4)) & (np.round(ytes, 4) == np.round(self.Y, 4))) return ind # ind[0][0]*17+ind[1][0]+1
[docs] def create_subplots(self, tes, asic, ax, **kwargs): """ Method to make many subplots for each TES. """ if np.sum(tes == np.array([4, 36, 68, 100])) != 0: i = self.coord_thermo[self.k_thermo] self.k_thermo += 1 else: i = self._get_place_ONAFP_with_TES(tes, asic) if asic == 2: tes += 128 idx = tes - 1 ax[i[0][0], i[1][0]].plot(self.tt, self.tod[idx], **kwargs) ax[i[0][0], i[1][0]].annotate( "{}".format(tes), xy=(0, 0), xycoords="axes fraction", fontsize=9, color="black", fontstyle="italic", fontweight="bold", xytext=(0.05, 0.85), backgroundcolor="w" ) self.bar.update()
[docs] def create_subplots_healpix(self, tes, asic, m, center, reso, ax, fig, **kwargs): """ Method to make many subplots for each TES with healpix projection. """ if np.sum(tes == np.array([4, 36, 68, 100])) != 0: i = self.coord_thermo[self.k_thermo] self.k_thermo += 1 else: i = self._get_place_ONAFP_with_TES(tes, asic) if asic == 2: tes += 128 mproj = hp.gnomview(m, rot=center, reso=reso, title="", notext=True, cbar=False, return_projected_map=True, bgcolor="black", badcolor="gray", cmap="jet") ax[i[0][0], i[1][0]].imshow(mproj, origin="lower", cmap="jet", **kwargs) ax[i[0][0], i[1][0]].annotate( "{}".format(tes), xy=(0, 0), xycoords="axes fraction", fontsize=9, color="black", fontstyle="italic", fontweight="bold", xytext=(0.05, 0.85), backgroundcolor="w" ) self.bar.update()
def _get_FP_line(self, type_of_data="line", color=None, save=None, center=None, reso=None, nside=None, **kwargs): """ Method to display data in the FP (assuming TD configuration). """ self.bar = progress_bar(256, "Display focal plane") if color is None: color = ["black"] * self.ndets fig, ax = plt.subplots(17, 17, figsize=(30, 30)) ### Remove all x and y axis for i in range(17): for j in range(17): ax[i, j].get_xaxis().set_visible(False) ax[i, j].get_yaxis().set_visible(False) self.k_thermo = 0 ### Main loop k = 0 for j in [1, 2]: for i in self.numbering_tes: # plt.rc('font',size=6) if type_of_data == "line": self.create_subplots(i, j, ax, **kwargs) elif type_of_data == "healpix": ### Herit some methods pipeline = Pip1Tes(self.tt, self.tod[k], self.az, self.el, self.thk) m = pipeline.healpix_map_(nside=nside) self.create_subplots_healpix(i, j, m, center, reso, ax, fig, **kwargs) k += 1 ### Saving if save is not None: fig.savefig(save) plt.close()
[docs] class PipAllTes: """ Instance to analyse and display all TES. """ def __init__(self, tt, tod, az, el, thk): self.tt, self.tod = tt, tod self.ndets = self.tod.shape[0] self.az, self.el = az, el self.thk = thk self._create_folder("plots") def _create_folder(self, folder_name): """ Method to create folder to save plots. Parameters : ------------ - folder_name : str """ if not os.path.exists(folder_name): try: os.makedirs(folder_name) except OSError: pass else: pass
[docs] def run(self, remove_drift=False, remove_offset=False, decorel=False, plot_FP=False, plot_FP_healpix=False, center=None, reso=None, color=None, nside=128, **kwargs): """ Method to run loop for all TES. You can add useful arguments using **kwargs keyword. Parameters : ------------ - remove_drift : bool - remove_offset : bool - decorel : bool - plot_FP : bool to plot FP with TOD - plot_FP_healpix : bool to plot FP with projected maps - center : tuple - reso : Int number for healpix resolution - color : list of color for each TES - nside : Int number for map pixelization """ for idet in range(self.ndets): ### If treatment is needed, it initiate pipeline for a single TES if remove_drift or remove_offset or decorel: print(f"============ Treating TES #{idet + 1} ============") pipeline = Pip1Tes(self.tt, self.tod[idet], self.az, self.el, self.thk) if remove_drift: self.tod[idet] = pipeline._remove_drifts_spline() if remove_offset: self.tod[idet] = pipeline._remove_offset_scan() if decorel: self.tod[idet] = pipeline.decorel_azel() ### Initiate FP instance fp = FocalPlane(self.tt, self.tod, self.az, self.el, self.thk) if plot_FP: fp._get_FP_line(type_of_data="line", color=color, save="plots/FP.png") if plot_FP_healpix: fp._get_FP_line(type_of_data="healpix", color=color, save="plots/FP_healpix.png", center=center, reso=reso, nside=nside, **kwargs)