Source code for lib.Calibration.Qselfcal

from __future__ import division, print_function

import glob

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.optimize as sop
from mpl_toolkits.axes_grid1 import make_axes_locatable
from qubicpack.pixel_translation import make_id_focalplane, tes2index
from scipy.integrate import dblquad
from scipy.interpolate import RegularGridInterpolator

__all__ = ["Model_Fringes_QubicSoft", "Model_Fringes_Maynooth"]


# ========== Plot functions =============
def plot_horns(q, simple=False, ax=None, s=100, fontsize=14):
    if ax is None:
        fig, ax = plt.subplots()

    xhorns = q.horn.center[:, 0]
    yhorns = q.horn.center[:, 1]
    if simple:
        ax.plot(xhorns, yhorns, "ko")
    else:
        ax.scatter(xhorns[np.invert(q.horn.open)], yhorns[np.invert(q.horn.open)], c="k", s=s)
        ax.scatter(xhorns[q.horn.open], yhorns[q.horn.open], c="k", s=s, alpha=0.1)

    ax.set_xlabel("X_GRF [m]", fontsize=fontsize)
    ax.set_ylabel("Y_GRF [m]", fontsize=fontsize)
    ax.axis("square")
    return


def plot_baseline(q, bs, ax=None):
    if ax is None:
        fig, ax = plt.subplots()
    hcenters = q.horn.center[:, 0:2]
    ax.plot(hcenters[np.array(bs) - 1, 0], hcenters[np.array(bs) - 1, 1], lw=4, label=bs)
    return


def scatter_plot_FP(
    q,
    x,
    y,
    FP_signal,
    frame,
    fig=None,
    ax=None,
    s=None,
    title=None,
    unit="[W / Hz]",
    cbar=True,
    fontsize=14,
    **kwargs,
):
    """
    Make a scatter plot of the focal plane.
    Parameters
    ----------
    q: QubicInstrument()
    x: array
        X coordinates for the TES.
    y: array
        Y coordinates for the TES.
    FP_signal: array
        Signal to plot in 1D, should be ordered as x and y
    frame: str
        'GRF' or 'ONAFP', the frame used for x and y
    s: int
        Marker size on the plot
    title: str
        Plot title
    unit: str
        Unit of the signal to plot.
    kwargs: any kwarg for plt.scatter()

    """
    if fig is None:
        fig, ax = plt.subplots()
    if s is None:
        if q.config == "TD":
            s = (fig.get_figwidth() / 35 * fig.dpi) ** 2
        else:
            s = (fig.get_figwidth() / 70 * fig.dpi) ** 2
    img = ax.scatter(x, y, c=FP_signal, marker="s", s=s, **kwargs)
    if cbar:
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="5%", pad=0.05)
        clb = fig.colorbar(img, cax=cax)
        clb.ax.set_title(unit)
    ax.set_xlabel(f"X_{frame} [m]", fontsize=fontsize)
    ax.set_ylabel(f"Y_{frame} [m]", fontsize=fontsize)
    ax.axis("square")
    ax.set_title(title, fontsize=fontsize)
    return


def pcolor_plot_FP(
    q, x, y, FP_signal, frame, title=None, fig=None, ax=None, cbar=True, unit="[W / Hz]", **kwargs
):
    """
    Make a pcolor plot of the focal plane.
    !!! x, y, FP_signal must be ordered as defined in q.detector.
    Parameters
    ----------
    q: QubicInstrument()
    x: array
        X coordinates for the TES.
    y: array
        Y coordinates for the TES.
    FP_signal: array
        Signal to plot in 1D, should be ordered as x and y
    frame: str
        'GRF' or 'ONAFP', the frame used for x and y
    title: str
        Plot title
    unit: str
        Unit of the signal to plot.
    kwargs: any kwarg for plt.pcolor()

    """
    if fig is None:
        fig, ax = plt.subplots()

    x2D = q.detector.unpack(x)
    y2D = q.detector.unpack(y)
    FP_signal2D = q.detector.unpack(FP_signal)

    img = ax.pcolor(x2D, y2D, FP_signal2D, shading="auto", **kwargs)
    if cbar:
        divider = make_axes_locatable(ax)
        cax = divider.append_axes("right", size="5%", pad=0.05)
        clb = fig.colorbar(img, cax=cax)
        clb.ax.set_title(unit)
    ax.set_xlabel(f"X_{frame} [m]", fontsize=14)
    ax.set_ylabel(f"Y_{frame} [m]", fontsize=14)
    ax.axis("square")
    ax.set_title(title, fontsize=14)
    return


def plot_horn_and_FP(q, x, y, FP_signal, frame, s=None, title=None, unit="[W / Hz]", **kwargs):
    """
    Plot the horn array in GRF and a scatter plot of the focal plane in GRF or ONAFP.
    See scatter_plot_FP()
    """
    fig, axs = plt.subplots(1, 2)
    ax0, ax1 = np.ravel(axs)
    fig.suptitle(title, fontsize=18)
    fig.subplots_adjust(wspace=0.3)

    plot_horns(q, ax=ax0)
    scatter_plot_FP(q, x, y, FP_signal, frame, s=s, fig=fig, ax=ax1, unit=unit, **kwargs)

    return


