# Source code for lib.QskySim

import os
import pickle
import random as rd
import string

import healpy as hp
import matplotlib.pyplot as plt
import numpy as np
import pysm3 as pysm
import pysm3.units as u
from pysm3 import utils
from scipy.optimize import curve_fit

from qubic.data import PATH as data_dir
from qubic.lib.Calibration import Qfiber as ft
from qubic.lib.Fitting import Qcamb as qc
from qubic.lib.Instrument.Qinstrument import compute_freq
from qubic.lib.Qutilities import progress_bar

__all__ = ["sky", "Qubic_sky"]


def cov2corr(mat):
    """
    Convert a covariance matrix into a correlation matrix.

    Parameters
    ----------
    mat : array_like
        Square (n, n) covariance matrix.

    Returns
    -------
    ndarray
        Correlation matrix with entries mat[i, j] / sqrt(mat[i, i] * mat[j, j]).
        A 1x1 input is returned unchanged (kept for backward compatibility).
    """
    mat = np.asarray(mat)
    if mat.shape[0] == 1:
        # Degenerate single-band case: nothing to normalize.
        return mat
    # Vectorized form of the former O(n^2) Python double loop.  It also fixes a
    # subtle defect: the loop wrote float ratios into np.zeros_like(mat), which
    # silently truncated them to integers whenever `mat` had an integer dtype.
    diag = np.diag(mat)
    return mat / np.sqrt(np.outer(diag, diag))


def corr2cov(mat, diagvals):
    """
    Convert a correlation matrix back into a covariance matrix.

    Parameters
    ----------
    mat : array_like
        Square (n, n) correlation matrix.
    diagvals : array_like
        Length-n vector of variances to put back on the diagonal scale.

    Returns
    -------
    ndarray
        Covariance matrix with entries mat[i, j] * sqrt(diagvals[i] * diagvals[j]).
        A 1x1 input is returned unchanged (kept for backward compatibility).
    """
    mat = np.asarray(mat)
    if mat.shape[0] == 1:
        # Degenerate single-band case: nothing to rescale.
        return mat
    # Vectorized form of the former O(n^2) Python double loop.  Using
    # sqrt(outer(d, d)) reproduces the loop's sqrt(d[i] * d[j]) exactly, and the
    # float result avoids the integer truncation np.zeros_like(mat) caused for
    # integer inputs.
    diagvals = np.asarray(diagvals)
    return mat * np.sqrt(np.outer(diagvals, diagvals))


class sky(object):
    """
    Define a sky object as seen by an instrument.

    Wraps a `pysm3.Sky` built from the preset component strings in `skyconfig`,
    optionally adding a CMB component (from provided maps, provided CAMB
    spectra, or spectra recomputed on the fly).
    """

    def __init__(self, skyconfig, d, instrument, out_dir, out_prefix, lmax=None):
        """
        Parameters
        ----------
        skyconfig : dict
            A skyconfig dictionary to pass to (as expected by) `PySM`.
            The special key "cmb" may map to a dict (with "IQUMaps" or
            "CAMBSpectra"/"ell" and optionally "seed"), to a seed integer,
            or to None (new realization at each call).
        d : dict
            Input dictionary, from which nside, nf_sub, nf_recon, filter_nu
            and filter_relative_bandwidth are read.
        instrument :
            A `PySM` instrument describing the instrument.
        out_dir : str
            Default path where the sky maps will be saved.
        out_prefix : str
            Default word for the output files.
        lmax : int, optional
            Maximum multipole for CAMB spectra; defaults to 3 * nside.

        For more details about `PySM` see the `PySM` documentation at the
        following link: https://pysm-public.readthedocs.io/en/latest/index.html
        """
        self.skyconfig = skyconfig
        self.nside = d["nside"]
        self.npix = 12 * self.nside**2
        self.dictionary = d
        self.Nfin = int(self.dictionary["nf_sub"])
        self.Nfout = int(self.dictionary["nf_recon"])
        # filter_nu is stored in GHz (the dictionary value is in Hz)
        self.filter_nu = int(self.dictionary["filter_nu"] / 1e9)
        self.filter_relative_bandwidth = self.dictionary["filter_relative_bandwidth"]
        self.instrument = instrument
        self.output_directory = out_dir
        self.output_prefix = out_prefix
        self.input_cmb_maps = None
        self.input_cmb_spectra = None
        if lmax is None:
            self.lmax = 3 * d["nside"]
        else:
            self.lmax = lmax

        iscmb = False
        preset_strings = []
        for k in skyconfig.keys():
            if k == "cmb":
                iscmb = True
                keyword = skyconfig[k]
                if isinstance(keyword, dict):
                    # the CMB part is defined via a dictionary
                    # This can be either a set of maps, a set of CAMB spectra, or whatever
                    # In the second case it might also contain the seed (None means rerun it each time)
                    # In the third case we recompute some CAMB spectra and generate the maps
                    keys = keyword.keys()
                    if "IQUMaps" in keys:
                        # this is the case where we have IQU maps
                        mymaps = keyword["IQUMaps"]
                        self.input_cmb_maps = mymaps
                        self.input_cmb_spectra = None
                    elif "CAMBSpectra" in keys:
                        # this is the case where we have CAMB Spectra
                        # Note that they are in l(l+1) CL/2pi so we have to change that for synfast
                        totDL = keyword["CAMBSpectra"]
                        ell = keyword["ell"]
                        mycls = qc.Dl2Cl_without_monopole(ell, totDL)
                        # set the seed if needed
                        if "seed" in keys:
                            np.random.seed(keyword["seed"])
                        mymaps = hp.synfast(mycls.T, self.nside, new=True)
                        self.input_cmb_maps = mymaps
                        self.input_cmb_spectra = totDL
                    else:
                        raise ValueError("Bad Dictionary given for PySM in the CMB part - see QubicSkySim.py for details")
                else:
                    # The CMB part is not defined via a dictionary but only by the seed for synfast
                    # No map nor CAMB spectra was given, so we recompute them.
                    # The assumed cosmology is the default one given in the get_CAMB_Dl() function
                    # from camb_interface library.
                    if keyword is not None:
                        np.random.seed(keyword)
                    ell, totDL, unlensedCL = qc.get_camb_Dl(lmax=self.lmax)
                    mycls = qc.Dl2Cl_without_monopole(ell, totDL)
                    mymaps = hp.synfast(mycls.T, self.nside, new=True)
                    self.input_cmb_maps = mymaps
                    self.input_cmb_spectra = totDL
                # Write a temporary file with the maps so the PySM can read them
                rndstr = random_string(10)
                hp.write_map("/tmp/" + rndstr, mymaps)
                cmbmap = pysm.CMBMap(self.nside, map_IQU="/tmp/" + rndstr)
                os.remove("/tmp/" + rndstr)
            else:
                # we add the other predefined components
                preset_strings.append(skyconfig[k])
        self.sky = pysm.Sky(nside=self.nside, preset_strings=preset_strings)
        if iscmb:
            self.sky.add_component(cmbmap)

    def get_simple_sky_map(self):
        """
        Create as many skies as the number of input frequencies.
        Instrumental effects are not considered.

        Returns
        -------
        ndarray
            Maps of shape (number_of_input_subfrequencies, npix, 3) in uK_CMB.
        """
        _, nus_edge, nus_in, _, _, Nbbands_in = compute_freq(self.filter_nu, self.Nfin, self.filter_relative_bandwidth)
        sky = np.zeros((self.Nfin, self.npix, 3))

        ##### This is the old code by JCH - It has been replaced by what Edgar Jaber has proposed
        ##### see his presentation on Git:
        ##### qubic/scripts/ComponentSeparation/InternshipJaber/teleconf_03122020.pdf
        # for i in range(Nf):
        #     ###################### This is puzzling part here: ############################
        #     # See Issue on PySM Git: https://github.com/healpy/pysm/issues/49
        #     ###############################################################################
        #     #### THIS IS WHAT WOULD MAKE SENSE BUT DOES NOT WORK ~ 5% on maps w.r.t. input
        #     # nfreqinteg = 5
        #     # nus = np.linspace(nus_edge[i], nus_edge[i + 1], nfreqinteg)
        #     # freqs = utils.check_freq_input(nus)
        #     # convert_to_uK_RJ = (np.ones(len(freqs), dtype=np.double) * u.uK_CMB).to_value(
        #     #     u.uK_RJ, equivalencies=u.cmb_equivalencies(freqs))
        #     # weights = np.ones(nfreqinteg) * convert_to_uK_RJ
        #     ###############################################################################
        #     ###### Works OK but not clear why...
        #     ###############################################################################
        #     nfreqinteg = 5
        #     nus = np.linspace(nus_edge[i], nus_edge[i + 1], nfreqinteg)
        #     filter_uK_CMB = np.ones(len(nus), dtype=np.double)
        #     filter_uK_CMB_normalized = utils.normalize_weights(nus, filter_uK_CMB)
        #     weights = 1. / filter_uK_CMB_normalized
        #     ###############################################################################
        #     ### Integrate through band using filter shape defined in weights
        #     themaps_iqu = self.sky.get_emission(nus * u.GHz, weights=weights)
        #     sky[i, :, :] = np.array(themaps_iqu.to(u.uK_CMB, equivalencies=u.cmb_equivalencies(nus_in[i] * u.GHz))).T
        #     ratio = np.mean(self.input_cmb_maps[0,:]/sky[i,:,0])
        #     print('Ratio to initial: ', ratio)

        #### Here is the new code from Edgar Jaber
        for i in range(self.Nfin):
            # flat bandpass integration over nfreqinteg points within each sub-band
            nfreqinteg = 5
            freqs = np.linspace(nus_edge[i], nus_edge[i + 1], nfreqinteg)
            weights = np.ones(nfreqinteg)
            sky[i, :, :] = (self.sky.get_emission(freqs * u.GHz, weights) * utils.bandpass_unit_conversion(freqs * u.GHz, weights, u.uK_CMB)).T
        return sky
