Machinery for the IO of configuration YAML files and their validation.
Functions:
Name | Description |
---|---|
parse_config(task_name: str, config_path: str) -> TaskParameters |
Parse a configuration file and return a TaskParameters object of validated parameters for a specific Task. Raises an exception if the provided configuration does not match the expected model. |
Raises: |
|
---|
AnalysisHeader
Bases: BaseModel
Header information for LUTE analysis runs.
Source code in lute/io/models/base.py
class AnalysisHeader(BaseModel):
    """Header information for LUTE analysis runs.

    Captures experiment/run identification, the analysis date, a Task timeout,
    and the shared working directory. Validators fill in empty fields from
    environment variables provided by the job launcher (ARP/Airflow).
    """

    title: str = Field(
        "LUTE Task Configuration",
        description="Description of the configuration or experiment.",
    )
    experiment: str = Field("", description="Experiment.")
    run: Union[str, int] = Field("", description="Data acquisition run.")
    date: str = Field("1970/01/01", description="Start date of analysis.")
    lute_version: Union[float, str] = Field(
        0.1, description="Version of LUTE used for analysis."
    )
    task_timeout: PositiveInt = Field(
        600,
        description=(
            "Time in seconds until a task times out. Should be slightly shorter"
            " than job timeout if using a job manager (e.g. SLURM)."
        ),
    )
    work_dir: str = Field("", description="Main working directory for LUTE.")

    @validator("work_dir", always=True)
    def validate_work_dir(cls, directory: str, values: Dict[str, Any]) -> str:
        """Default to the experiment scratch space and verify accessibility.

        Raises:
            ValueError: If the resolved directory does not exist or is not
                writable (write access is needed for the database, files, etc.).
        """
        work_dir: str
        if directory == "":
            # Standard SDF layout: /sdf/data/lcls/ds/<hutch>/<experiment>/scratch
            # The hutch is the first three characters of the experiment name.
            std_work_dir = (
                f"/sdf/data/lcls/ds/{values['experiment'][:3]}/"
                f"{values['experiment']}/scratch"
            )
            work_dir = std_work_dir
        else:
            work_dir = directory
        # Check existence and permissions
        if not os.path.exists(work_dir):
            raise ValueError(f"Working Directory: {work_dir} does not exist!")
        if not os.access(work_dir, os.W_OK):
            # Need write access for database, files etc.
            # Bug fix: message previously read "Not write access ..." which is
            # ungrammatical and confusing in logs.
            raise ValueError(f"No write access for working directory: {work_dir}!")
        return work_dir

    @validator("run", always=True)
    def validate_run(
        cls, run: Union[str, int], values: Dict[str, Any]
    ) -> Union[str, int]:
        """Fill an empty run from the RUN_NUM environment variable, if set."""
        if run == "":
            # From Airflow RUN_NUM should have Format "RUN_DATETIME" - Num is first part
            run_time: str = os.environ.get("RUN_NUM", "")
            if run_time != "":
                return int(run_time.split("_")[0])
        return run

    @validator("experiment", always=True)
    def validate_experiment(cls, experiment: str, values: Dict[str, Any]) -> str:
        """Fill an empty experiment from the EXPERIMENT environment variable."""
        if experiment == "":
            arp_exp: str = os.environ.get("EXPERIMENT", "EXPX00000")
            return arp_exp
        return experiment
CompareHKLParameters
Bases: ThirdPartyParameters
Parameters for CrystFEL's compare_hkl
for calculating figures of merit.
There are many parameters, and many combinations. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-partialator.html
Source code in lute/io/models/sfx_merge.py
class CompareHKLParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `compare_hkl` for calculating figures of merit.

    There are many parameters, and many combinations. For more information on
    usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-partialator.html
    """

    class Config(ThirdPartyParameters.Config):
        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/compare_hkl",
        description="CrystFEL's reflection comparison binary.",
        flag_type="",
    )
    in_files: Optional[str] = Field(
        "",
        description="Path to input HKLs. Space-separated list of 2. Use output of partialator e.g.",
        flag_type="",
    )
    ## Need mechanism to set is_result=True ...
    symmetry: str = Field("", description="Point group symmetry.", flag_type="--")
    cell_file: str = Field(
        "",
        description="Path to a file containing unit cell information (PDB or CrystFEL format).",
        flag_type="-",
        rename_param="p",
    )
    fom: str = Field(
        "Rsplit", description="Specify figure of merit to calculate.", flag_type="--"
    )
    nshells: int = Field(10, description="Use n resolution shells.", flag_type="--")
    # NEED A NEW CASE FOR THIS -> Boolean flag, no arg, one hyphen...
    # fix_unity: bool = Field(
    #     False,
    #     description="Fix scale factors to unity.",
    #     flag_type="-",
    #     rename_param="u",
    # )
    shell_file: str = Field(
        "",
        description="Write the statistics in resolution shells to a file.",
        flag_type="--",
        rename_param="shell-file",
        is_result=True,
    )
    ignore_negs: bool = Field(
        False,
        # Bug fix: help text previously read "negative reflections"; the
        # compare_hkl --ignore-negs option ignores negative intensities.
        description="Ignore reflections with negative intensities.",
        flag_type="--",
        rename_param="ignore-negs",
    )
    zero_negs: bool = Field(
        False,
        description="Set negative intensities to 0.",
        flag_type="--",
        rename_param="zero-negs",
    )
    sigma_cutoff: Optional[Union[float, int, str]] = Field(
        # "-infinity",
        description="Discard reflections with I/sigma(I) < n. -infinity means no cutoff.",
        flag_type="--",
        rename_param="sigma-cutoff",
    )
    rmin: Optional[float] = Field(
        description="Low resolution cutoff of 1/d (m-1). Use this or --lowres NOT both.",
        flag_type="--",
    )
    lowres: Optional[float] = Field(
        # Bug fix: keyword was misspelled `descirption`, so pydantic silently
        # dropped this field's description metadata.
        description="Low resolution cutoff in Angstroms. Use this or --rmin NOT both.",
        flag_type="--",
    )
    rmax: Optional[float] = Field(
        description="High resolution cutoff in 1/d (m-1). Use this or --highres NOT both.",
        flag_type="--",
    )
    highres: Optional[float] = Field(
        description="High resolution cutoff in Angstroms. Use this or --rmax NOT both.",
        flag_type="--",
    )

    @validator("in_files", always=True)
    def validate_in_files(cls, in_files: str, values: Dict[str, Any]) -> str:
        """Default to the two HKL half-data-set files of the latest partialator run."""
        if in_files == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                # partialator writes <out>1 and <out>2 half-set files.
                hkls: str = f"{partialator_file}1 {partialator_file}2"
                return hkls
        return in_files

    @validator("cell_file", always=True)
    def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
        """Default to the cell file recorded by the latest IndexCrystFEL Task."""
        if cell_file == "":
            idx_cell_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}",
                "IndexCrystFEL",
                "cell_file",
                valid_only=False,
            )
            if idx_cell_file:
                return idx_cell_file
        return cell_file

    @validator("symmetry", always=True)
    def validate_symmetry(cls, symmetry: str, values: Dict[str, Any]) -> str:
        """Default to the symmetry used by the latest MergePartialator Task."""
        if symmetry == "":
            partialator_sym: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "symmetry"
            )
            if partialator_sym:
                return partialator_sym
        return symmetry

    @validator("shell_file", always=True)
    def validate_shell_file(cls, shell_file: str, values: Dict[str, Any]) -> str:
        """Derive a shells output name from the latest partialator output."""
        if shell_file == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                shells_out: str = partialator_file.split(".")[0]
                shells_out = f"{shells_out}_{values['fom']}_n{values['nshells']}.dat"
                return shells_out
        return shell_file
Config
Bases: Config
Source code in lute/io/models/sfx_merge.py
class Config(ThirdPartyParameters.Config):
    """Configuration overrides for `CompareHKLParameters` (inherits ThirdPartyParameters.Config)."""

    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
long_flags_use_eq: bool = True
class-attribute
instance-attribute
Whether long command-line arguments are passed like --long=arg
.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
ConcatenateStreamFilesParameters
Bases: TaskParameters
Parameters for stream concatenation.
Concatenates the stream file output from CrystFEL indexing for multiple experimental runs.
Source code in lute/io/models/sfx_index.py
class ConcatenateStreamFilesParameters(TaskParameters):
    """Parameters for stream concatenation.

    Concatenates the stream file output from CrystFEL indexing for multiple
    experimental runs.
    """

    class Config(TaskParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    in_file: str = Field(
        "",
        description="Root of directory tree storing stream files to merge.",
    )
    tag: Optional[str] = Field(
        "",
        description="Tag identifying the stream files to merge.",
    )
    out_file: str = Field(
        "", description="Path to merged output stream file.", is_result=True
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default to the directory holding the latest IndexCrystFEL output."""
        if in_file == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                stream_dir: str = str(Path(stream_file).parent)
                return stream_dir
        return in_file

    @validator("tag", always=True)
    def validate_tag(cls, tag: str, values: Dict[str, Any]) -> str:
        """Derive the tag from the latest IndexCrystFEL output file name."""
        if tag == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                # File names follow "<tag>_..." - take the leading component.
                stream_tag: str = Path(stream_file).name.split("_")[0]
                return stream_tag
        return tag

    @validator("out_file", always=True)
    def validate_out_file(cls, tag: str, values: Dict[str, Any]) -> str:
        """Default the merged output to `<in_file parent>/<tag>.stream`.

        Bug fix: the original wrote `f"{values['tag'].stream}"`, accessing a
        nonexistent `.stream` attribute on the tag string (AttributeError at
        validation time); the intent is a ".stream" file extension.
        """
        if tag == "":
            stream_out_file: str = str(
                Path(values["in_file"]).parent / f"{values['tag']}.stream"
            )
            return stream_out_file
        return tag
