import camb.correlations as cc
import healpy as hp
import numpy as np
import scipy.constants as c
import scipy.special as sp
from CoolProp import CoolProp as CP
from pyoperators import (
Cartesian2SphericalOperator,
Rotation3dOperator,
Spherical2CartesianOperator,
)
from pysimulators.interfaces.healpy import Spherical2HealpixOperator
from scipy.integrate import quad
from qubic.lib.Instrument.Qinstrument import compute_freq
from qubic.lib.Qdictionary import qubicDict
from qubic.lib.Qsamplings import QubicSampling
# TODO : Adjust rho_0 with PWV
# TODO : Verify conversion into µK_CMB
# TODO : Adjust pressure with density
# TODO : Verify frequency dependence of the atmosphere
# TODO : Verify if the atmosphere is in the same frame as the qubic
# TODO : Add the possibility to have a 3d atmosphere
# TODO : Verify that get_integrated_absorption_spectrum works as expected, i.e. tha same way as in MM pipeline to build maps
[docs]
class AtmosphereProperties:
def __init__(self, params):
### Import parameters files
self.params = params
self.qubic_dict = self.get_qubic_dict()
np.random.seed(self.params["seed"])
### Build atmospheric coordinates
# Cartesian coordinates
if self.params["h_grid"] == 1:
# 2d model
self.altitude = (
self.params["h_qubic"] + self.params["altitude_atm_2d"]
) * np.ones(self.params["h_grid"])
else:
# 3d model, not yet implemented
self.altitude = np.linspace(
self.params["h_qubic"],
self.params["altitude_atm_2d"],
self.params["h_grid"],
)
# Import atmosphere coordinates for the flat atmosphere case
if self.params["flat"]:
self.x_list = np.linspace(
-self.params["size_atm"], self.params["size_atm"], self.params["n_grid"]
)
self.y_list = np.linspace(
-self.params["size_atm"], self.params["size_atm"], self.params["n_grid"]
)
# Azimuth / Elevation coordinates
x, y = np.meshgrid(self.x_list, self.y_list)
z = np.ones(x.shape) * self.params["altitude_atm_2d"]
self.r, self.el, self.az = self.horizontal_plane_to_azel(x, y, z)
### Compute atmosphere temperature and mean water vapor density
self.temperature = self.get_temperature_atm(
self.altitude, self.params["temp_ground"], self.params["h_temp"]
)
self.mean_water_vapor_density = self.get_mean_water_vapor_density(
self.altitude, self.params["rho_0"], self.params["h_h2o"]
)
### Compute absorption coefficients
(
self.mol_absorption_coeff,
self.self_absorption_coeff,
self.air_absorption_coeff,
self.integration_frequencies,
) = self.atm_absorption_coeff()
### Compute absorption spectrum
self.abs_spectrum = self.absorption_spectrum()
self.integrated_abs_spectrum, self.frequencies, self.bandwidths = (
self.integrated_absorption_spectrum()
)
### Compute water vapor density's standard deviation
self.sigma_rho = self.get_sigma_rho(
sigma_pwv=self.params["sigma_pwv"], h=2 * self.params["h_h2o"]
)
[docs]
def get_qubic_dict(self, key="in"):
"""QUBIC dictionary.
Method to modify the qubic dictionary.
Parameters
----------
key : str, optional
Can be "in" or "out".
It is used to build respectively the instances to generate the TODs or to reconstruct the sky maps,
by default "in".
Returns
-------
dict_qubic: dict
Modified QUBIC dictionary.
"""
args = {
"nf_sub": self.params[f"nsub_{key}"],
"filter_relative_bandwidth": 0.25,
"npointings": self.params["npointings"],
"nside": self.params["nside"],
"MultiBand": True,
"period": 1,
"RA_center": 0,
"DEC_center": -57,
"filter_nu": 150 * 1e9,
"noiseless": True,
"dtheta": self.params["dtheta"],
"nprocs_sampling": 1,
"photon_noise": False,
"nhwp_angles": 3,
#'effective_duration':3,
"effective_duration150": 3,
"effective_duration220": 3,
"type_instrument": "two",
"TemperatureAtmosphere150": None,
"TemperatureAtmosphere220": None,
"EmissivityAtmosphere150": None,
"EmissivityAtmosphere220": None,
"detector_nep": 4.7e-17,
"synthbeam_kmax": self.params["kmax"],
"synthbeam_fraction": self.params["synthbeam_fraction"],
"kind": "IQU",
}
### Get the default dictionary
dictfilename = "dicts/pipeline_demo.dict"
dict_qubic = qubicDict()
dict_qubic.read_from_file(dictfilename)
### Modify the dictionary
for i in args.keys():
dict_qubic[str(i)] = args[i]
return dict_qubic
[docs]
def get_sigma_rho(self, sigma_pwv, h=None):
r"""Water vapor density's standard deviation
Compute sigma_rho from sigma_pwv at altitude h, with sigma_pwv taken from Table 1 in Sugiyama 2024.
The formula, being $\sigma_{\rho(h)} = \frac{\sigma_{PWV}}{h}$ , comes from a scale approximation.
h should be defined as 2*h_h2o, with h_h2o the water vapor half-height.
Parameters
----------
sigma_pwv : float
PWV's STD in mm.(1 mm of water over a 1 m^2 area is equivalent to 1 kg/m^2, the actual dimension of the PWD per definition.)
h : float
height of the considered water vapor in m.
Returns
-------
sigma_rho : float
Water vapor density's standard deviation in kg/m^3
"""
if h is None:
h = 2 * self.params["h_h2o"]
if h is not None:
return sigma_pwv / h
else:
raise ValueError("height of the water vapor h must be given")
[docs]
def get_mean_water_vapor_density(self, altitude, rho_0, h_h2o):
r"""Mean water vapor density.
Compute the mean water vapor density depending on the altitude, using reference water vapor density and water vapor half_height, given in self.params.yml.
The corresponding equation to compute the mean water vapor density, taken from equation (1) in Morris 2021, is :
.. math::
\langle \rho(h) \rangle = \rho_0 e^{\left( -log(2).(h - 5190) / h_0 \right)} .
Parameters
----------
altitude : array_like
Array containing altitudes at which we want to compute the mean water vapor density.
Returns
-------
mean_water_vapor_density : array_like
Mean water vapor density in :math:`g/m{3}`.
"""
return rho_0 * np.exp(-np.log(2) * (altitude - 5190) / h_h2o)
[docs]
def get_temperature_atm(self, altitude, temp_ground, h_T):
"""Temperature.
Compute the temperature of the atmosphere depending on the altitude, using the average ground temperature and a typical height that depend on the observation site.
The corresponding equation to compute the temperature, taken from equation (13) in Morris 2021, is :
.. math::
T_{atm}(h) = T_{atm}(0) e^{h / h_T} .
Parameters
----------
altitude : array_like
Array containing altitudes at which we want to compute the temperature.
temp_ground : float
Average ground temperature in K.
h_T : float
Typical height in m.
Returns
-------
temperature : array_like
Temperature in K.
"""
return temp_ground * np.exp(-altitude / h_T)
[docs]
def atm_absorption_coeff(self):
r"""Absorption coefficients.
Method to build the absorption spectrum of the atmopshere using files computed using the am atmospheric model (Paine, 2018).
The absorption coefficient has two origins: the line-by-line absorption which reprensents the spectral lines,
and a continuum absorption coming from collisions between molecules,
either :math:`H_2O-H_2O` collisions (self-induced continuum), or collisions between :math:`H_2O` and the air (air_induced).
See the am documentation for more details: https://lweb.cfa.harvard.edu/~spaine/am/ .
Returns
-------
frequencies : array_like
Frequencies at which the absorption spectrum is computed.
mol_absorption_coeff : array_like
Molecular absorption lines spectrum, from water and dioxygene, in :math:`m^{2}`.
self_absorption_coeff : array_like
Self-induced collisions continuum, from water, in :math:`m^{5}`.
air_absorption_coeff : array_like
Air_induced collisions continuun, from water, in :math:`m^{5}`.
"""
# Spectras for the line-by-line absorption-coefficient were produced using .amc files that looks like that:
# f 130 GHz 250 GHz 0.005 GHz
# output f k
# layer
# P 500 hPa
# T 280 K
# column h2o_lines 1 mm_pwv
# And for the continuum absorption:
# f 130 GHz 250 GHz 0.005 GHz
# output f k
# layer
# P 550 hPa
# T 280 K
# h 3000 m
# column h2o_self_continuum 1 mm_pwv
# Then we only need to execute a command line like: am file_name.amc > file_name.out .
# The file 'file_name.out' is created and contains two colomns, one for the frequency, and one for the absorption coefficient.
# Be careful, am results are in cm and not in m.
frequencies = []
mol_absorption_coeff = []
self_absorption_coeff = []
air_absorption_coeff = []
### Initialize atmosppheric properties
pressure = self.params["pressure_atm"]
temp = self.params["temp_ground"]
pwv = self.params["pwv"]
### Import absorption coefficients from molecular absorption lines
with open(
self.params["path_am_files"] + f"h2o_lines_{pressure}hPa_{temp}K_{pwv}mm.out",
"r",
) as file:
for line in file:
frequencies.append(float(line.split()[0]))
mol_absorption_coeff.append(float(line.split()[1]) * (1e-2) ** 2)
### Import absorption coefficients from self-induced collisions continuum
with open(self.params["path_am_files"] + "h2o_self_continuum.out", "r") as file:
for line in file:
self_absorption_coeff.append(float(line.split()[1]) * (1e-2) ** 5)
### Import absorption coefficients from air-induced collisions continuum
with open(self.params["path_am_files"] + "h2o_air_continuum.out", "r") as file:
for line in file:
air_absorption_coeff.append(float(line.split()[1]) * (1e-2) ** 5)
return (
np.array(mol_absorption_coeff),
np.array(self_absorption_coeff),
np.array(air_absorption_coeff),
frequencies,
)
[docs]
def get_gas_properties(self, params_file=True):
r"""Gas properties.
Method to compute the properties of the water vapor and the air in the atmosphere. It uses the CoolProp package (https://github.com/CoolProp/CoolProp) to compute the molar mass of air and water.
It can use parameters given in self.params.yml or be baised on the CoolProp package to compute the water vapor density.
The pressure as to be defined in the params.yml file. The temperature is computed using the function 'get_temp_atm'.
The molar mass of air is given by the CoolProp package: CP.PropsSI('MOLARMASS', 'Air') kg/mol.
The molar mass of water is given by the CoolProp package: CP.PropsSI('MOLARMASS', 'Water') kg/mol.
From molar mass, the mass is given by :
.. math::
m = M / N_A ,
where :math:`m` is the mass, :math:`M` is the molar mass and :math:`N_A` is the Avogadro constant, given by scipy.constants .
The mass density of air is computed using CoolProp package, from the pressure and the temperature of the atmosphere.
Then, the density of air is computed with :
.. math::
n = \rho / m ,
where :math:`n` is the density, :math:`\rho` is the mass density and :math:`m` is the mass.
For the water vapor density, you can follow the same steps by putting the argument params_file = False,
or compute it using the mass density computed using 'get_mean_water_vapor_density' and the parameters given in self.params_file.
Parameters
----------
self.params_file : bool, optional
If True, will use the reference water vapor density given in self.params_file by self.params['rho_0'] (in :math:`g/m^{3}`) to compute the density in :math:`m^{-3}`.
Else, will use the temperature and pression value to compute it. By default True.
Returns
-------
water_mass : float
The weight of a :math:`H_2O` molecule in :math:`g`.
water_vapor_density : float
The density of water vapor given the atmospheric temperature and pressure in :math:`m^{-3}`.
air_vapor_density : float
The density of air vapor given the atmospheric temperature and pressure in :math:`m^{-3}`.
"""
### Import physical constants
temp_atm = self.temperature # in K
pressure_atm = self.params["pressure_atm"] * 100 # in Pa
### Air properties
air_molar_mass = CP.PropsSI("MOLARMASS", "Air") # in kg/mol
air_mass = air_molar_mass / c.Avogadro * 1e3 # in g
air_mass_density = CP.PropsSI(
"D", "T", temp_atm, "P", pressure_atm, "Air"
) # in kg/m-3
air_density = air_mass_density * 1e3 / air_mass # in m-3
### Water properties
water_molar_mass = CP.PropsSI("MOLARMASS", "Water") # in kg/mol
water_mass = water_molar_mass / c.Avogadro * 1e3 # in g
### Compute water vapor density
if params_file:
water_vapor_density = self.mean_water_vapor_density / water_mass # in m-3
else:
water_vapor_mass_density = CP.PropsSI(
"D", "T", temp_atm, "P", pressure_atm, "Water"
) # in kg/m-3
water_vapor_density = (
c.Avogadro * water_vapor_mass_density / water_molar_mass * 1e-3
) # in m-3
return water_mass, water_vapor_density, air_density
[docs]
def absorption_spectrum(self):
r"""Absorption spectrum.
The coefficient :math:`\alpha_b(\nu)` (:math:`m^2/g`) is defined by:
.. math::
\alpha_b(\nu) = \frac{1}{m_{H_2O}} \left(k_{lines}(\nu) + n_{H_2O}k_{self}(\nu) + n_{air}k_{air}(\nu)\right) ,
with :math:`m_{H_2O}= 2.992\times 10^{-23} g` the mass of a :math:`H_2O` molecule, :math:`k_{lines}` (:math:`m^2`) the line-by-line absorption coefficient,
:math`k_{self}` and :math`k_{air}` (:math:`m^5`) the self- and air-induced continua, :math:`n_{H_2O}` and :math:`n_{air}` (:math:`m^{-3}`) the densities of water vapor and air.
Returns
-------
abs_spectrum : array_like
Atmosphere absorption coefficient, in :math:`m^{2} / g`.
"""
### Import gas properties
water_mass, water_vapor_density, air_density = self.get_gas_properties()
### Compute coeff
abs_spectrum = (
self.mol_absorption_coeff
+ water_vapor_density * self.self_absorption_coeff
+ air_density * self.air_absorption_coeff
) / water_mass
return abs_spectrum
[docs]
def get_integrated_absorption_spectrum(self, band):
"""Integrated absorption spectrum within a band.
Compute the integrated absorption spectrum in a given frequency band, according to the parameters in the self.params.yml file.
Parameters
----------
band : int
QUBIC frequency band. Can be either 150 or 220.
Returns
-------
integrated_abs_spectrum : numpy.ndarray
The integrated absorption spectrum in the given frequency band, in m^2/g.
nus : numpy.ndarray
The frequencies at which the absorption spectrum is computed, in GHz.
"""
#! Verify if the integration is made properly !!!
### Verify the given band
if band not in [150, 220]:
raise ValueError("Band must be either 150 or 220 GHz.")
### Evaluate the frequency band edges
freq_min, freq_max = (
self.integration_frequencies[0],
self.integration_frequencies[-1],
)
freq_step = (freq_max - freq_min) / (len(self.integration_frequencies) - 1)
### Compute the frequency sub-bands within the QUBIC band and their associated indexes
_, nus_edges, nus, bandwidths, _, N_bands = compute_freq(
band=band,
Nfreq=int(self.params["nsub_in"] / 2),
relative_bandwidth=self.qubic_dict["filter_relative_bandwidth"],
)
nus_edge_index = np.round((nus_edges - freq_min) / freq_step).astype(int)
### Integrate the absorption spectrum over the frequency sub-bands using the trapezoidal method
# Need to normalize by the bandwidth to get average absorption coefficient
integrated_abs_spectrum = np.array(
[
np.trapz(
self.abs_spectrum[nus_edge_index[i] : nus_edge_index[i + 1]],
x=self.integration_frequencies[
nus_edge_index[i] : nus_edge_index[i + 1]
],
)
/ (nus_edges[i + 1] - nus_edges[i])
for i in range(N_bands)
]
)
return integrated_abs_spectrum, nus, bandwidths
[docs]
def integrated_absorption_spectrum(self):
"""Integrated absorption spectrum.
Integrated absorption spectrum in the QUBIC frequency bands: 150 and 220 GHz.
Returns
-------
integrated_abs_spectrum : array_like
The integrated absorption spectrum in the given frequency band, in :math:`m^{2} / g`.
nus : array_like
The frequencies at which the absorption spectrum is computed, in :math:`GHz`.
"""
### Get the integrated absorption spectrum in the two QUBIC bands : 150 and 220 GHz
int_abs_spectrum_150, nus_150, bandwidths_150 = (
self.get_integrated_absorption_spectrum(band=150)
)
int_abs_spectrum_220, nus_220, bandwidths_220 = (
self.get_integrated_absorption_spectrum(band=220)
)
return (
np.append(int_abs_spectrum_150, int_abs_spectrum_220),
np.append(nus_150, nus_220),
np.append(bandwidths_150, bandwidths_220),
)
[docs]
class AtmosphereMaps(AtmosphereProperties):
def __init__(self, params):
### Import parameters and the class describing the atmosphere
self.params = params
AtmosphereProperties.__init__(self, params)
### Compute the maximum multipole according to the resolution of the map
self.lmax = 3 * self.params["nside"] - 1
### Build water vapor density map
self.delta_rho_map = self.get_water_vapor_density_fluctuation_2d_map(
flat=self.params["flat"]
)
### Build the temperature maps of the atmosphere
self.atm_temp_maps = self.get_temp_maps(self.delta_rho_map)
[docs]
def get_fourier_grid_2d(self, n_grid, size_atm):
"""Fourier 2d grid.
Generate a 2d grid of spatial frequencies in Fourier space according to the parameters in the self.params.yml file.
Parameters
----------
n_grid : int
Number of grid points in the 2d grid.
size_atm : float
Size of the atmosphere in m.
Returns
-------
kx : array_like
2d array containing the spatial x frequencies in Fourier space, (n_grid, n_grid).
ky : array_like
2d array containing the spatial y frequencies in Fourier space, (n_grid, n_grid).
k_norm : array_like
2d array containing the norm of the spatial frequencies in Fourier space, (n_grid, n_grid).
"""
### Generate spatial frequency in Fourier space
k_distrib_y = np.fft.fftfreq(n_grid, d=2 * size_atm / n_grid) * 2 * np.pi
k_distrib_x = np.fft.fftfreq(n_grid, d=2 * size_atm / n_grid) * 2 * np.pi
### Build 2d grid and compute the norm of the spatial frequencies
kx, ky = np.meshgrid(k_distrib_x, k_distrib_y, indexing="ij")
k_norm = np.sqrt(kx**2 + ky**2)
return kx, ky, k_norm
[docs]
def kolmogorov_spectrum(self, k, r0, sigma_rho=None, atm_size=None):
r"""Kolmogorov spectrum.
Compute the Kolmogorov spectrum, which simulate the power spectrum of the spatial fluctuations of the water vapor density, following the equation :
.. math::
P(\textbf{k}) = (r_0^{-2} + \lvert \textbf{k} \rvert ^{2})^{-11/6} .
Parameters
----------
k : array_like
Array containing the spatial frequencies at which we want to compute the Kolmogorov 2d spectrum.
r0 : float
Maximum spatial coherence length of the water vapor density, in m.
Returns
-------
kolmogorov_spectrum : array_like
Kolmogorov spectrum.
"""
if self.params["2d"]:
exposant = -8 / 6
else:
exposant = -11 / 6
k_r0 = 2 * np.pi / r0
P = (k_r0**2 + k**2) ** exposant
if self.params["adjust"] and sigma_rho is not None and atm_size is not None:
# Compute normalization constant C
dk = np.pi / atm_size
sum_kk = np.sum(P) * dk**2
C = sigma_rho / sum_kk
P *= C
return P
[docs]
def normalized_kolmogorov_spectrum(self, k, r0, sigma_rho=None, atm_size=None):
r"""Normalized Kolmogorov spectrum.
Compute the normalized Kolmogorov spectrum, to ensure :
.. math::
\int_\textbf{k} P(\textbf{k}) d\textbf{k} = 1 .
Parameters
----------
k : array_like
Array containing the spatial frequencies at which we want to compute the normalized Kolmogorov spectrum.
r0 : float
Maximum spatial coherence length of the water vapor density, in m.
Returns
-------
normalized_kolmogorov_spectrum : array_like
Normalized Kolmogorov spectrum.
"""
### Compute the normalization constant
res, _ = quad(
lambda x: self.kolmogorov_spectrum(x, r0, sigma_rho, atm_size),
np.min(k),
np.max(k),
)
return self.kolmogorov_spectrum(k, r0, sigma_rho, atm_size) / res
[docs]
def generate_spatial_fluctuations_fourier(
self, n_grid, size_atm, r0, sigma_rho=None, atm_size=None, Debug=False
):
"""Spatial 2d fluctuations.
Produce the spatial fluctuations of the water vapor density, by generating random phases in Fourier space, and then computing the inverse Fourier transform.
Parameters
----------
n_grid : int
Number of grid points in the 2d grid.
size_atm : float
Size of the atmosphere in m.
r0 : float
Maximum spatial coherence length of the water vapor density, in m
Debug : bool
If True, prints sigma_rho / np.sqrt( np.var(delta_rho)) to check wether the fourier normalization was done correctly in kolmogorov_spectrum.
By default False
Returns
-------
delta_rho_x : array_like
Variation of the water vapor density.
"""
#! At some point, we will need to normalize these fluctuations using real data. We can maybe use :math:`\sigma_{PWV}` that can be estimated with figure 4 in Morris 2021.
### Compute the spatial frequencies & power spectrum.
_, _, k = self.get_fourier_grid_2d(n_grid, size_atm)
kolmogorov_spectrum = self.normalized_kolmogorov_spectrum(
k, r0, sigma_rho, atm_size
)
### Generate spatial fluctuations through random phases in Fourier space
phi = np.random.uniform(
0, 2 * np.pi, size=(self.params["n_grid"], self.params["n_grid"])
)
delta_rho_k = np.sqrt(kolmogorov_spectrum) * np.exp(1j * phi)
### Apply inverse Fourier transform to obtain spatial fluctuations in real space
delta_rho = np.fft.ifft2(
delta_rho_k, s=(self.params["n_grid"], self.params["n_grid"])
).real
### Normalize to ensure correct variance
delta_rho *= sigma_rho / np.std(delta_rho)
if Debug:
print(
"sigma_rho / np.sqrt( np.var(delta_rho))= ",
sigma_rho / np.sqrt(np.var(delta_rho)),
)
return delta_rho
[docs]
def kolmogorov_correlation_function(self, r, r0):
r"""Kolmogorov correlation function.
Compute the Kolmogorov correlation function by applying the inverse Fourier transform to the Kolmogorov 2d spectrum.
This correlation function can be written as:
.. math::
D(r) = \frac{2^{2/3}}{\Gamma(1/3)} \left(\frac{r}{r_0}\right)^{1/3} K_{1/3} \left(\frac{r}{r_0}\right) .
We impose that the correlation function is 1 at r = 0.
Parameters
----------
r : array_like or float
Distance between two points, in meters.
r0 : array_like or float
Maximum correlation length, in meters.
Returns
-------
D : array_like
Correlation function.
"""
return np.where(
r == 0,
1,
2 ** (2 / 3) / sp.gamma(1 / 3) * (r / r0) ** (1 / 3) * sp.kv(1 / 3, r / r0),
)
[docs]
def angular_correlation(self, theta, h_atm, r0):
r"""Angular Kolmogorov correlation function.
We compute the angular Kolmogorov correlation function, switching the distance between two points to the angle between them on the surface of the sphere,
using the relation :
.. math::
r = 2h_{atm} \sin \left(\frac{\theta}{2}\right) ,
where :math:`h_{atm}` is the distance between the atmosphere and our instrument and :math:`\theta` is the angle between the two points.
Parameters
----------
theta : array_like or float
Angle between two points, in degrees.
h_atm : array_like or float
Distance between the atmosphere and our instrument, in meters.
r0 : float
Maximum correlation length, in meters.
Returns
-------
C : array_like or float
Angular Kolmogorov correlation function.
"""
### Compute the distance between two points separeted by the angle theta on the surface of the sphere
r = 2 * h_atm * np.sin(np.radians(theta) / 2)
return self.kolmogorov_correlation_function(r, r0)
[docs]
def cl_from_angular_correlation_int(self, ell):
r"""Angular power spectrum from angular correlation function.
Compute the angular power spectrum from the angular correlation function, using the formula:
.. math::
C_{\ell} = 2 \pi \int_{-1}^{1} C(\theta) P_{\ell}(\cos \theta) d \cos \theta ,
where :math:`C(\theta)` is the angular correlation function and :math:`P_l(\cos \theta)` is the Legendre polynomial of order :math:`l`.
WARNING: This function is not very efficient, as it computes the Legendre polynomial of order l for each value of cos(theta). It takes a long time to compute for high values of l.
It is just used for testing purposes and to compute the case \ell = 0 which is not possible in 'ctheta_2_dell'.
Parameters
----------
ell : int
Angular multipole order.
Returns
-------
C_l : float
Angular power spectrum of order l.
"""
#! It should be possible to speed the computation by using the function np.polynomial.legendre.leggauss
###Compute the integrand for the integral
def integrand(cos_theta):
# Compute theta from cos(theta)
theta = np.degrees(np.arccos(cos_theta))
# Compute the legendre polynomial of order l
legendre = sp.legendre(ell)(cos_theta)
# Return the product of the angular correlation function and the legendre polynomial
return (
self.angular_correlation(
theta,
self.params["altitude_atm_2d"],
self.params["correlation_length"],
)
* legendre
)
### Integrate over cos(theta)
res, _ = quad(integrand, -1, 1)
return 2 * np.pi * res
[docs]
def ctheta_2_dell(self, theta_deg, ctheta, lmax, normalization=1):
r"""Angular power spectrum from angular correlation function.
Compute the angular power spectrum from the angular correlation function, using the formula:
.. math::
C_{\ell} = 2 \pi \int_{-1}^{1} C(\theta) P_{\ell}(\cos \theta) d \cos \theta ,
where :math:`C(\theta)` is the angular correlation function and :math:`P_l(\cos \theta)` is the Legendre polynomial of order :math:`l`.
This computation is done using the CAMB library, which is much faster than the 'cl_from_angular_correlation_int' function.
Parameters
----------
theta_deg : array_like
Angles between two points on the surface of the sphere, in degrees.
ctheta : array_like
Angular correlation function.
lmax : int
Maximum angular multipole order.
normalization : int, optional
Normalization parameter, by default 1
Returns
-------
dell : array_like
Angular power spectrum.
"""
### Compute the Legendre polynomials up to lmax+1, and the theta values
### These cos_theta do not contain cos(theta)=1 so we have to do this case separately
cos_theta, legendre = np.polynomial.legendre.leggauss(lmax + 1)
xdeg = np.degrees(np.arccos(cos_theta))
### Replace C(theta=0) by 0
myctheta = ctheta.copy()
myctheta[0] = 0
### Fill the array that should include polarization (we put zeros there) with the values of our imput c(theta) interpolated at the cos_theta locations
allctheta = np.zeros((len(cos_theta), 4))
allctheta[:, 0] = np.interp(xdeg, theta_deg, myctheta)
### Call the camb function that does the transform from C(theta) to Dl
dlth = cc.corr2cl(allctheta, cos_theta, legendre, lmax)
### Compute the multipole moments
ell = np.arange(lmax + 1)
### the special case cos(theta)=1 corresponds to theta=0 and add 2pi times c(theta=0) to the Dl
return ell, dlth[:, 0] + ctheta[0] * normalization
[docs]
def ctheta_2_cell(self, theta_deg, ctheta, lmax, normalization=1):
r"""Angular power spectrum from angular correlation function.
Compute the angular power spectrum from the angular correlation function, using the function 'ctheta_2_dell',
and convert the result to :math:`C_{\ell}`.
Parameters
----------
theta_deg : array_like
Angles between two points on the surface of the sphere, in degrees.
ctheta : array_like
Angular correlation function.
lmax : int
Maximum angular multipole order.
normalization : int, optional
Normalization parameter, by default 1
Returns
-------
cell : array_like
Angular power spectrum.
"""
#! Warning : the Cl computed using CAMB are different from the ones computed using 'cl_from_angular_correlation_int' at large l
### Compute multipole moments and Dl angular power spectrum
ell, dlth = self.ctheta_2_dell(
theta_deg, ctheta, lmax, normalization=normalization
)
### Convert from Dl to Cl
dl2cl_factor = 2 * np.pi / (ell * (ell + 1))
clth = dlth * dl2cl_factor
### Correct for the special case l=0, as the convertion factor is not valid
clth[0] = self.cl_from_angular_correlation_int(0)
return ell, clth
[docs]
def generate_spatial_fluctuation_sphercial_harmonics(
self, sigma_rho=None, atm_size=None, Debug=False
):
"""Spatial fluctuation map from angular correlation function.
Compute the spatial fluctuation HEALPix map from the angular correlation function, using the function 'ctheta_2_cell'.
Debug can be used to check
Returns
-------
delta : array_like
Spatial fluctuation map, generated accorging HEALPix formalism.
"""
### Compute angular correlation function
theta = np.linspace(0, 180, self.params["n_theta"])
ctheta = self.angular_correlation(
theta, self.params["altitude_atm_2d"], self.params["correlation_length"]
)
### Compute spherical harmonics from angular correlation function
_, clth = self.ctheta_2_cell(
theta, ctheta, self.lmax, normalization=self.params["normalization"]
)
"""
if self.params["adjust"]:
sum_cl = np.sum(clth)
C = sigma_rho * (atm_size**2) / sum_cl
clth *= C
"""
if self.params["adjust"]:
# var_theory = np.sum(np.fromiter(((2*l + 1) * clth[l] for l in range(self.lmax + 1)),float) )/ (4*np.pi) #Calling np.sum(generator) is deprecated, and in the future will give a different result. Use np.sum(np.fromiter(generator)) or the python sum builtin instead.
sigma_theo = np.std(clth)
C = sigma_rho / np.sqrt(sigma_theo)
clth *= C
### Build fluctuations map
delta_rho = hp.synfast(clth, nside=self.params["nside"], lmax=self.lmax)
return delta_rho
[docs]
def get_water_vapor_density_fluctuation_2d_map(self, flat=True):
"""Water vapor density 2d map.
Get the water vapor density 2d map with simulated fluctuations.
The spatial fluctuations are generated using either the correlation function or the angular correlation function.
Parameters
----------
mean_rho : array_like
Mean water vapor density.
angular : bool, optional
If True, use the angular correlation function. If False, use the correlation function., by default True
Returns
-------
atm_maps_2d : array_like
Water vapor density 2d map.
"""
### Build water vapor density fluctuations
if flat:
delta_rho = self.generate_spatial_fluctuations_fourier(
self.params["n_grid"],
self.params["size_atm"],
self.params["correlation_length"],
sigma_rho=self.params["sigma_rho"],
atm_size=self.params["size_atm"],
)
else:
delta_rho = self.generate_spatial_fluctuation_sphercial_harmonics(
sigma_rho=self.params["sigma_rho"], atm_size=self.params["size_atm"]
)
return delta_rho
[docs]
def get_temp_maps(self, maps):
r"""Atmosphere maps.
Get the atmosphere maps in temperature, by using the equation 12 from Morris 2021, that compute the induced temperature in the detector due to the water vapor density :
.. math::
dT( \textbf{r}, \nu) = \alpha_b(\nu) \rho(\textbf{r}) T_{atm}(\textbf{r}) dV .
And then, convert it in micro Kelvin CMB.
Parameters
----------
maps : array_like
Water vapor density 2d map.
Returns
-------
temp_maps : array_like
Temperature maps in micro Kelvin CMB.
"""
### Compute the associated temperature maps from the wapor density maps, using the equation 12 from Morris 2021
if len(maps.shape) == 1:
temp_maps = (
self.integrated_abs_spectrum[:, np.newaxis]
* self.mean_water_vapor_density
* self.temperature
* maps
) # maps obtained with spherical harmonics (flat =False)
else:
temp_maps = (
self.integrated_abs_spectrum[:, np.newaxis, np.newaxis]
* self.mean_water_vapor_density
* self.temperature
* maps
) # flat maps
### Convert them into micro Kelvin CMB
# temp_maps -= Planck18.Tcmb0.value
temp_maps *= 1e6
return temp_maps
[docs]
def horizontal_plane_to_azel(self, x, y, z):
"""Horizontal plane to azimuth and elevation.
Convert the coordinates in the horizontal plane to azimuth and elevation.
Parameters
----------
x : array_like
x coordinate.
y : array_like
y coordinate.
z : array_like
z coordinate.
Returns
-------
az : array_like
Azimuth coordinate.
el : array_like
Elevation coordinate.
"""
r = np.sqrt(x**2 + y**2 + z**2)
el = np.pi / 2 - np.arccos(z / r)
az = np.arctan2(y, x)
return r, az, el
[docs]
def azel_to_horizontal_plane(self, r, az, el):
r"""
Parameters
----------
r : array_like
Radius coordinate.
az : array_like
Azimuth coordinate.
el : array_like
Elevation coordinate.
Returns
-------
x : array_like
x coordinate.
y : array_like
y coordinate.
z : array_like
z coordinate.
"""
x = r * np.cos(el) * np.cos(az)
y = r * np.cos(el) * np.sin(az)
z = r * np.sin(el)
return x, y, z
[docs]
def get_azel_coordinates(self):
az_list, el_list = [], []
for ind_x in range(len(self.x_list)):
for ind_y in range(len(self.y_list)):
_, az, el = self.horizontal_plane_to_azel(
self.x_list[ind_x], self.y_list[ind_y], self.altitude[0]
)
az_list.append(az)
el_list.append(el)
return np.asarray([az_list, el_list]).T
[docs]
def get_healpy_atm_maps_2d(self, maps, longitude, latitude):
"""Healpy 2d atmosphere maps.
Function to project the 2d atmosphere maps in cartesian coordinates, and then project them in spherical coordinates using healpy.
By default, the projection is centered on the QUBIC patch (RA=0, DEC=-57).
Returns
-------
hp_maps_2d : array_like
2d healpy maps of the atmosphere.
"""
### Build list of azimuth and elevation coordinates for each point of the atmosphere
azel_coordinates = self.get_azel_coordinates()
### Build rotation operator
rotation_above_qubic = Cartesian2SphericalOperator("azimuth,elevation")(
Rotation3dOperator("ZY'", longitude, 90 - latitude, degrees=True)(
Spherical2CartesianOperator("azimuth,elevation")
)
)
### Build healpy projection operator
rotation_azel2hp = Spherical2HealpixOperator(
self.params["nside"], "azimuth,elevation"
)
### Fill the healpy maps with the temperature maps using the operators
hp_maps_index = rotation_azel2hp(rotation_above_qubic(azel_coordinates)).astype(
int
)
hp_maps_2d = np.zeros(
(len(self.frequencies), hp.nside2npix(self.params["nside"]))
)
for ifreq in range(len(self.frequencies)):
hp_maps_2d[ifreq, hp_maps_index] = maps[ifreq].flatten()
return hp_maps_2d
[docs]
class WindPerturbation:
def __init__(self, params, qubic_sampling):
"""Wind Perturbation
This class aims to simulate wind for atmosphere observations.
It relies on the same parameters dictionary than AtmosphereMaps, as well as on a QubicSampling instance.
We take the choice to simulate wind by deviating the scanning strategy, rather than shifting pixels in an atmosphere maps.
The two methods are equivalent, but the one we chose if way cheaper in terms of computational ressources.
We will then deviated the pixels seen in the QubicSampling instance according to the wind generated from the parameters given in the parameters dict.
Parameters
----------
params : dict
Dictionary containing all the simulation parameters
scanning_strategy : QubicSampling
QubicSampling instance
"""
self.params = params
self.qubic_sampling = qubic_sampling
self.npointings = self.qubic_sampling.index.size
[docs]
def azel_to_cartesian(self, azimuth, elevation, altitude):
"""AzEl to Cartesian coordinates."""
x = altitude / np.sin(elevation) * np.cos(elevation) * np.cos(azimuth)
y = altitude / np.sin(elevation) * np.cos(elevation) * np.sin(azimuth)
return x, y
[docs]
def cartesian_to_azel(self, x, y, z):
"""Cartesian to AzEl cooordinates."""
r = np.sqrt(x**2 + y**2 + z**2)
el = np.pi / 2 - np.arccos(z / r)
az = np.arctan2(y, x)
return az, el
[docs]
def get_constant_wind(self, wind_x, wind_y):
r"""Constant wind field
Parameters
----------
wind_x, wind_y : float
wind's x and y components
Returns
-------
wind_field : array_like
shape (2, npointings)
"""
wind = np.cumsum(np.ones(self.npointings))
return wind_x * wind, wind_y * wind
[docs]
def get_normal_wind(self, wind_mean, wind_std):
r"""Random normal wind field
Parameters
----------
wind_mean, wind_std : array_like
parameters of the Gaussian distribution. Each is a lenght 2 array for x and y components
Returns
-------
wind_field : array_like
shape (2, npointings)
"""
wind_x = np.cumsum(np.random.normal(wind_mean[0], wind_std[0], self.npointings))
wind_y = np.cumsum(np.random.normal(wind_mean[1], wind_std[1], self.npointings))
return wind_x, wind_y
[docs]
def get_wind(self):
if self.params["wind_type"] == "constant":
wind_x, wind_y = self.get_constant_wind(
self.params["wind_cst"][0], self.params["wind_cst"][1]
)
elif self.params["wind_type"] == "normal":
wind_x, wind_y = self.get_normal_wind(
self.params["wind_normal"][0], self.params["wind_normal"][1]
)
else:
raise ValueError(
"Wind type not defined yet. Please enter 'constant' or 'normal'."
)
return wind_x, wind_y
[docs]
def get_deviated_index(self, pos_x, pos_y, wind_x, wind_y):
deviated_index_x = (np.round(wind_x) + np.round(pos_x)).astype(int)
deviated_index_y = (np.round(wind_y) + np.round(pos_y)).astype(int)
return deviated_index_x, deviated_index_y
[docs]
def get_deviated_coord(self):
azimuth, elevation = self.qubic_sampling.azimuth, self.qubic_sampling.elevation
if not self.params["wind"]:
return azimuth, elevation
# Get wind
wind_x, wind_y = self.get_wind()
# Compute deviated scanning strategy
x, y = self.azel_to_cartesian(
np.radians(azimuth), np.radians(elevation), self.params["altitude_atm_2d"]
)
deviated_index_x, deviated_index_y = self.get_deviated_index(x, y, wind_x, wind_y)
deviated_az, deviated_el = self.cartesian_to_azel(
deviated_index_x, deviated_index_y, self.params["altitude_atm_2d"]
)
deviated_az, deviated_el = (
np.degrees(deviated_az % (2 * np.pi)),
np.degrees(deviated_el),
)
return deviated_az, deviated_el
[docs]
def get_deviated_qubic_sampling(self):
deviated_az, deviated_el = self.get_deviated_coord()
deviated_qubic_sampling = QubicSampling(
azimuth=deviated_az,
elevation=deviated_el,
pitch=self.qubic_sampling.pitch,
angle_hwp=self.qubic_sampling.angle_hwp,
time=self.qubic_sampling.time,
period=self.qubic_sampling.period,
latitude=self.qubic_sampling.latitude,
longitude=self.qubic_sampling.longitude,
)
deviated_qubic_sampling.fix_az = True
return deviated_qubic_sampling