Skip to content

config

Machinery for the IO of configuration YAML files and their validation.

Functions:

Name Description
parse_config

parse_config(task_name: str, config_path: str) -> TaskParameters: Parse a configuration file and return a TaskParameters object of validated parameters for a specific Task. Raises an exception if the provided configuration does not match the expected model.

Raises:

Type Description
ValidationError

Error raised by pydantic during data validation. (From Pydantic)

AnalysisHeader

Bases: BaseModel

Header information for LUTE analysis runs.

Source code in lute/io/models/base.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class AnalysisHeader(BaseModel):
    """Header information for LUTE analysis runs.

    Shared configuration block describing the experiment, run and working
    directory. Validators fill empty fields from environment variables
    (``EXPERIMENT``, ``RUN_NUM``) provided by ARP/Airflow.
    """

    title: str = Field(
        "LUTE Task Configuration",
        description="Description of the configuration or experiment.",
    )
    experiment: str = Field("", description="Experiment.")
    run: Union[str, int] = Field("", description="Data acquisition run.")
    date: str = Field("1970/01/01", description="Start date of analysis.")
    lute_version: Union[float, str] = Field(
        0.1, description="Version of LUTE used for analysis."
    )
    task_timeout: PositiveInt = Field(
        600,
        description=(
            "Time in seconds until a task times out. Should be slightly shorter"
            " than job timeout if using a job manager (e.g. SLURM)."
        ),
    )
    work_dir: str = Field("", description="Main working directory for LUTE.")

    @validator("work_dir", always=True)
    def validate_work_dir(cls, directory: str, values: Dict[str, Any]) -> str:
        """Default to the standard experiment scratch space and verify access.

        Raises:
            ValueError: If the directory does not exist or is not writable.
        """
        work_dir: str
        if directory == "":
            # Standard scratch layout: /sdf/data/lcls/ds/<hutch>/<exp>/scratch
            # where <hutch> is the first 3 characters of the experiment name.
            std_work_dir = (
                f"/sdf/data/lcls/ds/{values['experiment'][:3]}/"
                f"{values['experiment']}/scratch"
            )
            work_dir = std_work_dir
        else:
            work_dir = directory
        # Check existence and permissions
        if not os.path.exists(work_dir):
            raise ValueError(f"Working Directory: {work_dir} does not exist!")
        if not os.access(work_dir, os.W_OK):
            # Need write access for database, files etc.
            raise ValueError(f"No write access for working directory: {work_dir}!")
        # Expose the resolved directory to subprocesses/Tasks via the environment.
        os.environ["LUTE_WORK_DIR"] = work_dir
        return work_dir

    @validator("run", always=True)
    def validate_run(
        cls, run: Union[str, int], values: Dict[str, Any]
    ) -> Union[str, int]:
        """Pull the run number from the environment when not provided."""
        if run == "":
            # From Airflow RUN_NUM should have Format "RUN_DATETIME" - Num is first part
            run_time: str = os.environ.get("RUN_NUM", "")
            if run_time != "":
                return int(run_time.split("_")[0])
        return run

    @validator("experiment", always=True)
    def validate_experiment(cls, experiment: str, values: Dict[str, Any]) -> str:
        """Pull the experiment name from the environment when not provided."""
        if experiment == "":
            arp_exp: str = os.environ.get("EXPERIMENT", "EXPX00000")
            return arp_exp
        return experiment

AnalyzeSmallDataXASParameters

Bases: TaskParameters

TaskParameter model for AnalyzeSmallDataXAS Task.

This Task does basic analysis of XAS data based on a SmallData HDF5 output file. It calculates difference absorption and signal binned by various scanned motors.

Source code in lute/io/models/smd.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
class AnalyzeSmallDataXASParameters(TaskParameters):
    """TaskParameter model for AnalyzeSmallDataXAS Task.

    This Task does basic analysis of XAS data based on a SmallData HDF5 output
    file. It calculates difference absorption and signal binned by various
    scanned motors.
    """

    class Thresholds(BaseModel):
        # Per-shot filtering cutoffs applied before binning.
        min_Iscat: float = Field(
            10.0, description="Minimum scattering intensity to use for filtering."
        )
        min_ipm: float = Field(
            1000.0, description="Minimum X-ray intensity to use for filtering."
        )

    # Validator factory (defined elsewhere in this module) that locates the
    # SmallData file when `smd_path` is left empty.
    _find_smd_path = validate_smd_path("smd_path")

    smd_path: str = Field(
        "", description="Path to the Small Data HDF5 file to analyze."
    )
    xas_detname: Optional[str] = Field(
        None, description="Name of the detector with absorption data."
    )
    xss_detname: Optional[str] = Field(
        None,
        description="Name of the detector with scattering data, for normalization.",
    )
    # No default: ipm_var is a required parameter.
    ipm_var: str = Field(
        description="Name of the IPM to use for X-Ray intensity filtering."
    )
    scan_var: Optional[Union[List[str], str]] = Field(
        None,
        description=(
            "Name of a scan variable or a list of scan variables to analyze. "
            "E.g. lxt, lens_h, etc."
        ),
    )
    ccm: str = Field(description="Name of the PV for CCM position readback.")
    ccm_set: Optional[str] = Field(
        None, description="Name of the PV for the setpoint of the CCM."
    )
    thresholds: Thresholds = Field(Thresholds())
    # NOTE(review): typed as Optional[bool] but described as the element under
    # investigation (a name) — likely should be Optional[str]. Currently
    # unused; confirm before changing the type.
    element: Optional[bool] = Field(
        None,
        description="Element under investigation. Currently unused. For future EXAFS.",
    )

AnalyzeSmallDataXESParameters

Bases: TaskParameters

TaskParameter model for AnalyzeSmallDataXES Task.

This Task does basic analysis of XES data based on a SmallData HDF5 output file. It calculates difference emission and signal binned by various scanned motors.

Source code in lute/io/models/smd.py
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
class AnalyzeSmallDataXESParameters(TaskParameters):
    """TaskParameter model for AnalyzeSmallDataXES Task.

    This Task does basic analysis of XES data based on a SmallData HDF5 output
    file. It calculates difference emission and signal binned by various
    scanned motors.
    """

    class Thresholds(BaseModel):
        # Per-shot filtering cutoffs applied before binning.
        min_Iscat: float = Field(
            10.0, description="Minimum scattering intensity to use for filtering."
        )
        min_ipm: float = Field(
            1000.0, description="Minimum X-ray intensity to use for filtering."
        )

    # Validator factory (defined elsewhere in this module) that locates the
    # SmallData file when `smd_path` is left empty.
    _find_smd_path = validate_smd_path("smd_path")

    smd_path: str = Field(
        "", description="Path to the Small Data HDF5 file to analyze."
    )
    # Description fixed: this is the emission (XES) detector, not absorption
    # (the old text was copied from the XAS model).
    xes_detname: Optional[str] = Field(
        None, description="Name of the detector with emission data."
    )
    xss_detname: Optional[str] = Field(
        None,
        description="Name of the detector with scattering data, for normalization.",
    )
    # No default: ipm_var is a required parameter.
    ipm_var: str = Field(
        description="Name of the IPM to use for X-Ray intensity filtering."
    )
    scan_var: Optional[Union[List[str], str]] = Field(
        None,
        description=(
            "Name of a scan variable or a list of scan variables to analyze. "
            "E.g. lxt, lens_h, etc."
        ),
    )
    thresholds: Thresholds = Field(Thresholds())
    invert_xes_axes: bool = Field(
        False,
        description=(
            "Flip the projection axes depending on detector orientation. "
            "Default is that projection along axis 1 is spectrum."
        ),
    )
    rot_angle: Optional[float] = Field(
        None,
        description="Optionally rotate the ROIs by a small amount before projection.",
    )
    batch_size: int = Field(
        0,
        description="If non-zero load ROIs in batches. Slower but may help OOM errors.",
    )

AnalyzeSmallDataXSSParameters

Bases: TaskParameters

TaskParameter model for AnalyzeSmallDataXSS Task.

This Task does basic analysis of XSS data based on a SmallData HDF5 output file. It calculates difference scattering and signal binned by various scanned motors.

Source code in lute/io/models/smd.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
class AnalyzeSmallDataXSSParameters(TaskParameters):
    """TaskParameter model for AnalyzeSmallDataXSS Task.

    This Task does basic analysis of XSS data based on a SmallData HDF5 output
    file. It calculates difference scattering and signal binned by various
    scanned motors.
    """

    class Thresholds(BaseModel):
        # Per-shot filtering cutoffs applied before binning.
        min_Iscat: float = Field(
            10.0, description="Minimum scattering intensity to use for filtering."
        )
        min_ipm: float = Field(
            1000.0, description="Minimum X-ray intensity to use for filtering."
        )

    # NOTE(review): AnalysisFlags is declared but not referenced by any field
    # of this model — possibly a missing `flags: AnalysisFlags` field, or the
    # Task reads it directly. Confirm against the Task implementation.
    class AnalysisFlags(BaseModel):
        use_pyfai: bool = True
        use_asymls: bool = False

    # Validator factory (defined elsewhere in this module) that locates the
    # SmallData file when `smd_path` is left empty.
    _find_smd_path = validate_smd_path("smd_path")

    smd_path: str = Field(
        "", description="Path to the Small Data HDF5 file to analyze."
    )
    xss_detname: Optional[str] = Field(
        None, description="Name of the detector with scattering data."
    )
    # No default: ipm_var is a required parameter.
    ipm_var: str = Field(
        description="Name of the IPM to use for X-Ray intensity filtering."
    )
    scan_var: Optional[Union[List[str], str]] = Field(
        None,
        description=(
            "Name of a scan variable or a list of scan variables to analyze. "
            "E.g. lxt, lens_h, etc."
        ),
    )
    thresholds: Thresholds = Field(Thresholds())

CompareHKLParameters

Bases: ThirdPartyParameters

Parameters for CrystFEL's compare_hkl for calculating figures of merit.

There are many parameters, and many combinations. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-partialator.html

