Source code for pype.process

import os
import subprocess
import shlex
import tempfile
from pathlib import Path
from typing import List, Optional, Any, Dict
from dataclasses import dataclass
from copy import copy
from glob import glob
from pype.__config__ import PYPE_TMP, PYPE_DOCKER, PYPE_SINGULARITY_CACHE
from pype.exceptions import EnvModulesError, CommandNamespaceError
from pype.misc import generate_uid


@dataclass
class DockerConfig:
    """Settings describing which container runtime is used and how.

    The ``runtime`` string is compared literally against the known runtime
    names to decide which command-line dialect callers should emit.

    Attributes:
        exec_path: Location of the container runtime executable.
        runtime: Runtime name, e.g. ``"docker"``, ``"singularity"`` or
            ``"udocker"``.
        cache_dir: Optional cache directory (needed for Singularity images).
    """

    exec_path: Path
    runtime: str
    cache_dir: Optional[Path] = None

    @property
    def is_singularity(self) -> bool:
        """True when the configured runtime is named ``singularity``."""
        runtime_name = self.runtime
        return runtime_name == "singularity"

    @property
    def is_udocker(self) -> bool:
        """True when the configured runtime is named ``udocker``."""
        runtime_name = self.runtime
        return runtime_name == "udocker"
def get_module_cmd():
    """Load and return the environment-modules ``module`` callable.

    Executes ``$MODULESHOME/init/python.py`` in a fresh namespace and returns
    the object it binds to the name ``module``.

    Returns:
        Whatever the init script assigned to ``module``.

    Raises:
        EnvModulesError: if ``MODULESHOME`` is unset or empty, if the init
            script does not exist, or if a ``KeyError`` occurs while loading
            it (e.g. the script defines no ``module``).
    """
    try:
        modules_home = os.environ.get("MODULESHOME")
        if not modules_home:
            raise EnvModulesError("No MODULESHOME variable found in the environment")
        modules_path = Path(modules_home) / "init/python.py"
        if not modules_path.exists():
            raise EnvModulesError(f"No python script {modules_path}")
        modules = {}
        # NOTE: this executes code from the local modules installation; the
        # MODULESHOME location is trusted by design.
        exec(modules_path.read_bytes(), modules)
        return modules["module"]
    except KeyError as e:
        # Chain the cause so the failing key/traceback is not lost.
        raise EnvModulesError(f"Module command not found: {e}") from e
def module_avail(module: Optional[str] = None, modulepath: Optional[str] = None):
    """Yield the stderr output of ``module avail`` line by line.

    The command is run through the shell. Only stderr is captured
    (presumably because ``module avail`` prints its listing there); stdout is
    inherited from the parent process.

    Args:
        module: Optional argument appended to ``module avail`` (e.g. a module
            name).
        modulepath: Optional directory registered first with ``module use``.
            Empty strings are ignored.

    Yields:
        Lines from the command's stderr, decoded as UTF-8 (newlines kept).

    Note:
        ``module`` and ``modulepath`` are interpolated into a shell command
        line unquoted, so they must come from a trusted source.
    """
    command = "module avail"
    if module:
        command = f"{command} {module}"
    # Use the same truthiness test that decides whether a prefix is needed;
    # otherwise an empty modulepath yields the invalid command " && module avail".
    if modulepath:
        command = f"module use {modulepath} && {command}"
    call_proc = subprocess.Popen(command, shell=True, stderr=subprocess.PIPE)
    try:
        for raw_line in call_proc.stderr:
            yield raw_line.decode("utf-8")
    finally:
        # Release the pipe and reap the child even if the consumer stops early.
        call_proc.stderr.close()
        call_proc.wait()
def program_string(program_dict: Dict[str, str]) -> str:
    """Build a ``<base>/<version>`` identifier for a program.

    ``base`` is taken from the ``path`` key. If looking up ``path`` or
    ``version`` raises ``KeyError``, ``namespace`` is used as the base
    instead.
    """
    try:
        base = program_dict["path"]
        version = program_dict["version"]
    except KeyError:
        base = program_dict["namespace"]
        version = program_dict["version"]
    return "/".join((f"{base}", f"{version}"))