def plot_BLs_eq(allBLs, BLs_sort, q, simple=True, figsize=(12, 6)):
    """
    Plot the horn array and the observed baselines for each type (class of equivalence).
    Parameters
    ----------
    allBLs: list
        List containing all the baselines in the dataset.
    BLs_sort: list
        Indices of the baselines in the dataset for each type
    q: QubicInstrument
    """
    nclass_eq = len(BLs_sort)

    fig, axs = plt.subplots(1, nclass_eq, figsize=figsize)
    axs = np.ravel(axs)
    for i in range(nclass_eq):
        ax = axs[i]
        dataset_eq = BLs_sort[i]
        plot_horns(q, simple=simple, ax=ax)
        ax.set_title(f"Type {i}", fontsize=14)
        print(f"Type {i}:")
        for j in dataset_eq:
            print(f"  - {allBLs[j]}")
            plot_baseline(q, allBLs[j], ax=ax)
        ax.legend()
    return


# ========== Tool functions =============
def close_switches(q, switches):
    q.horn.open = True
    for i in switches:
        q.horn.open[i - 1] = False


def open_switches(q, switches):
    q.horn.open = False
    for i in switches:
        q.horn.open[i - 1] = True


def get_TEScoordinates_ONAFP(q):
    """
    Get coordinates of the TES, x and y for the centers
    and the 4 corners (vertex) in the ONAFP frame.
    """
    # TES centers in the ONAFP frame
    xGRF_TES = q.detector.center[:, 0]
    yGRF_TES = q.detector.center[:, 1]
    xONAFP_TES = -yGRF_TES
    yONAFP_TES = xGRF_TES

    # TES vertex in the ONAFP frame
    vGRF_TES = q.detector.vertex
    vONAFP_TES = vGRF_TES[..., [1, 0, 2]]
    vONAFP_TES[..., 0] *= -1

    return xONAFP_TES, yONAFP_TES, vONAFP_TES


def get_horn_coordinates_ONAFP(q):
    """
    Get coordinates of the horn center (x, y, z),
    in the ONAFP frame.
    """
    # Horn centers in the ONAFP frame
    center_GRF = q.horn.center
    center_ONAFP = np.zeros_like(center_GRF)
    center_ONAFP[:, 0] = -center_GRF[:, 1]  # xONAFP = -yGRF
    center_ONAFP[:, 1] = center_GRF[:, 0]  # yONAFP = xGRF
    center_ONAFP[:, 2] = center_GRF[:, 2]  # zONAFP = zGRF

    return center_ONAFP


def TES_Instru2coord(TES, ASIC, q, frame="ONAFP", verbose=True):
    """
    From (TES, ASIC) numbering on the instrument to (x,y) coordinates in ONAFP or GRF frame.
    Returns also the focal plane index.
    !!! If q is a TD instrument, only ASIC 1 and 2 are acceptable.
    Parameters
    ----------
    TES: TES number as defined on the instrument
    ASIC: ASIC number
    q: QubicInstrument()
    frame: str
        'GRF' or 'ONAFP' only

    Returns
    -------
    x, y: TES center coordinates.
    FP_index: Focal Plane index, as used in Qubic soft.
    index_q: position index of the FP_index in q.detector.index()

    """
    if TES in [4, 36, 68, 100]:
        raise ValueError("This is a thermometer !")
    FP_index = tes2index(TES, ASIC)
    if verbose:
        print("FP_index =", FP_index)

    index_q = np.where(q.detector.index == FP_index)[0][0]
    if verbose:
        print("Index_q =", index_q)

    centerGRF = q.detector.center[q.detector.index == FP_index][0]
    xGRF = centerGRF[0]
    yGRF = centerGRF[1]

    if frame not in ["GRF", "ONAFP"]:
        raise ValueError("The frame is not valid.")
    elif frame == "GRF":
        if verbose:
            print("X_GRF = {:.3f} mm, Y_GRF = {:.3f} mm".format(xGRF * 1e3, yGRF * 1e3))
        return xGRF, yGRF, FP_index, index_q
    elif frame == "ONAFP":
        xONAFP = -yGRF
        yONAFP = xGRF
        if verbose:
            print("X_ONAFP = {:.3f} mm, Y_ONAFP = {:.3f} mm".format(xONAFP * 1e3, yONAFP * 1e3))
        return xONAFP, yONAFP, FP_index, index_q


def get_TES_Instru_coords(q, frame="ONAFP", verbose=True):
    """
    Same as TES_Instru2coord() but loop on all TES.
    """
    thermos = [4, 36, 68, 100]
    if q.config == "TD":
        nASICS = 2
    else:
        nASICS = 8

    nTES = nASICS * 128
    x = np.zeros(nTES)
    y = np.zeros(nTES)
    FP_index = np.zeros(nTES, dtype=int)
    index_q = np.zeros(nTES, dtype=int)

    for ASIC in range(1, nASICS + 1):
        for TES in range(1, 129):
            if verbose:
                print(f"\n ASIC {ASIC} - TES {TES}")
            if TES not in thermos:
                i = (TES - 1) + 128 * (ASIC - 1)
                x[i], y[i], FP_index[i], index_q[i] = TES_Instru2coord(
                    TES, ASIC, q, frame=frame, verbose=verbose
                )
            else:
                if verbose:
                    print("Thermometer !")

    return x, y, FP_index, index_q


def index2tes(FPindex, FPidentity=None):
    """Return ASIC and TES numbers (instrument numbering)
    from the FPindex coming from q.detector.index"""
    if FPidentity is None:
        FPidentity = make_id_focalplane()
    TES = int(FPidentity.TES[FPidentity.index == FPindex])
    ASIC = int(FPidentity.ASIC[FPidentity.index == FPindex])
    return ASIC, TES


def get_all_tes_numbers(q):
    """Return a 2D array with ASIC and TES numbers ordered as in Qubic soft."""
    FPidentity = make_id_focalplane()
    ndet = q.detector.index.shape[0]
    tes = np.zeros((ndet, 2), dtype=int)
    for i in range(ndet):
        FPindex = q.detector.index[i]
        tes[i, 0], tes[i, 1] = index2tes(FPindex, FPidentity)
    return tes


