Skip to content

task

Base classes for implementing analysis tasks.

Classes:

Name Description
Task

Abstract base class from which all analysis tasks are derived.

ThirdPartyTask

Class to run a third-party executable binary as a Task.

Task

Bases: ABC

Abstract base class for analysis tasks.

Attributes:

Name Type Description
name str

The name of the Task.

Source code in lute/tasks/task.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class Task(ABC):
    """Abstract base class for analysis tasks.

    Attributes:
        name (str): The name of the Task.
    """

    def __init__(self, *, params: TaskParameters, use_mpi: bool = False) -> None:
        """Initialize a Task.

        Args:
            params (TaskParameters): Parameters needed to properly configure
                the analysis task. These are NOT related to execution parameters
                (number of cores, etc), except, potentially, in case of binary
                executable sub-classes.

            use_mpi (bool): Whether this Task requires the use of MPI.
                This determines the behaviour and timing of certain signals
                and ensures appropriate barriers are placed to not end
                processing until all ranks have finished.
        """
        self.name: str = str(type(self)).split("'")[1].split(".")[-1]
        self._result: TaskResult = TaskResult(
            task_name=self.name,
            task_status=TaskStatus.PENDING,
            summary="PENDING",
            payload="",
        )
        self._task_parameters: TaskParameters = params
        if (
            hasattr(self._task_parameters.Config, "result_from_params")
            and self._task_parameters.Config.result_from_params is not None
        ):
            object.__setattr__(
                self._task_parameters,
                "_result_from_params",
                self._task_parameters.Config.result_from_params,
            )
        timeout: int = self._task_parameters.lute_config.task_timeout
        signal.setitimer(signal.ITIMER_REAL, timeout)

        run_directory: Optional[str] = self._task_parameters.Config.run_directory
        if run_directory is not None:
            try:
                os.chdir(run_directory)
            except FileNotFoundError:
                warnings.warn(
                    (
                        f"Attempt to change to {run_directory}, but it is not found!\n"
                        f"Will attempt to run from {os.getcwd()}. It may fail!"
                    ),
                    category=UserWarning,
                )
        self._use_mpi: bool = use_mpi

    def run(self) -> None:
        """Calls the analysis routines and any pre/post task functions.

        This method is part of the public API and should not need to be modified
        in any subclasses.
        """
        self._signal_start()
        self._pre_run()
        self._run()
        self._post_run()
        self._signal_result()

    @abstractmethod
    def _run(self) -> None:
        """Actual analysis to run. Overridden by subclasses.

        Separating the calling API from the implementation allows `run` to
        have pre and post task functionality embedded easily into a single
        function call.
        """
        ...

    def _pre_run(self) -> None:
        """Code to run BEFORE the main analysis takes place.

        This function may, or may not, be employed by subclasses.
        """
        ...

    def _post_run(self) -> None:
        """Code to run AFTER the main analysis takes place.

        This function may, or may not, be employed by subclasses.
        """
        ...

    @property
    def result(self) -> TaskResult:
        """TaskResult: Read-only Task Result information."""
        return self._result

    def __call__(self) -> None:
        self.run()

    def _signal_start(self) -> None:
        """Send the signal that the Task will begin shortly."""
        start_msg: Message = Message(
            contents=self._task_parameters, signal="TASK_STARTED"
        )
        self._result.task_status = TaskStatus.RUNNING
        if self._use_mpi:
            from mpi4py import MPI

            comm: MPI.Intracomm = MPI.COMM_WORLD
            rank: int = comm.Get_rank()
            comm.Barrier()
            if rank == 0:
                self._report_to_executor(start_msg)
        else:
            self._report_to_executor(start_msg)

        # We stop process here so Executor can do any tasklet work if needed
        if os.getenv("LUTE_CONFIGPATH") is not None:
            # Guard w/ environment variable that is set only by Executor - don't
            # SIGSTOP if Task is running without Executor
            os.kill(os.getpid(), signal.SIGSTOP)

    def _signal_result(self) -> None:
        """Send the signal that results are ready along with the results."""
        signal: str = "TASK_RESULT"
        results_msg: Message = Message(contents=self.result, signal=signal)
        if self._use_mpi:
            from mpi4py import MPI

            comm: MPI.Intracomm = MPI.COMM_WORLD
            rank: int = comm.Get_rank()
            comm.Barrier()
            if rank == 0:
                self._report_to_executor(results_msg)
        else:
            self._report_to_executor(results_msg)
        time.sleep(0.1)

    def _report_to_executor(self, msg: Message) -> None:
        """Send a message to the Executor.

        Details of `Communicator` choice are hidden from the caller. This
        method may be overriden by subclasses with specialized functionality.

        Args:
            msg (Message): The message object to send.
        """
        communicator: Communicator
        if isinstance(msg.contents, str) or msg.contents is None:
            communicator = PipeCommunicator()
        else:
            communicator = SocketCommunicator()

        communicator.delayed_setup()
        communicator.write(msg)
        communicator.clear_communicator()

    def clean_up_timeout(self) -> None:
        """Perform any necessary cleanup actions before exit if timing out."""
        ...