# ### This is not supported yet.... # def read_sky_map(self): # """ # Returns the maps saved in the `output_directory` containing the `output_prefix`. # """ # map_list = [s for s in os.listdir(self.output_directory) if self.output_prefix in s] # map_list = [m for m in map_list if 'total' in m] # if len(map_list) > len(self.instrument.Frequencies): # map_list = np.array( # [[m for m in map_list if x in m] for x in self.instrument.Channel_Names]).ravel().tolist() # maps = np.zeros((len(map_list), hp.nside2npix(self.nside), 3)) # for i, title in enumerate(map_list): # maps[i] = hp.read_map(title, field=(0, 1, 2)).T # return map_list, maps # def get_sky_map(self): # """ # Returns the maps saved in the `output_directory` containing the `output_prefix`. If # there are no maps in the `ouput_directory` they will be created. # """ # sky_map_list, sky_map = self.read_sky_map() # if len(sky_map_list) < len(self.instrument.Frequencies): # self.instrument.observe(self.sky) # sky_map_list, sky_map = self.read_sky_map() # return sky_map # ### This part has been commented as it is not yet compatible with PySM3 # ### it was written by F. Incardona using PySM2 # class Planck_sky(sky): # """ # Define a sky object as seen by Planck. 
# """ # def __init__(self, skyconfig, d, output_directory="./", output_prefix="planck_sky", band=143): # self.band = band # self.planck_central_nus = np.array([30, 44, 70, 100, 143, 217, 353, 545, 857]) # self.planck_relative_bandwidths = np.array([0.2, 0.2, 0.2, 0.33, 0.33, 0.33, 0.33, 0.33, 0.33]) # self.planck_beams = np.array([33, 24, 14, 9.2, 7.1, 5.5, 5, 5, 5]) # self.planck_Isensitivities_pixel = np.array([2, 2.7, 4.7, 2, 2.2, 4.8, 14.7, 147, 6700]) # self.planck_Psensitivities_pixel = np.array([2.8, 3.9, 6.7, np.NaN, 4.2, 9.8, 29.8, np.NaN, np.NaN]) # self.planck_channels = self.create_planck_bandwidth() # self.planck_channels_names = ['33_GHz', '44_GHz', '70_GHz', '100_GHz', '143_GHz', '217_GHz', '353_GHz', # '545_GHz', '857_GHz'] # if band is not None: # idx = np.argwhere(self.planck_central_nus == band)[0][0] # instrument = pysm.Instrument( # {'nside': d['nside'], 'frequencies': self.planck_central_nus[idx:idx + 1], # GHz # 'use_smoothing': True, 'beams': self.planck_beams[idx:idx + 1], # arcmin # 'add_noise': True, # If True `sens_I` and `sens_Q` are required # 'noise_seed': 0, # Not used if `add_noise` is False # 'sens_I': self.get_planck_sensitivity("I")[idx:idx + 1], # Not used if `add_noise` is False # 'sens_P': self.get_planck_sensitivity("P")[idx:idx + 1], # Not used if `add_noise` is False # 'use_bandpass': True, # If True pass banpasses with the key `channels` # 'channel_names': self.planck_channels_names[idx:idx + 1], # 'channels': self.planck_channels[idx:idx + 1], 'output_units': 'uK_RJ', # 'output_directory': output_directory, 'output_prefix': output_prefix, 'pixel_indices': None}) # else: # instrument = {'nside': d['nside'], 'frequencies': self.planck_central_nus, # GHz # 'use_smoothing': True, 'beams': self.planck_beams, # arcmin # 'add_noise': True, # If True `sens_I` and `sens_Q` are required # 'noise_seed': 0, # Not used if `add_noise` is False # 'sens_I': self.get_planck_sensitivity("I"), # # Not used if `add_noise` is False # 
'sens_P': self.get_planck_sensitivity("P"), # # Not used if `add_noise` is False # 'use_bandpass': True, # If True pass banpasses with the key `channels` # 'channel_names': self.planck_channels_names, 'channels': self.planck_channels, # 'output_units': 'uK_RJ', 'output_directory': output_directory, # 'output_prefix': output_prefix, 'pixel_indices': None} # sky.__init__(self, skyconfig, d, instrument, output_directory, output_prefix) # def create_planck_bandwidth(self, length=100): # """ # Returns a list of bandwidths and respectively weights correponding to the ideal Planck bandwidths. # `planck_central_nus` must be an array containing the central frequency of the channel while the # `planck_relative_bandwidth` parameter must be an array containig the relative bandwidths for # each Planck channel. `length` is the length of the output array; default is 100. # """ # halfband = self.planck_relative_bandwidths * self.planck_central_nus / 2 # bandwidths = np.zeros((len(self.planck_relative_bandwidths), length)) # v = [] # for i, hb in enumerate(halfband): # bandwidths[i] = np.linspace(self.planck_central_nus[i] - hb, self.planck_central_nus[i] + hb, num=length) # v.append((bandwidths[i], np.ones_like(bandwidths[i]))) # return v # def get_planck_sensitivity(self, kind): # """ # Convert the sensitiviy per pixel to sensitivity per arcmin. # """ # if kind == "I": # return self.planck_Isensitivities_pixel * self.planck_beams ** 2 # return self.planck_Psensitivities_pixel * self.planck_beams ** 2
class Qubic_sky(sky):
    """
    Define a sky object as seen by Qubic.
    """

    def __init__(self, skyconfig, d, output_directory="./", output_prefix="qubic_sky"):
        """
        Parameters
        ----------
        skyconfig : dict
            Sky configuration dictionary passed on to the base `sky` class.
        d : dict
            QUBIC input dictionary (nf_sub, nf_recon, filter_nu,
            filter_relative_bandwidth, nside, config, ...).
        output_directory : str
            Default path where the sky maps will be saved.
        output_prefix : str
            Default word for the output files.
        """
        self.Nfin = int(d["nf_sub"])
        self.Nfout = int(d["nf_recon"])
        self.filter_relative_bandwidth = d["filter_relative_bandwidth"]
        # filter_nu stored in GHz (dictionary value is in Hz)
        self.filter_nu = int(d["filter_nu"] / 1e9)
        _, nus_edge_in, central_nus, deltas, _, _ = compute_freq(self.filter_nu, self.Nfin, self.filter_relative_bandwidth)
        self.qubic_central_nus = central_nus
        # THESE LINES HAVE TO BE CONFIRMED/IMPROVED in future since
        # fwhm = lambda / (P Delta_x) is an approximation for the resolution
        if d["config"] == "FI":
            self.fi2td = 1
        elif d["config"] == "TD":
            P_FI = 22  # horns in the largest baseline in the FI
            P_TD = 8  # horns in the largest baseline in the TD
            self.fi2td = (P_FI - 1) / (P_TD - 1)
        # self.qubic_resolution_nus = d["synthbeam_peak150_fwhm"] * 150 / self.qubic_central_nus * self.fi2td
        self.qubic_channels_names = ["{:.3s}".format(str(i)) + "_GHz" for i in self.qubic_central_nus]

        instrument = {
            "nside": d["nside"],
            "frequencies": central_nus,  # GHz
            "use_smoothing": False,
            "beams": np.ones_like(central_nus),  # arcmin
            "add_noise": False,  # If True `sens_I` and `sens_Q` are required
            "noise_seed": 0.0,  # Not used if `add_noise` is False
            "sens_I": np.ones_like(central_nus),  # Not used if `add_noise` is False
            "sens_P": np.ones_like(central_nus),  # Not used if `add_noise` is False
            "use_bandpass": False,  # If True pass bandpasses with the key `channels`
            "channel_names": self.qubic_channels_names,  # was: np.ones_like(central_nus)
            "channels": np.ones_like(central_nus),
            "output_units": "uK_RJ",
            "output_directory": output_directory,
            "output_prefix": output_prefix,
            "pixel_indices": None,
        }
        sky.__init__(self, skyconfig, d, instrument, output_directory, output_prefix)
    def get_fullsky_convolved_maps(self, FWHMdeg=None, verbose=None):
        """
        This returns full sky maps at each subfrequency convolved by the beam
        of the instrument at each frequency or with another beam if FWHMdeg
        is provided.
        When FWHMdeg is 0, the maps are not convolved.

        Parameters
        ----------
        FWHMdeg : float, optional
            Target beam FWHM in degrees; None means use the per-band
            instrument resolution computed in `smoothing`.
        verbose : bool, optional

        Returns
        -------
        ndarray
            Convolved full-sky maps of shape (Nfin, npix, 3).
        """
        # First get the full sky maps
        fullmaps = self.get_simple_sky_map()
        # Convolve the maps; the actually-used FWHMs are stored back into the
        # instrument description so downstream code can read them.
        fwhms, fullmaps = self.smoothing(fullmaps, FWHMdeg, self.Nfin, self.qubic_central_nus, verbose=verbose)
        self.instrument["beams"] = fwhms
        return fullmaps