def get_TESvertices_FromMaynoothFiles(rep, ndet=992):
    """
    Get TES vertices coordinates from Maynooth files.
    Not very useful because `q.detector.vertex` gives the same.
    Parameters
    ----------
    rep : str
        Path of the repository for the simulated files, can be download at :
        https://drive.google.com/open?id=19dPHw_CeuFZ068b-VRT7N-LWzOL1fmfG
    ndet : int
        Number of TES.
    """
    # Get a 2D array from the file
    vertices2D = pd.read_csv(rep + "/vertices.txt", sep="\ ", header=None, engine="python")

    # Make a 3D array of shape (992, 4, 2)
    vertices = np.zeros((ndet, 4, 2))
    for i in range(4):
        vertices[:, i, :] = vertices2D.iloc[i::4, :]
    return vertices


def make_position(xmin, xmax, reso, focal_length):
    """Position on the focal plane in the GRF frame."""
    xx, yy = np.meshgrid(np.linspace(xmin, xmax, reso), np.linspace(xmin, xmax, reso))
    x1d = np.ravel(xx)
    y1d = np.ravel(yy)
    z1d = x1d * 0 - focal_length
    position = np.array([x1d, y1d, z1d]).T

    return position


def give_bs_pars(q, bs, frame="GRF"):
    """Find orientation angle and length for a baseline."""

    # X, Y coordinates of the 2 horns in GRF or ONAFP.
    if frame == "ONAFP":
        hc = get_horn_coordinates_ONAFP(q)
        hc = hc[:, :2]
    elif frame == "GRF":
        hc = q.horn.center[:, :2]
    hc0 = hc[bs[0] - 1, :]
    hc1 = hc[bs[1] - 1, :]

    bsxy = hc1 - hc0
    theta = np.degrees(np.arctan2(bsxy[1], bsxy[0]))
    length = np.sqrt(np.sum(bsxy**2))
    xycenter = (hc0 + hc1) / 2.0
    return theta, length, xycenter


def check_equiv(vecbs1, vecbs2, tol=1e-5):
    """Check if 2 baselines are equivalent."""
    norm1 = np.dot(vecbs1, vecbs1)
    norm2 = np.dot(vecbs2, vecbs2)
    cross12 = np.cross(vecbs1, vecbs2)
    if (np.abs(norm1 - norm2) < tol) & (np.abs(cross12) < tol):
        return True
    else:
        return False


def find_equivalent_baselines(all_bs, q):
    """
    Find the equivalent baselines in a list.
    Parameters
    ----------
    all_bs: list
        List of baselines.
    q: QubicInstrument

    Returns
    -------
    BLs_sort: List with baseline indices sorted according to equivalence.
        ex: BLs_sort = [[1, 3], [0, 2, 4]] means that you have 2 different classes
        of equivalence with 2 and 3 baselines respectively.
    all_eqtype: List of integers with the type of each baseline.
        ex: In the example above, you have 2 types (0 or 1)
        so you will get all_eqtype = [1, 0, 1, 0, 1]

    """
    ### Convert to array
    all_bs = np.array(all_bs)
    ### centers
    hcenters = q.horn.center[:, 0:2]
    ### Baselines vectors
    all_vecs = np.zeros((len(all_bs), 2))
    for ib in range(len(all_bs)):
        coordsA = hcenters[all_bs[ib][0] - 1, :]
        coordsB = hcenters[all_bs[ib][1] - 1, :]
        all_vecs[ib, :] = coordsB - coordsA

    ### List of types of equivalence for each baseline: initially = -1
    all_eqtype = np.zeros(len(all_bs), dtype=int) - 1

    ### First type is zero and is associated to first baseline
    eqnum = 0
    all_eqtype[0] = eqnum

    ### Indices of baselines
    index_bs = np.arange(len(all_bs))

    ### Loop over baselines
    for ib in range(0, len(all_bs)):
        ### Identify those that have no type
        wnotype = all_eqtype == -1
        bsnotype = all_bs[wnotype]
        vecsnotype = all_vecs[wnotype, :]
        indexnotype = index_bs[wnotype]
        ### Loop over those with no type
        for jb in range(len(bsnotype)):
            ### Check if equivalent
            status = check_equiv(all_vecs[ib, :], vecsnotype[jb, :])
            ### If so: give it the current type
            if status:
                all_eqtype[indexnotype[jb]] = eqnum
        ### We have gone through all possibilities for this type so we increment the type by 1
        eqnum = np.max(all_eqtype) + 1

    alltypes = np.unique(all_eqtype)
    BLs_sort = []
    for i in range(len(alltypes)):
        BLs_sort.append(index_bs[all_eqtype == i])
    return BLs_sort, all_eqtype


