Source code for sio3pack.workflow.execution.resource_group

from sio3pack.exceptions.workflow import ParsingFailedOn, WorkflowParsingError


[docs] class ResourceGroup: """ A resource group is a set of limits that can be applied to a task. It can limit CPU usage, instruction usage, memory usage, and more. :param id: The id of the resource group. :param cpu_usage_limit: The CPU usage limit. :param instruction_limit: The instruction usage limit. :param memory_limit: The memory limit. :param oom_terminate_all_tasks: Whether to terminate all tasks on OOM. :param pid_limit: The PID limit. :param swap_limit: The swap limit. :param time_limit: The time limit. """ def __init__( self, cpu_usage_limit: int = 100.0, instruction_limit: int = 1e9, memory_limit: int = 2147483648, oom_terminate_all_tasks: bool = False, pid_limit: int = 2, swap_limit: int = 0, time_limit: int = 1e9, id: int = None, ): """ Create a new resource group. :param id: The id of the resource group. :param cpu_usage_limit: The CPU usage limit. :param instruction_limit: The instruction usage limit. :param memory_limit: The memory limit. :param oom_terminate_all_tasks: Whether to terminate all tasks on OOM. :param pid_limit: The PID limit. :param swap_limit: The swap limit. :param time_limit: The time limit. """ self.id = id self.cpu_usage_limit = cpu_usage_limit self.instruction_limit = instruction_limit self.memory_limit = memory_limit self.oom_terminate_all_tasks = oom_terminate_all_tasks self.pid_limit = pid_limit self.swap_limit = swap_limit self.time_limit = time_limit def _set_id(self, id: int): """ Set the id of the resource group. Should only be used by the ResourceGroupManager. :param id: The id to set. """ self.id = id
[docs] def set_limits(self, cpu_usage_limit: int, instruction_limit: int, memory_limit: int, time_limit: int): """ Set the limits of the resource group. :param cpu_usage_limit: The CPU usage limit. :param instruction_limit: The instruction usage limit. :param memory_limit: The memory limit. :param time_limit: The time limit. """ self.cpu_usage_limit = cpu_usage_limit self.instruction_limit = instruction_limit self.memory_limit = memory_limit self.time_limit = time_limit
[docs] @classmethod def from_json(cls, data: dict, id: int): """ Create a new resource group from a dictionary. :param data: The dictionary to create the resource group from. :param id: The id of the resource group. """ for key, type in [ ("cpu_usage_limit", float), ("instruction_limit", int), ("memory_limit", int), ("oom_terminate_all_tasks", bool), ("pid_limit", int), ("swap_limit", int), ("time_limit", int), ]: if key not in data: raise WorkflowParsingError( "Parsing resource group failed.", ParsingFailedOn.RESOURCE_GROUP, f"Missing key '{key}' in resource group data.", ) if not isinstance(data[key], type): raise WorkflowParsingError( "Parsing resource group failed.", ParsingFailedOn.RESOURCE_GROUP, f"Key '{key}' in resource group data is not of type {type.__name__}.", ) return cls( data["cpu_usage_limit"], data["instruction_limit"], data["memory_limit"], data["oom_terminate_all_tasks"], data["pid_limit"], data["swap_limit"], data["time_limit"], id, )
[docs] def to_json(self) -> dict: """ Convert the resource group to a dictionary. """ return { "cpu_usage_limit": self.cpu_usage_limit, "instruction_limit": self.instruction_limit, "memory_limit": self.memory_limit, "oom_terminate_all_tasks": self.oom_terminate_all_tasks, "pid_limit": self.pid_limit, "swap_limit": self.swap_limit, "time_limit": self.time_limit, }
[docs] class ResourceGroupManager: """ A class to manage resource groups in a workflow. Allows creation, retrieval, and management of resource groups. """ def __init__(self, task: "Task"): """ Create a new resource group manager. :param task: The task the resource group manager belongs to. """ self.resource_groups: list[ResourceGroup] = [] self.id = 0
[docs] def add(self, resource_group: ResourceGroup): """ Add a resource group to the resource group manager. :param ResourceGroup resource_group: The resource group to add. """ resource_group._set_id(self.id) self.resource_groups.append(resource_group) self.id += 1
[docs] def get_by_id(self, id: int) -> ResourceGroup: """ Get a resource group by its id. :param id: The id of the resource group to get. """ return self.resource_groups[id]
[docs] def to_json(self) -> list[dict]: """ Convert the resource group manager to a dictionary. """ return [resource_group.to_json() for resource_group in self.resource_groups]
[docs] def from_json(self, data: list[dict]): """ Create a new resource group manager from a list of dictionaries. :param data: The list of dictionaries to create the resource group manager from. """ for i, resource_group in enumerate(data): try: self.add(ResourceGroup.from_json(resource_group, self.id)) except WorkflowParsingError as e: e.set_data("resource_group_index", str(i)) raise e self.id += 1
[docs] def all(self) -> list[ResourceGroup]: """ Get all resource groups. """ return self.resource_groups