Skip to content

api

Tools for working with the LUTE parameter and configuration database.

The current implementation relies on a sqlite backend database. In the future this may change - therefore relatively few high-level API function calls are intended to be public. These abstract away the details of the database interface and work exclusively on LUTE objects.

Functions:

Name Description
record_analysis_db

(cfg: DescribedAnalysis) -> None: Writes the configuration to the backend database.

read_latest_db_entry

(db_dir: str, task_name: str, param: str) -> Any: Retrieve the most recent entry from a database for a specific Task.

Raises:

Type Description
DatabaseError

Generic exception raised for LUTE database errors.

get_executions_summary(db_dir)

Return some summary fields of all executions recorded.

Parameters:

Name Type Description Default
db_dir str

Database location.

required

Returns:

Name Type Description
rows List[Tuple[int, str, str, str, str, str, int]]

Returns a list of rows consisting of tuples with the following entries: ( executions.id, executions.timestamp, tasks.name, results.summary, results.payload, results.summary, results.valid_flag, ). An example of how to manipulate this data is in utilities/src/dbview.py

Source code in lute/io/_db/v2/api.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def get_executions_summary(
    db_dir: str,
) -> List[Tuple[int, str, str, str, str, str, int]]:
    """Return some summary fields of all executions recorded.

    Opens `<db_dir>/lute.db`, delegates the query to
    `_sqlite.executions_summary` and closes the connection before returning.

    Args:
        db_dir (str): Database location (directory containing `lute.db`).

    Returns:
        rows (List[Tuple[int, str, str, str, str, str, int]]): Returns a list
            of rows consisting of tuples with the following entries:
            (
                executions.id,
                executions.timestamp,
                tasks.name,
                results.summary,
                results.payload,
                results.summary,
                results.valid_flag,
            ).
            NOTE(review): `results.summary` is listed twice above; the actual
            sixth column should be confirmed against
            `_sqlite.executions_summary`.
            An example of how to manipulate this data is in `utilities/src/dbview.py`
    """
    import sqlite3
    from ._sqlite import executions_summary

    db_path: str = f"{db_dir}/lute.db"
    con: sqlite3.Connection = sqlite3.connect(db_path)
    try:
        # `with con` only scopes the transaction (commit/rollback); it does
        # not close the connection, hence the explicit close below.
        with con:
            return executions_summary(con=con)
    finally:
        con.close()

get_task_parameters_defn_and_params(db_dir, row_ids)

Return a TaskParameters definition, and the param/value pairs.

Parameters:

Name Type Description Default
db_dir str

Database location.

required
row_ids RowIds

The ids in the various tables to reconstruct the TaskParameters definition and the various parameters.

required

Returns:

Name Type Description
definition Dict[str, Any]

The TaskParameters definition (JSON schema).

param_values Dict[str, Any]

The parameter: value dictionary.

