# Source code for pype.utils.pipeline

"""Pipeline configuration and execution system.

This module manages the complete pipeline lifecycle:
- Loading pipeline definitions from YAML
- Validating pipeline configurations
- Resolving dependencies between components
- Processing pipeline arguments
- Executing pipeline components
- Managing job submission and tracking

Key classes:
- Pipeline: Main pipeline configuration container
- PipelineItem: Individual executable pipeline component
- PipelineConfig: Configuration validation and loading
- JobState: Pipeline execution state tracking
"""

import os
import re
import yaml
from argparse import ArgumentParser, RawTextHelpFormatter
from pydoc import locate
from copy import copy
from typing import List, Dict, Any, Optional
from pathlib import Path

from pype.misc import package_files, get_module_method, generate_uid
from pype.utils.arguments import (
    compose_batch_description,
    PipelineItemArguments,
    get_arg_from_string,
)
from pype.modules.snippets import PYPE_SNIPPETS_MODULES
from pype.__config__ import PYPE_PIPELINES, PYPE_QUEUES
from pype.__version__ import PIPELINES_API
from pype.exceptions import (
    PipelineError,
    PipelineVersionError,
    PipelineItemError,
    SnippetError,
    SnippetNotFoundError,
)


def get_visible_pipelines() -> List[str]:
    """Return the names of all user-visible pipelines.

    Pipeline YAML files whose stem starts with ``__`` are treated as
    private and excluded from the listing.

    Returns:
        Sorted list of pipeline names (YAML file stems), excluding
        those starting with ``__``.

    Raises:
        PipelineError: If the pipeline directory cannot be read.
    """
    try:
        stems = (
            Path(candidate).stem
            for candidate in sorted(package_files(PYPE_PIPELINES, ".yaml"))
        )
        return [stem for stem in stems if not stem.startswith("__")]
    except Exception as e:
        raise PipelineError(f"Failed to list pipelines: {str(e)}") from e
def get_pipelines(subparsers, pipes: Dict[str, Any]) -> Dict[str, Any]:
    """Load every pipeline YAML and register it with the CLI parser.

    Args:
        subparsers: argparse subparsers object; may be falsy to skip
            parser registration and only populate ``pipes``.
        pipes: Mapping to fill with ``name -> Pipeline`` entries.

    Returns:
        The ``pipes`` mapping, updated in place.

    Raises:
        PipelineError: If a pipeline file cannot be loaded.
    """
    for pipeline in sorted(package_files(PYPE_PIPELINES, ".yaml")):
        try:
            with open(pipeline, "rb") as pipe:
                pipe_dict = yaml.safe_load(pipe)
            # Path.stem, consistent with get_visible_pipelines (was
            # os.path.basename(os.path.splitext(...))).
            pipe_name = Path(pipeline).stem
            if subparsers:
                if pipe_name.startswith("__"):
                    # Private pipelines are registered without help text.
                    subparsers.add_parser(pipe_name, add_help=False)
                else:
                    help_parser = pipe_dict["info"]["description"]
                    subparsers.add_parser(
                        pipe_name, help=help_parser, add_help=False
                    )
            pipes[pipe_name] = Pipeline(pipeline, pipe_name)
        except AttributeError:
            # NOTE(review): best-effort skip of malformed pipeline
            # definitions — presumably deliberate; confirm.
            pass
        except Exception as e:
            # Was an f-string with no placeholders (F541); the detail
            # lives in the context mapping.
            raise PipelineError(
                "Failed to load pipelines", context={"error": str(e)}
            ) from e
    return pipes