Config
Bases: Config
Source code in lute/io/models/sfx_index.py
class Config(TaskParameters.Config):
    """Configuration overrides for `ConcatenateStreamFilesParameters` (inherits TaskParameters.Config)."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
DimpleSolveParameters
Bases: ThirdPartyParameters
Parameters for CCP4's dimple program.
There are many parameters. For more information on usage, please refer to the CCP4 documentation, here: https://ccp4.github.io/dimple/
Source code in lute/io/models/sfx_solve.py
class DimpleSolveParameters(ThirdPartyParameters):
    """Parameters for CCP4's dimple program.

    There are many parameters. For more information on
    usage, please refer to the CCP4 documentation, here:
    https://ccp4.github.io/dimple/
    """

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/ccp4-8.0/bin/dimple",
        description="CCP4 Dimple for solving structures with MR.",
        flag_type="",
    )
    # Positional requirements - all required.
    in_file: str = Field(
        "",
        description="Path to input mtz.",
        flag_type="",
    )
    pdb: str = Field("", description="Path to a PDB.", flag_type="")
    out_dir: str = Field("", description="Output DIRECTORY.", flag_type="")
    # Most used options
    mr_thresh: PositiveFloat = Field(
        0.4,
        description="Threshold for molecular replacement.",
        flag_type="--",
        rename_param="mr-when-r",
    )
    slow: Optional[bool] = Field(
        False, description="Perform more refinement.", flag_type="--"
    )
    # Other options (IO)
    hklout: str = Field(
        "final.mtz", description="Output mtz file name.", flag_type="--"
    )
    xyzout: str = Field(
        "final.pdb", description="Output PDB file name.", flag_type="--"
    )
    icolumn: Optional[str] = Field(
        # "IMEAN",
        description="Name for the I column.",
        flag_type="--",
    )
    sigicolumn: Optional[str] = Field(
        # "SIG<ICOL>",
        description="Name for the Sig<I> column.",
        flag_type="--",
    )
    fcolumn: Optional[str] = Field(
        # "F",
        description="Name for the F column.",
        flag_type="--",
    )
    sigfcolumn: Optional[str] = Field(
        # "F",
        description="Name for the Sig<F> column.",
        flag_type="--",
    )
    libin: Optional[str] = Field(
        description="Ligand descriptions for refmac (LIBIN).", flag_type="--"
    )
    refmac_key: Optional[str] = Field(
        description="Extra Refmac keywords to use in refinement.",
        flag_type="--",
        rename_param="refmac-key",
    )
    free_r_flags: Optional[str] = Field(
        description="Path to a mtz file with freeR flags.",
        flag_type="--",
        rename_param="free-r-flags",
    )
    freecolumn: Optional[Union[int, float]] = Field(
        # 0,
        description="Refree column with an optional value.",
        flag_type="--",
    )
    img_format: Optional[str] = Field(
        description="Format of generated images. (png, jpeg, none).",
        flag_type="-",
        rename_param="f",
    )
    white_bg: bool = Field(
        False,
        description="Use a white background in Coot and in images.",
        flag_type="--",
        rename_param="white-bg",
    )
    no_cleanup: bool = Field(
        False,
        description="Retain intermediate files.",
        flag_type="--",
        rename_param="no-cleanup",
    )
    # Calculations
    no_blob_search: bool = Field(
        False,
        description="Do not search for unmodelled blobs.",
        flag_type="--",
        rename_param="no-blob-search",
    )
    anode: bool = Field(
        False,
        description="Use SHELX/AnoDe to find peaks in the anomalous map.",
        # Consistency fix: this was the only option without an explicit
        # flag_type; dimple's CLI option is `--anode` like its siblings.
        flag_type="--",
    )
    # Run customization
    no_hetatm: bool = Field(
        False,
        description="Remove heteroatoms from the given model.",
        flag_type="--",
        rename_param="no-hetatm",
    )
    rigid_cycles: Optional[PositiveInt] = Field(
        # 10,
        description="Number of cycles of rigid-body refinement to perform.",
        flag_type="--",
        rename_param="rigid-cycles",
    )
    jelly: Optional[PositiveInt] = Field(
        # 4,
        description="Number of cycles of jelly-body refinement to perform.",
        flag_type="--",
    )
    restr_cycles: Optional[PositiveInt] = Field(
        # 8,
        description="Number of cycles of refmac final refinement to perform.",
        flag_type="--",
        rename_param="restr-cycles",
    )
    lim_resolution: Optional[PositiveFloat] = Field(
        description="Limit the final resolution.", flag_type="--", rename_param="reso"
    )
    weight: Optional[str] = Field(
        # "auto-weight",
        description="The refmac matrix weight.",
        flag_type="--",
    )
    mr_prog: Optional[str] = Field(
        # "phaser",
        description="Molecular replacement program. phaser or molrep.",
        flag_type="--",
        rename_param="mr-prog",
    )
    mr_num: Optional[Union[str, int]] = Field(
        # "auto",
        description="Number of molecules to use for molecular replacement.",
        flag_type="--",
        rename_param="mr-num",
    )
    mr_reso: Optional[PositiveFloat] = Field(
        # 3.25,
        description="High resolution for molecular replacement. If >10 interpreted as eLLG.",
        flag_type="--",
        rename_param="mr-reso",
    )
    itof_prog: Optional[str] = Field(
        description="Program to calculate amplitudes. truncate, or ctruncate.",
        flag_type="--",
        rename_param="ItoF-prog",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default to the mtz produced by the latest ManipulateHKL Task."""
        if in_file == "":
            get_hkl_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "ManipulateHKL", "out_file"
            )
            if get_hkl_file:
                return get_hkl_file
        return in_file

    @validator("out_dir", always=True)
    def validate_out_dir(cls, out_dir: str, values: Dict[str, Any]) -> str:
        """Default to the directory of the latest ManipulateHKL output."""
        if out_dir == "":
            get_hkl_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "ManipulateHKL", "out_file"
            )
            if get_hkl_file:
                return os.path.dirname(get_hkl_file)
        return out_dir
FindOverlapXSSParameters
Bases: TaskParameters
TaskParameter model for FindOverlapXSS Task.
This Task determines spatial or temporal overlap between an optical pulse and the FEL pulse based on difference scattering (XSS) signal. This Task uses SmallData HDF5 files as a source.
Source code in lute/io/models/smd.py
class FindOverlapXSSParameters(TaskParameters):
    """TaskParameter model for FindOverlapXSS Task.

    This Task determines spatial or temporal overlap between an optical pulse
    and the FEL pulse based on difference scattering (XSS) signal. This Task
    uses SmallData HDF5 files as a source.
    """

    class ExpConfig(BaseModel):
        """Experiment-specific variable names used by the analysis."""

        # Detector name. NOTE(review): presumably the psana/SmallData name of
        # the scattering detector - confirm against the Task implementation.
        det_name: str
        # NOTE(review): name suggests an intensity/position monitor variable;
        # confirm its exact role in the Task implementation.
        ipm_var: str
        # One scan variable name, or a list of them.
        scan_var: Union[str, List[str]]

    class Thresholds(BaseModel):
        """Numeric thresholds for event selection."""

        # NOTE(review): names suggest minimum scattering intensity and minimum
        # ipm value respectively - confirm against the Task implementation.
        min_Iscat: Union[int, float]
        min_ipm: Union[int, float]

    class AnalysisFlags(BaseModel):
        """Boolean toggles selecting analysis options."""

        use_pyfai: bool = True
        use_asymls: bool = False

    # Required nested configuration sections (no defaults).
    exp_config: ExpConfig
    thresholds: Thresholds
    analysis_flags: AnalysisFlags
