Base classes and functions for handling `Task` execution.

Executors run a `Task` as a subprocess and handle all communication with other services, e.g., the eLog. They accept specific handlers to override default stream parsing. Event handlers (hooks) are implemented as standalone functions which can be added to an Executor.
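For example, a hook is just a function taking the Executor and the triggering Message. A minimal sketch, assuming the import paths below and a hypothetical Task named "MyTask":

```python
from lute.execution.executor import Executor
from lute.execution.ipc import Message

def announce_start(executor: Executor, msg: Message) -> None:
    # Hooks receive the Executor instance and the Message carrying the signal.
    print(f"Started: {executor._analysis_desc.task_result.task_name}")

executor: Executor = Executor("MyTask")  # hypothetical Task name
executor.add_hook("task_started", announce_start)
```

Note that `add_hook` replaces the default hook for an event, so the sketch above would also suppress the default `task_started` behaviour (parameter capture and eLog posting).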
Classes:

Name | Description |
---|---|
`AnalysisConfig` | Data class for holding a managed Task's configuration. |
`BaseExecutor` | Abstract base class from which all Executors are derived. |
`Executor` | Default Executor implementing all basic functionality and IPC. |
`BinaryExecutor` | Can execute any arbitrary binary/command as a managed task within the framework provided by LUTE. |
BaseExecutor
Bases: ABC
ABC to manage Task execution and communication with user services.
When running in a workflow, "tasks" (not the class instances) are submitted as `Executor`s. The Executor manages environment setup, the actual `Task` submission, and communication regarding `Task` results and status with third-party services like the eLog.
Methods:

Name | Description |
---|---|
`add_hook` | Create a new hook to be called each time a specific event occurs. |
`add_default_hooks` | Populate the event hooks with the default functions. |
`update_environment` | Update the environment that is passed to the Task subprocess. |
`execute_task` | Run the task as a subprocess. |
Source code in lute/execution/executor.py
class BaseExecutor(ABC):
"""ABC to manage Task execution and communication with user services.
When running in a workflow, "tasks" (not the class instances) are submitted
as `Executors`. The Executor manages environment setup, the actual Task
submission, and communication regarding Task results and status with third
party services like the eLog.
Attributes:
Methods:
add_hook(event: str, hook: Callable[[Self, Message], None]) -> None: Create a
new hook to be called each time a specific event occurs.
add_default_hooks() -> None: Populate the event hooks with the default
functions.
update_environment(env: Dict[str, str], update_path: str): Update the
environment that is passed to the Task subprocess.
execute_task(): Run the task as a subprocess.
"""
class Hooks:
"""A container class for the Executor's event hooks.
There is a corresponding function (hook) for each event/signal. Each
function takes two parameters - a reference to the Executor (self) and
a reference to the Message (msg) which includes the corresponding
signal.
"""
def no_pickle_mode(self: Self, msg: Message): ...
def task_started(self: Self, msg: Message): ...
def task_failed(self: Self, msg: Message): ...
def task_stopped(self: Self, msg: Message): ...
def task_done(self: Self, msg: Message): ...
def task_cancelled(self: Self, msg: Message): ...
def task_result(self: Self, msg: Message): ...
def __init__(
self,
task_name: str,
communicators: List[Communicator],
poll_interval: float = 0.05,
) -> None:
"""The Executor will manage the subprocess in which `task_name` is run.
Args:
task_name (str): The name of the Task to be submitted. Must match
the Task's class name exactly. The parameter specification must
also be in a properly named model to be identified.
communicators (List[Communicator]): A list of one or more
communicators which manage information flow to/from the Task.
Subclasses may have different defaults, and new functionality
can be introduced by composing Executors with communicators.
poll_interval (float): Time to wait between reading/writing to the
managed subprocess. In seconds.
"""
result: TaskResult = TaskResult(
task_name=task_name, task_status=TaskStatus.PENDING, summary="", payload=""
)
task_parameters: Optional[TaskParameters] = None
task_env: Dict[str, str] = os.environ.copy()
self._communicators: List[Communicator] = communicators
communicator_desc: List[str] = []
for comm in self._communicators:
comm.stage_communicator()
communicator_desc.append(str(comm))
self._analysis_desc: DescribedAnalysis = DescribedAnalysis(
task_result=result,
task_parameters=task_parameters,
task_env=task_env,
poll_interval=poll_interval,
communicator_desc=communicator_desc,
)
def add_hook(self, event: str, hook: Callable[[Self, Message], None]) -> None:
"""Add a new hook.
Each hook is a function called any time the Executor receives a signal
for a particular event, e.g. Task starts, Task ends, etc. Calling this
method will remove any hook that currently exists for the event. I.e.
only one hook can be called per event at a time. Creating hooks for
events which do not exist is not allowed.
Args:
event (str): The event for which the hook will be called.
hook (Callable[[Self, Message], None]): The function to be called during each
occurrence of the event.
"""
if event.upper() in LUTE_SIGNALS:
setattr(self.Hooks, event.lower(), hook)
@abstractmethod
def add_default_hooks(self) -> None:
"""Populate the set of default event hooks."""
...
def update_environment(
self, env: Dict[str, str], update_path: str = "prepend"
) -> None:
"""Update the stored set of environment variables.
These are passed to the subprocess to set up its environment.
Args:
env (Dict[str, str]): A dictionary of "VAR":"VALUE" pairs of
environment variables to be added to the subprocess environment.
If any variables already exist, the new variables will
overwrite them (except PATH, see below).
update_path (str): If PATH is present in the new set of variables,
this argument determines how the old PATH is dealt with. There
are three options:
* "prepend" : The new PATH values are prepended to the old ones.
* "append" : The new PATH values are appended to the old ones.
* "overwrite" : The old PATH is overwritten by the new one.
"prepend" is the default option. If PATH is not present in the
current environment, the new PATH is used without modification.
"""
if "PATH" in env:
sep: str = os.pathsep
if update_path == "prepend":
env["PATH"] = (
f"{env['PATH']}{sep}{self._analysis_desc.task_env['PATH']}"
)
elif update_path == "append":
env["PATH"] = (
f"{self._analysis_desc.task_env['PATH']}{sep}{env['PATH']}"
)
elif update_path == "overwrite":
pass
else:
raise ValueError(
(
f"{update_path} is not a valid option for `update_path`!"
" Options are: prepend, append, overwrite."
)
)
os.environ.update(env)
self._analysis_desc.task_env.update(env)
def shell_source(self, env: str) -> None:
"""Source a script.
Unlike `update_environment` this method sources a new file.
Args:
env (str): Path to the script to source.
"""
import sys
if not os.path.exists(env):
logger.info(f"Cannot source environment from {env}!")
return
script: str = (
f"set -a\n"
f'source "{env}" >/dev/null\n'
f'{sys.executable} -c "import os; print(dict(os.environ))"\n'
)
logger.info(f"Sourcing file {env}")
o, e = subprocess.Popen(
["bash", "-c", script], stdout=subprocess.PIPE
).communicate()
new_environment: Dict[str, str] = eval(o)
self._analysis_desc.task_env = new_environment
def _pre_task(self) -> None:
"""Any actions to be performed before task submission.
This method may or may not be used by subclasses. It may be useful
for logging etc.
"""
...
def _submit_task(self, cmd: str) -> subprocess.Popen:
proc: subprocess.Popen = subprocess.Popen(
cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self._analysis_desc.task_env,
)
os.set_blocking(proc.stdout.fileno(), False)
os.set_blocking(proc.stderr.fileno(), False)
return proc
@abstractmethod
def _task_loop(self, proc: subprocess.Popen) -> None:
"""Actions to perform while the Task is running.
This function is run in the body of a loop until the Task signals
that it's finished.
"""
...
@abstractmethod
def _finalize_task(self, proc: subprocess.Popen) -> None:
"""Any actions to be performed after the Task has ended.
Examples include a final clearing of the pipes, retrieving results,
reporting to third party services, etc.
"""
...
def _submit_cmd(self, executable_path: str, params: str) -> str:
"""Return a formatted command for launching Task subprocess.
May be overridden by subclasses.
Args:
executable_path (str): Path to the LUTE subprocess script.
params (str): String of formatted command-line arguments.
Returns:
cmd (str): Appropriately formatted command for this Executor.
"""
cmd: str = ""
if __debug__:
cmd = f"python -B {executable_path} {params}"
else:
cmd = f"python -OB {executable_path} {params}"
return cmd
def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
if lute_path is None:
logger.debug("Absolute path to subprocess_task.py not found.")
lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
self.update_environment({"LUTE_PATH": lute_path})
executable_path: str = f"{lute_path}/subprocess_task.py"
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"
cmd: str = self._submit_cmd(executable_path, params)
proc: subprocess.Popen = self._submit_task(cmd)
while self._task_is_running(proc):
self._task_loop(proc)
time.sleep(self._analysis_desc.poll_interval)
os.set_blocking(proc.stdout.fileno(), True)
os.set_blocking(proc.stderr.fileno(), True)
self._finalize_task(proc)
proc.stdout.close()
proc.stderr.close()
proc.wait()
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
self.Hooks.task_failed(self, msg=Message())
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
logger.debug(f"Task did not change from RUNNING status. Assume COMPLETED.")
self.Hooks.task_done(self, msg=Message())
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()
if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
logger.info("Exiting after Task failure. Result recorded.")
sys.exit(-1)
self.process_results()
def _store_configuration(self) -> None:
"""Store configuration and results in the LUTE database."""
record_analysis_db(copy.deepcopy(self._analysis_desc))
def _task_is_running(self, proc: subprocess.Popen) -> bool:
"""Whether a subprocess is running.
Args:
proc (subprocess.Popen): The subprocess to determine the run status
of.
Returns:
bool: Is the subprocess task running.
"""
# Add additional conditions - don't want to exit main loop
# if only stopped
task_status: TaskStatus = self._analysis_desc.task_result.task_status
is_running: bool = task_status != TaskStatus.COMPLETED
is_running &= task_status != TaskStatus.CANCELLED
is_running &= task_status != TaskStatus.TIMEDOUT
return proc.poll() is None and is_running
def _stop(self, proc: subprocess.Popen) -> None:
"""Stop the Task subprocess."""
os.kill(proc.pid, signal.SIGTSTP)
self._analysis_desc.task_result.task_status = TaskStatus.STOPPED
def _continue(self, proc: subprocess.Popen) -> None:
"""Resume a stopped Task subprocess."""
os.kill(proc.pid, signal.SIGCONT)
self._analysis_desc.task_result.task_status = TaskStatus.RUNNING
def _set_result_from_parameters(self) -> None:
"""Use TaskParameters object to set TaskResult fields.
A result may be defined in terms of specific parameters. This is most
useful for ThirdPartyTasks which would not otherwise have an easy way of
reporting what the TaskResult is. There are two options for specifying
results from parameters:
1. A single parameter (Field) of the model has an attribute
`is_result`. This is a bool indicating that this parameter points
to a result. E.g. a parameter `output` may set `is_result=True`.
2. The `TaskParameters.Config` has a `result_from_params` attribute.
This is an appropriate option if the result is determinable for
the Task, but it is not easily defined by a single parameter. The
TaskParameters.Config.result_from_param can be set by a custom
validator, e.g. to combine the values of multiple parameters into
a single result. E.g. an `out_dir` and `out_file` parameter used
together specify the result. Currently only string specifiers are
supported.
A TaskParameters object specifies that it contains information about the
result by setting a single config option:
TaskParameters.Config.set_result=True
In general, this method should only be called when the above condition is
met, however, there are minimal checks in it as well.
"""
# This method shouldn't be called unless appropriate
# But we will add extra guards here
if self._analysis_desc.task_parameters is None:
logger.debug(
"Cannot set result from TaskParameters. TaskParameters is None!"
)
return
if (
not hasattr(self._analysis_desc.task_parameters.Config, "set_result")
or not self._analysis_desc.task_parameters.Config.set_result
):
logger.debug(
"Cannot set result from TaskParameters. `set_result` not specified!"
)
return
# First try to set from result_from_params (faster)
if self._analysis_desc.task_parameters.Config.result_from_params is not None:
result_from_params: str = (
self._analysis_desc.task_parameters.Config.result_from_params
)
logger.info(f"TaskResult specified as {result_from_params}.")
self._analysis_desc.task_result.payload = result_from_params
else:
# Iterate parameters to find the one that is the result
schema: Dict[str, Any] = self._analysis_desc.task_parameters.schema()
for param, value in self._analysis_desc.task_parameters.dict().items():
param_attrs: Dict[str, Any] = schema["properties"][param]
if "is_result" in param_attrs:
is_result: bool = param_attrs["is_result"]
if isinstance(is_result, bool) and is_result:
logger.info(f"TaskResult specified as {value}.")
self._analysis_desc.task_result.payload = value
else:
logger.debug(
(
f"{param} specified as result! But specifier is of "
f"wrong type: {type(is_result)}!"
)
)
break # We should only have 1 result-like parameter!
# If we get this far and haven't changed the payload we should complain
if self._analysis_desc.task_result.payload == "":
task_name: str = self._analysis_desc.task_result.task_name
logger.debug(
(
f"{task_name} specified result be set from {task_name}Parameters,"
" but no result provided! Check model definition!"
)
)
# Now check for impl_schemas and pass to result.impl_schemas
# Currently unused
impl_schemas: Optional[str] = (
self._analysis_desc.task_parameters.Config.impl_schemas
)
self._analysis_desc.task_result.impl_schemas = impl_schemas
# If we set_result but didn't get schema information we should complain
if self._analysis_desc.task_result.impl_schemas is None:
task_name: str = self._analysis_desc.task_result.task_name
logger.debug(
(
f"{task_name} specified result be set from {task_name}Parameters,"
" but no schema provided! Check model definition!"
)
)
def process_results(self) -> None:
"""Perform any necessary steps to process TaskResults object.
Processing will depend on subclass. Examples of steps include moving
files, converting file formats, compiling plots/figures into an HTML
file, etc.
"""
self._process_results()
@abstractmethod
def _process_results(self) -> None: ...
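A concrete subclass must implement the four abstract members shown above (`add_default_hooks`, `_task_loop`, `_finalize_task`, `_process_results`). A minimal illustrative sketch, not a real LUTE Executor:

```python
import subprocess

from lute.execution.executor import BaseExecutor

class MinimalExecutor(BaseExecutor):
    """Illustrative subclass; each body is a stub."""

    def add_default_hooks(self) -> None:
        ...  # register defaults via self.add_hook(event, hook)

    def _task_loop(self, proc: subprocess.Popen) -> None:
        ...  # e.g., read from self._communicators each poll interval

    def _finalize_task(self, proc: subprocess.Popen) -> None:
        ...  # e.g., perform a final read of the pipes

    def _process_results(self) -> None:
        ...  # e.g., move files, render reports
```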
Hooks
A container class for the Executor's event hooks.
There is a corresponding function (hook) for each event/signal. Each function takes two parameters - a reference to the Executor (self) and a reference to the Message (msg) which includes the corresponding signal.
Source code in lute/execution/executor.py
class Hooks:
"""A container class for the Executor's event hooks.
There is a corresponding function (hook) for each event/signal. Each
function takes two parameters - a reference to the Executor (self) and
a reference to the Message (msg) which includes the corresponding
signal.
"""
def no_pickle_mode(self: Self, msg: Message): ...
def task_started(self: Self, msg: Message): ...
def task_failed(self: Self, msg: Message): ...
def task_stopped(self: Self, msg: Message): ...
def task_done(self: Self, msg: Message): ...
def task_cancelled(self: Self, msg: Message): ...
def task_result(self: Self, msg: Message): ...
__init__(task_name, communicators, poll_interval=0.05)
The Executor will manage the subprocess in which `task_name` is run.
Source code in lute/execution/executor.py
def __init__(
self,
task_name: str,
communicators: List[Communicator],
poll_interval: float = 0.05,
) -> None:
"""The Executor will manage the subprocess in which `task_name` is run.
Args:
task_name (str): The name of the Task to be submitted. Must match
the Task's class name exactly. The parameter specification must
also be in a properly named model to be identified.
communicators (List[Communicator]): A list of one or more
communicators which manage information flow to/from the Task.
Subclasses may have different defaults, and new functionality
can be introduced by composing Executors with communicators.
poll_interval (float): Time to wait between reading/writing to the
managed subprocess. In seconds.
"""
result: TaskResult = TaskResult(
task_name=task_name, task_status=TaskStatus.PENDING, summary="", payload=""
)
task_parameters: Optional[TaskParameters] = None
task_env: Dict[str, str] = os.environ.copy()
self._communicators: List[Communicator] = communicators
communicator_desc: List[str] = []
for comm in self._communicators:
comm.stage_communicator()
communicator_desc.append(str(comm))
self._analysis_desc: DescribedAnalysis = DescribedAnalysis(
task_result=result,
task_parameters=task_parameters,
task_env=task_env,
poll_interval=poll_interval,
communicator_desc=communicator_desc,
)
add_default_hooks()
abstractmethod
Populate the set of default event hooks.
Source code in lute/execution/executor.py
@abstractmethod
def add_default_hooks(self) -> None:
"""Populate the set of default event hooks."""
...
add_hook(event, hook)
Add a new hook.
Each hook is a function called any time the Executor receives a signal for a particular event, e.g. Task starts, Task ends, etc. Calling this method will remove any hook that currently exists for the event. I.e. only one hook can be called per event at a time. Creating hooks for events which do not exist is not allowed.
Source code in lute/execution/executor.py
def add_hook(self, event: str, hook: Callable[[Self, Message], None]) -> None:
"""Add a new hook.
Each hook is a function called any time the Executor receives a signal
for a particular event, e.g. Task starts, Task ends, etc. Calling this
method will remove any hook that currently exists for the event. I.e.
only one hook can be called per event at a time. Creating hooks for
events which do not exist is not allowed.
Args:
event (str): The event for which the hook will be called.
hook (Callable[[Self, Message], None]): The function to be called during each
occurrence of the event.
"""
if event.upper() in LUTE_SIGNALS:
setattr(self.Hooks, event.lower(), hook)
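Because registration checks `event.upper() in LUTE_SIGNALS`, event names are matched case-insensitively and unknown events are ignored silently rather than raising. A short sketch, assuming an existing `executor` instance:

```python
def log_failure(executor, msg):
    print(f"Task failed: {msg.contents}")

executor.add_hook("task_failed", log_failure)   # replaces the default hook
executor.add_hook("TASK_FAILED", log_failure)   # equivalent; case-insensitive
executor.add_hook("not_an_event", log_failure)  # silently ignored
```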
execute_task()
Run the requested Task as a subprocess.
Source code in lute/execution/executor.py
def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
if lute_path is None:
logger.debug("Absolute path to subprocess_task.py not found.")
lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
self.update_environment({"LUTE_PATH": lute_path})
executable_path: str = f"{lute_path}/subprocess_task.py"
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"
cmd: str = self._submit_cmd(executable_path, params)
proc: subprocess.Popen = self._submit_task(cmd)
while self._task_is_running(proc):
self._task_loop(proc)
time.sleep(self._analysis_desc.poll_interval)
os.set_blocking(proc.stdout.fileno(), True)
os.set_blocking(proc.stderr.fileno(), True)
self._finalize_task(proc)
proc.stdout.close()
proc.stderr.close()
proc.wait()
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
self.Hooks.task_failed(self, msg=Message())
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
logger.debug(f"Task did not change from RUNNING status. Assume COMPLETED.")
self.Hooks.task_done(self, msg=Message())
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()
if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
logger.info("Exiting after Task failure. Result recorded.")
sys.exit(-1)
self.process_results()
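`execute_task` locates the YAML configuration through `LUTE_CONFIGPATH` in the stored task environment, so that variable must be set before the call. A hedged usage sketch; the path is illustrative:

```python
# LUTE_CONFIGPATH must point at the run's YAML configuration.
executor.update_environment({"LUTE_CONFIGPATH": "/path/to/config.yaml"})
executor.execute_task()  # blocks, polling the subprocess every poll_interval seconds
```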
process_results()
Perform any necessary steps to process the TaskResult object.

Processing will depend on subclass. Examples of steps include moving files, converting file formats, compiling plots/figures into an HTML file, etc.
Source code in lute/execution/executor.py
def process_results(self) -> None:
"""Perform any necessary steps to process TaskResults object.
Processing will depend on subclass. Examples of steps include moving
files, converting file formats, compiling plots/figures into an HTML
file, etc.
"""
self._process_results()
shell_source(env)
Source a script.

Unlike `update_environment` this method sources a new file.
Source code in lute/execution/executor.py
def shell_source(self, env: str) -> None:
"""Source a script.
Unlike `update_environment` this method sources a new file.
Args:
env (str): Path to the script to source.
"""
import sys
if not os.path.exists(env):
logger.info(f"Cannot source environment from {env}!")
return
script: str = (
f"set -a\n"
f'source "{env}" >/dev/null\n'
f'{sys.executable} -c "import os; print(dict(os.environ))"\n'
)
logger.info(f"Sourcing file {env}")
o, e = subprocess.Popen(
["bash", "-c", script], stdout=subprocess.PIPE
).communicate()
new_environment: Dict[str, str] = eval(o)
self._analysis_desc.task_env = new_environment
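Since the script is run under `set -a`, every variable it defines is exported and captured into the stored task environment, replacing the previous one. A hedged usage sketch; the script path is illustrative:

```python
# All variables defined by the script become the Task subprocess environment.
executor.shell_source("/path/to/env_setup.sh")
```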
update_environment(env, update_path='prepend')
Update the stored set of environment variables.

These are passed to the subprocess to set up its environment.
Source code in lute/execution/executor.py
def update_environment(
self, env: Dict[str, str], update_path: str = "prepend"
) -> None:
"""Update the stored set of environment variables.
These are passed to the subprocess to set up its environment.
Args:
env (Dict[str, str]): A dictionary of "VAR":"VALUE" pairs of
environment variables to be added to the subprocess environment.
If any variables already exist, the new variables will
overwrite them (except PATH, see below).
update_path (str): If PATH is present in the new set of variables,
this argument determines how the old PATH is dealt with. There
are three options:
* "prepend" : The new PATH values are prepended to the old ones.
* "append" : The new PATH values are appended to the old ones.
* "overwrite" : The old PATH is overwritten by the new one.
"prepend" is the default option. If PATH is not present in the
current environment, the new PATH is used without modification.
"""
if "PATH" in env:
sep: str = os.pathsep
if update_path == "prepend":
env["PATH"] = (
f"{env['PATH']}{sep}{self._analysis_desc.task_env['PATH']}"
)
elif update_path == "append":
env["PATH"] = (
f"{self._analysis_desc.task_env['PATH']}{sep}{env['PATH']}"
)
elif update_path == "overwrite":
pass
else:
raise ValueError(
(
f"{update_path} is not a valid option for `update_path`!"
" Options are: prepend, append, overwrite."
)
)
os.environ.update(env)
self._analysis_desc.task_env.update(env)
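A hedged sketch of the three `update_path` modes, with illustrative paths; note that non-PATH variables always overwrite existing values:

```python
executor.update_environment({"PATH": "/opt/tools/bin"})                         # prepended (default)
executor.update_environment({"PATH": "/opt/extra/bin"}, update_path="append")   # appended
executor.update_environment({"PATH": "/usr/bin"}, update_path="overwrite")      # replaces PATH entirely
executor.update_environment({"MY_VAR": "1"})  # non-PATH variables simply overwrite
```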
Communicator
Bases: ABC
Source code in lute/execution/ipc.py
class Communicator(ABC):
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
"""Abstract Base Class for IPC Communicator objects.
Args:
party (Party): Which object (side/process) the Communicator is
managing IPC for. I.e., is this the "Task" or "Executor" side.
use_pickle (bool): Whether to serialize data using pickle prior to
sending it.
"""
self._party = party
self._use_pickle = use_pickle
self.desc = "Communicator abstract base class."
@abstractmethod
def read(self, proc: subprocess.Popen) -> Message:
"""Method for reading data through the communication mechanism."""
...
@abstractmethod
def write(self, msg: Message) -> None:
"""Method for sending data through the communication mechanism."""
...
def __str__(self):
name: str = str(type(self)).split("'")[1].split(".")[-1]
return f"{name}: {self.desc}"
def __repr__(self):
return self.__str__()
def __enter__(self) -> Self:
return self
def __exit__(self) -> None: ...
def stage_communicator(self):
"""Alternative method for staging outside of context manager."""
self.__enter__()
def clear_communicator(self):
"""Alternative exit method outside of context manager."""
self.__exit__()
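New IPC mechanisms are added by subclassing `Communicator` and implementing `read` and `write`. A minimal sketch of a hypothetical communicator that performs no real IPC:

```python
import subprocess

from lute.execution.ipc import Communicator, Message, Party

class NullCommunicator(Communicator):
    """Hypothetical: discards writes and returns empty Messages on read."""

    def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
        super().__init__(party=party, use_pickle=use_pickle)
        self.desc = "Does no real IPC."

    def read(self, proc: subprocess.Popen) -> Message:
        return Message()  # nothing to report

    def write(self, msg: Message) -> None:
        pass  # drop the message
```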
__init__(party=Party.TASK, use_pickle=True)
Abstract Base Class for IPC Communicator objects.
Source code in lute/execution/ipc.py
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
"""Abstract Base Class for IPC Communicator objects.
Args:
party (Party): Which object (side/process) the Communicator is
managing IPC for. I.e., is this the "Task" or "Executor" side.
use_pickle (bool): Whether to serialize data using pickle prior to
sending it.
"""
self._party = party
self._use_pickle = use_pickle
self.desc = "Communicator abstract base class."
clear_communicator()
Alternative exit method outside of context manager.
Source code in lute/execution/ipc.py
def clear_communicator(self):
"""Alternative exit method outside of context manager."""
self.__exit__()
read(proc)
abstractmethod
Method for reading data through the communication mechanism.
Source code in lute/execution/ipc.py
@abstractmethod
def read(self, proc: subprocess.Popen) -> Message:
"""Method for reading data through the communication mechanism."""
...
stage_communicator()
Alternative method for staging outside of context manager.
Source code in lute/execution/ipc.py
def stage_communicator(self):
"""Alternative method for staging outside of context manager."""
self.__enter__()
write(msg)
abstractmethod
Method for sending data through the communication mechanism.
Source code in lute/execution/ipc.py
@abstractmethod
def write(self, msg: Message) -> None:
"""Method for sending data through the communication mechanism."""
...
Executor
Bases: BaseExecutor
Basic implementation of an Executor which manages simple IPC with Task.
Methods:

Name | Description |
---|---|
`add_hook` | Create a new hook to be called each time a specific event occurs. |
`add_default_hooks` | Populate the event hooks with the default functions. |
`update_environment` | Update the environment that is passed to the Task subprocess. |
`execute_task` | Run the task as a subprocess. |
Source code in lute/execution/executor.py
class Executor(BaseExecutor):
"""Basic implementation of an Executor which manages simple IPC with Task.
Attributes:
Methods:
add_hook(event: str, hook: Callable[[Self, Message], None]) -> None: Create a
new hook to be called each time a specific event occurs.
add_default_hooks() -> None: Populate the event hooks with the default
functions.
update_environment(env: Dict[str, str], update_path: str): Update the
environment that is passed to the Task subprocess.
execute_task(): Run the task as a subprocess.
"""
def __init__(
self,
task_name: str,
communicators: List[Communicator] = [
PipeCommunicator(Party.EXECUTOR),
SocketCommunicator(Party.EXECUTOR),
],
poll_interval: float = 0.05,
) -> None:
super().__init__(
task_name=task_name,
communicators=communicators,
poll_interval=poll_interval,
)
self.add_default_hooks()
def add_default_hooks(self) -> None:
"""Populate the set of default event hooks."""
def no_pickle_mode(self: Executor, msg: Message):
for idx, communicator in enumerate(self._communicators):
if isinstance(communicator, PipeCommunicator):
self._communicators[idx] = PipeCommunicator(
Party.EXECUTOR, use_pickle=False
)
self.add_hook("no_pickle_mode", no_pickle_mode)
def task_started(self: Executor, msg: Message):
if isinstance(msg.contents, TaskParameters):
self._analysis_desc.task_parameters = msg.contents
# Maybe just run this no matter what? Rely on the other guards?
# Perhaps just check if ThirdPartyParameters?
# if isinstance(self._analysis_desc.task_parameters, ThirdPartyParameters):
if hasattr(self._analysis_desc.task_parameters.Config, "set_result"):
# Third party Tasks may mark a parameter as the result
# If so, setup the result now.
self._set_result_from_parameters()
logger.info(
f"Executor: {self._analysis_desc.task_result.task_name} started"
)
self._analysis_desc.task_result.task_status = TaskStatus.RUNNING
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "RUNNING",
}
post_elog_run_status(elog_data)
self.add_hook("task_started", task_started)
def task_failed(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "FAILED",
}
post_elog_run_status(elog_data)
self.add_hook("task_failed", task_failed)
def task_stopped(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "STOPPED",
}
post_elog_run_status(elog_data)
self.add_hook("task_stopped", task_stopped)
def task_done(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)
self.add_hook("task_done", task_done)
def task_cancelled(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "CANCELLED",
}
post_elog_run_status(elog_data)
self.add_hook("task_cancelled", task_cancelled)
def task_result(self: Executor, msg: Message):
if isinstance(msg.contents, TaskResult):
self._analysis_desc.task_result = msg.contents
logger.info(self._analysis_desc.task_result.summary)
logger.info(self._analysis_desc.task_result.task_status)
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)
self.add_hook("task_result", task_result)
def _task_loop(self, proc: subprocess.Popen) -> None:
"""Actions to perform while the Task is running.
This function is run in the body of a loop until the Task signals
that it's finished.
"""
for communicator in self._communicators:
msg: Message = communicator.read(proc)
if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS:
hook: Callable[[Executor, Message], None] = getattr(
self.Hooks, msg.signal.lower()
)
hook(self, msg)
if msg.contents is not None:
if isinstance(msg.contents, str) and msg.contents != "":
logger.info(msg.contents)
elif not isinstance(msg.contents, str):
logger.info(msg.contents)
def _finalize_task(self, proc: subprocess.Popen) -> None:
"""Any actions to be performed after the Task has ended.
Examples include a final clearing of the pipes, retrieving results,
reporting to third party services, etc.
"""
self._task_loop(proc) # Perform a final read.
def _process_results(self) -> None:
"""Performs result processing.
Actions include:
- For `ElogSummaryPlots`, will save the summary plot to the appropriate
directory for display in the eLog.
"""
task_result: TaskResult = self._analysis_desc.task_result
self._process_result_payload(task_result.payload)
self._process_result_summary(task_result.summary)
def _process_result_payload(self, payload: Any) -> None:
if self._analysis_desc.task_parameters is None:
logger.debug("Please run Task before using this method!")
return
if isinstance(payload, ElogSummaryPlots):
# ElogSummaryPlots has figures and a display name
# display name also serves as a path.
expmt: str = self._analysis_desc.task_parameters.lute_config.experiment
base_path: str = f"/sdf/data/lcls/ds/{expmt[:3]}/{expmt}/stats/summary"
full_path: str = f"{base_path}/{payload.display_name}"
if not os.path.isdir(full_path):
os.makedirs(full_path)
# Preferred plots are pn.Tabs objects which save directly as html
# Only supported plot type that has "save" method - do not want to
# import plot modules here to do type checks.
if hasattr(payload.figures, "save"):
payload.figures.save(f"{full_path}/report.html")
else:
...
elif isinstance(payload, str):
# May be a path to a file...
schemas: Optional[str] = self._analysis_desc.task_result.impl_schemas
# Should also check `impl_schemas` to determine what to do with path
def _process_result_summary(self, summary: str) -> None: ...
add_default_hooks()
Populate the set of default event hooks.
Source code in lute/execution/executor.py
def add_default_hooks(self) -> None:
"""Populate the set of default event hooks."""
def no_pickle_mode(self: Executor, msg: Message):
for idx, communicator in enumerate(self._communicators):
if isinstance(communicator, PipeCommunicator):
self._communicators[idx] = PipeCommunicator(
Party.EXECUTOR, use_pickle=False
)
self.add_hook("no_pickle_mode", no_pickle_mode)
def task_started(self: Executor, msg: Message):
if isinstance(msg.contents, TaskParameters):
self._analysis_desc.task_parameters = msg.contents
# Maybe just run this no matter what? Rely on the other guards?
# Perhaps just check if ThirdPartyParameters?
# if isinstance(self._analysis_desc.task_parameters, ThirdPartyParameters):
if hasattr(self._analysis_desc.task_parameters.Config, "set_result"):
# Third party Tasks may mark a parameter as the result
# If so, setup the result now.
self._set_result_from_parameters()
logger.info(
f"Executor: {self._analysis_desc.task_result.task_name} started"
)
self._analysis_desc.task_result.task_status = TaskStatus.RUNNING
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "RUNNING",
}
post_elog_run_status(elog_data)
self.add_hook("task_started", task_started)
def task_failed(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "FAILED",
}
post_elog_run_status(elog_data)
self.add_hook("task_failed", task_failed)
def task_stopped(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "STOPPED",
}
post_elog_run_status(elog_data)
self.add_hook("task_stopped", task_stopped)
def task_done(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)
self.add_hook("task_done", task_done)
def task_cancelled(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "CANCELLED",
}
post_elog_run_status(elog_data)
self.add_hook("task_cancelled", task_cancelled)
def task_result(self: Executor, msg: Message):
if isinstance(msg.contents, TaskResult):
self._analysis_desc.task_result = msg.contents
logger.info(self._analysis_desc.task_result.summary)
logger.info(self._analysis_desc.task_result.task_status)
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)
self.add_hook("task_result", task_result)
MPIExecutor
Bases: Executor
Runs first-party Tasks that require MPI.
This Executor is otherwise identical to the standard Executor, except it uses `mpirun` for `Task` submission. Currently this Executor assumes a job has been submitted using SLURM as a first step. It will determine the number of MPI ranks based on the resources requested. As a fallback, it will try to determine the number of local cores available for cases where a job has not been submitted via SLURM. On S3DF, the second determination mechanism should accurately match the environment variable provided by SLURM indicating resources allocated.
This Executor will submit the Task to run with a number of processes equal to the total number of cores available minus 1. A single core is reserved for the Executor itself. Note that currently this means that you must submit on 3 cores or more, since MPI requires a minimum of 2 ranks, and the number of ranks is determined from the cores dedicated to Task execution.
Methods:

Name | Description |
---|---|
`_submit_cmd` | Run the task as a subprocess using `mpirun`. |
Source code in lute/execution/executor.py
class MPIExecutor(Executor):
"""Runs first-party Tasks that require MPI.
This Executor is otherwise identical to the standard Executor, except it
uses `mpirun` for `Task` submission. Currently this Executor assumes a job
has been submitted using SLURM as a first step. It will determine the number
of MPI ranks based on the resources requested. As a fallback, it will try
to determine the number of local cores available for cases where a job has
not been submitted via SLURM. On S3DF, the second determination mechanism
should accurately match the environment variable provided by SLURM indicating
resources allocated.
This Executor will submit the Task to run with a number of processes equal
to the total number of cores available minus 1. A single core is reserved
for the Executor itself. Note that currently this means that you must submit
on 3 cores or more, since MPI requires a minimum of 2 ranks, and the number
of ranks is determined from the cores dedicated to Task execution.
Methods:
_submit_cmd: Run the task as a subprocess using `mpirun`.
"""
def _submit_cmd(self, executable_path: str, params: str) -> str:
"""Override submission command to use `mpirun`
Args:
executable_path (str): Path to the LUTE subprocess script.
params (str): String of formatted command-line arguments.
Returns:
cmd (str): Appropriately formatted command for this Executor.
"""
py_cmd: str = ""
nprocs: int = max(
int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
)
mpi_cmd: str = f"mpirun -np {nprocs}"
if __debug__:
py_cmd = f"python -B -u -m mpi4py.run {executable_path} {params}"
else:
py_cmd = f"python -OB -u -m mpi4py.run {executable_path} {params}"
cmd: str = f"{mpi_cmd} {py_cmd}"
return cmd
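To illustrate the rank computation in `_submit_cmd`: SLURM's allocation (`SLURM_NPROCS`) is preferred, local core affinity is the fallback, one core is reserved for the Executor, and the count never drops below one:

```python
import os

# Mirrors MPIExecutor._submit_cmd: reserve one core, floor at one rank.
nprocs: int = max(
    int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
)
# E.g., a 4-core SLURM allocation yields "mpirun -np 3 ..."
print(f"mpirun -np {nprocs}")
```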
Party
Bases: Enum
Identifier for which party (side/end) is using a communicator.
For some types of communication streams there may be different interfaces depending on which side of the communicator you are on. This enum is used by the communicator to determine which interface to use.
Source code in lute/execution/ipc.py
class Party(Enum):
"""Identifier for which party (side/end) is using a communicator.
For some types of communication streams there may be different interfaces
depending on which side of the communicator you are on. This enum is used
by the communicator to determine which interface to use.
"""
TASK = 0
"""
The Task (client) side.
"""
EXECUTOR = 1
"""
The Executor (server) side.
"""
EXECUTOR = 1
class-attribute
instance-attribute
The Executor (server) side.
TASK = 0
class-attribute
instance-attribute
The Task (client) side.
PipeCommunicator
Bases: Communicator
Provides communication through pipes over stderr/stdout.
The implementation of this communicator has reading and writing occurring on stderr and stdout. In general the `Task` will be writing while the `Executor` will be reading. `stderr` is used for sending signals.
Source code in lute/execution/ipc.py
class PipeCommunicator(Communicator):
"""Provides communication through pipes over stderr/stdout.
The implementation of this communicator has reading and writing occurring
on stderr and stdout. In general the `Task` will be writing while the
`Executor` will be reading. `stderr` is used for sending signals.
"""
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
"""IPC through pipes.
Arbitrary objects may be transmitted using pickle to serialize the data.
If pickle is not used, the data is assumed to be plain text.
Args:
party (Party): Which object (side/process) the Communicator is
managing IPC for. I.e., is this the "Task" or "Executor" side.
use_pickle (bool): Whether to serialize data using Pickle prior to
sending it. If False, data is assumed to be text which can be decoded.
"""
super().__init__(party=party, use_pickle=use_pickle)
self.desc = "Communicates through stderr and stdout using pickle."
def read(self, proc: subprocess.Popen) -> Message:
"""Read from stdout and stderr.
Args:
proc (subprocess.Popen): The process to read from.
Returns:
msg (Message): The message read, containing contents and signal.
"""
signal: Optional[str]
contents: Optional[str]
raw_signal: bytes = proc.stderr.read()
raw_contents: bytes = proc.stdout.read()
if raw_signal is not None:
signal = raw_signal.decode()
else:
signal = raw_signal
if raw_contents:
if self._use_pickle:
try:
contents = pickle.loads(raw_contents)
except (pickle.UnpicklingError, ValueError, EOFError) as err:
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=False")
self._use_pickle = False
contents = self._safe_unpickle_decode(raw_contents)
else:
try:
contents = raw_contents.decode()
except UnicodeDecodeError as err:
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True")
self._use_pickle = True
contents = self._safe_unpickle_decode(raw_contents)
else:
contents = None
if signal and signal not in LUTE_SIGNALS:
# Some tasks write on stderr
# If the signal channel has "non-signal" info, add it to
# contents
if not contents:
contents = f"({signal})"
else:
contents = f"{contents} ({signal})"
signal = None
return Message(contents=contents, signal=signal)
def _safe_unpickle_decode(self, maybe_mixed: bytes) -> Optional[str]:
"""This method is used to unpickle and/or decode a bytes object.
It attempts to handle cases where contents can be mixed, i.e., part of
the message must be decoded and the other part unpickled. It handles
only two-way splits. If there are more complex arrangements such as:
<pickled>:<unpickled>:<pickled> etc, it will give up.
The simpler two way splits are unlikely to occur in normal usage. They
may arise when debugging if, e.g., `print` statements are mixed with the
usage of the `_report_to_executor` method.
Note that this method works because ONLY text data is assumed to be
sent via the pipes. The method needs to be revised to handle non-text
data if the `Task` is modified to also send that via PipeCommunicator.
The use of pickle is supported to provide for this option if it is
necessary. It may be deprecated in the future.
Be careful when making changes. This method has seemingly redundant
checks because unpickling will not throw an error if a full object can
be retrieved. That is, the library will ignore extraneous bytes. This
method attempts to retrieve that information if the pickled data comes
first in the stream.
Args:
maybe_mixed (bytes): A bytes object which could require unpickling,
decoding, or both.
Returns:
contents (Optional[str]): The unpickled/decoded contents if possible.
Otherwise, None.
"""
contents: Optional[str]
try:
contents = pickle.loads(maybe_mixed)
repickled: bytes = pickle.dumps(contents)
if len(repickled) < len(maybe_mixed):
# Successful unpickling, but pickle stops even if there are more bytes
try:
additional_data: str = maybe_mixed[len(repickled) :].decode()
contents = f"{contents}{additional_data}"
except UnicodeDecodeError:
# Can't decode the bytes left by pickle, so they are lost
missing_bytes: int = len(maybe_mixed) - len(repickled)
logger.debug(
f"PipeCommunicator has truncated message. Unable to retrieve {missing_bytes} bytes."
)
except (pickle.UnpicklingError, ValueError, EOFError) as err:
# Pickle may also throw a ValueError, e.g. this bytes: b"Found! \n"
# Pickle may also throw an EOFError, eg. this bytes: b"F0\n"
try:
contents = maybe_mixed.decode()
except UnicodeDecodeError as err2:
try:
contents = maybe_mixed[: err2.start].decode()
contents = f"{contents}{pickle.loads(maybe_mixed[err2.start:])}"
except Exception as err3:
logger.debug(
f"PipeCommunicator unable to decode/parse data! {err3}"
)
contents = None
return contents
def write(self, msg: Message) -> None:
"""Write to stdout and stderr.
The signal component is sent to `stderr` while the contents of the
Message are sent to `stdout`.
Args:
msg (Message): The Message to send.
"""
if self._use_pickle:
signal: bytes
if msg.signal:
signal = msg.signal.encode()
else:
signal = b""
contents: bytes = pickle.dumps(msg.contents)
sys.stderr.buffer.write(signal)
sys.stdout.buffer.write(contents)
sys.stderr.buffer.flush()
sys.stdout.buffer.flush()
else:
raw_signal: str
if msg.signal:
raw_signal = msg.signal
else:
raw_signal = ""
raw_contents: str
if isinstance(msg.contents, str):
raw_contents = msg.contents
elif msg.contents is None:
raw_contents = ""
else:
raise ValueError(
f"Cannot send msg contents of type: {type(msg.contents)} when not using pickle!"
)
sys.stderr.write(raw_signal)
sys.stdout.write(raw_contents)
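A hedged sketch of the two sides of a pipe exchange. The Task side writes from within the subprocess; the Executor side reads from the managed `subprocess.Popen` handle, assumed here to exist as `proc`. The signal string is assumed to be a `LUTE_SIGNALS` name matching the `task_started` hook:

```python
from lute.execution.ipc import Message, Party, PipeCommunicator

# Task side (inside the subprocess): signal -> stderr, contents -> stdout.
task_comm = PipeCommunicator(Party.TASK)
task_comm.write(Message(contents="Beginning processing...", signal="TASK_STARTED"))

# Executor side (parent process): non-blocking read of both pipes.
exec_comm = PipeCommunicator(Party.EXECUTOR)
msg: Message = exec_comm.read(proc)  # proc: the managed subprocess.Popen
if msg.signal is not None:
    print(f"Signal received: {msg.signal}")
```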
__init__(party=Party.TASK, use_pickle=True)
IPC through pipes.
Arbitrary objects may be transmitted using pickle to serialize the data. If pickle is not used, the data is assumed to be plain text.
Source code in lute/execution/ipc.py
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
"""IPC through pipes.
Arbitrary objects may be transmitted using pickle to serialize the data.
If pickle is not used, the data is assumed to be plain text.
Args:
party (Party): Which object (side/process) the Communicator is
managing IPC for. I.e., is this the "Task" or "Executor" side.
use_pickle (bool): Whether to serialize data using Pickle prior to
sending it. If False, data is assumed to be text which can be decoded.
"""
super().__init__(party=party, use_pickle=use_pickle)
self.desc = "Communicates through stderr and stdout using pickle."
read(proc)
Read from stdout and stderr.
Source code in lute/execution/ipc.py
def read(self, proc: subprocess.Popen) -> Message:
"""Read from stdout and stderr.
Args:
proc (subprocess.Popen): The process to read from.
Returns:
msg (Message): The message read, containing contents and signal.
"""
signal: Optional[str]
contents: Optional[str]
raw_signal: bytes = proc.stderr.read()
raw_contents: bytes = proc.stdout.read()
if raw_signal is not None:
signal = raw_signal.decode()
else:
signal = raw_signal
if raw_contents:
if self._use_pickle:
try:
contents = pickle.loads(raw_contents)
except (pickle.UnpicklingError, ValueError, EOFError) as err:
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=False")
self._use_pickle = False
contents = self._safe_unpickle_decode(raw_contents)
else:
try:
contents = raw_contents.decode()
except UnicodeDecodeError as err:
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True")
self._use_pickle = True
contents = self._safe_unpickle_decode(raw_contents)
else:
contents = None
if signal and signal not in LUTE_SIGNALS:
# Some tasks write on stderr
# If the signal channel has "non-signal" info, add it to
# contents
if not contents:
contents = f"({signal})"
else:
contents = f"{contents} ({signal})"
signal = None
return Message(contents=contents, signal=signal)
write(msg)
Write to stdout and stderr.
The signal component is sent to `stderr` while the contents of the Message are sent to `stdout`.
Source code in lute/execution/ipc.py
def write(self, msg: Message) -> None:
"""Write to stdout and stderr.
The signal component is sent to `stderr` while the contents of the
Message are sent to `stdout`.
Args:
msg (Message): The Message to send.
"""
if self._use_pickle:
signal: bytes
if msg.signal:
signal = msg.signal.encode()
else:
signal = b""
contents: bytes = pickle.dumps(msg.contents)
sys.stderr.buffer.write(signal)
sys.stdout.buffer.write(contents)
sys.stderr.buffer.flush()
sys.stdout.buffer.flush()
else:
raw_signal: str
if msg.signal:
raw_signal = msg.signal
else:
raw_signal = ""
raw_contents: str
if isinstance(msg.contents, str):
raw_contents = msg.contents
elif msg.contents is None:
raw_contents = ""
else:
raise ValueError(
f"Cannot send msg contents of type: {type(msg.contents)} when not using pickle!"
)
sys.stderr.write(raw_signal)
sys.stdout.write(raw_contents)
SocketCommunicator
Bases: Communicator
Provides communication over Unix sockets.
The path to the Unix socket is defined by the environment variable:

`LUTE_SOCKET=/path/to/socket`

This class assumes proper permissions and that the above environment variable has been defined. The `Task` is configured as what would commonly be referred to as the `client`, while the `Executor` is configured as the server. The Executor continuously monitors for connections and appends any Messages that are received to a queue. Read requests retrieve Messages from the queue. Task-side Communicators are fleeting, so they open a connection, send data, and immediately close and clean up.
Source code in lute/execution/ipc.py
class SocketCommunicator(Communicator):
"""Provides communication over Unix sockets.
The path to the Unix socket is defined by the environment variable:
`LUTE_SOCKET=/path/to/socket`
This class assumes proper permissions and that the above environment
variable has been defined. The `Task` is configured as what would commonly
be referred to as the `client`, while the `Executor` is configured as the
server. The Executor continuously monitors for connections and appends any
Messages that are received to a queue. Read requests retrieve Messages from
the queue. Task-side Communicators are fleeting so they open a connection,
send data, and immediately close and clean up.
"""
READ_TIMEOUT: float = 0.01
"""
Maximum time to wait to retrieve data.
"""
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
"""IPC over a Unix socket.
Unlike with the PipeCommunicator, pickle is always used to send data
through the socket.
Args:
party (Party): Which object (side/process) the Communicator is
managing IPC for. I.e., is this the "Task" or "Executor" side.
use_pickle (bool): Whether to use pickle. Always True currently,
passing False does not change behaviour.
"""
super().__init__(party=party, use_pickle=use_pickle)
self.desc: str = "Communicates through a Unix socket."
self._data_socket: socket.socket = self._create_socket()
self._data_socket.setblocking(0)
def read(self, proc: subprocess.Popen) -> Message:
"""Read data from a socket.
Socket(s) are continuously monitored, and read from when new data is
available.
Args:
proc (subprocess.Popen): The process to read from. Provided for
compatibility with other Communicator subtypes. Is ignored.
Returns:
msg (Message): The message read, containing contents and signal.
"""
has_data, _, has_error = select.select(
[self._data_socket],
[],
[self._data_socket],
SocketCommunicator.READ_TIMEOUT,
)
msg: Message
if has_data:
connection, _ = has_data[0].accept()
full_data: bytes = b""
while True:
data: bytes = connection.recv(1024)
if data:
full_data += data
else:
break
msg = pickle.loads(full_data) if full_data else Message()
connection.close()
else:
msg = Message()
return msg
def write(self, msg: Message) -> None:
"""Send a single Message.
The entire Message (signal and contents) is serialized and sent through
a connection over Unix socket.
Args:
msg (Message): The Message to send.
"""
self._write_socket(msg)
def _create_socket(self) -> socket.socket:
"""Returns a socket object.
Returns:
data_socket (socket.socket): Unix socket object.
"""
socket_path: str
try:
socket_path = os.environ["LUTE_SOCKET"]
except KeyError as err:
import uuid
import tempfile
# Define a path and add it to the environment
# Executor-side always created first, Task will use the same one
socket_path = f"{tempfile.gettempdir()}/lute_{uuid.uuid4().hex}.sock"
os.environ["LUTE_SOCKET"] = socket_path
logger.debug(f"SocketCommunicator defines socket_path: {socket_path}")
data_socket: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
if self._party == Party.EXECUTOR:
if os.path.exists(socket_path):
os.unlink(socket_path)
data_socket.bind(socket_path)
data_socket.listen(1)
elif self._party == Party.TASK:
data_socket.connect(socket_path)
return data_socket
def _write_socket(self, msg: Message) -> None:
"""Sends data over a socket from the 'client' (Task) side.
Communicator objects on the Task-side are fleeting, so a socket is
opened, data is sent, and then the connection and socket are cleaned up.
"""
self._data_socket.sendall(pickle.dumps(msg))
self._clean_up()
def _clean_up(self) -> None:
"""Clean up connections."""
# Check the object exists in case the Communicator is cleaned up before
# opening any connections
if hasattr(self, "_data_socket"):
socket_path: str = self._data_socket.getsockname()
self._data_socket.close()
if self._party == Party.EXECUTOR:
os.unlink(socket_path)
@property
def socket_path(self) -> str:
socket_path: str = self._data_socket.getsockname()
return socket_path
def __exit__(self):
self._clean_up()
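Creation order matters: the Executor side binds and listens (defining `LUTE_SOCKET` if it is absent), so it must exist before the Task side tries to connect. A hedged sketch with an illustrative socket path:

```python
import os

from lute.execution.ipc import Party, SocketCommunicator

os.environ.setdefault("LUTE_SOCKET", "/tmp/lute_demo.sock")  # illustrative path
server = SocketCommunicator(Party.EXECUTOR)  # binds and listens on the socket
client = SocketCommunicator(Party.TASK)      # connects to the same path
print(server.socket_path)
```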
READ_TIMEOUT: float = 0.01
class-attribute
instance-attribute
Maximum time to wait to retrieve data.
__init__(party=Party.TASK, use_pickle=True)
IPC over a Unix socket.
Unlike with the PipeCommunicator, pickle is always used to send data through the socket.
Source code in lute/execution/ipc.py
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
"""IPC over a Unix socket.
Unlike with the PipeCommunicator, pickle is always used to send data
through the socket.
Args:
party (Party): Which object (side/process) the Communicator is
managing IPC for. I.e., is this the "Task" or "Executor" side.
use_pickle (bool): Whether to use pickle. Always True currently,
passing False does not change behaviour.
"""
super().__init__(party=party, use_pickle=use_pickle)
self.desc: str = "Communicates through a Unix socket."
self._data_socket: socket.socket = self._create_socket()
self._data_socket.setblocking(0)
read(proc)
Read data from a socket.
Socket(s) are continuously monitored, and read from when new data is available.
Source code in lute/execution/ipc.py
def read(self, proc: subprocess.Popen) -> Message:
"""Read data from a socket.
Socket(s) are continuously monitored, and read from when new data is
available.
Args:
proc (subprocess.Popen): The process to read from. Provided for
compatibility with other Communicator subtypes. Is ignored.
Returns:
msg (Message): The message read, containing contents and signal.
"""
has_data, _, has_error = select.select(
[self._data_socket],
[],
[self._data_socket],
SocketCommunicator.READ_TIMEOUT,
)
msg: Message
if has_data:
connection, _ = has_data[0].accept()
full_data: bytes = b""
while True:
data: bytes = connection.recv(1024)
if data:
full_data += data
else:
break
msg = pickle.loads(full_data) if full_data else Message()
connection.close()
else:
msg = Message()
return msg
write(msg)
Send a single Message.
The entire Message (signal and contents) is serialized and sent through a connection over Unix socket.
Source code in lute/execution/ipc.py
def write(self, msg: Message) -> None:
"""Send a single Message.
The entire Message (signal and contents) is serialized and sent through
a connection over Unix socket.
Args:
msg (Message): The Message to send.
"""
self._write_socket(msg)
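Consistent with the fleeting Task-side design, a `write` is one-shot: the pickled Message is sent with `sendall` and `_clean_up` closes the socket immediately afterwards. A hedged usage sketch, assuming the Executor side is already listening:

```python
from lute.execution.ipc import Message, Party, SocketCommunicator

comm = SocketCommunicator(Party.TASK)          # connects via LUTE_SOCKET
comm.write(Message(contents={"hits": 1024}))   # send; the socket is then closed
# A fresh SocketCommunicator is needed for any subsequent write.
```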