# ========== Compute power on the focal plane =============
def make_external_A(rep, open_horns):
    """
    Compute external_A from simulated files with aberrations.
    This can be used in get_response_power method that returns the synthetic beam on the sky
    or in get_response() to have the signal on the focal plane.
    Parameters
    ----------
    rep : str
        Path of the repository for the simulated files, can be download at :
        https://drive.google.com/open?id=19dPHw_CeuFZ068b-VRT7N-LWzOL1fmfG
    open_horns : list
        Indices of the open horns between 1 and 64.

    Returns
    -------
    external_A : list of tables describing the phase and amplitude at each point of the focal
            plane for each of the horns:
            [0] : array, X coordinates with shape (n) in GRF [m]
            [1] : array, Y coordinates with shape (n) in GRF [m]
            [2] : array, amplitude on X with shape (n, nhorns)
            [3] : array, amplitude on Y with shape (n, nhorns)
            [4] : array, phase on X with shape (n, nhorns) [rad]
            [5] : array, phase on Y with shape (n, nhorns) [rad]
    """
    # Get simulation files
    files = sorted(glob.glob(rep + "/*.dat"))

    nhorns = len(files)
    if nhorns != 64:
        raise ValueError("You should have 64 .dat files")

    # This is done to get the right file for each horn
    horn_transpose = np.arange(64)
    horn_transpose = np.reshape(horn_transpose, (8, 8))
    horn_transpose = np.ravel(horn_transpose.T)

    # Get the sample number from the first file
    data0 = pd.read_csv(files[0], sep="\t", skiprows=0)
    nn = data0["X_Index"].iloc[-1] + 1
    print("Sampling number = ", nn)

    n = len(data0.index)

    # X and Y positions in the GRF frame
    xONAFP = data0["X"] * 1e-3
    yONAFP = data0["Y"] * 1e-3
    xGRF = yONAFP
    yGRF = -xONAFP
    print(xGRF.shape)

    # Get all amplitudes and phases for each open horn
    nopen_horns = len(open_horns)

    allampX = np.empty((n, nopen_horns))
    allphiX = np.empty((n, nopen_horns))
    allampY = np.empty((n, nopen_horns))
    allphiY = np.empty((n, nopen_horns))
    print(allphiY.shape)
    for i, swi in enumerate(open_horns):
        print("horn ", swi)
        if swi < 1 or swi > 64:
            raise ValueError("The horn indices must be between 1 and 64 ")

        thefile = files[horn_transpose[swi - 1]]
        print("Horn ", swi, ": ", thefile[-10:])
        data = pd.read_csv(thefile, sep="\t", skiprows=0)

        print(data["MagX"].shape)
        allampX[:, i] = data["MagX"]
        allampY[:, i] = data["MagX"]

        allphiX[:, i] = data["PhaseX"]
        allphiY[:, i] = data["PhaseY"]

    external_A = [xGRF, yGRF, allampX, allampY, allphiX, allphiY]

    return external_A


def get_response_power(
    q,
    theta,
    phi,
    nu,
    spectral_irradiance,
    frame="ONAFP",
    external_A=None,
    hwp_position=0,
    verbose=False,
):
    """
    Compute power on the focal plane in the ONAFP frame for one position of the source
    with respect to the instrument.

    Parameters
    ----------
    q: a qubic monochromatic instrument
    theta: float
        The source zenith angle [rad].
    phi: float
        The source azimuthal angle [rad].
    nu: float
        Source frequency in Hz.
    spectral_irradiance : array-like
        The source spectral_irradiance [W/m^2/Hz].
    frame: str
        Referential frame you want to use: 'GRF' or 'ONAFP'
    external_A: list of tables describing the phase and amplitude at
    each point of the focal plane for each of the horns, see make_external_A()
    hwp_position : int
        HWP position from 0 to 7.

    Returns
    ----------
    x, y: 1D arrays with the coordinates on the focal plane in GRF or ONAFP.
    power: array with the power on the focal plane for each posiion (x, y) and each pointing.
    """
    if frame not in ["GRF", "ONAFP"]:
        raise ValueError("The frame is not valid. It must be GRF or ONAFP.")

    if external_A is None:
        position = q.detector.center  # GRF
    else:
        x1d = external_A[0]
        y1d = external_A[1]
        z1d = x1d * 0 - q.optics.focal_length
        position = np.array([x1d, y1d, z1d]).T
    xGRF = position[:, 0]
    yGRF = position[:, 1]

    # Electric field on the FP in the GRF frame
    E = q._get_response(
        theta,
        phi,
        spectral_irradiance,
        position,
        q.detector.area,
        nu,
        q.horn,
        q.primary_beam,
        q.secondary_beam,
        external_A=external_A,
        hwp_position=hwp_position,
    )
    power = np.abs(E) ** 2
    # power *= q.filter.bandwidth  # [W/Hz] to [W]

    if verbose:
        print("Detector centers shape:", q.detector.center.shape)
        print("Power shape:", power.shape)
        print("X_GRF shape:", xGRF.shape)

    if frame == "GRF":
        x = xGRF
        y = yGRF
    else:
        # Make a pi/2 rotation from GRF -> ONAFP referential frame
        xONAFP = -yGRF
        yONAFP = xGRF
        x = xONAFP
        y = yONAFP
    return x, y, power


