import pickle
import warnings
import healpy as hp
import numpy as np
from fgbuster.mixingmatrix import MixingMatrix
from pyoperators import (
MPI,
AdditionOperator,
BlockColumnOperator,
BlockDiagonalOperator,
BlockRowOperator,
CompositionOperator,
DenseBlockDiagonalOperator,
DenseOperator,
DiagonalOperator,
I,
IdentityOperator,
MPIDistributionIdentityOperator,
Operator,
PackOperator,
ReshapeOperator,
SymmetricBandToeplitzOperator,
proxy_group,
rule_manager,
)
from pyoperators.utils.mpi import as_mpi
from pysimulators import (
Acquisition,
)
from pysimulators.interfaces.healpy import HealpixConvolutionGaussianOperator
from pysimulators.noises import (
_fold_psd,
_gaussian_psd_1f,
_logloginterp_psd,
_psd2invntt,
_unfold_psd,
)
from qubic.data import PATH
from qubic.lib.Instrument.Qinstrument import (
QubicInstrument,
QubicMultibandInstrument,
compute_freq,
)
from qubic.lib.Qsamplings import get_pointing
from qubic.lib.Qscene import QubicScene
warnings.filterwarnings("ignore")
[docs]
class QubicAcquisition(Acquisition):
"""
The QubicAcquisition class, which combines the instrument, sampling and
scene models.
"""
def __init__(self, instrument, sampling, scene, d):
"""
acq = QubicAcquisition(instrument, sampling, scene, d)
Parameters
----------
instrument : QubicInstrument, optional
The QubicInstrument instance.
sampling : pointing
Pointing obtained with get_pointing().
scene : QubicScene, optional
The discretized observed scene (the sky).
d : dictionary with lot of parameters:
block : tuple of slices, optional
Partition of the samplings.
effective_duration : float, optional
If not None, the noise properties are rescaled so that this
acquisition has an effective duration equal to the specified value,
in years.
photon_noise : boolean, optional
If true, the photon noise contribution is included.
max_nbytes : int or None, optional
Maximum number of bytes to be allocated for the acquisition's
operator.
nprocs_instrument : int, optional
For a given sampling slice, number of procs dedicated to
the instrument.
nprocs_sampling : int, optional
For a given detector slice, number of procs dedicated to
the sampling.
comm : mpi4py.MPI.Comm, optional
The acquisition's MPI communicator. Note that it is transformed
into a 2d cartesian communicator before being stored as the 'comm'
attribute. The following relationship must hold:
comm.size = nprocs_instrument * nprocs_sampling
psd : array-like, optional
The one-sided or two-sided power spectrum density
[signal unit/sqrt Hz].
bandwidth : float, optional
The PSD frequency increment [Hz].
twosided : boolean, optional
Whether or not the input psd is one-sided (only positive
frequencies) or two-sided (positive and negative frequencies).
sigma : float
Standard deviation of the white noise component.
"""
block = d["block"]
effective_duration = d["effective_duration"]
photon_noise = d["photon_noise"]
max_nbytes = d["max_nbytes"]
psd = d["psd"]
bandwidth = d["bandwidth"]
twosided = d["twosided"]
sigma = d["sigma"]
self.interp_projection = d["interp_projection"]
comm = d["comm"]
nprocs_instrument = d["nprocs_instrument"]
nprocs_sampling = d["nprocs_sampling"]
Acquisition.__init__(
self,
instrument,
sampling,
scene,
block=block,
max_nbytes=max_nbytes,
nprocs_instrument=nprocs_instrument,
nprocs_sampling=nprocs_sampling,
comm=comm,
)
self.photon_noise = bool(photon_noise)
self.effective_duration = effective_duration
self.bandwidth = bandwidth
self.psd = psd
self.twosided = twosided
self.sigma = sigma
self.forced_sigma = None
[docs]
def get_coverage(self):
"""
Return the acquisition scene coverage as given by H.T(1), normalized
so that its integral over the sky is the number of detectors times
the duration of the acquisition.
"""
H = self.get_operator()
out = H.T(np.ones((len(self.instrument), len(self.sampling))))
if self.scene.kind != "I":
out = out[..., 0].copy() # to avoid keeping QU in memory
ndetectors = self.comm.allreduce(len(self.instrument))
nsamplings = self.sampling.comm.allreduce(len(self.sampling))
out *= ndetectors * nsamplings * self.sampling.period / np.sum(out)
return out
[docs]
def get_hitmap(self, nside=None): # ?
"""
Return a healpy map whose values are the number of times a pointing
hits the pixel.
"""
if nside is None:
nside = self.scene.nside
ipixel = self.sampling.healpix(nside)
npixel = 12 * nside**2
hit = np.histogram(ipixel, bins=npixel, range=(0, npixel))[0]
self.comm.Allreduce(MPI.IN_PLACE, as_mpi(hit), op=MPI.SUM)
return hit
[docs]
def get_noise(self, det_noise, photon_noise, seed=None, out=None):
np.random.seed(seed)
out = self.instrument.get_noise(
self.sampling, self.scene, det_noise, photon_noise, out=out
)
if self.effective_duration is not None:
nsamplings = self.sampling.comm.allreduce(len(self.sampling))
factor = np.sqrt(
nsamplings * self.sampling.period / (self.effective_duration * 31557600)
)
out *= factor
return out
get_noise.__doc__ = Acquisition.get_noise.__doc__
[docs]
def get_aperture_integration_operator(self):
"""
Integrate flux density in the telescope aperture.
Convert signal from W / m^2 / Hz into W / Hz.
"""
return self.instrument.get_aperture_integration_operator()
[docs]
def get_convolution_peak_operator(self, **keywords):
"""
Return an operator that convolves the Healpix sky by the gaussian
kernel that, if used in conjonction with the peak sampling operator,
best approximates the synthetic beam.
"""
return self.instrument.get_convolution_peak_operator(**keywords)
[docs]
def get_detector_integration_operator(self):
"""
Integrate flux density in detector solid angles.
Convert W / sr into W.
"""
return self.instrument.get_detector_integration_operator()
[docs]
def get_detector_response_operator(self):
"""
Return the operator for the bolometer responses.
"""
return BlockDiagonalOperator(
[
self.instrument.get_detector_response_operator(self.sampling[b])
for b in self.block
],
axisin=1,
)
[docs]
def get_distribution_operator(self):
"""
Return the MPI distribution operator.
"""
return MPIDistributionIdentityOperator(self.comm)
[docs]
def get_filter_operator(self):
"""
Return the filter operator.
Convert units from W/Hz to W.
"""
return self.instrument.get_filter_operator()
[docs]
def get_hwp_operator(self):
"""
Return the operator for the bolometer responses.
"""
return BlockDiagonalOperator(
[
self.instrument.get_hwp_operator(self.sampling[b], self.scene)
for b in self.block
],
axisin=1,
)
[docs]
def get_diag_invntt_operator(self):
print("Use diagonal noise covariance matrix")
sigma_detector = self.instrument.detector.nep / np.sqrt(2 * self.sampling.period)
if self.photon_noise:
sigma_photon = self.instrument._get_noise_photon_nep(self.scene) / np.sqrt(
2 * self.sampling.period
)
else:
sigma_photon = 0
out = DiagonalOperator(
1 / (sigma_detector**2 + sigma_photon**2),
broadcast="rightward",
shapein=(len(self.instrument), len(self.sampling)),
)
if self.effective_duration is not None:
nsamplings = self.comm.allreduce(len(self.sampling))
out /= (
nsamplings * self.sampling.period / (self.effective_duration * 31557600)
)
return out
[docs]
def get_invntt_operator(
self, det_noise, photon_noise
): # Not working in some cases? ## det_noise, photon_noise are now weights
"""
Return the inverse time-time noise correlation matrix as an Operator.
The input Power Spectrum Density can either be fully specified by using
the 'bandwidth' and 'psd' keywords, or by providing the parameters of
the gaussian distribution:
psd = sigma**2 * (1 + (fknee/f)**fslope) / B
where B is the sampling bandwidth equal to sampling_frequency / N.
Parameters
----------
sampling : Sampling
The temporal sampling.
psd : array-like, optional
The one-sided or two-sided power spectrum density
[signal unit/sqrt Hz].
bandwidth : float, optional
The PSD frequency increment [Hz].
twosided : boolean, optional
Whether or not the input psd is one-sided (only positive
frequencies) or two-sided (positive and negative frequencies).
sigma : float
Standard deviation of the white noise component.
sampling_frequency : float
The sampling frequency [Hz].
fftw_flag : string, optional
The flags FFTW_ESTIMATE, FFTW_MEASURE, FFTW_PATIENT and
FFTW_EXHAUSTIVE can be used to describe the increasing amount of
effort spent during the planning stage to create the fastest
possible transform. Usually, FFTW_MEASURE is a good compromise
and is the default.
nthreads : int, optional
Tells how many threads to use when invoking FFTW or MKL. Default is
the number of cores.
"""
fftw_flag = "FFTW_MEASURE"
nthreads = None
# if self.bandwidth is None or self.psd is None:
if (
self.bandwidth is None
and self.psd is not None
or self.bandwidth is not None
and self.psd is None
):
raise ValueError("The bandwidth or the PSD is not specified.")
# Get sigma in Watt
self.sigma = 0
if det_noise != 0:
self.sigma = (
self.instrument.detector.nep
/ np.sqrt(2 * self.sampling.period)
* det_noise
)
if photon_noise != 0:
sigma_photon = (
self.instrument._get_noise_photon_nep(self.scene)
/ np.sqrt(2 * self.sampling.period)
* photon_noise
)
self.sigma = np.sqrt(self.sigma**2 + sigma_photon**2)
else:
pass
if self.bandwidth is None and self.psd is None and self.sigma is None:
raise ValueError("The noise model is not specified.")
if self.forced_sigma is None:
pass
else:
self.sigma = self.forced_sigma.copy()
shapein = (len(self.instrument), len(self.sampling))
if self.bandwidth is None and self.instrument.detector.fknee == 0:
out = DiagonalOperator(
1 / self.sigma**2,
broadcast="rightward",
shapein=(len(self.instrument), len(self.sampling)),
)
if self.effective_duration is not None:
nsamplings = self.sampling.comm.allreduce(len(self.sampling))
out /= (
nsamplings
* self.sampling.period
/ (self.effective_duration * 31557600)
)
return out
sampling_frequency = 1 / self.sampling.period
nsamples_max = len(self.sampling)
fftsize = 2
while fftsize < nsamples_max:
fftsize *= 2
new_bandwidth = sampling_frequency / fftsize
if self.bandwidth is not None and self.psd is not None:
if self.twosided:
self.psd = _fold_psd(self.psd)
f = np.arange(fftsize // 2 + 1, dtype=float) * new_bandwidth
p = _unfold_psd(_logloginterp_psd(f, self.bandwidth, self.psd))
else:
p = _gaussian_psd_1f(
fftsize,
sampling_frequency,
self.sigma,
self.instrument.detector.fknee,
self.instrument.detector.fslope,
twosided=True,
)
p[..., 0] = p[..., 1]
invntt = _psd2invntt(
p, new_bandwidth, self.instrument.detector.ncorr, fftw_flag=fftw_flag
)
print("non diagonal case")
if self.effective_duration is not None:
nsamplings = self.comm.allreduce(len(self.sampling))
invntt /= (
nsamplings * self.sampling.period / (self.effective_duration * 31557600)
)
return SymmetricBandToeplitzOperator(
shapein, invntt, fftw_flag=fftw_flag, nthreads=nthreads
)
get_invntt_operator.__doc__ = Acquisition.get_invntt_operator.__doc__
[docs]
def get_unit_conversion_operator(self):
"""
Convert sky temperature into W / m^2 / Hz.
If the scene has been initialised with the 'absolute' keyword, the
scene is assumed to include the CMB background and the fluctuations
(in Kelvin) and the operator follows the non-linear Planck law.
Otherwise, the scene only includes the fluctuations (in microKelvin)
and the operator is linear (i.e. the output also corresponds to power
fluctuations).
"""
nu = self.instrument.filter.nu
return self.scene.get_unit_conversion_operator(nu)
[docs]
def get_operator(self):
"""
Return the operator of the acquisition. Note that the operator is only
linear if the scene temperature is differential (absolute=False).
"""
distribution = self.get_distribution_operator()
temp = self.get_unit_conversion_operator()
aperture = self.get_aperture_integration_operator()
filter = self.get_filter_operator()
projection = self.get_projection_operator()
hwp = self.get_hwp_operator()
polarizer = self.get_polarizer_operator()
integ = self.get_detector_integration_operator()
trans_inst = self.instrument.get_transmission_operator()
trans_atm = self.scene.atmosphere.transmission
response = self.get_detector_response_operator()
with rule_manager(inplace=True):
H = CompositionOperator(
[
response,
trans_inst,
integ,
polarizer,
hwp * projection,
filter,
aperture,
trans_atm,
temp,
distribution,
]
)
if self.scene == "QU":
H = self.get_subtract_grid_operator()(H)
return H
[docs]
def get_polarizer_operator(self):
"""
Return operator for the polarizer grid.
"""
return BlockDiagonalOperator(
[
self.instrument.get_polarizer_operator(self.sampling[b], self.scene)
for b in self.block
],
axisin=1,
)
[docs]
def get_projection_operator(self, verbose=True):
"""
Return the projection operator for the peak sampling.
Convert units from W to W/sr.
Parameters
----------
verbose : bool, optional
If true, display information about the memory allocation.
"""
f = self.instrument.get_projection_operator
if len(self.block) == 1:
return BlockColumnOperator(
[
f(
self.sampling[b],
self.scene,
verbose=verbose,
interp_projection=self.interp_projection,
)
for b in self.block
],
axisout=1,
)
# XXX HACK
def callback(i):
p = f(self.sampling[self.block[i]], self.scene, verbose=False)
return p
shapeouts = [
(len(self.instrument), s.stop - s.start) + self.scene.shape[1:]
for s in self.block
]
proxies = proxy_group(len(self.block), callback, shapeouts=shapeouts)
return BlockColumnOperator(proxies, axisout=1)
[docs]
def get_add_grids_operator(self):
"""Return operator to add signal from detector pairs."""
nd = len(self.instrument)
if nd % 2 != 0:
raise ValueError("Odd number of detectors.")
partitionin = 2 * (len(self.instrument) // 2,)
return BlockRowOperator([I, I], axisin=0, partitionin=partitionin) # ?
[docs]
def get_subtract_grids_operator(self):
"""Return operator to subtract signal from detector pairs."""
nd = len(self.instrument)
if nd % 2 != 0:
raise ValueError("Odd number of detectors.")
partitionin = 2 * (len(self.instrument) // 2,)
return BlockRowOperator([I, -I], axisin=0, partitionin=partitionin) # ?
# def get_observation(self, map, convolution=True, noiseless=False):
# """
# tod = map2tod(acquisition, map)
# tod, convolved_map = map2tod(acquisition, map, convolution=True)
# Parameters
# ----------
# map : I, QU or IQU maps
# Temperature, QU or IQU maps of shapes npix, (npix, 2), (npix, 3)
# with npix = 12 * nside**2
# noiseless : boolean, optional
# If True, no noise is added to the observation.
# convolution : boolean, optional
# Set to True to convolve the input map by a gaussian and return it.
# Returns
# -------
# tod : array
# The Time-Ordered-Data of shape (ndetectors, ntimes).
# convolved_map : array, optional
# The convolved map, if the convolution keyword is set.
# """
# if convolution:
# convolution = self.get_convolution_peak_operator()
# map = convolution(map)
# H = self.get_operator()
# tod = H(map)
# if not noiseless:
# tod += self.get_noise()
# if convolution:
# return tod, map
# return tod
[docs]
def get_preconditioner(self, cov):
if cov is not None:
cov_inv = 1 / cov
cov_inv[np.isinf(cov_inv)] = 0.0
preconditioner = DiagonalOperator(cov_inv, broadcast="rightward")
else:
preconditioner = None
return preconditioner
[docs]
class QubicMultiAcquisitions:
"""
Instance to define the multi-frequency instrument.
Input : - dictionary : contains QUBIC informations
- Nsub : Number of sub-bands for integrating the physical bandwidth
- Nrec : Number of reconstructed maps (in the case of FMM)
- comps : List of astrophysical components (CMB, Dust, ...)
- H : List of pointing matrix if not already computed
- nu_co : Frequency of a line emission
"""
def __init__(
self, dictionary, nsub, nrec, comps=[], H=None, nu_co=None, sampling=None
):
### Define class arguments
self.dict = dictionary
self.nsub = nsub
self.nrec = nrec
self.dict["nf_sub"] = self.nsub
self.comps = comps
self.fsub = int(self.nsub / self.nrec)
### Resolve issue when comm, nprocs_instrument, nprocs_sampling are None in the dictionary.
# It will define them using codes from Acquisition in pysimulators if they are not defined by the user.
# When dict["nprocs_instrument"] is None, the test to save MPI communicator at the end of __init__ is passing while it should not.
comm = self.dict["comm"]
nprocs_instrument = self.dict["nprocs_instrument"]
nprocs_sampling = self.dict["nprocs_sampling"]
if comm is None:
comm = MPI.COMM_WORLD
if nprocs_instrument is None and nprocs_sampling is None:
nprocs_instrument = 1
nprocs_sampling = comm.size
elif nprocs_instrument is None:
if nprocs_sampling < 1 or nprocs_sampling > comm.size:
raise ValueError(
f"Invalid value for nprocs_sampling '{nprocs_sampling}'."
)
nprocs_instrument = comm.size // nprocs_sampling
else:
if nprocs_instrument < 1 or nprocs_instrument > comm.size:
raise ValueError(
f"Invalid value for nprocs_instrument '{nprocs_instrument}'."
)
nprocs_sampling = comm.size // nprocs_instrument
if nprocs_instrument * nprocs_sampling != comm.size:
raise ValueError("Invalid MPI distribution of the acquisition.")
self.dict["comm"] = comm
self.dict["nprocs_instrument"] = nprocs_instrument
self.dict["nprocs_sampling"] = nprocs_sampling
# There was code duplication in the previous version
self.allnus = []
self.allnus_rec = []
if self.dict["instrument_type"] == "MB": # to be implemented on dictionary level
print("Only the 150 GHz band will be used.")
f_bands = [
150
] # this is for the TD MonoBand instrument, the choice of the band could be implemented at dictionary level
else:
f_bands = [150, 220]
for i, f_band in enumerate(f_bands):
### Compute frequencies on the edges
_, _, nus_subbands_i, _, _, _ = compute_freq(
f_band,
Nfreq=int(self.nsub / len(f_bands)),
relative_bandwidth=self.dict["filter_relative_bandwidth"],
)
### Compute the effective reconstructed frequencies if FMM is applied
if nrec == 1:
if f_band == f_bands[0]:
nus_i = [np.mean(f_bands)]
else:
nus_i = []
else:
_, _, nus_i, _, _, _ = compute_freq(
f_band,
Nfreq=int(self.nrec / len(f_bands)),
relative_bandwidth=self.dict["filter_relative_bandwidth"],
)
### Join 150 and 220 GHz band if needed
self.allnus += list(nus_subbands_i)
self.allnus_rec += list(nus_i)
### Convert lists to numpy arrays
self.allnus = np.array(self.allnus)
self.allnus_rec = np.array(self.allnus_rec)
### Multi-frequency instrument
self.multiinstrument = QubicMultibandInstrument(self.dict)
if sampling is None:
self.sampling = get_pointing(self.dict)
else:
self.sampling = sampling
self.scene = QubicScene(self.dict)
self.npix = 12 * self.scene.nside**2
### Compute pointing matrix
self.subacqs = [
QubicAcquisition(
self.multiinstrument[i], self.sampling, self.scene, self.dict
)
for i in range(len(self.multiinstrument))
]
### CO line emission
if nu_co is not None:
dmono = self.dict.copy()
dmono["filter_nu"] = nu_co * 1e9
dmono["filter_relative_bandwidth"] = 0.05
instrument_co = QubicInstrument(dmono, FRBW=0.25)
self.multiinstrument.subinstruments += [instrument_co]
self.subacqs += [
QubicAcquisition(instrument_co, self.sampling, self.scene, dmono)
]
self.allnus = np.append(self.allnus, nu_co)
for acq in self.subacqs:
acq.comm = self.subacqs[0].comm
### Angular resolution
self.allfwhm = np.zeros(len(self.multiinstrument))
for i in range(len(self.multiinstrument)):
self.allfwhm[i] = self.subacqs[i].get_convolution_peak_operator().fwhm
### Compute the pointing matrix if not already done
if H is None:
self.H = [self.subacqs[i].get_operator() for i in range(len(self.subacqs))]
else:
self.H = H
### Save MPI communicator
if self.dict["nprocs_instrument"] != 1:
self.mpidist = self.H[0].operands[-1]
for i in range(1, len(self.H)):
self.H[i].operands[-1] = self.mpidist
### Define the number of detector and sampling (for each processors)
self.ndets = len(self.subacqs[0].instrument)
self.nsamples = len(self.sampling)
self.coverage = self._get_coverage()
def _get_coverage(self):
out = self.H[0].T(np.ones(self.H[0].T.shapein))
if self.scene.kind != "I":
out = out[..., 0].copy()
out *= self.ndets * self.nsamples * self.sampling.period / np.sum(out)
return out
def _get_mixing_matrix(self, nus, beta):
"""
Method to return mixing matrix.
If beta has shape (ncomp), then the mixing matrix will have shape (nfreq, ncomp).
If beta has shape (npix, ncomp), the the elements of the mxing matrix vary across the sky, it will have shape (npix, nfreq, ncomp)
"""
### Define Mixing Matrix with FGB classes
mm = MixingMatrix(*self.comps)
### Compute them using the eval method at each frequency nus
mixing_matrix_elements = mm.eval(nus, *beta)
_sh = mixing_matrix_elements.shape
if _sh[0] != 1:
beta = hp.ud_grade(beta, self.scene.nside)
mixing_matrix_elements = mm.eval(nus, *beta)
mixing_matrix = np.transpose(mixing_matrix_elements, (1, 0, 2))
else:
mixing_matrix = mixing_matrix_elements[0]
return np.round(mixing_matrix, 10)
def _get_mixing_operator(self, A):
"""
Method to define an operator like object for a given frequency nu, the input A should be for one frequency.
The type of operator depends on the shape of input A.
"""
if A.ndim == 1: ### If constant beta across the sky
r = ReshapeOperator((1, self.npix, 3), (self.npix, 3))
D = r * DenseOperator(
A,
broadcast="rightward",
shapein=(A.shape[0], self.npix, 3),
shapeout=(1, self.npix, 3),
)
else: ### If varying beta across the sky
r = ReshapeOperator((self.npix, 1, 3), (self.npix, 3))
_, nc = A.shape
def reshape_fct(vec, out):
out[...] = vec.T
R = Operator(
direct=reshape_fct,
transpose=reshape_fct,
shapein=(nc, self.npix, 3),
shapeout=(3, self.npix, nc),
flags="linear",
)
### if pixelization of A is lower than the one of components
if hp.npix2nside(A.shape[0]) != self.scene.nside:
A = hp.ud_grade(A.T, self.scene.nside).T
d = DenseBlockDiagonalOperator(
A[:, np.newaxis, :], broadcast="rightward", shapein=(self.npix, nc)
)
### Multiply by 3 to create A matrix for I, Q and U
D = r * BlockDiagonalOperator([d] * 3, new_axisin=0, new_axisout=2) * R
return D
[docs]
class QubicInstrumentType(QubicMultiAcquisitions):
"""
Class providing methods necessary for all instrument types.
"""
def __init__(
self, dictionary, nsub, nrec, comps=[], H=None, nu_co=None, sampling=None
):
QubicMultiAcquisitions.__init__(
self,
dictionary,
nsub=nsub,
nrec=nrec,
comps=comps,
H=H,
nu_co=nu_co,
sampling=sampling,
)
if self.dict["instrument_type"] == "DB":
self.used_bands = [150, 220]
self.nFocalPlanes = 2
elif self.dict["instrument_type"] == "UWB":
self.used_bands = [150, 220]
self.nFocalPlanes = 1
elif self.dict["instrument_type"] == "MB":
self.used_bands = [150] # this is the TD MonoBand instrument
self.nFocalPlanes = 1
else:
raise TypeError(f"{self.dict['instrument_type']} is not implemented...")
[docs]
def sum_over_band(self, h, algo, gain=None):
"""
Perform sum over sub-operators depending on the reconstruction algorithms (FMM or CMM)
"""
### Frequency Map-Making
if algo == "FMM":
op_sum = []
f = int(self.nsub / self.nrec)
h = np.array(h)
for irec in range(self.nrec):
imin = irec * f
imax = (irec + 1) * f - 1
op_sum += [
h[
(self.allnus >= self.allnus[imin])
* (self.allnus <= self.allnus[imax])
].sum(axis=0)
]
block_list = []
for iband in range(self.nFocalPlanes):
edges_band = [
iband * (self.nrec // self.nFocalPlanes),
(iband + 1) * (self.nrec // self.nFocalPlanes),
] # splitting nrec op
block_list.append(
BlockRowOperator(op_sum[edges_band[0] : edges_band[1]], new_axisin=0)
)
operator_H = BlockDiagonalOperator(block_list, new_axisout=0)
return (
ReshapeOperator(
operator_H.shapeout, (self.nFocalPlanes * self.ndets * self.nsamples)
)
* operator_H
* ReshapeOperator(
(self.nrec, self.npix, h[0].shapein[-1]),
(
operator_H.shapein
), # this reshape ensures that it works even for nrec=2
)
)
### Components Map-Making
else:
Operator_list = []
for iband in range(self.nFocalPlanes):
if gain is None:
gain_ = np.ones(self.ndets)
else:
if len(gain.shape) < 2:
gain_ = gain[:]
else:
gain_ = gain[:, iband]
G_band = DiagonalOperator(
gain_, broadcast="rightward", shapein=(self.ndets, self.nsamples)
)
edges_band = [
iband * int(self.nsub // self.nFocalPlanes),
(iband + 1) * int(self.nsub // self.nFocalPlanes),
] # splitting nsub h
Operator_list.append(
G_band * AdditionOperator(h[edges_band[0] : edges_band[1]])
)
return BlockColumnOperator(Operator_list, axisout=0)
[docs]
def get_operator(
self, A=None, gain=None, fwhm=None
): # exactly the same for DB and UWB get_operator except for lmax=2 * self.dict["nside"] (which should be the same anyway?)
"""
Method to generate the pointing matrix.
mixing_matrix : array like containing mixing matrix elements. If the elements of the mixing matrix are constant across the sky,
mixing_matrix.shape = (nfreq, ncomp)
"""
self.operator = []
for isub in range(self.nsub):
### Compute mixing matrix operator if mixing matrix is provided
if A is None:
Acomp = IdentityOperator()
algo = "FMM"
else:
Acomp = self._get_mixing_operator(A=A[isub])
algo = "CMM"
### Compute gaussian kernel to account for angular resolution
if fwhm is None:
convolution = IdentityOperator()
else:
convolution = HealpixConvolutionGaussianOperator(
fwhm=fwhm[isub], lmax=3 * self.scene.nside - 1
)
### Compose operator as H = Proj * C * A
with rule_manager(inplace=True):
hi = CompositionOperator([self.H[isub], convolution, Acomp])
self.operator.append(hi)
### Do the sum over operators depending on the reconstruction model
H = self.sum_over_band(self.operator, gain=gain, algo=algo)
return H
[docs]
def get_invntt_operator(
self, wdet, wpho150, wpho220
): # DB and UWB had the same get_invntt_operator except from the return and the det_noise=False in 220 band
"""
Method to compute the inverse noise covariance matrix in time-domain.
"""
if wdet == 0 and wpho150 == 0 and wpho220 == 0:
return IdentityOperator(
shapein=(
self.nFocalPlanes,
len(self.multiinstrument[0]),
len(self.sampling),
)
)
photon_noise = [wpho150, wpho220]
if self.dict["instrument_type"] == "UWB":
det_noise = [wdet, 0]
else: # doesn't matter for MB, because only the first is used anyway
det_noise = [wdet, wdet]
invn_list = []
for iband, band in enumerate(self.used_bands):
d = self.dict.copy()
d["filter_nu"] = band * 1e9
d["effective_duration"] = self.dict["effective_duration{}".format(band)]
inst = QubicInstrument(d)
subacq = QubicAcquisition(inst, self.sampling, self.scene, d)
invn_list.append(
subacq.get_invntt_operator(
det_noise=det_noise[iband], photon_noise=photon_noise[iband]
)
)
self.invn150 = invn_list[0] # used in PresetAcquisition.get_approx_hth
if self.dict["instrument_type"] == "UWB":
self.invN = np.sum(invn_list)
else:
self.invN = BlockDiagonalOperator(invn_list, axisout=0)
return self.invN
[docs]
class PlanckAcquisition:
def __init__(self, nus, nside, comps=None, nsub_planck=1):
"""Planck Acquisition.
Class to add Planck information to both FMM and CMM.
Parameters
----------
nus : ndarray
Planck frequencies to add to the Map-Making. Be careful, FMM uses only 143 and 217 GHz bands by default, while you can add every Planck bands in the CMM (30, 44, 70, 100, 143, 217, 353) GHz.
nside : int
Nside value for Healpy
comps : ndarray, optional
Components array build from FGbuster, by default None
nsub_planck : int, optional
Number of sub-acquisition for Planck, by default 1
Remarks
-------
For FMM, band is either 143 or 217, while it is an array of Planck bands for CMM. We should be able to build [143, 217] for the FMM but it is not working yet. This would need some work which are not a priority, as we do not aim to use the other Planck bands at MapMaking level (we only want to use them at spectrum level). For posterity, one should correct this to build a more general class, but it is not a priority now.
"""
self.nus = nus
self.nside = nside
self.comps = comps
self.nsub_planck = nsub_planck
self.npix = 12 * self.nside**2
self.fwhm = []
self.sigma = []
self.bandwidth = []
self.allnus = []
for nu in self.nus:
_planckData = pickle.load(open(PATH + f"Planck{nu}GHz.pkl", "rb"))
self.sigma.append(_planckData[f"sigma{nu}"])
self.fwhm.append(_planckData[f"fwhm{nu}"])
self.bandwidth.append(_planckData[f"bw{nu}"])
if self.nsub_planck == 1:
self.allnus = nus
else:
for inu, nu in enumerate(self.nus):
self.allnus += list(
np.linspace(
nu - self.bandwidth[inu] / 2,
nu + self.bandwidth[inu] / 2,
self.nsub_planck,
)
)
self.allnus = np.array(self.allnus)
[docs]
def get_noise(self, planck_ntot, weight_planck=1.0, seenpix=None, seed=None):
"""Planck Noise
Method to build Planck noise. It uses sigma values computed during initialisation of the classe.
Parameters
----------
planck_ntot : float
Multiplicative factor for the noise.
weight_planck : float
Weight of Planck information inside the QUBIC patch, by default 1.0
seed : int, optional
Seed for random noise generation, by default None
seenpix : array, optional
Array of pixels seen by QUBIC, by default None
Returns
-------
array
Array containing noise for Planck TOD
"""
nus = np.asarray(self.nus)
state = np.random.get_state()
np.random.seed(seed)
out = np.zeros((len(nus), self.npix, 3))
for inu in range(len(nus)):
sigma = self.sigma[inu]
out[inu] = np.random.standard_normal((self.npix, 3)) * sigma
np.random.set_state(state)
# if the information of Planck is added with weight w, the confidence in it should scale as 1/w
if (
weight_planck < 1.0 and weight_planck > 0.00001
): # avoid too small weight_planck to not let the noise explode
out[:, seenpix] = out[:, seenpix] / weight_planck
if weight_planck == 0:
out[:, seenpix] = 0.0
return out * planck_ntot
[docs]
def get_invntt_operator(
self, planck_ntot, weight_planck=1.0, seenpix=None, beam_correction=0
):
"""Planck inverse noise covariance matrix.
Method to build Planck inverse noise covariance matrix, using sigma computed during the initialisation of the class.
Parameters
----------
planck_ntot : float
Multiplicative factor for the noise
weight_planck : float, optional
Weight of Planck information inside the QUBIC patch, by default 1.0
seenpix : array, optional
Array of pixels seen by QUBIC, by default None
beam_correction : float, optional
Correction factor for the beam, by default 0
Returns
-------
_type_
_description_
"""
#! Tom: I never saw the beam_correction argument being used, but I kept it just in case
sigma = np.asarray(self.sigma)
assert sigma.shape == (len(self.nus), 3), (
f"sigma must be shape (nus,3), got {sigma.shape}"
)
npix = self.npix
if planck_ntot == 0:
return IdentityOperator(
shapein=(3 * len(self.nus) * npix)
) # in FMM, len(self.nus) is always 1, in CMM it is over the range
sigma_perpix = np.broadcast_to(sigma[:, None], (len(self.nus), npix, 3))
if beam_correction != 0:
factor = (
4
* np.pi
* (
np.rad2deg(beam_correction)
/ 2.35
/ np.degrees(hp.nside2resol(self.scene.nside))
)
** 2
)
# print(f'corrected by {factor}')
varnew = hp.smoothing(self.var.T, fwhm=beam_correction / np.sqrt(2)) / factor
sigma_perpix = 1e6 * np.sqrt(varnew.T) * planck_ntot
base_weight = 1.0 / (
(sigma_perpix * planck_ntot) ** 2
) # this is invN before correcting for the patch
beta = np.ones(npix)
if seenpix is not None:
beta[seenpix] = weight_planck
scale = np.zeros(npix) # we add a mask so to not divide by zero
beta_pos = beta > 0
scale[beta_pos] = beta[beta_pos] ** 2 # previously 1.0 / (beta[beta_pos] ** 2)
weight = base_weight * scale[None, :, None]
invN = DiagonalOperator(weight, broadcast="leftward", shapein=weight.shape)
R = ReshapeOperator(invN.shapeout, invN.shape[0])
return R(invN(R.T))
def _get_mixing_matrix(self, nus, beta):
"""Planck Mixing Matrix.
Method to compute Planck Mixing Matrix, which will be used lated to build the Planck acquisition operator.
If beta has shape (ncomp), then the mixing matrix will have shape (nfreq, ncomp).
If beta has shape (npix, ncomp), the the elements of the mxing matrix vary across the sky, it will have shape (npix, nfreq, ncomp)
Parameters
----------
nus : array
Frequencies of the Mixing Matrix.
beta : array
_description_
Returns
-------
array
Planck Mixing Matrix
"""
### Define Mixing Matrix with FGB classes
mm = MixingMatrix(*self.comps)
### Compute them using the eval method at each frequency nus
mixing_matrix_elements = mm.eval(nus, *beta)
_sh = mixing_matrix_elements.shape
if _sh[0] != 1:
beta = hp.ud_grade(beta, self.nside)
mixing_matrix_elements = mm.eval(nus, *beta)
mixing_matrix = np.transpose(mixing_matrix_elements, (1, 0, 2))
else:
mixing_matrix = mixing_matrix_elements[0]
return np.round(mixing_matrix, 6)
def _get_mixing_operator(self, A):
"""Planck Mixing Operator.
Method to define an operator like object for a given frequency nu, the input A should be for one frequency.
The type of operator depends on the shape of input A.
Parameters
----------
A : array
Planck Mixing Matrix.
Returns
-------
BlockDiagonalOperator
Mixing operator.
"""
if A.ndim == 1: ### If constant beta across the sky
r = ReshapeOperator((1, self.npix, 3), (self.npix, 3))
D = r * DenseOperator(
A,
broadcast="rightward",
shapein=(A.shape[0], self.npix, 3),
shapeout=(1, self.npix, 3),
)
else: ### If varying beta across the sky
r = ReshapeOperator((self.npix, 1, 3), (self.npix, 3))
_, nc = A.shape
def reshape_fct(vec, out):
out[...] = vec.T
R = Operator(
direct=reshape_fct,
transpose=reshape_fct,
shapein=(nc, self.npix, 3),
shapeout=(3, self.npix, nc),
flags="linear",
)
### if pixelization of A is lower than the one of components
if hp.npix2nside(A.shape[0]) != self.nside:
A = hp.ud_grade(A.T, self.nside).T
d = DenseBlockDiagonalOperator(
A[:, np.newaxis, :], broadcast="rightward", shapein=(self.npix, nc)
)
### Multiply by 3 to create A matrix for I, Q and U
D = r * BlockDiagonalOperator([d] * 3, new_axisin=0, new_axisout=2) * R
return D
[docs]
def get_operator(self, A=None, fwhm=None, comm=None, nu_co=None):
"""Planck Acquisition Operator.
Method to build the acquisition operator for Planck. This operator is composed at first by a convolution operator at Planck FWHM. Then, for the Component MapMaking, a Mixing Operator is added. Finally, we have the operator to turn maps into TOD.
Parameters
----------
A : array, optional
Mixing Matrix of Planck. If None, the Mixing Operator will be the Identity (FMM case), not None, the Mixing Operator will be computated and then added (CMM case), by default None
fwhm : array, optional
Array of lenght the number of Planck bands considered containing Planck FWHM. If None, the Convolution Operator will be Identity (case without convolution), if not None, the Convolution Operator will be computed and then added, by default None
comm : MPI communicator, optional
MPI communicator from pyoperators, by default None
nu_co : bool, optional
Bool to add Carbon Oxyde emission line, not supported yet, by default None
Returns
-------
BlockColumnOperator
Planck Acquisition Operator.
"""
Rmap2tod = ReshapeOperator((12 * self.nside**2, 3), (3 * 12 * self.nside**2))
Operator = []
k = 0
for _ in self.nus:
ope_i = []
for _ in range(self.nsub_planck):
if fwhm is not None:
C = HealpixConvolutionGaussianOperator(
fwhm=fwhm[k], lmax=3 * self.nside - 1
)
else:
C = IdentityOperator()
if A is not None:
D = self._get_mixing_operator(A=A[k])
else:
D = IdentityOperator()
ope_i += [C * D]
k += 1
if comm is not None:
Operator.append(
comm * Rmap2tod(AdditionOperator(ope_i) / self.nsub_planck)
)
else:
Operator.append(Rmap2tod(AdditionOperator(ope_i) / self.nsub_planck))
return BlockColumnOperator(Operator, axisout=0)
[docs]
class JointAcquisitionFrequencyMapMaking:
def __init__(
self, d, Nrec, Nsub, H=None, nsub_planck=1, is_external_data=False, sampling=None
):
self.d = d
self.Nrec = Nrec
self.Nsub = Nsub
self.is_external_data = is_external_data
self.qubic = QubicInstrumentType(
self.d, self.Nsub, self.Nrec, comps=[], H=H, nu_co=None, sampling=sampling
)
self.scene = self.qubic.scene
if self.is_external_data:
self.pl143 = PlanckAcquisition(
nus=[143], nside=self.scene.nside, comps=None, nsub_planck=nsub_planck
)
self.pl217 = PlanckAcquisition(
nus=[217], nside=self.scene.nside, comps=None, nsub_planck=nsub_planck
)
self.planck_acquisition = [self.pl143, self.pl217]
[docs]
def get_operator(self, fwhm=None, seenpix=None):
### nstokes is hardcoded to nstokes = 3
### We could retrieve it in the shape of H if we want to implement a different nstokes case
nstokes = 3
### The operator that allows the focus on seenpix:
### shapein : (self.Nrec, sum(seenpix), nstokes)
### shapeout: (self.Nrec, npix, nstokes)
if seenpix is not None:
U = (
ReshapeOperator(
(self.Nrec * sum(seenpix) * nstokes),
(self.Nrec, sum(seenpix), nstokes),
)
* PackOperator(
np.broadcast_to(
seenpix[:, None], (self.Nrec, seenpix.size, nstokes)
).copy()
)
).T
else:
U = IdentityOperator()
### Get QUBIC H operator
H = [self.qubic.get_operator(fwhm=fwhm)]
if self.is_external_data:
R_planck = ReshapeOperator(
(12 * self.qubic.scene.nside**2, nstokes),
(12 * self.qubic.scene.nside**2 * nstokes),
)
H_planck_ = BlockDiagonalOperator([R_planck] * self.Nrec, new_axisout=0)
R_diag = ReshapeOperator(H_planck_.shapeout, H_planck_.shape[0])
H_planck = R_diag(H_planck_)
H.append(H_planck)
return BlockColumnOperator(H, axisout=0) * U
[docs]
def get_invntt_operator( # We stack the invNqubic and invN_planck on top of eachother
self,
qubic_ndet,
qubic_npho150,
qubic_npho220,
planck_ntot,
seenpix,
weight_planck=1,
beam_correction=None,
):
if beam_correction is None:
beam_correction = [0] * self.Nrec
invNq = self.qubic.get_invntt_operator(
qubic_ndet, qubic_npho150, qubic_npho220
) # add weight of Qubic detector and photon noise
R = ReshapeOperator(invNq.shapeout, invNq.shape[0])
invN = [R(invNq(R.T))]
if self.is_external_data:
invntt_planck143 = self.pl143.get_invntt_operator(
planck_ntot, weight_planck=weight_planck, seenpix=seenpix
)
invntt_planck217 = self.pl217.get_invntt_operator(
planck_ntot, weight_planck=weight_planck, seenpix=seenpix
)
R_planck = ReshapeOperator(
invntt_planck143.shapeout, invntt_planck143.shape[0]
)
invN_143 = R_planck(invntt_planck143(R_planck.T))
invN_217 = R_planck(invntt_planck217(R_planck.T))
if self.Nrec == 1:
# invNe = [invN_143, invN_217]
invNe = [invN_143]
else:
invNe = [invN_143] * int(self.Nrec / 2) + [invN_217] * int(self.Nrec / 2)
invN += invNe
return BlockDiagonalOperator(invN, axisout=0)
[docs]
class JointAcquisitionComponentsMapMaking:
def __init__(
self,
d,
comp,
Nsub,
nus_external,
nsub_planck,
nu_co=None,
H=None,
weight_planck=1.0,
):
self.d = d
self.Nsub = Nsub
self.comp = comp
self.nus_external = nus_external
self.nsub_planck = nsub_planck
self.weight_planck = weight_planck
### Select the instrument model
self.qubic = QubicInstrumentType(
self.d, self.Nsub, nrec=2, comps=self.comp, H=H, nu_co=nu_co
)
self.scene = self.qubic.scene
self.external = PlanckAcquisition(
nus=self.nus_external,
nside=self.scene.nside,
comps=self.comp,
nsub_planck=self.nsub_planck,
)
self.allnus = np.array(list(self.qubic.allnus) + list(self.external.allnus))
[docs]
def get_operator(self, A, gain=None, fwhm=None, nu_co=None):
Aq = A[: self.Nsub]
Ap = A[self.Nsub :]
if fwhm is None:
fwhm_q = None
fwhm_p = None
else:
fwhm_q = fwhm[: self.Nsub]
fwhm_p = fwhm[self.Nsub :]
Hq = self.qubic.get_operator(A=Aq, gain=gain, fwhm=fwhm_q)
Rq = ReshapeOperator(Hq.shapeout, (Hq.shapeout[0] * Hq.shapeout[1]))
try:
mpidist = self.qubic.mpidist
except Exception:
mpidist = None
He = self.external.get_operator(A=Ap, fwhm=fwhm_p, comm=mpidist) # , nu_co=nu_co)
return BlockColumnOperator([Rq * Hq, He], axisout=0)
[docs]
def get_invntt_operator(
self, qubic_ndet, qubic_npho150, qubic_npho220, planck_ntot, seenpix=None
):
invNq = self.qubic.get_invntt_operator(qubic_ndet, qubic_npho150, qubic_npho220)
R = ReshapeOperator(invNq.shapeout, invNq.shape[0])
invNe = self.external.get_invntt_operator(
planck_ntot, weight_planck=self.weight_planck, seenpix=seenpix
)
return BlockDiagonalOperator([R(invNq(R.T)), invNe], axisout=0)