Base classes for implementing analysis tasks.
Classes:
Name | Description
---|---
`Task` | Abstract base class from which all analysis tasks are derived.
`ThirdPartyTask` | Class to run a third-party executable binary as a `Task`.
DescribedAnalysis
dataclass
Complete analysis description. Held by an Executor.
Source code in lute/tasks/dataclasses.py
@dataclass
class DescribedAnalysis:
    """Complete analysis description. Held by an Executor.

    Attributes:
        task_result (TaskResult): Result object (status, summary, payload and
            metadata) for the described Task.
        task_parameters (Optional[TaskParameters]): Parameters used to
            configure the Task, or None if not (yet) available.
        task_env (Dict[str, str]): Mapping of environment variable names to
            values (presumably the environment for the Task process -- TODO
            confirm against the Executor).
        poll_interval (float): Polling interval used by the holder; units are
            not stated here (presumably seconds -- TODO confirm).
        communicator_desc (List[str]): Strings describing the communicator(s)
            in use; the exact format is not shown here.
    """

    task_result: TaskResult
    task_parameters: Optional[TaskParameters]
    task_env: Dict[str, str]
    poll_interval: float
    communicator_desc: List[str]
ElogSummaryPlots
dataclass
Holds a graphical summary intended for display in the eLog.
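Attributes:
Name | Type | Description
---|---|---
`display_name` | `str` | Both a path and how the result will be displayed in the eLog. Can include "/" characters, e.g. `display_name = "scans/my_motor_scan"` shows plots on a "my_motor_scan" page under a "scans" tab. This format mirrors how the file is stored on disk as well.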
Source code in lute/tasks/dataclasses.py
@dataclass
class ElogSummaryPlots:
    """Holds a graphical summary intended for display in the eLog.

    Attributes:
        display_name (str): This represents both a path and how the result will be
            displayed in the eLog. Can include "/" characters. E.g.
            `display_name = "scans/my_motor_scan"` will have plots shown
            on a "my_motor_scan" page, under a "scans" tab. This format mirrors
            how the file is stored on disk as well.
        figures (Union[pn.Tabs, hv.Image, plt.Figure]): The figure object to
            display. Per the annotation, it may be a `pn.Tabs`, an `hv.Image`
            or a `plt.Figure` (presumably panel, holoviews and matplotlib
            objects respectively -- confirm against the module imports).
    """

    display_name: str
    figures: Union[pn.Tabs, hv.Image, plt.Figure]
Task
Bases: ABC
Abstract base class for analysis tasks.
Attributes:
Name | Type | Description
---|---|---
`name` | `str` | The name of the Task.
Source code in lute/tasks/task.py
class Task(ABC):
    """Abstract base class for analysis tasks.

    Subclasses implement `_run`, and may optionally implement `_pre_run` and
    `_post_run`. Callers invoke `run()`, or call the instance directly.

    Attributes:
        name (str): The name of the Task (the bare class name of the instance,
            without the module path).
    """

    def __init__(self, *, params: TaskParameters) -> None:
        """Initialize a Task.

        Creates a PENDING `TaskResult`, arms the real-time interval timer with
        the configured Task timeout, and changes into the configured run
        directory if one is set.

        Args:
            params (TaskParameters): Parameters needed to properly configure
                the analysis task. These are NOT related to execution parameters
                (number of cores, etc), except, potentially, in case of binary
                executable sub-classes.
        """
        # Bare class name, e.g. "ThirdPartyTask" (no module path).
        self.name: str = type(self).__name__
        self._result: TaskResult = TaskResult(
            task_name=self.name,
            task_status=TaskStatus.PENDING,
            summary="PENDING",
            payload="",
        )
        self._task_parameters: TaskParameters = params
        # ITIMER_REAL delivers SIGALRM to this process once `timeout` elapses.
        # NOTE(review): the SIGALRM handler is installed elsewhere; presumably
        # it calls `clean_up_timeout` -- confirm.
        timeout: int = self._task_parameters.lute_config.task_timeout
        signal.setitimer(signal.ITIMER_REAL, timeout)
        run_directory: Optional[str] = self._task_parameters.Config.run_directory
        if run_directory is not None:
            try:
                os.chdir(run_directory)
            except FileNotFoundError:
                # Best effort: warn and keep running from the current directory.
                warnings.warn(
                    (
                        f"Attempt to change to {run_directory}, but it is not found!\n"
                        f"Will attempt to run from {os.getcwd()}. It may fail!"
                    ),
                    category=UserWarning,
                )

    def run(self) -> None:
        """Calls the analysis routines and any pre/post task functions.

        This method is part of the public API and should not need to be modified
        in any subclasses.
        """
        self._signal_start()
        self._pre_run()
        self._run()
        self._post_run()
        self._signal_result()

    @abstractmethod
    def _run(self) -> None:
        """Actual analysis to run. Overridden by subclasses.

        Separating the calling API from the implementation allows `run` to
        have pre and post task functionality embedded easily into a single
        function call.
        """
        ...

    def _pre_run(self) -> None:
        """Code to run BEFORE the main analysis takes place.

        This function may, or may not, be employed by subclasses.
        """
        ...

    def _post_run(self) -> None:
        """Code to run AFTER the main analysis takes place.

        This function may, or may not, be employed by subclasses.
        """
        ...

    @property
    def result(self) -> TaskResult:
        """TaskResult: Read-only Task Result information."""
        return self._result

    def __call__(self) -> None:
        """Run the Task; equivalent to calling `run()`."""
        self.run()

    def _signal_start(self) -> None:
        """Send the signal that the Task will begin shortly.

        Also marks the stored result as RUNNING.
        """
        start_msg: Message = Message(
            contents=self._task_parameters, signal="TASK_STARTED"
        )
        self._result.task_status = TaskStatus.RUNNING
        self._report_to_executor(start_msg)

    def _signal_result(self) -> None:
        """Send the signal that results are ready along with the results."""
        # Named `signal_name` (not `signal`) to avoid shadowing the `signal`
        # module used in `__init__`.
        signal_name: str = "TASK_RESULT"
        results_msg: Message = Message(contents=self.result, signal=signal_name)
        self._report_to_executor(results_msg)
        # Short pause after sending; presumably gives the Executor time to
        # receive the message before the process moves on -- confirm.
        time.sleep(0.1)

    def _report_to_executor(self, msg: Message) -> None:
        """Send a message to the Executor.

        Details of `Communicator` choice are hidden from the caller. This
        method may be overridden by subclasses with specialized functionality.

        Args:
            msg (Message): The message object to send. Messages whose contents
                are a `str` or None go through a `PipeCommunicator`; all other
                contents go through a `SocketCommunicator`.
        """
        communicator: Communicator
        if isinstance(msg.contents, str) or msg.contents is None:
            communicator = PipeCommunicator()
        else:
            communicator = SocketCommunicator()
        communicator.write(msg)

    def clean_up_timeout(self) -> None:
        """Perform any necessary cleanup actions before exit if timing out.

        The base implementation does nothing.
        """
        ...
result: TaskResult
property
TaskResult: Read-only Task Result information.
__init__(*, params)
Initialize a Task.
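Parameters:
Name | Type | Description
---|---|---
`params` | `TaskParameters` | Parameters needed to properly configure the analysis task. These are NOT related to execution parameters (number of cores, etc.), except, potentially, in the case of binary executable subclasses.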
Source code in lute/tasks/task.py
def __init__(self, *, params: TaskParameters) -> None:
    """Initialize a Task.

    Creates a PENDING `TaskResult`, arms the real-time interval timer with
    the configured Task timeout, and changes into the configured run
    directory if one is set.

    Args:
        params (TaskParameters): Parameters needed to properly configure
            the analysis task. These are NOT related to execution parameters
            (number of cores, etc), except, potentially, in case of binary
            executable sub-classes.
    """
    # Bare class name, e.g. "ThirdPartyTask" (no module path).
    self.name: str = type(self).__name__
    self._result: TaskResult = TaskResult(
        task_name=self.name,
        task_status=TaskStatus.PENDING,
        summary="PENDING",
        payload="",
    )
    self._task_parameters: TaskParameters = params
    # ITIMER_REAL delivers SIGALRM to this process once `timeout` elapses.
    timeout: int = self._task_parameters.lute_config.task_timeout
    signal.setitimer(signal.ITIMER_REAL, timeout)
    run_directory: Optional[str] = self._task_parameters.Config.run_directory
    if run_directory is not None:
        try:
            os.chdir(run_directory)
        except FileNotFoundError:
            # Best effort: warn and keep running from the current directory.
            warnings.warn(
                (
                    f"Attempt to change to {run_directory}, but it is not found!\n"
                    f"Will attempt to run from {os.getcwd()}. It may fail!"
                ),
                category=UserWarning,
            )
clean_up_timeout()
Perform any necessary cleanup actions before exit if timing out.
Source code in lute/tasks/task.py
def clean_up_timeout(self) -> None:
    """Hook for cleanup work to do before exiting because of a timeout.

    This default implementation intentionally performs no action.
    """
    return None
run()
Calls the analysis routines and any pre/post task functions.
This method is part of the public API and should not need to be modified in any subclasses.
Source code in lute/tasks/task.py
def run(self) -> None:
    """Drive the full Task lifecycle in order.

    The stages are: start signal, pre-run hook, main analysis, post-run hook
    and result signal. This is the public entry point; subclasses customize
    behavior via the individual hooks rather than by modifying this method.
    """
    # Each stage is looked up just before it is called, in lifecycle order.
    for stage_name in (
        "_signal_start",
        "_pre_run",
        "_run",
        "_post_run",
        "_signal_result",
    ):
        getattr(self, stage_name)()
TaskResult
dataclass
Class for storing the result of a Task's execution with metadata.
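Attributes:
Name | Type | Description
---|---|---
`task_name` | `str` | Name of the associated task which produced it.
`task_status` | `TaskStatus` | Status of the associated task.
`summary` | `str` | Short message/summary associated with the result.
`payload` | `Any` | Actual result. May be data in any format.
`impl_schemas` | `Optional[str]` | A string listing the `Task` schemas implemented by the associated `Task`. Schemas define the category and expected output of the `Task`. An individual task may implement/conform to multiple schemas, separated by `;`, e.g. `impl_schemas = "schema1;schema2"`.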
Source code in lute/tasks/dataclasses.py
@dataclass
class TaskResult:
    """Class for storing the result of a Task's execution with metadata.

    Attributes:
        task_name (str): Name of the associated task which produced it.
        task_status (TaskStatus): Status of associated task.
        summary (str): Short message/summary associated with the result.
        payload (Any): Actual result. May be data in any format.
        impl_schemas (Optional[str]): A string listing `Task` schemas implemented
            by the associated `Task`. Schemas define the category and expected
            output of the `Task`. An individual task may implement/conform to
            multiple schemas. Multiple schemas are separated by ';', e.g.
                * impl_schemas = "schema1;schema2"
            Defaults to None.
    """

    task_name: str
    task_status: TaskStatus
    summary: str
    payload: Any
    impl_schemas: Optional[str] = None
TaskStatus
Bases: Enum
Possible Task statuses.
Source code in lute/tasks/dataclasses.py
class TaskStatus(Enum):
    """Possible Task statuses.

    Members are integer-valued, starting from PENDING = 0. The string following
    each member documents it.
    """

    PENDING = 0
    """
    Task has yet to run. Is Queued, or waiting for prior tasks.
    """
    RUNNING = 1
    """
    Task is in the process of execution.
    """
    COMPLETED = 2
    """
    Task has completed without fatal errors.
    """
    FAILED = 3
    """
    Task encountered a fatal error.
    """
    STOPPED = 4
    """
    Task was, potentially temporarily, stopped/suspended.
    """
    CANCELLED = 5
    """
    Task was cancelled prior to completion or failure.
    """
    TIMEDOUT = 6
    """
    Task did not reach completion due to timeout.
    """
CANCELLED = 5
class-attribute
instance-attribute
Task was cancelled prior to completion or failure.
COMPLETED = 2
class-attribute
instance-attribute
Task has completed without fatal errors.
FAILED = 3
class-attribute
instance-attribute
Task encountered a fatal error.
PENDING = 0
class-attribute
instance-attribute
Task has yet to run. Is Queued, or waiting for prior tasks.
RUNNING = 1
class-attribute
instance-attribute
Task is in the process of execution.
STOPPED = 4
class-attribute
instance-attribute
Task was, potentially temporarily, stopped/suspended.
TIMEDOUT = 6
class-attribute
instance-attribute
Task did not reach completion due to timeout.
ThirdPartyTask
Bases: Task
A `Task` interface to analysis with binary executables.
Source code in lute/tasks/task.py
class ThirdPartyTask(Task):
    """A `Task` interface to analysis with binary executables."""

    def __init__(self, *, params: TaskParameters) -> None:
        """Initialize a Task.

        Args:
            params (TaskParameters): Parameters needed to properly configure
                the analysis task. `Task`s of this type MUST include the name
                of a binary to run and any arguments which should be passed to
                it (as would be done via command line). The binary is included
                with the parameter `executable`. All other parameter names are
                assumed to be the long/extended names of the flag passed on the
                command line:
                    * `arg_name = 3` is converted to `--arg_name 3`
                Positional arguments can be included with `p_argN` where `N` is
                any integer:
                    * `p_arg1 = 3` is converted to `3`
        """
        super().__init__(params=params)
        self._cmd = self._task_parameters.executable
        # argv for execvp: by convention argv[0] is the program itself.
        self._args_list: List[str] = [self._cmd]
        self._template_context: Dict[str, Any] = {}

    def _add_to_jinja_context(self, param_name: str, value: Any) -> None:
        """Store a parameter as a Jinja template variable.

        Variables are stored in a dictionary which is used to fill in a
        premade Jinja template for a third party configuration file.

        Args:
            param_name (str): Name to store the variable as. This should be
                the name defined in the corresponding pydantic model. This name
                MUST match the name used in the Jinja Template!
            value (Any): The value to store. If possible, large chunks of the
                template should be represented as a single dictionary for
                simplicity; however, any type can be stored as needed.
        """
        context_update: Dict[str, Any] = {param_name: value}
        if __debug__:
            msg: Message = Message(contents=f"TemplateParameters: {context_update}")
            self._report_to_executor(msg)
        self._template_context.update(context_update)

    def _template_to_config_file(self) -> None:
        """Convert a template file into a valid configuration file.

        Uses Jinja to fill in a provided template file with variables supplied
        through the LUTE config file. This facilitates parameter modification
        for third party tasks which use a separate configuration, in addition
        to, or instead of, command-line arguments.
        """
        from jinja2 import Environment, FileSystemLoader, Template

        out_file: str = self._task_parameters.lute_template_cfg.output_path
        template_name: str = self._task_parameters.lute_template_cfg.template_name

        lute_path: Optional[str] = os.getenv("LUTE_PATH")
        template_dir: str
        if lute_path is None:
            warnings.warn(
                "LUTE_PATH is None in Task process! Using relative path for templates!",
                category=UserWarning,
            )
            # Resolved relative to the current working directory.
            template_dir = "../../config/templates"
        else:
            template_dir = f"{lute_path}/config/templates"
        environment: Environment = Environment(loader=FileSystemLoader(template_dir))
        template: Template = environment.get_template(template_name)

        with open(out_file, "w", encoding="utf-8") as cfg_out:
            cfg_out.write(template.render(self._template_context))

    def _pre_run(self) -> None:
        """Parse the parameters into an appropriate argument list.

        Arguments are identified by a `flag_type` attribute, defined in the
        pydantic model, which indicates how to pass the parameter and its
        argument on the command-line. This method parses flag:value pairs
        into an appropriate list to be used to call the executable.

        Note:
        `TemplateParameters` objects (presumably produced by custom model
        validators) are assumed to be used for a templated config file used by
        the third party executable for configuration. The parsing of these
        parameters is performed separately by a template file used as an input
        to Jinja. This method solely identifies the necessary objects and
        passes them all along. Refer to the template files and pydantic models
        for more information on how these parameters are defined and
        identified.
        """
        super()._pre_run()
        full_schema: Dict[str, Union[str, Dict[str, Any]]] = (
            self._task_parameters.schema()
        )
        # Each option is read independently and defaults to False. A Config that
        # defines only one of them no longer raises AttributeError or has the
        # other setting silently ignored.
        config = self._task_parameters.Config
        short_flags_use_eq: bool = getattr(config, "short_flags_use_eq", False)
        long_flags_use_eq: bool = getattr(config, "long_flags_use_eq", False)
        for param, value in self._task_parameters.dict().items():
            if (
                param == "executable"
                or value is None  # Cannot have empty values in argument list for execvp
                or value == ""  # But do want to include, e.g. 0
            ):
                continue
            # Look at the raw attribute rather than `value` because compound
            # model-types are converted to `dict` by `.dict()`.
            # E.g. type(value) = dict, not AnalysisHeader.
            raw_value: Any = self._task_parameters.__dict__[param]
            if isinstance(raw_value, (TemplateConfig, AnalysisHeader)):
                continue
            if isinstance(raw_value, TemplateParameters):
                # TemplateParameters objects have a single parameter `params`
                self._add_to_jinja_context(param_name=param, value=value.params)
                continue

            param_attributes: Dict[str, Any] = full_schema["properties"][param]
            # Some model params do not match the command-line parameter names
            param_repr: str = param_attributes.get("rename_param", param)
            if "flag_type" in param_attributes:
                flag: str = param_attributes["flag_type"]
                if flag:
                    # "-" or "--" flags
                    if flag == "--" and isinstance(value, bool) and not value:
                        continue
                    constructed_flag: str = f"{flag}{param_repr}"
                    if flag == "--" and isinstance(value, bool) and value:
                        # On/off flag, e.g. something like --verbose: No Arg
                        self._args_list.append(constructed_flag)
                        continue
                    if (flag == "-" and short_flags_use_eq) or (
                        flag == "--" and long_flags_use_eq
                    ):  # Must come after above check! Otherwise you get --param=True
                        # Flags following --param=value or -param=value
                        self._args_list.append(f"{constructed_flag}={value}")
                        continue
                    self._args_list.append(constructed_flag)
                # An empty flag_type means a positional argument: only the value
                # is appended below.
            else:
                warnings.warn(
                    (
                        f"Model parameters should be defined using Field(...,flag_type='')"
                        f" in the future. Parameter: {param}"
                    ),
                    category=PendingDeprecationWarning,
                )
                if len(param) == 1:  # Single-dash flags
                    if short_flags_use_eq:
                        self._args_list.append(f"-{param_repr}={value}")
                        continue
                    self._args_list.append(f"-{param_repr}")
                elif "p_arg" in param:  # Positional arguments
                    pass
                else:  # Double-dash flags
                    if isinstance(value, bool) and not value:
                        continue
                    if long_flags_use_eq:
                        self._args_list.append(f"--{param_repr}={value}")
                        continue
                    self._args_list.append(f"--{param_repr}")
            # A True boolean is a bare switch: the flag alone was appended above.
            if isinstance(value, bool) and value:
                continue
            if isinstance(value, str) and " " in value:
                # Space-separated strings become separate argv entries.
                self._args_list.extend(value.split())
            else:
                self._args_list.append(f"{value}")
        if (
            hasattr(self._task_parameters, "lute_template_cfg")
            and self._template_context
        ):
            self._template_to_config_file()

    def _run(self) -> None:
        """Execute the new program by replacing the current process."""
        if __debug__:
            time.sleep(0.1)
            msg: Message = Message(contents=self._formatted_command())
            self._report_to_executor(msg)
        LUTE_DEBUG_EXIT("LUTE_DEBUG_BEFORE_TPP_EXEC")
        # execvp does not return on success; this process becomes the binary.
        os.execvp(file=self._cmd, args=self._args_list)

    def _formatted_command(self) -> str:
        """Returns the command as it would be passed on the command-line.

        Each argument is followed by a single space (including the last one).
        """
        formatted_cmd: str = "".join(f"{arg} " for arg in self._args_list)
        return formatted_cmd

    def _signal_start(self) -> None:
        """Override start signal method to switch communication methods."""
        super()._signal_start()
        time.sleep(0.05)
        # Named `signal_name` (not `signal`) to avoid shadowing the `signal`
        # module.
        signal_name: str = "NO_PICKLE_MODE"
        msg: Message = Message(signal=signal_name)
        self._report_to_executor(msg)
__init__(*, params)
Initialize a Task.
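Parameters:
Name | Type | Description
---|---|---
`params` | `TaskParameters` | Parameters needed to properly configure the analysis task. `Task`s of this type MUST include the name of a binary to run (parameter `executable`) and any arguments which should be passed to it, as would be done on the command line. All other parameter names are assumed to be the long/extended names of the flags passed on the command line: `arg_name = 3` is converted to `--arg_name 3`. Positional arguments can be included with `p_argN`, where `N` is any integer: `p_arg1 = 3` is converted to `3`.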
Source code in lute/tasks/task.py
def __init__(self, *, params: TaskParameters) -> None:
    """Set up a Task that wraps a third-party binary.

    Args:
        params (TaskParameters): Configuration for the analysis task. It MUST
            name the binary to run via the `executable` parameter, along with
            any arguments to pass to it (as on a command line). Every other
            parameter name is taken as the long/extended name of a
            command-line flag:
                * `arg_name = 3` is converted to `--arg_name 3`
            Positional arguments are given as `p_argN`, where `N` is any
            integer:
                * `p_arg1 = 3` is converted to `3`
    """
    super().__init__(params=params)
    executable = self._task_parameters.executable
    self._cmd = executable
    # The argument vector starts with the program name itself.
    self._args_list: List[str] = [executable]
    # Variables collected for rendering a Jinja config template.
    self._template_context: Dict[str, Any] = dict()