"""Pipeline configuration and execution system.
This module manages the complete pipeline lifecycle:
- Loading pipeline definitions from YAML
- Validating pipeline configurations
- Resolving dependencies between components
- Processing pipeline arguments
- Executing pipeline components
- Managing job submission and tracking
Key classes:
- Pipeline: Main pipeline configuration container
- PipelineItem: Individual executable pipeline component
- PipelineConfig: Configuration validation and loading
- JobState: Pipeline execution state tracking
"""
import os
import re
import yaml
from argparse import ArgumentParser, RawTextHelpFormatter
from pydoc import locate
from copy import copy
from typing import List, Dict, Any, Optional
from pathlib import Path
from pype.misc import package_files, get_module_method, generate_uid
from pype.utils.arguments import (
compose_batch_description,
PipelineItemArguments,
get_arg_from_string,
)
from pype.modules.snippets import PYPE_SNIPPETS_MODULES
from pype.__config__ import PYPE_PIPELINES, PYPE_QUEUES
from pype.__version__ import PIPELINES_API
from pype.exceptions import (
PipelineError,
PipelineVersionError,
PipelineItemError,
SnippetError,
SnippetNotFoundError,
)
def get_visible_pipelines() -> List[str]:
    """Get list of available pipeline names.

    Returns:
        List of pipeline names, excluding those starting with __

    Raises:
        PipelineError: If pipeline directory cannot be read
    """
    try:
        stems = (Path(p).stem for p in sorted(package_files(PYPE_PIPELINES, ".yaml")))
        return [stem for stem in stems if not stem.startswith("__")]
    except Exception as e:
        raise PipelineError(f"Failed to list pipelines: {str(e)}") from e
def get_pipelines(subparsers, pipes: Dict[str, Any]) -> Dict[str, Any]:
    """Load every pipeline YAML and register it with the argument parser.

    Args:
        subparsers: argparse subparsers object; falsy to skip parser registration
        pipes: dict to populate, mapping pipeline name -> Pipeline

    Returns:
        The populated ``pipes`` dict.

    Raises:
        PipelineError: If loading a pipeline fails.
    """
    for pipeline in sorted(package_files(PYPE_PIPELINES, ".yaml")):
        try:
            with open(pipeline, "rb") as pipe:
                pipe_dict = yaml.safe_load(pipe)
            pipe_name = Path(pipeline).stem
            if subparsers:
                if pipe_name.startswith("__"):
                    # Hidden pipelines get a bare parser without help text.
                    subparsers.add_parser(pipe_name, add_help=False)
                else:
                    subparsers.add_parser(
                        pipe_name,
                        help=pipe_dict["info"]["description"],
                        add_help=False,
                    )
            pipes[pipe_name] = Pipeline(pipeline, pipe_name)
        except AttributeError:
            # Presumably skips malformed pipeline files (e.g. YAML that is
            # not a mapping, or lacks "items") -- TODO confirm intent.
            pass
        except Exception as e:
            # Fixed: the message had an f-prefix with no placeholder (F541).
            raise PipelineError(
                "Failed to load pipelines", context={"error": str(e)}
            ) from e
    return pipes
def cmp(a: Any, b: Any) -> int:
    """Three-way comparison (Python 2 ``cmp``): 1, -1 or 0."""
    greater = int(a > b)
    less = int(a < b)
    return greater - less
def compare_version(version1: str, version2: str) -> int:
    """Compare two dotted version strings, ignoring trailing ``.0`` parts.

    Returns 1 if version1 is newer, -1 if older, 0 if equivalent.
    """

    def parts(v: str) -> List[int]:
        # "1.2.0" and "1.2" normalise to the same component list.
        trimmed = re.sub(r"(\.0+)*$", "", v)
        return [int(piece) for piece in trimmed.split(".")]

    left = parts(version1)
    right = parts(version2)
    return int(left > right) - int(left < right)
