Bases: Task
Task that merges stream files located within a directory tree.
Source code in lute/tasks/sfx_index.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class ConcatenateStreamFiles(Task):
    """
    Task that merges stream files located within a directory tree.

    The directory given by the `in_file` task parameter is searched
    recursively for files matching `*{tag}*.stream`, and the bytes of every
    match are appended, in sorted path order, to the file named by the
    `out_file` task parameter.
    """

    def __init__(self, *, params: ConcatenateStreamFilesParameters) -> None:
        """
        Initialize the task.

        Args:
            params: Task parameters; `_run` reads its `in_file`, `tag` and
                `out_file` attributes.
        """
        super().__init__(params=params)

    def _run(self) -> None:
        """
        Locate the matching stream files and concatenate them into `out_file`.

        The list of files to be merged is reported through
        `_report_to_executor` before any data is written. The output file is
        opened in binary write mode, so existing content is overwritten.
        """
        self._task_parameters = cast(
            ConcatenateStreamFilesParameters, self._task_parameters
        )

        stream_file_path: Path = Path(self._task_parameters.in_file)
        resolved_out_file: Path = Path(self._task_parameters.out_file).resolve()

        # rglob yields entries in arbitrary (filesystem-dependent) order, so
        # sort to make the merged output reproducible. Skip anything that is
        # not a regular file, and skip the output file itself: if it lives
        # under the search root and matches the pattern (e.g. on a re-run) it
        # would otherwise be truncated and then read while being written.
        stream_file_list: List[Path] = sorted(
            stream_file
            for stream_file in stream_file_path.rglob(
                f"*{self._task_parameters.tag}*.stream"
            )
            if stream_file.is_file()
            and stream_file.resolve() != resolved_out_file
        )

        processed_file_list = [str(stream_file) for stream_file in stream_file_list]

        msg: Message = Message(
            contents=f"Merging following stream files: {processed_file_list} into "
            f"{self._task_parameters.out_file}",
        )
        self._report_to_executor(msg)

        wfd: BinaryIO
        with open(self._task_parameters.out_file, "wb") as wfd:
            infile: Path
            for infile in stream_file_list:
                fd: BinaryIO
                # Binary chunked copy; avoids loading whole stream files
                # into memory and skips any text decoding.
                with open(infile, "rb") as fd:
                    shutil.copyfileobj(fd, wfd)
|