Source code in lute/io/models/sfx_merge.py
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
class CompareHKLParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `compare_hkl` for calculating figures of merit.

    There are many parameters, and many combinations. For more information on
    usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-partialator.html
    """

    class Config(ThirdPartyParameters.Config):
        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/compare_hkl",
        description="CrystFEL's reflection comparison binary.",
        flag_type="",
    )
    in_files: Optional[str] = Field(
        "",
        description="Path to input HKLs. Space-separated list of 2. Use output of partialator e.g.",
        flag_type="",
    )
    ## Need mechanism to set is_result=True ...
    symmetry: str = Field("", description="Point group symmetry.", flag_type="--")
    cell_file: str = Field(
        "",
        description="Path to a file containing unit cell information (PDB or CrystFEL format).",
        flag_type="-",
        rename_param="p",
    )
    fom: str = Field(
        "Rsplit", description="Specify figure of merit to calculate.", flag_type="--"
    )
    nshells: int = Field(10, description="Use n resolution shells.", flag_type="--")
    # NEED A NEW CASE FOR THIS -> Boolean flag, no arg, one hyphen...
    # fix_unity: bool = Field(
    #    False,
    #    description="Fix scale factors to unity.",
    #    flag_type="-",
    #    rename_param="u",
    # )
    shell_file: str = Field(
        "",
        description="Write the statistics in resolution shells to a file.",
        flag_type="--",
        rename_param="shell-file",
        is_result=True,
    )
    ignore_negs: bool = Field(
        False,
        # Fixed wording: the flag ignores reflections with negative
        # *intensities* (per CrystFEL docs), not "negative reflections".
        description="Ignore reflections with negative intensities.",
        flag_type="--",
        rename_param="ignore-negs",
    )
    zero_negs: bool = Field(
        False,
        description="Set negative intensities to 0.",
        flag_type="--",
        rename_param="zero-negs",
    )
    sigma_cutoff: Optional[Union[float, int, str]] = Field(
        # "-infinity",
        description="Discard reflections with I/sigma(I) < n. -infinity means no cutoff.",
        flag_type="--",
        rename_param="sigma-cutoff",
    )
    rmin: Optional[float] = Field(
        description="Low resolution cutoff of 1/d (m-1). Use this or --lowres NOT both.",
        flag_type="--",
    )
    lowres: Optional[float] = Field(
        # Fixed kwarg typo: was `descirption=`, which pydantic silently stored
        # as extra metadata, leaving this field undocumented on the CLI.
        description="Low resolution cutoff in Angstroms. Use this or --rmin NOT both.",
        flag_type="--",
    )
    rmax: Optional[float] = Field(
        description="High resolution cutoff in 1/d (m-1). Use this or --highres NOT both.",
        flag_type="--",
    )
    highres: Optional[float] = Field(
        description="High resolution cutoff in Angstroms. Use this or --rmax NOT both.",
        flag_type="--",
    )

    @validator("in_files", always=True)
    def validate_in_files(cls, in_files: str, values: Dict[str, Any]) -> str:
        """Default to the two HKL half-sets from the latest MergePartialator run."""
        if in_files == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                # partialator writes `<out>1` and `<out>2` half-set files.
                hkls: str = f"{partialator_file}1 {partialator_file}2"
                return hkls
        return in_files

    @validator("cell_file", always=True)
    def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
        """Default to the cell file recorded by the latest IndexCrystFEL run."""
        if cell_file == "":
            idx_cell_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}",
                "IndexCrystFEL",
                "cell_file",
                valid_only=False,
            )
            if idx_cell_file:
                return idx_cell_file
        return cell_file

    @validator("symmetry", always=True)
    def validate_symmetry(cls, symmetry: str, values: Dict[str, Any]) -> str:
        """Default to the point group used by the latest MergePartialator run."""
        if symmetry == "":
            partialator_sym: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "symmetry"
            )
            if partialator_sym:
                return partialator_sym
        return symmetry

    @validator("shell_file", always=True)
    def validate_shell_file(cls, shell_file: str, values: Dict[str, Any]) -> str:
        """Derive a shells output path from the latest MergePartialator output."""
        if shell_file == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                shells_out: str = partialator_file.split(".")[0]
                shells_out = f"{shells_out}_{values['fom']}_n{values['nshells']}.dat"
                return shells_out
        return shell_file

Config

Bases: Config

Source code in lute/io/models/sfx_merge.py
389
390
391
392
393
394
# Executor-facing overrides of the defaults in ThirdPartyParameters.Config.
class Config(ThirdPartyParameters.Config):
    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

long_flags_use_eq: bool = True class-attribute instance-attribute

Whether long command-line arguments are passed like --long=arg.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

ConcatenateStreamFilesParameters

Bases: TaskParameters

Parameters for stream concatenation.

Concatenates the stream file output from CrystFEL indexing for multiple experimental runs.

Source code in lute/io/models/sfx_index.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
class ConcatenateStreamFilesParameters(TaskParameters):
    """Parameters for stream concatenation.

    Concatenates the stream file output from CrystFEL indexing for multiple
    experimental runs.
    """

    class Config(TaskParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    in_file: str = Field(
        "",
        description="Root of directory tree storing stream files to merge.",
    )

    tag: Optional[str] = Field(
        "",
        description="Tag identifying the stream files to merge.",
    )

    out_file: str = Field(
        "", description="Path to merged output stream file.", is_result=True
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default to the directory of the latest IndexCrystFEL output."""
        if in_file == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                stream_dir: str = str(Path(stream_file).parent)
                return stream_dir
        return in_file

    @validator("tag", always=True)
    def validate_tag(cls, tag: str, values: Dict[str, Any]) -> str:
        """Derive the tag from the latest IndexCrystFEL output file name."""
        if tag == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                # File names look like "..._<tag>.stream": take the final
                # underscore-separated component, minus the extension.
                stream_tag: str = Path(stream_file).name.split("_")[-1].split(".")[0]
                return stream_tag
        return tag

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default the output path to "<parent of in_file>/<tag>.stream".

        The value parameter was previously (mis)named ``tag``, shadowing the
        real ``tag`` field; renamed for clarity (pydantic passes the field
        value positionally, so behavior is unchanged).
        """
        if out_file == "":
            # NOTE(review): uses the *parent* of in_file, which is itself a
            # directory per validate_in_file — confirm this is intended.
            stream_out_file: str = str(
                Path(values["in_file"]).parent / f"{values['tag']}.stream"
            )
            return stream_out_file
        return out_file

Config

Bases: Config

Source code in lute/io/models/sfx_index.py
460
461
462
# Executor-facing override of the default in TaskParameters.Config.
class Config(TaskParameters.Config):
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

DimpleSolveParameters

Bases: ThirdPartyParameters

Parameters for CCP4's dimple program.

There are many parameters. For more information on usage, please refer to the CCP4 documentation, here: https://ccp4.github.io/dimple/

Source code in lute/io/models/sfx_solve.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class DimpleSolveParameters(ThirdPartyParameters):
    """Parameters for CCP4's dimple program.

    There are many parameters. For more information on
    usage, please refer to the CCP4 documentation, here:
    https://ccp4.github.io/dimple/
    """

    class Config(ThirdPartyParameters.Config):
        """Identical to super-class Config but includes a result."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

        result_from_params: str = ""
        """Defines a result from the parameters. Use a validator to do so."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/ccp4-8.0/bin/dimple",
        description="CCP4 Dimple for solving structures with MR.",
        flag_type="",
    )
    # Positional requirements - all required.
    in_file: str = Field(
        "",
        description="Path to input mtz.",
        flag_type="",
    )
    pdb: str = Field("", description="Path to a PDB.", flag_type="")
    out_dir: str = Field("", description="Output DIRECTORY.", flag_type="")
    # Most used options
    mr_thresh: PositiveFloat = Field(
        0.4,
        description="Threshold for molecular replacement.",
        flag_type="--",
        rename_param="mr-when-r",
    )
    slow: Optional[bool] = Field(
        False, description="Perform more refinement.", flag_type="--"
    )
    # Other options (IO)
    hklout: str = Field(
        "final.mtz", description="Output mtz file name.", flag_type="--"
    )
    xyzout: str = Field(
        "final.pdb", description="Output PDB file name.", flag_type="--"
    )
    # Commented-out values below each optional field record dimple's own
    # defaults; leaving the field unset lets dimple apply them.
    icolumn: Optional[str] = Field(
        # "IMEAN",
        description="Name for the I column.",
        flag_type="--",
    )
    sigicolumn: Optional[str] = Field(
        # "SIG<ICOL>",
        description="Name for the Sig<I> column.",
        flag_type="--",
    )
    fcolumn: Optional[str] = Field(
        # "F",
        description="Name for the F column.",
        flag_type="--",
    )
    sigfcolumn: Optional[str] = Field(
        # "F",
        description="Name for the Sig<F> column.",
        flag_type="--",
    )
    libin: Optional[str] = Field(
        description="Ligand descriptions for refmac (LIBIN).", flag_type="--"
    )
    refmac_key: Optional[str] = Field(
        description="Extra Refmac keywords to use in refinement.",
        flag_type="--",
        rename_param="refmac-key",
    )
    free_r_flags: Optional[str] = Field(
        description="Path to a mtz file with freeR flags.",
        flag_type="--",
        rename_param="free-r-flags",
    )
    freecolumn: Optional[Union[int, float]] = Field(
        # 0,
        description="Refree column with an optional value.",
        flag_type="--",
    )
    img_format: Optional[str] = Field(
        description="Format of generated images. (png, jpeg, none).",
        flag_type="-",
        rename_param="f",
    )
    white_bg: bool = Field(
        False,
        description="Use a white background in Coot and in images.",
        flag_type="--",
        rename_param="white-bg",
    )
    no_cleanup: bool = Field(
        False,
        description="Retain intermediate files.",
        flag_type="--",
        rename_param="no-cleanup",
    )
    # Calculations
    no_blob_search: bool = Field(
        False,
        description="Do not search for unmodelled blobs.",
        flag_type="--",
        rename_param="no-blob-search",
    )
    anode: bool = Field(
        False,
        description="Use SHELX/AnoDe to find peaks in the anomalous map.",
        flag_type="--",
    )
    # Run customization
    no_hetatm: bool = Field(
        False,
        description="Remove heteroatoms from the given model.",
        flag_type="--",
        rename_param="no-hetatm",
    )
    rigid_cycles: Optional[PositiveInt] = Field(
        # 10,
        description="Number of cycles of rigid-body refinement to perform.",
        flag_type="--",
        rename_param="rigid-cycles",
    )
    jelly: Optional[PositiveInt] = Field(
        # 4,
        description="Number of cycles of jelly-body refinement to perform.",
        flag_type="--",
    )
    restr_cycles: Optional[PositiveInt] = Field(
        # 8,
        description="Number of cycles of refmac final refinement to perform.",
        flag_type="--",
        rename_param="restr-cycles",
    )
    lim_resolution: Optional[PositiveFloat] = Field(
        description="Limit the final resolution.", flag_type="--", rename_param="reso"
    )
    weight: Optional[str] = Field(
        # "auto-weight",
        description="The refmac matrix weight.",
        flag_type="--",
    )
    mr_prog: Optional[str] = Field(
        # "phaser",
        description="Molecular replacement program. phaser or molrep.",
        flag_type="--",
        rename_param="mr-prog",
    )
    mr_num: Optional[Union[str, int]] = Field(
        # "auto",
        description="Number of molecules to use for molecular replacement.",
        flag_type="--",
        rename_param="mr-num",
    )
    mr_reso: Optional[PositiveFloat] = Field(
        # 3.25,
        description="High resolution for molecular replacement. If >10 interpreted as eLLG.",
        flag_type="--",
        rename_param="mr-reso",
    )
    itof_prog: Optional[str] = Field(
        description="Program to calculate amplitudes. truncate, or ctruncate.",
        flag_type="--",
        rename_param="ItoF-prog",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default to the output of the latest ManipulateHKL run."""
        if in_file == "":
            get_hkl_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "ManipulateHKL", "out_file"
            )
            if get_hkl_file:
                return get_hkl_file
        return in_file

    @validator("out_dir", always=True)
    def validate_out_dir(cls, out_dir: str, values: Dict[str, Any]) -> str:
        """Default to the directory of the latest ManipulateHKL output."""
        if out_dir == "":
            get_hkl_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "ManipulateHKL", "out_file"
            )
            if get_hkl_file:
                return os.path.dirname(get_hkl_file)
        return out_dir

    @root_validator(pre=False)
    def define_result(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Record the expected dimple outputs as this Task's result.

        NOTE(review): mutates class-level Config state, which is shared by
        every instance of this model in the process — confirm models are
        instantiated once per run.
        """
        out_dir: str = values["out_dir"]
        result: str
        if out_dir != "":
            result = f"{out_dir}/final.mtz;{out_dir}/final.pdb"
        else:
            result = ""
        cls.Config.result_from_params = result
        return values

Config

Bases: Config

Identical to super-class Config but includes a result.

Source code in lute/io/models/sfx_solve.py
28
29
30
31
32
33
34
35
# Executor-facing overrides of the defaults in ThirdPartyParameters.Config.
class Config(ThirdPartyParameters.Config):
    """Identical to super-class Config but includes a result."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

    result_from_params: str = ""
    """Defines a result from the parameters. Use a validator to do so."""

result_from_params: str = '' class-attribute instance-attribute

Defines a result from the parameters. Use a validator to do so.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

FindPeaksPsocakeParameters

Bases: ThirdPartyParameters

Parameters for crystallographic (Bragg) peak finding using Psocake.

This peak finding Task optionally has the ability to compress/decompress data with SZ for the purpose of compression validation. NOTE: This Task is deprecated and provided for compatibility only.

Source code in lute/io/models/sfx_find_peaks.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
class FindPeaksPsocakeParameters(ThirdPartyParameters):
    """Parameters for crystallographic (Bragg) peak finding using Psocake.

    This peak finding Task optionally has the ability to compress/decompress
    data with SZ for the purpose of compression validation.
    NOTE: This Task is deprecated and provided for compatibility only.
    """

    class Config(TaskParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

        result_from_params: str = ""
        """Defines a result from the parameters. Use a validator to do so."""

    class SZParameters(BaseModel):
        # SZ lossy-compression settings; rendered into the sz.json template
        # (see lute_template_cfg below) rather than passed on the command line.
        compressor: Literal["qoz", "sz3"] = Field(
            "qoz", description="SZ compression algorithm (qoz, sz3)"
        )
        binSize: int = Field(2, description="SZ compression's bin size paramater")
        roiWindowSize: int = Field(
            2, description="SZ compression's ROI window size paramater"
        )
        # FIX: the keyword was previously misspelled "descriptionp"; pydantic v1
        # silently stored it as extra field metadata, leaving the field with no
        # description.
        absError: float = Field(10, description="Maximum absolute error value")

    executable: str = Field("mpirun", description="MPI executable.", flag_type="")
    np: PositiveInt = Field(
        # Default process count: SLURM allocation (or CPU affinity count) minus
        # one, but never less than one.
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of processes",
        flag_type="-",
    )
    mca: str = Field(
        "btl ^openib", description="Mca option for the MPI executable", flag_type="--"
    )
    p_arg1: str = Field(
        "python", description="Executable to run with mpi (i.e. python).", flag_type=""
    )
    u: str = Field(
        "", description="Python option for unbuffered output.", flag_type="-"
    )
    p_arg2: str = Field(
        "findPeaksSZ.py",
        description="Executable to run with mpi (i.e. python).",
        flag_type="",
    )
    d: str = Field(description="Detector name", flag_type="-")
    e: str = Field("", description="Experiment name", flag_type="-")
    r: int = Field(-1, description="Run number", flag_type="-")
    outDir: str = Field(
        description="Output directory where .cxi will be saved", flag_type="--"
    )
    algorithm: int = Field(1, description="PyAlgos algorithm to use", flag_type="--")
    alg_npix_min: float = Field(
        1.0, description="PyAlgos algorithm's npix_min parameter", flag_type="--"
    )
    alg_npix_max: float = Field(
        45.0, description="PyAlgos algorithm's npix_max parameter", flag_type="--"
    )
    alg_amax_thr: float = Field(
        250.0, description="PyAlgos algorithm's amax_thr parameter", flag_type="--"
    )
    alg_atot_thr: float = Field(
        330.0, description="PyAlgos algorithm's atot_thr parameter", flag_type="--"
    )
    alg_son_min: float = Field(
        10.0, description="PyAlgos algorithm's son_min parameter", flag_type="--"
    )
    alg1_thr_low: float = Field(
        80.0, description="PyAlgos algorithm's thr_low parameter", flag_type="--"
    )
    alg1_thr_high: float = Field(
        270.0, description="PyAlgos algorithm's thr_high parameter", flag_type="--"
    )
    alg1_rank: int = Field(
        3, description="PyAlgos algorithm's rank parameter", flag_type="--"
    )
    alg1_radius: int = Field(
        3, description="PyAlgos algorithm's radius parameter", flag_type="--"
    )
    alg1_dr: int = Field(
        1, description="PyAlgos algorithm's dr parameter", flag_type="--"
    )
    psanaMask_on: str = Field(
        "True", description="Whether psana's mask should be used", flag_type="--"
    )
    psanaMask_calib: str = Field(
        "True", description="Psana mask's calib parameter", flag_type="--"
    )
    psanaMask_status: str = Field(
        "True", description="Psana mask's status parameter", flag_type="--"
    )
    psanaMask_edges: str = Field(
        "True", description="Psana mask's edges parameter", flag_type="--"
    )
    psanaMask_central: str = Field(
        "True", description="Psana mask's central parameter", flag_type="--"
    )
    psanaMask_unbond: str = Field(
        "True", description="Psana mask's unbond parameter", flag_type="--"
    )
    # NOTE(review): field name "psanaMask_unbondnrs" vs. description
    # "unbondnbrs" — confirm the expected flag spelling against findPeaksSZ.py.
    psanaMask_unbondnrs: str = Field(
        "True", description="Psana mask's unbondnbrs parameter", flag_type="--"
    )
    mask: str = Field(
        "", description="Path to an additional mask to apply", flag_type="--"
    )
    clen: str = Field(
        description="Epics variable storing the camera length", flag_type="--"
    )
    coffset: float = Field(0, description="Camera offset in m", flag_type="--")
    minPeaks: int = Field(
        15,
        description="Minimum number of peaks to mark frame for indexing",
        flag_type="--",
    )
    maxPeaks: int = Field(
        15,
        description="Maximum number of peaks to mark frame for indexing",
        flag_type="--",
    )
    minRes: int = Field(
        0,
        description="Minimum peak resolution to mark frame for indexing ",
        flag_type="--",
    )
    sample: str = Field("", description="Sample name", flag_type="--")
    instrument: Union[None, str] = Field(
        None, description="Instrument name", flag_type="--"
    )
    pixelSize: float = Field(0.0, description="Pixel size", flag_type="--")
    auto: str = Field(
        "False",
        description=(
            "Whether to automatically determine peak per event peak "
            "finding parameters"
        ),
        flag_type="--",
    )
    detectorDistance: float = Field(
        0.0, description="Detector distance from interaction point in m", flag_type="--"
    )
    access: Literal["ana", "ffb"] = Field(
        "ana", description="Data node type: {ana,ffb}", flag_type="--"
    )
    szfile: str = Field("qoz.json", description="Path to SZ's JSON configuration file")
    lute_template_cfg: TemplateConfig = Field(
        TemplateConfig(
            template_name="sz.json",
            output_path="",  # Will want to change where this goes...
        ),
        description="Template information for the sz.json file",
    )
    sz_parameters: SZParameters = Field(
        description="Configuration parameters for SZ Compression", flag_type=""
    )

    @validator("e", always=True)
    def validate_e(cls, e: str, values: Dict[str, Any]) -> str:
        # Default the experiment name from the shared analysis header.
        if e == "":
            return values["lute_config"].experiment
        return e

    @validator("r", always=True)
    def validate_r(cls, r: int, values: Dict[str, Any]) -> int:
        # Default the run number from the shared analysis header.
        if r == -1:
            return values["lute_config"].run
        return r

    @validator("lute_template_cfg", always=True)
    def set_output_path(
        cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
    ) -> TemplateConfig:
        # Render the sz.json template to the configured szfile path by default.
        if lute_template_cfg.output_path == "":
            lute_template_cfg.output_path = values["szfile"]
        return lute_template_cfg

    @validator("sz_parameters", always=True)
    def set_sz_compression_parameters(
        cls, sz_parameters: SZParameters, values: Dict[str, Any]
    ) -> None:
        # Propagate the SZ settings into `values` for template rendering and
        # return None to clear the field itself (presumably keeping it off the
        # command line — confirm against ThirdPartyParameters handling of None).
        values["compressor"] = sz_parameters.compressor
        values["binSize"] = sz_parameters.binSize
        values["roiWindowSize"] = sz_parameters.roiWindowSize
        if sz_parameters.compressor == "qoz":
            values["pressio_opts"] = {
                "pressio:abs": sz_parameters.absError,
                "qoz": {"qoz:stride": 8},
            }
        else:
            values["pressio_opts"] = {"pressio:abs": sz_parameters.absError}
        return None

    @root_validator(pre=False)
    def define_result(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Record the Task result as "<outDir>/<experiment>_<run:04d>.lst" on
        # the Config so the Executor can pick it up.
        exp: str = values["lute_config"].experiment
        run: int = int(values["lute_config"].run)
        directory: str = values["outDir"]
        fname: str = f"{exp}_{run:04d}.lst"

        cls.Config.result_from_params = f"{directory}/{fname}"
        return values

Config

Bases: Config

Source code in lute/io/models/sfx_find_peaks.py
145
146
147
148
149
150
class Config(TaskParameters.Config):
    """Task configuration enabling a result derived from parameter values."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

    result_from_params: str = ""
    """Defines a result from the parameters. Use a validator to do so."""

result_from_params: str = '' class-attribute instance-attribute

Defines a result from the parameters. Use a validator to do so.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

FindPeaksPyAlgosParameters

Bases: TaskParameters

Parameters for crystallographic (Bragg) peak finding using PyAlgos.

This peak finding Task optionally has the ability to compress/decompress data with SZ for the purpose of compression validation.

Source code in lute/io/models/sfx_find_peaks.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class FindPeaksPyAlgosParameters(TaskParameters):
    """Parameters for crystallographic (Bragg) peak finding using PyAlgos.

    This peak finding Task optionally has the ability to compress/decompress
    data with SZ for the purpose of compression validation.
    """

    class Config(TaskParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    class SZCompressorParameters(BaseModel):
        # Optional SZ lossy-compression settings; used only when the
        # `compression` field of the enclosing model is provided.
        compressor: Literal["qoz", "sz3"] = Field(
            "qoz", description='Compression algorithm ("qoz" or "sz3")'
        )
        abs_error: float = Field(10.0, description="Absolute error bound")
        bin_size: int = Field(2, description="Bin size")
        roi_window_size: int = Field(
            9,
            description="Default window size",
        )

    # --- Input/output and event-selection parameters ---
    outdir: str = Field(
        description="Output directory for cxi files",
    )
    n_events: int = Field(
        0,
        description="Number of events to process (0 to process all events)",
    )
    det_name: str = Field(
        description="Psana name of the detector storing the image data",
    )
    event_receiver: Literal["evr0", "evr1"] = Field(
        description="Event Receiver to be used: evr0 or evr1",
    )
    tag: str = Field(
        "",
        description="Tag to add to the output file names",
    )
    pv_camera_length: Union[str, float] = Field(
        description="PV associated with camera length "
        "(if a number, camera length directly)",
    )
    event_logic: bool = Field(
        False,
        description="True if only events with a specific event code should be "
        "processed. False if the event code should be ignored",
    )
    event_code: int = Field(
        0,
        description="Required events code for events to be processed if event logic "
        "is True",
    )
    psana_mask: bool = Field(
        False,
        description="If True, apply mask from psana Detector object",
    )
    mask_file: Union[str, None] = Field(
        None,
        description="File with a custom mask to apply. If None, no custom mask is "
        "applied",
    )
    # --- PyAlgos peak-finding thresholds ---
    min_peaks: int = Field(2, description="Minimum number of peaks per image")
    max_peaks: int = Field(
        2048,
        description="Maximum number of peaks per image",
    )
    npix_min: int = Field(
        2,
        description="Minimum number of pixels per peak",
    )
    npix_max: int = Field(
        30,
        description="Maximum number of pixels per peak",
    )
    amax_thr: float = Field(
        80.0,
        description="Minimum intensity threshold for starting a peak",
    )
    atot_thr: float = Field(
        120.0,
        description="Minimum summed intensity threshold for pixel collection",
    )
    son_min: float = Field(
        7.0,
        description="Minimum signal-to-noise ratio to be considered a peak",
    )
    peak_rank: int = Field(
        3,
        description="Radius in which central peak pixel is a local maximum",
    )
    r0: float = Field(
        3.0,
        description="Radius of ring for background evaluation in pixels",
    )
    dr: float = Field(
        2.0,
        description="Width of ring for background evaluation in pixels",
    )
    nsigm: float = Field(
        7.0,
        description="Intensity threshold to include pixel in connected group",
    )
    compression: Optional[SZCompressorParameters] = Field(
        None,
        description="Options for the SZ Compression Algorithm",
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        # Default the output path to "<outdir>/<experiment>_<run>_<tag>.list"
        # when no explicit path was provided.
        if out_file == "":
            fname: Path = (
                Path(values["outdir"])
                / f"{values['lute_config'].experiment}_{values['lute_config'].run}_"
                f"{values['tag']}.list"
            )
            return str(fname)
        return out_file

Config

Bases: Config

Source code in lute/io/models/sfx_find_peaks.py
17
18
19
class Config(TaskParameters.Config):
    """Task configuration with result marking enabled."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

IndexCCTBXXFELParameters

Bases: ThirdPartyParameters

Parameters for indexing with cctbx.xfel.

Source code in lute/io/models/sfx_index.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
class IndexCCTBXXFELParameters(ThirdPartyParameters):
    """Parameters for indexing with cctbx.xfel."""

    class Config(ThirdPartyParameters.Config):
        set_result: bool = False
        """Whether the Executor should mark a specified parameter as a result."""

    class PhilParameters(BaseModel):
        """Template parameters for CCTBX phil file."""

        class Config(BaseModel.Config):
            # Accept arbitrary extra phil keys beyond those declared below.
            extra: str = "allow"

        # Generic input settings: input_
        input_reference_geometry: Optional[str] = Field(
            None,
            description=(
                "Provide an models.expt file with exactly one detector model. Data "
                "processing will use that geometry instead of the geometry found "
                "in the image headers."
            ),
        )

        # Generic geometry: geometry_
        geometry_detector_panel_origin: Optional[Tuple[float, float, float]] = Field(
            None,
            description="Override the panel origin. Requires fast_axis and slow_axis.",
        )

        # Generic output settings: output_
        output_output_dir: str = Field(
            "",
            description="Directory output files will be placed",
        )
        output_composite_output: bool = Field(
            True,
            description=(
                "If True, save one set of experiment/reflection files per process, "
                "where each is a concatenated list of all the successful events "
                "examined by that process. If False, output a separate "
                "experiment/reflection file per image (generates a lot of files)."
            ),
        )
        output_logging_dir: str = Field(
            "", description="Directory output log files will be placed"
        )

        # Dispatch settings: dispatch_
        dispatch_index: bool = Field(
            True,
            description=(
                "Attempt to index images. find_spots also needs to be True for "
                "this to work"
            ),
        )
        dispatch_refine: bool = Field(
            False, description="If True, after indexing, refine the experimental models"
        )
        dispatch_integrate: bool = Field(
            True,
            description=(
                "Integrate indexed images. Ignored if index=False or "
                "find_spots=False"
            ),
        )

        # Parallel processing parameters: mp_
        mp_method: str = Field(
            "mpi",  # *multiprocessing sge lsf pbs mpi
            description="The multiprocessing method to use",
        )

        # Spotfinding parameters: spotfinder_
        spotfinder_lookup_mask: Optional[str] = Field(
            None, description="The path to the mask file."
        )
        spotfinder_threshold_dispersion_gain: Optional[float] = Field(
            None,
            description=(
                "Use a flat gain map for the entire detector to act as a "
                "multiplier for the gain set by the format. Cannot be used in "
                "conjunction with lookup.gain_map parameter."
            ),
        )
        spotfinder_threshold_dispersion_sigma_bkgnd: float = Field(
            6,
            description=(
                "The number of standard deviations of the index of dispersion "
                "(variance / mean) in the local area below which the pixel "
                "will be classified as background."
            ),
        )
        spotfinder_threshold_dispersion_sigma_strong: float = Field(
            3,
            description=(
                "The number of standard deviations above the mean in the local "
                "area above which the pixel will be classified as strong."
            ),
        )
        spotfinder_threshold_dispersion_global_threshold: float = Field(
            0,
            description=(
                "The global threshold value. Consider all pixels less than "
                "this value to be part of the background."
            ),
        )
        spotfinder_threshold_dispersion_kernel_size: Tuple[int, int] = Field(
            (6, 6),
            description=(
                "The size of the local area around the spot in which to "
                "calculate the mean and variance. The kernel is given as a box "
                "of size (2 * nx + 1, 2 * ny + 1) centred at the pixel."
            ),
        )
        spotfinder_filter_min_spot_size: Optional[int] = Field(
            3,
            description=(
                "The minimum number of contiguous pixels for a spot to be "
                "accepted by the filtering algorithm."
            ),
        )
        spotfinder_filter_d_min: Optional[float] = Field(
            None,
            description=(
                "The high resolution limit in Angstrom for a pixel to be "
                "accepted by the filtering algorithm."
            ),
        )

        # Indexing parameters: indexing_
        indexing_stills_refine_candidates_with_known_symmetry: bool = Field(
            False,
            description=(
                "If False, when choosing the best set of candidate basis "
                "solutions, refine the candidates in the P1 setting. If True, "
                "after indexing in P1, convert the candidates to the known "
                "symmetry and apply the corresponding change of basis to the "
                "indexed reflections."
            ),
        )
        indexing_stills_refine_all_candidates: bool = Field(
            True,
            description=(
                "If False, no attempt is made to refine the model from initial "
                "basis vector selection. The indexing solution with the best "
                "RMSD is chosen."
            ),
        )
        indexing_known_symmetry_space_group: Optional[str] = Field(
            None, description="Target space group for indexing."
        )
        indexing_known_symmetry_unit_cell: Optional[str] = Field(
            None, description="Target unit cell for indexing."
        )

        # Integration parameters: integration_
        integration_background_simple_outlier_plane_n_sigma: int = Field(
            10,
            description=(
                "The number of standard deviations above the threshold "
                "plane to use in rejecting outliers from background "
                "calculation."
            ),
        )
        integration_summation_detector_gain: float = Field(
            1.0,
            description=(
                "Multiplier for variances after integration of still images. See "
                "Leslie 1999."
            ),
        )

        # Profiling parameters: profile_
        profile_gaussian_rs_centroid_definition: str = Field(
            "com",
            description="The centroid to use as beam divergence (centre of mass or s1)",
        )

        # Refinement options: refinement_
        refinement_reflections_outlier_algorithm: Optional[str] = Field(
            None,
            description=(
                "Outlier rejection algorithm. If auto is selected, the "
                "algorithm is chosen automatically."
            ),
        )

        @validator("output_output_dir", always=True)
        def set_output_dir(cls, output: str, values: Dict[str, Any]) -> str:
            # Default the output directory to $LUTE_WORK_DIR (or cwd).
            if output == "":
                return os.getenv("LUTE_WORK_DIR", ".")
            return output

        @validator("output_logging_dir", always=True)
        def set_output_log_dir(cls, output: str, values: Dict[str, Any]) -> str:
            # Logs default to the same directory as output files.
            if output == "":
                return values["output_output_dir"]
            return output

    # Clears phil_parameters after rendering them into the template config.
    _set_phil_template_parameters = template_parameter_validator("phil_parameters")

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/cctbx/conda_base/bin/mpirun",
        description="MPI executable.",
        flag_type="",
    )
    cctbx_executable: str = Field(
        "/sdf/group/lcls/ds/tools/cctbx/build/bin/dials.stills_process",
        description="CCTBX indexing program (DIALS).",
        flag_type="",
    )
    in_file: str = Field(
        "",
        description=(
            "The location of a data specification for LCLS. "
            "This file will be written for you based on the data_spec parameter. "
            "If not running at LCLS, this can be an input file, or a glob."
        ),
        flag_type="",
    )
    data_spec: Optional[Dict[str, Union[str, float, int]]] = Field(
        None,
        description="Provide a CCTBX specification for data access.",
        flag_type="",
    )
    phil_file: str = Field(
        "",
        description="Location of the input settings ('phil') file.",
        flag_type="",
    )
    phil_parameters: Optional[PhilParameters] = Field(
        None,
        description="Optional template parameters to fill in a CCTBX phil file.",
        flag_type="",  # Does nothing since always None by time it's seen by Task
    )
    lute_template_cfg: TemplateConfig = Field(
        TemplateConfig(
            template_name="cctbx_index.phil",
            output_path="",
        ),
        description="Template information for the cctbx_index file.",
    )

    @validator("phil_file", always=True)
    def set_default_phil_path(cls, phil_file: str, values: Dict[str, Any]) -> str:
        # Default phil location: "<work_dir>/cctbx_index.phil".
        if phil_file == "":
            return f"{values['lute_config'].work_dir}/cctbx_index.phil"
        return phil_file

    @validator("lute_template_cfg", always=True)
    def set_phil_template_path(
        cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
    ) -> TemplateConfig:
        # Render the phil template to the (possibly defaulted) phil_file path.
        if lute_template_cfg.output_path == "":
            lute_template_cfg.output_path = values["phil_file"]
        return lute_template_cfg

    @validator("in_file", always=True)
    def set_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        # Default data-spec location: "<work_dir>/data_<exp>_<run>.loc".
        if in_file == "":
            exp: str = values["lute_config"].experiment
            run: str = str(values["lute_config"].run)
            work_dir: str = values["lute_config"].work_dir
            return f"{work_dir}/data_{exp}_{run}.loc"
        return in_file

    @validator("data_spec", always=True)
    def write_data_spec_file(
        cls,
        data_spec: Optional[Dict[str, Union[str, float, int]]],
        values: Dict[str, Any],
    ) -> None:
        # NOTE: side effect — writes "key=value" lines to in_file during
        # validation, then returns None so data_spec itself is cleared.
        if data_spec is not None:
            with open(values["in_file"], "w") as f:
                for key, value in data_spec.items():
                    spec_line: str = f"{key}={value}\n"
                    f.write(spec_line)
        return None

Config

Bases: Config

Source code in lute/io/models/sfx_index.py
513
514
515
class Config(ThirdPartyParameters.Config):
    """Task configuration with result marking disabled."""

    set_result: bool = False
    """Whether the Executor should mark a specified parameter as a result."""

set_result: bool = False class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

PhilParameters

Bases: BaseModel

Template parameters for CCTBX phil file.

Source code in lute/io/models/sfx_index.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
class PhilParameters(BaseModel):
    """Template parameters for CCTBX phil file.

    Field names are grouped by a prefix naming their phil scope (``input_``,
    ``geometry_``, ``output_``, ``dispatch_``, ``mp_``, ``spotfinder_``,
    ``indexing_``, ``integration_``, ``profile_``, ``refinement_``) —
    presumably these map onto nested phil parameters when the template is
    rendered; confirm against the phil template itself.
    """

    class Config(BaseModel.Config):
        # Accept extra fields (pydantic `extra` setting) so additional phil
        # overrides beyond those declared below can be supplied.
        extra: str = "allow"

    # Generic input settings: input_
    input_reference_geometry: Optional[str] = Field(
        None,
        description=(
            "Provide an models.expt file with exactly one detector model. Data "
            "processing will use that geometry instead of the geometry found "
            "in the image headers."
        ),
    )

    # Generic geometry: geometry_
    geometry_detector_panel_origin: Optional[Tuple[float, float, float]] = Field(
        None,
        description="Override the panel origin. Requires fast_axis and slow_axis.",
    )

    # Generic output settings: output_
    output_output_dir: str = Field(
        "",
        description="Directory output files will be placed",
    )
    output_composite_output: bool = Field(
        True,
        description=(
            "If True, save one set of experiment/reflection files per process, "
            "where each is a concatenated list of all the successful events "
            "examined by that process. If False, output a separate "
            "experiment/reflection file per image (generates a lot of files)."
        ),
    )
    output_logging_dir: str = Field(
        "", description="Directory output log files will be placed"
    )

    # Dispatch settings: dispatch_
    dispatch_index: bool = Field(
        True,
        description=(
            "Attempt to index images. find_spots also needs to be True for "
            "this to work"
        ),
    )
    dispatch_refine: bool = Field(
        False, description="If True, after indexing, refine the experimental models"
    )
    dispatch_integrate: bool = Field(
        True,
        description=(
            "Integrate indexed images. Ignored if index=False or "
            "find_spots=False"
        ),
    )

    # Parallel processing parameters: mp_
    mp_method: str = Field(
        "mpi",  # *multiprocessing sge lsf pbs mpi
        description="The multiprocessing method to use",
    )

    # Spotfinding parameters: spotfinder_
    spotfinder_lookup_mask: Optional[str] = Field(
        None, description="The path to the mask file."
    )
    spotfinder_threshold_dispersion_gain: Optional[float] = Field(
        None,
        description=(
            "Use a flat gain map for the entire detector to act as a "
            "multiplier for the gain set by the format. Cannot be used in "
            "conjunction with lookup.gain_map parameter."
        ),
    )
    spotfinder_threshold_dispersion_sigma_bkgnd: float = Field(
        6,
        description=(
            "The number of standard deviations of the index of dispersion "
            "(variance / mean) in the local area below which the pixel "
            "will be classified as background."
        ),
    )
    spotfinder_threshold_dispersion_sigma_strong: float = Field(
        3,
        description=(
            "The number of standard deviations above the mean in the local "
            "area above which the pixel will be classified as strong."
        ),
    )
    spotfinder_threshold_dispersion_global_threshold: float = Field(
        0,
        description=(
            "The global threshold value. Consider all pixels less than "
            "this value to be part of the background."
        ),
    )
    spotfinder_threshold_dispersion_kernel_size: Tuple[int, int] = Field(
        (6, 6),
        description=(
            "The size of the local area around the spot in which to "
            "calculate the mean and variance. The kernel is given as a box "
            "of size (2 * nx + 1, 2 * ny + 1) centred at the pixel."
        ),
    )
    spotfinder_filter_min_spot_size: Optional[int] = Field(
        3,
        description=(
            "The minimum number of contiguous pixels for a spot to be "
            "accepted by the filtering algorithm."
        ),
    )
    spotfinder_filter_d_min: Optional[float] = Field(
        None,
        description=(
            "The high resolution limit in Angstrom for a pixel to be "
            "accepted by the filtering algorithm."
        ),
    )

    # Indexing parameters: indexing_
    indexing_stills_refine_candidates_with_known_symmetry: bool = Field(
        False,
        description=(
            "If False, when choosing the best set of candidate basis "
            "solutions, refine the candidates in the P1 setting. If True, "
            "after indexing in P1, convert the candidates to the known "
            "symmetry and apply the corresponding change of basis to the "
            "indexed reflections."
        ),
    )
    indexing_stills_refine_all_candidates: bool = Field(
        True,
        description=(
            "If False, no attempt is made to refine the model from initial "
            "basis vector selection. The indexing solution with the best "
            "RMSD is chosen."
        ),
    )
    indexing_known_symmetry_space_group: Optional[str] = Field(
        None, description="Target space group for indexing."
    )
    indexing_known_symmetry_unit_cell: Optional[str] = Field(
        None, description="Target unit cell for indexing."
    )

    # Integration parameters: integration_
    integration_background_simple_outlier_plane_n_sigma: int = Field(
        10,
        description=(
            "The number of standard deviations above the threshold "
            "plane to use in rejecting outliers from background "
            "calculation."
        ),
    )
    integration_summation_detector_gain: float = Field(
        1.0,
        description=(
            "Multiplier for variances after integration of still images. See "
            "Leslie 1999."
        ),
    )

    # Profiling parameters: profile_
    profile_gaussian_rs_centroid_definition: str = Field(
        "com",
        description="The centroid to use as beam divergence (centre of mass or s1)",
    )

    # Refinement options: refinement_
    refinement_reflections_outlier_algorithm: Optional[str] = Field(
        None,
        description=(
            "Outlier rejection algorithm. If auto is selected, the "
            "algorithm is chosen automatically."
        ),
    )

    @validator("output_output_dir", always=True)
    def set_output_dir(cls, output: str, values: Dict[str, Any]) -> str:
        # Default the output directory to the LUTE work directory, taken from
        # the LUTE_WORK_DIR environment variable ("." if unset).
        if output == "":
            return os.getenv("LUTE_WORK_DIR", ".")
        return output

    @validator("output_logging_dir", always=True)
    def set_output_log_dir(cls, output: str, values: Dict[str, Any]) -> str:
        # Default the log directory to the (already validated/defaulted)
        # output directory. Relies on field declaration order: in pydantic v1
        # `values` only contains fields declared before this one.
        if output == "":
            return values["output_output_dir"]
        return output

IndexCrystFELParameters

Bases: ThirdPartyParameters

Parameters for CrystFEL's indexamajig.

There are many parameters, and many combinations. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-indexamajig.html

Source code in lute/io/models/sfx_index.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
class IndexCrystFELParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `indexamajig`.

    There are many parameters, and many combinations. For more information on
    usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-indexamajig.html
    """

    class Config(ThirdPartyParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/indexamajig",
        description="CrystFEL's indexing binary.",
        flag_type="",
    )
    # Basic options
    in_file: Optional[str] = Field(
        "", description="Path to input file.", flag_type="-", rename_param="i"
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )
    peaks: Optional[str] = Field(
        None,
        description=(
            "Peak finding algorithm, or file type. E.g. peakfinder8 to peak find, "
            "or use cxi for CXI files."
        ),
        flag_type="--",
    )
    geometry: str = Field(
        "", description="Path to geometry file.", flag_type="-", rename_param="g"
    )
    zmq_input: Optional[str] = Field(
        description="ZMQ address to receive data over. `input` and `zmq-input` are mutually exclusive",
        flag_type="--",
        rename_param="zmq-input",
    )
    zmq_subscribe: Optional[str] = Field(  # Can be used multiple times...
        description="Subscribe to ZMQ message of type `tag`",
        flag_type="--",
        rename_param="zmq-subscribe",
    )
    zmq_request: Optional[AnyUrl] = Field(
        description="Request new data over ZMQ by sending this value",
        flag_type="--",
        rename_param="zmq-request",
    )
    asapo_endpoint: Optional[str] = Field(
        description="ASAP::O endpoint. zmq-input and this are mutually exclusive.",
        flag_type="--",
        rename_param="asapo-endpoint",
    )
    asapo_token: Optional[str] = Field(
        description="ASAP::O authentication token.",
        flag_type="--",
        rename_param="asapo-token",
    )
    asapo_beamtime: Optional[str] = Field(
        description="ASAP::O beamtime.",  # FIX: typo "beatime"
        flag_type="--",
        rename_param="asapo-beamtime",
    )
    asapo_source: Optional[str] = Field(
        description="ASAP::O data source.",
        flag_type="--",
        rename_param="asapo-source",
    )
    asapo_group: Optional[str] = Field(
        description="ASAP::O consumer group.",
        flag_type="--",
        rename_param="asapo-group",
    )
    asapo_stream: Optional[str] = Field(
        description="ASAP::O stream.",
        flag_type="--",
        rename_param="asapo-stream",
    )
    asapo_wait_for_stream: Optional[str] = Field(
        description="If ASAP::O stream does not exist, wait for it to appear.",
        flag_type="--",
        rename_param="asapo-wait-for-stream",
    )
    data_format: Optional[str] = Field(
        description="Specify format for ZMQ or ASAP::O. `msgpack`, `hdf5` or `seedee`.",
        flag_type="--",
        rename_param="data-format",
    )
    basename: bool = Field(
        False,
        description="Remove directory parts of filenames. Acts before prefix if prefix also given.",
        flag_type="--",
    )
    # FIX: this field previously carried rename_param="asapo-stream" (a
    # copy-paste error) which made it emit the wrong CLI flag. The CrystFEL
    # flag is simply `--prefix`, so no rename is needed.
    prefix: Optional[str] = Field(
        description="Add a prefix to the filenames from the infile argument.",
        flag_type="--",
    )
    nthreads: PositiveInt = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of threads to use. See also `max_indexer_threads`.",
        flag_type="-",
        rename_param="j",
    )
    no_check_prefix: bool = Field(
        False,
        description="Don't attempt to correct the prefix if it seems incorrect.",
        flag_type="--",
        rename_param="no-check-prefix",
    )
    highres: Optional[float] = Field(
        description="Mark all pixels greater than `x` as bad.", flag_type="--"
    )
    profile: bool = Field(
        False, description="Display timing data to monitor performance.", flag_type="--"
    )
    temp_dir: Optional[str] = Field(
        description="Specify a path for the temp files folder.",
        flag_type="--",
        rename_param="temp-dir",
    )
    wait_for_file: conint(gt=-2) = Field(
        0,
        description="Wait at most `x` seconds for a file to be created. A value of -1 means wait forever.",
        flag_type="--",
        rename_param="wait-for-file",
    )
    no_image_data: bool = Field(
        False,
        description="Load only the metadata, no images. Can check indexability without high data requirements.",
        flag_type="--",
        rename_param="no-image-data",
    )
    # Peak-finding options
    # ....
    # Indexing options
    indexing: Optional[str] = Field(
        description="Comma-separated list of supported indexing algorithms to use. Default is to automatically detect.",
        flag_type="--",
    )
    cell_file: Optional[str] = Field(
        description="Path to a file containing unit cell information (PDB or CrystFEL format).",
        flag_type="-",
        rename_param="p",
    )
    tolerance: str = Field(
        "5,5,5,1.5",
        description=(
            "Tolerances (in percent) for unit cell comparison. "
            "Comma-separated list a,b,c,angle. Default=5,5,5,1.5"
        ),
        flag_type="--",
    )
    no_check_cell: bool = Field(
        False,
        description="Do not check cell parameters against unit cell. Replaces '-raw' method.",
        flag_type="--",
        rename_param="no-check-cell",
    )
    no_check_peaks: bool = Field(
        False,
        description="Do not verify peaks are accounted for by solution.",
        flag_type="--",
        rename_param="no-check-peaks",
    )
    multi: bool = Field(
        False, description="Enable multi-lattice indexing.", flag_type="--"
    )
    wavelength_estimate: Optional[float] = Field(
        description="Estimate for X-ray wavelength. Required for some methods.",
        flag_type="--",
        rename_param="wavelength-estimate",
    )
    camera_length_estimate: Optional[float] = Field(
        description="Estimate for camera distance. Required for some methods.",
        flag_type="--",
        rename_param="camera-length-estimate",
    )
    max_indexer_threads: Optional[PositiveInt] = Field(
        # 1,
        description="Some indexing algos can use multiple threads. In addition to image-based.",
        flag_type="--",
        rename_param="max-indexer-threads",
    )
    no_retry: bool = Field(
        False,
        description="Do not remove weak peaks and try again.",
        flag_type="--",
        rename_param="no-retry",
    )
    no_refine: bool = Field(
        False,
        description="Skip refinement step.",
        flag_type="--",
        rename_param="no-refine",
    )
    no_revalidate: bool = Field(
        False,
        description="Skip revalidation step.",
        flag_type="--",
        rename_param="no-revalidate",
    )
    # TakeTwo specific parameters
    taketwo_member_threshold: Optional[PositiveInt] = Field(
        # 20,
        description="Minimum number of vectors to consider.",
        flag_type="--",
        rename_param="taketwo-member-threshold",
    )
    taketwo_len_tolerance: Optional[PositiveFloat] = Field(
        # 0.001,
        description="TakeTwo length tolerance in Angstroms.",
        flag_type="--",
        rename_param="taketwo-len-tolerance",
    )
    taketwo_angle_tolerance: Optional[PositiveFloat] = Field(
        # 0.6,
        description="TakeTwo angle tolerance in degrees.",
        flag_type="--",
        rename_param="taketwo-angle-tolerance",
    )
    taketwo_trace_tolerance: Optional[PositiveFloat] = Field(
        # 3,
        description="Matrix trace tolerance in degrees.",
        flag_type="--",
        rename_param="taketwo-trace-tolerance",
    )
    # Felix-specific parameters
    # felix_domega
    # felix-fraction-max-visits
    # felix-max-internal-angle
    # felix-max-uniqueness
    # felix-min-completeness
    # felix-min-visits
    # felix-num-voxels
    # felix-sigma
    # felix-tthrange-max
    # felix-tthrange-min
    # XGANDALF-specific parameters
    xgandalf_sampling_pitch: Optional[NonNegativeInt] = Field(
        # 6,
        description="Density of reciprocal space sampling.",
        flag_type="--",
        rename_param="xgandalf-sampling-pitch",
    )
    xgandalf_grad_desc_iterations: Optional[NonNegativeInt] = Field(
        # 4,
        description="Number of gradient descent iterations.",
        flag_type="--",
        rename_param="xgandalf-grad-desc-iterations",
    )
    xgandalf_tolerance: Optional[PositiveFloat] = Field(
        # 0.02,
        description="Relative tolerance of lattice vectors",
        flag_type="--",
        rename_param="xgandalf-tolerance",
    )
    xgandalf_no_deviation_from_provided_cell: Optional[bool] = Field(
        description="Found unit cell must match provided.",
        flag_type="--",
        rename_param="xgandalf-no-deviation-from-provided-cell",
    )
    xgandalf_min_lattice_vector_length: Optional[PositiveFloat] = Field(
        # 30,
        description="Minimum possible lattice length.",
        flag_type="--",
        rename_param="xgandalf-min-lattice-vector-length",
    )
    xgandalf_max_lattice_vector_length: Optional[PositiveFloat] = Field(
        # 250,
        description="Maximum possible lattice length.",  # FIX: said "Minimum"
        flag_type="--",
        rename_param="xgandalf-max-lattice-vector-length",
    )
    xgandalf_max_peaks: Optional[PositiveInt] = Field(
        # 250,
        description="Maximum number of peaks to use for indexing.",
        flag_type="--",
        rename_param="xgandalf-max-peaks",
    )
    xgandalf_fast_execution: bool = Field(
        False,
        description="Shortcut to set sampling-pitch=2, and grad-desc-iterations=3.",
        flag_type="--",
        rename_param="xgandalf-fast-execution",
    )
    # pinkIndexer parameters
    # ...
    # asdf_fast: bool = Field(False, description="Enable fast mode for asdf. 3x faster for 7% loss in accuracy.", flag_type="--", rename_param="asdf-fast")
    # Integration parameters
    integration: str = Field(
        "rings-nocen", description="Method for integrating reflections.", flag_type="--"
    )
    fix_profile_radius: Optional[float] = Field(
        description="Fix the profile radius (m^{-1})",
        flag_type="--",
        rename_param="fix-profile-radius",
    )
    fix_divergence: Optional[float] = Field(
        0,
        description="Fix the divergence (rad, full angle).",
        flag_type="--",
        rename_param="fix-divergence",
    )
    int_radius: str = Field(
        "4,5,7",
        description="Inner, middle, and outer radii for 3-ring integration.",
        flag_type="--",
        rename_param="int-radius",
    )
    int_diag: str = Field(
        "none",
        description="Show detailed information on integration when condition is met.",
        flag_type="--",
        rename_param="int-diag",
    )
    push_res: str = Field(
        "infinity",
        description="Integrate `x` higher than apparent resolution limit (nm-1).",
        flag_type="--",
        rename_param="push-res",
    )
    overpredict: bool = Field(
        False,
        description="Over-predict reflections. Maybe useful with post-refinement.",
        flag_type="--",
    )
    cell_parameters_only: bool = Field(
        False, description="Do not predict reflections at all", flag_type="--"
    )
    # Output parameters
    no_non_hits_in_stream: bool = Field(
        False,
        description="Exclude non-hits from the stream file.",
        flag_type="--",
        rename_param="no-non-hits-in-stream",
    )
    copy_hheader: Optional[str] = Field(
        description="Copy information from header in the image to output stream.",
        flag_type="--",
        # FIX: was "copy-hheader" (doubled 'h'); the CrystFEL flag is
        # `--copy-header` — verify against the installed CrystFEL version.
        rename_param="copy-header",
    )
    no_peaks_in_stream: bool = Field(
        False,
        description="Do not record peaks in stream file.",
        flag_type="--",
        rename_param="no-peaks-in-stream",
    )
    no_refls_in_stream: bool = Field(
        False,
        description="Do not record reflections in stream.",
        flag_type="--",
        rename_param="no-refls-in-stream",
    )
    serial_offset: Optional[PositiveInt] = Field(
        description="Start numbering at `x` instead of 1.",
        flag_type="--",
        rename_param="serial-offset",
    )
    harvest_file: Optional[str] = Field(
        description="Write parameters to file in JSON format.",
        flag_type="--",
        rename_param="harvest-file",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default the input list to the most recent peak-finding output.

        Checks the LUTE database for a `FindPeaksPyAlgos` output first, then
        falls back to reconstructing the conventional `FindPeaksPsocake`
        list-file path from experiment, run, tag and output directory.
        """
        if in_file == "":
            filename: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "FindPeaksPyAlgos", "out_file"
            )
            if filename is None:
                exp: str = values["lute_config"].experiment
                run: int = int(values["lute_config"].run)
                tag: Optional[str] = read_latest_db_entry(
                    f"{values['lute_config'].work_dir}", "FindPeaksPsocake", "tag"
                )
                out_dir: Optional[str] = read_latest_db_entry(
                    f"{values['lute_config'].work_dir}", "FindPeaksPsocake", "outDir"
                )
                if out_dir is not None:
                    fname: str = f"{out_dir}/{exp}_{run:04d}"
                    if tag is not None:
                        fname = f"{fname}_{tag}"
                    return f"{fname}.lst"
            else:
                return filename
        # No database entry found - leave the (empty) value unchanged.
        return in_file

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Construct a default stream-file path from experiment, run and tag."""
        if out_file == "":
            expmt: str = values["lute_config"].experiment
            run: int = int(values["lute_config"].run)
            work_dir: str = values["lute_config"].work_dir
            tag: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "FindPeaksPyAlgos", "tag"
            )
            if tag is None:
                # No pyalgos tag recorded; check the psocake table instead.
                tag = read_latest_db_entry(
                    f"{values['lute_config'].work_dir}", "FindPeaksPsocake", "tag"
                )
            fname: str = f"{expmt}_r{run:04d}"
            if tag is not None:
                fname = f"{fname}_{tag}"
            return f"{work_dir}/{fname}.stream"
        return out_file

Config

Bases: Config

Source code in lute/io/models/sfx_index.py
43
44
45
46
47
48
class Config(ThirdPartyParameters.Config):
    # This Task registers a result: the parameter flagged with `is_result`
    # is recorded by the Executor.
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

    # Render long CLI options as `--flag=value` rather than `--flag value`.
    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""

long_flags_use_eq: bool = True class-attribute instance-attribute

Whether long command-line arguments are passed like --long=arg.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

ManipulateHKLParameters

Bases: ThirdPartyParameters

Parameters for CrystFEL's get_hkl for manipulating lists of reflections.

This Task is predominantly used internally to convert hkl to mtz files. Note that performing multiple manipulations is undefined behaviour. Run the Task with multiple configurations in explicit separate steps. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-partialator.html

Source code in lute/io/models/sfx_merge.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
class ManipulateHKLParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `get_hkl` for manipulating lists of reflections.

    This Task is predominantly used internally to convert `hkl` to `mtz` files.
    Note that performing multiple manipulations is undefined behaviour. Run
    the Task with multiple configurations in explicit separate steps. For more
    information on usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-partialator.html
    """

    class Config(ThirdPartyParameters.Config):
        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/get_hkl",
        description="CrystFEL's reflection manipulation binary.",
        flag_type="",
    )
    in_file: str = Field(
        "",
        description="Path to input HKL file.",
        flag_type="-",
        rename_param="i",
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )
    cell_file: str = Field(
        "",
        description="Path to a file containing unit cell information (PDB or CrystFEL format).",
        flag_type="-",
        rename_param="p",
    )
    output_format: str = Field(
        "mtz",
        description="Output format. One of mtz, mtz-bij, or xds. Otherwise CrystFEL format.",
        flag_type="--",
        rename_param="output-format",
    )
    expand: Optional[str] = Field(
        description="Reflections will be expanded to fill asymmetric unit of specified point group.",
        flag_type="--",
    )
    # Reducing reflections to higher symmetry
    twin: Optional[str] = Field(
        description="Reflections equivalent to specified point group will have intensities summed.",
        flag_type="--",
    )
    no_need_all_parts: Optional[bool] = Field(
        description="Use with --twin to allow reflections missing a 'twin mate' to be written out.",
        flag_type="--",
        rename_param="no-need-all-parts",
    )
    # Noise - Add to data
    noise: Optional[bool] = Field(
        description="Generate 10% uniform noise.", flag_type="--"
    )
    poisson: Optional[bool] = Field(
        description="Generate Poisson noise. Intensities assumed to be A.U.",
        flag_type="--",
    )
    adu_per_photon: Optional[int] = Field(
        description="Use with --poisson to convert A.U. to photons.",
        flag_type="--",
        rename_param="adu-per-photon",
    )
    # Remove duplicate reflections
    trim_centrics: Optional[bool] = Field(
        description="Duplicated reflections (according to symmetry) are removed.",
        flag_type="--",
    )
    # Restrict to template file
    template: Optional[str] = Field(
        description="Only reflections which also appear in specified file are written out.",
        flag_type="--",
    )
    # Multiplicity
    multiplicity: Optional[bool] = Field(
        description="Reflections are multiplied by their symmetric multiplicities.",
        flag_type="--",
    )
    # Resolution cutoffs
    cutoff_angstroms: Optional[Union[str, int, float]] = Field(
        description=(
            "Either n, or n1,n2,n3. For n, reflections < n are removed. "
            "For n1,n2,n3 anisotropic truncation performed at separate resolution "
            "limits for a*, b*, c*."
        ),
        flag_type="--",
        rename_param="cutoff-angstroms",
    )
    lowres: Optional[float] = Field(
        description="Remove reflections with d > n", flag_type="--"
    )
    highres: Optional[float] = Field(
        description="Synonym for first form of --cutoff-angstroms",
        # FIX: `flag_type` was missing on this field alone, unlike every
        # sibling option, so `--highres` could not be rendered consistently.
        flag_type="--",
    )
    reindex: Optional[str] = Field(
        description="Reindex according to specified operator. E.g. k,h,-l.",
        flag_type="--",
    )
    # Override input symmetry
    symmetry: Optional[str] = Field(
        description="Point group symmetry to use to override. Almost always OMIT this option.",
        flag_type="--",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default the input to the latest `MergePartialator` output file."""
        if in_file == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                return partialator_file
        return in_file

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default the output to the merged reflection list with `.mtz` suffix."""
        if out_file == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                # FIX: use splitext instead of str.split(".")[0] so paths
                # with dots in directory names or multi-dot filenames are
                # truncated at the final extension only.
                return f"{os.path.splitext(partialator_file)[0]}.mtz"
        return out_file

    @validator("cell_file", always=True)
    def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
        """Default the cell file to the one last used by `IndexCrystFEL`."""
        if cell_file == "":
            idx_cell_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}",
                "IndexCrystFEL",
                "cell_file",
                valid_only=False,
            )
            if idx_cell_file:
                return idx_cell_file
        return cell_file