def dump_arguments(
    item_arguments: PipelineItemArguments, pipeline_item: Optional[Any] = None
) -> List[Any]:
    """Extract all arguments from pipeline items recursively.

    Args:
        item_arguments: Arguments container to process
        pipeline_item: Optional parent pipeline item for dependency processing

    Returns:
        List of processed arguments

    Raises:
        PipelineError: If argument processing fails
    """
    try:
        arguments = []
        for argument in item_arguments.arguments:
            try:
                # Composite arguments carry their own .arguments container:
                # recurse into them; leaf arguments raise AttributeError.
                arguments += dump_arguments(argument.arguments)
            except AttributeError:
                arguments.append(argument)
        if pipeline_item is not None:
            try:
                # Also collect arguments of the item's dependencies (deps may
                # be undefined on items without a "dependencies" section).
                for dep in pipeline_item.deps:
                    arguments += dump_arguments(dep.arguments, dep)
            except AttributeError:
                pass
        return arguments
    except Exception as e:
        # NOTE(review): nested failures get re-wrapped in PipelineError at
        # every recursion level, so deep errors produce chained messages.
        raise PipelineError(f"Failed to process arguments: {str(e)}") from e
class PipelineItem:
    """Single executable component of a pipeline definition.

    Wraps one entry of the pipeline YAML "items" list: its arguments, type
    (snippet/pipeline or their batch variants), requirements, dependencies
    and mute flag.
    """

    def __init__(self, item: Dict[str, Any]):
        """Build the item from its YAML mapping.

        Args:
            item: mapping with "name", "arguments" and "type", plus the
                optional keys "requirements", "dependencies" and "mute".

        Raises:
            SnippetNotFoundError: If the item is a snippet type with no
                explicit requirements and no installed snippet module
                matches its name.
        """
        self.name = item["name"]
        self.arguments = PipelineItemArguments()
        for argument in item["arguments"]:
            # Arguments without an explicit type default to plain argv args.
            self.arguments.add_argument(argument, argument.get("type", "argv_arg"))
        self.type = item["type"]
        self.jobs = []
        if "requirements" in item:
            self.requirements = item["requirements"]
        else:
            self.requirements = {}
            if self.type in ("snippet", "batch_snippet"):
                # Requirements not given in the YAML: ask the snippet module
                # itself (see issue #3).
                try:
                    snippet = copy(PYPE_SNIPPETS_MODULES[self.name])
                except KeyError:
                    # Fixed: was a bare Exception; SnippetNotFoundError is
                    # imported for exactly this case.
                    raise SnippetNotFoundError(
                        "Could not find a snippet named %s" % self.name
                    )
                self.requirements = snippet.requirements()
        # Always define deps (fixed: it was left undefined when the YAML had
        # no dependencies, and a malformed nested item silently dropped ALL
        # dependencies via a broad KeyError swallow).
        deps_spec = item.get("dependencies") or {}
        self.deps = [PipelineItem(x) for x in deps_spec.get("items", [])]
        # mute=True makes run() return the accumulated jobs instead of the
        # item's own results; anything else means not muted.
        self.mute = item.get("mute") is True

    def run(
        self, argv: List[str], queue: str, profile: str, log: Any, jobs: List[Any]
    ) -> List[Any]:
        """Run all dependencies, then dispatch on the item type.

        Args:
            argv: parsed pipeline arguments forwarded to the execution unit
            queue: queue module name used for submission
            profile: execution profile forwarded to the queue
            log: logging container
            jobs: job ids this item additionally depends on

        Returns:
            The executed unit's results, or the accumulated dependency jobs
            when the item is muted.

        Raises:
            PipelineItemError: If the item type is unknown.
        """
        self.jobs = []
        for dep in self.deps:
            res = dep.run(argv, queue, profile, log, jobs)
            if res:
                self.jobs += res
        self.jobs += jobs
        possible_types = ("snippet", "pipeline", "batch_snippet", "batch_pipeline")
        if self.type == "snippet":
            item_run = exec_snippet(self, argv, queue, profile, log)
        elif self.type == "pipeline":
            item_run = exec_pipeline(self, argv, queue, profile, log)
        elif self.type == "batch_snippet":
            item_run = batch_exec_unit(
                self, argv, queue, profile, log, exec_snippet_unit
            )
        elif self.type == "batch_pipeline":
            item_run = batch_exec_unit(
                self, argv, queue, profile, log, exec_pipeline_unit
            )
        else:
            # Fixed: was a bare Exception.
            raise PipelineItemError(
                "PipelineItem %s is not in the possible types %s"
                % (self.type, possible_types)
            )
        if self.mute is True:
            return self.jobs
        return item_run