FindPeaksPsocakeParameters
Bases: ThirdPartyParameters
Parameters for crystallographic (Bragg) peak finding using Psocake.
This peak finding Task optionally has the ability to compress/decompress data with SZ for the purpose of compression validation. NOTE: This Task is deprecated and provided for compatibility only.
Source code in lute/io/models/sfx_find_peaks.py
class FindPeaksPsocakeParameters(ThirdPartyParameters):
    """Parameters for crystallographic (Bragg) peak finding using Psocake.

    This peak finding Task optionally has the ability to compress/decompress
    data with SZ for the purpose of compression validation.
    NOTE: This Task is deprecated and provided for compatibility only.
    """

    class Config(TaskParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""
        result_from_params: str = ""
        """Defines a result from the parameters. Use a validator to do so."""

    class SZParameters(BaseModel):
        """SZ compression options rendered into the sz.json template."""

        compressor: Literal["qoz", "sz3"] = Field(
            "qoz", description="SZ compression algorithm (qoz, sz3)"
        )
        binSize: int = Field(2, description="SZ compression's bin size paramater")
        roiWindowSize: int = Field(
            2, description="SZ compression's ROI window size paramater"
        )
        # Bug fix: keyword was misspelled `descriptionp`, so pydantic silently
        # dropped this field's description metadata.
        absError: float = Field(10, description="Maximum absolute error value")

    executable: str = Field("mpirun", description="MPI executable.", flag_type="")
    np: PositiveInt = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of processes",
        flag_type="-",
    )
    mca: str = Field(
        "btl ^openib", description="Mca option for the MPI executable", flag_type="--"
    )
    p_arg1: str = Field(
        "python", description="Executable to run with mpi (i.e. python).", flag_type=""
    )
    u: str = Field(
        "", description="Python option for unbuffered output.", flag_type="-"
    )
    p_arg2: str = Field(
        "findPeaksSZ.py",
        description="Executable to run with mpi (i.e. python).",
        flag_type="",
    )
    d: str = Field(description="Detector name", flag_type="-")
    e: str = Field("", description="Experiment name", flag_type="-")
    r: int = Field(-1, description="Run number", flag_type="-")
    outDir: str = Field(
        description="Output directory where .cxi will be saved", flag_type="--"
    )
    algorithm: int = Field(1, description="PyAlgos algorithm to use", flag_type="--")
    alg_npix_min: float = Field(
        1.0, description="PyAlgos algorithm's npix_min parameter", flag_type="--"
    )
    alg_npix_max: float = Field(
        45.0, description="PyAlgos algorithm's npix_max parameter", flag_type="--"
    )
    alg_amax_thr: float = Field(
        250.0, description="PyAlgos algorithm's amax_thr parameter", flag_type="--"
    )
    alg_atot_thr: float = Field(
        330.0, description="PyAlgos algorithm's atot_thr parameter", flag_type="--"
    )
    alg_son_min: float = Field(
        10.0, description="PyAlgos algorithm's son_min parameter", flag_type="--"
    )
    alg1_thr_low: float = Field(
        80.0, description="PyAlgos algorithm's thr_low parameter", flag_type="--"
    )
    alg1_thr_high: float = Field(
        270.0, description="PyAlgos algorithm's thr_high parameter", flag_type="--"
    )
    alg1_rank: int = Field(
        3, description="PyAlgos algorithm's rank parameter", flag_type="--"
    )
    alg1_radius: int = Field(
        3, description="PyAlgos algorithm's radius parameter", flag_type="--"
    )
    alg1_dr: int = Field(
        1, description="PyAlgos algorithm's dr parameter", flag_type="--"
    )
    # Psana mask toggles are passed as the strings "True"/"False".
    psanaMask_on: str = Field(
        "True", description="Whether psana's mask should be used", flag_type="--"
    )
    psanaMask_calib: str = Field(
        "True", description="Psana mask's calib parameter", flag_type="--"
    )
    psanaMask_status: str = Field(
        "True", description="Psana mask's status parameter", flag_type="--"
    )
    psanaMask_edges: str = Field(
        "True", description="Psana mask's edges parameter", flag_type="--"
    )
    psanaMask_central: str = Field(
        "True", description="Psana mask's central parameter", flag_type="--"
    )
    psanaMask_unbond: str = Field(
        "True", description="Psana mask's unbond parameter", flag_type="--"
    )
    # NOTE(review): field name "unbondnrs" vs description "unbondnbrs" -
    # confirm which spelling the psocake CLI expects before renaming.
    psanaMask_unbondnrs: str = Field(
        "True", description="Psana mask's unbondnbrs parameter", flag_type="--"
    )
    mask: str = Field(
        "", description="Path to an additional mask to apply", flag_type="--"
    )
    clen: str = Field(
        description="Epics variable storing the camera length", flag_type="--"
    )
    coffset: float = Field(0, description="Camera offset in m", flag_type="--")
    minPeaks: int = Field(
        15,
        description="Minimum number of peaks to mark frame for indexing",
        flag_type="--",
    )
    maxPeaks: int = Field(
        15,
        description="Maximum number of peaks to mark frame for indexing",
        flag_type="--",
    )
    minRes: int = Field(
        0,
        description="Minimum peak resolution to mark frame for indexing ",
        flag_type="--",
    )
    sample: str = Field("", description="Sample name", flag_type="--")
    instrument: Union[None, str] = Field(
        None, description="Instrument name", flag_type="--"
    )
    pixelSize: float = Field(0.0, description="Pixel size", flag_type="--")
    auto: str = Field(
        "False",
        description=(
            "Whether to automatically determine peak per event peak "
            "finding parameters"
        ),
        flag_type="--",
    )
    detectorDistance: float = Field(
        0.0, description="Detector distance from interaction point in m", flag_type="--"
    )
    access: Literal["ana", "ffb"] = Field(
        "ana", description="Data node type: {ana,ffb}", flag_type="--"
    )
    szfile: str = Field("qoz.json", description="Path to SZ's JSON configuration file")
    lute_template_cfg: TemplateConfig = Field(
        TemplateConfig(
            template_name="sz.json",
            output_path="",  # Will want to change where this goes...
        ),
        description="Template information for the sz.json file",
    )
    sz_parameters: SZParameters = Field(
        description="Configuration parameters for SZ Compression", flag_type=""
    )

    @validator("e", always=True)
    def validate_e(cls, e: str, values: Dict[str, Any]) -> str:
        """Default the experiment name from the shared LUTE header."""
        if e == "":
            return values["lute_config"].experiment
        return e

    @validator("r", always=True)
    def validate_r(cls, r: int, values: Dict[str, Any]) -> int:
        """Default the run number from the shared LUTE header."""
        if r == -1:
            return values["lute_config"].run
        return r

    @validator("lute_template_cfg", always=True)
    def set_output_path(
        cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
    ) -> TemplateConfig:
        """Write the rendered SZ template to the configured szfile path."""
        if lute_template_cfg.output_path == "":
            lute_template_cfg.output_path = values["szfile"]
        return lute_template_cfg

    @validator("sz_parameters", always=True)
    def set_sz_compression_parameters(
        cls, sz_parameters: SZParameters, values: Dict[str, Any]
    ) -> None:
        """Copy SZ settings into `values` for template rendering.

        NOTE(review): returning None intentionally clears the `sz_parameters`
        field after its contents are copied into `values` (annotated `-> None`
        in the original) - confirm downstream template rendering relies only
        on the copied keys.
        """
        values["compressor"] = sz_parameters.compressor
        values["binSize"] = sz_parameters.binSize
        values["roiWindowSize"] = sz_parameters.roiWindowSize
        if sz_parameters.compressor == "qoz":
            values["pressio_opts"] = {
                "pressio:abs": sz_parameters.absError,
                "qoz": {"qoz:stride": 8},
            }
        else:
            values["pressio_opts"] = {"pressio:abs": sz_parameters.absError}
        return None

    @root_validator(pre=False)
    def define_result(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Record the expected .lst output path as this Task's result."""
        exp: str = values["lute_config"].experiment
        run: int = int(values["lute_config"].run)
        directory: str = values["outDir"]
        fname: str = f"{exp}_{run:04d}.lst"
        cls.Config.result_from_params = f"{directory}/{fname}"
        return values
Config
Bases: Config
Source code in lute/io/models/sfx_find_peaks.py
class Config(TaskParameters.Config):
    """Configuration overrides for `FindPeaksPsocakeParameters` (inherits TaskParameters.Config)."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
    result_from_params: str = ""
    """Defines a result from the parameters. Use a validator to do so."""
result_from_params: str = ''
class-attribute
instance-attribute
Defines a result from the parameters. Use a validator to do so.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
FindPeaksPyAlgosParameters
Bases: TaskParameters
Parameters for crystallographic (Bragg) peak finding using PyAlgos.
This peak finding Task optionally has the ability to compress/decompress data with SZ for the purpose of compression validation.
Source code in lute/io/models/sfx_find_peaks.py
class FindPeaksPyAlgosParameters(TaskParameters):
    """Parameters for crystallographic (Bragg) peak finding using PyAlgos.

    This peak finding Task optionally has the ability to compress/decompress
    data with SZ for the purpose of compression validation.
    """

    class Config(TaskParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    class SZCompressorParameters(BaseModel):
        """Optional SZ compression settings (used only when `compression` is set)."""

        compressor: Literal["qoz", "sz3"] = Field(
            "qoz", description='Compression algorithm ("qoz" or "sz3")'
        )
        abs_error: float = Field(10.0, description="Absolute error bound")
        bin_size: int = Field(2, description="Bin size")
        roi_window_size: int = Field(
            9,
            description="Default window size",
        )

    # --- Data selection / IO ---
    outdir: str = Field(
        description="Output directory for cxi files",
    )
    n_events: int = Field(
        0,
        description="Number of events to process (0 to process all events)",
    )
    det_name: str = Field(
        description="Psana name of the detector storing the image data",
    )
    event_receiver: Literal["evr0", "evr1"] = Field(
        description="Event Receiver to be used: evr0 or evr1",
    )
    tag: str = Field(
        "",
        description="Tag to add to the output file names",
    )
    pv_camera_length: Union[str, float] = Field(
        "",
        description="PV associated with camera length "
        "(if a number, camera length directly)",
    )
    # --- Event filtering ---
    event_logic: bool = Field(
        False,
        description="True if only events with a specific event code should be "
        "processed. False if the event code should be ignored",
    )
    event_code: int = Field(
        0,
        description="Required events code for events to be processed if event logic "
        "is True",
    )
    # --- Masking ---
    psana_mask: bool = Field(
        False,
        description="If True, apply mask from psana Detector object",
    )
    mask_file: Union[str, None] = Field(
        None,
        description="File with a custom mask to apply. If None, no custom mask is "
        "applied",
    )
    # --- Peak-finding thresholds (PyAlgos parameters) ---
    min_peaks: int = Field(2, description="Minimum number of peaks per image")
    max_peaks: int = Field(
        2048,
        description="Maximum number of peaks per image",
    )
    npix_min: int = Field(
        2,
        description="Minimum number of pixels per peak",
    )
    npix_max: int = Field(
        30,
        description="Maximum number of pixels per peak",
    )
    amax_thr: float = Field(
        80.0,
        description="Minimum intensity threshold for starting a peak",
    )
    atot_thr: float = Field(
        120.0,
        description="Minimum summed intensity threshold for pixel collection",
    )
    son_min: float = Field(
        7.0,
        description="Minimum signal-to-noise ratio to be considered a peak",
    )
    peak_rank: int = Field(
        3,
        description="Radius in which central peak pixel is a local maximum",
    )
    r0: float = Field(
        3.0,
        description="Radius of ring for background evaluation in pixels",
    )
    dr: float = Field(
        2.0,
        description="Width of ring for background evaluation in pixels",
    )
    nsigm: float = Field(
        7.0,
        description="Intensity threshold to include pixel in connected group",
    )
    compression: Optional[SZCompressorParameters] = Field(
        None,
        description="Options for the SZ Compression Algorithm",
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default to `<outdir>/<experiment>_<run>_<tag>.list` when unset."""
        if out_file == "":
            fname: Path = (
                Path(values["outdir"])
                / f"{values['lute_config'].experiment}_{values['lute_config'].run}_"
                f"{values['tag']}.list"
            )
            return str(fname)
        return out_file
Config
Bases: Config
Source code in lute/io/models/sfx_find_peaks.py
class Config(TaskParameters.Config):
    """Configuration overrides for `FindPeaksPyAlgosParameters` (inherits TaskParameters.Config)."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
IndexCrystFELParameters
Bases: ThirdPartyParameters
Parameters for CrystFEL's indexamajig
.
There are many parameters, and many combinations. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-indexamajig.html
Source code in lute/io/models/sfx_index.py
class IndexCrystFELParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `indexamajig`.

    There are many parameters, and many combinations. For more information on
    usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-indexamajig.html
    """

    class Config(ThirdPartyParameters.Config):
        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/indexamajig",
        description="CrystFEL's indexing binary.",
        flag_type="",
    )
    # Basic options
    in_file: Optional[str] = Field(
        "", description="Path to input file.", flag_type="-", rename_param="i"
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )
    geometry: str = Field(
        "", description="Path to geometry file.", flag_type="-", rename_param="g"
    )
    zmq_input: Optional[str] = Field(
        description="ZMQ address to receive data over. `input` and `zmq-input` are mutually exclusive",
        flag_type="--",
        rename_param="zmq-input",
    )
    zmq_subscribe: Optional[str] = Field(  # Can be used multiple times...
        description="Subscribe to ZMQ message of type `tag`",
        flag_type="--",
        rename_param="zmq-subscribe",
    )
    zmq_request: Optional[AnyUrl] = Field(
        description="Request new data over ZMQ by sending this value",
        flag_type="--",
        rename_param="zmq-request",
    )
    asapo_endpoint: Optional[str] = Field(
        description="ASAP::O endpoint. zmq-input and this are mutually exclusive.",
        flag_type="--",
        rename_param="asapo-endpoint",
    )
    asapo_token: Optional[str] = Field(
        description="ASAP::O authentication token.",
        flag_type="--",
        rename_param="asapo-token",
    )
    asapo_beamtime: Optional[str] = Field(
        description="ASAP::O beamtime.",
        flag_type="--",
        rename_param="asapo-beamtime",
    )
    asapo_source: Optional[str] = Field(
        description="ASAP::O data source.",
        flag_type="--",
        rename_param="asapo-source",
    )
    asapo_group: Optional[str] = Field(
        description="ASAP::O consumer group.",
        flag_type="--",
        rename_param="asapo-group",
    )
    asapo_stream: Optional[str] = Field(
        description="ASAP::O stream.",
        flag_type="--",
        rename_param="asapo-stream",
    )
    asapo_wait_for_stream: Optional[str] = Field(
        description="If ASAP::O stream does not exist, wait for it to appear.",
        flag_type="--",
        rename_param="asapo-wait-for-stream",
    )
    data_format: Optional[str] = Field(
        description="Specify format for ZMQ or ASAP::O. `msgpack`, `hdf5` or `seedee`.",
        flag_type="--",
        rename_param="data-format",
    )
    basename: bool = Field(
        False,
        description="Remove directory parts of filenames. Acts before prefix if prefix also given.",
        flag_type="--",
    )
    # No rename needed: the CLI flag is `--prefix`. (A previous copy-paste
    # erroneously set rename_param="asapo-stream" here.)
    prefix: Optional[str] = Field(
        description="Add a prefix to the filenames from the infile argument.",
        flag_type="--",
    )
    nthreads: PositiveInt = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of threads to use. See also `max_indexer_threads`.",
        flag_type="-",
        rename_param="j",
    )
    no_check_prefix: bool = Field(
        False,
        description="Don't attempt to correct the prefix if it seems incorrect.",
        flag_type="--",
        rename_param="no-check-prefix",
    )
    highres: Optional[float] = Field(
        description="Mark all pixels greater than `x` as bad.", flag_type="--"
    )
    profile: bool = Field(
        False, description="Display timing data to monitor performance.", flag_type="--"
    )
    temp_dir: Optional[str] = Field(
        description="Specify a path for the temp files folder.",
        flag_type="--",
        rename_param="temp-dir",
    )
    wait_for_file: conint(gt=-2) = Field(
        0,
        description="Wait at most `x` seconds for a file to be created. A value of -1 means wait forever.",
        flag_type="--",
        rename_param="wait-for-file",
    )
    no_image_data: bool = Field(
        False,
        description="Load only the metadata, no images. Can check indexability without high data requirements.",
        flag_type="--",
        rename_param="no-image-data",
    )
    # Peak-finding options
    # ....
    # Indexing options
    indexing: Optional[str] = Field(
        description="Comma-separated list of supported indexing algorithms to use. Default is to automatically detect.",
        flag_type="--",
    )
    cell_file: Optional[str] = Field(
        description="Path to a file containing unit cell information (PDB or CrystFEL format).",
        flag_type="-",
        rename_param="p",
    )
    tolerance: str = Field(
        "5,5,5,1.5",
        description=(
            "Tolerances (in percent) for unit cell comparison. "
            "Comma-separated list a,b,c,angle. Default=5,5,5,1.5"
        ),
        flag_type="--",
    )
    no_check_cell: bool = Field(
        False,
        description="Do not check cell parameters against unit cell. Replaces '-raw' method.",
        flag_type="--",
        rename_param="no-check-cell",
    )
    no_check_peaks: bool = Field(
        False,
        description="Do not verify peaks are accounted for by solution.",
        flag_type="--",
        rename_param="no-check-peaks",
    )
    multi: bool = Field(
        False, description="Enable multi-lattice indexing.", flag_type="--"
    )
    wavelength_estimate: Optional[float] = Field(
        description="Estimate for X-ray wavelength. Required for some methods.",
        flag_type="--",
        rename_param="wavelength-estimate",
    )
    camera_length_estimate: Optional[float] = Field(
        description="Estimate for camera distance. Required for some methods.",
        flag_type="--",
        rename_param="camera-length-estimate",
    )
    max_indexer_threads: Optional[PositiveInt] = Field(
        # 1,
        description="Some indexing algos can use multiple threads. In addition to image-based.",
        flag_type="--",
        rename_param="max-indexer-threads",
    )
    no_retry: bool = Field(
        False,
        description="Do not remove weak peaks and try again.",
        flag_type="--",
        rename_param="no-retry",
    )
    no_refine: bool = Field(
        False,
        description="Skip refinement step.",
        flag_type="--",
        rename_param="no-refine",
    )
    no_revalidate: bool = Field(
        False,
        description="Skip revalidation step.",
        flag_type="--",
        rename_param="no-revalidate",
    )
    # TakeTwo specific parameters
    taketwo_member_threshold: Optional[PositiveInt] = Field(
        # 20,
        description="Minimum number of vectors to consider.",
        flag_type="--",
        rename_param="taketwo-member-threshold",
    )
    taketwo_len_tolerance: Optional[PositiveFloat] = Field(
        # 0.001,
        description="TakeTwo length tolerance in Angstroms.",
        flag_type="--",
        rename_param="taketwo-len-tolerance",
    )
    taketwo_angle_tolerance: Optional[PositiveFloat] = Field(
        # 0.6,
        description="TakeTwo angle tolerance in degrees.",
        flag_type="--",
        rename_param="taketwo-angle-tolerance",
    )
    taketwo_trace_tolerance: Optional[PositiveFloat] = Field(
        # 3,
        description="Matrix trace tolerance in degrees.",
        flag_type="--",
        rename_param="taketwo-trace-tolerance",
    )
    # Felix-specific parameters
    # felix_domega
    # felix-fraction-max-visits
    # felix-max-internal-angle
    # felix-max-uniqueness
    # felix-min-completeness
    # felix-min-visits
    # felix-num-voxels
    # felix-sigma
    # felix-tthrange-max
    # felix-tthrange-min
    # XGANDALF-specific parameters
    xgandalf_sampling_pitch: Optional[NonNegativeInt] = Field(
        # 6,
        description="Density of reciprocal space sampling.",
        flag_type="--",
        rename_param="xgandalf-sampling-pitch",
    )
    xgandalf_grad_desc_iterations: Optional[NonNegativeInt] = Field(
        # 4,
        description="Number of gradient descent iterations.",
        flag_type="--",
        rename_param="xgandalf-grad-desc-iterations",
    )
    xgandalf_tolerance: Optional[PositiveFloat] = Field(
        # 0.02,
        description="Relative tolerance of lattice vectors",
        flag_type="--",
        rename_param="xgandalf-tolerance",
    )
    xgandalf_no_deviation_from_provided_cell: Optional[bool] = Field(
        description="Found unit cell must match provided.",
        flag_type="--",
        rename_param="xgandalf-no-deviation-from-provided-cell",
    )
    xgandalf_min_lattice_vector_length: Optional[PositiveFloat] = Field(
        # 30,
        description="Minimum possible lattice length.",
        flag_type="--",
        rename_param="xgandalf-min-lattice-vector-length",
    )
    xgandalf_max_lattice_vector_length: Optional[PositiveFloat] = Field(
        # 250,
        description="Maximum possible lattice length.",
        flag_type="--",
        rename_param="xgandalf-max-lattice-vector-length",
    )
    xgandalf_max_peaks: Optional[PositiveInt] = Field(
        # 250,
        description="Maximum number of peaks to use for indexing.",
        flag_type="--",
        rename_param="xgandalf-max-peaks",
    )
    xgandalf_fast_execution: bool = Field(
        False,
        description="Shortcut to set sampling-pitch=2, and grad-desc-iterations=3.",
        flag_type="--",
        rename_param="xgandalf-fast-execution",
    )
    # pinkIndexer parameters
    # ...
    # asdf_fast: bool = Field(False, description="Enable fast mode for asdf. 3x faster for 7% loss in accuracy.", flag_type="--", rename_param="asdf-fast")
    # Integration parameters
    integration: str = Field(
        "rings-nocen", description="Method for integrating reflections.", flag_type="--"
    )
    fix_profile_radius: Optional[float] = Field(
        description="Fix the profile radius (m^{-1})",
        flag_type="--",
        rename_param="fix-profile-radius",
    )
    fix_divergence: Optional[float] = Field(
        0,
        description="Fix the divergence (rad, full angle).",
        flag_type="--",
        rename_param="fix-divergence",
    )
    int_radius: str = Field(
        "4,5,7",
        description="Inner, middle, and outer radii for 3-ring integration.",
        flag_type="--",
        rename_param="int-radius",
    )
    int_diag: str = Field(
        "none",
        description="Show detailed information on integration when condition is met.",
        flag_type="--",
        rename_param="int-diag",
    )
    push_res: str = Field(
        "infinity",
        description="Integrate `x` higher than apparent resolution limit (nm-1).",
        flag_type="--",
        rename_param="push-res",
    )
    overpredict: bool = Field(
        False,
        description="Over-predict reflections. Maybe useful with post-refinement.",
        flag_type="--",
    )
    cell_parameters_only: bool = Field(
        False, description="Do not predict reflections at all", flag_type="--"
    )
    # Output parameters
    no_non_hits_in_stream: bool = Field(
        False,
        description="Exclude non-hits from the stream file.",
        flag_type="--",
        rename_param="no-non-hits-in-stream",
    )
    copy_hheader: Optional[str] = Field(
        description="Copy information from header in the image to output stream.",
        flag_type="--",
        rename_param="copy-hheader",
    )
    no_peaks_in_stream: bool = Field(
        False,
        description="Do not record peaks in stream file.",
        flag_type="--",
        rename_param="no-peaks-in-stream",
    )
    no_refls_in_stream: bool = Field(
        False,
        description="Do not record reflections in stream.",
        flag_type="--",
        rename_param="no-refls-in-stream",
    )
    serial_offset: Optional[PositiveInt] = Field(
        description="Start numbering at `x` instead of 1.",
        flag_type="--",
        rename_param="serial-offset",
    )
    harvest_file: Optional[str] = Field(
        description="Write parameters to file in JSON format.",
        flag_type="--",
        rename_param="harvest-file",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default the input list from prior peak-finding Task results."""
        if in_file == "":
            # Prefer the output recorded by the PyAlgos peak finder.
            filename: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "FindPeaksPyAlgos", "out_file"
            )
            if filename is None:
                # Fall back to reconstructing the Psocake output path.
                exp: str = values["lute_config"].experiment
                run: int = int(values["lute_config"].run)
                tag: Optional[str] = read_latest_db_entry(
                    f"{values['lute_config'].work_dir}", "FindPeaksPsocake", "tag"
                )
                out_dir: Optional[str] = read_latest_db_entry(
                    f"{values['lute_config'].work_dir}", "FindPeaksPsocake", "outDir"
                )
                if out_dir is not None:
                    fname: str = f"{out_dir}/{exp}_{run:04d}"
                    if tag is not None:
                        fname = f"{fname}_{tag}"
                    return f"{fname}.lst"
            else:
                return filename
        return in_file

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default the output stream path to <work_dir>/<exp>_r<run>.stream."""
        if out_file == "":
            expmt: str = values["lute_config"].experiment
            run: int = int(values["lute_config"].run)
            work_dir: str = values["lute_config"].work_dir
            fname: str = f"{expmt}_r{run:04d}.stream"
            return f"{work_dir}/{fname}"
        return out_file
Config
Bases: Config
Source code in lute/io/models/sfx_index.py
class Config(ThirdPartyParameters.Config):
    # Result-marking and `--flag=value` formatting for indexamajig's
    # long options.
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""
long_flags_use_eq: bool = True
class-attribute
instance-attribute
Whether long command-line arguments are passed like --long=arg
.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
ManipulateHKLParameters
Bases: ThirdPartyParameters
Parameters for CrystFEL's get_hkl
for manipulating lists of reflections.
This Task is predominantly used internally to convert hkl
to mtz
files.
Note that performing multiple manipulations is undefined behaviour. Run
the Task with multiple configurations in explicit separate steps. For more
information on usage, please refer to the CrystFEL documentation, here:
https://www.desy.de/~twhite/crystfel/manual-partialator.html
Source code in lute/io/models/sfx_merge.py
class ManipulateHKLParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `get_hkl` for manipulating lists of reflections.

    This Task is predominantly used internally to convert `hkl` to `mtz` files.
    Note that performing multiple manipulations is undefined behaviour. Run
    the Task with multiple configurations in explicit separate steps. For more
    information on usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-partialator.html
    """

    class Config(ThirdPartyParameters.Config):
        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/get_hkl",
        description="CrystFEL's reflection manipulation binary.",
        flag_type="",
    )
    in_file: str = Field(
        "",
        description="Path to input HKL file.",
        flag_type="-",
        rename_param="i",
    )
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )
    cell_file: str = Field(
        "",
        description="Path to a file containing unit cell information (PDB or CrystFEL format).",
        flag_type="-",
        rename_param="p",
    )
    output_format: str = Field(
        "mtz",
        description="Output format. One of mtz, mtz-bij, or xds. Otherwise CrystFEL format.",
        flag_type="--",
        rename_param="output-format",
    )
    expand: Optional[str] = Field(
        description="Reflections will be expanded to fill asymmetric unit of specified point group.",
        flag_type="--",
    )
    # Reducing reflections to higher symmetry
    twin: Optional[str] = Field(
        description="Reflections equivalent to specified point group will have intensities summed.",
        flag_type="--",
    )
    no_need_all_parts: Optional[bool] = Field(
        description="Use with --twin to allow reflections missing a 'twin mate' to be written out.",
        flag_type="--",
        rename_param="no-need-all-parts",
    )
    # Noise - Add to data
    noise: Optional[bool] = Field(
        description="Generate 10% uniform noise.", flag_type="--"
    )
    poisson: Optional[bool] = Field(
        description="Generate Poisson noise. Intensities assumed to be A.U.",
        flag_type="--",
    )
    adu_per_photon: Optional[int] = Field(
        description="Use with --poisson to convert A.U. to photons.",
        flag_type="--",
        rename_param="adu-per-photon",
    )
    # Remove duplicate reflections
    trim_centrics: Optional[bool] = Field(
        description="Duplicated reflections (according to symmetry) are removed.",
        flag_type="--",
    )
    # Restrict to template file
    template: Optional[str] = Field(
        description="Only reflections which also appear in specified file are written out.",
        flag_type="--",
    )
    # Multiplicity
    multiplicity: Optional[bool] = Field(
        description="Reflections are multiplied by their symmetric multiplicities.",
        flag_type="--",
    )
    # Resolution cutoffs
    cutoff_angstroms: Optional[Union[str, int, float]] = Field(
        description="Either n, or n1,n2,n3. For n, reflections < n are removed. For n1,n2,n3 anisotropic truncation performed at separate resolution limits for a*, b*, c*.",
        flag_type="--",
        rename_param="cutoff-angstroms",
    )
    lowres: Optional[float] = Field(
        description="Remove reflections with d > n", flag_type="--"
    )
    # flag_type was previously missing here; `--highres` matches get_hkl's CLI
    # and the convention of every sibling field.
    highres: Optional[float] = Field(
        description="Synonym for first form of --cutoff-angstroms",
        flag_type="--",
    )
    reindex: Optional[str] = Field(
        description="Reindex according to specified operator. E.g. k,h,-l.",
        flag_type="--",
    )
    # Override input symmetry
    symmetry: Optional[str] = Field(
        description="Point group symmetry to use to override. Almost always OMIT this option.",
        flag_type="--",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default the input to the latest MergePartialator output."""
        if in_file == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                return partialator_file
        return in_file

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default the output to the merge result with an .mtz extension."""
        if out_file == "":
            partialator_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "MergePartialator", "out_file"
            )
            if partialator_file:
                mtz_out: str = partialator_file.split(".")[0]
                mtz_out = f"{mtz_out}.mtz"
                return mtz_out
        return out_file

    @validator("cell_file", always=True)
    def validate_cell_file(cls, cell_file: str, values: Dict[str, Any]) -> str:
        """Reuse the cell file recorded by the indexing Task, if any."""
        if cell_file == "":
            idx_cell_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}",
                "IndexCrystFEL",
                "cell_file",
                valid_only=False,
            )
            if idx_cell_file:
                return idx_cell_file
        return cell_file
Config
Bases: Config
Source code in lute/io/models/sfx_merge.py
class Config(ThirdPartyParameters.Config):
    # get_hkl takes long options as `--opt=value`; the out_file field is
    # recorded as the Task result.
    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
long_flags_use_eq: bool = True
class-attribute
instance-attribute
Whether long command-line arguments are passed like --long=arg
.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
MergePartialatorParameters
Bases: ThirdPartyParameters
Parameters for CrystFEL's partialator
.
There are many parameters, and many combinations. For more information on usage, please refer to the CrystFEL documentation, here: https://www.desy.de/~twhite/crystfel/manual-partialator.html
Source code in lute/io/models/sfx_merge.py
class MergePartialatorParameters(ThirdPartyParameters):
    """Parameters for CrystFEL's `partialator`.

    There are many parameters, and many combinations. For more information on
    usage, please refer to the CrystFEL documentation, here:
    https://www.desy.de/~twhite/crystfel/manual-partialator.html
    """

    class Config(ThirdPartyParameters.Config):
        long_flags_use_eq: bool = True
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/crystfel/0.10.2/bin/partialator",
        description="CrystFEL's Partialator binary.",
        flag_type="",
    )
    # Defaulted by validate_in_file from the stream-concatenation Task.
    in_file: Optional[str] = Field(
        "", description="Path to input stream.", flag_type="-", rename_param="i"
    )
    # Defaulted by validate_out_file; marked as the Task result.
    out_file: str = Field(
        "",
        description="Path to output file.",
        flag_type="-",
        rename_param="o",
        is_result=True,
    )
    # No default: symmetry must be supplied in the configuration.
    symmetry: str = Field(description="Point group symmetry.", flag_type="--")
    niter: Optional[int] = Field(
        description="Number of cycles of scaling and post-refinement.",
        flag_type="-",
        rename_param="n",
    )
    # Scaling / model options
    no_scale: Optional[bool] = Field(
        description="Disable scaling.", flag_type="--", rename_param="no-scale"
    )
    no_Bscale: Optional[bool] = Field(
        description="Disable Debye-Waller part of scaling.",
        flag_type="--",
        rename_param="no-Bscale",
    )
    no_pr: Optional[bool] = Field(
        description="Disable orientation model.", flag_type="--", rename_param="no-pr"
    )
    no_deltacchalf: Optional[bool] = Field(
        description="Disable rejection based on deltaCC1/2.",
        flag_type="--",
        rename_param="no-deltacchalf",
    )
    model: str = Field(
        "unity",
        description="Partiality model. Options: xsphere, unity, offset, ggpm.",
        flag_type="--",
    )
    # Default: one thread per available core, minus one for the main process
    # (SLURM allocation takes precedence when present).
    nthreads: int = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of parallel analyses.",
        flag_type="-",
        rename_param="j",
    )
    polarisation: Optional[str] = Field(
        description="Specification of incident polarisation. Refer to CrystFEL docs for more info.",
        flag_type="--",
    )
    no_polarisation: Optional[bool] = Field(
        description="Synonym for --polarisation=none",
        flag_type="--",
        rename_param="no-polarisation",
    )
    # Reflection/crystal rejection criteria
    max_adu: Optional[float] = Field(
        description="Maximum intensity of reflection to include.",
        flag_type="--",
        rename_param="max-adu",
    )
    min_res: Optional[float] = Field(
        description="Only include crystals diffracting to a minimum resolution.",
        flag_type="--",
        rename_param="min-res",
    )
    min_measurements: int = Field(
        2,
        description="Include a reflection only if it appears a minimum number of times.",
        flag_type="--",
        rename_param="min-measurements",
    )
    push_res: Optional[float] = Field(
        description="Merge reflections up to higher than the apparent resolution limit.",
        flag_type="--",
        rename_param="push-res",
    )
    start_after: int = Field(
        0,
        description="Ignore the first n crystals.",
        flag_type="--",
        rename_param="start-after",
    )
    stop_after: int = Field(
        0,
        description="Stop after processing n crystals. 0 means process all.",
        flag_type="--",
        rename_param="stop-after",
    )
    no_free: Optional[bool] = Field(
        description="Disable cross-validation. Testing ONLY.",
        flag_type="--",
        rename_param="no-free",
    )
    custom_split: Optional[str] = Field(
        description="Read a set of filenames, event and dataset IDs from a filename.",
        flag_type="--",
        rename_param="custom-split",
    )
    max_rel_B: float = Field(
        100,
        description="Reject crystals if |relB| > n sq Angstroms.",
        flag_type="--",
        rename_param="max-rel-B",
    )
    # Logging / output control
    output_every_cycle: bool = Field(
        False,
        description="Write per-crystal params after every refinement cycle.",
        flag_type="--",
        rename_param="output-every-cycle",
    )
    no_logs: bool = Field(
        False,
        description="Do not write logs needed for plots, maps and graphs.",
        flag_type="--",
        rename_param="no-logs",
    )
    set_symmetry: Optional[str] = Field(
        description="Set the apparent symmetry of the crystals to a point group.",
        flag_type="-",
        rename_param="w",
    )
    operator: Optional[str] = Field(
        description="Specify an ambiguity operator. E.g. k,h,-l.", flag_type="--"
    )
    # Beam-parameter overrides
    force_bandwidth: Optional[float] = Field(
        description="Set X-ray bandwidth. As percent, e.g. 0.0013 (0.13%).",
        flag_type="--",
        rename_param="force-bandwidth",
    )
    force_radius: Optional[float] = Field(
        description="Set the initial profile radius (nm-1).",
        flag_type="--",
        rename_param="force-radius",
    )
    force_lambda: Optional[float] = Field(
        description="Set the wavelength. In Angstroms.",
        flag_type="--",
        rename_param="force-lambda",
    )
    harvest_file: Optional[str] = Field(
        description="Write parameters to file in JSON format.",
        flag_type="--",
        rename_param="harvest-file",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        # Default to the stream produced by the ConcatenateStreamFiles Task.
        if in_file == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}",
                "ConcatenateStreamFiles",
                "out_file",
            )
            if stream_file:
                return stream_file
        return in_file

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        # Derive the hkl name from the input stream's stem; fall back to a
        # fixed name when no input was resolved either.
        if out_file == "":
            in_file: str = values["in_file"]
            if in_file:
                tag: str = in_file.split(".")[0]
                return f"{tag}.hkl"
            else:
                return "partialator.hkl"
        return out_file
Config
Bases: Config
Source code in lute/io/models/sfx_merge.py
class Config(ThirdPartyParameters.Config):
    # partialator takes long options as `--opt=value`; the out_file field
    # is recorded as the Task result.
    long_flags_use_eq: bool = True
    """Whether long command-line arguments are passed like `--long=arg`."""
    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
long_flags_use_eq: bool = True
class-attribute
instance-attribute
Whether long command-line arguments are passed like --long=arg
.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
RunSHELXCParameters
Bases: ThirdPartyParameters
Parameters for CCP4's SHELXC program.
SHELXC prepares files for SHELXD and SHELXE.
For more information please refer to the official documentation: https://www.ccp4.ac.uk/html/crank.html
Source code in lute/io/models/sfx_solve.py
class RunSHELXCParameters(ThirdPartyParameters):
    """Parameters for CCP4's SHELXC program.

    SHELXC prepares files for SHELXD and SHELXE.
    For more information please refer to the official documentation:
    https://www.ccp4.ac.uk/html/crank.html
    """

    executable: str = Field(
        "/sdf/group/lcls/ds/tools/ccp4-8.0/bin/shelxc",
        description="CCP4 SHELXC. Generates input files for SHELXD/SHELXE.",
        flag_type="",
    )
    placeholder: str = Field(
        "xx", description="Placeholder filename stem.", flag_type=""
    )
    in_file: str = Field(
        "",
        description="Input file for SHELXC with reflections AND proper records.",
        flag_type="",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default the input from ManipulateHKL and prepend shell redirection."""
        if in_file == "":
            # get_hkl needed to be run to produce an XDS format file...
            xds_format_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "ManipulateHKL", "out_file"
            )
            if xds_format_file:
                in_file = xds_format_file
        # Guard the empty case: `""[0]` would raise IndexError when no file
        # could be resolved.
        if in_file and not in_file.startswith("<"):
            # Need to add a redirection for this program
            # Runs like `shelxc xx <input_file.xds`
            in_file = f"<{in_file}"
        return in_file