Config

Bases: Config

Source code in lute/io/models/sfx_merge.py
524
525
526
527
528
529
class Config(ThirdPartyParameters.Config):
    """Executor configuration: pass long flags as `--flag=arg` and record a result."""

    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

long_flags_use_eq: bool = True class-attribute instance-attribute

Whether long command-line arguments are passed like --long=arg.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

MergeCCTBXXFELParameters

Bases: ThirdPartyParameters

Parameters for merging with cctbx.xfel.

Source code in lute/io/models/sfx_merge.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
class MergeCCTBXXFELParameters(ThirdPartyParameters):
    """Parameters for merging with cctbx.xfel.

    Runs ``mpirun cctbx.xfel.merge`` with settings rendered into a phil file
    from the ``cctbx_merge.phil`` template. Template values are supplied via
    `phil_parameters`; the rendered file path is `phil_file`.
    """

    class Config(ThirdPartyParameters.Config):
        set_result: bool = False
        """Whether the Executor should mark a specified parameter as a result."""

    class PhilParameters(BaseModel):
        """Template parameters for CCTBX phil file."""

        class Config(BaseModel.Config):
            # Permit keys beyond those declared, so arbitrary extra phil
            # settings can be passed through to the template.
            extra: str = "allow"

        # Generic input settings: input_
        input_path: str = Field(
            "",
            description="Input file(s).",
        )
        input_experiments_suffix: str = Field(
            # Fix: "appened" typo in the user-facing description.
            "_integrated.expt", description="Suffix appended to experiments."
        )
        input_reflections_suffix: str = Field(
            # Fix: typo + copy-paste error — this suffix selects reflection
            # files, not experiments.
            "_integrated.refl", description="Suffix appended to reflections."
        )
        input_parallel_file_load_method: str = Field(
            "uniform",  # *uniform node_memory
            description="Parallel file loading method.",
        )

        # Filtering settings: filter_
        filter_algorithm: str = Field(
            "unit_cell",  # n_obs reindex resolution unit_cell report
            description="",
        )
        filter_unit_cell_algorithm: str = Field(
            "cluster", description=""  # range *value cluster
        )
        filter_unit_cell_cluster_covariance_file: str = Field(
            "",  # $MODULES/$COV?
            description="",
        )
        filter_unit_cell_cluster_covariance_component: int = Field(
            0,
            description="",
        )
        filter_unit_cell_cluster_covariance_mahalanobis: float = Field(
            5.0,
            description="",
        )
        filter_outlier_min_corr: float = Field(
            -1.0,
            description="",
        )

        # Selection settings: select_
        select_algorithm: str = Field(
            "significance_filter",
            description="",
        )
        select_significance_filter_sigma: float = Field(
            0.1,
            description="",
        )

        # Scaling settings: scaling_
        scaling_model: str = Field(
            "",  # $MODULES/$COV?
            description="",
        )
        scaling_resolution_scalar: float = Field(
            0.993420862158964,
            description="",
        )

        # Post-refinement: postrefinement_
        postrefinement_enable: bool = Field(
            True, description="Enable post-refinement processing?"
        )
        postrefinement_algorithm: str = Field("rs", description="")

        # Merging: merging_
        merging_d_min: int = Field(
            3,  # What's a good default?
            description="",
        )
        merging_merge_anomalous: bool = Field(False, description="")
        merging_set_average_unit_cell: bool = Field(True, description="")
        merging_error_model: str = Field(
            "ev11", description=""  # ha14 *ev11 mm24 errors_from_sample_residuals
        )

        # Statistics: statistics_
        statistics_n_bins: int = Field(20, description="")
        statistics_report_ML: bool = Field(True, description="")
        statistics_cciso_mtz_file: str = Field(
            "",  # $H5_SIM_PATH/ground_truth.mtz
            description="",
        )
        statistics_cciso_mtz_column_F: str = Field("F", description="")

        # Output settings: output_
        output_prefix: str = Field("", description="")
        output_output_dir: str = Field(
            "",
            description="",
        )
        output_tmp_dir: str = Field(
            "",
            description="",
        )
        output_do_timing: bool = Field(True, description="")
        output_log_level: int = Field(0, description="")
        output_save_experiments_and_reflections: bool = Field(True, description="")

        # Parallel processing settings: parallel_
        parallel_a2a: int = Field(1, description="")

    # Fills the phil template from `phil_parameters` (project-provided factory).
    _set_phil_template_parameters = template_parameter_validator("phil_parameters")

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/cctbx/conda_base/bin/mpirun",
        description="MPI executable.",
        flag_type="",
    )
    cctbx_executable: str = Field(
        "/sdf/group/lcls/ds/tools/cctbx/build/bin/cctbx.xfel.merge",
        description="CCTBX merge program.",
        flag_type="",
    )
    phil_file: str = Field(
        "",
        description="Location of the input settings ('phil') file.",
        flag_type="",
    )
    phil_parameters: Optional[PhilParameters] = Field(
        None,
        description="Optional template parameters to fill in a CCTBX phil file.",
        flag_type="",  # Does nothing since always None by time it's seen by Task
    )
    lute_template_cfg: TemplateConfig = Field(
        TemplateConfig(
            template_name="cctbx_merge.phil",
            output_path="",
        ),
        description="Template information for the cctbx_merge file.",
    )

    @validator("phil_file", always=True)
    def set_default_phil_path(cls, phil_file: str, values: Dict[str, Any]) -> str:
        """Default the phil file location to `<work_dir>/cctbx_merge.phil`."""
        if phil_file == "":
            return f"{values['lute_config'].work_dir}/cctbx_merge.phil"
        return phil_file

    @validator("lute_template_cfg", always=True)
    def set_phil_template_path(
        cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
    ) -> TemplateConfig:
        """Point the template output at the (already-validated) phil file path."""
        if lute_template_cfg.output_path == "":
            lute_template_cfg.output_path = values["phil_file"]
        return lute_template_cfg

