Workflows with Maestro
Note: As with other backends, the term DAG (directed acyclic graph) may be used to refer to workflows. This page uses the terms workflow and DAG interchangeably.
Relevant Components
In addition to the core LUTE package, a number of components are generally involved in running a workflow.
The main interface to the maestro workflow manager is:
- launch_maestro.py (defined in the source code). This is installed as launch_slurm when LUTE is built.
- submit_launch_slurm.sh: A wrapper script which submits the above as a batch job so it doesn't need to be watched interactively.
The main components of the maestro sub-project are:
- A Manager which orchestrates the workflow.
- A Server which is used for HTTP communication.
- Launcher objects which define how individual steps in the workflow are launched.
Workflow definitions are all YAML documents, similar to what is described in the dynamic workflows page; however, there are a number of small differences between that format (which is used for Airflow and Prefect) and the maestro format. The maestro format also supports a number of additional features.
Launch/Submission Scripts
launch_slurm (The source file is: launch_maestro.py)
This script will parse a YAML workflow definition and then pass it to the maestro backend.
The script takes the following parameters:
usage: launch_slurm [-h] -c CONFIG -W WORKFLOW_DEFN [-e EXPERIMENT] [-r RUN] [-d] [--num_server_threads NUM_SERVER_THREADS] [--unbuffered] [$SLURM_ARGS]
A light-weight workflow manager which executes LUTE Managed Tasks.
required arguments:
-c CONFIG, --config CONFIG
Path to config YAML file.
-W WORKFLOW_DEFN, --workflow_defn WORKFLOW_DEFN
Path to a YAML file with workflow.
required arguments when running without the ARP:
-e EXPERIMENT, --experiment EXPERIMENT
Provide an experiment if not running with ARP.
-r RUN, --run RUN Provide a run number if not running with ARP.
optional arguments:
-h, --help show this help message and exit
-d, --debug Run in debug mode.
--num_server_threads NUM_SERVER_THREADS
Number of threads to use for the HTTP server.
--unbuffered Flush logs immediately. Warning: This can make output confusing when multiple managed Tasks are running in parallel.
Refer to https://github.com/slac-lcls/lute for more information.
- -c refers to the path of the configuration YAML that contains the parameters for each managed Task in the requested workflow.
- -W is the path to the custom DAG defined in YAML.
- -e | --experiment is used to pass the experiment name. Needed if not using the ARP, i.e. running from the command-line.
- -r | --run is used to pass a run number. Needed if not using the ARP, i.e. running from the command-line.
- --debug is an optional flag to run all steps of the workflow in debug mode for verbose logging and output.
- --num_server_threads is an optional flag which will override the default number of threads set up by the built-in HTTP server.
- --unbuffered is an optional flag which will display logs immediately as they are written. By default, logs are collected and displayed at the end of each step of the workflow. This is probably not useful if multiple managed Tasks are running in parallel as part of your workflow, but if all the steps run serially it can provide more immediate feedback.
- SLURM_ARGS are SLURM arguments to be passed to the submit_slurm.sh script and used for each individual managed Task. These arguments do NOT affect the submission parameters for the job running launch_slurm itself (if using submit_launch_slurm.sh below).

An example invocation follows this list.
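As an illustrative sketch, a manual command-line submission might look like the following. The paths, experiment name, run number, and SLURM arguments are all hypothetical placeholders; substitute values appropriate to your setup.

# Hypothetical example: launch a workflow from the command-line (no ARP).
launch_slurm -c /path/to/config.yaml \
             -W /path/to/workflow.yaml \
             -e mfxl1234567 \
             -r 12 \
             --partition=milano --account=lcls:mfxl1234567

The trailing --partition and --account flags are standard SLURM arguments; they are forwarded to each individual managed Task's submission.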
Lifetime
This script will run for the entire duration of the workflow (DAG). After it launches the DAG, it enters a status-update loop which keeps track of each individual job (each job runs one managed Task). At the end of each job it collects that job's log file, along with a few other status updates/debugging messages, and appends it to its own log (if not using the --unbuffered flag described above). This allows all logging for the entire workflow (DAG) to be inspected from a single file, which is particularly useful when running via the eLog, because only a single log file is displayed there.
submit_launch_slurm.sh
This script wraps the launch_slurm script and submits it as a batch job. This is useful when submitting manually, since a terminal window does not need to be kept open to watch the process interactively. It is, however, required when submitting from the eLog: the initial job submitted by the ARP cannot run for longer than 30 seconds, as it will then time out. Since the launch_slurm job lives for the entire duration of the workflow, it must be submitted as a batch job.
The interface is mostly identical to launch_slurm itself, except that the path to the actual launch_slurm script must be passed as the first argument.
Currently, the wrapper submits the job with minimal resources, requesting only a single core for the workflow manager. In the future, this may be made configurable.
Usage:
submit_launch_slurm.sh /path/to/launch_slurm [-h] -c CONFIG -W WORKFLOW_DEFN [-e EXPERIMENT] [-r RUN] [-d] [--num_server_threads NUM_SERVER_THREADS] [--unbuffered] [$SLURM_ARGS]
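For example, a manual submission through the wrapper might look like the following (all paths, the experiment name, the run number, and the SLURM arguments are hypothetical placeholders):

# Hypothetical example: submit the workflow manager itself as a batch job.
submit_launch_slurm.sh /path/to/launch_slurm \
    -c /path/to/config.yaml \
    -W /path/to/workflow.yaml \
    -e mfxl1234567 -r 12 \
    --partition=milano --account=lcls:mfxl1234567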
Creating a new workflow
Defining a new workflow involves creating a new YAML file.
As an example we will consider the following test workflow.
!LUTE_DAG
- task_name: "Tester"
  slurm_params: ""
  next:
  - !ALL_FAILED
    task_name: "SocketTester"
    slurm_params: ""
    next: []
- task_name: "Tester"
  slurm_params: ""
  next:
  - !branch_daq2
    daq2:
      task_name: "SocketTester"
      slurm_params: ""
      next: []
    daq1:
      task_name: "WriteTester"
      slurm_params: ""
      next:
      - task_name: "ReadTester"
        slurm_params: ""
        next: []
- All workflows must begin with a !LUTE_DAG tag. This tells the YAML parser that the document should be parsed into the special JobStep structures that maestro recognizes as steps to be submitted.
- Next, the top level of the workflow (the first managed Task, or Tasks) is set up. If there is only one step to launch initially, this can be a dictionary; otherwise it must be a list defining the steps to be launched in parallel.
- Each step in the workflow must contain:
  - A task_name, which is the name of a managed Task to be submitted.
  - A custom string of SLURM arguments in slurm_params. This can be used to override the arguments passed on the command-line, but if it is provided, it must provide EVERY SLURM argument; replacing only a subset of the arguments is not currently supported.
  - A field called next, which is a list that sets up the managed Tasks to be submitted after this one completes. If it is left empty, this branch of the workflow ends here.
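Putting these rules together, a minimal linear workflow is sketched below, reusing managed Task names from the test workflow above. It runs Tester first and submits WriteTester after it completes, at which point the workflow ends:

!LUTE_DAG
- task_name: "Tester"
  slurm_params: ""
  next:
  - task_name: "WriteTester"
    slurm_params: ""
    next: []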
Additionally, a set of trigger rules can be provided before each step definition:
- !ALL_SUCCESS: This job step will only be submitted if all previous steps in its branch succeed. (This is the default.)
- !ANY_SUCCESS: This job step will be submitted as soon as any previous step in its branch succeeds.
- !ALL_COMPLETED: This job step will be submitted as soon as all previous steps complete, whether they succeed or fail.
- !ALL_FAILED: This job step will be submitted only if all previous steps have failed.
- !ANY_FAILED: This job step will be submitted as soon as any previous step fails.
- !ALWAYS: This job step will always run as soon as it is reached in the DAG.
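As in the !ALL_FAILED step of the test workflow above, the tag is placed on the line introducing the step it applies to. For instance, a hypothetical fallback step that runs as soon as any previous step fails could be declared like this:

next:
- !ANY_FAILED
  task_name: "Tester"
  slurm_params: ""
  next: []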
A number of branching conditions can be defined as well. Currently supported are branching based on the LCLS1 vs LCLS2 DAQ, and branching based on the run type. These are defined by using a tag such as !branch_daq2 (as in the example above) and defining one dictionary underneath it for each case (daq2 and daq1 in this example).
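For reference, the branching portion of the test workflow above has the following shape when taken in isolation; only the branch matching the detected case is followed:

- !branch_daq2
  daq2:
    task_name: "SocketTester"
    slurm_params: ""
    next: []
  daq1:
    task_name: "WriteTester"
    slurm_params: ""
    next: []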