SubmitSMDParameters
Bases: ThirdPartyParameters
Parameters for running smalldata to produce reduced HDF5 files.
Source code in lute/io/models/smd.py
class SubmitSMDParameters(ThirdPartyParameters):
    """Parameters for running smalldata to produce reduced HDF5 files."""

    class Config(ThirdPartyParameters.Config):
        """Identical to super-class Config but includes a result."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

        result_from_params: str = ""
        """Defines a result from the parameters. Use a validator to do so."""

    # Command assembled as: mpirun -np N python -u -m mpi4py.run <producer> ...
    executable: str = Field("mpirun", description="MPI executable.", flag_type="")
    # Default: available cores minus one (SLURM allocation takes precedence).
    np: PositiveInt = Field(
        max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
        description="Number of processes",
        flag_type="-",
    )
    p_arg1: str = Field(
        "python", description="Executable to run with mpi (i.e. python).", flag_type=""
    )
    u: str = Field(
        "", description="Python option for unbuffered output.", flag_type="-"
    )
    m: str = Field(
        "mpi4py.run",
        description="Python option to execute a module's contents as __main__ module.",
        flag_type="-",
    )
    producer: str = Field(
        "", description="Path to the SmallData producer Python script.", flag_type=""
    )
    # Run/experiment default from the environment when launched via the eLog.
    run: str = Field(
        os.environ.get("RUN_NUM", ""), description="DAQ Run Number.", flag_type="--"
    )
    experiment: str = Field(
        os.environ.get("EXPERIMENT", ""),
        description="LCLS Experiment Number.",
        flag_type="--",
    )
    stn: NonNegativeInt = Field(0, description="Hutch endstation.", flag_type="--")
    nevents: int = Field(
        int(1e9), description="Number of events to process.", flag_type="--"
    )
    directory: Optional[str] = Field(
        None,
        description="Optional output directory. If None, will be in ${EXP_FOLDER}/hdf5/smalldata.",
        flag_type="--",
    )
    ## Need mechanism to set result_from_param=True ...
    gather_interval: PositiveInt = Field(
        25, description="Number of events to collect at a time.", flag_type="--"
    )
    norecorder: bool = Field(
        False, description="Whether to ignore recorder streams.", flag_type="--"
    )
    url: HttpUrl = Field(
        "https://pswww.slac.stanford.edu/ws-auth/lgbk",
        description="Base URL for eLog posting.",
        flag_type="--",
    )
    # Data-volume toggles: these flags trade disk usage for completeness.
    epicsAll: bool = Field(
        False,
        description="Whether to store all EPICS PVs. Use with care.",
        flag_type="--",
    )
    full: bool = Field(
        False,
        description="Whether to store all data. Use with EXTRA care.",
        flag_type="--",
    )
    fullSum: bool = Field(
        False,
        description="Whether to store sums for all area detector images.",
        flag_type="--",
    )
    default: bool = Field(
        False,
        description="Whether to store only the default minimal set of data.",
        flag_type="--",
    )
    image: bool = Field(
        False,
        description="Whether to save everything as images. Use with care.",
        flag_type="--",
    )
    tiff: bool = Field(
        False,
        description="Whether to save all images as a single TIFF. Use with EXTRA care.",
        flag_type="--",
    )
    centerpix: bool = Field(
        False,
        description="Whether to mask center pixels for Epix10k2M detectors.",
        flag_type="--",
    )
    postRuntable: bool = Field(
        False,
        description="Whether to post run tables. Also used as a trigger for summary jobs.",
        flag_type="--",
    )
    wait: bool = Field(
        False, description="Whether to wait for a file to appear.", flag_type="--"
    )
    xtcav: bool = Field(
        False,
        description="Whether to add XTCAV processing to the HDF5 generation.",
        flag_type="--",
    )
    noarch: bool = Field(
        False, description="Whether to not use archiver data.", flag_type="--"
    )
    lute_template_cfg: TemplateConfig = TemplateConfig(template_name="", output_path="")

    @validator("producer", always=True)
    def validate_producer_path(cls, producer: str) -> str:
        # NOTE(review): currently a no-op; presumably a placeholder for path
        # validation/expansion of the producer script — confirm intent.
        return producer

    @validator("lute_template_cfg", always=True)
    def use_producer(
        cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
    ) -> TemplateConfig:
        # When no template output path was configured, write the rendered
        # template over the producer script itself.
        if not lute_template_cfg.output_path:
            lute_template_cfg.output_path = values["producer"]
        return lute_template_cfg

    @root_validator(pre=False)
    def define_result(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Derive the expected HDF5 output path and publish it via the
        # class-level Config.result_from_params hook for the Executor.
        exp: str = values["lute_config"].experiment
        hutch: str = exp[:3]
        run: int = int(values["lute_config"].run)
        directory: Optional[str] = values["directory"]
        if directory is None:
            directory = f"/sdf/data/lcls/ds/{hutch}/{exp}/hdf5/smalldata"
        fname: str = f"{exp}_Run{run:04d}.h5"
        cls.Config.result_from_params = f"{directory}/{fname}"
        return values
Config
Bases: Config
Identical to super-class Config but includes a result.
Source code in lute/io/models/smd.py
class Config(ThirdPartyParameters.Config):
    """Identical to super-class Config but includes a result."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
    # Populated at validation time (see define_result) rather than via an
    # is_result-flagged field.
    result_from_params: str = ""
    """Defines a result from the parameters. Use a validator to do so."""
