draco.core.io

Tasks for reading and writing data.

File Groups

Several tasks accept groups of files as arguments. These are specified in the YAML configuration file as a dictionary, as shown below.

list_of_file_groups:
    -   tag: first_group  # An optional tag naming the group
        files:
            -   'file1.h5'
            -   'file[3-4].h5'  # Globs are processed
            -   'file7.h5'

    -   files:  # No tag specified, implicitly gets the tag 'group_2'
            -   'another_file1.h5'
            -   'another_file2.h5'


single_group:
    files: ['file1.h5', 'file2.h5']

Functions

get_beamtransfer(obj)

Return a BeamTransfer object out of the input.

get_telescope(obj)

Return a telescope object out of the input.

Classes

BaseLoadFiles()

Base class for loading containers from a file on disk.

FindFiles()

Take a glob or list of files and pass on to other tasks.

LoadBasicCont

alias of LoadFilesFromParams

LoadBeamTransfer()

Loads a beam transfer manager from disk.

LoadFITSCatalog()

Load an SDSS-style FITS source catalog.

LoadFiles()

Load data from files passed into the setup routine.

LoadFilesAndSelect()

Load a collection of files on setup and select specific entries on process.

LoadFilesFromAttrs()

Load files from paths constructed using the attributes of another container.

LoadFilesFromParams()

Load data from files given in the task's parameters.

LoadMaps()

Load a series of maps from files given in the task's parameters.

LoadProductManager()

Loads a driftscan product manager from disk.

Print()

Stupid module which just prints whatever it gets.

Save()

Save out the input, and pass it on.

SaveConfig()

Write pipeline config to a text file.

SaveModuleVersions()

Write module versions to a YAML file.

SaveZarrZip()

Save a container as a .zarr.zip file.

SelectionsMixin()

Mixin for parsing axis selections, typically from a YAML config.

Truncate()

Precision truncate data prior to saving with bitshuffle compression.

WaitZarrZip()

Collect Zarr-zipping jobs and wait for them to complete.

ZarrZipHandle(filename, handle)

A handle for keeping track of a background Zarr-zipping job.

ZipZarrContainers()

Zip up a Zarr container into a single file.

class draco.core.io.BaseLoadFiles[source]

Bases: SelectionsMixin, SingleTask

Base class for loading containers from a file on disk.

Provides the capability to make selections along axes.

distributed

Whether the file should be loaded distributed across ranks.

Type:

bool, optional

convert_strings

Convert strings to unicode when loading.

Type:

bool, optional

redistribute

An optional axis name to redistribute the container over after it has been read.

Type:

str, optional
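These options are not used directly, but become available as configuration parameters on the loading tasks that subclass this (e.g. LoadFilesFromParams below). A minimal sketch of how they might appear in a pipeline YAML config; the file path and axis name are hypothetical:

-   type: draco.core.io.LoadFilesFromParams
    out: cont
    params:
        files: 'data/*.h5'
        distributed: true      # Load distributed across ranks
        convert_strings: true  # Convert strings to unicode on load
        redistribute: 'freq'   # Redistribute over this axis after reading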

class draco.core.io.FindFiles[source]

Bases: TaskBase

Take a glob or list of files and pass on to other tasks.

Files are specified as a parameter in the configuration file.

Parameters:

files (list or glob)

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

setup()[source]

Return list of files specified in the parameters.
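A minimal sketch of configuring this task in a pipeline YAML file, with hypothetical file names. The resulting list is passed to downstream tasks via the out key:

-   type: draco.core.io.FindFiles
    out: filelist
    params:
        files:
            -   'run_a/*.h5'  # Globs are expanded
            -   'run_b/file1.h5'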

draco.core.io.LoadBasicCont

alias of LoadFilesFromParams

class draco.core.io.LoadBeamTransfer[source]

Bases: TaskBase

Loads a beam transfer manager from disk.

product_directory

Path to the saved Beam Transfer products.

Type:

str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

setup()[source]

Load the beam transfer matrices.

Returns:

  • tel (TransitTelescope) – Object describing the telescope.

  • bt (BeamTransfer) – BeamTransfer manager.

  • feed_info (list, optional) – Optional list providing additional information about each feed.

class draco.core.io.LoadFITSCatalog[source]

Bases: SingleTask

Load an SDSS-style FITS source catalog.

Catalogs are given as a single File Group, or a list of them (see draco.core.io). Catalogs within the same group are combined before being passed on.

catalogs

A dictionary specifying a file group, or a list of them.

Type:

list or dict

z_range

Select only sources with a redshift within the given range.

Type:

list, optional

freq_range

Select only sources with a 21 cm line frequency within the given range. Overrides z_range.

Type:

list, optional

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process()[source]

Load the groups of catalogs from disk, concatenate them and pass them on.

Returns:

catalog

Return type:

containers.SpectroscopicCatalog
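A sketch of a possible configuration, using a hypothetical file group and redshift range:

-   type: draco.core.io.LoadFITSCatalog
    out: catalog
    params:
        catalogs:
            tag: 'qso'
            files: ['catalog_ngc.fits', 'catalog_sgc.fits']  # Combined before passing on
        z_range: [0.8, 2.5]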

class draco.core.io.LoadFiles[source]

Bases: LoadFilesFromParams

Load data from files passed into the setup routine.

Files must be serialised subclasses of memh5.BasicCont.

setup(files)[source]

Set the list of files to load.

Parameters:

files (list) – Files to load.
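Since the file list arrives through setup(), this task is typically chained after FindFiles using the requires key. A sketch, with a hypothetical glob:

-   type: draco.core.io.FindFiles
    out: filelist
    params:
        files: 'data/*.h5'

-   type: draco.core.io.LoadFiles
    requires: filelist
    out: cont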

class draco.core.io.LoadFilesAndSelect[source]

Bases: BaseLoadFiles

Load a collection of files on setup and select specific entries on process.

files

A list of file paths or a glob pattern specifying the files to load.

Type:

list of str or str

key_format

A format string used to generate keys for file selection. Can reference any variables contained in the attributes of the containers. If None, files are stored with numerical indices.

Type:

str, optional

process(incont)[source]

Select and return a file from the collection based on the input container.

If key_format is provided, the selection key is derived from the attributes of the input container. If the resulting key is not found in the collection, a warning is logged, and None is returned.

If key_format is not provided, files are selected sequentially from the collection, cycling back to the beginning if more input containers are received than the number of available files.

Parameters:

incont (memh5.BasicCont subclass) – Container whose attributes are used to determine the selection key.

Returns:

outcont – The selected file if found, otherwise None.

Return type:

memh5.BasicCont subclass or None

setup()[source]

Load and store files in a dictionary.

This method iterates through the list of files, loads their contents, and stores them in the self.collection dictionary. If key_format is provided, it is used to generate a key based on the file attributes. Otherwise, the index of the file in the list is used as the key.
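A sketch of a possible configuration; the glob and the lsd attribute referenced by key_format are hypothetical:

-   type: draco.core.io.LoadFilesAndSelect
    in: sstream
    out: mask
    params:
        files: 'masks/rfi_mask_lsd_*.h5'
        key_format: '{lsd}'  # Match files to inputs on the lsd attribute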

class draco.core.io.LoadFilesFromAttrs[source]

Bases: BaseLoadFiles

Load files from paths constructed using the attributes of another container.

This class enables the dynamic generation of file paths by formatting a specified filename template with attributes from an input container. It inherits from BaseLoadFiles and provides functionality to load files into a container.

filename

Template for the file path, which can include placeholders referencing attributes in the input container. For example: rfi_mask_lsd_{lsd}.h5. The placeholders will be replaced with corresponding attribute values from the input container.

Type:

str

process(incont)[source]

Load a file based on attributes from the input container.

Parameters:

incont (subclass of memh5.BasicCont) – Input container whose attributes are used to construct the file path.

Returns:

outcont – A container populated with data from the loaded file.

Return type:

subclass of memh5.BasicCont
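A sketch of a possible configuration, reusing the filename template from the attribute description above:

-   type: draco.core.io.LoadFilesFromAttrs
    in: sstream
    out: rfi_mask
    params:
        filename: 'rfi_mask_lsd_{lsd}.h5'  # {lsd} filled from the input container's attributes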

class draco.core.io.LoadFilesFromParams[source]

Bases: BaseLoadFiles

Load data from files given in the task's parameters.

files

Either a glob pattern, or a list of file paths.

Type:

glob pattern, or list

process()[source]

Load the given files in turn and pass them on.

Returns:

cont

Return type:

subclass of memh5.BasicCont
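A minimal sketch with hypothetical paths, showing both accepted forms of the files parameter:

-   type: draco.core.io.LoadFilesFromParams
    out: cont
    params:
        files: 'data/*.h5'
        # or, equivalently, an explicit list:
        # files: ['data/file1.h5', 'data/file2.h5']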

class draco.core.io.LoadMaps[source]

Bases: MPILoggedTask

Load a series of maps from files given in the task's parameters.

Maps are given as a single File Group, or a list of them (see draco.core.io). Maps within the same group are added together before being passed on.

maps

A dictionary specifying a file group, or a list of them.

Type:

list or dict

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next()[source]

Load the groups of maps from disk and pass them on.

Returns:

map

Return type:

containers.Map
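A sketch of a possible configuration with hypothetical map files; entries within the same group are summed:

-   type: draco.core.io.LoadMaps
    out: map
    params:
        maps:
            -   tag: 'foregrounds'
                files: ['synchrotron.h5', 'pointsources.h5']  # Added together
            -   files: ['signal.h5']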

class draco.core.io.LoadProductManager[source]

Bases: TaskBase

Loads a driftscan product manager from disk.

product_directory

Path to the root of the products. This is the same as the output directory used by drift-makeproducts.

Type:

str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

setup()[source]

Load the driftscan product manager.

Returns:

manager – The loaded product manager.

Return type:

ProductManager
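A minimal sketch, with a hypothetical products path:

-   type: draco.core.io.LoadProductManager
    out: manager
    params:
        product_directory: '/path/to/products'  # Output directory of drift-makeproducts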

class draco.core.io.Print[source]

Bases: TaskBase

Stupid module which just prints whatever it gets. Good for debugging.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next(input_)[source]

Print the input.

class draco.core.io.Save[source]

Bases: TaskBase

Save out the input, and pass it on.

Assumes that the input has a to_hdf5 method. Appends a tag if there is a tag entry in the attributes, otherwise just uses a count.

root

Root of the file name to output to.

Type:

str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next(data)[source]

Write out the data file.

Assumes it has an MPIDataset interface.

Parameters:

data (mpidataset.MPIDataset) – Data to write out.
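A minimal sketch with a hypothetical root, assuming the output file name is built from root plus the input's tag (or a count):

-   type: draco.core.io.Save
    in: cont
    params:
        root: 'output/sstream_'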

class draco.core.io.SaveConfig[source]

Bases: SingleTask

Write pipeline config to a text file.

The YAML configuration document is written to a text file.

root

Root of the file name to output to.

Type:

str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process()[source]

Do nothing.

setup()[source]

Save the pipeline configuration.

class draco.core.io.SaveModuleVersions[source]

Bases: SingleTask

Write module versions to a YAML file.

The list of modules should be added to the configuration under key ‘save_versions’. The version strings are written to a YAML file.

root

Root of the file name to output to.

Type:

str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process()[source]

Do nothing.

setup()[source]

Save module versions.
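A sketch of a possible configuration; note that depending on the caput version, the save_versions key may belong at the pipeline level rather than in the task parameters:

-   type: draco.core.io.SaveModuleVersions
    params:
        root: 'run1'
        save_versions: ['numpy', 'scipy', 'draco']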

class draco.core.io.SaveZarrZip[source]

Bases: ZipZarrContainers

Save a container as a .zarr.zip file.

This task first saves the output as a .zarr container, and then starts a background job to turn it into a zip file. It returns a handle to this job. All such handles should be fed into a WaitZarrZip task to ensure the pipeline run does not terminate before they are complete.

This accepts most parameters that a standard task would for saving, including compression parameter overrides.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next(container: BasicCont) → ZarrZipHandle[source]

Take a container and save it out as a .zarr.zip file.

Parameters:

container – Container to save out.

Returns:

A handle to use to determine if the job has successfully completed. This should be given to the WaitZarrZip task.

Return type:

handle

setup()[source]

Check the parameters and determine the ranks to use.
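A sketch of pairing this task with WaitZarrZip so the pipeline blocks until the background zipping jobs finish; the output name, which the standard saving parameters should support, is hypothetical:

-   type: draco.core.io.SaveZarrZip
    in: cont
    out: handle
    params:
        output_name: 'sstream_{tag}.zarr.zip'

-   type: draco.core.io.WaitZarrZip
    in: handle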

class draco.core.io.SelectionsMixin[source]

Bases: object

Mixin for parsing axis selections, typically from a YAML config.

selections

A dictionary of axis selections. See below for details.

Type:

dict, optional

allow_index_map

If true, selections can be made based on an index_map dataset. This cannot be implemented when reading from disk. See below for details. Default is False.

Type:

bool, optional

Selections

Selections can be given to limit the data read to specified subsets. They can be given for any named axis in the container.

Selections can be given as a slice with an <axis name>_range key, with either [start, stop] or [start, stop, step] as the value. Alternatively, a list of explicit indices to extract can be given with the <axis name>_index key, where the value is the list of indices. Finally, a selection based on an index_map can be given with specific index_map entries using the <axis name>_map key, which will be converted to axis indices. <axis name>_range takes precedence over <axis name>_index, which in turn takes precedence over <axis name>_map, though mixing them for the same axis should be avoided.

Additionally, index-based selections currently don't work for distributed reads.

Here's an example in the YAML format that the pipeline uses:

selections:
    freq_range: [256, 512, 4]  # A strided slice
    stack_index: [1, 2, 4, 9, 16, 25, 36, 49, 64]  # A sparse selection
    stack_range: [1, 14]  # Will override the selection above
    pol_map: ['XX', 'YY']  # Select the indices corresponding to these entries

setup()[source]

Resolve the selections.

class draco.core.io.Truncate[source]

Bases: SingleTask

Precision truncate data prior to saving with bitshuffle compression.

If no configuration is provided, will look for preset values for the input container. Any properties defined in the config will override the presets.

If available, each specified dataset will be truncated relative to its weight dataset, with the truncation allowed to increase the variance by at most the maximum given in variance_increase. If no weight dataset is given, the truncation falls back to using fixed_precision.

dataset

Datasets to be truncated as keys. Possible values are:

  • bool : Whether or not to truncate, using default fixed precision.

  • float : Truncate to this relative precision.

  • dict : Specify values for weight_dataset, fixed_precision, variance_increase.

Type:

dict

ensure_chunked

If True, ensure datasets are chunked according to their dataset_spec.

Type:

bool

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(data)[source]

Truncate the incoming data.

The truncation is done in place.

Parameters:

data (containers.ContainerBase) – Data to truncate.

Returns:

truncated_data – Truncated data.

Return type:

containers.ContainerBase

Raises:

config.CaputConfigError – If the input data container has no preset values and fixed_precision or variance_increase are not set in the config.
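A sketch of a possible configuration; the dataset names are hypothetical, but the three value forms follow the dataset attribute description above:

-   type: draco.core.io.Truncate
    in: cont
    out: truncated
    params:
        dataset:
            gain: true     # Truncate with the default fixed precision
            phase: 1.0e-4  # Truncate to this relative precision
            vis:
                weight_dataset: 'vis_weight'
                variance_increase: 1.0e-3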

class draco.core.io.WaitZarrZip[source]

Bases: MPILoggedTask

Collect Zarr-zipping jobs and wait for them to complete.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

finish()[source]

Wait for all Zarr zipping jobs to complete.

next(handle: ZarrZipHandle)[source]

Receive the handles to wait on.

Parameters:

handle – The handle to wait on.

class draco.core.io.ZarrZipHandle(filename: str, handle: Popen | None)[source]

Bases: object

A handle for keeping track of a background Zarr-zipping job.

class draco.core.io.ZipZarrContainers[source]

Bases: SingleTask

Zip up a Zarr container into a single file.

This is useful to save on file quota and speed up IO by combining the chunk data into a single file. Note that the file cannot really be updated after this process has been performed.

As this process is IO limited in most cases, it will attempt to parallelise the compression across distinct nodes, meaning that at most one rank per node will participate.

containers

The names of the Zarr containers to compress. The zipped files will have the same names with .zip appended.

Type:

list

remove

Remove the original data when finished. Defaults to True.

Type:

bool

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process()[source]

Compress the listed zarr containers.

Only the lowest rank on each node will participate.

setup(_=None)[source]

Setup the task.

This routine does nothing at all with the input, but it means the process won’t run until the (optional) requirement is received. This can be used to delay evaluation until you know that all the files are available.
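A sketch of a possible configuration with hypothetical container names; the optional requirement delays the compression until an upstream task signals that the files exist:

-   type: draco.core.io.ZipZarrContainers
    requires: files_written
    params:
        containers: ['sstream.zarr', 'map.zarr']
        remove: true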

draco.core.io.get_beamtransfer(obj)[source]

Return a BeamTransfer object out of the input.

Either ProductManager or BeamTransfer.

draco.core.io.get_telescope(obj)[source]

Return a telescope object out of the input.

Either ProductManager, BeamTransfer, or TransitTelescope.