# Source code for pype.process

import os
import re
import shlex
import subprocess
import tempfile
from copy import copy
from dataclasses import dataclass
from glob import glob
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from pype.__config__ import (
    PYPE_CONDA,
    PYPE_DOCKER,
    PYPE_SINGULARITY_CACHE,
    PYPE_TMP,
)
from pype.exceptions import CommandError, CommandNamespaceError, EnvModulesError
from pype.misc import generate_uid
from pype.utils.containers import get_singularity_image_path, parse_container_reference
from pype.utils.overlay import OverlayVolume, _resolve_backend


@dataclass
class DockerConfig:
    """Container runtime settings used when a command runs in a container.

    The runtime kind is identified purely by the ``runtime`` string; the two
    convenience properties below compare it against the runtimes that need
    special handling.

    Attributes:
        exec_path: Path to the container runtime executable.
        runtime: Runtime name, e.g. 'docker', 'singularity' or 'udocker'.
        cache_dir: Optional cache directory (set for Singularity images).
    """

    exec_path: Path
    runtime: str
    cache_dir: Optional[Path] = None

    @property
    def is_singularity(self) -> bool:
        """True when ``runtime`` is exactly ``"singularity"``."""
        return "singularity" == self.runtime

    @property
    def is_udocker(self) -> bool:
        """True when ``runtime`` is exactly ``"udocker"``."""
        return "udocker" == self.runtime
def get_module_cmd():
    """Load and return the Environment Modules ``module`` callable.

    Executes ``$MODULESHOME/init/python.py`` in a fresh namespace dict and
    returns the ``module`` object that script defines.

    Returns:
        The ``module`` callable defined by the init script.

    Raises:
        EnvModulesError: If ``MODULESHOME`` is unset or empty, if the init
            script does not exist, or if a ``KeyError`` occurs while running
            the script or looking up ``module`` in its namespace.
    """
    modules_home = os.environ.get("MODULESHOME")
    if not modules_home:
        raise EnvModulesError("No MODULESHOME variable found in the environment")
    modules_path = Path(modules_home) / "init/python.py"
    if not modules_path.exists():
        raise EnvModulesError(f"No python script {modules_path}")
    modules: Dict[str, Any] = {}
    try:
        # NOTE(review): this executes code taken from the MODULESHOME
        # environment variable; it is assumed to point at trusted site
        # configuration. Compiling with the real filename gives readable
        # tracebacks instead of "<string>".
        exec(compile(modules_path.read_bytes(), str(modules_path), "exec"), modules)
        return modules["module"]
    except KeyError as e:
        # Chain the original KeyError so the root cause stays visible.
        raise EnvModulesError(f"Module command not found: {e}") from e
def program_string(program_dict: Dict[str, str]) -> str:
    """Join a program prefix and its version as ``"<prefix>/<version>"``.

    The prefix is ``program_dict["path"]`` when that lookup (together with
    ``"version"``) succeeds; on any ``KeyError`` it falls back to
    ``program_dict["namespace"]``. A ``KeyError`` from the fallback
    propagates.
    """
    try:
        head = program_dict["path"]
        tail = program_dict["version"]
    except KeyError:
        head = program_dict["namespace"]
        tail = program_dict["version"]
    return f"{head}/{tail}"