result: TaskResult property

TaskResult: Read-only Task Result information.

__init__(*, params, use_mpi=False)

Initialize a Task.

Parameters:

Name Type Description Default
params TaskParameters

Parameters needed to properly configure the analysis task. These are NOT related to execution parameters (number of cores, etc), except, potentially, in case of binary executable sub-classes.

required
use_mpi bool

Whether this Task requires the use of MPI. This determines the behaviour and timing of certain signals and ensures appropriate barriers are placed to not end processing until all ranks have finished.

False
Source code in lute/tasks/task.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def __init__(self, *, params: TaskParameters, use_mpi: bool = False) -> None:
    """Initialize a Task.

    Args:
        params (TaskParameters): Parameters needed to properly configure
            the analysis task. These are NOT related to execution parameters
            (number of cores, etc), except, potentially, in case of binary
            executable sub-classes.

        use_mpi (bool): Whether this Task requires the use of MPI.
            This determines the behaviour and timing of certain signals
            and ensures appropriate barriers are placed to not end
            processing until all ranks have finished.
    """
    self.name: str = str(type(self)).split("'")[1].split(".")[-1]
    self._result: TaskResult = TaskResult(
        task_name=self.name,
        task_status=TaskStatus.PENDING,
        summary="PENDING",
        payload="",
    )
    self._task_parameters: TaskParameters = params
    if (
        hasattr(self._task_parameters.Config, "result_from_params")
        and self._task_parameters.Config.result_from_params is not None
    ):
        object.__setattr__(
            self._task_parameters,
            "_result_from_params",
            self._task_parameters.Config.result_from_params,
        )
    timeout: int = self._task_parameters.lute_config.task_timeout
    signal.setitimer(signal.ITIMER_REAL, timeout)

    run_directory: Optional[str] = self._task_parameters.Config.run_directory
    if run_directory is not None:
        try:
            os.chdir(run_directory)
        except FileNotFoundError:
            warnings.warn(
                (
                    f"Attempt to change to {run_directory}, but it is not found!\n"
                    f"Will attempt to run from {os.getcwd()}. It may fail!"
                ),
                category=UserWarning,
            )
    self._use_mpi: bool = use_mpi

_post_run()

Code to run AFTER the main analysis takes place.

This function may, or may not, be employed by subclasses.

Source code in lute/tasks/task.py
144
145
146
147
148
149
def _post_run(self) -> None:
    """Code to run AFTER the main analysis takes place.

    This function may, or may not, be employed by subclasses.
    """
    ...

_pre_run()

Code to run BEFORE the main analysis takes place.

This function may, or may not, be employed by subclasses.