def get_power_Maynooth(rep, open_horns, theta, nu, horn_center, hwp_position=0, verbose=True):
    """
    Get power on the focal plane from Maynooth simulations.
    Parameters
    ----------
    rep: str
        Repository with the simulation files.
    open_horns: list
        List of open horns.
    theta: float
        The source zenith angle [rad].
    nu: float
        Frequency of the calibration source [Hz]
    horn_center: array
        Coordinates of the horns.
    hwp_position: int
        HWP position from 0 to 7.
    verbose: bool

    Returns
    -------
    (x, y) coordinates on the focal plane in the ONAFP frame and the power at each coordinate.

    """
    # Get simulation files
    files = sorted(glob.glob(rep + "/*.dat"))
    if len(files) != 64:
        raise ValueError("You should have 64 .dat files")

    nhorns = len(open_horns)
    # Get the sample number from the first file and the coordinates X, Y
    data0 = pd.read_csv(files[0], sep="\t", skiprows=0)
    nn = data0["X_Index"].iloc[-1] + 1
    xONAFP = data0["X"] * 1e-3  # convert from mm to m
    yONAFP = data0["Y"] * 1e-3

    if verbose:
        print(f"# open horns = {nhorns}")
        print(f"Sampling number = {nn}")
        print(f"Number of lines = {nn**2}")

    # This is done to get the right file for each horn
    horn_transpose = np.arange(64)
    horn_transpose = np.reshape(horn_transpose, (8, 8))
    horn_transpose = np.ravel(horn_transpose.T)

    Ax = np.zeros((nhorns, nn**2))
    Ay = np.zeros_like(Ax)
    Phasex = np.zeros_like(Ax)
    Phasey = np.zeros_like(Ax)
    for i, swi in enumerate(open_horns):
        if swi < 1 or swi > 64:
            raise ValueError("The switch indices must be between 1 and 64 ")

        thefile = files[horn_transpose[swi - 1]]
        if verbose:
            print("Horn ", swi, ": ", thefile[-10:])
        data = pd.read_csv(thefile, sep="\t", skiprows=0)

        # Phase calculation
        horn_x = horn_center[swi - 1, 0]
        horn_y = horn_center[swi - 1, 1]
        dist = np.sqrt(horn_x**2 + horn_y**2)  # distance between the horn and the center
        additional_phase = -2 * np.pi / 3e8 * nu * dist * np.sin(theta)

        Ax[i, :] = data["MagX"]
        Ay[i, :] = data["MagY"]

        Phasex[i, :] = data["PhaseX"] + additional_phase
        Phasey[i, :] = data["PhaseY"] + additional_phase

    # Electric field for each open horn
    Ex = Ax * (np.cos(Phasex) + 1j * np.sin(Phasex))
    Ey = Ay * (np.cos(Phasey) + 1j * np.sin(Phasey))

    # Sum of the electric fields
    sumEx = np.sum(Ex, axis=0)
    sumEy = np.sum(Ey, axis=0)

    # HWP modulation
    phi_hwp = np.arange(0, 8) * np.pi / 16
    sumEx *= np.cos(2 * phi_hwp[hwp_position])
    sumEy *= np.sin(2 * phi_hwp[hwp_position])

    # Power on the focal plane
    # power = np.abs(sumEx) ** 2 + np.abs(sumEy) ** 2
    power = np.abs(sumEx + sumEy) ** 2

    return xONAFP, yONAFP, power


def fullreso2TESreso(x, y, power, TESvertex, TESarea, interp=False, verbose=True):
    """
    Decrease the resolution to have the power in each TES.
    Parameters
    ----------
    x, y: array
        Coordinates on the FP
    power: array
        Power on the FP.
    TESvertex: array
        Coordinates of the 4 corners of each TES.
    TESarea: float
        Area of the detectors [m²]
    interp: bool
        If True, interpolate and integrate in each TES (takes time).
        If False, make the mean in each TES (faster).
    verbose: bool

    Returns
    -------
    The power in each TES.
    """
    ndet = np.shape(TESvertex)[0]
    powerTES = np.zeros(ndet)
    print("ndet:", ndet)

    if interp:
        print("********** Begin interpolation **********")
        reso = int(np.sqrt(x.shape[0]))
        print("Reso:", reso)
        power_interp = RegularGridInterpolator(
            (np.unique(x), np.unique(y)),
            power.reshape((reso, reso)),
            method="linear",
            bounds_error=False,
            fill_value=0.0,
        )

        def power_interp_function(x, y):
            return power_interp(np.array([x, y]))

        print("********** Begin integration in the TES era **********")
        for TES in range(ndet):
            # Boundaries for the integral
            x1 = np.min(TESvertex[TES, :, 0])
            x2 = np.max(TESvertex[TES, :, 0])
            y1 = np.min(TESvertex[TES, :, 1])
            y2 = np.max(TESvertex[TES, :, 1])

            def gfun(x):
                return y1

            def hfun(x):
                return y2

            if verbose:
                xTES = (x1 + x2) / 2
                yTES = (y1 + y2) / 2
                print("\n Power at the TES center:", power_interp_function(xTES, yTES))
                print("x boundaries: {:.2f} to {:.2f} mm".format(x1 * 1e3, x2 * 1e3))
                print("Delta x = {:.2f} mm".format((x2 - x1) * 1e3))
                print("y boundaries: {:.2f} to {:.2f} mm".format(y1 * 1e3, y2 * 1e3))
                print("Delta y = {:.2f} mm".format((y2 - y1) * 1e3))

            power, _ = dblquad(power_interp_function, x1, x2, gfun, hfun)
            powerTES[TES] = power

        powerTES /= TESarea

    else:
        for TES in range(ndet):
            x1 = np.min(TESvertex[TES, :, 0])
            x2 = np.max(TESvertex[TES, :, 0])
            y1 = np.min(TESvertex[TES, :, 1])
            y2 = np.max(TESvertex[TES, :, 1])

            insideTEScondition = (x >= x1) & (x <= x2) & (y >= y1) & (y <= y2)
            indices = np.where(insideTEScondition)[0]
            count = indices.shape[0]
            powerTES[TES] = np.sum(power[indices])
            powerTES[TES] /= count

    return powerTES


# ========== Chi2 to fit the fringes =========
def make_inverse_covariance(errors, verbose=True):
    Cov = np.diag(errors**2)
    if verbose:
        print(Cov)

    InvCov = np.diag(1.0 / np.diag(Cov))
    return InvCov


