Classes and utilities for communication between Executors and subprocesses.

Communicators manage message passing and parsing between subprocesses. They maintain a limited public interface of "read" and "write" operations. Behind this interface, the communication mechanism varies, from serialization across pipes to Unix sockets. All communicators pass a single object called a "Message", which contains an arbitrary "contents" field as well as an optional "signal" field.
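As a rough illustration of the Message shape described above, the sketch below uses a stand-in dataclass. The real Message class is defined elsewhere in LUTE and is not shown on this page, so the defaults here are an assumption based on the `Message()` and `Message(contents=..., signal=...)` calls in the source listings, and the signal name `"TASK_DONE"` is a hypothetical placeholder.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Message:
    """Stand-in for the real Message object (assumed shape, not the LUTE class)."""

    contents: Optional[Any] = None  # arbitrary payload
    signal: Optional[str] = None    # optional signal string


# An empty message, as returned when there is nothing to read.
empty = Message()

# A message that only carries contents.
progress = Message(contents="Processed 100 events")

# A message carrying both contents and a (hypothetical) signal name.
done = Message(contents={"n_events": 100}, signal="TASK_DONE")
```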

Classes:

Communicator

Bases: ABC

Source code in lute/execution/ipc.py
class Communicator(ABC):
    def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
        """Abstract Base Class for IPC Communicator objects.

        Args:
            party (Party): Which object (side/process) the Communicator is
                managing IPC for. I.e., is this the "Task" or "Executor" side.
            use_pickle (bool): Whether to serialize data using pickle prior to
                sending it.
        """
        self._party = party
        self._use_pickle = use_pickle
        self.desc = "Communicator abstract base class."

    @abstractmethod
    def read(self, proc: subprocess.Popen) -> Message:
        """Method for reading data through the communication mechanism."""
        ...

    @abstractmethod
    def write(self, msg: Message) -> None:
        """Method for sending data through the communication mechanism."""
        ...

    def __str__(self):
        name: str = str(type(self)).split("'")[1].split(".")[-1]
        return f"{name}: {self.desc}"

    def __repr__(self):
        return self.__str__()

    def __enter__(self) -> Self:
        return self

    def __exit__(self, *exc_info) -> None: ...

    def stage_communicator(self):
        """Alternative method for staging outside of context manager."""
        self.__enter__()

    def clear_communicator(self):
        """Alternative exit method outside of context manager."""
        self.__exit__()

__init__(party=Party.TASK, use_pickle=True)

Abstract Base Class for IPC Communicator objects.

Parameters:
  • party (Party, default: TASK ) –

    Which object (side/process) the Communicator is managing IPC for. I.e., is this the "Task" or "Executor" side.

  • use_pickle (bool, default: True ) –

    Whether to serialize data using pickle prior to sending it.

Source code in lute/execution/ipc.py
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
    """Abstract Base Class for IPC Communicator objects.

    Args:
        party (Party): Which object (side/process) the Communicator is
            managing IPC for. I.e., is this the "Task" or "Executor" side.
        use_pickle (bool): Whether to serialize data using pickle prior to
            sending it.
    """
    self._party = party
    self._use_pickle = use_pickle
    self.desc = "Communicator abstract base class."

clear_communicator()

Alternative exit method outside of context manager.

Source code in lute/execution/ipc.py
def clear_communicator(self):
    """Alternative exit method outside of context manager."""
    self.__exit__()

read(proc) abstractmethod

Method for reading data through the communication mechanism.

Source code in lute/execution/ipc.py
@abstractmethod
def read(self, proc: subprocess.Popen) -> Message:
    """Method for reading data through the communication mechanism."""
    ...

stage_communicator()

Alternative method for staging outside of context manager.

Source code in lute/execution/ipc.py
def stage_communicator(self):
    """Alternative method for staging outside of context manager."""
    self.__enter__()

write(msg) abstractmethod

Method for sending data through the communication mechanism.

Source code in lute/execution/ipc.py
@abstractmethod
def write(self, msg: Message) -> None:
    """Method for sending data through the communication mechanism."""
    ...
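To make the read/write contract and the two staging styles concrete, here is a minimal sketch of a concrete communicator backed by an in-memory queue. `SketchCommunicator` and `InMemoryCommunicator` are hypothetical names written for this example; they mirror the interface listed above rather than importing the real class. The sketch's `__exit__` accepts the exception arguments that a `with` statement passes.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Deque, Optional


class SketchCommunicator(ABC):
    """Stand-in mirroring the read/write/context-manager surface listed above."""

    @abstractmethod
    def read(self, proc: Optional[Any] = None) -> Optional[Any]: ...

    @abstractmethod
    def write(self, msg: Any) -> None: ...

    def __enter__(self) -> "SketchCommunicator":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        return None

    def stage_communicator(self) -> None:
        """Manual alternative to entering a `with` block."""
        self.__enter__()

    def clear_communicator(self) -> None:
        """Manual alternative to leaving a `with` block."""
        self.__exit__()


class InMemoryCommunicator(SketchCommunicator):
    """Hypothetical subclass: messages are kept in a local FIFO queue."""

    def __init__(self) -> None:
        self._queue: Deque[Any] = deque()

    def read(self, proc: Optional[Any] = None) -> Optional[Any]:
        # `proc` is accepted only for interface parity and is ignored here.
        return self._queue.popleft() if self._queue else None

    def write(self, msg: Any) -> None:
        self._queue.append(msg)


# Style 1: context manager.
with InMemoryCommunicator() as comm:
    comm.write("first")
    from_with = comm.read()

# Style 2: explicit staging and clearing.
manual = InMemoryCommunicator()
manual.stage_communicator()
manual.write("second")
from_manual = manual.read()
manual.clear_communicator()
```

Because `read` and `write` are abstract, instantiating the base class directly raises a `TypeError`; only subclasses that implement both can be created.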

Party

Bases: Enum

Identifier for which party (side/end) is using a communicator.

For some types of communication streams there may be different interfaces depending on which side of the communicator you are on. This enum is used by the communicator to determine which interface to use.

Source code in lute/execution/ipc.py
class Party(Enum):
    """Identifier for which party (side/end) is using a communicator.

    For some types of communication streams there may be different interfaces
    depending on which side of the communicator you are on. This enum is used
    by the communicator to determine which interface to use.
    """

    TASK = 0
    """
    The Task (client) side.
    """
    EXECUTOR = 1
    """
    The Executor (server) side.
    """

EXECUTOR = 1 class-attribute instance-attribute

The Executor (server) side.

TASK = 0 class-attribute instance-attribute

The Task (client) side.
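A small sketch of how a party value can select an interface. The enum is redefined locally so the example is self-contained, and `socket_role` is a hypothetical helper; the branch it models is the bind/listen versus connect split that `SocketCommunicator._create_socket` performs.

```python
from enum import Enum


class Party(Enum):
    """Local copy of the enum above, for a self-contained example."""

    TASK = 0
    EXECUTOR = 1


def socket_role(party: Party) -> str:
    """Hypothetical helper: describe the socket setup a party would use."""
    if party is Party.EXECUTOR:
        return "server: bind and listen"
    return "client: connect"


executor_role = socket_role(Party.EXECUTOR)
task_role = socket_role(Party.TASK)
```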

PipeCommunicator

Bases: Communicator

Provides communication through pipes over stderr/stdout.

The implementation of this communicator has reading and writing occurring on stderr and stdout. In general the Task will be writing while the Executor will be reading. stderr is used for sending signals.
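The stdout/stderr split can be sketched with the standard library alone. In the example below a child process plays the Task role, writing pickled contents to stdout and a signal string to stderr, while the parent plays the Executor role. The signal name `"TASK_DONE"` and the helper `run_child` are placeholders for illustration, and `Popen.communicate()` is used here for simplicity rather than the direct `proc.stderr.read()` calls shown in the source listing.

```python
import pickle
import subprocess
import sys

# Code run by the child ("Task" side): contents on stdout, signal on stderr.
CHILD_CODE = """
import pickle, sys
sys.stderr.buffer.write(b"TASK_DONE")
sys.stdout.buffer.write(pickle.dumps("42 events processed"))
sys.stderr.buffer.flush()
sys.stdout.buffer.flush()
"""


def run_child():
    """Spawn the child and return (contents, signal) as seen by the parent."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the child to exit.
    raw_contents, raw_signal = proc.communicate()
    contents = pickle.loads(raw_contents) if raw_contents else None
    signal = raw_signal.decode() if raw_signal else None
    return contents, signal
```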

Source code in lute/execution/ipc.py
class PipeCommunicator(Communicator):
    """Provides communication through pipes over stderr/stdout.

    The implementation of this communicator has reading and writing occurring
    on stderr and stdout. In general the `Task` will be writing while the
    `Executor` will be reading. `stderr` is used for sending signals.
    """

    def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
        """IPC through pipes.

        Arbitrary objects may be transmitted using pickle to serialize the data.
        If pickle is not used, only text (str) contents can be sent.

        Args:
            party (Party): Which object (side/process) the Communicator is
                managing IPC for. I.e., is this the "Task" or "Executor" side.
            use_pickle (bool): Whether to serialize data using Pickle prior to
                sending it. If False, data is assumed to be text.
        """
        super().__init__(party=party, use_pickle=use_pickle)
        self.desc = "Communicates through stderr and stdout using pickle."

    def read(self, proc: subprocess.Popen) -> Message:
        """Read from stdout and stderr.

        Args:
            proc (subprocess.Popen): The process to read from.

        Returns:
            msg (Message): The message read, containing contents and signal.
        """
        signal: Optional[str]
        contents: Optional[str]
        raw_signal: bytes = proc.stderr.read()
        raw_contents: bytes = proc.stdout.read()
        if raw_signal is not None:
            signal = raw_signal.decode()
        else:
            signal = raw_signal
        if raw_contents:
            if self._use_pickle:
                try:
                    contents = pickle.loads(raw_contents)
                except (pickle.UnpicklingError, ValueError, EOFError) as err:
                    logger.debug("PipeCommunicator (Executor) - Set _use_pickle=False")
                    self._use_pickle = False
                    contents = self._safe_unpickle_decode(raw_contents)
            else:
                try:
                    contents = raw_contents.decode()
                except UnicodeDecodeError as err:
                    logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True")
                    self._use_pickle = True
                    contents = self._safe_unpickle_decode(raw_contents)
        else:
            contents = None

        if signal and signal not in LUTE_SIGNALS:
            # Some tasks write on stderr
            # If the signal channel has "non-signal" info, add it to
            # contents
            if not contents:
                contents = f"({signal})"
            else:
                contents = f"{contents} ({signal})"
            signal = None

        return Message(contents=contents, signal=signal)

    def _safe_unpickle_decode(self, maybe_mixed: bytes) -> Optional[str]:
        """This method is used to unpickle and/or decode a bytes object.

        It attempts to handle cases where contents can be mixed, i.e., part of
        the message must be decoded and the other part unpickled. It handles
        only two-way splits. If there are more complex arrangements such as:
        <pickled>:<unpickled>:<pickled> etc, it will give up.

        The simpler two way splits are unlikely to occur in normal usage. They
        may arise when debugging if, e.g., `print` statements are mixed with the
        usage of the `_report_to_executor` method.

        Note that this method works because ONLY text data is assumed to be
        sent via the pipes. The method needs to be revised to handle non-text
        data if the `Task` is modified to also send that via PipeCommunicator.
        The use of pickle is supported to provide for this option if it is
        necessary. It may be deprecated in the future.

        Be careful when making changes. This method has seemingly redundant
        checks because unpickling will not throw an error if a full object can
        be retrieved. That is, the library will ignore extraneous bytes. This
        method attempts to retrieve that information if the pickled data comes
        first in the stream.

        Args:
            maybe_mixed (bytes): A bytes object which could require unpickling,
                decoding, or both.

        Returns:
            contents (Optional[str]): The unpickled/decoded contents if possible.
                Otherwise, None.
        """
        contents: Optional[str]
        try:
            contents = pickle.loads(maybe_mixed)
            repickled: bytes = pickle.dumps(contents)
            if len(repickled) < len(maybe_mixed):
                # Successful unpickling, but pickle stops even if there are more bytes
                try:
                    additional_data: str = maybe_mixed[len(repickled) :].decode()
                    contents = f"{contents}{additional_data}"
                except UnicodeDecodeError:
                    # Can't decode the bytes left by pickle, so they are lost
                    missing_bytes: int = len(maybe_mixed) - len(repickled)
                    logger.debug(
                        f"PipeCommunicator has truncated message. Unable to retrieve {missing_bytes} bytes."
                    )
        except (pickle.UnpicklingError, ValueError, EOFError) as err:
            # Pickle may also throw a ValueError, e.g. this bytes: b"Found! \n"
            # Pickle may also throw an EOFError, eg. this bytes: b"F0\n"
            try:
                contents = maybe_mixed.decode()
            except UnicodeDecodeError as err2:
                try:
                    contents = maybe_mixed[: err2.start].decode()
                    contents = f"{contents}{pickle.loads(maybe_mixed[err2.start:])}"
                except Exception as err3:
                    logger.debug(
                        f"PipeCommunicator unable to decode/parse data! {err3}"
                    )
                    contents = None
        return contents

    def write(self, msg: Message) -> None:
        """Write to stdout and stderr.

        The signal component is sent to `stderr` while the contents of the
        Message are sent to `stdout`.

        Args:
            msg (Message): The Message to send.
        """
        if self._use_pickle:
            signal: bytes
            if msg.signal:
                signal = msg.signal.encode()
            else:
                signal = b""

            contents: bytes = pickle.dumps(msg.contents)

            sys.stderr.buffer.write(signal)
            sys.stdout.buffer.write(contents)

            sys.stderr.buffer.flush()
            sys.stdout.buffer.flush()
        else:
            raw_signal: str
            if msg.signal:
                raw_signal = msg.signal
            else:
                raw_signal = ""

            raw_contents: str
            if isinstance(msg.contents, str):
                raw_contents = msg.contents
            elif msg.contents is None:
                raw_contents = ""
            else:
                raise ValueError(
                    f"Cannot send msg contents of type: {type(msg.contents)} when not using pickle!"
                )
            sys.stderr.write(raw_signal)
            sys.stdout.write(raw_contents)
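The `_safe_unpickle_decode` docstring above relies on the fact that unpickling stops at the end of the first complete object and ignores any bytes after it. The sketch below demonstrates that behaviour and the re-pickling trick used to find where the leftover bytes start. The payload strings are made up for the example.

```python
import pickle

# A pickled string followed by plain text, e.g. from a stray print statement.
pickled = pickle.dumps("result ready")
mixed = pickled + b" trailing debug print\n"

# pickle.loads returns the first object and silently ignores the rest.
obj = pickle.loads(mixed)

# Re-pickling the object tells us how many bytes the pickle consumed.
consumed = len(pickle.dumps(obj))

# Everything after that offset is the text that pickle skipped.
leftover = mixed[consumed:].decode()
combined = f"{obj}{leftover}"
```

This only works when re-pickling produces the same number of bytes as the original pickle, which holds here because both use the same default protocol in the same interpreter.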

__init__(party=Party.TASK, use_pickle=True)

IPC through pipes.

Arbitrary objects may be transmitted using pickle to serialize the data. If pickle is not used, only text (str) contents can be sent.

Parameters:
  • party (Party, default: TASK ) –

    Which object (side/process) the Communicator is managing IPC for. I.e., is this the "Task" or "Executor" side.

  • use_pickle (bool, default: True ) –

    Whether to serialize data using Pickle prior to sending it. If False, data is assumed to be text.

Source code in lute/execution/ipc.py
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
    """IPC through pipes.

    Arbitrary objects may be transmitted using pickle to serialize the data.
    If pickle is not used, only text (str) contents can be sent.

    Args:
        party (Party): Which object (side/process) the Communicator is
            managing IPC for. I.e., is this the "Task" or "Executor" side.
        use_pickle (bool): Whether to serialize data using Pickle prior to
            sending it. If False, data is assumed to be text.
    """
    super().__init__(party=party, use_pickle=use_pickle)
    self.desc = "Communicates through stderr and stdout using pickle."

read(proc)

Read from stdout and stderr.

Parameters:
  • proc (Popen) –

    The process to read from.

Returns:
  • msg( Message ) –

    The message read, containing contents and signal.

Source code in lute/execution/ipc.py
def read(self, proc: subprocess.Popen) -> Message:
    """Read from stdout and stderr.

    Args:
        proc (subprocess.Popen): The process to read from.

    Returns:
        msg (Message): The message read, containing contents and signal.
    """
    signal: Optional[str]
    contents: Optional[str]
    raw_signal: bytes = proc.stderr.read()
    raw_contents: bytes = proc.stdout.read()
    if raw_signal is not None:
        signal = raw_signal.decode()
    else:
        signal = raw_signal
    if raw_contents:
        if self._use_pickle:
            try:
                contents = pickle.loads(raw_contents)
            except (pickle.UnpicklingError, ValueError, EOFError) as err:
                logger.debug("PipeCommunicator (Executor) - Set _use_pickle=False")
                self._use_pickle = False
                contents = self._safe_unpickle_decode(raw_contents)
        else:
            try:
                contents = raw_contents.decode()
            except UnicodeDecodeError as err:
                logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True")
                self._use_pickle = True
                contents = self._safe_unpickle_decode(raw_contents)
    else:
        contents = None

    if signal and signal not in LUTE_SIGNALS:
        # Some tasks write on stderr
        # If the signal channel has "non-signal" info, add it to
        # contents
        if not contents:
            contents = f"({signal})"
        else:
            contents = f"{contents} ({signal})"
        signal = None

    return Message(contents=contents, signal=signal)
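The decision logic in `read` can be condensed into a small pure function for experimentation. This is a simplified sketch, not the method itself: `parse_streams` is a hypothetical helper, `KNOWN_SIGNALS` is a made-up stand-in for `LUTE_SIGNALS`, and the mixed pickle/text recovery of `_safe_unpickle_decode` is replaced by a plain decode.

```python
import pickle
from typing import Any, Optional, Tuple

# Hypothetical stand-in for LUTE_SIGNALS; the real set is defined elsewhere.
KNOWN_SIGNALS = {"TASK_DONE", "TASK_FAILED"}


def parse_streams(
    raw_contents: bytes, raw_signal: bytes
) -> Tuple[Optional[Any], Optional[str]]:
    """Turn raw stdout/stderr bytes into a (contents, signal) pair."""
    signal: Optional[str] = raw_signal.decode() if raw_signal else None

    contents: Optional[Any] = None
    if raw_contents:
        try:
            contents = pickle.loads(raw_contents)
        except (pickle.UnpicklingError, ValueError, EOFError):
            # Not a pickle: fall back to treating the bytes as text.
            contents = raw_contents.decode(errors="replace")

    if signal and signal not in KNOWN_SIGNALS:
        # stderr carried ordinary output, so fold it into the contents.
        contents = f"({signal})" if not contents else f"{contents} ({signal})"
        signal = None

    return contents, signal
```

For example, `parse_streams(pickle.dumps("ok"), b"warning: slow disk")` folds the stderr text into the contents and clears the signal, matching the behaviour of the final `if` block in the listing above.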

write(msg)

Write to stdout and stderr.

The signal component is sent to stderr while the contents of the Message are sent to stdout.

Parameters:
  • msg (Message) –

    The Message to send.

Source code in lute/execution/ipc.py
def write(self, msg: Message) -> None:
    """Write to stdout and stderr.

    The signal component is sent to `stderr` while the contents of the
    Message are sent to `stdout`.

    Args:
        msg (Message): The Message to send.
    """
    if self._use_pickle:
        signal: bytes
        if msg.signal:
            signal = msg.signal.encode()
        else:
            signal = b""

        contents: bytes = pickle.dumps(msg.contents)

        sys.stderr.buffer.write(signal)
        sys.stdout.buffer.write(contents)

        sys.stderr.buffer.flush()
        sys.stdout.buffer.flush()
    else:
        raw_signal: str
        if msg.signal:
            raw_signal = msg.signal
        else:
            raw_signal = ""

        raw_contents: str
        if isinstance(msg.contents, str):
            raw_contents = msg.contents
        elif msg.contents is None:
            raw_contents = ""
        else:
            raise ValueError(
                f"Cannot send msg contents of type: {type(msg.contents)} when not using pickle!"
            )
        sys.stderr.write(raw_signal)
        sys.stdout.write(raw_contents)
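To try the write-side rules without touching the real `sys.stdout` and `sys.stderr`, the sketch below sends the two components to arbitrary binary streams. `write_message` is a hypothetical helper written for this example; it follows the same rules as the listing above (pickle when enabled, otherwise only `str` or `None` contents), but encodes text itself because it writes to binary streams.

```python
import io
import pickle
from typing import Any, BinaryIO, Optional


def write_message(
    contents: Any,
    signal: Optional[str],
    out: BinaryIO,
    err: BinaryIO,
    use_pickle: bool = True,
) -> None:
    """Send the signal to `err` and the contents to `out`."""
    payload: bytes
    if use_pickle:
        payload = pickle.dumps(contents)
    elif isinstance(contents, str):
        payload = contents.encode()
    elif contents is None:
        payload = b""
    else:
        raise ValueError(
            f"Cannot send contents of type {type(contents)} when not using pickle!"
        )

    err.write((signal or "").encode())
    out.write(payload)
    err.flush()
    out.flush()


# In-memory streams stand in for the stdout/stderr pipes.
out_stream, err_stream = io.BytesIO(), io.BytesIO()
write_message({"a": 1}, "TASK_DONE", out_stream, err_stream)
```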

SocketCommunicator

Bases: Communicator

Provides communication over Unix sockets.

The path to the Unix socket is defined by the environment variable:

LUTE_SOCKET=/path/to/socket

This class assumes proper permissions and that the above environment variable has been defined. The Task is configured as what would commonly be referred to as the client, while the Executor is configured as the server. The Executor continuously monitors for connections and appends any Messages that are received to a queue. Read requests retrieve Messages from the queue. Task-side Communicators are fleeting, so they open a connection, send data, and immediately close and clean up.
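The client/server exchange described above can be sketched in a single process with the standard library. The function name `socket_round_trip`, the temporary socket path, and the dict payload (a stand-in for a Message) are assumptions made for this example; the real class takes its path from `LUTE_SOCKET`. The sketch requires a platform with Unix domain sockets.

```python
import os
import pickle
import select
import socket
import tempfile
from typing import Any, Optional


def socket_round_trip(payload: Any) -> Optional[Any]:
    """Send `payload` from a short-lived client to a listening server socket."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        socket_path = os.path.join(tmp_dir, "demo.sock")

        # "Executor" side: bind, listen, and never block on accept.
        server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        server.bind(socket_path)
        server.listen(1)
        server.setblocking(False)

        # "Task" side: connect, send everything, close immediately.
        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        client.connect(socket_path)
        client.sendall(pickle.dumps(payload))
        client.close()

        # Back on the server: wait briefly for a pending connection.
        received: Optional[Any] = None
        readable, _, _ = select.select([server], [], [], 1.0)
        if readable:
            connection, _ = server.accept()
            connection.setblocking(True)
            chunks = []
            while True:
                data = connection.recv(1024)
                if not data:  # empty bytes means the client closed
                    break
                chunks.append(data)
            connection.close()
            if chunks:
                received = pickle.loads(b"".join(chunks))

        server.close()
        return received
```

Closing the client socket is what ends the server's `recv` loop: an empty result from `recv` signals that the sender has finished.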

Source code in lute/execution/ipc.py
class SocketCommunicator(Communicator):
    """Provides communication over Unix sockets.

    The path to the Unix socket is defined by the environment variable:
                      `LUTE_SOCKET=/path/to/socket`
    This class assumes proper permissions and that the above environment
    variable has been defined. The `Task` is configured as what would commonly
    be referred to as the `client`, while the `Executor` is configured as the
    server. The Executor continuously monitors for connections and appends any
    Messages that are received to a queue. Read requests retrieve Messages from
    the queue. Task-side Communicators are fleeting so they open a connection,
    send data, and immediately close and clean up.
    """

    READ_TIMEOUT: float = 0.01
    """
    Maximum time to wait to retrieve data.
    """

    def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
        """IPC over a Unix socket.

        Unlike with the PipeCommunicator, pickle is always used to send data
        through the socket.

        Args:
            party (Party): Which object (side/process) the Communicator is
                managing IPC for. I.e., is this the "Task" or "Executor" side.

            use_pickle (bool): Whether to use pickle. Always True currently;
                passing False does not change behaviour.
        """
        super().__init__(party=party, use_pickle=use_pickle)
        self.desc: str = "Communicates through a Unix socket."

        self._data_socket: socket.socket = self._create_socket()
        self._data_socket.setblocking(0)

    def read(self, proc: subprocess.Popen) -> Message:
        """Read data from a socket.

        Socket(s) are continuously monitored, and read from when new data is
        available.

        Args:
            proc (subprocess.Popen): The process to read from. Provided for
                compatibility with other Communicator subtypes. Is ignored.

        Returns:
            msg (Message): The message read, containing contents and signal.
        """
        has_data, _, has_error = select.select(
            [self._data_socket],
            [],
            [self._data_socket],
            SocketCommunicator.READ_TIMEOUT,
        )

        msg: Message
        if has_data:
            connection, _ = has_data[0].accept()
            full_data: bytes = b""
            while True:
                data: bytes = connection.recv(1024)
                if data:
                    full_data += data
                else:
                    break
            msg = pickle.loads(full_data) if full_data else Message()
            connection.close()
        else:
            msg = Message()

        return msg

    def write(self, msg: Message) -> None:
        """Send a single Message.

        The entire Message (signal and contents) is serialized and sent through
        a connection over Unix socket.

        Args:
            msg (Message): The Message to send.
        """
        self._write_socket(msg)

    def _create_socket(self) -> socket.socket:
        """Returns a socket object.

        Returns:
            data_socket (socket.socket): Unix socket object.
        """
        socket_path: str
        try:
            socket_path = os.environ["LUTE_SOCKET"]
        except KeyError as err:
            import uuid
            import tempfile

            # Define a path and add it to the environment
            # Executor-side always created first, Task will use the same one
            socket_path = f"{tempfile.gettempdir()}/lute_{uuid.uuid4().hex}.sock"
            os.environ["LUTE_SOCKET"] = socket_path
            logger.debug(f"SocketCommunicator defines socket_path: {socket_path}")

        data_socket: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        if self._party == Party.EXECUTOR:
            if os.path.exists(socket_path):
                os.unlink(socket_path)
            data_socket.bind(socket_path)
            data_socket.listen(1)
        elif self._party == Party.TASK:
            data_socket.connect(socket_path)

        return data_socket

    def _write_socket(self, msg: Message) -> None:
        """Sends data over a socket from the 'client' (Task) side.

        Communicator objects on the Task-side are fleeting, so a socket is
        opened, data is sent, and then the connection and socket are cleaned up.
        """
        self._data_socket.sendall(pickle.dumps(msg))

        self._clean_up()

    def _clean_up(self) -> None:
        """Clean up connections."""
        # Check the object exists in case the Communicator is cleaned up before
        # opening any connections
        if hasattr(self, "_data_socket"):
            socket_path: str = self._data_socket.getsockname()
            self._data_socket.close()

            if self._party == Party.EXECUTOR:
                os.unlink(socket_path)

    @property
    def socket_path(self) -> str:
        socket_path: str = self._data_socket.getsockname()
        return socket_path

    def __exit__(self, *exc_info) -> None:
        self._clean_up()

READ_TIMEOUT: float = 0.01 class-attribute instance-attribute

Maximum time to wait to retrieve data.
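The effect of a short read timeout can be seen with `select.select` on a connected socket pair: when nothing is pending, the call returns empty lists after the timeout instead of blocking. The helper name `poll` is made up for this sketch, and its default mirrors the 0.01 second value above.

```python
import select
import socket


def poll(sock: socket.socket, timeout: float = 0.01) -> bool:
    """Return True if `sock` has data ready to read within `timeout` seconds."""
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


left, right = socket.socketpair()

# Nothing has been sent yet, so the poll times out.
nothing_yet = poll(right)

# Once data is written on one end, the other end becomes readable.
left.sendall(b"ping")
has_data = poll(right)

left.close()
right.close()
```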

__init__(party=Party.TASK, use_pickle=True)

IPC over a Unix socket.

Unlike with the PipeCommunicator, pickle is always used to send data through the socket.

Parameters:
  • party (Party, default: TASK ) –

    Which object (side/process) the Communicator is managing IPC for. I.e., is this the "Task" or "Executor" side.

  • use_pickle (bool, default: True ) –

    Whether to use pickle. Always True currently; passing False does not change behaviour.

Source code in lute/execution/ipc.py
def __init__(self, party: Party = Party.TASK, use_pickle: bool = True) -> None:
    """IPC over a Unix socket.

    Unlike with the PipeCommunicator, pickle is always used to send data
    through the socket.

    Args:
        party (Party): Which object (side/process) the Communicator is
            managing IPC for. I.e., is this the "Task" or "Executor" side.

        use_pickle (bool): Whether to use pickle. Always True currently;
            passing False does not change behaviour.
    """
    super().__init__(party=party, use_pickle=use_pickle)
    self.desc: str = "Communicates through a Unix socket."

    self._data_socket: socket.socket = self._create_socket()
    self._data_socket.setblocking(0)

read(proc)

Read data from a socket.

Socket(s) are continuously monitored, and read from when new data is available.

Parameters:
  • proc (Popen) –

    The process to read from. Ignored; provided only for compatibility with other Communicator subtypes.

Returns:
  • msg( Message ) –

    The message read, containing contents and signal.

Source code in lute/execution/ipc.py
def read(self, proc: subprocess.Popen) -> Message:
    """Read data from a socket.

    Socket(s) are continuously monitored, and read from when new data is
    available.

    Args:
        proc (subprocess.Popen): The process to read from. Provided for
            compatibility with other Communicator subtypes. Is ignored.

    Returns:
        msg (Message): The message read, containing contents and signal.
    """
    has_data, _, has_error = select.select(
        [self._data_socket],
        [],
        [self._data_socket],
        SocketCommunicator.READ_TIMEOUT,
    )

    msg: Message
    if has_data:
        connection, _ = has_data[0].accept()
        full_data: bytes = b""
        while True:
            data: bytes = connection.recv(1024)
            if data:
                full_data += data
            else:
                break
        msg = pickle.loads(full_data) if full_data else Message()
        connection.close()
    else:
        msg = Message()

    return msg

write(msg)

Send a single Message.

The entire Message (signal and contents) is serialized and sent through a connection over a Unix socket.

Parameters:
  • msg (Message) –

    The Message to send.

Source code in lute/execution/ipc.py
def write(self, msg: Message) -> None:
    """Send a single Message.

    The entire Message (signal and contents) is serialized and sent through
    a connection over Unix socket.

    Args:
        msg (Message): The Message to send.
    """
    self._write_socket(msg)