result_from_params: str = ''
class-attribute
instance-attribute
Defines a result from the parameters. Use a validator to do so.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
TaskParameters
Bases: BaseSettings
Base class for models of task parameters to be validated.
Parameters are read from a configuration YAML file and validated against subclasses of this type in order to ensure that both all parameters are present, and that the parameters are of the correct type.
Note
Pydantic is used for data validation. Pydantic does not perform "strict" validation by default. Parameter values may be cast to conform with the model specified by the subclass definition if it is possible to do so. Consider whether this may cause issues (e.g. if a float is cast to an int).
Source code in lute/io/models/base.py
class TaskParameters(BaseSettings):
    """Base class for models of task parameters to be validated.

    Parameters are read from a configuration YAML file and validated against
    subclasses of this type in order to ensure that both all parameters are
    present, and that the parameters are of the correct type.

    Note:
        Pydantic is used for data validation. Pydantic does not perform "strict"
        validation by default. Parameter values may be cast to conform with the
        model specified by the subclass definition if it is possible to do so.
        Consider whether this may cause issues (e.g. if a float is cast to an
        int).
    """

    class Config:
        """Configuration for parameters model.

        The Config class holds Pydantic configuration. A number of LUTE-specific
        configuration has also been placed here.

        Attributes:
            env_prefix (str): Pydantic configuration. Will set parameters from
                environment variables containing this prefix. E.g. a model
                parameter `input` can be set with an environment variable:
                `{env_prefix}input`, in LUTE's case `LUTE_input`.

            underscore_attrs_are_private (bool): Pydantic configuration. Whether
                to hide attributes (parameters) prefixed with an underscore.

            copy_on_model_validation (str): Pydantic configuration. How to copy
                the input object passed to the class instance for model
                validation. Set to perform a deep copy.

            allow_inf_nan (bool): Pydantic configuration. Whether to allow
                infinity or NAN in float fields.

            run_directory (Optional[str]): None. If set, it should be a valid
                path. The `Task` will be run from this directory. This may be
                useful for some `Task`s which rely on searching the working
                directory.

            set_result (bool): False. If True, the model has information about
                setting the TaskResult object from the parameters it contains.
                E.g. it has an `output` parameter which is marked as the result.
                The result can be set with a field value of `is_result=True` on
                a specific parameter, or using `result_from_params` and a
                validator.

            result_from_params (Optional[str]): None. Optionally used to define
                results from information available in the model using a custom
                validator. E.g. use a `outdir` and `filename` field to set
                `result_from_params=f"{outdir}/{filename}"`, etc. Only used if
                `set_result==True`

            result_summary (Optional[str]): None. Defines a result summary that
                can be known after processing the Pydantic model. Use of summary
                depends on the Executor running the Task. All summaries are
                stored in the database, however. Only used if `set_result==True`

            impl_schemas (Optional[str]): Specifies the schemas the
                output/results conform to. Only used if `set_result==True`.
        """

        env_prefix = "LUTE_"
        underscore_attrs_are_private: bool = True
        copy_on_model_validation: str = "deep"
        allow_inf_nan: bool = False

        run_directory: Optional[str] = None
        """Set the directory that the Task is run from."""

        set_result: bool = False
        """Whether the Executor should mark a specified parameter as a result."""

        result_from_params: Optional[str] = None
        """Defines a result from the parameters. Use a validator to do so."""

        result_summary: Optional[str] = None
        """Format a TaskResult.summary from output."""

        impl_schemas: Optional[str] = None
        """Schema specification for output result. Will be passed to TaskResult."""

    # Every parameters model carries the shared analysis header (experiment,
    # run, working directory, etc.) parsed from the YAML header document.
    lute_config: AnalysisHeader