Source code in lute/tasks/task.py
137
138
139
140
141
142
def _pre_run(self) -> None:
    """Code to run BEFORE the main analysis takes place.

    This function may, or may not, be employed by subclasses.
    """
    ...

_report_to_executor(msg)

Send a message to the Executor.

Details of Communicator choice are hidden from the caller. This method may be overriden by subclasses with specialized functionality.

Parameters:

Name Type Description Default
msg Message

The message object to send.

required
Source code in lute/tasks/task.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def _report_to_executor(self, msg: Message) -> None:
    """Send a message to the Executor.

    Details of `Communicator` choice are hidden from the caller. This
    method may be overriden by subclasses with specialized functionality.

    Args:
        msg (Message): The message object to send.
    """
    communicator: Communicator
    if isinstance(msg.contents, str) or msg.contents is None:
        communicator = PipeCommunicator()
    else:
        communicator = SocketCommunicator()

    communicator.delayed_setup()
    communicator.write(msg)
    communicator.clear_communicator()

_run() abstractmethod

Actual analysis to run. Overridden by subclasses.

Separating the calling API from the implementation allows run to have pre and post task functionality embedded easily into a single function call.

Source code in lute/tasks/task.py
127
128
129
130
131
132
133
134
135
@abstractmethod
def _run(self) -> None:
    """Actual analysis to run. Overridden by subclasses.

    Separating the calling API from the implementation allows `run` to
    have pre and post task functionality embedded easily into a single
    function call.
    """
    ...

_signal_result()

Send the signal that results are ready along with the results.

Source code in lute/tasks/task.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def _signal_result(self) -> None:
    """Send the signal that results are ready along with the results."""
    signal: str = "TASK_RESULT"
    results_msg: Message = Message(contents=self.result, signal=signal)
    if self._use_mpi:
        from mpi4py import MPI

        comm: MPI.Intracomm = MPI.COMM_WORLD
        rank: int = comm.Get_rank()
        comm.Barrier()
        if rank == 0:
            self._report_to_executor(results_msg)
    else:
        self._report_to_executor(results_msg)
    time.sleep(0.1)

_signal_start()

Send the signal that the Task will begin shortly.

Source code in lute/tasks/task.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def _signal_start(self) -> None:
    """Send the signal that the Task will begin shortly."""
    start_msg: Message = Message(
        contents=self._task_parameters, signal="TASK_STARTED"
    )
    self._result.task_status = TaskStatus.RUNNING
    if self._use_mpi:
        from mpi4py import MPI

        comm: MPI.Intracomm = MPI.COMM_WORLD
        rank: int = comm.Get_rank()
        comm.Barrier()
        if rank == 0:
            self._report_to_executor(start_msg)
    else:
        self._report_to_executor(start_msg)

    # We stop process here so Executor can do any tasklet work if needed
    if os.getenv("LUTE_CONFIGPATH") is not None:
        # Guard w/ environment variable that is set only by Executor - don't
        # SIGSTOP if Task is running without Executor
        os.kill(os.getpid(), signal.SIGSTOP)

clean_up_timeout()

Perform any necessary cleanup actions before exit if timing out.

Source code in lute/tasks/task.py
217
218
219
def clean_up_timeout(self) -> None:
    """Perform any necessary cleanup actions before exit if timing out."""
    ...

run()

Calls the analysis routines and any pre/post task functions.

This method is part of the public API and should not need to be modified in any subclasses.

Source code in lute/tasks/task.py
115
116
117
118
119
120
121
122
123
124
125
def run(self) -> None:
    """Calls the analysis routines and any pre/post task functions.

    This method is part of the public API and should not need to be modified
    in any subclasses.
    """
    self._signal_start()
    self._pre_run()
    self._run()
    self._post_run()
    self._signal_result()

ThirdPartyTask

Bases: Task

A Task interface to analysis with binary executables.

