elog

Provides utilities for communicating with the LCLS eLog.

Makes use of various eLog API endpoints to retrieve information or post results.

Functions:

Name Description
get_elog_opr_auth

Return an authorization object to interact with the eLog API as an opr account for the hutch where exp was conducted.

get_elog_kerberos_auth

Return the authorization headers for the user account submitting the job.

elog_http_request

Make an HTTP request to the specified eLog API endpoint.

format_file_for_post

Prepare files according to the specification needed to add them as attachments to eLog posts.

post_elog_message

Post a message to the eLog.

post_elog_run_status

Post a run status to the summary section on the Workflows>Control tab.

post_elog_run_table

Update the run table in the eLog.

get_elog_runs_by_tag

Return a list of runs with a specific tag.

get_elog_params_by_run

Retrieve the requested parameters by run. If no run is provided, retrieve the requested parameters for all runs.

_get_current_run_status(update_url)

Retrieve the current 'counters' or status for a workflow.

This function is intended to be called from the posting function to allow for incremental updates to the status. It will only work for currently running workflows, as it does not go back to the database, only the JID/ARP.

Parameters:

Name Type Description Default
update_url str

The JID_UPDATE_COUNTERS url.

required

Returns:

Name Type Description
data Dict[str, Union[str, int, float]]

A dictionary of key:value pairs of currently displayed data.

Source code in lute/io/elog.py
def _get_current_run_status(update_url: str) -> Dict[str, Union[str, int, float]]:
    """Retrieve the current 'counters' or status for a workflow.

    This function is intended to be called from the posting function to allow
    for incremental updates to the status. It will only work for currently
    running workflows, as it does not go back to the database, only the JID/ARP.

    Args:
        update_url (str): The JID_UPDATE_COUNTERS url.

    Returns:
        data (Dict[str, Union[str, int, float]]): A dictionary of key:value
            pairs of currently displayed data.
    """
    import getpass

    if os.getenv("ARP_ROOT_JOB_ID") is None or os.getenv("RUN_NUM") is None:
        raise RuntimeError(
            "Cannot call _get_current_run_status with no ROOT_JOB or RUN_NUM"
        )
    user: str = getpass.getuser()
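    # Derive the get_counters URL from the update URL. The experiment is the
    # second-to-last path component; the last three components are dropped and
    # replaced by "<exp>/get_counters".
    replace_counters_parts: List[str] = update_url.split("/")
    exp: str = replace_counters_parts[-2]
    get_url: str = "/".join(replace_counters_parts[:-3])
    get_url = f"{get_url}/{exp}/get_counters"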
    job_doc: Dict[str, str] = {
        "_id": cast(str, os.environ.get("ARP_ROOT_JOB_ID")),
        "experiment": exp,
        "run_num": cast(str, os.environ.get("RUN_NUM")),
        "user": user,
    }
    resp: requests.models.Response = requests.post(
        get_url,
        json=job_doc,
        headers={"Authorization": os.environ.get("Authorization")},
    )
    current_status: Dict[str, Union[str, int, float]] = {
        d["key"]: d["value"] for d in resp.json()["value"]
    }
    return current_status
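
The URL rewriting in this function can be tried in isolation. The following is a minimal sketch that reproduces only the string manipulation. The helper name `derive_get_counters_url` and the example URL are made up for illustration and do not correspond to a real JID endpoint.

```python
from typing import List


def derive_get_counters_url(update_url: str) -> str:
    """Mirror the URL rewriting done in _get_current_run_status."""
    parts: List[str] = update_url.split("/")
    # The experiment name is taken from the second-to-last path component.
    exp: str = parts[-2]
    # Drop the last three components, then append "<exp>/get_counters".
    base: str = "/".join(parts[:-3])
    return f"{base}/{exp}/get_counters"


# Hypothetical update URL, used purely for illustration.
example_url = "https://example.org/jid/replace_counters/mfxl1234/42"
print(derive_get_counters_url(example_url))
```

Only the position of the path components matters here, so any URL with the same layout is rewritten the same way.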

elog_http_request(exp, endpoint, request_type, **params)

Make an HTTP request to the eLog.

This method will determine the proper authorization method and update the passed parameters appropriately. Functions implementing specific endpoint functionality and calling this function should only pass the necessary endpoint-specific parameters and not include the authorization objects.

Parameters:

Name Type Description Default
exp str

Experiment.

required
endpoint str

eLog API endpoint.

required
request_type str

Type of request to make. Recognized options: POST or GET.

required
**params Dict

Endpoint parameters to pass with the HTTP request. These differ depending on the API endpoint. Do not include auth objects.

{}

Returns:

Name Type Description
status_code int

Response status code. Can be checked for errors.

msg str

An error message, or a message saying SUCCESS.

value Optional[Any]

For GET requests ONLY, return the requested information.

Source code in lute/io/elog.py
def elog_http_request(
    exp: str, endpoint: str, request_type: str, **params
) -> Tuple[int, str, Optional[Any]]:
    """Make an HTTP request to the eLog.

    This method will determine the proper authorization method and update the
    passed parameters appropriately. Functions implementing specific endpoint
    functionality and calling this function should only pass the necessary
    endpoint-specific parameters and not include the authorization objects.

    Args:
        exp (str): Experiment.

        endpoint (str): eLog API endpoint.

        request_type (str): Type of request to make. Recognized options: POST or
            GET.

        **params (Dict): Endpoint parameters to pass with the HTTP request.
            These differ depending on the API endpoint. Do not include auth
            objects.

    Returns:
        status_code (int): Response status code. Can be checked for errors.

        msg (str): An error message, or a message saying SUCCESS.

        value (Optional[Any]): For GET requests ONLY, return the requested
            information.
    """
    auth: Union[HTTPBasicAuth, Dict[str, str]] = get_elog_auth(exp)
    base_url: str
    if isinstance(auth, HTTPBasicAuth):
        params.update({"auth": auth})
        base_url = "https://pswww.slac.stanford.edu/ws-auth/lgbk/lgbk"
    elif isinstance(auth, dict):
        params.update({"headers": auth})
        base_url = "https://pswww.slac.stanford.edu/ws-kerb/lgbk/lgbk"

    url: str = f"{base_url}/{endpoint}"

    resp: requests.models.Response
    if request_type.upper() == "POST":
        resp = requests.post(url, **params)
    elif request_type.upper() == "GET":
        resp = requests.get(url, **params)
    else:
        return (-1, "Invalid request type!", None)

    status_code: int = resp.status_code
    msg: str = "SUCCESS"
    # Parse the JSON body once and reuse it below.
    resp_json: Dict[str, Any] = resp.json()

    if resp_json["success"] and request_type.upper() == "GET":
        return (status_code, msg, resp_json["value"])

    if status_code >= 300:
        msg = f"Error when posting to eLog: Response {status_code}"

    if not resp_json["success"]:
        err_msg = resp_json["error_msg"]
        msg += f"\nInclude message: {err_msg}"
    return (resp.status_code, msg, None)
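
Callers receive a `(status_code, msg, value)` tuple and have to decide what counts as a failure. The sketch below shows one possible way to centralize that check. `unwrap_elog_result` is a hypothetical helper, not part of the module, and it operates on an already-returned tuple so it can be run without network access.

```python
from typing import Any, Optional, Tuple


def unwrap_elog_result(result: Tuple[int, str, Optional[Any]]) -> Optional[Any]:
    """Return the value of a (status_code, msg, value) tuple, or raise on error."""
    status_code, msg, value = result
    # -1 is returned for an unrecognized request type, >= 300 signals an HTTP
    # error, and any message other than "SUCCESS" carries an error description.
    if status_code == -1 or status_code >= 300 or msg != "SUCCESS":
        raise RuntimeError(f"eLog request failed ({status_code}): {msg}")
    return value


# Example tuples shaped like the return value of elog_http_request.
print(unwrap_elog_result((200, "SUCCESS", [1, 2, 3])))
```

Raising an exception is a design choice of this sketch. The functions on this page return the error message to the caller instead.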

format_file_for_post(in_file)

Format a file for attachment to an eLog post.

The eLog API expects a specifically formatted tuple when adding file attachments. This function prepares the tuple to specification given a number of different input types.

Parameters:

Name Type Description Default
in_file str | tuple | list

File to include as an attachment in an eLog post: either a file path, or a (path, description) tuple or list.

required
Source code in lute/io/elog.py
def format_file_for_post(
    in_file: Union[str, tuple, list]
) -> Tuple[str, Tuple[str, BufferedReader], Any]:
    """Format a file for attachment to an eLog post.

    The eLog API expects a specifically formatted tuple when adding file
    attachments. This function prepares the tuple to specification given a
    number of different input types.

    Args:
        in_file (str | tuple | list): File to include as an attachment in an
            eLog post: either a file path, or a (path, description) tuple or
            list.
    """
    description: str
    fptr: BufferedReader
    ftype: Optional[str]
    if isinstance(in_file, str):
        description = os.path.basename(in_file)
        fptr = open(in_file, "rb")
        ftype = mimetypes.guess_type(in_file)[0]
    elif isinstance(in_file, tuple) or isinstance(in_file, list):
        description = in_file[1]
        fptr = open(in_file[0], "rb")
        ftype = mimetypes.guess_type(in_file[0])[0]
    else:
        raise ElogFileFormatError(f"Unrecognized format: {in_file}")

    out_file: Tuple[str, Tuple[str, BufferedReader], Any] = (
        "files",
        (description, fptr),
        ftype,
    )
    return out_file
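
The two accepted input forms can be compared without opening any file. The sketch below mirrors only how the description and MIME type are chosen. `describe_attachment` is a hypothetical helper, the paths are placeholders, and `TypeError` stands in for the module's `ElogFileFormatError`.

```python
import mimetypes
import os
from typing import Optional, Tuple, Union


def describe_attachment(
    in_file: Union[str, tuple, list]
) -> Tuple[str, Optional[str]]:
    """Return the (description, MIME type) pair that would be used for in_file."""
    if isinstance(in_file, str):
        # A bare path: the file name doubles as the description.
        return os.path.basename(in_file), mimetypes.guess_type(in_file)[0]
    if isinstance(in_file, (tuple, list)):
        # A (path, description) pair: the description is given explicitly.
        return in_file[1], mimetypes.guess_type(in_file[0])[0]
    raise TypeError(f"Unrecognized format: {in_file}")


# Placeholder paths; the files do not need to exist for this sketch.
print(describe_attachment("/tmp/summary.png"))
print(describe_attachment(("/tmp/summary.png", "Run summary plot")))
```

`mimetypes.guess_type` works from the file extension alone, so it returns `None` as the type when the extension is unknown.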

get_elog_active_expmt(hutch, *, endstation=0)

Get the current active experiment for a hutch.

This is one of two functions that manage their HTTP request independently of elog_http_request. It does not require an authorization object, and its result is needed for the generic function elog_http_request to work properly.

Parameters:

Name Type Description Default
hutch str

The hutch to get the active experiment for.

required
endstation int

The hutch endstation to get the experiment for. This should generally be 0.

0
Source code in lute/io/elog.py
def get_elog_active_expmt(hutch: str, *, endstation: int = 0) -> str:
    """Get the current active experiment for a hutch.

    This is one of two functions that manage their HTTP request independently
    of `elog_http_request`. It does not require an authorization object, and
    its result is needed for the generic function `elog_http_request` to work
    properly.

    Args:
        hutch (str): The hutch to get the active experiment for.

        endstation (int): The hutch endstation to get the experiment for. This
            should generally be 0.
    """

    base_url: str = "https://pswww.slac.stanford.edu/ws/lgbk/lgbk"
    endpoint: str = "ws/activeexperiment_for_instrument_station"
    url: str = f"{base_url}/{endpoint}"
    params: Dict[str, str] = {"instrument_name": hutch, "station": f"{endstation}"}
    resp: requests.models.Response = requests.get(url, params)
    if resp.status_code >= 300:
        raise RuntimeError(
            f"Error getting current experiment!\n\t\tIncorrect hutch: '{hutch}'?"
        )
    if resp.json()["success"]:
        return resp.json()["value"]["name"]
    else:
        msg: str = resp.json()["error_msg"]
        raise RuntimeError(f"Error getting current experiment! Err: {msg}")
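
To see which URL this lookup targets, the query can be assembled without sending a request. The sketch below uses only the standard library and the base URL and endpoint from the source above. `active_expmt_url` is a hypothetical helper and `"mfx"` is just an example hutch value. The exact encoding produced by `requests` may differ in detail.

```python
from urllib.parse import urlencode

BASE_URL = "https://pswww.slac.stanford.edu/ws/lgbk/lgbk"
ENDPOINT = "ws/activeexperiment_for_instrument_station"


def active_expmt_url(hutch: str, endstation: int = 0) -> str:
    """Build the full query URL without sending a request."""
    # The station is sent as a string, as in get_elog_active_expmt.
    query = urlencode({"instrument_name": hutch, "station": f"{endstation}"})
    return f"{BASE_URL}/{ENDPOINT}?{query}"


print(active_expmt_url("mfx"))
```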

get_elog_auth(exp)

Determine the appropriate auth method depending on experiment state.

Returns:

Name Type Description
auth HTTPBasicAuth | Dict[str, str]

Depending on whether an experiment is active/live, returns authorization for the hutch operator account or the current user submitting a job.

Source code in lute/io/elog.py
def get_elog_auth(exp: str) -> Union[HTTPBasicAuth, Dict[str, str]]:
    """Determine the appropriate auth method depending on experiment state.

    Returns:
        auth (HTTPBasicAuth | Dict[str, str]): Depending on whether an experiment
            is active/live, returns authorization for the hutch operator account
            or the current user submitting a job.
    """
    hutch: str = exp[:3]
    if exp.lower() == get_elog_active_expmt(hutch=hutch).lower():
        return get_elog_opr_auth(exp)
    else:
        return get_elog_kerberos_auth()
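
The selection rule is a case-insensitive comparison between the requested experiment and the hutch's active experiment. The sketch below isolates that rule as pure functions. `hutch_of` and `select_auth_kind` are hypothetical names, and the experiment names are invented.

```python
def hutch_of(exp: str) -> str:
    """The hutch is taken to be the first three characters of the experiment."""
    return exp[:3]


def select_auth_kind(exp: str, active_exp: str) -> str:
    """Return "opr" if exp is the active experiment, otherwise "kerberos"."""
    # The comparison ignores case, as in get_elog_auth.
    if exp.lower() == active_exp.lower():
        return "opr"
    return "kerberos"


# Invented experiment names, for illustration only.
print(hutch_of("mfxl1234"))
print(select_auth_kind("mfxl1234", "MFXL1234"))
print(select_auth_kind("mfxl1234", "mfxl5678"))
```

In the module, the "opr" branch corresponds to `get_elog_opr_auth` and the "kerberos" branch to `get_elog_kerberos_auth`.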

get_elog_kerberos_auth()

Returns Kerberos authorization key.

This function returns authorization for the USER account submitting jobs. It assumes that kinit has been run.

Returns:

Name Type Description
auth Dict[str, str]

Dictionary containing Kerberos authorization key.

Source code in lute/io/elog.py
def get_elog_kerberos_auth() -> Dict[str, str]:
    """Returns Kerberos authorization key.

    This function returns authorization for the USER account submitting jobs.
    It assumes that `kinit` has been run.

    Returns:
        auth (Dict[str, str]): Dictionary containing Kerberos authorization key.
    """
    from krtc import KerberosTicket  # type: ignore

    return KerberosTicket("HTTP@pswww.slac.stanford.edu").getAuthHeaders()

get_elog_opr_auth(exp)

Produce authentication for the "opr" user associated with an experiment.

This method uses basic authentication using username and password.

Parameters:

Name Type Description Default
exp str

Name of the experiment to produce authentication for.

required

Returns:

Name Type Description
auth HTTPBasicAuth

HTTPBasicAuth for an active experiment based on username and password for the associated operator account.

Source code in lute/io/elog.py
def get_elog_opr_auth(exp: str) -> HTTPBasicAuth:
    """Produce authentication for the "opr" user associated with an experiment.

    This method uses basic authentication using username and password.

    Args:
        exp (str): Name of the experiment to produce authentication for.

    Returns:
        auth (HTTPBasicAuth): HTTPBasicAuth for an active experiment based on
            username and password for the associated operator account.
    """
    opr: str = f"{exp[:3]}opr"
    with open("/sdf/group/lcls/ds/tools/forElogPost.txt", "r") as f:
        # Strip only the trailing newline so a file without one is read intact.
        pw: str = f.readline().rstrip("\n")
    return HTTPBasicAuth(opr, pw)

get_elog_params_by_run(exp, params, runs=None)

Retrieve requested parameters by run or for all runs.

Parameters:

Name Type Description Default
exp str

Experiment to retrieve parameters for.

required
params List[str]

A list of parameters to retrieve. These can be any parameter recorded in the eLog (PVs, parameters posted by other Tasks, etc.)

required
runs Optional[List[int]]

Runs to retrieve the parameters for. If not provided, the parameters are retrieved for all runs.

None
Source code in lute/io/elog.py
def get_elog_params_by_run(
    exp: str, params: List[str], runs: Optional[List[int]] = None
) -> Optional[Dict[str, str]]:
    """Retrieve requested parameters by run or for all runs.

    Args:
        exp (str): Experiment to retrieve parameters for.

        params (List[str]): A list of parameters to retrieve. These can be any
            parameter recorded in the eLog (PVs, parameters posted by other
            Tasks, etc.)

        runs (Optional[List[int]]): Runs to retrieve the parameters for. If
            not provided, the parameters are retrieved for all runs.
    """
    return None

get_elog_runs_by_tag(exp, tag, auth=None)

Retrieve run numbers with a specified tag.

Parameters:

Name Type Description Default
exp str

Experiment name.

required
tag str

The tag to retrieve runs for.

required
Source code in lute/io/elog.py
def get_elog_runs_by_tag(
    exp: str, tag: str, auth: Optional[Union[HTTPBasicAuth, Dict]] = None
) -> List[int]:
    """Retrieve run numbers with a specified tag.

    Args:
        exp (str): Experiment name.

        tag (str): The tag to retrieve runs for.
    """
    endpoint: str = f"{exp}/ws/get_runs_with_tag?tag={tag}"
    params: Dict[str, Any] = {}

    status_code, resp_msg, tagged_runs = elog_http_request(
        exp=exp, endpoint=endpoint, request_type="GET", **params
    )

    if not tagged_runs:
        tagged_runs = []

    return tagged_runs

get_elog_workflows(exp)

Get the current workflow definitions for an experiment.

Returns:

Name Type Description
defns Dict[str, str]

A dictionary of workflow definitions.

Source code in lute/io/elog.py
def get_elog_workflows(exp: str) -> Dict[str, str]:
    """Get the current workflow definitions for an experiment.

    Returns:
        defns (Dict[str, str]): A dictionary of workflow definitions.
    """
    raise NotImplementedError

post_elog_message(exp, msg, *, tag, title, in_files=[])

Post a new message to the eLog. Inspired by the elog package.

Parameters:

Name Type Description Default
exp str

Experiment name.

required
msg str

BODY of the eLog post.

required
tag str | None

Optional "tag" to associate with the eLog post.

required
title str | None

Optional title to include in the eLog post.

required
in_files List[str | tuple | list]

Files to include as attachments in the eLog post.

[]

Returns:

Name Type Description
err_msg str | None

If successful, nothing is returned, otherwise, return an error message.

Source code in lute/io/elog.py
def post_elog_message(
    exp: str,
    msg: str,
    *,
    tag: Optional[str],
    title: Optional[str],
    in_files: List[Union[str, tuple, list]] = [],
) -> Optional[str]:
    """Post a new message to the eLog. Inspired by the `elog` package.

    Args:
        exp (str): Experiment name.

        msg (str): BODY of the eLog post.

        tag (str | None): Optional "tag" to associate with the eLog post.

        title (str | None): Optional title to include in the eLog post.

        in_files (List[str | tuple | list]): Files to include as attachments in
            the eLog post.

    Returns:
        err_msg (str | None): If successful, nothing is returned, otherwise,
            return an error message.
    """
    # MOSTLY CORRECT
    out_files: list = []
    for f in in_files:
        try:
            out_files.append(format_file_for_post(in_file=f))
        except ElogFileFormatError as err:
            logger.error(f"ElogFileFormatError: {err}")
    post: Dict[str, str] = {}
    post["log_text"] = msg
    if tag:
        post["log_tags"] = tag
    if title:
        post["log_title"] = title

    endpoint: str = f"{exp}/ws/new_elog_entry"

    params: Dict[str, Any] = {"data": post}

    if out_files:
        params.update({"files": out_files})

    status_code, resp_msg, _ = elog_http_request(
        exp=exp, endpoint=endpoint, request_type="POST", **params
    )

    if resp_msg != "SUCCESS":
        return resp_msg
    # NEED to handle/propagate errors...
    return None
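
The form data sent with the post only contains the optional fields when they are set. The sketch below reproduces just that payload construction so it can be inspected without contacting the eLog. `build_post_payload` is a hypothetical helper, and the message, tag, and title values are invented.

```python
from typing import Dict, Optional


def build_post_payload(
    msg: str, tag: Optional[str] = None, title: Optional[str] = None
) -> Dict[str, str]:
    """Assemble the form fields for a new eLog entry."""
    post: Dict[str, str] = {"log_text": msg}
    # Empty strings and None are both skipped, as in post_elog_message.
    if tag:
        post["log_tags"] = tag
    if title:
        post["log_title"] = title
    return post


print(build_post_payload("Calibration finished.", tag="calib", title="Run summary"))
print(build_post_payload("Message without tag or title."))
```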

post_elog_run_status(data, update_url=None)

Post a summary to the status/report section of a specific run.

In contrast to most eLog update/post mechanisms, this function searches for a specific environment variable which contains a specific URL for posting. This is updated every job/run as jobs are submitted by the JID. The URL can optionally be passed to this function if it is known.

Parameters:

Name Type Description Default
data Dict[str, Union[str, int, float]]

The data to post to the eLog report section. Formatted in key:value pairs.

required
update_url Optional[str]

Optional update URL. If not provided, the function searches for the corresponding environment variable. If neither is found, the function logs an error and returns without posting.

None
Source code in lute/io/elog.py
def post_elog_run_status(
    data: Mapping[str, Union[str, int, float]], update_url: Optional[str] = None
) -> None:
    """Post a summary to the status/report section of a specific run.

    In contrast to most eLog update/post mechanisms, this function searches
    for a specific environment variable which contains a specific URL for
    posting. This is updated every job/run as jobs are submitted by the JID.
    The URL can optionally be passed to this function if it is known.

    Args:
        data (Dict[str, Union[str, int, float]]): The data to post to the eLog
            report section. Formatted in key:value pairs.

        update_url (Optional[str]): Optional update URL. If not provided, the
            function searches for the corresponding environment variable. If
            neither is found, the function logs an error and returns without
            posting.
    """
    if update_url is None:
        update_url = os.environ.get("JID_UPDATE_COUNTERS")
        if update_url is None:
            logger.error("eLog Update Failed! JID_UPDATE_COUNTERS is not defined!")
            return
    current_status: Dict[str, Union[str, int, float]] = _get_current_run_status(
        update_url
    )
    current_status.update(data)
    post_list: List[Dict[str, str]] = [
        {"key": f"{key}", "value": f"{value}"} for key, value in current_status.items()
    ]
    _: requests.models.Response = requests.post(update_url, json=post_list)
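
Because the currently displayed counters are fetched first and then updated, new keys are added and existing keys are overwritten rather than the whole summary being replaced. The sketch below shows that merge and the resulting list of key/value records, without any HTTP calls. `merge_status` is a hypothetical helper and the counter names are invented.

```python
from typing import Dict, List, Mapping, Union

Value = Union[str, int, float]


def merge_status(
    current: Mapping[str, Value], new: Mapping[str, Value]
) -> List[Dict[str, str]]:
    """Merge new data into the current status and format it for posting."""
    merged: Dict[str, Value] = dict(current)
    # New values overwrite existing keys; unknown keys are appended.
    merged.update(new)
    # Keys and values are both converted to strings, as in post_elog_run_status.
    return [{"key": f"{k}", "value": f"{v}"} for k, v in merged.items()]


# Invented counters, for illustration only.
print(merge_status({"Hits": 10, "Status": "running"}, {"Status": "done", "Rate": 2.5}))
```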

post_elog_run_table(exp, run, data)

Post data for eLog run tables.

Parameters:

Name Type Description Default
exp str

Experiment name.

required
run int

Run number corresponding to the data being posted.

required
data Dict[str, Any]

Data to be posted in format data["column_header"] = value.

required

Returns:

Name Type Description
err_msg None | str

If successful, nothing is returned, otherwise, return an error message.

Source code in lute/io/elog.py
def post_elog_run_table(
    exp: str,
    run: int,
    data: Dict[str, Any],
) -> Optional[str]:
    """Post data for eLog run tables.

    Args:
        exp (str): Experiment name.

        run (int): Run number corresponding to the data being posted.

        data (Dict[str, Any]): Data to be posted in format
            data["column_header"] = value.

    Returns:
        err_msg (None | str): If successful, nothing is returned, otherwise,
            return an error message.
    """
    endpoint: str = f"run_control/{exp}/ws/add_run_params"

    params: Dict[str, Any] = {"params": {"run_num": run}, "json": data}

    status_code, resp_msg, _ = elog_http_request(
        exp=exp, endpoint=endpoint, request_type="POST", **params
    )

    if resp_msg != "SUCCESS":
        return resp_msg
    # NEED to handle/propagate errors....
    return None

post_elog_workflow(exp, name, executable, wf_params, *, trigger='run_end', location='S3DF', **trig_args)

Create a new eLog workflow, or update an existing one.

The workflow will run a specific executable as a batch job when the specified trigger occurs. The precise arguments may vary depending on the selected trigger type.

Parameters:

Name Type Description Default
exp str

Experiment name.

required
name str

An identifying name for the workflow. E.g. "process data"

required
executable str

Full path to the executable to be run.

required
wf_params str

All command-line parameters for the executable as a string.

required
trigger str

When to trigger execution of the specified executable. One of:
- 'manual': Must be manually triggered. No automatic processing.
- 'run_start': Execute immediately if a new run begins.
- 'run_end': As soon as a run ends.
- 'param_is': As soon as a parameter has a specific value for a run.

'run_end'
location str

Where to submit the job. S3DF or NERSC.

'S3DF'
**trig_args str

Arguments required for a specific trigger type.
trigger='param_is' requires 2 arguments:
- trig_param (str): Name of the parameter to watch for.
- trig_param_val (str): Value the parameter should have to trigger.

{}
Source code in lute/io/elog.py
def post_elog_workflow(
    exp: str,
    name: str,
    executable: str,
    wf_params: str,
    *,
    trigger: str = "run_end",
    location: str = "S3DF",
    **trig_args: str,
) -> None:
    """Create a new eLog workflow, or update an existing one.

    The workflow will run a specific executable as a batch job when the
    specified trigger occurs. The precise arguments may vary depending on the
    selected trigger type.

    Args:
        exp (str): Experiment name.

        name (str): An identifying name for the workflow. E.g. "process data"

        executable (str): Full path to the executable to be run.

        wf_params (str): All command-line parameters for the executable as a string.

        trigger (str): When to trigger execution of the specified executable.
            One of:
                - 'manual': Must be manually triggered. No automatic processing.
                - 'run_start': Execute immediately if a new run begins.
                - 'run_end': As soon as a run ends.
                - 'param_is': As soon as a parameter has a specific value for a run.

        location (str): Where to submit the job. S3DF or NERSC.

        **trig_args (str): Arguments required for a specific trigger type.
            trigger='param_is' - 2 Arguments
                trig_param (str): Name of the parameter to watch for.
                trig_param_val (str): Value the parameter should have to trigger.
    """
    endpoint: str = f"{exp}/ws/create_update_workflow_def"
    trig_map: Dict[str, str] = {
        "manual": "MANUAL",
        "run_start": "START_OF_RUN",
        "run_end": "END_OF_RUN",
        "param_is": "RUN_PARAM_IS_VALUE",
    }
    if trigger not in trig_map.keys():
        raise NotImplementedError(
            f"Cannot create workflow with trigger type: {trigger}"
        )
    wf_defn: Dict[str, str] = {
        "name": name,
        "executable": executable,
        "parameters": wf_params,
        "trigger": trig_map[trigger],
        "location": location,
    }
    if trigger == "param_is":
        if "trig_param" not in trig_args or "trig_param_val" not in trig_args:
            raise RuntimeError(
                "Trigger type 'param_is' requires: 'trig_param' and 'trig_param_val' arguments"
            )
        wf_defn.update(
            {
                "run_param_name": trig_args["trig_param"],
                "run_param_val": trig_args["trig_param_val"],
            }
        )
    post_params: Dict[str, Dict[str, str]] = {"json": wf_defn}
    status_code, resp_msg, _ = elog_http_request(
        exp, endpoint=endpoint, request_type="POST", **post_params
    )
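
The workflow definition posted to the eLog depends on the trigger type. The sketch below rebuilds only that dictionary, using the trigger mapping from the source above, so the effect of `trigger='param_is'` can be inspected offline. `build_workflow_defn` is a hypothetical helper, and the workflow name, path, and parameter values are placeholders.

```python
from typing import Dict

TRIGGER_MAP: Dict[str, str] = {
    "manual": "MANUAL",
    "run_start": "START_OF_RUN",
    "run_end": "END_OF_RUN",
    "param_is": "RUN_PARAM_IS_VALUE",
}


def build_workflow_defn(
    name: str,
    executable: str,
    wf_params: str,
    *,
    trigger: str = "run_end",
    location: str = "S3DF",
    **trig_args: str,
) -> Dict[str, str]:
    """Assemble a workflow definition without posting it."""
    if trigger not in TRIGGER_MAP:
        raise NotImplementedError(f"Cannot create workflow with trigger type: {trigger}")
    wf_defn: Dict[str, str] = {
        "name": name,
        "executable": executable,
        "parameters": wf_params,
        "trigger": TRIGGER_MAP[trigger],
        "location": location,
    }
    if trigger == "param_is":
        # This trigger needs the watched parameter and the value to match.
        if "trig_param" not in trig_args or "trig_param_val" not in trig_args:
            raise RuntimeError("'param_is' requires 'trig_param' and 'trig_param_val'")
        wf_defn["run_param_name"] = trig_args["trig_param"]
        wf_defn["run_param_val"] = trig_args["trig_param_val"]
    return wf_defn


# Placeholder values, for illustration only.
print(
    build_workflow_defn(
        "process data",
        "/path/to/submit.sh",
        "--config config.yaml",
        trigger="param_is",
        trig_param="status",
        trig_param_val="ready",
    )
)
```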