Config
Configuration for parameters model.
The Config class holds Pydantic configuration. A number of LUTE-specific configuration has also been placed here.
Attributes: |
|
---|
Source code in lute/io/models/base.py
class Config:
    """Configuration for parameters model.

    The Config class holds Pydantic configuration. A number of LUTE-specific
    configuration has also been placed here.

    Attributes:
        env_prefix (str): Pydantic configuration. Will set parameters from
            environment variables containing this prefix. E.g. a model
            parameter `input` can be set with an environment variable:
            `{env_prefix}input`, in LUTE's case `LUTE_input`.

        underscore_attrs_are_private (bool): Pydantic configuration. Whether
            to hide attributes (parameters) prefixed with an underscore.

        copy_on_model_validation (str): Pydantic configuration. How to copy
            the input object passed to the class instance for model
            validation. Set to perform a deep copy.

        allow_inf_nan (bool): Pydantic configuration. Whether to allow
            infinity or NAN in float fields.

        run_directory (Optional[str]): None. If set, it should be a valid
            path. The `Task` will be run from this directory. This may be
            useful for some `Task`s which rely on searching the working
            directory.

        set_result (bool): False. If True, the model has information about
            setting the TaskResult object from the parameters it contains.
            E.g. it has an `output` parameter which is marked as the result.
            The result can be set with a field value of `is_result=True` on
            a specific parameter, or using `result_from_params` and a
            validator.

        result_from_params (Optional[str]): None. Optionally used to define
            results from information available in the model using a custom
            validator. E.g. use a `outdir` and `filename` field to set
            `result_from_params=f"{outdir}/{filename}"`, etc. Only used if
            `set_result==True`

        result_summary (Optional[str]): None. Defines a result summary that
            can be known after processing the Pydantic model. Use of summary
            depends on the Executor running the Task. All summaries are
            stored in the database, however. Only used if `set_result==True`

        impl_schemas (Optional[str]): Specifies the schemas the
            output/results conform to. Only used if `set_result==True`.
    """

    env_prefix = "LUTE_"
    underscore_attrs_are_private: bool = True
    copy_on_model_validation: str = "deep"
    allow_inf_nan: bool = False

    run_directory: Optional[str] = None
    """Set the directory that the Task is run from."""

    set_result: bool = False
    """Whether the Executor should mark a specified parameter as a result."""

    result_from_params: Optional[str] = None
    """Defines a result from the parameters. Use a validator to do so."""

    result_summary: Optional[str] = None
    """Format a TaskResult.summary from output."""

    impl_schemas: Optional[str] = None
    """Schema specification for output result. Will be passed to TaskResult."""
