# -*- coding: utf-8 -*-
import math
import os
import sys
import time
from glob import glob
import iminuit
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal as scsig
import scipy.stats
from astropy.io import fits
from iminuit.util import describe, make_func_code
from pysimulators import FitsArray
from qubicpack import qubicfp
from qubicpack.pix2tes import assign_pix2tes, assign_pix_grid, pix2tes
from scipy.ndimage.filters import correlate1d
from qubic.lib.Qutilities import progress_bar
pix_grid = assign_pix_grid()
TES2PIX = assign_pix2tes()
[docs]
def printnow(truc):
    """Print *truc* and flush stdout so the message shows up immediately."""
    print(truc, flush=True)
[docs]
def statstr(x, divide=False, median=False, cut=None):
    """
    Return a "center +/- spread" string for the finite values of x.

    Parameters
    ----------
    x : array
        Input values; non-finite entries are ignored.
    divide : bool
        If True, divide the spread by the number of finite samples.
        NOTE(review): this divides by N, not sqrt(N) -- confirm intended.
    median : bool
        Use the median instead of the mean as the center.
    cut : float, optional
        If given, use the sigma-clipped estimate from meancut with this
        clipping threshold.
    """
    vals = x[np.isfinite(x)]
    if median:
        center = np.median(vals)
        spread = np.std(vals)
    elif cut is not None:
        center, spread = meancut(vals, cut, disp=False)
    else:
        center = np.mean(vals)
        spread = np.std(vals)
    if divide:
        spread /= len(vals)
    return "{0:6.5f} +/- {1:6.5f}".format(center, spread)
[docs]
def qgrid():
    """Overlay a 17x17 grid (half-integer cell boundaries) on the current axes."""
    for k in range(17):
        edge = k - 0.5
        plt.axvline(x=edge, alpha=0.3, color="k")
        plt.axhline(y=edge, alpha=0.3, color="k")
[docs]
def image_asics(data1=None, data2=None, all1=None):
    """
    Return a 17x17 image of the detectors on the focal plane.

    Each asic has 124 TES and 4 thermometers.

    Parameters
    ----------
    data1 : array, optional
        Signal for the channels of ASIC 1.
    data2 : array, optional
        Signal for the channels of ASIC 2.
    all1 : array, optional
        Signal from both asics (256 detectors); it is split in two halves
        that override data1 and data2.

    Returns
    -------
    img : ndarray of shape (17, 17)
        Focal-plane image; positions with no data are NaN.
    """
    if all1 is not None:
        nn = int(len(all1) / 2)
        data2 = all1[nn:]
        data1 = all1[0:nn]
    # Work on a local copy of the pixel grid: the previous implementation
    # mutated the module-level pix_grid, so the thermometer codes written
    # below leaked between calls.
    grid = pix_grid.copy()
    if data1 is not None:
        # Thermometer channels of ASIC 1
        grid[1, 16] = 1005
        grid[2, 16] = 1006
        grid[3, 16] = 1007
        grid[4, 16] = 1008
    if data2 is not None:
        # Thermometer channels of ASIC 2
        grid[0, 15] = 1004
        grid[0, 14] = 1003
        grid[0, 13] = 1002
        grid[0, 12] = 1001
    nrows = 17
    ncols = 17
    img = np.zeros((nrows, ncols)) + np.nan
    for row in range(nrows):
        for col in range(ncols):
            physpix = grid[row, col]
            TES, asic = pix2tes(physpix)
            if data1 is not None and asic == 1:
                img[row, col] = data1[TES - 1]
            if data2 is not None and asic == 2:
                img[row, col] = data2[TES - 1]
    return img
"""
################################## Fitting ###################################
##############################################################################
"""
[docs]
def thepolynomial(x, pars, extra_args=None):
    """
    Generic polynomial function.

    pars are the coefficients in decreasing-power order (numpy.poly1d
    convention). extra_args is accepted for interface compatibility with
    the fitting machinery and is ignored.
    """
    return np.poly1d(pars)(x)
[docs]
class MyChi2(object):
    """
    Chi-square cost function carrying the data and the model.

    Evaluates chi2(pars) = r^T C^-1 r with r = y - f(x, pars), or the
    diagonal-only version sum(r^2 / diag(C)) when diag is True.
    """

    def __init__(self, xin, yin, covarin, functname, extra_args=None, invcovar=None, diag=False):
        self.x = xin
        self.y = yin
        self.covar = covarin
        # Invert the covariance once, unless the inverse is supplied.
        self.invcov = np.linalg.inv(covarin) if invcovar is None else invcovar
        self.functname = functname
        self.extra_args = extra_args
        self.diag = diag

    def __call__(self, *pars, extra_args=None):
        resid = self.y - self.functname(self.x, pars, extra_args=self.extra_args)
        if self.diag:
            # Ignore off-diagonal covariance terms.
            return np.sum(resid ** 2 / np.diag(self.covar))
        return np.dot(np.dot(resid, self.invcov), resid)
[docs]
class Chi2Minimizer(object):
    """
    Parent class holding the data and model for a full-covariance
    chi-square: chi2(pars) = r^T C^-1 r with r = y - f(x, pars).
    """

    def __init__(self, functname, xin, yin, covarin, extra_args=None, invcovar=None):
        self.x = xin
        self.y = yin
        self.covar = covarin
        if invcovar is not None:
            self.invcov = invcovar
        else:
            # Invert the covariance once at construction time.
            self.invcov = np.linalg.inv(covarin)
        self.functname = functname
        self.extra_args = extra_args

    def __call__(self, *pars, extra_args=None):
        delta = self.y - self.functname(self.x, pars, extra_args=self.extra_args)
        return np.dot(delta, np.dot(self.invcov, delta))
[docs]
class Chi2Implement(Chi2Minimizer):
    # Adapter exposing the fit-parameter names to iminuit (v1.x API):
    # make_func_code builds a func_code attribute from the model signature
    # so Minuit can discover the parameter names.
    def __init__(self, functname, x, y, covarin, extra_args=None):
        super().__init__(functname, x, y, covarin, extra_args=extra_args)
        # describe(functname) returns the model's argument names; [1:-2]
        # drops the leading 'x' and the last two names (presumably 'pars'
        # and 'extra_args' -- TODO confirm against the model signatures
        # actually passed here).
        self.func_code = make_func_code(describe(functname)[1:-2])
[docs]
class MyChi2_nocov:
    """
    Chi-square cost function without covariance: the plain sum of squared
    residuals between the data and the model.
    """

    def __init__(self, xin, yin, functname):
        self.x = xin
        self.y = yin
        self.functname = functname

    def __call__(self, *pars):
        delta = self.y - self.functname(self.x, pars)
        return np.sum(delta * delta)