def cmp(a: Any, b: Any) -> int: return (a > b) - (a < b) def compare_version(version1: str, version2: str) -> int: def normalize(v: str) -> List[int]: return [int(x) for x in re.sub(r"(\.0+)*$", "", v).split(".")] return cmp(normalize(version1), normalize(version2)) def dump_arguments( item_arguments: PipelineItemArguments, pipeline_item: Optional[Any] = None ) -> List[Any]: """Extract all arguments from pipeline items recursively. Args: item_arguments: Arguments container to process pipeline_item: Optional parent pipeline item for dependency processing Returns: List of processed arguments Raises: PipelineError: If argument processing fails """ try: arguments = [] for argument in item_arguments.arguments: try: arguments += dump_arguments(argument.arguments) except AttributeError: arguments.append(argument) if pipeline_item is not None: try: for dep in pipeline_item.deps: arguments += dump_arguments(dep.arguments, dep) except AttributeError: pass return arguments except Exception as e: raise PipelineError(f"Failed to process arguments: {str(e)}") from e
class PipelineItem:
    """A single executable component of a pipeline.

    Wraps one ``items`` entry of a pipeline YAML: its arguments, type
    (snippet / pipeline / batch variants), resource requirements,
    optional dependencies and mute flag.
    """

    def __init__(self, item: Dict[str, Any]):
        """Build the item (and, recursively, its dependencies).

        Args:
            item: One item mapping from the pipeline YAML.

        Raises:
            SnippetNotFoundError: If a snippet-type item declares no
                requirements and no matching snippet module exists.
        """
        self.name = item["name"]
        self.arguments = PipelineItemArguments()
        for argument in item["arguments"]:
            # Arguments without an explicit type default to plain argv.
            arg_type = argument.get("type", "argv_arg")
            self.arguments.add_argument(argument, arg_type)
        self.type = item["type"]
        self.jobs = []
        self.requirements = {}
        try:
            self.requirements = item["requirements"]
        except KeyError:
            if self.type in ("snippet", "batch_snippet"):
                # Fall back to the snippet's own declared requirements.
                # At least print the missing snippet to be fixed within
                # issue #3.
                try:
                    snippet = copy(PYPE_SNIPPETS_MODULES[self.name])
                    self.requirements = snippet.requirements()
                except KeyError:
                    # Was a bare Exception; the imported
                    # SnippetNotFoundError is the intended type.
                    raise SnippetNotFoundError(
                        "Could not find a snippet named " "%s" % self.name
                    )
        try:
            # NOTE: deliberately left unset when there are no
            # dependencies; run() detects this via AttributeError.
            self.deps = [PipelineItem(x) for x in item["dependencies"]["items"]]
        except KeyError:
            pass
        # Mute is True only when the YAML value is exactly True;
        # a missing key or any other value means False.
        self.mute = item.get("mute") is True

    def run(
        self, argv: List[str], queue: str, profile: str, log: Any, jobs: List[Any]
    ) -> List[Any]:
        """Execute this item, dependencies first.

        Args:
            argv: Parsed arguments forwarded to the executor.
            queue: Queue name for job submission.
            profile: Execution profile name.
            log: Logger container.
            jobs: Job ids this item depends on.

        Returns:
            This item's execution results, or the accumulated
            dependency jobs when the item is muted.

        Raises:
            PipelineItemError: If ``self.type`` is not a known type.
        """
        self.jobs = []
        try:
            for dep in self.deps:
                res = dep.run(argv, queue, profile, log, jobs)
                if res:
                    self.jobs += res
        except AttributeError:
            # No dependencies were declared (self.deps unset).
            pass
        self.jobs += jobs
        dispatch = {
            "snippet": lambda: exec_snippet(self, argv, queue, profile, log),
            "pipeline": lambda: exec_pipeline(self, argv, queue, profile, log),
            "batch_snippet": lambda: batch_exec_unit(
                self, argv, queue, profile, log, exec_snippet_unit
            ),
            "batch_pipeline": lambda: batch_exec_unit(
                self, argv, queue, profile, log, exec_pipeline_unit
            ),
        }
        try:
            runner = dispatch[self.type]
        except KeyError:
            # Was a bare Exception; PipelineItemError is the imported,
            # intended type.
            raise PipelineItemError(
                "PipelineItem %s is not in the possible types %s"
                % (self.type, tuple(dispatch))
            )
        item_run = runner()
        if self.mute is True:
            # Muted: hide own results, propagate dependency jobs only.
            return self.jobs
        return item_run
