# Source code for lib.Calibration.Qfluxjumps

# Author: Belén Costanza, Lucas Merlo, Claudia Scóccola

import numpy as np
import os
import bottleneck as bn
from sklearn.cluster import DBSCAN
from sklearn.tree import DecisionTreeRegressor
from typing import Union
import yaml
import pickle
import matplotlib.pyplot as plt


class badIV:
    """Discard TES detectors that repeatedly show bad IV curves.

    TES to be excluded are identified from a dictionary that maps each TES
    index to the number of times it exhibited a bad IV curve in a given
    dataset year (e.g. 2022-2023 or 2025). TES not present in the
    dictionary always have good IV curves.
    """

    def __init__(
        self,
        directory: str,
        n_times: int,
        TES_number: int = 256,
        filename: str = "IVdict2025.yaml",
    ):
        """Store the configuration used to locate and filter the bad-IV data.

        Parameters:
            directory: Directory containing the bad IV dictionary.
            n_times: Minimum number of bad IV occurrences required to
                discard a TES.
            TES_number: Number of TES in the focal plane (default: 256).
            filename: Name of the dictionary file to read
                (e.g. "IVdict2025.yaml" or "IVdict2023.yaml").
        """
        self.directory = directory
        self.n_times = n_times
        self.TES_number = TES_number
        self.filename = filename

    def load_bad_iv(self):
        """Load the bad-IV occurrence dictionary from disk.

        Returns:
            dict[int, int]: Mapping {TES_index: number_of_bad_IVs}.

        Raises:
            FileNotFoundError: If the dictionary file does not exist.
        """
        path = os.path.join(self.directory, self.filename)
        if not os.path.exists(path):
            raise FileNotFoundError(f"No bad IV file found at {path}")
        with open(path, "r") as f:
            # An empty YAML file loads as None; treat it as "no bad TES".
            data = yaml.safe_load(f) or {}
        return {int(k): int(v) for k, v in data.items()}

    def select_badIV(self):
        """Build a boolean mask flagging which TES are kept.

        A TES is discarded when its bad-IV occurrence count is >= n_times.

        Returns:
            np.ndarray: Boolean mask of length TES_number where True means
            the TES is kept (good IV) and False means it is discarded.

        Raises:
            TypeError: If n_times is not an integer.
            ValueError: If n_times is negative, or larger than the maximum
                occurrence count present in the dictionary.
        """
        counts = self.load_bad_iv()
        if not isinstance(self.n_times, int):
            raise TypeError("n_times must be an integer")
        if self.n_times < 0:
            raise ValueError("n_times must be non-negative")
        if counts:
            max_times = max(counts.values())
            if self.n_times > max_times:
                raise ValueError(
                    f"n_times={self.n_times} exceeds maximum possible value ({max_times})"
                )
        bad = {tes for tes, n in counts.items() if n >= self.n_times}
        mask = np.ones(self.TES_number, dtype=bool)
        for tes in bad:
            # Silently ignore indices outside the focal plane.
            if 0 <= tes < self.TES_number:
                mask[tes] = False
        return mask
