import os
import healpy as hp
import matplotlib.cm as cm
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import yaml
from getdist import MCSamples, plots
from matplotlib.gridspec import GridSpec
from pysimulators.interfaces.healpy import HealpixConvolutionGaussianOperator
def _plot_reconstructed_maps(
maps,
m_in,
seenpix,
name_file,
center,
max,
min,
reso=15,
iter=0,
):
"""
Save a svg with the actual maps at iteration i. It assumes that maps is 3-dimensional
"""
stk = ["I", "Q", "U"]
Nmaps, _, Nstk = maps.shape
res = maps - m_in
k = 0
plt.figure(figsize=(12, 3 * Nmaps))
for imaps in range(Nmaps):
maps[imaps, ~seenpix, :] = hp.UNSEEN
res[imaps, ~seenpix, :] = hp.UNSEEN
for istk in range(Nstk):
hp.gnomview(
maps[imaps, :, istk],
rot=center,
reso=reso,
cmap="jet",
sub=(Nmaps, Nstk * 2, k + 1),
notext=True,
max=max[imaps, istk],
min=min[imaps, istk],
title=f"Output - {stk[istk]}",
)
hp.gnomview(
res[imaps, :, istk],
rot=center,
reso=reso,
cmap="jet",
sub=(Nmaps, Nstk * 2, k + 2),
notext=True,
max=max[imaps, istk],
min=min[imaps, istk],
title=f"Residuals - {stk[istk]}",
)
k += 2
plt.suptitle(f"Iteration #{iter}", fontsize=15)
plt.tight_layout()
plt.savefig(name_file)
plt.close()
[docs]
def Dl2Cl(ell, Dl):
"""
Convert angular power spectrum from D_ell to C_ell.
Formula
---
C_ell = D_ell * 2*pi / (ell * (ell + 1))
Parameters
--
ell : array_like
Multipole moments (1D). Values must be > 0 (division by zero otherwise).
Dl : array_like
D_ell values. Can be scalar, 1D (same length as `ell`) or broadcastable to `ell`.
Typical shape in this module: (n_nus, n_nus, n_bins).
Returns
---
ndarray
C_ell values with the same shape as the broadcast result of `Dl` and `ell`.
Notes
-
- Uses elementwise broadcasting; ensure `ell` and `Dl` shapes are compatible.
- No guard is performed for ell == 0; caller must avoid or mask such entries.
"""
return Dl * 2 * np.pi / (ell * (ell + 1))
[docs]
def Cl2BK(ell, Cl):
"""
Convert C_ell to the bandpower-like quantity 100 * ell * C_ell / (2*pi).
Formula
---
result = 100 * ell * C_ell / (2*pi)
Parameters
--
ell : array_like
Multipole moments (1D). Should match or broadcast with `Cl`.
Cl : array_like
C_ell values. Can be scalar, 1D or broadcastable to `ell`.
Typical shape in this module: (n_nus, n_nus, n_bins).
Returns
---
ndarray
Transformed bandpower values with shape equal to the broadcasted inputs.
"""
return 100 * ell * Cl / (2 * np.pi)
[docs]
def plot_cross_spectrum(nus, ell, Dl, Dl_err, ymodel, Dl2=None, Dl2_err=None, label_model="CMB + Dust", nbins=None, nrec=2, mode="Dl", figsize=None, title=None, name=None, dpi=300):
"""
Plot the upper-triangle matrix of cross-angular power spectra D_ell (and optional model).
The function arranges a len(nus) x len(nus) grid and fills only the upper triangle
(including diagonal) with small subplots labelled by the frequency pair `nus[i] x nus[j]`.
It draws data errorbars, an optional second series (Dl - noise if `Dl_noise` provided),
and a model line (from `ymodel`) either in D_ell units or transformed to the
"100 * ell * C_ell / (2*pi)" units depending on `mode`.
Parameters
--
nus : array_like
1D array of frequency identifiers (used for subplot annotations). Length = n_nus.
ell : array_like
1D array of multipole moments. Length >= nbins (if nbins provided).
Must be > 0 to avoid division-by-zero in conversions.
Dl : ndarray
Data D_ell values, expected shape (n_nus, n_nus, n_ell) or broadcastable to that.
Dl_noise : ndarray or None
Errors on Dl with same shape as `Dl` (or broadcastable). If provided, an additional
series `Dl - Dl_noise` will be plotted where applicable. Errors are absolute-valued
(the function applies np.abs).
ymodel : ndarray or None
Model values for plotting. Expected shape (n_nus, n_nus, n_ell) (or broadcastable).
If None, no model line is drawn.
label_model : str, optional
Legend label for the model line (default: "CMB + Dust").
nbins : int or None, optional
Number of ell bins to plot. If None (default) uses len(ell).
nrec : int, optional
Number of "recon" channels used to choose subplot background color and styling.
Default is 2.
mode : {"Dl", ...}, optional
If "Dl" the data are plotted in D_ell units. Otherwise the model/data are
transformed via `_Dl2Cl` and `_Cl2BK` before plotting (matching original behaviour).
ft_nus : int, optional
Font size for the subplot frequency annotations (default: 10).
figsize : tuple, optional
Matplotlib figure size (default: (10, 8)).
title : str or None, optional
Suptitle appended to the fixed prefix "Angular Cross-Power Spectra".
If None, only the prefix is used.
name : str or None, optional
If provided, the figure is saved to this filename as a PDF.
Side effects
- Creates a matplotlib figure, shows it with plt.show() and optionally saves it.
- Does not return the figure (returns None). If you need the figure object, modify the
function to `return fig` after creation.
Notes
-
- The function preserves exact plotting order, labels and colours of the original code.
- The caller must ensure shapes of `nus`, `ell`, `Dl`, `Dl_noise`, and `ymodel` are compatible.
"""
n = len(nus)
if figsize is None:
figsize = (2.2 * n, 2.2 * n)
# Dynamic font scaling based on figure height
# Guard against n <= 0 and large figsize values. Compute a scale factor
# that decreases with number of subplots and increases with figure height.
count_factor = 1.0 / np.sqrt(max(1, n))
scale_factor = (float(figsize[1]) / 8.0) * count_factor
# Clamp sizes to reasonable readable bounds so very large figures (e.g.
# figsize=(25,25)) don't produce oversized fonts. Ranges chosen to keep
# ticks and small labels readable while preventing overlap.
# ft_axis: tick label size (6..16)
# ft_nus: small subtitle/annotation size (8..14)
# ft_title: suptitle size (12..28)
ft_axis = int(np.clip(13.0 * scale_factor, 6, 16))
ft_nus = int(np.clip(10.0 * scale_factor, 8, 14))
ft_title = int(np.clip(32.0 * np.sqrt(max(scale_factor, 0.1)), 12, 28))
# defaults & preproc
if nbins is None:
nbins = len(ell)
# keep absolute-valued errors as in original
Dl_err = np.abs(Dl_err) if Dl_err is not None else None
ell_sel = ell[:nbins]
fig = plt.figure(figsize=figsize, dpi=dpi)
gs = GridSpec(len(nus), len(nus), figure=fig)
kp = 0 # subplot counter (used to control first-plot labelling)
def _model_plot(ax, i, j, kp):
"""Plot model lines respecting the original oddity:
- if kp == 0: plot *only* the mode-specific model (with label)
- else: if ymodel exists, plot both Dl model and transformed Cl model (no label)
"""
if ymodel is None:
return
y = ymodel[i, j, :nbins]
if kp == 0:
if mode == "Dl":
ax.plot(ell_sel, y, "--r", label=label_model)
else:
ax.plot(ell_sel, Cl2BK(ell_sel, Dl2Cl(ell_sel, y)), "--r", label=label_model)
else:
if mode == "Dl":
ax.plot(ell_sel, y, "--r")
else:
ax.plot(ell_sel, Cl2BK(ell_sel, Dl2Cl(ell_sel, y)), "--r")
def _plot_errorbars(ax, i, j, color_main, color_second=None, label_main=None, label_second=None):
"""Add errorbars in either 'Dl' or transformed mode."""
main = Dl[i, j, :nbins]
main_err = Dl_err[i, j, :nbins] if Dl_err is not None else None
# plot main series
if mode == "Dl":
ax.errorbar(ell_sel, main, yerr=main_err, capsize=5, color=color_main, fmt="-o", label=label_main)
else:
y_main = Cl2BK(ell_sel, Dl2Cl(ell_sel, main))
yerr_main = Cl2BK(ell_sel, Dl2Cl(ell_sel, main_err)) if main_err is not None else None
ax.errorbar(ell_sel, y_main, yerr=yerr_main, capsize=5, color=color_main, fmt="-o", label=label_main)
# optional second series (Dl - noise)
if Dl2 is not None:
sec = Dl2[i, j, :nbins]
sec_err = Dl2_err[i, j, :nbins] if Dl2_err is not None else None
if mode == "Dl":
ax.errorbar(ell_sel, sec, yerr=sec_err, capsize=5, color=color_second or "orange", fmt="-o", label=label_second)
else:
y_sec = Cl2BK(ell_sel, Dl2Cl(ell_sel, sec))
yerr_sec = Cl2BK(ell_sel, Dl2Cl(ell_sel, sec_err)) if sec_err is not None else None
ax.errorbar(ell_sel, y_sec, yerr=yerr_sec, capsize=5, color=color_second or "orange", fmt="-o", label=label_second)
# iterate over upper triangle (including diagonal)
for i in range(len(nus)):
for j in range(i, len(nus)):
ax = fig.add_subplot(gs[i, j])
# labels (preserve font sizes)
if i == j:
ax.set_xlabel(r"$\ell$", fontsize=2 * ft_axis)
if mode == "Dl":
ax.set_ylabel(r"$\mathcal{D}_{\ell}$", fontsize=2 * ft_axis)
else:
ax.set_ylabel(r"100 $ \frac{\ell \mathcal{C}_{\ell}}{2 \pi}$", fontsize=2 * ft_axis)
# set tick label rotation and size per-axis so small/large figures
# and varying `n` use the computed `ft_axis` consistently
ax.tick_params(axis="x", labelrotation=30, labelsize=ft_axis)
ax.tick_params(axis="y", labelrotation=-45, labelsize=ft_axis)
# model plotting (keeps original behavior/oddity)
_model_plot(ax, i, j, kp)
ax.patch.set_alpha(0.2)
ax.annotate(f"{nus[i]}x{nus[j]}", xy=(0.1, 0.9), fontsize=ft_nus, xycoords="axes fraction", color="black", weight="bold")
# facecolor + plotting choices exactly as original
if i < nrec and j < nrec:
ax.set_facecolor("blue")
if kp == 0:
_plot_errorbars(
ax,
i,
j,
color_main="darkblue",
color_second="orange",
label_main=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2}$",
label_second=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2} - \mathcal{N}_{\ell}^{\nu_1 \times \nu_2}$",
)
else:
_plot_errorbars(
ax,
i,
j,
color_main="darkblue",
color_second="orange",
)
elif i < nrec and j >= nrec:
ax.set_facecolor("skyblue")
_plot_errorbars(ax, i, j, color_main="blue", color_second="orange")
else:
ax.set_facecolor("green")
if kp == 0:
_plot_errorbars(
ax,
i,
j,
color_main="blue",
color_second="orange",
label_main=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2}$",
label_second=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2} - \mathcal{N}_{\ell}^{\nu_1 \times \nu_2}$",
)
else:
_plot_errorbars(ax, i, j, color_main="blue", color_second="orange")
kp += 1
# title / legend / save / show
if title is not None:
title = "Angular Cross-Power Spectra" + title
else:
title = "Angular Cross-Power Spectra"
fig.suptitle(title, fontsize=ft_title, fontweight="bold", y=0.96)
fig.subplots_adjust(left=0.05, right=0.98, bottom=0.05, top=0.9, wspace=0.25, hspace=0.30)
fig.legend(fontsize=ft_title, bbox_to_anchor=(0.4, 0.4))
if name is not None:
plt.savefig(name)
[docs]
class Plots:
"""
Instance for plotting results of Monte-Carlo Markov Chain (i.e emcee).
"""
def __init__(self):
pass
def _make_samples(self, chain, names, labels):
self.sample = MCSamples(samples=chain, names=names, labels=labels)
[docs]
def make_list_free_parameter(self):
"""
Make few list :
- fp : list of value of free parameters
- fp_name : list of name for each values
- fp_latex : list of name in LateX for each values
"""
fp = []
fp_name = []
fp_latex = []
k = 0
for iname, name in enumerate(self.params["Sky"].keys()):
try:
# print('yes')
for jname, n in enumerate(self.params["Sky"][name]):
if type(self.params["Sky"][name][n]) is list:
# print(self.params['Sky'][name][n], n)
if self.params["Sky"][name][n][1] == "f":
fp += [self.params["Sky"][name][n][0]]
fp_latex += [self.params["Sky"][name][n][2]]
fp_name += [list(self.params["Sky"][name].keys())[k]]
k += 1
k = 0
except Exception:
pass
return fp, fp_name, fp_latex
def _set_marker(self, values):
"""
Define the markers to see the input values in GetDist plot.
"""
dict = {}
for ii, i in enumerate(values):
# print(self.names[ii], values[ii])
dict[self.names[ii]] = values[ii]
if self.params["Sampler"]["markers"] is False:
dict = None
return dict
[docs]
def get_convergence(self, chain, job_id):
"""
chain assumed to be not flat with shape (nsamples, nwalkers, nparams)
"""
with open("params.yml", "r") as stream:
self.params = yaml.safe_load(stream)
self.values, self.names, self.labels = self.make_list_free_parameter()
plt.figure(figsize=(4, 4))
for i in range(chain.shape[2]):
plt.subplot(chain.shape[2], 1, i + 1)
plt.plot(chain[:, :, i], "-b", alpha=0.2)
plt.plot(np.mean(chain[:, :, i], axis=1), "-r", alpha=1)
plt.axhline(self.values[i], ls="--", color="black")
plt.ylabel(self.names[i], fontsize=12)
plt.xlabel("Iterations", fontsize=12)
plt.savefig(f"allplots_{job_id}/Convergence_chain.svg")
plt.close()
[docs]
def get_triangle(self, chain, names, labels, job_id):
"""
Make triangle plot of each estimated parameters
"""
with open("params.yml", "r") as stream:
self.params = yaml.safe_load(stream)
self.values, self.names, self.labels = self.make_list_free_parameter()
self.marker = self._set_marker(self.values)
print(self.marker)
self._make_samples(chain, names, labels)
plt.figure(figsize=(8, 8))
# Triangle plot
g = plots.get_subplot_plotter()
g.triangle_plot(
[self.sample],
filled=True,
markers=self.marker,
title_limit=self.params["Sampler"]["title_limit"],
)
# title_limit=1)
plt.savefig(f"allplots_{job_id}/triangle_plot.svg")
plt.close()
[docs]
def get_Dl_plot(self, ell, Dl, Dl_noise, nus, job_id, figsize=(10, 10), model=None):
plt.figure(figsize=figsize)
k = 0
for i in range(len(nus)):
for j in range(len(nus)):
plt.subplot(len(nus), len(nus), k + 1)
plt.errorbar(ell, Dl[k], yerr=Dl_noise[k], fmt="or")
if model is not None:
plt.errorbar(ell, model[k], fmt="-k")
plt.title(f"{nus[i]:.0f}x{nus[j]:.0f}")
# plt.yscale('log')
k += 1
plt.tight_layout()
plt.savefig(f"allplots_{job_id}/Dl_plot.svg")
plt.close()
[docs]
class PlotsFMM:
def __init__(self, seenpix):
self.stk = ["I", "Q", "U"]
self.seenpix = seenpix
[docs]
def plot_frequency_maps(self, m_in, m_out, center, nus, reso=15, nsig=3, filename=None):
Nf, _, Nstk = m_in.shape
res = m_out - m_in
plt.figure(figsize=(15, 3.3 * Nf))
k = 1
for inu in range(Nf):
for istk in range(Nstk):
max_out = np.max(np.abs(m_out[inu, self.seenpix, istk]))
max_res = np.max(np.abs(res[inu, self.seenpix, istk]))
hp.gnomview(
m_out[inu, :, istk],
rot=center,
reso=reso,
cmap="jet",
min=-max_out,
max=max_out,
sub=(Nf, 6, k),
title=f"{nus[inu]:.1f} GHz - Output {self.stk[istk]}",
notext=True,
)
hp.gnomview(
res[inu, :, istk],
rot=center,
reso=reso,
cmap="jet",
min=-max_res,
max=max_res,
sub=(Nf, 6, k + 1),
title=f"{nus[inu]:.1f} GHz - Residuals {self.stk[istk]}",
notext=True,
)
k += 2
if filename is not None:
plt.savefig(filename)
plt.close()
[docs]
def plot_FMM_old(
self,
m_in,
m_out,
center,
seenpix,
nus,
job_id,
figsize=(10, 8),
istk=1,
nsig=3,
name="signal",
):
m_in[:, ~seenpix, :] = hp.UNSEEN
m_out[:, ~seenpix, :] = hp.UNSEEN
plt.figure(figsize=figsize)
k = 1
for i in range(self.params["QUBIC"]["nrec"]):
hp.gnomview(
m_in[i, :, istk],
rot=center,
reso=15,
cmap="jet",
min=-nsig * np.std(m_out[0, seenpix, istk]),
max=nsig * np.std(m_out[0, seenpix, istk]),
sub=(self.params["QUBIC"]["nrec"], 3, k),
title=r"Input - $\nu$ = " + f"{nus[i]:.0f} GHz",
)
hp.gnomview(
m_out[i, :, istk],
rot=center,
reso=15,
cmap="jet",
min=-nsig * np.std(m_out[0, seenpix, istk]),
max=nsig * np.std(m_out[0, seenpix, istk]),
sub=(self.params["QUBIC"]["nrec"], 3, k + 1),
title=r"Output - $\nu$ = " + f"{nus[i]:.0f} GHz",
)
res = m_in[i, :, istk] - m_out[i, :, istk]
res[~seenpix] = hp.UNSEEN
hp.gnomview(
res,
rot=center,
reso=15,
cmap="jet",
min=-nsig * np.std(m_out[0, seenpix, istk]),
max=nsig * np.std(m_out[0, seenpix, istk]),
sub=(self.params["QUBIC"]["nrec"], 3, k + 2),
)
k += 3
plt.savefig(f"FMM/allplots_{job_id}/frequency_maps_{self.stk[istk]}_{name}.svg")
plt.close()
[docs]
def plot_FMM_mollview(self, m_in, m_out, nus, job_id, figsize=(10, 8), istk=1, nsig=3, fwhm=0):
C = HealpixConvolutionGaussianOperator(fwhm=fwhm)
plt.figure(figsize=figsize)
k = 1
for i in range(self.params["QUBIC"]["nrec"]):
hp.mollview(
C(m_in[i, :, istk]),
cmap="jet",
min=-nsig * np.std(m_out[0, :, istk]),
max=nsig * np.std(m_out[0, :, istk]),
sub=(self.params["QUBIC"]["nrec"], 3, k),
title=r"Input - $\nu$ = " + f"{nus[i]:.0f} GHz",
)
hp.mollview(
C(m_out[i, :, istk]),
cmap="jet",
min=-nsig * np.std(m_out[0, :, istk]),
max=nsig * np.std(m_out[0, :, istk]),
sub=(self.params["QUBIC"]["nrec"], 3, k + 1),
title=r"Output - $\nu$ = " + f"{nus[i]:.0f} GHz",
)
hp.mollview(
C(m_in[i, :, istk]) - C(m_out[i, :, istk]),
cmap="jet",
min=-nsig * np.std(m_out[0, :, istk]),
max=nsig * np.std(m_out[0, :, istk]),
sub=(self.params["QUBIC"]["nrec"], 3, k + 2),
)
k += 3
plt.savefig(f"allplots_{job_id}/frequency_maps_{self.stk[istk]}_moll.svg")
plt.close()
[docs]
class PlotsCMM:
"""
Instance to produce plots on the convergence.
Arguments :
===========
- jobid : Int number for saving figures.
- dogif : Bool to produce GIF.
"""
def __init__(self, preset, dogif=True):
self.preset = preset
self.job_id = self.preset.job_id
self.dogif = dogif
self.params = self.preset.tools.params
[docs]
def plot_sed(self, nus_in, A_in, nus_out, A_out, figsize=(8, 6), ki=0, gif=False):
"""
Plots the Spectral Energy Distribution (SED) and saves the plot as a svg file.
Parameters:
nus (array-like): Array of frequency values.
A (array-like): Array of amplitude values.
figsize (tuple, optional): Size of the figure. Defaults to (8, 6).
truth (array-like, optional): Array of true values for comparison. Defaults to None.
ki (int, optional): Iteration index for file naming. Defaults to 0.
Returns:
None
"""
if not self.params["Plots"]["conv_beta"]:
return
nf_in, nc = A_in.shape
nf_out, nc_out = A_out.shape
assert nc == nc_out, "Inconsistent number of components between A_in and A_out"
fsub = nf_in // nf_out
# Labels
fg = self.preset.tools.params["Foregrounds"]
labels = []
if fg["Dust"]["Dust_out"]:
labels.append(f"Dust ({fg['Dust']['model']})")
if fg["Synchrotron"]["Synchrotron_out"]:
labels.append(f"Sync ({fg['Synchrotron']['model']})")
plt.figure(figsize=figsize)
plt.title(f"Mixing matrix fit (blind) – iteration #{ki + 1}")
colors = plt.cm.tab10.colors # color palette for components
# True continuous SEDs
for ic in range(nc):
plt.plot(
nus_in,
A_in[:, ic],
color=colors[ic % 10],
alpha=0.7,
lw=2,
label=f"Model: {labels[ic]}",
)
# True binned values
for ic in range(nc):
means = np.array([A_in[inu * fsub : (inu + 1) * fsub, ic].mean() for inu in range(nf_out)])
plt.scatter(
nus_out,
means,
color=colors[ic % 10],
marker="o",
s=60,
label=f"True binned: {labels[ic]}",
zorder=3,
)
# Fitted values (cross on top)
for ic in range(nc):
plt.scatter(
nus_out,
A_out[:, ic],
color=colors[ic % 10],
marker="x",
s=80,
lw=2,
label=f"Fitted: {labels[ic]}",
zorder=4,
)
# Axes & scaling
plt.xlabel("Frequency [GHz]")
plt.ylabel("Mixing matrix element")
plt.xlim(120, 260)
plt.yscale("log")
Amin = np.min(A_in[A_in > 0])
Amax = np.max(A_in)
plt.ylim(Amin * 0.5, Amax * 2.0)
plt.legend()
plt.tight_layout()
out = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/A_iter{ki + 1}.svg"
plt.savefig(out)
# Cleanup previous iteration
if self.preset.tools.rank == 0 and ki > 0 and not gif:
prev = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/A_iter{ki}.svg"
if os.path.exists(prev):
os.remove(prev)
plt.close()
[docs]
def plot_beta_iteration(self, beta, figsize=(8, 6), truth=None, ki=0, errors=None):
if not self.params["Plots"]["conv_beta"]:
return
beta = np.asarray(beta)
niter = beta.shape[0]
it = np.arange(niter)
fig, (ax_top, ax_bot) = plt.subplots(2, 1, figsize=figsize, sharex=True, gridspec_kw={"height_ratios": [3, 1]})
# Colors
cmap_comp = plt.get_cmap("tab10")
ncomp = beta.shape[1] if beta.ndim >= 2 else 1
colors_comp = cmap_comp.colors[:ncomp]
# TOP : beta evolution
if beta.ndim == 1:
ax_top.plot(it, beta, lw=2, color=colors_comp[0], label=r"$\beta$")
if errors is not None:
ax_top.fill_between(it, beta - errors, beta + errors, color=colors_comp[0], alpha=0.25, label=r"$1\sigma$")
if truth is not None:
ax_top.axhline(truth, ls="--", color="k", lw=1.5, label="Truth")
elif beta.ndim == 3 and beta.shape[-1] == 1:
for ic in range(beta.shape[1]):
name = self.preset.comp.components_name_out[ic + 1]
ax_top.plot(it, beta[:, ic], color=colors_comp[ic], lw=2, label=rf"$\beta_{{{name}}}$")
if truth is not None:
for ic, val in enumerate(truth):
ax_top.axhline(val, ls="--", color=colors_comp[ic], alpha=0.6)
else:
# (Niter, Ncomp, Nbeta)
ncomp, nbeta = beta.shape[1], beta.shape[2]
cmap_beta = cm.get_cmap("jet", nbeta)
colors_beta = [cmap_beta(i) for i in range(nbeta)]
for ic in range(ncomp):
name = self.preset.comp.components_name_out[ic + 1]
for ib in range(nbeta):
ax_top.plot(
it,
beta[:, ic, ib],
color=colors_beta[ib],
lw=1.7,
alpha=0.8,
)
# one legend entry per component
ax_top.plot([], [], color=colors_comp[ic], lw=2, label=rf"$\beta_{{{name}}}$")
# truth
if truth is not None:
truth = np.asarray(truth)
for ic in range(ncomp):
for ib in range(nbeta):
ax_top.axhline(truth[ic, ib], ls="--", color=colors_beta[ib], alpha=0.4)
# colorbar for beta index
norm = mcolors.Normalize(vmin=0, vmax=nbeta - 1)
sm = cm.ScalarMappable(norm=norm, cmap=cmap_beta)
sm.set_array([])
cbar = fig.colorbar(sm, ax=ax_top, pad=0.02, aspect=30)
cbar.set_label(r"Index $\beta$")
ax_top.set_ylabel(r"$\beta$")
ax_top.legend(ncol=2, fontsize=9, frameon=False)
ax_top.grid(alpha=0.3)
# BOTTOM : convergence
if truth is not None:
if beta.ndim == 1:
ax_bot.plot(it, np.abs(beta - truth), color=colors_comp[0], lw=2)
elif beta.ndim == 3 and beta.shape[-1] == 1:
for ic in range(beta.shape[1]):
ax_bot.plot(it, np.abs(beta[:, ic] - truth[ic]), color=colors_comp[ic], lw=2, alpha=0.8)
else:
for ic in range(beta.shape[1]):
for ib in range(beta.shape[2]):
ax_bot.plot(it, np.abs(beta[:, ic, ib] - truth[ic, ib]), color=colors_beta[ib], lw=1.3, alpha=0.35)
ax_bot.set_yscale("log")
ax_bot.set_xlabel("Iteration")
ax_bot.set_ylabel(r"$|\beta - \beta_{\mathrm{true}}|$")
ax_bot.grid(alpha=0.3)
# Save
fname = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/beta_iter{ki + 1}.svg"
fig.tight_layout()
fig.savefig(fname)
if ki > 0:
old = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/beta_iter{ki}.svg"
if os.path.exists(old):
os.remove(old)
plt.close(fig)
def _display_allresiduals(self, map_i, seenpix, ki=0):
"""
Display all components of the Healpix map with Gaussian convolution.
Parameters:
seenpix (array-like): Boolean array indicating the pixels that are seen.
figsize (tuple): Size of the figure to be plotted. Default is (14, 10).
ki (int): Iteration index for saving the figure. Default is 0.
This function generates and saves a figure showing the output maps and residuals
for each component and Stokes parameter (I, Q, U). The maps are convolved using
a Gaussian operator and displayed using Healpix's gnomview function.
"""
Nmaps, _, Nstk = map_i.shape
stk = ["I", "Q", "U"]
if self.params["Plots"]["maps"]:
plt.figure(figsize=(10, 5 * Nmaps))
k = 0
r = self.preset.A(map_i) - self.preset.b
map_res = np.ones(self.preset.comp.components_iter.shape) * hp.UNSEEN
map_res[:, seenpix, :] = r
for istk in range(Nstk):
for icomp in range(len(self.preset.comp.components_name_out)):
reso = 15
nsig = 3
hp.gnomview(
map_res[icomp, :, istk],
rot=self.preset.sky.center,
reso=reso,
notext=True,
title=f"{self.preset.comp.components_name_out[icomp]} - {stk[istk]} - r = A x - b",
cmap="jet",
sub=(Nstk, len(self.preset.comp.components_out), k + 1),
min=-nsig * np.std(r[icomp, :, istk]),
max=nsig * np.std(r[icomp, :, istk]),
)
k += 1
plt.tight_layout()
plt.savefig("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/allcomps/allres_iter{ki + 1}.svg")
plt.close()
def _display_allcomponents(self, input_maps, reconstructed_maps, ki=0, gif=True, reso=15):
"""
Display all components of the Healpix map with Gaussian convolution.
Parameters:
seenpix (array-like): Boolean array indicating the pixels that are seen.
figsize (tuple): Size of the figure to be plotted. Default is (14, 10).
ki (int): Iteration index for saving the figure. Default is 0.
This function generates and saves a figure showing the output maps and residuals
for each component and Stokes parameter (I, Q, U). The maps are convolved using
a Gaussian operator and displayed using Healpix's gnomview function.
"""
stk = ["I", "Q", "U"]
if self.params["Plots"]["maps"]:
maps_in = input_maps
maps_rec = reconstructed_maps
maps_res = maps_rec - maps_in
Nmaps, _, Nstk = maps_res.shape
k = 0
plt.figure(figsize=(5 * Nmaps, 10))
for istk in range(Nstk):
for icomp in range(len(self.preset.comp.components_name_out)):
hp.gnomview(
maps_rec[icomp, :, istk],
rot=self.preset.sky.center,
reso=reso,
notext=True,
title=f"{self.preset.comp.components_name_out[icomp]} - {stk[istk]} - Output",
cmap="jet",
sub=(Nstk, len(self.preset.comp.components_out) * 2, k + 1),
)
k += 1
hp.gnomview(
maps_res[icomp, :, istk],
rot=self.preset.sky.center,
reso=reso,
notext=True,
title=f"{self.preset.comp.components_name_out[icomp]} - {stk[istk]} - Residual",
cmap="jet",
sub=(Nstk, len(self.preset.comp.components_out) * 2, k + 1),
)
k += 1
plt.tight_layout()
plt.savefig("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/allcomps/allcomps_iter{ki + 1}.svg")
if self.preset.tools.rank == 0:
if ki > 0 and gif is False:
os.remove("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/allcomps/allcomps_iter{ki}.svg")
plt.close()
[docs]
def display_maps(self, input_maps, reconstructed_maps, seenpix, ki=0, reso=15, view="gnomview"):
"""
Display input / output / residual maps at a given iteration.
"""
if not self.params["Plots"]["maps"]:
return
stk = ["I", "Q", "U"]
rms_i = np.zeros((1, 2))
maps_in = input_maps
maps_rec = reconstructed_maps
maps_res = maps_rec - maps_in
ncomp = len(self.preset.comp.components_name_out)
for istk, s in enumerate(stk):
plt.figure(figsize=(6 * ncomp, 12))
k = 0
for icomp in range(ncomp):
if icomp == 0 and istk > 0:
rms_i[0, istk - 1] = np.std(maps_res[icomp, seenpix, istk])
if view == "gnomview":
hp.gnomview(
maps_in[icomp, :, istk],
rot=self.preset.sky.center,
reso=reso,
notext=True,
title=f"Input {s}",
cmap="jet",
sub=(ncomp, 3, k + 1),
)
hp.gnomview(
maps_rec[icomp, :, istk],
rot=self.preset.sky.center,
reso=reso,
notext=True,
title=f"Output {s}",
cmap="jet",
sub=(ncomp, 3, k + 2),
)
hp.gnomview(
maps_res[icomp, :, istk],
rot=self.preset.sky.center,
reso=reso,
notext=True,
title=f"Residual {s} - Std = {np.std(maps_res[icomp, seenpix, istk]):.3e}",
cmap="jet",
sub=(ncomp, 3, k + 3),
)
elif view == "mollview":
hp.mollview(
maps_in[icomp, :, istk],
notext=True,
title=f"Input {s}",
cmap="jet",
sub=(ncomp, 3, k + 1),
)
hp.mollview(
maps_rec[icomp, :, istk],
notext=True,
title=f"Output {s}",
cmap="jet",
sub=(ncomp, 3, k + 2),
)
hp.mollview(
maps_res[icomp, :, istk],
notext=True,
title=f"Residual {s} - Std = {np.std(maps_res[icomp, seenpix, istk]):.3e}",
cmap="jet",
sub=(ncomp, 3, k + 3),
)
k += 3
plt.tight_layout()
out = f"CMM/{self.preset.tools.params['foldername']}/Plots/{s}/maps_iter{ki + 1}.svg"
plt.savefig(out)
if self.preset.tools.rank == 0 and ki > 0:
prev = f"CMM/{self.preset.tools.params['foldername']}/Plots/{s}/maps_iter{ki}.svg"
if os.path.exists(prev):
os.remove(prev)
plt.close()
self.preset.acquisition.rms_plot = np.concatenate((self.preset.acquisition.rms_plot, rms_i), axis=0)
[docs]
def plot_gain_iteration(self, gain, figsize=(8, 6), ki=0):
"""
Method to plot convergence of reconstructed gains.
Arguments :
---
- gain : Array containing gain number (1 per detectors). It has the shape (Niteration, Ndet, 2) for Two Bands design and (Niteration, Ndet) for Wide Band design
- alpha : Transparency for curves.
- figsize : Tuple to control size of plots.
"""
if self.params["Plots"]["conv_gain"]:
plt.figure(figsize=figsize)
# plt.hist(gain[:, i, j])
if self.preset.qubic.params_qubic["type"] == "two":
color = ["red", "blue"]
for j in range(2):
plt.hist(gain[-1, :, j], bins=20, color=color[j])
# plt.plot(alliter-1, np.mean(gain, axis=1)[:, j], color[j], alpha=1)
# for i in range(ndet):
# plt.plot(alliter-1, gain[:, i, j], color[j], alpha=alpha)
# elif self.preset.qubic.params_qubic['type'] == 'wide':
# color = ['--g']
# plt.plot(alliter-1, np.mean(gain, axis=1), color[0], alpha=1)
# for i in range(ndet):
# plt.plot(alliter-1, gain[:, i], color[0], alpha=alpha)
# plt.yscale('log')
# plt.ylabel(r'|$g_{reconstructed} - g_{input}$|', fontsize=12)
# plt.xlabel('Iterations', fontsize=12)
plt.xlim(-0.1, 0.1)
plt.ylim(0, 100)
plt.axvline(0, ls="--", color="black")
plt.savefig("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/A_iter/gain_iter{ki + 1}.svg")
if self.preset.tools.rank == 0:
if ki > 0:
os.remove("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/A_iter/gain_iter{ki}.svg")
plt.close()
[docs]
def plot_rms_iteration(self, rms, figsize=(8, 6), ki=0):
if not self.params["Plots"]["conv_rms"]:
return
plt.figure(figsize=figsize)
plt.plot(rms[1:, 0], "-b", label="Q")
plt.plot(rms[1:, 1], "-r", label="U")
plt.yscale("log")
plt.xlabel("Iteration")
plt.ylabel("RMS")
plt.title("RMS evolution")
plt.legend()
plt.tight_layout()
out = f"CMM/{self.preset.tools.params['foldername']}/Plots/rms_iter{ki + 1}.svg"
plt.savefig(out)
if self.preset.tools.rank == 0 and ki > 0:
prev = f"CMM/{self.preset.tools.params['foldername']}/Plots/rms_iter{ki}.svg"
if os.path.exists(prev):
os.remove(prev)
plt.close()