Source code in lute/tasks/task.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
class ThirdPartyTask(Task):
    """A `Task` interface to analysis with binary executables."""

    def __init__(self, *, params: ThirdPartyParameters) -> None:
        """Initialize a Task.

        Args:
            params (TaskParameters): Parameters needed to properly configure
                the analysis task. `Task`s of this type MUST include the name
                of a binary to run and any arguments which should be passed to
                it (as would be done via command line). The binary is included
                with the parameter `executable`. All other parameter names are
                assumed to be the long/extended names of the flag passed on the
                command line by default:
                    * `arg_name = 3` is converted to `--arg_name 3`
                Positional arguments can be included with `p_argN` where `N` is
                any integer:
                    * `p_arg1 = 3` is converted to `3`

                Note that it is NOT recommended to rely on this default behaviour
                as command-line arguments can be passed in many ways. Refer to
                the dcoumentation at
                https://slac-lcls.github.io/lute/tutorial/new_task/
                under "Speciyfing a TaskParameters Model for your Task" for more
                information on how to control parameter parsing from within your
                TaskParameters model definition.
        """
        super().__init__(params=params)
        if not hasattr(self._task_parameters, "executable"):
            raise RuntimeError("ThirdPartyTask must have executable defined!")
        self._cmd = self._task_parameters.executable
        self._args_list: List[str] = [self._cmd]
        self._template_context: Dict[str, Any] = {}

    def _add_to_jinja_context(self, param_name: str, value: Any) -> None:
        """Store a parameter as a Jinja template variable.

        Variables are stored in a dictionary which is used to fill in a
        premade Jinja template for a third party configuration file.

        Args:
            param_name (str): Name to store the variable as. This should be
                the name defined in the corresponding pydantic model. This name
                MUST match the name used in the Jinja Template!
            value (Any): The value to store. If possible, large chunks of the
                template should be represented as a single dictionary for
                simplicity; however, any type can be stored as needed.
        """
        context_update: Dict[str, Any] = {param_name: value}
        if __debug__:
            msg: Message = Message(contents=f"TemplateParameters: {context_update}")
            self._report_to_executor(msg)
        self._template_context.update(context_update)

    def _template_to_config_file(self) -> None:
        """Convert a template file into a valid configuration file.

        Uses Jinja to fill in a provided template file with variables supplied
        through the LUTE config file. This facilitates parameter modification
        for third party tasks which use a separate configuration, in addition
        to, or instead of, command-line arguments.
        """
        from jinja2 import Environment, FileSystemLoader, Template

        if not hasattr(self._task_parameters, "lute_template_cfg"):
            raise RuntimeError("Missing lute_template_cfg! Cannot compile template!")

        out_file: str = self._task_parameters.lute_template_cfg.output_path
        template_name: str = self._task_parameters.lute_template_cfg.template_name

        lute_path: Optional[str] = os.getenv("LUTE_PATH")
        template_dir: str
        if lute_path is None:
            warnings.warn(
                "LUTE_PATH is None in Task process! Using relative path for templates!",
                category=UserWarning,
            )
            template_dir = "../../config/templates"
        else:
            template_dir = f"{lute_path}/config/templates"
        environment: Environment = Environment(loader=FileSystemLoader(template_dir))
        template: Template = environment.get_template(template_name)

        with open(out_file, "w", encoding="utf-8") as cfg_out:
            cfg_out.write(template.render(self._template_context))

    def _pre_run(self) -> None:
        """Parse the parameters into an appropriate argument list.

        Arguments are identified by a `flag_type` attribute, defined in the
        pydantic model, which indicates how to pass the parameter and its
        argument on the command-line. This method parses flag:value pairs
        into an appropriate list to be used to call the executable.

        Note:
        ThirdPartyParameter objects are returned by custom model validators.
        Objects of this type are assumed to be used for a templated config
        file used by the third party executable for configuration. The parsing
        of these parameters is performed separately by a template file used as
        an input to Jinja. This method solely identifies the necessary objects
        and passes them all along. Refer to the template files and pydantic
        models for more information on how these parameters are defined and
        identified.
        """
        super()._pre_run()
        full_schema: Dict[str, Any] = self._task_parameters.schema()
        short_flags_use_eq: bool
        long_flags_use_eq: bool
        if hasattr(self._task_parameters.Config, "short_flags_use_eq"):
            short_flags_use_eq = self._task_parameters.Config.short_flags_use_eq
        else:
            short_flags_use_eq = False

        if hasattr(self._task_parameters.Config, "long_flags_use_eq"):
            long_flags_use_eq = self._task_parameters.Config.long_flags_use_eq
        else:
            long_flags_use_eq = False
        for param, value in self._task_parameters.dict().items():
            # Clunky test with __dict__[param] because compound model-types are
            # converted to `dict`. E.g. type(value) = dict not AnalysisHeader
            if (
                param in ("executable", "_result_from_params")
                or value is None  # Cannot have empty values in argument list for execvp
                or value == ""  # But do want to include, e.g. 0
                or isinstance(self._task_parameters.__dict__[param], TemplateConfig)
                or isinstance(self._task_parameters.__dict__[param], AnalysisHeader)
            ):
                continue
            if isinstance(self._task_parameters.__dict__[param], TemplateParameters):
                # TemplateParameters objects have a single parameter `params`
                self._add_to_jinja_context(param_name=param, value=value.params)
                continue

            param_attributes: Dict[str, Any] = full_schema["properties"][param]
            # Some model params do not match the commnad-line parameter names
            param_repr: str
            if "rename_param" in param_attributes:
                param_repr = param_attributes["rename_param"]
            else:
                param_repr = param
            if "flag_type" in param_attributes:
                flag: str = param_attributes["flag_type"]
                if flag:
                    # "-" or "--" flags
                    if flag == "--" and isinstance(value, bool) and not value:
                        continue
                    constructed_flag: str = f"{flag}{param_repr}"
                    if flag == "--" and isinstance(value, bool) and value:
                        # On/off flag, e.g. something like --verbose: No Arg
                        self._args_list.append(f"{constructed_flag}")
                        continue
                    if (flag == "-" and short_flags_use_eq) or (
                        flag == "--" and long_flags_use_eq
                    ):  # Must come after above check! Otherwise you get --param=True
                        # Flags following --param=value or -param=value
                        constructed_flag = f"{constructed_flag}={value}"
                        self._args_list.append(f"{constructed_flag}")
                        continue
                    self._args_list.append(f"{constructed_flag}")
            else:
                warnings.warn(
                    (
                        f"Model parameters should be defined using Field(...,flag_type='')"
                        f" in the future.  Parameter: {param}"
                    ),
                    category=PendingDeprecationWarning,
                )
                if len(param) == 1:  # Single-dash flags
                    if short_flags_use_eq:
                        self._args_list.append(f"-{param_repr}={value}")
                        continue
                    self._args_list.append(f"-{param_repr}")
                elif "p_arg" in param:  # Positional arguments
                    pass
                else:  # Double-dash flags
                    if isinstance(value, bool) and not value:
                        continue
                    if long_flags_use_eq:
                        self._args_list.append(f"--{param_repr}={value}")
                        continue
                    self._args_list.append(f"--{param_repr}")
                    if isinstance(value, bool) and value:
                        continue
            if isinstance(value, str) and " " in value:
                for val in value.split():
                    self._args_list.append(f"{val}")
            else:
                self._args_list.append(f"{value}")
        if (
            hasattr(self._task_parameters, "lute_template_cfg")
            and self._template_context
        ):
            self._template_to_config_file()

    def _run(self) -> None:
        """Execute the new program by replacing the current process."""
        if __debug__:
            time.sleep(0.1)
            msg: Message = Message(contents=self._formatted_command())
            self._report_to_executor(msg)
        LUTE_DEBUG_EXIT("LUTE_DEBUG_BEFORE_TPP_EXEC")
        self._setup_env()
        os.execvp(file=self._cmd, args=self._args_list)

    def _formatted_command(self) -> str:
        """Returns the command as it would passed on the command-line."""
        formatted_cmd: str = "".join(f"{arg} " for arg in self._args_list)
        return formatted_cmd

    def _signal_start(self) -> None:
        """Override start signal method to switch communication methods."""
        super()._signal_start()
        time.sleep(0.05)
        signal: str = "NO_PICKLE_MODE"
        msg: Message = Message(signal=signal)
        self._report_to_executor(msg)

    def _setup_env(self) -> None:
        new_env: Dict[str, str] = {}
        for key, value in os.environ.items():
            if "LUTE_TENV_" in key:
                # Set if using a custom environment
                new_key: str = key[10:]
                new_env[new_key] = value
        os.environ.update(new_env)

