Source code for sio3pack.workflow.execution.stream

from enum import Enum

from sio3pack.exceptions import ParsingFailedOn, WorkflowParsingError
from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager
from sio3pack.workflow.object import Object, ObjectsManager


[docs] class StreamType(Enum): """ Enum representing the type of stream. """ FILE = "file" NULL = "null" OBJECT_READ = "object_read" OBJECT_WRITE = "object_write" PIPE_READ = "pipe_read" PIPE_WRITE = "pipe_write"
[docs] class FileMode(Enum): """ Enum representing the mode of a file stream. """ READ = "read" READ_WRITE = "read_write" READ_WRITE_APPEND = "read_write_append" READ_WRITE_TRUNCATE = "read_write_truncate" WRITE = "write" WRITE_APPEND = "write_append" WRITE_TRUNCATE = "write_truncate"
[docs] class Stream: """ Base class for all streams. :param StreamType type: The type of the stream. """ _required_keys = ["type"] def __init__(self, type: StreamType): """ Initialize the stream. :param StreamType type: The type of the stream. """ self.type = type @classmethod def _check_required_keys(cls, data): for key in cls._required_keys: if key not in data: raise WorkflowParsingError( f"Failed parsing stream.", ParsingFailedOn.STREAM, f"Missing key '{key}' in stream data.", )
[docs] @classmethod def from_json(cls, data: dict, objects_manager: ObjectsManager, filesystem_manager: FilesystemManager) -> "Stream": """ Create a stream from a JSON-serializable dictionary. :param dict data: The JSON-serializable dictionary to create the stream from. :param ObjectsManager objects_manager: The objects manager. :param FilesystemManager filesystem_manager: The filesystem manager. """ type = StreamType(data["type"]) if type == StreamType.FILE: return FileStream.from_json(filesystem_manager, data) elif type == StreamType.NULL: return NullStream.from_json(data) elif type in (StreamType.OBJECT_READ, StreamType.OBJECT_WRITE): return ObjectStream.from_json(data, objects_manager) elif type in (StreamType.PIPE_READ, StreamType.PIPE_WRITE): return PipeStream.from_json(data) else: raise ValueError(f"Unknown stream type: {type}")
[docs] def to_json(self) -> dict: """ Convert the stream to a JSON-serializable dictionary. """ raise NotImplementedError("Subclasses must implement to_json method")
[docs] def replace_templates(self, replacements: dict[str, str]): """ Replace strings in the stream with the given replacements. This method is a no-op for streams that do not support template replacement. """ pass
[docs] class FileStream(Stream): """ Class representing a file stream. A file will be opened and passed to the process as a file descriptor. :param Filesystem filesystem: The filesystem to use. :param str path: The path to the file. :param FileMode mode: The mode to open the file in. """ _required_keys = ["type", "filesystem", "path", "mode"] def __init__(self, filesystem: Filesystem, path: str, mode: FileMode): super().__init__(StreamType.FILE) self.filesystem = filesystem self.path = path self.mode = mode
[docs] @classmethod def from_json(cls, filesystem_manager: FilesystemManager, data: dict) -> "FileStream": """ Create a file stream from a JSON-serializable dictionary. :param FilesystemManager filesystem_manager: The filesystem manager. :param dict data: The JSON-serializable dictionary to create the file stream from. """ cls._check_required_keys(data) try: filesystem = filesystem_manager.get_by_id(data["filesystem"]) except KeyError: raise WorkflowParsingError( "Failed parsing file stream", ParsingFailedOn.STREAM, f"Invalid filesystem ID {data['filesystem']} in file stream data.", ) return cls( filesystem, data["path"], FileMode(data["mode"]), )
[docs] def to_json(self) -> dict: """ Convert the file stream to a dictionary. :return: The dictionary representation of the file stream. """ return { "type": self.type.value, "filesystem": self.filesystem.id, "path": self.path, "mode": self.mode.value, }
[docs] class NullStream(Stream): """ Class representing a null stream. """ def __init__(self): super().__init__(StreamType.NULL)
[docs] @classmethod def from_json(cls, data: dict) -> "NullStream": """ Create a null stream from a JSON-serializable dictionary. :param dict data: The JSON-serializable dictionary to create the null stream from. """ cls._check_required_keys(data) return cls()
[docs] def to_json(self) -> dict: """ Convert the null stream to a dictionary. :return: The dictionary representation of the null stream. """ return { "type": self.type.value, }
[docs] class ObjectStream(Stream): """ A base class for object streams. An object stream is a stream that reads or writes to an object via a file descriptor. :param StreamType type: The type of the stream. :param Object object: The object to use. """ _required_keys = ["type", "handle"] def __init__(self, type: StreamType, object: Object): if type not in (StreamType.OBJECT_READ, StreamType.OBJECT_WRITE): raise ValueError("Invalid stream type for ObjectStream") super().__init__(type) self.object = object
[docs] @classmethod def from_json(cls, data: dict, objects_manager: ObjectsManager) -> "ObjectStream": """ Create an object stream from a JSON-serializable dictionary. :param dict data: The JSON-serializable dictionary to create the object stream from. """ cls._check_required_keys(data) cl = ObjectReadStream if StreamType(data["type"]) == StreamType.OBJECT_READ else ObjectWriteStream return cl( objects_manager.get_or_create_object(data["handle"]), )
[docs] def to_json(self) -> dict: """ Convert the object stream to a dictionary. """ return { "type": self.type.value, "handle": self.object.handle, }
[docs] def replace_templates(self, replacements: dict[str, str]): """ Replace strings in the object stream with the given replacements. """ self.object.replace_templates(replacements) super().replace_templates(replacements)
[docs] class ObjectReadStream(ObjectStream): """ Class representing an object read stream. An object read stream is a stream that reads from an object via a file descriptor. :param Object object: The object to read from. """ def __init__(self, object: Object): """ Initialize the object read stream. :param Object object: The object to read from. """ super().__init__(StreamType.OBJECT_READ, object)
[docs] class ObjectWriteStream(ObjectStream): """ Class representing an object write stream. An object write stream is a stream that writes to an object via a file descriptor. :param Object object: The object to write to. """ def __init__(self, object: Object): """ Initialize the object write stream. :param Object object: The object to write to. """ super().__init__(StreamType.OBJECT_WRITE, object)
[docs] class PipeStream(Stream): """ A base class for pipe streams. A pipe stream is a stream that reads or writes to a pipe via a file descriptor. :param StreamType type: The type of the stream. :param int pipe_index: The index of the pipe. """ _required_keys = ["type", "pipe"] def __init__(self, type: StreamType, pipe_index: int): """ Initialize the pipe stream. :param StreamType type: The type of the stream. :param int pipe_index: The index of the pipe. """ if type not in (StreamType.PIPE_READ, StreamType.PIPE_WRITE): raise ValueError("Invalid stream type for PipeStream") super().__init__(type) self.pipe_index = pipe_index
[docs] @classmethod def from_json(cls, data: dict) -> "PipeStream": """ Create a pipe stream from a JSON-serializable dictionary. :param dict data: The JSON-serializable dictionary to create the pipe stream from. """ cls._check_required_keys(data) cl = PipeReadStream if StreamType(data["type"]) == StreamType.PIPE_READ else PipeWriteStream return cl(data["pipe"])
[docs] def to_json(self) -> dict: """ Convert the pipe stream to a dictionary. """ return { "type": self.type.value, "pipe": self.pipe_index, }
[docs] class PipeReadStream(PipeStream): """ Class representing a pipe read stream. A pipe read stream is a stream that reads from a pipe via a file descriptor. :param int pipe_index: The index of the pipe. """ def __init__(self, pipe_index: int): """ Initialize the pipe read stream. :param int pipe_index: The index of the pipe. """ super().__init__(StreamType.PIPE_READ, pipe_index)
[docs] class PipeWriteStream(PipeStream): """ Class representing a pipe write stream. A pipe write stream is a stream that writes to a pipe via a file descriptor. :param int pipe_index: The index of the pipe. """ def __init__(self, pipe_index: int): """ Initialize the pipe write stream. :param int pipe_index: The index of the pipe. """ super().__init__(StreamType.PIPE_WRITE, pipe_index)