class fluxjumps:
    """Detect discontinuities (flux jumps) in time-ordered data (TOD).

    Jumps are detected by applying a Haar filter to the data, thresholding
    the filter output, clustering the candidate samples with DBSCAN and then
    locating the start and end of each distinct jump event.
    """

    def __init__(self, thr: float, window_size: int):
        """Configure the detector.

        Parameters:
            thr: Threshold for the Haar filter. Samples whose absolute
                Haar value exceeds the threshold are flux jump candidates.
            window_size: Size of the bottleneck moving-median window used
                in the Haar filter.
        """
        self.thr = np.array(thr)
        self.window_size = window_size
        # Constants for the jumps_detection method.
        self.MAX_JUMPS_BEFORE_RETHRESHOLD = 11
        self.HIGH_THRESHOLD_VALUE = 4e5
        self.THRESHOLD_FRACTION_FOR_EDGES = 0.05

    def haar_function(self, todarray: np.ndarray):
        """Apply a Haar filter to highlight discontinuities in the TOD.

        The filter is the difference between two adjacent moving-median
        windows, so a step in the data shows up as a large filter value.

        Parameters:
            todarray: 1D array containing the time-ordered data.

        Returns:
            tod_haar: Array of the same size as todarray with the Haar
            filter values (zero near the edges where the filter is
            undefined).

        Raises:
            ValueError: If todarray is not 1D or shorter than
                3 * window_size.
        """
        if todarray.ndim != 1:
            raise ValueError(f"todarray must be 1D, got shape {todarray.shape}")
        if len(todarray) < self.window_size * 3:
            raise ValueError(
                f"todarray length ({len(todarray)}) must be at least "
                f"3 * window_size ({3 * self.window_size})"
            )
        tod_haar = np.zeros(todarray.size)
        # Moving median; the first window_size samples are partial and dropped.
        xf = bn.move_median(todarray, self.window_size)[self.window_size :]
        tod_haar[
            self.window_size + self.window_size // 2 : -self.window_size + self.window_size // 2
        ] = xf[: -self.window_size] - xf[self.window_size :]
        return tod_haar

    def find_candidates(self, tod_haar: np.ndarray, thr: np.ndarray):
        """Find flux jump candidates by thresholding the Haar filter output.

        Parameters:
            tod_haar: Array of Haar filter values.
            thr: Threshold value.

        Returns:
            jumps: Boolean array where True marks a flux jump candidate.

        Raises:
            ValueError: If tod_haar is not 1D.
        """
        if tod_haar.ndim != 1:
            raise ValueError(f"tod_haar must be 1D, got shape {tod_haar.shape}")
        # The original pre-checked np.max(np.abs(tod_haar)) >= thr before
        # comparing; that check was redundant (the comparison is all-False
        # when nothing exceeds the threshold) and raised on empty input.
        jumps = np.abs(tod_haar) >= thr
        return jumps

    def clusters(self, todarray: np.ndarray, jumps: np.ndarray):
        """Cluster candidate indices into distinct flux jump events.

        Uses DBSCAN to group nearby candidate indices; each cluster
        represents one flux jump event.

        Parameters:
            todarray: 1D array of the TOD.
            jumps: Boolean array indicating jump candidate locations.

        Returns:
            Tuple containing:
            - nc: Number of clusters (flux jump events) found.
            - idx_jumps: Array of indices where jumps occur.
            - clust: DBSCAN cluster object, or None if no clusters found.

        Raises:
            ValueError: If todarray and jumps differ in length.
        """
        if len(todarray) != len(jumps):
            raise ValueError(
                f"todarray length ({len(todarray)}) must match jumps length ({len(jumps)})"
            )
        idx = np.arange(len(todarray))
        idx_jumps = idx[jumps]
        if idx_jumps.size > 1:
            # eps scales with the Haar window so samples of one jump cluster
            # together; min_samples=1 keeps isolated candidates as clusters.
            clust = DBSCAN(eps=self.window_size // 5, min_samples=1).fit(
                np.reshape(idx_jumps, (len(idx_jumps), 1))
            )
            nc = int(np.max(clust.labels_) + 1)
        else:
            nc = 0
            idx_jumps = np.array([], dtype=int)
            clust = None
        return nc, idx_jumps, clust

    def initial_start_end(
        self,
        nc: int,
        idx_jumps: np.ndarray,
        tod_haar: np.ndarray,
        thr_used: float,
        clust: "DBSCAN",
    ):
        """Determine initial start/end indices of each flux jump cluster.

        For each cluster, the jump edges are where the Haar filter values
        fall below a fraction (THRESHOLD_FRACTION_FOR_EDGES) of the
        detection threshold.

        Parameters:
            nc: Number of clusters.
            idx_jumps: Array of indices where jumps occur.
            tod_haar: Array of Haar filter values.
            thr_used: Threshold value used for jump detection.
            clust: Fitted DBSCAN cluster object (only `labels_` is read).

        Returns:
            Tuple containing:
            - xc: Array of start indices for each flux jump.
            - xcf: Array of end indices for each flux jump.
        """
        if clust is None or nc == 0:
            return np.array([], dtype=int), np.array([], dtype=int)
        if idx_jumps.size == 0:
            return np.array([], dtype=int), np.array([], dtype=int)
        xc = np.zeros(nc, dtype=int)
        xcf = np.zeros(nc, dtype=int)
        edge_threshold = thr_used * self.THRESHOLD_FRACTION_FOR_EDGES
        for i in range(nc):
            idx_jumps_from_thr = idx_jumps[clust.labels_ == i]
            if len(idx_jumps_from_thr) == 0:
                continue  # nothing assigned to this cluster label
            first_jump_idx = idx_jumps_from_thr[0]
            last_jump_idx = idx_jumps_from_thr[-1]
            # End of jump: first sample after the last candidate whose Haar
            # value falls below the edge threshold. If the filter never
            # settles (previously an unguarded IndexError), extend the jump
            # to the end of the array.
            below_after = np.where(np.abs(tod_haar[last_jump_idx:]) < edge_threshold)[0]
            if below_after.size > 0:
                idx_delta_end_jump = below_after[0]
            else:
                idx_delta_end_jump = len(tod_haar) - 1 - last_jump_idx
            # Start of jump: last sample before the first candidate below
            # the edge threshold; fall back to the start of the array.
            below_before = np.where(np.abs(tod_haar[:first_jump_idx]) < edge_threshold)[0]
            if below_before.size > 0:
                idx_delta_start_jump = first_jump_idx - below_before[-1]
            else:
                idx_delta_start_jump = first_jump_idx
            xc[i] = first_jump_idx - idx_delta_start_jump
            xcf[i] = last_jump_idx + idx_delta_end_jump
        return xc, xcf

    def unique(self, xc: np.ndarray, xcf: np.ndarray):
        """Remove duplicate start and end indices.

        Parameters:
            xc: Array of start indices.
            xcf: Array of end indices.

        Returns:
            Tuple containing:
            - nc_unique: Number of unique jump pairs.
            - xc_unique: Sorted array of unique start indices.
            - xcf_unique: Sorted array of unique end indices.
        """
        xc_unique = np.unique(xc)
        xcf_unique = np.unique(xcf)
        nc_unique = len(xc_unique)
        return nc_unique, xc_unique, xcf_unique

    def change_values(
        self, xc: Union[np.ndarray, list], xcf: Union[np.ndarray, list], max_gap: int = 10
    ):
        """Merge consecutive jumps that are close together.

        Jumps whose start is within max_gap samples of the previous jump's
        end are merged into a single jump region, so closely-spaced jumps
        are handled as one discontinuity.

        Parameters:
            xc: Array or list of start indices.
            xcf: Array or list of end indices.
            max_gap: Maximum gap between jumps to consider them consecutive
                (default: 10).

        Returns:
            Tuple containing:
            - xc2: List of merged start indices.
            - xcf2: List of merged end indices.
        """
        xc2 = []
        xcf2 = []
        i = 0
        while i < len(xc):
            xini = xc[i]
            xfin = xcf[i]
            j = i + 1
            # Extend the region while the next jump is within the margin.
            while j < len(xc) and xc[j] - xfin <= max_gap:
                xfin = xcf[j]
                j += 1
            xc2.append(xini)
            xcf2.append(xfin)
            i = j
        return xc2, xcf2

    def jumps_detection(self, todarray: np.ndarray, consec: bool = True, nc_cond: bool = False):
        """Run the complete flux jump detection pipeline.

        Steps:
        1. Apply the Haar filter.
        2. Threshold the filter output to find candidates.
        3. Cluster candidates into distinct events.
        4. Optionally re-threshold if too many events were found.
        5. Locate the start and end of each jump.
        6. Optionally merge consecutive jumps.

        Parameters:
            todarray: 1D array containing the TOD.
            consec: If True, merge consecutive jumps that are close together
                (default: True).
            nc_cond: If True, retry with HIGH_THRESHOLD_VALUE when more than
                MAX_JUMPS_BEFORE_RETHRESHOLD events are found (default: False).

        Returns:
            Tuple containing:
            - nc_unique: Number of unique flux jumps detected.
            - xc_unique: Array of start indices (empty array if no jumps).
            - xcf_unique: Array of end indices (empty array if no jumps).

        Raises:
            ValueError: If todarray is not 1D.
        """
        if todarray.ndim != 1:
            raise ValueError(f"todarray must be 1D, got shape {todarray.shape}")

        # Steps 1-3: filter, threshold, cluster.
        tod_haar = self.haar_function(todarray)
        thr_used = self.thr
        jumps = self.find_candidates(tod_haar, thr_used)
        nc, idx_jumps, clust = self.clusters(todarray, jumps)

        # Optional re-threshold. A local variable is used so the configured
        # threshold is not mutated (the original permanently overwrote
        # self.thr here, changing the behavior of subsequent calls). The
        # Haar filter does not depend on the threshold, so it is reused.
        if nc_cond and nc > self.MAX_JUMPS_BEFORE_RETHRESHOLD:
            thr_used = self.HIGH_THRESHOLD_VALUE
            jumps = self.find_candidates(tod_haar, thr_used)
            nc, idx_jumps, clust = self.clusters(todarray, jumps)

        # No events found.
        if nc == 0 or clust is None:
            return 0, np.array([], dtype=int), np.array([], dtype=int)

        # Steps 5-6: edges, de-duplication and optional merging.
        xc, xcf = self.initial_start_end(nc, idx_jumps, tod_haar, thr_used, clust)
        nc_unique, xc_unique, xcf_unique = self.unique(xc, xcf)
        if consec:
            xc_unique, xcf_unique = self.change_values(xc_unique, xcf_unique)
            nc_unique = len(xc_unique)
        return nc_unique, np.array(xc_unique), np.array(xcf_unique)