__init__(*, params)

Initialize a Task.

Parameters:

Name Type Description Default
params TaskParameters

Parameters needed to properly configure the analysis task. Tasks of this type MUST include the name of a binary to run and any arguments which should be passed to it (as would be done via command line). The binary is included with the parameter executable. All other parameter names are assumed to be the long/extended names of the flag passed on the command line by default: * arg_name = 3 is converted to --arg_name 3 Positional arguments can be included with p_argN where N is any integer: * p_arg1 = 3 is converted to 3

Note that it is NOT recommended to rely on this default behaviour as command-line arguments can be passed in many ways. Refer to the dcoumentation at https://slac-lcls.github.io/lute/tutorial/new_task/ under "Speciyfing a TaskParameters Model for your Task" for more information on how to control parameter parsing from within your TaskParameters model definition.

required
Source code in lute/tasks/task.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def __init__(self, *, params: ThirdPartyParameters) -> None:
    """Initialize a Task.

    Args:
        params (TaskParameters): Parameters needed to properly configure
            the analysis task. `Task`s of this type MUST include the name
            of a binary to run and any arguments which should be passed to
            it (as would be done via command line). The binary is included
            with the parameter `executable`. All other parameter names are
            assumed to be the long/extended names of the flag passed on the
            command line by default:
                * `arg_name = 3` is converted to `--arg_name 3`
            Positional arguments can be included with `p_argN` where `N` is
            any integer:
                * `p_arg1 = 3` is converted to `3`

            Note that it is NOT recommended to rely on this default behaviour
            as command-line arguments can be passed in many ways. Refer to
            the dcoumentation at
            https://slac-lcls.github.io/lute/tutorial/new_task/
            under "Speciyfing a TaskParameters Model for your Task" for more
            information on how to control parameter parsing from within your
            TaskParameters model definition.
    """
    super().__init__(params=params)
    if not hasattr(self._task_parameters, "executable"):
        raise RuntimeError("ThirdPartyTask must have executable defined!")
    self._cmd = self._task_parameters.executable
    self._args_list: List[str] = [self._cmd]
    self._template_context: Dict[str, Any] = {}