# ## Call Minuit
[docs]
def do_minuit(
    x,
    y,
    covarin,
    guess,
    functname=thepolynomial,
    fixpars=None,
    chi2=None,
    rangepars=None,
    nohesse=False,
    force_chi2_ndf=False,
    verbose=True,
    minos=True,
    extra_args=None,
    print_level=0,
    force_diag=False,
    nsplit=1,
    ncallmax=10000,
    precision=None,
):
    """
    Fit functname(x, pars) to the data (x, y, covarin) with iminuit.

    Parameters
    ----------
    x, y : arrays
        Data abscissa and ordinate.
    covarin : array
        Either a 1D array of error bars or a 2D covariance matrix.
    guess : sequence
        Initial values of the parameters.
    functname : callable
        Model with signature functname(x, pars, extra_args=None).
    fixpars : sequence, optional
        Truthy entries mark parameters kept fixed in the fit.
    chi2 : object, optional
        If given (anything non-None), a Chi2Implement cost function is used
        instead of MyChi2.
    rangepars : sequence of (low, high), optional
        Limits for each parameter; also sets the initial step sizes.
    nohesse : bool
        If True, skip the Hesse step.
    force_chi2_ndf : bool
        If True, rescale errors and covariance so that chi2/ndf = 1.
    verbose, minos, print_level, force_diag, nsplit, ncallmax, precision :
        Minuit controls / verbosity.

    Returns
    -------
    (minuit_object, parfit, errfit, covariance, chi2_value, ndf, chi2_object)
    """
    # Check whether a covariance matrix or simple error bars were given.
    covar = covarin.copy()
    invcovar = None
    diag = False
    if np.size(np.shape(covarin)) == 1:
        diag = True
        if force_diag:
            covar = covarin.copy()
        else:
            # Build the diagonal covariance and its inverse from the errors.
            err = covarin
            covar = np.zeros((np.size(err), np.size(err)))
            covar[np.arange(np.size(err)), np.arange(np.size(err))] = err**2
            invcovar = np.zeros((np.size(err), np.size(err)))
            invcovar[np.arange(np.size(err)), np.arange(np.size(err))] = 1.0 / err**2
    # Instantiate the cost function.
    if chi2 is None:
        chi2 = MyChi2(x, y, covar, functname, extra_args=extra_args, invcovar=invcovar, diag=diag)
    else:
        chi2 = Chi2Implement(functname, x, y, covar, extra_args=extra_args)
    # Parameter names: c0, c1, ...
    ndim = np.size(guess)
    parnames = []
    for i in range(ndim):
        parnames.append("c" + str(i))
    # Initial guess
    theguess = dict(zip(parnames, guess))
    # Fixed parameters
    dfix = {}
    if fixpars is not None:
        for i in range(len(parnames)):
            dfix["fix_" + parnames[i]] = fixpars[i]
    else:
        for i in range(len(parnames)):
            dfix["fix_" + parnames[i]] = False
    # Limits and initial step sizes for the parameters
    drng = {}
    dstep = {}
    if rangepars is not None:
        step_norm = 100
        for i in range(len(parnames)):
            drng["limit_" + parnames[i]] = rangepars[i]
            dstep["error_" + parnames[i]] = (rangepars[i][1] - rangepars[i][0]) / step_norm
    else:
        for i in range(len(parnames)):
            drng["limit_" + parnames[i]] = False
            dstep["error_" + parnames[i]] = False
    # Run Minuit
    if verbose:
        print("Fitting with Minuit")
    ver = sys.version_info
    if ver.major < 3:
        # Legacy Python 2 branch (kept for backward compatibility).
        theargs = dict(theguess.items() + dfix.items() + dstep.items())
        if rangepars is not None:
            theargs.update(dict(theguess.items() + drng.items()))
    else:
        for k in dfix.keys():
            theguess[k] = dfix[k]
        theargs = theguess
        if rangepars is not None:
            for k in drng.keys():
                theguess[k] = drng[k]
            theargs.update(theguess)
    if isinstance(chi2, MyChi2):
        m = iminuit.Minuit(chi2, name=parnames, errordef=0.1, print_level=print_level, **theargs)
        m.migrad(ncall=ncallmax * nsplit, nsplit=nsplit, precision=precision)
    elif isinstance(chi2, Chi2Implement):
        m = iminuit.Minuit(chi2, name=parnames, errordef=0.1, print_level=print_level, **theargs)
        m.migrad(ncall=ncallmax * nsplit, nsplit=nsplit, precision=precision)
    if minos:
        try:
            m.minos()
            if verbose:
                print("Minos Done")
        except Exception:
            if verbose:
                print("Minos Failed !")
    if not nohesse:
        try:
            m.hesse()
            if verbose:
                print("Hesse Done")
        except Exception:
            if verbose:
                print("Hesse failed !")
    # Build np.array outputs
    parfit = []
    for i in parnames:
        parfit.append(m.values[i])
    errfit = []
    for i in parnames:
        errfit.append(m.errors[i])
    if fixpars is not None:
        parnamesfit = []
        for i in range(len(parnames)):
            # NOTE(review): 'is False' is an identity test, so integer 0
            # entries in fixpars do NOT select the parameter here -- confirm
            # this is intended (callers pass [0, 1, ...] lists).
            if fixpars[i] is False:
                parnamesfit.append(parnames[i])
            if fixpars[i]:
                errfit[i] = 0
    else:
        parnamesfit = parnames
    ndimfit = len(parnamesfit)
    covariance = np.zeros((ndimfit, ndimfit))
    if m.covariance:
        for i in range(ndimfit):
            for j in range(ndimfit):
                covariance[i, j] = m.covariance[(parnamesfit[i], parnamesfit[j])]
    # Evaluate the chi2 at the best fit.
    if isinstance(chi2, MyChi2):
        chisq = chi2(*parfit)
    elif isinstance(chi2, Chi2Implement):
        ChiEvaluate = Chi2Minimizer(functname, x, y, covar)
        chisq = ChiEvaluate(*parfit)
    ndf = np.size(x) - ndim
    if force_chi2_ndf:
        correct = chisq / ndf
        if verbose:
            # Bug fix: this message used to print chisq instead of the
            # actual correction factor chi2/ndf.
            print("correcting errorbars to have chi2/ndf=1 - correction = {}".format(correct))
    else:
        correct = 1.0
    if verbose:
        print(np.array(parfit))
        print(np.array(errfit) * np.sqrt(correct))
        print("Chi2=", chisq)
        print("ndf=", ndf)
    return m, np.array(parfit), np.array(errfit) * np.sqrt(correct), np.array(covariance) * correct, chi2(*parfit), ndf, chi2
###############################################################################
# ##############################################################################
[docs]
def profile(xin, yin, rng=None, nbins=10, fmt=None, plot=True, dispersion=True, log=False, median=False, cutbad=True, rebin_as_well=None, clip=None, mode=False):
    """
    Bin the scatter (xin, yin) into nbins bins in x and return the profile.

    Parameters
    ----------
    xin, yin : arrays
        Input scatter; non-finite pairs are discarded.
    rng : (min, max), optional
        x-range to bin over (defaults to the data range).
    nbins : int
        Number of bins.
    fmt : str, optional
        Matplotlib format for the errorbar plot (default 'ro').
    plot : bool
        If True, plot the profile with error bars.
    dispersion : bool
        If True the y error is the RMS in the bin, otherwise the error on
        the mean (RMS/sqrt(N)).
    log : bool
        If True use logarithmically spaced bins.
    median : bool
        Use the median instead of the mean in each bin.
    cutbad : bool
        If True, drop bins that are empty or have zero y-error from the
        output; otherwise they are returned with yval = dy = 0.
    rebin_as_well : list of arrays, optional
        Extra arrays averaged with the same binning.
    clip : float, optional
        Sigma-clipping threshold applied to the samples of each bin.
    mode : bool
        If True, estimate the mode of each bin from a histogram.

    Returns
    -------
    xc, yval, dx, dy, others
        Bin centers, binned values, x and y errors, and the rebinned extra
        arrays (None if rebin_as_well was not given).
    """
    ok = np.isfinite(xin) * np.isfinite(yin)
    x = xin[ok]
    y = yin[ok]
    if rng is None:
        mini = np.min(x)
        maxi = np.max(x)
    else:
        mini = rng[0]
        maxi = rng[1]
    # Bin edges: linear or logarithmic spacing.
    if log is False:
        xx = np.linspace(mini, maxi, nbins + 1)
    else:
        xx = np.logspace(np.log10(mini), np.log10(maxi), nbins + 1)
    xmin = xx[0:nbins]
    xmax = xx[1:]
    yval = np.zeros(nbins)
    xc = np.zeros(nbins)
    dy = np.zeros(nbins)
    dx = np.zeros(nbins)
    nn = np.zeros(nbins)
    if rebin_as_well is not None:
        nother = len(rebin_as_well)
        others = np.zeros((nbins, nother))
    else:
        others = None
    for i in np.arange(nbins):
        ok = (x > xmin[i]) & (x < xmax[i])
        if ok.sum() > 0:
            newy = y[ok]
            if clip is not None:
                # NOTE(review): sigmaclip overwrites mini/maxi (harmless here,
                # the bin edges were precomputed), and the clipped sample newy
                # is only used for the count nn below -- yval/dy are still
                # computed from the unclipped y[ok]. Confirm this is intended.
                for k in np.arange(3):
                    newy, mini, maxi = scipy.stats.sigmaclip(newy, low=clip, high=clip)
            nn[i] = len(newy)
            if median:
                yval[i] = np.median(y[ok])
            elif mode:
                # Mode estimate: peak of a histogram restricted to +/- 5 sigma
                # around the sigma-clipped mean.
                mm, ss = meancut(y[ok], 3)
                hh = np.histogram(y[ok], bins=int(np.min([len(y[ok]) / 30, 100])), range=[mm - 5 * ss, mm + 5 * ss])
                idmax = np.argmax(hh[0])
                yval[i] = 0.5 * (hh[1][idmax + 1] + hh[1][idmax])
            else:
                yval[i] = np.mean(y[ok])
            xc[i] = (xmax[i] + xmin[i]) / 2
            if rebin_as_well is not None:
                for o in range(nother):
                    others[i, o] = np.mean(rebin_as_well[o][ok])
            if dispersion:
                fact = 1
            else:
                fact = np.sqrt(len(y[ok]))
            dy[i] = np.std(y[ok]) / fact
            dx[i] = np.std(x[ok]) / fact
    if plot:
        if fmt is None:
            fmt = "ro"
        plt.errorbar(xc, yval, xerr=dx, yerr=dy, fmt=fmt)
    # Good bins: non-empty and with a non-zero y-error.
    ok = (nn != 0) & (dy != 0)
    if cutbad:
        if others is None:
            return xc[ok], yval[ok], dx[ok], dy[ok], others
        else:
            return xc[ok], yval[ok], dx[ok], dy[ok], others[ok, :]
    else:
        yval[~ok] = 0
        dy[~ok] = 0
        return xc, yval, dx, dy, others