class correction:
    """Calculate flux jump amplitudes and apply corrections to the TOD.

    The correction proceeds in three stages:
    1. Calculate the offset amplitude of each jump.
    2. Remove the offset from the data after each jump.
    3. Replace the jump region with synthetic signal according to
       change_mode.
    """

    def __init__(self, region_off: int = 5, region_amp: int = 10, change_mode: str = "const"):
        """Configure the correction.

        Parameters:
            region_off: Number of samples used to calculate the offset
                before/after each jump (default: 5).
            region_amp: Number of samples used to calculate the mean/std
                before each jump (default: 10).
            change_mode: Method for replacing jump regions. Options:
                - "init": noise using the initial std and local means.
                - "noise": noise using mean/std computed before each jump.
                - "const": constrained realization based on the local power
                  spectrum (default).

        Raises:
            ValueError: If change_mode is unknown or the region sizes are
                not positive.
        """
        if change_mode not in ["init", "noise", "const"]:
            raise ValueError(
                f"change_mode must be 'init', 'noise', or 'const', got '{change_mode}'"
            )
        if region_off < 1 or region_amp < 1:
            raise ValueError("region_off and region_amp must be positive integers")
        self.region_off = region_off
        self.region_amp = region_amp
        self.change_mode = change_mode

    def calculate_amplitude(self, todarray: np.ndarray, xc: np.ndarray, xcf: np.ndarray, nc: int):
        """Calculate the amplitude offset for each flux jump.

        The offset is the difference between the median level just after the
        jump and the median level just before it.

        Parameters:
            todarray: 1D array containing the TOD.
            xc: Array of start indices for each jump.
            xcf: Array of end indices for each jump.
            nc: Number of jumps.

        Returns:
            offset: Array of offset amplitudes for each jump (0.0 when a
            jump touches an array boundary and no reference region exists).

        Raises:
            ValueError: If todarray is not 1D or index arrays have the
                wrong length.
        """
        if todarray.ndim != 1:
            raise ValueError(f"todarray must be 1D, got shape {todarray.shape}")
        if len(xc) != nc or len(xcf) != nc:
            raise ValueError(f"xc and xcf must have length {nc}")
        if nc == 0:
            return np.array([], dtype=float)
        offset = np.zeros(nc)
        for i in range(nc):
            # Clip the reference regions to the array bounds.
            start_idx = max(0, xc[i] - self.region_off)
            end_idx = min(len(todarray), xcf[i] + self.region_off)
            region_before = todarray[start_idx : xc[i]]
            region_after = todarray[xcf[i] : end_idx]
            if len(region_before) == 0 or len(region_after) == 0:
                offset[i] = 0.0
            else:
                offset[i] = np.median(region_after) - np.median(region_before)
        return offset

    def move_offset(
        self, todarray: np.ndarray, offset: np.ndarray, xc: np.ndarray, xcf: np.ndarray, nc: int
    ):
        """Remove the offset from the TOD after each flux jump.

        Offsets are applied from the last jump to the first; each slice runs
        from the end of a jump to the end of the array, so earlier regions
        accumulate the offsets of all subsequent jumps.

        Parameters:
            todarray: 1D array containing the TOD.
            offset: Array of offset amplitudes for each jump.
            xc: Array of start indices for each jump.
            xcf: Array of end indices for each jump.
            nc: Number of jumps.

        Returns:
            tod_new: Copy of todarray with offsets removed after each jump.

        Raises:
            ValueError: If todarray is not 1D or the arrays have the wrong
                length.
        """
        if todarray.ndim != 1:
            raise ValueError(f"todarray must be 1D, got shape {todarray.shape}")
        if len(offset) != nc or len(xc) != nc or len(xcf) != nc:
            raise ValueError(f"offset, xc, and xcf must have length {nc}")
        if nc == 0:
            return todarray.copy()
        tod_new = todarray.copy()
        array_length = len(todarray)
        # The original had a separate nc == 1 branch; it is identical to one
        # pass of this loop over a fresh copy, so it was removed.
        for i in range(nc - 1, -1, -1):
            initial = xcf[i]
            if initial < array_length:
                tod_new[initial:array_length] = tod_new[initial:array_length] - offset[i]
        return tod_new

    def changesignal_init(self, tod_new: np.ndarray, xc: np.ndarray, xcf: np.ndarray):
        """Replace jump regions with noise using initial statistics.

        The std is computed once from the data before the first jump; the
        mean is computed locally before each jump.

        Parameters:
            tod_new: TOD array with offsets removed.
            xc: Array of start indices for each jump.
            xcf: Array of end indices for each jump.

        Returns:
            y_cor: Corrected TOD array with jump regions replaced by noise.

        Raises:
            ValueError: If tod_new is not 1D.
        """
        if tod_new.ndim != 1:
            raise ValueError(f"tod_new must be 1D, got shape {tod_new.shape}")
        if len(xc) == 0:
            return tod_new.copy()
        y_cor = tod_new.copy()
        # Guard the empty-slice case (first jump at sample 0), which
        # previously produced NaN statistics and NaN replacement noise.
        baseline = tod_new[: xc[0]]
        std = np.std(baseline) if baseline.size > 0 else np.std(tod_new)
        for i in range(len(xc)):
            start_idx = max(0, xc[i] - self.region_amp)
            region_before = tod_new[start_idx : xc[i]]
            mean = np.mean(region_before) if region_before.size > 0 else np.mean(tod_new)
            jump_length = xcf[i] - xc[i]
            if jump_length > 0:
                y_cor[xc[i] : xcf[i]] = np.random.normal(mean, std, jump_length)
        return y_cor

    def changesignal_noise(self, tod_new: np.ndarray, xc: np.ndarray, xcf: np.ndarray):
        """Replace jump regions with noise using local statistics.

        For each jump, both the mean and the std are computed from the
        region immediately before the jump.

        Parameters:
            tod_new: TOD array with offsets removed.
            xc: Array of start indices for each jump.
            xcf: Array of end indices for each jump.

        Returns:
            y_cor: Corrected TOD array with jump regions replaced by noise.

        Raises:
            ValueError: If tod_new is not 1D.
        """
        if tod_new.ndim != 1:
            raise ValueError(f"tod_new must be 1D, got shape {tod_new.shape}")
        if len(xc) == 0:
            return tod_new.copy()
        y_cor = tod_new.copy()
        for i in range(len(xc)):
            start_idx = max(0, xc[i] - self.region_amp)
            region_before = tod_new[start_idx : xc[i]]
            # Guard jumps starting at sample 0 (empty slice -> NaN stats).
            if region_before.size > 0:
                std = np.std(region_before)
                mean = np.mean(region_before)
            else:
                std = np.std(tod_new)
                mean = np.mean(tod_new)
            jump_length = xcf[i] - xc[i]
            if jump_length > 0:
                y_cor[xc[i] : xcf[i]] = np.random.normal(mean, std, jump_length)
        return y_cor

    def constrained_realization(self, tod_new: np.ndarray, xini: int, xfin: int):
        """Replace a jump region with a constrained realization.

        A synthetic signal is generated with the same power spectrum as the
        data surrounding the jump, rescaled to the local noise level and
        linearly blended so that it matches the values at both edges.

        Parameters:
            tod_new: TOD array with offsets removed.
            xini: Start index of the jump region.
            xfin: End index of the jump region.

        Returns:
            y: TOD array with the jump region replaced (returned unchanged
            when there is no surrounding data to estimate a spectrum from).

        Raises:
            ValueError: If tod_new is not 1D or the indices are invalid.
        """
        if tod_new.ndim != 1:
            raise ValueError(f"tod_new must be 1D, got shape {tod_new.shape}")
        xini = int(xini)
        xfin = int(xfin)
        if xini < 0 or xfin > len(tod_new) or xini >= xfin:
            raise ValueError(
                f"Invalid jump indices: xini={xini}, xfin={xfin}, array_length={len(tod_new)}"
            )
        y = tod_new.copy()
        L = xfin - xini

        # Region sizes before/after the jump, clipped to the array bounds.
        region_pre = min(L // 2, xini)
        region_post = min(L - region_pre, len(y) - xfin)
        still_need = L - (region_pre + region_post)
        if still_need > 0:
            extra_pre = min(still_need, region_pre)
            extra_post = still_need - extra_pre
            # Clamp to the bounds: the original could extend region_pre past
            # xini, producing a negative slice start that silently wrapped.
            region_pre = min(region_pre + extra_pre, xini)
            region_post = min(region_post + extra_post, len(y) - xfin)

        pre = y[xini - region_pre : xini]
        post = y[xfin : xfin + region_post]
        local_data = np.concatenate([pre, post])
        N = len(local_data)
        if N == 0:
            # No surrounding data at all; leave the region untouched rather
            # than crash in rfft.
            return y

        # Power spectrum of the surrounding (mean-removed) data.
        fft_data = np.fft.rfft(local_data - np.mean(local_data))
        power_spectrum = np.abs(fft_data) ** 2

        # Random-phase realization with the same power spectrum. Synthesize
        # at least L samples so the truncation below always fills the gap.
        random_phases = np.exp(1j * 2 * np.pi * np.random.rand(len(fft_data)))
        new_fft = np.sqrt(power_spectrum) * random_phases
        sim_signal = np.fft.irfft(new_fft, n=N if N >= L else L)
        sim_trunc = sim_signal[:L]

        # Edge values anchoring the linear transition. Guard the array
        # boundaries: the original read y[xini - 1] (wraps to the last
        # element when xini == 0) and y[xfin] (IndexError when xfin == len).
        start_val = y[xini - 1] if xini > 0 else float(np.mean(local_data))
        end_val = y[xfin] if xfin < len(y) else float(np.mean(local_data))

        sim_trunc = sim_trunc - np.mean(sim_trunc)  # center
        # Rescale to the local noise level; skip when the realization is
        # degenerate (std == 0 previously produced NaN via 0/0).
        target_std = np.std(pre) if pre.size > 0 else np.std(local_data)
        sim_std = np.std(sim_trunc)
        if sim_std > 0:
            sim_trunc = sim_trunc / sim_std * target_std

        # Linear transition so the realization matches both edges.
        window = np.linspace(0, 1, L)
        sim_trunc = (
            sim_trunc
            + (1 - window) * (start_val - sim_trunc[0])
            + window * (end_val - sim_trunc[-1])
        )
        y[xini:xfin] = sim_trunc
        return y

    def correct_TOD(
        self, todarray: np.ndarray, offset: np.ndarray, xc: np.ndarray, xcf: np.ndarray, nc: int
    ):
        """Apply the complete correction to the TOD array.

        Removes the offsets and then replaces the jump regions according to
        the configured change_mode.

        Parameters:
            todarray: 1D array containing the time-ordered data.
            offset: Array of offset amplitudes for each jump.
            xc: Array of start indices for each jump.
            xcf: Array of end indices for each jump.
            nc: Number of jumps.

        Returns:
            tod_corr: Corrected TOD array with flux jumps removed.

        Raises:
            ValueError: If todarray is not 1D or change_mode is unknown.
        """
        if todarray.ndim != 1:
            raise ValueError(f"todarray must be 1D, got shape {todarray.shape}")
        if nc == 0:
            return todarray.copy()

        # Step 1: remove the offsets.
        tod_new = self.move_offset(todarray, offset, xc, xcf, nc)

        # Step 2: replace the jump regions.
        if self.change_mode == "init":
            tod_corr = self.changesignal_init(tod_new, xc, xcf)
        elif self.change_mode == "noise":
            tod_corr = self.changesignal_noise(tod_new, xc, xcf)
        elif self.change_mode == "const":
            tod_corr = tod_new
            for i in range(nc):
                tod_corr = self.constrained_realization(tod_corr, xc[i], xcf[i])
        else:
            raise ValueError(f"Unknown change_mode: {self.change_mode}")
        return tod_corr
class DT:
    """Locate flux jump levels with a decision tree.

    A Decision Tree Regressor approximates the TOD as a piecewise-constant
    function; large transitions between the fitted levels are treated as
    flux jumps, which refines the correction process.
    """

    def __init__(
        self,
        thr_count: int = 600,
        thr_amp: float = 2e5,
        tol: float = 1e2,
        depth: bool = True,
        depth_number: int = 0,
    ):
        """Configure the level detector.

        Parameters:
            thr_count: Minimum number of samples a level must span to be
                kept (default: 600).
            thr_amp: Minimum amplitude difference between consecutive
                levels to count as a transition (default: 2e5).
            tol: Tolerance for matching signal values to a DT level
                (default: 1e2).
            depth: If True, max_depth is driven by the number of flux jumps
                found; otherwise depth_number is used (default: True).
            depth_number: Tree depth to use when depth is False (default: 0).

        Raises:
            ValueError: If any threshold/tolerance is invalid.
        """
        if thr_count < 1:
            raise ValueError("thr_count must be positive")
        if thr_amp < 0:
            raise ValueError("thr_amp must be non-negative")
        if tol < 0:
            raise ValueError("tol must be non-negative")
        if not depth and depth_number < 1:
            raise ValueError("depth_number must be positive when depth is False")
        self.thr_count = thr_count
        self.thr_amp = thr_amp
        self.tol = tol
        self.depth = depth
        self.depth_number = depth_number
        # Constants
        self.MIN_DEPTH = 3
        self.TOL_MULTIPLIER = 50

    def define_model(self, tt: np.ndarray, todarray: np.ndarray, num: int):
        """Fit a Decision Tree Regressor to the TOD.

        Parameters:
            tt: 1D array of time values.
            todarray: 1D array containing the TOD.
            num: Number of flux jumps (used to set the tree depth, floored
                at MIN_DEPTH).

        Returns:
            ypred: Piecewise-constant prediction of the decision tree.

        Raises:
            ValueError: If the inputs are not 1D or differ in length.
        """
        if tt.ndim != 1 or todarray.ndim != 1:
            raise ValueError("tt and todarray must be 1D arrays")
        if len(tt) != len(todarray):
            raise ValueError(
                f"tt and todarray must have same length, got {len(tt)} and {len(todarray)}"
            )
        depth = max(self.MIN_DEPTH, num)
        x = tt.reshape(-1, 1)
        # Fixed random_state keeps the fit reproducible.
        regressor = DecisionTreeRegressor(max_depth=depth, random_state=42)
        regressor.fit(x, todarray)
        ypred = regressor.predict(x)
        return ypred

    def uniqueindex(self, ypred: np.ndarray):
        """Find unique predicted levels and count their occurrences.

        Parameters:
            ypred: Array of predicted values from the decision tree.

        Returns:
            Tuple containing:
            - predunique: Unique predicted values in order of first
              appearance.
            - index: First-occurrence index of each unique value.
            - count: Number of samples at each unique value.

        Raises:
            ValueError: If ypred is not 1D.
        """
        if ypred.ndim != 1:
            raise ValueError(f"ypred must be 1D, got shape {ypred.shape}")
        if len(ypred) == 0:
            return np.array([]), np.array([], dtype=int), np.array([], dtype=int)
        predunique, index = np.unique(ypred, return_index=True)
        # Reorder from value-sorted (np.unique default) to time order.
        index = np.sort(index)
        predunique = ypred[index]
        count = np.zeros(len(predunique), dtype=int)
        for i in range(len(predunique)):
            count[i] = np.count_nonzero(ypred == predunique[i])
        return predunique, index, count

    def count_filter(self, predunique: np.ndarray, index: np.ndarray, count: np.ndarray):
        """Drop levels whose occurrence count is at or below thr_count.

        Parameters:
            predunique: Array of unique predicted values.
            index: Array of first occurrence indices.
            count: Array of counts for each unique value.

        Returns:
            Tuple containing:
            - filtered_pred: Filtered unique predicted values.
            - filtered_index: Filtered indices.
            - filtered_count: Filtered counts.

        Raises:
            ValueError: If the input arrays differ in length.
        """
        if len(predunique) != len(index) or len(predunique) != len(count):
            raise ValueError("predunique, index, and count must have same length")
        mask = count > self.thr_count
        return predunique[mask], index[mask], count[mask]

    def amplitude_filter(self, filpred: np.ndarray, filindex: np.ndarray, filcount: np.ndarray):
        """Keep transitions between consecutive levels that exceed thr_amp.

        Parameters:
            filpred: Filtered unique predicted values.
            filindex: Filtered indices.
            filcount: Filtered counts (unused but kept for interface
                consistency).

        Returns:
            Tuple containing:
            - ampnew: List of amplitudes between consecutive levels.
            - valini: List of initial level values.
            - valfin: List of final level values.
            - indexini: List of initial level indices.
            - indexfin: List of final level indices.
        """
        if len(filpred) < 2:
            return [], [], [], [], []
        ampnew = []
        valini = []
        valfin = []
        indexini = []
        indexfin = []
        for i in range(len(filpred) - 1):
            amp = filpred[i + 1] - filpred[i]
            if abs(amp) > self.thr_amp:
                ampnew.append(amp)
                valini.append(filpred[i])
                valfin.append(filpred[i + 1])
                indexini.append(filindex[i])
                indexfin.append(filindex[i + 1])
        return ampnew, valini, valfin, indexini, indexfin

    def calculate_start_end(self, todarray, valini, valfin, indexfin):
        """Calculate start/end indices for level transitions (legacy).

        Legacy variant of :meth:`calculate_start_end2` that does not use
        the initial level indices; kept for backward compatibility.

        Parameters:
            todarray: 1D array containing the time-ordered data.
            valini: List of initial level values.
            valfin: List of final level values.
            indexfin: List of final level indices.

        Returns:
            Tuple containing:
            - start: Array of start indices for each transition.
            - end: Array of end indices for each transition.
        """
        start = np.zeros(len(valini), dtype=int)
        end = np.zeros(len(valini), dtype=int)
        for i in range(len(valini)):
            index1 = np.where(
                (todarray < valini[i] + self.tol) & (todarray > valini[i] - self.tol)
            )[0]
            index2 = np.where(
                (todarray < valfin[i] + self.tol) & (todarray > valfin[i] - self.tol)
            )[0]
            if len(index1) == 0 or len(index2) == 0:
                # Retry with a widened tolerance (same multiplier as
                # calculate_start_end2; the original hard-coded 50 here).
                tol2 = self.TOL_MULTIPLIER * self.tol
                index1 = np.where((todarray < valini[i] + tol2) & (todarray > valini[i] - tol2))[0]
                index2 = np.where((todarray < valfin[i] + tol2) & (todarray > valfin[i] - tol2))[0]
            # Guard empty candidate sets (previously unguarded [0] / np.max
            # raised IndexError/ValueError).
            candidates_end = index2[index2 > indexfin[i]]
            if len(candidates_end) > 0:
                end[i] = candidates_end[0]
            else:
                end[i] = min(indexfin[i] + 1, len(todarray))
            candidates_start = index1[index1 < end[i]]
            if len(candidates_start) > 0:
                start[i] = np.max(candidates_start)
            # Avoid overlap with the previous transition. The original ran
            # this comparison at i == 0 too, reading end[-1] (the last,
            # not-yet-filled element).
            if i > 0 and end[i - 1] == start[i]:
                start[i] += 1
        return start, end

    def calculate_start_end2(
        self, todarray: np.ndarray, valini: list, valfin: list, indexini: list, indexfin: list
    ):
        """Calculate start and end indices for level transitions.

        Finds where the signal matches the initial and final level values
        within the tolerance range.

        Parameters:
            todarray: 1D array containing the time-ordered data.
            valini: List of initial level values.
            valfin: List of final level values.
            indexini: List of initial level indices.
            indexfin: List of final level indices.

        Returns:
            Tuple containing:
            - start: Array of start indices for each transition.
            - end: Array of end indices for each transition.

        Raises:
            ValueError: If todarray is not 1D or the lists differ in length.
        """
        if todarray.ndim != 1:
            raise ValueError(f"todarray must be 1D, got shape {todarray.shape}")
        if (
            len(valini) != len(valfin)
            or len(valini) != len(indexini)
            or len(valini) != len(indexfin)
        ):
            raise ValueError("valini, valfin, indexini, and indexfin must have same length")
        if len(valini) == 0:
            return np.array([], dtype=int), np.array([], dtype=int)
        start = np.zeros(len(valini), dtype=int)
        end = np.zeros(len(valini), dtype=int)
        for i in range(len(valini)):
            index1 = np.where(
                (todarray < valini[i] + self.tol) & (todarray > valini[i] - self.tol)
            )[0]
            index2 = np.where(
                (todarray < valfin[i] + self.tol) & (todarray > valfin[i] - self.tol)
            )[0]
            if len(index1) == 0 or len(index2) == 0:
                # Retry with a widened tolerance when nothing matched.
                tol2 = self.TOL_MULTIPLIER * self.tol
                index1 = np.where((todarray < valini[i] + tol2) & (todarray > valini[i] - tol2))[0]
                index2 = np.where((todarray < valfin[i] + tol2) & (todarray > valfin[i] - tol2))[0]
            # End index: first match of the final level after its index.
            candidates_end = index2[index2 > indexfin[i]]
            if len(candidates_end) > 0:
                end[i] = candidates_end[0]
            else:
                end[i] = min(indexfin[i] + 1, len(todarray))
            # Start index: last match of the initial level between the
            # initial index and the end index.
            candidates_start = index1[(index1 < end[i]) & (index1 > indexini[i])]
            if len(candidates_start) > 0:
                start[i] = np.max(candidates_start)
            else:
                start[i] = min(indexini[i] + 1, len(todarray))
            # Avoid overlap with the previous transition.
            if len(valini) > 1 and i > 0:
                if end[i - 1] == start[i]:
                    start[i] += 1
        return start, end

    def change_values(
        self, xc: Union[np.ndarray, list], xcf: Union[np.ndarray, list], max_gap: int = 10
    ):
        """Merge consecutive transitions that are close together.

        Transitions within max_gap samples of each other are grouped into a
        single region.

        Parameters:
            xc: Array or list of start indices.
            xcf: Array or list of end indices.
            max_gap: Maximum gap between transitions to consider them
                consecutive (default: 10).

        Returns:
            Tuple containing:
            - xc2: List of merged start indices.
            - xcf2: List of merged end indices.

        Raises:
            ValueError: If xc and xcf differ in length.
        """
        xc = np.array(xc)
        xcf = np.array(xcf)
        if len(xc) != len(xcf):
            raise ValueError("xc and xcf must have same length")
        if len(xc) == 0:
            return [], []
        # Process transitions in increasing start order.
        order = np.argsort(xc)
        xc = xc[order]
        xcf = xcf[order]
        xc2 = []
        xcf2 = []
        i = 0
        while i < len(xc):
            xini = xc[i]
            xfin = xcf[i]
            j = i + 1
            # Extend the region while transitions are within the margin.
            while j < len(xc) and xc[j] - xfin <= max_gap:
                xfin = xcf[j]
                j += 1
            xc2.append(xini)
            xcf2.append(xfin)
            i = j
        return xc2, xcf2

    def calculate_levels(self, tt: np.ndarray, todarray: np.ndarray, nc: int, consec: bool = True):
        """Run the complete decision-tree level pipeline.

        Steps:
        1. Fit a decision tree to identify discrete levels.
        2. Find unique levels and filter by occurrence count.
        3. Keep transitions that exceed the amplitude threshold.
        4. Calculate start/end indices for each transition.
        5. Optionally merge consecutive transitions.

        Parameters:
            tt: 1D array of time values.
            todarray: 1D array containing the time-ordered data.
            nc: Number of flux jumps (drives the tree depth when
                depth=True).
            consec: If True, merge consecutive transitions that are close
                together (default: True).

        Returns:
            Tuple containing:
            - xc_unique: List of start indices for level transitions.
            - xcf_unique: List of end indices for level transitions.

        Raises:
            ValueError: If the inputs are invalid.
        """
        if tt.ndim != 1 or todarray.ndim != 1:
            raise ValueError("tt and todarray must be 1D arrays")
        if len(tt) != len(todarray):
            raise ValueError(
                f"tt and todarray must have same length, got {len(tt)} and {len(todarray)}"
            )
        if nc < 0:
            raise ValueError("nc must be non-negative")

        # Tree depth: number of jumps, or the fixed configured depth.
        num = nc if self.depth else self.depth_number

        ypred = self.define_model(tt, todarray, num)
        ypred_unique, index, count = self.uniqueindex(ypred)
        ypred_unique, index, count = self.count_filter(ypred_unique, index, count)
        amplitude, valini, valfin, indexini, indexfin = self.amplitude_filter(
            ypred_unique, index, count
        )
        xc, xcf = self.calculate_start_end2(todarray, valini, valfin, indexini, indexfin)
        if consec:
            xc_unique, xcf_unique = self.change_values(xc, xcf)
        else:
            xc_unique, xcf_unique = xc.tolist(), xcf.tolist()
        return xc_unique, xcf_unique
# ================================================================================ # Metrics functions # ================================================================================
def compute_residual_jumps_with_z(signal, jump_starts, jump_ends, window=100, robust=False):
    """Compute residual jump amplitudes, z-scores and local noise estimates.

    For every (start, end) pair, the residual is the difference between the
    mean (or median) level in a window after the jump and the level in a
    window before it. Jumps whose evaluation windows would fall outside the
    signal are skipped.

    Parameters
    ----------
    signal : array-like
        Corrected TOD.
    jump_starts : array-like
        Start indices of jumps.
    jump_ends : array-like
        End indices of jumps.
    window : int
        Samples used before and after each jump.
    robust : bool
        If True, use median and MAD for robustness.

    Returns
    -------
    residuals : np.ndarray
        Residual amplitude of each evaluated jump.
    z_scores : np.ndarray
        Residual divided by the local noise estimate (NaN when the noise
        estimate is zero).
    sigma : np.ndarray
        Local noise estimate for each evaluated jump.
    """
    signal = np.asarray(signal)
    residuals = []
    z_scores = []
    sigma = []
    for start, end in zip(jump_starts, jump_ends):
        # Skip jumps too close to either end of the signal.
        if start - window < 0:
            continue
        if end + window >= len(signal):
            continue
        before = signal[start - window : start]
        after = signal[end : end + window]
        if robust:
            mu_before = np.median(before)
            mu_after = np.median(after)
            # MAD scaled to a robust std estimate (1.4826 for Gaussian noise).
            sigma_before = 1.4826 * np.median(np.abs(before - mu_before))
            sigma_after = 1.4826 * np.median(np.abs(after - mu_after))
        else:
            mu_before = np.mean(before)
            mu_after = np.mean(after)
            sigma_before = np.std(before, ddof=1)
            sigma_after = np.std(after, ddof=1)
        R = mu_after - mu_before
        # Pooled local noise estimate from both sides of the jump.
        sigma_local = np.sqrt((sigma_before**2 + sigma_after**2) / 2)
        Z = R / sigma_local if sigma_local > 0 else np.nan
        residuals.append(R)
        z_scores.append(Z)
        sigma.append(sigma_local)
    # NOTE: the original also computed an rms_residual here but never
    # returned it (the docstring advertised it); the dead code is removed
    # and the actual third return value (sigma) is now documented.
    return np.array(residuals), np.array(z_scores), np.array(sigma)
def compute_residual_metrics_from_results(
    results, todarray, window=100, robust=False, use_dt=True, use_corrected=True
):
    """
    Compute residual jump metrics for all TES using the results dictionary.

    Parameters
    ----------
    results : dict
        Results dictionary produced by the flux jump analysis.
        Expected keys include:
        - 'TES_yes', 'TES_yes_dt'
        - 'jump_data', 'dt_jump_data'
        - 'corrected_data_dt' / 'corrected_data_nodt'
          (dicts of corrected TOD per TES index)
    todarray : np.ndarray
        Original TOD array of shape (n_TES, n_samples).
    window : int
        Samples used before and after each jump.
    robust : bool
        If True, use median and MAD for robustness.
    use_dt : bool
        If True, use DT-refined jumps ('dt_jump_data' and 'TES_yes_dt')
        when all DT keys are present; otherwise fall back to Haar jumps
        ('jump_data' and 'TES_yes').
    use_corrected : bool
        If True, use corrected TOD from the results dictionary when
        available; otherwise fall back to the original TOD in todarray.

    Returns
    -------
    metrics_per_tes : dict
        Dictionary keyed by TES index:
        {idx: {'residuals': np.ndarray, 'z_scores': np.ndarray,
               'sigma': np.ndarray}}
    global_rms_residual : float
        RMS of all residuals concatenated over TES (np.nan if none).
    """
    # Select which jump information to use. BUG FIX: the original condition
    # ended with the bare string "corrected_data_dt" (always truthy); it is
    # now a proper membership test so the fallback branch can actually run
    # when the DT-corrected data is missing.
    if (
        use_dt
        and "TES_yes_dt" in results
        and "dt_jump_data" in results
        and "corrected_data_dt" in results
    ):
        tes_list = results.get("TES_yes_dt", [])
        jump_dict = results.get("dt_jump_data", {})
        corrected_data = results.get("corrected_data_dt", {})
        start_key = "xcdt"
        end_key = "xcfdt"
    else:
        tes_list = results.get("TES_yes", [])
        jump_dict = results.get("jump_data", {})
        corrected_data = results.get("corrected_data_nodt", {})
        start_key = "xc"
        end_key = "xcf"

    metrics_per_tes = {}
    all_residuals = []

    for idx in tes_list:
        # Skip TES with no jump record at all.
        if idx not in jump_dict:
            continue

        # Choose signal: corrected TOD if available and requested, else original.
        if use_corrected and idx in corrected_data:
            signal = np.asarray(corrected_data[idx])
        else:
            signal = np.asarray(todarray[idx])

        jump_info = jump_dict[idx]
        starts = np.asarray(jump_info.get(start_key, []), dtype=int)
        ends = np.asarray(jump_info.get(end_key, []), dtype=int)
        if len(starts) == 0 or len(ends) == 0:
            continue

        residuals, z_scores, sigma = compute_residual_jumps_with_z(
            signal,
            starts,
            ends,
            window=window,
            robust=robust,
        )
        metrics_per_tes[idx] = {
            "residuals": residuals,
            "z_scores": z_scores,
            "sigma": sigma,
        }
        if len(residuals) > 0:
            all_residuals.append(residuals)

    if all_residuals:
        concatenated = np.concatenate(all_residuals)
        global_rms_residual = float(np.sqrt(np.mean(concatenated**2)))
    else:
        global_rms_residual = np.nan

    return metrics_per_tes, global_rms_residual
# ================================================================================ # Plotting functions # ================================================================================
def plot_no_jumps(tt, todarray, results):
    """
    Plot the raw TOD of every TES flagged as having no flux jumps.

    Parameters:
    -----------
    results : dict
        Results dictionary from analyzed dataset (must contain 'TES_no')
    todarray : np.ndarray
        Original time-ordered data array
    tt : np.ndarray
        Time axis (divided by 60 for plotting in minutes)

    Returns:
    --------
    int
        Always 0.
    """
    tes_no_jumps = results["TES_no"]
    if len(tes_no_jumps) == 0:
        # Nothing to plot.
        return 0

    n_plot = len(tes_no_jumps)
    n_cols = 5
    n_rows = int(np.ceil(n_plot / n_cols))

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 4 * n_rows))
    if n_rows == 1:
        axes = axes.reshape(1, -1)
    axes = axes.flatten()

    # One panel per TES without jumps.
    for panel, idx in zip(axes, tes_no_jumps[:n_plot]):
        panel.plot(tt / 60, todarray[idx], "b-", linewidth=0.5)
        panel.set_title(f"TES {idx + 1} (No jumps)", fontsize=10)
        panel.set_xlabel("Time (min)")
        panel.set_ylabel("Flux")
        panel.grid(True, alpha=0.3)

    # Blank out the remaining, unused panels.
    for panel in axes[n_plot:]:
        panel.axis("off")

    plt.suptitle(
        f"TES without Flux Jumps (showing {n_plot} of {len(tes_no_jumps)})",
        fontsize=14,
        fontweight="bold",
    )
    plt.tight_layout()
    plt.show()
    return 0
