Source code for lib.Calibration.Qcalibration

# coding: utf-8
import os
from configparser import ConfigParser

import numpy as np
from astropy.io import fits
from pysimulators import Layout, LayoutGrid

from qubic.calfiles import PATH as cal_dir
from qubic.lib.Qhorns import HornLayout
from qubic.lib.Qutilities import find_file

__all__ = ["QubicCalibration"]


[docs] class QubicCalibration(object): """ Class representing the QUBIC calibration tree. It stores the calibration file names and "hardcoded" values and provides access to them. If the path name of a calibration file is relative, it is first searched relatively to the working directory and if not found, in the calibration path. """ def __init__(self, d, path=cal_dir): """ Parameters ---------- path : str, optional The directory path of the calibration tree. The default one is the one that is contained in the qubic package. detarray : str, optional The detector array layout calibration file name. hornarray : str, optional The horn array layout calibration file name. optics : str, optional The optics parameters calibration file name. primbeam : str, optional The primary beam parameter calibration file name. synthbeam : str, optional The synthetic beam parameter calibration file name. """ if path is None: path = "." self.path = os.path.abspath(path) # replace the wildcard with the configuration: either TD or FI epsilon = 1.0e-9 # one Hz of margin for comparisons self.nu = int(d["filter_nu"] / 1e9) if self.nu >= 130 - epsilon and self.nu <= 170 + epsilon: nu_str = "150" elif self.nu >= 190 - epsilon and self.nu <= 247.5 + epsilon: nu_str = "220" else: nu_str = "%03i" % self.nu for key in ["detarray", "hornarray", "optics", "primbeam", "synthbeam"]: calfile = d[key].replace("_CC", "_%s" % d["config"]).replace("_FFF", "_%s" % nu_str) calfile_fullpath = find_file(os.path.join(self.path, calfile), verbosity=1) if calfile_fullpath is None: cmd = "self.%s = None" % key else: cmd = "self.%s = '%s'" % (key, calfile_fullpath) if d["use_synthbeam_fits_file"]: print("executing: %s" % cmd) exec(cmd) if d["debug"]: print("self.synthbeam = %s" % self.synthbeam) def __str__(self): state = [ ("path", self.path), ("detarray", self.detarray), ("hornarray", self.hornarray), ("optics", self.optics), ("primbeam", self.primbeam), ("synthbeam", self.synthbeam), ] return "\n".join([a + ": " + repr(v) for a, v in state]) __repr__ = __str__
[docs] def get(self, name, *args): if name == "detarray": hdus = fits.open(self.detarray) version = hdus[0].header["format version"] vertex = hdus[2].data frame = hdus[0].header["FRAME"] if frame == "ONAFP": # Make a pi/2 rotation from ONAFP -> GRF referential frame vertex[..., [0, 1]] = vertex[..., [1, 0]] vertex[..., 1] *= -1 shape = vertex.shape[:-2] removed = hdus[3].data.view(bool) ordering = hdus[4].data quadrant = hdus[5].data efficiency = hdus[6].data return shape, vertex, removed, ordering, quadrant, efficiency elif name == "hornarray": hdus = fits.open(self.hornarray) version = hdus[0].header["format version"] if version == "1.0": h = hdus[0].header spacing = h["spacing"] center = hdus[1].data shape = center.shape[:-1] layout = Layout(shape, center=center, radius=h["innerrad"], open=None) layout.spacing = spacing elif version == "2.0": h = hdus[0].header spacing = h["spacing"] xreflection = h["xreflection"] yreflection = h["yreflection"] radius = h["radius"] selection = ~hdus[1].data.view(bool) layout = LayoutGrid( removed.shape, spacing, selection=selection, radius=radius, xreflection=xreflection, yreflection=yreflection, open=None, ) else: h = hdus[1].header spacing = h["spacing"] xreflection = h["xreflection"] yreflection = h["yreflection"] angle = h["angle"] radius = h["radius"] selection = ~hdus[2].data.view(bool) shape = selection.shape layout = HornLayout( shape, spacing, selection=selection, radius=radius, xreflection=xreflection, yreflection=yreflection, angle=angle, startswith1=True, id=None, open=None, ) layout.id = np.arange(len(layout)) layout.center = np.concatenate( [layout.center, np.full_like(layout.center[..., :1], 0)], -1 ) layout.open = np.ones(len(layout), bool) return layout elif name == "optics": dtype = [ ("name", "S16"), ("temperature", float), ("transmission", float), ("emissivity", float), ("nstates_pol", int), ] if self.optics.endswith("fits"): header = fits.open(self.optics)[0].header return { "focal length": header["flength"], "detector efficiency": 1.0, "components": np.empty(0, dtype=dtype), } parser = ConfigParser() parser.read(self.optics) # ### The 2 next lines are commented as there is nothing in the section # ### "general" in the optics calibration file. Focal length has been moved to the dictionary. # keys = 'focal length', # out = dict((key, parser.getfloat('general', key)) for key in keys) out = {} raw = parser.items("components") components = np.empty(len(raw), dtype=dtype) for i, r in enumerate(raw): component = (r[0],) + tuple(float(_) for _ in r[1].split(", ")) components[i] = component out["components"] = components return out elif name == "primbeam": hdu = fits.open(self.primbeam) header = hdu[0].header # Gaussian beam if header["format version"] == "1.0": fwhm0_deg = header["fwhm"] return fwhm0_deg # Fitted beam elif header["format version"] == "2.0": if self.nu < 170 and self.nu > 130: omega = hdu[1].header["omega"] par = hdu[1].data else: omega = hdu[2].header["omega"] par = hdu[2].data return par, omega # Multi frequency beam else: parth = hdu[1].data parfr = hdu[2].data parbeam = hdu[3].data alpha = hdu[4].data xspl = hdu[5].data return parth, parfr, parbeam, alpha, xspl raise ValueError("Invalid primary beam calibration version") elif name == "synthbeam": if self.synthbeam is None: print("synthbeam not defined") return None if not os.path.isfile(self.synthbeam): print("File not found: %s" % self.synthbeam) return None hdu = fits.open(self.synthbeam) header = hdu[0].header theta = hdu[0].data phi = hdu[1].data val = hdu[2].data freqs = hdu[3].data return theta, phi, val, freqs, header elif name == "synthbeam_jc": hdu = fits.open(self.synthbeam) header = hdu[0].header theta = hdu[0].data phi = hdu[1].data val = hdu[2].data numpeaks = hdu[3].data return theta, phi, val, numpeaks, header raise ValueError("Invalid calibration item: '{}'".format(name))
# def _newest(self, filename): # if '*' not in filename: # if not os.path.exists(filename): # filename = os.path.join(self.path, filename) # if not os.path.exists(filename): # raise ValueError("No calibration file '{}'.".format(filename)) # return os.path.abspath(filename) ## filenames = glob(filename) # if len(filenames) == 0: # filename = os.path.join(self.path, filename) # filenames = glob(filename) # if len(filenames) == 0: # raise ValueError("No calibration files '{}'.".format(filename)) # regex = re.compile(filename.replace('*', '(.*)')) # version = sorted(regex.search(f).group(1) for f in filenames)[-1] # return os.path.abspath(filename.replace('*', version))