[docs]classWorkflowOperation:""" A class to handle workflow operations, allowing for the retrieval of workflows and the return of results from those workflows. :param callable get_workflow_func: Function to retrieve workflows. :param bool return_results: Whether to return results from the workflow. :param callable return_results_func: Function to handle returning results. :param list[Any] wf_args: Additional positional arguments for the workflow function. :param dict[str, Any] wf_kwargs: Additional keyword arguments for the workflow function. """def__init__(self,get_workflow_func:callable,return_results:bool=False,return_results_func:callable=None,*wf_args,**wf_kwargs):""" Initialize the WorkflowOperation with a function to get workflows and optionally a function to return results. :param callable get_workflow_func: Function to retrieve workflows. :param bool return_results: Whether to return results from the workflow. :param callable return_results_func: Function to handle returning results. :param wf_args: Additional positional arguments for the workflow function. :param wf_kwargs: Additional keyword arguments for the workflow function. """self.get_workflow_func=get_workflow_funcself.return_results=return_resultsself.return_results_func=return_results_funcself._last=Falseself._data=Noneself._workflow=Noneself._workflow_args=wf_argsself._workflow_kwargs=wf_kwargs
[docs]defget_workflow(self):""" A function to retrieve workflows. It yields workflows until the last one is reached. This function uses the provided `get_workflow_func` to get workflows and yields them one by one. """whilenotself._last:self._workflow,self._last=self.get_workflow_func(self._data,*self._workflow_args,**self._workflow_kwargs)yieldself._workflow
defreturn_results(self,data:dict):""" A function to return results from the workflow. If `return_results_func` is provided, it will be called with the workflow and data. Also stores the data internally for next workflow retrieval. """ifself.return_results_func:returnself.return_results_func(self._workflow,data)self._data=data