class Pipeline:
    """Pipeline configuration loaded from a YAML definition file.

    Every top-level YAML key becomes an attribute (``info``, ``items``,
    ...); ``items`` are wrapped into :class:`PipelineItem` objects.
    """

    def __init__(self, path: str, name: str):
        """Load and validate the pipeline definition.

        Args:
            path: Path to the pipeline YAML file.
            name: Pipeline name (used in messages and as parser name).

        Raises:
            PipelineError: If the YAML lacks an ``info.api`` entry.
            PipelineVersionError: If the declared API version differs
                from the supported ``PIPELINES_API``.
        """
        self.__path__ = path
        self.__name__ = name
        self.__results__ = []
        with open(self.__path__, "rb") as file:
            pipeline = yaml.safe_load(file)
        # Expose every top-level YAML key as an attribute.
        for key in pipeline:
            setattr(self, key, pipeline[key])
        self.pipelineitems = [PipelineItem(x) for x in self.items]
        try:
            api_version = self.info["api"]
        except KeyError:
            # Was a bare Exception; PipelineError is the intended type.
            raise PipelineError(
                "Can't find the pipeline API "
                "version for pipeline %s" % self.__name__
            )
        if compare_version(api_version, PIPELINES_API) != 0:
            # Was a bare Exception; the imported PipelineVersionError
            # is the intended type.
            raise PipelineVersionError(
                (
                    "%s pipeline API version - %s - differs from "
                    "the supported API version - %s -"
                )
                % (self.__name__, api_version, PIPELINES_API)
            )

    def submit(
        self,
        parser: ArgumentParser,
        argv: List[str],
        queue: str,
        profile: str,
        log: Any,
        jobs: Optional[List[Any]] = None,
    ):
        """Parse ``argv`` and run every pipeline item in order.

        Results accumulate in ``self.__results__``.

        Args:
            parser: argparse subparsers object to attach the
                pipeline's own parser to.
            argv: Raw command-line arguments for this pipeline.
            queue: Queue name for job submission.
            profile: Execution profile name.
            log: Logger container.
            jobs: Optional job ids every item depends on.
        """
        if jobs is None:
            jobs = []
        log.log.info("Prepare argument parser for pipeline")
        parse_snippets = pipeline_argparse_ui(self, parser, log)
        log.log.info("Parse arguments %s" % ", ".join(argv))
        args = vars(parse_snippets.parse_args(argv))
        log.log.info("Run all snippets with arguments %s" % args)
        for item in self.pipelineitems:
            self.__results__ += item.run(args, queue, profile, log, jobs)