@dataclass
class Volume:
    """Bind-mount specification for one host path in a container.

    Input paths are mounted as-is in read-only mode. For output paths, the
    parent directory is mounted in read-write mode. Each volume gets a random
    sub-folder under ``bind_prefix`` so different volumes do not collide
    inside the container.

    Attributes:
        path: Host file or directory referenced by the command.
        output: True for a read-write (output) mount, False for read-only.
        bind_prefix: Container directory under which volumes are mounted.
    """

    path: str
    output: bool = False
    bind_prefix: str = "/var/lib/pype"

    def __post_init__(self):
        """Derive ``mode``, ``to_bind``, ``bind_volume`` and ``bind_file``.

        ``to_bind`` is the host path that is mounted, ``bind_volume`` is its
        mount point in the container, and ``bind_file`` is the container path
        that corresponds to ``path`` itself.
        """
        self.volume_str = "--volume=%(host_file)s:%(docker_file)s:%(mode)s"
        if self.output:
            self.mode = "rw"
            # Only the parent directory is mounted for outputs (presumably
            # because the output file may not exist yet).
            self.to_bind = os.path.dirname(self.path)
        else:
            self.mode = "ro"
            self.to_bind = self.path
        # Random sub-folder keeps distinct volumes from clashing.
        mount_point = Path(
            self.bind_prefix, generate_uid(12, False), Path(self.to_bind).name
        )
        self.bind_volume = str(mount_point)
        if self.output:
            self.bind_file = str(mount_point / Path(self.path).name)
        else:
            self.bind_file = self.bind_volume

    def remove_mode(self) -> None:
        """Drop the ``:mode`` suffix from the mount option (udocker syntax)."""
        self.volume_str = "--volume=%(host_file)s:%(docker_file)s"
        self.mode = ""

    def singularity_volume(self) -> None:
        """Switch the mount option to Singularity's ``--bind`` syntax."""
        self.volume_str = "--bind %(host_file)s:%(docker_file)s:%(mode)s"

    def to_str(self) -> str:
        """Render the mount option with the current template."""
        substitutions = {
            "host_file": self.to_bind,
            "docker_file": self.bind_volume,
            "mode": self.mode,
        }
        return self.volume_str % substitutions