Config

Bases: Config

Source code in lute/io/models/sfx_merge.py
222
223
224
class Config(ThirdPartyParameters.Config):
    """Executor configuration: do not record a result for this Task."""

    set_result: bool = False
    """Whether the Executor should mark a specified parameter as a result."""

set_result: bool = False class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

PhilParameters

Bases: BaseModel

Template parameters for CCTBX phil file.

Source code in lute/io/models/sfx_merge.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
class PhilParameters(BaseModel):
    """Template parameters for CCTBX phil file.

    Field names map to phil settings via their prefix (e.g. ``input_``,
    ``filter_``); values are substituted into the ``cctbx_merge.phil``
    template.
    """

    class Config(BaseModel.Config):
        # Permit keys beyond those declared, so arbitrary extra phil
        # settings can be passed through to the template.
        extra: str = "allow"

    # Generic input settings: input_
    input_path: str = Field(
        "",
        description="Input file(s).",
    )
    input_experiments_suffix: str = Field(
        # Fix: "appened" typo in the user-facing description.
        "_integrated.expt", description="Suffix appended to experiments."
    )
    input_reflections_suffix: str = Field(
        # Fix: typo + copy-paste error — this suffix selects reflection
        # files, not experiments.
        "_integrated.refl", description="Suffix appended to reflections."
    )
    input_parallel_file_load_method: str = Field(
        "uniform",  # *uniform node_memory
        description="Parallel file loading method.",
    )

    # Filtering settings: filter_
    filter_algorithm: str = Field(
        "unit_cell",  # n_obs reindex resolution unit_cell report
        description="",
    )
    filter_unit_cell_algorithm: str = Field(
        "cluster", description=""  # range *value cluster
    )
    filter_unit_cell_cluster_covariance_file: str = Field(
        "",  # $MODULES/$COV?
        description="",
    )
    filter_unit_cell_cluster_covariance_component: int = Field(
        0,
        description="",
    )
    filter_unit_cell_cluster_covariance_mahalanobis: float = Field(
        5.0,
        description="",
    )
    filter_outlier_min_corr: float = Field(
        -1.0,
        description="",
    )

    # Selection settings: select_
    select_algorithm: str = Field(
        "significance_filter",
        description="",
    )
    select_significance_filter_sigma: float = Field(
        0.1,
        description="",
    )

    # Scaling settings: scaling_
    scaling_model: str = Field(
        "",  # $MODULES/$COV?
        description="",
    )
    scaling_resolution_scalar: float = Field(
        0.993420862158964,
        description="",
    )

    # Post-refinement: postrefinement_
    postrefinement_enable: bool = Field(
        True, description="Enable post-refinement processing?"
    )
    postrefinement_algorithm: str = Field("rs", description="")

    # Merging: merging_
    merging_d_min: int = Field(
        3,  # What's a good default?
        description="",
    )
    merging_merge_anomalous: bool = Field(False, description="")
    merging_set_average_unit_cell: bool = Field(True, description="")
    merging_error_model: str = Field(
        "ev11", description=""  # ha14 *ev11 mm24 errors_from_sample_residuals
    )

    # Statistics: statistics_
    statistics_n_bins: int = Field(20, description="")
    statistics_report_ML: bool = Field(True, description="")
    statistics_cciso_mtz_file: str = Field(
        "",  # $H5_SIM_PATH/ground_truth.mtz
        description="",
    )
    statistics_cciso_mtz_column_F: str = Field("F", description="")

    # Output settings: output_
    output_prefix: str = Field("", description="")
    output_output_dir: str = Field(
        "",
        description="",
    )
    output_tmp_dir: str = Field(
        "",
        description="",
    )
    output_do_timing: bool = Field(True, description="")
    output_log_level: int = Field(0, description="")
    output_save_experiments_and_reflections: bool = Field(True, description="")

    # Parallel processing settings: parallel_
    parallel_a2a: int = Field(1, description="")