_add_to_jinja_context(param_name, value)

Store a parameter as a Jinja template variable.

Variables are stored in a dictionary which is used to fill in a premade Jinja template for a third party configuration file.

Parameters:

Name Type Description Default
param_name str

Name to store the variable as. This should be the name defined in the corresponding pydantic model. This name MUST match the name used in the Jinja Template!

required
value Any

The value to store. If possible, large chunks of the template should be represented as a single dictionary for simplicity; however, any type can be stored as needed.

required
Source code in lute/tasks/task.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def _add_to_jinja_context(self, param_name: str, value: Any) -> None:
    """Store a parameter as a Jinja template variable.

    Variables are stored in a dictionary which is used to fill in a
    premade Jinja template for a third party configuration file.

    Args:
        param_name (str): Name to store the variable as. This should be
            the name defined in the corresponding pydantic model. This name
            MUST match the name used in the Jinja Template!
        value (Any): The value to store. If possible, large chunks of the
            template should be represented as a single dictionary for
            simplicity; however, any type can be stored as needed.
    """
    context_update: Dict[str, Any] = {param_name: value}
    if __debug__:
        msg: Message = Message(contents=f"TemplateParameters: {context_update}")
        self._report_to_executor(msg)
    self._template_context.update(context_update)

_formatted_command()

