Tools for working with the LUTE parameter and configuration database.

The current implementation relies on a sqlite backend database. In the future this may change - therefore relatively few high-level API function calls are intended to be public. These abstract away the details of the database interface and work exclusively on LUTE objects.

Functions:

Name Description
record_analysis_db

DescribedAnalysis) -> None: Writes the configuration to the backend database.

read_latest_db_entry

str, task_name: str, param: str) -> Any: Retrieve the most recent entry from a database for a specific Task.

Raises:
  • DatabaseError

    Generic exception raised for LUTE database errors.

DatabaseError

Bases: Exception

General LUTE database error.

Source code in lute/io/db.py
class DatabaseError(Exception):
    """General LUTE database error."""

    ...

read_latest_db_entry(db_dir, task_name, param, valid_only=True)

Read most recent value entered into the database for a Task parameter.

(Will be updated for schema compliance as well as Task name.)

Parameters:
  • db_dir (str) –

    Database location.

  • task_name (str) –

    The name of the Task to check the database for.

  • param (str) –

    The parameter name for the Task that we want to retrieve.

  • valid_only (bool, default: True ) –

    Whether to consider only valid results or not. E.g. An input file may be useful even if the Task result is invalid (Failed). Default = True.

Returns:
  • val( Any ) –

    The most recently entered value for param of task_name that can be found in the database. Returns None if nothing found.

Source code in lute/io/db.py
def read_latest_db_entry(
    db_dir: str, task_name: str, param: str, valid_only: bool = True
) -> Optional[Any]:
    """Read most recent value entered into the database for a Task parameter.

    (Will be updated for schema compliance as well as Task name.)

    Args:
        db_dir (str): Database location.

        task_name (str): The name of the Task to check the database for.

        param (str): The parameter name for the Task that we want to retrieve.

        valid_only (bool): Whether to consider only valid results or not. E.g.
            An input file may be useful even if the Task result is invalid
            (Failed). Default = True.

    Returns:
        val (Any): The most recently entered value for `param` of `task_name`
            that can be found in the database. Returns None if nothing found.
    """
    import sqlite3
    from ._sqlite import _select_from_db

    con: sqlite3.Connection = sqlite3.Connection(f"{db_dir}/lute.db")
    with con:
        try:
            cond: Dict[str, str] = {}
            if valid_only:
                cond = {"valid_flag": "1"}
            entry: Any = _select_from_db(con, task_name, param, cond)
        except sqlite3.OperationalError as err:
            logger.debug(f"Cannot retrieve value {param} due to: {err}")
            entry = None
    return entry

record_analysis_db(cfg)

Write an DescribedAnalysis object to the database.

The DescribedAnalysis object is maintained by the Executor and contains all information necessary to fully describe a single Task execution. The contained fields are split across multiple tables within the database as some of the information can be shared across multiple Tasks. Refer to docs/design/database.md for more information on the database specification.

Source code in lute/io/db.py
def record_analysis_db(cfg: DescribedAnalysis) -> None:
    """Write an DescribedAnalysis object to the database.

    The DescribedAnalysis object is maintained by the Executor and contains all
    information necessary to fully describe a single `Task` execution. The
    contained fields are split across multiple tables within the database as
    some of the information can be shared across multiple Tasks. Refer to
    `docs/design/database.md` for more information on the database specification.
    """
    import sqlite3
    from ._sqlite import (
        _make_shared_table,
        _make_task_table,
        _add_row_no_duplicate,
        _add_task_entry,
    )

    try:
        work_dir: str = cfg.task_parameters.lute_config.work_dir
    except AttributeError:
        logger.info(
            (
                "Unable to access TaskParameters object. Likely wasn't created. "
                "Cannot store result."
            )
        )
        return
    del cfg.task_parameters.lute_config.work_dir

    exec_entry, exec_columns = _cfg_to_exec_entry_cols(cfg)
    task_name: str = cfg.task_result.task_name
    # All `Task`s have an AnalysisHeader, but this info can be shared so is
    # split into a different table
    (
        task_entry,  # Dict[str, Any]
        task_columns,  # Dict[str, str]
        gen_entry,  # Dict[str, Any]
        gen_columns,  # Dict[str, str]
    ) = _params_to_entry_cols(cfg.task_parameters)
    x, y = _result_to_entry_cols(cfg.task_result)
    task_entry.update(x)
    task_columns.update(y)

    con: sqlite3.Connection = sqlite3.Connection(f"{work_dir}/lute.db")
    with con:
        # --- Table Creation ---#
        if not _make_shared_table(con, "gen_cfg", gen_columns):
            raise DatabaseError("Could not make general configuration table!")
        if not _make_shared_table(con, "exec_cfg", exec_columns):
            raise DatabaseError("Could not make Executor configuration table!")
        if not _make_task_table(con, task_name, task_columns):
            raise DatabaseError(f"Could not make Task table for: {task_name}!")

        # --- Row Addition ---#
        gen_id: int = _add_row_no_duplicate(con, "gen_cfg", gen_entry)
        exec_id: int = _add_row_no_duplicate(con, "exec_cfg", exec_entry)

        full_task_entry: Dict[str, Any] = {
            "gen_cfg_id": gen_id,
            "exec_cfg_id": exec_id,
        }
        full_task_entry.update(task_entry)
        # Prepare flag to indicate whether the task entry is valid or not
        # By default we say it is assuming proper completion
        valid_flag: int = (
            1 if cfg.task_result.task_status == TaskStatus.COMPLETED else 0
        )
        full_task_entry.update({"valid_flag": valid_flag})

        _add_task_entry(con, task_name, full_task_entry)