Creating a "First-Party" Task
The process for creating a "First-Party" `Task` is very similar to that for a "Third-Party" `Task`, with the difference being that you must also write the analysis code. The steps for integration are:

- Write the `TaskParameters` model.
- Write the `Task` class. There are a few rules that need to be adhered to. Pay special attention if you require either MPI or a separate environment (see the corresponding sections below).
- Make your `Task` available by modifying the import function.
- Specify an `Executor`.
Specifying a TaskParameters Model for your Task
Parameter models have a format that must be followed for "Third-Party" Tasks, but "First-Party" Tasks have a little more liberty in how parameters are dealt with, since the Task will do all the parsing itself.
To create a model, the basic steps are:
1. If necessary, create a new module (e.g. `new_task_category.py`) under `lute.io.models`, or find an appropriate pre-existing module in that directory.
    - An `import` statement must be added to `lute.io.models.__init__` if a new module is created, so it can be found.
    - If defining the model in a pre-existing module, make sure to modify the `__all__` statement to include it.
2. Create a new model that inherits from `TaskParameters`. You can look at `lute.models.io.tests.TestReadOutputParameters` for an example. The model must be named `<YourTaskName>Parameters`. A minimal sketch follows this list.
    - You should include all relevant parameters here, including input file, output file, and any potentially adjustable parameters. These parameters must be included even if there are some implicit dependencies between `Task`s and it would make sense for the parameter to be auto-populated based on some other output. Creating this dependency is done with validators (see step 3). All parameters should be overridable, and all `Task`s should be fully-independently configurable, based solely on their model and the configuration YAML.
    - To follow the preferred format, parameters should be defined as: `param_name: type = Field([default value], description="This parameter does X.")`
3. Use validators to do more complex things for your parameters, including populating default values dynamically:
    - E.g. create default values that depend on other parameters in the model - see for example: `SubmitSMDParameters`.
    - E.g. create default values that depend on other `Task`s by reading from the database - see for example: `TestReadOutputParameters`.
4. The model will have access to some general configuration values by inheriting from `TaskParameters`. These parameters are all stored in `lute_config`, which is an instance of `AnalysisHeader` (defined here).
    - For example, the experiment and run number can be obtained from this object and a validator could use these values to define the default input file for the `Task`.
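As an illustration, below is a minimal sketch of such a model for a hypothetical `RunTask`. The field names and the default path are assumptions for illustration only; the validator shows the pattern from step 4, reading the experiment and run number from `lute_config` (see the `AnalysisHeader` definition for the actual attributes).

```py
# Sketch of a hypothetical model in lute/io/models/my_task.py.
# RunTaskParameters and its fields are illustrative, not an existing LUTE model.
from typing import Any, Dict

from pydantic import Field, validator

from .base import TaskParameters


class RunTaskParameters(TaskParameters):
    """Parameters for the hypothetical RunTask."""

    input_file: str = Field("", description="Path to the input file to analyze.")
    output_file: str = Field("", description="Path to write analysis results to.")
    threshold: float = Field(0.5, description="This parameter does X.")

    @validator("input_file", always=True)
    def set_default_input(cls, input_file: str, values: Dict[str, Any]) -> str:
        # Populate a default dynamically: derive the input path from the
        # experiment and run number stored in the shared lute_config.
        if input_file == "":
            exp: str = values["lute_config"].experiment
            run: int = int(values["lute_config"].run)
            return f"/path/to/{exp}/run_{run}.h5"  # Illustrative path only
        return input_file
```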
A number of configuration options and Field attributes are also available for "First-Party" `Task` models. These are identical to those used for `ThirdPartyTask`s, although there is a smaller selection. These options are reproduced below for convenience.
Config settings and options
Under the class definition for `Config` in the model, we can modify global options for all the parameters. In addition, there are a number of configuration options related to specifying what the outputs/results from the associated `Task` are, and a number of options to modify runtime behaviour. Currently, the available configuration options are:
| Config Parameter | Meaning | Default Value | ThirdPartyTask-specific? |
|---|---|---|---|
| `run_directory` | If provided, can be used to specify the directory from which a `Task` is run. | `None` (not provided) | NO |
| `set_result` | `bool`. If `True`, search the model definition for a parameter that indicates what the result is. | `False` | NO |
| `result_from_params` | If `set_result` is `True`, can define a result using this option and a validator. See also `is_result` below. | `None` (not provided) | NO |
| `short_flags_use_eq` | Use equals sign instead of space for arguments of `-` (short) parameters. | `False` | YES - Only affects `ThirdPartyTask`s |
| `long_flags_use_eq` | Use equals sign instead of space for arguments of `--` (long) parameters. | `False` | YES - Only affects `ThirdPartyTask`s |
These configuration options modify how the parameter models are parsed and passed along on the command-line, as well as what we consider results and where a `Task` can run. The default behaviour is that parameters are assumed to be passed as `-p arg` and `--param arg`, the `Task` will be run in the current working directory (or scratch if submitted with the ARP), and we have no information about `Task` results. Setting the above options can modify this behaviour.

- By setting `short_flags_use_eq` and/or `long_flags_use_eq` to `True`, parameters are instead passed as `-p=arg` and `--param=arg`.
- By setting `run_directory` to a valid path, we can force a `Task` to be run in a specific directory. By default the `Task` will be run from the directory you submit the job in, or from your scratch folder (`/sdf/scratch/...`) if you submit from the eLog. Some `ThirdPartyTask`s rely on searching the correct working directory in order to run properly.
- By setting `set_result` to `True` we indicate that the `TaskParameters` model will provide information on what the `TaskResult` is. This setting must be used with one of two options: either the `result_from_params` `Config` option, described below, or the Field attribute `is_result`, described in the next sub-section (Field Attributes).
- `result_from_params` is a `Config` option that can be used when `set_result == True`. In conjunction with a validator (see the sketch below) we can use this option to specify a result from all the information contained in the model. E.g. if you have a `Task` that has parameters for an `output_directory` and an `output_filename`, you can set `result_from_params = f"{output_directory}/{output_filename}"`.
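A sketch of the `result_from_params` mechanism follows. The model and field names are hypothetical; the exact way the validator records the composed result may differ in practice, so treat this as an illustration of the pattern rather than a definitive implementation.

```py
# Sketch: set_result with result_from_params composed by a validator.
# WriteOutputParameters and its fields are hypothetical.
from typing import Any, Dict

from pydantic import Field, validator

from .base import TaskParameters


class WriteOutputParameters(TaskParameters):
    class Config(TaskParameters.Config):
        set_result: bool = True  # This model defines the Task's result
        result_from_params: str = ""  # Composed by the validator below

    output_directory: str = Field("", description="Directory for output.")
    output_filename: str = Field("", description="Name of the output file.")

    @validator("output_filename", always=True)
    def compose_result(cls, output_filename: str, values: Dict[str, Any]) -> str:
        # Combine the directory and filename into the full result path.
        output_directory: str = values["output_directory"]
        cls.Config.result_from_params = f"{output_directory}/{output_filename}"
        return output_filename
```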
Field attributes
In addition to the global configuration options, there are a couple of ways to specify individual parameters. The following Field attributes are used when parsing the model:

| Field Attribute | Meaning | Default Value | Example |
|---|---|---|---|
| `description` | Documentation of the parameter's usage or purpose. | N/A | `arg = Field(..., description="Argument for...")` |
| `is_result` | `bool`. If the `set_result` `Config` option is `True`, we can set this to `True` to indicate a result. | N/A | `output_result = Field(..., is_result=True)` |
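The `is_result` attribute is the simpler alternative to `result_from_params`: the result is a single parameter, marked directly on its field. A minimal sketch (the model name is hypothetical):

```py
# Sketch: marking a single parameter as the result with is_result.
from pydantic import Field

from .base import TaskParameters


class AnalyzeParameters(TaskParameters):
    class Config(TaskParameters.Config):
        set_result: bool = True  # Required for is_result to take effect

    out_file: str = Field("", description="Path to the output file.", is_result=True)
```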
Writing the Task
You can write your analysis code (or whatever code is to be executed) as long as it adheres to the limited rules below. You can create a new module for your `Task` in `lute.tasks` or add it to any existing module, if it makes sense for it to belong there. The `Task` itself is a single class constructed as follows:
- Your analysis `Task` is a class named in a way that matches its Pydantic model. E.g. `RunTask` is the `Task`, and `RunTaskParameters` is the Pydantic model.
- The class must inherit from the `Task` class (see the template below). If you intend to use MPI, see the following section.
- You must provide an implementation of a `_run` method. This is the method that will be executed when the `Task` is run. You can in addition write as many methods as you need. For fine-grained execution control, you can also provide `_pre_run()` and `_post_run()` methods, but this is optional.
- For all communication (including print statements) you should use the `_report_to_executor(msg: Message)` method. Since the `Task` is run as a subprocess, this method will pass information to the controlling `Executor`. You can pass any type of object using this method: strings, plots, arrays, etc.
- If you did not use the `set_result` configuration option in your parameters model, make sure to provide a result when finished. This is done by setting `self._result.payload = ...`. You can set the result to be any object. If you have written the result to a file, for example, please provide a path.
A minimal template is provided below.
"""Standard docstring..."""
__all__ = ["RunTask"]
__author__ = "" # Please include so we know who the SME is
# Include any imports you need here
from lute.execution.ipc import Message # Message for communication
from lute.io.models.my_task import RunTaskParameters # For TaskParameters
from lute.tasks.task import * # For Task
class RunTask(Task): # Inherit from Task
"""Task description goes here, or in __init__"""
def __init__(self, *, params: RunTaskParameters) -> None:
super().__init__(params=params) # Sets up Task, parameters, etc.
# Parameters will be available through:
# self._task_parameters
# You access with . operator: self._task_parameters.param1, etc.
# Your result object is availble through:
# self._result
# self._result.payload <- Main result
# self._result.summary <- Short summary
# self._result.task_status <- Semi-automatic, but can be set manually
def _run(self) -> None:
# THIS METHOD MUST BE PROVIDED
self.do_my_analysis()
def do_my_analysis(self) -> None:
# Send a message, proper way to print:
msg: Message(contents="My message contents", signal="")
self._report_to_executor(msg)
# When done, set result - assume we wrote a file, e.g.
self._result.payload = "/path/to/output_file.h5"
# Optionally also set status - good practice but not obligatory
self._result.task_status = TaskStatus.COMPLETED
Accessing your validated parameters
The parameters specified in your `TaskParameters` model will be available from the `self._task_parameters` object. They are accessed as attributes on this object, e.g. `self._task_parameters.param1`. They can be used in any manner required by the running code.
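For example, assuming the hypothetical `input_file` and `threshold` parameters from the model sketch earlier:

```py
# Sketch: reading validated parameters inside a Task method.
# input_file and threshold are hypothetical parameters.
def _run(self) -> None:
    input_file: str = self._task_parameters.input_file
    threshold: float = self._task_parameters.threshold
    self._report_to_executor(
        Message(contents=f"Processing {input_file} with threshold {threshold}", signal="")
    )
```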
Using MPI for your Task
In the case your `Task` is written to use MPI, a slight modification of the template above is needed. Specifically, an additional keyword argument should be passed to the base class initializer: `use_mpi=True`. This tells the base class to adjust signalling/communication behaviour appropriately for a multi-rank MPI program. Doing this prevents tricky-to-track-down problems due to ranks starting, completing and sending messages at different times. The rest of your code can, as before, be written as you see fit. The use of this keyword argument will also synchronize the start of all ranks and wait until all ranks have finished before exiting.
"""Task which needs to run with MPI"""
__all__ = ["RunMPITask"]
__author__ = "" # Please include so we know who the SME is
# Include any imports you need here
from lute.execution.ipc import Message # Message for communication
from lute.io.models.base import * # For TaskParameters
from lute.tasks.task import * # For Task
# Only the init is shown
class RunMPITask(Task): # Inherit from Task
"""Task description goes here, or in __init__"""
# Signal the use of MPI!
def __init__(self, *, params: RunMPITaskParameters, use_mpi: bool = True) -> None:
super().__init__(params=params, use_mpi=use_mpi) # Sets up Task, parameters, etc.
# That's it.
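Beyond the initializer, the MPI code itself is up to you. As a sketch, a rank-aware `_run` might look like the following, assuming `mpi4py` is available in the `Task` environment (the chunking method and gathering strategy are illustrative, not prescribed by LUTE):

```py
# Sketch only: continuing the template above with a rank-aware _run.
from mpi4py import MPI  # Assumed available in the Task environment


class RunMPITask(Task):
    # ... __init__ as shown in the template above ...

    def _run(self) -> None:
        comm = MPI.COMM_WORLD
        rank: int = comm.Get_rank()
        n_ranks: int = comm.Get_size()
        # Each rank processes its own slice of the work (hypothetical method)
        partial = self.process_chunk(rank, n_ranks)
        # Gather the partial results on rank 0 and set the final result there
        results = comm.gather(partial, root=0)
        if rank == 0:
            self._result.payload = results
```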
Running your Task in a different environment
In the case your `Task` is intended to be run in a different environment than the `Executor` (usually the psana1 or psana2 environments), you will need to include the following changes to the template to allow the LUTE infrastructure to support this.
"""First-party Task which runs in a different environment."""
__all__ = ["RunTaskNewEnv"]
__author__ = "" # Please include so we know who the SME is
from typing import Optional, TYPE_CHECKING # This will be used to guard incompatible objects at run-time
# Include any imports you need here
from lute.execution.ipc import Message # Message for communication
# from lute.io.models.base import * # For TaskParameters, see below
from lute.tasks.task import * # For Task
from lute.io.parameters import RowIds # New option needed.
if TYPE_CHECKING:
# For type checking we will use your TaskParameters model, so import it here
from lute.io.models.my_task_type import RunTaskNewEnvParameters
else:
# This is the import case at RUN time. We cannot rely on `RunTaskNewEnvParameters`
# being importable in arbitrary environments, so we construct a surrogate.
# The following import is only to avoid errors with type hints. No other
# code changes are needed
from lute.io.parameters import TaskParameters
RunTaskNewEnvParameters = TaskParameters
# Only the init is shown
class RunTaskNewEnv(Task): # Inherit from Task
"""Task description goes here, or in __init__"""
# Provide the new `row_ids` argument
def __init__(self, *, params: RunTaskNewEnvParameters, row_ids: Optional[RowIds] = None) -> None:
super().__init__(params=params, row_ids=row_ids) # Sets up Task, parameters, etc.
# That's it - rest of Task code and accessing parameters is identical to before
For the purposes of running first-party code in a separate environment, we reconstruct the `TaskParameters` objects into containers that require no external dependencies (in particular, pydantic). This allows the object to be constructed in any environment (with some restrictions, see below), as all the code is included in LUTE itself.
To summarize, the changes needed in order to support this mode of operation are:

- A few minor import changes:
    - Import the `TYPE_CHECKING` guard from Python's `typing` module.
    - Import `RowIds` from `lute.io.parameters`.
    - `if TYPE_CHECKING`, import your `RunTaskNewEnvParameters` from `lute.io.models....` (this is the pydantic model). Otherwise, import `TaskParameters` from `lute.io.parameters`, and THEN set `RunTaskNewEnvParameters = TaskParameters`.
- Provide a new argument to the initializer: `row_ids: Optional[RowIds] = None`. This will then get passed to the super-class initializer: `super().__init__(params=params, row_ids=row_ids)`.
Note: You can use this mode with MPI as well, in which case you also provide the `use_mpi=True` argument.
Message signals
Signals in `Message` objects are strings and can be one of the following:

```py
LUTE_SIGNALS: Set[str] = {
    "NO_PICKLE_MODE",
    "TASK_STARTED",
    "TASK_FAILED",
    "TASK_STOPPED",
    "TASK_DONE",
    "TASK_CANCELLED",
    "TASK_RESULT",
}
```
Each of these signals is associated with a hook on the `Executor` side. They are for the most part used by base classes; however, you can choose to make use of them manually as well.
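For instance, a `Task` method could send an intermediate result manually. This is a sketch; what the `Executor` does with it depends on the hooks configured for that signal:

```py
# Sketch: manually signalling an intermediate result from inside a Task method.
msg: Message = Message(contents={"partial": "result"}, signal="TASK_RESULT")
self._report_to_executor(msg)
```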
Making your Task available
Once the `Task` has been written, it needs to be made available for import. Since different `Task`s can have conflicting dependencies and environments, this is managed through an import function. When the `Task` is done, or ready for testing, a condition is added to `lute.tasks.__init__.import_task`. For example, assume the `Task` is called `RunXASAnalysis` and it is defined in a module called `xas.py`; we would add the following lines to the `import_task` function:
```py
# in lute.tasks.__init__
# ...

def import_task(task_name: str) -> Type[Task]:
    # ...
    if task_name == "RunXASAnalysis":
        from .xas import RunXASAnalysis

        return RunXASAnalysis
    # ...
```
Defining an Executor
The process of `Executor` definition is identical to the process described for `ThirdPartyTask`s here. The one exception: if you defined the `Task` to use MPI as described in the section above, you will likely want to use the `MPIExecutor`.
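Assuming the same registration pattern as for `ThirdPartyTask`s (instantiating an `Executor` with the `Task` name in `lute/managed_tasks.py`), a sketch might look like the following; the managed-`Task` variable names are illustrative:

```py
# Sketch: registering Executors in lute/managed_tasks.py.
from lute.execution.executor import Executor, MPIExecutor

# A standard first-party Task:
XASAnalyzer: Executor = Executor("RunXASAnalysis")

# A first-party Task written to use MPI:
MPIAnalyzer: MPIExecutor = MPIExecutor("RunMPITask")
```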
Environment setup
By default, first-party `Task`s are expected to use the same environment as the `Executor`. When using the same environment, you can still use the environment update methods to insert specific environment variables, without any other additional changes (assuming these variables do not invalidate `PYTHONPATH`, touch conda-specific variables, etc.). It is also possible to use a completely different environment. Please read the template above carefully.
Known Environment Restrictions
The third-party dependencies (outside of pydantic for parameter validation) have been kept relatively slim for the core LUTE infrastructure. This allows first-party code to run under most environments, albeit with the following minor restrictions.
- The environment must be Python 3.8+. Support can likely be extended to Python 3.7 (and potentially 3.5), if a use-case arises.
- There is a soft-dependency on the use of ZMQ for IPC between the `Task` and `Executor` layers. This, however, can be removed by setting the environment variable `LUTE_USE_ZMQ=0` when launching the managed `Task`. There is a fully functional implementation using the Python `socket` module.
- There is a dependency on pickle protocol 4 (the current default). If the protocol default is changed in newer Python versions (or support is added for Python 3.7 and below), code changes will be needed to coordinate the protocol versions used by the `Task` and `Executor` in order to serialize and deserialize communications between them.
- The current "environment switching" mechanism is very naive and expects the new environment to be a conda environment. In principle, it should not be difficult to add support for other types (spack, etc.); however, this is not currently in place. Speak with the maintainers if you require this support.
Setting Task Results
The `Executor` may decide to perform additional actions with specific `Task` results based on their type. In the case of a first-party `Task`, it is generally preferred to set the results directly rather than rely on tasklets (see here).
In general, `Task`s have a `self._result` attribute, which is an instance of `TaskResult` (`lute.tasks.dataclasses.TaskResult`). Two fields in particular are meant to be user-set: `summary` and `payload`. The meaning of these fields is "squishy", and they should be used in a manner that makes intuitive sense for the code being written. As an example, a `payload` could be an output file, while a `summary` could be a figure generated from that file. It is not necessary to define both (or even either) of these fields.
If the `Executor` encounters the following objects in EITHER the `summary` or `payload` fields, it will take the corresponding action:
- `Dict[str, str]`: Post key/value pairs under the report section in the control tab of the eLog. These key/values are also posted as run parameters if possible. This return type is best used for short text summaries, e.g. indexing rate, execution time, etc. The key/value pairs are converted to a semi-colon delimited string for storage in the database. E.g. `{"Rate": 0.05, "Total": 10}` will be stored as `Rate: 0.05;Total: 10` in the database.
- `ElogSummaryPlots`: This special dataclass, defined in `lute.tasks.dataclasses`, is used to create an eLog summary plot under the Summaries tab. The path to the created HTML file is stored in the database.
If you provide a list or tuple of these objects as the `summary` or `payload`, each of the items will be processed independently. For database archiving, the various entries are stored semi-colon delimited.
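For example, inside a `Task` method (the payload path is illustrative; the dictionary example is the one given above):

```py
# Sketch: setting results the Executor will post-process.
self._result.payload = "/path/to/output_file.h5"  # Illustrative file path
# Posted under the eLog report section; stored as "Rate: 0.05;Total: 10":
self._result.summary = {"Rate": 0.05, "Total": 10}
```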