@dataclass
class Volume:
    """Bind mount of a host path into a container.

    Input volumes mount the path itself read-only. Output volumes mount the
    path's parent directory read-write (or the path itself when its parent is
    ``/`` or empty), so the tool can create the output file inside it.

    Attributes:
        path: Host file or directory (normalized via ``Path``).
        output: True for read-write ("rw") mounts, False for read-only ("ro").
        bind_prefix: Container directory under which random mount points
            are created.
    """

    path: str
    output: bool = False
    bind_prefix: str = "/var/lib/pype"

    def __post_init__(self):
        """Derive the mount source, mount point and in-container file path.

        Sets ``mode``, ``volume_str``, ``to_bind`` (host side of the mount),
        ``bind_volume`` (container mount point under a random subfolder of
        ``bind_prefix``) and ``bind_file`` (container path of ``path``).
        """
        self.path = str(Path(self.path))
        self.mode = "rw" if self.output else "ro"
        self.volume_str = "--volume=%(host_file)s:%(docker_file)s:%(mode)s"

        host_parent = os.path.dirname(self.path)
        # Outputs bind their parent directory, except for top-level paths
        # (parent "/" or ""), which are bound directly.
        binds_parent = self.output and host_parent not in ("/", "")
        self.to_bind = host_parent if binds_parent else self.path

        # A random subfolder keeps mount points of different volumes apart.
        mount_point = (
            Path(self.bind_prefix)
            / generate_uid(12, False)
            / Path(self.to_bind).name
        )
        self.bind_volume = str(mount_point)
        if binds_parent:
            # The file/dir sits one level inside the bound parent directory.
            self.bind_file = str(mount_point / Path(self.path).name)
        else:
            # The path itself is the mount point.
            self.bind_file = self.bind_volume

    def remove_mode(self) -> None:
        """Drop the ``:mode`` suffix from the volume flag (for udocker)."""
        self.volume_str = "--volume=%(host_file)s:%(docker_file)s"
        self.mode = ""

    def singularity_volume(self) -> None:
        """Switch the volume flag to Singularity's ``--bind`` syntax."""
        self.volume_str = "--bind %(host_file)s:%(docker_file)s:%(mode)s"

    def replace_bind_volume(self, bind_path):
        """Move this volume's container path under ``bind_path``.

        ``bind_file`` becomes ``bind_path/<basename of bind_file>``. For "rw"
        volumes the mount point becomes the directory of that file; otherwise
        the mount point is the file path itself.

        :param bind_path: Container directory that replaces the randomly
            generated mount location.
        :type bind_path: str
        """
        relocated = os.path.join(bind_path, os.path.basename(self.bind_file))
        self.bind_file = relocated
        self.bind_volume = (
            os.path.dirname(relocated) if self.mode == "rw" else relocated
        )

    def replace_bind_dirname(self, bind_path):
        """Move this volume's container path next to ``bind_path``.

        Same as ``replace_bind_volume`` applied to ``dirname(bind_path)``, so
        volumes that live in the same host folder share one container folder.

        :param bind_path: Container path whose directory is used as the new
            mount location.
        :type bind_path: str
        """
        self.replace_bind_volume(os.path.dirname(bind_path))

    def to_str(self) -> str:
        """Render the runtime volume flag for this mount."""
        substitutions = {
            "host_file": self.to_bind,
            "docker_file": self.bind_volume,
            "mode": self.mode,
        }
        return self.volume_str % substitutions