Source code in lute/io/_db/v2/api.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def get_task_parameters_defn_and_params(
    db_dir: str,
    row_ids: RowIds,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return a TaskParameters definition, and the param/value pairs.

    Opens `<db_dir>/lute.db`, fetches the rows via
    `_sqlite.get_task_parameters_definition_and_params`, decodes the JSON
    columns and closes the connection before returning.

    Args:
        db_dir (str): Database location (directory containing `lute.db`).

        row_ids (RowIds): The ids in the various tables to reconstruct the
            TaskParameters definition and the various parameters.

    Returns:
        definition (Dict[str, Any]): The TaskParameters definition (JSON schema).
            Empty if no rows were returned.

        param_values (Dict[str, Any]): The parameter: value dictionary.
    """
    import sqlite3
    from ._sqlite import get_task_parameters_definition_and_params

    db_path: str = f"{db_dir}/lute.db"
    con: sqlite3.Connection = sqlite3.connect(db_path)
    try:
        # `with con` only scopes the transaction; the connection itself is
        # closed in the `finally` clause.
        with con:
            rows: List[Tuple[str, str, str]] = (
                get_task_parameters_definition_and_params(con=con, row_ids=row_ids)
            )
            definition: Dict[str, Any] = {}
            param_values: Dict[str, Any] = {}
            for row in rows:
                # Each row is (definition JSON, parameter name, value JSON).
                # The definition is only decoded until a non-empty one is found.
                if not definition:
                    definition = json.loads(row[0])
                param_values[row[1]] = json.loads(row[2])
    finally:
        con.close()

    return definition, param_values

get_task_parameters_summary(db_dir, task_name)

Return parameters for a specific task ordered by execution.

Parameters:

Name Type Description Default
db_dir str

Database location.

required
task_name str

Name of the Task to retrieve parameters for.

required

Returns:

Name Type Description
rows List[Tuple[int, str, int, str, str]]

Returns a list of rows consisting of tuples with the following entries: ( executions.id, executions.timestamp, results.valid_flag, parameters.name, parameters.value, ). An example of how to manipulate this data is in utilities/src/dbview.py

Source code in lute/io/_db/v2/api.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def get_task_parameters_summary(
    db_dir: str, task_name: str
) -> List[Tuple[int, str, int, str, str]]:
    """Return parameters for a specific task ordered by execution.

    Opens `<db_dir>/lute.db`, delegates the query to
    `_sqlite.task_parameters_summary` and closes the connection before
    returning.

    Args:
        db_dir (str): Database location (directory containing `lute.db`).

        task_name (str): Name of the Task to retrieve parameters for.

    Returns:
        rows (List[Tuple[int, str, int, str, str]]): Returns a list
            of rows consisting of tuples with the following entries:
            (
                executions.id,
                executions.timestamp,
                results.valid_flag,
                parameters.name,
                parameters.value,
            ).
            An example of how to manipulate this data is in `utilities/src/dbview.py`
    """
    import sqlite3
    from ._sqlite import task_parameters_summary

    db_path: str = f"{db_dir}/lute.db"
    con: sqlite3.Connection = sqlite3.connect(db_path)
    try:
        # `with con` only scopes the transaction (commit/rollback); it does
        # not close the connection, hence the explicit close below.
        with con:
            return task_parameters_summary(con=con, task_name=task_name)
    finally:
        con.close()

read_latest_db_entry(db_dir, task_name, param, valid_only=True, for_run=os.getenv('RUN'))

Read most recent value entered into the database for a Task parameter.

(Will be updated for schema compliance as well as Task name.)

Parameters:

Name Type Description Default
db_dir str

Database location.

required
task_name str

The name of the Task to check the database for.

required
param str

The parameter name for the Task that we want to retrieve.

required
valid_only bool

Whether to consider only valid results or not. E.g. An input file may be useful even if the Task result is invalid (Failed). Default = True.

True
for_run Optional[str | int]

Only consider latest entries from the specific experiment run provided.

getenv('RUN')

Returns:

Name Type Description
val Any

The most recently entered value for param of task_name that can be found in the database. Returns None if nothing found.

Source code in lute/io/_db/v2/api.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def read_latest_db_entry(
    db_dir: str,
    task_name: str,
    param: str,
    valid_only: bool = True,
    for_run: Optional[Union[str, int]] = os.getenv("RUN"),
) -> Optional[Any]:
    """Read most recent value entered into the database for a Task parameter.

    (Will be updated for schema compliance as well as Task name.)

    Args:
        db_dir (str): Database location (directory containing `lute.db`).

        task_name (str): The name of the Task to check the database for.

        param (str): The parameter name for the Task that we want to retrieve.
            If it contains `result.`, the second dot-separated component is
            looked up as a result field rather than as a parameter.

        valid_only (bool): Whether to consider only valid results or not. E.g.
            An input file may be useful even if the Task result is invalid
            (Failed). Default = True.

        for_run (Optional[str | int]): Only consider latest entries from the
            specific experiment run provided. The default is the value of the
            `RUN` environment variable as read when this module is imported
            (not at call time). Pass `None` to disable the run filter.

    Returns:
        val (Any): The most recently entered value for `param` of `task_name`
            that can be found in the database. Returns None if nothing found,
            or if the query fails with an `sqlite3.OperationalError` (the
            error is logged).
    """
    import sqlite3
    from ._sqlite import select_param_from_db

    db_path: str = f"{db_dir}/lute.db"
    con: sqlite3.Connection = sqlite3.connect(db_path)
    try:
        # `with con` only scopes the transaction; the connection itself is
        # closed in the outer `finally` clause.
        with con:
            try:
                # Column/value filters handed to the select helper.
                cond: Dict[str, str] = {}
                if valid_only:
                    cond["valid_flag"] = "1"

                if for_run is not None:
                    cond["run"] = str(for_run)

                new_param: str = param
                is_result: bool = False
                if "result." in param:
                    # e.g. "result.summary" -> look up "summary" as a result.
                    new_param = param.split(".")[1]
                    is_result = True

                return select_param_from_db(
                    con=con,
                    task_name=task_name,
                    param_name=new_param,
                    condition=cond,
                    is_result=is_result,
                )
            except sqlite3.OperationalError as err:
                logger.error(f"Cannot retrieve value {param} due to: {err}")
                return None
    finally:
        con.close()

record_analysis_db(cfg)

Write a DescribedAnalysis object to the database.

The DescribedAnalysis object is maintained by the Executor and contains all information necessary to fully describe a single Task execution. The contained fields are split across multiple tables within the database as some of the information can be shared across multiple Tasks. Refer to docs/design/database.md for more information on the database specification.

This function is meant to be called by the Executor at the end of Task execution, assuming the Task has not previously entered partial data into the database. See record_parameters_db and update_task_entry_db for how to handle the case where the Task and Executor both store some information.

Parameters:

Name Type Description Default
cfg DescribedAnalysis

The DescribedAnalysis completed by the Executor after Task completion.

required
Source code in lute/io/_db/v2/api.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def record_analysis_db(cfg: DescribedAnalysis) -> None:
    """Write a DescribedAnalysis object to the database.

    The DescribedAnalysis object is maintained by the Executor and contains all
    information necessary to fully describe a single `Task` execution. The
    contained fields are split across multiple tables within the database as
    some of the information can be shared across multiple Tasks. Refer to
    `docs/design/database.md` for more information on the database specification.

    This function is meant to be called by the Executor at the end of Task
    execution, assuming the Task has not previously entered partial data into the
    database. See `record_parameters_db` and `update_task_entry_db` for how to
    handle the case where the Task and Executor both store some information.

    The database is `<work_dir>/lute.db`, where `work_dir` is read from
    `cfg.task_parameters.lute_config`. `sqlite3.OperationalError`s raised while
    writing are logged, not propagated. After the write, the file mode is set
    to 0o664 on a best-effort basis.

    Args:
        cfg (DescribedAnalysis): The DescribedAnalysis completed by the Executor
            after Task completion. `lute_config.work_dir` is removed from its
            task parameters while the entry is written and put back before
            returning.
    """
    import sqlite3
    from ._sqlite import create_tables, add_execution

    from lute.io.models.base import TaskParameters

    assert isinstance(cfg.task_parameters, TaskParameters)
    try:
        # No `assert hasattr(...)` here: an AssertionError would bypass the
        # handler below, so a missing attribute must surface as AttributeError.
        work_dir: str = cfg.task_parameters.lute_config.work_dir
    except AttributeError:
        logger.error(
            (
                "Unable to access TaskParameters object. Likely wasn't created. "
                "Cannot store result."
            )
        )
        return
    # work_dir is detached while the parameters are written - presumably so it
    # is not stored with them (TODO confirm against `_sqlite.add_execution`).
    del cfg.task_parameters.lute_config.work_dir

    db_path: str = f"{work_dir}/lute.db"
    try:
        con: sqlite3.Connection = sqlite3.connect(db_path)
        try:
            # `with con` only scopes the transaction; the connection itself is
            # closed in the `finally` clause.
            with con:
                try:
                    create_tables(con=con)
                    add_execution(con=con, cfg=cfg)
                except sqlite3.OperationalError as err:
                    logger.error(f"Database storage error: {err}")
        finally:
            con.close()
        try:
            os.chmod(db_path, 0o664)
        except Exception:
            # Best effort only: failing to change the mode must not abort.
            logger.error("Cannot setup permissions on database!")
    finally:
        # Leave the caller's object as it was handed in, matching
        # `record_parameters_db`.
        cfg.task_parameters.lute_config.work_dir = work_dir

record_parameters_db(params)

Write all tables that are possible using only the TaskParameters.

This function is intended to be called at the Task layer, prior to completing execution. It allows saving state into the database.

The entries are then intended to be updated by the Executor upon completion with all information which was excluded.

Parameters:

Name Type Description Default
params TaskParameters

The TaskParameters object - fully validated.

required
Source code in lute/io/_db/v2/api.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def record_parameters_db(params: TaskParameters) -> Optional[RowIds]:
    """Write all tables that are possible using only the TaskParameters.

    This function is intended to be called at the Task layer, prior to completing
    execution. It allows saving state into the database.

    The entries are then intended to be updated by the Executor upon completion
    with all information which was excluded.

    The database is `<work_dir>/lute.db`, where `work_dir` is read from
    `params.lute_config`. After the write, the file mode is set to 0o664 on a
    best-effort basis.

    Args:
        params (TaskParameters): The TaskParameters object - fully validated.
            `lute_config.work_dir` is removed while the entry is written and
            put back before returning.

    Returns:
        row_ids (Optional[RowIds]): The value returned by
            `_sqlite.add_placeholder_execution`, or None if `work_dir` could
            not be read from `params` or the write failed with an
            `sqlite3.OperationalError` (both cases are logged).
    """
    import sqlite3
    from ._sqlite import create_tables, add_placeholder_execution

    from lute.io.models.base import TaskParameters

    assert isinstance(params, TaskParameters)
    try:
        # No `assert hasattr(...)` here: an AssertionError would bypass the
        # handler below, so a missing attribute must surface as AttributeError.
        work_dir: str = params.lute_config.work_dir
    except AttributeError:
        logger.error(
            (
                "Unable to access TaskParameters object. Likely wasn't created. "
                "Cannot store result."
            )
        )
        return None
    # work_dir is detached while the parameters are written - presumably so it
    # is not stored with them (TODO confirm against
    # `_sqlite.add_placeholder_execution`).
    del params.lute_config.work_dir

    db_path: str = f"{work_dir}/lute.db"
    row_ids: Optional[RowIds] = None
    try:
        con: sqlite3.Connection = sqlite3.connect(db_path)
        try:
            # `with con` only scopes the transaction; the connection itself is
            # closed in the `finally` clause.
            with con:
                try:
                    create_tables(con=con)
                    row_ids = add_placeholder_execution(con=con, params=params)
                except sqlite3.OperationalError as err:
                    logger.error(f"Database storage error: {err}")
                    row_ids = None
        finally:
            con.close()
        try:
            os.chmod(db_path, 0o664)
        except Exception:
            # Best effort only: failing to change the mode must not abort.
            logger.error("Cannot setup permissions on database!")
    finally:
        # Restore the attribute even if an unexpected exception propagates, so
        # the Task can keep using its parameters.
        params.lute_config.work_dir = work_dir
    return row_ids