import emcee
import numpy as np
from scipy.integrate import cumulative_trapezoid
from scipy.optimize import curve_fit
from qubic.lib.Calibration import Qfiber as ft
__all__ = ["LogLikelihood"]
[docs]
class LogLikelihood:
def __init__(
self, xvals=None, yvals=None, errors=None, model=None, nbins=None, nsiginit=10, nsigprior=20, flatprior=None, fixedpars=None, covariance_model_funct=None, p0=None, nwalkers=32, chi2=None
):
self.prior = None
self.model = model
self.xvals = xvals
self.yvals = yvals
if nbins is None:
self.nbins = len(xvals)
else:
self.nbins = nbins
self.nsiginit = nsiginit
self.nsigprior = nsigprior
self.covariance_model_funct = covariance_model_funct
self.nwalkers = nwalkers
self.fixedpars = fixedpars
self.p0 = p0
self.chi2 = chi2
if np.ndim(errors) == 1:
self.covar = np.zeros((np.size(errors), np.size(errors)))
np.fill_diagonal(self.covar, np.array(errors) ** 2)
else:
self.covar = errors
self.flatprior = flatprior
if flatprior is None:
initial_fit = self.minuit(p0=self.p0, chi2=self.chi2, verbose=False)
self.fitresult = [initial_fit[0], initial_fit[1]]
def __call__(self, mytheta, extra_args=None, verbose=False):
if self.fixedpars is not None:
theta = self.p0.copy()
theta[self.fixedpars == 0] = mytheta
# theta[self.fixedpars == 0] = mytheta[self.fixedpars == 0]
else:
theta = mytheta
# theta = mytheta
self.modelval = self.model(self.xvals[: self.nbins], theta)
if self.covariance_model_funct is None:
self.invcov = np.linalg.inv(self.covar)
else:
cov_repeat = self.make_covariance_matrix()
self.invcov = np.linalg.inv(cov_repeat + self.covar)
lp = self.log_priors(theta)
if verbose:
print("Pars")
print(theta)
print("Y")
print(np.shape(self.yvals))
print(self.yvals[0:10])
print("Model")
print(np.shape(self.modelval))
print(self.modelval[:10])
print("Diff")
print(np.shape((self.yvals - self.modelval)))
print((self.yvals - self.modelval)[0:10])
print("Diff x invcov")
print(np.shape((self.yvals - self.modelval).T @ self.invcov))
print(((self.yvals - self.modelval).T @ self.invcov)[0:10])
logLLH = lp - 0.5 * (((self.yvals - self.modelval).T @ self.invcov) @ (self.yvals - self.modelval))
if not np.isfinite(logLLH):
return -np.inf
else:
return logLLH
[docs]
def make_covariance_matrix(self):
cov = self.covariance_model_funct(self.modelval[: self.nbins])
cov_repeat = np.zeros_like(self.covar)
for i in range(0, len(self.xvals), self.nbins):
cov_repeat[i : i + self.nbins, i : i + self.nbins] = cov
return cov_repeat
[docs]
def compute_sigma68(self, logLLH, rvalues):
LLH = [np.exp(logLLH([rvalues[i]])) for i in range(len(rvalues))]
ct_integral = cumulative_trapezoid(LLH, x=rvalues) # Cumulative integral
ct_integral /= np.max(ct_integral)
sigma68 = np.interp(0.68, ct_integral, rvalues[1:])
return LLH, sigma68
[docs]
def log_priors(self, theta):
ok = 1
for i in range(len(theta)):
if self.flatprior is not None:
if (theta[i] < self.flatprior[i][0]) or (theta[i] > self.flatprior[i][1]):
ok *= 0
else:
if np.abs(theta[i] - self.fitresult[0][i]) > (self.nsigprior * np.sqrt(self.fitresult[1][i, i])):
ok *= 0
if ok == 1:
return 0
else:
return -np.inf
[docs]
def run(self, nbmc):
nwalkers = self.nwalkers
if self.flatprior is not None:
ndim = len(self.flatprior)
pos = np.zeros((nwalkers, ndim))
for d in range(ndim):
pos[:, d] = np.random.rand(nwalkers) * (self.flatprior[d][1] - self.flatprior[d][0]) + self.flatprior[d][0]
else:
nsigmas = self.nsiginit
ndim = len(self.fitresult[0])
pos = np.zeros((nwalkers, ndim))
for d in range(ndim):
pos[:, d] = np.random.randn(nwalkers) * np.sqrt(self.fitresult[1][d, d]) * nsigmas + self.fitresult[0][d]
# print('Ndim init:', ndim)
if self.fixedpars is not None:
ndim = int(np.sum(self.fixedpars == 0))
# print('New ndim:', ndim)
sampler = emcee.EnsembleSampler(nwalkers, ndim, self.__call__)
if self.fixedpars is not None:
# print('Len(pos):', np.shape(pos))
# print('len(fixedpars):', len(self.fixedpars))
pos = pos[:, self.fixedpars == 0]
# print('New len(pos):', np.shape(pos))
sampler.run_mcmc(pos, nbmc, progress=True)
return sampler
[docs]
def fisher_analysis(self, delta_r=1e-7):
# Model
modelval_r0 = self.model(self.xvals[: self.nbins], r=0.0)
modelval_deltar = self.model(self.xvals[: self.nbins], r=delta_r)
# Jacobian, Numerical derivative
J = (modelval_deltar - modelval_r0) / delta_r
# Covariance matrix in new basis
Cov_r = 1 / (J.T @ self.invcov @ J)
# Sigma at 68 pourcent
sigma68 = np.sqrt(Cov_r)
return sigma68
[docs]
def call4curvefit(self, x, *pars):
return self.model(x, pars)
[docs]
def curve_fit(self, p0=None):
if p0 is None:
p0 = self.p0
self.fitresult_curvefit = curve_fit(self.call4curvefit, self.xvals, self.yvals, sigma=np.sqrt(np.diag(self.covar)), maxfev=1000000, ftol=1e-5, p0=p0)
return self.fitresult_curvefit[0], self.fitresult_curvefit[1]
### This should be modified in order to call the current likelihood instead, not an external one...
[docs]
def minuit(self, p0=None, chi2=None, verbose=True, print_level=0, ncallmax=10000, extra_args=None, nsplit=1, return_chi2fct=False):
if p0 is None:
p0 = self.p0
if verbose & (print_level > 1):
print("About to call Minuit with chi2:")
print(chi2)
print("Initial parameters, fixed and bounds:")
for i in range(len(p0)):
print("Param {0:}: init={1:6.2f} Fixed={2:} Range=[{3:6.3f}, {4:6.3f}]".format(i, p0[i], self.fixedpars[i], self.flatprior[i][0], self.flatprior[i][1]))
self.fitresult_minuit = ft.do_minuit(
self.xvals,
self.yvals,
self.covar,
p0,
functname=self.model,
fixpars=self.fixedpars,
rangepars=self.flatprior,
verbose=verbose,
chi2=self.chi2,
print_level=print_level,
ncallmax=ncallmax,
extra_args=extra_args,
nsplit=nsplit,
)
if len(self.fitresult_minuit[3]) == 0:
cov = np.diag(self.fitresult_minuit[2])
else:
cov = self.fitresult_minuit[3]
if return_chi2fct:
return self.fitresult_minuit[1], cov, self.fitresult_minuit[6]
else:
return self.fitresult_minuit[1], cov
[docs]
def random_explore_guess(self, ntry=100, fraction=1):
fit_range_simu = self.flatprior
fit_fixed_simu = self.fixedpars
myguess_params = np.zeros((ntry, len(fit_range_simu)))
for i in range(len(fit_range_simu)):
if fit_fixed_simu[i] == 0:
rng = (fit_range_simu[i][1] - fit_range_simu[i][0]) * fraction
mini = np.max([fit_range_simu[i][0], self.p0[i] - rng / 2])
maxi = np.min([fit_range_simu[i][0], self.p0[i] + rng / 2])
myguess_params[:, i] = np.random.rand(ntry) * (maxi - mini) + mini
else:
myguess_params[:, i] = self.p0[i]
return myguess_params