Tools for working with the LUTE parameter and configuration database.
The current implementation relies on a sqlite backend database. In the future this may change - therefore relatively few high-level API function calls are intended to be public. These abstract away the details of the database interface and work exclusively on LUTE objects.
Functions:
Name | Description |
---|---|
record_analysis_db | (cfg: DescribedAnalysis) -> None: Writes the configuration to the backend database. |
read_latest_db_entry | (db_dir: str, task_name: str, param: str) -> Any: Retrieve the most recent entry from a database for a specific Task. |
Raises:
Type | Description |
---|---|
DatabaseError | General LUTE database error. |
DatabaseError
Bases: Exception
General LUTE database error.
Source code in lute/io/db.py
class DatabaseError(Exception):
    """Exception signalling a general LUTE database error."""
read_latest_db_entry(db_dir, task_name, param, valid_only=True)
Read most recent value entered into the database for a Task parameter.
(Will be updated for schema compliance as well as Task name.)
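Parameters:
Name | Type | Description |
---|---|---|
db_dir | str | Database location. |
task_name | str | The name of the Task to check the database for. |
param | str | The parameter name for the Task that we want to retrieve. |
valid_only | bool | Whether to consider only valid results or not. E.g. an input file may be useful even if the Task result is invalid (Failed). Default = True. |
Returns:
Name | Type | Description |
---|---|---|
val | Any | The most recently entered value for `param` of `task_name` that can be found in the database. Returns None if nothing found. |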
Source code in lute/io/db.py
def read_latest_db_entry(
    db_dir: str, task_name: str, param: str, valid_only: bool = True
) -> Optional[Any]:
    """Read most recent value entered into the database for a Task parameter.

    (Will be updated for schema compliance as well as Task name.)

    The database file is expected at `{db_dir}/lute.db`. If the lookup raises
    a `sqlite3.OperationalError` the error is logged at debug level and None
    is returned instead of propagating the exception.

    Args:
        db_dir (str): Database location.

        task_name (str): The name of the Task to check the database for.

        param (str): The parameter name for the Task that we want to retrieve.

        valid_only (bool): Whether to consider only valid results or not. E.g.
            An input file may be useful even if the Task result is invalid
            (Failed). Default = True.

    Returns:
        val (Any): The most recently entered value for `param` of `task_name`
            that can be found in the database. Returns None if nothing found.
    """
    import sqlite3
    from contextlib import closing

    from ._sqlite import _select_from_db

    # Restrict the selection to entries flagged as valid when requested.
    cond: Dict[str, str] = {"valid_flag": "1"} if valid_only else {}
    entry: Optional[Any] = None

    # `with con:` only scopes a transaction (commit/rollback on exit); it does
    # NOT close the connection. `closing` guarantees the handle is released.
    with closing(sqlite3.connect(f"{db_dir}/lute.db")) as con:
        with con:
            try:
                entry = _select_from_db(con, task_name, param, cond)
            except sqlite3.OperationalError as err:
                logger.debug(f"Cannot retrieve value {param} due to: {err}")
                entry = None
    return entry
record_analysis_db(cfg)
Write a DescribedAnalysis object to the database.
The DescribedAnalysis object is maintained by the Executor and contains all
information necessary to fully describe a single Task execution. The
contained fields are split across multiple tables within the database as
some of the information can be shared across multiple Tasks. Refer to
docs/design/database.md for more information on the database specification.
Source code in lute/io/db.py
def record_analysis_db(cfg: DescribedAnalysis) -> None:
    """Write a DescribedAnalysis object to the database.

    The DescribedAnalysis object is maintained by the Executor and contains all
    information necessary to fully describe a single `Task` execution. The
    contained fields are split across multiple tables within the database as
    some of the information can be shared across multiple Tasks. Refer to
    `docs/design/database.md` for more information on the database specification.

    The database is written to `{work_dir}/lute.db`, where `work_dir` is read
    from `cfg.task_parameters.lute_config`. Note that the `work_dir` attribute
    is deleted from `cfg.task_parameters.lute_config` before the entries are
    constructed, i.e. the passed object is modified in place.

    If the TaskParameters object (or its `work_dir`) cannot be accessed, a
    message is logged and nothing is written.

    Args:
        cfg (DescribedAnalysis): Description of the Task execution to record.

    Raises:
        DatabaseError: If the general configuration, Executor configuration or
            Task table cannot be created.
    """
    import sqlite3
    from contextlib import closing

    from ._sqlite import (
        _make_shared_table,
        _make_task_table,
        _add_row_no_duplicate,
        _add_task_entry,
    )

    try:
        work_dir: str = cfg.task_parameters.lute_config.work_dir
    except AttributeError:
        logger.info(
            (
                "Unable to access TaskParameters object. Likely wasn't created. "
                "Cannot store result."
            )
        )
        return
    # work_dir has been captured above; remove it from the config object
    # before the database entries are built from it.
    del cfg.task_parameters.lute_config.work_dir

    exec_entry, exec_columns = _cfg_to_exec_entry_cols(cfg)
    task_name: str = cfg.task_result.task_name
    # All `Task`s have an AnalysisHeader, but this info can be shared so is
    # split into a different table
    (
        task_entry,  # Dict[str, Any]
        task_columns,  # Dict[str, str]
        gen_entry,  # Dict[str, Any]
        gen_columns,  # Dict[str, str]
    ) = _params_to_entry_cols(cfg.task_parameters)
    # Result fields are stored in the same Task table as the parameters.
    result_entry, result_columns = _result_to_entry_cols(cfg.task_result)
    task_entry.update(result_entry)
    task_columns.update(result_columns)

    # `with con:` only scopes a transaction (commit/rollback on exit); it does
    # NOT close the connection. `closing` guarantees the handle is released,
    # including when a DatabaseError is raised below.
    with closing(sqlite3.connect(f"{work_dir}/lute.db")) as con:
        with con:
            # --- Table Creation ---#
            if not _make_shared_table(con, "gen_cfg", gen_columns):
                raise DatabaseError("Could not make general configuration table!")
            if not _make_shared_table(con, "exec_cfg", exec_columns):
                raise DatabaseError("Could not make Executor configuration table!")
            if not _make_task_table(con, task_name, task_columns):
                raise DatabaseError(f"Could not make Task table for: {task_name}!")

            # --- Row Addition ---#
            gen_id: int = _add_row_no_duplicate(con, "gen_cfg", gen_entry)
            exec_id: int = _add_row_no_duplicate(con, "exec_cfg", exec_entry)

            # The Task row references the shared rows by their ids.
            full_task_entry: Dict[str, Any] = {
                "gen_cfg_id": gen_id,
                "exec_cfg_id": exec_id,
            }
            full_task_entry.update(task_entry)
            # Flag indicating whether the task entry is valid: only a Task
            # that reached COMPLETED status is marked as valid.
            valid_flag: int = (
                1 if cfg.task_result.task_status == TaskStatus.COMPLETED else 0
            )
            full_task_entry.update({"valid_flag": valid_flag})

            _add_task_entry(con, task_name, full_task_entry)