# utils.py
"""
Utilities module containing various useful
functions for use in other modules.
"""
import logging
import numpy as np
import scipy.linalg as sl
import scipy.sparse as sps
import scipy.special as ss
from pkg_resources import Requirement, resource_filename
from scipy.integrate import odeint
from scipy.interpolate import interp1d
import enterprise
from enterprise import constants as const
from enterprise import signals as sigs # noqa: F401
from enterprise.signals.gp_bases import ( # noqa: F401
createfourierdesignmatrix_dm,
createfourierdesignmatrix_env,
createfourierdesignmatrix_eph,
createfourierdesignmatrix_ephem,
createfourierdesignmatrix_red,
)
from enterprise.signals.gp_priors import powerlaw, turnover # noqa: F401
from enterprise.signals.parameter import function
logger = logging.getLogger(__name__)
try:
from sksparse.cholmod import cholesky
except ImportError: # pragma no cover
logger.warning("sksparse not installed. You'll need sksparse for get_coefficients() with common signals!")
[docs]def get_coefficients(pta, params, n=1, phiinv_method="cliques", common_sparse=False):
ret = []
TNrs = pta.get_TNr(params)
TNTs = pta.get_TNT(params)
phiinvs = pta.get_phiinv(params, logdet=False, method=phiinv_method)
# ...repeated code in the two if branches... refactor at will!
if pta._commonsignals:
if common_sparse:
Sigma = sps.block_diag(TNTs, "csc") + sps.csc_matrix(phiinvs)
TNr = np.concatenate(TNrs)
ch = cholesky(Sigma)
mn = ch(TNr)
Li = sps.linalg.inv(ch.L()).toarray()
else:
Sigma = sl.block_diag(*TNTs) + phiinvs
TNr = np.concatenate(TNrs)
u, s, _ = sl.svd(Sigma)
mn = np.dot(u, np.dot(u.T, TNr) / s)
Li = u * np.sqrt(1 / s)
for j in range(n):
b = mn + np.dot(Li, np.random.randn(Li.shape[0]))
pardict, ntot = {}, 0
for i, model in enumerate(pta.pulsarmodels):
for sig in model._signals:
if sig.signal_type in ["basis", "common basis"]:
nb = sig.get_basis(params=params).shape[1]
if nb + ntot > len(b):
raise IndexError(
"Missing some parameters! " "You need to disable GP " "basis column reuse."
)
pardict[sig.name + "_coefficients"] = b[ntot : nb + ntot]
ntot += nb
if len(ret) <= j:
ret.append(params.copy())
ret[j].update(pardict)
return ret[0] if n == 1 else ret
else:
for i, model in enumerate(pta.pulsarmodels):
phiinv, d, TNT = phiinvs[i], TNrs[i], TNTs[i]
Sigma = TNT + (np.diag(phiinv) if phiinv.ndim == 1 else phiinv)
try:
u, s, _ = sl.svd(Sigma)
mn = np.dot(u, np.dot(u.T, d) / s)
Li = u * np.sqrt(1 / s)
except np.linalg.LinAlgError:
Q, R = sl.qr(Sigma)
Sigi = sl.solve(R, Q.T)
mn = np.dot(Sigi, d)
u, s, _ = sl.svd(Sigi)
Li = u * np.sqrt(1 / s)
for j in range(n):
b = mn + np.dot(Li, np.random.randn(Li.shape[0]))
pardict, ntot = {}, 0
for sig in model._signals:
if sig.signal_type == "basis":
nb = sig.get_basis(params=params).shape[1]
if nb + ntot > len(b):
raise IndexError(
"Missing some parameters! " "You need to disable GP " "basis column reuse."
)
pardict[sig.name + "_coefficients"] = b[ntot : nb + ntot]
ntot += nb
if len(ret) <= j:
ret.append(params.copy())
ret[j].update(pardict)
return ret[0] if n == 1 else ret
[docs]class KernelMatrix(np.ndarray):
def __new__(cls, init):
if isinstance(init, int):
ret = np.zeros(init, "d").view(cls)
else:
ret = init.view(cls)
if ret.ndim == 2:
ret._cliques = -1 * np.ones(ret.shape[0])
ret._clcount = 0
return ret
# see PTA._setcliques
def _setcliques(self, idxs):
allidx = set(self._cliques[idxs])
maxidx = max(allidx)
if maxidx == -1:
self._cliques[idxs] = self._clcount
self._clcount = self._clcount + 1
else:
self._cliques[idxs] = maxidx
if len(allidx) > 1:
self._cliques[np.in1d(self._cliques, allidx)] = maxidx
[docs] def add(self, other, idx):
if other.ndim == 2 and self.ndim == 1:
self = KernelMatrix(np.diag(self))
if self.ndim == 1:
self[idx] += other
else:
if other.ndim == 1:
self[idx, idx] += other
else:
self._setcliques(idx)
idx = (idx, idx) if isinstance(idx, slice) else (idx[:, None], idx)
self[idx] += other
return self
[docs] def set(self, other, idx):
if other.ndim == 2 and self.ndim == 1:
self = KernelMatrix(np.diag(self))
if self.ndim == 1:
self[idx] = other
else:
if other.ndim == 1:
self[idx, idx] = other
else:
self._setcliques(idx)
idx = (idx, idx) if isinstance(idx, slice) else (idx[:, None], idx)
self[idx] = other
return self
[docs] def inv(self, logdet=False):
if self.ndim == 1:
inv = 1.0 / self
if logdet:
return inv, np.sum(np.log(self))
else:
return inv
else:
try:
cf = sl.cho_factor(self)
inv = sl.cho_solve(cf, np.identity(cf[0].shape[0]))
if logdet:
ld = 2.0 * np.sum(np.log(np.diag(cf[0])))
except np.linalg.LinAlgError:
u, s, v = np.linalg.svd(self)
inv = np.dot(u / s, u.T)
if logdet:
ld = np.sum(np.log(s))
if logdet:
return inv, ld
else:
return inv
[docs]def create_stabletimingdesignmatrix(designmat, fastDesign=True):
"""
Stabilize the timing-model design matrix.
:param designmat: Pulsar timing model design matrix
:param fastDesign: Stabilize the design matrix the fast way [True]
:return: Mm: Stabilized timing model design matrix
"""
Mm = designmat.copy()
if fastDesign:
norm = np.sqrt(np.sum(Mm ** 2, axis=0))
Mm /= norm
else:
u, s, v = np.linalg.svd(Mm)
Mm = u[:, : len(s)]
return Mm
###################################
# Deterministic GW signal functions
###################################
[docs]def make_ecc_interpolant():
"""
Make interpolation function from eccentricity file to
determine number of harmonics to use for a given
eccentricity.
:returns: interpolant
"""
pth = resource_filename(Requirement.parse("libstempo"), "libstempo/ecc_vs_nharm.txt")
fil = np.loadtxt(pth)
return interp1d(fil[:, 0], fil[:, 1])
[docs]def get_edot(F, mc, e):
"""
Compute eccentricity derivative from Taylor et al. (2016)
:param F: Orbital frequency [Hz]
:param mc: Chirp mass of binary [Solar Mass]
:param e: Eccentricity of binary
:returns: de/dt
"""
# chirp mass
mc *= const.Tsun
dedt = -304 / (15 * mc) * (2 * np.pi * mc * F) ** (8 / 3) * e * (1 + 121 / 304 * e ** 2) / ((1 - e ** 2) ** (5 / 2))
return dedt
[docs]def get_Fdot(F, mc, e):
"""
Compute frequency derivative from Taylor et al. (2016)
:param F: Orbital frequency [Hz]
:param mc: Chirp mass of binary [Solar Mass]
:param e: Eccentricity of binary
:returns: dF/dt
"""
# chirp mass
mc *= const.Tsun
dFdt = (
48
/ (5 * np.pi * mc ** 2)
* (2 * np.pi * mc * F) ** (11 / 3)
* (1 + 73 / 24 * e ** 2 + 37 / 96 * e ** 4)
/ ((1 - e ** 2) ** (7 / 2))
)
return dFdt
[docs]def get_gammadot(F, mc, q, e):
"""
Compute gamma dot from Barack and Cutler (2004)
:param F: Orbital frequency [Hz]
:param mc: Chirp mass of binary [Solar Mass]
:param q: Mass ratio of binary
:param e: Eccentricity of binary
:returns: dgamma/dt
"""
# chirp mass
mc *= const.Tsun
# total mass
m = (((1 + q) ** 2) / q) ** (3 / 5) * mc
dgdt = (
6
* np.pi
* F
* (2 * np.pi * F * m) ** (2 / 3)
/ (1 - e ** 2)
* (1 + 0.25 * (2 * np.pi * F * m) ** (2 / 3) / (1 - e ** 2) * (26 - 15 * e ** 2))
)
return dgdt
[docs]def get_coupled_constecc_eqns(y, t, mc, e0):
"""
Computes the coupled system of differential
equations from Peters (1964) and Barack &
Cutler (2004). This is a system of three variables:
F: Orbital frequency [Hz]
phase0: Orbital phase [rad]
:param y: Vector of input parameters [F, e, gamma]
:param t: Time [s]
:param mc: Chirp mass of binary [Solar Mass]
:returns: array of derivatives [dF/dt, dphase/dt]
"""
F = y[0]
dFdt = get_Fdot(F, mc, e0)
dphasedt = 2 * np.pi * F
return np.array([dFdt, dphasedt])
[docs]def get_coupled_ecc_eqns(y, t, mc, q):
"""
Computes the coupled system of differential
equations from Peters (1964) and Barack &
Cutler (2004). This is a system of three variables:
F: Orbital frequency [Hz]
e: Orbital eccentricity
gamma: Angle of precession of periastron [rad]
phase0: Orbital phase [rad]
:param y: Vector of input parameters [F, e, gamma]
:param t: Time [s]
:param mc: Chirp mass of binary [Solar Mass]
:param q: Mass ratio of binary
:returns: array of derivatives [dF/dt, de/dt, dgamma/dt, dphase/dt]
"""
F = y[0]
e = y[1]
dFdt = get_Fdot(F, mc, e)
dedt = get_edot(F, mc, e)
dgdt = get_gammadot(F, mc, q, e)
dphasedt = 2 * np.pi * F
return np.array([dFdt, dedt, dgdt, dphasedt])
[docs]def solve_coupled_constecc_solution(F0, e0, phase0, mc, t):
"""
Compute the solution to the coupled system of equations
from from Peters (1964) and Barack & Cutler (2004) at
a given time.
:param F0: Initial orbital frequency [Hz]
:param mc: Chirp mass of binary [Solar Mass]
:param t: Time at which to evaluate solution [s]
:returns: (F(t), phase(t))
"""
y0 = np.array([F0, phase0])
y, infodict = odeint(get_coupled_constecc_eqns, y0, t, args=(mc, e0), full_output=True)
if infodict["message"] == "Integration successful.":
ret = y
else:
ret = 0
return ret
[docs]def solve_coupled_ecc_solution(F0, e0, gamma0, phase0, mc, q, t):
"""
Compute the solution to the coupled system of equations
from from Peters (1964) and Barack & Cutler (2004) at
a given time.
:param F0: Initial orbital frequency [Hz]
:param e0: Initial orbital eccentricity
:param gamma0: Initial angle of precession of periastron [rad]
:param mc: Chirp mass of binary [Solar Mass]
:param q: Mass ratio of binary
:param t: Time at which to evaluate solution [s]
:returns: (F(t), e(t), gamma(t), phase(t))
"""
y0 = np.array([F0, e0, gamma0, phase0])
y, infodict = odeint(get_coupled_ecc_eqns, y0, t, args=(mc, q), full_output=True)
if infodict["message"] == "Integration successful.":
ret = y
else:
ret = 0
return ret
[docs]def get_an(n, mc, dl, h0, F, e):
"""
Compute a_n from Eq. 22 of Taylor et al. (2016).
:param n: Harmonic number
:param mc: Chirp mass of binary [Solar Mass]
:param dl: Luminosity distance [Mpc]
:param F: Orbital frequency of binary [Hz]
:param e: Orbital Eccentricity
:returns: a_n
"""
# convert to seconds
mc *= const.Tsun
dl *= const.Mpc / const.c
omega = 2 * np.pi * F
if h0 is None:
amp = n * mc ** (5 / 3) * omega ** (2 / 3) / dl
elif h0 is not None:
amp = n * h0 / 2.0
ret = -amp * (
ss.jn(n - 2, n * e)
- 2 * e * ss.jn(n - 1, n * e)
+ (2 / n) * ss.jn(n, n * e)
+ 2 * e * ss.jn(n + 1, n * e)
- ss.jn(n + 2, n * e)
)
return ret
[docs]def get_bn(n, mc, dl, h0, F, e):
"""
Compute b_n from Eq. 22 of Taylor et al. (2015).
:param n: Harmonic number
:param mc: Chirp mass of binary [Solar Mass]
:param dl: Luminosity distance [Mpc]
:param F: Orbital frequency of binary [Hz]
:param e: Orbital Eccentricity
:returns: b_n
"""
# convert to seconds
mc *= const.Tsun
dl *= const.Mpc / const.c
omega = 2 * np.pi * F
if h0 is None:
amp = n * mc ** (5 / 3) * omega ** (2 / 3) / dl
elif h0 is not None:
amp = n * h0 / 2.0
ret = -amp * np.sqrt(1 - e ** 2) * (ss.jn(n - 2, n * e) - 2 * ss.jn(n, n * e) + ss.jn(n + 2, n * e))
return ret
[docs]def get_cn(n, mc, dl, h0, F, e):
"""
Compute c_n from Eq. 22 of Taylor et al. (2016).
:param n: Harmonic number
:param mc: Chirp mass of binary [Solar Mass]
:param dl: Luminosity distance [Mpc]
:param F: Orbital frequency of binary [Hz]
:param e: Orbital Eccentricity
:returns: c_n
"""
# convert to seconds
mc *= const.Tsun
dl *= const.Mpc / const.c
omega = 2 * np.pi * F
if h0 is None:
amp = 2 * mc ** (5 / 3) * omega ** (2 / 3) / dl
elif h0 is not None:
amp = h0
ret = amp * ss.jn(n, n * e) / (n * omega)
return ret
[docs]def calculate_splus_scross(nmax, mc, dl, h0, F, e, t, l0, gamma, gammadot, inc):
"""
Calculate splus and scross for a CGW summed over all harmonics.
This waveform differs slightly from that in Taylor et al (2016)
in that it includes the time dependence of the advance of periastron.
:param nmax: Total number of harmonics to use
:param mc: Chirp mass of binary [Solar Mass]
:param dl: Luminosity distance [Mpc]
:param F: Orbital frequency of binary [Hz]
:param e: Orbital Eccentricity
:param t: TOAs [s]
:param l0: Initial eccentric anomoly [rad]
:param gamma: Angle of periastron advance [rad]
:param gammadot: Time derivative of angle of periastron advance [rad/s]
:param inc: Inclination angle [rad]
:return splus, scross: plus and cross time-domain waveforms for a CGW
"""
n = np.arange(1, nmax)
# time dependent amplitudes
an = get_an(n, mc, dl, h0, F, e)
bn = get_bn(n, mc, dl, h0, F, e)
cn = get_cn(n, mc, dl, h0, F, e)
# time dependent terms
omega = 2 * np.pi * F
gt = gamma + gammadot * t
lt = l0 + omega * t
# tiled phase
phase1 = n * np.tile(lt, (nmax - 1, 1)).T
phase2 = np.tile(gt, (nmax - 1, 1)).T
sinp1 = np.sin(phase1)
cosp1 = np.cos(phase1)
sinp2 = np.sin(2 * phase2)
cosp2 = np.cos(2 * phase2)
sinpp = sinp1 * cosp2 + cosp1 * sinp2
cospp = cosp1 * cosp2 - sinp1 * sinp2
sinpm = sinp1 * cosp2 - cosp1 * sinp2
cospm = cosp1 * cosp2 + sinp1 * sinp2
# intermediate terms
sp = sinpm / (n * omega - 2 * gammadot) + sinpp / (n * omega + 2 * gammadot)
sm = sinpm / (n * omega - 2 * gammadot) - sinpp / (n * omega + 2 * gammadot)
cp = cospm / (n * omega - 2 * gammadot) + cospp / (n * omega + 2 * gammadot)
cm = cospm / (n * omega - 2 * gammadot) - cospp / (n * omega + 2 * gammadot)
splus_n = -0.5 * (1 + np.cos(inc) ** 2) * (an * sp - bn * sm) + (1 - np.cos(inc) ** 2) * cn * sinp1
scross_n = np.cos(inc) * (an * cm - bn * cp)
return np.sum(splus_n, axis=1), np.sum(scross_n, axis=1)
[docs]def create_gw_antenna_pattern(pos, gwtheta, gwphi):
"""
Function to create pulsar antenna pattern functions as defined
in Ellis, Siemens, and Creighton (2012).
:param pos: Unit vector from Earth to pulsar
:param gwtheta: GW polar angle in radians
:param gwphi: GW azimuthal angle in radians
:return: (fplus, fcross, cosMu), where fplus and fcross
are the plus and cross antenna pattern functions
and cosMu is the cosine of the angle between the
pulsar and the GW source.
"""
# use definition from Sesana et al 2010 and Ellis et al 2012
m = np.array([np.sin(gwphi), -np.cos(gwphi), 0.0])
n = np.array([-np.cos(gwtheta) * np.cos(gwphi), -np.cos(gwtheta) * np.sin(gwphi), np.sin(gwtheta)])
omhat = np.array([-np.sin(gwtheta) * np.cos(gwphi), -np.sin(gwtheta) * np.sin(gwphi), -np.cos(gwtheta)])
fplus = 0.5 * (np.dot(m, pos) ** 2 - np.dot(n, pos) ** 2) / (1 + np.dot(omhat, pos))
fcross = (np.dot(m, pos) * np.dot(n, pos)) / (1 + np.dot(omhat, pos))
cosMu = -np.dot(omhat, pos)
return fplus, fcross, cosMu
[docs]@function
def bwm_delay(toas, pos, log10_h=-14.0, cos_gwtheta=0.0, gwphi=0.0, gwpol=0.0, t0=55000, antenna_pattern_fn=None):
"""
Function that calculates the earth-term gravitational-wave
burst-with-memory signal, as described in:
Seto et al, van haasteren and Levin, phsirkov et al, Cordes and Jenet.
This version uses the F+/Fx polarization modes, as verified with the
Continuous Wave and Anisotropy papers.
:param toas: Time-of-arrival measurements [s]
:param pos: Unit vector from Earth to pulsar
:param log10_h: log10 of GW strain
:param cos_gwtheta: Cosine of GW polar angle
:param gwphi: GW azimuthal polar angle [rad]
:param gwpol: GW polarization angle
:param t0: Burst central time [day]
:param antenna_pattern_fn:
User defined function that takes `pos`, `gwtheta`, `gwphi` as
arguments and returns (fplus, fcross)
:return: the waveform as induced timing residuals (seconds)
"""
# convert
h = 10 ** log10_h
gwtheta = np.arccos(cos_gwtheta)
t0 *= const.day
# antenna patterns
if antenna_pattern_fn is None:
apc = create_gw_antenna_pattern(pos, gwtheta, gwphi)
else:
apc = antenna_pattern_fn(pos, gwtheta, gwphi)
# grab fplus, fcross
fp, fc = apc[0], apc[1]
# combined polarization
pol = np.cos(2 * gwpol) * fp + np.sin(2 * gwpol) * fc
# Return the time-series for the pulsar
return pol * h * np.heaviside(toas - t0, 0.5) * (toas - t0)
[docs]@function
def create_quantization_matrix(toas, dt=1, nmin=2):
"""Create quantization matrix mapping TOAs to observing epochs."""
isort = np.argsort(toas)
bucket_ref = [toas[isort[0]]]
bucket_ind = [[isort[0]]]
for i in isort[1:]:
if toas[i] - bucket_ref[-1] < dt:
bucket_ind[-1].append(i)
else:
bucket_ref.append(toas[i])
bucket_ind.append([i])
# find only epochs with more than 1 TOA
bucket_ind2 = [ind for ind in bucket_ind if len(ind) >= nmin]
U = np.zeros((len(toas), len(bucket_ind2)), "d")
for i, l in enumerate(bucket_ind2):
U[l, i] = 1
weights = np.ones(U.shape[1])
return U, weights
[docs]def quant2ind(U):
"""
Use quantization matrix to return slices of non-zero elements.
:param U: quantization matrix
:return: list of `slice`s for non-zero elements of U
.. note:: This function assumes that the pulsar TOAs were sorted by time.
"""
inds = []
for cc, col in enumerate(U.T):
epinds = np.flatnonzero(col)
if epinds[-1] - epinds[0] + 1 != len(epinds):
raise ValueError("ERROR: TOAs not sorted properly!")
inds.append(slice(epinds[0], epinds[-1] + 1))
return inds
[docs]def linear_interp_basis(toas, dt=30 * 86400):
"""Provides a basis for linear interpolation.
:param toas: Pulsar TOAs in seconds
:param dt: Linear interpolation step size in seconds.
:returns: Linear interpolation basis and nodes
"""
# evenly spaced points
x = np.arange(toas.min(), toas.max() + dt, dt)
M = np.zeros((len(toas), len(x)))
# make linear interpolation basis
for ii in range(len(x) - 1):
idx = np.logical_and(toas >= x[ii], toas <= x[ii + 1])
M[idx, ii] = (toas[idx] - x[ii + 1]) / (x[ii] - x[ii + 1])
M[idx, ii + 1] = (toas[idx] - x[ii]) / (x[ii + 1] - x[ii])
# only return non-zero columns
idx = M.sum(axis=0) != 0
return M[:, idx], x[idx]
# overlap reduction functions
[docs]@function
def hd_orf(pos1, pos2):
"""Hellings & Downs spatial correlation function."""
if np.all(pos1 == pos2):
return 1
else:
omc2 = (1 - np.dot(pos1, pos2)) / 2
return 1.5 * omc2 * np.log(omc2) - 0.25 * omc2 + 0.5
[docs]@function
def dipole_orf(pos1, pos2):
"""Dipole spatial correlation function."""
if np.all(pos1 == pos2):
return 1 + 1e-5
else:
return np.dot(pos1, pos2)
[docs]@function
def monopole_orf(pos1, pos2):
"""Monopole spatial correlation function."""
if np.all(pos1 == pos2):
return 1.0 + 1e-5
else:
return 1.0
[docs]@function
def anis_orf(pos1, pos2, params, **kwargs):
"""Anisotropic GWB spatial correlation function."""
anis_basis = kwargs["anis_basis"]
psrs_pos = kwargs["psrs_pos"]
lmax = kwargs["lmax"]
psr1_index = [ii for ii in range(len(psrs_pos)) if np.all(psrs_pos[ii] == pos1)][0]
psr2_index = [ii for ii in range(len(psrs_pos)) if np.all(psrs_pos[ii] == pos2)][0]
clm = np.zeros((lmax + 1) ** 2)
clm[0] = 2.0 * np.sqrt(np.pi)
if lmax > 0:
clm[1:] = params
return sum(clm[ii] * basis for ii, basis in enumerate(anis_basis[: (lmax + 1) ** 2, psr1_index, psr2_index]))
[docs]@function
def unnormed_tm_basis(Mmat):
return Mmat, np.ones_like(Mmat.shape[1])
[docs]@function
def normed_tm_basis(Mmat, norm=None):
if norm is None:
norm = np.sqrt(np.sum(Mmat ** 2, axis=0))
nmat = Mmat / norm
nmat[:, norm == 0] = 0
return nmat, np.ones_like(Mmat.shape[1])
[docs]@function
def svd_tm_basis(Mmat):
u, s, v = np.linalg.svd(Mmat, full_matrices=False)
return u, np.ones_like(s)
[docs]@function
def tm_prior(weights):
return weights * 1e40
# Physical ephemeris model utility functions
[docs]def get_planet_orbital_elements(model="setIII"):
"""Grab physical ephemeris model files"""
dpath = enterprise.__path__[0] + "/datafiles/ephemeris/"
return (
np.load(dpath + "/jupiter-" + model + "-mjd.npy"),
np.load(dpath + "/jupiter-" + model + "-xyz-svd.npy"),
np.load(dpath + "/saturn-" + model + "-xyz-svd.npy"),
)
[docs]def ecl2eq_vec(x):
"""
Rotate (n,3) vector time series from ecliptic to equatorial.
"""
M_ecl = const.M_ecl
return np.einsum("jk,ik->ij", M_ecl, x)
[docs]def eq2ecl_vec(x):
"""
Rotate (n,3) vector time series from equatorial to ecliptic.
"""
M_ecl = const.M_ecl
return np.einsum("kj,ik->ij", M_ecl, x)
[docs]def euler_vec(z, y, x, n):
"""
Return (n,3,3) tensor with each (3,3) block containing an
Euler rotation with angles z, y, x. Optionally each of z, y, x
can be a vector of length n.
"""
L = np.zeros((n, 3, 3), "d")
cosx, sinx = np.cos(x), np.sin(x)
L[:, 0, 0] = 1
L[:, 1, 1] = L[:, 2, 2] = cosx
L[:, 1, 2] = -sinx
L[:, 2, 1] = sinx
N = np.zeros((n, 3, 3), "d")
cosy, siny = np.cos(y), np.sin(y)
N[:, 0, 0] = N[:, 2, 2] = cosy
N[:, 1, 1] = 1
N[:, 0, 2] = siny
N[:, 2, 0] = -siny
ret = np.einsum("ijk,ikl->ijl", L, N)
M = np.zeros((n, 3, 3), "d")
cosz, sinz = np.cos(z), np.sin(z)
M[:, 0, 0] = M[:, 1, 1] = cosz
M[:, 0, 1] = -sinz
M[:, 1, 0] = sinz
M[:, 2, 2] = 1
ret = np.einsum("ijk,ikl->ijl", ret, M)
return ret
[docs]def ss_framerotate(mjd, planet, x, y, z, dz, offset=None, equatorial=False):
"""
Rotate planet trajectory given as (n,3) tensor,
by ecliptic Euler angles x, y, z, and by z rate
dz. The rate has units of rad/year, and is referred
to offset 2010/1/1. dates must be given in MJD.
"""
t_offset = 55197.0 # MJD 2010/01/01
if equatorial:
planet = eq2ecl_vec(planet)
E = euler_vec(z + dz * (mjd - t_offset) / 365.25, y, x, planet.shape[0])
planet = np.einsum("ijk,ik->ij", E, planet)
if offset is not None:
planet = np.array(offset) + planet
if equatorial:
planet = ecl2eq_vec(planet)
return planet
[docs]def dmass(planet, dm_over_Msun):
return dm_over_Msun * planet
[docs]@function
def physicalephem_spectrum(sigmas):
# note the creative use of the "labels" (the very sigmas, not frequencies)
return sigmas ** 2
[docs]@function
def createfourierdesignmatrix_physicalephem(
toas,
planetssb,
pos_t,
frame_drift_rate=1e-9,
d_jupiter_mass=1.54976690e-11,
d_saturn_mass=8.17306184e-12,
d_uranus_mass=5.71923361e-11,
d_neptune_mass=7.96103855e-11,
jup_orb_elements=0.05,
sat_orb_elements=0.5,
model="setIII",
):
"""
Construct physical ephemeris perturbation design matrix and 'frequencies'.
Parameters can be excluded by setting the corresponding prior sigma to None
:param toas: vector of time series in seconds
:param pos: pulsar position as Cartesian vector
:param frame_drift_rate: normal sigma for frame drift rate
:param d_jupiter_mass: normal sigma for Jupiter mass perturbation
:param d_saturn_mass: normal sigma for Saturn mass perturbation
:param d_uranus_mass: normal sigma for Uranus mass perturbation
:param d_neptune_mass: normal sigma for Neptune mass perturbation
:param jup_orb_elements: normal sigma for Jupiter orbital elem. perturb.
:param sat_orb_elements: normal sigma for Saturn orbital elem. perturb.
:param model: vector basis used by Jupiter and Saturn perturb.;
see PhysicalEphemerisSignal, defaults to "setIII"
:return: F: Fourier design matrix of shape (len(toas), nvecs)
:return: sigmas: Phi sigmas (nvecs, to be passed to physicalephem_spectrum)
"""
# Jupiter + Saturn orbit definitions that we pass to physical_ephem_delay
oa = {}
(oa["times"], oa["jup_orbit"], oa["sat_orbit"]) = get_planet_orbital_elements(model)
dpar = 1e-5 # may need finessing
Fl, Phil = [], []
for parname in [
"frame_drift_rate",
"d_jupiter_mass",
"d_saturn_mass",
"d_uranus_mass",
"d_neptune_mass",
"jup_orb_elements",
"sat_orb_elements",
]:
ppar = locals()[parname]
if ppar:
if parname not in ["jup_orb_elements", "sat_orb_elements"]:
# need to normalize?
Fl.append(physical_ephem_delay(toas, planetssb, pos_t, **{parname: dpar}) / dpar)
Phil.append(ppar)
else:
for i in range(6):
c = np.zeros(6)
c[i] = dpar
# Fl.append(physical_ephem_delay(toas, planetssb, pos_t,
# **{parname: c}, **oa)/dpar)
kwarg_dict = {parname: c}
kwarg_dict.update(oa)
Fl.append(physical_ephem_delay(toas, planetssb, pos_t, **kwarg_dict) / dpar)
Phil.append(ppar)
return np.array(Fl).T.copy(), np.array(Phil)
[docs]@function
def physical_ephem_delay(
toas,
planetssb,
pos_t,
frame_drift_rate=0,
d_jupiter_mass=0,
d_saturn_mass=0,
d_uranus_mass=0,
d_neptune_mass=0,
jup_orb_elements=np.zeros(6, "d"),
sat_orb_elements=np.zeros(6, "d"),
times=None,
jup_orbit=None,
sat_orbit=None,
equatorial=True,
):
# convert toas to MJD
mjd = toas / 86400
# grab planet-to-SSB vectors
earth = planetssb[:, 2, :3]
jupiter = planetssb[:, 4, :3]
saturn = planetssb[:, 5, :3]
uranus = planetssb[:, 6, :3]
neptune = planetssb[:, 7, :3]
# do frame rotation
earth = ss_framerotate(mjd, earth, 0.0, 0.0, 0.0, frame_drift_rate, offset=None, equatorial=equatorial)
# mass perturbations
for planet, dm in [
(jupiter, d_jupiter_mass),
(saturn, d_saturn_mass),
(uranus, d_uranus_mass),
(neptune, d_neptune_mass),
]:
earth += dmass(planet, dm)
# Jupiter orbit perturbation
if np.any(jup_orb_elements):
tmp = 0.0009547918983127075 * np.einsum("i,ijk->jk", jup_orb_elements, jup_orbit)
earth += np.array([np.interp(mjd, times, tmp[:, aa]) for aa in range(3)]).T
# Saturn orbit perturbation
if np.any(sat_orb_elements):
tmp = 0.00028588567008942334 * np.einsum("i,ijk->jk", sat_orb_elements, sat_orbit)
earth += np.array([np.interp(mjd, times, tmp[:, aa]) for aa in range(3)]).T
# construct the true geocenter to barycenter roemer
tmp_roemer = np.einsum("ij,ij->i", planetssb[:, 2, :3], pos_t)
# create the delay
delay = tmp_roemer - np.einsum("ij,ij->i", earth, pos_t)
return delay