Returns the command as it would passed on the command-line.

Source code in lute/tasks/task.py
426
427
428
429
def _formatted_command(self) -> str:
    """Returns the command as it would passed on the command-line."""
    formatted_cmd: str = "".join(f"{arg} " for arg in self._args_list)
    return formatted_cmd

_pre_run()

Parse the parameters into an appropriate argument list.

Arguments are identified by a flag_type attribute, defined in the pydantic model, which indicates how to pass the parameter and its argument on the command-line. This method parses flag:value pairs into an appropriate list to be used to call the executable.

Note: ThirdPartyParameter objects are returned by custom model validators. Objects of this type are assumed to be used for a templated config file used by the third party executable for configuration. The parsing of these parameters is performed separately by a template file used as an input to Jinja. This method solely identifies the necessary objects and passes them all along. Refer to the template files and pydantic models for more information on how these parameters are defined and identified.

Source code in lute/tasks/task.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def _pre_run(self) -> None:
    """Parse the parameters into an appropriate argument list.

    Arguments are identified by a `flag_type` attribute, defined in the
    pydantic model, which indicates how to pass the parameter and its
    argument on the command-line. This method parses flag:value pairs
    into an appropriate list to be used to call the executable.

    Note:
    ThirdPartyParameter objects are returned by custom model validators.
    Objects of this type are assumed to be used for a templated config
    file used by the third party executable for configuration. The parsing
    of these parameters is performed separately by a template file used as
    an input to Jinja. This method solely identifies the necessary objects
    and passes them all along. Refer to the template files and pydantic
    models for more information on how these parameters are defined and
    identified.
    """
    super()._pre_run()
    full_schema: Dict[str, Any] = self._task_parameters.schema()
    short_flags_use_eq: bool
    long_flags_use_eq: bool
    if hasattr(self._task_parameters.Config, "short_flags_use_eq"):
        short_flags_use_eq = self._task_parameters.Config.short_flags_use_eq
    else:
        short_flags_use_eq = False

    if hasattr(self._task_parameters.Config, "long_flags_use_eq"):
        long_flags_use_eq = self._task_parameters.Config.long_flags_use_eq
    else:
        long_flags_use_eq = False
    for param, value in self._task_parameters.dict().items():
        # Clunky test with __dict__[param] because compound model-types are
        # converted to `dict`. E.g. type(value) = dict not AnalysisHeader
        if (
            param in ("executable", "_result_from_params")
            or value is None  # Cannot have empty values in argument list for execvp
            or value == ""  # But do want to include, e.g. 0
            or isinstance(self._task_parameters.__dict__[param], TemplateConfig)
            or isinstance(self._task_parameters.__dict__[param], AnalysisHeader)
        ):
            continue
        if isinstance(self._task_parameters.__dict__[param], TemplateParameters):
            # TemplateParameters objects have a single parameter `params`
            self._add_to_jinja_context(param_name=param, value=value.params)
            continue

        param_attributes: Dict[str, Any] = full_schema["properties"][param]
        # Some model params do not match the commnad-line parameter names
        param_repr: str
        if "rename_param" in param_attributes:
            param_repr = param_attributes["rename_param"]
        else:
            param_repr = param
        if "flag_type" in param_attributes:
            flag: str = param_attributes["flag_type"]
            if flag:
                # "-" or "--" flags
                if flag == "--" and isinstance(value, bool) and not value:
                    continue
                constructed_flag: str = f"{flag}{param_repr}"
                if flag == "--" and isinstance(value, bool) and value:
                    # On/off flag, e.g. something like --verbose: No Arg
                    self._args_list.append(f"{constructed_flag}")
                    continue
                if (flag == "-" and short_flags_use_eq) or (
                    flag == "--" and long_flags_use_eq
                ):  # Must come after above check! Otherwise you get --param=True
                    # Flags following --param=value or -param=value
                    constructed_flag = f"{constructed_flag}={value}"
                    self._args_list.append(f"{constructed_flag}")
                    continue
                self._args_list.append(f"{constructed_flag}")
        else:
            warnings.warn(
                (
                    f"Model parameters should be defined using Field(...,flag_type='')"
                    f" in the future.  Parameter: {param}"
                ),
                category=PendingDeprecationWarning,
            )
            if len(param) == 1:  # Single-dash flags
                if short_flags_use_eq:
                    self._args_list.append(f"-{param_repr}={value}")
                    continue
                self._args_list.append(f"-{param_repr}")
            elif "p_arg" in param:  # Positional arguments
                pass
            else:  # Double-dash flags
                if isinstance(value, bool) and not value:
                    continue
                if long_flags_use_eq:
                    self._args_list.append(f"--{param_repr}={value}")
                    continue
                self._args_list.append(f"--{param_repr}")
                if isinstance(value, bool) and value:
                    continue
        if isinstance(value, str) and " " in value:
            for val in value.split():
                self._args_list.append(f"{val}")
        else:
            self._args_list.append(f"{value}")
    if (
        hasattr(self._task_parameters, "lute_template_cfg")
        and self._template_context
    ):
        self._template_to_config_file()

