draco.core.task

An improved base task implementing easy (and explicit) saving of outputs.

Functions

group_tasks(*tasks)

Create a Task that groups a bunch of tasks together.

Classes

Delete()

Delete pipeline products to free memory.

LoggedTask()

A task with logger support.

MPILogFilter([add_mpi_info, level_rank0, ...])

Filter log entries by MPI rank.

MPILoggedTask()

A task base that has MPI aware logging.

MPITask()

Base class for MPI using tasks.

ReturnFirstInputOnFinish()

Workaround for caput.pipeline issues.

ReturnLastInputOnFinish()

Workaround for caput.pipeline issues.

SetMPILogging()

A task used to configure MPI aware logging.

SingleTask()

Process a task with at most one input and output.

class draco.core.task.Delete[source]

Bases: SingleTask

Delete pipeline products to free memory.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(x)[source]

Delete the input and collect garbage.

Parameters:

x (object) – The object to be deleted.

class draco.core.task.LoggedTask[source]

Bases: TaskBase

A task with logger support.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

property log

The logger object for this task.

class draco.core.task.MPILogFilter(add_mpi_info=True, level_rank0=20, level_all=30)[source]

Bases: Filter

Filter log entries by MPI rank.

This filter can also add MPI rank information and an elapsed time entry to each log record.

Parameters:
  • add_mpi_info (boolean, optional) – Add MPI rank/size info to log records that don’t already have it.

  • level_rank0 (int) – Log level for messages from rank=0.

  • level_all (int) – Log level for messages from all other ranks.

Initialize a filter.

Initialize with the name of the logger which, together with its children, will have its events allowed through the filter. If no name is specified, allow every event.

filter(record)[source]

Add MPI info if desired.
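The rank-based filtering described above can be sketched with the standard library alone. This is a minimal illustration, not draco's implementation: the class name RankLogFilter, the explicit rank and size arguments, and the record attribute names mpi_rank, mpi_size and elapsed are all assumptions made for the example. The defaults 20 and 30 correspond to logging.INFO and logging.WARNING.

```python
import logging
import time


class RankLogFilter(logging.Filter):
    """Hypothetical stand-in for a rank-aware log filter (not draco's class)."""

    def __init__(self, rank, size, add_mpi_info=True,
                 level_rank0=logging.INFO, level_all=logging.WARNING):
        super().__init__()
        self.rank = rank  # assumed to be supplied by the caller
        self.size = size
        self.add_mpi_info = add_mpi_info
        self.level_rank0 = level_rank0
        self.level_all = level_all
        self._start = time.monotonic()

    def filter(self, record):
        # Attach rank/size only to records that do not already carry them.
        if self.add_mpi_info and not hasattr(record, "mpi_rank"):
            record.mpi_rank = self.rank
            record.mpi_size = self.size
        # Seconds since this filter was created.
        record.elapsed = time.monotonic() - self._start

        # Rank 0 uses its own threshold; every other rank uses level_all.
        rank = getattr(record, "mpi_rank", self.rank)
        threshold = self.level_rank0 if rank == 0 else self.level_all
        return record.levelno >= threshold
```

With the defaults, rank 0 passes INFO and above while the other ranks pass only WARNING and above, which keeps routine messages from being repeated once per process.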

class draco.core.task.MPILoggedTask[source]

Bases: MPITask, LoggedTask

A task base that has MPI aware logging.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

class draco.core.task.MPITask[source]

Bases: TaskBase

Base class for MPI using tasks.

Just ensures that the task gets a comm attribute.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

class draco.core.task.ReturnFirstInputOnFinish[source]

Bases: SingleTask

Workaround for caput.pipeline issues.

This caches its input on the first call to process and then returns it for a finish call.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(x)[source]

Take a reference to the input.

Parameters:

x (object) – Object to cache

process_finish()[source]

Return the first input to process.

Returns:

x – First input to process.

Return type:

object

class draco.core.task.ReturnLastInputOnFinish[source]

Bases: SingleTask

Workaround for caput.pipeline issues.

This caches its input on every call to process and then returns the last one for a finish call.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

process(x)[source]

Take a reference to the input.

Parameters:

x (object) – Object to cache

process_finish()[source]

Return the last input to process.

Returns:

x – Last input to process.

Return type:

object
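The difference between this task and ReturnFirstInputOnFinish can be sketched with two plain classes. This is an illustration of the caching behaviour only: the class names CacheFirstInput and CacheLastInput are hypothetical, and the real tasks are SingleTask subclasses driven by the pipeline rather than called by hand.

```python
class CacheFirstInput:
    """Keep a reference to the first input seen (illustrative only)."""

    def __init__(self):
        self.x = None
        self._seen = False

    def process(self, x):
        # Only the first call stores a reference; later inputs are ignored.
        if not self._seen:
            self.x = x
            self._seen = True

    def process_finish(self):
        return self.x


class CacheLastInput:
    """Keep a reference to the most recent input (illustrative only)."""

    def __init__(self):
        self.x = None

    def process(self, x):
        # Every call overwrites the cached reference.
        self.x = x

    def process_finish(self):
        return self.x


first, last = CacheFirstInput(), CacheLastInput()
for item in ["a", "b", "c"]:
    first.process(item)
    last.process(item)
```

After the loop, first.process_finish() returns "a" and last.process_finish() returns "c".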

class draco.core.task.SetMPILogging[source]

Bases: TaskBase

A task used to configure MPI aware logging.

level_rank0, level_all

Log level for rank=0, and other ranks respectively.

Type:

int or str

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

class draco.core.task.SingleTask[source]

Bases: MPILoggedTask, BasicContMixin

Process a task with at most one input and output.

Both input and output are expected to be memh5.BasicCont objects. This class allows writing of the output when requested.

Tasks inheriting from this class should override process and optionally setup() or process_finish(). They should not override next() or finish().

If the value of input_root is anything other than the string “None” then the input will be read (using read_input()) from the file self.input_root + self.input_filename. If the input is specified both as a filename and as a product key in the pipeline configuration, an error will be raised upon initialization.

If the value of output_root is anything other than the string “None” then the output will be written (using write_output()) to the file self.output_root + self.output_filename.

save

Whether to save the output to disk or not. Can be provided as a list if multiple outputs are being handled. Default is False.

Type:

list | bool

attrs

A mapping of attribute names and values to set in the .attrs at the root of the output container. String values will be formatted according to the standard Python .format(…) rules, and can interpolate several other values into the string. These are:

  • count: an integer giving the current iteration of the task.

  • tag: a string identifier for the output derived from the container's tag attribute. If that attribute is not present, count is used instead.

  • key: the name of the output key.

  • task: the (unqualified) name of the task.

  • input_tags: a list of the tags for each input argument for the task.

  • Any existing attribute in the container can be interpolated by the name of its key. The specific values above will override any attribute with the same name.

Incorrectly formatted values will cause an error to be thrown.

Type:

dict, optional
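The interpolation rules for attrs can be sketched with plain str.format. This is a minimal sketch of the behaviour described above, not draco's code: the function name format_attrs, its signature, and the sample attribute names (label, instrument and so on) are assumptions for illustration.

```python
def format_attrs(attrs, container_attrs, *, count, key, task, input_tags):
    """Format string values in `attrs` using the documented variables (sketch)."""
    # Existing container attributes are available by name...
    namespace = dict(container_attrs)
    # ...but the specific values override any attribute with the same name.
    namespace.update(
        count=count,
        # Fall back to the iteration count when the container has no tag.
        tag=container_attrs.get("tag", count),
        key=key,
        task=task,
        input_tags=input_tags,
    )
    # Non-string values are passed through unchanged.
    return {
        name: value.format(**namespace) if isinstance(value, str) else value
        for name, value in attrs.items()
    }


formatted = format_attrs(
    {"label": "{task}_{tag}", "source": "{instrument}", "weight": 2.5},
    {"instrument": "telescope_a", "count": "stale"},  # hypothetical attributes
    count=3,
    key="out",
    task="MyTask",
    input_tags=["in0"],
)
```

Here formatted["label"] is "MyTask_3" because the container has no tag attribute, and a value such as "{missing}" would raise a KeyError, matching the note that incorrectly formatted values cause an error.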

tag

Set a format for the tag attached to the output. This is a Python format string which can interpolate the variables listed under attrs above. For example, a tag of “cat{count}” will generate catalogs with the tags “cat1”, “cat2”, etc.

Type:

str, optional

output_name

A Python format string used to construct the filename. All variables given under attrs above can be interpolated into the filename. Can be provided as a list if multiple outputs are being handled. Valid identifiers are:

  • count: an integer giving the current iteration of the task.

  • tag: a string identifier for the output derived from the container's tag attribute. If that attribute is not present, count is used instead.

  • key: the name of the output key.

  • task: the (unqualified) name of the task.

  • output_root: the value of the output root argument. This is deprecated and is just used for legacy support. The default value of output_name means the previous behaviour works.

Type:

list | string

compression

Set compression options for each dataset. Provided as a dict with the dataset names as keys and values for chunks, compression, and compression_opts. Any datasets not included in the dict (including if the dict is empty) will use the default parameters set in the dataset spec. If set to False (or anything that evaluates to False, other than an empty dict), chunks and compression will be disabled for all datasets. If no argument is provided, the default parameters set in the dataset spec are used. Note that this will modify these parameters on the container itself, so if it is written out again downstream in the pipeline these parameters will be used.

Type:

dict or bool, optional
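The three cases for compression (a dict, a false value, or nothing at all) can be sketched as a small resolver. This is a sketch under stated assumptions, not draco's implementation: the function name resolve_compression, the dataset names vis and weight, the choice to merge per-dataset overrides into the defaults, and the use of None to mean "disabled" are all illustrative.

```python
_UNSET = object()  # sentinel meaning "no argument provided"


def resolve_compression(dataset_defaults, compression=_UNSET):
    """Return per-dataset chunk/compression settings (illustrative sketch)."""
    resolved = {}
    for name, defaults in dataset_defaults.items():
        if compression is _UNSET:
            # No argument: use the defaults from the dataset spec.
            resolved[name] = dict(defaults)
        elif isinstance(compression, dict):
            # Datasets missing from the dict (or an empty dict) keep defaults.
            # Assumption: listed options are merged over the defaults.
            resolved[name] = {**defaults, **compression.get(name, {})}
        elif not compression:
            # False (or another false value): disable for every dataset.
            resolved[name] = {
                "chunks": None,
                "compression": None,
                "compression_opts": None,
            }
        else:
            raise ValueError(f"Unsupported compression value: {compression!r}")
    return resolved


# Hypothetical dataset spec defaults.
spec = {
    "vis": {"chunks": (64, 128), "compression": "gzip", "compression_opts": 4},
    "weight": {"chunks": (64, 128), "compression": "gzip", "compression_opts": 4},
}
custom = resolve_compression(spec, {"vis": {"compression_opts": 9}})
disabled = resolve_compression(spec, False)
```

Checking for a dict before testing truthiness is what lets an empty dict mean "use the defaults" rather than "disable everything".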

output_root

Pipeline settable parameter giving the first part of the output path. Deprecated in favour of output_name.

Type:

string

nan_check

Check the output for NaNs (and infs), logging a message if any are present.

Type:

bool

nan_dump

If NaNs are found, dump the container to disk.

Type:

bool

nan_skip

If NaNs are found, don’t pass on the output.

Type:

bool

versions

Keys are module names (str) and values are their version strings. This is attached to output metadata.

Type:

dict

pipeline_config

Global pipeline configuration. This is attached to output metadata.

Type:

dict

Raises:

caput.pipeline.PipelineRuntimeError – If this is used as a base class for a task that overrides self.process with variable-length or optional arguments.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

finish()[source]

Should not need to override. Implement process_finish instead.

next(*input)[source]

Should not need to override. Implement process instead.

draco.core.task.group_tasks(*tasks)[source]

Create a Task that groups a bunch of tasks together.

This function creates a class that inherits from all the subtasks and calls each process method in sequence, passing the output of one as the input of the next.

This should be used like:

>>> class SuperTask(group_tasks(SubTask1, SubTask2)):
...     pass

At the moment, if the ensemble has more than one setup method, the SuperTask will need to implement an override that correctly calls each of them.
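The chaining behaviour can be sketched with plain Python classes. This is an illustration of the idea only, not the draco implementation: group_tasks_sketch, AddOne and Double are hypothetical names, and the real function works on pipeline tasks rather than bare classes.

```python
def group_tasks_sketch(*tasks):
    """Build a class that inherits from every task and chains process (sketch)."""

    class TaskGroup(*tasks):
        def process(self, x):
            # Call each parent's process in order, feeding the output forward.
            for task in tasks:
                x = task.process(self, x)
            return x

    return TaskGroup


class AddOne:
    def process(self, x):
        return x + 1


class Double:
    def process(self, x):
        return 2 * x


class AddOneThenDouble(group_tasks_sketch(AddOne, Double)):
    pass


result = AddOneThenDouble().process(3)
```

Each parent's process is called explicitly through its class rather than via super(), so the order is the one given to the grouping function. The sketch does nothing for setup, which is why a group with more than one setup method needs its own override.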