[docs]
def exponential_filter1d(input, sigma, axis=-1, output=None, mode="reflect", cval=0.0, truncate=10.0, power=1):
    """
    One-dimensional one-sided (causal) exponential filter.

    The kernel is exp(-(k/sigma)**power) for k samples in the past and zero
    for future samples, normalized to unit sum.

    Parameters
    ----------
    input : array_like
        Input array to filter.
    sigma : scalar
        Tau (time constant) of the exponential kernel, in samples.
    axis : int, optional
        The axis of input along which to calculate. Default is -1.
    output : array or dtype, optional
        The array in which to place the output, or the dtype of the returned
        array. By default an array of the same dtype as input will be created.
    mode : {'reflect', 'constant', 'nearest', 'mirror', 'wrap'}, optional
        How the input array is extended beyond its boundaries.
        Default is 'reflect'.
    cval : scalar, optional
        Value to fill past edges of input if mode is 'constant'. Default is 0.0.
    truncate : float, optional
        Truncate the filter at this many time constants. Default is 10.0.
    power : scalar, optional
        Exponent applied to k/sigma inside the exponential. Default is 1.
    """
    sd = float(sigma)
    # Make the radius of the filter equal to truncate time constants.
    lw = int(truncate * sd + 0.5)
    weights = [0.0] * (2 * lw + 1)
    weights[lw] = 1.0
    total = 1.0
    # Build the one-sided exponential kernel.
    for ii in range(1, lw + 1):
        tmp = math.exp(-((float(ii) / sd) ** power))
        weights[lw + ii] = 0.0  # causal filter: no weight on future samples
        weights[lw - ii] = tmp
        total += tmp
    # Normalize the kernel to unit sum.
    for ii in range(2 * lw + 1):
        weights[ii] /= total
    return correlate1d(input, weights, axis, output, mode, cval, 0)
[docs]
def qs2array(dataset, FREQ_SAMPLING=None, timerange=None, asic=None):
    """
    Load a QubicStudio dataset and return the TOD for each TES.

    Raw data are normalized (ADU converted to current) and a linear time
    array is returned alongside.

    Parameters
    ----------
    dataset : str
        Directory containing the QubicStudio dataset, or a file inside its
        science directory (in that case the dataset directory is deduced
        from the file path).
    FREQ_SAMPLING :
        Kept for backwards compatibility; the sampling frequency is given
        by the qubicpack qubicfp object.
    timerange : array, optional
        Time range, low and high boundaries (seconds).
    asic : int
        ASIC number (mandatory).

    Returns
    -------
    time : array
        Time axis.
    dd : array
        TOD (current) for each TES.
    a : qubicfp
        The qubicpack focal-plane object.
    None is returned on error (dataset not found or asic not given).
    """
    if os.path.isfile(dataset):
        # A file inside the dataset was given: climb two levels up to the
        # dataset directory. (Bug fix: str has no .copy(); plain assignment.)
        fitspath = dataset
        scidir = os.path.dirname(fitspath)
        dataset = os.path.dirname(scidir)
    if not os.path.isdir(dataset):
        print("ERROR! Dataset not found: %s" % dataset)
        return None
    if asic is None:
        print("Please give an asic number.")
        return None
    a = qubicfp()
    a.read_qubicstudio_dataset(dataset)
    dd = a.timeline_array(asic=asic)
    dd = a.ADU2I(dd)  # convert ADU to current in muAmps
    time = a.timeaxis(asic=asic, datatype="sci")
    if timerange is not None:
        print("Selecting time range: {} to {} sec".format(timerange[0], timerange[1]))
        oktime = (time >= timerange[0]) & (time <= timerange[1])
        time = time[oktime]
        dd = dd[:, oktime]
    return time, dd, a
[docs]
def read_hkintern(basedir, thefieldname=None):
    """
    Read a housekeeping field from the hk-intern FITS file of a dataset.

    Parameters
    ----------
    basedir : str
        Directory of the dataset (the file is looked up in basedir/Hks).
    thefieldname : str, optional
        Name of the field to read. If None, the available field names are
        printed and None is returned.

    Returns
    -------
    newdate : array
        New time array of the measurement (seconds), built by interpolating
        the GPS dates on the PPS ticks.
    hk : array
        The requested housekeeping values (e.g. encoder steps for the
        platform angle).
    """
    hkinternfile = glob(basedir + "/Hks/hk-intern*")
    hk = fits.open(hkinternfile[0])
    # Map field name -> 1-based field number from the FITS header.
    nfields = hk[1].header["TFIELDS"]
    fields = {}
    for idx in range(nfields):
        fieldno = idx + 1
        ttype = "TTYPE%i" % fieldno
        fieldname = hk[1].header[ttype]
        fields[fieldname] = fieldno
    if thefieldname is None:
        print("List of available fields:")
        print("-------------------------")
        # Bug fix: dict.keys() is not indexable in Python 3.
        for fieldname in fields:
            print(fieldname)
        return None
    else:
        gpsdate = hk[1].data.field(fields["GPSDate"] - 1)  # in ms
        pps = hk[1].data.field(fields["Platform-PPS"] - 1)  # 0 and 1
        # Force a PPS tick at the first sample so the interpolation is anchored.
        pps[0] = 1
        gpsdate[0] -= 1000
        ppson = pps == 1
        indices = np.arange(len(gpsdate))
        # Interpolate the GPS date (shifted by one second) on every sample.
        newdate = np.interp(indices, indices[ppson], gpsdate[ppson] + 1000) * 1e-3
        # Read the requested field (e.g. the azimuth encoder position).
        fieldno = fields[thefieldname]
        hk = hk[1].data.field(fieldno - 1)
        return newdate, hk
[docs]
def butter_bandpass(lowcut, highcut, fs, order=5):
    """
    Design a Butterworth band-pass filter.

    Parameters
    ----------
    lowcut : scalar
        Low band edge in Hz.
    highcut : scalar
        High band edge in Hz.
    fs : scalar
        Sampling frequency in Hz.
    order : int
        Order of the filter.

    Returns
    -------
    b, a : (ndarray, ndarray)
        Numerator (b) and denominator (a) polynomials of the IIR filter.
    """
    nyquist = 0.5 * fs
    # Band edges normalized to the Nyquist frequency.
    band = [lowcut / nyquist, highcut / nyquist]
    return scsig.butter(order, band, btype="band", output="ba")