[docs] def smoothing(self, maps, FWHMdeg, Nf, central_nus, verbose=True): """Convolve the maps to the FWHM at each sub-frequency or to a common beam if FWHMdeg is given.""" fwhms = np.zeros(Nf) if FWHMdeg is not None: fwhms += FWHMdeg else: fwhms = self.dictionary["synthbeam_peak150_fwhm"] * 150.0 / central_nus * self.fi2td for i in range(Nf): if fwhms[i] != 0: maps[i, :, :] = hp.sphtfunc.smoothing(maps[i, :, :].T, fwhm=np.deg2rad(fwhms[i])).T return fwhms, maps
    def get_partial_sky_maps_withnoise(
        self,
        coverage=None,
        version_FastSim="01",
        sigma_sec=None,
        Nyears=4.0,
        FWHMdeg=None,
        seed=None,
        noise_profile=True,
        spatial_noise=True,
        nunu_correlation=True,
        noise_only=False,
        integrate_into_band=True,
        verbose=False,
        noise_covcut=0.1,
    ):
        """
        This returns maps in the same way as with get_simple_sky_map but cut
        according to the coverage and with noise added according to this
        coverage and the RMS in muK.sqrt(sec) given by sigma_sec.
        The default integration time is 4 years but can be modified with the
        optional variable Nyears.
        Note that the maps are convolved with the instrument beam by default,
        or with FWHMdeg (can be an array) if provided.
        If seed is provided, it will be used for the noise realization. If not
        it will be a new realization at each call.
        The optional effective_variance_invcov keyword is a modification law
        to be applied to the coverage in order to obtain a more realistic
        noise profile. It is a law for effective RMS as a function of inverse
        coverage and is a 2D array with the first one being (nx samples)
        inverse coverage and the second being the corresponding effective
        variance to be used through interpolation when generating the noise.

        Parameters
        ----------
        coverage : array, optional
            Coverage map of the sky. By default, we load a coverage centered
            on the galactic center with 10000 pointings.
        version_FastSim : str
            Version of the FastSimulator files: 01, 02, 03... For now, only
            01 exists.
        sigma_sec : float, optional
            Noise RMS in muK.sqrt(sec); read from the FastSimulator file when
            None.
        Nyears : float
            Integration time for observation to scale the noise, by default 4.
        FWHMdeg : float or array, optional
        seed : int, optional
        noise_profile : bool
            If True, apply the effective-variance-vs-inverse-coverage law.
        spatial_noise : bool
            If True, spatial noise correlations are added. True by default.
        nunu_correlation : bool
            If True, correlations between frequency sub-bands are added.
            True by default.
        noise_only : bool
            If True, only returns the noise maps and the coverage (without
            the sky signal).
        integrate_into_band : bool
            If True, averaging input sub-band maps into reconstruction
            sub-bands. True by default.
        verbose : bool
        noise_covcut : float
            Coverage threshold (fraction of max) defining the seen pixels.

        Returns
        -------
        maps + noisemaps, maps, noisemaps, coverage
        (or noisemaps, coverage when noise_only is True)
        """
        ### Input bands
        Nfreq_edges, nus_edge, nus, deltas, Delta, Nbbands = compute_freq(self.filter_nu, self.Nfin, self.filter_relative_bandwidth)
        ### Output bands
        # Check Nfout is between 1 and 8: FastSimulator files only exist for
        # those sub-band counts.
        if self.Nfout < 1 or self.Nfout > 8:
            raise NameError("Nfout should be contained between 1 and 8 for FastSimulation.")
        Nfreq_edges_out, nus_edge_out, nus_out, deltas_out, Delta_out, Nbbands_out = compute_freq(self.filter_nu, self.Nfout, self.filter_relative_bandwidth)

        # First get the convolved maps
        if noise_only is False:
            maps = np.zeros((self.Nfout, self.npix, 3))
            if integrate_into_band:
                if verbose:
                    print("Convolving each input frequency map")
                maps_all = self.get_fullsky_convolved_maps(FWHMdeg=FWHMdeg, verbose=verbose)
                # Now averaging maps into reconstruction sub-bands maps
                if verbose:
                    print("Averaging input maps from input sub-bands into reconstruction sub-bands:")
                for i in range(self.Nfout):
                    print("doing band {} {} {}".format(i, nus_edge_out[i], nus_edge_out[i + 1]))
                    inband = (nus > nus_edge_out[i]) & (nus < nus_edge_out[i + 1])
                    maps[i, :, :] = np.mean(maps_all[inband, :, :], axis=0)
            else:
                # Evaluate the sky at the output central frequencies directly
                # (no bandpass integration), then convolve.
                for i in range(self.Nfout):
                    freq = nus_out[i]
                    maps[i, :, :] = (self.sky.get_emission(freq * u.GHz) * utils.bandpass_unit_conversion(freq * u.GHz, weights=None, output_unit=u.uK_CMB)).T
                _, maps = self.smoothing(maps, FWHMdeg, self.Nfout, nus_out, verbose=verbose)

        ##############################################################################################################
        # Restore data for FastSimulation ############################################################################
        ##############################################################################################################
        #### Directory for fast simulations
        dir_fast = os.path.join(data_dir, f"FastSimulator_version{version_FastSim}")
        #### Integration time assumed in FastSim files
        fastsimfile_effective_duration = 2.0
        fast_pkl = os.path.join(dir_fast, "DataFastSimulator_{}{}_nfsub_{}.pkl".format(self.dictionary["config"], str(self.filter_nu), self.Nfout))
        with open(fast_pkl, "rb") as file:
            print("reading pickle file: %s" % fast_pkl)
            # NOTE(review): pickle.load on a data file shipped with the
            # package; only safe because the file is trusted.
            DataFastSim = pickle.load(file)
        # Read Coverage map
        if coverage is None:
            coverage_pkl = os.path.join(dir_fast, "DataFastSimulator_{}{}_coverage.pkl".format(self.dictionary["config"], str(self.filter_nu)))
            print("reading coverage file: %s" % coverage_pkl)
            h = open(coverage_pkl, "rb")
            DataFastSimCoverage = pickle.load(h)
            h.close()
            coverage = DataFastSimCoverage["coverage"]
        # Read noise normalization
        if sigma_sec is None:
            #### Beware ! Initial End-To-End simulations that produced the first FastSimulator were done with
            #### Effective_duration = 4 years and this is the meaning of signoise
            #### New files were done with 2 years and as a result the signoise needs to be multiplied by sqrt(effective_duration/4)
            sigma_sec = DataFastSim["signoise"] * np.sqrt(fastsimfile_effective_duration / 4.0)
        # # Read Nyears
        # if Nyears is None:
        #     Nyears = DataFastSim['years']
        # Read Noise Profile
        if noise_profile is True:
            effective_variance_invcov = DataFastSim["effective_variance_invcov"]
        else:
            effective_variance_invcov = None
        # Read Spatial noise correlation
        if spatial_noise is True:
            clnoise = DataFastSim["clnoise"]
        else:
            clnoise = None
        # Read sub-band covariance matrices (I, Q, U)
        if nunu_correlation is True:
            covI = DataFastSim["CovI"]
            covQ = DataFastSim["CovQ"]
            covU = DataFastSim["CovU"]
            sub_bands_cov = [covI, covQ, covU]
        else:
            sub_bands_cov = None
        ##############################################################################################################

        # Now pure noise maps
        if verbose:
            print("Making noise realizations")
        noisemaps = self.create_noise_maps(
            sigma_sec,
            coverage,
            nsub=self.Nfout,
            Nyears=Nyears,
            verbose=verbose,
            seed=seed,
            effective_variance_invcov=effective_variance_invcov,
            clnoise=clnoise,
            sub_bands_cov=sub_bands_cov,
            covcut=noise_covcut,
        )
        if self.Nfout == 1:
            # create_noise_maps returns a 2D array for a single band
            noisemaps = np.reshape(noisemaps, (1, len(coverage), 3))
        # Zero the coverage (and signal) outside the pixels actually seen by
        # the noise realization.
        seenpix = noisemaps[0, :, 0] != 0
        coverage[~seenpix] = 0
        if noise_only:
            return noisemaps, coverage
        else:
            maps[:, ~seenpix, :] = 0
            return maps + noisemaps, maps, noisemaps, coverage
    def create_noise_maps(
        self,
        sigma_sec,
        coverage,
        covcut=0.1,
        nsub=1,
        Nyears=4,
        verbose=False,
        seed=None,
        effective_variance_invcov=None,
        clnoise=None,
        sub_bands_cov=None,
    ):
        """
        This returns a realization of noise maps for I, Q and U with no
        correlation between them, according to a noise RMS map built according
        to the coverage specified as an attribute to the class.

        The optional effective_variance_invcov keyword is a modification law
        to be applied to the coverage in order to obtain a more realistic
        noise profile. It is a law for effective RMS as a function of inverse
        coverage and is a 2D array with the first one being (nx samples)
        inverse coverage and the second being the corresponding effective
        variance to be used through interpolation when generating the noise.

        The clnoise option is used to apply a convolution to the noise to
        obtain spatially correlated noise. This cl should be calculated from
        the c(theta) of the noise that can be measured using the function
        ctheta_parts() below. The transformation of this C(theta) into Cl has
        to be done using wrappers on camb functions found in
        camb_interface.py of the QUBIC software: the functions to go back and
        forth from ctheta to cl are: cl_2_ctheta and ctheta_2_cell. The
        simulation of the noise itself calls a function of camb_interface
        called simulate_correlated_map().

        Parameters
        ----------
        sigma_sec : float
            Noise RMS in muK.sqrt(sec).
        coverage : array
            Coverage map; pixels below covcut * max(coverage) are excluded.
        covcut : float
            Coverage threshold (fraction of the maximum).
        nsub : int
            Number of sub-bands.
        Nyears : float
            Integration time in years.
        verbose : bool
        seed : int, optional
            Seed for the numpy RNG; None means a fresh realization each call.
        effective_variance_invcov : array or list of arrays, optional
        clnoise : array, optional
            Cl of the target noise correlation; None means white noise.
        sub_bands_cov : list of 3 arrays, optional
            (nsub, nsub) covariance matrices for I, Q and U between sub-bands.

        Returns
        -------
        ndarray
            Noise maps of shape (nsub, npix, 3), or (npix, 3) when nsub == 1.
        """
        # Seen pixels
        seenpix = (coverage / np.max(coverage)) > covcut

        # Sigma_sec for each Stokes: by default they are the same unless there
        # is non trivial covariance
        if sub_bands_cov is None:
            fact_I = np.ones(nsub)
            fact_Q = np.ones(nsub)
            fact_U = np.ones(nsub)
        else:
            # Per-band scaling from the diagonal of each covariance matrix,
            # normalized to its (0, 0) element so sigma_sec keeps its meaning.
            fact_I = 1.0 / np.sqrt(np.diag(sub_bands_cov[0] / sub_bands_cov[0][0, 0]))
            fact_Q = 1.0 / np.sqrt(np.diag(sub_bands_cov[1] / sub_bands_cov[1][0, 0]))
            fact_U = 1.0 / np.sqrt(np.diag(sub_bands_cov[2] / sub_bands_cov[2][0, 0]))
        all_sigma_sec_I = fact_I * sigma_sec
        all_sigma_sec_Q = fact_Q * sigma_sec
        all_sigma_sec_U = fact_U * sigma_sec

        # Theoretical (expected) RMS per pixel, per sub-band and Stokes
        thnoiseI = np.zeros((nsub, len(seenpix)))
        thnoiseQ = np.zeros((nsub, len(seenpix)))
        thnoiseU = np.zeros((nsub, len(seenpix)))
        for isub in range(nsub):
            # The theoretical noise in I for the coverage
            ideal_noise_I = self.theoretical_noise_maps(all_sigma_sec_I[isub], coverage, Nyears=Nyears, verbose=verbose)
            ideal_noise_Q = self.theoretical_noise_maps(all_sigma_sec_Q[isub], coverage, Nyears=Nyears, verbose=verbose)
            ideal_noise_U = self.theoretical_noise_maps(all_sigma_sec_U[isub], coverage, Nyears=Nyears, verbose=verbose)
            sh = np.shape(ideal_noise_I)
            if effective_variance_invcov is None:
                thnoiseI[isub, :] = ideal_noise_I
                thnoiseQ[isub, :] = ideal_noise_Q
                thnoiseU[isub, :] = ideal_noise_U
            else:
                # A list means one correction law per sub-band
                if isinstance(effective_variance_invcov, list):
                    my_effective_variance_invcov = effective_variance_invcov[isub]
                else:
                    my_effective_variance_invcov = effective_variance_invcov
                sh = np.shape(my_effective_variance_invcov)
                if sh[0] == 2:
                    ### We have the same correction for I, Q and U
                    correction = np.interp(np.max(coverage[seenpix]) / coverage[seenpix], my_effective_variance_invcov[0, :], my_effective_variance_invcov[1, :])
                    thnoiseI[isub, seenpix] = ideal_noise_I[seenpix] * np.sqrt(correction)
                    thnoiseQ[isub, seenpix] = ideal_noise_Q[seenpix] * np.sqrt(correction)
                    thnoiseU[isub, seenpix] = ideal_noise_U[seenpix] * np.sqrt(correction)
                else:
                    ### We have distinct correction for I and QU
                    correctionI = np.interp(np.max(coverage[seenpix]) / coverage[seenpix], my_effective_variance_invcov[0, :], my_effective_variance_invcov[1, :])
                    correctionQU = np.interp(np.max(coverage[seenpix]) / coverage[seenpix], my_effective_variance_invcov[0, :], my_effective_variance_invcov[2, :])
                    thnoiseI[isub, seenpix] = ideal_noise_I[seenpix] * np.sqrt(correctionI)
                    thnoiseQ[isub, seenpix] = ideal_noise_Q[seenpix] * np.sqrt(correctionQU)
                    thnoiseU[isub, seenpix] = ideal_noise_U[seenpix] * np.sqrt(correctionQU)

        noise_maps = np.zeros((nsub, len(coverage), 3))
        if seed is not None:
            np.random.seed(seed)
        ### Simulate variance 1 maps for each sub-band independently
        for isub in range(nsub):
            if clnoise is None:
                ### With no spatial correlation
                if verbose:
                    if isub == 0:
                        print("Simulating noise maps with no spatial correlation")
                IrndFull = np.random.randn(self.npix)
                QrndFull = np.random.randn(self.npix) * np.sqrt(2)
                UrndFull = np.random.randn(self.npix) * np.sqrt(2)
            else:
                ### With spatial correlations given by cl which is the Legendre transform of the targetted C(theta)
                ### NB: here one should not expect the variance of the obtained maps to make complete sense because
                ### of ell space truncation. They have however the correct Cl spectrum in the relevant ell range
                ### (up to lmax = 2*nside)
                if verbose:
                    if isub == 0:
                        print("Simulating noise maps with spatial correlation")
                IrndFull = qc.simulate_correlated_map(self.nside, 1.0, clin=clnoise, verbose=False)
                QrndFull = qc.simulate_correlated_map(self.nside, 1.0, clin=clnoise, verbose=False) * np.sqrt(2)
                UrndFull = qc.simulate_correlated_map(self.nside, 1.0, clin=clnoise, verbose=False) * np.sqrt(2)
            ### put them into the whole sub-bands array
            noise_maps[isub, seenpix, 0] = IrndFull[seenpix]
            noise_maps[isub, seenpix, 1] = QrndFull[seenpix]
            noise_maps[isub, seenpix, 2] = UrndFull[seenpix]

        ### If there is non-diagonal noise covariance between sub-bands (spectro-imaging case)
        if nsub > 1:
            if sub_bands_cov is not None:
                if verbose:
                    print("Simulating noise maps sub-bands covariance")
                ### We get the eigenvalues and eigenvectors of the sub-band covariance matrix divided by its 0,0 element
                ### The reason for this is that the overall noise is given by the input parameter sigma_sec which we do
                ### not want to override
                wI, vI = np.linalg.eig(sub_bands_cov[0] / sub_bands_cov[0][0, 0])
                wQ, vQ = np.linalg.eig(sub_bands_cov[1] / sub_bands_cov[1][0, 0])
                wU, vU = np.linalg.eig(sub_bands_cov[2] / sub_bands_cov[2][0, 0])
                ### Multiply the maps by the sqrt(eigenvalues)
                for isub in range(nsub):
                    noise_maps[isub, seenpix, 0] *= np.sqrt(wI[isub])
                    noise_maps[isub, seenpix, 1] *= np.sqrt(wQ[isub])
                    noise_maps[isub, seenpix, 2] *= np.sqrt(wU[isub])
                ### Apply the rotation to each Stokes Parameter separately
                noise_maps[:, seenpix, 0] = np.dot(vI, noise_maps[:, seenpix, 0])
                noise_maps[:, seenpix, 1] = np.dot(vQ, noise_maps[:, seenpix, 1])
                noise_maps[:, seenpix, 2] = np.dot(vU, noise_maps[:, seenpix, 2])

        # Now normalize the maps with the coverage behaviour and the sqrt(2) for Q and U
        noise_maps[:, seenpix, 0] *= thnoiseI[:, seenpix]
        noise_maps[:, seenpix, 1] *= thnoiseQ[:, seenpix]
        noise_maps[:, seenpix, 2] *= thnoiseU[:, seenpix]
        if nsub == 1:
            return noise_maps[0, :, :]
        else:
            return noise_maps