impl_schemas: Optional[str] = None
class-attribute
instance-attribute
Schema specification for output result. Will be passed to TaskResult.
result_from_params: Optional[str] = None
class-attribute
instance-attribute
Defines a result from the parameters. Use a validator to do so.
result_summary: Optional[str] = None
class-attribute
instance-attribute
Format a TaskResult.summary from output.
run_directory: Optional[str] = None
class-attribute
instance-attribute
Set the directory that the Task is run from.
set_result: bool = False
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
TemplateConfig
Bases: BaseModel
Parameters used for templating of third party configuration files.
Attributes: |
|
---|
Source code in lute/io/models/base.py
class TemplateConfig(BaseModel):
    """Parameters used for templating of third party configuration files.

    Attributes:
        template_name (str): The name of the template to use. This template must
            live in `config/templates`.

        output_path (str): The FULL path, including filename to write the
            rendered template to.
    """

    template_name: str
    output_path: str
TemplateParameters
Class for representing parameters for third party configuration files.
These parameters can represent arbitrary data types and are used in
conjunction with templates for modifying third party configuration files
from the single LUTE YAML. Due to the storage of arbitrary data types, and
the use of a template file, a single instance of this class can hold from a
single template variable to an entire configuration file. The data parsing
is done by jinja using the complementary template.
All data is stored in the single model variable params.
The pydantic "dataclass" is used over the BaseModel/Settings to allow
positional argument instantiation of the params
Field.
Source code in lute/io/models/base.py
@dataclass
class TemplateParameters:
    """Class for representing parameters for third party configuration files.

    These parameters can represent arbitrary data types and are used in
    conjunction with templates for modifying third party configuration files
    from the single LUTE YAML. Due to the storage of arbitrary data types, and
    the use of a template file, a single instance of this class can hold from a
    single template variable to an entire configuration file. The data parsing
    is done by jinja using the complementary template.

    All data is stored in the single model variable `params`.

    The pydantic "dataclass" is used over the BaseModel/Settings to allow
    positional argument instantiation of the `params` Field.
    """

    # Arbitrary payload: anything from one template variable to a whole
    # configuration structure; interpreted by the jinja template, not here.
    params: Any
TestBinaryErrParameters
Bases: ThirdPartyParameters
Same as TestBinary, but exits with non-zero code.
Source code in lute/io/models/tests.py
class TestBinaryErrParameters(ThirdPartyParameters):
    """Same as TestBinary, but exits with non-zero code.

    Used to exercise the Executor's error-handling path for third party tasks.

    Attributes:
        executable (str): Path to the test binary to run.
        p_arg1 (int): Number of threads the binary should spawn.
    """

    executable: str = Field(
        "/sdf/home/d/dorlhiac/test_tasks/test_threads_err",
        # Fixed typo in the user-visible description: "tes tbinary" -> "test binary".
        description="Multi-threaded test binary with non-zero exit code.",
    )
    p_arg1: int = Field(1, description="Number of threads.")
TestParameters
Bases: TaskParameters
Parameters for the test Task Test
.
Source code in lute/io/models/tests.py
class TestParameters(TaskParameters):
    """Parameters for the test Task `Test`."""

    # Simple scalar parameters.
    float_var: float = Field(0.01, description="A floating point number.")
    str_var: str = Field("test", description="A string.")

    class CompoundVar(BaseModel):
        # Nested model used to test validation of compound parameters.
        int_var: int = 1
        dict_var: Dict[str, str] = {"a": "b"}

    # Required compound parameter — no default, so it must appear in the YAML.
    compound_var: CompoundVar = Field(
        description="A compound parameter - consists of a `int_var` (int) and `dict_var` (Dict[str, str])."
    )

    # Toggle used to exercise the error-handling path of the Executor.
    throw_error: bool = Field(False, description="If `True`, raise an exception to test error handling.")