[docs]
def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    """
    Apply a Butterworth band-pass filter to a signal.

    Parameters
    ----------
    data : array_like
        Input signal.
    lowcut, highcut : scalar
        Band edges in Hz.
    fs : scalar
        Sampling frequency in Hz.
    order : int
        Filter order.

    Returns
    -------
    y : array
        The output of the digital filter.
    """
    numer, denom = butter_bandpass(lowcut, highcut, fs, order=order)
    return scsig.lfilter(numer, denom, data)
[docs]
def notch_array(freqs, bw):
    """
    Build the array expected by the notch filtering in filter_data.

    Parameters
    ----------
    freqs : list
        Frequencies to filter.
    bw : scalar
        Base filter bandwidth; the i-th notch gets a bandwidth of bw*(1+i).

    Returns
    -------
    2D array of [frequency, bandwidth] rows.
    """
    return np.array([[freq, bw * (1 + idx)] for idx, freq in enumerate(freqs)])
[docs]
def notch_filter(data, f0, bw, fs):
    """
    Apply an IIR notch filter to a signal.

    Parameters
    ----------
    data : array_like
        Input signal.
    f0 : scalar
        Frequency to notch out, in Hz.
    bw : scalar
        Notch bandwidth in Hz; sets the quality factor Q = f0 / bw.
    fs : scalar
        Sampling frequency in Hz.

    Returns
    -------
    y : array
        Output of the notch filter.
    """
    quality = f0 / bw
    numer, denom = scsig.iirnotch(f0, quality, fs)
    return scsig.lfilter(numer, denom, data)
[docs]
def meancut(data, nsig, med=False, disp=True):
    """
    Sigma-clipped mean (or median) and spread of a sample.

    Parameters
    ----------
    data : array_like
        Input data.
    nsig : float
        Lower and upper bound factor of the sigma clipping.
    med : bool
        If True, return the median instead of the mean.
    disp : bool
        If True, return the dispersion (STD); if False, return the error on
        the mean (STD/sqrt(N)).

    Returns
    -------
    The mean/median and the dispersion/error.
    """
    clipped = data.copy()
    # Apply the clipping a fixed number of times.
    for _ in range(10):
        clipped, mini, maxi = scipy.stats.sigmaclip(clipped, low=nsig, high=nsig)
    scale = 1 if disp else np.sqrt(len(clipped))
    center = np.median(clipped) if med else np.mean(clipped)
    return center, np.std(clipped) / scale
[docs]
def weighted_mean(x, dx, dispersion=True, renorm=False):
    """
    Weighted mean of data x with errors dx (weights 1/dx**2).

    If dispersion is True (default) the error on the mean is the RMS of the
    data; otherwise it is calculated analytically from the input errors.
    If renorm is True, the error is scaled so that chi2 = 1.
    """
    weights = 1.0 / dx**2
    wsum = np.sum(weights)
    center = np.sum(weights * x) / wsum
    if dispersion:
        err = np.std(x)
    else:
        err = 1.0 / np.sqrt(wsum)
    if renorm:
        # Scale the error to force chi2 = 1.
        chi2 = np.sum(weights * (x - center) ** 2)
        err *= np.sqrt(chi2)
    return center, err
[docs]
def simsig(x, pars, extra_args=None):
    """
    Simulated, normalized square-wave signal convolved with an exponential
    time response.

    Parameters
    ----------
    x : array
        Time samples.
    pars : list
        [cycle, ctime, t0, amplitude]: duty cycle, detector time constant,
        starting time and amplitude.

    Returns
    -------
    The simulated signal (zero mean, std scaled to the amplitude).
    """
    step = x[1] - x[0]
    cycle = np.nan_to_num(pars[0])
    ctime = np.nan_to_num(pars[1])
    t0 = np.nan_to_num(pars[2])
    amp = np.nan_to_num(pars[3])
    # Square-wave template: 1 during the 'on' fraction of the cycle.
    square = np.zeros(len(x))
    square[x < cycle * np.max(x)] = 1.0
    # Periodic shift by the starting time t0.
    shifted = np.interp((x - t0) % max(x), x, square)
    # Convolve with the exponential response of the detector.
    sim = -1 * exponential_filter1d(shifted, ctime / step, mode="wrap")
    # Normalization: zero mean, std equal to the amplitude.
    sim = (sim - np.mean(sim)) / np.std(sim) * amp
    return np.nan_to_num(sim)
[docs]
def simsig_nonorm(x, pars, extra_args=None):
    """
    Same as simsig but without normalization: the square wave itself carries
    the amplitude and the result is only mean-subtracted.

    pars = [cycle, ctime, t0, amplitude]
    """
    step = x[1] - x[0]
    cycle = np.nan_to_num(pars[0])
    ctime = np.nan_to_num(pars[1])
    t0 = np.nan_to_num(pars[2])
    amp = np.nan_to_num(pars[3])
    # Square-wave template carrying the amplitude directly.
    square = np.zeros(len(x))
    square[x < cycle * np.max(x)] = amp
    # Periodic shift by the starting time t0.
    shifted = np.interp((x - t0) % max(x), x, square)
    # Convolve with the exponential response of the detector.
    sim = -1 * exponential_filter1d(shifted, ctime / step, mode="wrap")
    # Center the signal.
    return sim - np.mean(sim)
[docs]
def simsig_asym(x, pars, extra_args=None):
cycle = np.nan_to_num(pars[0])
ctime_rise = np.nan_to_num(pars[1])
ctime_fall = np.nan_to_num(pars[2])
t0 = np.nan_to_num(pars[3])
amp = np.nan_to_num(pars[4])
offset = np.nan_to_num(pars[5])
sim_init = np.zeros(len(x))
ok = x < (cycle * (np.max(x)))
sim_init[ok] = -1 + np.exp(-x[ok] / ctime_rise)
if ok.sum() > 0:
endval = sim_init[ok][-1]
else:
endval = -1.0
sim_init[~ok] = -np.exp(-(x[~ok] - x[~ok][0]) / ctime_fall) + 1 + endval
thesim = np.interp((x - t0) % max(x), x, sim_init)
thesim = thesim * amp + offset
return np.nan_to_num(thesim)
[docs]
def simsig_fringes(time, stable_time, params):
    """
    Simulate a TOD signal obtained during the fringe measurement (6 steps of
    stable_time seconds each), convolved with the detector time response.
    This function was written for fitting purposes.

    Parameters
    ----------
    time : array
        Time sampling.
    stable_time : float
        Stable time [s] on each step.
    params : list
        [ctime, t0, amp0..amp5]: detector time constant, starting time and
        the 6 step amplitudes.

    Returns
    -------
    The simulated signal as float64.
    """
    dt = time[1] - time[0]
    tf = time[-1]
    npts = len(time)
    ctime = params[0]
    t0 = params[1]
    amps = params[2:]
    # Piecewise-constant template: one amplitude per step.
    template = np.zeros(npts)
    for step in range(6):
        start = int(npts / tf * stable_time * step)
        stop = int((stable_time * step + stable_time) * npts / tf)
        template[start:stop] = amps[step]
    # Periodic shift by the starting time t0.
    shifted = np.interp((time - t0) % max(time), time, template)
    # Convolve with the exponential detector response.
    filtered = exponential_filter1d(shifted, ctime / dt, mode="wrap")
    return np.array(filtered).astype(np.float64)