[docs] def theoretical_noise_maps(self, sigma_sec, coverage, Nyears=4, verbose=False): """ This returns a map of the RMS noise (not an actual realization, just the expected RMS - No covariance) Parameters ---------- sigma_sec: float Noise level. coverage: array The coverage map. Nyears: int verbose: bool Returns ------- """ # ###### Noise normalization # We assume we have integrated for a time Ttot in seconds with a sigma per root sec sigma_sec Ttot = Nyears * 365 * 24 * 3600 # in seconds if verbose: print("Total time is {} seconds".format(Ttot)) # Oberved pixels thepix = coverage > 0 # Normalized coverage (sum=1) covnorm = coverage / np.sum(coverage) if verbose: print("Normalized coverage sum: {}".format(np.sum(covnorm))) # Time per pixel Tpix = np.zeros_like(covnorm) Tpix[thepix] = Ttot * covnorm[thepix] if verbose: print("Sum Tpix: {} s ; Ttot = {} s".format(np.sum(Tpix), Ttot)) # RMS per pixel Sigpix = np.zeros_like(covnorm) Sigpix[thepix] = sigma_sec / np.sqrt(Tpix[thepix]) if verbose: print("Total noise (with no averages in pixels): {}".format(np.sum((Sigpix * Tpix) ** 2))) return Sigpix
def random_string(nchars):
    """Return a random string of `nchars` alphanumeric characters."""
    # Join directly; the original bound the result to a name that shadowed
    # the builtin ``str``.
    return "".join(rd.choice(string.ascii_letters + string.digits) for _ in range(nchars))