class Command:
    """High-level interface for executing commands with container support.

    This class provides a unified interface for running commands locally or
    within containers (Docker, Singularity, uDocker). It handles:

    - Command execution and piping
    - Volume mounting and file access
    - Container runtime configuration
    - Environment modules integration
    - Input/output file tracking

    Attributes:
        cmd: Command to execute as list of arguments
        name: Identifier for the command (used in logs)
        profile: Configuration profile for the environment
        log: Logger instance for command execution tracking
        docker: Container runtime configuration
        inputs: List of input files required by the command
        outputs: List of output files produced by the command
        volumes: List of volumes to mount in container environments
    """

    def __init__(self, cmd: str, log: Any, profile: Any, name: str = ""):
        """Initialize command execution environment.

        Args:
            cmd: Command string to execute (split with ``shlex.split``)
            log: Object whose ``log`` attribute is used as the logger
            profile: Environment profile configuration
            name: Optional identifier for the command
        """
        self.cmd = shlex.split(cmd)
        self.name = name
        self.profile = profile
        self.log = log.log
        # The runtime is identified by the executable's file name.
        docker_exec = Path(PYPE_DOCKER)
        self.docker = DockerConfig(
            exec_path=docker_exec,
            runtime=docker_exec.name,
            cache_dir=(
                Path(PYPE_SINGULARITY_CACHE)
                if docker_exec.name == "singularity"
                else None
            ),
        )
        # Process configuration: pipe_in() replaces stdin with an upstream
        # stream, and local_run() replaces stdout with the process's stream.
        self.stdin = subprocess.PIPE
        self.stdout = subprocess.PIPE
        self.procin: Optional[Command] = None
        # File tracking
        self.inputs: List[str] = []
        self.outputs: List[str] = []
        self.volumes: List[Volume] = []
        # Runtime configuration. uid/gid are stored as callables and evaluated
        # when the container command line is built.
        self.uid = os.geteuid
        self.gid = os.getegid
        self.random_work_dir = Path("/") / generate_uid(6, False)
        self.tmp = Path(PYPE_TMP)
        if not name:
            self.log.warning("Proceeding with unnamed Command")

    def add_output(self, out_file: str) -> None:
        """Track ``out_file`` as an output and register a read-write volume."""
        if out_file not in self.outputs:
            self.outputs.append(out_file)
            self.add_volume(out_file, output=True)

    def add_input(self, in_file: str, match: str = "exact") -> None:
        """Track input file(s) and register read-only volumes for them.

        Args:
            in_file: Input path, or a path prefix when ``match="recursive"``.
            match: ``"recursive"`` adds every path matching ``in_file + "*"``.
                Any other value adds ``in_file`` itself.
        """
        if match == "recursive":
            for file_in in glob(f"{in_file}*"):
                self.add_input(file_in, "exact")
        elif in_file not in self.inputs:
            self.inputs.append(in_file)
            self.add_volume(in_file)

    @staticmethod
    def _rebind(volume: Volume, bind_volume: Any) -> None:
        """Move ``volume``'s container mount point, keeping ``bind_file`` in sync."""
        volume.bind_volume = str(bind_volume)
        if volume.output:
            volume.bind_file = str(Path(volume.bind_volume) / Path(volume.path).name)
        else:
            volume.bind_file = volume.bind_volume

    @staticmethod
    def _relative(inner: str, outer: str) -> str:
        """Path of ``inner`` relative to ``outer``, with symlinks resolved."""
        return os.path.relpath(os.path.realpath(inner), os.path.realpath(outer))

    def add_volume(self, path: str, output: bool = False) -> None:
        """Register ``path`` as a container volume, aligning related mounts.

        Registering the same path with the same mode twice is a no-op.
        ``docker_run`` re-adds every input/output, which would otherwise
        create duplicates. If the new volume contains, is nested in, or shares
        a parent directory with an existing volume of the same mode, the
        container paths are adjusted so the host layout is mirrored in the
        container.
        """
        volume = Volume(path, output=output)
        if any(v.path == path and v.mode == volume.mode for v in self.volumes):
            return
        for existing in self.volumes:
            if existing.mode != volume.mode:
                continue
            if is_in_directory(existing.to_bind, volume.to_bind):
                # Existing mount lives under the new one: nest its container path.
                self._rebind(
                    existing,
                    Path(volume.bind_volume)
                    / self._relative(existing.to_bind, volume.to_bind),
                )
            elif is_in_directory(volume.to_bind, existing.to_bind):
                # New mount lives under an existing one: nest it there.
                self._rebind(
                    volume,
                    Path(existing.bind_volume)
                    / self._relative(volume.to_bind, existing.to_bind),
                )
            elif has_same_basedir(path, existing.path):
                # Host siblings become container siblings.
                self._rebind(
                    volume,
                    Path(existing.bind_volume).parent / Path(volume.to_bind).name,
                )
        self.volumes.append(volume)

    def normalize_volumes(self) -> None:
        """Drop volumes whose host path or container mount point is already used.

        The first volume registered for a given ``to_bind``/``bind_volume`` wins.
        """
        # NOTE(review): a read-only and a read-write volume with the same
        # to_bind are de-duplicated by order alone; confirm this is intended.
        seen_hosts = set()
        seen_binds = set()
        unique_volumes = []
        for volume in self.volumes:
            if volume.to_bind in seen_hosts or volume.bind_volume in seen_binds:
                continue
            unique_volumes.append(volume)
            seen_hosts.add(volume.to_bind)
            seen_binds.add(volume.bind_volume)
        self.volumes = unique_volumes

    def add_namespace(self, namespace: Dict[str, Any]) -> None:
        """Attach a :class:`Namespace` built from ``namespace`` to this command."""
        self.namespace = Namespace(namespace, self.log, self.profile)

    def run(self, local_script: bool = False) -> None:
        """Launch the command in the environment its namespace requires.

        Docker namespaces go through :meth:`docker_run`, env_module namespaces
        through :meth:`module_run`, and everything else through
        :meth:`local_run`. Every path ends in :meth:`local_run`, so ``self.proc``
        and ``self.stdout`` are set for :meth:`pipe_in` and :meth:`close`.
        """
        try:
            if hasattr(self, "namespace") and self.namespace.type == "docker":
                self.docker_run(local_script)
            elif hasattr(self, "namespace") and self.namespace.type == "env_module":
                self.module_run()
            else:
                self.local_run()
        except Exception as e:
            self.log.error(f"Command execution failed: {e}")
            raise

    def docker_run(self, local_script: bool) -> None:
        """Execute the command within the configured container runtime.

        Rewrites ``self.cmd`` into a docker/udocker/singularity invocation. The
        invocation mounts every registered volume and replaces host paths in
        the arguments with their container paths. The result is then launched
        with :meth:`local_run`.

        Args:
            local_script: If True, ``self.cmd[0]`` is a local script. It is
                mounted in the container, and the host paths it contains are
                rewritten in place by :meth:`replace_values_in_code`.

        Raises:
            Exception: if no namespace has been attached with :meth:`add_namespace`.
        """
        # Resolve the image first, so a missing namespace does not leave a
        # rewritten script or a stray temporary directory behind.
        try:
            docker_image = self.namespace.first()
        except AttributeError as err:
            self.log.error("A namespace is required before executing a Docker command")
            raise Exception(
                "A namespace is required before executing a Docker command"
            ) from err

        docker_cwd = tempfile.mkdtemp(dir=self.tmp)
        user, group = self.uid(), self.gid()
        docker_exec = self.docker.exec_path
        # Handle container command based on runtime
        if self.docker.is_udocker:
            docker_cmd = (
                f"{docker_exec} --quiet run -i --rm "
                f"--user={user}:{group} "
                f"--workdir=/var/spool/pype "
                f"--volume={docker_cwd}:/var/spool/pype"
            )
        elif self.docker.is_singularity:
            docker_cmd = (
                f"{docker_exec} --quiet exec "
                f"--contain --pid --ipc "
                f"--pwd {self.random_work_dir} "
                f"--home {docker_cwd}:{self.random_work_dir}"
            )
        else:
            docker_cmd = (
                f"{docker_exec} run -i --rm "
                f"--user={user}:{group} "
                f"--workdir=/var/spool/pype "
                f"--volume={docker_cwd}:/var/spool/pype:rw"
            )

        # Inputs/outputs are normally registered already; add_volume skips repeats.
        for in_file in self.inputs:
            self.add_volume(in_file)
        for out_file in self.outputs:
            self.add_volume(out_file, output=True)
        if local_script:
            exec_file = self.cmd[0]
            self.add_volume(exec_file)
            self.replace_values_in_code(exec_file)

        # Translate arguments with every registered volume *before*
        # de-duplication: a volume dropped by normalize_volumes may still name a
        # file used in cmd, and add_volume has aligned its bind_file with the
        # mount that is kept. Later volumes win, as in a sequential scan.
        bind_files = {volume.path: volume.bind_file for volume in self.volumes}
        cmd = [bind_files.get(arg, arg) for arg in self.cmd]

        self.normalize_volumes()
        volume_args = []
        for volume in self.volumes:
            # Adjust volume string based on container runtime
            if self.docker.is_udocker:
                volume.remove_mode()
            elif self.docker.is_singularity:
                volume.singularity_volume()
            volume_args.append(volume.to_str())

        # Construct final docker command
        docker_cmd = shlex.split(f"{docker_cmd} {' '.join(volume_args)} {docker_image}")
        docker_cmd.extend(cmd)
        self.log.info(f'Prepare Docker command {" ".join(docker_cmd)}')
        self.log.info(f'Replace {" ".join(self.cmd)} with Docker command')
        self.cmd = docker_cmd
        self.local_run()

    def local_run(self) -> None:
        """Start ``self.cmd`` with ``subprocess.Popen`` and expose its stdout."""
        self.log.info(f"Prepare {self.name} command line")
        self.log.info(" ".join(map(str, self.cmd)))
        self.log.info(f"Execute {self.name} with python subprocess.Popen")
        self.proc = subprocess.Popen(self.cmd, stdin=self.stdin, stdout=self.stdout)
        self.stdout = self.proc.stdout

    def module_run(self) -> None:
        """Load the namespace's environment modules, then launch the command."""
        module = get_module_cmd()
        for nm in self.namespace.list:
            self.log.info(f"Add env_module {nm}")
            module("add", nm)
        self.local_run()

    def pipe_in(self, command: "Command", local_script: bool = False) -> None:
        """Run ``command`` and use its stdout as this command's stdin."""
        self.log.info(f"Pipe in {command.name} in {self.name} command")
        self.procin = command
        self.procin.run(local_script)
        self.stdin = self.procin.stdout

    def child_close(self) -> None:
        """Close the upstream command's stdout stream (best effort)."""
        try:
            self.procin.stdout.close()
            self.log.info(f"Close {self.procin.name} stdout stream")
        except AttributeError:
            # No upstream process or no stream to close.
            pass

    def close(self) -> Any:
        """Close any upstream pipe, wait for the process and return its output.

        Returns:
            The ``(stdout_data, stderr_data)`` tuple from ``Popen.communicate``.
        """
        if self.procin is not None:
            self.child_close()
        res = self.proc.communicate()
        self.log.info(f"Terminate {self.name}")
        return res

    def replace_values_in_code(self, code_file: str) -> None:
        """Rewrite host paths in ``code_file`` to container paths, in place.

        Every occurrence of a registered volume's ``path`` is replaced with its
        ``bind_file``. The file mode is then set to ``0o760``.
        """
        # Longest host paths first, so a path that is a prefix of another
        # (e.g. /data/a vs /data/ab) cannot corrupt the longer one. Empty
        # paths are skipped: replacing "" would inject text between every char.
        volumes = sorted(
            (volume for volume in self.volumes if volume.path),
            key=lambda volume: len(volume.path),
            reverse=True,
        )
        code_lines = []
        with open(code_file, "rt") as code:
            for line in code:
                for volume in volumes:
                    line = line.replace(volume.path, volume.bind_file)
                code_lines.append(line)
        with open(code_file, "wt") as code:
            code.write("".join(code_lines))
        os.chmod(code_file, 0o760)
