Source code for sio3pack.workflow.execution.process

from sio3pack.exceptions import ParsingFailedOn, WorkflowParsingError
from sio3pack.workflow.execution.descriptors import DescriptorManager
from sio3pack.workflow.execution.mount_namespace import MountNamespace
from sio3pack.workflow.execution.resource_group import ResourceGroup


[docs] class Process: """ A class to represent a process in a workflow. :param Workflow workflow: The workflow the process belongs to. :param Executiontask task: The task which the process belongs to. :param list[str] arguments: Executable arguments for the process. :param dict[str, str] environment: Environment variables for the process. :param str image: The image of the process, which can be a Docker image or similar. :param MountNamespace mount_namespace: The mount namespace to use for the process. :param ResourceGroup resource_group: The resource group of the process. :param str working_directory: The working directory of the process. :param int pid_namespace: The PID namespace of the process. :param list[int] start_after: The processes that must be finished before this process starts. """ def __init__( self, workflow: "Workflow", task: "ExecutionTask", arguments: list[str] = None, environment: dict = None, image: str = "", mount_namespace: MountNamespace = None, resource_group: ResourceGroup = None, pid_namespace: int = 0, working_directory: str = "/", start_after: list[int] = None, ): """ Represent a process. :param arguments: The arguments of the process. :param environment: The environment of the process. :param image: The image of the process. :param mount_namespace: The mount namespace of the process. :param resource_group: The resource group of the process. :param working_directory: The working directory of the process. :param pid_namespace: The PID namespace of the process. :param task: The task of the process. :param workflow: The workflow of the process. :param start_after: The processes that must be finished before this process starts. """ self.arguments = arguments or [] self.environment = environment or {} self.image = image self.mount_namespace = mount_namespace self.resource_group = resource_group self.pid_namespace = pid_namespace self.working_directory = working_directory self.task = task self.workflow = workflow self.descriptor_manager = DescriptorManager(workflow.objects_manager, task.filesystem_manager) self.start_after = start_after or []
[docs] def to_json(self) -> dict: """ Convert the process to a JSON-serializable dictionary. """ return { "arguments": self.arguments, "environment": [f"{key}={value}" for key, value in self.environment.items()], "image": self.image, "mount_namespace": self.mount_namespace.id, "resource_group": self.resource_group.id, "pid_namespace": self.pid_namespace, "working_directory": self.working_directory, "descriptors": self.descriptor_manager.to_json(), "start_after": self.start_after, }
[docs] @classmethod def from_json(cls, data: dict, workflow: "Workflow", task: "Task"): """ Create a new process from a dictionary. :param data: The dictionary to create the process from. :param workflow: The workflow the process belongs to. :param task: The task the process belongs to. """ for key, type in [ ("arguments", list), ("environment", list), ("image", str), ("mount_namespace", int), ("resource_group", int), ("pid_namespace", int), ("working_directory", str), ("descriptors", dict), ]: if key not in data: raise WorkflowParsingError( f"Failed parsing process.", ParsingFailedOn.PROCESS, f"Missing key '{key}' in process data.", ) if not isinstance(data[key], type): raise WorkflowParsingError( f"Failed parsing process.", ParsingFailedOn.PROCESS, f"Key '{key}' in process data is not of type {type.__name__}.", ) if "start_after" in data and not isinstance(data["start_after"], list): raise WorkflowParsingError( f"Failed parsing process.", ParsingFailedOn.PROCESS, "Key 'start_after' in process data is not of type list.", ) env = {} for var in data["environment"]: if "=" not in var: raise WorkflowParsingError( f"Failed parsing process.", ParsingFailedOn.PROCESS, f"Environment variable '{var}' does not contain an '=' sign.", ) key, value = var.split("=", 1) env[key] = value try: mount_namespace = task.mountnamespace_manager.get_by_id(data["mount_namespace"]) except IndexError: raise WorkflowParsingError( f"Failed parsing process.", ParsingFailedOn.PROCESS, f"Mount namespace with ID {data['mount_namespace']} not found.", ) try: resource_group = task.resource_group_manager.get_by_id(data["resource_group"]) except IndexError: raise WorkflowParsingError( f"Failed parsing process.", ParsingFailedOn.PROCESS, f"Resource group with ID {data['resource_group']} not found.", ) process = cls( workflow, task, data["arguments"], env, data["image"], mount_namespace, resource_group, data["pid_namespace"], data["working_directory"], data.get("start_after", []), ) process.descriptor_manager.from_json(data["descriptors"]) return process
[docs] def replace_templates(self, replacements: dict[str, str]): """ Replace strings in the process with the given replacements. :param dict[str, str] replacements: The replacements to make. """ for key, value in replacements.items(): if key in self.image: self.image = self.image.replace(key, value) if key in self.arguments: self.arguments = [arg.replace(key, value) for arg in self.arguments] for _, desc in self.descriptor_manager.items(): desc.replace_templates(replacements)