def get_noise_invcov_profile(maps, coverage, covcut=0.1, nbins=100, fit=True, label="", norm=False,
                             allstokes=False, fitlim=None, doplot=False, QUsep=True):
    """
    Profile the noise RMS of an IQU map as a function of sqrt(1/coverage) and
    optionally fit a smooth model of the RMS excess w.r.t. the naive
    1/sqrt(coverage) scaling.

    Parameters
    ----------
    maps: array (npix, 3)
        IQU noise maps.
    coverage: array (npix,)
        Coverage map.
    covcut: float
        Pixels with coverage below covcut * max(coverage) are ignored.
    nbins: int
        Number of bins in inverse coverage.
    fit: bool
        If True, fit an analytic model and return sampled effective variances.
    label: str
        Prefix for plot legends.
    norm: bool
        If True, normalize the profile to 1 in the first bin.
    allstokes: bool
        If True (and QUsep is False), also plot separate I and QU profiles.
    fitlim: (float, float) or None
        Restrict the fit to this range in sqrt(1/coverage).
    doplot: bool
        Produce diagnostic plots.
    QUsep: bool
        If True, fit I and QU separately (richer model).

    Returns
    -------
    xx, myY, effective_variance_invcov
        Bin centers, RMS-ratio profile, and an array sampling the fitted
        effective variance vs. inverse coverage ([x, var] or [x, varI, varQU]),
        or None for the last item when fit is False.
    """
    seenpix = coverage > (covcut * np.max(coverage))
    covnorm = coverage / np.max(coverage)

    # RMS in bins of sqrt(1/coverage) for each Stokes parameter
    xx, _, _, dyI, _ = ft.profile(np.sqrt(1.0 / covnorm[seenpix]), maps[seenpix, 0], nbins=nbins, plot=False)
    xx, _, _, dyQ, _ = ft.profile(np.sqrt(1.0 / covnorm[seenpix]), maps[seenpix, 1], nbins=nbins, plot=False)
    xx, _, _, dyU, _ = ft.profile(np.sqrt(1.0 / covnorm[seenpix]), maps[seenpix, 2], nbins=nbins, plot=False)
    # Q and U carry twice the variance of I, hence the /2 before averaging
    avg = np.sqrt((dyI**2 + dyQ**2 / 2 + dyU**2 / 2) / 3)
    avgQU = np.sqrt((dyQ**2 / 2 + dyU**2 / 2) / 2)

    fact = xx[0] / avg[0] if norm else 1.0
    # Ratio of measured RMS to the expected sqrt(1/coverage) scaling
    myY = (avg / xx) * fact
    myYI = (dyI / xx) * fact
    myYQU = (avgQU / xx) * fact

    if doplot:
        if QUsep is False:
            p = plt.plot(xx**2, myY, "o", label=label + " IQU")
            if allstokes:
                plt.plot(xx**2, myYI, label=label + " I", alpha=0.3)
                plt.plot(xx**2, myYQU, label=label + " Average Q, U /sqrt(2)", alpha=0.3)
        else:
            pi = plt.plot(xx**2, myYI, "o", label=label + " I")
            pqu = plt.plot(xx**2, myYQU, "o", label=label + " QU / sqrt(2)")

    if fit:
        ok = np.isfinite(myY)
        if fitlim is not None:
            print("Clipping fit from {} to {}".format(fitlim[0], fitlim[1]))
            ok = ok & (xx >= fitlim[0]) & (xx <= fitlim[1])

        # Linear trend plus an exponential rise at low coverage
        def model_simple(x, a, b, c, d, e):
            return a + b * x + c * np.exp(-d * (x - e))

        # Same plus quadratic and cubic terms (used for separate I / QU fits)
        def model_complex(x, a, b, c, d, e, f, g):
            return a + b * x + f * x**2 + g * x**3 + c * np.exp(-d * (x - e))

        if QUsep is False:
            myfit = curve_fit(model_simple, xx[ok] ** 2, myY[ok],
                              p0=[np.min(myY[ok]), 0.4, 0, 2, 1.5],
                              maxfev=100000, ftol=1e-7)
        else:
            myfitI = curve_fit(model_complex, xx[ok] ** 2, myYI[ok],
                               p0=[np.min(myY[ok]), 0.4, 0, 2, 1.5, 0.0, 0.0],
                               maxfev=100000, ftol=1e-7)
            myfitQU = curve_fit(model_complex, xx[ok] ** 2, myYQU[ok],
                                p0=[np.min(myY[ok]), 0.4, 0, 2, 1.5, 0.0, 0.0],
                                maxfev=100000, ftol=1e-7)

        if doplot:
            if QUsep is False:
                plt.plot(xx**2, model_simple(xx**2, *myfit[0]), label=label + " Fit",
                         color=p[0].get_color())
            else:
                plt.plot(xx**2, model_complex(xx**2, *myfitI[0]), label=label + " Fit I",
                         color=pi[0].get_color())
                plt.plot(xx**2, model_complex(xx**2, *myfitQU[0]), label=label + " Fit QU / sqrt(2)",
                         color=pqu[0].get_color())

        # Sample the fitted effective variance on a fixed inverse-coverage grid,
        # clamping to the first/last measured bin outside the fit range (the
        # polynomial/exponential model diverges if extrapolated).
        invcov_samples = np.linspace(1, 15, 1000)
        if QUsep is False:
            eff_v = model_simple(invcov_samples, *myfit[0]) ** 2
            eff_v[invcov_samples < xx[0] ** 2] = model_simple(xx[0] ** 2, *myfit[0]) ** 2
            eff_v[invcov_samples > xx[-1] ** 2] = model_simple(xx[-1] ** 2, *myfit[0]) ** 2
            effective_variance_invcov = np.array([invcov_samples, eff_v])
        else:
            eff_vI = model_complex(invcov_samples, *myfitI[0]) ** 2
            eff_vQU = model_complex(invcov_samples, *myfitQU[0]) ** 2
            # BUGFIX: clamp BOTH I and QU on BOTH sides of the measured range.
            # The original clamped eff_vI only below xx[0]^2 and eff_vQU only
            # above xx[-1]^2, letting the cubic model run away on the other side.
            eff_vI[invcov_samples < xx[0] ** 2] = model_complex(xx[0] ** 2, *myfitI[0]) ** 2
            eff_vI[invcov_samples > xx[-1] ** 2] = model_complex(xx[-1] ** 2, *myfitI[0]) ** 2
            eff_vQU[invcov_samples < xx[0] ** 2] = model_complex(xx[0] ** 2, *myfitQU[0]) ** 2
            eff_vQU[invcov_samples > xx[-1] ** 2] = model_complex(xx[-1] ** 2, *myfitQU[0]) ** 2
            effective_variance_invcov = np.array([invcov_samples, eff_vI, eff_vQU])

    if doplot:
        plt.xlabel("1./cov normed")
        plt.legend()
        add_yl = " (Normalized to 1 at 1)" if norm else ""
        plt.ylabel("RMS Ratio w.r.t linear scaling" + add_yl)

    if fit:
        return xx, myY, effective_variance_invcov
    return xx, myY, None


