executor

Base classes and functions for handling Task execution.

Executors run a Task as a subprocess and handle all communication with other services, e.g., the eLog. They accept specific handlers to override default stream parsing.

Event handlers/hooks are implemented as standalone functions which can be added to an Executor.
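
For illustration, a hook is an ordinary function. The sketch below is hypothetical: the signature mirrors the commented-out Hook annotation shown in add_hook further down, and the attribute access follows what the Executor source itself uses.

import subprocess
from typing import Optional

def announce_done(
    executor: "Executor",
    msg: "Message",
    proc: Optional[subprocess.Popen] = None,
) -> Optional[bool]:
    # Hypothetical handler: log the Task name when the event fires.
    print(f"Finished: {executor._analysis_desc.task_result.task_name}")
    return None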

Classes:

Name Description
BaseExecutor

Abstract base class from which all Executors are derived.

Executor

Default Executor implementing all basic functionality and IPC.

MPIExecutor

Runs exactly like the Executor but submits the Task using MPI.

Exceptions

BaseExecutor

Bases: ABC

ABC to manage Task execution and communication with user services.

When running in a workflow, "tasks" (not the class instances) are submitted as Executors. The Executor manages environment setup, the actual Task submission, and communication regarding Task results and status with third party services like the eLog.

Attributes:

Methods:

Name Description
add_hook

Create a new hook to be called each time a specific event occurs.

add_default_hooks

Populate the event hooks with the default functions.

update_environment

Update the environment that is passed to the Task subprocess.

execute_task

Run the task as a subprocess.

Source code in lute/execution/executor.py
class BaseExecutor(ABC):
    """ABC to manage Task execution and communication with user services.

    When running in a workflow, "tasks" (not the class instances) are submitted
    as `Executors`. The Executor manages environment setup, the actual Task
    submission, and communication regarding Task results and status with third
    party services like the eLog.

    Attributes:

    Methods:
        add_hook(event: str, hook: Callable[[None], None]) -> None: Create a
            new hook to be called each time a specific event occurs.

        add_default_hooks() -> None: Populate the event hooks with the default
            functions.

        update_environment(env: Dict[str, str], update_path: str): Update the
            environment that is passed to the Task subprocess.

        execute_task(): Run the task as a subprocess.
    """

    Hooks: Type[ExecutorHooks] = ExecutorHooks

    def __init__(
        self,
        task_name: str,
        communicators: List[Communicator],
        poll_interval: float = 0.05,
    ) -> None:
        """The Executor will manage the subprocess in which `task_name` is run.

        Args:
            task_name (str): The name of the Task to be submitted. Must match
                the Task's class name exactly. The parameter specification must
                also be in a properly named model to be identified.

            communicators (List[Communicator]): A list of one or more
                communicators which manage information flow to/from the Task.
                Subclasses may have different defaults, and new functionality
                can be introduced by composing Executors with communicators.

            poll_interval (float): Time to wait between reading/writing to the
                managed subprocess. In seconds.
        """
        result: TaskResult = TaskResult(
            task_name=task_name, task_status=TaskStatus.PENDING, summary="", payload=""
        )
        task_parameters: Optional[TaskParameters] = None
        task_env: Dict[str, str] = os.environ.copy()
        self._communicators: List[Communicator] = communicators
        communicator_desc: List[str] = []
        for comm in self._communicators:
            comm.stage_communicator()
            communicator_desc.append(str(comm))

        self._analysis_desc: DescribedAnalysis = DescribedAnalysis(
            task_result=result,
            task_parameters=task_parameters,
            task_env=task_env,
            poll_interval=poll_interval,
            communicator_desc=communicator_desc,
        )
        self._tasklets: TaskletDict = {"before": None, "after": None}
        self._shell_source_script: Optional[str] = None

    def add_tasklet(
        self,
        tasklet: Callable,
        args: List[Any],
        when: Union[Literal["before"], Literal["after"]] = "after",
        set_result: bool = False,
        set_summary: bool = False,
    ) -> None:
        """Add/register a tasklet to be run by the Executor.

        Adds a tasklet to be run by the Executor in addition to the main Task.
        The tasklet can be run before or after the main Task has been run.

        Args:
            tasklet (Callable[[Any], Any]): The tasklet (function) to run.

            args (List[Any]): A list of all the arguments to be passed to the
                tasklet. Arguments can include substitutions for parameters to
                be extracted from the TaskParameters object. The same jinja-like
                syntax used in configuration file substitutions is used to specify
                a parameter substitution in an argument. E.g. if a Task to be
                run has a parameter `input_file`, the parameter can be substituted
                in the tasklet argument using: `"{{ input_file  }}"`. Note that
                substitutions must be passed as strings. Conversions will be done
                during substitution if required.

            when (str): When to run the tasklet. Either `before` or `after` the
                main Task. Default is after.

            set_result (bool): Whether to use the output from the tasklet as the
                result of the main Task. Default is False.

            set_summary (bool): Whether to use the output from the tasklet as the
                summary of the main Task. Default is False.
        """
        if when not in ("before", "after"):
            logger.error("Can only run tasklet `before` or `after` Task! Ignoring...")
            return
        tasklet_tuple: Tuple[Callable[[Any], Any], List[Any], bool, bool]
        tasklet_tuple = (tasklet, args, set_result, set_summary)
        if self._tasklets[when] is None:
            self._tasklets[when] = [tasklet_tuple]
        else:
            assert isinstance(self._tasklets[when], list)
            cast(list, self._tasklets[when]).append(tasklet_tuple)

    def _sub_tasklet_parameters(self, args: List[Any]) -> List[Any]:
        """Substitute tasklet arguments using TaskParameters members."""
        sub_pattern = r"\{\{[^}{]*\}\}"
        new_args: List[Any] = []
        for arg in args:
            new_arg: Any = arg
            matches: List[str] = re.findall(sub_pattern, arg)
            for m in matches:
                param_name: str = m[2:-2].strip()  # Remove {{}}
                params: Any = self._analysis_desc.task_parameters
                if "." in param_name:
                    # Iterate so we can substitute e.g. {{ lute_config.run }}
                    hier_param_list: List[str] = param_name.split(".")
                    for idx, param in enumerate(hier_param_list):
                        if hasattr(params, param):
                            if idx != len(hier_param_list) - 1:
                                params = getattr(params, param)
                            param_name = param
                        else:
                            break
                if hasattr(params, param_name):
                    pattern: str = m.replace("{{", r"\{\{").replace("}}", r"\}\}")
                    sub: Any = getattr(params, param_name)
                    new_arg = re.sub(pattern, str(sub), new_arg)
                if new_arg.isnumeric():
                    new_arg = int(new_arg)
                else:
                    try:
                        new_arg = float(new_arg)
                    except ValueError:
                        pass
            new_args.append(new_arg)
        return new_args

    def _run_tasklets(
        self, *, when: Union[Literal["before"], Literal["after"]]
    ) -> None:
        """Run all tasklets of the specified kind."""
        if when not in self._tasklets.keys():
            logger.error(f"Ignore request to run tasklets of unknown kind: {when}")
            return
        if self._tasklets[when] is None:
            logger.debug(f"No tasklets to run {when}.")
            return
        for tasklet_spec in cast(list, self._tasklets[when]):
            tasklet: Callable[[Any], Any]
            args: List[Any]
            set_result: bool
            set_summary: bool
            tasklet, args, set_result, set_summary = tasklet_spec
            args = self._sub_tasklet_parameters(args)
            logger.debug(f"Running {tasklet} with {args}")
            output: Any
            try:
                output = tasklet(*args)  # Many don't return anything
            except Exception as err:
                logger.error(f"Tasklet failed! Error: {err}")
                output = None
            # We set result payloads or summaries now, but the processing is done
            # by process_results method called sometime after the last tasklet
            tmp: Any
            if set_result and output is not None:
                if isinstance(self._analysis_desc.task_result.payload, list):
                    # We have multiple payloads to process, append to list
                    self._analysis_desc.task_result.payload.append(output)
                elif self._analysis_desc.task_result.payload != "":
                    # We have one payload already, convert to list and append
                    tmp = self._analysis_desc.task_result.payload
                    self._analysis_desc.task_result.payload = []
                    self._analysis_desc.task_result.payload.append(tmp)
                    self._analysis_desc.task_result.payload.append(output)
                else:
                    # Payload == "" - i.e. hasn't been set
                    self._analysis_desc.task_result.payload = output
            if set_summary and output is not None:
                if isinstance(self._analysis_desc.task_result.summary, list):
                    # We have multiple summary objects to process, append to list
                    self._analysis_desc.task_result.summary.append(output)
                elif self._analysis_desc.task_result.summary != "":
                    # We have one summary already, convert to list and append
                    tmp = self._analysis_desc.task_result.summary
                    self._analysis_desc.task_result.summary = []
                    self._analysis_desc.task_result.summary.append(tmp)
                    self._analysis_desc.task_result.summary.append(output)
                else:
                    # Summary == "" - i.e. hasn't been set
                    self._analysis_desc.task_result.summary = output

    def add_hook(
        self,
        event: str,
        # hook: Callable[[Self, Message, Optional[subprocess.Popen]], Optional[bool]],
        hook: Hook,
    ) -> None:
        """Add a new hook.

        Each hook is a function called any time the Executor receives a signal
        for a particular event, e.g. Task starts, Task ends, etc. Calling this
        method will remove any hook that currently exists for the event. I.e.
        only one hook can be called per event at a time. Creating hooks for
        events which do not exist is not allowed.

        Args:
            event (str): The event for which the hook will be called.

            hook (Callable[[None], None]): The function to be called during each
                occurrence of the event.
        """
        if event.upper() in LUTE_SIGNALS:
            setattr(self.Hooks, event.lower(), hook)

    @abstractmethod
    def add_default_hooks(self) -> None:
        """Populate the set of default event hooks."""

        ...

    def update_environment(
        self, env: Dict[str, str], update_path: str = "prepend"
    ) -> None:
        """Update the stored set of environment variables.

        These are passed to the subprocess to setup its environment.

        Args:
            env (Dict[str, str]): A dictionary of "VAR":"VALUE" pairs of
                environment variables to be added to the subprocess environment.
                If any variables already exist, the new variables will
                overwrite them (except PATH, see below).

            update_path (str): If PATH is present in the new set of variables,
                this argument determines how the old PATH is dealt with. There
                are three options:
                * "prepend" : The new PATH values are prepended to the old ones.
                * "append" : The new PATH values are appended to the old ones.
                * "overwrite" : The old PATH is overwritten by the new one.
                "prepend" is the default option. If PATH is not present in the
                current environment, the new PATH is used without modification.
        """
        if "PATH" in env:
            sep: str = os.pathsep
            if update_path == "prepend":
                env["PATH"] = (
                    f"{env['PATH']}{sep}{self._analysis_desc.task_env['PATH']}"
                )
            elif update_path == "append":
                env["PATH"] = (
                    f"{self._analysis_desc.task_env['PATH']}{sep}{env['PATH']}"
                )
            elif update_path == "overwrite":
                pass
            else:
                raise ValueError(
                    (
                        f"{update_path} is not a valid option for `update_path`!"
                        " Options are: prepend, append, overwrite."
                    )
                )
        os.environ.update(env)
        self._analysis_desc.task_env.update(env)

    def shell_source(self, env: str) -> None:
        """Source a script.

        Unlike `update_environment` this method sources a new file.

        We prepend a token to each environment variable. This allows the initial
        part of the Task to be run using the appropriate environment.

        The environment variables containing the token will be swapped in using
        their appropriate form prior to the actual execution of Task code.

        Args:
            env (str): Path to the script to source.
        """
        self._shell_source_script = env

    def _shell_source(self) -> None:
        """Actually shell source step.

        This is run prior to Task execution.
        """
        if self._shell_source_script is None:
            logger.error("Called _shell_source without defining source script!")
            return
        if not os.path.exists(self._shell_source_script):
            logger.error(f"Cannot source environment from {self._shell_source_script}!")
            return

        script: str = (
            f"set -a\n"
            f'source "{self._shell_source_script}" >/dev/null\n'
            f'{sys.executable} -c "import os; print(dict(os.environ))"\n'
        )
        logger.info(f"Sourcing file {self._shell_source_script}")
        o, e = subprocess.Popen(
            ["bash", "-c", script], stdout=subprocess.PIPE
        ).communicate()
        tmp_environment: Dict[str, str] = eval(o)
        new_environment: Dict[str, str] = {}
        for key, value in tmp_environment.items():
            # Make sure LUTE vars are available
            if "LUTE_" in key or key in ("RUN", "EXPERIMENT"):
                new_environment[key] = value
            else:
                new_environment[f"LUTE_TENV_{key}"] = value
        self._analysis_desc.task_env = new_environment

    def _pre_task(self) -> None:
        """Any actions to be performed before task submission.

        This method may or may not be used by subclasses. It may be useful
        for logging etc.
        """
        # This prevents the Executors in managed_tasks.py from all acquiring
        # resources like sockets.
        for communicator in self._communicators:
            communicator.delayed_setup()
            # Not great, but experience shows we need a bit of time to setup
            # network.
            time.sleep(0.1)
        # Propagate any env vars setup by Communicators - only update LUTE_ vars
        tmp: Dict[str, str] = {
            key: os.environ[key] for key in os.environ if "LUTE_" in key
        }
        self._analysis_desc.task_env.update(tmp)

    def _submit_task(self, cmd: str) -> subprocess.Popen:
        proc: subprocess.Popen = subprocess.Popen(
            cmd.split(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._analysis_desc.task_env,
        )
        if proc.stdout is not None:
            os.set_blocking(proc.stdout.fileno(), False)
        if proc.stderr is not None:
            os.set_blocking(proc.stderr.fileno(), False)
        return proc

    @abstractmethod
    def _task_loop(self, proc: subprocess.Popen) -> None:
        """Actions to perform while the Task is running.

        This function is run in the body of a loop until the Task signals
        that it's finished.
        """
        ...

    @abstractmethod
    def _finalize_task(self, proc: subprocess.Popen) -> None:
        """Any actions to be performed after the Task has ended.

        Examples include a final clearing of the pipes, retrieving results,
        reporting to third party services, etc.
        """
        ...

    def _submit_cmd(self, executable_path: str, params: str) -> str:
        """Return a formatted command for launching Task subprocess.

        May be overridden by subclasses.

        The default submission uses the Executor environment. This ensures that
        all necessary packages (e.g. Pydantic for validation) are available to
        the startup scripts. If a Task has a different environment it will be
        swapped prior to execution.

        Args:
            executable_path (str): Path to the LUTE subprocess script.

            params (str): String of formatted command-line arguments.

        Returns:
            cmd (str): Appropriately formatted command for this Executor.
        """
        cmd: str = ""
        if __debug__:
            cmd = f"{sys.executable} -B {executable_path} {params}"
        else:
            cmd = f"{sys.executable} -OB {executable_path} {params}"

        return cmd

    def execute_task(self) -> None:
        """Run the requested Task as a subprocess."""
        self._pre_task()
        lute_path: Optional[str] = os.getenv("LUTE_PATH")
        if lute_path is None:
            logger.debug("Absolute path to subprocess_task.py not found.")
            lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
            self.update_environment({"LUTE_PATH": lute_path})
        executable_path: str = f"{lute_path}/subprocess_task.py"
        config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
        params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"

        if self._shell_source_script is not None:
            self._shell_source()
        cmd: str = self._submit_cmd(executable_path, params)
        proc: subprocess.Popen = self._submit_task(cmd)

        while self._task_is_running(proc):
            self._task_loop(proc)
            time.sleep(self._analysis_desc.poll_interval)

        if proc.stdout is not None:
            os.set_blocking(proc.stdout.fileno(), True)
        if proc.stderr is not None:
            os.set_blocking(proc.stderr.fileno(), True)

        self._finalize_task(proc)
        if proc.stdout is not None:
            proc.stdout.close()
        if proc.stderr is not None:
            proc.stderr.close()
        proc.wait()
        if ret := proc.returncode:
            logger.warning(f"Task failed with return code: {ret}")
            self._analysis_desc.task_result.task_status = TaskStatus.FAILED
            self.Hooks.task_failed(self, msg=Message())
        elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
            # Ret code is 0, no exception was thrown, task forgot to set status
            self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
            logger.debug("Task did not change from RUNNING status. Assume COMPLETED.")
            self.Hooks.task_done(self, msg=Message())
        if self._tasklets["after"] is not None:
            # Tasklets before results processing since they may create result
            self._run_tasklets(when="after")

        try:
            self.process_results()
        except Exception as err:
            logger.critical(
                f"Unable to process results! Downstream Tasks may fail! {err}"
            )

        try:
            self._store_configuration()
        except Exception as err:
            logger.critical(
                f"Unable to store configuration! Downstream Tasks may fail! {err}"
            )

        for comm in self._communicators:
            comm.clear_communicator()

        if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
            logger.info("Exiting after Task failure. Result recorded.")
            sys.exit(-1)

    def _store_configuration(self) -> None:
        """Store configuration and results in the LUTE database."""
        record_analysis_db(copy.deepcopy(self._analysis_desc))

    def _task_is_running(self, proc: subprocess.Popen) -> bool:
        """Whether a subprocess is running.

        Args:
            proc (subprocess.Popen): The subprocess to determine the run status
                of.

        Returns:
            bool: Is the subprocess task running.
        """
        # Add additional conditions - don't want to exit main loop
        # if only stopped
        task_status: TaskStatus = self._analysis_desc.task_result.task_status
        is_running: bool = task_status != TaskStatus.COMPLETED
        is_running &= task_status != TaskStatus.CANCELLED
        is_running &= task_status != TaskStatus.TIMEDOUT
        is_running &= task_status != TaskStatus.FAILED
        return proc.poll() is None and is_running

    def _stop(self, proc: subprocess.Popen) -> None:
        """Stop the Task subprocess."""
        os.kill(proc.pid, signal.SIGTSTP)
        self._analysis_desc.task_result.task_status = TaskStatus.STOPPED

    def _continue(self, proc: subprocess.Popen) -> None:
        """Resume a stopped Task subprocess."""
        os.kill(proc.pid, signal.SIGCONT)
        # status: str = psutil.Process(proc.pid).status()
        # max_tries: int = 10
        # while status != "running":
        #    max_tries -= 1
        #    os.kill(proc.pid, signal.SIGCONT)
        #    status = psutil.Process(proc.pid).status()
        #    if max_tries == 0:
        #        logger.error(
        #            "Cannot resume process from stopped/sleeping state! Exiting!"
        #        )
        #        os.kill(proc.pid, signal.SIGKILL)
        #        self._analysis_desc.task_result.task_status = TaskStatus.FAILED
        #        return None
        self._analysis_desc.task_result.task_status = TaskStatus.RUNNING

    def _set_result_from_parameters(self) -> None:
        """Use TaskParameters object to set TaskResult fields.

        A result may be defined in terms of specific parameters. This is most
        useful for ThirdPartyTasks which would not otherwise have an easy way of
        reporting what the TaskResult is. There are two options for specifying
        results from parameters:
            1. A single parameter (Field) of the model has an attribute
               `is_result`. This is a bool indicating that this parameter points
               to a result. E.g. a parameter `output` may set `is_result=True`.
            2. The `TaskParameters.Config` has a `result_from_params` attribute.
               This is an appropriate option if the result is determinable for
               the Task, but it is not easily defined by a single parameter. The
               TaskParameters.Config.result_from_params can be set by a custom
               validator, e.g. to combine the values of multiple parameters into
               a single result. E.g. an `out_dir` and `out_file` parameter used
               together specify the result. Currently only string specifiers are
               supported.

        A TaskParameters object specifies that it contains information about the
        result by setting a single config option:
                        TaskParameters.Config.set_result=True
        In general, this method should only be called when the above condition is
        met; however, there are minimal checks in it as well.
        """
        # This method shouldn't be called unless appropriate
        # But we will add extra guards here
        if self._analysis_desc.task_parameters is None:
            logger.debug(
                "Cannot set result from TaskParameters. TaskParameters is None!"
            )
            return
        if (
            not hasattr(self._analysis_desc.task_parameters.Config, "set_result")
            or not self._analysis_desc.task_parameters.Config.set_result
        ):
            logger.debug(
                "Cannot set result from TaskParameters. `set_result` not specified!"
            )
            return

        # First try to set from result_from_params (faster)
        if hasattr(self._analysis_desc.task_parameters, "_result_from_params"):
            result_from_params: str = (
                self._analysis_desc.task_parameters._result_from_params
            )
            logger.info(f"TaskResult specified as {result_from_params}.")
            self._analysis_desc.task_result.payload = result_from_params
            del self._analysis_desc.task_parameters._result_from_params
        else:
            # Iterate parameters to find the one that is the result
            schema: Dict[str, Any] = self._analysis_desc.task_parameters.schema()
            for param, value in self._analysis_desc.task_parameters.dict().items():
                if param == "_result_from_params":
                    continue
                param_attrs: Dict[str, Any]
                if isinstance(value, TemplateParameters):
                    # Extract TemplateParameters if needed
                    value = value.params
                    extra_models: List[str] = schema["definitions"].keys()
                    for model in extra_models:
                        if model in ("AnalysisHeader", "TemplateConfig"):
                            continue
                        if param in schema["definitions"][model]["properties"]:
                            param_attrs = schema["definitions"][model]["properties"][
                                param
                            ]
                            break
                    else:
                        if isinstance(
                            self._analysis_desc.task_parameters, ThirdPartyParameters
                        ):
                            param_attrs = self._analysis_desc.task_parameters._unknown_template_params[
                                param
                            ]
                        else:
                            raise ValueError(
                                f"No parameter schema for {param}. Check model!"
                            )
                else:
                    param_attrs = schema["properties"][param]
                if "is_result" in param_attrs:
                    is_result: bool = param_attrs["is_result"]
                    if isinstance(is_result, bool) and is_result:
                        logger.info(f"TaskResult specified as {value}.")
                        self._analysis_desc.task_result.payload = value
                    else:
                        logger.debug(
                            (
                                f"{param} specified as result! But specifier is of "
                                f"wrong type: {type(is_result)}!"
                            )
                        )
                    break  # We should only have 1 result-like parameter!

        # If we get this far and haven't changed the payload we should complain
        task_name: str
        if self._analysis_desc.task_result.payload == "":
            task_name = self._analysis_desc.task_result.task_name
            logger.debug(
                (
                    f"{task_name} specified result be set from {task_name}Parameters,"
                    " but no result provided! Check model definition!"
                )
            )
        # Now check for impl_schemas and pass to result.impl_schemas
        # Currently unused
        impl_schemas: Optional[str] = (
            self._analysis_desc.task_parameters.Config.impl_schemas
        )
        self._analysis_desc.task_result.impl_schemas = impl_schemas
        # If we set_result but didn't get schema information we should complain
        if self._analysis_desc.task_result.impl_schemas is None:
            task_name = self._analysis_desc.task_result.task_name
            logger.debug(
                (
                    f"{task_name} specified result be set from {task_name}Parameters,"
                    " but no schema provided! Check model definition!"
                )
            )

    def process_results(self) -> None:
        """Perform any necessary steps to process TaskResults object.

        Processing will depend on subclass. Examples of steps include, moving
        files, converting file formats, compiling plots/figures into an HTML
        file, etc.
        """
        self._process_results()

    @abstractmethod
    def _process_results(self) -> None: ...
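
As a usage sketch of the environment-handling methods above, on a hypothetical concrete Executor instance with illustrative paths and variable names (not part of the source):

# With the default update_path="prepend", new PATH entries take precedence.
executor.update_environment(
    {"PATH": "/sw/mytool/bin", "MY_TOOL_OPTS": "--fast"},
    update_path="prepend",
)
# Or defer environment setup to a script sourced just before execution:
executor.shell_source("/path/to/env_setup.sh")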

__init__(task_name, communicators, poll_interval=0.05)

The Executor will manage the subprocess in which task_name is run.

Parameters:

Name Type Description Default
task_name str

The name of the Task to be submitted. Must match the Task's class name exactly. The parameter specification must also be in a properly named model to be identified.

required
communicators List[Communicator]

A list of one or more communicators which manage information flow to/from the Task. Subclasses may have different defaults, and new functionality can be introduced by composing Executors with communicators.

required
poll_interval float

Time to wait between reading/writing to the managed subprocess. In seconds.

0.05
Source code in lute/execution/executor.py
def __init__(
    self,
    task_name: str,
    communicators: List[Communicator],
    poll_interval: float = 0.05,
) -> None:
    """The Executor will manage the subprocess in which `task_name` is run.

    Args:
        task_name (str): The name of the Task to be submitted. Must match
            the Task's class name exactly. The parameter specification must
            also be in a properly named model to be identified.

        communicators (List[Communicator]): A list of one or more
            communicators which manage information flow to/from the Task.
            Subclasses may have different defaults, and new functionality
            can be introduced by composing Executors with communicators.

        poll_interval (float): Time to wait between reading/writing to the
            managed subprocess. In seconds.
    """
    result: TaskResult = TaskResult(
        task_name=task_name, task_status=TaskStatus.PENDING, summary="", payload=""
    )
    task_parameters: Optional[TaskParameters] = None
    task_env: Dict[str, str] = os.environ.copy()
    self._communicators: List[Communicator] = communicators
    communicator_desc: List[str] = []
    for comm in self._communicators:
        comm.stage_communicator()
        communicator_desc.append(str(comm))

    self._analysis_desc: DescribedAnalysis = DescribedAnalysis(
        task_result=result,
        task_parameters=task_parameters,
        task_env=task_env,
        poll_interval=poll_interval,
        communicator_desc=communicator_desc,
    )
    self._tasklets: TaskletDict = {"before": None, "after": None}
    self._shell_source_script: Optional[str] = None

_continue(proc)

Resume a stopped Task subprocess.

Source code in lute/execution/executor.py
def _continue(self, proc: subprocess.Popen) -> None:
    """Resume a stopped Task subprocess."""
    os.kill(proc.pid, signal.SIGCONT)
    # status: str = psutil.Process(proc.pid).status()
    # max_tries: int = 10
    # while status != "running":
    #    max_tries -= 1
    #    os.kill(proc.pid, signal.SIGCONT)
    #    status = psutil.Process(proc.pid).status()
    #    if max_tries == 0:
    #        logger.error(
    #            "Cannot resume process from stopped/sleeping state! Exiting!"
    #        )
    #        os.kill(proc.pid, signal.SIGKILL)
    #        self._analysis_desc.task_result.task_status = TaskStatus.FAILED
    #        return None
    self._analysis_desc.task_result.task_status = TaskStatus.RUNNING

_finalize_task(proc) abstractmethod

Any actions to be performed after the Task has ended.

Examples include a final clearing of the pipes, retrieving results, reporting to third party services, etc.

Source code in lute/execution/executor.py
@abstractmethod
def _finalize_task(self, proc: subprocess.Popen) -> None:
    """Any actions to be performed after the Task has ended.

    Examples include a final clearing of the pipes, retrieving results,
    reporting to third party services, etc.
    """
    ...

_pre_task()

Any actions to be performed before task submission.

This method may or may not be used by subclasses. It may be useful for logging etc.

Source code in lute/execution/executor.py
def _pre_task(self) -> None:
    """Any actions to be performed before task submission.

    This method may or may not be used by subclasses. It may be useful
    for logging etc.
    """
    # This prevents the Executors in managed_tasks.py from all acquiring
    # resources like sockets.
    for communicator in self._communicators:
        communicator.delayed_setup()
        # Not great, but experience shows we need a bit of time to setup
        # network.
        time.sleep(0.1)
    # Propagate any env vars setup by Communicators - only update LUTE_ vars
    tmp: Dict[str, str] = {
        key: os.environ[key] for key in os.environ if "LUTE_" in key
    }
    self._analysis_desc.task_env.update(tmp)

_run_tasklets(*, when)

Run all tasklets of the specified kind.

Source code in lute/execution/executor.py
def _run_tasklets(
    self, *, when: Union[Literal["before"], Literal["after"]]
) -> None:
    """Run all tasklets of the specified kind."""
    if when not in self._tasklets.keys():
        logger.error(f"Ignore request to run tasklets of unknown kind: {when}")
        return
    if self._tasklets[when] is None:
        logger.debug(f"No tasklets to run {when}.")
        return
    for tasklet_spec in cast(list, self._tasklets[when]):
        tasklet: Callable[[Any], Any]
        args: List[Any]
        set_result: bool
        set_summary: bool
        tasklet, args, set_result, set_summary = tasklet_spec
        args = self._sub_tasklet_parameters(args)
        logger.debug(f"Running {tasklet} with {args}")
        output: Any
        try:
            output = tasklet(*args)  # Many don't return anything
        except Exception as err:
            logger.error(f"Tasklet failed! Error: {err}")
            output = None
        # We set result payloads or summaries now, but the processing is done
        # by process_results method called sometime after the last tasklet
        tmp: Any
        if set_result and output is not None:
            if isinstance(self._analysis_desc.task_result.payload, list):
                # We have multiple payloads to process, append to list
                self._analysis_desc.task_result.payload.append(output)
            elif self._analysis_desc.task_result.payload != "":
                # We have one payload already, convert to list and append
                tmp = self._analysis_desc.task_result.payload
                self._analysis_desc.task_result.payload = []
                self._analysis_desc.task_result.payload.append(tmp)
                self._analysis_desc.task_result.payload.append(output)
            else:
                # Payload == "" - i.e. hasn't been set
                self._analysis_desc.task_result.payload = output
        if set_summary and output is not None:
            if isinstance(self._analysis_desc.task_result.summary, list):
                # We have multiple summary objects to process, append to list
                self._analysis_desc.task_result.summary.append(output)
            elif self._analysis_desc.task_result.summary != "":
                # We have one summary already, convert to list and append
                tmp = self._analysis_desc.task_result.summary
                self._analysis_desc.task_result.summary = []
                self._analysis_desc.task_result.summary.append(tmp)
                self._analysis_desc.task_result.summary.append(output)
            else:
                # Summary == "" - i.e. hasn't been set
                self._analysis_desc.task_result.summary = output

_set_result_from_parameters()

Use TaskParameters object to set TaskResult fields.

A result may be defined in terms of specific parameters. This is most useful for ThirdPartyTasks which would not otherwise have an easy way of reporting what the TaskResult is. There are two options for specifying results from parameters:

1. A single parameter (Field) of the model has an attribute is_result. This is a bool indicating that this parameter points to a result. E.g. a parameter output may set is_result=True.
2. The TaskParameters.Config has a result_from_params attribute. This is an appropriate option if the result is determinable for the Task, but it is not easily defined by a single parameter. TaskParameters.Config.result_from_params can be set by a custom validator, e.g. to combine the values of multiple parameters into a single result. E.g. an out_dir and out_file parameter used together specify the result. Currently only string specifiers are supported.

A TaskParameters object specifies that it contains information about the result by setting a single config option: TaskParameters.Config.set_result=True. In general, this method should only be called when the above condition is met; however, there are minimal checks in it as well.

Source code in lute/execution/executor.py
def _set_result_from_parameters(self) -> None:
    """Use TaskParameters object to set TaskResult fields.

    A result may be defined in terms of specific parameters. This is most
    useful for ThirdPartyTasks which would not otherwise have an easy way of
    reporting what the TaskResult is. There are two options for specifying
    results from parameters:
        1. A single parameter (Field) of the model has an attribute
           `is_result`. This is a bool indicating that this parameter points
           to a result. E.g. a parameter `output` may set `is_result=True`.
        2. The `TaskParameters.Config` has a `result_from_params` attribute.
           This is an appropriate option if the result is determinable for
           the Task, but it is not easily defined by a single parameter. The
           TaskParameters.Config.result_from_params can be set by a custom
           validator, e.g. to combine the values of multiple parameters into
           a single result. E.g. an `out_dir` and `out_file` parameter used
           together specify the result. Currently only string specifiers are
           supported.

    A TaskParameters object specifies that it contains information about the
    result by setting a single config option:
                    TaskParameters.Config.set_result=True
    In general, this method should only be called when the above condition is
    met; however, there are minimal checks in it as well.
    """
    # This method shouldn't be called unless appropriate
    # But we will add extra guards here
    if self._analysis_desc.task_parameters is None:
        logger.debug(
            "Cannot set result from TaskParameters. TaskParameters is None!"
        )
        return
    if (
        not hasattr(self._analysis_desc.task_parameters.Config, "set_result")
        or not self._analysis_desc.task_parameters.Config.set_result
    ):
        logger.debug(
            "Cannot set result from TaskParameters. `set_result` not specified!"
        )
        return

    # First try to set from result_from_params (faster)
    if hasattr(self._analysis_desc.task_parameters, "_result_from_params"):
        result_from_params: str = (
            self._analysis_desc.task_parameters._result_from_params
        )
        logger.info(f"TaskResult specified as {result_from_params}.")
        self._analysis_desc.task_result.payload = result_from_params
        del self._analysis_desc.task_parameters._result_from_params
    else:
        # Iterate parameters to find the one that is the result
        schema: Dict[str, Any] = self._analysis_desc.task_parameters.schema()
        for param, value in self._analysis_desc.task_parameters.dict().items():
            if param == "_result_from_params":
                continue
            param_attrs: Dict[str, Any]
            if isinstance(value, TemplateParameters):
                # Extract TemplateParameters if needed
                value = value.params
                extra_models: List[str] = schema["definitions"].keys()
                for model in extra_models:
                    if model in ("AnalysisHeader", "TemplateConfig"):
                        continue
                    if param in schema["definitions"][model]["properties"]:
                        param_attrs = schema["definitions"][model]["properties"][
                            param
                        ]
                        break
                else:
                    if isinstance(
                        self._analysis_desc.task_parameters, ThirdPartyParameters
                    ):
                        param_attrs = self._analysis_desc.task_parameters._unknown_template_params[
                            param
                        ]
                    else:
                        raise ValueError(
                            f"No parameter schema for {param}. Check model!"
                        )
            else:
                param_attrs = schema["properties"][param]
            if "is_result" in param_attrs:
                is_result: bool = param_attrs["is_result"]
                if isinstance(is_result, bool) and is_result:
                    logger.info(f"TaskResult specified as {value}.")
                    self._analysis_desc.task_result.payload = value
                else:
                    logger.debug(
                        (
                            f"{param} specified as result! But specifier is of "
                            f"wrong type: {type(is_result)}!"
                        )
                    )
                break  # We should only have 1 result-like parameter!

    # If we get this far and haven't changed the payload we should complain
    task_name: str
    if self._analysis_desc.task_result.payload == "":
        task_name = self._analysis_desc.task_result.task_name
        logger.debug(
            (
                f"{task_name} specified result be set from {task_name}Parameters,"
                " but no result provided! Check model definition!"
            )
        )
    # Now check for impl_schemas and pass to result.impl_schemas
    # Currently unused
    impl_schemas: Optional[str] = (
        self._analysis_desc.task_parameters.Config.impl_schemas
    )
    self._analysis_desc.task_result.impl_schemas = impl_schemas
    # If we set_result but didn't get schema information we should complain
    if self._analysis_desc.task_result.impl_schemas is None:
        task_name = self._analysis_desc.task_result.task_name
        logger.debug(
            (
                f"{task_name} specified result be set from {task_name}Parameters,"
                " but no schema provided! Check model definition!"
            )
        )
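
For illustration, option 1 of the docstring corresponds to a parameter model along these lines. This is a hedged sketch assuming the pydantic-style TaskParameters models used elsewhere in LUTE; the Task and field names are hypothetical:

from pydantic import Field

class MyTaskParameters(TaskParameters):  # hypothetical Task model
    class Config(TaskParameters.Config):
        set_result: bool = True  # opt in to result extraction

    # The extra `is_result` attribute lands in the model schema's
    # "properties", which is where the loop above looks for it.
    output: str = Field("", description="Path to the output file.", is_result=True)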

_shell_source()

Perform the actual shell sourcing step.

This is run prior to Task execution.

Source code in lute/execution/executor.py
def _shell_source(self) -> None:
    """Actually shell source step.

    This is run prior to Task execution.
    """
    if self._shell_source_script is None:
        logger.error("Called _shell_source without defining source script!")
        return
    if not os.path.exists(self._shell_source_script):
        logger.error(f"Cannot source environment from {self._shell_source_script}!")
        return

    script: str = (
        f"set -a\n"
        f'source "{self._shell_source_script}" >/dev/null\n'
        f'{sys.executable} -c "import os; print(dict(os.environ))"\n'
    )
    logger.info(f"Sourcing file {self._shell_source_script}")
    o, e = subprocess.Popen(
        ["bash", "-c", script], stdout=subprocess.PIPE
    ).communicate()
    tmp_environment: Dict[str, str] = eval(o)
    new_environment: Dict[str, str] = {}
    for key, value in tmp_environment.items():
        # Make sure LUTE vars are available
        if "LUTE_" in key or key in ("RUN", "EXPERIMENT"):
            new_environment[key] = value
        else:
            new_environment[f"LUTE_TENV_{key}"] = value
    self._analysis_desc.task_env = new_environment
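
The prefixing step can be illustrated on a toy environment (all values invented for illustration):

# Toy illustration of the LUTE_TENV_ prefixing performed above.
tmp_environment = {"LUTE_PATH": "/sw/lute", "RUN": "42", "PATH": "/usr/bin"}
new_environment = {}
for key, value in tmp_environment.items():
    if "LUTE_" in key or key in ("RUN", "EXPERIMENT"):
        new_environment[key] = value  # kept as-is
    else:
        new_environment[f"LUTE_TENV_{key}"] = value  # tokenized for later swap
# new_environment == {"LUTE_PATH": "/sw/lute", "RUN": "42",
#                     "LUTE_TENV_PATH": "/usr/bin"}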

_stop(proc)

Stop the Task subprocess.

Source code in lute/execution/executor.py
def _stop(self, proc: subprocess.Popen) -> None:
    """Stop the Task subprocess."""
    os.kill(proc.pid, signal.SIGTSTP)
    self._analysis_desc.task_result.task_status = TaskStatus.STOPPED

_store_configuration()

Store configuration and results in the LUTE database.

Source code in lute/execution/executor.py
def _store_configuration(self) -> None:
    """Store configuration and results in the LUTE database."""
    record_analysis_db(copy.deepcopy(self._analysis_desc))

_sub_tasklet_parameters(args)

Substitute tasklet arguments using TaskParameters members.

Source code in lute/execution/executor.py
def _sub_tasklet_parameters(self, args: List[Any]) -> List[Any]:
    """Substitute tasklet arguments using TaskParameters members."""
    sub_pattern = r"\{\{[^}{]*\}\}"
    new_args: List[Any] = []
    for arg in args:
        new_arg: Any = arg
        matches: List[str] = re.findall(sub_pattern, arg)
        for m in matches:
            param_name: str = m[2:-2].strip()  # Remove {{}}
            params: Any = self._analysis_desc.task_parameters
            if "." in param_name:
                # Iterate so we can substitute e.g. {{ lute_config.run }}
                hier_param_list: List[str] = param_name.split(".")
                for idx, param in enumerate(hier_param_list):
                    if hasattr(params, param):
                        if idx != len(hier_param_list) - 1:
                            params = getattr(params, param)
                        param_name = param
                    else:
                        break
            if hasattr(params, param_name):
                pattern: str = m.replace("{{", r"\{\{").replace("}}", r"\}\}")
                sub: Any = getattr(params, param_name)
                new_arg = re.sub(pattern, str(sub), new_arg)
            if new_arg.isnumeric():
                new_arg = int(new_arg)
            else:
                try:
                    new_arg = float(new_arg)
                except ValueError:
                    pass
        new_args.append(new_arg)
    return new_args
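
The {{ ... }} substitution can be exercised standalone; a small sketch with a plain dict standing in for the TaskParameters object (argument and parameter names hypothetical):

import re

sub_pattern = r"\{\{[^}{]*\}\}"
arg = "{{ input_file }}.summary"            # hypothetical tasklet argument
params = {"input_file": "/data/run42.h5"}   # stand-in for TaskParameters
for m in re.findall(sub_pattern, arg):
    name = m[2:-2].strip()                  # strip the {{ }} delimiters
    if name in params:
        pattern = m.replace("{{", r"\{\{").replace("}}", r"\}\}")
        arg = re.sub(pattern, str(params[name]), arg)
print(arg)  # -> /data/run42.h5.summary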

_submit_cmd(executable_path, params)

Return a formatted command for launching Task subprocess.

May be overridden by subclasses.

The default submission uses the Executor environment. This ensures that all necessary packages (e.g. Pydantic for validation) are available to the startup scripts. If a Task has a different environment it will be swapped prior to execution.

Parameters:

Name Type Description Default
executable_path str

Path to the LUTE subprocess script.

required
params str

String of formatted command-line arguments.

required

Returns:

Name Type Description
cmd str

Appropriately formatted command for this Executor.

Source code in lute/execution/executor.py
def _submit_cmd(self, executable_path: str, params: str) -> str:
    """Return a formatted command for launching Task subprocess.

    May be overridden by subclasses.

    The default submission uses the Executor environment. This ensures that
    all necessary packages (e.g. Pydantic for validation) are available to
    the startup scripts. If a Task has a different environment it will be
    swapped prior to execution.

    Args:
        executable_path (str): Path to the LUTE subprocess script.

        params (str): String of formatted command-line arguments.

    Returns:
        cmd (str): Appropriately formatted command for this Executor.
    """
    cmd: str = ""
    if __debug__:
        cmd = f"{sys.executable} -B {executable_path} {params}"
    else:
        cmd = f"{sys.executable} -OB {executable_path} {params}"

    return cmd
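
For a sense of the output, with illustrative paths and Task name (not taken from the source):

import sys

executable_path = "/sw/lute/subprocess_task.py"   # illustrative
params = "-c /path/to/config.yaml -t MyTask"      # illustrative
# __debug__ is True unless Python runs with -O, so the default branch gives:
cmd = f"{sys.executable} -B {executable_path} {params}"
# e.g. "/usr/bin/python3 -B /sw/lute/subprocess_task.py -c /path/to/config.yaml -t MyTask"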

_task_is_running(proc)

Whether a subprocess is running.

Parameters:

Name Type Description Default
proc Popen

The subprocess to determine the run status of.

required

Returns:

Name Type Description
bool bool

Is the subprocess task running.

Source code in lute/execution/executor.py
def _task_is_running(self, proc: subprocess.Popen) -> bool:
    """Whether a subprocess is running.

    Args:
        proc (subprocess.Popen): The subprocess to determine the run status
            of.

    Returns:
        bool: Is the subprocess task running.
    """
    # Add additional conditions - don't want to exit main loop
    # if only stopped
    task_status: TaskStatus = self._analysis_desc.task_result.task_status
    is_running: bool = task_status != TaskStatus.COMPLETED
    is_running &= task_status != TaskStatus.CANCELLED
    is_running &= task_status != TaskStatus.TIMEDOUT
    is_running &= task_status != TaskStatus.FAILED
    return proc.poll() is None and is_running

_task_loop(proc) abstractmethod

Actions to perform while the Task is running.

This function is run in the body of a loop until the Task signals that it's finished.

Source code in lute/execution/executor.py
@abstractmethod
def _task_loop(self, proc: subprocess.Popen) -> None:
    """Actions to perform while the Task is running.

    This function is run in the body of a loop until the Task signals
    that it's finished.
    """
    ...

add_default_hooks() abstractmethod

Populate the set of default event hooks.

Source code in lute/execution/executor.py
@abstractmethod
def add_default_hooks(self) -> None:
    """Populate the set of default event hooks."""

    ...

add_hook(event, hook)

Add a new hook.

Each hook is a function called any time the Executor receives a signal for a particular event, e.g. Task starts, Task ends, etc. Calling this method will remove any hook that currently exists for the event. I.e. only one hook can be called per event at a time. Creating hooks for events which do not exist is not allowed.

Parameters:

Name Type Description Default
event str

The event for which the hook will be called.

required
hook Hook

The function to be called during each occurrence of the event.

required
Source code in lute/execution/executor.py
def add_hook(
    self,
    event: str,
    # hook: Callable[[Self, Message, Optional[subprocess.Popen]], Optional[bool]],
    hook: Hook,
) -> None:
    """Add a new hook.

    Each hook is a function called any time the Executor receives a signal
    for a particular event, e.g. Task starts, Task ends, etc. Calling this
    method will remove any hook that currently exists for the event. I.e.
    only one hook can be called per event at a time. Creating hooks for
    events which do not exist is not allowed.

    Args:
        event (str): The event for which the hook will be called.

        hook (Callable[[None], None]): The function to be called during each
            occurrence of the event.
    """
    if event.upper() in LUTE_SIGNALS:
        setattr(self.Hooks, event.lower(), hook)
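
A registration sketch (hedged: it assumes TASK_DONE is among LUTE_SIGNALS, which the default task_done hook used in execute_task suggests; the handler itself is hypothetical):

def log_done(executor, msg, proc=None):
    # Hypothetical handler for the TASK_DONE event.
    print("Task finished.")

executor.add_hook("task_done", log_done)      # matched case-insensitively
executor.add_hook("not_an_event", log_done)   # silently ignored: not in LUTE_SIGNALS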

add_tasklet(tasklet, args, when='after', set_result=False, set_summary=False)

Add/register a tasklet to be run by the Executor.

Adds a tasklet to be run by the Executor in addition to the main Task. The tasklet can be run before or after the main Task has been run.

Parameters:

Name Type Description Default
tasklet Callable[[Any], Any]

The tasklet (function) to run.

required
args List[Any]

A list of all the arguments to be passed to the tasklet. Arguments can include substitutions for parameters to be extracted from the TaskParameters object. The same jinja-like syntax used in configuration file substitutions is used to specify a parameter substitution in an argument. E.g. if a Task to be run has a parameter input_file, the parameter can be substituted in the tasklet argument using: "{{ input_file }}". Note that substitutions must be passed as strings. Conversions will be done during substitution if required.

required
when str

When to run the tasklet. Either before or after the main Task. Default is after.

'after'
set_result bool

Whether to use the output from the tasklet as the result of the main Task. Default is False.

False
set_summary bool

Whether to use the output from the tasklet as the summary of the main Task. Default is False.

False
Source code in lute/execution/executor.py
def add_tasklet(
    self,
    tasklet: Callable,
    args: List[Any],
    when: Union[Literal["before"], Literal["after"]] = "after",
    set_result: bool = False,
    set_summary: bool = False,
) -> None:
    """Add/register a tasklet to be run by the Executor.

    Adds a tasklet to be run by the Executor in addition to the main Task.
    The tasklet can be run before or after the main Task has been run.

    Args:
        tasklet (Callable[[Any], Any]): The tasklet (function) to run.

        args (List[Any]): A list of all the arguments to be passed to the
            tasklet. Arguments can include substitutions for parameters to
            be extracted from the TaskParameters object. The same jinja-like
            syntax used in configuration file substitutions is used to specify
            a parameter substitution in an argument. E.g. if a Task to be
            run has a parameter `input_file`, the parameter can be substituted
            in the tasklet argument using: `"{{ input_file  }}"`. Note that
            substitutions must be passed as strings. Conversions will be done
            during substitution if required.

        when (str): When to run the tasklet. Either `before` or `after` the
            main Task. Default is after.

        set_result (bool): Whether to use the output from the tasklet as the
            result of the main Task. Default is False.

        set_summary (bool): Whether to use the output from the tasklet as the
            summary of the main Task. Default is False.
    """
    if when not in ("before", "after"):
        logger.error("Can only run tasklet `before` or `after` Task! Ignoring...")
        return
    tasklet_tuple: Tuple[Callable[[Any], Any], List[Any], bool, bool]
    tasklet_tuple = (tasklet, args, set_result, set_summary)
    if self._tasklets[when] is None:
        self._tasklets[when] = [tasklet_tuple]
    else:
        assert isinstance(self._tasklets[when], list)
        cast(list, self._tasklets[when]).append(tasklet_tuple)
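
A minimal sketch of registering a tasklet, assuming the managed Task defines an `input_file` parameter (the tasklet itself and the `executor` instance are hypothetical):

def count_lines(path: str) -> Dict[str, str]:
    # Hypothetical tasklet: summarize the Task's input file.
    with open(path, "r") as f:
        return {"n_lines": str(sum(1 for _ in f))}

executor.add_tasklet(
    count_lines,
    args=["{{ input_file }}"],  # substituted from the TaskParameters object
    when="after",
    set_summary=True,  # use the returned dictionary as the Task summary
)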

execute_task()

Run the requested Task as a subprocess.

Source code in lute/execution/executor.py
def execute_task(self) -> None:
    """Run the requested Task as a subprocess."""
    self._pre_task()
    lute_path: Optional[str] = os.getenv("LUTE_PATH")
    if lute_path is None:
        logger.debug("Absolute path to subprocess_task.py not found.")
        lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
        self.update_environment({"LUTE_PATH": lute_path})
    executable_path: str = f"{lute_path}/subprocess_task.py"
    config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
    params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"

    if self._shell_source_script is not None:
        self._shell_source()
    cmd: str = self._submit_cmd(executable_path, params)
    proc: subprocess.Popen = self._submit_task(cmd)

    while self._task_is_running(proc):
        self._task_loop(proc)
        time.sleep(self._analysis_desc.poll_interval)

    if proc.stdout is not None:
        os.set_blocking(proc.stdout.fileno(), True)
    if proc.stderr is not None:
        os.set_blocking(proc.stderr.fileno(), True)

    self._finalize_task(proc)
    if proc.stdout is not None:
        proc.stdout.close()
    if proc.stderr is not None:
        proc.stderr.close()
    proc.wait()
    if ret := proc.returncode:
        logger.warning(f"Task failed with return code: {ret}")
        self._analysis_desc.task_result.task_status = TaskStatus.FAILED
        self.Hooks.task_failed(self, msg=Message())
    elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
        # Ret code is 0, no exception was thrown, task forgot to set status
        self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
        logger.debug("Task did not change from RUNNING status. Assume COMPLETED.")
        self.Hooks.task_done(self, msg=Message())
    if self._tasklets["after"] is not None:
        # Tasklets before results processing since they may create result
        self._run_tasklets(when="after")

    try:
        self.process_results()
    except Exception as err:
        logger.critical(
            f"Unable to process results! Downstream Tasks may fail! {err}"
        )

    try:
        self._store_configuration()
    except Exception as err:
        logger.critical(
            f"Unable to store configuration! Downstream Tasks may fail! {err}"
        )

    for comm in self._communicators:
        comm.clear_communicator()

    if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
        logger.info("Exiting after Task failure. Result recorded.")
        sys.exit(-1)
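
End-to-end, usage reduces to constructing an Executor for a managed Task and calling this method. A sketch, where the Task name "Tester" is hypothetical and must exist in the configuration file pointed to by LUTE_CONFIGPATH:

executor: Executor = Executor("Tester")
executor.execute_task()  # blocks, polling the subprocess every poll_interval seconds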

process_results()

Perform any necessary steps to process TaskResults object.

Processing will depend on subclass. Examples of steps include moving files, converting file formats, compiling plots/figures into an HTML file, etc.

Source code in lute/execution/executor.py
def process_results(self) -> None:
    """Perform any necessary steps to process TaskResults object.

    Processing will depend on subclass. Examples of steps include moving
    files, converting file formats, compiling plots/figures into an HTML
    file, etc.
    """
    self._process_results()

shell_source(env)

Source a script.

Unlike update_environment, this method sources a new file.

We prepend a token to each environment variable. This allows the initial part of the Task to be run using the appropriate environment.

The environment variables containing the token will be swapped in using their appropriate form prior to the actual execution of Task code.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `env` | `str` | Path to the script to source. | required |
Source code in lute/execution/executor.py
def shell_source(self, env: str) -> None:
    """Source a script.

    Unlike `update_environment`, this method sources a new file.

    We prepend a token to each environment variable. This allows the initial
    part of the Task to be run using the appropriate environment.

    The environment variables containing the token will be swapped in using
    their appropriate form prior to the actual execution of Task code.

    Args:
        env (str): Path to the script to source.
    """
    self._shell_source_script = env
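
A sketch of typical use, with a hypothetical script path; note that the script is only recorded here, and is sourced at submission time by execute_task:

executor.shell_source("/path/to/env_setup.sh")  # hypothetical setup script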

update_environment(env, update_path='prepend')

Update the stored set of environment variables.

These are passed to the subprocess to setup its environment.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `env` | `Dict[str, str]` | A dictionary of "VAR":"VALUE" pairs of environment variables to be added to the subprocess environment. If any variables already exist, the new variables will overwrite them (except PATH, see below). | required |
| `update_path` | `str` | If PATH is present in the new set of variables, this argument determines how the old PATH is dealt with. There are three options: "prepend", the new PATH values are prepended to the old ones; "append", the new PATH values are appended to the old ones; "overwrite", the old PATH is overwritten by the new one. If PATH is not present in the current environment, the new PATH is used without modification. | `'prepend'` |
Source code in lute/execution/executor.py
def update_environment(
    self, env: Dict[str, str], update_path: str = "prepend"
) -> None:
    """Update the stored set of environment variables.

    These are passed to the subprocess to setup its environment.

    Args:
        env (Dict[str, str]): A dictionary of "VAR":"VALUE" pairs of
            environment variables to be added to the subprocess environment.
            If any variables already exist, the new variables will
            overwrite them (except PATH, see below).

        update_path (str): If PATH is present in the new set of variables,
            this argument determines how the old PATH is dealt with. There
            are three options:
            * "prepend" : The new PATH values are prepended to the old ones.
            * "append" : The new PATH values are appended to the old ones.
            * "overwrite" : The old PATH is overwritten by the new one.
            "prepend" is the default option. If PATH is not present in the
            current environment, the new PATH is used without modification.
    """
    if "PATH" in env:
        sep: str = os.pathsep
        if update_path == "prepend":
            env["PATH"] = (
                f"{env['PATH']}{sep}{self._analysis_desc.task_env['PATH']}"
            )
        elif update_path == "append":
            env["PATH"] = (
                f"{self._analysis_desc.task_env['PATH']}{sep}{env['PATH']}"
            )
        elif update_path == "overwrite":
            pass
        else:
            raise ValueError(
                (
                    f"{update_path} is not a valid option for `update_path`!"
                    " Options are: prepend, append, overwrite."
                )
            )
    os.environ.update(env)
    self._analysis_desc.task_env.update(env)
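
For example (variable names and values are illustrative):

executor.update_environment(
    {"PATH": "/opt/tools/bin", "OMP_NUM_THREADS": "4"},
    update_path="prepend",  # /opt/tools/bin is searched before the old PATH entries
)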

Executor

Bases: BaseExecutor

Basic implementation of an Executor which manages simple IPC with Task.

Attributes:

Methods:

| Name | Description |
|------|-------------|
| `add_hook` | Create a new hook to be called each time a specific event occurs. |
| `add_default_hooks` | Populate the event hooks with the default functions. |
| `update_environment` | Update the environment that is passed to the Task subprocess. |
| `execute_task` | Run the task as a subprocess. |

Source code in lute/execution/executor.py
class Executor(BaseExecutor):
    """Basic implementation of an Executor which manages simple IPC with Task.

    Attributes:

    Methods:
        add_hook(event: str, hook: Callable[[None], None]) -> None: Create a
            new hook to be called each time a specific event occurs.

        add_default_hooks() -> None: Populate the event hooks with the default
            functions.

        update_environment(env: Dict[str, str], update_path: str): Update the
            environment that is passed to the Task subprocess.

        execute_task(): Run the task as a subprocess.
    """

    def __init__(
        self,
        task_name: str,
        communicators: List[Communicator] = [
            PipeCommunicator(Party.EXECUTOR),
            SocketCommunicator(Party.EXECUTOR),
        ],
        poll_interval: float = 0.05,
    ) -> None:
        super().__init__(
            task_name=task_name,
            communicators=communicators,
            poll_interval=poll_interval,
        )
        self.add_default_hooks()

    def add_default_hooks(self) -> None:
        """Populate the set of default event hooks."""

        def no_pickle_mode(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            for idx, communicator in enumerate(executor._communicators):
                if isinstance(communicator, PipeCommunicator):
                    executor._communicators[idx] = PipeCommunicator(
                        Party.EXECUTOR, use_pickle=False
                    )
            return None

        self.add_hook("no_pickle_mode", no_pickle_mode)

        def task_started(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            assert proc is not None
            if isinstance(msg.contents, TaskParameters):
                executor._analysis_desc.task_parameters = msg.contents
                # Run "before" tasklets
                if executor._tasklets["before"] is not None:
                    executor._run_tasklets(when="before")
                # Need to continue since Task._signal_start raises SIGSTOP
                executor._continue(proc)
                if hasattr(
                    executor._analysis_desc.task_parameters.Config, "set_result"
                ):
                    # Tasks may mark a parameter as the result
                    # If so, setup the result now.
                    executor._set_result_from_parameters()
            logger.info(
                f"Executor: {executor._analysis_desc.task_result.task_name} started"
            )
            executor._analysis_desc.task_result.task_status = TaskStatus.RUNNING
            elog_data: Dict[str, str] = {
                f"{executor._analysis_desc.task_result.task_name} status": "RUNNING",
            }
            post_elog_run_status(elog_data)
            return None

        self.add_hook("task_started", task_started)

        def task_failed(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            elog_data: Dict[str, str] = {
                f"{executor._analysis_desc.task_result.task_name} status": "FAILED",
            }
            post_elog_run_status(elog_data)
            return None

        self.add_hook("task_failed", task_failed)

        def task_stopped(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            elog_data: Dict[str, str] = {
                f"{executor._analysis_desc.task_result.task_name} status": "STOPPED",
            }
            post_elog_run_status(elog_data)
            return None

        self.add_hook("task_stopped", task_stopped)

        def task_done(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            elog_data: Dict[str, str] = {
                f"{executor._analysis_desc.task_result.task_name} status": "COMPLETED",
            }
            post_elog_run_status(elog_data)
            return None

        self.add_hook("task_done", task_done)

        def task_cancelled(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            elog_data: Dict[str, str] = {
                f"{executor._analysis_desc.task_result.task_name} status": "CANCELLED",
            }
            post_elog_run_status(elog_data)
            return None

        self.add_hook("task_cancelled", task_cancelled)

        def task_result(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            if isinstance(msg.contents, TaskResult):
                executor._analysis_desc.task_result = msg.contents
                # flake8: noqa: E731
                is_printable_type: Callable[[Any], bool] = lambda x: isinstance(
                    x, dict
                ) or isinstance(x, str)
                if is_printable_type(executor._analysis_desc.task_result.summary):
                    logger.info(executor._analysis_desc.task_result.summary)
                elif isinstance(executor._analysis_desc.task_result.summary, list):
                    for item in executor._analysis_desc.task_result.summary:
                        if is_printable_type(item):
                            logger.info(item)

                logger.info(executor._analysis_desc.task_result.task_status)
            elog_data: Dict[str, str] = {
                f"{executor._analysis_desc.task_result.task_name} status": "COMPLETED",
            }
            post_elog_run_status(elog_data)

            return True

        self.add_hook("task_result", task_result)

        def task_log(
            executor: Executor_T,
            msg: Message,
            proc: Optional[subprocess.Popen] = None,
        ) -> Optional[bool]:
            if isinstance(msg.contents, str):
                # This should be log formatted already
                print(msg.contents)
                return True
            return False

        self.add_hook("task_log", task_log)

    def _task_loop(self, proc: subprocess.Popen) -> None:
        """Actions to perform while the Task is running.

        This function is run in the body of a loop until the Task signals
        that it is finished.
        """
        # Some hooks may ask that the rest of the task loop be skipped (continued)
        should_continue: Optional[bool]
        for communicator in self._communicators:
            while True:
                msg: Message = communicator.read(proc)
                if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS:
                    hook: Hook = getattr(self.Hooks, msg.signal.lower())
                    should_continue = hook(self, msg, proc)
                    if should_continue:
                        continue

                if msg.contents is not None:
                    if isinstance(msg.contents, str) and msg.contents != "":
                        logger.info(msg.contents)
                    elif not isinstance(msg.contents, str):
                        logger.info(msg.contents)
                if not communicator.has_messages:
                    break

    def _finalize_task(self, proc: subprocess.Popen) -> None:
        """Any actions to be performed after the Task has ended.

        Examples include a final clearing of the pipes, retrieving results,
        reporting to third party services, etc.
        """
        self._task_loop(proc)  # Perform a final read.

    def _process_results(self) -> None:
        """Performs result processing.

        Actions include:
        - For `ElogSummaryPlots`, will save the summary plot to the appropriate
            directory for display in the eLog.
        """
        task_result: TaskResult = self._analysis_desc.task_result
        self._process_result_payload(task_result.payload)
        self._process_result_summary(task_result.summary)

    def _process_result_payload(self, payload: Any) -> None:
        if self._analysis_desc.task_parameters is None:
            logger.error(
                "Please run Task before using this method! (_process_result_payload). "
                "If you did run a Task, it may have failed immediately!"
            )
            return
        new_payload: Optional[str]
        if isinstance(payload, ElogSummaryPlots):
            new_payload = self._process_elog_plot(payload)
            if new_payload is not None:
                self._analysis_desc.task_result.payload = new_payload
        elif isinstance(payload, list) or isinstance(payload, tuple):
            new_payload = ""
            for item in payload:
                if isinstance(item, ElogSummaryPlots):
                    ret: Optional[str] = self._process_elog_plot(item)
                    if ret is not None:
                        new_payload = ";".join(filter(None, (new_payload, ret)))
            if new_payload != "":
                self._analysis_desc.task_result.payload = new_payload
        elif isinstance(payload, str):
            ...
            # May be a path to a file...
            # schemas: Optional[str] = self._analysis_desc.task_result.impl_schemas
            # Should also check `impl_schemas` to determine what to do with path

    def _process_elog_plot(self, plots: ElogSummaryPlots) -> Optional[str]:
        """Process an ElogSummaryPlots

        Writes out the eLog summary plot for display and returns the path
        where the plots were written out so they can be stored as the result
        payload.

        ElogSummaryPlots objects already convert the plots to a byte stream
        which can be directly written to an HTML file.

        Args:
            plots (ElogSummaryPlots): The plots dataclass.

        Returns:
            path (str): Path the plots were written out to.
        """
        if self._analysis_desc.task_parameters is None:
            logger.error(
                "Please run Task before using this method! (_process_elog_plot). "
                "If you did run a Task, it may have failed immediately!"
            )
            return None
        # ElogSummaryPlots has figures and a display name
        # display name also serves as a path.
        expmt: str = self._analysis_desc.task_parameters.lute_config.experiment
        base_path: str = f"/sdf/data/lcls/ds/{expmt[:3]}/{expmt}/stats/summary"
        full_path: str = f"{base_path}/{plots.display_name}"
        if not os.path.isdir(full_path):
            os.makedirs(full_path)

        path: str = f"{full_path}/report.html"
        with open(f"{full_path}/report.html", "wb") as f:
            f.write(plots.figures)

        return path

    def _process_result_summary(self, summary: Any) -> None:
        """Process an object destined for the results summary.

        Args:
            summary (Any): The object to be set as a summary. If a dictionary
                it is assumed to be a set of key/value pairs to be written out
                as run parameters in the eLog. If a list each item is processed
                individually.
        """
        if self._analysis_desc.task_parameters is None:
            logger.error(
                "Please run Task before using this method! (_process_result_summary). "
                "If you did run a Task, it may have failed immediately!"
            )
            return
        if isinstance(summary, dict):
            # Assume dict is key: value pairs of eLog run parameters to post
            self._analysis_desc.task_result.summary = self._process_summary_run_params(
                summary
            )
        elif isinstance(summary, list) or isinstance(summary, tuple):
            new_summary_str: str = ""
            for item in summary:
                if isinstance(item, dict):
                    ret: str = self._process_summary_run_params(item)
                    new_summary_str = ";".join(filter(None, (new_summary_str, ret)))
                elif isinstance(item, ElogSummaryPlots):
                    plot_path: Optional[str] = self._process_elog_plot(item)
                    new_summary_str = ";".join(
                        filter(None, (new_summary_str, plot_path))
                    )
            self._analysis_desc.task_result.summary = new_summary_str
        elif isinstance(summary, str):
            ...
        else:
            ...

    def _process_summary_run_params(self, params: Dict[str, str]) -> str:
        """Process a dictionary of run parameters to be posted to the eLog.

        Args:
            params (Dict[str, str]): Key/value pairs to be posted as run parameters.

        Returns:
            summary_str (str): New string of key/value pairs to be stored in
                summary field of the database.
        """
        if self._analysis_desc.task_parameters is None:
            logger.error(
                "Please run Task before using this method! (_process_summary_run_params). "
                "If you did run a Task, it may have failed immediately!"
            )
            return ""
        exp: str = self._analysis_desc.task_parameters.lute_config.experiment
        run: int = int(self._analysis_desc.task_parameters.lute_config.run)
        logger.debug("Posting eLog run parameters.")
        try:
            post_elog_run_table(exp, run, params)
        except Exception as err:
            logger.error(f"Unable to post run parameters! Error: {err}")
        post_elog_run_status(params)
        summary_str: str = ";".join(f"{key}: {value}" for key, value in params.items())
        return summary_str
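
The constructor defaults above can be overridden; e.g., a sketch of an Executor restricted to pipe-based IPC with a slower polling rate (the Task name is hypothetical):

executor: Executor = Executor(
    "Tester",
    communicators=[PipeCommunicator(Party.EXECUTOR)],
    poll_interval=0.5,
)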

_finalize_task(proc)

Any actions to be performed after the Task has ended.

Examples include a final clearing of the pipes, retrieving results, reporting to third party services, etc.

Source code in lute/execution/executor.py
def _finalize_task(self, proc: subprocess.Popen) -> None:
    """Any actions to be performed after the Task has ended.

    Examples include a final clearing of the pipes, retrieving results,
    reporting to third party services, etc.
    """
    self._task_loop(proc)  # Perform a final read.

_process_elog_plot(plots)

Process an ElogSummaryPlots

Writes out the eLog summary plot for display and returns the path where the plots were written out so they can be stored as the result payload.

ElogSummaryPlots objects already convert the plots to a byte stream which can be directly written to an HTML file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `plots` | `ElogSummaryPlots` | The plots dataclass. | required |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `path` | `str` | Path the plots were written out to. |

Source code in lute/execution/executor.py
def _process_elog_plot(self, plots: ElogSummaryPlots) -> Optional[str]:
    """Process an ElogSummaryPlots

    Writes out the eLog summary plot for display and returns the path
    where the plots were written out so they can be stored as the result
    payload.

    ElogSummaryPlots objects already convert the plots to a byte stream
    which can be directly written to an HTML file.

    Args:
        plots (ElogSummaryPlots): The plots dataclass.

    Returns:
        path (str): Path the plots were written out to.
    """
    if self._analysis_desc.task_parameters is None:
        logger.error(
            "Please run Task before using this method! (_process_elog_plot). "
            "If you did run a Task, it may have failed immediately!"
        )
        return None
    # ElogSummaryPlots has figures and a display name
    # display name also serves as a path.
    expmt: str = self._analysis_desc.task_parameters.lute_config.experiment
    base_path: str = f"/sdf/data/lcls/ds/{expmt[:3]}/{expmt}/stats/summary"
    full_path: str = f"{base_path}/{plots.display_name}"
    if not os.path.isdir(full_path):
        os.makedirs(full_path)

    path: str = f"{full_path}/report.html"
    with open(f"{full_path}/report.html", "wb") as f:
        f.write(plots.figures)

    return path
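
A sketch of the resulting layout, assuming keyword construction of ElogSummaryPlots from the two fields used above (display_name and figures) and a hypothetical experiment mfxl1001234:

plots = ElogSummaryPlots(display_name="powder/r0001", figures=b"<html>...</html>")
path = executor._process_elog_plot(plots)
# path == "/sdf/data/lcls/ds/mfx/mfxl1001234/stats/summary/powder/r0001/report.html"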

_process_result_summary(summary)

Process an object destined for the results summary.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `summary` | `Any` | The object to be set as a summary. If a dictionary it is assumed to be a set of key/value pairs to be written out as run parameters in the eLog. If a list each item is processed individually. | required |
Source code in lute/execution/executor.py
def _process_result_summary(self, summary: Any) -> None:
    """Process an object destined for the results summary.

    Args:
        summary (Any): The object to be set as a summary. If a dictionary
            it is assumed to be a set of key/value pairs to be written out
            as run parameters in the eLog. If a list each item is processed
            individually.
    """
    if self._analysis_desc.task_parameters is None:
        logger.error(
            "Please run Task before using this method! (_process_result_summary). "
            "If you did run a Task, it may have failed immediately!"
        )
        return
    if isinstance(summary, dict):
        # Assume dict is key: value pairs of eLog run parameters to post
        self._analysis_desc.task_result.summary = self._process_summary_run_params(
            summary
        )
    elif isinstance(summary, list) or isinstance(summary, tuple):
        new_summary_str: str = ""
        for item in summary:
            if isinstance(item, dict):
                ret: str = self._process_summary_run_params(item)
                new_summary_str = ";".join(filter(None, (new_summary_str, ret)))
            elif isinstance(item, ElogSummaryPlots):
                plot_path: Optional[str] = self._process_elog_plot(item)
                new_summary_str = ";".join(
                    filter(None, (new_summary_str, plot_path))
                )
        self._analysis_desc.task_result.summary = new_summary_str
    elif isinstance(summary, str):
        ...
    else:
        ...

_process_results()

Performs result processing.

Actions include:

- For ElogSummaryPlots, will save the summary plot to the appropriate directory for display in the eLog.

Source code in lute/execution/executor.py
def _process_results(self) -> None:
    """Performs result processing.

    Actions include:
    - For `ElogSummaryPlots`, will save the summary plot to the appropriate
        directory for display in the eLog.
    """
    task_result: TaskResult = self._analysis_desc.task_result
    self._process_result_payload(task_result.payload)
    self._process_result_summary(task_result.summary)

_process_summary_run_params(params)

Process a dictionary of run parameters to be posted to the eLog.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `params` | `Dict[str, str]` | Key/value pairs to be posted as run parameters. | required |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `summary_str` | `str` | New string of key/value pairs to be stored in the summary field of the database. |

Source code in lute/execution/executor.py
def _process_summary_run_params(self, params: Dict[str, str]) -> str:
    """Process a dictionary of run parameters to be posted to the eLog.

    Args:
        params (Dict[str, str]): Key/value pairs to be posted as run parameters.

    Returns:
        summary_str (str): New string of key/value pairs to be stored in
            summary field of the database.
    """
    if self._analysis_desc.task_parameters is None:
        logger.error(
            "Please run Task before using this method! (_process_summary_run_params). "
            "If you did run a Task, it may have failed immediately!"
        )
        return ""
    exp: str = self._analysis_desc.task_parameters.lute_config.experiment
    run: int = int(self._analysis_desc.task_parameters.lute_config.run)
    logger.debug("Posting eLog run parameters.")
    try:
        post_elog_run_table(exp, run, params)
    except Exception as err:
        logger.error(f"Unable to post run parameters! Error: {err}")
    post_elog_run_status(params)
    summary_str: str = ";".join(f"{key}: {value}" for key, value in params.items())
    return summary_str
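
For example, a hypothetical parameter dictionary produces a semicolon-delimited summary string, in addition to being posted to the eLog (assuming a Task has already populated task_parameters):

summary_str = executor._process_summary_run_params({"hits": "1024", "hit_rate": "0.87"})
# summary_str == "hits: 1024;hit_rate: 0.87"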

_task_loop(proc)

Actions to perform while the Task is running.

This function is run in the body of a loop until the Task signals that it is finished.

Source code in lute/execution/executor.py
def _task_loop(self, proc: subprocess.Popen) -> None:
    """Actions to perform while the Task is running.

    This function is run in the body of a loop until the Task signals
    that it is finished.
    """
    # Some hooks may ask that the rest of the task loop be skipped (continued)
    should_continue: Optional[bool]
    for communicator in self._communicators:
        while True:
            msg: Message = communicator.read(proc)
            if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS:
                hook: Hook = getattr(self.Hooks, msg.signal.lower())
                should_continue = hook(self, msg, proc)
                if should_continue:
                    continue

            if msg.contents is not None:
                if isinstance(msg.contents, str) and msg.contents != "":
                    logger.info(msg.contents)
                elif not isinstance(msg.contents, str):
                    logger.info(msg.contents)
            if not communicator.has_messages:
                break

add_default_hooks()

Populate the set of default event hooks.

Source code in lute/execution/executor.py
def add_default_hooks(self) -> None:
    """Populate the set of default event hooks."""

    def no_pickle_mode(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        for idx, communicator in enumerate(executor._communicators):
            if isinstance(communicator, PipeCommunicator):
                executor._communicators[idx] = PipeCommunicator(
                    Party.EXECUTOR, use_pickle=False
                )
        return None

    self.add_hook("no_pickle_mode", no_pickle_mode)

    def task_started(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        assert proc is not None
        if isinstance(msg.contents, TaskParameters):
            executor._analysis_desc.task_parameters = msg.contents
            # Run "before" tasklets
            if executor._tasklets["before"] is not None:
                executor._run_tasklets(when="before")
            # Need to continue since Task._signal_start raises SIGSTOP
            executor._continue(proc)
            if hasattr(
                executor._analysis_desc.task_parameters.Config, "set_result"
            ):
                # Tasks may mark a parameter as the result
                # If so, setup the result now.
                executor._set_result_from_parameters()
        logger.info(
            f"Executor: {executor._analysis_desc.task_result.task_name} started"
        )
        executor._analysis_desc.task_result.task_status = TaskStatus.RUNNING
        elog_data: Dict[str, str] = {
            f"{executor._analysis_desc.task_result.task_name} status": "RUNNING",
        }
        post_elog_run_status(elog_data)
        return None

    self.add_hook("task_started", task_started)

    def task_failed(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        elog_data: Dict[str, str] = {
            f"{executor._analysis_desc.task_result.task_name} status": "FAILED",
        }
        post_elog_run_status(elog_data)
        return None

    self.add_hook("task_failed", task_failed)

    def task_stopped(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        elog_data: Dict[str, str] = {
            f"{executor._analysis_desc.task_result.task_name} status": "STOPPED",
        }
        post_elog_run_status(elog_data)
        return None

    self.add_hook("task_stopped", task_stopped)

    def task_done(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        elog_data: Dict[str, str] = {
            f"{executor._analysis_desc.task_result.task_name} status": "COMPLETED",
        }
        post_elog_run_status(elog_data)
        return None

    self.add_hook("task_done", task_done)

    def task_cancelled(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        elog_data: Dict[str, str] = {
            f"{executor._analysis_desc.task_result.task_name} status": "CANCELLED",
        }
        post_elog_run_status(elog_data)
        return None

    self.add_hook("task_cancelled", task_cancelled)

    def task_result(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        if isinstance(msg.contents, TaskResult):
            executor._analysis_desc.task_result = msg.contents
            # flake8: noqa: E731
            is_printable_type: Callable[[Any], bool] = lambda x: isinstance(
                x, dict
            ) or isinstance(x, str)
            if is_printable_type(executor._analysis_desc.task_result.summary):
                logger.info(executor._analysis_desc.task_result.summary)
            elif isinstance(executor._analysis_desc.task_result.summary, list):
                for item in executor._analysis_desc.task_result.summary:
                    if is_printable_type(item):
                        logger.info(item)

            logger.info(executor._analysis_desc.task_result.task_status)
        elog_data: Dict[str, str] = {
            f"{executor._analysis_desc.task_result.task_name} status": "COMPLETED",
        }
        post_elog_run_status(elog_data)

        return True

    self.add_hook("task_result", task_result)

    def task_log(
        executor: Executor_T,
        msg: Message,
        proc: Optional[subprocess.Popen] = None,
    ) -> Optional[bool]:
        if isinstance(msg.contents, str):
            # This should be log formatted already
            print(msg.contents)
            return True
        return False

    self.add_hook("task_log", task_log)

ExecutorHooks

A container class for the Executor's event hooks.

There is a corresponding function (hook) for each event/signal. Each function takes three parameters: a reference to the Executor, a reference to the Message (msg) containing the corresponding signal, and the Task subprocess.

Source code in lute/execution/executor.py
class ExecutorHooks:
    """A container class for the Executor's event hooks.

    There is a corresponding function (hook) for each event/signal. Each
    function takes three parameters: a reference to the Executor, a reference
    to the Message (msg) containing the corresponding signal, and the Task
    subprocess.
    """

    __slots__ = LUTE_SIGNALS

    no_pickle_mode: Hook
    task_started: Hook
    task_failed: Hook
    task_stopped: Hook
    task_done: Hook
    task_cancelled: Hook
    task_result: Hook
    task_log: Hook
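
When a signal arrives, the task loop dispatches to the matching hook by attribute lookup, roughly (paraphrased from Executor._task_loop; `msg` and `proc` are in scope there):

if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS:
    hook: Hook = getattr(executor.Hooks, msg.signal.lower())
    should_continue = hook(executor, msg, proc)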

MPIExecutor

Bases: Executor

Runs first-party Tasks that require MPI.

This Executor is otherwise identical to the standard Executor, except it uses mpirun for Task submission. Currently this Executor assumes a job has been submitted using SLURM as a first step. It will determine the number of MPI ranks based on the resources requested. As a fallback, it will try to determine the number of local cores available for cases where a job has not been submitted via SLURM. On S3DF, the second determination mechanism should accurately match the environment variable provided by SLURM indicating resources allocated.

This Executor will submit the Task to run with a number of processes equal to the total number of cores available minus 1. A single core is reserved for the Executor itself. Note that currently this means that you must submit on 3 cores or more, since MPI requires a minimum of 2 ranks, and the number of ranks is determined from the cores dedicated to Task execution.

Methods:

| Name | Description |
|------|-------------|
| `_submit_cmd` | Run the task as a subprocess using `mpirun`. |

Source code in lute/execution/executor.py
class MPIExecutor(Executor):
    """Runs first-party Tasks that require MPI.

    This Executor is otherwise identical to the standard Executor, except it
    uses `mpirun` for `Task` submission. Currently this Executor assumes a job
    has been submitted using SLURM as a first step. It will determine the number
    of MPI ranks based on the resources requested. As a fallback, it will try
    to determine the number of local cores available for cases where a job has
    not been submitted via SLURM. On S3DF, the second determination mechanism
    should accurately match the environment variable provided by SLURM indicating
    resources allocated.

    This Executor will submit the Task to run with a number of processes equal
    to the total number of cores available minus 1. A single core is reserved
    for the Executor itself. Note that currently this means that you must submit
    on 3 cores or more, since MPI requires a minimum of 2 ranks, and the number
    of ranks is determined from the cores dedicated to Task execution.

    Methods:
        _submit_cmd: Run the task as a subprocess using `mpirun`.
    """

    def _submit_cmd(self, executable_path: str, params: str) -> str:
        """Override submission command to use `mpirun`

        Args:
            executable_path (str): Path to the LUTE subprocess script.

            params (str): String of formatted command-line arguments.

        Returns:
            cmd (str): Appropriately formatted command for this Executor.
        """
        py_cmd: str = ""
        nprocs: int = max(
            int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
        )
        mpi_cmd: str = f"mpirun -np {nprocs}"
        if __debug__:
            py_cmd = f"python -B -u -m mpi4py.run {executable_path} {params}"
        else:
            py_cmd = f"python -OB -u -m mpi4py.run {executable_path} {params}"

        cmd: str = f"{mpi_cmd} {py_cmd}"
        return cmd

_submit_cmd(executable_path, params)

Override submission command to use mpirun

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `executable_path` | `str` | Path to the LUTE subprocess script. | required |
| `params` | `str` | String of formatted command-line arguments. | required |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `cmd` | `str` | Appropriately formatted command for this Executor. |

Source code in lute/execution/executor.py
def _submit_cmd(self, executable_path: str, params: str) -> str:
    """Override submission command to use `mpirun`

    Args:
        executable_path (str): Path to the LUTE subprocess script.

        params (str): String of formatted command-line arguments.

    Returns:
        cmd (str): Appropriately formatted command for this Executor.
    """
    py_cmd: str = ""
    nprocs: int = max(
        int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
    )
    mpi_cmd: str = f"mpirun -np {nprocs}"
    if __debug__:
        py_cmd = f"python -B -u -m mpi4py.run {executable_path} {params}"
    else:
        py_cmd = f"python -OB -u -m mpi4py.run {executable_path} {params}"

    cmd: str = f"{mpi_cmd} {py_cmd}"
    return cmd
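
As a worked example under stated assumptions (the SLURM allocation, script path, and Task name are hypothetical): with SLURM_NPROCS=8, one core is reserved for the Executor and the Task runs on 7 MPI ranks:

import os

os.environ["SLURM_NPROCS"] = "8"  # pretend SLURM allocated 8 cores
mpi_executor: MPIExecutor = MPIExecutor("Tester")
print(mpi_executor._submit_cmd("/lute/subprocess_task.py", "-c cfg.yaml -t Tester"))
# mpirun -np 7 python -B -u -m mpi4py.run /lute/subprocess_task.py -c cfg.yaml -t Tester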