class Command:
    """High-level interface for executing commands with container support.

    This class provides a unified interface for running commands locally or
    within containers (Docker, Singularity, uDocker). It handles:
    - Command execution and piping
    - Volume mounting and file access
    - Container runtime configuration
    - Environment modules and conda integration
    - Input/output file tracking

    Attributes:
        cmd: Command to execute as list of arguments
        name: Identifier for the command (used in logs)
        profile: Configuration profile for the environment
        log: Logger instance for command execution tracking
        backend: Storage backend from ``_resolve_backend``; prepares inputs
            and outputs, restores programs, harvests overlays and cleans up
        docker: Container runtime configuration
        namespace: Set by ``add_namespace``; its ``type`` selects how ``run``
            executes the command
        inputs: List of input files required by the command
        outputs: List of output files produced by the command
        volumes: List of volumes to mount in container environments
        overlay_volumes: Overlay volumes used to redirect paths to scratch
    """

    def __init__(self, cmd: str, log: Any, profile: Any, name: str = ""):
        """Initialize command execution environment.

        Args:
            cmd: Command string to execute (split with ``shlex.split``)
            log: Logger wrapper; the object is passed to ``_resolve_backend``
                and its ``log`` attribute is used for logging
            profile: Environment profile configuration
            name: Optional identifier for the command; a warning is logged
                when it is empty
        """
        self.cmd = shlex.split(cmd)
        self.name = name
        self.profile = profile
        self.backend = _resolve_backend(log)
        self.log = log.log

        # Container configuration: the runtime kind is taken from the file
        # name of the configured executable.
        self.docker = DockerConfig(
            exec_path=Path(PYPE_DOCKER),
            runtime=Path(PYPE_DOCKER).name,
            cache_dir=(
                Path(PYPE_SINGULARITY_CACHE)
                if Path(PYPE_DOCKER).name == "singularity"
                else None
            ),
        )

        # Process configuration
        self.stdin = subprocess.PIPE
        self.stdout = subprocess.PIPE
        self.procin: Optional[Command] = None

        # File tracking
        self.inputs: List[str] = []
        self.outputs: List[str] = []
        self._output_harvest: Dict[str, bool] = {}
        self.volumes: List[Volume] = []
        self.overlay_volumes: List[OverlayVolume] = []

        # Runtime configuration (uid/gid are stored as callables and invoked
        # when the container command is built)
        self.uid = os.geteuid
        self.gid = os.getegid
        self.random_work_dir = Path("/") / generate_uid(6, False)
        self.tmp = Path(PYPE_TMP)

        if not name:
            self.log.warning("Proceeding with unnamed Command")

    def add_output(self, out_file: str, harvest: bool = True) -> None:
        """Register an output file to track.

        Args:
            out_file: Output path
            harvest: Flag stored in ``_output_harvest`` and passed to the
                backend's ``prepare_outputs``. It is only recorded the first
                time ``out_file`` is added.
        """
        if out_file not in self.outputs:
            self.outputs.append(out_file)
            self._output_harvest[out_file] = harvest

    def add_input(self, in_file: str, match: str = "exact") -> None:
        """Register input file(s) to track.

        Args:
            in_file: Input path, or a path prefix when ``match="recursive"``
            match: "recursive" adds every existing path matched by the glob
                pattern ``f"{in_file}*"``; any other value adds ``in_file``
                itself (once).
        """
        if match == "recursive":
            # NOTE(review): in_file is not glob-escaped, so "[", "*" or "?"
            # in the path act as wildcards.
            for file_in in glob(f"{in_file}*"):
                self.add_input(file_in, "exact")
        elif in_file not in self.inputs:
            self.inputs.append(in_file)

    def add_volume(self, path: str, output: bool = False) -> None:
        """Register a container volume for ``path``.

        A path that is already registered is not added again; if the new
        request is read-write the existing volume's mode string is upgraded
        to "rw" (its ``to_bind``/``bind_file`` are left unchanged). When an
        overlay volume covers an output's bound directory, the host side of
        the mount is redirected to the overlay's scratch directory. Finally,
        volumes with the same mode that are nested in, or share a parent
        with, an existing volume are re-pointed to a shared mount location.

        Args:
            path: Host path to mount
            output: True for an output (read-write, parent dir bound)
        """
        n_volumes = len(self.volumes)
        volume = Volume(path, output=output)
        for existing in self.volumes:
            if existing.path == volume.path:
                if volume.mode == "rw" and existing.mode == "ro":
                    existing.mode = "rw"
                return

        # If an overlay volume covers this output's parent dir, redirect the
        # docker host-side path to scratch so container writes land there.
        if output and self.overlay_volumes:
            ov = next(
                (ov for ov in self.overlay_volumes if ov.original_dir == volume.to_bind),
                None,
            )
            if ov is not None:
                random_str = generate_uid(12, False)
                bind_base = (
                    Path(volume.bind_prefix) / random_str / Path(ov.scratch_dir).name
                )
                volume.to_bind = ov.scratch_dir
                volume.bind_volume = str(bind_base)
                volume.bind_file = (
                    str(bind_base / Path(path).name)
                    if ov.original_dir != str(Path(path))
                    else str(bind_base)
                )

        new_to_bind = volume.to_bind  # effective mount root (parent dir for outputs)
        for i in range(n_volumes):
            path_i = self.volumes[i].to_bind
            has_same_mode = self.volumes[i].mode == volume.mode
            if is_direct_child_of(path_i, new_to_bind) and has_same_mode:
                self.volumes[i].replace_bind_volume(volume.bind_volume)
            elif is_direct_child_of(path, path_i) and has_same_mode:
                volume.replace_bind_volume(self.volumes[i].bind_volume)
            elif has_same_basedir(path, path_i) and has_same_mode:
                volume.replace_bind_dirname(self.volumes[i].bind_volume)
        self.volumes.append(volume)

    def normalize_volumes(self) -> list:
        """Return the volumes to actually mount, without duplicates.

        A volume is kept only if neither its host path (``to_bind``) nor its
        container path (``bind_volume``) was already used by an earlier
        volume; the first occurrence wins.

        Returns:
            List of unique volumes for bind mounts. ``self.volumes`` is left
            intact because it is still needed for path substitution.
        """
        unique_volumes = []
        seen_to_bind = set()
        seen_bind_volumes = set()
        for volume in self.volumes:
            if (
                volume.bind_volume not in seen_bind_volumes
                and volume.to_bind not in seen_to_bind
            ):
                unique_volumes.append(volume)
                seen_to_bind.add(volume.to_bind)
                seen_bind_volumes.add(volume.bind_volume)
        return unique_volumes

    def add_namespace(self, program: "ProfileProgram") -> None:
        """Build the ``Namespace`` for ``program`` and attach it to this command."""
        self.namespace = Namespace(program, self.log, self.profile)

    def _spawn(self, capture_stderr: bool = False) -> None:
        """Bare Popen on self.cmd — no overlay logic, no logging.

        After spawning, ``self.stdout`` is replaced by the child's stdout
        stream (``None`` unless stdout was ``subprocess.PIPE``) so it can be
        piped into another command.
        """
        stderr_pipe = subprocess.PIPE if capture_stderr else None
        self.proc = subprocess.Popen(
            self.cmd, stdin=self.stdin, stdout=self.stdout, stderr=stderr_pipe
        )
        self.stdout = self.proc.stdout

    def run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
        """Execute the command.

        Prepares inputs/outputs through the storage backend, restores the
        namespace program when one is named, then dispatches on the namespace
        type ("docker", "env_module", "conda", otherwise local).

        Args:
            local_script: If True, execute as local script
            capture_stderr: If True, capture stderr output (default False to
                maintain current behavior). Honored by every execution path,
                including containers.
        """
        self.backend.prepare_inputs(self.inputs, self.overlay_volumes)
        if hasattr(self, "namespace") and self.namespace.name:
            self.backend.restore_program(self.profile.__name__, self.namespace.name)
        self.backend.prepare_outputs(self.outputs, self._output_harvest, self.overlay_volumes)

        ns_type = self.namespace.type if hasattr(self, "namespace") else None
        if ns_type == "docker":
            self.docker_run(local_script, capture_stderr=capture_stderr)
        elif ns_type == "env_module":
            self.module_run(local_script=local_script, capture_stderr=capture_stderr)
        elif ns_type == "conda":
            self.conda_run(local_script=local_script, capture_stderr=capture_stderr)
        else:
            self.local_run(local_script=local_script, capture_stderr=capture_stderr)

    def docker_run(self, local_script: bool, capture_stderr: bool = False) -> None:
        """Execute command within a container environment.

        Builds the runtime-specific prefix (udocker, singularity or docker),
        registers volumes for all inputs, outputs and ``self.tmp``, rewrites
        command arguments that equal a volume's host path to the matching
        container path, and spawns the final command.

        Args:
            local_script: If True, ``self.cmd[-1]`` is a local script that is
                mounted into the container and rewritten with container paths
            capture_stderr: If True, capture stderr output (default False,
                the previous behavior)

        Raises:
            Exception: If no namespace is available to provide the image
        """
        docker_cwd = tempfile.mkdtemp(dir=self.tmp)
        docker_data = {
            "user": self.uid(),
            "group": self.gid(),
            "random_dir": self.random_work_dir,
            "docker": self.docker.exec_path,
            "cwd": docker_cwd,
        }
        cmd = copy(self.cmd)

        # Runtime-specific prefix. The per-command temporary directory is the
        # working directory (docker/udocker) or the home (singularity).
        if self.docker.is_udocker:
            docker_cmd = (
                f"{docker_data['docker']} --quiet run -i --rm "
                f"{self.namespace.docker_extra_args} "
                f"--user={docker_data['user']}:{docker_data['group']} "
                f"--workdir=/var/spool/pype "
                f"--volume={docker_data['cwd']}:/var/spool/pype"
            )
        elif self.docker.is_singularity:
            docker_cmd = (
                f"{docker_data['docker']} --quiet exec "
                f"{self.namespace.docker_extra_args} --contain "
                f"--pid --ipc --pwd {docker_data['random_dir']} "
                f"--home {docker_data['cwd']}:{docker_data['random_dir']}"
            )
        else:
            docker_cmd = (
                f"{docker_data['docker']} run -i --rm "
                f"--entrypoint \"\" "
                f"{self.namespace.docker_extra_args} "
                f"--user={docker_data['user']}:{docker_data['group']} "
                f"--workdir=/var/spool/pype "
                f"--volume={docker_data['cwd']}:/var/spool/pype:rw"
            )

        for in_file in self.inputs:
            self.add_volume(in_file)
        for out_file in self.outputs:
            self.add_volume(out_file, output=True)
        # Always add PYPE_TMP as it's equivalent to TMPDIR,
        # a common pattern in various tools
        self.add_volume(str(self.tmp), output=True)

        if local_script:
            exec_file = self.cmd[-1]
            self.add_volume(exec_file)
            self.replace_values_in_code(exec_file)

        # Path substitution uses ALL volumes (before deduplication). Matching
        # is against the unmodified self.cmd, so if several volumes match the
        # same argument, the last one wins.
        for volume in self.volumes:
            for i, arg in enumerate(self.cmd):
                if arg == volume.path:
                    cmd[i] = volume.bind_file
                elif arg == volume.to_bind:
                    # to_bind is the host directory mounted at bind_volume;
                    # for outputs it is the parent dir, so bind_file (the file
                    # inside that mount) would be the wrong container path.
                    cmd[i] = volume.bind_volume

        # Bind mounts come from the deduplicated volume list.
        volume_args = []
        for volume in self.normalize_volumes():
            if self.docker.is_udocker:
                volume.remove_mode()
            elif self.docker.is_singularity:
                volume.singularity_volume()
            volume_args.append(volume.to_str())

        try:
            docker_image = self.namespace.last()
        except AttributeError as err:
            self.log.error("A namespace is required before executing a Docker command")
            raise Exception(
                "A namespace is required before executing a Docker command"
            ) from err

        docker_cmd = shlex.split(f"{docker_cmd} {' '.join(volume_args)} {docker_image}")
        docker_cmd.extend(cmd)
        self.log.info(f"Prepare Docker command {' '.join(docker_cmd)}")
        self.log.info(f"Replace {' '.join(self.cmd)} with Docker command")
        self.cmd = docker_cmd
        self._spawn(capture_stderr)

    def local_run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
        """Execute command locally.

        When overlay volumes exist, arguments equal to an overlay's ``path``
        become its ``bind_file`` and arguments equal to its ``original_dir``
        become its ``scratch_dir`` (applied to ``self.cmd`` in place).

        Args:
            local_script: If True, the last element of self.cmd is a script
                file whose content is rewritten for overlay path substitution.
            capture_stderr: If True, capture stderr output (default False to
                maintain current behavior)
        """
        if self.overlay_volumes:
            if local_script:
                self.replace_values_in_code(self.cmd[-1])
            for ov in self.overlay_volumes:
                for i, arg in enumerate(self.cmd):
                    if arg == ov.path:
                        self.cmd[i] = ov.bind_file
                    elif arg == ov.original_dir:
                        self.cmd[i] = ov.scratch_dir
        self.log.info(f"Prepare {self.name} command line")
        self.log.info(" ".join(map(str, self.cmd)))
        self.log.info(f"Execute {self.name} with python subprocess.Popen")
        self._spawn(capture_stderr)

    def module_load(self) -> None:
        """Load env_modules dependencies of the namespace.

        When the namespace lists modules, the module environment is purged,
        every modulepath is added with ``module use`` and every module with
        ``module add``. Nothing happens when the list is empty.
        """
        if self.namespace.list:
            module = get_module_cmd()
            self.log.info(
                f"Purge the module env before loading {self.namespace.namespace}"
            )
            module("purge")
            for modulepath in self.namespace.modulepaths:
                self.log.info(f"Use modulepath {modulepath}")
                module("use", modulepath)
            for nm in self.namespace.list:
                self.log.info(f"Add env_module {nm}")
                module("add", nm)

    def module_run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
        """Execute command with environment modules.

        Args:
            local_script: Passed through to local_run for overlay script rewriting.
            capture_stderr: If True, capture stderr output
        """
        self.module_load()
        self.local_run(local_script=local_script, capture_stderr=capture_stderr)

    def conda_run(self, local_script: bool = False, capture_stderr: bool = False) -> None:
        """Execute command in conda environment.

        Supports both name-based and path-based conda environments:
        - If program.path is set: uses -p <path>/<name>
        - Otherwise: uses -n <name>

        Args:
            local_script: Passed through to local_run for overlay script rewriting.
            capture_stderr: If True, capture stderr output
        """
        if hasattr(self, "namespace") and hasattr(self.namespace, "program"):
            env_path = getattr(self.namespace.program, "path", None)
        else:
            env_path = None
        env_name = self.namespace.str

        # Build argv as a list so environment names/paths containing spaces
        # or quotes stay single arguments. PYPE_CONDA is still shlex-split,
        # exactly as before.
        conda_argv = shlex.split(str(PYPE_CONDA)) + ["run"]
        if env_path:
            full_path = os.path.join(env_path, env_name)
            conda_argv += ["-p", full_path]
            self.log.info(f"Running command in conda environment at path: {full_path}")
        else:
            conda_argv += ["-n", env_name]
            self.log.info(f"Running command in conda environment: {env_name}")
        self.cmd = conda_argv + self.cmd
        self.module_load()
        self.local_run(local_script=local_script, capture_stderr=capture_stderr)

    def pipe_in(self, command: "Command", local_script: bool = False) -> None:
        """Run ``command`` and use its stdout as this command's stdin."""
        self.log.info(f"Pipe in {command.name} in {self.name} command")
        self.procin = command
        self.procin.run(local_script)
        self.stdin = self.procin.stdout

    def release_stdout(self) -> None:
        """Release stdout to terminal (for final commands in execution).

        Use this for terminal commands where output should be visible to the user.
        """
        self.stdout = None

    def capture_stdout(self) -> None:
        """Capture stdout for piping (for intermediate commands).

        Use this to ensure stdout is captured in subprocess.PIPE for piping
        to another command.
        """
        self.stdout = subprocess.PIPE

    def child_close(self) -> None:
        """Close the stdout stream of the piped-in command, if it has one."""
        if self.procin.stdout is not None:
            self.procin.stdout.close()
            self.log.info(f"Close {self.procin.name} stdout stream")

    def close(self, ignore_returncode: bool = False) -> Tuple[Any, int]:
        """Close command and return result with exit code.

        Waits for the process via ``communicate``. On exit code 0, every
        overlay marked for harvest is harvested once per scratch dir. The
        backend cleanup always runs; a cleanup failure is only logged.

        Args:
            ignore_returncode: If False (default), raise exception on non-zero
                exit. If True, ignore exit code (legacy behavior).

        Returns:
            Tuple of (communicate_result, returncode)

        Raises:
            CommandError: If returncode != 0 and ignore_returncode is False
        """
        if self.procin is not None:
            self.child_close()
        res = self.proc.communicate()
        returncode = self.proc.returncode
        self.log.info(f"Terminate {self.name} (exit code: {returncode})")
        try:
            if self.overlay_volumes and returncode == 0:
                seen_scratch: set = set()
                for ov in self.overlay_volumes:
                    if (
                        ov.harvest
                        and ov.scratch_dir not in seen_scratch
                        and Path(ov.scratch_dir).exists()
                    ):
                        self.log.info(
                            f"Harvest overlay {ov.scratch_dir} -> {ov.original_dir}"
                        )
                        self.backend.harvest(Path(ov.scratch_dir), Path(ov.original_dir))
                        seen_scratch.add(ov.scratch_dir)
        finally:
            try:
                self.backend.cleanup()
            except Exception as e:
                self.log.warning(f"Storage backend cleanup failed: {e}")
        if not ignore_returncode and returncode != 0:
            self.log.error(f"{self.name} failed with exit code {returncode}")
            raise CommandError(
                f"Command '{self.name}' failed with exit code {returncode}",
                command=" ".join(map(str, self.cmd)),
                exit_code=returncode,
            )
        return res, returncode

    def replace_values_in_code(self, code_file: str) -> None:
        """Replace path mappings in a script file.

        Uses a single-pass regex alternation (longest path first) so
        substituted text is never re-matched, which prevents
        double-substitution when a bind_file contains another overlay path
        as a prefix. Docker volumes take precedence over overlay volumes when
        the same path appears in both. The file is rewritten in place and its
        mode set to 0o760; nothing is done when there are no mappings.
        """
        sub_map: Dict[str, str] = {}
        for volume in self.volumes:
            sub_map.setdefault(volume.path, volume.bind_file)
        for ov in self.overlay_volumes:
            sub_map.setdefault(ov.path, ov.bind_file)
        if not sub_map:
            return

        sorted_paths = sorted(sub_map, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(p) for p in sorted_paths))
        with open(code_file, "rt") as code:
            rewritten = [
                pattern.sub(lambda m: sub_map[m.group(0)], line) for line in code
            ]
        with open(code_file, "wt") as code:
            code.write("".join(rewritten))
        os.chmod(code_file, 0o760)