def pipeline_argparse_ui(
    pipeline: "Pipeline", parser: ArgumentParser, log: Any
) -> ArgumentParser:
    """Create the argparse sub-parser for *pipeline*.

    Collects every unique argument used by the pipeline's items (and
    their dependencies) and registers it in one of three groups:
    Required, Batches (arguments fed by a batch file) or Optional
    (arguments with a default in ``info.defaults``).

    Args:
        pipeline: Loaded pipeline configuration.
        parser: argparse subparsers object to attach to.
        log: Logger container.

    Returns:
        The fully-populated sub-parser for this pipeline.
    """
    parse_snippets = parser.add_parser(
        pipeline.__name__,
        help=pipeline.info["description"],
        add_help=False,
        formatter_class=RawTextHelpFormatter,
    )
    log.log.info("Retrieve all arguments required in the pipeline snippets")
    arguments = []
    arguments_items = []
    arguments_values = []
    parser_batch = None
    parser_opt = None
    for item in pipeline.pipelineitems:
        arguments_items += dump_arguments(item.arguments)
        try:
            for dep in item.deps:
                arguments_items += dump_arguments(dep.arguments, dep)
        except AttributeError:
            # Item declares no dependencies.
            pass
    # Deduplicate by argument tag, keeping the first occurrence.
    for arg in arguments_items:
        if arg.value not in arguments_values:
            arguments_values.append(arg.value)
            arguments.append(arg)
    log.log.info(
        ("Use unique tags %s with specified "
         "type to the pipeline argument parser")
        % ", ".join([a.value for a in arguments])
    )
    parser_req = parse_snippets.add_argument_group(
        title="Required", description="Required pipeline arguments"
    )
    try:
        if len(pipeline.info["batches"].keys()) >= 1:
            parser_batch = parse_snippets.add_argument_group(
                title="Batches", description=("Arguments requiring a batch file")
            )
    except KeyError:
        pass
    try:
        if len(pipeline.info["defaults"].keys()) >= 1:
            parser_opt = parse_snippets.add_argument_group(
                title="Optional", description=("Optional pipeline arguments")
            )
    except KeyError:
        pass
    for arg_obj in arguments:
        nargs = arg_obj.nargs
        arg_str_dict = get_arg_from_string(arg_obj.value)
        arg = arg_str_dict["arg"]
        arg_type = arg_str_dict["arg_type"]
        if arg is None:
            continue
        try:
            description = "%s, type: %s" % (
                pipeline.info["arguments"][arg],
                arg_type,
            )
        except KeyError:
            description = "%s, type: %s" % (arg, arg_type)
        try:
            default_val = pipeline.info["defaults"][arg]
            description = "%s. \nDefault: %s" % (description, default_val)
        except KeyError:
            default_val = False
        try:
            batch_description = pipeline.info["batches"][arg]
            description = compose_batch_description(batch_description, description)
        except KeyError:
            batch_description = False
        if arg_obj.action in ["store_true", "store_false"]:
            parser_req.add_argument(
                "--%s" % arg, dest=arg, help=description, action=arg_obj.action
            )
        elif default_val is False:
            if batch_description is False:
                parser_req.add_argument(
                    "--%s" % arg,
                    dest=arg,
                    help=description,
                    type=locate(arg_type),
                    nargs=nargs,
                    action=arg_obj.action,
                    required=True,
                )
            else:
                # NOTE(review): assumes info["batches"] exists whenever
                # a batch argument is present (parser_batch not None).
                parser_batch.add_argument(
                    "--%s" % arg,
                    dest=arg,
                    help=description,
                    type=locate(arg_type),
                    required=True,
                )
        else:
            parser_opt.add_argument(
                "--%s" % arg,
                dest=arg,
                help=description,
                type=locate(arg_type),
                nargs=nargs,
                action=arg_obj.action,
                default=default_val,
            )
    return parse_snippets


def flat_list(S: List[Any]) -> List[Any]:
    """Recursively flatten arbitrarily nested lists into one flat list."""
    if S == []:
        return S
    if isinstance(S[0], list):
        return flat_list(S[0]) + flat_list(S[1:])
    return S[:1] + flat_list(S[1:])


def arg_dict_to_str(arg_dict: Dict[str, Any], arg_str: List[str]) -> List[str]:
    """Convert an argument mapping into a flat argv-style token list.

    List values are space-joined (mutating ``arg_dict`` in place);
    ``False`` values drop the key entirely; ``True`` values emit the
    key as a bare flag.

    Returns:
        ``arg_str`` with the tokens appended.
    """
    for key in arg_dict:
        if isinstance(arg_dict[key], list):
            arg_dict[key] = " ".join(arg_dict[key])
    for key in arg_dict:
        if arg_dict[key] is not False:
            arg_str.append(key)
            if arg_dict[key] is not True:
                arg_str.append(arg_dict[key])
    return arg_str