def plot_jump_detections(tt, todarray, results, DT=True):
    """
    Plot the TOD of every TES with detected flux jumps, marking the
    start (red circles) and end (green squares) of each jump.

    Parameters:
    -----------
    results : dict
        Results dictionary from analyzed dataset (must contain 'TES_yes'
        and 'dt_jump_data' or 'jump_data')
    todarray : np.ndarray
        Original time-ordered data array
    tt : np.ndarray
        Time axis (divided by 60 for plotting in minutes)
    DT : bool
        If True, use the DT-refined jump data ('dt_jump_data');
        otherwise use the Haar jump data ('jump_data')

    Returns:
    --------
    int
        Always 0.
    """
    tes_with_jumps = results["TES_yes"]
    jump_data = results["dt_jump_data"] if DT else results["jump_data"]

    if len(tes_with_jumps) == 0:
        return 0

    n_plot = len(tes_with_jumps)
    n_cols = 4
    n_rows = int(np.ceil(n_plot / n_cols))

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 4 * n_rows))
    if n_rows == 1:
        axes = axes.reshape(1, -1)
    axes = axes.flatten()

    panel = 0
    for idx in tes_with_jumps[:n_plot]:
        # Only TES with an entry in the jump dictionary get a panel.
        if idx not in jump_data:
            continue

        current = axes[panel]
        # Original data with jump markers overlaid.
        current.plot(
            tt / 60, todarray[idx], "b-", linewidth=0.5, label="Original", alpha=0.7
        )

        if DT:
            starts = jump_data[idx]["xcdt"]
            ends = jump_data[idx]["xcfdt"]
            n_jumps = jump_data[idx]["ncdt"]
        else:
            starts = jump_data[idx]["xc"]
            ends = jump_data[idx]["xcf"]
            n_jumps = jump_data[idx]["nc"]

        if len(starts) > 0:
            current.scatter(
                tt[starts] / 60,
                todarray[idx][starts],
                color="red",
                marker="o",
                s=30,
                label="Jump start",
                zorder=5,
            )
            current.scatter(
                tt[ends] / 60,
                todarray[idx][ends],
                color="green",
                marker="s",
                s=30,
                label="Jump end",
                zorder=5,
            )

        current.set_title(f"TES {idx + 1} ({n_jumps} jumps)", fontsize=10)
        current.set_xlabel("Time (min)")
        current.set_ylabel("Flux")
        current.legend(fontsize=8)
        current.grid(True, alpha=0.3)
        panel += 1

    # Blank out the remaining, unused panels.
    for unused in axes[panel:]:
        unused.axis("off")

    plt.suptitle(
        f"TES with Flux Jumps (showing {n_plot} of {len(tes_with_jumps)})",
        fontsize=14,
        fontweight="bold",
    )
    plt.tight_layout()
    plt.show()
    return 0