class Pipeline:
    """Pipeline configuration loaded from a YAML definition.

    Loads the YAML file at *path*, exposes every top-level key as an
    attribute, builds PipelineItem objects from the "items" section and
    validates the declared pipeline API version against PIPELINES_API.
    """

    def __init__(self, path: str, name: str):
        """Load and validate the pipeline definition.

        Args:
            path: filesystem path of the pipeline YAML file
            name: pipeline name used in messages and the argument parser

        Raises:
            PipelineError: If the API version is missing.
            PipelineVersionError: If the API version differs from the
                supported one.
        """
        self.__path__ = path
        self.__name__ = name
        self.__results__ = []
        with open(self.__path__, "rb") as file:
            pipeline = yaml.safe_load(file)
        # Expose every top-level YAML key (info, items, ...) as an attribute.
        for key in pipeline:
            setattr(self, key, pipeline[key])
        self.pipelineitems = [PipelineItem(x) for x in self.items]
        try:
            api_version = self.info["api"]
        except KeyError:
            # Fixed: was a bare Exception.
            raise PipelineError(
                "Can't find the pipeline API version for pipeline %s"
                % self.__name__
            )
        if compare_version(api_version, PIPELINES_API) != 0:
            # Fixed: was a bare Exception; PipelineVersionError is imported
            # for exactly this case.
            raise PipelineVersionError(
                (
                    "%s pipeline API version - %s - differs from "
                    "the supported API version - %s -"
                )
                % (self.__name__, api_version, PIPELINES_API)
            )

    def submit(
        self,
        parser: ArgumentParser,
        argv: List[str],
        queue: str,
        profile: str,
        log: Any,
        jobs: Optional[List[Any]] = None,
    ):
        """Parse argv against the pipeline UI and run every top-level item.

        Results of each item accumulate in ``self.__results__``.

        Args:
            parser: argparse subparsers object to attach the pipeline UI to
            argv: raw command-line arguments for this pipeline
            queue: queue module name used for submission
            profile: execution profile forwarded to the queue
            log: logging container
            jobs: optional job ids every item depends on
        """
        if jobs is None:
            jobs = []
        log.log.info("Prepare argument parser for pipeline")
        parse_snippets = pipeline_argparse_ui(self, parser, log)
        log.log.info("Parse arguments %s" % ", ".join(argv))
        args = vars(parse_snippets.parse_args(argv))
        log.log.info("Run all snippets with arguments %s" % args)
        for item in self.pipelineitems:
            self.__results__ += item.run(args, queue, profile, log, jobs)