def exec_snippet_unit(
    item: "PipelineItem", arg_dict: Dict[str, Any], queue: str, profile: str, log: Any
) -> List[Any]:
    """Submit one snippet execution to the queue.

    Skips submission entirely when every file the snippet declares in
    ``results()`` already exists on disk.

    Returns:
        Single-element list with the queue submission result, or an
        empty list when the snippet's results are already present.

    Raises:
        SnippetError: If ``arg_dict`` is not a single argument mapping.
    """
    arg_str = []
    results = []
    snippet = copy(PYPE_SNIPPETS_MODULES[item.name])
    if not isinstance(arg_dict, dict):
        # Batch argument sets must be unrolled by batch_exec_unit.
        # (Message typo fixed; imported SnippetError replaces bare
        # Exception.)
        raise SnippetError("More than one set of arguments %s" % arg_dict)
    arg_str = arg_dict_to_str(arg_dict, arg_str)
    log.log.info("Snippet %s relevant item.arguments: %s" % (item.name, arg_dict))
    friendly_name = snippet.friendly_name(arg_dict)
    try:
        results = flat_list(list(snippet.results(arg_dict).values()))
        completed = all(os.path.isfile(x) for x in results)
        log.log.info(
            "Results file(s) for snippet %s: %s" % (item.name, ", ".join(results))
        )
    except AttributeError:
        # Snippet declares no results() method: always (re)run it.
        # Reset to [] so the later ", ".join(results) cannot raise
        # TypeError (was left as None).
        results = []
        completed = False
    if completed:
        log.log.info(
            ("Found results file(s) %s: "
             "skipping execution of snippet %s")
            % (", ".join(results), item.name)
        )
        return []
    log.log.info("Submit Snippet %s with queue : %s" % (item.name, queue))
    queue_name = "%s_%s_%s" % (generate_uid(), friendly_name, queue)
    log.add_log(queue_name)
    queuelog = log.programs_logs[queue_name]
    log.log.info(
        ("Add log information for snippets %s "
         "(for results %s) to folder %s")
        % (item.name, ", ".join(results), queuelog.__path__)
    )
    queuelog.log.info("Execute snippet %s with queue %s" % (item.name, queue))
    queue_exec = get_module_method(PYPE_QUEUES, queue, "submit")
    if len(item.jobs) > 0:
        log.log.info(
            "Snippets %s on queue %s depends on jobs: %s"
            % (item.name, queue, ", ".join(map(str, item.jobs)))
        )
    res_queue = queue_exec(
        " ".join(map(str, [item.name] + arg_str)),
        friendly_name,
        item.requirements,
        item.jobs,
        queuelog,
        profile,
    )
    log.log.info("Snippets %s returned %s" % (item.name, res_queue))
    return [res_queue]


def exec_pipeline_unit(
    item: "PipelineItem", arg_dict: Dict[str, Any], queue: str, profile: str, log: Any
) -> List[Any]:
    """Run a nested pipeline item with a single argument mapping.

    Returns:
        The nested pipeline's accumulated results.

    Raises:
        PipelineError: If ``arg_dict`` is not a single argument mapping.
    """
    parser = ArgumentParser(prog="pype", description="exec_pipeline")
    subparsers = parser.add_subparsers(dest="modules")
    this_pipeline = get_pipelines(subparsers, {})[item.name]
    # (Duplicate ``arg_str = []`` initialization removed.)
    arg_str = []
    if not isinstance(arg_dict, dict):
        # Message typo fixed; imported PipelineError replaces bare
        # Exception.
        raise PipelineError("More than one set of arguments %s" % arg_dict)
    arg_str = arg_dict_to_str(arg_dict, arg_str)
    # may as well try to change pype.utils.arguments.Argument.to_argv
    # return in case of store_true/false
    log.log.info("Pipeline %s relevant item.arguments: %s" % (item.name, arg_dict))
    this_pipeline.submit(subparsers, arg_str, queue, profile, log, item.jobs)
    return this_pipeline.__results__


def exec_snippet(
    item: "PipelineItem", argv: List[str], queue: str, profile: str, log: Any
) -> List[Any]:
    """Resolve the item's arguments from ``argv`` and run the snippet."""
    arg_dict = item.arguments.to_dict(argv)
    return exec_snippet_unit(item, arg_dict, queue, profile, log)


def exec_pipeline(
    item: "PipelineItem", argv: List[str], queue: str, profile: str, log: Any
) -> List[Any]:
    """Resolve the item's arguments from ``argv`` and run the pipeline."""
    arg_dict = item.arguments.to_dict(argv)
    return exec_pipeline_unit(item, arg_dict, queue, profile, log)


def batch_exec_unit(
    item: "PipelineItem",
    argv: List[str],
    queue: str,
    profile: str,
    log: Any,
    exec_unit: Any,
) -> List[Any]:
    """Run ``exec_unit`` once per argument set expanded from a batch.

    Returns:
        Concatenated results of every unit execution.
    """
    results = []
    arg_dict = item.arguments.to_dict(argv)
    for tmp_argv in arg_dict:
        results += exec_unit(item, tmp_argv, queue, profile, log)
    return results