[docs]classWorkflowCreationError(SIO3PackException):""" Raised when there is an error creating a workflow. """
[docs]classParsingFailedOn(Enum):""" Enum to represent the part of the workflow that failed to parse. """JSON="json"WORKFLOW="workflow"TASK="task"CHANNEL="channel"FILESYSTEM="filesystem"MOUNT_NAMESPACE="mount_namespace"MOUNT_POINT="mount_point"RESOURCE_GROUP="resource_group"PROCESS="process"STREAM="stream"
[docs]classWorkflowParsingError(SIO3PackException):""" Raised when there is an error parsing a workflow configuration. :param str message: A short description of the error. :param ParsingFailedOn failed_on: The part of the workflow that failed to parse. :param str extra_msg: Additional message to append to the error message. :param dict data: Additional data related to the error. :param str full_message: A full message describing the error, if available. """def__init__(self,message:str,failed_on:ParsingFailedOn,extra_msg:str=None,data:dict=None,full_message:str=None,):""" Initialize the WorkflowParsingError. :param str message: A short description of the error. :param ParsingFailedOn failed_on: The part of the workflow that failed to parse. :param str extra_msg: Additional message to append to the error message. :param dict data: Additional data related to the error. :param str full_message: A full message describing the error, if available. """super().__init__(message)self.message=messageself.failed_on=failed_onself.extra_msg=extra_msgself._full_message=full_messageself.data=dataor{}
[docs]defset_data(self,key:str,value:str):""" Set additional data for the exception. :param key: The key for the data. :param value: The value for the data. """self.data[key]=value
def_generate_full_message(self):""" Generate a full message for the exception if not provided. """deftask_name():msg=f"task {self.data['task_index']}"if"task_name"inself.data:msg+=f" ({self.data['task_name']})"returnmsgmsg=Noneifself.failed_on==ParsingFailedOn.WORKFLOW:msg=f"Workflow parsing failed while parsing top-level workflow definition."elifself.failed_on==ParsingFailedOn.TASK:msg=f"Workflow parsing failed while parsing {task_name()}."elifself.failed_on==ParsingFailedOn.CHANNEL:msg=f"Workflow parsing failed while parsing channel configuration {self.data['channel_index']} for {task_name()}."elifself.failed_on==ParsingFailedOn.FILESYSTEM:msg=f"Workflow parsing failed while parsing filesystem configuration {self.data['filesystem_index']} for {task_name()}."elifself.failed_on==ParsingFailedOn.MOUNT_NAMESPACE:msg=f"Workflow parsing failed while parsing mount namespace {self.data['mount_namespace_index']} for {task_name()}."elifself.failed_on==ParsingFailedOn.MOUNT_POINT:msg=f"Workflow parsing failed while parsing mount point {self.data['mountpoint_index']} for mount namespace {self.data['mount_namespace_index']} in {task_name()}."elifself.failed_on==ParsingFailedOn.RESOURCE_GROUP:msg=f"Workflow parsing failed while parsing resource group {self.data['resource_group_index']} for {task_name()}."elifself.failed_on==ParsingFailedOn.PROCESS:msg=f"Workflow parsing failed while parsing process {self.data['process_index']} for {task_name()}."elifself.failed_on==ParsingFailedOn.STREAM:msg=f"Workflow parsing failed while parsing stream {self.data['fd']} for process {self.data['process_index']} for {task_name()}."ifmsgandself.extra_msg:msg+=" "+self.extra_msgreturnmsg