Source code for lib.MapMaking.ComponentMapMaking.preset.preset_acquisition

import numpy as np
from fgbuster import component_model as c
from pysimulators.interfaces.healpy import HealpixConvolutionGaussianOperator

from qubic.lib.Instrument.Qnoise import QubicTotNoise


[docs] class PresetAcquisition: r"""Preset Acquisition. Instance to initialize the Components Map-Making. It defines the data acquisition variables and methods. Parameters ---------- seed_noise : int Seed for random noise generation. preset_tools : object Class containing tools and simulation parameters. preset_external : object Class containing external variables and methods. preset_qubic : object Class containing qubic operator and variables and methods. preset_sky : object Class containing sky varaibles and methods. preset_comp : object Class containing component variables and methods. preset_mixing_matrix : object Class containing mixing-matrix variables and methods. preset_gain : object Class containing detector gain variables and methods. Attributes ---------- seed_noise: int Seed for random noise realization. params_foregrounds: dict Dictionary containing the paramters associated with foregrounds. ites_to_converge: int Max iteration number for PCG algorithm. invN: BlockDiagonalOperator Inverse noise covariance matrix with input shape :math:`(N_{det}*N_{samples} + N_{pix}*N_{planck}*N_{stokes})` and output shape :math:`(N_{det}*N_{samples} + N_{pix}*N_{planck}*N_{stokes})`. M: DiagonalOperator Preconditioner for PCG algorithm. fwhm_qubic_rec: array_like Resolution of the reconstructed maps. H: BlockColumnOperator Pointing matrix. TOD_qubic: array_like QUBIC simulated TOD. TOD_external: array_like Planck simulated TOD. TOD_obs: array_like Simulated observed TOD. beta_iter: array_like Spectral indices, if d1 :math:`(12 \times Nside_{\beta}^2, N_{comp}-1)`, if not :math:`(N_{comp}-1)`. allbeta: array_like Spectral indeices, if d1 :math:`(iter, 12 \times Nside_{\beta}^2, N_{comp}-1)`, if not :math:`(iter, N_{comp}-1)`. Amm_iter: array_like Mixing matrix. """ # Notes # ----- # invN : # .operands[0] = QUBIC, .operands[1] = Planck\ # .operands[0].operands[0] = ReshapeOperator (Ndet, Nsamples) ---> (Ndet*Nsamples), .operands[0].operands[1] = ReshapeOperator (Ndet*Nsamples) ---> (Ndet, Nsamples)\ # .operands[0].operands[0/1].operands[0] = 150 GHz focal plane , .operands[0].operands[0/1].operands[1] = 220 GHz focal plane\ # H : # .operands[0] = QUBIC / .operands[1] = Planck # .operands[0].operands = Frequency Sub-Bands # .operands[0].operands[0].operands[0] = 150 GHz / .operands[0].operands[0].operands[1] = 220 GHz\ def __init__( self, seed_noise_qubic, seed_noise_planck, seed_start_pcg, preset_tools, preset_external, preset_qubic, preset_sky, preset_comp, preset_mixing_matrix, preset_gain, ): """ Initialization. """ ### Import preset Gain, Mixing Matrix, Foregrounds, Sky, QUBIC & tools self.preset_tools = preset_tools self.preset_external = preset_external self.preset_qubic = preset_qubic self.preset_sky = preset_sky self.preset_comp = preset_comp self.preset_mixingmatrix = preset_mixing_matrix self.preset_gain = preset_gain ### Set noise seeds + PCG start self.seed_noise_qubic = seed_noise_qubic self.seed_noise_planck = seed_noise_planck self.seed_start_pcg = seed_start_pcg ### Define tolerance of the rms variations self.ites_to_converge = self.preset_tools.params["PCG"]["ites_to_converge"] self.rms_plot = np.zeros((1, 2)) self.convergence = [] ### Inverse noise-covariance matrix self.preset_tools.mpi._print_message(" => Building inverse noise covariance matrix") self.invN = self.preset_qubic.joint_out.get_invntt_operator( self.preset_tools.params["QUBIC"]["NOISE"]["ndet"], self.preset_tools.params["QUBIC"]["NOISE"]["npho150"], self.preset_tools.params["QUBIC"]["NOISE"]["npho220"], self.preset_tools.params["PLANCK"]["level_noise_planck"], self.preset_sky.seenpix, ) ### Get convolution self.preset_tools.mpi._print_message(" => Getting convolution") self.fwhm_qubic_tod, self.fwhm_qubic_mapmaking, self.fwhm_qubic_rec = self.get_convolution() #! Tom: need to update this self.fwhm_planck_tod = [self.fwhm_qubic_tod.min()] * len(self.preset_external.external_nus) * self.preset_external.params_external["nsub_planck"] self.fwhm_planck_mapmaking = [self.fwhm_qubic_mapmaking.min()] * len(self.preset_external.external_nus) * self.preset_external.params_external["nsub_planck"] self.fwhm_tod = np.concatenate((self.fwhm_qubic_tod, self.fwhm_planck_tod)) self.fwhm_mapmaking = np.concatenate((self.fwhm_qubic_mapmaking, self.fwhm_planck_mapmaking)) self.fwhm_rec = self.fwhm_qubic_rec ### Build Planck maps self.components_in_convolved = np.zeros(np.shape(self.preset_comp.components_out)) C = HealpixConvolutionGaussianOperator(np.min(self.fwhm_qubic_tod)) for icomp, _ in enumerate(self.preset_comp.components_name_out): self.components_in_convolved[icomp] = C(self.preset_comp.components_in[icomp]) ### Get observed data self.preset_tools.mpi._print_message(" => Getting observational data") self.get_tod() ### Compute initial guess for PCG self.preset_tools.mpi._print_message(" => Initializing starting point") self.get_x0() def _get_scalar_acquisition_operator(self): """ Function that will compute "scalar acquisition operatord" by applying the acquisition operators to a vector full of ones. These scalar operators will be used to compute the resolutions in the case where we do not add convolutions during reconstruction. """ ### Import the acquisition operators acquisition_operators = self.preset_qubic.joint_out.qubic.H ### Create the vector full of ones which will be used to compute the scalar operators vector_ones = np.ones(acquisition_operators[0].shapein) ### Apply each sub_operator on the vector scalar_acquisition_operators = np.empty(len(self.preset_qubic.joint_out.qubic.allnus)) for freq in range(len(self.preset_qubic.joint_out.qubic.allnus)): scalar_acquisition_operators[freq] = np.mean(acquisition_operators[freq](vector_ones)) return scalar_acquisition_operators
[docs] def get_convolution(self): """Convolutions. Method to define all angular resolutions of the instrument at each frequency. This method sets the Full Width at Half Maximum (FWHM) for Time-Ordered Data (TOD) and map-making processes. `self.fwhm_qubic_tod` represents the real angular resolution, and `self.fwhm_qubic_mapmaking` represents the beams used for reconstruction. The method checks the `convolution_in` and `convolution_out` parameters to determine the appropriate FWHM values. It also calculates the reconstructed FWHM based on these parameters. Returns ------- fwhm_qubic_tod: array_like Resolutions to create input TOD. fwhm_qubic_mapmaking: array_like Resolutions for map-making. fwhm_qubic_rec: array_like Resolutions of reconstructed maps. """ # Initialize FWHM arrays to 0 fwhm_qubic_tod = self.preset_qubic.joint_in.qubic.allfwhm * 0 fwhm_qubic_mapmaking = self.preset_qubic.joint_in.qubic.allfwhm * 0 # Check if convolution_in is True if self.preset_qubic.params_qubic["convolution_in"]: fwhm_qubic_tod = self.preset_qubic.joint_in.qubic.allfwhm # Check if convolution_out is True if self.preset_qubic.params_qubic["convolution_out"]: fwhm_qubic_mapmaking = np.sqrt(self.preset_qubic.joint_in.qubic.allfwhm**2 - np.min(self.preset_qubic.joint_in.qubic.allfwhm) ** 2) # Calculate the reconstructed FWHM based on convolution parameters if self.preset_qubic.params_qubic["convolution_in"] and self.preset_qubic.params_qubic["convolution_out"]: fwhm_qubic_rec = np.min(self.preset_qubic.joint_in.qubic.allfwhm) # min of allfwhm? elif self.preset_qubic.params_qubic["convolution_in"] and not self.preset_qubic.params_qubic["convolution_out"]: # fwhm_qubic_rec = np.full(len(self.preset_comp.components_model_out), np.mean(self.preset_qubic.joint_in.qubic.allfwhm)) scalar_acquisition_operators = self._get_scalar_acquisition_operator() fwhm_qubic_rec = np.zeros(len(self.preset_comp.components_model_out)) for comp, comp_name in enumerate(self.preset_comp.components_name_out): if comp_name == "CMB": factor = scalar_acquisition_operators elif comp_name == "Dust": f_dust = c.Dust(nu0=self.preset_comp.params_foregrounds["Dust"]["nu0"], beta_d=self.preset_comp.params_foregrounds["Dust"]["beta_init"][0], temp=20) factor = scalar_acquisition_operators * f_dust.eval(self.preset_qubic.joint_out.qubic.allnus) elif self.preset_comp.components_name_out[comp] == "Synchrotron": f_sync = c.Synchrotron( nu0=self.preset_comp.params_foregrounds["Synchrotron"]["nu0"], beta_pl=self.preset_comp.params_foregrounds["Synchrotron"]["beta_init"][0], ) factor = scalar_acquisition_operators * f_sync.eval(self.preset_qubic.joint_out.qubic.allnus) fwhm_qubic_rec[comp] = np.sum(factor * fwhm_qubic_tod) / (np.sum(factor)) elif not self.preset_qubic.params_qubic["convolution_in"] and not self.preset_qubic.params_qubic["convolution_out"]: fwhm_qubic_rec = np.zeros(len(self.preset_comp.components_model_out)) # Print the FWHM values self.preset_tools.mpi._print_message(f"FWHM for TOD making : {fwhm_qubic_tod}") self.preset_tools.mpi._print_message(f"FWHM for reconstruction : {fwhm_qubic_mapmaking}") self.preset_tools.mpi._print_message(f"Reconstructed FWHM : {fwhm_qubic_rec}") return fwhm_qubic_tod, fwhm_qubic_mapmaking, fwhm_qubic_rec
[docs] def get_noise(self): """Noise. Method to define QUBIC noise, can generate Dual band or Wide Band noise by following: - Dual Band: n = [Ndet + Npho_150, Ndet + Npho_220] - Wide Band: n = [Ndet + Npho_150 + Npho_220] Depending on the instrument type specified in the preset_qubic parameters, this method will instantiate either QubicWideBandNoise or QubicDualBandNoise and return the total noise. Returns ------- noise: array_like The total noise array, flattened. """ noise = QubicTotNoise( self.preset_qubic.dict, self.preset_qubic.joint_out.qubic.sampling, self.preset_qubic.joint_out.qubic.scene, ) return noise.total_noise( self.preset_qubic.params_qubic["NOISE"]["ndet"], self.preset_qubic.params_qubic["NOISE"]["npho150"], self.preset_qubic.params_qubic["NOISE"]["npho220"], seed_noise=self.seed_noise_qubic, ).ravel()
[docs] def get_tod(self): r"""TOD. Generate fake observational data from QUBIC and external experiments. This method simulates observational data, including astrophysical foregrounds contamination using `self.beta` and systematics using `self.g`. The data generation follows the formula: :math:`\vec{d} = H.A.\vec{c} + \vec{n}`. Attributes ---------- H: Operator Pointing matrix operator. TOD_qubic: array_like The simulated observational data for QUBIC. TOD_external: array_like The simulated observational data for external experiments. TOD_obs: array_like The combined observational data from QUBIC and external experiments. nsampling_x_ndetectors: int The number of samples in `self.TOD_qubic`. """ ### Build joint acquisition operator print("Amm_in : ", self.preset_mixingmatrix.Amm_in.shape) self.H = self.preset_qubic.joint_in.get_operator( A=self.preset_mixingmatrix.Amm_in, gain=self.preset_gain.gain_in, fwhm=self.fwhm_tod, ) ### Build noise variables noise_external = self.preset_qubic.joint_in.external.get_noise( planck_ntot=self.preset_tools.params["PLANCK"]["level_noise_planck"], weight_planck=self.preset_tools.params["PLANCK"]["weight_planck"], seenpix=self.preset_sky.seenpix, seed=self.seed_noise_planck, ) noise_qubic = self.get_noise() ### Create QUBIC TOD self.TOD_qubic = (self.H.operands[0])(self.preset_comp.components_in) + noise_qubic self.nsampling_x_ndetectors = self.TOD_qubic.shape[0] ### Create external TOD # self.TOD_external = self.H.operands[1](self.components_in_convolved) + noise_external.ravel() # self.TOD_external_zero_outside_patch = self.components_in_convolved.copy() # self.TOD_external_zero_outside_patch[:, ~self.preset_sky.seenpix] = 0 # self.TOD_external_zero_outside_patch = self.H.operands[1](self.TOD_external_zero_outside_patch) + noise_external.ravel() self.TOD_external = self.H.operands[1](self.preset_comp.components_in) + noise_external.ravel() self.TOD_external_zero_outside_patch = self.preset_comp.components_in.copy() self.TOD_external_zero_outside_patch[:, ~self.preset_sky.seenpix] = 0 self.TOD_external_zero_outside_patch = self.H.operands[1](self.TOD_external_zero_outside_patch) + noise_external.ravel() #! Tom : Here, we are computing TOD from maps, then reshape to refound the maps, convolve the maps, and then reshape again to have the TOD... It is really dumb # _r = ReshapeOperator(self.TOD_external.shape, (len(self.preset_external.external_nus), 12 * self.preset_sky.params_sky["nside"] ** 2, 3)) # maps_external = _r(self.TOD_external) # ### Reconvolve Planck data toward QUBIC angular resolution # #! Tom : correct this part, we don't want to reconvolve here # if self.preset_qubic.params_qubic["convolution_in"] or self.preset_qubic.params_qubic["convolution_out"]: # C = HealpixConvolutionGaussianOperator( # fwhm=self.preset_qubic.joint_in.qubic.allfwhm[-1] * 0, # lmax=3 * self.preset_sky.params_sky["nside"], # ) # for i in range(maps_external.shape[0]): # maps_external[i] = C(maps_external[i]) # # if self.preset_tools.params["PCG"]["fix_pixels_outside_patch"]: # # maps_external[:, ~self.preset_sky.seenpix_qubic, :] = 0 # # self.TOD_external = _r.T(maps_external) # # self.seenpix_external = np.tile(self.preset_sky.seenpix_qubic, (maps_external.shape[0], 3, 1)).reshape(maps_external.shape) # ## Planck dataset with 0 outside QUBIC patch (Planck is assumed on the full sky) # maps_external[:, ~self.preset_sky.seenpix_qubic, :] = 0 # self.TOD_external_zero_outside_patch = _r.T(maps_external) ### Observed TOD (Planck is assumed on the full sky) self.TOD_obs = np.r_[self.TOD_qubic, self.TOD_external] self.TOD_obs_zero_outside = np.r_[self.TOD_qubic, self.TOD_external_zero_outside_patch.ravel()]
[docs] def get_x0(self): """PCG starting point. Define starting point of the convergence. The argument 'set_comp_to_0' multiplies the pixel values by a given factor. You can decide to convolve the map by a beam with an FWHM in radians. This method initializes the beta_iter and Amm_iter attributes based on the foreground model parameters. It also applies convolution and noise to the components based on the preset parameters. Raises ------ TypeError If an unrecognized component name is encountered. """ ### Fix random state np.random.seed(self.seed_start_pcg) self.beta_iter, self.Amm_iter = self.preset_mixingmatrix._get_beta_iter() # Build beta map for spatially varying spectral index self.allbeta = np.array([self.beta_iter]) # C1 = [HealpixConvolutionGaussianOperator(fwhm=self.fwhm_qubic_rec[i], lmax=3 * self.preset_tools.params["SKY"]["nside"]) for i in range(len(self.preset_comp.components_model_out))] C2 = HealpixConvolutionGaussianOperator( fwhm=self.preset_tools.params["INITIAL"]["fwhm0"], lmax=3 * self.preset_tools.params["SKY"]["nside"] - 1, ) # Constant spectral index -> maps have shape (Ncomp, Npix, Nstk) istk = 0 mypix = self.preset_sky.seenpix for i, comp_name in enumerate(self.preset_comp.components_name_out): # self.preset_comp.components_iter[i] = C2( # C1[i](self.preset_comp.components_iter[i]) # Two convolutions in a row? # ) # self.preset_comp.components_iter[i] = C2( # self.preset_comp.components_convolved_out[i] # ) # self.preset_comp.components_iter[i] = C2( # self.components_convolved_recon[i] # ) self.preset_comp.components_iter[i] = C2(self.components_in_convolved[i]) for istk in range(3): if istk == 0: key = "I" else: key = "P" initial_factor = ( self.preset_tools.params["INITIAL"]["qubic_patch_{}_{}".format(key, comp_name[: min(4, len(comp_name))].lower())] * self.preset_tools.params["INITIAL"]["global_{}".format(key)] ) self.preset_comp.components_iter[i, mypix, istk] *= initial_factor # To make it more uniform, either name the components "cmb", "dust", "sync", "co" or the files "qubic_patch_I_CMB", "qubic_patch_I_Dust", "qubic_patch_I_Synchrotron", "qubic_patch_I_CO" self.preset_comp.components_iter[i, mypix, istk] += np.random.normal(0, self.preset_tools.params["INITIAL"]["sig_map_noise"], self.preset_comp.components_iter[i, mypix, istk].shape)
# else: # self.allbeta = np.array([self.beta_iter]) # C1 = HealpixConvolutionGaussianOperator(fwhm=self.fwhm_qubic_rec, lmax=3*self.preset_tools.params['SKY']['nside']) # C2 = HealpixConvolutionGaussianOperator(fwhm=self.preset_tools.params['INITIAL']['fwhm0'], lmax=3*self.preset_tools.params['SKY']['nside']) # ### Varying spectral indices -> maps have shape (Nstk, Npix, Ncomp) # for i in range(len(self.preset_comp.components_model_out)): # if self.preset_comp.components_name_out[i] == 'CMB': # #print(self.preset_comp.components_iter.shape) # self.preset_comp.components_iter[:, :, i] = C2(C1(self.preset_comp.components_iter[:, :, i].T)).T # self.preset_comp.components_iter[1:, self.preset_sky.seenpix, i] *= self.preset_tools.params['INITIAL']['qubic_patch_cmb'] # elif self.preset_comp.components_name_out[i] == 'Dust': # self.preset_comp.components_iter[:, :, i] = C2(C1(self.preset_comp.components_iter[:, :, i].T)).T + np.random.normal(0, self.preset_tools.params['INITIAL']['sig_map_noise'], self.preset_comp.components_iter[:, :, i].T.shape).T # self.preset_comp.components_iter[1:, self.preset_sky.seenpix, i] *= self.preset_tools.params['INITIAL']['qubic_patch_dust'] # elif self.preset_comp.components_name_out[i] == 'Synchrotron': # self.preset_comp.components_iter[:, :, i] = C2(C1(self.preset_comp.components_iter[:, :, i].T)).T + np.random.normal(0, self.preset_tools.params['INITIAL']['sig_map_noise'], self.preset_comp.components_iter[:, :, i].T.shape).T # self.preset_comp.components_iter[1:, self.preset_sky.seenpix, i] *= self.preset_tools.params['INITIAL']['qubic_patch_sync'] # elif self.preset_comp.components_name_out[i] == 'CO': # self.preset_comp.components_iter[:, :, i] = C2(C1(self.preset_comp.components_iter[:, :, i].T)).T + np.random.normal(0, self.preset_tools.params['INITIAL']['sig_map_noise'], self.preset_comp.components_iter[:, :, i].T.shape).T # self.preset_comp.components_iter[1:, self.preset_sky.seenpix, i] *= self.preset_tools.params['INITIAL']['qubic_patch_co'] # else: # raise TypeError(f'{self.preset_comp.components_name_out[i]} not recognize')