def pipeline_argparse_ui(
    pipeline: Pipeline, parser: ArgumentParser, log: Any
) -> ArgumentParser:
    """Build the argparse sub-parser for a pipeline from its items' arguments.

    Collects the unique arguments of all pipeline items (and their
    dependencies) and registers each one in a "Required", "Batches" or
    "Optional" argument group, based on the pipeline's info section.

    Args:
        pipeline: the Pipeline whose UI is being built
        parser: argparse subparsers object to attach the new parser to
        log: logging container

    Returns:
        The newly created sub-parser.
    """
    parse_snippets = parser.add_parser(
        pipeline.__name__,
        help=pipeline.info["description"],
        add_help=False,
        formatter_class=RawTextHelpFormatter,
    )
    log.log.info("Retrieve all arguments required in the pipeline snippets")
    arguments = []
    arguments_items = []
    arguments_values = []
    # Group parsers stay None unless info declares "batches"/"defaults";
    # an argument routed to a missing group would raise AttributeError.
    parser_batch = None
    parser_opt = None
    for item in pipeline.pipelineitems:
        arguments_items += dump_arguments(item.arguments)
        try:
            # deps may be undefined on items without a "dependencies" section.
            for dep in item.deps:
                arguments_items += dump_arguments(dep.arguments, dep)
        except AttributeError:
            pass
    # Deduplicate by argument value (tag), keeping first occurrence.
    for arg in arguments_items:
        if arg.value not in arguments_values:
            arguments_values.append(arg.value)
            arguments.append(arg)
    log.log.info(
        ("Use unique tags %s with specified " "type to the pipeline argument parser")
        % ", ".join([a.value for a in arguments])
    )
    parser_req = parse_snippets.add_argument_group(
        title="Required", description="Required pipeline arguments"
    )
    try:
        batch_args = len(pipeline.info["batches"].keys())
        if batch_args >= 1:
            parser_batch = parse_snippets.add_argument_group(
                title="Batches", description=("Arguments requiring a batch file")
            )
    except KeyError:
        pass
    try:
        default_args = len(pipeline.info["defaults"].keys())
        if default_args >= 1:
            parser_opt = parse_snippets.add_argument_group(
                title="Optional", description=("Optional pipeline arguments")
            )
    except KeyError:
        pass
    for arg_obj in arguments:
        arg = arg_obj.value
        nargs = arg_obj.nargs
        arg_str_dict = get_arg_from_string(arg)
        arg = arg_str_dict["arg"]
        arg_type = arg_str_dict["arg_type"]
        if arg is not None:
            try:
                description = "%s, type: %s" % (
                    pipeline.info["arguments"][arg],
                    arg_type,
                )
            except KeyError:
                # No description in info["arguments"]: fall back to the tag.
                description = "%s, type: %s" % (arg, arg_type)
            try:
                default_val = pipeline.info["defaults"][arg]
                description = "%s. Default: %s" % (description, default_val)
            except KeyError:
                # NOTE(review): False doubles as "no default" sentinel, so a
                # literal False default in the YAML would be treated as
                # "required" -- confirm whether that can occur.
                default_val = False
            try:
                batch_description = pipeline.info["batches"][arg]
                description = compose_batch_description(batch_description, description)
            except KeyError:
                batch_description = False
            if arg_obj.action in ["store_true", "store_false"]:
                # Flags take no type/nargs and are never batch/optional here.
                parser_req.add_argument(
                    "--%s" % arg, dest=arg, help=description, action=arg_obj.action
                )
            elif default_val is False:
                if batch_description is False:
                    parser_req.add_argument(
                        "--%s" % arg,
                        dest=arg,
                        help=description,
                        type=locate(arg_type),
                        nargs=nargs,
                        action=arg_obj.action,
                        required=True,
                    )
                else:
                    parser_batch.add_argument(
                        "--%s" % arg,
                        dest=arg,
                        help=description,
                        type=locate(arg_type),
                        required=True,
                    )
            else:
                parser_opt.add_argument(
                    "--%s" % arg,
                    dest=arg,
                    help=description,
                    type=locate(arg_type),
                    nargs=nargs,
                    action=arg_obj.action,
                    default=default_val,
                )
    return parse_snippets
def flat_list(S: List[Any]) -> List[Any]:
    """Flatten an arbitrarily nested list into a single flat list.

    Iterative rewrite of the original recursive version: avoids the
    O(n^2) list slicing and the recursion limit on long inputs, while
    producing the identical depth-first element order.

    Args:
        S: possibly nested list

    Returns:
        A new flat list with all non-list elements in order.
    """
    flattened: List[Any] = []
    stack = list(reversed(S))
    while stack:
        element = stack.pop()
        if isinstance(element, list):
            stack.extend(reversed(element))
        else:
            flattened.append(element)
    return flattened
def arg_dict_to_str(arg_dict: Dict[str, Any], arg_str: List[str]) -> List[str]:
    """Flatten parsed arguments into command-line tokens.

    List values are space-joined IN PLACE in ``arg_dict`` (callers reuse the
    mutated dict afterwards). False values are dropped entirely; True values
    emit only the key (flag); anything else emits key followed by value.

    Args:
        arg_dict: mapping of option name -> parsed value (mutated)
        arg_str: list to extend with the generated tokens

    Returns:
        The extended ``arg_str`` list.
    """
    for key in arg_dict:
        value = arg_dict[key]
        if isinstance(value, list):
            value = " ".join(value)
            arg_dict[key] = value
        if value is False:
            continue
        arg_str.append(key)
        if value is not True:
            arg_str.append(value)
    return arg_str
