Tools for working with the LUTE parameter and configuration database.
The current implementation relies on a sqlite backend database. In the future this may change - therefore relatively few high-level API function calls are intended to be public. These abstract away the details of the database interface and work exclusively on LUTE objects.
Functions:
| Name | Description | 
|---|---|
record_analysis_db | 
              
                 DescribedAnalysis) -> None: Writes the configuration to the backend database.  | 
            
read_latest_db_entry | 
              
                 str, task_name: str, param: str) -> Any: Retrieve the most recent entry from a database for a specific Task.  | 
            
| Raises: | 
        
  | 
    
|---|
            DatabaseError
    
              Bases: Exception
General LUTE database error.
Source code in lute/io/db.py
                class DatabaseError(Exception):
    """General LUTE database error."""
    ...
              
            read_latest_db_entry(db_dir, task_name, param, valid_only=True)
    Read most recent value entered into the database for a Task parameter.
(Will be updated for schema compliance as well as Task name.)
| Parameters: | 
        
  | 
    
|---|
| Returns: | 
        
  | 
    
|---|
Source code in lute/io/db.py
              def read_latest_db_entry(
    db_dir: str, task_name: str, param: str, valid_only: bool = True
) -> Optional[Any]:
    """Read most recent value entered into the database for a Task parameter.
    (Will be updated for schema compliance as well as Task name.)
    Args:
        db_dir (str): Database location.
        task_name (str): The name of the Task to check the database for.
        param (str): The parameter name for the Task that we want to retrieve.
        valid_only (bool): Whether to consider only valid results or not. E.g.
            An input file may be useful even if the Task result is invalid
            (Failed). Default = True.
    Returns:
        val (Any): The most recently entered value for `param` of `task_name`
            that can be found in the database. Returns None if nothing found.
    """
    import sqlite3
    from ._sqlite import _select_from_db
    con: sqlite3.Connection = sqlite3.Connection(f"{db_dir}/lute.db")
    with con:
        try:
            cond: Dict[str, str] = {}
            if valid_only:
                cond = {"valid_flag": "1"}
            entry: Any = _select_from_db(con, task_name, param, cond)
        except sqlite3.OperationalError as err:
            logger.debug(f"Cannot retrieve value {param} due to: {err}")
            entry = None
    return entry
            
            record_analysis_db(cfg)
    Write an DescribedAnalysis object to the database.
The DescribedAnalysis object is maintained by the Executor and contains all
information necessary to fully describe a single Task execution. The
contained fields are split across multiple tables within the database as
some of the information can be shared across multiple Tasks. Refer to
docs/design/database.md for more information on the database specification.
Source code in lute/io/db.py
              def record_analysis_db(cfg: DescribedAnalysis) -> None:
    """Write an DescribedAnalysis object to the database.
    The DescribedAnalysis object is maintained by the Executor and contains all
    information necessary to fully describe a single `Task` execution. The
    contained fields are split across multiple tables within the database as
    some of the information can be shared across multiple Tasks. Refer to
    `docs/design/database.md` for more information on the database specification.
    """
    import sqlite3
    from ._sqlite import (
        _make_shared_table,
        _make_task_table,
        _add_row_no_duplicate,
        _add_task_entry,
    )
    try:
        work_dir: str = cfg.task_parameters.lute_config.work_dir
    except AttributeError:
        logger.info(
            (
                "Unable to access TaskParameters object. Likely wasn't created. "
                "Cannot store result."
            )
        )
        return
    del cfg.task_parameters.lute_config.work_dir
    exec_entry, exec_columns = _cfg_to_exec_entry_cols(cfg)
    task_name: str = cfg.task_result.task_name
    # All `Task`s have an AnalysisHeader, but this info can be shared so is
    # split into a different table
    (
        task_entry,  # Dict[str, Any]
        task_columns,  # Dict[str, str]
        gen_entry,  # Dict[str, Any]
        gen_columns,  # Dict[str, str]
    ) = _params_to_entry_cols(cfg.task_parameters)
    x, y = _result_to_entry_cols(cfg.task_result)
    task_entry.update(x)
    task_columns.update(y)
    con: sqlite3.Connection = sqlite3.Connection(f"{work_dir}/lute.db")
    with con:
        # --- Table Creation ---#
        if not _make_shared_table(con, "gen_cfg", gen_columns):
            raise DatabaseError("Could not make general configuration table!")
        if not _make_shared_table(con, "exec_cfg", exec_columns):
            raise DatabaseError("Could not make Executor configuration table!")
        if not _make_task_table(con, task_name, task_columns):
            raise DatabaseError(f"Could not make Task table for: {task_name}!")
        # --- Row Addition ---#
        gen_id: int = _add_row_no_duplicate(con, "gen_cfg", gen_entry)
        exec_id: int = _add_row_no_duplicate(con, "exec_cfg", exec_entry)
        full_task_entry: Dict[str, Any] = {
            "gen_cfg_id": gen_id,
            "exec_cfg_id": exec_id,
        }
        full_task_entry.update(task_entry)
        # Prepare flag to indicate whether the task entry is valid or not
        # By default we say it is assuming proper completion
        valid_flag: int = (
            1 if cfg.task_result.task_status == TaskStatus.COMPLETED else 0
        )
        full_task_entry.update({"valid_flag": valid_flag})
        _add_task_entry(con, task_name, full_task_entry)