[docs]
def fold_data(
    time, dd, period, nbins, lowcut=None, highcut=None, notch=None, rebin=None, median=False, mode=False, clip=None, return_error=False, return_noise_harmonics=None, silent=False, verbose=None
):
    """
    Filter each detector timeline, fold it at `period` and bin it in nbins bins.

    Parameters
    ----------
    time : array
        Time samples of the TOD.
    dd : array of shape (ndet, nsamples)
        Data signal, one row per detector.
    period : float
        Data will be folded on this period.
    nbins : int
        Number of bins of the folded signal.
    lowcut : float
        Low cut for the filter.
    highcut : float
        High cut for the filter.
        If both lowcut and highcut are given, a bandpass filter is applied.
        If lowcut is given but no highcut, a highpass filter at f_cut = lowcut is applied.
        If highcut is given but no lowcut, a lowpass filter at f_cut = highcut is applied.
        If none of them is given, no cut frequency filter is applied.
        In any case notch filter can still be used.
    notch : array, optional
        Notch description passed to filter_data.
    rebin, median, mode, clip :
        Passed to filter_data / profile.
    return_error : bool
        If True, also return the binned error bars.
    return_noise_harmonics : int, optional
        If given, also estimate the noise in frequency bands between the
        harmonics of the folding frequency, up to this harmonic.
    silent : bool
        If True, do not display the progress bar.

    Returns
    -------
    Always: folded (normalized), t (bin centers), folded_nonorm, and the
    filtered timelines newdata; with return_error also dfolded and
    dfolded_nonorm; with return_noise_harmonics also fnoise and noise.
    """
    tfold = time % period
    sh = np.shape(dd)
    ndet = sh[0]
    if return_noise_harmonics is not None:
        # We estimate the noise in between the harmonics of the signal between harm=1 and
        # nharm=return_noise_harmonics
        # First we find the corresponding frequencies, below we measure the noise
        nharm = return_noise_harmonics
        margin = 0.2
        fmin = np.zeros(nharm)
        fmax = np.zeros(nharm)
        fnoise = np.zeros(nharm)
        noise = np.zeros((ndet, nharm))
        for i in range(nharm):
            fmin[i] = 1.0 / period * (i + 1) * (1 + margin / (i + 1))
            fmax[i] = 1.0 / period * (i + 2) * (1 - margin / (i + 1))
            fnoise[i] = 0.5 * (fmin[i] + fmax[i])
    folded = np.zeros((ndet, nbins))
    folded_nonorm = np.zeros((ndet, nbins))
    dfolded = np.zeros((ndet, nbins))
    dfolded_nonorm = np.zeros((ndet, nbins))
    newdata = np.zeros((ndet, len(time)))
    if not silent:
        bar = progress_bar(ndet, "Detectors ")
    for THEPIX in range(ndet):
        if not silent:
            bar.update()
        data = dd[THEPIX, :]
        # Filter the timeline, then bin the folded time samples.
        newdd = filter_data(time, data, lowcut=lowcut, highcut=highcut, notch=notch, rebin=rebin, verbose=verbose)
        newdata[THEPIX, :] = newdd
        t, yy, dx, dy, others = profile(tfold, newdd, nbins=nbins, dispersion=False, plot=False, cutbad=False, median=median, mode=mode, clip=clip)
        # Mean-subtracted versions, normalized (unit std) and not.
        folded[THEPIX, :] = (yy - np.mean(yy)) / np.std(yy)
        folded_nonorm[THEPIX, :] = yy - np.mean(yy)
        dfolded[THEPIX, :] = dy / np.std(yy)
        dfolded_nonorm[THEPIX, :] = dy
        if return_noise_harmonics is not None:
            # RMS noise in each inter-harmonic frequency band.
            spectrum, freq = power_spectrum(time, newdd, rebin=True)
            for i in range(nharm):
                ok = (freq >= fmin[i]) & (freq < fmax[i])
                noise[THEPIX, i] = np.sqrt(np.mean(spectrum[ok]))
    if return_error:
        if return_noise_harmonics is not None:
            return folded, t, folded_nonorm, dfolded, dfolded_nonorm, newdata, fnoise, noise
        else:
            return folded, t, folded_nonorm, dfolded, dfolded_nonorm, newdata
    else:
        if return_noise_harmonics is not None:
            return folded, t, folded_nonorm, newdata, fnoise, noise
        else:
            return folded, t, folded_nonorm, newdata
[docs]
def power_spectrum(time_in, data_in, rebin=True):
    """
    Power spectral density of a timeline.

    If rebin is True, the data are first resampled on a regular time grid
    (required by the FFT-based estimator).

    Returns
    -------
    spectrum_f, freq_f : arrays
        PSD values and the corresponding frequencies.
    """
    if rebin:
        ### Resample the data on a regular grid
        tt = np.linspace(time_in[0], time_in[-1], len(time_in))
        yy = np.interp(tt, time_in, data_in)
    else:
        tt = time_in
        yy = data_in
    sampling = 1.0 / (tt[1] - tt[0])
    return mlab.psd(yy, Fs=sampling, NFFT=len(yy), window=mlab.window_hanning)
[docs]
def filter_data(time_in, data_in, lowcut=None, highcut=None, rebin=True, verbose=False, notch=None, order=5):
    """
    Band/low/high-pass filter a timeline (1D) or a set of timelines (2D),
    with optional notch filtering of given frequencies and their harmonics.

    Parameters
    ----------
    time_in : array
        Time samples.
    data_in : array
        Signal, 1D or 2D with one timeline per row.
    lowcut, highcut : float, optional
        Cut frequencies in Hz; which filter is applied depends on which
        ones are given (band/high/low-pass, or none).
    rebin : bool
        If True, resample on a regular time grid before filtering.
    verbose : bool
        Print what is being done.
    notch : array, optional
        Rows of [frequency, bandwidth, n_harmonics]; see note below.
    order : int
        Butterworth filter order.

    Returns
    -------
    dataf : array
        The filtered data.
    """
    sh = np.shape(data_in)
    if rebin:
        if verbose:
            printnow("Rebinning before Filtering")
        ### Resample the data on a regular grid
        time = np.linspace(time_in[0], time_in[-1], len(time_in))
        if len(sh) == 1:
            data = np.interp(time, time_in, data_in)
        else:
            data = vec_interp(time, time_in, data_in)
    else:
        if verbose:
            printnow("No rebinning before Filtering")
        time = time_in
        data = data_in
    # Effective sampling frequency of the (regular) time grid.
    FREQ_SAMPLING = 1.0 / ((np.max(time) - np.min(time)) / len(time))
    if lowcut is None and highcut is None:
        dataf = data
        if verbose:
            print("No cut frequency filter applied")
    else:
        # Choose the filter type from which cut frequencies were given.
        if lowcut is None and highcut is not None:
            freqs = highcut
            filter_type = "lowpass"
            if verbose:
                print("Applying lowpass filter with f_cut = {}".format(highcut))
        elif lowcut is not None and highcut is None:
            freqs = lowcut
            filter_type = "highpass"
            if verbose:
                print("Applying highpass filter with f_cut = {}".format(lowcut))
        else:
            freqs = [lowcut, highcut]
            filter_type = "bandpass"
            if verbose:
                print("Applying bandpass filter with f_cut = [{}, {}]".format(lowcut, highcut))
        filt = scsig.butter(order, freqs, btype=filter_type, output="sos", fs=FREQ_SAMPLING)
        if len(sh) == 1:
            dataf = scsig.sosfilt(filt, data)
        else:
            dataf = scsig.sosfilt(filt, data, axis=1)
    if notch is not None:
        for i in range(len(notch)):
            ftocut = notch[i][0]
            bw = notch[i][1]
            # NOTE(review): a 2-element notch entry gets nharmonics = 0 and
            # is therefore never applied (the loop below runs zero times) --
            # confirm this is intended; notch_array() builds 2-element rows.
            if len(notch[i]) < 3:
                nharmonics = 0
            else:
                # NOTE(review): .astype assumes the entry is a numpy scalar.
                nharmonics = notch[i][2].astype(int)
            if verbose:
                print("Notching {} Hz with width {} and {} harmonics".format(ftocut, bw, nharmonics))
            for j in range(nharmonics):
                dataf = notch_filter(dataf, ftocut * (j + 1), bw, FREQ_SAMPLING)
    return dataf
