Source code for lib.MapMaking.Qmap_plotter

import os

import healpy as hp
import matplotlib.cm as cm
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import yaml
from getdist import MCSamples, plots
from matplotlib.gridspec import GridSpec
from pysimulators.interfaces.healpy import HealpixConvolutionGaussianOperator


def _plot_reconstructed_maps(
    maps,
    m_in,
    seenpix,
    name_file,
    center,
    max,
    min,
    reso=15,
    iter=0,
):
    """

    Save a svg with the actual maps at iteration i. It assumes that maps is 3-dimensional

    """
    stk = ["I", "Q", "U"]

    Nmaps, _, Nstk = maps.shape
    res = maps - m_in

    k = 0
    plt.figure(figsize=(12, 3 * Nmaps))
    for imaps in range(Nmaps):
        maps[imaps, ~seenpix, :] = hp.UNSEEN
        res[imaps, ~seenpix, :] = hp.UNSEEN
        for istk in range(Nstk):
            hp.gnomview(
                maps[imaps, :, istk],
                rot=center,
                reso=reso,
                cmap="jet",
                sub=(Nmaps, Nstk * 2, k + 1),
                notext=True,
                max=max[imaps, istk],
                min=min[imaps, istk],
                title=f"Output - {stk[istk]}",
            )
            hp.gnomview(
                res[imaps, :, istk],
                rot=center,
                reso=reso,
                cmap="jet",
                sub=(Nmaps, Nstk * 2, k + 2),
                notext=True,
                max=max[imaps, istk],
                min=min[imaps, istk],
                title=f"Residuals - {stk[istk]}",
            )
            k += 2
    plt.suptitle(f"Iteration #{iter}", fontsize=15)
    plt.tight_layout()
    plt.savefig(name_file)
    plt.close()


