# coding: utf-8
import numpy as np
from astropy.time import Time, TimeDelta
from pyoperators import (
Cartesian2SphericalOperator,
Rotation3dOperator,
Spherical2CartesianOperator,
rule_manager,
)
from pyoperators.utils import deprecated, isscalarlike
from pysimulators import (
CartesianEquatorial2GalacticOperator,
CartesianEquatorial2HorizontalOperator,
CartesianGalactic2EquatorialOperator,
CartesianHorizontal2EquatorialOperator,
SamplingHorizontal,
SphericalEquatorial2GalacticOperator,
SphericalEquatorial2HorizontalOperator,
SphericalGalactic2EquatorialOperator,
SphericalHorizontal2EquatorialOperator,
)
from pysimulators.interfaces.healpy import Cartesian2HealpixOperator
__all__ = [
"QubicSampling",
"get_pointing",
"create_random_pointings",
"create_repeat_pointings",
"create_sweeping_pointings",
"equ2gal",
"equ2hor",
"gal2equ",
"gal2hor",
"hor2equ",
"hor2gal",
]
DOMECLAT = -(24 + 11.0 / 60)
DOMECLON = -(66 + 28.0 / 60)
[docs]
class QubicSampling(SamplingHorizontal):
"""
Attributes
----------
azimuth : array-like
The pointing azimuth [degrees].
elevation : array-like
The pointing elevation [degrees].
pitch : array-like
The instrument pitch angle [degrees].
angle_hwp : array-like
The half-wave plate angle [degrees].
time : array-like
Elapsed time for each pointing since the observation start date,
in seconds.
date_obs : string or astropy.time.Time
The observation start date.
period : float
The sampling period [s].
latitude : float
Telescope latitude [degrees].
longitude : float
Telescope longitude [degrees].
Examples
--------
pointing = QubicSampling(azimuth=azimuth, elevation=elevation,
angle_hwp=angle_hwp, period=...)
pointing = QubicSampling(azimuth, elevation, pitch, angle_hwp, period=...)
"""
DEFAULT_DATE_OBS = "2016-01-01 00:00:00"
DEFAULT_PERIOD = 1
DEFAULT_LATITUDE = DOMECLAT
DEFAULT_LONGITUDE = DOMECLON
def __init__(self, *args, **keywords):
if len(args) == 4:
args = list(args)
angle_hwp = args.pop()
else:
angle_hwp = keywords.pop("angle_hwp", 0)
SamplingHorizontal.__init__(
self, angle_hwp=angle_hwp, healpix=None, *args, **keywords
)
[docs]
def healpix(self, nside):
time = self.date_obs + TimeDelta(self.time, format="sec")
c2h = Cartesian2HealpixOperator(nside)
e2g = CartesianEquatorial2GalacticOperator()
h2e = CartesianHorizontal2EquatorialOperator(
"NE", time, self.latitude, self.longitude
)
rotation = c2h(e2g(h2e))
return rotation(self.cartesian)
@property
def cartesian_galactic2instrument(self):
"""
Return the galactic-to-instrument transform.
"""
time = self.date_obs + TimeDelta(self.time, format="sec")
with rule_manager(none=False):
r = (
Rotation3dOperator(
"ZY'Z''", self.azimuth, 90 - self.elevation, self.pitch, degrees=True
).T
* CartesianEquatorial2HorizontalOperator(
"NE", time, self.latitude, self.longitude
)
* CartesianGalactic2EquatorialOperator()
)
return r
@property
def cartesian_instrument2galactic(self):
return self.cartesian_galactic2instrument.I
@property
def cartesian_horizontal2instrument(self):
"""
Return the galactic-to-instrument transform.
"""
with rule_manager(none=False):
r = Rotation3dOperator(
"ZY'Z''", self.azimuth, 90 - self.elevation, self.pitch, degrees=True
).T
return r
@property
def cartesian_instrument2horizontal(self):
return self.cartesian_horizontal2instrument.I
@deprecated
class QubicPointing(QubicSampling):
pass
[docs]
def get_pointing(d):
if [d["random_pointing"], d["sweeping_pointing"], d["repeat_pointing"]].count(
True
) != 1:
raise ValueError("Error: you should choose one pointing")
center = (d["RA_center"], d["DEC_center"])
if d["random_pointing"] is True:
return create_random_pointings(
center,
d["npointings"],
d["dtheta"],
d["hwp_stepsize"],
date_obs=d["date_obs"],
period=d["period"],
latitude=d["latitude"],
longitude=d["longitude"],
seed=d["seed"],
)
elif d["repeat_pointing"] is True:
return create_repeat_pointings(
center,
d["npointings"],
d["dtheta"],
d["nhwp_angles"],
date_obs=d["date_obs"],
period=d["period"],
latitude=d["latitude"],
longitude=d["longitude"],
seed=d["seed"],
)
elif d["sweeping_pointing"] is True:
return create_sweeping_pointings(
center,
d["duration"],
d["period"],
d["angspeed"],
d["delta_az"],
d["nsweeps_per_elevation"],
d["angspeed_psi"],
d["maxpsi"],
d["hwp_stepsize"],
date_obs=d["date_obs"],
latitude=d["latitude"],
longitude=d["longitude"],
fix_azimuth=d["fix_azimuth"],
random_hwp=d["random_hwp"],
)
[docs]
def create_random_pointings(
center,
npointings,
dtheta,
hwp_stepsize,
date_obs=None,
period=None,
latitude=None,
longitude=None,
seed=None,
):
"""
Return pointings randomly and uniformly distributed in a spherical cap.
1) Creates random coordinates theta, phi. Range: 0 < theta < dtheta, 0 < phi < 360
(then are converted from spherical to cartesian coordinates),
2) It rotates the points to center them in direction (RA, DEC) using Rotation3dOperator
(equatorial reference system)
3) Convertion: Equatorial to Horizontal reference system
(using CartesianEquatorial2HorizontalOperator's operator)
4) Back to cartesian coordinates (using Cartesian2SphericalOperator)
Parameters
----------
center : 2-tuple
The R.A. and declination of the center of the FOV, in degrees.
npointings : int
The number of requested pointings
dtheta : float
The maximum angular distance to the center.
hwp_stepsize : float
Step angle size for the HWP.
date_obs : str or astropy.time.Time, optional
The starting date of the observation (UTC).
period : float, optional
The sampling period of the pointings, in seconds.
latitude : float, optional
The observer's latitude [degrees]. Default is DOMEC's.
longitude : float, optional
The observer's longitude [degrees]. Default is DOMEC's.
seed : int
Random seed.
"""
r = np.random.RandomState(seed)
cosdtheta = np.cos(np.radians(dtheta))
theta = np.degrees(np.arccos(cosdtheta + (1 - cosdtheta) * r.rand(npointings)))
phi = r.rand(npointings) * 360
pitch = r.rand(npointings) * 360
p = QubicSampling(
npointings,
date_obs=date_obs,
period=period,
latitude=latitude,
longitude=longitude,
)
time = p.date_obs + TimeDelta(p.time, format="sec")
c2s = Cartesian2SphericalOperator("azimuth,elevation", degrees=True)
e2h = CartesianEquatorial2HorizontalOperator("NE", time, p.latitude, p.longitude)
rot = Rotation3dOperator("ZY'", center[0], 90 - center[1], degrees=True)
s2c = Spherical2CartesianOperator("zenith,azimuth", degrees=True)
rotation = c2s(e2h(rot(s2c)))
coords = rotation(np.asarray([theta.T, phi.T]).T)
p.azimuth = coords[..., 0]
p.elevation = coords[..., 1]
p.pitch = pitch
p.fix_az = False
p.angle_hwp = r.randint(0, int(90 / hwp_stepsize + 1), npointings) * hwp_stepsize
return p
[docs]
def create_repeat_pointings(
center,
npointings,
dtheta,
nhwp_angles,
date_obs=None,
period=None,
latitude=None,
longitude=None,
seed=None,
):
"""
Return pointings randomly and uniformly distributed in a spherical cap.
The same pointing is repeated nhwp_angles times with a different
hwp angle each time.
Parameters
----------
center : 2-tuple
The R.A. and declination of the center of the FOV, in degrees.
npointings : int
The number of requested pointings
dtheta : float
The maximum angular distance to the center.
nhwp_angles : int
The number of HWP angles used.
date_obs : str or astropy.time.Time, optional
The starting date of the observation (UTC).
period : float, optional
The sampling period of the pointings, in seconds.
latitude : float, optional
The observer's latitude [degrees]. Default is DOMEC's.
longitude : float, optional
The observer's longitude [degrees]. Default is DOMEC's.
seed : int
Random seed.
"""
r = np.random.RandomState(seed)
nrandom = int(npointings / nhwp_angles) # number of real random pointings
print(
"You asked {0} pointings with repeat strategy so I will provide {1} pointings repeated {2} times.".format(
npointings, nrandom, nhwp_angles
)
)
# Creation of nrandom pointing
cosdtheta = np.cos(np.radians(dtheta))
theta = np.degrees(np.arccos(cosdtheta + (1 - cosdtheta) * r.rand(nrandom)))
phi = r.rand(nrandom) * 360
pitch = r.rand(nrandom) * 360
p = QubicSampling(
nrandom, date_obs=date_obs, period=period, latitude=latitude, longitude=longitude
)
time = p.date_obs + TimeDelta(p.time, format="sec")
c2s = Cartesian2SphericalOperator("azimuth,elevation", degrees=True)
e2h = CartesianEquatorial2HorizontalOperator("NE", time, p.latitude, p.longitude)
rot = Rotation3dOperator("ZY'", center[0], 90 - center[1], degrees=True)
s2c = Spherical2CartesianOperator("zenith,azimuth", degrees=True)
rotation = c2s(e2h(rot(s2c)))
coords = rotation(np.asarray([theta.T, phi.T]).T)
p.azimuth = coords[..., 0]
p.elevation = coords[..., 1]
p.pitch = pitch
p.fix_az = False
# Replication of the same pointing with others fix hwp angles
pp = QubicSampling(
nrandom * nhwp_angles,
date_obs=date_obs,
period=period,
latitude=latitude,
longitude=longitude,
)
pp.azimuth = np.tile(p.azimuth, nhwp_angles)
pp.elevation = np.tile(p.elevation, nhwp_angles)
pp.pitch = np.tile(p.pitch, nhwp_angles)
pp.time = np.tile(p.time, nhwp_angles)
pp.angle_hwp = np.zeros(nrandom * nhwp_angles)
pp.fix_az = False
for hwp in range(nhwp_angles):
pp.angle_hwp[hwp * nrandom : (hwp + 1) * nrandom] = np.array(
np.rad2deg(hwp * np.pi / (nhwp_angles * 2))
)
return pp
[docs]
def create_sweeping_pointings(
center,
duration,
period,
angspeed,
delta_az,
nsweeps_per_elevation,
angspeed_psi,
maxpsi,
hwp_stepsize,
date_obs=None,
latitude=None,
longitude=None,
fix_azimuth=None,
random_hwp=True,
):
"""
Return pointings according to the sweeping strategy:
Sweep around the tracked FOV center azimuth at a fixed elevation, and
update elevation towards the FOV center at discrete times.
Parameters
----------
center : array-like of size 2
The R.A. and Declination of the center of the FOV.
duration : float
The duration of the observation, in hours.
period : float
The sampling period of the pointings, in seconds.
angspeed : float
The pointing angular speed, in deg / s.
delta_az : float
The sweeping extent in degrees.
nsweeps_per_elevation : int
The number of sweeps during a phase of constant elevation.
angspeed_psi : float
The pitch angular speed, in deg / s.
maxpsi : float
The maximum pitch angle, in degrees.
latitude : float, optional
The observer's latitude [degrees]. Default is DOMEC's.
longitude : float, optional
The observer's longitude [degrees]. Default is DOMEC's.
fix_azimuth : bool
hwp_stepsize : float
Step angle size for the HWP.
date_obs : str or astropy.time.Time, optional
The starting date of the observation (UTC).
random_hwp : bool
Returns
-------
pointings : QubicSampling
Structured array containing the azimuth, elevation and pitch angles,
in degrees.
"""
nsamples = int(np.ceil(duration * 3600 / period))
out = QubicSampling(
nsamples, date_obs=date_obs, period=period, latitude=latitude, longitude=longitude
)
racenter = center[0]
deccenter = center[1]
backforthdt = delta_az / angspeed * 2
# compute the sweep number
isweeps = np.floor(out.time / backforthdt).astype(int)
# azimuth/elevation of the center of the field as a function of time
if fix_azimuth["apply"]:
azcenter = out.time * 0 + fix_azimuth["az"]
elcenter = out.time * 0 + fix_azimuth["el"]
else:
azcenter, elcenter = equ2hor(
racenter,
deccenter,
out.time,
date_obs=out.date_obs,
latitude=out.latitude,
longitude=out.longitude,
)
# compute azimuth offset for all time samples
daz = out.time * angspeed
daz = daz % (delta_az * 2)
mask = daz > delta_az
daz[mask] = -daz[mask] + 2 * delta_az
daz -= delta_az / 2
# elevation is kept constant during nsweeps_per_elevation
elcst = np.zeros(nsamples)
ielevations = isweeps // nsweeps_per_elevation
nelevations = ielevations[-1] + 1
for i in range(nelevations):
mask = ielevations == i
elcst[mask] = np.mean(elcenter[mask])
if fix_azimuth is not None:
if fix_azimuth["apply"]:
el_step = fix_azimuth["el_step"]
elcst[mask] = elcenter[mask] - nelevations / 2 * el_step + i * el_step
# azimuth and elevations to use for pointing
azptg = azcenter + daz
elptg = elcst
### scan psi as well
pitch = out.time * angspeed_psi
pitch = pitch % (4 * maxpsi)
mask = pitch > (2 * maxpsi)
pitch[mask] = -pitch[mask] + 4 * maxpsi
pitch -= maxpsi
out.azimuth = azptg
out.elevation = elptg
out.pitch = pitch
if random_hwp:
out.angle_hwp = (
np.random.randint(0, int(90 / hwp_stepsize + 1), nsamples) * hwp_stepsize
)
else:
out.angle_hwp = np.zeros(nsamples)
max_sweeps = np.max(isweeps)
delta = int(nsamples / max_sweeps)
for i in range(max_sweeps):
out.angle_hwp[i * delta : (i + 1) * delta] = hwp_stepsize * np.mod(
i, int(90 / hwp_stepsize + 1)
)
if fix_azimuth["apply"]:
out.fix_az = True
if fix_azimuth["fix_hwp"]:
out.angle_hwp = out.pitch * 0 + hwp_stepsize
if fix_azimuth["fix_pitch"]:
out.pitch = 0
else:
out.fix_az = False
return out
def _format_sphconv(a, b, date_obs=None, time=None):
incoords = np.empty(np.broadcast(a, b).shape + (2,))
incoords[..., 0] = a
incoords[..., 1] = b
if date_obs is None:
return incoords
import astropy
if astropy.__version__ < "1":
time = Time(
date_obs if isscalarlike(time) else [date_obs], scale="utc"
) + TimeDelta(time, format="sec")
else:
time = Time(date_obs, scale="utc") + TimeDelta(time, format="sec")
return incoords, time
[docs]
def equ2gal(ra, dec):
"""
equ2gal(ra, dec) -> l, b
Equatorial to galactic spherical conversion. Angles are in degrees.
"""
incoords = _format_sphconv(ra, dec)
outcoords = SphericalEquatorial2GalacticOperator(degrees=True)(incoords)
return outcoords[..., 0], outcoords[..., 1]
[docs]
def gal2equ(l, b):
"""
gal2equ(l, b) -> ra, dec
Galactic to equatorial spherical conversion. Angles are in degrees.
"""
incoords = _format_sphconv(l, b)
outcoords = SphericalGalactic2EquatorialOperator(degrees=True)(incoords)
return outcoords[..., 0], outcoords[..., 1]
[docs]
def equ2hor(
ra,
dec,
time,
date_obs=QubicSampling.DEFAULT_DATE_OBS,
latitude=DOMECLAT,
longitude=DOMECLON,
):
"""
equ2hor(ra, dec, time, [date_obs, [latitude, [longitude]]]) -> az, el
Equatorial to horizontal spherical conversion. Angles are in degrees.
Parameters
----------
ra : float
Right ascension in degrees.
dec : float
Declination in degrees.
time : array-like
Elapsed time in seconds since date_obs.
date_obs : string
The starting date, UTC.
latitude : float
The observer's latitude geolocation. Default is Dome C.
longitude : float
The observer's longitude geolocation. Default is Dome C.
Example
-------
>>> equ2hor(0, 0, 0, date_obs='2000-01-01 00:00:00')
(array(135.71997181016644), array(-10.785386358099927))
"""
_, time = _format_sphconv(ra, dec, date_obs, time)
ra_array = np.ones(time.shape) * ra
dec_array = np.ones(time.shape) * dec
incoords = np.zeros((time.shape[0], 2))
incoords[..., 0], incoords[..., 1] = ra_array, dec_array
outcoords = SphericalEquatorial2HorizontalOperator(
"NE", time, latitude, longitude, degrees=True
)(incoords)
return outcoords[..., 0], outcoords[..., 1]
[docs]
def hor2equ(
azimuth,
elevation,
time,
date_obs=QubicSampling.DEFAULT_DATE_OBS,
latitude=DOMECLAT,
longitude=DOMECLON,
):
"""
hor2equ(az, el, time, [date_obs, [latitude, [longitude]]]) -> ra, dec
Horizontal to equatorial spherical conversion. Angles are in degrees.
Parameters
----------
azimuth : float
elevation : float
time : array-like
Elapsed time in seconds since date_obs.
date_obs : string
The starting date, UTC.
latitude : float
The observer's latitude geolocation. Default is Dome C.
longitude : float
The observer's longitude geolocation. Default is Dome C.
Example
-------
>>> hor2equ(135.71997181016644, -10.785386358099927, 0,
... date_obs='2000-01-01 00:00:00')
(array(1.1927080055488187e-14), array(-1.2722218725854067e-14))
"""
incoords, time = _format_sphconv(azimuth, elevation, date_obs, time)
outcoords = SphericalHorizontal2EquatorialOperator(
"NE", time, latitude, longitude, degrees=True
)(incoords)
return outcoords[..., 0], outcoords[..., 1]
[docs]
def gal2hor(
l,
b,
time,
date_obs=QubicSampling.DEFAULT_DATE_OBS,
latitude=DOMECLAT,
longitude=DOMECLON,
):
"""
gal2hor(l, b, time, [date_obs, [latitude, [longitude]]]) -> az, el
Galactic to horizontal spherical conversion. Angles are in degrees.
Parameters
----------
time : array-like
Elapsed time in seconds since date_obs.
date_obs : string
The starting date, UTC.
latitude : float
The observer's latitude geolocation. Default is Dome C.
longitude : float
The observer's longitude geolocation. Default is Dome C.
Example
-------
>>> gal2hor(0, 0, 0)
(array(50.35837815921487), array(39.212362279976155))
"""
incoords, time = _format_sphconv(l, b, date_obs, time)
g2e = SphericalGalactic2EquatorialOperator(degrees=True)
e2h = SphericalEquatorial2HorizontalOperator(
"NE", time, latitude, longitude, degrees=True
)
outcoords = e2h(g2e(incoords))
return outcoords[..., 0], outcoords[..., 1]
[docs]
def hor2gal(
azimuth,
elevation,
time,
date_obs=QubicSampling.DEFAULT_DATE_OBS,
latitude=DOMECLAT,
longitude=DOMECLON,
):
"""
hor2gal(az, el, time, [date_obs, [latitude, [longitude]]]) -> l, b
Horizontal to galactic spherical conversion. Angles are in degrees.
Parameters
----------
azimuth : float
elevation : float
time : array-like
Elapsed time in seconds since date_obs.
date_obs : string
The starting date, UTC.
latitude : float
The observer's latitude geolocation. Default is Dome C.
longitude : float
The observer's longitude geolocation. Default is Dome C.
Example
-------
>>> hor2gal(50.35837815921487, 39.212362279976155, 0)
(array(4.452776554048925e-14), array(-7.63333123551244e-14))
"""
incoords, time = _format_sphconv(azimuth, elevation, date_obs, time)
h2e = SphericalHorizontal2EquatorialOperator(
"NE", time, latitude, longitude, degrees=True
)
e2g = SphericalEquatorial2GalacticOperator(degrees=True)
outcoords = e2g(h2e(incoords))
return outcoords[..., 0], outcoords[..., 1]