[docs]
def vec_interp(x, xin, yin):
sh = np.shape(yin)
nvec = sh[0]
yout = np.zeros_like(yin)
for i in range(nvec):
yout[i, :] = np.interp(x, xin, yin[i, :])
return yout
[docs]
def fit_average(t, folded, fff, dc, fib, Vtes, initpars=None, fixpars=[0, 0, 0, 0], doplot=True, functname=simsig, clear=True, name="fib"):
    """
    Fit the model functname on the median of the folded timelines.

    Parameters
    ----------
    t : array
        Folded time samples (bin centers).
    folded : array of shape (npix, nbins)
        Folded signal, one row per pixel.
    fff : float
        Modulation frequency of the external source.
    dc : float
        Duty cycle of the modulation.
    fib :
        Fibre identifier (only used in the plot title).
    Vtes :
        Bias voltage (only used in the plot title).
    initpars : list, optional
        Initial [cycle, ctime, t0, amp]; if None, t0 is first estimated by a
        brute-force scan followed by a one-parameter fit.
    fixpars : list
        Which parameters to fix in the fit (passed to do_minuit).
    doplot : bool
        If True, make the plot.
    functname : callable
        Model to fit (simsig by default).
    clear : bool
        If True, clear the window before plotting.
    name : str
        Label used in the plot title.

    Returns
    -------
    av, params_av, err_av
        The median folded signal, the fitted parameters and their errors.
    """
    sh = np.shape(folded)
    npix = sh[0]
    ####### Average folded data
    av = np.median(np.nan_to_num(folded), axis=0)
    if initpars is None:
        # Brute-force scan of the start time t0 over one modulation period:
        # keep the value minimizing the squared residuals to the median.
        nnn = 100
        t0 = np.linspace(0, 1.0 / fff, nnn)
        diff2 = np.zeros(nnn)
        for i in range(nnn):
            diff2[i] = np.sum((av - functname(t, [dc, 0.1, t0[i], 1.0])) ** 2)
        ttry = t0[np.argmin(diff2)]
        # Refine t0 with a fit where only t0 is free.
        bla = do_minuit(
            t,
            av,
            np.ones(len(t)),
            [dc, 0.1, ttry, 1.0],
            functname=functname,
            rangepars=[[0.0, 1.0], [0.0, 0.2], [0.0, 1.0 / fff], [0.0, 20.0]],
            fixpars=[1, 1, 0, 1],
            force_chi2_ndf=True,
            verbose=True,
            nohesse=True,
        )
        initpars = [dc, 0.1, bla[1][2], 1.0]
    ####### Fit on the median folded signal
    bla = do_minuit(
        t, av, np.ones(len(t)), initpars, functname=functname, rangepars=[[0.0, 1.0], [0.0, 0.5], [0.0, 1.0 / fff], [0.0, 2.0]], fixpars=fixpars, force_chi2_ndf=True, verbose=False, nohesse=True
    )
    params_av = bla[1]
    err_av = bla[2]
    if doplot:
        plt.ion()
        if clear:
            plt.clf()
        plt.xlim(0, 1.0 / fff)
        # All individual pixels in the background, the median and the fit on top.
        for i in range(npix):
            plt.plot(t, folded[i, :], alpha=0.1, color="k")
        plt.plot(t, av, color="b", lw=4, alpha=0.3, label="Median")
        plt.plot(
            t,
            functname(t, bla[1]),
            "r--",
            lw=4,
            label="Fitted average of {8:} pixels \n cycle={0:8.3f}+/-{1:8.3f} \n tau = {2:8.3f}+/-{3:8.3f}s \n t0 = {4:8.3f}+/-{5:8.3f}s \n amp = {6:8.3f}+/-{7:8.3f}".format(
                params_av[0], err_av[0], params_av[1], err_av[1], params_av[2], err_av[2], params_av[3], err_av[3], npix
            ),
        )
        plt.legend(fontsize=7, frameon=False, loc="lower left")
        plt.xlabel("Time(sec)")
        plt.ylabel("Stacked")
        plt.title("{} {}: Freq_Mod={}Hz - Cycle={}% - Vtes={}V".format(name, fib, fff, dc * 100, Vtes))
        plt.show()
        # Small pause so the interactive window refreshes.
        time.sleep(0.1)
    return av, params_av, err_av
[docs]
def fit_all(t, folded, av, initpars=None, fixpars=[0, 0, 0, 0], stop_each=False, functname=simsig, rangepars=None):
    """
    Fit the model functname on each pixel of the folded data.

    Parameters
    ----------
    t : array
        Folded time samples (bin centers).
    folded : array of shape (npix, nbins)
        Folded signal, one row per pixel. May be modified in place: a pixel
        validated as "inverted" in interactive mode gets its sign flipped.
    av : array
        Median folded signal (only plotted as a reference).
    initpars : list, optional
        Initial parameters passed to do_minuit.
    fixpars : list
        Which parameters to fix in the fit.
    stop_each : bool
        If True, display each fit and interactively ask whether the pixel is
        OK ('y'), should be tried inverted ('i'), or rejected (anything else).
    functname : callable
        Model to fit.
    rangepars : list of (low, high), optional
        Parameter limits passed to do_minuit.

    Returns
    -------
    allparams, allerr, allchi2, ndf, ok
        Fitted parameters, errors and chi2 per pixel, the number of degrees
        of freedom, and the boolean array of interactively validated pixels.
    """
    npix, nbins = np.shape(folded)
    print(" Got {} pixels to fit".format(npix))
    ##### Now fit each TES fixing cycle to dc and t0 to the one fitted on the median
    allparams = np.zeros((npix, 4))
    allerr = np.zeros((npix, 4))
    allchi2 = np.zeros(npix)
    bar = progress_bar(npix, "Detectors ")
    ok = np.zeros(npix, dtype=bool)
    for i in range(npix):
        bar.update()
        thedd = folded[i, :]
        #### First a fit with no error correction in order to have a chi2 distribution
        theres = do_minuit(t, thedd, np.ones(len(t)), initpars, functname=functname, fixpars=fixpars, rangepars=rangepars, force_chi2_ndf=True, verbose=False, nohesse=True)
        ndf = theres[5]
        params = theres[1]
        err = theres[2]
        allparams[i, :] = params
        allerr[i, :] = err
        allchi2[i] = theres[4]
        if stop_each:
            # Interactive per-pixel validation of the fit.
            plt.clf()
            plt.plot(t, thedd, color="k")
            plt.plot(t, av, color="b", lw=4, alpha=0.2, label="Median")
            plt.plot(
                t,
                functname(t, theres[1]),
                "r--",
                lw=4,
                label="Fitted: \n cycle={0:8.3f}+/-{1:8.3f} \n tau = {2:8.3f}+/-{3:8.3f}s \n t0 = {4:8.3f}+/-{5:8.3f}s \n amp = {6:8.3f}+/-{7:8.3f}".format(
                    params[0], err[0], params[1], err[1], params[2], err[2], params[3], err[3]
                ),
            )
            plt.legend()
            plt.show()
            msg = "TES #{}".format(i)
            # These channel indices are thermometers, not TES.
            if i in [3, 35, 67, 99]:
                msg = "Channel #{} - BEWARE THIS IS A THERMOMETER !".format(i)
            plt.title(msg)
            # Changing so 'i' select prompts plot inversion
            bla = input("Press [y] if fit OK, [i] to invert, other key otherwise...")
            if bla == "y":
                ok[i] = True
            # invert to check if TES okay,
            # thedd refers to the indexed TES in loop
            if bla == "i":
                plt.plot(t, thedd * (-1.0), color="olive")
                plt.show()
                ibla = input("Press [y] if INVERTED fit OK, otherwise anykey")
                # and invert thedd in the original datset
                if ibla == "y":
                    ok[i] = True
                    folded[i, :] = thedd * -1.0
            print(ok[i])
    return allparams, allerr, allchi2, ndf, ok