MergePartialatorParameters

Bases: ThirdPartyParameters

Parameters for CrystFEL's partialator.

There are many parameters, and many combinations. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-partialator.html

Source code in lute/io/models/sfx_merge.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class MergePartialatorParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `partialator`.

    There are many parameters, and many combinations. For more information on
    usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-partialator.html
    """

    class Config(ThirdPartyParameters.Config):
        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    # Binary and primary I/O. `rename_param` maps a Python-safe field name to
    # the actual command-line flag (e.g. in_file -> -i).
    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/partialator",
        description="CrystFEL's Partialator binary.",
        flag_type="",
    )
    in_file: Optional[str] = Field(
        "", description="Path to input stream.", flag_type="-", rename_param="i"
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )
    # No default: the point group must be provided by the user.
    symmetry: str = Field(description="Point group symmetry.", flag_type="--")
    niter: Optional[int] = Field(
        description="Number of cycles of scaling and post-refinement.",
        flag_type="-",
        rename_param="n",
    )
    # Scaling/model toggles (boolean long flags).
    no_scale: Optional[bool] = Field(
        description="Disable scaling.", flag_type="--", rename_param="no-scale"
    )
    no_Bscale: Optional[bool] = Field(
        description="Disable Debye-Waller part of scaling.",
        flag_type="--",
        rename_param="no-Bscale",
    )
    no_pr: Optional[bool] = Field(
        description="Disable orientation model.", flag_type="--", rename_param="no-pr"
    )
    no_deltacchalf: Optional[bool] = Field(
        description="Disable rejection based on deltaCC1/2.",
        flag_type="--",
        rename_param="no-deltacchalf",
    )
    model: str = Field(
        "unity",
        description="Partiality model. Options: xsphere, unity, offset, ggpm.",
        flag_type="--",
    )
    # Default: one fewer than the CPUs available to this process (or the SLURM
    # allocation, if SLURM_NPROCS is set), with a floor of 1. Evaluated once at
    # class-definition time.
    nthreads: int = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of parallel analyses.",
        flag_type="-",
        rename_param="j",
    )
    polarisation: Optional[str] = Field(
        description="Specification of incident polarisation. Refer to CrystFEL docs for more info.",
        flag_type="--",
    )
    no_polarisation: Optional[bool] = Field(
        description="Synonym for --polarisation=none",
        flag_type="--",
        rename_param="no-polarisation",
    )
    # Reflection/crystal selection criteria.
    max_adu: Optional[float] = Field(
        description="Maximum intensity of reflection to include.",
        flag_type="--",
        rename_param="max-adu",
    )
    min_res: Optional[float] = Field(
        description="Only include crystals diffracting to a minimum resolution.",
        flag_type="--",
        rename_param="min-res",
    )
    min_measurements: int = Field(
        2,
        description="Include a reflection only if it appears a minimum number of times.",
        flag_type="--",
        rename_param="min-measurements",
    )
    push_res: Optional[float] = Field(
        description="Merge reflections up to higher than the apparent resolution limit.",
        flag_type="--",
        rename_param="push-res",
    )
    start_after: int = Field(
        0,
        description="Ignore the first n crystals.",
        flag_type="--",
        rename_param="start-after",
    )
    stop_after: int = Field(
        0,
        description="Stop after processing n crystals. 0 means process all.",
        flag_type="--",
        rename_param="stop-after",
    )
    no_free: Optional[bool] = Field(
        description="Disable cross-validation. Testing ONLY.",
        flag_type="--",
        rename_param="no-free",
    )
    custom_split: Optional[str] = Field(
        description="Read a set of filenames, event and dataset IDs from a filename.",
        flag_type="--",
        rename_param="custom-split",
    )
    max_rel_B: float = Field(
        100,
        description="Reject crystals if |relB| > n sq Angstroms.",
        flag_type="--",
        rename_param="max-rel-B",
    )
    # Logging/output detail.
    output_every_cycle: bool = Field(
        False,
        description="Write per-crystal params after every refinement cycle.",
        flag_type="--",
        rename_param="output-every-cycle",
    )
    no_logs: bool = Field(
        False,
        description="Do not write logs needed for plots, maps and graphs.",
        flag_type="--",
        rename_param="no-logs",
    )
    set_symmetry: Optional[str] = Field(
        description="Set the apparent symmetry of the crystals to a point group.",
        flag_type="-",
        rename_param="w",
    )
    operator: Optional[str] = Field(
        description="Specify an ambiguity operator. E.g. k,h,-l.", flag_type="--"
    )
    # Beam-parameter overrides.
    force_bandwidth: Optional[float] = Field(
        description="Set X-ray bandwidth. As percent, e.g. 0.0013 (0.13%).",
        flag_type="--",
        rename_param="force-bandwidth",
    )
    force_radius: Optional[float] = Field(
        description="Set the initial profile radius (nm-1).",
        flag_type="--",
        rename_param="force-radius",
    )
    force_lambda: Optional[float] = Field(
        description="Set the wavelength. In Angstroms.",
        flag_type="--",
        rename_param="force-lambda",
    )
    harvest_file: Optional[str] = Field(
        description="Write parameters to file in JSON format.",
        flag_type="--",
        rename_param="harvest-file",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default the input stream to the latest ConcatenateStreamFiles output."""
        if in_file == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}",
                "ConcatenateStreamFiles",
                "out_file",
            )
            if stream_file:
                return stream_file
        return in_file

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Derive the output name from `in_file`, or fall back to partialator.hkl."""
        if out_file == "":
            in_file: str = values["in_file"]
            if in_file:
                # Stem is everything before the first '.'.
                tag: str = in_file.split(".")[0]
                return f"{tag}.hkl"
            else:
                # Relative path: resolved against the process working directory.
                return "partialator.hkl"
        return out_file

Config

Bases: Config

Source code in lute/io/models/sfx_merge.py
41
42
43
44
45
46
class Config(ThirdPartyParameters.Config):
    """Executor configuration: pass long flags as `--flag=arg` and record a result."""

    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

long_flags_use_eq: bool = True class-attribute instance-attribute

Whether long command-line arguments are passed like --long=arg.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

OptimizeAgBhGeometryExhaustiveParameters

Bases: TaskParameters

TaskParameter model for OptimizeAgBhGeometryExhaustive Task.

This Task does geometry optimization of detector distance and beam center based on a powder image produced from acquiring a run of Ag Behenate.

Source code in lute/io/models/geometry.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class OptimizeAgBhGeometryExhaustiveParameters(TaskParameters):
    """TaskParameter model for OptimizeAgBhGeometryExhaustive Task.

    This Task does geometry optimization of detector distance and beam center
    based on a powder image produced from acquiring a run of Ag Behenate.
    """

    # Project-provided validator: resolves `powder` from a smalldata path when
    # not explicitly supplied.
    _find_smd_path = validate_smd_path("powder")

    detname: str = Field(description="Name of the detector to optimize geometry for.")

    powder: str = Field(
        "", description="Path to the powder image, or file containing it."
    )

    geom_out_dir: str = Field(
        "", description="Directory to write new geometry files to."
    )

    mask: Optional[str] = Field(
        None, description="Path to a detector mask, or file containing it."
    )

    # NOTE(review): description is empty — presumably the number of powder
    # rings (peaks) used in the fit; confirm against the Task implementation.
    n_peaks: int = Field(4, description="")

    n_iterations: int = Field(
        5, description="Number of optimization iterations. Per MPI rank."
    )

    threshold: float = Field(
        1e6,
        description=(
            "Pixels in the powder image with an intensity above this threshold "
            "are set to 0."
        ),
    )

    dx: Tuple[float, float, int] = Field(
        (-6, 6, 5),
        description=(
            "Defines the search radius for beam center x position as offsets from "
            "the image center. Format: (left, right, num_steps). In units of pixels."
        ),
    )

    dy: Tuple[float, float, int] = Field(
        (-6, 6, 5),
        description=(
            "Defines the search radius for beam center y position as offsets from "
            "the image center. Format: (up, down, num_steps). In units of pixels."
        ),
    )

    center_guess: Optional[Tuple[float, float]] = Field(
        None, description=("Provide an optional starting guess for the beam center.")
    )

    distance_guess: Optional[float] = Field(
        None,
        description="Provide an optional starting guess for the detector distance (mm).",
    )

RunSHELXCParameters

Bases: ThirdPartyParameters

Parameters for CCP4's SHELXC program.

SHELXC prepares files for SHELXD and SHELXE.

For more information please refer to the official documentation: https://www.ccp4.ac.uk/html/crank.html

Source code in lute/io/models/sfx_solve.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class RunSHELXCParameters(ThirdPartyParameters):
    """Parameters for CCP4's SHELXC program.

    SHELXC prepares files for SHELXD and SHELXE.

    For more information please refer to the official documentation:
    https://www.ccp4.ac.uk/html/crank.html
    """

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/ccp4-8.0/bin/shelxc",
        description="CCP4 SHELXC. Generates input files for SHELXD/SHELXE.",
        flag_type="",
    )
    placeholder: str = Field(
        "xx", description="Placeholder filename stem.", flag_type=""
    )
    in_file: str = Field(
        "",
        description="Input file for SHELXC with reflections AND proper records.",
        flag_type="",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Fill in the input file and add the stdin redirection prefix.

        An empty `in_file` is resolved from the latest ManipulateHKL output
        recorded in the workflow database. The path is prefixed with '<'
        because SHELXC reads the file via shell redirection
        (`shelxc xx <input_file.xds`).
        """
        if in_file == "":
            # get_hkl needed to be run to produce an XDS format file...
            xds_format_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "ManipulateHKL", "out_file"
            )
            if xds_format_file:
                in_file = xds_format_file
        # Bug fix: guard against the empty string before indexing. Previously
        # `in_file[0]` raised IndexError when no path was given and no database
        # entry was found.
        if in_file and in_file[0] != "<":
            # Need to add a redirection for this program
            # Runs like `shelxc xx <input_file.xds`
            in_file = f"<{in_file}"
        return in_file

