"""Tasks for generating random gain fluctuations in the data and stacking them."""
import numpy as np
from caput import config, mpiarray, pipeline
from ..core import containers, io, task
[docs]
class BaseGains(task.SingleTask):
"""Rudimentary class to generate gain timestreams.
The gains are drawn for times which match up to an input timestream file.
Attributes
----------
amp: bool
Generate gain amplitude fluctuations. Default is True.
phase: bool
Generate gain phase fluctuations. Default is True.
"""
amp = config.Property(default=True, proptype=bool)
phase = config.Property(default=True, proptype=bool)
_prev_time = None
[docs]
def process(self, data):
"""Generate a gain timestream for the inputs and times in `data`.
Parameters
----------
data : :class:`containers.TimeStream`
Generate gain errors for `data`.
Returns
-------
gain : :class:`containers.GainData`
"""
data.redistribute("freq")
time = data.time
gain_data = containers.GainData(axes_from=data, comm=data.comm)
gain_data.redistribute("input")
# Save some useful attributes
self.ninput_local = gain_data.gain.local_shape[1]
self.ninput_global = gain_data.gain.global_shape[1]
self.freq = data.index_map["freq"]["centre"][:]
gain_amp = 1.0
gain_phase = 0.0
if self.amp:
gain_amp = self._generate_amp(time)
if self.phase:
gain_phase = self._generate_phase(time)
# Combine into an overall gain fluctuation
gain_comb = gain_amp * np.exp(1.0j * gain_phase)
# Copy the gain entries into the output container
gain = mpiarray.MPIArray.wrap(gain_comb, axis=1, comm=data.comm)
gain_data.gain[:] = gain
# Keep a reference to time around for the next round
self._prev_time = time
return gain_data
def _corr_func(self, zeta, amp):
"""Generate the correlation function.
Parameters
----------
zeta: float
Correlation length
amp : float
Amplitude (given as standard deviation) of fluctuations.
"""
def _cf(x):
dij = x[:, np.newaxis] - x[np.newaxis, :]
return amp**2 * np.exp(-0.5 * (dij / zeta) ** 2)
return _cf
def _generate_amp(self, time):
"""Generate phase gain errors.
This implementation is blank. Must be overriden.
Parameters
----------
time : np.ndarray
Generate amplitude fluctuations for this time period.
"""
raise NotImplementedError
def _generate_phase(self, time):
"""Generate phase gain errors.
This implementation is blank. Must be overriden.
Parameters
----------
time : np.ndarray
Generate phase fluctuations for this time period.
"""
raise NotImplementedError
[docs]
class SiderealGains(BaseGains):
"""Task for simulating sidereal gains.
This base class is useful for generating gain errors in sidereal time.
The simulation period is set by `start_time` and `end_time` and does not
need any input to `process`.
Attributes
----------
start_time, end_time : float or datetime
Start and end times of the gain timestream to simulate. Needs to be either a
`float` (UNIX time) or a `datetime` objects in UTC. This determines the set
of LSDs to generate data for.
"""
start_time = config.utc_time()
end_time = config.utc_time()
[docs]
def setup(self, bt, sstream):
"""Set up an observer and the data to use for this simulation.
Parameters
----------
bt : beamtransfer.BeamTransfer or manager.ProductManager
Sets up an observer holding the geographic location of the telscope.
sstream : containers.SiderealStream
The sidereal data to use for this gain simulation.
"""
self.observer = io.get_telescope(bt)
self.lsd_start = self.observer.unix_to_lsd(self.start_time)
self.lsd_end = self.observer.unix_to_lsd(self.end_time)
self.log.info(
"Sidereal period requested: LSD=%i to LSD=%i",
int(self.lsd_start),
int(self.lsd_end),
)
# Initialize the current lsd time
self._current_lsd = None
self.sstream = sstream
[docs]
def process(self):
"""Generate a gain timestream for the inputs and times in `data`.
Returns
-------
gain : :class:`containers.SiderealGainData`
Simulated gain errors in sidereal time.
"""
# If current_lsd is None then this is the first time we've run
if self._current_lsd is None:
# Check if lsd is an integer, if not add an lsd
if isinstance(self.lsd_start, int):
self._current_lsd = int(self.lsd_start)
else:
self._current_lsd = int(self.lsd_start + 1)
# Check if we have reached the end of the requested time
if self._current_lsd >= self.lsd_end:
raise pipeline.PipelineStopIteration
# Convert the current lsd day to unix time
unix_start = self.observer.lsd_to_unix(self._current_lsd)
unix_end = self.observer.lsd_to_unix(self._current_lsd + 1)
# Distribute the sidereal data and create a time array
data = self.sstream
data.redistribute("freq")
self.freq = data.index_map["freq"]["centre"][:]
nra = len(data.ra)
time = np.linspace(unix_start, unix_end, nra, endpoint=False)
# Make a sidereal gain data container
gain_data = containers.SiderealGainData(axes_from=data, comm=data.comm)
gain_data.redistribute("input")
self.ninput_local = gain_data.gain.local_shape[1]
self.ninput_global = gain_data.gain.global_shape[1]
gain_amp = 1.0
gain_phase = 0.0
if self.amp:
gain_amp = self._generate_amp(time)
if self.phase:
gain_phase = self._generate_phase(time)
# Combine into an overall gain fluctuation
gain_comb = gain_amp * np.exp(1.0j * gain_phase)
# Copy the gain entries into the output container
gain = mpiarray.MPIArray.wrap(gain_comb, axis=1, comm=data.comm)
gain_data.gain[:] = gain
gain_data.attrs["lsd"] = self._current_lsd
gain_data.attrs["tag"] = f"lsd_{self._current_lsd:d}"
# Increment current lsd
self._current_lsd += 1
# Keep a reference to time around for the next round
self._prev_time = time
return gain_data
[docs]
class RandomGains(BaseGains):
r"""Generate random gains.
Notes
-----
The Random Gains class generates random fluctuations in gain amplitude and
phase.
Attributes
----------
corr_length_amp, corr_length_phase : float
Correlation length for amplitude and phase fluctuations in seconds.
sigma_amp, sigma_phase : float
Size of fluctuations for amplitude (fractional), and phase (radians).
Notes
-----
This task generates gain time streams which are Gaussian distributed with
covariance
.. math::
C_{ij} = \sigma^2 \exp{\left(-\frac{1}{2 \xi^2}(t_i - t_j)^2\right)}
As the time stream is generated in separate pieces, to ensure that there is
consistency between them each gain time stream is drawn as a constrained
realisation against the previous file.
"""
corr_length_amp = config.Property(default=3600.0, proptype=float)
corr_length_phase = config.Property(default=3600.0, proptype=float)
sigma_amp = config.Property(default=0.02, proptype=float)
sigma_phase = config.Property(default=0.1, proptype=float)
_prev_amp = None
_prev_phase = None
def _generate_amp(self, time):
# Generate the correlation function
cf_amp = self._corr_func(self.corr_length_amp, self.sigma_amp)
ninput = self.ninput_local
num_realisations = len(self.freq) * ninput
ntime = len(time)
# Generate amplitude fluctuations
gain_amp = generate_fluctuations(
time, cf_amp, num_realisations, self._prev_time, self._prev_amp
)
# Save amplitude fluctuations to instannce
self._prev_amp = gain_amp
gain_amp = gain_amp.reshape((len(self.freq), ninput, ntime))
return 1.0 + gain_amp
def _generate_phase(self, time):
# Generate the correlation function
cf_phase = self._corr_func(self.corr_length_phase, self.sigma_phase)
ninput = self.ninput_local
num_realisations = len(self.freq) * ninput
ntime = len(time)
# Generate phase fluctuations
gain_phase_fluc = generate_fluctuations(
time, cf_phase, num_realisations, self._prev_time, self._prev_phase
)
# Save phase fluctuations to instannce
self._prev_phase = gain_phase_fluc
# Reshape to correct size
return gain_phase_fluc.reshape((len(self.freq), ninput, ntime))
[docs]
class RandomSiderealGains(RandomGains, SiderealGains):
"""Generate random gains on a Sidereal grid.
See the documentation for `RandomGains` and `SiderealGains` for more detail.
"""
pass
[docs]
class GainStacker(task.SingleTask):
r"""Take sidereal gain data, make products and stack them up.
Attributes
----------
only_gains : bool
Whether to return only the stacked gains or the stacked gains
mulitplied with the visibilites. Default: False.
Notes
-----
This task generates products of gain time streams for every sidereal day and
stacks them up over the number of days in the simulation.
More formally a gain stack can be described as
.. math::
G_{ij} = \sum_{a}^{Ndays} g_{i}(t)^{a} g_j(t)^{*a}
"""
only_gains = config.Property(default=False, proptype=bool)
gain_stack = None
lsd_list = None
[docs]
def setup(self, stream):
"""Get the sidereal stream onto which we stack the simulated gain data.
Parameters
----------
stream : containers.SiderealStream or containers.TimeStream
The sidereal or time data to use.
"""
self.stream = stream
[docs]
def process(self, gain):
"""Make sidereal gain products and stack them up.
Parameters
----------
gain : containers.SiderealGainData or containers.GainData
Individual sidereal or time ordered gain data.
Returns
-------
gain_stack : containers.TimeStream or containers.SiderealStream
Stacked products of gains.
"""
stream = self.stream
prod = stream.index_map["prod"]
if "lsd" in gain.attrs:
input_lsd = gain.attrs["lsd"]
else:
input_lsd = -1
input_lsd = _ensure_list(input_lsd)
# If gain_stack is None create an MPIArray to hold the product expanded
# gain data and redistribute over all freq
if self.gain_stack is None:
self.gain_stack = containers.empty_like(stream)
self.gain_stack.redistribute("freq")
gain.redistribute("freq")
gsv = self.gain_stack.vis[:]
g = gain.gain[:]
for pi, (ii, jj) in enumerate(prod):
gsv[:, pi, :] = g[:, ii] * np.conjugate(g[:, jj])
self.gain_stack.weight[:] = np.ones(self.gain_stack.vis.local_shape)
self.lsd_list = input_lsd
self.log.info("Starting gain stack with LSD:%i", input_lsd[0])
return
# Keep gains around for next round, save current lsd to list, log
self.log.info("Adding LSD:%i to gain stack", gain.attrs["lsd"])
gain.redistribute("freq")
gsv = self.gain_stack.vis[:]
g = gain.gain[:]
# Calculate the gain products
for pi, (ii, jj) in enumerate(prod):
gsv[:, pi] += g[:, ii] * np.conjugate(g[:, jj])
self.gain_stack.weight[:] += np.ones(self.gain_stack.vis.local_shape)
self.lsd_list += input_lsd
[docs]
def process_finish(self):
"""Multiply summed gain with sidereal stream.
Returns
-------
data : containers.SiderealStream or containers.TimeStream
Stack of sidereal data with gain applied.
"""
# If requested, or shapes of visibilties and gain stack don't match then just return stack.
if (
self.stream.vis[:].shape[-1] != self.gain_stack.vis[:].shape[-1]
) or self.only_gains:
self.log.info("Saving only gain stack")
self.log.info(
"Either requested or shapes of visibilites and gain stack do not match"
)
self.gain_stack.vis[:] = self.gain_stack.vis[:] / self.gain_stack.weight[:]
return self.gain_stack
data = containers.empty_like(self.stream)
data.redistribute("freq")
self.gain_stack.vis[:] = self.gain_stack.vis[:] / self.gain_stack.weight[:]
data.vis[:] = self.stream.vis[:] * self.gain_stack.vis[:]
data.weight[:] = self.stream.weight[:]
data.attrs["tag"] = "gain_stack"
return data
def _ensure_list(x):
if hasattr(x, "__iter__"):
y = list(x)
else:
y = [x]
return y
[docs]
def generate_fluctuations(x, corrfunc, n, prev_x, prev_fluc):
"""Generate correlated random streams.
Generates a Gaussian field from the given correlation function and (potentially)
correlated with prior data.
Parameters
----------
x : np.ndarray[npoints]
Coordinates of samples in the new stream.
corrfunc : function
See documentation of `gaussian_realisation`.
prev_x : np.ndarray[npoints]
Coordinates of previous samples. Ignored if `prev_fluc` is None.
prev_fluc : np.ndarray[npoints]
Values of previous samples. If `None` the stream is initialised.
n : int
Number of realisations to generate.
Returns
-------
y : np.ndarray[n, npoints]
Realisations of the stream.
"""
nx = len(x)
if prev_fluc is None:
fluctuations = gaussian_realisation(x, corrfunc, n).reshape(n, nx)
else:
fluctuations = constrained_gaussian_realisation(
x, corrfunc, n, prev_x, prev_fluc
).reshape(n, nx)
return fluctuations
[docs]
def gaussian_realisation(x, corrfunc, n, rcond=1e-12):
"""Generate a Gaussian random field.
Parameters
----------
x : np.ndarray[npoints] or np.ndarray[npoints, ndim]
Co-ordinates of points to generate.
corrfunc : function(x) -> covariance matrix
Function that take (vectorized) co-ordinates and returns their
covariance functions.
n : integer
Number of realisations to generate.
rcond : float, optional
Ignore eigenmodes smaller than `rcond` times the largest eigenvalue.
Returns
-------
y : np.ndarray[n, npoints]
Realisations of the gaussian field.
"""
return _realisation(corrfunc(x), n, rcond)
def _realisation(C, n, rcond):
"""Create a realisation of the given covariance matrix.
Regularise by throwing away small eigenvalues.
"""
import scipy.linalg as la
# Find the eigendecomposition, truncate small modes, and use this to
# construct a matrix projecting from the non-singular space
evals, evecs = la.eigh(C)
num = np.sum(evals > rcond * evals[-1])
R = evecs[:, -num:] * evals[np.newaxis, -num:] ** 0.5
# Generate independent gaussian variables
w = np.random.standard_normal((n, num))
# Apply projection to get random field
return np.dot(w, R.T)
[docs]
def constrained_gaussian_realisation(x, corrfunc, n, x2, y2, rcond=1e-12):
"""Generate a constrained Gaussian random field.
Given a correlation function generate a Gaussian random field that is
consistent with an existing set of values of parameter `y2` located at
co-ordinates in parameter `x2`.
Parameters
----------
x : np.ndarray[npoints] or np.ndarray[npoints, ndim]
Co-ordinates of points to generate.
corrfunc : function(x) -> covariance matrix
Function that take (vectorized) co-ordinates and returns their
covariance functions.
n : integer
Number of realisations to generate.
x2 : np.ndarray[npoints] or np.ndarray[npoints, ndim]
Co-ordinates of existing points.
y2 : np.ndarray[npoints] or np.ndarray[n, npoints]
Existing values of the random field.
rcond : float, optional
Ignore eigenmodes smaller than `rcond` times the largest eigenvalue.
Returns
-------
y : np.ndarray[n, npoints]
Realisations of the gaussian field.
"""
import scipy.linalg as la
if (y2.ndim >= 2) and (n != y2.shape[0]):
raise ValueError("Array y2 of existing data has the wrong shape.")
# Calculate the covariance matrix for the full dataset
xc = np.concatenate([x, x2])
M = corrfunc(xc)
# Select out the different blocks
l = len(x)
A = M[:l, :l]
B = M[:l, l:]
C = M[l:, l:]
# This method tends to be unstable when there are singular modes in the
# covariance matrix (i.e. modes with zero variance). We can remove these by
# projecting onto the non-singular modes.
# Find the eigendecomposition and construct projection matrices onto the
# non-singular space
evals_A, evecs_A = la.eigh(A)
evals_C, evecs_C = la.eigh(C)
num_A = np.sum(evals_A > rcond * evals_A.max())
num_C = np.sum(evals_C > rcond * evals_C.max())
R_A = evecs_A[:, -num_A:]
R_C = evecs_C[:, -num_C:]
# Construct the covariance blocks in the reduced basis
A_r = np.diag(evals_A[-num_A:])
B_r = np.dot(R_A.T, np.dot(B, R_C))
Ci_r = np.diag(1.0 / evals_C[-num_C:])
# Project the existing data into the new basis
y2_r = np.dot(y2, R_C)
# Calculate the mean of the new variables
z_r = np.dot(y2_r, np.dot(Ci_r, B_r.T))
# Generate fluctuations for the new variables (in the reduced basis)
Ap_r = A_r - np.dot(B_r, np.dot(Ci_r, B_r.T))
y_r = _realisation(Ap_r, n, rcond)
# Project into the original basis for A
return np.dot(z_r + y_r, R_A.T)