[docs]
def run_asic(
    fpobj,
    idnum,
    Vtes,
    fff,
    dc,
    asic,
    reselect_ok=False,
    lowcut=0.5,
    highcut=15.0,
    nbins=50,
    nointeractive=False,
    doplot=True,
    notch=None,
    lastpassallfree=False,
    name="fib",
    okfile=None,
    initpars=None,
    timerange=None,
    removesat=False,
    stop_each=False,
    rangepars=None,
):
    """
    Full analysis of one ASIC for a fibre measurement: fold the data, select
    the good TES (interactively or from a previously saved file) and fit the
    time constant and amplitude of each detector.

    Parameters
    ----------
    fpobj : qubicpack.qubicfp object
        The focal-plane data object.
    idnum :
        Fibre number.
    Vtes :
        Bias voltage (normally accessible in the qubicfp object, but maybe
        not for old data).
    fff : float
        Modulation frequency of the external source.
    dc : float
        Duty cycle of the modulation.
    asic : int
        The asic number (1 or 2).
    reselect_ok : bool
        If True, the good TES are selected one by one (interactive);
        if False, the previously saved selection file is used.
    lowcut, highcut : float
        Band-pass cut frequencies applied before folding.
    nbins : int
        Number of bins of the folded signal.
    nointeractive : bool
        If True, skip any interactive question and use the saved selection.
    doplot : bool
        If True, produce the summary plots.
    notch : array, optional
        Notch-filter description.
    lastpassallfree : bool
        If True, leave all parameters free in the final fit.
    name : str
        Label used in the plot titles and the selection file name.
    okfile : str, optional
        File to save/read the OK-TES selection (a default name is built
        from name/fib/asic if not given).
    initpars : list, optional
        Initial fit parameters.
    timerange : unused here (kept for interface compatibility).
    removesat : float or False
        If set, discard pixels whose folded minimum is below this value
        (saturation threshold in muA).
    stop_each : bool
        If True, validate each pixel fit interactively.
    rangepars : list of (low, high), optional
        Parameter limits passed to the fits.

    Returns
    -------
    tt, folded, okfinal, allparams, allerr, allchi2, ndf
    """
    fib = idnum
    ### Read data
    time = fpobj.timeaxis(datatype="sci", asic=asic)
    dd = fpobj.timeline_array(asic=asic)  # measured voltage (ADU)
    dd = fpobj.ADU2I(dd)  # convert ADU to current in muAmps
    ### Fold the data at the modulation period of the fibers
    ### Signal is also bandpass filtered before folding
    # Bug fix: fold_data signature is (time, dd, period, nbins, lowcut=...,
    # highcut=...); the previous call passed lowcut/highcut/nbins positionally
    # in the wrong slots.
    folding_result = fold_data(time, dd, 1.0 / fff, nbins, lowcut=lowcut, highcut=highcut, notch=notch)
    folded, tt, folded_nonorm = folding_result[:3]
    if nointeractive:
        reselect_ok = False
        answer = "n"
    else:
        if reselect_ok:
            print("\n\n")
            answer = input("This will overwrite the file for OK TES. Are you sure you want to proceed [y/n]")
        else:
            answer = "n"
    if answer == "y":
        print("Now going to reselect the OK TES and overwrite the corresponding file")
        #### Pass 1 - allows to obtain good values for t0 basically
        #### Now perform the fit on the median folded data
        print("")
        print("FIRST PASS")
        print("First Pass is only to have a good guess of the t0, your selection should be very conservative - only high S/N")
        av, params, err = fit_average(tt, folded, fff, dc, fib, Vtes, initpars=initpars, fixpars=[0, 0, 0, 0], doplot=True, name=name)
        #### And the fit on all data with this as a first guess forcing some parameters
        #### it returns the list of OK detectors
        allparams, allerr, allchi2, ndf, ok = fit_all(tt, folded, av, initpars=[dc, params[1], params[2], params[3]], fixpars=[1, 0, 1, 0], rangepars=rangepars, stop_each=True)
        #### Pass 2
        #### Refit with only the above selected ones in order to have good t0
        #### Refit the median of the OK detectors
        print("")
        print("SECOND PASS")
        print("Second pass is the final one, please select the pixels that seem OK")
        av, params, err = fit_average(tt, folded[ok, :], fff, dc, fib, Vtes, initpars=initpars, fixpars=[0, 0, 0, 0], doplot=True, name=name)
        #### And the fit on all data with this as a first guess forcing some parameters
        #### it returns the list of OK detectors
        allparams, allerr, allchi2, ndf, ok = fit_all(tt, folded, av, initpars=[dc, params[1], params[2], params[3]], rangepars=rangepars, fixpars=[1, 0, 1, 0], stop_each=True)
        #### Final Pass
        #### Then refit them all with only tau and amp as free parameters
        #### also do not normalize amplitudes of folded
        allparams, allerr, allchi2, ndf, ok_useless = fit_all(
            tt, 1e3 * folded_nonorm, av, initpars=[dc, params[1], params[2], params[3]], rangepars=rangepars, fixpars=[1, 0, 1, 0], functname=simsig_nonorm
        )  # 1e3 converts muA to nA
        # Keep only validated pixels with a reasonable time constant.
        okfinal = ok * (allparams[:, 1] < 1.0)
        ### Make sure no thermometer is included
        okfinal[[3, 35, 67, 99]] = False
        # Save the list of OK bolometers
        if okfile is None:
            FitsArray(okfinal.astype(int)).save("TES-OK-{}{}-asic{}.fits".format(name, fib, asic))
        else:
            FitsArray(okfinal.astype(int)).save(okfile)
    else:
        # Read the previously saved selection.
        if okfile is None:
            okfinal = np.array(FitsArray("TES-OK-{}{}-asic{}.fits".format(name, fib, asic))).astype(bool)
        else:
            okfinal = np.array(FitsArray(okfile)).astype(bool)
    if removesat:
        #### remove pixels looking saturated, sat_value in muA
        saturated = np.min(folded_nonorm, axis=1) < removesat
        okfinal = (okfinal * ~saturated).astype(bool)
    if doplot is False:
        ### Now redo the fits one last time
        av, params, err = fit_average(tt, folded[okfinal, :], fff, dc, fib, Vtes, initpars=initpars, fixpars=[0, 0, 0, 0], doplot=False, clear=False, name=name)
        allparams, allerr, allchi2, ndf, ok_useless = fit_all(
            tt, 1e3 * folded_nonorm, av, initpars=[dc, params[1], params[2], params[3]], fixpars=[1, 0, 1, 0], functname=simsig_nonorm, rangepars=rangepars
        )
        # 1e3 converts muA to nA
    else:
        plt.figure(figsize=(6, 8))
        plt.subplot(3, 1, 1)
        ### Now redo the fits one last time
        av, params, err = fit_average(tt, folded[okfinal, :], fff, dc, fib, Vtes, initpars=initpars, fixpars=[0, 0, 0, 0], doplot=True, clear=False, name=name)
        print(params)
        print(err)
        if lastpassallfree:
            fixed = [0, 0, 0, 0]
        else:
            fixed = [1, 0, 1, 0]
        allparams, allerr, allchi2, ndf, ok_useless = fit_all(
            tt, 1e3 * folded_nonorm, av, initpars=[dc, params[1], params[2], params[3]], fixpars=fixed, functname=simsig_nonorm, stop_each=stop_each, rangepars=rangepars
        )  # 1e3 converts muA to nA
        plt.subplot(3, 2, 3)
        mmt, sst = meancut(allparams[okfinal, 1], 3)
        plt.hist(allparams[okfinal, 1], range=[0, mmt + 4 * sst], bins=10, label=statstr(allparams[okfinal, 1], cut=3))
        plt.xlabel("Tau [sec]")
        plt.legend()
        # Bug fix: the format arguments were in the wrong order for this
        # template; use the same "name fib - asic" layout as the other titles.
        plt.title("{} {} - asic {}".format(name, fib, asic))
        plt.subplot(3, 2, 4)
        mma, ssa = meancut(allparams[okfinal, 3], 3)
        plt.hist(allparams[okfinal, 3], range=[0, mma + 4 * ssa], bins=10, label=statstr(allparams[okfinal, 3], cut=3))
        plt.legend()
        plt.xlabel("Amp [ nA ]")
        pars = allparams
        tau = pars[:, 1]
        tau[~okfinal] = np.nan
        amp = pars[:, 3]
        amp[~okfinal] = np.nan
        # Assign the data to the proper asic for the focal-plane images.
        if asic == 1:
            tau1 = tau
            tau2 = None
            amp1 = amp
            amp2 = None
        else:
            tau1 = None
            tau2 = tau
            amp1 = None
            amp2 = amp
        plt.subplot(3, 2, 5)
        imtau = image_asics(data1=tau1, data2=tau2)
        plt.imshow(imtau, vmin=0, vmax=mmt + 4 * sst, cmap="viridis", interpolation="nearest")
        plt.title("Tau - {} {} - asic {}".format(name, fib, asic))
        plt.colorbar()
        plt.subplot(3, 2, 6)
        imamp = image_asics(data1=amp1, data2=amp2)
        plt.imshow(imamp, vmin=0, vmax=mma + 6 * ssa, cmap="viridis", interpolation="nearest")
        plt.colorbar()
        plt.title("Amp - {} {} - asic {}".format(name, fib, asic))
        plt.tight_layout()
    return tt, folded, okfinal, allparams, allerr, allchi2, ndf
