Creating a "First-Party" Task
The process for creating a "First-Party" Task
is very similar to that for a "Third-Party" Task
, with the difference being that you must also write the analysis code. The steps for integration are:
- Write the
TaskParameters
model. -
Write the
Task
class. There are a few rules that need to be adhered to. Pay special attention if you require either: -
Make your
Task
available by modifying the import function. - Specify an
Executor
## Specifying a TaskParameters Model for your Task

Parameter models have a format that must be followed for "Third-Party" `Task`s, but "First-Party" `Task`s have a little more liberty in how parameters are dealt with, since the `Task` will do all the parsing itself.

To create a model, the basic steps are:

- If necessary, create a new module (e.g. `new_task_category.py`) under `lute.io.models`, or find an appropriate pre-existing module in that directory.
  - An `import` statement must be added to `lute.io.models.__init__` if a new module is created, so it can be found.
  - If defining the model in a pre-existing module, make sure to modify the `__all__` statement to include it.
- Create a new model that inherits from `TaskParameters`. You can look at `lute.io.models.tests.TestReadOutputParameters` for an example. The model must be named `<YourTaskName>Parameters`.
  - You should include all relevant parameters here, including input file, output file, and any potentially adjustable parameters. These parameters must be included even if there are some implicit dependencies between `Task`s and it would make sense for the parameter to be auto-populated based on some other output. Creating this dependency is done with validators (see step 3). All parameters should be overridable, and all `Task`s should be fully independently configurable, based solely on their model and the configuration YAML.
  - To follow the preferred format, parameters should be defined as: `param_name: type = Field([default value], description="This parameter does X.")`
- Use validators to do more complex things for your parameters, including populating default values dynamically:
  - E.g. create default values that depend on other parameters in the model - see for example: `SubmitSMDParameters`.
  - E.g. create default values that depend on other `Task`s by reading from the database - see for example: `TestReadOutputParameters`.
- The model will have access to some general configuration values by inheriting from `TaskParameters`. These parameters are all stored in `lute_config`, which is an instance of `AnalysisHeader` (defined here).
  - For example, the experiment and run number can be obtained from this object, and a validator could use these values to define the default input file for the `Task`. A sketch of this is shown after this list.
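As an illustration, a minimal model might look like the following. This is a sketch only, not a definitive implementation: the `Task` name, parameter names, and the default path layout are hypothetical, and it assumes the pydantic v1-style `validator` used by the examples mentioned above, with `lute_config` exposing `experiment` and `run` attributes.

```py
"""Sketch of a minimal "First-Party" parameter model (illustrative names)."""
from pydantic import Field, validator

from lute.io.models.base import TaskParameters


class RunTaskParameters(TaskParameters):
    """Parameters for the hypothetical RunTask."""

    input_file: str = Field("", description="Path to the input HDF5 file.")
    output_file: str = Field("", description="Path the results are written to.")
    threshold: float = Field(1.0, description="This parameter does X.")

    @validator("input_file", always=True)
    def set_default_input(cls, input_file: str, values: dict) -> str:
        # Dynamically populate a default from the general configuration:
        # the experiment and run number come from the inherited lute_config.
        if input_file == "":
            exp: str = values["lute_config"].experiment
            run: int = int(values["lute_config"].run)
            return f"/path/to/experiment/data/{exp}_r{run:04d}.h5"  # Hypothetical layout
        return input_file
```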
A number of configuration options and Field attributes are also available for "First-Party" `Task` models. These are identical to those used for `ThirdPartyTask`s, although there is a smaller selection. These options are reproduced below for convenience.

### Config settings and options

Under the class definition for `Config` in the model, we can modify global options for all the parameters. In addition, there are a number of configuration options related to specifying what the outputs/results from the associated `Task` are, and a number of options to modify runtime behaviour. Currently, the available configuration options are:
| Config Parameter | Meaning | Default Value | ThirdPartyTask-specific? |
|---|---|---|---|
| `run_directory` | If provided, can be used to specify the directory from which a `Task` is run. | `None` (not provided) | NO |
| `set_result` | `bool`. If `True` search the model definition for a parameter that indicates what the result is. | `False` | NO |
| `result_from_params` | If `set_result` is `True` can define a result using this option and a validator. See also `is_result` below. | `None` (not provided) | NO |
| `short_flags_use_eq` | Use equals sign instead of space for arguments of `-` parameters. | `False` | YES - Only affects `ThirdPartyTask`s |
| `long_flags_use_eq` | Use equals sign instead of space for arguments of `--` parameters. | `False` | YES - Only affects `ThirdPartyTask`s |
These configuration options modify how the parameter models are parsed and passed along on the command-line, as well as what we consider results and where a `Task` can run. The default behaviour is that parameters are assumed to be passed as `-p arg` and `--param arg`, the `Task` will be run in the current working directory (or scratch if submitted with the ARP), and we have no information about `Task` results. Setting the above options can modify this behaviour.
- By setting `short_flags_use_eq` and/or `long_flags_use_eq` to `True`, parameters are instead passed as `-p=arg` and `--param=arg`.
- By setting `run_directory` to a valid path, we can force a `Task` to be run in a specific directory. By default the `Task` will be run from the directory you submit the job in, or from your scratch folder (`/sdf/scratch/...`) if you submit from the eLog. Some `ThirdPartyTask`s rely on searching the correct working directory in order to run properly.
- By setting `set_result` to `True` we indicate that the `TaskParameters` model will provide information on what the `TaskResult` is. This setting must be used with one of two options: either the `result_from_params` `Config` option, described below, or the Field attribute `is_result`, described in the next sub-section (Field Attributes).
- `result_from_params` is a `Config` option that can be used when `set_result==True`. In conjunction with a validator (described a few sections down) we can use this option to specify a result from all the information contained in the model. E.g. if you have a `Task` that has parameters for an `output_directory` and an `output_filename`, you can set `result_from_params==f"{output_directory}/{output_filename}"`. A sketch of this pattern follows the list.
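The following is one way such a model might look. This is a hedged sketch, not a definitive implementation: the model and parameter names are hypothetical, and it assumes a pydantic v1-style validator can write the composed path into the `Config` class.

```py
"""Sketch: defining a result via set_result and result_from_params."""
from pydantic import Field, validator

from lute.io.models.base import TaskParameters


class WriteOutputParameters(TaskParameters):
    """Hypothetical parameters for a Task which writes a single output file."""

    class Config(TaskParameters.Config):
        set_result: bool = True       # This model defines the TaskResult...
        result_from_params: str = ""  # ...via this option, filled in below.

    output_directory: str = Field("", description="Directory the result is written to.")
    output_filename: str = Field("", description="Name of the result file.")

    @validator("output_filename", always=True)
    def compose_result_path(cls, output_filename: str, values: dict) -> str:
        # Combine the two path parameters into the full result path
        cls.Config.result_from_params = f"{values['output_directory']}/{output_filename}"
        return output_filename
```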
### Field attributes

In addition to the global configuration options there are a couple of ways to specify individual parameters. The following `Field` attributes are used when parsing the model:

| Field Attribute | Meaning | Default Value | Example |
|---|---|---|---|
| `description` | Documentation of the parameter's usage or purpose. | N/A | `arg = Field(..., description="Argument for...")` |
| `is_result` | `bool`. If the `set_result` `Config` option is `True`, we can set this to `True` to indicate a result. | N/A | `output_result = Field(..., is_result=True)` |
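For completeness, a sketch of this second mechanism (the model and parameter names are again hypothetical):

```py
"""Sketch: marking a single parameter as the result with is_result."""
from pydantic import Field

from lute.io.models.base import TaskParameters


class MarkResultParameters(TaskParameters):
    """Hypothetical parameters marking output_result as the TaskResult."""

    class Config(TaskParameters.Config):
        set_result: bool = True  # Required for is_result to take effect

    output_result: str = Field(
        "", description="Path to the result file.", is_result=True
    )
```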
## Writing the Task

You can write your analysis code (or whatever code is to be executed) as long as it adheres to the limited rules below. You can create a new module for your `Task` in `lute.tasks`, or add it to any existing module if it makes sense for it to belong there. The `Task` itself is a single class constructed as follows:

- Your analysis `Task` is a class named in a way that matches its pydantic model. E.g. `RunTask` is the `Task`, and `RunTaskParameters` is the pydantic model.
- The class must inherit from the `Task` class (see template below). If you intend to use MPI, see the following section.
- You must provide an implementation of a `_run` method. This is the method that will be executed when the `Task` is run. You can in addition write as many methods as you need. For fine-grained execution control you can also provide `_pre_run()` and `_post_run()` methods, but this is optional.
- For all communication (including print statements) you should use the `_report_to_executor(msg: Message)` method. Since the `Task` is run as a subprocess, this method will pass information to the controlling `Executor`. You can pass any type of object using this method: strings, plots, arrays, etc.
- If you did not use the `set_result` configuration option in your parameters model, make sure to provide a result when finished. This is done by setting `self._result.payload = ...`. You can set the result to be any object. If you have written the result to a file, for example, please provide a path.
A minimal template is provided below.

```py
"""Standard docstring..."""

__all__ = ["RunTask"]
__author__ = ""  # Please include so we know who the SME is

# Include any imports you need here
from lute.execution.ipc import Message  # Message for communication
from lute.io.models.my_task import RunTaskParameters  # For TaskParameters
from lute.tasks.dataclasses import TaskStatus  # For optional status reporting
from lute.tasks.task import *  # For Task


class RunTask(Task):  # Inherit from Task
    """Task description goes here, or in __init__"""

    def __init__(self, *, params: RunTaskParameters) -> None:
        super().__init__(params=params)  # Sets up Task, parameters, etc.
        # Parameters will be available through:
        #   self._task_parameters
        # You access with . operator: self._task_parameters.param1, etc.
        # Your result object is available through:
        #   self._result
        #   self._result.payload <- Main result
        #   self._result.summary <- Short summary
        #   self._result.task_status <- Semi-automatic, but can be set manually

    def _run(self) -> None:
        # THIS METHOD MUST BE PROVIDED
        self.do_my_analysis()

    def do_my_analysis(self) -> None:
        # Send a message, proper way to print:
        msg: Message = Message(contents="My message contents", signal="")
        self._report_to_executor(msg)

        # When done, set result - assume we wrote a file, e.g.
        self._result.payload = "/path/to/output_file.h5"
        # Optionally also set status - good practice but not obligatory
        self._result.task_status = TaskStatus.COMPLETED
```
### Accessing your validated parameters

The parameters specified in your `TaskParameters` model will be available from the `self._task_parameters` object. They are accessed as attributes on this object, e.g. `self._task_parameters.param1`. They can be used in any manner required by the running code, as in the short illustration below.
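Here the parameter names (`input_file`, `threshold`) are hypothetical, for illustration only:

```py
def _run(self) -> None:
    # Each field of the validated model is a plain attribute
    input_file: str = self._task_parameters.input_file
    threshold: float = self._task_parameters.threshold
    msg: Message = Message(contents=f"Processing {input_file} (threshold={threshold})", signal="")
    self._report_to_executor(msg)
```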
### Using MPI for your Task

In the case your `Task` is written to use MPI, a slight modification to the template above is needed. Specifically, an additional keyword argument should be passed to the base class initializer: `use_mpi=True`. This tells the base class to adjust signalling/communication behaviour appropriately for a multi-rank MPI program. Doing this prevents tricky-to-track-down problems due to ranks starting, completing and sending messages at different times. The rest of your code can, as before, be written as you see fit. The use of this keyword argument will also synchronize the start of all ranks and wait until all ranks have finished to exit.
"""Task which needs to run with MPI"""
__all__ = ["RunMPITask"]
__author__ = "" # Please include so we know who the SME is
# Include any imports you need here
from lute.execution.ipc import Message # Message for communication
from lute.io.models.base import * # For TaskParameters
from lute.tasks.task import * # For Task
# Only the init is shown
class RunMPITask(Task): # Inherit from Task
"""Task description goes here, or in __init__"""
# Signal the use of MPI!
def __init__(self, *, params: RunMPITaskParameters, use_mpi: bool = True) -> None:
super().__init__(params=params, use_mpi=use_mpi) # Sets up Task, parameters, etc.
# That's it.
### Running your Task in a different environment

In the case your `Task` is intended to be run in a different environment than the `Executor` (usually the psana1 or psana2 environments), you will need to include the following changes to the template to allow the LUTE infrastructure to support this.
"""First-party Task which runs in a different environment."""
__all__ = ["RunTaskNewEnv"]
__author__ = "" # Please include so we know who the SME is
from typing import Optional, TYPE_CHECKING # This will be used to guard incompatible objects at run-time
# Include any imports you need here
from lute.execution.ipc import Message # Message for communication
# from lute.io.models.base import * # For TaskParameters, see below
from lute.tasks.task import * # For Task
from lute.io.parameters import RowIds # New option needed.
if TYPE_CHECKING:
# For type checking we will use your TaskParameters model, so import it here
from lute.io.models.my_task_type import RunTaskNewEnvParameters
else:
# This is the import case at RUN time. We cannot rely on `RunTaskNewEnvParameters`
# being importable in arbitrary environments, so we construct a surrogate.
# The following import is only to avoid errors with type hints. No other
# code changes are needed
from lute.io.parameters import TaskParameters
RunTaskNewEnvParameters = TaskParameters
# Only the init is shown
class RunTaskNewEnv(Task): # Inherit from Task
"""Task description goes here, or in __init__"""
# Provide the new `row_ids` argument
def __init__(self, *, params: RunTaskNewEnvParameters, row_ids: Optional[RowIds] = None) -> None:
super().__init__(params=params, row_ids=row_ids) # Sets up Task, parameters, etc.
# That's it - rest of Task code and accessing parameters is identical to before
For the purposes of running first-party code in a separate environment, we reconstruct the `TaskParameters` objects into containers that require no external dependencies (in particular, pydantic). This allows the object to be constructed in any environment (with some restrictions, see below), as all the code is included in LUTE itself.
To summarize, the changes needed in order to support this mode of operation are:

- A few minor import changes:
  - Import the `TYPE_CHECKING` guard from Python's `typing` module.
  - Import `RowIds` from `lute.io.parameters`.
  - `if TYPE_CHECKING`, import your `RunTaskNewEnvParameters` from `lute.io.models....` (this is the pydantic model). Otherwise, import `TaskParameters` from `lute.io.parameters`, and THEN set `RunTaskNewEnvParameters = TaskParameters`.
- Provide a new argument to the initializer: `row_ids: Optional[RowIds] = None`. This will then get passed to the super-class initializer: `super().__init__(params=params, row_ids=row_ids)`.

Note: You can use this mode with MPI as well, in which case you also provide the `use_mpi=True` argument.
### Message signals

Signals in `Message` objects are strings and can be one of the following:

```py
LUTE_SIGNALS: Set[str] = {
    "NO_PICKLE_MODE",
    "TASK_STARTED",
    "TASK_FAILED",
    "TASK_STOPPED",
    "TASK_DONE",
    "TASK_CANCELLED",
    "TASK_RESULT",
}
```

Each of these signals is associated with a hook on the `Executor`-side. They are for the most part used by base classes; however, you can choose to make use of them manually as well.
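As a sketch of such manual use (the contents string is illustrative):

```py
# Manually signal a failure condition from within a Task method
msg: Message = Message(contents="Calibration constants not found!", signal="TASK_FAILED")
self._report_to_executor(msg)
```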
## Making your Task available

Once the `Task` has been written, it needs to be made available for import. Since different `Task`s can have conflicting dependencies and environments, this is managed through an import function. When the `Task` is done, or ready for testing, a condition is added to `lute.tasks.__init__.import_task`. For example, if the `Task` is called `RunXASAnalysis` and is defined in a module called `xas.py`, we would add the following lines to the `import_task` function:

```py
# in lute.tasks.__init__

# ...

def import_task(task_name: str) -> Type[Task]:
    # ...
    if task_name == "RunXASAnalysis":
        from .xas import RunXASAnalysis

        return RunXASAnalysis
```
## Defining an Executor

The process of `Executor` definition is identical to the process described for `ThirdPartyTask`s here. The one exception is if you defined the `Task` to use MPI as described in the section above, in which case you will likely want to use the `MPIExecutor`.
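As a sketch (following the pattern described in the `ThirdPartyTask` documentation; `RunXASAnalyzer` is a hypothetical managed `Task` name):

```py
# Sketch of a managed Task definition - see the ThirdPartyTask documentation
from lute.execution.executor import Executor, MPIExecutor

# Associate the RunXASAnalysis Task with a standard Executor...
RunXASAnalyzer: Executor = Executor("RunXASAnalysis")
# ...or, if the Task was written to use MPI, with the MPIExecutor:
# RunXASAnalyzer: MPIExecutor = MPIExecutor("RunXASAnalysis")
```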
### Environment setup

By default, first-party `Task`s are expected to use the same environment as the `Executor`. When using the same environment, you can still use the environment update methods to insert specific environment variables, without any other additional changes (assuming these variables do not invalidate `PYTHONPATH`, touch conda-specific variables, etc.). It is also possible to use a completely different environment. Please read the template above carefully.
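For instance, a sketch of inserting a variable (assuming the `update_environment` method described in the `ThirdPartyTask` documentation; the variable and managed `Task` names are illustrative):

```py
# Sketch: insert an environment variable for a first-party managed Task
RunXASAnalyzer.update_environment({"MY_ANALYSIS_SETTING": "1"})
```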
#### Known Environment Restrictions

The third-party dependencies (outside of pydantic for parameter validation) have been kept relatively slim for the core LUTE infrastructure. This allows first-party code to run under most environments, albeit with the following minor restrictions:

- The environment must be Python 3.8+. Support can likely be extended to Python 3.7 (and potentially 3.5), if a use-case arises.
- There is a soft dependency on the use of ZMQ for IPC between the `Task` and `Executor` layers. This, however, can be removed by setting the environment variable `LUTE_USE_ZMQ=0` when launching the managed `Task`. There is a fully functional implementation using the Python `socket` module.
- There is a dependency on pickle protocol 4 (the current default). If the protocol default is changed in newer Python versions (or support is added for Python 3.7 and below), code changes will be needed to coordinate the protocol versions used by the `Task` and `Executor` in order to serialize and deserialize communications between them.
- The current "environment switching" mechanism is very naive, and expects the new environment to be a conda environment. In principle, it should not be difficult to add support for other types (spack, etc.); however, this is not currently in place. Speak with the maintainers if you require this support.
## Setting Task Results

The `Executor` may decide to perform additional actions with specific `Task` results based on their type. In the case of a first-party `Task`, it is generally preferred to directly set the results rather than rely on `tasklets` (see here).

In general, `Task`s have a `self._result` attribute, which is an instance of `TaskResult` (`lute.tasks.dataclasses.TaskResult`). There are two fields in particular to be user-set: `summary` and `payload`. The meaning of these fields is "squishy", and they should be used in a manner that makes intuitive sense for the code being written. As an example, a `payload` could be an output file, while a `summary` could be a figure generated from that file. It is not necessary to define both (or even either) of these fields.

If the `Executor` encounters the following objects in EITHER the `summary` or `payload` fields, it will take the corresponding action:

- `Dict[str, str]`: Post key/value pairs under the report section in the control tab of the eLog. These key/values are also posted as run parameters if possible. This return type is best used for short text summaries, e.g. indexing rate, execution time, etc. The key/value pairs are converted to a semi-colon delimited string for storage in the database. E.g. `{"Rate": 0.05, "Total": 10}` will be stored as `Rate: 0.05;Total: 10` in the database.
- `ElogSummaryPlots`: This special dataclass, defined in `lute.tasks.dataclasses`, is used to create an eLog summary plot under the Summaries tab. The path to the created HTML file is stored in the database.

If you provide a list or tuple of these objects as the `summary` or `payload`, each of the items will be processed independently. For database archiving, the various entries are stored semi-colon delimited. A short sketch of setting these fields is shown below.
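The sketch assumes the analysis has written an output file; the path and key/value pairs are illustrative:

```py
def _run(self) -> None:
    # ... analysis which writes an output file ...
    # Main result: the path to the file that was written
    self._result.payload = "/path/to/output_file.h5"
    # Short summary: posted as key/value pairs in the eLog report section
    self._result.summary = {"Rate": "0.05", "Total": "10"}
```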