Source code for draco.synthesis.gain

"""Tasks for generating random gain fluctuations in the data and stacking them."""

import numpy as np
from caput import config, mpiarray
from caput.containers import empty_like
from caput.pipeline import exceptions, tasklib

from ..core import containers, io


[docs] class BaseGains(tasklib.base.ContainerTask): """Rudimentary class to generate gain timestreams. The gains are drawn for times which match up to an input timestream file. Attributes ---------- amp: bool Generate gain amplitude fluctuations. Default is True. phase: bool Generate gain phase fluctuations. Default is True. """ amp = config.Property(default=True, proptype=bool) phase = config.Property(default=True, proptype=bool) _prev_time = None
[docs] def process(self, data): """Generate a gain timestream for the inputs and times in `data`. Parameters ---------- data : :class:`containers.TimeStream` Generate gain errors for `data`. Returns ------- gain : :class:`containers.GainData` """ data.redistribute("freq") time = data.time gain_data = containers.GainData(axes_from=data, comm=data.comm) gain_data.redistribute("input") # Save some useful attributes self.ninput_local = gain_data.gain.local_shape[1] self.ninput_global = gain_data.gain.global_shape[1] self.freq = data.index_map["freq"]["centre"][:] gain_amp = 1.0 gain_phase = 0.0 if self.amp: gain_amp = self._generate_amp(time) if self.phase: gain_phase = self._generate_phase(time) # Combine into an overall gain fluctuation gain_comb = gain_amp * np.exp(1.0j * gain_phase) # Copy the gain entries into the output container gain = mpiarray.MPIArray.wrap(gain_comb, axis=1, comm=data.comm) gain_data.gain[:] = gain # Keep a reference to time around for the next round self._prev_time = time return gain_data
def _corr_func(self, zeta, amp): """Generate the correlation function. Parameters ---------- zeta: float Correlation length amp : float Amplitude (given as standard deviation) of fluctuations. """ def _cf(x): dij = x[:, np.newaxis] - x[np.newaxis, :] return amp**2 * np.exp(-0.5 * (dij / zeta) ** 2) return _cf def _generate_amp(self, time): """Generate phase gain errors. This implementation is blank. Must be overriden. Parameters ---------- time : np.ndarray Generate amplitude fluctuations for this time period. """ raise NotImplementedError def _generate_phase(self, time): """Generate phase gain errors. This implementation is blank. Must be overriden. Parameters ---------- time : np.ndarray Generate phase fluctuations for this time period. """ raise NotImplementedError
[docs] class SiderealGains(BaseGains): """Task for simulating sidereal gains. This base class is useful for generating gain errors in sidereal time. The simulation period is set by `start_time` and `end_time` and does not need any input to `process`. Attributes ---------- start_time, end_time : float or datetime Start and end times of the gain timestream to simulate. Needs to be either a `float` (UNIX time) or a `datetime` objects in UTC. This determines the set of LSDs to generate data for. """ start_time = config.utc_time() end_time = config.utc_time()
[docs] def setup(self, bt, sstream): """Set up an observer and the data to use for this simulation. Parameters ---------- bt : beamtransfer.BeamTransfer or manager.ProductManager Sets up an observer holding the geographic location of the telscope. sstream : containers.SiderealStream The sidereal data to use for this gain simulation. """ self.observer = io.get_telescope(bt) self.lsd_start = self.observer.unix_to_lsd(self.start_time) self.lsd_end = self.observer.unix_to_lsd(self.end_time) self.log.info( "Sidereal period requested: LSD=%i to LSD=%i", int(self.lsd_start), int(self.lsd_end), ) # Initialize the current lsd time self._current_lsd = None self.sstream = sstream
[docs] def process(self): """Generate a gain timestream for the inputs and times in `data`. Returns ------- gain : :class:`containers.SiderealGainData` Simulated gain errors in sidereal time. """ # If current_lsd is None then this is the first time we've run if self._current_lsd is None: # Check if lsd is an integer, if not add an lsd if isinstance(self.lsd_start, int): self._current_lsd = int(self.lsd_start) else: self._current_lsd = int(self.lsd_start + 1) # Check if we have reached the end of the requested time if self._current_lsd >= self.lsd_end: raise exceptions.PipelineStopIteration # Convert the current lsd day to unix time unix_start = self.observer.lsd_to_unix(self._current_lsd) unix_end = self.observer.lsd_to_unix(self._current_lsd + 1) # Distribute the sidereal data and create a time array data = self.sstream data.redistribute("freq") self.freq = data.index_map["freq"]["centre"][:] nra = len(data.ra) time = np.linspace(unix_start, unix_end, nra, endpoint=False) # Make a sidereal gain data container gain_data = containers.SiderealGainData(axes_from=data, comm=data.comm) gain_data.redistribute("input") self.ninput_local = gain_data.gain.local_shape[1] self.ninput_global = gain_data.gain.global_shape[1] gain_amp = 1.0 gain_phase = 0.0 if self.amp: gain_amp = self._generate_amp(time) if self.phase: gain_phase = self._generate_phase(time) # Combine into an overall gain fluctuation gain_comb = gain_amp * np.exp(1.0j * gain_phase) # Copy the gain entries into the output container gain = mpiarray.MPIArray.wrap(gain_comb, axis=1, comm=data.comm) gain_data.gain[:] = gain gain_data.attrs["lsd"] = self._current_lsd gain_data.attrs["tag"] = f"lsd_{self._current_lsd:d}" # Increment current lsd self._current_lsd += 1 # Keep a reference to time around for the next round self._prev_time = time return gain_data
[docs] class RandomGains(BaseGains): r"""Generate random gains. Notes ----- The Random Gains class generates random fluctuations in gain amplitude and phase. Attributes ---------- corr_length_amp, corr_length_phase : float Correlation length for amplitude and phase fluctuations in seconds. sigma_amp, sigma_phase : float Size of fluctuations for amplitude (fractional), and phase (radians). Notes ----- This task generates gain time streams which are Gaussian distributed with covariance .. math:: C_{ij} = \sigma^2 \exp{\left(-\frac{1}{2 \xi^2}(t_i - t_j)^2\right)} As the time stream is generated in separate pieces, to ensure that there is consistency between them each gain time stream is drawn as a constrained realisation against the previous file. """ corr_length_amp = config.Property(default=3600.0, proptype=float) corr_length_phase = config.Property(default=3600.0, proptype=float) sigma_amp = config.Property(default=0.02, proptype=float) sigma_phase = config.Property(default=0.1, proptype=float) _prev_amp = None _prev_phase = None def _generate_amp(self, time): # Generate the correlation function cf_amp = self._corr_func(self.corr_length_amp, self.sigma_amp) ninput = self.ninput_local num_realisations = len(self.freq) * ninput ntime = len(time) # Generate amplitude fluctuations gain_amp = generate_fluctuations( time, cf_amp, num_realisations, self._prev_time, self._prev_amp ) # Save amplitude fluctuations to instannce self._prev_amp = gain_amp gain_amp = gain_amp.reshape((len(self.freq), ninput, ntime)) return 1.0 + gain_amp def _generate_phase(self, time): # Generate the correlation function cf_phase = self._corr_func(self.corr_length_phase, self.sigma_phase) ninput = self.ninput_local num_realisations = len(self.freq) * ninput ntime = len(time) # Generate phase fluctuations gain_phase_fluc = generate_fluctuations( time, cf_phase, num_realisations, self._prev_time, self._prev_phase ) # Save phase fluctuations to instannce self._prev_phase = gain_phase_fluc # Reshape to correct size return gain_phase_fluc.reshape((len(self.freq), ninput, ntime))
[docs] class RandomSiderealGains(RandomGains, SiderealGains): """Generate random gains on a Sidereal grid. See the documentation for `RandomGains` and `SiderealGains` for more detail. """ pass
[docs] class GainStacker(tasklib.base.ContainerTask): r"""Take sidereal gain data, make products and stack them up. Attributes ---------- only_gains : bool Whether to return only the stacked gains or the stacked gains mulitplied with the visibilites. Default: False. Notes ----- This task generates products of gain time streams for every sidereal day and stacks them up over the number of days in the simulation. More formally a gain stack can be described as .. math:: G_{ij} = \sum_{a}^{Ndays} g_{i}(t)^{a} g_j(t)^{*a} """ only_gains = config.Property(default=False, proptype=bool) gain_stack = None lsd_list = None
[docs] def setup(self, stream): """Get the sidereal stream onto which we stack the simulated gain data. Parameters ---------- stream : containers.SiderealStream or containers.TimeStream The sidereal or time data to use. """ self.stream = stream
[docs] def process(self, gain): """Make sidereal gain products and stack them up. Parameters ---------- gain : containers.SiderealGainData or containers.GainData Individual sidereal or time ordered gain data. Returns ------- gain_stack : containers.TimeStream or containers.SiderealStream Stacked products of gains. """ stream = self.stream prod = stream.index_map["prod"] if "lsd" in gain.attrs: input_lsd = gain.attrs["lsd"] else: input_lsd = -1 input_lsd = _ensure_list(input_lsd) # If gain_stack is None create an MPIArray to hold the product expanded # gain data and redistribute over all freq if self.gain_stack is None: self.gain_stack = empty_like(stream) self.gain_stack.redistribute("freq") gain.redistribute("freq") gsv = self.gain_stack.vis[:] g = gain.gain[:] for pi, (ii, jj) in enumerate(prod): gsv[:, pi, :] = g[:, ii] * np.conjugate(g[:, jj]) self.gain_stack.weight[:] = np.ones(self.gain_stack.vis.local_shape) self.lsd_list = input_lsd self.log.info("Starting gain stack with LSD:%i", input_lsd[0]) return # Keep gains around for next round, save current lsd to list, log self.log.info("Adding LSD:%i to gain stack", gain.attrs["lsd"]) gain.redistribute("freq") gsv = self.gain_stack.vis[:] g = gain.gain[:] # Calculate the gain products for pi, (ii, jj) in enumerate(prod): gsv[:, pi] += g[:, ii] * np.conjugate(g[:, jj]) self.gain_stack.weight[:] += np.ones(self.gain_stack.vis.local_shape) self.lsd_list += input_lsd
[docs] def process_finish(self): """Multiply summed gain with sidereal stream. Returns ------- data : containers.SiderealStream or containers.TimeStream Stack of sidereal data with gain applied. """ # If requested, or shapes of visibilties and gain stack don't match then just return stack. if ( self.stream.vis[:].shape[-1] != self.gain_stack.vis[:].shape[-1] ) or self.only_gains: self.log.info("Saving only gain stack") self.log.info( "Either requested or shapes of visibilites and gain stack do not match" ) self.gain_stack.vis[:] = self.gain_stack.vis[:] / self.gain_stack.weight[:] return self.gain_stack data = empty_like(self.stream) data.redistribute("freq") self.gain_stack.vis[:] = self.gain_stack.vis[:] / self.gain_stack.weight[:] data.vis[:] = self.stream.vis[:] * self.gain_stack.vis[:] data.weight[:] = self.stream.weight[:] data.attrs["tag"] = "gain_stack" return data
def _ensure_list(x): if hasattr(x, "__iter__"): y = list(x) else: y = [x] return y
[docs] def generate_fluctuations(x, corrfunc, n, prev_x, prev_fluc): """Generate correlated random streams. Generates a Gaussian field from the given correlation function and (potentially) correlated with prior data. Parameters ---------- x : np.ndarray[npoints] Coordinates of samples in the new stream. corrfunc : function See documentation of `gaussian_realisation`. prev_x : np.ndarray[npoints] Coordinates of previous samples. Ignored if `prev_fluc` is None. prev_fluc : np.ndarray[npoints] Values of previous samples. If `None` the stream is initialised. n : int Number of realisations to generate. Returns ------- y : np.ndarray[n, npoints] Realisations of the stream. """ nx = len(x) if prev_fluc is None: fluctuations = gaussian_realisation(x, corrfunc, n).reshape(n, nx) else: fluctuations = constrained_gaussian_realisation( x, corrfunc, n, prev_x, prev_fluc ).reshape(n, nx) return fluctuations
[docs] def gaussian_realisation(x, corrfunc, n, rcond=1e-12): """Generate a Gaussian random field. Parameters ---------- x : np.ndarray[npoints] or np.ndarray[npoints, ndim] Co-ordinates of points to generate. corrfunc : function(x) -> covariance matrix Function that take (vectorized) co-ordinates and returns their covariance functions. n : integer Number of realisations to generate. rcond : float, optional Ignore eigenmodes smaller than `rcond` times the largest eigenvalue. Returns ------- y : np.ndarray[n, npoints] Realisations of the gaussian field. """ return _realisation(corrfunc(x), n, rcond)
def _realisation(C, n, rcond): """Create a realisation of the given covariance matrix. Regularise by throwing away small eigenvalues. """ import scipy.linalg as la # Find the eigendecomposition, truncate small modes, and use this to # construct a matrix projecting from the non-singular space evals, evecs = la.eigh(C) num = np.sum(evals > rcond * evals[-1]) R = evecs[:, -num:] * evals[np.newaxis, -num:] ** 0.5 # Generate independent gaussian variables w = np.random.standard_normal((n, num)) # Apply projection to get random field return np.dot(w, R.T)
[docs] def constrained_gaussian_realisation(x, corrfunc, n, x2, y2, rcond=1e-12): """Generate a constrained Gaussian random field. Given a correlation function generate a Gaussian random field that is consistent with an existing set of values of parameter `y2` located at co-ordinates in parameter `x2`. Parameters ---------- x : np.ndarray[npoints] or np.ndarray[npoints, ndim] Co-ordinates of points to generate. corrfunc : function(x) -> covariance matrix Function that take (vectorized) co-ordinates and returns their covariance functions. n : integer Number of realisations to generate. x2 : np.ndarray[npoints] or np.ndarray[npoints, ndim] Co-ordinates of existing points. y2 : np.ndarray[npoints] or np.ndarray[n, npoints] Existing values of the random field. rcond : float, optional Ignore eigenmodes smaller than `rcond` times the largest eigenvalue. Returns ------- y : np.ndarray[n, npoints] Realisations of the gaussian field. """ import scipy.linalg as la if (y2.ndim >= 2) and (n != y2.shape[0]): raise ValueError("Array y2 of existing data has the wrong shape.") # Calculate the covariance matrix for the full dataset xc = np.concatenate([x, x2]) M = corrfunc(xc) # Select out the different blocks l = len(x) A = M[:l, :l] B = M[:l, l:] C = M[l:, l:] # This method tends to be unstable when there are singular modes in the # covariance matrix (i.e. modes with zero variance). We can remove these by # projecting onto the non-singular modes. # Find the eigendecomposition and construct projection matrices onto the # non-singular space evals_A, evecs_A = la.eigh(A) evals_C, evecs_C = la.eigh(C) num_A = np.sum(evals_A > rcond * evals_A.max()) num_C = np.sum(evals_C > rcond * evals_C.max()) R_A = evecs_A[:, -num_A:] R_C = evecs_C[:, -num_C:] # Construct the covariance blocks in the reduced basis A_r = np.diag(evals_A[-num_A:]) B_r = np.dot(R_A.T, np.dot(B, R_C)) Ci_r = np.diag(1.0 / evals_C[-num_C:]) # Project the existing data into the new basis y2_r = np.dot(y2, R_C) # Calculate the mean of the new variables z_r = np.dot(y2_r, np.dot(Ci_r, B_r.T)) # Generate fluctuations for the new variables (in the reduced basis) Ap_r = A_r - np.dot(B_r, np.dot(Ci_r, B_r.T)) y_r = _realisation(Ap_r, n, rcond) # Project into the original basis for A return np.dot(z_r + y_r, R_A.T)