def plot_corrections(tt, todarray, results, DT=True):
    """
    Plot the corrected TOD of every TES that had flux jumps.

    Parameters:
    -----------
    results : dict
        Results dictionary from analyzed dataset (must contain 'TES_yes'
        and 'corrected_data_dt' or 'corrected_data_nodt')
    todarray : np.ndarray
        Original time-ordered data array
    tt : np.ndarray
        Time axis (divided by 60 for plotting in minutes)
    DT : bool
        If True, plot the DT-corrected data ('corrected_data_dt');
        otherwise plot 'corrected_data_nodt'

    Returns:
    --------
    int
        Always 0.
    """
    tes_with_jumps = results["TES_yes"]
    corrected_data = results["corrected_data_dt"] if DT else results["corrected_data_nodt"]

    if len(tes_with_jumps) == 0:
        return 0

    n_plot = len(tes_with_jumps)
    n_cols = 4
    n_rows = int(np.ceil(n_plot / n_cols))

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 4 * n_rows))
    if n_rows == 1:
        axes = axes.reshape(1, -1)
    axes = axes.flatten()

    panel = 0
    for idx in tes_with_jumps[:n_plot]:
        # Only TES with a corrected trace get a panel.
        if idx not in corrected_data:
            continue

        current = axes[panel]
        current.plot(
            tt / 60, corrected_data[idx], "r-", linewidth=0.8, label="Corrected", alpha=0.8
        )
        current.set_title(f"TES {idx + 1}", fontsize=10)
        current.set_xlabel("Time (min)")
        current.set_ylabel("Flux")
        current.legend(fontsize=8)
        current.grid(True, alpha=0.3)
        panel += 1

    # Blank out the remaining, unused panels.
    for unused in axes[panel:]:
        unused.axis("off")

    plt.suptitle(
        f"TES with Flux Jumps Corrected (showing {n_plot} of {len(tes_with_jumps)})",
        fontsize=14,
        fontweight="bold",
    )
    plt.tight_layout()
    plt.show()
    return 0