def get_gains(allPowerPhi, allInvCov, allData):
    """
    Compute a gain for each detector analytically.
    Parameters
    ----------
    allPowerPhi: list
        List with fringes models Phi multiply by a global power P.
    allInvCov: list
        List with the inverse covariance matrices, one for each image.
    allData: list
        List with the fringe images.
    Returns
    -------
    Gains A and their covariance matrix Cov_A, normalized such as <A> = 1.
    """
    # Number of fringe images
    nimages = len(allPowerPhi)

    InvCov_A = np.zeros_like(allInvCov[0])
    Term = np.zeros_like(allData[0])
    for k in range(nimages):
        # Make a diagonal matrix
        PowerPhi_mat = np.diag(allPowerPhi[k])

        InvCov_A += PowerPhi_mat.T @ allInvCov[k] @ PowerPhi_mat
        Term += PowerPhi_mat.T @ allInvCov[k] @ allData[k]
    Cov_A = np.linalg.inv(InvCov_A)

    A = Cov_A @ Term

    # Normalization
    weights = 1 / np.diag(Cov_A)  # To cancel bad detectors
    avgA = np.average(A, weights=weights)  # Weighted mean
    A /= avgA
    Cov_A /= avgA**2

    return A, Cov_A


def get_chi2(params, allInvCov, allData, BLs, q, nu_source=150e9, returnA=False):
    """
    Compute the Chi^2 for a list of fringe images (several baselines) and a model M.
    M_k = P_k Phi_k(focal, source angle) @ A
    where k is the image index, P_k a global power, Phi_k the fringes model given by Model_Fringes_Ana
    and A contains the gains of the detectors.
    For now, we use the analytical model but it could be adapted to a more complex model (QubicSoft or Maynooth).

    Parameters
    ----------
    params: list
        Focal length, source off-axis angle, log_10 of global powers P_k
    allInvCov: list
        List with the inverse covariance matrices, one for each image.
    allData: list
        List with the fringe images.
    BLs: list
        List of the baselines, for example [25, 57] is one baseline.
    q: a Qubic instrument
    nu_source: float
        Frequency of the calibration source
    returnA: bool
        If True, it will return the detector gains and their covariance.

    Returns
    -------
    The Chi^2.
    """

    nimages = len(BLs)
    focal = params[0]
    theta_source = params[1]
    logP = params[2:]
    q.optics.focal_length = focal
    allPowerPhi = []
    for k in range(nimages):
        model = Model_Fringes_Ana(q, BLs[k], theta_source=theta_source, nu_source=nu_source)

        x, y, Phi = model.get_fringes(times_gaussian=False)

        # Global amplitude
        allPowerPhi.append(Phi * 10 ** logP[k])

    # Gain for each detector
    A, Cov_A = get_gains(allPowerPhi, allInvCov, allData)

    chi2 = 0.0
    for k in range(nimages):
        # Compute the chi2
        M = np.diag(allPowerPhi[k]) @ A
        R = M - allData[k]
        chi2 += R.T @ allInvCov[k] @ R

    if returnA:
        return chi2, A, Cov_A
    else:
        return chi2


def make_chi2_grid(
    allInvCov,
    fringes,
    BLs,
    q,
    nval_fl=30,
    nval_th=30,
    fl_min=0.25,
    fl_max=0.35,
    th_min=np.deg2rad(-1.0),
    th_max=np.deg2rad(1),
    LogPower=-1,
    fixPower=True,
):
    """Loop over the parameters (focal length and source off-axis angle) to explore the chi2.
    Global powers are fixed to Log_10(Power)=-1 or optimized at each step by minimizing a temporary chi2 (longer)."""
    nimages = len(BLs)
    chi2_grid = np.zeros((nval_fl, nval_th))

    all_fl = np.linspace(fl_min, fl_max, nval_fl)
    all_th = np.linspace(th_min, th_max, nval_th)

    # Fast method
    if fixPower:
        # Global powers are fixed to initPower and we loop on fl and th.
        for i, fl in enumerate(all_fl):
            for j, th in enumerate(all_th):
                params = [fl, th] + [LogPower] * nimages
                chi2_grid[i, j] = get_chi2(params, allInvCov, fringes, BLs, q)
        return all_fl, all_th, chi2_grid

    # Slow method but more rigorous
    else:
        # Loop on fl and th and at each step, minimize the chi2 to find the best global powers.
        power_optimize = np.zeros((nval_fl, nval_th, nimages))
        step = 0
        for i, fl in enumerate(all_fl):
            for j, th in enumerate(all_th):

                def chi2_temporary(mypower, allInvCov, fringes, BLs, q):
                    params = [fl, th] + list(mypower)
                    chi2_temp = get_chi2(params, allInvCov, fringes, BLs, q)
                    return chi2_temp

                result = sop.minimize(
                    chi2_temporary,
                    x0=[LogPower] * nimages,
                    args=(allInvCov, fringes, BLs, q),
                    method="Nelder-Mead",
                    options={"maxiter": 10000},
                )
                chi2_grid[i, j] = result["fun"]
                power_optimize[i, j, :] = result["x"]

                print(f"\n***Step {step + 1}/{nval_fl * nval_th}")
                print("Chi2 min:", result["fun"])
                print("with powers =", result["x"])
                step += 1
        return all_fl, all_th, chi2_grid, power_optimize