ThirdPartyParameters
Bases: TaskParameters
Base class for third party task parameters.
Contains special validators for extra arguments and handling of parameters used for filling in third party configuration files.
Source code in lute/io/models/base.py
class ThirdPartyParameters(TaskParameters):
    """Base class for third party task parameters.

    Contains special validators for extra arguments and handling of parameters
    used for filling in third party configuration files.
    """

    class Config(TaskParameters.Config):
        """Configuration for parameters model.

        The Config class holds Pydantic configuration and inherited configuration
        from the base `TaskParameters.Config` class. A number of values are also
        overridden, and there are some specific configuration options to
        ThirdPartyParameters. A full list of options (with TaskParameters options
        repeated) is described below.

        Attributes:
            env_prefix (str): Pydantic configuration. Will set parameters from
                environment variables containing this prefix. E.g. a model
                parameter `input` can be set with an environment variable:
                `{env_prefix}input`, in LUTE's case `LUTE_input`.

            underscore_attrs_are_private (bool): Pydantic configuration. Whether
                to hide attributes (parameters) prefixed with an underscore.

            copy_on_model_validation (str): Pydantic configuration. How to copy
                the input object passed to the class instance for model
                validation. Set to perform a deep copy.

            allow_inf_nan (bool): Pydantic configuration. Whether to allow
                infinity or NAN in float fields.

            run_directory (Optional[str]): None. If set, it should be a valid
                path. The `Task` will be run from this directory. This may be
                useful for some `Task`s which rely on searching the working
                directory.

            set_result (bool): True. If True, the model has information about
                setting the TaskResult object from the parameters it contains.
                E.g. it has an `output` parameter which is marked as the result.
                The result can be set with a field value of `is_result=True` on
                a specific parameter, or using `result_from_params` and a
                validator.

            result_from_params (Optional[str]): None. Optionally used to define
                results from information available in the model using a custom
                validator. E.g. use a `outdir` and `filename` field to set
                `result_from_params=f"{outdir}/{filename}"`, etc.

            result_summary (Optional[str]): None. Defines a result summary that
                can be known after processing the Pydantic model. Use of summary
                depends on the Executor running the Task. All summaries are
                stored in the database, however.

            impl_schemas (Optional[str]): Specifies the schemas the
                output/results conform to. Only used if set_result is True.

            -----------------------
            ThirdPartyTask-specific:

            extra (str): "allow". Pydantic configuration. Allow (or ignore) extra
                arguments.

            short_flags_use_eq (bool): False. If True, "short" command-line args
                are passed as `-x=arg`. ThirdPartyTask-specific.

            long_flags_use_eq (bool): False. If True, "long" command-line args
                are passed as `--long=arg`. ThirdPartyTask-specific.
        """

        extra: str = "allow"

        short_flags_use_eq: bool = False
        """Whether short command-line arguments are passed like `-x=arg`."""

        long_flags_use_eq: bool = False
        """Whether long command-line arguments are passed like `--long=arg`."""

        set_result: bool = True
        """Whether the Executor should mark a specified parameter as a result."""

    # lute_template_cfg: TemplateConfig

    @root_validator(pre=False)
    def extra_fields_to_thirdparty(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Any field not declared on the model (allowed by `extra = "allow"`) is
        # wrapped in TemplateParameters so it can be fed to a jinja template.
        # Only values are reassigned, so mutating while iterating is safe here.
        for key in values:
            if key not in cls.__fields__:
                values[key] = TemplateParameters(values[key])

        return values
Config
Bases: Config
Configuration for parameters model.
The Config class holds Pydantic configuration and inherited configuration
from the base TaskParameters.Config
class. A number of values are also
overridden, and there are some specific configuration options to
ThirdPartyParameters. A full list of options (with TaskParameters options
repeated) is described below.
Attributes: |
|
---|
Source code in lute/io/models/base.py
class Config(TaskParameters.Config):
    """Configuration for parameters model.

    The Config class holds Pydantic configuration and inherited configuration
    from the base `TaskParameters.Config` class. A number of values are also
    overridden, and there are some specific configuration options to
    ThirdPartyParameters. A full list of options (with TaskParameters options
    repeated) is described below.

    Attributes:
        env_prefix (str): Pydantic configuration. Will set parameters from
            environment variables containing this prefix. E.g. a model
            parameter `input` can be set with an environment variable:
            `{env_prefix}input`, in LUTE's case `LUTE_input`.

        underscore_attrs_are_private (bool): Pydantic configuration. Whether
            to hide attributes (parameters) prefixed with an underscore.

        copy_on_model_validation (str): Pydantic configuration. How to copy
            the input object passed to the class instance for model
            validation. Set to perform a deep copy.

        allow_inf_nan (bool): Pydantic configuration. Whether to allow
            infinity or NAN in float fields.

        run_directory (Optional[str]): None. If set, it should be a valid
            path. The `Task` will be run from this directory. This may be
            useful for some `Task`s which rely on searching the working
            directory.

        set_result (bool): True. If True, the model has information about
            setting the TaskResult object from the parameters it contains.
            E.g. it has an `output` parameter which is marked as the result.
            The result can be set with a field value of `is_result=True` on
            a specific parameter, or using `result_from_params` and a
            validator.

        result_from_params (Optional[str]): None. Optionally used to define
            results from information available in the model using a custom
            validator. E.g. use a `outdir` and `filename` field to set
            `result_from_params=f"{outdir}/{filename}"`, etc.

        result_summary (Optional[str]): None. Defines a result summary that
            can be known after processing the Pydantic model. Use of summary
            depends on the Executor running the Task. All summaries are
            stored in the database, however.

        impl_schemas (Optional[str]): Specifies the schemas the
            output/results conform to. Only used if set_result is True.

        -----------------------
        ThirdPartyTask-specific:

        extra (str): "allow". Pydantic configuration. Allow (or ignore) extra
            arguments.

        short_flags_use_eq (bool): False. If True, "short" command-line args
            are passed as `-x=arg`. ThirdPartyTask-specific.

        long_flags_use_eq (bool): False. If True, "long" command-line args
            are passed as `--long=arg`. ThirdPartyTask-specific.
    """

    extra: str = "allow"

    short_flags_use_eq: bool = False
    """Whether short command-line arguments are passed like `-x=arg`."""

    long_flags_use_eq: bool = False
    """Whether long command-line arguments are passed like `--long=arg`."""

    set_result: bool = True
    """Whether the Executor should mark a specified parameter as a result."""
long_flags_use_eq: bool = False
class-attribute
instance-attribute
Whether long command-line arguments are passed like --long=arg
.
set_result: bool = True
class-attribute
instance-attribute
Whether the Executor should mark a specified parameter as a result.
short_flags_use_eq: bool = False
class-attribute
instance-attribute
Whether short command-line arguments are passed like -x=arg
.
parse_config(task_name='test', config_path='')
Parse a configuration file and validate the contents.
Parameters: |
|
---|
Returns: |
|
---|
Raises: |
|
---|
Source code in lute/io/config.py
def parse_config(task_name: str = "test", config_path: str = "") -> TaskParameters:
    """Parse a configuration file and validate the contents.

    Args:
        task_name (str): Name of the specific task that will be run.

        config_path (str): Path to the configuration file.

    Returns:
        params (TaskParameters): A TaskParameters object of validated
            task-specific parameters. Parameters are accessed with "dot"
            notation. E.g. `params.param1`.

    Raises:
        ValidationError: Raised if there are problems with the configuration
            file. Passed through from Pydantic.
    """
    # The parameters model is looked up by naming convention:
    # "<TaskName>Parameters" must exist at this module's top level.
    task_config_name: str = f"{task_name}Parameters"

    with open(config_path, "r") as f:
        # NOTE(review): FullLoader can construct some Python objects from tagged
        # YAML; consider SafeLoader if configs can come from untrusted sources.
        docs: Iterator[Dict[str, Any]] = yaml.load_all(stream=f, Loader=yaml.FullLoader)
        # The YAML file must contain at least two documents: a header document
        # followed by the Task configuration document. `load_all` is lazy, so
        # both must be consumed while the file is still open. A missing
        # document raises StopIteration here.
        header: Dict[str, Any] = next(docs)
        config: Dict[str, Any] = next(docs)
    # In-place variable substitution: first resolve references within the
    # header itself, then resolve the Task config (header used as fallback).
    substitute_variables(header, header)
    substitute_variables(header, config)
    # Debugging hook: optionally dump the parsed config and exit early.
    LUTE_DEBUG_EXIT("LUTE_DEBUG_EXIT_AT_YAML", pprint.pformat(config))
    lute_config: Dict[str, AnalysisHeader] = {"lute_config": AnalysisHeader(**header)}
    try:
        task_config: Dict[str, Any] = dict(config[task_name])
        lute_config.update(task_config)
    except KeyError as err:
        # No section for this Task in the YAML — fall through and rely on the
        # model's declared defaults. `err` is intentionally unused.
        warnings.warn(
            (
                f"{task_name} has no parameter definitions in YAML file."
                " Attempting default parameter initialization."
            )
        )
    # Instantiate the Task-specific model; raises KeyError if no model named
    # `task_config_name` is defined at module level.
    parsed_parameters: TaskParameters = globals()[task_config_name](**lute_config)
    return parsed_parameters
substitute_variables(header, config, curr_key=None)
Performs variable substitutions on a dictionary read from config YAML file.
Can be used to define input parameters in terms of other input parameters.
This is similar to functionality employed by validators for parameters in
the specific Task models, but is intended to be more accessible to users.
Variable substitutions are defined using a minimal syntax from Jinja:
{{ experiment }}
defines a substitution of the variable experiment
. The characters {{ }}
can be escaped if the literal symbols are needed in place.
For example, a path to a file can be defined in terms of experiment and run values in the config file:

    MyTask:
      experiment: myexp
      run: 2
      special_file: /path/to/{{ experiment }}/{{ run }}/file.inp
Acceptable variables for substitutions are values defined elsewhere in the
YAML file. Environment variables can also be used if prefaced with a $
character. E.g. to get the experiment from an environment variable:
MyTask:
run: 2
special_file: /path/to/{{ $EXPERIMENT }}/{{ run }}/file.inp
Parameters: |
|
---|
Returns: |
|
---|
Source code in lute/io/config.py
def substitute_variables(
    header: Dict[str, Any], config: Dict[str, Any], curr_key: Optional[str] = None
) -> None:
    """Performs variable substitutions on a dictionary read from config YAML file.

    Can be used to define input parameters in terms of other input parameters.
    This is similar to functionality employed by validators for parameters in
    the specific Task models, but is intended to be more accessible to users.

    Variable substitutions are defined using a minimal syntax from Jinja:

        {{ experiment }}

    defines a substitution of the variable `experiment`. The characters `{{ }}`
    can be escaped if the literal symbols are needed in place.

    For example, a path to a file can be defined in terms of experiment and run
    values in the config file:

        MyTask:
          experiment: myexp
          run: 2
          special_file: /path/to/{{ experiment }}/{{ run }}/file.inp

    Acceptable variables for substitutions are values defined elsewhere in the
    YAML file. Environment variables can also be used if prefaced with a `$`
    character. E.g. to get the experiment from an environment variable:

        MyTask:
          run: 2
          special_file: /path/to/{{ $EXPERIMENT }}/{{ run }}/file.inp

    Args:
        header (Dict[str, Any]): The parsed header document of the YAML file.
            Used as a fallback source for substitution values not found in
            `config`.

        config (Dict[str, Any]): A dictionary of parsed configuration.

        curr_key (Optional[str]): Used to keep track of recursion level when scanning
            through iterable items in the config dictionary.

    Returns:
        None. `config` is modified in place; it may be unchanged if no
        substitutions are needed.
    """
    # Matches minimal Jinja-style delimiters: {{ ... }} with no nested braces.
    _sub_pattern = r"\{\{[^}{]*\}\}"
    iterable: Dict[str, Any] = config
    if curr_key is not None:
        # Need to handle nested levels by interpreting curr_key
        keys_by_level: List[str] = curr_key.split(".")
        for key in keys_by_level:
            iterable = iterable[key]
    else:
        ...
        # iterable = config
    for param, value in iterable.items():
        if isinstance(value, dict):
            # Recurse into nested mappings, tracking the dotted key path.
            new_key: str
            if curr_key is None:
                new_key = param
            else:
                new_key = f"{curr_key}.{param}"
            substitute_variables(header, config, curr_key=new_key)
        elif isinstance(value, list):
            # NOTE(review): substitution inside lists appears unimplemented.
            ...
        # Scalars str - we skip numeric types
        elif isinstance(value, str):
            matches: List[str] = re.findall(_sub_pattern, value)
            for m in matches:
                # A match may carry a format spec, e.g. {{ run:04d }}.
                key_to_sub_maybe_with_fmt: List[str] = m[2:-2].strip().split(":")
                key_to_sub: str = key_to_sub_maybe_with_fmt[0]
                fmt: Optional[str] = None
                if len(key_to_sub_maybe_with_fmt) == 2:
                    fmt = key_to_sub_maybe_with_fmt[1]
                sub: Any
                if key_to_sub[0] == "$":
                    # `$`-prefixed names are looked up in the environment.
                    sub = os.getenv(key_to_sub[1:], None)
                    if sub is None:
                        print(
                            f"Environment variable {key_to_sub[1:]} not found! Cannot substitute in YAML config!",
                            flush=True,
                        )
                        continue
                    # substitutions from env vars will be strings, so convert back
                    # to numeric in order to perform formatting later on (e.g. {var:04d})
                    sub = _check_str_numeric(sub)
                else:
                    try:
                        # First try a dotted-path lookup within the config doc.
                        sub = config
                        for key in key_to_sub.split("."):
                            sub = sub[key]
                    except KeyError:
                        # Fall back to the header document.
                        sub = header[key_to_sub]
                # Escape the matched literal so it is a safe regex pattern.
                pattern: str = (
                    m.replace("{{", r"\{\{").replace("}}", r"\}\}").replace("$", r"\$")
                )
                if fmt is not None:
                    sub = f"{sub:{fmt}}"
                else:
                    sub = f"{sub}"
                iterable[param] = re.sub(pattern, sub, iterable[param])
            # Reconvert back to numeric values if needed...
            iterable[param] = _check_str_numeric(iterable[param])