from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
import fgbuster.mixingmatrix as mm
import healpy as hp
import numpy as np
from pyoperators import MPI
[docs]
@dataclass
class ParamLayout:
beta_indices: list # [(comp_index, position_in_x)]
blind_indices: list # [(comp_index, start, length)]
ndim: int
def _dot(x, y, comm):
d = np.array(np.dot(x.ravel(), y.ravel()))
if comm is not None:
comm.Allreduce(MPI.IN_PLACE, d)
return d
[docs]
class AbstractChi2(ABC):
"""
Common base class for chi2-like objects.
Subclasses must implement compute_mixing_matrix(x) and, if needed,
compute_chi_square(x). A helper compute_qubic_chi(A) is provided
because both original classes shared that logic.
"""
def __init__(
self,
preset,
TOD_sim,
layout: Optional[ParamLayout] = None,
beta_map: Optional[np.ndarray] = None,
):
self.preset = preset
self.TOD_sim = TOD_sim
self.layout = layout
self.beta_map = beta_map
# Deduce shapes for 3D or 4D TOD_sim
if self.TOD_sim.ndim == 3:
# (ncomp, nfreq, nsampling_ndet)
self.ncomp, self.nfreq, self.nsampling_ndet = self.TOD_sim.shape
self.npix = 1
elif self.TOD_sim.ndim == 4:
# (ncomp, nfreq, npix, nsampling_ndet)
self.ncomp, self.nfreq, self.npix, self.nsampling_ndet = self.TOD_sim.shape
self.seenpix_beta = self.beta_map == hp.UNSEEN
else:
raise TypeError("TOD_sim should have 3 or 4 dimensions.")
self.nus = preset.qubic.joint_out.allnus
self.nFP = preset.qubic.joint_out.qubic.nFocalPlanes
self.nsub = self.nfreq // self.nFP
self.fsub = self.nfreq // self.preset.comp.params_foregrounds["bin_mixing_matrix"]
# Pre-definition of Planck Likelihood
self.Lplanck = 0
# Build per-focal-plane TOD_sim
self.TOD_sim_fp = []
for i in range(self.nFP):
block = self.TOD_sim[:, self.nsub * i : self.nsub * (i + 1)]
if block.ndim == 3: # 3D block: (ncomp, nsub, nsampling_ndet)
resh = block.reshape((self.ncomp * self.nsub * 1, self.nsampling_ndet))
else: # 4D block: (ncomp, nsub, npix, nsampling_ndet)
resh = block.reshape(
(self.ncomp * self.nsub * self.npix, self.nsampling_ndet)
)
self.TOD_sim_fp.append(resh)
self.TOD_sim_fp = np.asarray(self.TOD_sim_fp)
[docs]
@abstractmethod
def compute_mixing_matrix(self, x) -> np.ndarray:
"""Return mixing matrix A with shape (nfreq, ncomp)."""
raise NotImplementedError
[docs]
def compute_qubic_chi(self, A):
"""
Shared computation of the QUBIC time-domain chi^2 given mixing
matrix A. Returns Lqubic and stores it on the instance.
"""
# build ysim concatenating focal planes
ysim_parts = []
for i in range(self.nFP):
a_slice = A[self.nsub * i : self.nsub * (i + 1)] # shape (nsub, ncomp)
vec = a_slice.T.reshape(self.ncomp * self.nsub) @ self.TOD_sim_fp[i]
ysim_parts.append(vec)
ysim = np.concatenate(ysim_parts, axis=0)
residuals = ysim - self.preset.acquisition.TOD_qubic
self.Lqubic = 0.5 * _dot(
residuals.T,
self.preset.acquisition.invN.operands[0](residuals),
self.preset.comm,
)
return self.Lqubic
[docs]
def compute_qubic_chi_varying_beta(self, A):
ysim_parts = []
for i in range(self.nFP):
a_slice = A[
self.seenpix_beta[0], self.nsub * i : self.nsub * (i + 1)
] # (npix, nsub, ncomp)
vec = (
a_slice.T.reshape(self.ncomp * self.nsub * a_slice.shape[0])
@ self.TOD_sim_fp[i]
)
ysim_parts.append(vec)
ysim = np.concatenate(ysim_parts, axis=0)
residuals = ysim - self.preset.acquisition.TOD_qubic
Lqubic = 0.5 * _dot(
residuals.T,
self.preset.acquisition.invN.operands[0](residuals),
self.preset.comm,
)
return Lqubic
# default call: subclasses may override if they need to alter behavior
def __call__(self, x):
Lqubic = self.compute_qubic_chi(self.compute_mixing_matrix(x))
# allow subclasses to add Lplanck if needed (they should set self.Lplanck)
return Lqubic + getattr(self, "Lplanck", 0)
[docs]
class MixedChi2(AbstractChi2):
def __init__(self, preset, TOD_sim, layout: ParamLayout):
super().__init__(preset, TOD_sim, layout=layout)
self.layout = layout
# packing
[docs]
def unpack(self, x):
beta = {}
Amm = {}
for comp, idx in self.layout.beta_indices:
beta[comp] = x[idx]
for comp, start, n in self.layout.blind_indices:
Amm[comp] = x[start : start + n]
return beta, Amm
[docs]
def compute_mixing_matrix(self, x):
beta, Amm = self.unpack(x)
A = self.preset.acquisition.Amm_iter.copy()
# parametric
for comp, b in beta.items():
model = mm.MixingMatrix(self.preset.comp.components_model_out[comp - 1])
A[:, comp] = model.eval(self.nus, b)[:, 0]
# blind
for comp, v in Amm.items():
A[:, comp] = v
return A
def __call__(self, x):
A = self.compute_mixing_matrix(x)
return self.compute_qubic_chi(A)
[docs]
class Chi2(AbstractChi2):
def __init__(self, preset, TOD_sim, parametric=True, beta_map=None):
# original class raised on 4D TOD; keep that behaviour explicit
super().__init__(preset, TOD_sim, beta_map=beta_map)
self.parametric = parametric
self.beta_map = beta_map
[docs]
def compute_mixing_matrix_parametric(self, nus, x):
"""
Parametric case
"""
mixingmatrix = mm.MixingMatrix(*self.preset.comp.components_model_out)
return mixingmatrix.eval(nus, *x)
[docs]
def compute_mixing_matrix_blind(self, x):
"""
Blind case
"""
A = np.ones((self.nfreq, self.ncomp))
k = 0
for i in range(self.preset.comp.params_foregrounds["bin_mixing_matrix"]):
for j in range(1, self.ncomp):
A[i * self.fsub : (i + 1) * self.fsub, j] = np.array([x[k]] * self.fsub)
k += 1
return A
[docs]
def compute_mixing_matrix(self, x):
if self.parametric:
return self.compute_mixing_matrix_parametric(self.nus, x)
else:
return self.compute_mixing_matrix_blind(x)
[docs]
def compute_chi_square_fix_beta(self, x):
A = self.compute_mixing_matrix(x)
self.Lqubic = self.compute_qubic_chi(A)
self.Lplanck = 0
if self.parametric:
Aext = A[self.nfreq :]
# TODO : should we convolve here ? Tom : I think we should if convolutin_out==True, but fwhm_mapmaking is not adapted for that, we need to compute specific fwhm for planck
H_planck = self.preset.qubic.joint_out.external.get_operator(A=Aext)
comp = self.preset.comp.components_iter.copy()
comp[:, ~self.preset.sky.seenpix] = 0
ysim_pl = H_planck(comp)
_residuals_pl = (
np.r_[ysim_pl] - self.preset.acquisition.TOD_external_zero_outside_patch
)
self.Lplanck = 0.5 * _dot(
_residuals_pl.T,
self.preset.acquisition.invN.operands[1](_residuals_pl),
self.preset.comm,
)
return self.Lqubic, self.Lplanck
[docs]
def compute_chi_square_varying_beta(self, x):
### Fill the full sky map of beta with the unknowns
beta_map = self.beta_map.copy()
x = x.reshape(((self.ncomp - 1) * self.npix))
beta_map[self.seenpix_beta] = x
### Compute the mixing matrix for the full sky
A = self.compute_mixing_matrix_parametric(self.nus, beta_map)
Aext = A[:, self.nfreq :]
### Qubic chi2
self.Lqubic = self.compute_qubic_chi_varying_beta(A)
### Planck chi2
H_planck = self.preset.qubic.joint_out.external.get_operator(
A=Aext.transpose(1, 0, 2)
)
ysim_pl = H_planck(self.preset.comp.components_iter.copy())
residuals_pl = np.r_[ysim_pl] - self.preset.acquisition.TOD_external
self.Lplanck = 0.5 * _dot(
residuals_pl.T,
self.preset.acquisition.invN.operands[1](residuals_pl),
self.preset.comm,
)
return self.Lqubic, self.Lplanck
[docs]
def compute_chi_square(self, x):
if self.TOD_sim.ndim == 3:
return self.compute_chi_square_fix_beta(x)
elif self.TOD_sim.ndim == 4:
return self.compute_chi_square_varying_beta(x)
else:
raise TypeError("TOD_sim should have 3 or 4 dimensions.")
def __call__(self, x):
Lqubic, Lplanck = self.compute_chi_square(x)
L = Lqubic
if self.parametric:
L += Lplanck
return L