# ========== Fringe simulations =============
class Model_Fringes_Ana:
    def __init__(
        self,
        q,
        baseline,
        theta_source=0.0,
        phi_source=0.0,
        nu_source=150e9,
        fwhm=20.0,
        amp=1.0,
        frame="ONAFP",
    ):
        """

        Parameters
        ----------
        q: QubicInstrument
        baseline: list
            Baseline formed with 2 horns, index between 1 and 64 as on the instrument.
        theta_source: float
            The source zenith angle [rad].
        phi_source: float
            The source azimuthal angle [rad].
        nu_source: float
            Source frequency [Hz].
        fwhm: float
        amp: float
            Global amplitude for the fringes.
        """
        self.BL = baseline
        self.q = q
        self.focal = q.optics.focal_length
        self.theta_source = theta_source
        self.phi_source = phi_source
        self.nu_source = nu_source
        self.lam = 3e8 / self.nu_source
        self.fwhm = fwhm
        self.amp = amp
        self.frame = frame

        # Detector centers
        if self.frame == "ONAFP":
            xONAFP, yONAFP, _ = get_TEScoordinates_ONAFP(self.q)
            self.x = xONAFP
            self.y = yONAFP
        elif self.frame == "GRF":
            self.x = self.q.detector.center[:, 0]
            self.y = self.q.detector.center[:, 1]

        # Angle and length of the baseline:
        BL_angle, BL_length, BL_center = give_bs_pars(self.q, self.BL, frame="ONAFP")
        self.BL_angle = np.deg2rad(BL_angle)
        self.BL_length = BL_length
        self.BL_xc = BL_center[0]
        self.BL_yc = BL_center[1]

        # Additional phase
        dist = np.sqrt(self.BL_xc**2 + self.BL_yc**2)
        phase = -2 * np.pi / 3e8 * self.nu_source * dist * np.sin(self.theta_source)
        self.phase = phase

    def make_gaussian(self):
        sigma = np.deg2rad(self.fwhm / 2.355 * self.focal)
        # The position of the gaussian depends on the position of the source
        # Maybe there is a sign problem here...
        x_center = self.focal * np.tan(self.theta_source) * np.cos(self.phi_source)
        y_center = self.focal * np.tan(self.theta_source) * np.sin(self.phi_source)
        self.gaussian = np.exp(
            -0.5 * ((self.x - x_center) ** 2 + (self.y - y_center) ** 2) / sigma**2
        )
        return self.gaussian

    def get_fringes(self, times_gaussian=True):
        if times_gaussian:
            gaussian = self.make_gaussian()
        else:
            gaussian = 1.0
        xprime = self.x * np.cos(self.BL_angle) + self.y * np.sin(self.BL_angle)
        interfrange = self.lam * self.focal / self.BL_length
        self.fringes = (
            self.amp * np.cos((2.0 * np.pi / interfrange * xprime) + self.phase) * gaussian
        )

        return self.x, self.y, self.fringes