[docs] def Dl2Cl(ell, Dl): """ Convert angular power spectrum from D_ell to C_ell. Formula --- C_ell = D_ell * 2*pi / (ell * (ell + 1)) Parameters -- ell : array_like Multipole moments (1D). Values must be > 0 (division by zero otherwise). Dl : array_like D_ell values. Can be scalar, 1D (same length as `ell`) or broadcastable to `ell`. Typical shape in this module: (n_nus, n_nus, n_bins). Returns --- ndarray C_ell values with the same shape as the broadcast result of `Dl` and `ell`. Notes - - Uses elementwise broadcasting; ensure `ell` and `Dl` shapes are compatible. - No guard is performed for ell == 0; caller must avoid or mask such entries. """ return Dl * 2 * np.pi / (ell * (ell + 1))
[docs] def Cl2BK(ell, Cl): """ Convert C_ell to the bandpower-like quantity 100 * ell * C_ell / (2*pi). Formula --- result = 100 * ell * C_ell / (2*pi) Parameters -- ell : array_like Multipole moments (1D). Should match or broadcast with `Cl`. Cl : array_like C_ell values. Can be scalar, 1D or broadcastable to `ell`. Typical shape in this module: (n_nus, n_nus, n_bins). Returns --- ndarray Transformed bandpower values with shape equal to the broadcasted inputs. """ return 100 * ell * Cl / (2 * np.pi)
[docs] def plot_cross_spectrum(nus, ell, Dl, Dl_err, ymodel, Dl2=None, Dl2_err=None, label_model="CMB + Dust", nbins=None, nrec=2, mode="Dl", figsize=None, title=None, name=None, dpi=300): """ Plot the upper-triangle matrix of cross-angular power spectra D_ell (and optional model). The function arranges a len(nus) x len(nus) grid and fills only the upper triangle (including diagonal) with small subplots labelled by the frequency pair `nus[i] x nus[j]`. It draws data errorbars, an optional second series (Dl - noise if `Dl_noise` provided), and a model line (from `ymodel`) either in D_ell units or transformed to the "100 * ell * C_ell / (2*pi)" units depending on `mode`. Parameters -- nus : array_like 1D array of frequency identifiers (used for subplot annotations). Length = n_nus. ell : array_like 1D array of multipole moments. Length >= nbins (if nbins provided). Must be > 0 to avoid division-by-zero in conversions. Dl : ndarray Data D_ell values, expected shape (n_nus, n_nus, n_ell) or broadcastable to that. Dl_noise : ndarray or None Errors on Dl with same shape as `Dl` (or broadcastable). If provided, an additional series `Dl - Dl_noise` will be plotted where applicable. Errors are absolute-valued (the function applies np.abs). ymodel : ndarray or None Model values for plotting. Expected shape (n_nus, n_nus, n_ell) (or broadcastable). If None, no model line is drawn. label_model : str, optional Legend label for the model line (default: "CMB + Dust"). nbins : int or None, optional Number of ell bins to plot. If None (default) uses len(ell). nrec : int, optional Number of "recon" channels used to choose subplot background color and styling. Default is 2. mode : {"Dl", ...}, optional If "Dl" the data are plotted in D_ell units. Otherwise the model/data are transformed via `_Dl2Cl` and `_Cl2BK` before plotting (matching original behaviour). ft_nus : int, optional Font size for the subplot frequency annotations (default: 10). figsize : tuple, optional Matplotlib figure size (default: (10, 8)). title : str or None, optional Suptitle appended to the fixed prefix "Angular Cross-Power Spectra". If None, only the prefix is used. name : str or None, optional If provided, the figure is saved to this filename as a PDF. Side effects - Creates a matplotlib figure, shows it with plt.show() and optionally saves it. - Does not return the figure (returns None). If you need the figure object, modify the function to `return fig` after creation. Notes - - The function preserves exact plotting order, labels and colours of the original code. - The caller must ensure shapes of `nus`, `ell`, `Dl`, `Dl_noise`, and `ymodel` are compatible. """ n = len(nus) if figsize is None: figsize = (2.2 * n, 2.2 * n) # Dynamic font scaling based on figure height # Guard against n <= 0 and large figsize values. Compute a scale factor # that decreases with number of subplots and increases with figure height. count_factor = 1.0 / np.sqrt(max(1, n)) scale_factor = (float(figsize[1]) / 8.0) * count_factor # Clamp sizes to reasonable readable bounds so very large figures (e.g. # figsize=(25,25)) don't produce oversized fonts. Ranges chosen to keep # ticks and small labels readable while preventing overlap. # ft_axis: tick label size (6..16) # ft_nus: small subtitle/annotation size (8..14) # ft_title: suptitle size (12..28) ft_axis = int(np.clip(13.0 * scale_factor, 6, 16)) ft_nus = int(np.clip(10.0 * scale_factor, 8, 14)) ft_title = int(np.clip(32.0 * np.sqrt(max(scale_factor, 0.1)), 12, 28)) # defaults & preproc if nbins is None: nbins = len(ell) # keep absolute-valued errors as in original Dl_err = np.abs(Dl_err) if Dl_err is not None else None ell_sel = ell[:nbins] fig = plt.figure(figsize=figsize, dpi=dpi) gs = GridSpec(len(nus), len(nus), figure=fig) kp = 0 # subplot counter (used to control first-plot labelling) def _model_plot(ax, i, j, kp): """Plot model lines respecting the original oddity: - if kp == 0: plot *only* the mode-specific model (with label) - else: if ymodel exists, plot both Dl model and transformed Cl model (no label) """ if ymodel is None: return y = ymodel[i, j, :nbins] if kp == 0: if mode == "Dl": ax.plot(ell_sel, y, "--r", label=label_model) else: ax.plot(ell_sel, Cl2BK(ell_sel, Dl2Cl(ell_sel, y)), "--r", label=label_model) else: if mode == "Dl": ax.plot(ell_sel, y, "--r") else: ax.plot(ell_sel, Cl2BK(ell_sel, Dl2Cl(ell_sel, y)), "--r") def _plot_errorbars(ax, i, j, color_main, color_second=None, label_main=None, label_second=None): """Add errorbars in either 'Dl' or transformed mode.""" main = Dl[i, j, :nbins] main_err = Dl_err[i, j, :nbins] if Dl_err is not None else None # plot main series if mode == "Dl": ax.errorbar(ell_sel, main, yerr=main_err, capsize=5, color=color_main, fmt="-o", label=label_main) else: y_main = Cl2BK(ell_sel, Dl2Cl(ell_sel, main)) yerr_main = Cl2BK(ell_sel, Dl2Cl(ell_sel, main_err)) if main_err is not None else None ax.errorbar(ell_sel, y_main, yerr=yerr_main, capsize=5, color=color_main, fmt="-o", label=label_main) # optional second series (Dl - noise) if Dl2 is not None: sec = Dl2[i, j, :nbins] sec_err = Dl2_err[i, j, :nbins] if Dl2_err is not None else None if mode == "Dl": ax.errorbar(ell_sel, sec, yerr=sec_err, capsize=5, color=color_second or "orange", fmt="-o", label=label_second) else: y_sec = Cl2BK(ell_sel, Dl2Cl(ell_sel, sec)) yerr_sec = Cl2BK(ell_sel, Dl2Cl(ell_sel, sec_err)) if sec_err is not None else None ax.errorbar(ell_sel, y_sec, yerr=yerr_sec, capsize=5, color=color_second or "orange", fmt="-o", label=label_second) # iterate over upper triangle (including diagonal) for i in range(len(nus)): for j in range(i, len(nus)): ax = fig.add_subplot(gs[i, j]) # labels (preserve font sizes) if i == j: ax.set_xlabel(r"$\ell$", fontsize=2 * ft_axis) if mode == "Dl": ax.set_ylabel(r"$\mathcal{D}_{\ell}$", fontsize=2 * ft_axis) else: ax.set_ylabel(r"100 $ \frac{\ell \mathcal{C}_{\ell}}{2 \pi}$", fontsize=2 * ft_axis) # set tick label rotation and size per-axis so small/large figures # and varying `n` use the computed `ft_axis` consistently ax.tick_params(axis="x", labelrotation=30, labelsize=ft_axis) ax.tick_params(axis="y", labelrotation=-45, labelsize=ft_axis) # model plotting (keeps original behavior/oddity) _model_plot(ax, i, j, kp) ax.patch.set_alpha(0.2) ax.annotate(f"{nus[i]}x{nus[j]}", xy=(0.1, 0.9), fontsize=ft_nus, xycoords="axes fraction", color="black", weight="bold") # facecolor + plotting choices exactly as original if i < nrec and j < nrec: ax.set_facecolor("blue") if kp == 0: _plot_errorbars( ax, i, j, color_main="darkblue", color_second="orange", label_main=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2}$", label_second=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2} - \mathcal{N}_{\ell}^{\nu_1 \times \nu_2}$", ) else: _plot_errorbars( ax, i, j, color_main="darkblue", color_second="orange", ) elif i < nrec and j >= nrec: ax.set_facecolor("skyblue") _plot_errorbars(ax, i, j, color_main="blue", color_second="orange") else: ax.set_facecolor("green") if kp == 0: _plot_errorbars( ax, i, j, color_main="blue", color_second="orange", label_main=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2}$", label_second=r"$\mathcal{D}_{\ell}^{\nu_1 \times \nu_2} - \mathcal{N}_{\ell}^{\nu_1 \times \nu_2}$", ) else: _plot_errorbars(ax, i, j, color_main="blue", color_second="orange") kp += 1 # title / legend / save / show if title is not None: title = "Angular Cross-Power Spectra" + title else: title = "Angular Cross-Power Spectra" fig.suptitle(title, fontsize=ft_title, fontweight="bold", y=0.96) fig.subplots_adjust(left=0.05, right=0.98, bottom=0.05, top=0.9, wspace=0.25, hspace=0.30) fig.legend(fontsize=ft_title, bbox_to_anchor=(0.4, 0.4)) if name is not None: plt.savefig(name)
[docs] class Plots: """ Instance for plotting results of Monte-Carlo Markov Chain (i.e emcee). """ def __init__(self): pass def _make_samples(self, chain, names, labels): self.sample = MCSamples(samples=chain, names=names, labels=labels)
[docs] def make_list_free_parameter(self): """ Make few list : - fp : list of value of free parameters - fp_name : list of name for each values - fp_latex : list of name in LateX for each values """ fp = [] fp_name = [] fp_latex = [] k = 0 for iname, name in enumerate(self.params["Sky"].keys()): try: # print('yes') for jname, n in enumerate(self.params["Sky"][name]): if type(self.params["Sky"][name][n]) is list: # print(self.params['Sky'][name][n], n) if self.params["Sky"][name][n][1] == "f": fp += [self.params["Sky"][name][n][0]] fp_latex += [self.params["Sky"][name][n][2]] fp_name += [list(self.params["Sky"][name].keys())[k]] k += 1 k = 0 except Exception: pass return fp, fp_name, fp_latex
def _set_marker(self, values): """ Define the markers to see the input values in GetDist plot. """ dict = {} for ii, i in enumerate(values): # print(self.names[ii], values[ii]) dict[self.names[ii]] = values[ii] if self.params["Sampler"]["markers"] is False: dict = None return dict
[docs] def get_convergence(self, chain, job_id): """ chain assumed to be not flat with shape (nsamples, nwalkers, nparams) """ with open("params.yml", "r") as stream: self.params = yaml.safe_load(stream) self.values, self.names, self.labels = self.make_list_free_parameter() plt.figure(figsize=(4, 4)) for i in range(chain.shape[2]): plt.subplot(chain.shape[2], 1, i + 1) plt.plot(chain[:, :, i], "-b", alpha=0.2) plt.plot(np.mean(chain[:, :, i], axis=1), "-r", alpha=1) plt.axhline(self.values[i], ls="--", color="black") plt.ylabel(self.names[i], fontsize=12) plt.xlabel("Iterations", fontsize=12) plt.savefig(f"allplots_{job_id}/Convergence_chain.svg") plt.close()
[docs] def get_triangle(self, chain, names, labels, job_id): """ Make triangle plot of each estimated parameters """ with open("params.yml", "r") as stream: self.params = yaml.safe_load(stream) self.values, self.names, self.labels = self.make_list_free_parameter() self.marker = self._set_marker(self.values) print(self.marker) self._make_samples(chain, names, labels) plt.figure(figsize=(8, 8)) # Triangle plot g = plots.get_subplot_plotter() g.triangle_plot( [self.sample], filled=True, markers=self.marker, title_limit=self.params["Sampler"]["title_limit"], ) # title_limit=1) plt.savefig(f"allplots_{job_id}/triangle_plot.svg") plt.close()
[docs] def get_Dl_plot(self, ell, Dl, Dl_noise, nus, job_id, figsize=(10, 10), model=None): plt.figure(figsize=figsize) k = 0 for i in range(len(nus)): for j in range(len(nus)): plt.subplot(len(nus), len(nus), k + 1) plt.errorbar(ell, Dl[k], yerr=Dl_noise[k], fmt="or") if model is not None: plt.errorbar(ell, model[k], fmt="-k") plt.title(f"{nus[i]:.0f}x{nus[j]:.0f}") # plt.yscale('log') k += 1 plt.tight_layout() plt.savefig(f"allplots_{job_id}/Dl_plot.svg") plt.close()
[docs] class PlotsFMM: def __init__(self, seenpix): self.stk = ["I", "Q", "U"] self.seenpix = seenpix
[docs] def plot_frequency_maps(self, m_in, m_out, center, nus, reso=15, nsig=3, filename=None): Nf, _, Nstk = m_in.shape res = m_out - m_in plt.figure(figsize=(15, 3.3 * Nf)) k = 1 for inu in range(Nf): for istk in range(Nstk): max_out = np.max(np.abs(m_out[inu, self.seenpix, istk])) max_res = np.max(np.abs(res[inu, self.seenpix, istk])) hp.gnomview( m_out[inu, :, istk], rot=center, reso=reso, cmap="jet", min=-max_out, max=max_out, sub=(Nf, 6, k), title=f"{nus[inu]:.1f} GHz - Output {self.stk[istk]}", notext=True, ) hp.gnomview( res[inu, :, istk], rot=center, reso=reso, cmap="jet", min=-max_res, max=max_res, sub=(Nf, 6, k + 1), title=f"{nus[inu]:.1f} GHz - Residuals {self.stk[istk]}", notext=True, ) k += 2 if filename is not None: plt.savefig(filename) plt.close()
[docs] def plot_FMM_old( self, m_in, m_out, center, seenpix, nus, job_id, figsize=(10, 8), istk=1, nsig=3, name="signal", ): m_in[:, ~seenpix, :] = hp.UNSEEN m_out[:, ~seenpix, :] = hp.UNSEEN plt.figure(figsize=figsize) k = 1 for i in range(self.params["QUBIC"]["nrec"]): hp.gnomview( m_in[i, :, istk], rot=center, reso=15, cmap="jet", min=-nsig * np.std(m_out[0, seenpix, istk]), max=nsig * np.std(m_out[0, seenpix, istk]), sub=(self.params["QUBIC"]["nrec"], 3, k), title=r"Input - $\nu$ = " + f"{nus[i]:.0f} GHz", ) hp.gnomview( m_out[i, :, istk], rot=center, reso=15, cmap="jet", min=-nsig * np.std(m_out[0, seenpix, istk]), max=nsig * np.std(m_out[0, seenpix, istk]), sub=(self.params["QUBIC"]["nrec"], 3, k + 1), title=r"Output - $\nu$ = " + f"{nus[i]:.0f} GHz", ) res = m_in[i, :, istk] - m_out[i, :, istk] res[~seenpix] = hp.UNSEEN hp.gnomview( res, rot=center, reso=15, cmap="jet", min=-nsig * np.std(m_out[0, seenpix, istk]), max=nsig * np.std(m_out[0, seenpix, istk]), sub=(self.params["QUBIC"]["nrec"], 3, k + 2), ) k += 3 plt.savefig(f"FMM/allplots_{job_id}/frequency_maps_{self.stk[istk]}_{name}.svg") plt.close()
[docs] def plot_FMM_mollview(self, m_in, m_out, nus, job_id, figsize=(10, 8), istk=1, nsig=3, fwhm=0): C = HealpixConvolutionGaussianOperator(fwhm=fwhm) plt.figure(figsize=figsize) k = 1 for i in range(self.params["QUBIC"]["nrec"]): hp.mollview( C(m_in[i, :, istk]), cmap="jet", min=-nsig * np.std(m_out[0, :, istk]), max=nsig * np.std(m_out[0, :, istk]), sub=(self.params["QUBIC"]["nrec"], 3, k), title=r"Input - $\nu$ = " + f"{nus[i]:.0f} GHz", ) hp.mollview( C(m_out[i, :, istk]), cmap="jet", min=-nsig * np.std(m_out[0, :, istk]), max=nsig * np.std(m_out[0, :, istk]), sub=(self.params["QUBIC"]["nrec"], 3, k + 1), title=r"Output - $\nu$ = " + f"{nus[i]:.0f} GHz", ) hp.mollview( C(m_in[i, :, istk]) - C(m_out[i, :, istk]), cmap="jet", min=-nsig * np.std(m_out[0, :, istk]), max=nsig * np.std(m_out[0, :, istk]), sub=(self.params["QUBIC"]["nrec"], 3, k + 2), ) k += 3 plt.savefig(f"allplots_{job_id}/frequency_maps_{self.stk[istk]}_moll.svg") plt.close()
[docs] class PlotsCMM: """ Instance to produce plots on the convergence. Arguments : =========== - jobid : Int number for saving figures. - dogif : Bool to produce GIF. """ def __init__(self, preset, dogif=True): self.preset = preset self.job_id = self.preset.job_id self.dogif = dogif self.params = self.preset.tools.params
[docs] def plot_sed(self, nus_in, A_in, nus_out, A_out, figsize=(8, 6), ki=0, gif=False): """ Plots the Spectral Energy Distribution (SED) and saves the plot as a svg file. Parameters: nus (array-like): Array of frequency values. A (array-like): Array of amplitude values. figsize (tuple, optional): Size of the figure. Defaults to (8, 6). truth (array-like, optional): Array of true values for comparison. Defaults to None. ki (int, optional): Iteration index for file naming. Defaults to 0. Returns: None """ if not self.params["Plots"]["conv_beta"]: return nf_in, nc = A_in.shape nf_out, nc_out = A_out.shape assert nc == nc_out, "Inconsistent number of components between A_in and A_out" fsub = nf_in // nf_out # Labels fg = self.preset.tools.params["Foregrounds"] labels = [] if fg["Dust"]["Dust_out"]: labels.append(f"Dust ({fg['Dust']['model']})") if fg["Synchrotron"]["Synchrotron_out"]: labels.append(f"Sync ({fg['Synchrotron']['model']})") plt.figure(figsize=figsize) plt.title(f"Mixing matrix fit (blind) – iteration #{ki + 1}") colors = plt.cm.tab10.colors # color palette for components # True continuous SEDs for ic in range(nc): plt.plot( nus_in, A_in[:, ic], color=colors[ic % 10], alpha=0.7, lw=2, label=f"Model: {labels[ic]}", ) # True binned values for ic in range(nc): means = np.array([A_in[inu * fsub : (inu + 1) * fsub, ic].mean() for inu in range(nf_out)]) plt.scatter( nus_out, means, color=colors[ic % 10], marker="o", s=60, label=f"True binned: {labels[ic]}", zorder=3, ) # Fitted values (cross on top) for ic in range(nc): plt.scatter( nus_out, A_out[:, ic], color=colors[ic % 10], marker="x", s=80, lw=2, label=f"Fitted: {labels[ic]}", zorder=4, ) # Axes & scaling plt.xlabel("Frequency [GHz]") plt.ylabel("Mixing matrix element") plt.xlim(120, 260) plt.yscale("log") Amin = np.min(A_in[A_in > 0]) Amax = np.max(A_in) plt.ylim(Amin * 0.5, Amax * 2.0) plt.legend() plt.tight_layout() out = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/A_iter{ki + 1}.svg" plt.savefig(out) # Cleanup previous iteration if self.preset.tools.rank == 0 and ki > 0 and not gif: prev = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/A_iter{ki}.svg" if os.path.exists(prev): os.remove(prev) plt.close()
[docs] def plot_beta_iteration(self, beta, figsize=(8, 6), truth=None, ki=0, errors=None): if not self.params["Plots"]["conv_beta"]: return beta = np.asarray(beta) niter = beta.shape[0] it = np.arange(niter) fig, (ax_top, ax_bot) = plt.subplots(2, 1, figsize=figsize, sharex=True, gridspec_kw={"height_ratios": [3, 1]}) # Colors cmap_comp = plt.get_cmap("tab10") ncomp = beta.shape[1] if beta.ndim >= 2 else 1 colors_comp = cmap_comp.colors[:ncomp] # TOP : beta evolution if beta.ndim == 1: ax_top.plot(it, beta, lw=2, color=colors_comp[0], label=r"$\beta$") if errors is not None: ax_top.fill_between(it, beta - errors, beta + errors, color=colors_comp[0], alpha=0.25, label=r"$1\sigma$") if truth is not None: ax_top.axhline(truth, ls="--", color="k", lw=1.5, label="Truth") elif beta.ndim == 3 and beta.shape[-1] == 1: for ic in range(beta.shape[1]): name = self.preset.comp.components_name_out[ic + 1] ax_top.plot(it, beta[:, ic], color=colors_comp[ic], lw=2, label=rf"$\beta_{{{name}}}$") if truth is not None: for ic, val in enumerate(truth): ax_top.axhline(val, ls="--", color=colors_comp[ic], alpha=0.6) else: # (Niter, Ncomp, Nbeta) ncomp, nbeta = beta.shape[1], beta.shape[2] cmap_beta = cm.get_cmap("jet", nbeta) colors_beta = [cmap_beta(i) for i in range(nbeta)] for ic in range(ncomp): name = self.preset.comp.components_name_out[ic + 1] for ib in range(nbeta): ax_top.plot( it, beta[:, ic, ib], color=colors_beta[ib], lw=1.7, alpha=0.8, ) # one legend entry per component ax_top.plot([], [], color=colors_comp[ic], lw=2, label=rf"$\beta_{{{name}}}$") # truth if truth is not None: truth = np.asarray(truth) for ic in range(ncomp): for ib in range(nbeta): ax_top.axhline(truth[ic, ib], ls="--", color=colors_beta[ib], alpha=0.4) # colorbar for beta index norm = mcolors.Normalize(vmin=0, vmax=nbeta - 1) sm = cm.ScalarMappable(norm=norm, cmap=cmap_beta) sm.set_array([]) cbar = fig.colorbar(sm, ax=ax_top, pad=0.02, aspect=30) cbar.set_label(r"Index $\beta$") ax_top.set_ylabel(r"$\beta$") ax_top.legend(ncol=2, fontsize=9, frameon=False) ax_top.grid(alpha=0.3) # BOTTOM : convergence if truth is not None: if beta.ndim == 1: ax_bot.plot(it, np.abs(beta - truth), color=colors_comp[0], lw=2) elif beta.ndim == 3 and beta.shape[-1] == 1: for ic in range(beta.shape[1]): ax_bot.plot(it, np.abs(beta[:, ic] - truth[ic]), color=colors_comp[ic], lw=2, alpha=0.8) else: for ic in range(beta.shape[1]): for ib in range(beta.shape[2]): ax_bot.plot(it, np.abs(beta[:, ic, ib] - truth[ic, ib]), color=colors_beta[ib], lw=1.3, alpha=0.35) ax_bot.set_yscale("log") ax_bot.set_xlabel("Iteration") ax_bot.set_ylabel(r"$|\beta - \beta_{\mathrm{true}}|$") ax_bot.grid(alpha=0.3) # Save fname = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/beta_iter{ki + 1}.svg" fig.tight_layout() fig.savefig(fname) if ki > 0: old = f"CMM/{self.preset.tools.params['foldername']}/Plots/A_iter/beta_iter{ki}.svg" if os.path.exists(old): os.remove(old) plt.close(fig)
def _display_allresiduals(self, map_i, seenpix, ki=0): """ Display all components of the Healpix map with Gaussian convolution. Parameters: seenpix (array-like): Boolean array indicating the pixels that are seen. figsize (tuple): Size of the figure to be plotted. Default is (14, 10). ki (int): Iteration index for saving the figure. Default is 0. This function generates and saves a figure showing the output maps and residuals for each component and Stokes parameter (I, Q, U). The maps are convolved using a Gaussian operator and displayed using Healpix's gnomview function. """ Nmaps, _, Nstk = map_i.shape stk = ["I", "Q", "U"] if self.params["Plots"]["maps"]: plt.figure(figsize=(10, 5 * Nmaps)) k = 0 r = self.preset.A(map_i) - self.preset.b map_res = np.ones(self.preset.comp.components_iter.shape) * hp.UNSEEN map_res[:, seenpix, :] = r for istk in range(Nstk): for icomp in range(len(self.preset.comp.components_name_out)): reso = 15 nsig = 3 hp.gnomview( map_res[icomp, :, istk], rot=self.preset.sky.center, reso=reso, notext=True, title=f"{self.preset.comp.components_name_out[icomp]} - {stk[istk]} - r = A x - b", cmap="jet", sub=(Nstk, len(self.preset.comp.components_out), k + 1), min=-nsig * np.std(r[icomp, :, istk]), max=nsig * np.std(r[icomp, :, istk]), ) k += 1 plt.tight_layout() plt.savefig("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/allcomps/allres_iter{ki + 1}.svg") plt.close() def _display_allcomponents(self, input_maps, reconstructed_maps, ki=0, gif=True, reso=15): """ Display all components of the Healpix map with Gaussian convolution. Parameters: seenpix (array-like): Boolean array indicating the pixels that are seen. figsize (tuple): Size of the figure to be plotted. Default is (14, 10). ki (int): Iteration index for saving the figure. Default is 0. This function generates and saves a figure showing the output maps and residuals for each component and Stokes parameter (I, Q, U). The maps are convolved using a Gaussian operator and displayed using Healpix's gnomview function. """ stk = ["I", "Q", "U"] if self.params["Plots"]["maps"]: maps_in = input_maps maps_rec = reconstructed_maps maps_res = maps_rec - maps_in Nmaps, _, Nstk = maps_res.shape k = 0 plt.figure(figsize=(5 * Nmaps, 10)) for istk in range(Nstk): for icomp in range(len(self.preset.comp.components_name_out)): hp.gnomview( maps_rec[icomp, :, istk], rot=self.preset.sky.center, reso=reso, notext=True, title=f"{self.preset.comp.components_name_out[icomp]} - {stk[istk]} - Output", cmap="jet", sub=(Nstk, len(self.preset.comp.components_out) * 2, k + 1), ) k += 1 hp.gnomview( maps_res[icomp, :, istk], rot=self.preset.sky.center, reso=reso, notext=True, title=f"{self.preset.comp.components_name_out[icomp]} - {stk[istk]} - Residual", cmap="jet", sub=(Nstk, len(self.preset.comp.components_out) * 2, k + 1), ) k += 1 plt.tight_layout() plt.savefig("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/allcomps/allcomps_iter{ki + 1}.svg") if self.preset.tools.rank == 0: if ki > 0 and gif is False: os.remove("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/allcomps/allcomps_iter{ki}.svg") plt.close()
[docs] def display_maps(self, input_maps, reconstructed_maps, seenpix, ki=0, reso=15, view="gnomview"): """ Display input / output / residual maps at a given iteration. """ if not self.params["Plots"]["maps"]: return stk = ["I", "Q", "U"] rms_i = np.zeros((1, 2)) maps_in = input_maps maps_rec = reconstructed_maps maps_res = maps_rec - maps_in ncomp = len(self.preset.comp.components_name_out) for istk, s in enumerate(stk): plt.figure(figsize=(6 * ncomp, 12)) k = 0 for icomp in range(ncomp): if icomp == 0 and istk > 0: rms_i[0, istk - 1] = np.std(maps_res[icomp, seenpix, istk]) if view == "gnomview": hp.gnomview( maps_in[icomp, :, istk], rot=self.preset.sky.center, reso=reso, notext=True, title=f"Input {s}", cmap="jet", sub=(ncomp, 3, k + 1), ) hp.gnomview( maps_rec[icomp, :, istk], rot=self.preset.sky.center, reso=reso, notext=True, title=f"Output {s}", cmap="jet", sub=(ncomp, 3, k + 2), ) hp.gnomview( maps_res[icomp, :, istk], rot=self.preset.sky.center, reso=reso, notext=True, title=f"Residual {s} - Std = {np.std(maps_res[icomp, seenpix, istk]):.3e}", cmap="jet", sub=(ncomp, 3, k + 3), ) elif view == "mollview": hp.mollview( maps_in[icomp, :, istk], notext=True, title=f"Input {s}", cmap="jet", sub=(ncomp, 3, k + 1), ) hp.mollview( maps_rec[icomp, :, istk], notext=True, title=f"Output {s}", cmap="jet", sub=(ncomp, 3, k + 2), ) hp.mollview( maps_res[icomp, :, istk], notext=True, title=f"Residual {s} - Std = {np.std(maps_res[icomp, seenpix, istk]):.3e}", cmap="jet", sub=(ncomp, 3, k + 3), ) k += 3 plt.tight_layout() out = f"CMM/{self.preset.tools.params['foldername']}/Plots/{s}/maps_iter{ki + 1}.svg" plt.savefig(out) if self.preset.tools.rank == 0 and ki > 0: prev = f"CMM/{self.preset.tools.params['foldername']}/Plots/{s}/maps_iter{ki}.svg" if os.path.exists(prev): os.remove(prev) plt.close() self.preset.acquisition.rms_plot = np.concatenate((self.preset.acquisition.rms_plot, rms_i), axis=0)
[docs] def plot_gain_iteration(self, gain, figsize=(8, 6), ki=0): """ Method to plot convergence of reconstructed gains. Arguments : --- - gain : Array containing gain number (1 per detectors). It has the shape (Niteration, Ndet, 2) for Two Bands design and (Niteration, Ndet) for Wide Band design - alpha : Transparency for curves. - figsize : Tuple to control size of plots. """ if self.params["Plots"]["conv_gain"]: plt.figure(figsize=figsize) # plt.hist(gain[:, i, j]) if self.preset.qubic.params_qubic["type"] == "two": color = ["red", "blue"] for j in range(2): plt.hist(gain[-1, :, j], bins=20, color=color[j]) # plt.plot(alliter-1, np.mean(gain, axis=1)[:, j], color[j], alpha=1) # for i in range(ndet): # plt.plot(alliter-1, gain[:, i, j], color[j], alpha=alpha) # elif self.preset.qubic.params_qubic['type'] == 'wide': # color = ['--g'] # plt.plot(alliter-1, np.mean(gain, axis=1), color[0], alpha=1) # for i in range(ndet): # plt.plot(alliter-1, gain[:, i], color[0], alpha=alpha) # plt.yscale('log') # plt.ylabel(r'|$g_{reconstructed} - g_{input}$|', fontsize=12) # plt.xlabel('Iterations', fontsize=12) plt.xlim(-0.1, 0.1) plt.ylim(0, 100) plt.axvline(0, ls="--", color="black") plt.savefig("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/A_iter/gain_iter{ki + 1}.svg") if self.preset.tools.rank == 0: if ki > 0: os.remove("CMM/" + self.preset.tools.params["foldername"] + f"/Plots/A_iter/gain_iter{ki}.svg") plt.close()
[docs] def plot_rms_iteration(self, rms, figsize=(8, 6), ki=0): if not self.params["Plots"]["conv_rms"]: return plt.figure(figsize=figsize) plt.plot(rms[1:, 0], "-b", label="Q") plt.plot(rms[1:, 1], "-r", label="U") plt.yscale("log") plt.xlabel("Iteration") plt.ylabel("RMS") plt.title("RMS evolution") plt.legend() plt.tight_layout() out = f"CMM/{self.preset.tools.params['foldername']}/Plots/rms_iter{ki + 1}.svg" plt.savefig(out) if self.preset.tools.rank == 0 and ki > 0: prev = f"CMM/{self.preset.tools.params['foldername']}/Plots/rms_iter{ki}.svg" if os.path.exists(prev): os.remove(prev) plt.close()