import re
from sio3pack.exceptions import ParsingFailedOn, WorkflowParsingError
from sio3pack.workflow.execution.channels import Channel
from sio3pack.workflow.execution.filesystems import Filesystem, FilesystemManager
from sio3pack.workflow.execution.mount_namespace import MountNamespace, MountNamespaceManager
from sio3pack.workflow.execution.process import Process
from sio3pack.workflow.execution.resource_group import ResourceGroup, ResourceGroupManager
from sio3pack.workflow.object import Object
[docs]
class Task:
"""
Base class for a task.
"""
[docs]
@classmethod
def from_json(cls, data: dict, workflow: "Workflow"):
"""
Create a new task from a dictionary.
:param dict data: The dictionary to create the task from.
:param Workflow workflow: The workflow the task belongs to.
"""
if "type" not in data:
raise WorkflowParsingError(
"Parsing task failed.",
ParsingFailedOn.TASK,
"Missing key 'type'.",
{"type": "missing_key"},
)
if data["type"] == "execution":
return ExecutionTask.from_json(data, workflow)
elif data["type"] == "script":
return ScriptTask.from_json(data, workflow)
else:
raise WorkflowParsingError(
"Parsing task failed.",
ParsingFailedOn.TASK,
f"Unknown task type '{data['type']}'.",
{"type": "wrong_type"},
)
[docs]
def to_json(self, reg_map: dict[str, int] = None) -> dict:
"""
Convert the task to a dictionary.
:param reg_map: A mapping of register names to register numbers.
:return dict: The dictionary representation of the task.
"""
raise NotImplementedError("Subclasses must implement this method.")
[docs]
def replace_templates(self, replacements: dict[str, str]):
"""
Replace strings in the task with the given replacements.
:param replacements: The replacements to make.
"""
raise NotImplementedError("Subclasses must implement this method.")
[docs]
class ExecutionTask(Task):
"""
A task that executes a program.
:param name: The name of the task.
:param Workflow workflow: The workflow the task belongs to.
:param bool exclusive: Whether the task is exclusive.
:param int hard_time_limit: The hard time limit.
:param int extra_limit: If set, the hard_time_limit for the task will be the maximum time limit of all resource groups plus this value.
:param int | str output_register: The output register of the task.
:param int pid_namespaces: The number of PID namespaces.
:param list[Process] processes: The processes of the task.
:param int pipes: The number of pipes available to the task.
:param list[Channel] channels: Configuration of the channels for the task.
"""
def __init__(
self,
name: str,
workflow: "Workflow",
exclusive: bool = False,
hard_time_limit: int = None,
extra_limit: int = None,
output_register: int | str = None,
pid_namespaces: int = 1,
processes: list[Process] = None,
pipes: int = 0,
channels: list[Channel] = None,
):
"""
Create a new execution task.
:param name: The name of the task.
:param workflow: The workflow the task belongs to.
:param exclusive: Whether the task is exclusive.
:param hard_time_limit: The hard time limit.
:param extra_limit: If set, the hard_time_limit for the task will be the maximum time limit of all resource groups
plus this value.
:param output_register: The output register of the task.
:param pid_namespaces: The number of PID namespaces.
:param processes: The processes of the task.
:param pipes: The number of pipes.
:param channels: Configuration of the channels for the task.
"""
self.name = name
self.workflow = workflow
self.exclusive = exclusive
if hard_time_limit is not None:
self.hard_time_limit = hard_time_limit
self.output_register = output_register
self.pid_namespaces = pid_namespaces
self.processes = processes or []
self.pipes = pipes
self.extra_limit = extra_limit
self.channels = channels or []
self.filesystem_manager = FilesystemManager(self)
self.mountnamespace_manager = MountNamespaceManager(self, self.filesystem_manager)
self.resource_group_manager = ResourceGroupManager(self)
[docs]
@classmethod
def from_json(cls, data: dict, workflow: "Workflow"):
"""
Create a new execution task from a dictionary.
:param dict data: The dictionary to create the task from.
:param Workflow workflow: The workflow the task belongs to.
"""
channels = []
for i, channel in enumerate(data.get("channels", [])):
try:
channels.append(Channel.from_json(channel))
except WorkflowParsingError as e:
e.set_data("channel_index", str(i))
raise e
for key in [
"name",
"exclusive",
"pid_namespaces",
"pipes",
"output_register",
"filesystems",
"mount_namespaces",
"resource_groups",
"processes",
]:
if key not in data:
raise WorkflowParsingError(
"Parsing task failed.",
ParsingFailedOn.TASK,
f"Missing key '{key}'.",
{"type": "missing_key"},
)
for key in [("hard_time_limit", int), ("exclusive", bool), ("pid_namespaces", int), ("pipes", int)]:
if key[0] in data and not isinstance(data[key[0]], key[1]):
raise WorkflowParsingError(
"Parsing task failed.",
ParsingFailedOn.TASK,
f"Key '{key[0]}' must be of type {key[1].__name__}.",
{"type": "wrong_type"},
)
task = cls(
data["name"],
workflow,
data["exclusive"],
data.get("hard_time_limit"),
output_register=data["output_register"],
pid_namespaces=data["pid_namespaces"],
pipes=int(data["pipes"]),
channels=channels,
)
task.filesystem_manager.from_json(data["filesystems"], workflow)
task.mountnamespace_manager.from_json(data["mount_namespaces"])
task.resource_group_manager.from_json(data["resource_groups"])
task.processes = []
for i, process in enumerate(data.get("processes")):
try:
task.processes.append(Process.from_json(process, workflow, task))
except WorkflowParsingError as e:
e.set_data("process_index", str(i))
raise e
return task
[docs]
def to_json(self, reg_map: dict[str, int] = None) -> dict:
"""
Convert the task to a dictionary.
:param reg_map: A mapping of register names to register numbers.
:return dict: The dictionary representation of the task.
"""
hard_time_limit = getattr(self, "hard_time_limit", 0)
if self.extra_limit is not None:
hard_time_limit = 0
for rg in self.resource_group_manager.all():
hard_time_limit = max(hard_time_limit, rg.time_limit)
hard_time_limit += self.extra_limit
if reg_map:
self.output_register = reg_map.get(self.output_register, self.output_register)
res = {
"name": self.name,
"type": "execution",
"channels": [channel.to_json() for channel in self.channels],
"exclusive": self.exclusive,
"hard_time_limit": hard_time_limit,
"output_register": self.output_register,
"pid_namespaces": self.pid_namespaces,
"filesystems": self.filesystem_manager.to_json(),
"mount_namespaces": self.mountnamespace_manager.to_json(),
"pipes": self.pipes,
"resource_groups": self.resource_group_manager.to_json(),
"processes": [process.to_json() for process in self.processes],
}
return res
[docs]
def add_filesystem(self, filesystem: Filesystem):
"""
Add a filesystem to the task.
:param Filesystem filesystem: The filesystem to add.
"""
self.filesystem_manager.add(filesystem)
[docs]
def add_mount_namespace(self, mount_namespace: MountNamespace):
"""
Add a mount namespace to the task.
:param MountNamespace mount_namespace: The mount namespace to add.
"""
self.mountnamespace_manager.add(mount_namespace)
[docs]
def add_resource_group(self, resource_group: ResourceGroup):
"""
Add a resource group to the task.
:param ResourceGroup resource_group: The resource group to add.
"""
self.resource_group_manager.add(resource_group)
[docs]
def add_process(self, process: Process):
"""
Add a process to the task.
:param Process process: The process to add.
"""
self.processes.append(process)
[docs]
def replace_templates(self, replacements: dict[str, str]):
"""
Replace strings in the task with the given replacements.
:param replacements: The replacements to make.
"""
for process in self.processes:
process.replace_templates(replacements)
self.filesystem_manager.replace_templates(replacements)
for key, value in replacements.items():
if key in self.name:
self.name = self.name.replace(key, value)
if isinstance(self.output_register, str) and key in self.output_register:
self.output_register = self.output_register.replace(key, value)
[docs]
class ScriptTask(Task):
"""
A task that runs a script.
:param str name: The name of the task.
:param Workflow workflow: The workflow the task belongs to.
:param bool reactive: Whether the task is reactive.
:param list[int | str] input_registers: The input registers of the task.
:param list[int | str] output_registers: The output registers of the task.
:param list[Object] objects: The objects the task uses.
:param str script: The script to run.
"""
def __init__(
self,
name: str,
workflow: "Workflow",
reactive: bool = False,
input_registers: list[int | str] = None,
output_registers: list[int | str] = None,
objects: list[Object] = None,
script: str = None,
):
"""
Create a new script task.
:param name: The name of the task.
:param workflow: The workflow the task belongs to.
:param reactive: Whether the task is reactive.
:param input_registers: The input registers of the task.
:param output_registers: The output registers of the task.
:param objects: The objects the task uses.
:param script: The script to run.
"""
self.name = name
self.workflow = workflow
self.reactive = reactive
self.input_registers = input_registers or []
self.output_registers = output_registers or []
self.objects = objects or []
self.script = script
def __str__(self):
return f"<ScriptTask {self.name} reactive={self.reactive}>"
[docs]
@classmethod
def from_json(cls, data: dict, workflow: "Workflow"):
"""
Create a new script task from a dictionary.
:param data: The dictionary to create the task from.
:param workflow: The workflow the task belongs to.
"""
for key, type in [
("name", str),
("reactive", bool),
("input_registers", list),
("output_registers", list),
("script", str),
]:
if key not in data:
raise WorkflowParsingError(
"Parsing task failed.",
ParsingFailedOn.TASK,
f"Missing key '{key}'.",
{"type": "missing_key"},
)
if not isinstance(data[key], type):
raise WorkflowParsingError(
"Parsing task failed.",
ParsingFailedOn.TASK,
f"Key '{key}' must be of type {type.__name__}.",
{"type": "wrong_type"},
)
return cls(
data["name"],
workflow,
data["reactive"],
data["input_registers"],
data["output_registers"],
[workflow.objects_manager.get_or_create_object(obj) for obj in data.get("objects", [])],
data["script"],
)
[docs]
def to_json(self, reg_map: dict[str, int] = None) -> dict:
"""
Convert the task to a dictionary.
:param reg_map: A mapping of register names to register numbers.
:return: The dictionary representation of the task.
"""
if reg_map:
self.input_registers = [reg_map.get(reg, reg) for reg in self.input_registers]
self.output_registers = [reg_map.get(reg, reg) for reg in self.output_registers]
# Now, replace the register names in the script. Since we want this to not be slow
# let's use regex.
reg_pattern = r"<(r:[a-zA-Z0-9_]+)>"
obs_pattern = r"<(obsreg:[a-zA-Z0-9_]+)>"
self.script = re.sub(reg_pattern, lambda m: f"{reg_map.get(m.group(1), m.group(1))}", self.script)
self.script = re.sub(obs_pattern, lambda m: f"{reg_map.get(m.group(1), m.group(1))}", self.script)
res = {
"name": self.name,
"type": "script",
"reactive": self.reactive,
"input_registers": self.input_registers,
"output_registers": self.output_registers,
"script": self.script,
}
if self.objects:
res["objects"] = [obj.handle for obj in self.objects]
return res
[docs]
def replace_templates(self, replacements: dict[str, str]):
"""
Replace strings in the task with the given replacements.
:param replacements: The replacements to make.
"""
for obj in self.objects:
obj.replace_templates(replacements)
for key, value in replacements.items():
if key in self.name:
self.name = self.name.replace(key, value)
if key in self.script:
self.script = self.script.replace(key, value)
new_inputs = []
for reg in self.input_registers:
if isinstance(reg, str) and key in reg:
# If the replacement is a list, extend the new_inputs list with the values
if isinstance(value, list):
new_inputs.extend(value)
else:
new_inputs.append(reg.replace(key, value))
else:
new_inputs.append(reg)
self.input_registers = new_inputs
new_outputs = []
for reg in self.output_registers:
if isinstance(reg, str) and key in reg:
# If the replacement is a list, extend the new_outputs list with the values
if isinstance(value, list):
new_outputs.extend(value)
else:
new_outputs.append(reg.replace(key, value))
else:
new_outputs.append(reg)
self.output_registers = new_outputs