SubmitSMDParameters

Bases: ThirdPartyParameters

Parameters for running smalldata to produce reduced HDF5 files.

Source code in lute/io/models/smd.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
class SubmitSMDParameters(ThirdPartyParameters):
    """Parameters for running smalldata to produce reduced HDF5 files."""

    class Config(ThirdPartyParameters.Config):
        """Identical to super-class Config but includes a result."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

        result_from_params: str = ""
        """Defines a result from the parameters. Use a validator to do so."""

    class ProducerParameters(BaseModel):
        """Optional analysis parameters rendered into the producer script.

        These are substituted into the smalldata producer template referenced
        by `lute_template_cfg` below.
        """

        class ROIParams(BaseModel):
            """Region-of-interest extraction parameters."""

            ROIs: List[List[List[int]]] = Field(
                description="Definition of ROIs, can define multiple."
            )

            writeArea: bool = Field(
                False, description="Whether to write out the area image of the ROI."
            )

            thresADU: Optional[float] = Field(
                None, description="Optional threshold on ADU."
            )

        class AzIntParams(BaseModel):
            """Azimuthal integration parameters."""

            eBeam: float = Field(description="Beam energy in keV.")

            center: List[float] = Field(description="Beam center in micrometers")

            dis_to_sam: float = Field(description="Detector distance in millimeters.")

            tx: float = Field(0, description="Tilt in x, degrees")

            ty: float = Field(0, description="Tilt in y, degrees")

        class AzIntPyFAIParams(BaseModel):
            """Azimuthal integration parameters for the pyFAI-based integrator."""

            class AiKwargs(BaseModel):
                dist: float = Field(description="Detector distance.")
                poni1: float = Field(description="First poni.")
                poni2: float = Field(description="Second poni.")

            poni_file: Optional[str] = Field(
                None,
                description="Path to a poni file. Must provide poni_file or ai_kwargs.",
            )

            ai_kwargs: Optional[AiKwargs] = Field(
                None, description="Integration parameters if not using a poni file."
            )

            npts: int = Field(512, description="Number of q points/bins.")

            npts_az: int = Field(13, description="Number of phi bins.")

            int_units: str = Field("2th_deg", description="Integration units")

            return2d: bool = Field(
                False, description="Whether to return the 2D q/phi integration."
            )

        class PhotonParams(BaseModel):
            """Photon counting parameters."""

            ADU_per_photon: float = Field(9.5, description="Number of ADU per photon.")

            thresADU: float = Field(
                0.8, description="Threshold in fraction of ADU_per_photon."
            )

        class DropletParams(BaseModel):
            """Droplet finding parameters."""

            name: str = Field(
                "droplet", description="HDF5 key name for storing droplet data."
            )

            # mask: Optional[np.ndarray] = Field(None, description="Optionally pass a separate mask.")

            threshold: float = Field(
                5,
                description="Threshold for pixel to be part of a droplet. Sigma or ADU depending on useRms.",
            )

            thresholdLow: float = Field(
                5, description="Lower threshold to make spectrum sharper."
            )

            thresADU: float = Field(
                60, description="Threshold on droplet ADU. Rejects droplets below this."
            )

            useRms: bool = Field(
                True,
                description="If True, threshold/thresholdLow are RMS of data, otherwise in ADU.",
            )

            # Annotated as float to match the scientific-notation default (1e5);
            # per its description, float, int or None are all acceptable values.
            nData: Optional[float] = Field(1e5, description="(float,int or None).")

            relabel: bool = Field(
                True, description="After initial finding, relabel image."
            )

        class Droplet2PhotonParams(BaseModel):
            """Combined droplet-finding and photon conversion parameters."""

            class DropletParams(BaseModel):
                threshold: float = Field(
                    5,
                    description="Threshold for pixel to be part of a droplet. Sigma or ADU depending on useRms.",
                )

                thresholdLow: float = Field(
                    5, description="Lower threshold to make spectrum sharper."
                )

                thresADU: float = Field(
                    60,
                    description="Threshold on droplet ADU. Rejects droplets below this.",
                )

                useRms: bool = Field(
                    True,
                    description="If True, threshold/thresholdLow are RMS of data, otherwise in ADU.",
                )

            droplet: DropletParams = Field(
                DropletParams(), description="Droplet finding parameters."
            )

            # NOTE(review): no descriptions provided upstream for the next three
            # fields -- aduspphot presumably means ADU per photon; confirm
            # against the smalldata_tools producer before documenting.
            aduspphot: int = Field(162, description="")

            cputime: bool = Field(True, description="")

            nData: float = Field(3e4, description="")

        class SvdParams(BaseModel):
            """Waveform SVD fitting parameters."""

            name: str = Field("svdFit", description="DetObject name.")

            n_components: int = Field(
                2, description="Number of components to use. Max is 25."
            )

            basis_file: Optional[str] = Field(None, description="")

            n_pulse: int = Field(1, description="Number of pulses to fit.")

            delay: Optional[List[float]] = Field(
                [0], description="Delay between pulses."
            )

            mode: str = Field(
                "max",
                description="Method to calculate pulse amplitudes. max, norm, or both.",
            )
            return_reconstructed: bool = Field(
                False, description="Return the reconstructed waveforms."
            )

        class AutocorrParams(BaseModel):
            """Auto-correlation parameters."""

            class IlluminationParams(BaseModel):
                correction: str = Field(
                    description="Path to correction arrays. One per mask/ROI."
                )

                kernel: int = Field(
                    description="Kernel size used in the creation of the correction."
                )

            name: str = Field("autocorr", description="DetObject name.")

            threshADU: List[float] = Field(
                [-1e6, 1e6], description="Low and high pixel intensity thresholds."
            )

            mask: Optional[str] = Field(
                None, description="Mask to define a non-rectangular ROI."
            )

            save_lineout: bool = Field(
                False,
                description="Save autocorr image or only vertical/horizontal lineouts.",
            )

            save_range: Tuple[int, int] = Field(
                (
                    50,
                    50,
                ),
                description="Size of the autocorr image to save.",
            )

            illumination_correction: Optional[IlluminationParams] = Field(
                None, description="Corrections for each mask/ROI."
            )

        detnames: Optional[List[str]] = Field(
            None, description="List of detectors to process."
        )

        epicsPV: Optional[List[Union[str, Tuple[str, str]]]] = Field(
            None, description="List of PVs to save once per event."
        )

        epicsOncePV: Optional[List[Union[str, Tuple[str, str]]]] = Field(
            None, description="List of PVs to save once per run."
        )

        ttCalib: Optional[List[float]] = Field(
            None, description="Alternative calibration parameters for the timetool."
        )

        aioParams: Optional[List[List[Union[str, int, float]]]] = Field(
            None,
            description="Save analog inputs and give them nice names. [[inp],['name']]",
        )

        getROIs: Optional[Dict[str, ROIParams]] = Field(
            None, description="Dictionary of ROI parameters by detector."
        )

        getAzIntParams: Optional[Dict[str, AzIntParams]] = Field(
            None,
            description="Dictionary of azimuthal integration parameters by detector.",
        )

        getAzIntPyFAIParams: Optional[Dict[str, AzIntPyFAIParams]] = Field(
            None,
            description="Dictionary of azimuthal integration with PyFAI parameters.",
        )

        getPhotonParams: Optional[Dict[str, PhotonParams]] = Field(
            None,
            description="Dictionary of photon counting parameters by detector.",
        )

        getDropletParams: Optional[Dict[str, DropletParams]] = Field(
            None,
            description="Dictionary of droplet finding parameters by detector.",
        )

        getDroplet2Photons: Optional[Dict[str, Droplet2PhotonParams]] = Field(
            None,
            description="Dictionary of droplet2photon parameters by detector.",
        )

        getSvdParams: Optional[Dict[str, SvdParams]] = Field(
            None,
            description="Dictionary of SVD parameters by detector.",
        )

        getAutocorrParams: Optional[Dict[str, AutocorrParams]] = Field(
            None,
            description="Dictionary of auto-correlation parameters by detector.",
        )

    # Validator that passes `producer_parameters` through to the template engine.
    _set_producer_template_parameters = template_parameter_validator(
        "producer_parameters"
    )

    # Command-line construction: `mpirun -np N python -u -m mpi4py.run <producer> ...`
    executable: str = Field("mpirun", description="MPI executable.", flag_type="")
    # Default: one fewer than the allocated CPUs (SLURM_NPROCS if set,
    # otherwise the process affinity mask), with a floor of 1.
    np: PositiveInt = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of processes",
        flag_type="-",
    )
    p_arg1: str = Field(
        "python", description="Executable to run with mpi (i.e. python).", flag_type=""
    )
    u: str = Field(
        "", description="Python option for unbuffered output.", flag_type="-"
    )
    m: str = Field(
        "mpi4py.run",
        description="Python option to execute a module's contents as __main__ module.",
        flag_type="-",
    )
    producer: str = Field(
        "", description="Path to the SmallData producer Python script.", flag_type=""
    )
    run: str = Field(
        os.environ.get("RUN_NUM", ""), description="DAQ Run Number.", flag_type="--"
    )
    experiment: str = Field(
        os.environ.get("EXPERIMENT", ""),
        description="LCLS Experiment Number.",
        flag_type="--",
    )
    stn: NonNegativeInt = Field(0, description="Hutch endstation.", flag_type="--")
    nevents: int = Field(
        int(1e9), description="Number of events to process.", flag_type="--"
    )
    directory: Optional[str] = Field(
        None,
        description="Optional output directory. If None, will be in ${EXP_FOLDER}/hdf5/smalldata.",
        flag_type="--",
    )
    ## Need mechanism to set result_from_param=True ...
    gather_interval: PositiveInt = Field(
        25, description="Number of events to collect at a time.", flag_type="--"
    )
    norecorder: bool = Field(
        False, description="Whether to ignore recorder streams.", flag_type="--"
    )
    url: HttpUrl = Field(
        "https://pswww.slac.stanford.edu/ws-auth/lgbk",
        description="Base URL for eLog posting.",
        flag_type="--",
    )
    epicsAll: bool = Field(
        False,
        description="Whether to store all EPICS PVs. Use with care.",
        flag_type="--",
    )
    full: bool = Field(
        False,
        description="Whether to store all data. Use with EXTRA care.",
        flag_type="--",
    )
    fullSum: bool = Field(
        False,
        description="Whether to store sums for all area detector images.",
        flag_type="--",
    )
    default: bool = Field(
        False,
        description="Whether to store only the default minimal set of data.",
        flag_type="--",
    )
    image: bool = Field(
        False,
        description="Whether to save everything as images. Use with care.",
        flag_type="--",
    )
    tiff: bool = Field(
        False,
        description="Whether to save all images as a single TIFF. Use with EXTRA care.",
        flag_type="--",
    )
    centerpix: bool = Field(
        False,
        description="Whether to mask center pixels for Epix10k2M detectors.",
        flag_type="--",
    )
    postRuntable: bool = Field(
        False,
        description="Whether to post run tables. Also used as a trigger for summary jobs.",
        flag_type="--",
    )
    wait: bool = Field(
        False, description="Whether to wait for a file to appear.", flag_type="--"
    )
    xtcav: bool = Field(
        False,
        description="Whether to add XTCAV processing to the HDF5 generation.",
        flag_type="--",
    )
    noarch: bool = Field(
        False, description="Whether to not use archiver data.", flag_type="--"
    )

    # Template rendered into the producer script; output_path filled in by the
    # `use_producer` validator below when left empty.
    lute_template_cfg: TemplateConfig = TemplateConfig(
        template_name="smd_producer_template.py", output_path=""
    )

    producer_parameters: Optional[ProducerParameters] = Field(
        None,
        description="Optional parameters to fill in a producer file.",
        flag_type="",  # Does nothing since always None by time it's seen by Task
    )

    @validator("producer", always=True)
    def validate_producer_path(cls, producer: str, values: Dict[str, Any]) -> str:
        """Default the producer script path based on the experiment's hutch."""
        if producer == "":
            exp: str = values["lute_config"].experiment
            hutch: str = exp[:3]
            base_path: str = f"/sdf/data/lcls/ds/{hutch}/{exp}/results/smalldata_tools"
            path: str
            # These hutches use the lcls1 producers; everything else lcls2.
            if hutch.lower() in ("cxi", "mec", "mfx", "xcs", "xpp"):
                path = f"{base_path}/lcls1_producers/smd_producer.py"
            else:
                path = f"{base_path}/lcls2_producers/smd_producer.py"
            return path
        return producer

    @validator("lute_template_cfg", always=True)
    def use_producer(
        cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
    ) -> TemplateConfig:
        """Render the template over the producer script when no path is given."""
        if not lute_template_cfg.output_path:
            lute_template_cfg.output_path = values["producer"]
        return lute_template_cfg

    @root_validator(pre=False)
    def define_result(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Record the expected output HDF5 file path as the Task result."""
        exp: str = values["lute_config"].experiment
        hutch: str = exp[:3]
        # NOTE(review): assumes `run` is a single integer -- int() raises for
        # non-numeric strings (e.g. run ranges). Confirm with callers.
        run: int = int(values["lute_config"].run)
        directory: Optional[str] = values["directory"]
        if directory is None:
            directory = f"/sdf/data/lcls/ds/{hutch}/{exp}/hdf5/smalldata"
        fname: str = f"{exp}_Run{run:04d}.h5"

        # Stored on the class-level Config so the Executor can retrieve it.
        cls.Config.result_from_params = f"{directory}/{fname}"
        return values

Config

Bases: Config

Identical to super-class Config but includes a result.

Source code in lute/io/models/smd.py
49
50
51
52
53
54
55
56
class Config(ThirdPartyParameters.Config):
    """Identical to super-class Config but includes a result.

    Overrides `set_result` to True and adds `result_from_params`, which a
    validator on the owning model fills in with the output file path.
    """

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

    result_from_params: str = ""
    """Defines a result from the parameters. Use a validator to do so."""

result_from_params: str = '' class-attribute instance-attribute

Defines a result from the parameters. Use a validator to do so.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

TaskParameters

Bases: BaseSettings

Base class for models of task parameters to be validated.

Parameters are read from a configuration YAML file and validated against subclasses of this type in order to ensure that both all parameters are present, and that the parameters are of the correct type.

Note

Pydantic is used for data validation. Pydantic does not perform "strict" validation by default. Parameter values may be cast to conform with the model specified by the subclass definition if it is possible to do so. Consider whether this may cause issues (e.g. if a float is cast to an int).

Source code in lute/io/models/base.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class TaskParameters(BaseSettings):
    """Base class for models of task parameters to be validated.

    Parameters are read from a configuration YAML file and validated against
    subclasses of this type in order to ensure that both all parameters are
    present, and that the parameters are of the correct type.

    Note:
        Pydantic is used for data validation. Pydantic does not perform "strict"
        validation by default. Parameter values may be cast to conform with the
        model specified by the subclass definition if it is possible to do so.
        Consider whether this may cause issues (e.g. if a float is cast to an
        int).
    """

    class Config:
        """Configuration for parameters model.

        The Config class holds Pydantic configuration. A number of LUTE-specific
        configuration has also been placed here.

        Attributes:
            env_prefix (str): Pydantic configuration. Will set parameters from
                environment variables containing this prefix. E.g. a model
                parameter `input` can be set with an environment variable:
                `{env_prefix}input`, in LUTE's case `LUTE_input`.

            underscore_attrs_are_private (bool): Pydantic configuration. Whether
                to hide attributes (parameters) prefixed with an underscore.

            copy_on_model_validation (str): Pydantic configuration. How to copy
                the input object passed to the class instance for model
                validation. Set to perform a deep copy.

            allow_inf_nan (bool): Pydantic configuration. Whether to allow
                infinity or NAN in float fields.

            run_directory (Optional[str]): None. If set, it should be a valid
                path. The `Task` will be run from this directory. This may be
                useful for some `Task`s which rely on searching the working
                directory.

            set_result (bool): False. If True, the model has information about
                setting the TaskResult object from the parameters it contains.
                E.g. it has an `output` parameter which is marked as the result.
                The result can be set with a field value of `is_result=True` on
                a specific parameter, or using `result_from_params` and a
                validator.

            result_from_params (Optional[str]): None. Optionally used to define
                results from information available in the model using a custom
                validator. E.g. use an `outdir` and `filename` field to set
                `result_from_params=f"{outdir}/{filename}"`, etc. Only used if
                `set_result==True`

            result_summary (Optional[str]): None. Defines a result summary that
                can be known after processing the Pydantic model. Use of summary
                depends on the Executor running the Task. All summaries are
                stored in the database, however. Only used if `set_result==True`

            impl_schemas (Optional[str]): Specifies the schemas the
                output/results conform to. Only used if `set_result==True`.
        """

        env_prefix = "LUTE_"
        underscore_attrs_are_private: bool = True
        copy_on_model_validation: str = "deep"
        allow_inf_nan: bool = False

        run_directory: Optional[str] = None
        """Set the directory that the Task is run from."""
        set_result: bool = False
        """Whether the Executor should mark a specified parameter as a result."""
        result_from_params: Optional[str] = None
        """Defines a result from the parameters. Use a validator to do so."""
        result_summary: Optional[str] = None
        """Format a TaskResult.summary from output."""
        impl_schemas: Optional[str] = None
        """Schema specification for output result. Will be passed to TaskResult."""

    # Analysis header common to all Tasks (experiment, run, date, timeout,
    # work_dir). See `AnalysisHeader` for field definitions and defaults.
    lute_config: AnalysisHeader

Config

Configuration for parameters model.

The Config class holds Pydantic configuration. A number of LUTE-specific configuration has also been placed here.

Attributes:

Name Type Description
env_prefix str

Pydantic configuration. Will set parameters from environment variables containing this prefix. E.g. a model parameter input can be set with an environment variable: {env_prefix}input, in LUTE's case LUTE_input.

underscore_attrs_are_private bool

Pydantic configuration. Whether to hide attributes (parameters) prefixed with an underscore.

copy_on_model_validation str

Pydantic configuration. How to copy the input object passed to the class instance for model validation. Set to perform a deep copy.

allow_inf_nan bool

Pydantic configuration. Whether to allow infinity or NAN in float fields.

run_directory Optional[str]

None. If set, it should be a valid path. The Task will be run from this directory. This may be useful for some Tasks which rely on searching the working directory.

result_from_params Optional[str]

None. Optionally used to define results from information available in the model using a custom validator. E.g. use an outdir and a filename field to set result_from_params=f"{outdir}/{filename}", etc. Only used if set_result==True

result_summary Optional[str]

None. Defines a result summary that can be known after processing the Pydantic model. Use of summary depends on the Executor running the Task. All summaries are stored in the database, however. Only used if set_result==True

Source code in lute/io/models/base.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class Config:
    """Configuration for parameters model.

    The Config class holds Pydantic configuration. A number of LUTE-specific
    configuration has also been placed here.

    Attributes:
        env_prefix (str): Pydantic configuration. Will set parameters from
            environment variables containing this prefix. E.g. a model
            parameter `input` can be set with an environment variable:
            `{env_prefix}input`, in LUTE's case `LUTE_input`.

        underscore_attrs_are_private (bool): Pydantic configuration. Whether
            to hide attributes (parameters) prefixed with an underscore.

        copy_on_model_validation (str): Pydantic configuration. How to copy
            the input object passed to the class instance for model
            validation. Set to perform a deep copy.

        allow_inf_nan (bool): Pydantic configuration. Whether to allow
            infinity or NAN in float fields.

        run_directory (Optional[str]): None. If set, it should be a valid
            path. The `Task` will be run from this directory. This may be
            useful for some `Task`s which rely on searching the working
            directory.

        set_result (bool): False. If True, the model has information about
            setting the TaskResult object from the parameters it contains.
            E.g. it has an `output` parameter which is marked as the result.
            The result can be set with a field value of `is_result=True` on
            a specific parameter, or using `result_from_params` and a
            validator.

        result_from_params (Optional[str]): None. Optionally used to define
            results from information available in the model using a custom
            validator. E.g. use an `outdir` and `filename` field to set
            `result_from_params=f"{outdir}/{filename}"`, etc. Only used if
            `set_result==True`

        result_summary (Optional[str]): None. Defines a result summary that
            can be known after processing the Pydantic model. Use of summary
            depends on the Executor running the Task. All summaries are
            stored in the database, however. Only used if `set_result==True`

        impl_schemas (Optional[str]): Specifies the schemas the
            output/results conform to. Only used if `set_result==True`.
    """

    env_prefix = "LUTE_"
    underscore_attrs_are_private: bool = True
    copy_on_model_validation: str = "deep"
    allow_inf_nan: bool = False

    run_directory: Optional[str] = None
    """Set the directory that the Task is run from."""
    set_result: bool = False
    """Whether the Executor should mark a specified parameter as a result."""
    result_from_params: Optional[str] = None
    """Defines a result from the parameters. Use a validator to do so."""
    result_summary: Optional[str] = None
    """Format a TaskResult.summary from output."""
    impl_schemas: Optional[str] = None
    """Schema specification for output result. Will be passed to TaskResult."""

impl_schemas: Optional[str] = None class-attribute instance-attribute

Schema specification for output result. Will be passed to TaskResult.

result_from_params: Optional[str] = None class-attribute instance-attribute

Defines a result from the parameters. Use a validator to do so.

result_summary: Optional[str] = None class-attribute instance-attribute

Format a TaskResult.summary from output.

run_directory: Optional[str] = None class-attribute instance-attribute

Set the directory that the Task is run from.

set_result: bool = False class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

TemplateConfig

Bases: BaseModel

Parameters used for templating of third party configuration files.

Attributes:

Name Type Description
template_name str

The name of the template to use. This template must live in config/templates.

output_path str

The FULL path, including filename to write the rendered template to.

Source code in lute/io/models/base.py
317
318
319
320
321
322
323
324
325
326
327
328
329
class TemplateConfig(BaseModel):
    """Parameters used for templating of third party configuration files.

    Attributes:
        template_name (str): The name of the template to use. This template must
            live in `config/templates`.

        output_path (str): The FULL path, including filename to write the
            rendered template to.
    """

    # Both fields are required (no defaults).
    template_name: str
    output_path: str

TemplateParameters

Class for representing parameters for third party configuration files.

These parameters can represent arbitrary data types and are used in conjunction with templates for modifying third party configuration files from the single LUTE YAML. Due to the storage of arbitrary data types, and the use of a template file, a single instance of this class can hold from a single template variable to an entire configuration file. The data parsing is done by jinja using the complementary template. All data is stored in the single model variable params.

The pydantic "dataclass" is used over the BaseModel/Settings to allow positional argument instantiation of the params Field.

Source code in lute/io/models/base.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@dataclass
class TemplateParameters:
    """Class for representing parameters for third party configuration files.

    These parameters can represent arbitrary data types and are used in
    conjunction with templates for modifying third party configuration files
    from the single LUTE YAML. Due to the storage of arbitrary data types, and
    the use of a template file, a single instance of this class can hold from a
    single template variable to an entire configuration file. The data parsing
    is done by jinja using the complementary template.
    All data is stored in the single model variable `params`.

    The pydantic "dataclass" is used over the BaseModel/Settings to allow
    positional argument instantiation of the `params` Field.
    """

    # Arbitrary payload consumed by the jinja template; no validation applied.
    params: Any

TestBinaryErrParameters

Bases: ThirdPartyParameters

Same as TestBinary, but exits with non-zero code.

Source code in lute/io/models/tests.py
71
72
73
74
75
76
77
78
class TestBinaryErrParameters(ThirdPartyParameters):
    """Same as TestBinary, but exits with non-zero code."""

    # Fix typo in the user-facing description ("tes tbinary" -> "test binary").
    executable: str = Field(
        "/sdf/home/d/dorlhiac/test_tasks/test_threads_err",
        description="Multi-threaded test binary with non-zero exit code.",
    )
    p_arg1: int = Field(1, description="Number of threads.")

TestMultiNodeCommunicationParameters

Bases: TaskParameters

Parameters for the test Task TestMultiNodeCommunication.

Test verifies communication across multiple machines.

Source code in lute/io/models/mpi_tests.py
38
39
40
41
42
43
44
45
46
47
48
49
class TestMultiNodeCommunicationParameters(TaskParameters):
    """Parameters for the test Task `TestMultiNodeCommunication`.

    Test verifies communication across multiple machines.
    """

    send_obj: Literal["plot", "array"] = Field(
        "array", description="Object to send to Executor. `plot` or `array`"
    )
    # Presumably only meaningful when send_obj == "array" -- TODO confirm
    # against the Task implementation.
    arr_size: Optional[int] = Field(
        None, description="Size of array to send back to Executor."
    )

TestParameters

Bases: TaskParameters

Parameters for the test Task Test.

Source code in lute/io/models/tests.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class TestParameters(TaskParameters):
    """Parameters for the test Task `Test`."""

    float_var: float = Field(0.01, description="A floating point number.")
    str_var: str = Field("test", description="A string.")

    class CompoundVar(BaseModel):
        """Nested model used to exercise compound (structured) parameters."""

        int_var: int = 1
        # NOTE(review): mutable default is safe here -- pydantic copies field
        # defaults per instance.
        dict_var: Dict[str, str] = {"a": "b"}

    compound_var: CompoundVar = Field(
        description=(
            "A compound parameter - consists of a `int_var` (int) and `dict_var`"
            " (Dict[str, str])."
        )
    )
    throw_error: bool = Field(
        False, description="If `True`, raise an exception to test error handling."
    )

ThirdPartyParameters

Bases: TaskParameters

Base class for third party task parameters.

Contains special validators for extra arguments and handling of parameters used for filling in third party configuration files.

Source code in lute/io/models/base.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class ThirdPartyParameters(TaskParameters):
    """Base class for third party task parameters.

    Contains special validators for extra arguments and handling of parameters
    used for filling in third party configuration files.
    """

    class Config(TaskParameters.Config):
        """Configuration for parameters model.

        The Config class holds Pydantic configuration and inherited configuration
        from the base `TaskParameters.Config` class. A number of values are also
        overridden, and there are some specific configuration options to
        ThirdPartyParameters. A full list of options (with TaskParameters options
        repeated) is described below.

        Attributes:
            env_prefix (str): Pydantic configuration. Will set parameters from
                environment variables containing this prefix. E.g. a model
                parameter `input` can be set with an environment variable:
                `{env_prefix}input`, in LUTE's case `LUTE_input`.

            underscore_attrs_are_private (bool): Pydantic configuration. Whether
                to hide attributes (parameters) prefixed with an underscore.

            copy_on_model_validation (str): Pydantic configuration. How to copy
                the input object passed to the class instance for model
                validation. Set to perform a deep copy.

            allow_inf_nan (bool): Pydantic configuration. Whether to allow
                infinity or NAN in float fields.

            run_directory (Optional[str]): None. If set, it should be a valid
                path. The `Task` will be run from this directory. This may be
                useful for some `Task`s which rely on searching the working
                directory.

            set_result (bool): True. If True, the model has information about
                setting the TaskResult object from the parameters it contains.
                E.g. it has an `output` parameter which is marked as the result.
                The result can be set with a field value of `is_result=True` on
                a specific parameter, or using `result_from_params` and a
                validator.

            result_from_params (Optional[str]): None. Optionally used to define
                results from information available in the model using a custom
                validator. E.g. use a `outdir` and `filename` field to set
                `result_from_params=f"{outdir}/{filename}"`, etc.

            result_summary (Optional[str]): None. Defines a result summary that
                can be known after processing the Pydantic model. Use of summary
                depends on the Executor running the Task. All summaries are
                stored in the database, however.

            impl_schemas (Optional[str]): Specifies the schemas the
                output/results conform to. Only used if set_result is True.

            -----------------------
            ThirdPartyTask-specific:

            extra (str): "allow". Pydantic configuration. Allow (or ignore) extra
                arguments.

            short_flags_use_eq (bool): False. If True, "short" command-line args
                are passed as `-x=arg`. ThirdPartyTask-specific.

            long_flags_use_eq (bool): False. If True, "long" command-line args
                are passed as `--long=arg`. ThirdPartyTask-specific.
        """

        extra: str = "allow"
        short_flags_use_eq: bool = False
        """Whether short command-line arguments are passed like `-x=arg`."""
        long_flags_use_eq: bool = False
        """Whether long command-line arguments are passed like `--long=arg`."""
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    # Schema descriptions for extra (template) parameters that are not declared
    # fields on the model. Populated by the root validator below.
    _unknown_template_params: Dict[str, Any] = PrivateAttr()
    # lute_template_cfg: TemplateConfig

    @root_validator(pre=False)
    def extra_fields_to_thirdparty(cls, values: Dict[str, Any]):
        """Wrap undeclared (extra) fields as `TemplateParameters`.

        Declared fields pass through untouched; each unknown field is wrapped
        and a schema entry describing it is recorded on the class.
        """
        cls._unknown_template_params = {}
        # NOTE(review): my_schema is computed but never used - kept for parity
        # with prior behavior; consider removing if model_schema has no needed
        # side effects.
        my_schema: Dict[str, Any] = model_schema(
            cls, by_alias=True, ref_template=default_ref_template
        )
        new_values: Dict[str, Any] = {}
        for key in values:
            if key not in cls.__fields__:
                new_values[key] = TemplateParameters(values[key])
                # Build a fresh schema dict per key. A shallow `.copy()` of a
                # shared template would alias the nested "properties" dict
                # across keys, making every entry report the last value seen.
                param_schema: Dict[str, Any] = {
                    "title": key,
                    "description": "Unknown template parameters.",
                    "type": "object",
                    "properties": {
                        "params": values[key],
                        "type": "object",
                    },
                }
                cls._unknown_template_params[key] = param_schema
            else:
                new_values[key] = values[key]
        return new_values

Config

Bases: Config

Configuration for parameters model.

The Config class holds Pydantic configuration and inherited configuration from the base TaskParameters.Config class. A number of values are also overridden, and there are some specific configuration options to ThirdPartyParameters. A full list of options (with TaskParameters options repeated) is described below.

Attributes:

Name Type Description
env_prefix str

Pydantic configuration. Will set parameters from environment variables containing this prefix. E.g. a model parameter input can be set with an environment variable: {env_prefix}input, in LUTE's case LUTE_input.

underscore_attrs_are_private bool

Pydantic configuration. Whether to hide attributes (parameters) prefixed with an underscore.

copy_on_model_validation str

Pydantic configuration. How to copy the input object passed to the class instance for model validation. Set to perform a deep copy.

allow_inf_nan bool

Pydantic configuration. Whether to allow infinity or NAN in float fields.

run_directory Optional[str]

None. If set, it should be a valid path. The Task will be run from this directory. This may be useful for some Tasks which rely on searching the working directory.

result_from_params Optional[str]

None. Optionally used to define results from information available in the model using a custom validator. E.g. use an outdir and filename field to set result_from_params=f"{outdir}/{filename}", etc.

result_summary Optional[str]

None. Defines a result summary that can be known after processing the Pydantic model. Use of summary depends on the Executor running the Task. All summaries are stored in the database, however.

ThirdPartyTask-specific:
extra str

"allow". Pydantic configuration. Allow (or ignore) extra arguments.

short_flags_use_eq bool

False. If True, "short" command-line args are passed as -x=arg. ThirdPartyTask-specific.

long_flags_use_eq bool

False. If True, "long" command-line args are passed as --long=arg. ThirdPartyTask-specific.

Source code in lute/io/models/base.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class Config(TaskParameters.Config):
    """Configuration for parameters model.

    The Config class holds Pydantic configuration and inherited configuration
    from the base `TaskParameters.Config` class. A number of values are also
    overridden, and there are some specific configuration options to
    ThirdPartyParameters. A full list of options (with TaskParameters options
    repeated) is described below.

    Attributes:
        env_prefix (str): Pydantic configuration. Will set parameters from
            environment variables containing this prefix. E.g. a model
            parameter `input` can be set with an environment variable:
            `{env_prefix}input`, in LUTE's case `LUTE_input`.

        underscore_attrs_are_private (bool): Pydantic configuration. Whether
            to hide attributes (parameters) prefixed with an underscore.

        copy_on_model_validation (str): Pydantic configuration. How to copy
            the input object passed to the class instance for model
            validation. Set to perform a deep copy.

        allow_inf_nan (bool): Pydantic configuration. Whether to allow
            infinity or NAN in float fields.

        run_directory (Optional[str]): None. If set, it should be a valid
            path. The `Task` will be run from this directory. This may be
            useful for some `Task`s which rely on searching the working
            directory.

        set_result (bool): True. If True, the model has information about
            setting the TaskResult object from the parameters it contains.
            E.g. it has an `output` parameter which is marked as the result.
            The result can be set with a field value of `is_result=True` on
            a specific parameter, or using `result_from_params` and a
            validator.

        result_from_params (Optional[str]): None. Optionally used to define
            results from information available in the model using a custom
            validator. E.g. use a `outdir` and `filename` field to set
            `result_from_params=f"{outdir}/{filename}"`, etc.

        result_summary (Optional[str]): None. Defines a result summary that
            can be known after processing the Pydantic model. Use of summary
            depends on the Executor running the Task. All summaries are
            stored in the database, however.

        impl_schemas (Optional[str]): Specifies the schemas the
            output/results conform to. Only used if set_result is True.

        -----------------------
        ThirdPartyTask-specific:

        extra (str): "allow". Pydantic configuration. Allow (or ignore) extra
            arguments.

        short_flags_use_eq (bool): False. If True, "short" command-line args
            are passed as `-x=arg`. ThirdPartyTask-specific.

        long_flags_use_eq (bool): False. If True, "long" command-line args
            are passed as `--long=arg`. ThirdPartyTask-specific.
    """

    extra: str = "allow"
    short_flags_use_eq: bool = False
    """Whether short command-line arguments are passed like `-x=arg`."""
    long_flags_use_eq: bool = False
    """Whether long command-line arguments are passed like `--long=arg`."""
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""

long_flags_use_eq: bool = False class-attribute instance-attribute

Whether long command-line arguments are passed like --long=arg.

set_result: bool = True class-attribute instance-attribute

Whether the Executor should mark a specified parameter as a result.

short_flags_use_eq: bool = False class-attribute instance-attribute

Whether short command-line arguments are passed like -x=arg.

_check_str_numeric(string)

Check if a string is an integer or float and return it as such.

Parameters:

Name Type Description Default
string str

Input string to check.

required

Returns:

Name Type Description
output str | int | float

Returns an int or float if the string can be converted to one. Otherwise returns the original string.

Source code in lute/io/config.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def _check_str_numeric(string: str) -> Union[str, int, float]:
    """Check if a string is an integer or float and return it as such.

    Args:
        string (str): Input string to check.

    Returns:
        output (str | int | float): Returns an int or float if the string can be
            converted to one. Otherwise returns the original string.
    """
    if string.isnumeric():
        return int(string)
    elif _isfloat(string):
        return float(string)
    else:
        return string

parse_config(task_name='test', config_path='')

Parse a configuration file and validate the contents.

Parameters:

Name Type Description Default
task_name str

Name of the specific task that will be run.

'test'
config_path str

Path to the configuration file.

''

Returns:

Name Type Description
params TaskParameters

A TaskParameters object of validated task-specific parameters. Parameters are accessed with "dot" notation. E.g. params.param1.

Raises:

Type Description
ValidationError

Raised if there are problems with the configuration file. Passed through from Pydantic.

Source code in lute/io/config.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def parse_config(task_name: str = "test", config_path: str = "") -> TaskParameters:
    """Parse a configuration file and validate the contents.

    Reads a two-document YAML file (header document followed by the Task
    configuration document), performs variable substitution on both, and
    validates the parameters against the Task-specific Pydantic model.

    Args:
        task_name (str): Name of the specific task that will be run.

        config_path (str): Path to the configuration file.

    Returns:
        params (TaskParameters): A TaskParameters object of validated
            task-specific parameters. Parameters are accessed with "dot"
            notation. E.g. `params.param1`.

    Raises:
        ValidationError: Raised if there are problems with the configuration
            file. Passed through from Pydantic.
    """
    # Model classes follow the `<TaskName>Parameters` naming convention.
    model_name: str = f"{task_name}Parameters"

    with open(config_path, "r") as cfg_file:
        documents: Iterator[Dict[str, Any]] = yaml.load_all(
            stream=cfg_file, Loader=yaml.FullLoader
        )
        header_doc: Dict[str, Any] = next(documents)
        task_doc: Dict[str, Any] = next(documents)
    # Header variables may reference each other; resolve those first, then use
    # the header to resolve references in the task configuration.
    substitute_variables(header_doc, header_doc)
    substitute_variables(header_doc, task_doc)
    LUTE_DEBUG_EXIT("LUTE_DEBUG_EXIT_AT_YAML", pprint.pformat(task_doc))
    full_params: Dict[str, Any] = {"lute_config": AnalysisHeader(**header_doc)}
    try:
        full_params.update(dict(task_doc[task_name]))
    except KeyError:
        # No entry for this Task in the YAML - rely on model defaults.
        warnings.warn(
            (
                f"{task_name} has no parameter definitions in YAML file."
                " Attempting default parameter initialization."
            )
        )
    # Look up the model class by name in this module's globals and validate.
    return globals()[model_name](**full_params)

substitute_variables(header, config, curr_key=None)

Performs variable substitutions on a dictionary read from config YAML file.

Can be used to define input parameters in terms of other input parameters. This is similar to functionality employed by validators for parameters in the specific Task models, but is intended to be more accessible to users. Variable substitutions are defined using a minimal syntax from Jinja: {{ experiment }} defines a substitution of the variable experiment. The characters {{ }} can be escaped if the literal symbols are needed in place.

For example, a path to a file can be defined in terms of experiment and run values in the config file: MyTask: experiment: myexp run: 2 special_file: /path/to/{{ experiment }}/{{ run }}/file.inp

Acceptable variables for substitutions are values defined elsewhere in the YAML file. Environment variables can also be used if prefaced with a $ character. E.g. to get the experiment from an environment variable: MyTask: run: 2 special_file: /path/to/{{ $EXPERIMENT }}/{{ run }}/file.inp

Parameters:

Name Type Description Default
config Dict[str, Any]

A dictionary of parsed configuration.

required
curr_key Optional[str]

Used to keep track of recursion level when scanning through iterable items in the config dictionary.

None

Returns:

Name Type Description
subbed_config Dict[str, Any]

The config dictionary after substitutions have been made. May be identical to the input if no substitutions are needed.

Source code in lute/io/config.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def substitute_variables(
    header: Dict[str, Any], config: Dict[str, Any], curr_key: Optional[str] = None
) -> None:
    """Performs variable substitutions on a dictionary read from config YAML file.

    Can be used to define input parameters in terms of other input parameters.
    This is similar to functionality employed by validators for parameters in
    the specific Task models, but is intended to be more accessible to users.
    Variable substitutions are defined using a minimal syntax from Jinja:
                               {{ experiment }}
    defines a substitution of the variable `experiment`. The characters `{{ }}`
    can be escaped if the literal symbols are needed in place.

    For example, a path to a file can be defined in terms of experiment and run
    values in the config file:
        MyTask:
          experiment: myexp
          run: 2
          special_file: /path/to/{{ experiment }}/{{ run }}/file.inp

    Acceptable variables for substitutions are values defined elsewhere in the
    YAML file. Environment variables can also be used if prefaced with a `$`
    character. E.g. to get the experiment from an environment variable:
        MyTask:
          run: 2
          special_file: /path/to/{{ $EXPERIMENT }}/{{ run }}/file.inp

    Args:
        header (Dict[str, Any]): The parsed header document of the YAML file.
            Used as a fallback source of substitution values when a key is not
            found in `config` itself.

        config (Dict[str, Any]):  A dictionary of parsed configuration.

        curr_key (Optional[str]): Used to keep track of recursion level when scanning
            through iterable items in the config dictionary. A dotted path
            (e.g. "TaskName.param") into the nested `config` dictionary.

    Returns:
        None: Substitutions are made in place - `config` is modified directly.
    """
    # Matches the minimal Jinja-style `{{ ... }}` substitution markers.
    _sub_pattern = r"\{\{[^}{]*\}\}"
    iterable: Dict[str, Any] = config
    if curr_key is not None:
        # Need to handle nested levels by interpreting curr_key
        keys_by_level: List[str] = curr_key.split(".")
        for key in keys_by_level:
            iterable = iterable[key]
    else:
        ...
        # iterable = config
    for param, value in iterable.items():
        if isinstance(value, dict):
            # Recurse into nested mappings, extending the dotted key path.
            new_key: str
            if curr_key is None:
                new_key = param
            else:
                new_key = f"{curr_key}.{param}"
            substitute_variables(header, config, curr_key=new_key)
        elif isinstance(value, list):
            # NOTE(review): list values are intentionally skipped - no
            # substitution is performed inside list items.
            ...
        # Scalars str - we skip numeric types
        elif isinstance(value, str):
            matches: List[str] = re.findall(_sub_pattern, value)
            for m in matches:
                # A marker may carry an optional format spec: {{ key:fmt }}.
                key_to_sub_maybe_with_fmt: List[str] = m[2:-2].strip().split(":")
                key_to_sub: str = key_to_sub_maybe_with_fmt[0]
                fmt: Optional[str] = None
                if len(key_to_sub_maybe_with_fmt) == 2:
                    fmt = key_to_sub_maybe_with_fmt[1]
                sub: Any
                if key_to_sub[0] == "$":
                    # `$`-prefixed keys are resolved from the environment.
                    sub = os.getenv(key_to_sub[1:], None)
                    if sub is None:
                        # Check if we use a different env - substitution happens
                        # before environment reset
                        sub = os.getenv(f"LUTE_TENV_{key_to_sub[1:]}")
                    if sub is None:
                        # Best-effort: warn and leave the marker untouched.
                        print(
                            f"Environment variable {key_to_sub[1:]} not found! Cannot substitute in YAML config!",
                            flush=True,
                        )
                        continue
                    # substitutions from env vars will be strings, so convert back
                    # to numeric in order to perform formatting later on (e.g. {var:04d})
                    sub = _check_str_numeric(sub)
                else:
                    # Walk the dotted path through config; fall back to the
                    # header document if any segment is missing.
                    try:
                        sub = config
                        for key in key_to_sub.split("."):
                            sub = sub[key]
                    except KeyError:
                        sub = header[key_to_sub]
                # Escape the braces and `$` so the matched marker can be used
                # as a regex pattern.
                # NOTE(review): other regex metacharacters inside the marker
                # (e.g. "*", "+") are not escaped, and backslashes in `sub`
                # are interpreted by re.sub's replacement syntax - confirm
                # keys/values are restricted to plain text.
                pattern: str = (
                    m.replace("{{", r"\{\{").replace("}}", r"\}\}").replace("$", r"\$")
                )
                if fmt is not None:
                    sub = f"{sub:{fmt}}"
                else:
                    sub = f"{sub}"
                iterable[param] = re.sub(pattern, sub, iterable[param])
            # Reconvert back to numeric values if needed...
            iterable[param] = _check_str_numeric(iterable[param])