import pickle
import healpy as hp
import numpy as np
import pysm3
import pysm3.units as u
from pysimulators.interfaces.healpy import HealpixConvolutionGaussianOperator
from pysm3 import utils
from qubic.data import PATH as data_dir
from qubic.lib.MapMaking.FrequencyMapMaking.Qspectra_component import CMBModel
[docs]
class Maps:
def __init__(self, skyconfig, nus, nrec, nside=256, corrected_bandpass=True):
self.nus = nus
self.nside = nside
self.nrec = nrec
self.nsub = len(self.nus)
self.skyconfig = skyconfig
#! Tom: Corrected factor to adjust Planck maps regarding Bicep results, should be around 1.5
# Need to be verify and corrected (+ implemented in a cleaner way)
self.corrected_factor_bicep = 1.5
self.skyconfig_foregrounds = []
for key in skyconfig.keys():
if key == "cmb":
self.is_cmb = True
else:
self.skyconfig_foregrounds += [skyconfig[key]]
[docs]
def average_within_band(self, m_nu):
m_mean = np.zeros((self.nrec, 12 * self.nside**2, 3))
fsub = self.nsub // self.nrec
for i in range(self.nrec):
m_mean[i] = np.mean(m_nu[i * fsub : (i + 1) * fsub], axis=0)
return m_mean
def _corrected_maps(self, m_nu, m_nu_fg):
"""
Function to remove the mean of the foreground maps m_nu_fg in the frequency maps m_nu.
"""
fsub = self.nsub // self.nrec
mean_fg = self.average_within_band(m_nu_fg)
k = 0
for i in range(self.nrec):
delta = m_nu_fg[i * fsub : (i + 1) * fsub] - mean_fg[i]
for j in range(fsub):
m_nu[k] -= delta[j]
k += 1
return m_nu
def _get_cmb(self, r, Alens, seed):
#! Tom : Is it okay to compute the CMB from power spectrum instead of using PySM ?
#! I compared the two options and the fluctuations amplitude are not the same
cmbmodel = CMBModel(None)
mycls = cmbmodel.give_cl_cmb(r, Alens)
np.random.seed(seed)
cmb = hp.synfast(mycls, self.nside, verbose=False, new=True).T
return cmb
[docs]
def average_map(self, r, Alens, central_nu, bw, nb=100):
mysky = np.zeros((12 * self.nside**2, 3))
if len(self.skyconfig_foregrounds) != 0:
sky = pysm3.Sky(nside=self.nside, preset_strings=self.skyconfig_foregrounds)
edges_min = central_nu - bw / 2
edges_max = central_nu + bw / 2
bandpass_frequencies = np.linspace(edges_min, edges_max, nb)
print(f"Integrating bandpass from {edges_min} GHz to {edges_max} GHz with {nb} frequencies.")
mysky += np.array(sky.get_emission(bandpass_frequencies * u.GHz, None) * utils.bandpass_unit_conversion(bandpass_frequencies * u.GHz, None, u.uK_CMB)).T / self.corrected_factor_bicep
if self.is_cmb:
cmb = self._get_cmb(r, Alens, self.skyconfig["cmb"])
mysky += cmb
return mysky
[docs]
class PlanckMaps(Maps):
def __init__(self, skyconfig, nus, nrec, nside=256, r=0, Alens=1):
Maps.__init__(self, skyconfig, nus, nrec, nside=nside)
self.experiments = {
"Planck": {
"frequency": [30, 44, 70, 100, 143, 217, 353],
"depth_i": [150.0, 162.0, 210.0, 77.4, 33.0, 46.8, 154],
"depth_p": [210.0, 240.0, 300.0, 118, 70.2, 105.0, 439],
"fwhm": [32.29, 27.94, 13.08, 9.66, 7.22, 4.90, 4.92],
"bw": [0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2],
}
}
self.skyconfig = skyconfig
self.r = r
self.Alens = Alens
self.nside = nside
def _get_ave_map(self, r, Alens, skyconfig, central_nu, bw, nsub=100):
is_cmb = False
model = []
for key in skyconfig.keys():
if key == "cmb":
is_cmb = True
else:
model += [skyconfig[key]]
mysky = np.zeros((12 * self.nside**2, 3))
if len(model) != 0:
sky = pysm3.Sky(nside=self.nside, preset_strings=model)
edges_min = central_nu - bw / 2
edges_max = central_nu + bw / 2
bandpass_frequencies = np.linspace(edges_min, edges_max, nsub)
print(f"Integrating bandpass from {edges_min} GHz to {edges_max} GHz with {nsub} frequencies.")
mysky += np.array(sky.get_emission(bandpass_frequencies * u.GHz, None) * utils.bandpass_unit_conversion(bandpass_frequencies * u.GHz, None, u.uK_CMB)).T / self.corrected_factor_bicep
if is_cmb:
cmb = self._get_cmb(r, Alens, skyconfig["cmb"])
mysky += cmb
return mysky
def _get_fwhm(self, nu):
with open(data_dir + f"Planck{nu:.0f}GHz.pkl", "rb") as f:
data = pickle.load(f)
fwhmi = data[f"fwhm{nu:.0f}"]
return fwhmi
def _get_noise(self, nu):
index = self.experiments["Planck"]["frequency"].index(nu)
np.random.seed(None)
sigma_I = self.experiments["Planck"]["depth_i"][index] / hp.nside2resol(self.nside, arcmin=True)
sigma_P = self.experiments["Planck"]["depth_p"][index] / hp.nside2resol(self.nside, arcmin=True)
sigma = [sigma_I, sigma_P, sigma_P]
out = np.random.standard_normal(np.ones((12 * self.nside**2, 3)).shape) * sigma
return out
[docs]
def run(self, use_fwhm=False, nsub=100):
"""
Method that create global variables such as :
- self.maps : Frequency maps from external data with shape (Nf, Npix, Nstk)
- self.external_nus : Frequency array [GHz]
"""
maps = np.zeros((len(self.experiments["Planck"]["frequency"]), 12 * self.nside**2, 3))
maps_noise = np.zeros((len(self.experiments["Planck"]["frequency"]), 12 * self.nside**2, 3))
self.fwhm_ext = []
for inu, nu in enumerate(self.experiments["Planck"]["frequency"]):
bandwidth = self.experiments["Planck"]["bw"][inu]
maps[inu] = self._get_ave_map(self.r, self.Alens, self.skyconfig, nu, nu * bandwidth, nsub=nsub)
n = self._get_noise(nu)
maps[inu] += n
maps_noise[inu] += n
if use_fwhm:
# Conversion from arcmin to rad
fwhm_rad = np.deg2rad(self._get_fwhm(nu) / 60.0)
C = HealpixConvolutionGaussianOperator(fwhm=fwhm_rad, lmax=3 * self.nside - 1)
self.fwhm_ext.append(fwhm_rad)
maps[inu] = C(maps[inu])
maps_noise[inu] = C(maps_noise[inu])
else:
self.fwhm_ext.append(0)
return maps, maps_noise