def exec_snippet_unit(
    item: PipelineItem, arg_dict: Dict[str, Any], queue: str, profile: str, log: Any
) -> List[Any]:
    """Submit one snippet job to the queue, skipping it when its declared
    result files already exist.

    Args:
        item: pipeline item of a snippet type
        arg_dict: a single mapping of parsed arguments for this snippet
        queue: queue module name used for submission
        profile: execution profile forwarded to the queue
        log: logging container exposing .log and per-job logs

    Returns:
        One-element list with the queue submission result, or [] when the
        snippet's results already exist.

    Raises:
        SnippetError: If arg_dict is not a single argument mapping.
    """
    arg_str = []
    # Fixed: was None, which crashed the ", ".join(results) log message below
    # whenever the snippet does not implement results().
    results = []
    snippet = copy(PYPE_SNIPPETS_MODULES[item.name])
    if not isinstance(arg_dict, dict):
        # Fixed: was a bare Exception with a garbled message.
        raise SnippetError("More than one set of arguments: %s" % arg_dict)
    arg_str = arg_dict_to_str(arg_dict, arg_str)
    log.log.info("Snippet %s relevant item.arguments: %s" % (item.name, arg_dict))
    friendly_name = snippet.friendly_name(arg_dict)
    try:
        results = snippet.results(arg_dict)
        results = flat_list(list(results.values()))
        completed = all(os.path.isfile(x) for x in results)
        log.log.info(
            "Results file(s) for snippet %s: %s" % (item.name, ", ".join(results))
        )
    except AttributeError:
        # Snippet declares no results(): it can never be skipped.
        completed = False
    if completed:
        log.log.info(
            ("Found results file(s) %s: " "skipping execution of snippet %s")
            % (", ".join(results), item.name)
        )
        return []
    log.log.info("Submit Snippet %s with queue : %s" % (item.name, queue))
    queue_name = "%s_%s_%s" % (generate_uid(), friendly_name, queue)
    log.add_log(queue_name)
    queuelog = log.programs_logs[queue_name]
    log.log.info(
        ("Add log information for snippets %s " "(for results %s) to folder %s")
        % (item.name, ", ".join(results), queuelog.__path__)
    )
    queuelog.log.info("Execute snippet %s with queue %s" % (item.name, queue))
    queue_exec = get_module_method(PYPE_QUEUES, queue, "submit")
    if len(item.jobs) > 0:
        log.log.info(
            "Snippets %s on queue %s depends on jobs: %s"
            % (item.name, queue, ", ".join(map(str, item.jobs)))
        )
    res_queue = queue_exec(
        " ".join(map(str, [item.name] + arg_str)),
        friendly_name,
        item.requirements,
        item.jobs,
        queuelog,
        profile,
    )
    log.log.info("Snippets %s returned %s" % (item.name, res_queue))
    return [res_queue]
def exec_pipeline_unit(
    item: PipelineItem, arg_dict: Dict[str, Any], queue: str, profile: str, log: Any
) -> List[Any]:
    """Execute a nested pipeline item by re-loading and submitting it.

    Args:
        item: pipeline item of type "pipeline"/"batch_pipeline"
        arg_dict: a single mapping of parsed arguments for the sub-pipeline
        queue: queue module name used for submission
        profile: execution profile forwarded to the queue
        log: logging container

    Returns:
        The sub-pipeline's accumulated results.

    Raises:
        PipelineItemError: If arg_dict is not a single argument mapping.
    """
    parser = ArgumentParser(prog="pype", description="exec_pipeline")
    subparsers = parser.add_subparsers(dest="modules")
    this_pipeline = get_pipelines(subparsers, {})[item.name]
    # Fixed: arg_str was initialised twice.
    arg_str = []
    if not isinstance(arg_dict, dict):
        # Fixed: was a bare Exception with a garbled message.
        raise PipelineItemError("More than one set of arguments: %s" % arg_dict)
    arg_str = arg_dict_to_str(arg_dict, arg_str)
    # may as well try to change pype.utils.arguments.Argument.to_argv
    # return in case of store_true/false
    log.log.info("Pipeline %s relevant item.arguments: %s" % (item.name, arg_dict))
    this_pipeline.submit(subparsers, arg_str, queue, profile, log, item.jobs)
    return this_pipeline.__results__
def exec_snippet(
    item: PipelineItem, argv: List[str], queue: str, profile: str, log: Any
) -> List[Any]:
    """Parse argv for a snippet item and execute it as a single unit."""
    parsed_args = item.arguments.to_dict(argv)
    return exec_snippet_unit(item, parsed_args, queue, profile, log)
def exec_pipeline(
    item: PipelineItem, argv: List[str], queue: str, profile: str, log: Any
) -> List[Any]:
    """Parse argv for a pipeline item and execute it as a single unit."""
    parsed_args = item.arguments.to_dict(argv)
    return exec_pipeline_unit(item, parsed_args, queue, profile, log)
def batch_exec_unit(
item: PipelineItem,
argv: List[str],
queue: str,
profile: str,
log: Any,
exec_unit: Any,
) -> List[Any]:
results = []
arg_dict = item.arguments.to_dict(argv)
for tmp_argv in arg_dict:
results += exec_unit(item, tmp_argv, queue, profile, log)
return results