# Source code for pype.utils.pipeline

"""Pipeline configuration and execution system.

This module manages the complete pipeline lifecycle:
- Loading pipeline definitions from YAML
- Validating pipeline configurations
- Resolving dependencies between components
- Processing pipeline arguments
- Executing pipeline components
- Managing job submission and tracking

Key classes:
- Pipeline: Main pipeline configuration container
- PipelineItem: Individual executable pipeline component
- PipelineConfig: Configuration validation and loading
- JobState: Pipeline execution state tracking
"""

import csv
import hashlib
import json
import os
import re
import shlex
from collections import deque
from copy import copy
from pathlib import Path
from pydoc import locate
from typing import Any, Dict, List, Optional

import yaml

from pype.__config__ import PYPE_PIPELINES, PYPE_QUEUES
from pype.__version__ import PIPELINES_API
from pype.argparse import ArgumentParser, RawTextHelpFormatter
from pype.exceptions import (
    ArgumentError,
    PipelineError,
    PipelineItemError,
    PipelineVersionError,
    SnippetNotFoundError,
)
from pype.misc import generate_uid, get_module_method, package_files
from pype.modules.snippets import PYPE_SNIPPETS_MODULES
from pype.utils.arguments import (
    PipelineItemArguments,
    compose_batch_description,
    get_arg_from_string,
)
from pype.utils.snippets import Snippet


class PipelineRegistry:
    """Per-run registry for deduplication and manifest tracking.

    A single instance is shared by every step of one pipeline run so that all
    of them record results into the same manifest and dedup tables.
    """

    # Column order of the TSV manifest written by ``write_tsv``.
    MANIFEST_FIELDS = ("snippet_name", "step_name", "result_key", "path", "final")

    def __init__(self):
        # One dict per recorded result, keyed by MANIFEST_FIELDS.
        self.manifest: List[Dict] = []
        # hash -> run_id
        self.jobs_by_hash: Dict[str, Optional[str]] = {}
        # path -> hash
        self.jobs_by_path: Dict[str, str] = {}
        # step_ids whose finals are present
        self.finalized_steps: set = set()

    def write_tsv(self, path: str) -> None:
        """Write the manifest as a tab-separated file at *path*.

        Nothing is written (and no file is created) when the manifest is
        empty. The file is always encoded as UTF-8 so that step names and
        paths with non-ASCII characters are written identically regardless
        of the process locale.

        Raises:
            ValueError: If a manifest row contains a key that is not one of
                ``MANIFEST_FIELDS`` (``csv.DictWriter`` default behaviour).
        """
        if not self.manifest:
            return
        with open(path, "w", newline="", encoding="utf-8") as handle:
            writer = csv.DictWriter(
                handle,
                fieldnames=list(self.MANIFEST_FIELDS),
                delimiter="\t",
            )
            writer.writeheader()
            writer.writerows(self.manifest)


def _results_hash(snippet_name: str, results_dict: dict) -> str:
    payload = json.dumps(
        {"snippet": snippet_name, "results": results_dict}, sort_keys=True
    )
    return hashlib.md5(payload.encode()).hexdigest()


def get_visible_pipelines() -> List[str]:
    """Return the names of the user-visible pipelines.

    Pipeline definitions are the ``.yaml`` files returned by
    ``package_files(PYPE_PIPELINES, ".yaml")``; a pipeline's name is the file
    stem. Names starting with a double underscore are internal and omitted.

    Returns:
        Pipeline names, in the order of the sorted file list.

    Raises:
        PipelineError: If the pipeline files cannot be listed or read.
    """
    try:
        stems = (Path(entry).stem for entry in sorted(package_files(PYPE_PIPELINES, ".yaml")))
        return [stem for stem in stems if not stem.startswith("__")]
    except Exception as e:
        # Catches FileNotFoundError, PermissionError, and other I/O errors
        # when accessing pipeline package files
        raise PipelineError(f"Failed to list pipelines: {str(e)}") from e
def get_pipelines(subparsers, pipes: Dict[str, Any]) -> Dict[str, Any]:
    """Load every pipeline definition and optionally register it on the CLI.

    For each ``.yaml`` file from ``package_files(PYPE_PIPELINES, ".yaml")``
    a ``Pipeline`` is built and stored in *pipes* under the file's base name.
    When *subparsers* is truthy, a sub-command of that name is added as well:
    internal pipelines (name starting with ``__``) get no help text, the
    others use ``info.description`` from their YAML.

    Args:
        subparsers: Object providing ``add_parser`` (an argparse sub-parsers
            action), or a falsy value to skip CLI registration.
        pipes: Mapping updated in place with ``name -> Pipeline``.

    Returns:
        The same *pipes* mapping.

    Raises:
        PipelineError: If a pipeline fails to load with any exception other
            than ``AttributeError``.
    """
    for yaml_path in sorted(package_files(PYPE_PIPELINES, ".yaml")):
        try:
            with open(yaml_path, "rb") as stream:
                definition = yaml.safe_load(stream)
            name = os.path.basename(os.path.splitext(yaml_path)[0])
            if subparsers:
                is_internal = name.startswith("__")
                if is_internal:
                    subparsers.add_parser(name, add_help=False)
                else:
                    subparsers.add_parser(
                        name, help=definition["info"]["description"], add_help=False
                    )
            pipes[name] = Pipeline(yaml_path, name)
        except AttributeError:
            # NOTE(review): AttributeError is skipped silently. Pipeline()
            # raises it when the YAML has no ``info`` section, so such files
            # are left out of ``pipes`` (an internal ``__`` pipeline may
            # already have had its sub-parser added above). Confirm intended.
            continue
        except Exception as e:
            raise PipelineError(
                "Failed to load pipelines", context={"error": str(e)}
            ) from e
    return pipes
def cmp(a: Any, b: Any) -> int:
    """Three-way comparison: return -1, 0 or 1 as *a* is <, == or > *b*."""
    return (a > b) - (a < b)


def compare_version(version1: str, version2: str) -> int:
    """Compare two dotted version strings component-wise as integers.

    Trailing ``.0`` components are ignored, so ``"2.0.0"`` equals ``"2"``,
    and components compare numerically (``"2.10"`` > ``"2.9"``). Non-string
    inputs (e.g. a YAML ``api: 2.1`` loaded as a float) are passed through
    ``str()`` first.

    Returns:
        -1, 0 or 1 as *version1* is lower than, equal to or higher than
        *version2*.

    Raises:
        ValueError: If a component is not an integer (e.g. ``"2.1-beta"``).
    """

    def normalize(v: Any) -> List[int]:
        return [int(x) for x in re.sub(r"(\.0+)*$", "", str(v)).split(".")]

    return cmp(normalize(version1), normalize(version2))