# ================================================================================ # saving functions # ================================================================================
def save_results(
    results, output_dir="./results_15_26_08", save_format="pickle", dataset_name="15.26.08"
):
    """
    Save analysis results to disk in pickle format.

    Parameters:
    -----------
    results : dict
        Results dictionary from analyzed dataset
    output_dir : str
        Directory where to save results, created if missing
        (default: "./results_15_26_08")
    save_format : str
        Format to save; only "pickle" is supported — any other value
        writes nothing and returns an empty list
    dataset_name : str
        Name of the dataset for file naming (default: "15.26.08")

    Returns:
    --------
    saved_files : list
        List of paths to saved files
    """
    # Create output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)

    saved_files = []

    # ============================================================================
    # Save format: Pickle format (.pkl)
    # ============================================================================
    if save_format == "pickle":
        pkl_file = os.path.join(output_dir, f"{dataset_name}_results.pkl")

        # Keep the dataset name alongside the results for later provenance.
        save_data = {"results": results, "dataset_name": dataset_name}

        with open(pkl_file, "wb") as f:
            pickle.dump(save_data, f, protocol=pickle.HIGHEST_PROTOCOL)

        saved_files.append(pkl_file)
        print(f"Saved pickle file to: {pkl_file}")

    print(f"\n{'=' * 70}")
    print(f"All results saved to: {output_dir}")
    print(f"Total files saved: {len(saved_files)}")
    print(f"{'=' * 70}\n")

    return saved_files
def load_results(load_file, load_format="pickle"):
    """
    Load previously saved analysis results from disk.

    Parameters:
    -----------
    load_file : str
        Path to the saved file
    load_format : str
        Format type: "pickle" (default: "pickle")

    Returns:
    --------
    loaded_data : dict
        Loaded results dictionary

    Raises:
    -------
    ValueError
        If load_format is anything other than "pickle".
    """
    # Guard clause: reject unknown formats up front.
    if load_format != "pickle":
        raise ValueError(f"Unsupported load format: {load_format}")

    # NOTE: pickle.load must only ever be used on trusted files.
    with open(load_file, "rb") as f:
        return pickle.load(f)