def is_in_directory(x: str, y: str) -> bool:
    """Return True if path ``x`` is ``y`` itself or lies inside directory ``y``.

    Both paths are canonicalised with ``os.path.realpath`` (symlinks resolved)
    before comparison. Neither path has to exist.
    """
    try:
        relative = os.path.relpath(os.path.realpath(x), os.path.realpath(y))
    except ValueError:
        # relpath raises e.g. for paths on different drives (Windows).
        return False
    # Only a leading ".." *component* means "outside"; entries named like
    # "..cache" are ordinary children of y.
    return relative != os.pardir and not relative.startswith(os.pardir + os.sep)
def has_same_basedir(x: str, y: str) -> bool:
    """Tell whether ``x`` and ``y`` resolve to entries of the same directory.

    Each path is resolved with ``Path.resolve()`` before its parent is taken.
    """
    parent_x, parent_y = (Path(candidate).resolve().parent for candidate in (x, y))
    return parent_x == parent_y
class Namespace:
    """Execution environment of a program: a path, env modules, or a container.

    The namespace string has the form ``type@item``. A string without ``@``
    is treated as ``path``. Supported types:

    - ``path``: ``item`` is stored as-is.
    - ``env_module``: dependencies (looked up in ``profile.programs``) are
      resolved first; they must also be env_module namespaces. Then
      ``item/version`` is appended.
    - ``docker``: a ``<cache>/<item>_<version>.sif`` path when the runtime
      executable is named ``singularity``, otherwise ``item:version``.

    Examples: ``docker@ubuntu``, ``env_module@gcc``, ``path@/usr/local/bin/python``.
    """

    def __init__(self, program_dict: Dict[str, Any], log: Any, profile: Any):
        """Parse ``program_dict`` into ``self.type`` and ``self.list``.

        Args:
            program_dict: Mapping with a ``namespace`` key, plus ``version``
                for env_module/docker types and optional ``dependencies``.
            log: Logger receiving ``info``/``error`` messages.
            profile: Object whose ``programs`` mapping resolves dependency names.

        Raises:
            CommandNamespaceError: for a malformed or unsupported namespace, or
                a non-env_module dependency of an env_module namespace.
        """
        self.type: Optional[str] = None
        self.list: List[str] = []
        self.log = log

        parts = program_dict["namespace"].split("@")
        if len(parts) == 1:
            self.type, namespace_item = "path", parts[0]
        elif len(parts) == 2:
            self.type, namespace_item = parts
        else:
            self.log.error("Wrong Namespace format")
            raise CommandNamespaceError("Wrong Namespace format")
        self.log.info(f"Set namespace to {self.type}")

        if self.type not in ("path", "env_module", "docker"):
            self.log.error(f"Not supported namespace names {self.type}")
            raise CommandNamespaceError(f"Not supported namespace {self.type}")

        if self.type == "path":
            self.list.append(namespace_item)
        elif self.type == "docker":
            runtime_name = Path(PYPE_DOCKER).name
            version = program_dict["version"]
            if runtime_name == "singularity":
                image = Path(PYPE_SINGULARITY_CACHE) / f"{namespace_item}_{version}.sif"
            else:
                image = f"{namespace_item}:{version}"
            self.list.append(str(image))
        else:  # env_module
            try:
                dependencies = program_dict["dependencies"]
            except KeyError:
                dependencies = []
            for dependency in dependencies:
                dep_namespace = Namespace(profile.programs[dependency], self.log, profile)
                if dep_namespace.type != "env_module":
                    self.log.error("All dependencies must be type env_module")
                    raise CommandNamespaceError(
                        "All dependencies must be type env_module"
                    )
                self.list.extend(dep_namespace.list)
            self.list.append(
                program_string(
                    {"namespace": namespace_item, "version": program_dict["version"]}
                )
            )

    def first(self) -> str:
        """Return the first entry of ``self.list``."""
        return self.list[0]