def normalize_arg_spec(arg_spec: Any) -> Any:
    """Normalize a single argument spec (handles composite_arg recursively).

    For ``{"type": "composite_arg", "value": {...}}`` the nested
    ``result_arguments`` mapping is converted with ``normalize_arguments`` and
    a new dict holding only ``type`` and ``value`` is returned. Any other
    spec is returned unchanged.
    """
    if (
        isinstance(arg_spec, dict)
        and arg_spec.get("type") == "composite_arg"
        and "value" in arg_spec
    ):
        # Shallow copy so the caller's (YAML-derived) dict is not mutated.
        composite_value = arg_spec["value"].copy()
        if "result_arguments" in composite_value:
            composite_value["result_arguments"] = normalize_arguments(
                composite_value["result_arguments"]
            )
        return {"type": arg_spec["type"], "value": composite_value}
    return arg_spec


def normalize_arguments(arg_dict: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert the compact ``arguments`` mapping into the API 2.0.0 list form.

    Each key of *arg_dict* is an argument prefix. Supported values:

    * str / int / float -> one ``{"prefix", "pipeline_arg"}`` entry (numbers
      are converted with ``str()``);
    * list -> one entry per element (repeated prefix). Dict elements follow
      the extended form below; other elements are used as-is;
    * dict (extended form) -> ``pipeline_arg`` is taken from ``value`` and
      every other key (``type``, ``required``, ...) is copied onto the entry.
      ``composite_arg`` values are normalized recursively and dict-valued
      ``batch_list_arg`` members are each normalized.

    ``None`` (or an empty mapping) yields an empty list.

    Raises:
        ValueError: For an argument object without ``value`` or a value of
            an unsupported type (e.g. ``None``).
    """
    result: List[Dict[str, Any]] = []
    for prefix, value in (arg_dict or {}).items():
        # Case 1: simple string/number
        if isinstance(value, str):
            result.append({"prefix": prefix, "pipeline_arg": value})
        elif isinstance(value, (int, float)):
            result.append({"prefix": prefix, "pipeline_arg": str(value)})
        # Case 2: list of values (multiple same-prefix args); may contain
        # strings, numbers, or argument objects (dicts, incl. composite args)
        elif isinstance(value, list):
            for v in value:
                if not isinstance(v, dict):
                    # Simple values in a list are kept as-is (not stringified).
                    result.append({"prefix": prefix, "pipeline_arg": v})
                    continue
                entry: Dict[str, Any] = {"prefix": prefix}
                if v.get("type") == "composite_arg" and isinstance(v.get("value"), dict):
                    entry["pipeline_arg"] = normalize_arg_spec(v)["value"]
                elif "value" in v:
                    entry["pipeline_arg"] = v["value"]
                else:
                    raise ValueError(
                        f"Invalid argument object in list for {prefix}: {v}"
                    )
                # Copy extra metadata (e.g. type, required). Compare with
                # ``!=``: the former ``not in ("value")`` was a *substring*
                # test that also dropped keys such as "a", "val" or "e".
                entry.update({k: val for k, val in v.items() if k != "value"})
                result.append(entry)
        # Case 3: dictionary with metadata (extended form)
        elif isinstance(value, dict):
            entry = {"prefix": prefix}
            arg_type = value.get("type")
            inner = value.get("value")
            if arg_type == "composite_arg" and isinstance(inner, dict):
                entry["pipeline_arg"] = normalize_arg_spec(value)["value"]
            elif arg_type == "batch_list_arg" and isinstance(inner, dict):
                # Dict-of-arguments format: normalize every member spec.
                # Legacy list-format batch lists take the plain "value"
                # branch below and are kept as-is.
                entry["pipeline_arg"] = {
                    k: normalize_arg_spec(v) for k, v in inner.items()
                }
            elif "value" in value:
                entry["pipeline_arg"] = inner
            else:
                raise ValueError(f"Invalid argument object for {prefix}: {value}")
            # copy over extra metadata (e.g., type, required)
            entry.update({k: v for k, v in value.items() if k != "value"})
            result.append(entry)
        else:
            raise ValueError(f"Unsupported argument format for {prefix}: {value}")
    return result


def topological_sort(steps_dict: Dict[str, Any]) -> List[str]:
    """Perform topological sort on pipeline steps (Kahn's algorithm).

    Args:
        steps_dict: Dictionary of step_id -> step definition. Each definition
            may list the step ids it needs under ``depends_on``; ids that are
            not keys of *steps_dict* are ignored here.

    Returns:
        List of step IDs in execution order (dependencies before dependents).
        Steps that become ready at the same time keep their definition order,
        so the result is deterministic for a given YAML.

    Raises:
        ValueError: If the dependency graph contains a cycle.
    """
    graph: Dict[str, List[str]] = {step_id: [] for step_id in steps_dict}
    in_degree = {step_id: 0 for step_id in steps_dict}
    for step_id, step_def in steps_dict.items():
        # ``depends_on: null`` in YAML loads as None: treat it as no deps.
        for dep_id in step_def.get("depends_on") or []:
            if dep_id in graph:
                graph[dep_id].append(step_id)
                in_degree[step_id] += 1

    # deque.popleft() keeps the FIFO order of the former list.pop(0) in O(1).
    ready = deque(step_id for step_id in steps_dict if in_degree[step_id] == 0)
    sorted_steps: List[str] = []
    while ready:
        current = ready.popleft()
        sorted_steps.append(current)
        for neighbor in graph[current]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                ready.append(neighbor)

    if len(sorted_steps) != len(steps_dict):
        raise ValueError("Circular dependency detected in pipeline")
    return sorted_steps


def build_dag_structure(steps: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build the flat DAG structure used for pipeline execution (API 2.1.0).

    Each step appears exactly once, in topological order, with its
    dependencies kept as a flat list of step ids under ``depends_on``.

    Args:
        steps: Dictionary of step_id -> step definition from YAML.

    Returns:
        List of item dicts with ``step_id``, ``name``, ``type``, normalized
        ``arguments`` and ``depends_on``, plus ``requirements``, ``mute`` and
        ``final_outputs`` when the step defines them.

    Raises:
        ValueError: On a dependency cycle or a malformed argument.
    """
    items = []
    for step_id in topological_sort(steps):
        step = steps[step_id]
        item = {
            "step_id": step_id,  # Track original step ID
            "name": step["name"],
            "type": step["type"],
            "arguments": normalize_arguments(step.get("arguments") or {}),
            "depends_on": step.get("depends_on") or [],  # flat list
        }
        for optional_key in ("requirements", "mute", "final_outputs"):
            if optional_key in step:
                item[optional_key] = step[optional_key]
        items.append(item)
    return items


def build_nested_structure(steps):
    """Convert flattened steps into the API 2.0.0 nested structure.

    Each step lists the steps it *depends on* as nested dependencies. The
    top-level items are the steps no other step depends on, in definition
    order.

    TODO: API 2.0.0 (nested) and API 2.1.0 (flat DAG) currently use two
    separate execution paths in Pipeline.submit(). The nested path has a
    latent bug: steps shared by multiple dependents (diamond DAGs) are
    executed more than once. Consolidate both APIs onto the flat DAG
    execution path and remove build_nested_structure.
    """
    depended_on = {
        dep for step in steps.values() for dep in step.get("depends_on") or []
    }
    # A list comprehension (not a set difference) keeps root order stable.
    roots = [step_id for step_id in steps if step_id not in depended_on]

    def build_item(step_id):
        step = steps[step_id]
        item = {
            "name": step["name"],
            "type": step["type"],
            "arguments": normalize_arguments(step.get("arguments") or {}),
        }
        # Steps it depends on appear as nested dependencies
        deps = step.get("depends_on") or []
        if deps:
            item["dependencies"] = {"items": [build_item(dep) for dep in deps]}
        return item

    return {"items": [build_item(root) for root in roots]}


def dump_arguments(
    item_arguments: PipelineItemArguments, pipeline_item: Optional[Any] = None
) -> List[Any]:
    """Flatten the arguments of a pipeline item into a single list.

    Args:
        item_arguments: Container whose ``.arguments`` are walked. An argument
            that has its own ``.arguments`` container is expanded recursively;
            otherwise the argument object itself is appended.
        pipeline_item: Optional item whose deprecated nested ``deps`` have
            their arguments appended as well.

    Returns:
        The flattened list of argument objects.

    Raises:
        PipelineError: If processing fails. A ``PipelineError`` from a
            recursive call is re-raised unchanged rather than wrapped again at
            every nesting level.
    """
    try:
        arguments = []
        for argument in item_arguments.arguments:
            if hasattr(argument, "type") and argument.type in (
                "composite_arg",
                "batch_arg",
            ):
                # Keep composite/batch arguments as single entries so callers
                # can recognise (and skip) them; nested arguments, when
                # present, are appended after them.
                # NOTE(review): an earlier comment said "batch_list"
                # arguments' internals should NOT be extracted, but the check
                # is for "batch_arg" and nested arguments ARE extracted —
                # confirm which is intended.
                arguments.append(argument)
                if getattr(argument, "arguments", None) is not None:
                    arguments += dump_arguments(argument.arguments)
                continue
            try:
                arguments += dump_arguments(argument.arguments)
            except AttributeError:
                # Leaf argument: it has no nested ``arguments`` container.
                arguments.append(argument)

        if pipeline_item is not None:
            try:
                for dep in pipeline_item.deps:
                    arguments += dump_arguments(dep.arguments, dep)
            except AttributeError:
                pass
        return arguments
    except PipelineError:
        raise
    except Exception as e:
        # Catches KeyError, ValueError, TypeError, and other processing errors
        # during argument extraction and normalization
        raise PipelineError(f"Failed to process arguments: {str(e)}") from e
class PipelineItem:
    """One executable step of a pipeline: a snippet, sub-pipeline or batch."""

    def __init__(
        self,
        item: Dict[str, Any],
        pype_snippets_modules: Dict = PYPE_SNIPPETS_MODULES,
    ):
        """Build a step from its (normalized) definition dict.

        Args:
            item: Step definition with ``name``, ``type`` and ``arguments``
                (a list of argument dicts). Optional keys: ``step_id``,
                ``depends_on``, ``final_outputs``, ``requirements``, ``mute``
                and the deprecated nested ``dependencies.items``.
            pype_snippets_modules: Snippet registry used to resolve
                ``snippet``/``batch_snippet`` steps. It is passed on to nested
                dependencies too.

        Raises:
            SnippetNotFoundError: If a snippet step has no ``requirements``
                and its name is not in *pype_snippets_modules*.
        """
        self.name = item["name"]
        self.step_id = item.get("step_id")  # For DAG execution tracking
        self.arguments = PipelineItemArguments()
        self.batch_id = None
        for argument in item["arguments"]:
            # Arguments without an explicit type are plain argv arguments.
            self.arguments.add_argument(argument, argument.get("type", "argv_arg"))
        self.type = item["type"]
        self.jobs = []
        self.requirements = {}
        # Flat list of dependency step IDs (``depends_on: null`` -> []).
        self.depends_on = item.get("depends_on") or []
        self.final_outputs: List[str] = item.get("final_outputs", [])
        self.snippet: Optional[Snippet] = None
        try:
            self.requirements = item["requirements"]
        except KeyError:
            if self.type in ("snippet", "batch_snippet"):
                # At least print the missing snippet to
                # be fixed within issue #3
                try:
                    self.snippet = copy(pype_snippets_modules[self.name])
                    self.requirements = self.snippet.requirements()
                except KeyError:
                    raise SnippetNotFoundError(self.name)

        # Support deprecated nested dependencies (for backwards compatibility).
        # Only a missing "dependencies"/"items" key means "no nested deps";
        # errors while building a nested item are no longer swallowed, which
        # previously dropped every nested dependency silently.
        try:
            nested_items = item["dependencies"]["items"]
        except KeyError:
            nested_items = []
        self.deps = [PipelineItem(x, pype_snippets_modules) for x in nested_items]

        # Missing or falsy ``mute`` becomes False; truthy values are kept.
        self.mute = item.get("mute") or False

    def generate_batch_id(
        self,
    ):
        """Set ``self.batch_id`` to the last 10 characters of ``generate_uid(10)``."""
        self.batch_id = generate_uid(10)[-10:]

    def run(
        self,
        argv: List[str],
        queue: str,
        profile: str,
        log: Any,
        jobs: List[Any],
        progress: Optional[Any] = None,
        registry: Optional[PipelineRegistry] = None,
    ) -> Optional[List[Any]]:
        """Run pipeline item (without recursively running DAG dependencies).

        In the DAG execution model, dependencies are handled by
        Pipeline.submit(), so this method only executes the current step.
        Deprecated nested deps are still run first, with the same arguments
        and registry.

        Args:
            argv: Parsed pipeline arguments
            queue: Queue system to use
            profile: Profile name
            log: Logger instance
            jobs: Job IDs of the dependencies this step must wait for
            progress: Optional Progress tracker for resumable execution
            registry: Optional per-run PipelineRegistry, forwarded to the
                executor (and to nested deps)

        Returns:
            The executor's result, or ``self.jobs`` when the item is muted.

        Raises:
            PipelineItemError: If ``self.type`` is not a supported type.
        """
        self.jobs: List = []
        item_run: Optional[List[Any]] = None

        # Support deprecated nested dependencies (for backwards compatibility)
        # NOTE(review): this also swallows AttributeErrors raised deep inside
        # dep.run(); consider narrowing once nested deps are removed.
        try:
            for dep in self.deps:
                res = dep.run(argv, queue, profile, log, jobs, progress, registry=registry)
                if res:
                    self.jobs += res
        except AttributeError:
            pass

        # jobs parameter contains dependency job IDs from DAG execution
        self.jobs += jobs

        possible_types = ("snippet", "pipeline", "batch_snippet", "batch_pipeline")
        if self.type not in possible_types:
            raise PipelineItemError(
                f"Item type '{self.type}' is not in the possible types {possible_types}",
                self.name,
                self.type,
            )

        if self.type == "snippet":
            item_run = exec_snippet(
                self, argv, queue, profile, log, progress, registry=registry
            )
        elif self.type == "pipeline":
            item_run = exec_pipeline(
                self, argv, queue, profile, log, progress, registry=registry
            )
        elif self.type == "batch_snippet":
            item_run = batch_exec_unit(
                self, argv, queue, profile, log, exec_snippet_unit, progress,
                registry=registry,
            )
        else:  # "batch_pipeline"
            self.batch_id = None
            item_run = batch_exec_unit(
                self, argv, queue, profile, log, exec_pipeline_unit, progress,
                registry=registry,
            )

        if self.mute:
            return self.jobs
        return item_run
class Pipeline:
    """A pipeline definition loaded from a YAML file.

    Every top-level key of the YAML document is set as an attribute of the
    instance (``info``, ``steps`` or ``items``, ...). The steps are turned
    into :class:`PipelineItem` objects; for API 2.1.0 they are in
    topological (execution) order.
    """

    def __init__(
        self, path: str, name: str, pype_snippets_module: Dict = PYPE_SNIPPETS_MODULES
    ):
        """Load and validate the pipeline at *path*.

        Args:
            path: Path of the pipeline YAML file.
            name: Pipeline name (used as the CLI sub-command).
            pype_snippets_module: Snippet registry passed to every item.

        Raises:
            PipelineVersionError: If ``info.api`` is missing or matches none
                of ``PIPELINES_API``.
            PipelineError: If the YAML has neither ``steps`` nor ``items``.
            AttributeError: If the YAML has no ``info`` section
                (``get_pipelines`` catches this and skips the file).
        """
        self.__path__ = path
        self.__name__ = name
        self.__results__ = []
        with open(self.__path__, "rb") as file:
            pipeline = yaml.safe_load(file)
        for key in pipeline:
            setattr(self, key, pipeline[key])

        try:
            api_version = self.info["api"]
        except KeyError:
            raise PipelineVersionError("unknown", PIPELINES_API[0], self.__name__)
        # YAML loads e.g. ``api: 2.1`` as a float: compare its text form.
        if not any(
            compare_version(supported, str(api_version)) == 0
            for supported in PIPELINES_API
        ):
            raise PipelineVersionError(api_version, PIPELINES_API[0], self.__name__)

        self.pipelineitems = []
        self.step_map = {}  # Map of step_id -> PipelineItem for DAG execution

        # API 2.1.0: "steps" with depends_on -> flat DAG in execution order.
        # API 2.0.0: "items" with nested dependencies, used as loaded.
        if hasattr(self, "steps"):
            self.items = build_dag_structure(self.steps)
        elif not hasattr(self, "items"):
            raise PipelineError(
                "Pipeline must have either 'steps' (API 2.1.0) or 'items' (API 2.0.0)",
                self.__name__,
            )

        for x in self.items:
            item = PipelineItem(x, pype_snippets_module)
            self.pipelineitems.append(item)
            # Track by step_id for DAG dependency resolution
            if item.step_id:
                self.step_map[item.step_id] = item

    def _prune_upstream_jobs(
        self,
        finalized_step_id: str,
        step_to_downstream: Dict[str, List[str]],
        step_results: Dict[str, Any],
        registry: PipelineRegistry,
        runtime_file: str,
        log: Any,
    ) -> None:
        """Cancel queued jobs upstream of a step whose finals are present.

        Walks backward through the pipeline items. A step is prunable when:
          - ALL its downstream steps are already prunable, AND EITHER
          - it has no final_outputs (it only existed to feed downstream), OR
          - all its declared final outputs are present on disk (already done).
        Steps whose finals are missing are never pruned — they still need to
        run.

        A skipped step re-exports its dependencies' run_ids in
        *step_results* (see ``submit``), so one run_id can be listed under
        several steps. A run_id that is still listed under any non-prunable
        step is never cancelled, and each run_id is cancelled at most once.

        NOTE(review): this method does not itself check a job's current
        status before calling ``Runtime.update(..., CANCELLED)``; the
        "only pending jobs are cancelled" guarantee presumably lives in
        ``Runtime.update`` — confirm.
        """
        from pype.utils.runtime import JobStatus, Runtime as _Runtime

        # Finals status per step, built from the registry manifest.
        step_finals_status: Dict[str, Dict] = {}
        for row in registry.manifest:
            # str() accepts both "True"/"False" strings and bool values.
            if str(row.get("final", "")).lower() != "true":
                continue
            entry = step_finals_status.setdefault(
                row.get("step_name", ""), {"has_finals": True, "all_present": True}
            )
            if not os.path.isfile(row.get("path") or ""):
                entry["all_present"] = False

        # Collect all steps reachable backward from finalized_step_id (BFS).
        step_to_upstream = {
            item.step_id: item.depends_on or []
            for item in self.pipelineitems
            if item.step_id
        }
        reachable: set = set()
        pending = deque(step_to_upstream.get(finalized_step_id, []))
        while pending:
            sid = pending.popleft()
            if sid in reachable:
                continue
            reachable.add(sid)
            pending.extend(step_to_upstream.get(sid, []))
        if not reachable:
            return

        # Single reverse-topological pass (pipelineitems is deps-first, so
        # reversed visits every dependent before its dependencies).
        prunable: set = {finalized_step_id}
        for item in reversed(self.pipelineitems):
            sid = item.step_id
            if not sid or sid not in reachable or sid in prunable:
                continue
            downstream = step_to_downstream.get(sid, [])
            if not all(d in prunable for d in downstream):
                continue
            finals = step_finals_status.get(sid)
            if finals is None or finals["all_present"]:
                # No finals (only fed downstream) or finals already on disk.
                prunable.add(sid)
            # else: finals missing — step must run, stop propagation
        prunable.discard(finalized_step_id)
        if not prunable:
            return

        # run_ids that a step which still has to run refers to.
        protected: set = set()
        for sid, run_ids in step_results.items():
            if sid in prunable:
                continue
            if isinstance(run_ids, str):
                protected.add(run_ids)
            elif isinstance(run_ids, list):
                protected.update(r for r in run_ids if isinstance(r, str))

        rt = _Runtime(runtime_file)
        cancelled: set = set()
        # Iterate in pipeline order so cancellations/logs are deterministic.
        for item in self.pipelineitems:
            sid = item.step_id
            if sid not in prunable:
                continue
            run_ids = step_results.get(sid)
            if not run_ids or not isinstance(run_ids, list):
                continue
            for run_id in run_ids:
                if (
                    not isinstance(run_id, str)
                    or run_id in protected
                    or run_id in cancelled
                ):
                    continue
                log.log.info(
                    "Pruning step '%s' (%s): not needed, downstream finals present"
                    % (sid, run_id[:8])
                )
                rt.update(
                    run_id,
                    JobStatus.CANCELLED.value,
                    error_msg="Pruned: downstream finals already present",
                )
                cancelled.add(run_id)

    def submit(
        self,
        parser: ArgumentParser,
        argv: List[str],
        queue: str,
        profile: str,
        log: Any,
        progress: Optional[Any] = None,
        jobs: Optional[List[Any]] = None,
        registry: Optional[PipelineRegistry] = None,
    ):
        """Submit pipeline for execution using DAG-based dependency ordering.

        Items run in ``self.pipelineitems`` order. Each item receives the job
        ids recorded for the steps in its ``depends_on``; items without
        dependencies inherit *jobs* from the parent pipeline. When the root
        call finishes, the registry manifest is written to
        ``pipeline_outputs.tsv`` in ``log.__path__``.

        Args:
            parser: Argument parser
            argv: Command line arguments
            queue: Queue system to use
            profile: Profile name
            log: Logger instance
            progress: Optional Progress tracker for resumable execution
            jobs: Optional list of jobs
            registry: Optional PipelineRegistry from parent pipeline
                (sub-pipeline calls only)
        """
        if jobs is None:
            jobs = []
        log.log.info("Prepare argument parser for pipeline")
        parse_snippets = pipeline_argparse_ui(self, parser, log)
        log.log.info("Parse arguments %s" % ", ".join(str(a) for a in argv))
        args = vars(parse_snippets.parse_args(argv))
        log.log.info("Run all snippets with arguments %s" % args)

        # Root call creates the registry; sub-pipeline calls receive it from parent
        is_root = registry is None
        if is_root:
            registry = PipelineRegistry()

        step_results: Dict[str, Any] = {}  # Map of step_id -> job_ids
        # Only leaf steps (not depended on by any other step) emit results.
        depended_step_ids = {
            dep
            for item in self.pipelineitems
            for dep in getattr(item, "depends_on", None) or []
        }
        # Map step_id -> downstream step_ids (used for upstream pruning)
        step_to_downstream: Dict[str, List[str]] = {}
        for downstream_item in self.pipelineitems:
            for dep in getattr(downstream_item, "depends_on", None) or []:
                step_to_downstream.setdefault(dep, []).append(downstream_item.step_id)
        runtime_file = os.path.join(log.__path__, "pipeline_runtime.yaml")

        for item in self.pipelineitems:
            # Collect job IDs from dependencies
            dep_jobs: List[Any] = []
            depends_on = getattr(item, "depends_on", None) or []
            if depends_on:
                for dep_step_id in depends_on:
                    if dep_step_id in step_results:
                        dep_job_ids = step_results[dep_step_id]
                        if isinstance(dep_job_ids, list):
                            dep_jobs.extend(dep_job_ids)
                        else:
                            dep_jobs.append(dep_job_ids)
            elif jobs:
                # Root item of this pipeline: inherit external dependencies
                # from the parent pipeline (if nested).
                if isinstance(jobs, list):
                    dep_jobs.extend(jobs)
                else:
                    dep_jobs.append(jobs)

            # Execute step with dependency job IDs
            results = item.run(
                args, queue, profile, log, dep_jobs, progress, registry=registry
            )

            step_id = getattr(item, "step_id", None)
            if step_id:
                if step_id in registry.finalized_steps:
                    # Finals present: prune upstream pending jobs and cut the
                    # dep chain so downstream steps don't wait for work that
                    # no longer needs doing.
                    self._prune_upstream_jobs(
                        step_id,
                        step_to_downstream,
                        step_results,
                        registry,
                        runtime_file,
                        log,
                    )
                    step_results[step_id] = []
                else:
                    # Propagate dep_jobs when the step was skipped (files exist)
                    step_results[step_id] = results if results else dep_jobs

            if not step_id or step_id not in depended_step_ids:
                self.__results__ += results if results else []

        if is_root:
            manifest_path = os.path.join(log.__path__, "pipeline_outputs.tsv")
            registry.write_tsv(manifest_path)
            log.log.info("Pipeline manifest written to %s" % manifest_path)
def pipeline_argparse_ui(
    pipeline: Pipeline, parser: ArgumentParser, log: Any
) -> ArgumentParser:
    """Create the command-line sub-parser for a pipeline.

    Collects the arguments of every pipeline item (and of its ``deps`` when
    present), de-duplicates them by value, and registers one ``--<arg>``
    option per string-valued argument in one of three groups:

    - *Optional*: the argument has a non-``None`` entry in
      ``pipeline.info["defaults"]``. Falsy defaults such as ``0``, ``""`` or
      ``False`` are real defaults and do not make the option required.
    - *Batches*: no default, but listed in ``pipeline.info["batches"]``.
    - *Required*: everything else.

    ``store_true``/``store_false`` arguments are always added to the Required
    group, without ``required=True``.

    Args:
        pipeline: Pipeline whose ``info`` and ``pipelineitems`` describe the
            arguments.
        parser: Sub-parser collection (result of ``add_subparsers``) that the
            pipeline parser is added to.
        log: Logger instance.

    Returns:
        The newly created sub-parser for ``pipeline``.
    """
    info = pipeline.info
    parse_snippets = parser.add_parser(
        pipeline.__name__,
        help=info["description"],
        add_help=False,
        formatter_class=RawTextHelpFormatter,
    )
    log.log.info("Retrieve all arguments required in the pipeline snippets")
    arguments_items = []
    for item in pipeline.pipelineitems:
        arguments_items += dump_arguments(item.arguments)
        try:
            for dep in item.deps:
                arguments_items += dump_arguments(dep.arguments, dep)
        except AttributeError:
            pass

    # Keep the first argument object seen for each distinct value.
    arguments = []
    arguments_values = []
    for arg in arguments_items:
        # Skip composite arguments (they have value=None and shouldn't be counted as pipeline args)
        if hasattr(arg, "type") and arg.type == "composite_arg":
            continue
        if arg.value not in arguments_values:
            arguments_values.append(arg.value)
            arguments.append(arg)
    log.log.info(
        ("Use unique tags %s with specified type to the pipeline argument parser")
        % ", ".join([str(a.value) for a in arguments])
    )

    parser_req = parse_snippets.add_argument_group(
        title="Required", description="Required pipeline arguments"
    )
    parser_batch = None
    parser_opt = None
    try:
        if len(info["batches"].keys()) >= 1:
            parser_batch = parse_snippets.add_argument_group(
                title="Batches", description=("Arguments requiring a batch file")
            )
    except KeyError:
        pass
    try:
        if len(info["defaults"].keys()) >= 1:
            parser_opt = parse_snippets.add_argument_group(
                title="Optional", description=("Optional pipeline arguments")
            )
    except KeyError:
        pass

    for arg_obj in arguments:
        if not isinstance(arg_obj.value, str):
            continue
        # DEPRECATED: arg_obj.nargs from step-level definitions
        # This is kept for backward compatibility but will be phased out.
        # Use pipeline.info["nargs"][arg] instead (see below).
        nargs_deprecated = arg_obj.nargs
        arg_str_dict = get_arg_from_string(arg_obj.value)
        arg = arg_str_dict["arg"]
        arg_type = arg_str_dict["arg_type"]
        if arg is None:
            continue

        try:
            description = "%s, type: %s" % (info["arguments"][arg], arg_type)
        except KeyError:
            description = "%s, type: %s" % (arg, arg_type)

        # Any non-None default makes the option optional. A plain truthiness
        # test would wrongly mark defaults like 0, "" or False as required.
        try:
            default_val = info["defaults"][arg]
            description = "%s. Default: %s" % (description, default_val)
        except KeyError:
            default_val = None
        has_default = default_val is not None

        try:
            batch_description = info["batches"][arg]
            description = compose_batch_description(batch_description, description)
        except KeyError:
            batch_description = False

        # Get nargs from normalized info section (preferred) or fall back to deprecated step-level
        try:
            nargs = info["nargs"][arg]
        except KeyError:
            nargs = nargs_deprecated

        # Get choices from normalized info section
        try:
            choices = info["choices"][arg]
            description = "%s. Choices: %s" % (
                description,
                ", ".join(map(str, choices)),
            )
        except KeyError:
            choices = None

        option = "--%s" % arg
        if arg_obj.action in ["store_true", "store_false"]:
            parser_req.add_argument(
                option, dest=arg, help=description, action=arg_obj.action
            )
        elif has_default:
            # parser_opt exists: info["defaults"] has at least this key.
            parser_opt.add_argument(
                option,
                dest=arg,
                help=description,
                type=locate(arg_type),
                nargs=nargs,
                choices=choices,
                action=arg_obj.action,
                default=default_val,
            )
        elif batch_description:
            parser_batch.add_argument(
                option,
                dest=arg,
                help=description,
                type=locate(arg_type),
                required=True,
            )
        else:
            parser_req.add_argument(
                option,
                dest=arg,
                help=description,
                type=locate(arg_type),
                nargs=nargs,
                choices=choices,
                action=arg_obj.action,
                required=True,
            )
    return parse_snippets


def flat_list(S: List[Any]) -> List[Any]:
    """Flatten a nested list structure into a single-level list.

    Nested lists of any depth are expanded in order. Every other element,
    including tuples and strings, is kept as-is. The input is not modified,
    and a new list is always returned.

    The traversal is iterative, so long inputs (e.g. thousands of scattered
    result paths) run in linear time and never hit the recursion limit.

    Args:
        S: A list that may contain nested lists and other elements.

    Returns:
        A flattened list with all nested lists expanded into a single level.

    Example:
        >>> flat_list([[1, 2], 3, [4, [5, 6]]])
        [1, 2, 3, 4, 5, 6]
    """
    flat: List[Any] = []
    # Stack of iterators; the top one is the (sub)list currently being walked.
    stack = [iter(S)]
    while stack:
        for element in stack[-1]:
            if isinstance(element, list):
                # Descend; the parent iterator resumes once this one is exhausted.
                stack.append(iter(element))
                break
            flat.append(element)
        else:
            stack.pop()
    return flat


def arg_dict_to_str(arg_dict: Dict[str, Any], arg_str: List[str]) -> List[str]:
    """Convert an argument dictionary to command-line argument list.

    Transforms a dictionary of arguments into a flat list suitable for shell
    command execution. Values are handled as follows:

    - ``False`` (identity check) skips the key entirely.
    - ``True`` (identity check) adds the key alone, as a flag.
    - A list adds the key followed by each element as a string; nested lists
      are flattened.
    - Anything else adds the key followed by ``str(value)``.

    Args:
        arg_dict: Dictionary where keys are argument names (e.g., '-o', '--param')
            and values can be strings, numbers, booleans, or lists.
        arg_str: Existing argument list to append to (typically starts as []).
            It is modified in place.

    Returns:
        The modified arg_str list with all arguments appended.

    Example:
        >>> arg_dict = {'-o': 'output.txt', '-v': True, '--skip': False}
        >>> arg_dict_to_str(arg_dict, [])
        ['-o', 'output.txt', '-v']
    """
    # FIXME: Joining list values into one space-separated string (an earlier,
    # now disabled approach) broke volume bindings: the single quoted string
    # holding several arguments was not recognized as separate objects to
    # bind. The original note says disabling the join did not cleanly fix it
    # either (undesired side effects), so confirm before changing. List values
    # are currently emitted as one argv token per element.
    for key, value in arg_dict.items():
        if value is False:
            continue
        arg_str.append(key)
        if value is True:
            continue
        if isinstance(value, list):
            # Repeated args / list values: one token per (flattened) element
            arg_str.extend(str(v) for v in flat_list(value))
        else:
            arg_str.append(str(value))
    return arg_str


def _result_paths(value: Any) -> List[str]:
    """Return the result path(s) held in ``value`` as strings, flattening lists."""
    values = flat_list(value) if isinstance(value, list) else [value]
    return [str(p) for p in values]


def _append_manifest_rows(
    registry: PipelineRegistry,
    item: PipelineItem,
    step_name: str,
    flat_results: List[Any],
) -> None:
    """Append one manifest row to ``registry`` per ``(result_key, path)`` pair."""
    for key, path in flat_results:
        registry.manifest.append(
            {
                "snippet_name": item.name,
                "step_name": step_name,
                "result_key": key,
                "path": path,
                "final": str(key in item.final_outputs).lower(),
            }
        )


def exec_snippet_unit(
    item: PipelineItem,
    arg_dict: Dict[str, Any],
    queue: str,
    profile: str,
    log: Any,
    progress: Optional[Any] = None,
    registry: Optional[PipelineRegistry] = None,
) -> List[Any]:
    """Execute a single snippet, skipping work already done or already scheduled.

    Checks, in order:

    1. Registry dedup (only when ``registry`` is given and the snippet
       declares results). If the same snippet with the same outputs was
       already registered in this run, its job is reused and manifest rows
       are still recorded. If a different snippet already claimed one of the
       output paths, ``PipelineError`` is raised.
    2a. Final outputs. If every declared final output file exists, the step
        is added to ``registry.finalized_steps`` and not executed.
    2b. All results. If every result file exists, the snippet is not
        executed.

    Otherwise the snippet is submitted through the queue module's
    ``register`` function.

    Args:
        item: Pipeline item to execute
        arg_dict: Argument dictionary
        queue: Queue system
        profile: Profile name
        log: Logger instance
        progress: Deprecated and unused; kept for backwards compatibility.
        registry: Optional PipelineRegistry for dedup and manifest tracking

    Returns:
        ``[job]`` with the value returned by the queue (or the reused job),
        or ``[]`` when nothing was submitted.

    Raises:
        ArgumentError: If ``arg_dict`` is not a dict.
        PipelineError: On an output path conflict between different snippets.
    """
    if not isinstance(arg_dict, dict):
        raise ArgumentError(
            f"Argument must be a dictionary, got {type(arg_dict).__name__}",
            argument="arg_dict",
        )
    arg_str = arg_dict_to_str(arg_dict, [])
    log.log.info("Snippet %s relevant item.arguments: %s" % (item.name, arg_dict))
    snippet = copy(item.snippet)
    friendly_name = snippet.friendly_name(arg_dict)

    results_dict: Dict[str, Any] = {}
    try:
        results_dict = snippet.results(arg_dict)
        results = flat_list(list(results_dict.values()))
        # map(str, ...) so non-str paths (e.g. Path objects) don't break logging
        log.log.info(
            "Results file(s) for snippet %s: %s"
            % (item.name, ", ".join(map(str, results)))
        )
    except AttributeError:
        results = []

    # Deduplication and manifest registration
    h = None
    if registry is not None and results_dict:
        h = _results_hash(item.name, results_dict)
        step_name = item.step_id or item.name
        # Flatten list-valued results (e.g. scattered interval lists) to individual paths
        flat_results = [
            (key, path)
            for key, value in results_dict.items()
            for path in _result_paths(value)
        ]

        if h in registry.jobs_by_hash:
            # Identical snippet+outputs already scheduled: reuse the existing job.
            # Still record manifest rows so every step appears in the TSV.
            existing_run_id = registry.jobs_by_hash[h]
            _append_manifest_rows(registry, item, step_name, flat_results)
            log.log.info(
                "Dedup: snippet %s outputs already scheduled, reusing job %s"
                % (item.name, existing_run_id)
            )
            return [existing_run_id] if existing_run_id else []

        # Conflict check: different snippet claiming same path
        for key, path in flat_results:
            existing_h = registry.jobs_by_path.get(path)
            if existing_h and existing_h != h:
                raise PipelineError(
                    f"Output path conflict: {item.name} result '{key}' -> {path} "
                    f"already claimed by a different snippet"
                )

        # Register paths and add manifest rows
        registry.jobs_by_hash[h] = None  # placeholder; updated after submission
        for _key, path in flat_results:
            registry.jobs_by_path[path] = h
        _append_manifest_rows(registry, item, step_name, flat_results)

    # NOTE: Progress tracking is deprecated and no longer supported; job
    # resumption and state tracking are handled by queue modules via
    # pipeline_runtime.yaml. ``progress`` is accepted but not used.

    # LAYER 2a: all declared final outputs exist -> step is complete even if
    # non-final (temporary) outputs were cleaned up. Pipeline.submit() uses
    # registry.finalized_steps to prune upstream pending jobs.
    if registry is not None and item.step_id and item.final_outputs and results_dict:
        final_paths_flat = [
            path
            for key in item.final_outputs
            if key in results_dict
            for path in _result_paths(results_dict[key])
        ]
        if final_paths_flat and all(os.path.isfile(p) for p in final_paths_flat):
            registry.finalized_steps.add(item.step_id)
            log.log.info(
                "Final outputs present for snippet %s: treating as complete, will prune upstream"
                % item.name
            )
            return []

    # LAYER 2b: all result files exist (standard skip)
    files_exist = all(os.path.isfile(x) for x in results) if results else False
    if files_exist:
        log.log.info(
            ("Found results file(s) %s: skipping execution of snippet %s")
            % (", ".join(map(str, results)), item.name)
        )
        return []

    # Neither layer says skip - execute the snippet
    log.log.info("Submit Snippet %s with queue : %s" % (item.name, queue))
    queue_name = "%s_%s_%s" % (generate_uid(), friendly_name, queue)
    log.add_log(queue_name)
    queuelog = log.programs_logs[queue_name]
    log.log.info(
        ("Add log information for snippets %s (for results %s) to folder %s")
        % (item.name, ", ".join(map(str, results)), queuelog.__path__)
    )
    queuelog.log.info("Execute snippet %s with queue %s" % (item.name, queue))
    queue_exec = get_module_method(PYPE_QUEUES, queue, "register")
    if item.jobs:
        log.log.info(
            "Snippets %s on queue %s depends on jobs: %s"
            % (item.name, queue, ", ".join(map(str, item.jobs)))
        )
    command = item.name + " " + " ".join(shlex.quote(str(arg)) for arg in arg_str)
    try:
        res_queue = queue_exec(
            command,
            friendly_name,
            item.requirements,
            item.jobs,
            queuelog,
            profile,
            batch_id=item.batch_id,
        )
    except TypeError as e:
        if "batch_id" not in str(e):
            raise
        # Old queue module without batch_id support
        log.log.warning("Queue module doesn't support batch_id, retrying without it")
        res_queue = queue_exec(
            command,
            friendly_name,
            item.requirements,
            item.jobs,
            queuelog,
            profile,
        )
    log.log.info("Snippets %s returned %s" % (item.name, res_queue))

    # Replace the hash placeholder with the actual run_id
    if registry is not None and results_dict:
        registry.jobs_by_hash[h] = res_queue
    return [res_queue]


def exec_pipeline_unit(
    item: PipelineItem,
    arg_dict: Dict[str, Any],
    queue: str,
    profile: str,
    log: Any,
    progress: Optional[Any] = None,
    registry: Optional[PipelineRegistry] = None,
) -> List[Any]:
    """Execute a sub-pipeline unit.

    Builds a fresh argument parser, looks up the pipeline named
    ``item.name``, and submits it with ``item.jobs`` as external dependencies.

    Args:
        item: Pipeline item
        arg_dict: Argument dictionary
        queue: Queue system
        profile: Profile name
        log: Logger
        progress: Optional Progress tracker
        registry: Optional PipelineRegistry shared with parent pipeline

    Returns:
        The sub-pipeline's ``__results__`` after submission.

    Raises:
        ArgumentError: If ``arg_dict`` is not a dict.
    """
    parser = ArgumentParser(prog="pype", description="exec_pipeline")
    subparsers = parser.add_subparsers(dest="modules")
    this_pipeline = get_pipelines(subparsers, {})[item.name]
    if not isinstance(arg_dict, dict):
        raise ArgumentError(
            f"Argument must be a dictionary, got {type(arg_dict).__name__}",
            argument="arg_dict",
        )
    arg_str = arg_dict_to_str(arg_dict, [])
    # may as well try to change pype.utils.arguments.Argument.to_argv
    # return in case of store_true/false
    log.log.info("Pipeline %s relevant item.arguments: %s" % (item.name, arg_dict))
    this_pipeline.submit(
        subparsers, arg_str, queue, profile, log, progress, item.jobs, registry=registry
    )
    return this_pipeline.__results__


def exec_snippet(
    item: PipelineItem,
    argv: List[str],
    queue: str,
    profile: str,
    log: Any,
    progress: Optional[Any] = None,
    registry: Optional[PipelineRegistry] = None,
) -> List[Any]:
    """Resolve ``argv`` into the item's argument dict and execute the snippet once.

    Args:
        item: Pipeline item
        argv: Arguments
        queue: Queue system
        profile: Profile name
        log: Logger
        progress: Optional Progress tracker (forwarded, unused downstream)
        registry: Optional PipelineRegistry for dedup and manifest tracking

    Returns:
        List of job results
    """
    arg_dict = item.arguments.to_dict(argv)
    return exec_snippet_unit(item, arg_dict, queue, profile, log, progress, registry=registry)


def exec_pipeline(
    item: PipelineItem,
    argv: List[str],
    queue: str,
    profile: str,
    log: Any,
    progress: Optional[Any] = None,
    registry: Optional[PipelineRegistry] = None,
) -> List[Any]:
    """Resolve ``argv`` into the item's argument dict and execute the sub-pipeline once.

    Args:
        item: Pipeline item
        argv: Arguments
        queue: Queue system
        profile: Profile name
        log: Logger
        progress: Optional Progress tracker
        registry: Optional PipelineRegistry shared with parent pipeline

    Returns:
        List of pipeline results
    """
    arg_dict = item.arguments.to_dict(argv)
    return exec_pipeline_unit(item, arg_dict, queue, profile, log, progress, registry=registry)


def batch_exec_unit(
    item: PipelineItem,
    argv: List[str],
    queue: str,
    profile: str,
    log: Any,
    exec_unit: Any,
    progress: Optional[Any] = None,
    registry: Optional[PipelineRegistry] = None,
) -> List[Any]:
    """Execute ``exec_unit`` once per entry produced by the item's arguments.

    ``item.arguments.to_dict(argv)`` is iterated; presumably it yields one
    argument dict per batch entry (TODO confirm against PipelineItemArguments).
    A batch id is generated on the item before the first execution.

    Args:
        item: Pipeline item
        argv: Arguments
        queue: Queue system
        profile: Profile name
        log: Logger
        exec_unit: Execution function (e.g. exec_snippet_unit or exec_pipeline_unit)
        progress: Optional Progress tracker
        registry: Optional PipelineRegistry for dedup and manifest tracking

    Returns:
        Concatenated results of every ``exec_unit`` call.
    """
    results = []
    arg_dict = item.arguments.to_dict(argv)
    item.generate_batch_id()
    for batch_args in arg_dict:
        results += exec_unit(item, batch_args, queue, profile, log, progress, registry=registry)
    return results