def get_angular_profile(maps, thmax=25, nbins=20, label="",
                        center=np.array([316.44761929, -58.75808063]),
                        allstokes=False, fontsize=None, doplot=False, separate=False):
    """
    Profile the RMS of an IQU map as a function of the angle to `center`.

    Parameters
    ----------
    maps: array (npix, 3)
        IQU maps.
    thmax: float
        Maximum angle (degrees) of the profile.
    nbins: int
        Number of angular bins.
    label: str
        Prefix for plot legends.
    center: array (2,)
        (lon, lat) in degrees of the field center.
    allstokes: bool
        If True, also plot the individual Stokes profiles.
    fontsize: int or None
        Legend font size.
    doplot: bool
        Produce a diagnostic plot.
    separate: bool
        If True return (xx, dyI, dyQ, dyU) instead of (xx, avg).

    Returns
    -------
    Angular bin centers and RMS profile(s).
    """
    vec0 = hp.ang2vec(center[0], center[1], lonlat=True)
    sh = np.shape(maps)
    ns = hp.npix2nside(sh[0])
    vecpix = hp.pix2vec(ns, np.arange(12 * ns**2))
    angs = np.degrees(np.arccos(np.dot(vec0, vecpix)))
    rng = np.array([0, thmax])
    xx, yyI, dx, dyI, _ = ft.profile(angs, maps[:, 0], nbins=nbins, plot=False, rng=rng)
    xx, yyQ, dx, dyQ, _ = ft.profile(angs, maps[:, 1], nbins=nbins, plot=False, rng=rng)
    xx, yyU, dx, dyU, _ = ft.profile(angs, maps[:, 2], nbins=nbins, plot=False, rng=rng)
    print(dyI.shape, dyQ.shape, dyU.shape)
    # Q and U carry twice the variance of I, hence the /2 before averaging
    avg = np.sqrt((dyI**2 + dyQ**2 / 2 + dyU**2 / 2) / 3)
    if doplot:
        plt.plot(xx, avg, "o", label=label + " Average")
        if allstokes:
            plt.plot(xx, dyI, label=label + " I", alpha=0.3)
            plt.plot(xx, dyQ / np.sqrt(2), label=label + " Q/sqrt(2)", alpha=0.3)
            plt.plot(xx, dyU / np.sqrt(2), label=label + " U/sqrt(2)", alpha=0.3)
        plt.xlabel("Angle [deg.]")
        plt.ylabel("RMS")
        plt.legend(fontsize=fontsize)
    if separate:
        return xx, dyI, dyQ, dyU
    return xx, avg