def is_direct_child_of(x: str, y: str) -> bool:
    """Return True when the resolved parent directory of ``x`` is ``y``.

    Both paths are passed through ``Path.resolve()`` first, so relative
    paths are made absolute and symlinks are followed.
    """
    child_parent = Path(x).resolve().parent
    return child_parent == Path(y).resolve()
def has_same_basedir(x: str, y: str) -> bool:
    """Return True when ``x`` and ``y`` resolve to the same parent directory.

    Paths are passed through ``Path.resolve()`` before taking ``.parent``.
    """
    parent_x, parent_y = (Path(p).resolve().parent for p in (x, y))
    return parent_x == parent_y
class Namespace:
    """Environment and dependency management system.

    Manages the environment a program runs in and its dependencies.
    Supported namespace types:
    - 'path': program executed directly from a path
    - 'env_module': Environment Modules loading
    - 'docker': container image (Docker/uDocker or Singularity)
    - 'conda': conda environment (name-based, or path-based when the program
      defines a ``path``)

    The namespace format is 'type@item'; a string without '@' is treated as
    type 'path'. The program version is combined with the item:
    - 'env_module@gcc' with version '9.3.0' loads module 'gcc/9.3.0'
    - 'docker@ubuntu' with version '22.04' uses the image tag ':22.04'
    - 'path@/usr/local/bin' (or just '/usr/local/bin') uses the path as-is

    Dependencies must themselves resolve to 'env_module' namespaces; their
    modules and modulepaths are added before the program's own.
    """

    def __init__(
        self,
        program: "ProfileProgram",
        log: Any,
        profile: Any,
    ):
        """Initialize namespace configuration from a ProfileProgram.

        Args:
            program: ProfileProgram object (always — dict support removed)
            log: Logger instance
            profile: Environment profile containing program configurations
                (``profile.programs`` is used to resolve dependencies)

        Attributes set:
            name: Profile dict key — unique identifier for this program
            type: Namespace type ('docker', 'env_module', 'conda', or 'path')
            namespace: Original namespace string (e.g., 'docker@sequenza/sequenza')
            str: Parsed namespace item (e.g., 'sequenza/sequenza')
            version: Program version string
            list: List of resolved namespace items
            modulepaths: Module paths to ``module use`` before loading
            docker_extra_args: Extra arguments for container runtimes

        Raises:
            CommandNamespaceError: For invalid namespace format, unsupported
                types, or a dependency that is not of type env_module
        """
        self.type: Optional[str] = None
        self.list: List[str] = []
        self.modulepaths: List[str] = []
        self.docker_extra_args = ""
        self.version = None
        self.namespace = None
        self.str = None
        self.name: str = program.program_name  # profile dict key — unique per profile
        self.log = log
        self.program = program

        namespace_str = program.namespace
        version_str = program.version
        deps_list = program.dependencies or []
        self.namespace = namespace_str
        self.version = version_str

        namespace_list = namespace_str.split("@")
        if len(namespace_list) == 2:
            self.type = namespace_list[0]
            self.log.info(f"Set namespace to {self.type}")
            namespace_item = namespace_list[1]
        elif len(namespace_list) == 1:
            self.type = "path"
            self.log.info(f"Set namespace to {self.type}")
            namespace_item = namespace_list[0]
        else:
            self.log.error("Wrong Namespace format")
            raise CommandNamespaceError("Wrong Namespace format")
        self.str = namespace_item

        supported_namespaces = ("path", "env_module", "docker", "conda")
        if self.type not in supported_namespaces:
            self.log.error(f"Not supported namespace names {self.type}")
            raise CommandNamespaceError(f"Not supported namespace {self.type}")

        for dep in deps_list:
            dep_nm = Namespace(profile.programs[dep], self.log, profile)
            if dep_nm.type != "env_module":
                # The check only accepts env_module dependencies, so the
                # logged and raised messages both state exactly that.
                msg = "All dependencies must be type env_module"
                self.log.error(msg)
                raise CommandNamespaceError(msg)
            self.list += dep_nm.list
            self.modulepaths += dep_nm.modulepaths

        if self.type == "env_module":
            self.list.append(
                program_string({"namespace": namespace_item, "version": version_str})
            )
            if program.modulepath:
                self.modulepaths.append(program.modulepath)
        elif self.type == "docker":
            docker_runtime = Path(PYPE_DOCKER).name
            if program.extra_args:
                self.docker_extra_args = program.extra_args
            if docker_runtime == "singularity":
                nm = get_singularity_image_path(
                    namespace_item,
                    program.version,
                    PYPE_SINGULARITY_CACHE,
                )
            else:
                registry, image_path = parse_container_reference(namespace_item)
                nm = f"{registry}/{image_path}:{program.version}"
            self.list.append(str(nm))
        elif self.type == "path":
            self.list.append(namespace_item)
        # 'conda' adds nothing to ``list``; the environment is taken from ``str``.

    def first(self) -> str:
        """Return the first resolved namespace item."""
        return self.list[0]

    def last(self) -> str:
        """Return the last resolved namespace item (the program's own entry
        when it adds one, e.g. the container image)."""
        return self.list[-1]