[docs] class Model_Fringes_QubicSoft: def __init__( self, q, baseline, theta_source=0.0, phi_source=0.0, nu_source=150e9, spec_irrad_source=1.0, frame="ONAFP", external_A=None, hwp_position=0, ): """ Parameters ---------- q: QubicInstrument baseline: list Baseline formed with 2 horns, index between 1 and 64 as on the instrument. theta_source: float The source zenith angle [rad]. phi_source: float The source azimuthal angle [rad]. nu_source: float Source frequency [Hz]. spec_irrad: array-like The source spectral_irradiance [W/m^2/Hz]. frame: str 'GRF' or 'ONAFP'. """ self.q = q self.baseline = baseline self.theta_source = theta_source self.phi_source = phi_source self.nu_source = nu_source self.spec_irrad_source = spec_irrad_source self.frame = frame self.external_A = external_A self.hwp_position = hwp_position
[docs] def get_fringes(self, doplot=True, verbose=True, **kwargs): """ Compute the fringes on the focal plane directly opening the baseline. see get_response_power() for the arguments. Returns (x, y) coordinates and the power. """ open_switches(self.q, self.baseline) self.x, self.y, self.fringes = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, external_A=self.external_A, hwp_position=self.hwp_position, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, self.fringes, frame=self.frame, title="Baseline {} - Theta={}deg - Phi={}deg".format( self.baseline, np.rad2deg(self.theta_source), np.rad2deg(self.phi_source) ), **kwargs, ) return self.x, self.y, self.fringes
[docs] def get_all_combinations_power(self, doplot=True, verbose=True, **kwargs): """ Returns the power on the focal plane at each pointing (each position of the source), for different configurations of the horn array: all open, all open except i, except j, except i and j, only i open, only j open, only i and j open. Returns ------- x, y: The coordinates on the FP. S, Cminus_i, Cminus_j, Sminus_ij, Ci, Cj, Sij : arrays Power on the focal plane for each configuration, at each pointing. """ self.q.horn.open = True # All open self.x, self.y, S = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, S, frame=self.frame, title="$S$ - All open", **kwargs ) # All open except i self.q.horn.open[self.baseline[0] - 1] = False _, _, Cminus_i = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, Cminus_i, frame=self.frame, title="$C_{-i}$" + f" - Horn {self.baseline[0]} close", **kwargs, ) # All open except baseline [i, j] self.q.horn.open[self.baseline[1] - 1] = False _, _, Sminus_ij = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, Sminus_ij, frame=self.frame, title="$S_{-ij}$" + f" - Baseline {self.baseline} close", **kwargs, ) # All open except j self.q.horn.open[self.baseline[0] - 1] = True _, _, Cminus_j = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, Cminus_j, frame=self.frame, title="$C_{-j}$" + f" - Horn {self.baseline[1]} close", **kwargs, ) # Only i open (not a realistic observable) self.q.horn.open = False self.q.horn.open[self.baseline[0] - 1] = True _, _, Ci = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, Ci, frame=self.frame, title="$C_i$" + f" - Only horn {self.baseline[0]} open", **kwargs, ) # Only j open (not a realistic observable) self.q.horn.open[self.baseline[0] - 1] = False self.q.horn.open[self.baseline[1] - 1] = True _, _, Cj = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, Cj, frame=self.frame, title="$C_j$" + f" - Only horn {self.baseline[1]} open", **kwargs, ) # Only baseline [i, j] open (not a realistic observable) self.q.horn.open[self.baseline[0] - 1] = True _, _, Sij = get_response_power( self.q, self.theta_source, self.phi_source, self.nu_source, self.spec_irrad_source, frame=self.frame, verbose=verbose, ) if doplot: plot_horn_and_FP( self.q, self.x, self.y, Sij, frame=self.frame, title="$S_{ij}$" + f" - Only baseline {self.baseline} open", **kwargs, ) return self.x, self.y, S, Cminus_i, Sminus_ij, Cminus_j, Ci, Cj, Sij
[docs] def get_fringes_from_combination(self, measured_comb=True, doplot=True, verbose=True, **kwargs): """ Return the fringes on the FP by making the combination Returns ------- x, y: The coordinates on the FP. fringes : Fringes on the FP, for each coordinate, at each pointing. """ self.x, self.y, S_tot, Cminus_i, Sminus_ij, Cminus_j, Ci, Cj, Sij = ( self.get_all_combinations_power(doplot=doplot, verbose=verbose, **kwargs) ) if measured_comb: self.fringes_comb = S_tot - Cminus_i - Cminus_j + Sminus_ij else: self.fringes_comb = S_tot - Cminus_i - Cminus_j + Sminus_ij + Ci + Cj if doplot: plot_horn_and_FP( self.q, self.x, self.y, self.fringes_comb, frame=self.frame, title="Baseline {} - Theta={}deg - Phi={}deg".format( self.baseline, np.rad2deg(self.theta_source), np.rad2deg(self.phi_source) ), **kwargs, ) return self.x, self.y, self.fringes_comb
[docs] class Model_Fringes_Maynooth: def __init__( self, q, baseline, rep, theta_source=0.0, nu_source=150e9, frame="ONAFP", interp=False ): """ Parameters ---------- q: QubicInstrument baseline: list Baseline formed with 2 horns, index between 1 and 64 as on the instrument. rep: str Repository with the simulation files. theta: float The source zenith angle [rad]. nu: float Frequency of the calibration source [Hz] frame: str 'GRF' or 'ONAFP'. interp: bool If True, interpolate and integrate in each TES (takes time). If False, make the mean in each TES (faster). """ self.q = q self.baseline = baseline self.rep = rep self.theta_source = theta_source self.nu_source = nu_source self.frame = frame self.interp = interp
[docs] def get_fringes(self, verbose=True): """ Compute fringes on the focal plane from Maynooth simulations. Returns ------- x, y coordinates on the FP in the ONAFP frame and the corresponding power. """ if self.q.config != "TD": raise ValueError("Maynooth simulations are for the TD only.") xONAFP, yONAFP, fringes_fullreso = get_power_Maynooth( self.rep, self.baseline, self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) # TES centers and TES vertex in the ONAFP frame xONAFP_TES, yONAFP_TES, vONAFP_TES = get_TEScoordinates_ONAFP(self.q) self.fringes = fullreso2TESreso( xONAFP, yONAFP, fringes_fullreso, vONAFP_TES, self.q.detector.area, interp=self.interp, verbose=verbose, ) # power_TES *= q.filter.bandwidth # W/Hz to W if self.frame == "ONAFP": self.x = xONAFP_TES self.y = yONAFP_TES elif self.frame == "GRF": self.x = yONAFP_TES self.y = -xONAFP_TES return self.x, self.y, self.fringes
[docs] def get_fringes_from_combination(self, measured_comb=True, verbose=True): """ Compute fringes on the focal plane from Maynooth simulations doing the combination with S_tot, Cminus_i, Cminus_j, Sminus_ij, Ci and Cj. Parameters ---------- measured_comb: bool If True, returns the measured combination: S_tot - Cminus_i - Cminus_j + Sminus_ij. If False, returns the complete combination: S_tot - Cminus_i - Cminus_j + Sminus_ij + Ci + Cj. verbose: bool Returns ------- x, y coordinates on the FP in the ONAFP frame and the corresponding power. """ i = self.baseline[0] j = self.baseline[1] all_open = np.arange(1, 65) first_close = np.delete(all_open, i - 1) second_close = np.delete(all_open, j - 1) both_close = np.delete(all_open, [i - 1, j - 1]) xONAFP, yONAFP, S_tot = get_power_Maynooth( self.rep, all_open, self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) _, _, Cminus_i = get_power_Maynooth( self.rep, first_close, self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) _, _, Cminus_j = get_power_Maynooth( self.rep, second_close, self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) _, _, Sminus_ij = get_power_Maynooth( self.rep, both_close, self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) if measured_comb: fringes_comb_fullreso = S_tot - Cminus_i - Cminus_j + Sminus_ij else: _, _, Ci = get_power_Maynooth( self.rep, [i - 1], self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) _, _, Cj = get_power_Maynooth( self.rep, [j - 1], self.theta_source, self.nu_source, self.q.horn.center, verbose=verbose, ) fringes_comb_fullreso = S_tot - Cminus_i - Cminus_j + Sminus_ij + Ci + Cj # TES centers and TES vertex in the ONAFP frame xONAFP_TES, yONAFP_TES, vONAFP_TES = get_TEScoordinates_ONAFP(self.q) self.fringes_comb = fullreso2TESreso( xONAFP, yONAFP, fringes_comb_fullreso, vONAFP_TES, self.q.detector.area, interp=self.interp, verbose=verbose, ) if self.frame == "ONAFP": self.x = xONAFP_TES self.y = yONAFP_TES elif self.frame == "GRF": self.x = yONAFP_TES self.y = -xONAFP_TES return self.x, self.y, self.fringes_comb