def correct_maps_rms(maps, cov, effective_variance_invcov):
    """
    Flatten the noise of IQU maps: divide out the fitted effective RMS
    (interpolated from `effective_variance_invcov` at each pixel's inverse
    coverage) and the naive sqrt(1/coverage) scaling.

    Parameters
    ----------
    maps: array (npix, 3)
        IQU maps.
    cov: array (npix,)
        Coverage map.
    effective_variance_invcov: array
        [invcov, var] (shared by IQU) or [invcov, varI, varQU] as returned
        by get_noise_invcov_profile.

    Returns
    -------
    array (npix, 3)
        Corrected maps; zero where the coverage is zero.
    """
    okpix = cov > 0
    newmaps = maps * 0
    sh = np.shape(effective_variance_invcov)
    if sh[0] == 2:
        # One correction common to all Stokes parameters
        correction = np.interp(np.max(cov) / cov[okpix],
                               effective_variance_invcov[0, :], effective_variance_invcov[1, :])
        for s in range(3):
            newmaps[okpix, s] = maps[okpix, s] / np.sqrt(correction) * np.sqrt(cov[okpix] / np.max(cov))
    else:
        # Separate corrections for I and for Q/U
        correctionI = np.interp(np.max(cov) / cov[okpix],
                                effective_variance_invcov[0, :], effective_variance_invcov[1, :])
        correctionQU = np.interp(np.max(cov) / cov[okpix],
                                 effective_variance_invcov[0, :], effective_variance_invcov[2, :])
        newmaps[okpix, 0] = maps[okpix, 0] / np.sqrt(correctionI) * np.sqrt(cov[okpix] / np.max(cov))
        newmaps[okpix, 1] = maps[okpix, 1] / np.sqrt(correctionQU) * np.sqrt(cov[okpix] / np.max(cov))
        newmaps[okpix, 2] = maps[okpix, 2] / np.sqrt(correctionQU) * np.sqrt(cov[okpix] / np.max(cov))
    return newmaps


def flatten_noise(maps, coverage, nbins=20, doplot=False, normalize_all=False, QUsep=True):
    """
    Flatten the noise of (sub-band) IQU maps with respect to coverage.

    Parameters
    ----------
    maps: array (nsub, npix, 3) or (npix, 3)
        Input maps; a single 2D map is treated as one sub-band.
    coverage: array (npix,)
        Coverage map.
    nbins: int
        Number of bins for the inverse-coverage profile.
    doplot: bool
        Produce diagnostic plots.
    normalize_all: bool
        If True, correct each sub-band with its own fitted profile;
        otherwise correct all sub-bands with the first band's profile.
    QUsep: bool
        Passed to get_noise_invcov_profile (separate I / QU fits).

    Returns
    -------
    out_maps, all_fitcov, all_norm_noise
        Flattened maps (same shape as input), fitted profiles per sub-band,
        and the first-bin noise level per sub-band.
    """
    sh = np.shape(maps)
    if len(sh) == 2:
        maps = np.reshape(maps, (1, sh[0], sh[1]))
    out_maps = np.zeros_like(maps)
    newsh = np.shape(maps)
    all_fitcov = []
    all_norm_noise = []
    if doplot:
        plt.figure()
    for isub in range(newsh[0]):
        xx, yy, fitcov = get_noise_invcov_profile(maps[isub, :, :], coverage, nbins=nbins, norm=False,
                                                  label="sub-band: {}".format(isub), fit=True,
                                                  doplot=doplot, allstokes=True, QUsep=QUsep)
        all_norm_noise.append(yy[0])
        if doplot:
            plt.legend(fontsize=10)
        all_fitcov.append(fitcov)
        if normalize_all:
            out_maps[isub, :, :] = correct_maps_rms(maps[isub, :, :], coverage, fitcov)
        else:
            out_maps[isub, :, :] = correct_maps_rms(maps[isub, :, :], coverage, all_fitcov[0])
    if len(sh) == 2:
        # BUGFIX: also return all_norm_noise for 2D input. The original
        # returned only (maps, all_fitcov) here, which crashed get_cov_nunu
        # (it always unpacks three values).
        return out_maps[0, :, :], all_fitcov, all_norm_noise
    return out_maps, all_fitcov, all_norm_noise


