Skip to content

test

Basic test Tasks for testing functionality.

Classes:

Name Description
Test

Simplest test Task - runs a 10-iteration loop and returns a result.

TestSocket

Test Task which sends larger data to test socket IPC.

TestWriteOutput

Test Task which writes an output file.

TestReadOutput

Test Task which reads in a file. Can be used to test database access.

Test

Bases: Task

Simple test Task to ensure subprocess and pipe-based IPC work.

Source code in lute/tasks/test.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Test(Task):
    """Minimal test Task for checking subprocess launch and pipe-based IPC.

    Reports ten numbered messages to the Executor, pausing one second before
    each, and can optionally raise an error once all messages are sent.
    """

    def __init__(self, *, params: TestParameters) -> None:
        """Forward the validated parameters to the base ``Task``."""
        super().__init__(params=params)

    def _run(self) -> None:
        """Send ten messages, then raise if ``throw_error`` is set.

        Raises:
            RuntimeError: After the loop, when the ``throw_error`` parameter
                is truthy.
        """
        # Narrow the parameter type for static checkers; no runtime effect.
        self._task_parameters = cast(TestParameters, self._task_parameters)
        for iteration in range(10):
            time.sleep(1)
            update: Message = Message(contents=f"Test message {iteration}")
            self._report_to_executor(update)
        if not self._task_parameters.throw_error:
            return
        raise RuntimeError("Testing Error!")

    def _post_run(self) -> None:
        """Fill in the result summary and mark the Task as completed."""
        result = self._result
        result.summary = "Test Finished."
        result.task_status = TaskStatus.COMPLETED
        # Short pause kept from the original implementation.
        time.sleep(0.1)

TestReadOutput

Bases: Task

Simple test Task to read in output from the test Task above.

Its pydantic model relies on a database access to retrieve the output file.

Source code in lute/tasks/test.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class TestReadOutput(Task):
    """Test Task which loads a comma-delimited text file with numpy.

    The input is expected to be the output of an earlier test Task; the
    pydantic model for this Task relies on a database access to retrieve the
    path of that output file.
    """

    def __init__(self, *, params: TestReadOutputParameters) -> None:
        """Forward the validated parameters to the base ``Task``."""
        super().__init__(params=params)

    def _run(self) -> None:
        """Load ``in_file``, report success, then idle for five seconds."""
        # Narrow the parameter type for static checkers; no runtime effect.
        self._task_parameters = cast(TestReadOutputParameters, self._task_parameters)
        # Only the ability to read the file matters; the data is discarded.
        np.loadtxt(self._task_parameters.in_file, delimiter=",")
        success: Message = Message(contents="Successfully loaded data!")
        self._report_to_executor(msg=success)
        for _ in range(5):
            time.sleep(1)

    def _post_run(self) -> None:
        """Record a summary and placeholder payload, then mark completion."""
        super()._post_run()
        result = self._result
        result.summary = "Was able to load data."
        result.payload = "This Task produces no output."
        result.task_status = TaskStatus.COMPLETED

TestSocket

Bases: Task

Simple test Task to ensure basic IPC over Unix sockets works.

Source code in lute/tasks/test.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class TestSocket(Task):
    """Test Task which sends larger payloads to exercise Unix socket IPC.

    For each of ``num_arrays`` iterations it reports a short text message
    followed by a random array of length ``array_size``.
    """

    def __init__(self, *, params: TestSocketParameters) -> None:
        """Forward the validated parameters to the base ``Task``."""
        super().__init__(params=params)

    def _run(self) -> None:
        """Alternate between an announcement message and a random array."""
        # Narrow the parameter type for static checkers; no runtime effect.
        self._task_parameters = cast(TestSocketParameters, self._task_parameters)
        for index in range(self._task_parameters.num_arrays):
            announcement: Message = Message(contents=f"Sending array {index}")
            self._report_to_executor(announcement)
            time.sleep(0.05)
            data = np.random.rand(self._task_parameters.array_size)
            self._report_to_executor(Message(contents=data))

    def _post_run(self) -> None:
        """Summarise how many arrays were sent and attach one more as payload."""
        super()._post_run()
        self._task_parameters = cast(TestSocketParameters, self._task_parameters)
        params = self._task_parameters
        result = self._result
        result.summary = f"Sent {params.num_arrays} arrays"
        result.payload = np.random.rand(params.array_size)
        result.task_status = TaskStatus.COMPLETED

TestWriteOutput

Bases: Task

Simple test Task to write output other Tasks depend on.

Source code in lute/tasks/test.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class TestWriteOutput(Task):
    """Test Task which writes an output file that other Tasks can consume.

    The file holds ``num_vals`` random values written with ``np.savetxt`` into
    the configured working directory.
    """

    def __init__(self, *, params: TestWriteOutputParameters) -> None:
        """Forward the validated parameters to the base ``Task``."""
        super().__init__(params=params)

    def _run(self) -> None:
        """Simulate per-value work, reporting progress every tenth value."""
        # Narrow the parameter type for static checkers; no runtime effect.
        self._task_parameters = cast(TestWriteOutputParameters, self._task_parameters)
        for count in range(self._task_parameters.num_vals):
            # Stand-in for a real calculation.
            time.sleep(0.05)
            if count % 10:
                continue
            progress: Message = Message(contents=f"Processed {count+1} values!")
            self._report_to_executor(progress)

    def _post_run(self) -> None:
        """Write the random array to disk and record its path as the payload."""
        super()._post_run()
        self._task_parameters = cast(TestWriteOutputParameters, self._task_parameters)
        params = self._task_parameters
        directory: str = params.lute_config.work_dir
        destination: str = f"{directory}/{params.outfile_name}"
        values: np.ndarray = np.random.rand(params.num_vals)
        np.savetxt(destination, values, delimiter=",")
        result = self._result
        result.summary = "Completed task successfully."
        result.payload = destination
        result.task_status = TaskStatus.COMPLETED