Skip to content

sfx_index

Classes for indexing tasks in SFX.

Classes:

Name Description
ConcatenateStreamFiles

Task that merges multiple stream files into a single file.

ConcatenateStreamFiles

Bases: Task

Task that merges stream files located within a directory tree.

Source code in lute/tasks/sfx_index.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class ConcatenateStreamFiles(Task):
    """
    Task that merges stream files located within a directory tree.

    The directory given by the `in_file` task parameter is searched
    recursively for files matching `*{tag}*.stream`. The matched files are
    concatenated byte-for-byte, in sorted path order, into the file named by
    the `out_file` task parameter.
    """

    def __init__(self, *, params: ConcatenateStreamFilesParameters) -> None:
        """Initialize the Task, forwarding `params` to the base class."""
        super().__init__(params=params)

    def _run(self) -> None:
        """Collect the matching stream files and concatenate them.

        A message listing the input files and the output file is sent through
        `_report_to_executor` before any data is written. If no file matches,
        the output file is still created (empty).
        """
        self._task_parameters = cast(
            ConcatenateStreamFilesParameters, self._task_parameters
        )
        stream_file_path: Path = Path(self._task_parameters.in_file)

        # The output may live inside the searched tree and match the pattern
        # itself (e.g. when the Task is re-run). It must never be treated as
        # an input: opening it with "wb" truncates it, and it would then be
        # copied into itself.
        out_path: Path = Path(self._task_parameters.out_file).resolve()

        # rglob yields entries in a filesystem-dependent order; sorting makes
        # the content of the merged file reproducible between runs.
        stream_file_list: List[Path] = sorted(
            stream_file
            for stream_file in stream_file_path.rglob(
                f"*{self._task_parameters.tag}*.stream"
            )
            if stream_file.resolve() != out_path
        )

        processed_file_list = [str(stream_file) for stream_file in stream_file_list]

        msg: Message = Message(
            contents=f"Merging following stream files: {processed_file_list} into "
            f"{self._task_parameters.out_file}",
        )
        self._report_to_executor(msg)

        wfd: BinaryIO
        with open(self._task_parameters.out_file, "wb") as wfd:
            infile: Path
            for infile in stream_file_list:
                fd: BinaryIO
                # Binary mode on both sides: the data is copied verbatim with
                # no decoding or newline translation.
                with open(infile, "rb") as fd:
                    shutil.copyfileobj(fd, wfd)