_run()

Execute the new program by replacing the current process.

Source code in lute/tasks/task.py
416
417
418
419
420
421
422
423
424
def _run(self) -> None:
    """Execute the new program by replacing the current process."""
    if __debug__:
        time.sleep(0.1)
        msg: Message = Message(contents=self._formatted_command())
        self._report_to_executor(msg)
    LUTE_DEBUG_EXIT("LUTE_DEBUG_BEFORE_TPP_EXEC")
    self._setup_env()
    os.execvp(file=self._cmd, args=self._args_list)

_signal_start()

Override start signal method to switch communication methods.

Source code in lute/tasks/task.py
431
432
433
434
435
436
437
def _signal_start(self) -> None:
    """Override start signal method to switch communication methods."""
    super()._signal_start()
    time.sleep(0.05)
    signal: str = "NO_PICKLE_MODE"
    msg: Message = Message(signal=signal)
    self._report_to_executor(msg)

_template_to_config_file()

Convert a template file into a valid configuration file.

Uses Jinja to fill in a provided template file with variables supplied through the LUTE config file. This facilitates parameter modification for third party tasks which use a separate configuration, in addition to, or instead of, command-line arguments.

Source code in lute/tasks/task.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def _template_to_config_file(self) -> None:
    """Convert a template file into a valid configuration file.

    Uses Jinja to fill in a provided template file with variables supplied
    through the LUTE config file. This facilitates parameter modification
    for third party tasks which use a separate configuration, in addition
    to, or instead of, command-line arguments.
    """
    from jinja2 import Environment, FileSystemLoader, Template

    if not hasattr(self._task_parameters, "lute_template_cfg"):
        raise RuntimeError("Missing lute_template_cfg! Cannot compile template!")

    out_file: str = self._task_parameters.lute_template_cfg.output_path
    template_name: str = self._task_parameters.lute_template_cfg.template_name

    lute_path: Optional[str] = os.getenv("LUTE_PATH")
    template_dir: str
    if lute_path is None:
        warnings.warn(
            "LUTE_PATH is None in Task process! Using relative path for templates!",
            category=UserWarning,
        )
        template_dir = "../../config/templates"
    else:
        template_dir = f"{lute_path}/config/templates"
    environment: Environment = Environment(loader=FileSystemLoader(template_dir))
    template: Template = environment.get_template(template_name)

    with open(out_file, "w", encoding="utf-8") as cfg_out:
        cfg_out.write(template.render(self._template_context))