def map_corr_neighbtheta(themap_in, ipok_in, thetamin, thetamax, nbins, degrade=None, verbose=True):
    """
    Brute-force angular correlation function of `themap_in` restricted to the
    pixels `ipok_in`, in `nbins` bins between thetamin and thetamax (degrees).

    Parameters
    ----------
    themap_in: array
        Healpix map.
    ipok_in: array
        Indices of the valid pixels.
    thetamin, thetamax: float
        Angular range in degrees.
    nbins: int
        Number of angular bins.
    degrade: int or None
        If given, degrade the map to this nside first (for speed).
    verbose: bool
        Show a progress bar.

    Returns
    -------
    mythetas, corrfct, errs
        Bin centers (degrees), correlation values, and error estimates.
    """
    if degrade is None:
        themap = themap_in.copy()
        ipok = ipok_in.copy()
    else:
        themap = hp.ud_grade(themap_in, degrade)
        # Build a boolean validity mask and degrade it alongside the map
        # (assumes all map values are > -1e30 so the mask starts all-False
        # — TODO confirm).
        mapbool = themap_in < -1e30
        mapbool[ipok_in] = True
        mapbool = hp.ud_grade(mapbool, degrade)
        ip = np.arange(12 * degrade**2)
        ipok = ip[mapbool]
    rthmin = np.radians(thetamin)
    rthmax = np.radians(thetamax)
    thvals = np.linspace(rthmin, rthmax, nbins + 1)
    ns = hp.npix2nside(len(themap))
    thesum = np.zeros(nbins)
    thesum2 = np.zeros(nbins)
    thecount = np.zeros(nbins)
    if verbose:
        bar = progress_bar(len(ipok), "Pixels")
    for i in range(len(ipok)):
        if verbose:
            bar.update()
        valthis = themap[ipok[i]]
        v = hp.pix2vec(ns, ipok[i])
        # Accumulate products over concentric rings: pixels within the outer
        # disc but not the inner one. Set difference replaces the original
        # per-element list.remove() loop (accidental O(n*m) per bin).
        ipneighb_inner = set(hp.query_disc(ns, v, np.radians(thetamin)))
        for j in range(nbins):
            ipneighb_outer = set(hp.query_disc(ns, v, thvals[j + 1]))
            ring = list(ipneighb_outer - ipneighb_inner)
            valneighb = themap[ring]
            thesum[j] += np.sum(valthis * valneighb)
            thesum2[j] += np.sum((valthis * valneighb) ** 2)
            thecount[j] += len(valneighb)
            ipneighb_inner = ipneighb_outer
    mm = thesum / thecount
    mm2 = thesum2 / thecount
    # NOTE(review): error uses count**(1/4), not the usual sqrt(count)
    # — kept as in the original, presumably deliberate; confirm.
    errs = np.sqrt(mm2 - mm**2) / np.sqrt(np.sqrt(thecount))
    mythetas = np.degrees(thvals[:-1] + thvals[1:]) / 2
    # The original also computed corrfct = thesum / thecount, an exact
    # duplicate of mm; return mm directly.
    return mythetas, mm, errs


def get_angles(ip0, ips, ns):
    """Return the angles (degrees) between healpix pixel `ip0` and pixels `ips` at nside `ns`."""
    v = np.array(hp.pix2vec(ns, ip0))
    vecs = np.array(hp.pix2vec(ns, ips))
    return np.degrees(np.arccos(np.dot(v.T, vecs)))


def ctheta_parts(themap, ipok, thetamin, thetamax, nbinstot, nsplit=4, degrade_init=None, verbose=True):
    """
    Compute the angular correlation function over [thetamin, thetamax] split
    into `nsplit` chunks, each evaluated at a progressively degraded nside
    (larger angles need less resolution, which speeds things up a lot).

    Parameters
    ----------
    themap: array
        Healpix map.
    ipok: array
        Indices of the valid pixels.
    thetamin, thetamax: float
        Angular range in degrees.
    nbinstot: int
        Total number of angular bins (split evenly across the chunks).
    nsplit: int
        Number of resolution chunks.
    degrade_init: int or None
        nside of the first chunk (default: the map's own nside).
    verbose: bool

    Returns
    -------
    thall, cthall, errcthall
        Bin centers (degrees), correlation values, and errors.
    """
    allthetalims = np.linspace(thetamin, thetamax, nbinstot + 1)
    thmin = allthetalims[:-1]
    thmax = allthetalims[1:]
    idx = np.arange(nbinstot) // (nbinstot // nsplit)
    if degrade_init is None:
        nside_init = hp.npix2nside(len(themap))
    else:
        nside_init = degrade_init
    cthall = np.zeros(nbinstot)
    errcthall = np.zeros(nbinstot)
    for k in range(nsplit):
        thispart = idx == k
        mythmin = np.min(thmin[thispart])
        mythmax = np.max(thmax[thispart])
        mynbins = nbinstot // nsplit
        mynside = nside_init // (2**k)
        if verbose:
            print("Doing {0:3.0f} bins between {1:5.2f} and {2:5.2f} deg at nside={3:4.0f}".format(mynbins, mythmin, mythmax, mynside))
        myth, mycth, errs = map_corr_neighbtheta(themap, ipok, mythmin, mythmax, mynbins,
                                                 degrade=mynside, verbose=verbose)
        cthall[thispart] = mycth
        errcthall[thispart] = errs
    ### Bin centers: area-weighted mean angle of the pixels within each ring
    ### instead of the simplistic mid-bin thetas. (The per-chunk centers
    ### returned by map_corr_neighbtheta were stored and then overwritten in
    ### the original — that dead store is removed here; output is unchanged.)
    dtheta = allthetalims[1] - allthetalims[0]
    thall = 2.0 / 3 * ((thmin + dtheta) ** 3 - thmin**3) / ((thmin + dtheta) ** 2 - thmin**2)
    ### But it actually changes very little
    return thall, cthall, errcthall


def get_cov_nunu(maps, coverage, nbins=20, QUsep=True, return_flat_maps=False):
    """
    Return the sub-frequency x sub-frequency noise covariance matrix for each
    Stokes parameter.

    It does not attempt to measure covariance between Stokes parameters (this
    could be incorporated later). It also returns the fitted coverage profiles
    used to flatten the noise RMS of the maps before computing the covariance
    (for subsequent possible use). NB: because the maps are flattened first,
    the RMS is set to 1 for I (and should be sqrt(2) for Q and U), so this
    covariance absorbs the overall map variances.

    Parameters
    ----------
    maps: array (nsub, npix, 3) or (npix, 3)
        IQU maps.
    coverage: array (npix,)
        Coverage map.
    nbins: int
        Number of bins for noise flattening.
    QUsep: bool
        Fit I and QU noise profiles separately.
    return_flat_maps: bool
        If True, also return the flattened maps.

    Returns
    -------
    cov_I, cov_Q, cov_U, all_fitcov, all_norm_noise [, new_sub_maps]
    """
    ### First normalize by coverage
    new_sub_maps, all_fitcov, all_norm_noise = flatten_noise(maps, coverage, nbins=nbins,
                                                             doplot=False, QUsep=QUsep)
    ### Now calculate the covariance matrix for each sub map
    sh = np.shape(maps)
    if len(sh) == 2:
        # Single map: the "covariance" is just the variance, wrapped 1x1
        okpix = new_sub_maps[:, 0] != 0
        cov_I = np.array([[np.cov(new_sub_maps[okpix, 0])]])
        cov_Q = np.array([[np.cov(new_sub_maps[okpix, 1])]])
        cov_U = np.array([[np.cov(new_sub_maps[okpix, 2])]])
    else:
        okpix = new_sub_maps[0, :, 0] != 0
        cov_I = np.cov(new_sub_maps[:, okpix, 0])
        cov_Q = np.cov(new_sub_maps[:, okpix, 1])
        cov_U = np.cov(new_sub_maps[:, okpix, 2])
        if sh[0] == 1:
            # np.cov of a single row returns a scalar; keep a 1x1 matrix
            cov_I = np.array([[cov_I]])
            cov_Q = np.array([[cov_Q]])
            cov_U = np.array([[cov_U]])
    if return_flat_maps:
        return cov_I, cov_Q, cov_U, all_fitcov, all_norm_noise, new_sub_maps
    return cov_I, cov_Q, cov_U, all_fitcov, all_norm_noise