draco.analysis.transform

Miscellaneous transformations to do on data.

This ranges from grouping frequencies and products to performing the m-mode transform.

Functions

stokes_I(sstream, tel)

Extract instrumental Stokes I from a time/sidereal stream.

Classes

CollateProducts()

Extract and order the correlation products for map-making.

Downselect()

Apply axis selections to a container.

ElevationDependentHybridVisWeight()

Add elevation dependence to hybrid visibility weights.

FrequencyRebin()

Rebin neighbouring frequency channels.

GenerateSubBands()

Generate multiple sub-bands from an input container.

HPFTimeStream()

High pass filter a timestream.

Jackknife()

Perform a jackknife of two datasets.

MModeInverseTransform()

Transform m-modes to sidereal stream.

MModeTransform()

Transform a sidereal stream to m-modes.

MixData()

Mix together pieces of data with specified weights.

MixTwoDatasets()

Mix two datasets in a single iteration.

ReduceBase()

Apply a weighted reduction operation across specific axes.

ReduceChisq()

Calculate the chi-squared per degree of freedom.

ReduceVar()

Take the weighted variance of a container.

Regridder()

Interpolate time-ordered data onto a regular grid.

SelectFreq()

Select a subset of frequencies from a container.

SelectPol()

Extract a subset of polarisations, including Stokes parameters.

ShiftRA()

Add a shift to the RA axis.

SiderealMModeResample()

Resample a sidereal stream by FFT.

StokesIVis()

Extract instrumental Stokes I from visibilities.

TelescopeStreamMixIn()

A mixin providing functionality for creating telescope-defined sidereal streams.

TransformJanskyToKelvin()

Task to convert from Jy to Kelvin and vice-versa.

class draco.analysis.transform.CollateProducts[source]

Bases: TelescopeStreamMixIn, SingleTask

Extract and order the correlation products for map-making.

The task will take a sidereal stream and format the products that are needed for the map-making. It uses a BeamTransfer instance to figure out what these products are, and how they should be ordered. It similarly selects only the required frequencies.

It is important to note that while the input SiderealStream can contain more feeds and frequencies than are contained in the BeamTransfers, the converse is not true. That is, all the frequencies and feeds that are in the BeamTransfers must be found in the timestream object.

Parameters:

weight (string ('natural', 'uniform', or 'inverse_variance')) –

How to weight the redundant baselines when stacking:

  • 'natural' – each baseline weighted by its redundancy (default)

  • 'uniform' – each baseline given equal weight

  • 'inverse_variance' – each baseline weighted by the weight attribute

process(ss: SiderealStream) SiderealStream[source]
process(ss: TimeStream) TimeStream

Select and reorder the products.

Parameters:

ss – Data with products

Returns:

Dataset containing only the required products.

Return type:

sp

class draco.analysis.transform.Downselect[source]

Bases: SelectionsMixin, SingleTask

Apply axis selections to a container.

Apply slice or np.take operations across multiple axes of a container. The selections are applied to every dataset.

If a dataset is distributed, there must be at least one axis not included in the selections.
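The two kinds of selection can be illustrated on a plain NumPy array. This is only a sketch of the idea, not the task's implementation: the `apply_selections` helper and the axis names are hypothetical, and no draco containers or distributed arrays are involved.

```python
import numpy as np


def apply_selections(arr, axis_names, selections):
    """Apply a slice or an index list along each named axis (hypothetical helper)."""
    out = arr
    for name, sel in selections.items():
        axis = axis_names.index(name)
        if isinstance(sel, slice):
            # Slices are applied by ordinary indexing along the chosen axis.
            index = [slice(None)] * out.ndim
            index[axis] = sel
            out = out[tuple(index)]
        else:
            # Explicit index lists are applied with np.take.
            out = np.take(out, sel, axis=axis)
    return out


vis = np.arange(4 * 3 * 8).reshape(4, 3, 8)  # toy (freq, stack, ra) dataset
sub = apply_selections(
    vis, ["freq", "stack", "ra"], {"freq": slice(0, 2), "ra": [0, 4, 7]}
)
print(sub.shape)  # (2, 3, 3)
```

The "stack" axis is left untouched here, mirroring the requirement that at least one axis stays out of the selections.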

process(data: ContainerBase) ContainerBase[source]

Apply downselections to the container.

Parameters:

data – Container to process

Returns:

Container of same type as the input with specific axis selections. Any datasets not included in the selections will not be initialized.

Return type:

out

class draco.analysis.transform.ElevationDependentHybridVisWeight[source]

Bases: SingleTask

Add elevation dependence to hybrid visibility weights.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data: HybridVisStream)[source]

Remove the weights dataset and broadcast to elevation weights.

Parameters:

data – Hybrid visibilities with elevation-independent weights.

Returns:

Input container with different weights dataset

Return type:

data

class draco.analysis.transform.FrequencyRebin[source]

Bases: SingleTask

Rebin neighbouring frequency channels.

Parameters:

channel_bin (int) – Number of channels to bin together.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(ss)[source]

Take the input dataset and rebin the frequencies.

Parameters:

ss (containers.SiderealStream or containers.TimeStream) – Input data to rebin. Can also be an andata.CorrData instance, however the output will be a containers.TimeStream instance.

Returns:

sb – Rebinned data. Type should match the input.

Return type:

containers.SiderealStream or containers.TimeStream
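As a rough illustration of rebinning, the sketch below averages groups of neighbouring channels on a bare NumPy array. It assumes an unweighted mean and a channel count that divides evenly; how the task itself treats weights and containers is not shown, and `rebin_freq` is a hypothetical name.

```python
import numpy as np


def rebin_freq(vis, channel_bin):
    """Average groups of `channel_bin` neighbouring channels along axis 0."""
    nfreq = vis.shape[0]
    if nfreq % channel_bin != 0:
        raise ValueError("Number of channels must be a multiple of channel_bin.")
    # Split the frequency axis into (new channel, channel within bin).
    new_shape = (nfreq // channel_bin, channel_bin) + vis.shape[1:]
    return vis.reshape(new_shape).mean(axis=1)


vis = np.arange(8 * 2, dtype=float).reshape(8, 2)  # toy (freq, time) data
rebinned = rebin_freq(vis, channel_bin=4)
print(rebinned.shape)  # (2, 2)
```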

class draco.analysis.transform.GenerateSubBands[source]

Bases: SelectFreq

Generate multiple sub-bands from an input container.

sub_band_spec

Dictionary of the format {“band_a”: {“channel_range”: [0, 64]}, …} where each entry is a separate sub-band with the key providing the tag that will be used to describe the sub-band and the value providing a dictionary that can contain any of the config properties used by the SelectFreq task to downselect along the frequency axis to obtain the sub-band from the input container.

Type:

dict
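A specification of this form might look like the following. The band names, the channel ranges, and the toy frequency axis are all made up for illustration, and the loop only mimics the effect of a `channel_range` selection on a plain array.

```python
import numpy as np

# Hypothetical specification with two sub-bands.
sub_band_spec = {
    "band_a": {"channel_range": [0, 64]},
    "band_b": {"channel_range": [64, 128]},
}

freq = np.linspace(800.0, 400.0, 256, endpoint=False)  # toy frequency axis in MHz

# One downselected frequency axis per tag.
sub_bands = {}
for tag, spec in sub_band_spec.items():
    sub_bands[tag] = freq[slice(*spec["channel_range"])]

for tag, band in sub_bands.items():
    print(tag, len(band), band[0], band[-1])
```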

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process()[source]

Select the next sub-band from the container that was provided on setup.

Returns:

sub – Same type of container as was provided on setup, downselected along the frequency axis.

Return type:

container

setup(data)[source]

Cache the data product that will be sub-divided along the frequency axis.

Parameters:

data (container) – Any container with a freq axis.

class draco.analysis.transform.HPFTimeStream[source]

Bases: SingleTask

High pass filter a timestream.

This is done by solving for a low-pass filtered version of the timestream and then subtracting it from the original.

Parameters:
  • tau – Timescale in seconds to filter out fluctuations below.

  • pad – Implicitly pad the timestream with this many multiples of tau worth of zeros. This is used to mitigate edge effects. The default is 2.

  • window – Use a Blackman window when determining the low-pass filtered timestream. When applied this approximately doubles the length of the timescale, which is only crudely corrected for.

  • prior – This should be approximately the size of the large scale fluctuations that we will use as a regulariser.
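The "low-pass, then subtract" idea can be sketched with a much cruder filter than the one described above. The example below uses a boxcar moving average as a stand-in low-pass; it does not perform the regularised solve, padding, or windowing that the parameters refer to, and `high_pass` is a hypothetical helper.

```python
import numpy as np


def high_pass(data, dt, tau):
    """Subtract a boxcar-smoothed copy of `data` (a crude stand-in low-pass)."""
    width = max(1, int(round(tau / dt)))
    kernel = np.ones(width) / width
    low = np.convolve(data, kernel, mode="same")
    return data - low


t = np.arange(2000) * 1.0  # one sample per second
slow = np.sin(2 * np.pi * t / 1000.0)  # drift on a timescale much longer than tau
fast = 0.1 * np.sin(2 * np.pi * t / 10.0)  # fluctuation much faster than tau
filtered = high_pass(slow + fast, dt=1.0, tau=100.0)

# Away from the edges, most of the slow drift is removed and the fast term remains.
interior = slice(200, -200)
print(np.max(np.abs(filtered[interior] - fast[interior])))
```

The edge samples are excluded from the comparison because the boxcar runs off the end of the data there, which is the kind of edge effect that padding is meant to mitigate.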

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(tstream: TODContainer) TODContainer[source]

High pass filter a time stream.

Parameters:

tstream – A TOD container that also implements DataWeightContainer.

Returns:

The high-pass filtered time stream.

Return type:

filtered_tstream

class draco.analysis.transform.Jackknife[source]

Bases: MixData

Perform a jackknife of two datasets.

This is identical to MixData but sets the default config properties to values appropriate for carrying out a jackknife of two datasets.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

class draco.analysis.transform.MModeInverseTransform[source]

Bases: SingleTask

Transform m-modes to sidereal stream.

Currently ignores any noise weighting.

Warning

Using apply_integration_window will modify the input mmodes.

nra

Number of RA bins in the output. Note that if the number of samples does not Nyquist sample the maximum m, information may be lost. If not set, then try to get from an original_nra attribute on the incoming MModes, otherwise determine an appropriate number of RA bins from the mmax.

Type:

int

apply_integration_window

Apply the effect of the finite width of the RA integration (presuming a rectangular integration window). This is applied to both the visibilities and the weights. If this is true, as a side effect the input data will be modified in place.

Type:

bool
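For a rectangular window one RA bin wide, averaging a mode exp(i m φ) over the bin attenuates it by sin(π m / nra) / (π m / nra). The sketch below checks that factor numerically; it is a standalone calculation under that assumption about the window width, not a description of how the task applies it.

```python
import numpy as np

nra = 64
m = np.arange(0, 20)

# Analytic attenuation of mode m by a rectangular window one RA bin wide.
window = np.sinc(m / nra)  # np.sinc(x) = sin(pi x) / (pi x)

# Numerical check: average exp(i m phi) over one bin centred on phi = 0,
# using the midpoints of many sub-intervals.
nsub = 1000
delta = 2 * np.pi / nra
phi = (np.arange(nsub) + 0.5) / nsub * delta - delta / 2
numeric = np.array([np.mean(np.exp(1j * mi * phi)).real for mi in m])

print(np.max(np.abs(window - numeric)))
```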

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(mmodes: MContainer) SiderealContainer[source]

Perform the m-mode inverse transform.

Parameters:

mmodes (containers.MModes) – The input m-modes.

Returns:

sstream – The output sidereal stream.

Return type:

containers.SiderealStream

class draco.analysis.transform.MModeTransform[source]

Bases: SingleTask

Transform a sidereal stream to m-modes.

Currently ignores any noise weighting.

The maximum m used in the container is derived from the number of time samples or, if a manager is supplied, taken from telescope.mmax.

remove_integration_window

Deconvolve the effect of the finite width of the RA integration (presuming it was a rectangular integration window). This is applied to both the visibilities and the weights.

Type:

bool

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(sstream: SiderealContainer) MContainer[source]

Perform the m-mode transform.

Parameters:

sstream (containers.SiderealStream or containers.HybridVisStream) – The input sidereal stream.

Returns:

mmodes

Return type:

containers.MModes

setup(manager: ProductManager | BeamTransfer | TransitTelescope | None = None)[source]

Set the telescope instance if a manager object is given.

This is used to set the mmax used in the transform.

Parameters:

manager (manager.ProductManager, optional) – The telescope/manager used to set the mmax. If not set, mmax is derived from the timestream.
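At its core, an m-mode transform is a Fourier transform along the RA axis. The sketch below shows this on a bare NumPy array. The sign convention, the 1/nra normalisation, and the layout of positive and negative m in a single axis are assumptions made for the example and may differ from what the task stores in an MModes container.

```python
import numpy as np


def to_mmodes(sstream):
    """Forward transform along the last (RA) axis, normalised by nra."""
    nra = sstream.shape[-1]
    return np.fft.fft(sstream, axis=-1) / nra


def from_mmodes(mmodes, nra):
    """Inverse of `to_mmodes` for an unchanged number of RA samples."""
    return np.fft.ifft(mmodes * nra, axis=-1)


nra = 32
ra = 2 * np.pi * np.arange(nra) / nra
sstream = 1.5 * np.exp(3j * ra) + 0.5  # a single m = 3 mode plus a constant
mmodes = to_mmodes(sstream)
recovered = from_mmodes(mmodes, nra)

print(np.round(np.abs(mmodes[:5]), 6))  # power only at m = 0 and m = 3
```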

class draco.analysis.transform.MixData[source]

Bases: SingleTask

Mix together pieces of data with specified weights.

This can generate arbitrary linear combinations of the data and weights for both SiderealStream and RingMap objects, and can be used for many purposes such as: adding together simulated timestreams, injecting signal into data, replacing weights in simulated data with those from real data, performing jackknifes, etc.

All coefficients are applied naively to generate the final combinations, i.e. no normalisations or weighted summation is performed.

data_coeff

A list of coefficients to apply to the data dataset of each input container to produce the final output. These are applied to either the vis or map dataset depending on the type of the input container.

Type:

list

weight_coeff

A list of coefficients to apply to the weights of each input container to generate the output.

Type:

list

invert_weight

Invert the weights to convert to variance prior to mixing. Re-invert in the final mixed data product to convert back to inverse variance.

Type:

bool

require_nonzero_weight

Set the weight to zero in the mixed data if the weight is zero in any of the input data.

Type:

bool
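How these four properties interact can be sketched on plain arrays. The `mix` function below is a hypothetical illustration of a naive linear combination, and the coefficient values in the example are chosen for a jackknife-style half-difference rather than taken from any task defaults.

```python
import numpy as np


def mix(data, weights, data_coeff, weight_coeff, invert_weight=False,
        require_nonzero_weight=False):
    """Naive linear combination of datasets and weights (illustrative only)."""
    mixed = sum(c * d for c, d in zip(data_coeff, data))

    if invert_weight:
        # Convert weights to variances, combine, then convert back.
        var = sum(
            c * np.divide(1.0, w, out=np.zeros_like(w), where=w > 0)
            for c, w in zip(weight_coeff, weights)
        )
        mixed_weight = np.divide(1.0, var, out=np.zeros_like(var), where=var > 0)
    else:
        mixed_weight = sum(c * w for c, w in zip(weight_coeff, weights))

    if require_nonzero_weight:
        # Zero the output weight wherever any input weight is zero.
        for w in weights:
            mixed_weight = np.where(w == 0, 0.0, mixed_weight)

    return mixed, mixed_weight


a = np.array([1.0, 2.0, 3.0])
b = np.array([0.5, 2.0, 1.0])
wa = np.array([4.0, 4.0, 0.0])
wb = np.array([4.0, 4.0, 4.0])

# Hypothetical jackknife-style coefficients: half the difference of two datasets.
diff, diff_weight = mix([a, b], [wa, wb], [0.5, -0.5], [0.25, 0.25],
                        invert_weight=True, require_nonzero_weight=True)
print(diff, diff_weight)
```

With `invert_weight` set, the weight coefficients act on variances, so coefficients of 0.25 correspond to the squared data coefficients of 0.5.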

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data: SiderealStream | HybridVisStream | RingMap)[source]

Add the input data into the mixed data output.

Parameters:

data – The data to be added into the mix.

process_finish() SiderealStream | HybridVisStream | RingMap[source]

Return the container with the mixed inputs.

Returns:

The mixed data.

Return type:

mixed_data

setup()[source]

Check the lists have the same length.

class draco.analysis.transform.MixTwoDatasets[source]

Bases: MixData

Mix two datasets in a single iteration.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data1, data2)[source]

Combine the two datasets into mixed data output.

process_finish()[source]

Overwrite process_finish to no-op.

class draco.analysis.transform.ReduceBase[source]

Bases: SingleTask

Apply a weighted reduction operation across specific axes.

This is non-functional without overriding the reduction method.

There must be at least one axis not included in the reduction.

axes

Axis names to apply the reduction to

Type:

list

dataset

Dataset name to reduce.

Type:

str

weighting

Which type of weighting to use, if applicable. Options are “none”, “masked”, or “weighted”

Type:

str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data: ContainerBase) ContainerBase[source]

Downselect and apply the reduction operation to the data.

Parameters:

data – Dataset to process.

Returns:

Dataset of same type as input with axes reduced. Any datasets which are not included in the reduction list will not be initialized, other than weights.

Return type:

out

reduction(arr: ndarray, weight: ndarray, axis: tuple) tuple[ndarray, ndarray][source]

Override to implement the reduction operation.

class draco.analysis.transform.ReduceChisq[source]

Bases: ReduceBase

Calculate the chi-squared per degree of freedom.

Assumes that the visibilities are uncorrelated noise whose inverse variance is given by the weight dataset.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

reduction(arr, weight, axis)[source]

Apply a chi-squared calculation.
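A reduction of this kind might look like the sketch below, which sums weight × |value|² and divides by the number of samples with non-zero weight. The choice of the second return value (the sample count) and the normalisation convention are assumptions for the example; the function is standalone and does not subclass ReduceBase.

```python
import numpy as np


def chisq_per_dof(arr, weight, axis):
    """Return (chi-squared per degree of freedom, number of valid samples)."""
    valid = weight > 0
    chisq = np.sum(weight * np.abs(arr) ** 2, axis=axis)
    ndof = np.sum(valid, axis=axis)
    out = np.divide(chisq, ndof, out=np.zeros_like(chisq), where=ndof > 0)
    return out, ndof


# Pure noise with inverse variance equal to the weight should give a value near 1.
rng = np.random.default_rng(0)
sigma = 2.0
noise = sigma * rng.standard_normal((4, 5000))
weight = np.full(noise.shape, 1.0 / sigma**2)
chisq, ndof = chisq_per_dof(noise, weight, axis=1)
print(chisq)
```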

class draco.analysis.transform.ReduceVar[source]

Bases: ReduceBase

Take the weighted variance of a container.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

reduction(arr, weight, axis)[source]

Apply a weighted variance.
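One common definition of a weighted variance is sketched below: the weighted mean squared deviation from the weighted mean. Whether the task uses exactly this estimator, and what it returns as the reduced weight, are assumptions here; `weighted_var` is a hypothetical standalone function.

```python
import numpy as np


def weighted_var(arr, weight, axis):
    """Weighted variance of `arr` about its weighted mean along `axis`."""
    norm = np.sum(weight, axis=axis, keepdims=True)
    safe = np.where(norm > 0, norm, 1.0)  # avoid dividing by zero total weight
    mean = np.sum(weight * arr, axis=axis, keepdims=True) / safe
    var = np.sum(weight * np.abs(arr - mean) ** 2, axis=axis, keepdims=True) / safe
    return np.squeeze(var, axis=axis), np.squeeze(norm, axis=axis)


arr = np.array([[1.0, 2.0, 3.0, 100.0]])
weight = np.array([[1.0, 1.0, 1.0, 0.0]])  # the outlier is masked by zero weight
var, norm = weighted_var(arr, weight, axis=(1,))
print(var, norm)
```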

class draco.analysis.transform.Regridder[source]

Bases: SingleTask

Interpolate time-ordered data onto a regular grid.

Uses a maximum-likelihood inverse of a Lanczos interpolation to do the regridding. This gives a reasonably local regridding, that is pretty well behaved in m-space.

samples

Number of samples to interpolate onto.

Type:

int

start

Start of the interpolated samples.

Type:

float

end

End of the interpolated samples.

Type:

float

lanczos_width

Width of the Lanczos interpolation kernel.

Type:

int

snr_cov

Ratio of signal covariance to noise covariance (used for Wiener filter).

Type:

float

mask_zero_weight

Mask the output noise weights at frequencies where the weights were zero for all time samples.

Type:

bool
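For orientation, the Lanczos kernel of width a is sinc(x) · sinc(x / a) for |x| < a and zero elsewhere. The sketch below applies it as a direct (forward) interpolation on regularly spaced samples. This is deliberately simpler than the task, which describes a maximum-likelihood inverse of the interpolation with a signal-to-noise prior; none of that is attempted here, and both helper names are hypothetical.

```python
import numpy as np


def lanczos_kernel(x, a):
    """Lanczos kernel of width `a`: sinc(x) * sinc(x / a) for |x| < a, else 0."""
    return np.where(np.abs(x) < a, np.sinc(x) * np.sinc(x / a), 0.0)


def lanczos_interpolate(samples, x_new, a=5):
    """Interpolate unit-spaced `samples` at the positions `x_new`."""
    grid = np.arange(len(samples))
    # Each row holds the kernel weights linking one output point to all samples.
    kernel = lanczos_kernel(x_new[:, np.newaxis] - grid[np.newaxis, :], a)
    return kernel @ samples


grid = np.arange(64)
samples = np.sin(2 * np.pi * grid / 16.0)
x_new = np.array([10.0, 20.5, 31.25])
values = lanczos_interpolate(samples, x_new)
print(values, np.sin(2 * np.pi * x_new / 16.0))
```

Because the kernel falls to zero beyond a samples, each output value depends only on nearby inputs, which is what makes the regridding local.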

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data)[source]

Regrid visibility data in the time direction.

Parameters:

data (containers.TODContainer) – Time-ordered data.

Returns:

new_data – The regularly gridded interpolated timestream.

Return type:

containers.TODContainer

setup(observer)[source]

Set the local observer's position.

Parameters:

observer (Observer) – An Observer object holding the geographic location of the telescope. Note that TransitTelescope instances are also Observers.

class draco.analysis.transform.SelectFreq[source]

Bases: SingleTask

Select a subset of frequencies from a container.

freq_physical

List of physical frequencies in MHz. Given first priority.

Type:

list

channel_range

Range of frequency channel indices, either [start, stop, step], [start, stop], or [stop] is acceptable. Given second priority.

Type:

list

channel_index

List of frequency channel indices. Given third priority.

Type:

list

freq_physical_range

Range of physical frequencies to include given as (low_freq, high_freq). Given fourth priority.

Type:

list
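The priority ordering of the four options can be sketched as a chain of checks. The helper below is hypothetical: matching each physical frequency to its nearest channel and treating the physical range as inclusive are assumptions made for the example, not confirmed behaviour of the task.

```python
import numpy as np


def select_channels(freq, freq_physical=None, channel_range=None,
                    channel_index=None, freq_physical_range=None):
    """Resolve the selection options in priority order to channel indices."""
    if freq_physical:
        # Assumption: pick the channel closest to each requested frequency.
        return np.array([np.argmin(np.abs(freq - f)) for f in freq_physical])
    if channel_range:
        # [start, stop, step], [start, stop], or [stop] all map onto a slice.
        return np.arange(len(freq))[slice(*channel_range)]
    if channel_index:
        return np.asarray(channel_index)
    if freq_physical_range:
        low, high = sorted(freq_physical_range)
        return np.flatnonzero((freq >= low) & (freq <= high))
    raise ValueError("No frequency selection was given.")


freq = np.linspace(800.0, 400.0, 8, endpoint=False)  # 800, 750, ..., 450 MHz

by_range = select_channels(freq, channel_range=[2, 6, 2])
by_band = select_channels(freq, freq_physical_range=(500.0, 600.0))
# freq_physical wins when several options are supplied at once.
by_priority = select_channels(freq, freq_physical=[700.0], channel_index=[0])
print(by_range, by_band, by_priority)
```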

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data)[source]

Select a subset of the frequencies.

Parameters:

data (containers.ContainerBase) – A data container with a frequency axis.

Returns:

newdata – New container with trimmed frequencies.

Return type:

containers.ContainerBase

class draco.analysis.transform.SelectPol[source]

Bases: SingleTask

Extract a subset of polarisations, including Stokes parameters.

This currently only extracts Stokes I.

pol

Polarisations to extract. Only Stokes I extraction is supported (i.e. pol = [“I”]).

Type:

list
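Instrumental Stokes I is commonly formed from the two co-polar products. The sketch below uses I = (XX + YY) / 2 and propagates inverse-variance weights accordingly; the factor of one half, the weight handling, and the `stokes_I_from_pol` name are assumptions for illustration rather than the task's confirmed behaviour.

```python
import numpy as np


def stokes_I_from_pol(vis, weight, pol):
    """Combine the XX and YY entries of the polarisation axis (axis 0)."""
    ixx, iyy = pol.index("XX"), pol.index("YY")
    vis_I = 0.5 * (vis[ixx] + vis[iyy])

    # Propagate inverse variances: var_I = (var_XX + var_YY) / 4.
    wxx, wyy = weight[ixx], weight[iyy]
    good = (wxx > 0) & (wyy > 0)
    var_I = 0.25 * (1.0 / np.where(good, wxx, 1.0) + 1.0 / np.where(good, wyy, 1.0))
    weight_I = np.where(good, 1.0 / var_I, 0.0)
    return vis_I, weight_I


pol = ["XX", "XY", "YX", "YY"]
vis = np.array([[1.0, 2.0, 3.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [3.0, 2.0, 1.0]])
weight = np.array([[2.0, 2.0, 0.0], [1.0, 1.0, 1.0], [1.0, 1.0, 1.0], [2.0, 2.0, 2.0]])
vis_I, weight_I = stokes_I_from_pol(vis, weight, pol)
print(vis_I, weight_I)
```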

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(polcont)[source]

Extract the specified polarisation from the input.

This will combine polarisation pairs to get instrumental Stokes polarisations if requested.

Parameters:

polcont (ContainerBase) – A container with a polarisation axis.

Returns:

selectedpolcont – A new container with the selected polarisation.

Return type:

same as polcont

class draco.analysis.transform.ShiftRA[source]

Bases: SingleTask

Add a shift to the RA axis.

This is useful for fixing a bug in earlier revisions of CHIME processing.

Parameters:
  • delta (float) – The shift to add to the RA axis.

  • periodic (bool, optional) – If True, wrap any time sample that is shifted to RA > 360 deg around to its 360-degree-periodic counterpart, and likewise for any sample that is shifted to RA < 0 deg. This wrapping is applied to the RA index_map along with any dataset with an ra axis. Default: False.
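The effect of `periodic` can be sketched on a plain RA axis in degrees. In the example, wrapped samples are brought back into [0, 360) and the data are reordered so the RA axis stays increasing; that reordering step is an assumption about what "applied to any dataset with an ra axis" implies, and `shift_ra` is a hypothetical helper.

```python
import numpy as np


def shift_ra(ra, data, delta, periodic=False):
    """Shift the RA axis by `delta` degrees; optionally wrap into [0, 360)."""
    new_ra = ra + delta
    if not periodic:
        return new_ra, data
    new_ra = new_ra % 360.0
    order = np.argsort(new_ra)  # restore a monotonically increasing RA axis
    return new_ra[order], data[..., order]


ra = np.arange(0.0, 360.0, 90.0)  # 0, 90, 180, 270
data = np.array([10.0, 11.0, 12.0, 13.0])
new_ra, new_data = shift_ra(ra, data, delta=100.0, periodic=True)
print(new_ra, new_data)
```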

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(sscont: SiderealContainer) SiderealContainer[source]

Add a shift to the input sidereal container.

Parameters:

sscont – The container to shift. The input is modified in place.

Returns:

The shifted container.

Return type:

sscont

class draco.analysis.transform.SiderealMModeResample[source]

Bases: TaskGroup

Resample a sidereal stream by FFT.

This performs a forward and inverse m-mode transform to resample a sidereal stream.
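Resampling through Fourier space can be sketched in a few lines. The example uses a real-valued periodic signal and NumPy's real FFT for brevity, whereas sidereal streams hold complex visibilities; `fft_resample` is a hypothetical helper and does not touch the integration window.

```python
import numpy as np


def fft_resample(sstream, nra_out):
    """Resample a real periodic signal along its last axis using the FFT."""
    nra_in = sstream.shape[-1]
    spectrum = np.fft.rfft(sstream, axis=-1)
    # irfft truncates or zero-pads the spectrum to match the requested length.
    return np.fft.irfft(spectrum, n=nra_out, axis=-1) * (nra_out / nra_in)


nra_in, nra_out = 16, 40
ra_in = 2 * np.pi * np.arange(nra_in) / nra_in
ra_out = 2 * np.pi * np.arange(nra_out) / nra_out
resampled = fft_resample(np.cos(3 * ra_in) + 0.2, nra_out)

# A band-limited signal is recovered on the new grid.
print(np.max(np.abs(resampled - (np.cos(3 * ra_out) + 0.2))))
```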

nra

The number of RA bins for the output stream.

Type:

int

remove_integration_window, apply_integration_window

Remove the integration window from the incoming data, and/or apply it to the output sidereal stream.

Type:

bool

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

class draco.analysis.transform.StokesIVis[source]

Bases: SingleTask

Extract instrumental Stokes I from visibilities.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data)[source]

Extract instrumental Stokes I.

This process will reduce the length of the baseline axis.

Parameters:

data (containers.VisContainer) – Container with visibilities and baselines matching the telescope object.

Returns:

data – Container with the same type as data, with polarised baselines combined into Stokes I.

Return type:

containers.VisContainer

setup(telescope)[source]

Set the local observers.

Parameters:

telescope (Observer) – An Observer object holding the geographic location of the telescope. Note that TransitTelescope instances are also Observers.

class draco.analysis.transform.TelescopeStreamMixIn[source]

Bases: object

A mixin providing functionality for creating telescope-defined sidereal streams.

This mixin is designed to be used with pipeline tasks that require certain index maps in order to create SiderealStream containers compatible with the baseline configuration provided in a telescope instance.

setup(tel)[source]

Set up the telescope instance and precompute index maps.

Parameters:

tel (TransitTelescope) – The telescope instance to use to compute the prod, stack, and reverse_stack index maps.

class draco.analysis.transform.TransformJanskyToKelvin[source]

Bases: SingleTask

Task to convert from Jy to Kelvin and vice-versa.

This integrates over the primary beams in the telescope class to derive the brightness temperature to flux conversion.

convert_Jy_to_K

If True, apply a Jansky to Kelvin conversion factor. If False apply a Kelvin to Jansky conversion.

Type:

bool

reference_declination

The declination to set the flux reference for. A source transiting at this declination will produce a visibility signal equal to its flux. If None (default) use the zenith.

Type:

float, optional

share

Which datasets should the output share with the input. Default is “all”.

Type:

{“none”, “all”}

nside

The NSIDE to use for the primary beam area calculation. This may need to be increased for beams with intricate small scale structure. Default is 256.

Type:

int
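The underlying relation is the Rayleigh-Jeans law, S = 2 k_B T Ω / λ², which links flux density to brightness temperature through the beam solid angle Ω. The sketch below takes Ω as a given number, which is an assumption for the example; the task instead derives it by integrating the primary beams. Both helper names are hypothetical.

```python
import numpy as np

C = 299792458.0  # speed of light, m / s
K_B = 1.380649e-23  # Boltzmann constant, J / K
JY = 1e-26  # one Jansky in W / m^2 / Hz


def jy_per_kelvin(freq_mhz, beam_solid_angle):
    """Rayleigh-Jeans factor S / T = 2 k_B Omega / lambda^2, in Jy per K."""
    wavelength = C / (np.asarray(freq_mhz) * 1e6)
    return 2.0 * K_B * beam_solid_angle / wavelength**2 / JY


def convert(vis, freq_mhz, beam_solid_angle, convert_Jy_to_K=True):
    """Scale visibilities between Jy and K (hypothetical helper)."""
    factor = jy_per_kelvin(freq_mhz, beam_solid_angle)
    return vis / factor if convert_Jy_to_K else vis * factor


vis_jy = np.array([1.0, 2.0])
omega = 0.1  # made-up beam solid angle in steradians
vis_k = convert(vis_jy, 600.0, omega, convert_Jy_to_K=True)
back = convert(vis_k, 600.0, omega, convert_Jy_to_K=False)
print(vis_k, back)
```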

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(sstream: SiderealStream) SiderealStream[source]

Apply the brightness temperature to flux conversion to the data.

Parameters:

sstream – The visibilities to apply the conversion to. They are converted to/from brightness temperature units depending on the setting of convert_Jy_to_K.

Returns:

Visibilities with the conversion applied. This may be the same as the input container if share == “all”.

Return type:

new_sstream

setup(telescope: ProductManager | BeamTransfer | TransitTelescope)[source]

Set the telescope object.

Parameters:

telescope – An object we can get a telescope object from. This telescope must be able to calculate the beams at all incoming frequencies.

draco.analysis.transform.stokes_I(sstream, tel)[source]

Extract instrumental Stokes I from a time/sidereal stream.

Parameters:
  • sstream (containers.SiderealStream or containers.TimeStream) – Stream of correlation data.

  • tel (TransitTelescope) – Instance describing the telescope.

Returns:

  • vis_I (mpiarray.MPIArray[nfreq, nbase, ntime]) – The instrumental Stokes I visibilities, distributed over baselines.

  • vis_weight (mpiarray.MPIArray[nfreq, nbase, ntime]) – The weights for each visibility, distributed over baselines.

  • ubase (np.ndarray[nbase, 2]) – Baseline vectors corresponding to output.