[docs]
def calibrate(fib, pow_maynooth, allparams, allerr, allok, cutparam=None, cuterr=None, bootstrap=None):
    """
    Calibrate fitted fiber amplitudes against the simulated Maynooth power map.

    Applies optional cuts on the fitted amplitude and its error, fits a linear
    relation amp = a * power + b with Minuit, and draws a 2x2 diagnostic figure
    (cut plane, calibration fit, Maynooth power image, calibrated amplitude
    image on the focal plane).

    Parameters
    ----------
    fib : int
        Fiber number (used only in plot labels).
    pow_maynooth : array, shape (256,)
        Simulated Maynooth power per TES [mW].
    allparams : array, shape (256, 4)
        Fitted parameters per TES; column 3 is the amplitude [nA].
        WARNING: modified in place — rows rejected by the cuts are set to NaN.
    allerr : array, shape (256, 4)
        1-sigma errors on the fitted parameters.
    allok : bool array, shape (256,)
        Validity mask of the TES from the previous analysis step.
    cutparam : float, optional
        Upper cut on the amplitude; None disables the cut.
    cuterr : float, optional
        Upper cut on the amplitude error; None disables the cut.
    bootstrap : int, optional
        If given, number of shuffled realizations used to estimate the fit
        parameter errors shown on the plot (see Returns for the caveat).

    Returns
    -------
    tuple
        (fit parameters, Minuit parameter errors, final boolean mask).
        Note: the Minuit errors are returned even when `bootstrap` is used;
        the bootstrap errors only appear in the plot legend.
    """
    img_maynooth = image_asics(all1=pow_maynooth)
    plt.clf()

    # --- Panel 1: amplitude vs amplitude error, with the selection cuts ---
    plt.subplot(2, 2, 1)
    plt.plot(allparams[allok, 3], allerr[allok, 3], "k.")
    # A disabled cut defaults to a huge value so the comparison keeps everything.
    thecut_err = cuterr if cuterr is not None else 1e10
    thecut_amp = cutparam if cutparam is not None else 1e10
    newok = allok & (allerr[:, 3] < thecut_err) & (allparams[:, 3] < thecut_amp)
    plt.plot([np.min(allparams[allok, 3]), np.max(allparams[allok, 3])], [thecut_err, thecut_err], "g--")
    plt.plot([thecut_amp, thecut_amp], [np.min(allerr[allok, 3]), np.max(allerr[allok, 3])], "g--")
    plt.plot(allparams[newok, 3], allerr[newok, 3], "r.")
    # In-place: invalidate rejected TES for the rest of the analysis (and panel 4).
    allparams[~newok, :] = np.nan
    plt.ylabel(r"$\sigma_{amp}$ [nA]")  # raw string: '\s' is not a valid escape
    plt.xlabel("Amp Fib{} [nA]".format(fib))

    # --- Panel 3: amplitude vs Maynooth power, with linear calibration fit ---
    plt.subplot(2, 2, 3)
    plt.errorbar(pow_maynooth[newok], allparams[newok, 3], yerr=allerr[newok, 3], fmt="r.")
    xx = pow_maynooth[newok]
    yy = allparams[newok, 3]
    yyerr = allerr[newok, 3]
    res = do_minuit(xx, yy, yyerr, np.array([1.0, 0]), fixpars=[0, 0])
    paramfit = res[1]
    if bootstrap is None:
        errfit = res[2]
    else:
        # NOTE(review): this shuffles yy against xx (a permutation), not a
        # resampling with replacement — confirm this is the intended
        # "bootstrap" error estimate before relying on these errors.
        bsres = []
        bar = progress_bar(bootstrap, "Bootstrap")
        for i in range(bootstrap):
            bar.update()
            order = np.argsort(np.random.rand(len(xx)))
            xxbs = xx.copy()
            yybs = yy[order]
            yybserr = yyerr[order]
            theres = do_minuit(xxbs, yybs, yybserr, np.array([1.0, 0]), fixpars=[0, 0], verbose=False)
            bsres.append(theres[1])
        bsres = np.array(bsres)
        errfit = np.std(bsres, axis=0)
    xxx = np.linspace(0, np.max(pow_maynooth), 100)
    plt.plot(xxx, thepolynomial(xxx, res[1]), "g", lw=3, label="a={0:8.3f} +/- {1:8.3f} \n b={2:8.3f} +/- {3:8.3f}".format(paramfit[0], errfit[0], paramfit[1], errfit[1]))
    if bootstrap is not None:
        # Shaded 1/2/3-sigma envelopes of the shuffled fits around their mean.
        bsdata = np.zeros((bootstrap, len(xxx)))
        for i in range(bootstrap):
            bsdata[i, :] = thepolynomial(xxx, bsres[i, :])
        mm = np.mean(bsdata, axis=0)
        ss = np.std(bsdata, axis=0)
        plt.fill_between(xxx, mm - ss, y2=mm + ss, color="b", alpha=0.3)
        plt.fill_between(xxx, mm - 2 * ss, y2=mm + 2 * ss, color="b", alpha=0.2)
        plt.fill_between(xxx, mm - 3 * ss, y2=mm + 3 * ss, color="b", alpha=0.1)
        plt.plot(xxx, mm, "b", label="Mean bootstrap")
    plt.ylim(0, np.max(allparams[newok, 3]) * 1.1)
    plt.xlim(np.min(pow_maynooth[newok]) * 0.99, np.max(pow_maynooth[newok]) * 1.01)
    plt.ylabel("Amp Fib{} [nA]".format(fib))
    plt.xlabel("Maynooth [mW]")
    plt.legend(fontsize=8, framealpha=0.5)

    # --- Panel 2: Maynooth power on the focal plane ---
    plt.subplot(2, 2, 2)
    plt.imshow(img_maynooth, vmin=np.min(pow_maynooth), vmax=np.max(pow_maynooth), interpolation="nearest")
    plt.colorbar()
    plt.title("Maynooth [mW]")

    # --- Panel 4: measured amplitudes converted to mW via the fitted slope ---
    plt.subplot(2, 2, 4)
    img = image_asics(all1=allparams[:, 3] / res[1][0])
    plt.imshow(img, interpolation="nearest")
    plt.colorbar()
    plt.title("Amp Fib{} converted to mW".format(fib))
    plt.tight_layout()
    return res[1], res[2], newok